Endre | Blackr4tE
ZITADEL16mo ago
11 replies
Endre | Blackr4t

Howto authenticate with Machine User key towards admin api

I am trying in Python to access the admin api uising a Machine user(s) key json file based on the example found on github I created a httpx wrapper for async io python *

I get {'code': 7, 'message': 'could not read projectid by clientid (AUTH-GHpw2)', 'details': [{'@type': 'type.googleapis.com/zitadel.v1.ErrorDetail', 'id': 'AUTH-GHpw2', 'message': 'could not read projectid by clientid'}]} and I can't see what I am doing wrong for it as the PAT token works totally fine.

!/usr/bin/env python3

import asyncio
import datetime
import time
import typing
import httpx
import os

import jwt
import pydantic

CLIENT_PRIVATE_KEY_FILE_PATH = os.environ.get("ZITADEL_KEY", "key.json")
PROJECT_ID = os.getenv("PROJECT_ID", "256684781289541828")
ZITADEL_DOMAIN = os.environ.get("ZITADEL_DOMAIN", "https://id.foo.com")

ZITADEL_TOKEN_URL = "/oauth/v2/token"

DEFAULT_SCOPES = ["openid", "profile", "email", "read:messages"]


class ZitadelServiceKey(pydantic.BaseModel):
    type_: str = pydantic.Field(alias="type")
    keyId: str
    key: str
    expirationDate: datetime.datetime
    userId: str


TOKEN = "PAT TOKEN THAT WORKS"


class ZitadelAuth(httpx.Auth):
    url: str
    scope: list[str]

    key: ZitadelServiceKey
    token: typing.Optional[str]

    client: httpx.AsyncClient

    def __init__(
        self,
        url: str,
        key: typing.Optional[str] = None,
        scope=[],
        token: typing.Optional[str] = None,
    ):
        self.url = url

        self.client = httpx.AsyncClient(base_url=url)
        self.scope = scope

        self.token = None

        if key is not None:
            with open(key, "r") as f:
                self.key = ZitadelServiceKey.model_validate_json(f.read())
        elif token is not None:
            self.token = token
        else:
            raise ValueError("Need either key or token")

    async def make_auth_token(self):
        private_key = self.key.key
        kid = self.key.keyId
        user_id = self.key.userId

        # Create JWT header and payload
        header = {"alg": "RS256", "kid": kid}

        payload = {
            "iss": user_id,
            "sub": user_id,
            "aud": self.url,
            "iat": int(time.time()),
            "exp": int(time.time()) + 3600,
        }

        jwt_token = jwt.encode(payload, private_key, algorithm="RS256", headers=header)

        return jwt_token

    async def authenticate(self):
        jwt_token = await self.make_auth_token()
        scope = " ".join(self.scope)

        data = {
            "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
            "scope": scope,
            "assertion": jwt_token,
        }

        response = await self.client.post(ZITADEL_TOKEN_URL, data=data)

        return response

    async def async_auth_flow(self, request: httpx.Request):
        if self.token is not None:
            token = self.token
        else:
            response = await self.authenticate()
            token = response.json()["access_token"]

        request.headers["Authorization"] = f"Bearer {token}"

        yield request


async def run():
    # project_scope = f"urn:zitadel:iam:org:project:id:{PROJECT_ID}:aud"
    extra_scope = ["urn:zitadel:iam:org:project:id:zitadel:aud"]

    scope = DEFAULT_SCOPES + extra_scope

    auth = ZitadelAuth(
        ZITADEL_DOMAIN,
        key=CLIENT_PRIVATE_KEY_FILE_PATH,
        # token=TOKEN,
        scope=scope,
    )

    client = httpx.AsyncClient(base_url=ZITADEL_DOMAIN, auth=auth)
    headers = {"Accept": "application/json"}

    payload: dict[str, str] = {}
    response = await client.post(
        "/admin/v1/trusted_domains/_search", headers=headers, json=payload
    )
    print(response.json())
Was this page helpful?