Coverage for src / mcp_server_langgraph / api / scim.py: 40%
165 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2SCIM 2.0 API Endpoints
4Implements SCIM 2.0 protocol for user and group provisioning.
5Bridges to Keycloak Admin API for actual user management.
7See ADR-0038 for SCIM implementation approach.
9Endpoints:
10- POST /scim/v2/Users - Create user
11- GET /scim/v2/Users/{id} - Get user
12- PUT /scim/v2/Users/{id} - Replace user
13- PATCH /scim/v2/Users/{id} - Update user
14- DELETE /scim/v2/Users/{id} - Deactivate user
15- GET /scim/v2/Users?filter=... - Search users
16- POST /scim/v2/Groups - Create group
17- GET /scim/v2/Groups/{id} - Get group
18- PATCH /scim/v2/Groups/{id} - Update group
20References:
21- RFC 7643: https://datatracker.ietf.org/doc/html/rfc7643
22- RFC 7644: https://datatracker.ietf.org/doc/html/rfc7644
23"""
25from typing import Any
27from fastapi import APIRouter, Depends, HTTPException, Query, status
28from fastapi.responses import JSONResponse
30from mcp_server_langgraph.auth.keycloak import KeycloakClient
31from mcp_server_langgraph.auth.middleware import get_current_user
32from mcp_server_langgraph.auth.openfga import OpenFGAClient
33from mcp_server_langgraph.core.dependencies import get_keycloak_client, get_openfga_client
34from mcp_server_langgraph.scim.schema import (
35 SCIMError,
36 SCIMGroup,
37 SCIMListResponse,
38 SCIMMember,
39 SCIMPatchRequest,
40 SCIMUser,
41 keycloak_to_scim_user,
42 user_to_keycloak,
43 validate_scim_group,
44 validate_scim_user,
45)
# Router under which every SCIM 2.0 endpoint in this module is registered.
router = APIRouter(prefix="/scim/v2", tags=["SCIM 2.0"])
53# Error response helper
def scim_error(status_code: int, detail: str, scim_type: str | None = None) -> JSONResponse:
    """
    Build a JSON response whose body is a serialized ``SCIMError``.

    Args:
        status_code: HTTP status; used for the response and for the error's ``status`` field
        detail: Human-readable description placed in the error body
        scim_type: Optional SCIM error keyword; omitted from the body when ``None``

    Returns:
        JSONResponse carrying the error payload with ``None`` fields stripped
    """
    payload = SCIMError(status=status_code, detail=detail, scimType=scim_type).model_dump(exclude_none=True)
    return JSONResponse(status_code=status_code, content=payload)
67# Authorization Helpers
async def _require_admin_or_scim_role(
    current_user: dict[str, Any], openfga: Any | None = None, resource: str = "scim:users"
) -> None:
    """
    Ensure the caller is allowed to perform SCIM provisioning, otherwise raise 403.

    Checks are applied in this order:
    1. OpenFGA: when a client is supplied, the ``can_provision_users`` relation on
       ``resource`` grants access immediately.
    2. Roles: a caller holding ``admin`` or ``scim-provisioner`` is allowed.
    3. Anything else is rejected.

    A failing OpenFGA call is logged as a warning and does not reject the request by
    itself; the role check still runs afterwards.

    Args:
        current_user: Authenticated caller; ``user_id``, ``username`` and ``roles`` are read
        openfga: Optional client exposing an awaitable ``check_permission``
        resource: Object identifier passed to OpenFGA (default: "scim:users")

    Raises:
        HTTPException: 403 Forbidden when neither OpenFGA nor the role list authorizes the caller

    References:
        - OpenAI Codex Finding #6: Ad-hoc role lists → OpenFGA relations
        - CWE-863: Incorrect Authorization
    """
    user_id = current_user.get("user_id", f"user:{current_user.get('username', '')}")
    user_roles = current_user.get("roles", [])

    if openfga is not None:
        try:
            granted = await openfga.check_permission(
                user=user_id, relation="can_provision_users", object=resource, context=None
            )
            if granted:
                # Imported lazily, only on the path that actually logs.
                from mcp_server_langgraph.observability.telemetry import logger

                grant_context = {
                    "user_id": user_id,
                    "relation": "can_provision_users",
                    "resource": resource,
                }
                logger.info("SCIM authorization granted via OpenFGA relation", extra=grant_context)
                return
        except Exception as e:
            # Relation lookup failed: record it and let the role check decide.
            from mcp_server_langgraph.observability.telemetry import logger

            logger.warning(
                f"OpenFGA check failed for SCIM authorization, falling back to roles: {e}",
                extra={"user_id": user_id, "resource": resource},
            )

    # Role-based fallback: either privileged role is sufficient.
    has_privileged_role = any(role in user_roles for role in ("admin", "scim-provisioner"))
    if not has_privileged_role:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=(
                "SCIM identity management operations require admin privileges, SCIM provisioner role, "
                f"or can_provision_users OpenFGA relation. Your roles: {user_roles}."
            ),
        )
144# User Endpoints
@router.post("/Users", status_code=status.HTTP_201_CREATED)
async def create_user(
    user_data: dict[str, Any],
    current_user: dict[str, Any] = Depends(get_current_user),
    keycloak: KeycloakClient = Depends(get_keycloak_client),
    openfga: OpenFGAClient = Depends(get_openfga_client),
) -> SCIMUser:
    """
    Create a new user (SCIM 2.0)

    Validates the SCIM payload, provisions the user in Keycloak, optionally sets a
    non-temporary password, then reads the user back and returns it in SCIM format.
    OpenFGA role sync is not performed here yet (see TODO below).

    Example:
    ```json
    {
        "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
        "userName": "alice@example.com",
        "name": {
            "givenName": "Alice",
            "familyName": "Smith"
        },
        "emails": [{
            "value": "alice@example.com",
            "primary": true
        }],
        "active": true
    }
    ```

    Raises:
        HTTPException: 403 if the caller is not authorized, 400 if validation raises
            ValueError, 500 if the created user cannot be read back or any other error occurs
    """
    # SECURITY FIX (CWE-862): Require admin or SCIM provisioner role
    await _require_admin_or_scim_role(current_user, openfga=openfga, resource="scim:users")

    try:
        # Validate SCIM schema and translate to the Keycloak representation
        scim_user = validate_scim_user(user_data)
        keycloak_user = user_to_keycloak(scim_user)

        # Create in Keycloak
        user_id = await keycloak.create_user(keycloak_user)

        # Set password if provided
        if scim_user.password:
            await keycloak.set_user_password(user_id, scim_user.password, temporary=False)

        # TODO: sync default roles to OpenFGA. sync_user_to_openfga requires a
        # KeycloakUser object rather than just the user id, so it is skipped for now.

        # Get created user and convert back to SCIM
        created_user = await keycloak.get_user(user_id)
        if not created_user:
            raise HTTPException(status_code=500, detail="Failed to retrieve created user")

        return keycloak_to_scim_user(created_user)

    except HTTPException:
        # Propagate deliberate HTTP errors untouched instead of letting the generic
        # handler below re-wrap them with a different detail message.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to create user: {e!s}") from e
@router.get("/Users/{user_id}")
async def get_user(
    user_id: str,
    current_user: dict[str, Any] = Depends(get_current_user),
    keycloak: KeycloakClient = Depends(get_keycloak_client),
) -> SCIMUser:
    """
    Get user by ID (SCIM 2.0)

    Looks the user up in Keycloak and returns it converted to SCIM format.

    Raises:
        HTTPException: 404 when Keycloak returns no user, 500 on any other failure
    """
    try:
        record = await keycloak.get_user(user_id)
        if record:
            return keycloak_to_scim_user(record)
        raise HTTPException(status_code=404, detail=f"User {user_id} not found")
    except HTTPException:
        # Keep the 404 (and any other HTTP error) as-is.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get user: {e!s}")
@router.put("/Users/{user_id}", response_model=SCIMUser)
async def replace_user(
    user_id: str,
    user_data: dict[str, Any],
    current_user: dict[str, Any] = Depends(get_current_user),
    keycloak: KeycloakClient = Depends(get_keycloak_client),
    openfga: OpenFGAClient = Depends(get_openfga_client),
) -> SCIMUser | JSONResponse:
    """
    Replace user (SCIM 2.0 PUT)

    Validates the payload, writes the full representation to Keycloak, then returns
    the user as read back from Keycloak. Failures are reported as SCIM error bodies:
    400 for a ValueError, 404 when the user cannot be read back, 500 otherwise.
    """
    # SECURITY FIX (CWE-862): Require admin or SCIM provisioner role
    await _require_admin_or_scim_role(current_user, openfga=openfga, resource="scim:users")

    try:
        replacement = user_to_keycloak(validate_scim_user(user_data))
        await keycloak.update_user(user_id, replacement)

        refreshed = await keycloak.get_user(user_id)
        if refreshed:
            return keycloak_to_scim_user(refreshed)
        return scim_error(404, f"User {user_id} not found", "notFound")

    except ValueError as e:
        return scim_error(400, str(e), "invalidValue")
    except Exception as e:
        return scim_error(500, f"Failed to update user: {e!s}", "internalError")
@router.patch("/Users/{user_id}", response_model=SCIMUser)
async def update_user(
    user_id: str,
    patch_request: SCIMPatchRequest,
    current_user: dict[str, Any] = Depends(get_current_user),
    keycloak: KeycloakClient = Depends(get_keycloak_client),
    openfga: OpenFGAClient = Depends(get_openfga_client),
) -> SCIMUser | JSONResponse:
    """
    Update user with PATCH operations (SCIM 2.0)

    Currently handled operations:
    - path "active" (any op): copies the value into Keycloak's ``enabled`` flag
    - path "emails" with op "replace": uses the first entry's ``value`` as the email

    Other operations are ignored. Failures are reported as SCIM error bodies:
    404 when the user does not exist, 400 when an "emails" value is malformed,
    500 for anything else.

    Example:
    ```json
    {
        "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
        "Operations": [
            {
                "op": "replace",
                "path": "active",
                "value": false
            }
        ]
    }
    ```
    """
    # SECURITY FIX (CWE-862): Require admin or SCIM provisioner role
    await _require_admin_or_scim_role(current_user, openfga=openfga, resource="scim:users")

    try:
        # Get current user
        keycloak_user = await keycloak.get_user(user_id)

        if not keycloak_user:
            return scim_error(404, f"User {user_id} not found", "notFound")

        # Apply PATCH operations
        for operation in patch_request.Operations:
            if operation.path == "active":
                keycloak_user["enabled"] = operation.value
            elif operation.path == "emails" and operation.op == "replace" and operation.value:
                try:
                    new_email = operation.value[0]["value"]
                except (KeyError, IndexError, TypeError):
                    # A wrongly shaped value is a client error, not a server fault.
                    return scim_error(
                        400,
                        "PATCH 'emails' value must be a list of objects with a 'value' attribute",
                        "invalidValue",
                    )
                keycloak_user["email"] = new_email
            # Add more PATCH operation handlers as needed

        # Update in Keycloak
        await keycloak.update_user(user_id, keycloak_user)

        # Get updated user
        updated_user = await keycloak.get_user(user_id)
        if not updated_user:
            return scim_error(404, f"User {user_id} not found", "notFound")

        return keycloak_to_scim_user(updated_user)

    except Exception as e:
        return scim_error(500, f"Failed to patch user: {e!s}", "internalError")
@router.delete("/Users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
    user_id: str,
    current_user: dict[str, Any] = Depends(get_current_user),
    keycloak: KeycloakClient = Depends(get_keycloak_client),
    openfga: OpenFGAClient = Depends(get_openfga_client),
) -> None:
    """
    Delete (deactivate) user (SCIM 2.0)

    Performs a soft delete: the Keycloak account is disabled rather than removed,
    and the OpenFGA tuples for ``user:<user_id>`` are deleted afterwards.

    Raises:
        HTTPException: 403 if the caller is not authorized, 500 if either step fails
    """
    # SECURITY FIX (CWE-862): Require admin or SCIM provisioner role
    await _require_admin_or_scim_role(current_user, openfga=openfga, resource="scim:users")

    try:
        disable_patch = {"enabled": False}
        fga_object = f"user:{user_id}"

        await keycloak.update_user(user_id, disable_patch)
        await openfga.delete_tuples_for_object(fga_object)
    except Exception as e:
        # A 204 route cannot carry an error body, so failures are raised instead of returned.
        raise HTTPException(status_code=500, detail=f"Failed to delete user: {e!s}")
def _parse_scim_filter(expression: str) -> dict[str, str] | None:
    """Translate a supported SCIM filter into Keycloak search parameters, or return None."""
    # (marker searched for in the filter, Keycloak query key, exact match?)
    # Checked in order; the first marker found decides how the filter is read.
    rules = (
        ("userName sw", "username", False),
        ("userName eq", "username", True),
        ("email sw", "email", False),
        ("email eq", "email", True),
    )
    for marker, key, exact in rules:
        if marker not in expression:
            continue
        parts = expression.split('"')
        if len(parts) < 2:
            return None  # operator present but no quoted value
        params = {key: parts[1]}
        if exact:
            # Without the "exact" flag the search is treated as a prefix search.
            params["exact"] = "true"
        return params
    return None


@router.get("/Users", response_model=SCIMListResponse)
async def list_users(
    filter: str | None = Query(None, description="SCIM filter expression"),  # name is the public query parameter
    startIndex: int = Query(1, ge=1, description="1-based start index"),
    count: int = Query(100, ge=1, le=1000, description="Number of results"),
    current_user: dict[str, Any] = Depends(get_current_user),
    keycloak: KeycloakClient = Depends(get_keycloak_client),
) -> SCIMListResponse | JSONResponse:
    """
    List/search users (SCIM 2.0)

    Supports SCIM filtering with 'eq' (equals) and 'sw' (startsWith) operators:
    - userName eq "alice" - exact match
    - userName sw "ali" - prefix match
    - email eq "alice@example.com" - exact match
    - email sw "alice@" - prefix match

    A filter that is supplied but cannot be interpreted (unsupported attribute or
    operator, or no quoted value) yields an empty result list rather than falling
    back to an unfiltered search, so a bad filter never returns every user.

    Returns:
        SCIMListResponse for the requested page, or a SCIM 500 error body if the
        Keycloak search or conversion fails.
    """
    try:
        query: dict[str, str] = {}
        if filter:
            # SECURITY: Safe SCIM filter parsing (CWE-20 prevention)
            parsed = _parse_scim_filter(filter)
            if parsed is None:
                # Fail safe: an uninterpretable filter matches nothing.
                return SCIMListResponse(
                    totalResults=0,
                    startIndex=startIndex,
                    itemsPerPage=0,
                    Resources=[],
                )
            query = parsed

        # Get users from Keycloak (SCIM startIndex is 1-based, Keycloak's offset is 0-based)
        users = await keycloak.search_users(query=query, first=startIndex - 1, max=count)

        # Convert to SCIM format
        scim_users = [keycloak_to_scim_user(user) for user in users]

        # NOTE: totalResults reflects only the number of users on this page.
        return SCIMListResponse(
            totalResults=len(scim_users),
            startIndex=startIndex,
            itemsPerPage=len(scim_users),
            Resources=scim_users,
        )

    except Exception as e:
        return scim_error(500, f"Failed to list users: {e!s}", "internalError")
448# Group Endpoints
@router.post("/Groups", response_model=SCIMGroup, status_code=status.HTTP_201_CREATED)
async def create_group(
    group_data: dict[str, Any],
    current_user: dict[str, Any] = Depends(get_current_user),
    keycloak: KeycloakClient = Depends(get_keycloak_client),
    openfga: OpenFGAClient = Depends(get_openfga_client),
) -> SCIMGroup | JSONResponse:
    """
    Create a new group (SCIM 2.0)

    Validates the payload, creates the group in Keycloak under its display name,
    adds each listed member, then reads the group back to build the response.
    Failures are reported as SCIM error bodies: 400 for a ValueError, 500 otherwise.

    Example:
    ```json
    {
        "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
        "displayName": "Engineering",
        "members": [
            {"value": "user-id-123", "display": "Alice Smith"}
        ]
    }
    ```
    """
    # SECURITY FIX (CWE-862): Require admin or SCIM provisioner role
    await _require_admin_or_scim_role(current_user, openfga=openfga, resource="scim:users")

    try:
        scim_group = validate_scim_group(group_data)

        # Only the name is sent at creation time; members are attached one by one below.
        group_id = await keycloak.create_group({"name": scim_group.displayName})
        for member in scim_group.members:
            await keycloak.add_user_to_group(member.value, group_id)

        stored = await keycloak.get_group(group_id)
        if not stored:
            return scim_error(500, "Failed to retrieve created group", "internalError")

        # Members in the response are echoed from the request, not re-read from Keycloak.
        return SCIMGroup(
            id=stored["id"],
            displayName=stored["name"],
            members=scim_group.members,
            meta={"resourceType": "Group", "created": stored.get("createdTimestamp")},
        )

    except ValueError as e:
        return scim_error(400, str(e), "invalidValue")
    except Exception as e:
        return scim_error(500, f"Failed to create group: {e!s}", "internalError")
@router.get("/Groups/{group_id}", response_model=SCIMGroup)
async def get_group(
    group_id: str,
    current_user: dict[str, Any] = Depends(get_current_user),
    keycloak: KeycloakClient = Depends(get_keycloak_client),
) -> SCIMGroup | JSONResponse:
    """
    Get group by ID (SCIM 2.0)

    Fetches the group and its members from Keycloak and returns them as a SCIM group.
    Returns a SCIM 404 error body when the group is missing and a 500 body on failure.
    """
    try:
        group = await keycloak.get_group(group_id)
        if not group:
            return scim_error(404, f"Group {group_id} not found", "notFound")

        # Map each Keycloak member onto a SCIM member entry.
        scim_members = []
        for member in await keycloak.get_group_members(group_id):
            scim_members.append(SCIMMember(value=member["id"], display=member.get("username"), reference=None))

        return SCIMGroup(
            id=group["id"],
            displayName=group["name"],
            members=scim_members,
            meta={"resourceType": "Group"},
        )

    except Exception as e:
        return scim_error(500, f"Failed to get group: {e!s}", "internalError")