Coverage for src / mcp_server_langgraph / auth / keycloak.py: 76%

783 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Keycloak integration for authentication and user management 

3 

4Provides production-ready authentication using Keycloak as the identity provider. 

5Supports multiple authentication flows, token verification, and role/group mapping 

6to OpenFGA for fine-grained authorization. 

7""" 

8 

9from datetime import datetime, timedelta, UTC 

10from typing import Any 

11from urllib.parse import urlparse 

12 

13import httpx 

14import jwt 

15from pydantic import BaseModel, Field, field_validator 

16 

17from mcp_server_langgraph.observability.telemetry import logger, metrics, tracer 

18 

19 

20class KeycloakUser(BaseModel): 

21 """Keycloak user model""" 

22 

23 id: str = Field(description="Keycloak user ID (UUID)") 

24 username: str = Field(description="Username") 

25 email: str | None = Field(default=None, description="Email address") 

26 first_name: str | None = Field(default=None, description="First name") 

27 last_name: str | None = Field(default=None, description="Last name") 

28 enabled: bool = Field(default=True, description="Whether user account is enabled") 

29 email_verified: bool = Field(default=False, description="Whether email is verified") 

30 realm_roles: list[str] = Field(default_factory=list, description="Realm-level roles") 

31 client_roles: dict[str, list[str]] = Field(default_factory=dict, description="Client-specific roles") 

32 groups: list[str] = Field(default_factory=list, description="Group memberships") 

33 attributes: dict[str, Any] = Field(default_factory=dict, description="Custom attributes") 

34 

35 @property 

36 def user_id(self) -> str: 

37 """Get user ID in format compatible with OpenFGA""" 

38 return f"user:{self.username}" 

39 

40 @property 

41 def full_name(self) -> str: 

42 """Get full name""" 

43 parts = filter(None, [self.first_name, self.last_name]) 

44 return " ".join(parts) or self.username 

45 

46 

47class KeycloakConfig(BaseModel): 

48 """Keycloak configuration""" 

49 

50 server_url: str = Field(description="Keycloak server URL (e.g., http://localhost:8180)") 

51 realm: str = Field(description="Keycloak realm name") 

52 client_id: str = Field(description="OAuth2/OIDC client ID") 

53 client_secret: str | None = Field(default=None, description="OAuth2/OIDC client secret") 

54 admin_username: str | None = Field(default=None, description="Admin username for admin API access") 

55 admin_password: str | None = Field(default=None, description="Admin password for admin API access") 

56 verify_ssl: bool = Field(default=True, description="Verify SSL certificates") 

57 timeout: int = Field(default=30, description="HTTP request timeout in seconds") 

58 

59 @field_validator("server_url") 

60 @classmethod 

61 def validate_server_url(cls, v: str) -> str: 

62 """ 

63 Validate server_url to prevent SSRF attacks. 

64 

65 Ensures: 

66 - URL uses http or https scheme only 

67 - URL has a valid hostname 

68 - URL is normalized (no trailing slash) 

69 

70 Security: This prevents SSRF by rejecting file://, gopher://, dict://, 

71 and other dangerous URL schemes that could be used to access internal 

72 network resources or local files. 

73 """ 

74 parsed = urlparse(v) 

75 

76 # Validate scheme (SSRF mitigation - only allow http/https) 

77 if parsed.scheme not in ("http", "https"): 77 ↛ 78line 77 didn't jump to line 78 because the condition on line 77 was never true

78 msg = ( 

79 f"Invalid URL scheme '{parsed.scheme}'. Only 'http' and 'https' are allowed. " 

80 "This restriction prevents SSRF attacks via dangerous URL schemes." 

81 ) 

82 raise ValueError(msg) 

83 

84 # Validate hostname exists 

85 if not parsed.netloc: 85 ↛ 86line 85 didn't jump to line 86 because the condition on line 85 was never true

86 msg = f"Invalid URL '{v}': missing hostname. Expected format: http(s)://hostname[:port]" 

87 raise ValueError(msg) 

88 

89 # Normalize URL (remove trailing slash for consistency) 

90 normalized = v.rstrip("/") 

91 return normalized 

92 

93 @property 

94 def realm_url(self) -> str: 

95 """Get realm base URL""" 

96 return f"{self.server_url}/realms/{self.realm}" 

97 

98 @property 

99 def admin_url(self) -> str: 

100 """Get admin API base URL""" 

101 return f"{self.server_url}/admin/realms/{self.realm}" 

102 

103 @property 

104 def token_endpoint(self) -> str: 

105 """Get token endpoint URL""" 

106 return f"{self.realm_url}/protocol/openid-connect/token" 

107 

108 @property 

109 def userinfo_endpoint(self) -> str: 

110 """Get userinfo endpoint URL""" 

111 return f"{self.realm_url}/protocol/openid-connect/userinfo" 

112 

113 @property 

114 def jwks_uri(self) -> str: 

115 """Get JWKS URI for public key retrieval""" 

116 return f"{self.realm_url}/protocol/openid-connect/certs" 

117 

118 @property 

119 def well_known_url(self) -> str: 

120 """Get OpenID configuration URL""" 

121 return f"{self.realm_url}/.well-known/openid-configuration" 

122 

123 

124class TokenValidator: 

125 """JWT token validator using Keycloak JWKS""" 

126 

127 def __init__(self, config: KeycloakConfig) -> None: 

128 self.config = config 

129 self._jwks_cache: dict[str, Any] | None = None 

130 self._jwks_cache_time: datetime | None = None 

131 self._cache_ttl = timedelta(hours=1) 

132 

133 async def get_jwks(self, force_refresh: bool = False) -> dict[str, Any]: 

134 """ 

135 Get JSON Web Key Set from Keycloak 

136 

137 Args: 

138 force_refresh: Force refresh of cached keys 

139 

140 Returns: 

141 JWKS dictionary 

142 """ 

143 with tracer.start_as_current_span("keycloak.get_jwks"): 

144 # Check cache 

145 if not force_refresh and self._jwks_cache and self._jwks_cache_time: 

146 if datetime.now(UTC) - self._jwks_cache_time < self._cache_ttl: 146 ↛ 151line 146 didn't jump to line 151 because the condition on line 146 was always true

147 logger.debug("Using cached JWKS") 

148 return self._jwks_cache 

149 

150 # Fetch from Keycloak 

151 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

152 try: 

153 response = await client.get(self.config.jwks_uri) 

154 response.raise_for_status() 

155 jwks = response.json() 

156 

157 # Cache the result 

158 self._jwks_cache = jwks 

159 self._jwks_cache_time = datetime.now(UTC) 

160 

161 logger.info("JWKS fetched and cached", extra={"keys_count": len(jwks.get("keys", []))}) 

162 return jwks # type: ignore[no-any-return] 

163 

164 except httpx.HTTPError as e: 

165 logger.error(f"Failed to fetch JWKS: {e}", exc_info=True) 

166 metrics.failed_calls.add(1, {"operation": "get_jwks"}) 

167 raise 

168 

169 async def verify_token(self, token: str) -> dict[str, Any]: 

170 """ 

171 Verify JWT token using Keycloak public keys 

172 

173 Args: 

174 token: JWT access token 

175 

176 Returns: 

177 Decoded token payload 

178 

179 Raises: 

180 jwt.InvalidTokenError: If token is invalid 

181 jwt.ExpiredSignatureError: If token is expired 

182 """ 

183 with tracer.start_as_current_span("keycloak.verify_token") as span: 

184 try: 

185 # Decode header to get key ID 

186 unverified_header = jwt.get_unverified_header(token) 

187 kid = unverified_header.get("kid") 

188 

189 if not kid: 

190 msg = "Token missing 'kid' in header" 

191 raise jwt.InvalidTokenError(msg) 

192 

193 span.set_attribute("token.kid", kid) 

194 

195 # Get JWKS 

196 jwks = await self.get_jwks() 

197 

198 # Find matching key 

199 public_key = None 

200 for key_data in jwks.get("keys", []): 

201 if key_data.get("kid") == kid: 201 ↛ 200line 201 didn't jump to line 200 because the condition on line 201 was always true

202 public_key = jwt.algorithms.RSAAlgorithm.from_jwk(key_data) 

203 break 

204 

205 if not public_key: 

206 # Try refreshing JWKS in case keys were rotated 

207 logger.warning(f"Key ID {kid} not found, refreshing JWKS") 

208 jwks = await self.get_jwks(force_refresh=True) 

209 

210 for key_data in jwks.get("keys", []): 210 ↛ 211line 210 didn't jump to line 211 because the loop on line 210 never started

211 if key_data.get("kid") == kid: 

212 public_key = jwt.algorithms.RSAAlgorithm.from_jwk(key_data) 

213 break 

214 

215 if not public_key: 

216 msg = f"Public key not found for kid: {kid}" 

217 raise jwt.InvalidTokenError(msg) 

218 

219 # Verify and decode token 

220 payload = jwt.decode( 

221 token, 

222 public_key, # type: ignore[arg-type] 

223 algorithms=["RS256"], 

224 audience=self.config.client_id, 

225 options={ 

226 "verify_signature": True, 

227 "verify_exp": True, 

228 "verify_aud": True, 

229 }, 

230 ) 

231 

232 span.set_attribute("token.sub", payload.get("sub")) 

233 span.set_attribute("token.preferred_username", payload.get("preferred_username")) 

234 

235 logger.info( 

236 "Token verified successfully", 

237 extra={"sub": payload.get("sub"), "username": payload.get("preferred_username")}, 

238 ) 

239 

240 metrics.successful_calls.add(1, {"operation": "verify_token"}) 

241 

242 return payload # type: ignore[no-any-return] 

243 

244 except jwt.ExpiredSignatureError: 

245 logger.warning("Token expired") 

246 metrics.auth_failures.add(1, {"reason": "expired_token"}) 

247 raise 

248 except jwt.InvalidTokenError as e: 

249 logger.warning(f"Invalid token: {e}") 

250 metrics.auth_failures.add(1, {"reason": "invalid_token"}) 

251 raise 

252 except Exception as e: 

253 logger.error(f"Token verification error: {e}", exc_info=True) 

254 metrics.failed_calls.add(1, {"operation": "verify_token"}) 

255 raise 

256 

257 

258class KeycloakClient: 

259 """ 

260 Keycloak client for authentication and user management 

261 

262 Provides methods for: 

263 - Token verification (OIDC) 

264 - User authentication (ROPC flow) 

265 - Token refresh 

266 - User info retrieval 

267 - Admin API operations 

268 """ 

269 

270 def __init__(self, config: KeycloakConfig) -> None: 

271 """ 

272 Initialize Keycloak client 

273 

274 Args: 

275 config: Keycloak configuration 

276 """ 

277 self.config = config 

278 self.token_validator = TokenValidator(config) 

279 self._admin_token: str | None = None 

280 self._admin_token_expiry: datetime | None = None 

281 

282 async def authenticate_user(self, username: str, password: str) -> dict[str, Any]: 

283 """ 

284 Authenticate user using Resource Owner Password Credentials (ROPC) flow 

285 

286 Args: 

287 username: Username 

288 password: Password 

289 

290 Returns: 

291 Token response with access_token, refresh_token, etc. 

292 

293 Raises: 

294 httpx.HTTPError: If authentication fails 

295 """ 

296 with tracer.start_as_current_span("keycloak.authenticate_user") as span: 

297 span.set_attribute("user.name", username) 

298 

299 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

300 try: 

301 data = { 

302 "grant_type": "password", 

303 "client_id": self.config.client_id, 

304 "username": username, 

305 "password": password, 

306 } 

307 

308 if self.config.client_secret: 308 ↛ 311line 308 didn't jump to line 311 because the condition on line 308 was always true

309 data["client_secret"] = self.config.client_secret 

310 

311 response = await client.post(self.config.token_endpoint, data=data) 

312 response.raise_for_status() 

313 

314 tokens = response.json() 

315 

316 logger.info("User authenticated successfully", extra={"username": username}) 

317 metrics.successful_calls.add(1, {"operation": "authenticate_user"}) 

318 

319 return tokens # type: ignore[no-any-return] 

320 

321 except httpx.HTTPStatusError as e: 

322 logger.warning( 

323 f"Authentication failed for {username}", 

324 extra={"status_code": e.response.status_code, "detail": e.response.text}, 

325 ) 

326 metrics.auth_failures.add(1, {"reason": "invalid_credentials"}) 

327 raise 

328 except httpx.HTTPError as e: 

329 logger.error(f"Authentication error: {e}", exc_info=True) 

330 metrics.failed_calls.add(1, {"operation": "authenticate_user"}) 

331 raise 

332 

333 async def verify_token(self, token: str) -> dict[str, Any]: 

334 """ 

335 Verify JWT token 

336 

337 Args: 

338 token: JWT access token 

339 

340 Returns: 

341 Decoded token payload 

342 """ 

343 return await self.token_validator.verify_token(token) 

344 

345 async def refresh_token(self, refresh_token: str) -> dict[str, Any]: 

346 """ 

347 Refresh access token using refresh token 

348 

349 Args: 

350 refresh_token: Refresh token 

351 

352 Returns: 

353 New token response with access_token, refresh_token, etc. 

354 """ 

355 with tracer.start_as_current_span("keycloak.refresh_token"): 

356 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

357 try: 

358 data = { 

359 "grant_type": "refresh_token", 

360 "client_id": self.config.client_id, 

361 "refresh_token": refresh_token, 

362 } 

363 

364 if self.config.client_secret: 364 ↛ 367line 364 didn't jump to line 367 because the condition on line 364 was always true

365 data["client_secret"] = self.config.client_secret 

366 

367 response = await client.post(self.config.token_endpoint, data=data) 

368 response.raise_for_status() 

369 

370 tokens = response.json() 

371 

372 logger.info("Token refreshed successfully") 

373 metrics.successful_calls.add(1, {"operation": "refresh_token"}) 

374 

375 return tokens # type: ignore[no-any-return] 

376 

377 except httpx.HTTPStatusError as e: 

378 logger.warning("Token refresh failed", extra={"status_code": e.response.status_code}) 

379 metrics.auth_failures.add(1, {"reason": "refresh_failed"}) 

380 raise 

381 except httpx.HTTPError as e: 

382 logger.error(f"Token refresh error: {e}", exc_info=True) 

383 metrics.failed_calls.add(1, {"operation": "refresh_token"}) 

384 raise 

385 

386 async def get_userinfo(self, access_token: str) -> dict[str, Any]: 

387 """ 

388 Get user information from userinfo endpoint 

389 

390 Args: 

391 access_token: Access token 

392 

393 Returns: 

394 User information dictionary 

395 """ 

396 with tracer.start_as_current_span("keycloak.get_userinfo"): 

397 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

398 try: 

399 headers = {"Authorization": f"Bearer {access_token}"} 

400 

401 response = await client.get(self.config.userinfo_endpoint, headers=headers) 

402 response.raise_for_status() 

403 

404 userinfo = response.json() 

405 

406 logger.info("User info retrieved", extra={"sub": userinfo.get("sub")}) 

407 metrics.successful_calls.add(1, {"operation": "get_userinfo"}) 

408 

409 return userinfo # type: ignore[no-any-return] 

410 

411 except httpx.HTTPError as e: 

412 logger.error(f"Failed to get user info: {e}", exc_info=True) 

413 metrics.failed_calls.add(1, {"operation": "get_userinfo"}) 

414 raise 

415 

416 async def get_admin_token(self) -> str: 

417 """ 

418 Get admin access token for admin API calls 

419 

420 Returns: 

421 Admin access token 

422 """ 

423 # Check if we have a valid cached token 

424 if self._admin_token and self._admin_token_expiry: 

425 if datetime.now(UTC) < self._admin_token_expiry - timedelta(minutes=1): 425 ↛ 430line 425 didn't jump to line 430 because the condition on line 425 was always true

426 assert self._admin_token is not None # Guaranteed by condition above 

427 return self._admin_token 

428 

429 # Get new admin token 

430 with tracer.start_as_current_span("keycloak.get_admin_token"): 

431 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

432 try: 

433 data = { 

434 "grant_type": "password", 

435 "client_id": "admin-cli", 

436 "username": self.config.admin_username, 

437 "password": self.config.admin_password, 

438 } 

439 

440 # Admin token endpoint is at master realm 

441 admin_token_url = f"{self.config.server_url}/realms/master/protocol/openid-connect/token" 

442 

443 response = await client.post(admin_token_url, data=data) 

444 response.raise_for_status() 

445 

446 tokens = response.json() 

447 self._admin_token = tokens["access_token"] 

448 expires_in = tokens.get("expires_in", 300) # Default 5 min 

449 self._admin_token_expiry = datetime.now(UTC) + timedelta(seconds=expires_in) 

450 

451 logger.info("Admin token obtained") 

452 assert self._admin_token is not None # Just set on line above 

453 return self._admin_token 

454 

455 except httpx.HTTPError as e: 

456 logger.error(f"Failed to get admin token: {e}", exc_info=True) 

457 raise 

458 

459 async def get_user_by_username(self, username: str) -> KeycloakUser | None: 

460 """ 

461 Get user by username using admin API 

462 

463 Args: 

464 username: Username to search for 

465 

466 Returns: 

467 KeycloakUser if found, None otherwise 

468 """ 

469 with tracer.start_as_current_span("keycloak.get_user_by_username") as span: 

470 span.set_attribute("user.name", username) 

471 

472 try: 

473 admin_token = await self.get_admin_token() 

474 

475 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

476 headers = {"Authorization": f"Bearer {admin_token}"} 

477 params = {"username": username, "exact": "true"} 

478 

479 url = f"{self.config.admin_url}/users" 

480 response = await client.get(url, headers=headers, params=params) 

481 response.raise_for_status() 

482 

483 users = response.json() 

484 

485 if not users: 

486 logger.info(f"User not found: {username}") 

487 return None 

488 

489 user_data = users[0] 

490 

491 # Get user roles 

492 user_id = user_data["id"] 

493 realm_roles = await self._get_user_realm_roles(user_id, admin_token) 

494 client_roles = await self._get_user_client_roles(user_id, admin_token) 

495 groups = await self._get_user_groups(user_id, admin_token) 

496 

497 # Build KeycloakUser 

498 keycloak_user = KeycloakUser( 

499 id=user_id, 

500 username=user_data["username"], 

501 email=user_data.get("email"), 

502 first_name=user_data.get("firstName"), 

503 last_name=user_data.get("lastName"), 

504 enabled=user_data.get("enabled", True), 

505 email_verified=user_data.get("emailVerified", False), 

506 realm_roles=realm_roles, 

507 client_roles=client_roles, 

508 groups=groups, 

509 attributes=user_data.get("attributes", {}), 

510 ) 

511 

512 logger.info(f"User retrieved: {username}", extra={"user_id": user_id}) 

513 return keycloak_user 

514 

515 except httpx.HTTPError as e: 

516 logger.error(f"Failed to get user: {e}", exc_info=True) 

517 return None 

518 

519 async def _get_user_realm_roles(self, user_id: str, admin_token: str) -> list[str]: 

520 """Get user's realm-level roles""" 

521 try: 

522 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

523 headers = {"Authorization": f"Bearer {admin_token}"} 

524 url = f"{self.config.admin_url}/users/{user_id}/role-mappings/realm" 

525 

526 response = await client.get(url, headers=headers) 

527 response.raise_for_status() 

528 

529 roles = response.json() 

530 return [role["name"] for role in roles] 

531 except httpx.HTTPError: 

532 logger.warning(f"Failed to get realm roles for user {user_id}") 

533 return [] 

534 

535 async def _get_user_client_roles(self, user_id: str, admin_token: str) -> dict[str, list[str]]: 

536 """Get user's client-level roles""" 

537 try: 

538 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

539 headers = {"Authorization": f"Bearer {admin_token}"} 

540 

541 # Get all clients 

542 clients_url = f"{self.config.admin_url}/clients" 

543 response = await client.get(clients_url, headers=headers) 

544 response.raise_for_status() 

545 clients = response.json() 

546 

547 client_roles = {} 

548 

549 # Get roles for our specific client 

550 for client_data in clients: 

551 if client_data.get("clientId") == self.config.client_id: 

552 client_id = client_data["id"] 

553 roles_url = f"{self.config.admin_url}/users/{user_id}/role-mappings/clients/{client_id}" 

554 

555 roles_response = await client.get(roles_url, headers=headers) 

556 if roles_response.status_code == 200: 556 ↛ 550line 556 didn't jump to line 550 because the condition on line 556 was always true

557 roles = roles_response.json() 

558 client_roles[self.config.client_id] = [role["name"] for role in roles] 

559 

560 return client_roles 

561 

562 except httpx.HTTPError: 

563 logger.warning(f"Failed to get client roles for user {user_id}") 

564 return {} 

565 

566 async def _get_user_groups(self, user_id: str, admin_token: str) -> list[str]: 

567 """Get user's group memberships""" 

568 try: 

569 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

570 headers = {"Authorization": f"Bearer {admin_token}"} 

571 url = f"{self.config.admin_url}/users/{user_id}/groups" 

572 

573 response = await client.get(url, headers=headers) 

574 response.raise_for_status() 

575 

576 groups = response.json() 

577 return [group.get("path", group.get("name", "")) for group in groups] 

578 

579 except httpx.HTTPError: 

580 logger.warning(f"Failed to get groups for user {user_id}") 

581 return [] 

582 

583 # Admin API Methods (stubs for new functionality) 

584 # These methods interact with Keycloak Admin API for user/client management 

585 

586 async def create_client(self, client_config: dict[str, Any]) -> str: 

587 """ 

588 Create Keycloak client via Admin API. 

589 

590 Args: 

591 client_config: Client configuration dictionary with fields: 

592 - clientId: Client ID (required) 

593 - name: Display name 

594 - description: Description 

595 - enabled: Whether client is enabled (default: True) 

596 - serviceAccountsEnabled: Enable service accounts 

597 - standardFlowEnabled: Enable authorization code flow 

598 - directAccessGrantsEnabled: Enable ROPC flow 

599 - implicitFlowEnabled: Enable implicit flow 

600 - publicClient: Public vs confidential client 

601 - clientAuthenticatorType: Authentication type (e.g., "client-secret") 

602 - secret: Client secret (for confidential clients) 

603 - attributes: Custom attributes 

604 

605 Returns: 

606 Client UUID from Keycloak 

607 

608 Raises: 

609 httpx.HTTPError: If client creation fails 

610 """ 

611 with tracer.start_as_current_span("keycloak.create_client") as span: 

612 span.set_attribute("client.id", client_config.get("clientId", "unknown")) 

613 

614 try: 

615 admin_token = await self.get_admin_token() 

616 

617 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

618 headers = { 

619 "Authorization": f"Bearer {admin_token}", 

620 "Content-Type": "application/json", 

621 } 

622 

623 url = f"{self.config.admin_url}/clients" 

624 response = await client.post(url, headers=headers, json=client_config) 

625 response.raise_for_status() 

626 

627 # Extract client UUID from Location header 

628 location = response.headers.get("Location", "") 

629 client_uuid = location.split("/")[-1] if location else "" 

630 

631 logger.info( 

632 f"Keycloak client created: {client_config.get('clientId')}", 

633 extra={"client_id": client_config.get("clientId"), "uuid": client_uuid}, 

634 ) 

635 

636 metrics.successful_calls.add(1, {"operation": "create_client"}) 

637 

638 return client_uuid 

639 

640 except httpx.HTTPStatusError as e: 

641 logger.error( 

642 f"Failed to create client: {e}", 

643 extra={"status_code": e.response.status_code, "detail": e.response.text}, 

644 exc_info=True, 

645 ) 

646 metrics.failed_calls.add(1, {"operation": "create_client"}) 

647 raise 

648 except httpx.HTTPError as e: 

649 logger.error(f"HTTP error creating client: {e}", exc_info=True) 

650 metrics.failed_calls.add(1, {"operation": "create_client"}) 

651 raise 

652 

653 async def create_user(self, user_config: dict[str, Any]) -> str: 

654 """ 

655 Create Keycloak user via Admin API. 

656 

657 Args: 

658 user_config: User configuration dictionary with fields: 

659 - username: Username (required) 

660 - email: Email address 

661 - emailVerified: Whether email is verified 

662 - enabled: Whether user is enabled (default: True) 

663 - firstName: First name 

664 - lastName: Last name 

665 - attributes: Custom attributes dictionary 

666 - credentials: List of credentials (e.g., password) 

667 

668 Returns: 

669 User UUID from Keycloak 

670 

671 Raises: 

672 httpx.HTTPError: If user creation fails 

673 """ 

674 with tracer.start_as_current_span("keycloak.create_user") as span: 

675 span.set_attribute("user.username", user_config.get("username", "unknown")) 

676 

677 try: 

678 admin_token = await self.get_admin_token() 

679 

680 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

681 headers = { 

682 "Authorization": f"Bearer {admin_token}", 

683 "Content-Type": "application/json", 

684 } 

685 

686 url = f"{self.config.admin_url}/users" 

687 response = await client.post(url, headers=headers, json=user_config) 

688 response.raise_for_status() 

689 

690 # Extract user UUID from Location header 

691 location = response.headers.get("Location", "") 

692 user_uuid = location.split("/")[-1] if location else "" 

693 

694 logger.info( 

695 f"Keycloak user created: {user_config.get('username')}", 

696 extra={"username": user_config.get("username"), "uuid": user_uuid}, 

697 ) 

698 

699 metrics.successful_calls.add(1, {"operation": "create_user"}) 

700 

701 return user_uuid 

702 

703 except httpx.HTTPStatusError as e: 

704 logger.error( 

705 f"Failed to create user: {e}", 

706 extra={"status_code": e.response.status_code, "detail": e.response.text}, 

707 exc_info=True, 

708 ) 

709 metrics.failed_calls.add(1, {"operation": "create_user"}) 

710 raise 

711 except httpx.HTTPError as e: 

712 logger.error(f"HTTP error creating user: {e}", exc_info=True) 

713 metrics.failed_calls.add(1, {"operation": "create_user"}) 

714 raise 

715 

716 async def update_user(self, user_id: str, user_config: dict[str, Any]) -> None: 

717 """ 

718 Update Keycloak user via Admin API. 

719 

720 Args: 

721 user_id: User UUID to update 

722 user_config: Dictionary of user properties to update (camelCase format expected by Keycloak) 

723 

724 Raises: 

725 httpx.HTTPError: If user update fails 

726 """ 

727 with tracer.start_as_current_span("keycloak.update_user") as span: 

728 span.set_attribute("user.uuid", user_id) 

729 

730 try: 

731 admin_token = await self.get_admin_token() 

732 

733 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

734 headers = { 

735 "Authorization": f"Bearer {admin_token}", 

736 "Content-Type": "application/json", 

737 } 

738 

739 url = f"{self.config.admin_url}/users/{user_id}" 

740 response = await client.put(url, headers=headers, json=user_config) 

741 response.raise_for_status() 

742 

743 logger.info(f"Updated user {user_id}") 

744 metrics.successful_calls.add(1, {"operation": "update_user"}) 

745 

746 except httpx.HTTPStatusError as e: 

747 logger.error( 

748 f"Failed to update user: {e}", 

749 extra={"status_code": e.response.status_code, "user_id": user_id}, 

750 exc_info=True, 

751 ) 

752 metrics.failed_calls.add(1, {"operation": "update_user"}) 

753 raise 

754 except httpx.HTTPError as e: 

755 logger.error(f"HTTP error updating user: {e}", exc_info=True) 

756 metrics.failed_calls.add(1, {"operation": "update_user"}) 

757 raise 

758 

759 async def set_user_password(self, user_id: str, password: str, temporary: bool = False) -> None: 

760 """ 

761 Set user password via Admin API. 

762 

763 Args: 

764 user_id: User UUID 

765 password: New password to set 

766 temporary: If True, user must change password on next login 

767 

768 Raises: 

769 httpx.HTTPError: If password update fails 

770 """ 

771 with tracer.start_as_current_span("keycloak.set_user_password") as span: 

772 span.set_attribute("user.uuid", user_id) 

773 span.set_attribute("password.temporary", temporary) 

774 

775 try: 

776 admin_token = await self.get_admin_token() 

777 

778 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

779 headers = { 

780 "Authorization": f"Bearer {admin_token}", 

781 "Content-Type": "application/json", 

782 } 

783 

784 # Keycloak password reset endpoint 

785 url = f"{self.config.admin_url}/users/{user_id}/reset-password" 

786 password_config = { 

787 "type": "password", 

788 "value": password, 

789 "temporary": temporary, 

790 } 

791 

792 response = await client.put(url, headers=headers, json=password_config) 

793 response.raise_for_status() 

794 

795 logger.info(f"Set password for user {user_id} (temporary={temporary})") 

796 metrics.successful_calls.add(1, {"operation": "set_user_password"}) 

797 

798 except httpx.HTTPStatusError as e: 

799 logger.error( 

800 f"Failed to set user password: {e}", 

801 extra={"status_code": e.response.status_code, "user_id": user_id}, 

802 exc_info=True, 

803 ) 

804 metrics.failed_calls.add(1, {"operation": "set_user_password"}) 

805 raise 

806 except httpx.HTTPError as e: 

807 logger.error(f"HTTP error setting user password: {e}", exc_info=True) 

808 metrics.failed_calls.add(1, {"operation": "set_user_password"}) 

809 raise 

810 

811 async def get_user(self, user_id: str) -> dict[str, Any] | None: 

812 """ 

813 Get user by ID via Admin API. 

814 

815 Args: 

816 user_id: User UUID 

817 

818 Returns: 

819 User data dictionary or None if not found 

820 

821 Raises: 

822 httpx.HTTPError: If getting user fails (except 404 which returns None) 

823 """ 

824 with tracer.start_as_current_span("keycloak.get_user") as span: 

825 span.set_attribute("user.uuid", user_id) 

826 

827 try: 

828 admin_token = await self.get_admin_token() 

829 

830 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

831 headers = {"Authorization": f"Bearer {admin_token}"} 

832 

833 url = f"{self.config.admin_url}/users/{user_id}" 

834 response = await client.get(url, headers=headers) 

835 response.raise_for_status() 

836 

837 user_data = response.json() 

838 

839 logger.info(f"Retrieved user {user_id}") 

840 metrics.successful_calls.add(1, {"operation": "get_user"}) 

841 

842 return user_data # type: ignore[no-any-return] 

843 

844 except httpx.HTTPStatusError as e: 

845 # Return None for 404 (user not found) 

846 if e.response.status_code == 404: 846 ↛ 850line 846 didn't jump to line 850 because the condition on line 846 was always true

847 logger.info(f"User {user_id} not found") 

848 return None 

849 

850 logger.error( 

851 f"Failed to get user: {e}", 

852 extra={"status_code": e.response.status_code, "user_id": user_id}, 

853 exc_info=True, 

854 ) 

855 metrics.failed_calls.add(1, {"operation": "get_user"}) 

856 raise 

857 except httpx.HTTPError as e: 

858 logger.error(f"HTTP error getting user: {e}", exc_info=True) 

859 metrics.failed_calls.add(1, {"operation": "get_user"}) 

860 raise 

861 

862 async def get_user_attributes(self, user_id: str) -> dict[str, Any]: 

863 """ 

864 Get user custom attributes via Admin API. 

865 

866 Args: 

867 user_id: User UUID 

868 

869 Returns: 

870 Dictionary of user attributes 

871 

872 Raises: 

873 httpx.HTTPError: If getting user attributes fails 

874 """ 

875 with tracer.start_as_current_span("keycloak.get_user_attributes") as span: 

876 span.set_attribute("user.uuid", user_id) 

877 

878 try: 

879 admin_token = await self.get_admin_token() 

880 

881 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

882 headers = {"Authorization": f"Bearer {admin_token}"} 

883 

884 url = f"{self.config.admin_url}/users/{user_id}" 

885 response = await client.get(url, headers=headers) 

886 response.raise_for_status() 

887 

888 user_data = response.json() 

889 attributes = user_data.get("attributes", {}) 

890 

891 logger.info(f"Retrieved attributes for user {user_id}") 

892 metrics.successful_calls.add(1, {"operation": "get_user_attributes"}) 

893 

894 return attributes # type: ignore[no-any-return] 

895 

896 except httpx.HTTPStatusError as e: 

897 logger.error( 

898 f"Failed to get user attributes: {e}", 

899 extra={"status_code": e.response.status_code, "user_id": user_id}, 

900 exc_info=True, 

901 ) 

902 metrics.failed_calls.add(1, {"operation": "get_user_attributes"}) 

903 raise 

904 except httpx.HTTPError as e: 

905 logger.error(f"HTTP error getting user attributes: {e}", exc_info=True) 

906 metrics.failed_calls.add(1, {"operation": "get_user_attributes"}) 

907 raise 

908 

909 async def update_user_attributes(self, user_id: str, attributes: dict[str, Any]) -> None: 

910 """ 

911 Update user custom attributes via Admin API. 

912 

913 Args: 

914 user_id: User UUID 

915 attributes: Dictionary of attributes to update 

916 

917 Raises: 

918 httpx.HTTPError: If updating user attributes fails 

919 """ 

920 with tracer.start_as_current_span("keycloak.update_user_attributes") as span: 

921 span.set_attribute("user.uuid", user_id) 

922 span.set_attribute("attributes.count", len(attributes)) 

923 

924 try: 

925 admin_token = await self.get_admin_token() 

926 

927 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

928 headers = { 

929 "Authorization": f"Bearer {admin_token}", 

930 "Content-Type": "application/json", 

931 } 

932 

933 # Keycloak requires full user object in PUT request 

934 # We need to get the current user first, then update attributes 

935 get_url = f"{self.config.admin_url}/users/{user_id}" 

936 get_response = await client.get(get_url, headers=headers) 

937 get_response.raise_for_status() 

938 

939 user_data = get_response.json() 

940 user_data["attributes"] = attributes 

941 

942 # Update user with new attributes 

943 put_url = f"{self.config.admin_url}/users/{user_id}" 

944 put_response = await client.put(put_url, headers=headers, json=user_data) 

945 put_response.raise_for_status() 

946 

947 logger.info(f"Updated attributes for user {user_id}") 

948 metrics.successful_calls.add(1, {"operation": "update_user_attributes"}) 

949 

950 except httpx.HTTPStatusError as e: 

951 logger.error( 

952 f"Failed to update user attributes: {e}", 

953 extra={"status_code": e.response.status_code, "user_id": user_id}, 

954 exc_info=True, 

955 ) 

956 metrics.failed_calls.add(1, {"operation": "update_user_attributes"}) 

957 raise 

958 except httpx.HTTPError as e: 

959 logger.error(f"HTTP error updating user attributes: {e}", exc_info=True) 

960 metrics.failed_calls.add(1, {"operation": "update_user_attributes"}) 

961 raise 

962 

963 async def update_client_attributes(self, client_id: str, attributes: dict[str, Any]) -> None: 

964 """ 

965 Update client attributes via Admin API. 

966 

967 Args: 

968 client_id: Client UUID (not clientId) 

969 attributes: Dictionary of attributes to update 

970 

971 Raises: 

972 httpx.HTTPError: If client attribute update fails 

973 """ 

974 with tracer.start_as_current_span("keycloak.update_client_attributes") as span: 

975 span.set_attribute("client.uuid", client_id) 

976 

977 try: 

978 admin_token = await self.get_admin_token() 

979 

980 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

981 headers = { 

982 "Authorization": f"Bearer {admin_token}", 

983 "Content-Type": "application/json", 

984 } 

985 

986 # Get current client data 

987 get_url = f"{self.config.admin_url}/clients/{client_id}" 

988 get_response = await client.get(get_url, headers=headers) 

989 get_response.raise_for_status() 

990 

991 client_data = get_response.json() 

992 client_data["attributes"] = attributes 

993 

994 # Update client with new attributes 

995 put_url = f"{self.config.admin_url}/clients/{client_id}" 

996 put_response = await client.put(put_url, headers=headers, json=client_data) 

997 put_response.raise_for_status() 

998 

999 logger.info(f"Updated attributes for client {client_id}") 

1000 metrics.successful_calls.add(1, {"operation": "update_client_attributes"}) 

1001 

1002 except httpx.HTTPStatusError as e: 

1003 logger.error( 

1004 f"Failed to update client attributes: {e}", 

1005 extra={"status_code": e.response.status_code, "client_id": client_id}, 

1006 exc_info=True, 

1007 ) 

1008 metrics.failed_calls.add(1, {"operation": "update_client_attributes"}) 

1009 raise 

1010 except httpx.HTTPError as e: 

1011 logger.error(f"HTTP error updating client attributes: {e}", exc_info=True) 

1012 metrics.failed_calls.add(1, {"operation": "update_client_attributes"}) 

1013 raise 

1014 

1015 async def update_client_secret(self, client_id: str, secret: str) -> None: 

1016 """ 

1017 Update client secret via Admin API. 

1018 

1019 Args: 

1020 client_id: Client UUID (not clientId) 

1021 secret: New client secret 

1022 

1023 Raises: 

1024 httpx.HTTPError: If client secret update fails 

1025 """ 

1026 with tracer.start_as_current_span("keycloak.update_client_secret") as span: 

1027 span.set_attribute("client.uuid", client_id) 

1028 

1029 try: 

1030 admin_token = await self.get_admin_token() 

1031 

1032 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

1033 headers = { 

1034 "Authorization": f"Bearer {admin_token}", 

1035 "Content-Type": "application/json", 

1036 } 

1037 

1038 # Keycloak client-secret regeneration endpoint 

1039 url = f"{self.config.admin_url}/clients/{client_id}/client-secret" 

1040 secret_config = {"type": "secret", "value": secret} 

1041 

1042 response = await client.post(url, headers=headers, json=secret_config) 

1043 response.raise_for_status() 

1044 

1045 logger.info(f"Updated secret for client {client_id}") 

1046 metrics.successful_calls.add(1, {"operation": "update_client_secret"}) 

1047 

1048 except httpx.HTTPStatusError as e: 

1049 logger.error( 

1050 f"Failed to update client secret: {e}", 

1051 extra={"status_code": e.response.status_code, "client_id": client_id}, 

1052 exc_info=True, 

1053 ) 

1054 metrics.failed_calls.add(1, {"operation": "update_client_secret"}) 

1055 raise 

1056 except httpx.HTTPError as e: 

1057 logger.error(f"HTTP error updating client secret: {e}", exc_info=True) 

1058 metrics.failed_calls.add(1, {"operation": "update_client_secret"}) 

1059 raise 

1060 

1061 async def get_clients(self, query: dict[str, Any] | None = None) -> list[dict[str, Any]]: 

1062 """ 

1063 Get clients via Admin API. 

1064 

1065 Args: 

1066 query: Optional query parameters (e.g., {"clientId": "my-client"}) 

1067 

1068 Returns: 

1069 List of client data dictionaries 

1070 

1071 Raises: 

1072 httpx.HTTPError: If getting clients fails 

1073 """ 

1074 with tracer.start_as_current_span("keycloak.get_clients"): 

1075 try: 

1076 admin_token = await self.get_admin_token() 

1077 

1078 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

1079 headers = {"Authorization": f"Bearer {admin_token}"} 

1080 

1081 url = f"{self.config.admin_url}/clients" 

1082 # Pass query parameters if provided 

1083 params = query if query else {} 

1084 

1085 response = await client.get(url, headers=headers, params=params) 

1086 response.raise_for_status() 

1087 

1088 clients_data = response.json() 

1089 

1090 logger.info(f"Retrieved {len(clients_data)} clients") 

1091 metrics.successful_calls.add(1, {"operation": "get_clients"}) 

1092 

1093 return clients_data # type: ignore[no-any-return] 

1094 

1095 except httpx.HTTPStatusError as e: 

1096 logger.error( 

1097 f"Failed to get clients: {e}", 

1098 extra={"status_code": e.response.status_code}, 

1099 exc_info=True, 

1100 ) 

1101 metrics.failed_calls.add(1, {"operation": "get_clients"}) 

1102 raise 

1103 except httpx.HTTPError as e: 

1104 logger.error(f"HTTP error getting clients: {e}", exc_info=True) 

1105 metrics.failed_calls.add(1, {"operation": "get_clients"}) 

1106 raise 

1107 

1108 async def get_client(self, client_id: str) -> dict[str, Any] | None: 

1109 """ 

1110 Get client by ID via Admin API. 

1111 

1112 Args: 

1113 client_id: Client UUID (not clientId) 

1114 

1115 Returns: 

1116 Client data dictionary or None if not found 

1117 

1118 Raises: 

1119 httpx.HTTPError: If getting client fails (except 404 which returns None) 

1120 """ 

1121 with tracer.start_as_current_span("keycloak.get_client") as span: 

1122 span.set_attribute("client.uuid", client_id) 

1123 

1124 try: 

1125 admin_token = await self.get_admin_token() 

1126 

1127 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

1128 headers = {"Authorization": f"Bearer {admin_token}"} 

1129 

1130 url = f"{self.config.admin_url}/clients/{client_id}" 

1131 response = await client.get(url, headers=headers) 

1132 response.raise_for_status() 

1133 

1134 client_data = response.json() 

1135 

1136 logger.info(f"Retrieved client {client_id}") 

1137 metrics.successful_calls.add(1, {"operation": "get_client"}) 

1138 

1139 return client_data # type: ignore[no-any-return] 

1140 

1141 except httpx.HTTPStatusError as e: 

1142 # Return None for 404 (client not found) 

1143 if e.response.status_code == 404: 1143 ↛ 1147line 1143 didn't jump to line 1147 because the condition on line 1143 was always true

1144 logger.info(f"Client {client_id} not found") 

1145 return None 

1146 

1147 logger.error( 

1148 f"Failed to get client: {e}", 

1149 extra={"status_code": e.response.status_code, "client_id": client_id}, 

1150 exc_info=True, 

1151 ) 

1152 metrics.failed_calls.add(1, {"operation": "get_client"}) 

1153 raise 

1154 except httpx.HTTPError as e: 

1155 logger.error(f"HTTP error getting client: {e}", exc_info=True) 

1156 metrics.failed_calls.add(1, {"operation": "get_client"}) 

1157 raise 

1158 

1159 async def delete_client(self, client_id: str) -> None: 

1160 """ 

1161 Delete Keycloak client via Admin API. 

1162 

1163 Args: 

1164 client_id: Client UUID (not clientId) to delete 

1165 

1166 Raises: 

1167 httpx.HTTPError: If client deletion fails 

1168 """ 

1169 with tracer.start_as_current_span("keycloak.delete_client") as span: 

1170 span.set_attribute("client.uuid", client_id) 

1171 

1172 try: 

1173 admin_token = await self.get_admin_token() 

1174 

1175 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

1176 headers = {"Authorization": f"Bearer {admin_token}"} 

1177 

1178 url = f"{self.config.admin_url}/clients/{client_id}" 

1179 response = await client.delete(url, headers=headers) 

1180 response.raise_for_status() 

1181 

1182 logger.info(f"Keycloak client deleted: {client_id}") 

1183 metrics.successful_calls.add(1, {"operation": "delete_client"}) 

1184 

1185 except httpx.HTTPStatusError as e: 

1186 logger.error( 

1187 f"Failed to delete client: {e}", 

1188 extra={"status_code": e.response.status_code, "client_id": client_id}, 

1189 exc_info=True, 

1190 ) 

1191 metrics.failed_calls.add(1, {"operation": "delete_client"}) 

1192 raise 

1193 except httpx.HTTPError as e: 

1194 logger.error(f"HTTP error deleting client: {e}", exc_info=True) 

1195 metrics.failed_calls.add(1, {"operation": "delete_client"}) 

1196 raise 

1197 

1198 async def delete_user(self, user_id: str) -> None: 

1199 """ 

1200 Delete Keycloak user via Admin API. 

1201 

1202 Args: 

1203 user_id: User UUID to delete 

1204 

1205 Raises: 

1206 httpx.HTTPError: If user deletion fails 

1207 """ 

1208 with tracer.start_as_current_span("keycloak.delete_user") as span: 

1209 span.set_attribute("user.uuid", user_id) 

1210 

1211 try: 

1212 admin_token = await self.get_admin_token() 

1213 

1214 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

1215 headers = {"Authorization": f"Bearer {admin_token}"} 

1216 

1217 url = f"{self.config.admin_url}/users/{user_id}" 

1218 response = await client.delete(url, headers=headers) 

1219 response.raise_for_status() 

1220 

1221 logger.info(f"Keycloak user deleted: {user_id}") 

1222 metrics.successful_calls.add(1, {"operation": "delete_user"}) 

1223 

1224 except httpx.HTTPStatusError as e: 

1225 logger.error( 

1226 f"Failed to delete user: {e}", 

1227 extra={"status_code": e.response.status_code, "user_id": user_id}, 

1228 exc_info=True, 

1229 ) 

1230 metrics.failed_calls.add(1, {"operation": "delete_user"}) 

1231 raise 

1232 except httpx.HTTPError as e: 

1233 logger.error(f"HTTP error deleting user: {e}", exc_info=True) 

1234 metrics.failed_calls.add(1, {"operation": "delete_user"}) 

1235 raise 

1236 

1237 async def search_users(self, query: dict[str, Any] | None = None, first: int = 0, max: int = 100) -> list[dict[str, Any]]: 

1238 """ 

1239 Search users via Admin API with pagination. 

1240 

1241 Args: 

1242 query: Optional search query (e.g., {"username": "alice", "email": "alice@example.com"}) 

1243 first: Pagination offset (default: 0) 

1244 max: Maximum results to return (default: 100) 

1245 

1246 Returns: 

1247 List of user data dictionaries matching the query 

1248 

1249 Raises: 

1250 httpx.HTTPError: If user search fails 

1251 """ 

1252 with tracer.start_as_current_span("keycloak.search_users") as span: 

1253 span.set_attribute("pagination.first", first) 

1254 span.set_attribute("pagination.max", max) 

1255 

1256 try: 

1257 admin_token = await self.get_admin_token() 

1258 

1259 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

1260 headers = {"Authorization": f"Bearer {admin_token}"} 

1261 

1262 url = f"{self.config.admin_url}/users" 

1263 

1264 # Build query parameters 

1265 params: dict[str, Any] = {"first": first, "max": max} 

1266 if query: 1266 ↛ 1269line 1266 didn't jump to line 1269 because the condition on line 1266 was always true

1267 params.update(query) 

1268 

1269 response = await client.get(url, headers=headers, params=params) 

1270 response.raise_for_status() 

1271 

1272 users_data = response.json() 

1273 

1274 logger.info(f"Searched users, found {len(users_data)} results") 

1275 metrics.successful_calls.add(1, {"operation": "search_users"}) 

1276 

1277 return users_data # type: ignore[no-any-return] 

1278 

1279 except httpx.HTTPStatusError as e: 

1280 logger.error( 

1281 f"Failed to search users: {e}", 

1282 extra={"status_code": e.response.status_code}, 

1283 exc_info=True, 

1284 ) 

1285 metrics.failed_calls.add(1, {"operation": "search_users"}) 

1286 raise 

1287 except httpx.HTTPError as e: 

1288 logger.error(f"HTTP error searching users: {e}", exc_info=True) 

1289 metrics.failed_calls.add(1, {"operation": "search_users"}) 

1290 raise 

1291 

1292 async def get_users(self, query: dict[str, Any] | None = None) -> list[dict[str, Any]]: 

1293 """ 

1294 Get all users via Admin API with optional filtering. 

1295 

1296 Args: 

1297 query: Optional query parameters (e.g., {"email": "example.com"}) 

1298 

1299 Returns: 

1300 List of all user data dictionaries 

1301 

1302 Raises: 

1303 httpx.HTTPError: If getting users fails 

1304 """ 

1305 with tracer.start_as_current_span("keycloak.get_users"): 

1306 try: 

1307 admin_token = await self.get_admin_token() 

1308 

1309 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

1310 headers = {"Authorization": f"Bearer {admin_token}"} 

1311 

1312 url = f"{self.config.admin_url}/users" 

1313 params = query if query else {} 

1314 

1315 response = await client.get(url, headers=headers, params=params) 

1316 response.raise_for_status() 

1317 

1318 users_data = response.json() 

1319 

1320 logger.info(f"Retrieved {len(users_data)} users") 

1321 metrics.successful_calls.add(1, {"operation": "get_users"}) 

1322 

1323 return users_data # type: ignore[no-any-return] 

1324 

1325 except httpx.HTTPStatusError as e: 

1326 logger.error( 

1327 f"Failed to get users: {e}", 

1328 extra={"status_code": e.response.status_code}, 

1329 exc_info=True, 

1330 ) 

1331 metrics.failed_calls.add(1, {"operation": "get_users"}) 

1332 raise 

1333 except httpx.HTTPError as e: 

1334 logger.error(f"HTTP error getting users: {e}", exc_info=True) 

1335 metrics.failed_calls.add(1, {"operation": "get_users"}) 

1336 raise 

1337 

1338 async def create_group(self, group_config: dict[str, Any]) -> str: 

1339 """ 

1340 Create group via Admin API. 

1341 

1342 Args: 

1343 group_config: Dictionary of group properties (must include "name") 

1344 

1345 Returns: 

1346 Group UUID extracted from Location header 

1347 

1348 Raises: 

1349 httpx.HTTPError: If group creation fails 

1350 """ 

1351 with tracer.start_as_current_span("keycloak.create_group"): 

1352 try: 

1353 admin_token = await self.get_admin_token() 

1354 

1355 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

1356 headers = { 

1357 "Authorization": f"Bearer {admin_token}", 

1358 "Content-Type": "application/json", 

1359 } 

1360 

1361 url = f"{self.config.admin_url}/groups" 

1362 response = await client.post(url, headers=headers, json=group_config) 

1363 response.raise_for_status() 

1364 

1365 # Extract group ID from Location header 

1366 location = response.headers.get("Location", "") 

1367 group_id: str = location.split("/")[-1] 

1368 

1369 logger.info(f"Created group {group_id}") 

1370 metrics.successful_calls.add(1, {"operation": "create_group"}) 

1371 

1372 return group_id 

1373 

1374 except httpx.HTTPStatusError as e: 

1375 logger.error( 

1376 f"Failed to create group: {e}", 

1377 extra={"status_code": e.response.status_code}, 

1378 exc_info=True, 

1379 ) 

1380 metrics.failed_calls.add(1, {"operation": "create_group"}) 

1381 raise 

1382 except httpx.HTTPError as e: 

1383 logger.error(f"HTTP error creating group: {e}", exc_info=True) 

1384 metrics.failed_calls.add(1, {"operation": "create_group"}) 

1385 raise 

1386 

1387 async def get_group(self, group_id: str) -> dict[str, Any] | None: 

1388 """ 

1389 Get group by ID via Admin API. 

1390 

1391 Args: 

1392 group_id: Group UUID 

1393 

1394 Returns: 

1395 Group data dictionary or None if not found 

1396 

1397 Raises: 

1398 httpx.HTTPError: If getting group fails (except 404 which returns None) 

1399 """ 

1400 with tracer.start_as_current_span("keycloak.get_group") as span: 

1401 span.set_attribute("group.uuid", group_id) 

1402 

1403 try: 

1404 admin_token = await self.get_admin_token() 

1405 

1406 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

1407 headers = {"Authorization": f"Bearer {admin_token}"} 

1408 

1409 url = f"{self.config.admin_url}/groups/{group_id}" 

1410 response = await client.get(url, headers=headers) 

1411 response.raise_for_status() 

1412 

1413 group_data = response.json() 

1414 

1415 logger.info(f"Retrieved group {group_id}") 

1416 metrics.successful_calls.add(1, {"operation": "get_group"}) 

1417 

1418 return group_data # type: ignore[no-any-return] 

1419 

1420 except httpx.HTTPStatusError as e: 

1421 # Return None for 404 (group not found) 

1422 if e.response.status_code == 404: 1422 ↛ 1426line 1422 didn't jump to line 1426 because the condition on line 1422 was always true

1423 logger.info(f"Group {group_id} not found") 

1424 return None 

1425 

1426 logger.error( 

1427 f"Failed to get group: {e}", 

1428 extra={"status_code": e.response.status_code, "group_id": group_id}, 

1429 exc_info=True, 

1430 ) 

1431 metrics.failed_calls.add(1, {"operation": "get_group"}) 

1432 raise 

1433 except httpx.HTTPError as e: 

1434 logger.error(f"HTTP error getting group: {e}", exc_info=True) 

1435 metrics.failed_calls.add(1, {"operation": "get_group"}) 

1436 raise 

1437 

1438 async def get_group_members(self, group_id: str) -> list[dict[str, Any]]: 

1439 """ 

1440 Get group members via Admin API. 

1441 

1442 Args: 

1443 group_id: Group UUID 

1444 

1445 Returns: 

1446 List of user data dictionaries for group members 

1447 

1448 Raises: 

1449 httpx.HTTPError: If getting group members fails 

1450 """ 

1451 with tracer.start_as_current_span("keycloak.get_group_members") as span: 

1452 span.set_attribute("group.uuid", group_id) 

1453 

1454 try: 

1455 admin_token = await self.get_admin_token() 

1456 

1457 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

1458 headers = {"Authorization": f"Bearer {admin_token}"} 

1459 

1460 url = f"{self.config.admin_url}/groups/{group_id}/members" 

1461 response = await client.get(url, headers=headers) 

1462 response.raise_for_status() 

1463 

1464 members_data = response.json() 

1465 

1466 logger.info(f"Retrieved {len(members_data)} members for group {group_id}") 

1467 metrics.successful_calls.add(1, {"operation": "get_group_members"}) 

1468 

1469 return members_data # type: ignore[no-any-return] 

1470 

1471 except httpx.HTTPStatusError as e: 

1472 logger.error( 

1473 f"Failed to get group members: {e}", 

1474 extra={"status_code": e.response.status_code, "group_id": group_id}, 

1475 exc_info=True, 

1476 ) 

1477 metrics.failed_calls.add(1, {"operation": "get_group_members"}) 

1478 raise 

1479 except httpx.HTTPError as e: 

1480 logger.error(f"HTTP error getting group members: {e}", exc_info=True) 

1481 metrics.failed_calls.add(1, {"operation": "get_group_members"}) 

1482 raise 

1483 

1484 async def add_user_to_group(self, user_id: str, group_id: str) -> None: 

1485 """ 

1486 Add user to group via Admin API. 

1487 

1488 Args: 

1489 user_id: User UUID 

1490 group_id: Group UUID 

1491 

1492 Raises: 

1493 httpx.HTTPError: If adding user to group fails 

1494 """ 

1495 with tracer.start_as_current_span("keycloak.add_user_to_group") as span: 

1496 span.set_attribute("user.uuid", user_id) 

1497 span.set_attribute("group.uuid", group_id) 

1498 

1499 try: 

1500 admin_token = await self.get_admin_token() 

1501 

1502 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

1503 headers = {"Authorization": f"Bearer {admin_token}"} 

1504 

1505 # Keycloak endpoint for adding user to group 

1506 url = f"{self.config.admin_url}/users/{user_id}/groups/{group_id}" 

1507 response = await client.put(url, headers=headers) 

1508 response.raise_for_status() 

1509 

1510 logger.info(f"Added user {user_id} to group {group_id}") 

1511 metrics.successful_calls.add(1, {"operation": "add_user_to_group"}) 

1512 

1513 except httpx.HTTPStatusError as e: 

1514 logger.error( 

1515 f"Failed to add user to group: {e}", 

1516 extra={"status_code": e.response.status_code, "user_id": user_id, "group_id": group_id}, 

1517 exc_info=True, 

1518 ) 

1519 metrics.failed_calls.add(1, {"operation": "add_user_to_group"}) 

1520 raise 

1521 except httpx.HTTPError as e: 

1522 logger.error(f"HTTP error adding user to group: {e}", exc_info=True) 

1523 metrics.failed_calls.add(1, {"operation": "add_user_to_group"}) 

1524 raise 

1525 

1526 async def issue_token_for_user( 

1527 self, 

1528 user_id: str, 

1529 requested_token_type: str = "urn:ietf:params:oauth:token-type:access_token", 

1530 audience: str | None = None, 

1531 ) -> dict[str, Any]: 

1532 """ 

1533 Issue JWT token for user using OAuth 2.0 Token Exchange (RFC 8693). 

1534 

1535 This enables programmatic token issuance for API key → JWT exchange workflows. 

1536 Uses Keycloak's token exchange protocol to exchange admin token for user token. 

1537 

1538 Args: 

1539 user_id: User UUID to issue token for 

1540 requested_token_type: Type of token requested (default: access_token) 

1541 audience: Optional audience for the issued token 

1542 

1543 Returns: 

1544 Token response with access_token, refresh_token, expires_in, etc. 

1545 

1546 Raises: 

1547 httpx.HTTPError: If token exchange fails 

1548 

1549 Note: 

1550 Requires Keycloak realm to have token exchange enabled and admin user 

1551 to have appropriate permissions (token-exchange scope). 

1552 """ 

1553 with tracer.start_as_current_span("keycloak.issue_token_for_user") as span: 

1554 span.set_attribute("user.uuid", user_id) 

1555 span.set_attribute("requested_token_type", requested_token_type) 

1556 

1557 try: 

1558 admin_token = await self.get_admin_token() 

1559 

1560 async with httpx.AsyncClient(verify=self.config.verify_ssl, timeout=self.config.timeout) as client: 

1561 headers = {"Content-Type": "application/x-www-form-urlencoded"} 

1562 

1563 # OAuth 2.0 Token Exchange (RFC 8693) 

1564 # https://www.rfc-editor.org/rfc/rfc8693.html 

1565 data = { 

1566 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", 

1567 "client_id": self.config.client_id, 

1568 "subject_token": admin_token, 

1569 "subject_token_type": "urn:ietf:params:oauth:token-type:access_token", 

1570 "requested_token_type": requested_token_type, 

1571 "requested_subject": user_id, # User to issue token for 

1572 } 

1573 

1574 if self.config.client_secret: 1574 ↛ 1577line 1574 didn't jump to line 1577 because the condition on line 1574 was always true

1575 data["client_secret"] = self.config.client_secret 

1576 

1577 if audience: 1577 ↛ 1578line 1577 didn't jump to line 1578 because the condition on line 1577 was never true

1578 data["audience"] = audience 

1579 

1580 response = await client.post(self.config.token_endpoint, headers=headers, data=data) 

1581 response.raise_for_status() 

1582 

1583 tokens = response.json() 

1584 

1585 logger.info(f"Issued token for user {user_id}") 

1586 metrics.successful_calls.add(1, {"operation": "issue_token_for_user"}) 

1587 

1588 return tokens # type: ignore[no-any-return] 

1589 

1590 except httpx.HTTPStatusError as e: 

1591 logger.error( 

1592 f"Failed to issue token for user: {e}", 

1593 extra={"status_code": e.response.status_code, "user_id": user_id, "detail": e.response.text}, 

1594 exc_info=True, 

1595 ) 

1596 metrics.failed_calls.add(1, {"operation": "issue_token_for_user"}) 

1597 raise 

1598 except httpx.HTTPError as e: 

1599 logger.error(f"HTTP error issuing token for user: {e}", exc_info=True) 

1600 metrics.failed_calls.add(1, {"operation": "issue_token_for_user"}) 

1601 raise 

1602 

1603 

1604async def sync_user_to_openfga( 

1605 keycloak_user: KeycloakUser, openfga_client: Any, role_mapper: Any | None = None, use_legacy_mapping: bool = False 

1606) -> None: 

1607 """ 

1608 Synchronize Keycloak user roles/groups to OpenFGA tuples 

1609 

1610 Supports two modes: 

1611 1. RoleMapper (recommended): Uses configurable YAML-based mapping 

1612 2. Legacy (backward compatible): Uses hardcoded mapping rules 

1613 

1614 Args: 

1615 keycloak_user: Keycloak user to sync 

1616 openfga_client: OpenFGA client instance 

1617 role_mapper: RoleMapper instance (optional, will use default if None) 

1618 use_legacy_mapping: Use legacy hardcoded mapping (default: False) 

1619 """ 

1620 with tracer.start_as_current_span("keycloak.sync_to_openfga") as span: 

1621 span.set_attribute("user.id", keycloak_user.user_id) 

1622 span.set_attribute("mapping.mode", "legacy" if use_legacy_mapping else "role_mapper") 

1623 

1624 tuples = [] 

1625 

1626 if use_legacy_mapping: 1626 ↛ 1628line 1626 didn't jump to line 1628 because the condition on line 1626 was never true

1627 # Legacy hardcoded mapping for backward compatibility 

1628 logger.info("Using legacy role mapping") 

1629 

1630 # Map realm roles 

1631 if "admin" in keycloak_user.realm_roles: 

1632 tuples.append({"user": keycloak_user.user_id, "relation": "admin", "object": "system:global"}) 

1633 

1634 # Map groups to organizations 

1635 for group in keycloak_user.groups: 

1636 org_name = group.strip("/").split("/")[-1] 

1637 if org_name: 

1638 tuples.append({"user": keycloak_user.user_id, "relation": "member", "object": f"organization:{org_name}"}) 

1639 

1640 # Map client roles 

1641 client_roles = keycloak_user.client_roles.get(keycloak_user.client_id, []) # type: ignore[attr-defined] 

1642 for role in client_roles: 

1643 tuples.append({"user": keycloak_user.user_id, "relation": "assignee", "object": f"role:{role}"}) 

1644 

1645 # Map premium role for backward compatibility 

1646 if "premium" in keycloak_user.realm_roles or "premium" in client_roles: 

1647 tuples.append({"user": keycloak_user.user_id, "relation": "assignee", "object": "role:premium"}) 

1648 

1649 else: 

1650 # Use RoleMapper for flexible, configurable mapping 

1651 if role_mapper is None: 1651 ↛ 1657line 1651 didn't jump to line 1657 because the condition on line 1651 was always true

1652 # Import here to avoid circular dependency 

1653 from mcp_server_langgraph.auth.role_mapper import RoleMapper 

1654 

1655 role_mapper = RoleMapper() # Uses default config 

1656 

1657 logger.info("Using RoleMapper for role mapping") 

1658 tuples = await role_mapper.map_user_to_tuples(keycloak_user) 

1659 

1660 # Write tuples to OpenFGA 

1661 if tuples: 

1662 try: 

1663 await openfga_client.write_tuples(tuples) 

1664 logger.info(f"Synced {len(tuples)} tuples to OpenFGA for user {keycloak_user.username}") 

1665 except Exception as e: 

1666 logger.error(f"Failed to sync user to OpenFGA: {e}", exc_info=True) 

1667 raise 

1668 else: 

1669 logger.info(f"No tuples to sync for user {keycloak_user.username}")