class ZitadelIntrospectToken(OAuth2AuthorizationCodeBearer):
def __init__(
self,
base_url: str,
private_key: ApiPrivateKey,
scopes: dict[str, str] = None,
):
super().__init__(
authorizationUrl=f"#{base_url}/oauth/v2/authorize",
tokenUrl=f"#{base_url}/oauth/v2/token",
refreshUrl=f"#{base_url}/oauth/v2/token",
scopes=scopes,
auto_error=True
)
self.base_url = base_url
self.private_key = private_key
self.scopes = scopes
async def __call__(self, request: Request) -> Optional[str]:
token = await super().__call__(request)
if not token:
return None
now = time.time()
resp = requests.post(
url=f'{self.base_url}/oauth/v2/introspect',
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": jwt.encode(
header={"alg": self.private_key.alg, "kid": self.private_key.key_id},
payload={
"iss": self.private_key.client_id,
"sub": self.private_key.client_id,
"aud": self.private_key.aud,
"exp": int(now) + self.private_key.exp_in,
"iat": int(now),
},
key=self.private_key.key,
),
"token": token,
}
)
return self.process_response(resp)
def process_response(self, resp):
if resp.status_code != 200:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
introspected_token = resp.json()
if not introspected_token:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Token was revoked.",
headers={"WWW-Authenticate": "Bearer"},
)
if not introspected_token.get("active"):
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Token is inactive.",
headers={"WWW-Authenticate": "Bearer"},
)
now = int(time.time())
if introspected_token["exp"] < now:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Token has expired.",
headers={"WWW-Authenticate": "Bearer"},
)
if not match_token_scopes(introspected_token, self.scopes):
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail=f"Token has insufficient scope. Route requires: {self.scopes.values()}",
headers={"WWW-Authenticate": "Bearer"},
)
return introspected_token
class ZitadelIntrospectToken(OAuth2AuthorizationCodeBearer):
def __init__(
self,
base_url: str,
private_key: ApiPrivateKey,
scopes: dict[str, str] = None,
):
super().__init__(
authorizationUrl=f"#{base_url}/oauth/v2/authorize",
tokenUrl=f"#{base_url}/oauth/v2/token",
refreshUrl=f"#{base_url}/oauth/v2/token",
scopes=scopes,
auto_error=True
)
self.base_url = base_url
self.private_key = private_key
self.scopes = scopes
async def __call__(self, request: Request) -> Optional[str]:
token = await super().__call__(request)
if not token:
return None
now = time.time()
resp = requests.post(
url=f'{self.base_url}/oauth/v2/introspect',
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": jwt.encode(
header={"alg": self.private_key.alg, "kid": self.private_key.key_id},
payload={
"iss": self.private_key.client_id,
"sub": self.private_key.client_id,
"aud": self.private_key.aud,
"exp": int(now) + self.private_key.exp_in,
"iat": int(now),
},
key=self.private_key.key,
),
"token": token,
}
)
return self.process_response(resp)
def process_response(self, resp):
if resp.status_code != 200:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
introspected_token = resp.json()
if not introspected_token:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Token was revoked.",
headers={"WWW-Authenticate": "Bearer"},
)
if not introspected_token.get("active"):
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Token is inactive.",
headers={"WWW-Authenticate": "Bearer"},
)
now = int(time.time())
if introspected_token["exp"] < now:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Token has expired.",
headers={"WWW-Authenticate": "Bearer"},
)
if not match_token_scopes(introspected_token, self.scopes):
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail=f"Token has insufficient scope. Route requires: {self.scopes.values()}",
headers={"WWW-Authenticate": "Bearer"},
)
return introspected_token