Coverage for src / mcp_server_langgraph / auth / user_provider.py: 86%

263 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Abstract user provider interface for pluggable authentication backends 

3 

4Enables switching between different user management systems: 

5- InMemoryUserProvider (development/testing) 

6- KeycloakUserProvider (production) 

7- Custom providers (future extensions) 

8""" 

9 

import hmac
import re
import secrets
from abc import ABC, abstractmethod
from datetime import datetime, timedelta, UTC
from typing import Any, TypedDict

import jwt
from pydantic import BaseModel, ConfigDict, Field

from mcp_server_langgraph.auth.keycloak import KeycloakClient, KeycloakConfig, sync_user_to_openfga
from mcp_server_langgraph.observability.telemetry import logger, tracer

19 

# Try to import bcrypt for password hashing.
# bcrypt is optional: BCRYPT_AVAILABLE records whether the import succeeded so
# that InMemoryUserProvider can decide whether hashing may be enabled.
try:
    import bcrypt

    BCRYPT_AVAILABLE = True
except ImportError:
    BCRYPT_AVAILABLE = False
    # Note: logger warning deferred to InMemoryUserProvider.__init__
    # to avoid import-time observability dependency


# ============================================================================
# Pydantic Models for Type-Safe Authentication Responses
# ============================================================================

33 

34 

class UserDBEntry(TypedDict):
    """Type definition for user database entries

    One record of ``InMemoryUserProvider.users_db``; the dictionary key that
    maps to this record is the username, so the username is not repeated here.
    """

    # Identifier handed out in responses and token "sub" claims (e.g. "user:alice")
    user_id: str
    # Contact email address
    email: str
    # Stored credential: bcrypt hash when hashing is enabled, otherwise the plaintext password
    password: str
    # Role names granted to the user
    roles: list[str]
    # False blocks authenticate() for this account
    active: bool

43 

44 

class UserData(BaseModel):
    """
    Type-safe user data structure

    Represents user information returned from user queries
    (get_user_by_id, get_user_by_username, list_users).
    """

    user_id: str = Field(..., description="User identifier (e.g., 'user:alice')")
    username: str = Field(..., description="Username")
    email: str = Field(..., description="Email address")
    roles: list[str] = Field(default_factory=list, description="User roles")
    active: bool = Field(default=True, description="Whether user account is active")

    # Mutable model; assignments are re-validated and string values are stripped
    # of surrounding whitespace.
    model_config = ConfigDict(
        frozen=False,
        validate_assignment=True,
        str_strip_whitespace=True,
        json_schema_extra={
            "example": {
                "user_id": "user:alice",
                "username": "alice",
                "email": "alice@acme.com",
                "roles": ["user", "premium"],
                "active": True,
            }
        },
    )

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for backward compatibility

        Returns:
            All fields of the model as a plain dictionary.
        """
        return self.model_dump()

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "UserData":
        """Create UserData from dictionary

        Args:
            data: Mapping of field names to values; passed to the model
                constructor as keyword arguments, so it is validated.

        Returns:
            A validated UserData instance.
        """
        return cls(**data)

81 

82 

class AuthResponse(BaseModel):
    """
    Type-safe authentication response

    Returned from authenticate() operations. On failure only ``authorized``
    plus ``reason`` (and optionally ``error``) are populated.
    """

    authorized: bool = Field(..., description="Whether authentication was successful")
    username: str | None = Field(default=None, description="Username if authorized")
    user_id: str | None = Field(default=None, description="User ID if authorized")
    email: str | None = Field(default=None, description="Email if authorized")
    roles: list[str] = Field(default_factory=list, description="User roles if authorized")
    reason: str | None = Field(default=None, description="Failure reason if not authorized")
    error: str | None = Field(default=None, description="Error details if not authorized")
    # Keycloak-specific fields (optional)
    access_token: str | None = Field(default=None, description="JWT access token (Keycloak)")
    refresh_token: str | None = Field(default=None, description="JWT refresh token (Keycloak)")
    expires_in: int | None = Field(default=None, description="Token expiration in seconds (Keycloak)")

    # Mutable model; assignments are re-validated and string values are stripped
    # of surrounding whitespace.
    model_config = ConfigDict(
        frozen=False,
        validate_assignment=True,
        str_strip_whitespace=True,
        json_schema_extra={
            "example": {
                "authorized": True,
                "username": "alice",
                "user_id": "user:alice",
                "email": "alice@acme.com",
                "roles": ["user", "premium"],
                "reason": None,
                "error": None,
                "access_token": "eyJ...",
                "refresh_token": "eyJ...",
                "expires_in": 300,
            }
        },
    )

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for backward compatibility

        Returns:
            The model as a plain dictionary with ``None``-valued fields omitted.
        """
        return self.model_dump(exclude_none=True)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "AuthResponse":
        """Create AuthResponse from dictionary

        Args:
            data: Mapping of field names to values; passed to the model
                constructor as keyword arguments, so it is validated.

        Returns:
            A validated AuthResponse instance.
        """
        return cls(**data)

130 

131 

class TokenVerification(BaseModel):
    """
    Type-safe token verification response

    Returned from verify_token() operations.
    """

    valid: bool = Field(..., description="Whether token is valid")
    payload: dict[str, Any] | None = Field(default=None, description="Token payload if valid")
    error: str | None = Field(default=None, description="Error message if not valid")

    # Mutable model; assignments are re-validated.
    model_config = ConfigDict(
        frozen=False,
        validate_assignment=True,
        json_schema_extra={"example": {"valid": True, "payload": {"sub": "user:alice", "exp": 1234567890}, "error": None}},
    )

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for backward compatibility

        Returns:
            The model as a plain dictionary with ``None``-valued fields omitted.
        """
        return self.model_dump(exclude_none=True)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "TokenVerification":
        """Create TokenVerification from dictionary

        Args:
            data: Mapping of field names to values; passed to the model
                constructor as keyword arguments, so it is validated.

        Returns:
            A validated TokenVerification instance.
        """
        return cls(**data)

157 

158 

class PasswordVerification(BaseModel):
    """
    Type-safe password verification response

    Returned from verify_password() operations.
    Provides structured response for password validation with user data on success.
    """

    valid: bool = Field(..., description="Whether password is valid")
    user: dict[str, Any] | None = Field(default=None, description="User data if password valid")
    error: str | None = Field(default=None, description="Error message if password invalid")

    # Mutable model; assignments are re-validated.
    model_config = ConfigDict(
        frozen=False,
        validate_assignment=True,
        json_schema_extra={
            "example": {
                "valid": True,
                "user": {"username": "alice", "user_id": "user:alice", "email": "alice@example.com", "roles": ["user"]},
                "error": None,
            }
        },
    )

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for backward compatibility

        Returns:
            The model as a plain dictionary with ``None``-valued fields omitted.
        """
        return self.model_dump(exclude_none=True)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "PasswordVerification":
        """Create PasswordVerification from dictionary

        Args:
            data: Mapping of field names to values; passed to the model
                constructor as keyword arguments, so it is validated.

        Returns:
            A validated PasswordVerification instance.
        """
        return cls(**data)

191 

192 

class UserProvider(ABC):
    """
    Abstract base class for user providers

    Defines the interface that all authentication backends must implement.
    Every operation is a coroutine; results are returned as the typed response
    models declared in this module.
    """

    @abstractmethod
    async def authenticate(self, username: str, password: str | None = None) -> AuthResponse:
        """
        Authenticate user by username/password

        Args:
            username: Username
            password: Password (optional for some providers)

        Returns:
            AuthResponse with authentication result
        """

    @abstractmethod
    async def get_user_by_id(self, user_id: str) -> UserData | None:
        """
        Get user by user ID

        Args:
            user_id: User identifier (e.g., "user:alice")

        Returns:
            UserData or None if not found
        """

    @abstractmethod
    async def get_user_by_username(self, username: str) -> UserData | None:
        """
        Get user by username

        Args:
            username: Username

        Returns:
            UserData or None if not found
        """

    @abstractmethod
    async def verify_token(self, token: str) -> TokenVerification:
        """
        Verify JWT token

        Args:
            token: JWT token to verify

        Returns:
            TokenVerification with validation result
        """

    @abstractmethod
    async def verify_password(self, username: str, password: str) -> PasswordVerification:
        """
        Verify username and password combination

        Convenience method for testing and validation scenarios.
        For full authentication with token generation, use authenticate() instead.

        Args:
            username: Username to verify
            password: Password to verify

        Returns:
            PasswordVerification with validation result and user data if valid
        """

    @abstractmethod
    async def list_users(self) -> list[UserData]:
        """
        List all users (for admin operations)

        Returns:
            List of UserData objects
        """

273 

274 

class InMemoryUserProvider(UserProvider):
    """
    In-memory user provider for development and testing

    Features:
    - Optional bcrypt password hashing (install bcrypt package)
    - JWT token generation
    - User lookup and authentication

    Users live in ``self.users_db`` (username -> UserDBEntry) and are lost when
    the instance is discarded.

    NOT suitable for large-scale production (use KeycloakUserProvider instead).
    """

    def __init__(self, secret_key: str | None = None, use_password_hashing: bool = False) -> None:
        """
        Initialize in-memory user provider

        Args:
            secret_key: Secret key for JWT token signing.
                Required for production use. Should be loaded from environment variables.
                When empty or None a random per-instance key is generated, so
                tokens cannot be verified by another instance.
            use_password_hashing: Enable bcrypt password hashing (requires bcrypt package).

        Raises:
            RuntimeError: If use_password_hashing is True but bcrypt is not
                installed (the provider refuses to silently store plaintext).
        """
        if not secret_key:
            # Use a random key for testing/development only
            secret_key = secrets.token_urlsafe(32)
            logger.warning(
                "No secret_key provided to InMemoryUserProvider. Using random key. "
                "This is only suitable for testing/development. "
                "In production, always provide a secure secret key via environment variables."
            )
        self.secret_key = secret_key

        # Check bcrypt availability if hashing requested
        if use_password_hashing and not BCRYPT_AVAILABLE:
            # SECURITY: Fail-closed pattern - refuse to start with insecure config
            msg = (
                "CRITICAL SECURITY ERROR: Password hashing requested (use_password_hashing=True) "
                "but bcrypt library is not available. "
                "This would result in INSECURE plaintext passwords. "
                "\n\nTo fix:\n"
                "1. Install bcrypt: Add 'bcrypt' to pyproject.toml dependencies, then run: uv sync\n"
                "2. Or disable password hashing: USE_PASSWORD_HASHING=false (NOT recommended for production)\n"
                "\nRefusing to start with insecure configuration."
            )
            raise RuntimeError(msg)

        self.use_password_hashing = use_password_hashing and BCRYPT_AVAILABLE

        # SECURITY (OpenAI Codex Finding #2): Start with empty user database
        # No hard-coded credentials. Users must be explicitly created via add_user() or configuration.
        # This prevents CWE-798: Use of Hard-coded Credentials
        self.users_db: dict[str, UserDBEntry] = {}

        if self.use_password_hashing:
            logger.info(
                "InMemoryUserProvider initialized with BCRYPT password hashing (secure)",
                extra={"user_count": len(self.users_db)},
            )
        else:
            logger.warning(
                "InMemoryUserProvider initialized with PLAINTEXT PASSWORDS (INSECURE). "
                "This is only suitable for development/testing. "
                "Use password hashing (use_password_hashing=True) or KeycloakUserProvider for production.",
                extra={"user_count": len(self.users_db)},
            )

    def _hash_password(self, password: str) -> str:
        """
        Hash password using bcrypt

        Args:
            password: Plaintext password

        Returns:
            Bcrypt password hash, or the password unchanged when bcrypt is not
            installed.

        Raises:
            ValueError: If the UTF-8 encoded password is longer than 72 bytes.
        """
        if not BCRYPT_AVAILABLE:
            # Fallback to plaintext. add_user() only calls this when hashing is
            # enabled, which __init__ permits only if bcrypt imported.
            return password

        password_bytes = password.encode("utf-8")

        # bcrypt 5.0+ enforces 72-byte password limit (raises ValueError instead of silent truncation)
        if len(password_bytes) > 72:
            msg = (
                f"Password exceeds bcrypt's 72-byte limit ({len(password_bytes)} bytes). "
                "Consider hashing long passwords with SHA256 before bcrypt."
            )
            raise ValueError(msg)

        salt = bcrypt.gensalt()
        hashed = bcrypt.hashpw(password_bytes, salt)
        return hashed.decode("utf-8")

    def _verify_password(self, password: str, password_hash: str) -> bool:
        """
        Verify password against hash

        Args:
            password: Plaintext password to verify
            password_hash: Stored password hash (or plaintext if hashing disabled)

        Returns:
            True if password matches, False otherwise
        """
        if not self.use_password_hashing:
            # Plaintext comparison (insecure, dev/test only). compare_digest does
            # not stop at the first differing byte the way ``==`` may.
            # "surrogatepass" keeps encoding total so the result equals ``==``.
            return hmac.compare_digest(
                password.encode("utf-8", "surrogatepass"),
                password_hash.encode("utf-8", "surrogatepass"),
            )

        # Bcrypt comparison (secure)
        password_bytes = password.encode("utf-8")

        # Silently handle overly long passwords during verification
        # (they would have been rejected during hashing with bcrypt 5.0+)
        if len(password_bytes) > 72:
            return False

        hash_bytes = password_hash.encode("utf-8")
        try:
            return bcrypt.checkpw(password_bytes, hash_bytes)
        except ValueError:
            # checkpw raises ValueError when the stored value is not a
            # well-formed bcrypt hash; treat that as a failed match rather than
            # letting the login path crash.
            logger.warning("Stored password hash is not a valid bcrypt hash; rejecting credentials")
            return False

    async def verify_password(self, username: str, password: str) -> PasswordVerification:
        """
        Verify username and password combination

        Convenience method for testing and validation scenarios.
        Returns structured PasswordVerification with user data on success.

        For full authentication with token generation, use authenticate() instead.

        Args:
            username: Username to verify
            password: Password to verify

        Returns:
            PasswordVerification with validation result and user data if valid

        Security:
            - Returns same error structure for non-existent users and invalid passwords
            - Unknown usernames return before any password comparison runs
            - NOTE(review): unlike authenticate(), the ``active`` flag is not
              consulted here - confirm that is intended
        """
        with tracer.start_as_current_span("inmemory.verify_password") as span:
            span.set_attribute("auth.username", username)

            # Validate inputs
            if not username or not password:
                span.set_attribute("auth.result", "invalid_input")
                return PasswordVerification(valid=False, error="Username and password are required")

            # Check if user exists
            user = self.users_db.get(username)
            if user is None:
                span.set_attribute("auth.result", "user_not_found")
                # Don't reveal user existence - return generic error
                return PasswordVerification(valid=False, error="Invalid credentials")

            # Verify password
            if not self._verify_password(password, user["password"]):
                span.set_attribute("auth.result", "invalid_password")
                return PasswordVerification(valid=False, error="Invalid credentials")

            # Success - return user data
            span.set_attribute("auth.result", "success")
            return PasswordVerification(
                valid=True,
                user={
                    "username": username,
                    "user_id": user["user_id"],
                    "email": user["email"],
                    "roles": user["roles"],
                },
            )

    def add_user(self, username: str, password: str, email: str, roles: list[str], user_id: str | None = None) -> None:
        """
        Add a new user to the in-memory database

        IMPORTANT: InMemoryUserProvider no longer seeds default users for security.
        You must explicitly create users for testing/development using this method.
        An existing entry with the same username is overwritten.

        Example:
            provider = InMemoryUserProvider(use_password_hashing=True)
            provider.add_user(
                username="testuser",
                password="secure-password-123",
                email="testuser@example.com",
                roles=["user", "premium"]
            )

        Args:
            username: Username (unique identifier)
            password: Plaintext password (will be hashed if use_password_hashing=True)
            email: Email address
            roles: List of role names (e.g., ["user"], ["admin"], ["user", "premium"])
            user_id: Optional user_id override for testing (e.g., worker-safe IDs in pytest-xdist)

        Raises:
            ValueError: If hashing is enabled and the password exceeds bcrypt's
                72-byte limit.

        Security:
            - Passwords are automatically hashed with bcrypt if use_password_hashing=True
            - For production, use KeycloakUserProvider instead of InMemoryUserProvider
        """
        stored_password = self._hash_password(password) if self.use_password_hashing else password
        generated_user_id = user_id if user_id is not None else f"user:{username}"

        self.users_db[username] = UserDBEntry(
            user_id=generated_user_id,
            email=email,
            password=stored_password,
            # Store a copy so later mutation of the caller's list cannot change
            # the stored roles.
            roles=list(roles),
            active=True,
        )

        logger.info(f"Added user: {username}", extra={"user_id": generated_user_id, "roles": roles})

    async def authenticate(self, username: str, password: str | None = None) -> AuthResponse:
        """
        Authenticate user by username and password

        SECURITY: This method requires a password for authentication.
        The password is checked with bcrypt when hashing is enabled and by
        plaintext comparison otherwise (development/testing only).

        Args:
            username: Username to authenticate
            password: Password (REQUIRED for InMemoryUserProvider)

        Returns:
            AuthResponse with authentication result. Failure reasons are
            "password_required", "invalid_credentials" and "account_inactive".
        """
        with tracer.start_as_current_span("inmemory.authenticate") as span:
            span.set_attribute("auth.username", username)

            # SECURITY: Require password
            if not password:
                logger.warning("Authentication failed - password required", extra={"username": username})
                return AuthResponse(authorized=False, reason="password_required")

            # Check user database
            user = self.users_db.get(username)
            if user is None:
                logger.warning("Authentication failed - user not found", extra={"username": username})
                # Use same error message as invalid password to prevent username enumeration
                return AuthResponse(authorized=False, reason="invalid_credentials")

            # Check if account is active
            # NOTE(review): this is reported before the password is checked, so
            # "account_inactive" is visible without valid credentials - confirm
            # whether callers depend on that ordering before changing it.
            if not user["active"]:
                logger.warning("User account inactive", extra={"username": username})
                return AuthResponse(authorized=False, reason="account_inactive")

            # SECURITY: Validate password using bcrypt (if enabled) or plaintext comparison
            if not self._verify_password(password, user["password"]):
                logger.warning("Authentication failed - invalid password", extra={"username": username})
                return AuthResponse(authorized=False, reason="invalid_credentials")

            logger.info("User authenticated", extra={"username": username, "user_id": user["user_id"]})

            return AuthResponse(
                authorized=True,
                username=username,
                user_id=user["user_id"],
                email=user["email"],
                roles=user["roles"],
            )

    async def get_user_by_id(self, user_id: str) -> UserData | None:
        """Get user by ID

        The ID is first reduced to a username: everything after the first ":"
        is taken, and a worker-safe pytest-xdist prefix ("test_gw<N>_") is
        removed. If no user has that username, the stored ``user_id`` values
        are searched for an exact match so that IDs supplied through
        ``add_user(user_id=...)`` resolve as well.

        Args:
            user_id: User identifier (e.g., "user:alice")

        Returns:
            UserData or None if not found
        """
        if ":" in user_id:
            id_part = user_id.split(":", 1)[1]  # Remove "user:" prefix
            # Check if it's a worker-safe ID (format: test_gw\d+_username)
            match = re.match(r"test_gw\d+_(.*)", id_part)
            username = match.group(1) if match else id_part
        else:
            username = user_id

        found = await self.get_user_by_username(username)
        if found is not None:
            return found

        # Fallback: the derived username is unknown, so look for an entry whose
        # stored user_id equals the requested one.
        for candidate, entry in self.users_db.items():
            if entry["user_id"] == user_id:
                return await self.get_user_by_username(candidate)
        return None

    async def get_user_by_username(self, username: str) -> UserData | None:
        """Get user by username

        Args:
            username: Key into the in-memory user database

        Returns:
            UserData built from the stored entry (the password is not
            included), or None if the username is unknown
        """
        user_data = self.users_db.get(username)
        if user_data is None:
            return None
        return UserData(
            username=username,
            user_id=user_data["user_id"],
            email=user_data["email"],
            roles=user_data["roles"],
            active=user_data["active"],
        )

    async def verify_token(self, token: str) -> TokenVerification:
        """Verify self-issued JWT token

        Args:
            token: HS256 JWT signed with this provider's secret key

        Returns:
            TokenVerification with the decoded payload when valid, otherwise
            with error "Token expired" or "Invalid token"
        """
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
            logger.info("Token verified", extra={"user_id": payload.get("sub")})
            return TokenVerification(valid=True, payload=payload)
        except jwt.ExpiredSignatureError:
            logger.warning("Token expired")
            return TokenVerification(valid=False, error="Token expired")
        except jwt.InvalidTokenError as e:
            logger.warning(f"Invalid token: {e}")
            return TokenVerification(valid=False, error="Invalid token")

    async def list_users(self) -> list[UserData]:
        """List all users

        Returns:
            One UserData per stored entry (passwords are not included)
        """
        return [
            UserData(
                username=username,
                user_id=user_data["user_id"],
                email=user_data["email"],
                roles=user_data["roles"],
                active=user_data["active"],
            )
            for username, user_data in self.users_db.items()
        ]

    def create_token(self, username: str, expires_in: int = 3600) -> str:
        """
        Create JWT token for user (helper method for in-memory provider)

        Args:
            username: Username
            expires_in: Token expiration in seconds

        Returns:
            JWT token string (HS256, signed with this provider's secret key)

        Raises:
            ValueError: If the username is not in the user database
        """
        if username not in self.users_db:
            msg = f"User not found: {username}"
            raise ValueError(msg)

        user = self.users_db[username]

        # jti = username + microsecond timestamp + random suffix. The timestamp
        # alone can repeat when two tokens are issued within one clock tick;
        # the random suffix keeps every token distinct.
        now = datetime.now(UTC)
        jti = f"{username}_{int(now.timestamp() * 1000000)}_{secrets.token_hex(8)}"

        payload = {
            "sub": user["user_id"],
            "username": username,
            "email": user["email"],
            "roles": user["roles"],
            "exp": now + timedelta(seconds=expires_in),
            "iat": now,
            "jti": jti,  # Unique token ID to ensure each token is different
        }

        token = jwt.encode(payload, self.secret_key, algorithm="HS256")

        logger.info("Token created", extra={"username": username, "user_id": user["user_id"], "expires_in": expires_in})

        return token

631 

632 

class KeycloakUserProvider(UserProvider):
    """
    Keycloak user provider for production

    Uses Keycloak as the identity provider for authentication and user management.
    All Keycloak calls go through ``self.client`` (a KeycloakClient).
    """

    def __init__(self, config: KeycloakConfig, openfga_client: Any | None = None, sync_on_login: bool = True) -> None:
        """
        Initialize Keycloak user provider

        Args:
            config: Keycloak configuration
            openfga_client: Optional OpenFGA client for role synchronization
            sync_on_login: Whether to sync roles to OpenFGA on login
        """
        self.client = KeycloakClient(config)
        self.config = config
        self.openfga_client = openfga_client
        self.sync_on_login = sync_on_login

        logger.info(
            "Initialized KeycloakUserProvider",
            extra={"realm": config.realm, "client_id": config.client_id, "sync_on_login": sync_on_login},
        )

    async def authenticate(self, username: str, password: str | None = None) -> AuthResponse:
        """
        Authenticate user using Keycloak

        Args:
            username: Username
            password: Password

        Returns:
            AuthResponse with tokens on success. Failure reasons are
            "password_required", "user_data_error" and "authentication_failed"
            (the latter carries the exception text in ``error``).
        """
        with tracer.start_as_current_span("keycloak.authenticate") as span:
            span.set_attribute("auth.username", username)

            if not password:
                logger.warning("Password required for Keycloak authentication")
                return AuthResponse(authorized=False, reason="password_required")

            try:
                # Authenticate with Keycloak
                tokens = await self.client.authenticate_user(username, password)

                # Get full user details for role sync
                keycloak_user = await self.client.get_user_by_username(username)

                if not keycloak_user:
                    logger.error(f"User authenticated but not found in admin API: {username}")
                    return AuthResponse(authorized=False, reason="user_data_error")

                # Sync to OpenFGA if enabled
                if self.sync_on_login and self.openfga_client:
                    try:
                        await sync_user_to_openfga(keycloak_user, self.openfga_client)
                    except Exception as e:
                        logger.error(f"Failed to sync user to OpenFGA: {e}", exc_info=True)
                        # Don't fail authentication if sync fails

                logger.info("User authenticated via Keycloak", extra={"username": username, "user_id": keycloak_user.user_id})

                return AuthResponse(
                    authorized=True,
                    username=keycloak_user.username,
                    user_id=keycloak_user.user_id,
                    email=keycloak_user.email or "",
                    roles=keycloak_user.realm_roles,
                    access_token=tokens["access_token"],
                    refresh_token=tokens.get("refresh_token"),
                    expires_in=tokens.get("expires_in", 300),
                )

            except Exception as e:
                # Any client failure (bad credentials, network, malformed token
                # response) is reported as a failed authentication.
                logger.warning(f"Keycloak authentication failed for {username}: {e}")
                return AuthResponse(authorized=False, reason="authentication_failed", error=str(e))

    async def get_user_by_id(self, user_id: str) -> UserData | None:
        """Get user by ID

        Args:
            user_id: User identifier; the text after the last ":" is used as
                the username (the whole value when it contains no ":")

        Returns:
            UserData or None if not found
        """
        # Extract username from user_id format "user:username"
        username = user_id.split(":")[-1] if ":" in user_id else user_id
        return await self.get_user_by_username(username)

    async def get_user_by_username(self, username: str) -> UserData | None:
        """Get user by username from Keycloak

        Args:
            username: Username to look up

        Returns:
            UserData, or None when the user is not found or the lookup raised
            (the exception is logged, not propagated)
        """
        try:
            keycloak_user = await self.client.get_user_by_username(username)

            if keycloak_user:
                return UserData(
                    username=keycloak_user.username,
                    user_id=keycloak_user.user_id,
                    email=keycloak_user.email or "",
                    roles=keycloak_user.realm_roles,
                    active=keycloak_user.enabled,
                )

            return None

        except Exception as e:
            logger.error(f"Failed to get user {username}: {e}", exc_info=True)
            return None

    async def verify_token(self, token: str) -> TokenVerification:
        """Verify Keycloak-issued JWT token

        Args:
            token: JWT token to verify

        Returns:
            TokenVerification with the payload returned by the client when
            valid, otherwise with the exception text in ``error``
        """
        try:
            payload = await self.client.verify_token(token)
            return TokenVerification(valid=True, payload=payload)
        except Exception as e:
            # Log the failure so rejected tokens leave a trace, as the
            # in-memory provider does.
            logger.warning(f"Keycloak token verification failed: {e}")
            return TokenVerification(valid=False, error=str(e))

    async def verify_password(self, username: str, password: str) -> PasswordVerification:
        """
        Verify username and password combination using Keycloak

        Convenience method for testing and validation scenarios.
        Returns structured PasswordVerification with user data on success.

        For full authentication with token generation, use authenticate() instead.

        Args:
            username: Username to verify
            password: Password to verify

        Returns:
            PasswordVerification with validation result and user data if valid

        Security:
            - Delegates to authenticate(), so Keycloak performs the password check
            - Returns same error structure for non-existent users and invalid passwords
        """
        with tracer.start_as_current_span("keycloak.verify_password") as span:
            span.set_attribute("auth.username", username)

            # Validate inputs
            if not username or not password:
                span.set_attribute("auth.result", "invalid_input")
                return PasswordVerification(valid=False, error="Username and password are required")

            try:
                # Attempt authentication with Keycloak
                auth_response = await self.authenticate(username, password)

                if not auth_response.authorized:
                    span.set_attribute("auth.result", "authentication_failed")
                    return PasswordVerification(valid=False, error="Invalid credentials")

                # Success - return user data
                span.set_attribute("auth.result", "success")
                return PasswordVerification(
                    valid=True,
                    user={
                        "username": auth_response.username,
                        "user_id": auth_response.user_id,
                        "email": auth_response.email,
                        "roles": auth_response.roles,
                    },
                )

            except Exception as e:
                span.set_attribute("auth.result", "error")
                logger.error(f"Password verification failed for {username}: {e}", exc_info=True)
                return PasswordVerification(valid=False, error="Password verification failed")

    async def list_users(self) -> list[UserData]:
        """
        List users (requires admin permissions)

        Note: This is a placeholder. In production, you'd implement pagination
        and filtering using Keycloak admin API.

        Returns:
            Always an empty list; a warning is logged on every call
        """
        logger.warning("list_users() not fully implemented for KeycloakUserProvider")
        return []

    async def refresh_token(self, refresh_token: str) -> dict[str, Any]:
        """
        Refresh access token

        Args:
            refresh_token: Refresh token

        Returns:
            {"success": True, "tokens": <new tokens>} on success, or
            {"success": False, "error": <exception text>} on failure
        """
        try:
            tokens = await self.client.refresh_token(refresh_token)
            return {"success": True, "tokens": tokens}
        except Exception as e:
            logger.error(f"Token refresh failed: {e}", exc_info=True)
            return {"success": False, "error": str(e)}

827 

828 

def create_user_provider(
    provider_type: str = "inmemory",
    secret_key: str | None = None,
    keycloak_config: KeycloakConfig | None = None,
    openfga_client: Any | None = None,
) -> UserProvider:
    """
    Factory function to create user provider based on explicit parameters (test use)

    This factory is designed for test code where you want explicit control
    over provider configuration without requiring a Settings object.

    For production code, use `mcp_server_langgraph.auth.factory.create_user_provider()`
    which reads from application settings.

    Args:
        provider_type: Type of provider ("inmemory", "keycloak"); case-insensitive
        secret_key: Secret key for in-memory provider. When omitted, a fixed
            placeholder key is used and a warning is logged.
        keycloak_config: Keycloak configuration for keycloak provider
        openfga_client: OpenFGA client for role synchronization

    Returns:
        User provider instance

    Raises:
        ValueError: If provider type is unknown or required config is missing

    See Also:
        mcp_server_langgraph.auth.factory.create_user_provider: Recommended for production use
    """
    provider_type = provider_type.lower()

    if provider_type == "inmemory":
        logger.info("Creating InMemoryUserProvider (development mode)")
        if not secret_key:
            # SECURITY: the fallback key is a publicly known constant, so tokens
            # signed with it can be forged. It is kept so existing test callers
            # keep working, but it is no longer used silently.
            logger.warning(
                "No secret_key supplied to create_user_provider(); using the built-in placeholder "
                "signing key. Tokens signed with it can be forged - use only in tests."
            )
            secret_key = "your-secret-key-change-in-production"
        return InMemoryUserProvider(secret_key=secret_key)

    if provider_type == "keycloak":
        if not keycloak_config:
            msg = "keycloak_config required for KeycloakUserProvider"
            raise ValueError(msg)

        logger.info("Creating KeycloakUserProvider (production mode)")
        return KeycloakUserProvider(config=keycloak_config, openfga_client=openfga_client)

    msg = f"Unknown provider type: {provider_type}. Supported: 'inmemory', 'keycloak'"
    raise ValueError(msg)