Coverage for src / mcp_server_langgraph / auth / user_provider.py: 86%
263 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Abstract user provider interface for pluggable authentication backends
4Enables switching between different user management systems:
5- InMemoryUserProvider (development/testing)
6- KeycloakUserProvider (production)
7- Custom providers (future extensions)
8"""
10from abc import ABC, abstractmethod
11from datetime import datetime, timedelta, UTC
12from typing import Any, TypedDict
14import jwt
15from pydantic import BaseModel, ConfigDict, Field
17from mcp_server_langgraph.auth.keycloak import KeycloakClient, KeycloakConfig, sync_user_to_openfga
18from mcp_server_langgraph.observability.telemetry import logger, tracer
20# Try to import bcrypt for password hashing
21try:
22 import bcrypt
24 BCRYPT_AVAILABLE = True
25except ImportError:
26 BCRYPT_AVAILABLE = False
27 # Note: logger warning deferred to InMemoryUserProvider.__init__
28 # to avoid import-time observability dependency
30# ============================================================================
31# Pydantic Models for Type-Safe Authentication Responses
32# ============================================================================
35class UserDBEntry(TypedDict):
36 """Type definition for user database entries"""
38 user_id: str
39 email: str
40 password: str
41 roles: list[str]
42 active: bool
45class UserData(BaseModel):
46 """
47 Type-safe user data structure
49 Represents user information returned from user queries.
50 """
52 user_id: str = Field(..., description="User identifier (e.g., 'user:alice')")
53 username: str = Field(..., description="Username")
54 email: str = Field(..., description="Email address")
55 roles: list[str] = Field(default_factory=list, description="User roles")
56 active: bool = Field(default=True, description="Whether user account is active")
58 model_config = ConfigDict(
59 frozen=False,
60 validate_assignment=True,
61 str_strip_whitespace=True,
62 json_schema_extra={
63 "example": {
64 "user_id": "user:alice",
65 "username": "alice",
66 "email": "alice@acme.com",
67 "roles": ["user", "premium"],
68 "active": True,
69 }
70 },
71 )
73 def to_dict(self) -> dict[str, Any]:
74 """Convert to dictionary for backward compatibility"""
75 return self.model_dump()
77 @classmethod
78 def from_dict(cls, data: dict[str, Any]) -> "UserData":
79 """Create UserData from dictionary"""
80 return cls(**data)
83class AuthResponse(BaseModel):
84 """
85 Type-safe authentication response
87 Returned from authenticate() operations.
88 """
90 authorized: bool = Field(..., description="Whether authentication was successful")
91 username: str | None = Field(default=None, description="Username if authorized")
92 user_id: str | None = Field(default=None, description="User ID if authorized")
93 email: str | None = Field(default=None, description="Email if authorized")
94 roles: list[str] = Field(default_factory=list, description="User roles if authorized")
95 reason: str | None = Field(default=None, description="Failure reason if not authorized")
96 error: str | None = Field(default=None, description="Error details if not authorized")
97 # Keycloak-specific fields (optional)
98 access_token: str | None = Field(default=None, description="JWT access token (Keycloak)")
99 refresh_token: str | None = Field(default=None, description="JWT refresh token (Keycloak)")
100 expires_in: int | None = Field(default=None, description="Token expiration in seconds (Keycloak)")
102 model_config = ConfigDict(
103 frozen=False,
104 validate_assignment=True,
105 str_strip_whitespace=True,
106 json_schema_extra={
107 "example": {
108 "authorized": True,
109 "username": "alice",
110 "user_id": "user:alice",
111 "email": "alice@acme.com",
112 "roles": ["user", "premium"],
113 "reason": None,
114 "error": None,
115 "access_token": "eyJ...",
116 "refresh_token": "eyJ...",
117 "expires_in": 300,
118 }
119 },
120 )
122 def to_dict(self) -> dict[str, Any]:
123 """Convert to dictionary for backward compatibility"""
124 return self.model_dump(exclude_none=True)
126 @classmethod
127 def from_dict(cls, data: dict[str, Any]) -> "AuthResponse":
128 """Create AuthResponse from dictionary"""
129 return cls(**data)
132class TokenVerification(BaseModel):
133 """
134 Type-safe token verification response
136 Returned from verify_token() operations.
137 """
139 valid: bool = Field(..., description="Whether token is valid")
140 payload: dict[str, Any] | None = Field(default=None, description="Token payload if valid")
141 error: str | None = Field(default=None, description="Error message if not valid")
143 model_config = ConfigDict(
144 frozen=False,
145 validate_assignment=True,
146 json_schema_extra={"example": {"valid": True, "payload": {"sub": "user:alice", "exp": 1234567890}, "error": None}},
147 )
149 def to_dict(self) -> dict[str, Any]:
150 """Convert to dictionary for backward compatibility"""
151 return self.model_dump(exclude_none=True)
153 @classmethod
154 def from_dict(cls, data: dict[str, Any]) -> "TokenVerification":
155 """Create TokenVerification from dictionary"""
156 return cls(**data)
159class PasswordVerification(BaseModel):
160 """
161 Type-safe password verification response
163 Returned from verify_password() operations.
164 Provides structured response for password validation with user data on success.
165 """
167 valid: bool = Field(..., description="Whether password is valid")
168 user: dict[str, Any] | None = Field(default=None, description="User data if password valid")
169 error: str | None = Field(default=None, description="Error message if password invalid")
171 model_config = ConfigDict(
172 frozen=False,
173 validate_assignment=True,
174 json_schema_extra={
175 "example": {
176 "valid": True,
177 "user": {"username": "alice", "user_id": "user:alice", "email": "alice@example.com", "roles": ["user"]},
178 "error": None,
179 }
180 },
181 )
183 def to_dict(self) -> dict[str, Any]:
184 """Convert to dictionary for backward compatibility"""
185 return self.model_dump(exclude_none=True)
187 @classmethod
188 def from_dict(cls, data: dict[str, Any]) -> "PasswordVerification":
189 """Create PasswordVerification from dictionary"""
190 return cls(**data)
193class UserProvider(ABC):
194 """
195 Abstract base class for user providers
197 Defines the interface that all authentication backends must implement.
198 """
200 @abstractmethod
201 async def authenticate(self, username: str, password: str | None = None) -> AuthResponse:
202 """
203 Authenticate user by username/password
205 Args:
206 username: Username
207 password: Password (optional for some providers)
209 Returns:
210 AuthResponse with authentication result
211 """
213 @abstractmethod
214 async def get_user_by_id(self, user_id: str) -> UserData | None:
215 """
216 Get user by user ID
218 Args:
219 user_id: User identifier (e.g., "user:alice")
221 Returns:
222 UserData or None if not found
223 """
225 @abstractmethod
226 async def get_user_by_username(self, username: str) -> UserData | None:
227 """
228 Get user by username
230 Args:
231 username: Username
233 Returns:
234 UserData or None if not found
235 """
237 @abstractmethod
238 async def verify_token(self, token: str) -> TokenVerification:
239 """
240 Verify JWT token
242 Args:
243 token: JWT token to verify
245 Returns:
246 TokenVerification with validation result
247 """
249 @abstractmethod
250 async def verify_password(self, username: str, password: str) -> PasswordVerification:
251 """
252 Verify username and password combination
254 Convenience method for testing and validation scenarios.
255 For full authentication with token generation, use authenticate() instead.
257 Args:
258 username: Username to verify
259 password: Password to verify
261 Returns:
262 PasswordVerification with validation result and user data if valid
263 """
265 @abstractmethod
266 async def list_users(self) -> list[UserData]:
267 """
268 List all users (for admin operations)
270 Returns:
271 List of UserData objects
272 """
275class InMemoryUserProvider(UserProvider):
276 """
277 In-memory user provider for development and testing
279 Features:
280 - Optional bcrypt password hashing (install bcrypt package)
281 - JWT token generation
282 - User lookup and authentication
284 NOT suitable for large-scale production (use KeycloakUserProvider instead).
285 """
287 def __init__(self, secret_key: str | None = None, use_password_hashing: bool = False) -> None:
288 """
289 Initialize in-memory user provider
291 Args:
292 secret_key: Secret key for JWT token signing.
293 Required for production use. Should be loaded from environment variables.
294 use_password_hashing: Enable bcrypt password hashing (requires bcrypt package).
295 If True and bcrypt not available, falls back to plaintext.
296 """
297 if not secret_key:
298 # Use a random key for testing/development only
299 import secrets as secrets_module
301 secret_key = secrets_module.token_urlsafe(32)
302 logger.warning(
303 "No secret_key provided to InMemoryUserProvider. Using random key. "
304 "This is only suitable for testing/development. "
305 "In production, always provide a secure secret key via environment variables."
306 )
307 self.secret_key = secret_key
309 # Check bcrypt availability if hashing requested
310 if use_password_hashing and not BCRYPT_AVAILABLE: 310 ↛ 312line 310 didn't jump to line 312 because the condition on line 310 was never true
311 # SECURITY: Fail-closed pattern - refuse to start with insecure config
312 msg = (
313 "CRITICAL SECURITY ERROR: Password hashing requested (use_password_hashing=True) "
314 "but bcrypt library is not available. "
315 "This would result in INSECURE plaintext passwords. "
316 "\n\nTo fix:\n"
317 "1. Install bcrypt: Add 'bcrypt' to pyproject.toml dependencies, then run: uv sync\n"
318 "2. Or disable password hashing: USE_PASSWORD_HASHING=false (NOT recommended for production)\n"
319 "\nRefusing to start with insecure configuration."
320 )
321 raise RuntimeError(msg)
323 self.use_password_hashing = use_password_hashing and BCRYPT_AVAILABLE
325 # SECURITY (OpenAI Codex Finding #2): Start with empty user database
326 # No hard-coded credentials. Users must be explicitly created via add_user() or configuration.
327 # This prevents CWE-798: Use of Hard-coded Credentials
328 self.users_db: dict[str, UserDBEntry] = {}
330 if self.use_password_hashing:
331 logger.info(
332 "InMemoryUserProvider initialized with BCRYPT password hashing (secure)",
333 extra={"user_count": len(self.users_db)},
334 )
335 else:
336 logger.warning(
337 "InMemoryUserProvider initialized with PLAINTEXT PASSWORDS (INSECURE). "
338 "This is only suitable for development/testing. "
339 "Use password hashing (use_password_hashing=True) or KeycloakUserProvider for production.",
340 extra={"user_count": len(self.users_db)},
341 )
343 def _hash_password(self, password: str) -> str:
344 """
345 Hash password using bcrypt
347 Args:
348 password: Plaintext password
350 Returns:
351 Bcrypt password hash
352 """
353 if not BCRYPT_AVAILABLE: 353 ↛ 354line 353 didn't jump to line 354 because the condition on line 353 was never true
354 return password # Fallback to plaintext
356 password_bytes = password.encode("utf-8")
358 # bcrypt 5.0+ enforces 72-byte password limit (raises ValueError instead of silent truncation)
359 if len(password_bytes) > 72: 359 ↛ 360line 359 didn't jump to line 360 because the condition on line 359 was never true
360 msg = (
361 f"Password exceeds bcrypt's 72-byte limit ({len(password_bytes)} bytes). "
362 "Consider hashing long passwords with SHA256 before bcrypt."
363 )
364 raise ValueError(msg)
366 salt = bcrypt.gensalt()
367 hashed = bcrypt.hashpw(password_bytes, salt)
368 return hashed.decode("utf-8")
370 def _verify_password(self, password: str, password_hash: str) -> bool:
371 """
372 Verify password against hash
374 Args:
375 password: Plaintext password to verify
376 password_hash: Stored password hash (or plaintext if hashing disabled)
378 Returns:
379 True if password matches, False otherwise
380 """
381 if not self.use_password_hashing:
382 # Plaintext comparison (insecure, dev/test only)
383 return password == password_hash
385 # Bcrypt comparison (secure)
386 password_bytes = password.encode("utf-8")
388 # Silently handle overly long passwords during verification
389 # (they would have been rejected during hashing with bcrypt 5.0+)
390 if len(password_bytes) > 72: 390 ↛ 391line 390 didn't jump to line 391 because the condition on line 390 was never true
391 return False
393 hash_bytes = password_hash.encode("utf-8")
394 return bcrypt.checkpw(password_bytes, hash_bytes)
396 async def verify_password(self, username: str, password: str) -> PasswordVerification:
397 """
398 Verify username and password combination
400 Convenience method for testing and validation scenarios.
401 Returns structured PasswordVerification with user data on success.
403 For full authentication with token generation, use authenticate() instead.
405 Args:
406 username: Username to verify
407 password: Password to verify
409 Returns:
410 PasswordVerification with validation result and user data if valid
412 Security:
413 - Uses constant-time comparison for password hashes when bcrypt is available
414 - Does not reveal whether username exists (timing-safe where possible)
415 - Returns same error structure for non-existent users and invalid passwords
416 """
417 with tracer.start_as_current_span("inmemory.verify_password") as span:
418 span.set_attribute("auth.username", username)
420 # Validate inputs
421 if not username or not password:
422 span.set_attribute("auth.result", "invalid_input")
423 return PasswordVerification(valid=False, error="Username and password are required")
425 # Check if user exists
426 if username not in self.users_db:
427 span.set_attribute("auth.result", "user_not_found")
428 # Don't reveal user existence - return generic error
429 return PasswordVerification(valid=False, error="Invalid credentials")
431 user = self.users_db[username]
433 # Verify password
434 if not self._verify_password(password, user["password"]):
435 span.set_attribute("auth.result", "invalid_password")
436 return PasswordVerification(valid=False, error="Invalid credentials")
438 # Success - return user data
439 span.set_attribute("auth.result", "success")
440 return PasswordVerification(
441 valid=True,
442 user={
443 "username": username,
444 "user_id": user["user_id"],
445 "email": user["email"],
446 "roles": user["roles"],
447 },
448 )
450 def add_user(self, username: str, password: str, email: str, roles: list[str], user_id: str | None = None) -> None:
451 """
452 Add a new user to the in-memory database
454 IMPORTANT: InMemoryUserProvider no longer seeds default users for security.
455 You must explicitly create users for testing/development using this method.
457 Example:
458 provider = InMemoryUserProvider(use_password_hashing=True)
459 provider.add_user(
460 username="testuser",
461 password="secure-password-123",
462 email="testuser@example.com",
463 roles=["user", "premium"]
464 )
466 Args:
467 username: Username (unique identifier)
468 password: Plaintext password (will be hashed if use_password_hashing=True)
469 email: Email address
470 roles: List of role names (e.g., ["user"], ["admin"], ["user", "premium"])
471 user_id: Optional user_id override for testing (e.g., worker-safe IDs in pytest-xdist)
473 Security:
474 - Passwords are automatically hashed with bcrypt if use_password_hashing=True
475 - For production, use KeycloakUserProvider instead of InMemoryUserProvider
476 """
477 stored_password = self._hash_password(password) if self.use_password_hashing else password
478 generated_user_id = user_id if user_id is not None else f"user:{username}"
480 self.users_db[username] = UserDBEntry(
481 user_id=generated_user_id,
482 email=email,
483 password=stored_password,
484 roles=roles,
485 active=True,
486 )
488 logger.info(f"Added user: {username}", extra={"user_id": generated_user_id, "roles": roles})
490 async def authenticate(self, username: str, password: str | None = None) -> AuthResponse:
491 """
492 Authenticate user by username and password
494 SECURITY: This method now requires a password for authentication.
495 Plaintext comparison is used for development/testing only.
496 Use KeycloakUserProvider or implement password hashing for production.
498 Args:
499 username: Username to authenticate
500 password: Password (REQUIRED for InMemoryUserProvider)
502 Returns:
503 AuthResponse with authentication result
504 """
505 with tracer.start_as_current_span("inmemory.authenticate") as span:
506 span.set_attribute("auth.username", username)
508 # SECURITY: Require password
509 if not password:
510 logger.warning("Authentication failed - password required", extra={"username": username})
511 return AuthResponse(authorized=False, reason="password_required")
513 # Check user database
514 if username not in self.users_db:
515 logger.warning("Authentication failed - user not found", extra={"username": username})
516 # Use same error message as invalid password to prevent username enumeration
517 return AuthResponse(authorized=False, reason="invalid_credentials")
519 user = self.users_db[username]
521 # Check if account is active
522 if not user["active"]:
523 logger.warning("User account inactive", extra={"username": username})
524 return AuthResponse(authorized=False, reason="account_inactive")
526 # SECURITY: Validate password using bcrypt (if enabled) or plaintext comparison
527 if not self._verify_password(password, user["password"]):
528 logger.warning("Authentication failed - invalid password", extra={"username": username})
529 return AuthResponse(authorized=False, reason="invalid_credentials")
531 logger.info("User authenticated", extra={"username": username, "user_id": user["user_id"]})
533 return AuthResponse(
534 authorized=True,
535 username=username,
536 user_id=user["user_id"],
537 email=user["email"],
538 roles=user["roles"],
539 )
541 async def get_user_by_id(self, user_id: str) -> UserData | None:
542 """Get user by ID"""
543 # Extract username from user_id format "user:username"
544 # Also handle worker-safe IDs from pytest-xdist (e.g., "user:test_gw0_alice" → "alice")
545 if ":" in user_id: 545 ↛ 553line 545 didn't jump to line 553 because the condition on line 545 was always true
546 id_part = user_id.split(":", 1)[1] # Remove "user:" prefix
547 # Check if it's a worker-safe ID (format: test_gw\d+_username)
548 import re
550 match = re.match(r"test_gw\d+_(.*)", id_part)
551 username = match.group(1) if match else id_part
552 else:
553 username = user_id
554 return await self.get_user_by_username(username)
556 async def get_user_by_username(self, username: str) -> UserData | None:
557 """Get user by username"""
558 if username in self.users_db:
559 user_data = self.users_db[username]
560 return UserData(
561 username=username,
562 user_id=user_data["user_id"],
563 email=user_data["email"],
564 roles=user_data["roles"],
565 active=user_data["active"],
566 )
567 return None
569 async def verify_token(self, token: str) -> TokenVerification:
570 """Verify self-issued JWT token"""
571 try:
572 payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
573 logger.info("Token verified", extra={"user_id": payload.get("sub")})
574 return TokenVerification(valid=True, payload=payload)
575 except jwt.ExpiredSignatureError:
576 logger.warning("Token expired")
577 return TokenVerification(valid=False, error="Token expired")
578 except jwt.InvalidTokenError as e:
579 logger.warning(f"Invalid token: {e}")
580 return TokenVerification(valid=False, error="Invalid token")
582 async def list_users(self) -> list[UserData]:
583 """List all users"""
584 return [
585 UserData(
586 username=username,
587 user_id=user_data["user_id"],
588 email=user_data["email"],
589 roles=user_data["roles"],
590 active=user_data["active"],
591 )
592 for username, user_data in self.users_db.items()
593 ]
595 def create_token(self, username: str, expires_in: int = 3600) -> str:
596 """
597 Create JWT token for user (helper method for in-memory provider)
599 Args:
600 username: Username
601 expires_in: Token expiration in seconds
603 Returns:
604 JWT token string
605 """
606 if username not in self.users_db:
607 msg = f"User not found: {username}"
608 raise ValueError(msg)
610 user = self.users_db[username]
612 # Use timestamp with microseconds for unique jti to ensure each token is different
613 now = datetime.now(UTC)
614 jti = f"{username}_{int(now.timestamp() * 1000000)}" # Microsecond precision
616 payload = {
617 "sub": user["user_id"],
618 "username": username,
619 "email": user["email"],
620 "roles": user["roles"],
621 "exp": now + timedelta(seconds=expires_in),
622 "iat": now,
623 "jti": jti, # Unique token ID to ensure each token is different
624 }
626 token = jwt.encode(payload, self.secret_key, algorithm="HS256")
628 logger.info("Token created", extra={"username": username, "user_id": user["user_id"], "expires_in": expires_in})
630 return token
633class KeycloakUserProvider(UserProvider):
634 """
635 Keycloak user provider for production
637 Uses Keycloak as the identity provider for authentication and user management.
638 """
640 def __init__(self, config: KeycloakConfig, openfga_client: Any | None = None, sync_on_login: bool = True) -> None:
641 """
642 Initialize Keycloak user provider
644 Args:
645 config: Keycloak configuration
646 openfga_client: Optional OpenFGA client for role synchronization
647 sync_on_login: Whether to sync roles to OpenFGA on login
648 """
649 self.client = KeycloakClient(config)
650 self.config = config
651 self.openfga_client = openfga_client
652 self.sync_on_login = sync_on_login
654 logger.info(
655 "Initialized KeycloakUserProvider",
656 extra={"realm": config.realm, "client_id": config.client_id, "sync_on_login": sync_on_login},
657 )
659 async def authenticate(self, username: str, password: str | None = None) -> AuthResponse:
660 """
661 Authenticate user using Keycloak
663 Args:
664 username: Username
665 password: Password
667 Returns:
668 AuthResponse with tokens
669 """
670 with tracer.start_as_current_span("keycloak.authenticate") as span:
671 span.set_attribute("auth.username", username)
673 if not password:
674 logger.warning("Password required for Keycloak authentication")
675 return AuthResponse(authorized=False, reason="password_required")
677 try:
678 # Authenticate with Keycloak
679 tokens = await self.client.authenticate_user(username, password)
681 # Get full user details for role sync
682 keycloak_user = await self.client.get_user_by_username(username)
684 if not keycloak_user: 684 ↛ 685line 684 didn't jump to line 685 because the condition on line 684 was never true
685 logger.error(f"User authenticated but not found in admin API: {username}")
686 return AuthResponse(authorized=False, reason="user_data_error")
688 # Sync to OpenFGA if enabled
689 if self.sync_on_login and self.openfga_client:
690 try:
691 await sync_user_to_openfga(keycloak_user, self.openfga_client)
692 except Exception as e:
693 logger.error(f"Failed to sync user to OpenFGA: {e}", exc_info=True)
694 # Don't fail authentication if sync fails
696 logger.info("User authenticated via Keycloak", extra={"username": username, "user_id": keycloak_user.user_id})
698 return AuthResponse(
699 authorized=True,
700 username=keycloak_user.username,
701 user_id=keycloak_user.user_id,
702 email=keycloak_user.email or "",
703 roles=keycloak_user.realm_roles,
704 access_token=tokens["access_token"],
705 refresh_token=tokens.get("refresh_token"),
706 expires_in=tokens.get("expires_in", 300),
707 )
709 except Exception as e:
710 logger.warning(f"Keycloak authentication failed for {username}: {e}")
711 return AuthResponse(authorized=False, reason="authentication_failed", error=str(e))
713 async def get_user_by_id(self, user_id: str) -> UserData | None:
714 """Get user by ID"""
715 # Extract username from user_id format "user:username"
716 username = user_id.split(":")[-1] if ":" in user_id else user_id
717 return await self.get_user_by_username(username)
719 async def get_user_by_username(self, username: str) -> UserData | None:
720 """Get user by username from Keycloak"""
721 try:
722 keycloak_user = await self.client.get_user_by_username(username)
724 if keycloak_user:
725 return UserData(
726 username=keycloak_user.username,
727 user_id=keycloak_user.user_id,
728 email=keycloak_user.email or "",
729 roles=keycloak_user.realm_roles,
730 active=keycloak_user.enabled,
731 )
733 return None
735 except Exception as e:
736 logger.error(f"Failed to get user {username}: {e}", exc_info=True)
737 return None
739 async def verify_token(self, token: str) -> TokenVerification:
740 """Verify Keycloak-issued JWT token"""
741 try:
742 payload = await self.client.verify_token(token)
743 return TokenVerification(valid=True, payload=payload)
744 except Exception as e:
745 return TokenVerification(valid=False, error=str(e))
747 async def verify_password(self, username: str, password: str) -> PasswordVerification:
748 """
749 Verify username and password combination using Keycloak
751 Convenience method for testing and validation scenarios.
752 Returns structured PasswordVerification with user data on success.
754 For full authentication with token generation, use authenticate() instead.
756 Args:
757 username: Username to verify
758 password: Password to verify
760 Returns:
761 PasswordVerification with validation result and user data if valid
763 Security:
764 - Delegates to Keycloak for password verification
765 - Uses Keycloak's secure password hashing and verification
766 - Returns same error structure for non-existent users and invalid passwords
767 """
768 with tracer.start_as_current_span("keycloak.verify_password") as span:
769 span.set_attribute("auth.username", username)
771 # Validate inputs
772 if not username or not password:
773 span.set_attribute("auth.result", "invalid_input")
774 return PasswordVerification(valid=False, error="Username and password are required")
776 try:
777 # Attempt authentication with Keycloak
778 auth_response = await self.authenticate(username, password)
780 if not auth_response.authorized:
781 span.set_attribute("auth.result", "authentication_failed")
782 return PasswordVerification(valid=False, error="Invalid credentials")
784 # Success - return user data
785 span.set_attribute("auth.result", "success")
786 return PasswordVerification(
787 valid=True,
788 user={
789 "username": auth_response.username,
790 "user_id": auth_response.user_id,
791 "email": auth_response.email,
792 "roles": auth_response.roles,
793 },
794 )
796 except Exception as e:
797 span.set_attribute("auth.result", "error")
798 logger.error(f"Password verification failed for {username}: {e}", exc_info=True)
799 return PasswordVerification(valid=False, error="Password verification failed")
801 async def list_users(self) -> list[UserData]:
802 """
803 List users (requires admin permissions)
805 Note: This is a placeholder. In production, you'd implement pagination
806 and filtering using Keycloak admin API.
807 """
808 logger.warning("list_users() not fully implemented for KeycloakUserProvider")
809 return []
811 async def refresh_token(self, refresh_token: str) -> dict[str, Any]:
812 """
813 Refresh access token
815 Args:
816 refresh_token: Refresh token
818 Returns:
819 New tokens
820 """
821 try:
822 tokens = await self.client.refresh_token(refresh_token)
823 return {"success": True, "tokens": tokens}
824 except Exception as e:
825 logger.error(f"Token refresh failed: {e}", exc_info=True)
826 return {"success": False, "error": str(e)}
829def create_user_provider(
830 provider_type: str = "inmemory",
831 secret_key: str | None = None,
832 keycloak_config: KeycloakConfig | None = None,
833 openfga_client: Any | None = None,
834) -> UserProvider:
835 """
836 Factory function to create user provider based on explicit parameters (test use)
838 This factory is designed for test code where you want explicit control
839 over provider configuration without requiring a Settings object.
841 For production code, use `mcp_server_langgraph.auth.factory.create_user_provider()`
842 which reads from application settings.
844 Args:
845 provider_type: Type of provider ("inmemory", "keycloak")
846 secret_key: Secret key for in-memory provider
847 keycloak_config: Keycloak configuration for keycloak provider
848 openfga_client: OpenFGA client for role synchronization
850 Returns:
851 User provider instance
853 Raises:
854 ValueError: If provider type is unknown or required config is missing
856 See Also:
857 mcp_server_langgraph.auth.factory.create_user_provider: Recommended for production use
858 """
859 provider_type = provider_type.lower()
861 if provider_type == "inmemory":
862 logger.info("Creating InMemoryUserProvider (development mode)")
863 return InMemoryUserProvider(secret_key=secret_key or "your-secret-key-change-in-production")
865 elif provider_type == "keycloak":
866 if not keycloak_config:
867 msg = "keycloak_config required for KeycloakUserProvider"
868 raise ValueError(msg)
870 logger.info("Creating KeycloakUserProvider (production mode)")
871 return KeycloakUserProvider(config=keycloak_config, openfga_client=openfga_client)
873 else:
874 msg = f"Unknown provider type: {provider_type}. Supported: 'inmemory', 'keycloak'"
875 raise ValueError(msg)