Coverage for src / mcp_server_langgraph / auth / keycloak.py: 76%
783 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Keycloak integration for authentication and user management
4Provides production-ready authentication using Keycloak as the identity provider.
5Supports multiple authentication flows, token verification, and role/group mapping
6to OpenFGA for fine-grained authorization.
7"""
9from datetime import datetime, timedelta, UTC
10from typing import Any
11from urllib.parse import urlparse
13import httpx
14import jwt
15from pydantic import BaseModel, Field, field_validator
17from mcp_server_langgraph.observability.telemetry import logger, metrics, tracer
class KeycloakUser(BaseModel):
    """User record as represented by Keycloak, including roles and groups."""

    # --- identity / profile -------------------------------------------------
    id: str = Field(description="Keycloak user ID (UUID)")
    username: str = Field(description="Username")
    email: str | None = Field(default=None, description="Email address")
    first_name: str | None = Field(default=None, description="First name")
    last_name: str | None = Field(default=None, description="Last name")

    # --- account status -----------------------------------------------------
    enabled: bool = Field(default=True, description="Whether user account is enabled")
    email_verified: bool = Field(default=False, description="Whether email is verified")

    # --- authorization data -------------------------------------------------
    realm_roles: list[str] = Field(default_factory=list, description="Realm-level roles")
    client_roles: dict[str, list[str]] = Field(default_factory=dict, description="Client-specific roles")
    groups: list[str] = Field(default_factory=list, description="Group memberships")
    attributes: dict[str, Any] = Field(default_factory=dict, description="Custom attributes")

    @property
    def user_id(self) -> str:
        """Return the username prefixed with ``user:`` (OpenFGA-compatible form)."""
        return "user:" + self.username

    @property
    def full_name(self) -> str:
        """Return "first last" built from the non-empty name parts.

        Falls back to the username when neither name part is set.
        """
        present_parts = [part for part in (self.first_name, self.last_name) if part]
        joined = " ".join(present_parts)
        if joined:
            return joined
        return self.username
class KeycloakConfig(BaseModel):
    """Keycloak connection settings plus the endpoint URLs derived from them."""

    server_url: str = Field(description="Keycloak server URL (e.g., http://localhost:8180)")
    realm: str = Field(description="Keycloak realm name")
    client_id: str = Field(description="OAuth2/OIDC client ID")
    client_secret: str | None = Field(default=None, description="OAuth2/OIDC client secret")
    admin_username: str | None = Field(default=None, description="Admin username for admin API access")
    admin_password: str | None = Field(default=None, description="Admin password for admin API access")
    verify_ssl: bool = Field(default=True, description="Verify SSL certificates")
    timeout: int = Field(default=30, description="HTTP request timeout in seconds")

    @field_validator("server_url")
    @classmethod
    def validate_server_url(cls, v: str) -> str:
        """
        Validate and normalize ``server_url`` (SSRF mitigation).

        Ensures:
        - URL uses the http or https scheme only
        - URL has a hostname
        - URL is normalized (trailing slashes removed)

        Args:
            v: Raw server URL supplied by the caller.

        Returns:
            The URL with any trailing ``/`` characters stripped.

        Raises:
            ValueError: If the scheme is not http/https or no hostname is present.
        """
        parsed = urlparse(v)

        # Only http/https are accepted; this rejects file://, gopher://, dict://, etc.
        if parsed.scheme not in ("http", "https"):
            msg = (
                f"Invalid URL scheme '{parsed.scheme}'. Only 'http' and 'https' are allowed. "
                "This restriction prevents SSRF attacks via dangerous URL schemes."
            )
            raise ValueError(msg)

        # Check the parsed hostname rather than the raw netloc: netloc is
        # non-empty for host-less inputs such as "http://:8180" or
        # "http://user@", which would otherwise slip through.
        if not parsed.hostname:
            msg = f"Invalid URL '{v}': missing hostname. Expected format: http(s)://hostname[:port]"
            raise ValueError(msg)

        # Strip trailing slashes so the derived endpoint URLs never contain "//".
        return v.rstrip("/")

    @property
    def realm_url(self) -> str:
        """Get realm base URL"""
        return f"{self.server_url}/realms/{self.realm}"

    @property
    def admin_url(self) -> str:
        """Get admin API base URL"""
        return f"{self.server_url}/admin/realms/{self.realm}"

    @property
    def token_endpoint(self) -> str:
        """Get token endpoint URL"""
        return f"{self.realm_url}/protocol/openid-connect/token"

    @property
    def userinfo_endpoint(self) -> str:
        """Get userinfo endpoint URL"""
        return f"{self.realm_url}/protocol/openid-connect/userinfo"

    @property
    def jwks_uri(self) -> str:
        """Get JWKS URI for public key retrieval"""
        return f"{self.realm_url}/protocol/openid-connect/certs"

    @property
    def well_known_url(self) -> str:
        """Get OpenID configuration URL"""
        return f"{self.realm_url}/.well-known/openid-configuration"
class TokenValidator:
    """JWT token validator using Keycloak JWKS"""

    def __init__(self, config: KeycloakConfig) -> None:
        """Store the configuration and start with an empty JWKS cache (1 hour TTL)."""
        self.config = config
        self._jwks_cache: dict[str, Any] | None = None
        self._jwks_cache_time: datetime | None = None
        self._cache_ttl = timedelta(hours=1)

    async def get_jwks(self, force_refresh: bool = False) -> dict[str, Any]:
        """
        Get JSON Web Key Set from Keycloak

        Args:
            force_refresh: Skip the cache and always fetch from Keycloak

        Returns:
            JWKS dictionary

        Raises:
            httpx.HTTPError: If the JWKS request fails
        """
        with tracer.start_as_current_span("keycloak.get_jwks"):
            # Serve from cache while it is populated and younger than the TTL
            if not force_refresh and self._jwks_cache and self._jwks_cache_time:
                if datetime.now(UTC) - self._jwks_cache_time < self._cache_ttl:
                    logger.debug("Using cached JWKS")
                    return self._jwks_cache

            # Fetch from Keycloak
            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
                try:
                    response = await client.get(self.config.jwks_uri)
                    response.raise_for_status()
                    jwks = response.json()

                    # Cache the result together with its fetch time
                    self._jwks_cache = jwks
                    self._jwks_cache_time = datetime.now(UTC)

                    logger.info("JWKS fetched and cached", extra={"keys_count": len(jwks.get("keys", []))})
                    return jwks  # type: ignore[no-any-return]

                except httpx.HTTPError as e:
                    logger.error(f"Failed to fetch JWKS: {e}", exc_info=True)
                    metrics.failed_calls.add(1, {"operation": "get_jwks"})
                    raise

    @staticmethod
    def _find_public_key(jwks: dict[str, Any], kid: str) -> Any | None:
        """Return the RSA key built from the first JWKS entry whose ``kid`` matches, else None."""
        for key_data in jwks.get("keys", []):
            if key_data.get("kid") == kid:
                return jwt.algorithms.RSAAlgorithm.from_jwk(key_data)
        return None

    async def verify_token(self, token: str) -> dict[str, Any]:
        """
        Verify JWT token using Keycloak public keys

        The key is selected by the ``kid`` in the token header. If the cached
        JWKS has no such key, the JWKS is re-fetched once before giving up.

        Args:
            token: JWT access token

        Returns:
            Decoded token payload

        Raises:
            jwt.InvalidTokenError: If token is invalid, lacks a ``kid``, or no key matches
            jwt.ExpiredSignatureError: If token is expired
        """
        with tracer.start_as_current_span("keycloak.verify_token") as span:
            try:
                # Decode header (unverified) only to learn which key signed the token
                unverified_header = jwt.get_unverified_header(token)
                kid = unverified_header.get("kid")

                if not kid:
                    msg = "Token missing 'kid' in header"
                    raise jwt.InvalidTokenError(msg)

                span.set_attribute("token.kid", kid)

                # First attempt: (possibly cached) JWKS
                public_key = self._find_public_key(await self.get_jwks(), kid)

                if not public_key:
                    # Try refreshing JWKS in case keys were rotated
                    logger.warning(f"Key ID {kid} not found, refreshing JWKS")
                    public_key = self._find_public_key(await self.get_jwks(force_refresh=True), kid)

                if not public_key:
                    msg = f"Public key not found for kid: {kid}"
                    raise jwt.InvalidTokenError(msg)

                # Verify signature, expiry and audience, then decode
                payload = jwt.decode(
                    token,
                    public_key,  # type: ignore[arg-type]
                    algorithms=["RS256"],
                    audience=self.config.client_id,
                    options={
                        "verify_signature": True,
                        "verify_exp": True,
                        "verify_aud": True,
                    },
                )

                span.set_attribute("token.sub", payload.get("sub"))
                span.set_attribute("token.preferred_username", payload.get("preferred_username"))

                logger.info(
                    "Token verified successfully",
                    extra={"sub": payload.get("sub"), "username": payload.get("preferred_username")},
                )

                metrics.successful_calls.add(1, {"operation": "verify_token"})

                return payload  # type: ignore[no-any-return]

            except jwt.ExpiredSignatureError:
                # Must precede InvalidTokenError: it is handled separately for metrics
                logger.warning("Token expired")
                metrics.auth_failures.add(1, {"reason": "expired_token"})
                raise
            except jwt.InvalidTokenError as e:
                logger.warning(f"Invalid token: {e}")
                metrics.auth_failures.add(1, {"reason": "invalid_token"})
                raise
            except Exception as e:
                # Anything else (e.g. JWKS fetch failure) is an operational error, not an auth failure
                logger.error(f"Token verification error: {e}", exc_info=True)
                metrics.failed_calls.add(1, {"operation": "verify_token"})
                raise
258class KeycloakClient:
259 """
260 Keycloak client for authentication and user management
262 Provides methods for:
263 - Token verification (OIDC)
264 - User authentication (ROPC flow)
265 - Token refresh
266 - User info retrieval
267 - Admin API operations
268 """
def __init__(self, config: KeycloakConfig) -> None:
    """
    Set up the client with an empty admin-token cache.

    Args:
        config: Keycloak configuration shared with the internal TokenValidator
    """
    # Admin token cache; filled lazily by get_admin_token()
    self._admin_token: str | None = None
    self._admin_token_expiry: datetime | None = None

    self.config = config
    self.token_validator = TokenValidator(config)
async def authenticate_user(self, username: str, password: str) -> dict[str, Any]:
    """
    Exchange a username/password for tokens via the password grant (ROPC).

    The client secret is included in the form only when one is configured.

    Args:
        username: Username
        password: Password

    Returns:
        Parsed JSON body of the token endpoint response

    Raises:
        httpx.HTTPError: If the request fails or returns an error status
    """
    with tracer.start_as_current_span("keycloak.authenticate_user") as span:
        span.set_attribute("user.name", username)

        async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
            try:
                form = {
                    "grant_type": "password",
                    "client_id": self.config.client_id,
                    "username": username,
                    "password": password,
                }
                if secret := self.config.client_secret:
                    form["client_secret"] = secret

                response = await client.post(self.config.token_endpoint, data=form)
                response.raise_for_status()
                tokens = response.json()
            except httpx.HTTPStatusError as e:
                # Error status from Keycloak: counted as an authentication failure
                logger.warning(
                    f"Authentication failed for {username}",
                    extra={"status_code": e.response.status_code, "detail": e.response.text},
                )
                metrics.auth_failures.add(1, {"reason": "invalid_credentials"})
                raise
            except httpx.HTTPError as e:
                # Transport-level problem: counted as a failed call
                logger.error(f"Authentication error: {e}", exc_info=True)
                metrics.failed_calls.add(1, {"operation": "authenticate_user"})
                raise
            else:
                logger.info("User authenticated successfully", extra={"username": username})
                metrics.successful_calls.add(1, {"operation": "authenticate_user"})
                return tokens  # type: ignore[no-any-return]
async def verify_token(self, token: str) -> dict[str, Any]:
    """
    Verify a JWT by delegating to the client's TokenValidator.

    Args:
        token: JWT access token

    Returns:
        Decoded token payload
    """
    payload = await self.token_validator.verify_token(token)
    return payload
async def refresh_token(self, refresh_token: str) -> dict[str, Any]:
    """
    Obtain a new token set from a refresh token.

    The client secret is included in the form only when one is configured.

    Args:
        refresh_token: Refresh token

    Returns:
        Parsed JSON body of the token endpoint response

    Raises:
        httpx.HTTPError: If the request fails or returns an error status
    """
    with tracer.start_as_current_span("keycloak.refresh_token"):
        async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
            try:
                form = {
                    "grant_type": "refresh_token",
                    "client_id": self.config.client_id,
                    "refresh_token": refresh_token,
                }
                if secret := self.config.client_secret:
                    form["client_secret"] = secret

                response = await client.post(self.config.token_endpoint, data=form)
                response.raise_for_status()
                tokens = response.json()
            except httpx.HTTPStatusError as e:
                logger.warning("Token refresh failed", extra={"status_code": e.response.status_code})
                metrics.auth_failures.add(1, {"reason": "refresh_failed"})
                raise
            except httpx.HTTPError as e:
                logger.error(f"Token refresh error: {e}", exc_info=True)
                metrics.failed_calls.add(1, {"operation": "refresh_token"})
                raise
            else:
                logger.info("Token refreshed successfully")
                metrics.successful_calls.add(1, {"operation": "refresh_token"})
                return tokens  # type: ignore[no-any-return]
async def get_userinfo(self, access_token: str) -> dict[str, Any]:
    """
    Call the userinfo endpoint with the given access token as a Bearer credential.

    Args:
        access_token: Access token

    Returns:
        Parsed JSON body of the userinfo response

    Raises:
        httpx.HTTPError: If the request fails or returns an error status
    """
    with tracer.start_as_current_span("keycloak.get_userinfo"):
        async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
            try:
                response = await client.get(
                    self.config.userinfo_endpoint,
                    headers={"Authorization": f"Bearer {access_token}"},
                )
                response.raise_for_status()
                userinfo = response.json()
            except httpx.HTTPError as e:
                logger.error(f"Failed to get user info: {e}", exc_info=True)
                metrics.failed_calls.add(1, {"operation": "get_userinfo"})
                raise
            else:
                logger.info("User info retrieved", extra={"sub": userinfo.get("sub")})
                metrics.successful_calls.add(1, {"operation": "get_userinfo"})
                return userinfo  # type: ignore[no-any-return]
async def get_admin_token(self) -> str:
    """
    Return an admin access token, reusing the cached one while it is still fresh.

    A cached token is reused only if it has more than one minute left before
    its recorded expiry. Otherwise a new token is requested from the master
    realm with the password grant and the ``admin-cli`` client.

    Returns:
        Admin access token

    Raises:
        httpx.HTTPError: If the token request fails or returns an error status
    """
    cached_token = self._admin_token
    cached_expiry = self._admin_token_expiry
    if cached_token and cached_expiry and datetime.now(UTC) < cached_expiry - timedelta(minutes=1):
        return cached_token

    # Cache miss or token about to expire: request a fresh one
    with tracer.start_as_current_span("keycloak.get_admin_token"):
        async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
            try:
                form = {
                    "grant_type": "password",
                    "client_id": "admin-cli",
                    "username": self.config.admin_username,
                    "password": self.config.admin_password,
                }
                # The admin token endpoint lives in the master realm, not the configured realm
                master_token_url = f"{self.config.server_url}/realms/master/protocol/openid-connect/token"

                response = await client.post(master_token_url, data=form)
                response.raise_for_status()
                tokens = response.json()

                self._admin_token = tokens["access_token"]
                lifetime_seconds = tokens.get("expires_in", 300)  # 5 minutes when not reported
                self._admin_token_expiry = datetime.now(UTC) + timedelta(seconds=lifetime_seconds)

                logger.info("Admin token obtained")
                assert self._admin_token is not None  # Just assigned above
                return self._admin_token
            except httpx.HTTPError as e:
                logger.error(f"Failed to get admin token: {e}", exc_info=True)
                raise
async def get_user_by_username(self, username: str) -> KeycloakUser | None:
    """
    Look up a user by exact username through the admin API.

    The first match is enriched with realm roles, client roles and groups
    fetched through the corresponding helper methods.

    Args:
        username: Username to search for

    Returns:
        KeycloakUser if found; None if no user matches or an HTTP error occurs
    """
    with tracer.start_as_current_span("keycloak.get_user_by_username") as span:
        span.set_attribute("user.name", username)

        try:
            admin_token = await self.get_admin_token()

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
                response = await client.get(
                    f"{self.config.admin_url}/users",
                    headers={"Authorization": f"Bearer {admin_token}"},
                    params={"username": username, "exact": "true"},
                )
                response.raise_for_status()
                matches = response.json()

                if not matches:
                    logger.info(f"User not found: {username}")
                    return None

                record = matches[0]
                keycloak_id = record["id"]

                # Roles and groups come from separate admin endpoints
                realm_roles = await self._get_user_realm_roles(keycloak_id, admin_token)
                client_roles = await self._get_user_client_roles(keycloak_id, admin_token)
                groups = await self._get_user_groups(keycloak_id, admin_token)

                # Map Keycloak's camelCase fields onto the model
                found_user = KeycloakUser(
                    id=keycloak_id,
                    username=record["username"],
                    email=record.get("email"),
                    first_name=record.get("firstName"),
                    last_name=record.get("lastName"),
                    enabled=record.get("enabled", True),
                    email_verified=record.get("emailVerified", False),
                    realm_roles=realm_roles,
                    client_roles=client_roles,
                    groups=groups,
                    attributes=record.get("attributes", {}),
                )

                logger.info(f"User retrieved: {username}", extra={"user_id": keycloak_id})
                return found_user

        except httpx.HTTPError as e:
            # HTTP failures are logged and reported as "not found" rather than raised
            logger.error(f"Failed to get user: {e}", exc_info=True)
            return None
async def _get_user_realm_roles(self, user_id: str, admin_token: str) -> list[str]:
    """Return the names of the user's realm role mappings, or [] on HTTP error."""
    endpoint = f"{self.config.admin_url}/users/{user_id}/role-mappings/realm"
    auth_headers = {"Authorization": f"Bearer {admin_token}"}
    try:
        async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
            response = await client.get(endpoint, headers=auth_headers)
            response.raise_for_status()

            role_names = [entry["name"] for entry in response.json()]
            return role_names
    except httpx.HTTPError:
        # Best effort: a failed lookup yields no roles instead of an exception
        logger.warning(f"Failed to get realm roles for user {user_id}")
        return []
async def _get_user_client_roles(self, user_id: str, admin_token: str) -> dict[str, list[str]]:
    """
    Get the user's roles for the configured client.

    Args:
        user_id: Keycloak user UUID
        admin_token: Admin access token used as Bearer credential

    Returns:
        ``{config.client_id: [role names]}`` when the client exists and its
        role mappings could be read; otherwise an empty dict (also on HTTP error).
    """
    target_client_id = self.config.client_id
    try:
        async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
            headers = {"Authorization": f"Bearer {admin_token}"}

            # Ask Keycloak to filter by clientId instead of downloading every
            # client in the realm on each user lookup.
            clients_url = f"{self.config.admin_url}/clients"
            response = await client.get(clients_url, headers=headers, params={"clientId": target_client_id})
            response.raise_for_status()

            client_roles: dict[str, list[str]] = {}

            for client_data in response.json():
                # Keep the local check so the result is correct even if the
                # server ignores the filter.
                if client_data.get("clientId") != target_client_id:
                    continue

                # Role mappings are addressed by the client's internal UUID
                client_uuid = client_data["id"]
                roles_url = f"{self.config.admin_url}/users/{user_id}/role-mappings/clients/{client_uuid}"

                roles_response = await client.get(roles_url, headers=headers)
                if roles_response.status_code == 200:
                    client_roles[target_client_id] = [role["name"] for role in roles_response.json()]
                else:
                    # Still best effort, but no longer silent
                    logger.warning(
                        f"Failed to get client roles for user {user_id}",
                        extra={"status_code": roles_response.status_code},
                    )
                # Only one client can match the configured clientId; stop scanning
                break

            return client_roles

    except httpx.HTTPError:
        logger.warning(f"Failed to get client roles for user {user_id}")
        return {}
async def _get_user_groups(self, user_id: str, admin_token: str) -> list[str]:
    """Return each group's ``path`` (or ``name``, or "" if neither), or [] on HTTP error."""
    endpoint = f"{self.config.admin_url}/users/{user_id}/groups"
    auth_headers = {"Authorization": f"Bearer {admin_token}"}
    try:
        async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
            response = await client.get(endpoint, headers=auth_headers)
            response.raise_for_status()

            memberships = response.json()
            # Prefer the full path; fall back to the plain name, then to ""
            return [entry.get("path", entry.get("name", "")) for entry in memberships]

    except httpx.HTTPError:
        # Best effort: a failed lookup yields no groups instead of an exception
        logger.warning(f"Failed to get groups for user {user_id}")
        return []
583 # Admin API methods
584 # These methods call the Keycloak Admin API for user and client management
async def create_client(self, client_config: dict[str, Any]) -> str:
    """
    Create a Keycloak client through the Admin API.

    Args:
        client_config: Client representation sent as the JSON request body.
            Fields include:
            - clientId: Client ID (required)
            - name: Display name
            - description: Description
            - enabled: Whether client is enabled (default: True)
            - serviceAccountsEnabled: Enable service accounts
            - standardFlowEnabled: Enable authorization code flow
            - directAccessGrantsEnabled: Enable ROPC flow
            - implicitFlowEnabled: Enable implicit flow
            - publicClient: Public vs confidential client
            - clientAuthenticatorType: Authentication type (e.g., "client-secret")
            - secret: Client secret (for confidential clients)
            - attributes: Custom attributes

    Returns:
        Last path segment of the response's Location header (the client UUID),
        or an empty string when the header is missing

    Raises:
        httpx.HTTPError: If client creation fails
    """
    with tracer.start_as_current_span("keycloak.create_client") as span:
        span.set_attribute("client.id", client_config.get("clientId", "unknown"))

        try:
            admin_token = await self.get_admin_token()
            request_headers = {
                "Authorization": f"Bearer {admin_token}",
                "Content-Type": "application/json",
            }

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
                endpoint = f"{self.config.admin_url}/clients"
                response = await client.post(endpoint, headers=request_headers, json=client_config)
                response.raise_for_status()

                # The new resource's UUID is the tail of the Location header
                location = response.headers.get("Location", "")
                if location:
                    client_uuid = location.split("/")[-1]
                else:
                    client_uuid = ""

                requested_id = client_config.get("clientId")
                logger.info(
                    f"Keycloak client created: {requested_id}",
                    extra={"client_id": requested_id, "uuid": client_uuid},
                )
                metrics.successful_calls.add(1, {"operation": "create_client"})

                return client_uuid

        except httpx.HTTPStatusError as e:
            logger.error(
                f"Failed to create client: {e}",
                extra={"status_code": e.response.status_code, "detail": e.response.text},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "create_client"})
            raise
        except httpx.HTTPError as e:
            logger.error(f"HTTP error creating client: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "create_client"})
            raise
async def create_user(self, user_config: dict[str, Any]) -> str:
    """
    Create a Keycloak user through the Admin API.

    Args:
        user_config: User representation sent as the JSON request body.
            Fields include:
            - username: Username (required)
            - email: Email address
            - emailVerified: Whether email is verified
            - enabled: Whether user is enabled (default: True)
            - firstName: First name
            - lastName: Last name
            - attributes: Custom attributes dictionary
            - credentials: List of credentials (e.g., password)

    Returns:
        Last path segment of the response's Location header (the user UUID),
        or an empty string when the header is missing

    Raises:
        httpx.HTTPError: If user creation fails
    """
    with tracer.start_as_current_span("keycloak.create_user") as span:
        span.set_attribute("user.username", user_config.get("username", "unknown"))

        try:
            admin_token = await self.get_admin_token()
            request_headers = {
                "Authorization": f"Bearer {admin_token}",
                "Content-Type": "application/json",
            }

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
                endpoint = f"{self.config.admin_url}/users"
                response = await client.post(endpoint, headers=request_headers, json=user_config)
                response.raise_for_status()

                # The new resource's UUID is the tail of the Location header
                location = response.headers.get("Location", "")
                if location:
                    user_uuid = location.split("/")[-1]
                else:
                    user_uuid = ""

                requested_name = user_config.get("username")
                logger.info(
                    f"Keycloak user created: {requested_name}",
                    extra={"username": requested_name, "uuid": user_uuid},
                )
                metrics.successful_calls.add(1, {"operation": "create_user"})

                return user_uuid

        except httpx.HTTPStatusError as e:
            logger.error(
                f"Failed to create user: {e}",
                extra={"status_code": e.response.status_code, "detail": e.response.text},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "create_user"})
            raise
        except httpx.HTTPError as e:
            logger.error(f"HTTP error creating user: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "create_user"})
            raise
async def update_user(self, user_id: str, user_config: dict[str, Any]) -> None:
    """
    Send a PUT with the given user representation to the Admin API.

    Args:
        user_id: User UUID to update
        user_config: User properties sent as the JSON body (camelCase keys, as Keycloak expects)

    Raises:
        httpx.HTTPError: If user update fails
    """
    with tracer.start_as_current_span("keycloak.update_user") as span:
        span.set_attribute("user.uuid", user_id)

        try:
            admin_token = await self.get_admin_token()
            request_headers = {
                "Authorization": f"Bearer {admin_token}",
                "Content-Type": "application/json",
            }

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
                endpoint = f"{self.config.admin_url}/users/{user_id}"
                response = await client.put(endpoint, headers=request_headers, json=user_config)
                response.raise_for_status()

                logger.info(f"Updated user {user_id}")
                metrics.successful_calls.add(1, {"operation": "update_user"})

        except httpx.HTTPStatusError as e:
            logger.error(
                f"Failed to update user: {e}",
                extra={"status_code": e.response.status_code, "user_id": user_id},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "update_user"})
            raise
        except httpx.HTTPError as e:
            logger.error(f"HTTP error updating user: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "update_user"})
            raise
async def set_user_password(self, user_id: str, password: str, temporary: bool = False) -> None:
    """
    Set a user's password through the Admin API reset-password endpoint.

    Args:
        user_id: User UUID
        password: New password to set
        temporary: Sent as the credential's ``temporary`` flag
            (if True, user must change password on next login)

    Raises:
        httpx.HTTPError: If password update fails
    """
    with tracer.start_as_current_span("keycloak.set_user_password") as span:
        span.set_attribute("user.uuid", user_id)
        span.set_attribute("password.temporary", temporary)

        try:
            admin_token = await self.get_admin_token()
            request_headers = {
                "Authorization": f"Bearer {admin_token}",
                "Content-Type": "application/json",
            }
            # Credential representation expected by the reset-password endpoint
            credential = {
                "type": "password",
                "value": password,
                "temporary": temporary,
            }

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
                endpoint = f"{self.config.admin_url}/users/{user_id}/reset-password"
                response = await client.put(endpoint, headers=request_headers, json=credential)
                response.raise_for_status()

                logger.info(f"Set password for user {user_id} (temporary={temporary})")
                metrics.successful_calls.add(1, {"operation": "set_user_password"})

        except httpx.HTTPStatusError as e:
            logger.error(
                f"Failed to set user password: {e}",
                extra={"status_code": e.response.status_code, "user_id": user_id},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "set_user_password"})
            raise
        except httpx.HTTPError as e:
            logger.error(f"HTTP error setting user password: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "set_user_password"})
            raise
async def get_user(self, user_id: str) -> dict[str, Any] | None:
    """
    Fetch a user's representation by UUID through the Admin API.

    Args:
        user_id: User UUID

    Returns:
        User data dictionary, or None when Keycloak answers 404

    Raises:
        httpx.HTTPError: On any other HTTP failure
    """
    with tracer.start_as_current_span("keycloak.get_user") as span:
        span.set_attribute("user.uuid", user_id)

        try:
            admin_token = await self.get_admin_token()

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
                response = await client.get(
                    f"{self.config.admin_url}/users/{user_id}",
                    headers={"Authorization": f"Bearer {admin_token}"},
                )
                response.raise_for_status()
                user_data = response.json()

                logger.info(f"Retrieved user {user_id}")
                metrics.successful_calls.add(1, {"operation": "get_user"})

                return user_data  # type: ignore[no-any-return]

        except httpx.HTTPStatusError as e:
            if e.response.status_code != 404:
                logger.error(
                    f"Failed to get user: {e}",
                    extra={"status_code": e.response.status_code, "user_id": user_id},
                    exc_info=True,
                )
                metrics.failed_calls.add(1, {"operation": "get_user"})
                raise

            # 404 means "no such user": report it as None, not as an error
            logger.info(f"User {user_id} not found")
            return None
        except httpx.HTTPError as e:
            logger.error(f"HTTP error getting user: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "get_user"})
            raise
async def get_user_attributes(self, user_id: str) -> dict[str, Any]:
    """
    Fetch a user's custom attributes through the Admin API.

    Reads the full user representation and returns its ``attributes`` entry.

    Args:
        user_id: User UUID

    Returns:
        Dictionary of user attributes ({} when the user has no ``attributes`` key)

    Raises:
        httpx.HTTPError: If getting user attributes fails
    """
    with tracer.start_as_current_span("keycloak.get_user_attributes") as span:
        span.set_attribute("user.uuid", user_id)

        try:
            admin_token = await self.get_admin_token()

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
                response = await client.get(
                    f"{self.config.admin_url}/users/{user_id}",
                    headers={"Authorization": f"Bearer {admin_token}"},
                )
                response.raise_for_status()

                attributes = response.json().get("attributes", {})

                logger.info(f"Retrieved attributes for user {user_id}")
                metrics.successful_calls.add(1, {"operation": "get_user_attributes"})

                return attributes  # type: ignore[no-any-return]

        except httpx.HTTPStatusError as e:
            logger.error(
                f"Failed to get user attributes: {e}",
                extra={"status_code": e.response.status_code, "user_id": user_id},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "get_user_attributes"})
            raise
        except httpx.HTTPError as e:
            logger.error(f"HTTP error getting user attributes: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "get_user_attributes"})
            raise
async def update_user_attributes(self, user_id: str, attributes: dict[str, Any]) -> None:
    """
    Replace a user's custom attributes through the Admin API.

    The current user representation is fetched, its ``attributes`` entry is
    overwritten with the given dictionary (existing attributes are not
    merged), and the whole representation is sent back with a PUT.

    Args:
        user_id: User UUID
        attributes: Attribute dictionary that becomes the user's ``attributes``

    Raises:
        httpx.HTTPError: If updating user attributes fails
    """
    with tracer.start_as_current_span("keycloak.update_user_attributes") as span:
        span.set_attribute("user.uuid", user_id)
        span.set_attribute("attributes.count", len(attributes))

        try:
            admin_token = await self.get_admin_token()
            request_headers = {
                "Authorization": f"Bearer {admin_token}",
                "Content-Type": "application/json",
            }

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
                # Read and write target the same resource URL
                user_url = f"{self.config.admin_url}/users/{user_id}"

                # Read-modify-write: the PUT carries the full user object
                current = await client.get(user_url, headers=request_headers)
                current.raise_for_status()

                representation = current.json()
                representation["attributes"] = attributes

                saved = await client.put(user_url, headers=request_headers, json=representation)
                saved.raise_for_status()

                logger.info(f"Updated attributes for user {user_id}")
                metrics.successful_calls.add(1, {"operation": "update_user_attributes"})

        except httpx.HTTPStatusError as e:
            logger.error(
                f"Failed to update user attributes: {e}",
                extra={"status_code": e.response.status_code, "user_id": user_id},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "update_user_attributes"})
            raise
        except httpx.HTTPError as e:
            logger.error(f"HTTP error updating user attributes: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "update_user_attributes"})
            raise
async def update_client_attributes(self, client_id: str, attributes: dict[str, Any]) -> None:
    """
    Replace a client's attributes through the Admin API.

    The stored client representation is read, its "attributes" entry is
    overwritten with the supplied mapping, and the representation is PUT back.

    Args:
        client_id: Client UUID (not clientId)
        attributes: Mapping stored as the client's "attributes"

    Raises:
        httpx.HTTPError: If reading or writing the client fails
    """
    with tracer.start_as_current_span("keycloak.update_client_attributes") as span:
        span.set_attribute("client.uuid", client_id)

        try:
            admin_token = await self.get_admin_token()

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as http:
                request_headers = {
                    "Authorization": f"Bearer {admin_token}",
                    "Content-Type": "application/json",
                }
                # Read and write target the same client resource.
                client_url = f"{self.config.admin_url}/clients/{client_id}"

                fetched = await http.get(client_url, headers=request_headers)
                fetched.raise_for_status()

                representation = fetched.json()
                representation["attributes"] = attributes

                stored = await http.put(client_url, headers=request_headers, json=representation)
                stored.raise_for_status()

                logger.info(f"Updated attributes for client {client_id}")
                metrics.successful_calls.add(1, {"operation": "update_client_attributes"})

        except httpx.HTTPStatusError as e:
            # Error status from Keycloak: log with context, count, propagate.
            logger.error(
                f"Failed to update client attributes: {e}",
                extra={"status_code": e.response.status_code, "client_id": client_id},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "update_client_attributes"})
            raise
        except httpx.HTTPError as e:
            # Any other httpx failure.
            logger.error(f"HTTP error updating client attributes: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "update_client_attributes"})
            raise
async def update_client_secret(self, client_id: str, secret: str) -> None:
    """
    Set a client's secret to the supplied value via Admin API.

    The client representation is read, its "secret" field is set to the
    supplied value, and the representation is written back with PUT.

    Args:
        client_id: Client UUID (not clientId)
        secret: New client secret

    Raises:
        httpx.HTTPError: If client secret update fails
    """
    with tracer.start_as_current_span("keycloak.update_client_secret") as span:
        span.set_attribute("client.uuid", client_id)

        try:
            admin_token = await self.get_admin_token()

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
                headers = {
                    "Authorization": f"Bearer {admin_token}",
                    "Content-Type": "application/json",
                }

                # POST /clients/{id}/client-secret is deliberately not used:
                # it makes Keycloak generate a fresh random secret and does
                # not apply a value sent in the body, so the caller's secret
                # would never take effect. Writing the "secret" field of the
                # client representation is what stores a chosen value.
                url = f"{self.config.admin_url}/clients/{client_id}"

                get_response = await client.get(url, headers=headers)
                get_response.raise_for_status()

                client_data = get_response.json()
                client_data["secret"] = secret

                put_response = await client.put(url, headers=headers, json=client_data)
                put_response.raise_for_status()

                # Never log the secret itself, only the client it belongs to.
                logger.info(f"Updated secret for client {client_id}")
                metrics.successful_calls.add(1, {"operation": "update_client_secret"})

        except httpx.HTTPStatusError as e:
            logger.error(
                f"Failed to update client secret: {e}",
                extra={"status_code": e.response.status_code, "client_id": client_id},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "update_client_secret"})
            raise
        except httpx.HTTPError as e:
            logger.error(f"HTTP error updating client secret: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "update_client_secret"})
            raise
async def get_clients(self, query: dict[str, Any] | None = None) -> list[dict[str, Any]]:
    """
    List clients through the Admin API.

    Args:
        query: Optional query parameters forwarded as-is
            (e.g., {"clientId": "my-client"})

    Returns:
        The decoded JSON body of the clients listing

    Raises:
        httpx.HTTPError: If the listing request fails
    """
    with tracer.start_as_current_span("keycloak.get_clients"):
        try:
            admin_token = await self.get_admin_token()

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as http:
                # An absent or empty query means "no filter".
                listing = await http.get(
                    f"{self.config.admin_url}/clients",
                    headers={"Authorization": f"Bearer {admin_token}"},
                    params=query or {},
                )
                listing.raise_for_status()

                found = listing.json()

                logger.info(f"Retrieved {len(found)} clients")
                metrics.successful_calls.add(1, {"operation": "get_clients"})

                return found  # type: ignore[no-any-return]

        except httpx.HTTPStatusError as e:
            # Error status from Keycloak: log, count, propagate.
            logger.error(
                f"Failed to get clients: {e}",
                extra={"status_code": e.response.status_code},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "get_clients"})
            raise
        except httpx.HTTPError as e:
            # Any other httpx failure.
            logger.error(f"HTTP error getting clients: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "get_clients"})
            raise
async def get_client(self, client_id: str) -> dict[str, Any] | None:
    """
    Fetch a single client by UUID through the Admin API.

    Args:
        client_id: Client UUID (not clientId)

    Returns:
        The decoded client representation, or None when Keycloak answers 404

    Raises:
        httpx.HTTPError: If the request fails for any reason other than a 404
    """
    with tracer.start_as_current_span("keycloak.get_client") as span:
        span.set_attribute("client.uuid", client_id)

        try:
            admin_token = await self.get_admin_token()

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as http:
                reply = await http.get(
                    f"{self.config.admin_url}/clients/{client_id}",
                    headers={"Authorization": f"Bearer {admin_token}"},
                )
                reply.raise_for_status()

                representation = reply.json()

                logger.info(f"Retrieved client {client_id}")
                metrics.successful_calls.add(1, {"operation": "get_client"})

                return representation  # type: ignore[no-any-return]

        except httpx.HTTPStatusError as e:
            # "Not found" is an ordinary outcome here: report it as None and
            # do not count it as a failed call.
            if e.response.status_code == 404:
                logger.info(f"Client {client_id} not found")
                return None

            logger.error(
                f"Failed to get client: {e}",
                extra={"status_code": e.response.status_code, "client_id": client_id},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "get_client"})
            raise
        except httpx.HTTPError as e:
            # Any other httpx failure.
            logger.error(f"HTTP error getting client: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "get_client"})
            raise
async def delete_client(self, client_id: str) -> None:
    """
    Remove a Keycloak client through the Admin API.

    Args:
        client_id: UUID of the client to delete (not clientId)

    Raises:
        httpx.HTTPError: If the delete request fails
    """
    with tracer.start_as_current_span("keycloak.delete_client") as span:
        span.set_attribute("client.uuid", client_id)

        try:
            admin_token = await self.get_admin_token()

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as http:
                outcome = await http.delete(
                    f"{self.config.admin_url}/clients/{client_id}",
                    headers={"Authorization": f"Bearer {admin_token}"},
                )
                outcome.raise_for_status()

                logger.info(f"Keycloak client deleted: {client_id}")
                metrics.successful_calls.add(1, {"operation": "delete_client"})

        except httpx.HTTPStatusError as e:
            # Error status from Keycloak: log with context, count, propagate.
            logger.error(
                f"Failed to delete client: {e}",
                extra={"status_code": e.response.status_code, "client_id": client_id},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "delete_client"})
            raise
        except httpx.HTTPError as e:
            # Any other httpx failure.
            logger.error(f"HTTP error deleting client: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "delete_client"})
            raise
async def delete_user(self, user_id: str) -> None:
    """
    Remove a Keycloak user through the Admin API.

    Args:
        user_id: UUID of the user to delete

    Raises:
        httpx.HTTPError: If the delete request fails
    """
    with tracer.start_as_current_span("keycloak.delete_user") as span:
        span.set_attribute("user.uuid", user_id)

        try:
            admin_token = await self.get_admin_token()

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as http:
                outcome = await http.delete(
                    f"{self.config.admin_url}/users/{user_id}",
                    headers={"Authorization": f"Bearer {admin_token}"},
                )
                outcome.raise_for_status()

                logger.info(f"Keycloak user deleted: {user_id}")
                metrics.successful_calls.add(1, {"operation": "delete_user"})

        except httpx.HTTPStatusError as e:
            # Error status from Keycloak: log with context, count, propagate.
            logger.error(
                f"Failed to delete user: {e}",
                extra={"status_code": e.response.status_code, "user_id": user_id},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "delete_user"})
            raise
        except httpx.HTTPError as e:
            # Any other httpx failure.
            logger.error(f"HTTP error deleting user: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "delete_user"})
            raise
async def search_users(self, query: dict[str, Any] | None = None, first: int = 0, max: int = 100) -> list[dict[str, Any]]:
    """
    Search users through the Admin API, one page at a time.

    Args:
        query: Optional search filters merged into the request parameters
            (e.g., {"username": "alice", "email": "alice@example.com"}).
            Keys in it named "first" or "max" take precedence over the
            arguments below.
        first: Pagination offset (default: 0)
        max: Maximum results to return (default: 100)

    Returns:
        The decoded JSON body of the search response

    Raises:
        httpx.HTTPError: If the search request fails
    """
    with tracer.start_as_current_span("keycloak.search_users") as span:
        span.set_attribute("pagination.first", first)
        span.set_attribute("pagination.max", max)

        try:
            admin_token = await self.get_admin_token()

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as http:
                # Pagination first, then the caller's filters layered on top.
                search_params: dict[str, Any] = {"first": first, "max": max, **(query or {})}

                reply = await http.get(
                    f"{self.config.admin_url}/users",
                    headers={"Authorization": f"Bearer {admin_token}"},
                    params=search_params,
                )
                reply.raise_for_status()

                matches = reply.json()

                logger.info(f"Searched users, found {len(matches)} results")
                metrics.successful_calls.add(1, {"operation": "search_users"})

                return matches  # type: ignore[no-any-return]

        except httpx.HTTPStatusError as e:
            # Error status from Keycloak: log, count, propagate.
            logger.error(
                f"Failed to search users: {e}",
                extra={"status_code": e.response.status_code},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "search_users"})
            raise
        except httpx.HTTPError as e:
            # Any other httpx failure.
            logger.error(f"HTTP error searching users: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "search_users"})
            raise
async def get_users(self, query: dict[str, Any] | None = None) -> list[dict[str, Any]]:
    """
    List users through the Admin API with optional filtering.

    A single request is made and no pagination parameters are added here.
    NOTE(review): whether this really returns every user depends on the
    server's default page size when "max" is not supplied - confirm, or pass
    "first"/"max" in the query.

    Args:
        query: Optional query parameters forwarded as-is
            (e.g., {"email": "example.com"})

    Returns:
        The decoded JSON body of the users listing

    Raises:
        httpx.HTTPError: If the listing request fails
    """
    with tracer.start_as_current_span("keycloak.get_users"):
        try:
            admin_token = await self.get_admin_token()

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as http:
                # An absent or empty query means "no filter".
                listing = await http.get(
                    f"{self.config.admin_url}/users",
                    headers={"Authorization": f"Bearer {admin_token}"},
                    params=query or {},
                )
                listing.raise_for_status()

                found = listing.json()

                logger.info(f"Retrieved {len(found)} users")
                metrics.successful_calls.add(1, {"operation": "get_users"})

                return found  # type: ignore[no-any-return]

        except httpx.HTTPStatusError as e:
            # Error status from Keycloak: log, count, propagate.
            logger.error(
                f"Failed to get users: {e}",
                extra={"status_code": e.response.status_code},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "get_users"})
            raise
        except httpx.HTTPError as e:
            # Any other httpx failure.
            logger.error(f"HTTP error getting users: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "get_users"})
            raise
async def create_group(self, group_config: dict[str, Any]) -> str:
    """
    Create group via Admin API.

    Args:
        group_config: Dictionary of group properties (must include "name")

    Returns:
        Group UUID taken from the last path segment of the Location header.
        An empty string is returned (and a warning logged) when the response
        carries no usable Location header.

    Raises:
        httpx.HTTPError: If group creation fails
    """
    with tracer.start_as_current_span("keycloak.create_group"):
        try:
            admin_token = await self.get_admin_token()

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client:
                headers = {
                    "Authorization": f"Bearer {admin_token}",
                    "Content-Type": "application/json",
                }

                url = f"{self.config.admin_url}/groups"
                response = await client.post(url, headers=headers, json=group_config)
                response.raise_for_status()

                # The new group's ID is the final path segment of Location.
                # Strip a trailing slash first so ".../groups/<id>/" still
                # yields "<id>" instead of an empty string.
                location = response.headers.get("Location", "")
                group_id: str = location.rstrip("/").rsplit("/", 1)[-1]

                if not group_id:
                    # The group exists server-side but the caller gets no ID;
                    # make that visible instead of returning "" silently.
                    logger.warning("Group created but no group ID found in Location header")

                logger.info(f"Created group {group_id}")
                metrics.successful_calls.add(1, {"operation": "create_group"})

                return group_id

        except httpx.HTTPStatusError as e:
            logger.error(
                f"Failed to create group: {e}",
                extra={"status_code": e.response.status_code},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "create_group"})
            raise
        except httpx.HTTPError as e:
            logger.error(f"HTTP error creating group: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "create_group"})
            raise
async def get_group(self, group_id: str) -> dict[str, Any] | None:
    """
    Fetch a single group by UUID through the Admin API.

    Args:
        group_id: Group UUID

    Returns:
        The decoded group representation, or None when Keycloak answers 404

    Raises:
        httpx.HTTPError: If the request fails for any reason other than a 404
    """
    with tracer.start_as_current_span("keycloak.get_group") as span:
        span.set_attribute("group.uuid", group_id)

        try:
            admin_token = await self.get_admin_token()

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as http:
                reply = await http.get(
                    f"{self.config.admin_url}/groups/{group_id}",
                    headers={"Authorization": f"Bearer {admin_token}"},
                )
                reply.raise_for_status()

                representation = reply.json()

                logger.info(f"Retrieved group {group_id}")
                metrics.successful_calls.add(1, {"operation": "get_group"})

                return representation  # type: ignore[no-any-return]

        except httpx.HTTPStatusError as e:
            # "Not found" is an ordinary outcome here: report it as None and
            # do not count it as a failed call.
            if e.response.status_code == 404:
                logger.info(f"Group {group_id} not found")
                return None

            logger.error(
                f"Failed to get group: {e}",
                extra={"status_code": e.response.status_code, "group_id": group_id},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "get_group"})
            raise
        except httpx.HTTPError as e:
            # Any other httpx failure.
            logger.error(f"HTTP error getting group: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "get_group"})
            raise
async def get_group_members(self, group_id: str) -> list[dict[str, Any]]:
    """
    List the members of a group through the Admin API.

    A single request is made; no pagination parameters are sent.

    Args:
        group_id: Group UUID

    Returns:
        The decoded JSON body of the members listing

    Raises:
        httpx.HTTPError: If the members request fails
    """
    with tracer.start_as_current_span("keycloak.get_group_members") as span:
        span.set_attribute("group.uuid", group_id)

        try:
            admin_token = await self.get_admin_token()

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as http:
                listing = await http.get(
                    f"{self.config.admin_url}/groups/{group_id}/members",
                    headers={"Authorization": f"Bearer {admin_token}"},
                )
                listing.raise_for_status()

                members = listing.json()

                logger.info(f"Retrieved {len(members)} members for group {group_id}")
                metrics.successful_calls.add(1, {"operation": "get_group_members"})

                return members  # type: ignore[no-any-return]

        except httpx.HTTPStatusError as e:
            # Error status from Keycloak: log with context, count, propagate.
            logger.error(
                f"Failed to get group members: {e}",
                extra={"status_code": e.response.status_code, "group_id": group_id},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "get_group_members"})
            raise
        except httpx.HTTPError as e:
            # Any other httpx failure.
            logger.error(f"HTTP error getting group members: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "get_group_members"})
            raise
async def add_user_to_group(self, user_id: str, group_id: str) -> None:
    """
    Make a user a member of a group through the Admin API.

    Membership is created with a body-less PUT on the user's group resource.

    Args:
        user_id: User UUID
        group_id: Group UUID

    Raises:
        httpx.HTTPError: If the membership request fails
    """
    with tracer.start_as_current_span("keycloak.add_user_to_group") as span:
        span.set_attribute("user.uuid", user_id)
        span.set_attribute("group.uuid", group_id)

        try:
            admin_token = await self.get_admin_token()

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as http:
                membership_url = f"{self.config.admin_url}/users/{user_id}/groups/{group_id}"
                outcome = await http.put(
                    membership_url,
                    headers={"Authorization": f"Bearer {admin_token}"},
                )
                outcome.raise_for_status()

                logger.info(f"Added user {user_id} to group {group_id}")
                metrics.successful_calls.add(1, {"operation": "add_user_to_group"})

        except httpx.HTTPStatusError as e:
            # Error status from Keycloak: log with context, count, propagate.
            logger.error(
                f"Failed to add user to group: {e}",
                extra={"status_code": e.response.status_code, "user_id": user_id, "group_id": group_id},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "add_user_to_group"})
            raise
        except httpx.HTTPError as e:
            # Any other httpx failure.
            logger.error(f"HTTP error adding user to group: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "add_user_to_group"})
            raise
async def issue_token_for_user(
    self,
    user_id: str,
    requested_token_type: str = "urn:ietf:params:oauth:token-type:access_token",
    audience: str | None = None,
) -> dict[str, Any]:
    """
    Obtain a token on behalf of a user via OAuth 2.0 Token Exchange (RFC 8693).

    The admin access token is presented as the subject token and the target
    user is named as the requested subject; the form is posted to the
    configured token endpoint. Intended for API key → JWT exchange workflows.

    Args:
        user_id: UUID of the user the token is issued for
        requested_token_type: Token type URN to request (default: access_token)
        audience: Optional audience; only sent when truthy

    Returns:
        The decoded JSON token response

    Raises:
        httpx.HTTPError: If the token exchange request fails

    Note:
        Requires Keycloak realm to have token exchange enabled and admin user
        to have appropriate permissions (token-exchange scope).
    """
    with tracer.start_as_current_span("keycloak.issue_token_for_user") as span:
        span.set_attribute("user.uuid", user_id)
        span.set_attribute("requested_token_type", requested_token_type)

        try:
            admin_token = await self.get_admin_token()

            async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as http:
                # Token-exchange form, see https://www.rfc-editor.org/rfc/rfc8693.html
                form = {
                    "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
                    "client_id": self.config.client_id,
                    "subject_token": admin_token,
                    "subject_token_type": "urn:ietf:params:oauth:token-type:access_token",
                    "requested_token_type": requested_token_type,
                    "requested_subject": user_id,  # the user the token is minted for
                }

                # Optional fields are appended only when configured / requested.
                configured_secret = self.config.client_secret
                if configured_secret:
                    form["client_secret"] = configured_secret
                if audience:
                    form["audience"] = audience

                reply = await http.post(
                    self.config.token_endpoint,
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                    data=form,
                )
                reply.raise_for_status()

                issued = reply.json()

                logger.info(f"Issued token for user {user_id}")
                metrics.successful_calls.add(1, {"operation": "issue_token_for_user"})

                return issued  # type: ignore[no-any-return]

        except httpx.HTTPStatusError as e:
            # Include the response body: it carries the server's error description.
            logger.error(
                f"Failed to issue token for user: {e}",
                extra={"status_code": e.response.status_code, "user_id": user_id, "detail": e.response.text},
                exc_info=True,
            )
            metrics.failed_calls.add(1, {"operation": "issue_token_for_user"})
            raise
        except httpx.HTTPError as e:
            # Any other httpx failure.
            logger.error(f"HTTP error issuing token for user: {e}", exc_info=True)
            metrics.failed_calls.add(1, {"operation": "issue_token_for_user"})
            raise
async def sync_user_to_openfga(
    keycloak_user: KeycloakUser, openfga_client: Any, role_mapper: Any | None = None, use_legacy_mapping: bool = False
) -> None:
    """
    Synchronize Keycloak user roles/groups to OpenFGA tuples

    Supports two modes:
    1. RoleMapper (recommended): Uses configurable YAML-based mapping
    2. Legacy (backward compatible): Uses hardcoded mapping rules

    Legacy rules: realm role "admin" -> admin on system:global; each group's
    last path segment -> member of organization:<segment>; client roles ->
    assignee of role:<role>; "premium" as realm or client role -> assignee of
    role:premium. Legacy tuples are de-duplicated before being written.

    Args:
        keycloak_user: Keycloak user to sync
        openfga_client: OpenFGA client instance (must provide an awaitable
            write_tuples(tuples))
        role_mapper: RoleMapper instance (optional, will use default if None)
        use_legacy_mapping: Use legacy hardcoded mapping (default: False)

    Raises:
        Exception: Whatever openfga_client.write_tuples raises is logged and
            re-raised unchanged
    """
    with tracer.start_as_current_span("keycloak.sync_to_openfga") as span:
        span.set_attribute("user.id", keycloak_user.user_id)
        span.set_attribute("mapping.mode", "legacy" if use_legacy_mapping else "role_mapper")

        tuples = []

        if use_legacy_mapping:
            # Legacy hardcoded mapping for backward compatibility
            logger.info("Using legacy role mapping")

            subject = keycloak_user.user_id
            # (relation, object) pairs, collected in rule order.
            grants: list[tuple[str, str]] = []

            # Map realm roles
            if "admin" in keycloak_user.realm_roles:
                grants.append(("admin", "system:global"))

            # Map groups to organizations (last path segment of the group path)
            for group in keycloak_user.groups:
                org_name = group.strip("/").split("/")[-1]
                if org_name:
                    grants.append(("member", f"organization:{org_name}"))

            # Map client roles. The KeycloakUser fields visible in this module
            # do not include `client_id`, so reading it directly would raise
            # AttributeError; look it up defensively and, when it is absent,
            # fall back to the roles of every client the user has.
            scoped_client = getattr(keycloak_user, "client_id", None)
            if scoped_client is not None:
                client_roles = list(keycloak_user.client_roles.get(scoped_client, []))
            else:
                client_roles = [role for roles in keycloak_user.client_roles.values() for role in roles]
            grants.extend(("assignee", f"role:{role}") for role in client_roles)

            # Map premium role for backward compatibility
            if "premium" in keycloak_user.realm_roles or "premium" in client_roles:
                grants.append(("assignee", "role:premium"))

            # Drop repeats while keeping first-seen order: e.g. a "premium"
            # client role would otherwise produce role:premium twice, and two
            # groups with the same leaf name the same organization tuple.
            tuples = [
                {"user": subject, "relation": relation, "object": target}
                for relation, target in dict.fromkeys(grants)
            ]

        else:
            # Use RoleMapper for flexible, configurable mapping
            if role_mapper is None:
                # Import here to avoid circular dependency
                from mcp_server_langgraph.auth.role_mapper import RoleMapper

                role_mapper = RoleMapper()  # Uses default config

            logger.info("Using RoleMapper for role mapping")
            tuples = await role_mapper.map_user_to_tuples(keycloak_user)

        # Write tuples to OpenFGA
        if tuples:
            try:
                await openfga_client.write_tuples(tuples)
                logger.info(f"Synced {len(tuples)} tuples to OpenFGA for user {keycloak_user.username}")
            except Exception as e:
                # Boundary with the authorization store: log, then propagate.
                logger.error(f"Failed to sync user to OpenFGA: {e}", exc_info=True)
                raise
        else:
            logger.info(f"No tuples to sync for user {keycloak_user.username}")