Coverage for src / mcp_server_langgraph / api / scim.py: 40%

165 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2SCIM 2.0 API Endpoints 

3 

4Implements SCIM 2.0 protocol for user and group provisioning. 

5Bridges to Keycloak Admin API for actual user management. 

6 

7See ADR-0038 for SCIM implementation approach. 

8 

9Endpoints: 

10- POST /scim/v2/Users - Create user 

11- GET /scim/v2/Users/{id} - Get user 

12- PUT /scim/v2/Users/{id} - Replace user 

13- PATCH /scim/v2/Users/{id} - Update user 

14- DELETE /scim/v2/Users/{id} - Deactivate user 

15- GET /scim/v2/Users?filter=... - Search users 

16- POST /scim/v2/Groups - Create group 

17- GET /scim/v2/Groups/{id} - Get group 

18- PATCH /scim/v2/Groups/{id} - Update group 

19 

20References: 

21- RFC 7643: https://datatracker.ietf.org/doc/html/rfc7643 

22- RFC 7644: https://datatracker.ietf.org/doc/html/rfc7644 

23""" 

24 

25from typing import Any 

26 

27from fastapi import APIRouter, Depends, HTTPException, Query, status 

28from fastapi.responses import JSONResponse 

29 

30from mcp_server_langgraph.auth.keycloak import KeycloakClient 

31from mcp_server_langgraph.auth.middleware import get_current_user 

32from mcp_server_langgraph.auth.openfga import OpenFGAClient 

33from mcp_server_langgraph.core.dependencies import get_keycloak_client, get_openfga_client 

34from mcp_server_langgraph.scim.schema import ( 

35 SCIMError, 

36 SCIMGroup, 

37 SCIMListResponse, 

38 SCIMMember, 

39 SCIMPatchRequest, 

40 SCIMUser, 

41 keycloak_to_scim_user, 

42 user_to_keycloak, 

43 validate_scim_group, 

44 validate_scim_user, 

45) 

46 

# All SCIM endpoints in this module are registered on this router, so every
# route path below is served under the /scim/v2 prefix.
router = APIRouter(
    prefix="/scim/v2",
    tags=["SCIM 2.0"],
)

51 

52 

53# Error response helper 

def scim_error(status_code: int, detail: str, scim_type: str | None = None) -> JSONResponse:
    """
    Build a SCIM-formatted error response.

    Args:
        status_code: HTTP status; used for the response and for the SCIMError ``status`` field.
        detail: Human-readable description of the failure.
        scim_type: Optional SCIM error keyword, stored in ``scimType``.

    Returns:
        JSONResponse whose body is the SCIMError model dumped with ``None`` fields omitted.
    """
    body = SCIMError(status=status_code, detail=detail, scimType=scim_type).model_dump(exclude_none=True)
    return JSONResponse(status_code=status_code, content=body)

65 

66 

67# Authorization Helpers 

68 

69 

70async def _require_admin_or_scim_role( 

71 current_user: dict[str, Any], openfga: Any | None = None, resource: str = "scim:users" 

72) -> None: 

73 """ 

74 Validate that the current user has admin or SCIM provisioner role. 

75 

76 SECURITY (OpenAI Codex Finding #6): 

77 Enhanced to support OpenFGA relation-based authorization for fine-grained control. 

78 

79 Authorization rules (checked in order): 

80 1. OpenFGA: Check can_provision_users relation (if OpenFGA available) 

81 2. Admin role: Users with 'admin' role can perform SCIM operations 

82 3. SCIM provisioner role: Service accounts with 'scim-provisioner' role 

83 4. Deny: All other cases 

84 

85 Args: 

86 current_user: The authenticated user making the request 

87 openfga: Optional OpenFGA client for relation-based checks 

88 resource: Resource identifier for OpenFGA (default: "scim:users") 

89 

90 Raises: 

91 HTTPException: 403 Forbidden if user lacks required permissions 

92 

93 References: 

94 - OpenAI Codex Finding #6: Ad-hoc role lists → OpenFGA relations 

95 - CWE-863: Incorrect Authorization 

96 """ 

97 user_id = current_user.get("user_id", f"user:{current_user.get('username', '')}") 

98 user_roles = current_user.get("roles", []) 

99 

100 # ENHANCEMENT (OpenAI Codex Finding #6): Check OpenFGA relation first 

101 if openfga is not None: 101 ↛ 131line 101 didn't jump to line 131 because the condition on line 101 was always true

102 try: 

103 # Check can_provision_users relation 

104 authorized = await openfga.check_permission( 

105 user=user_id, relation="can_provision_users", object=resource, context=None 

106 ) 

107 

108 if authorized: 

109 from mcp_server_langgraph.observability.telemetry import logger 

110 

111 logger.info( 

112 "SCIM authorization granted via OpenFGA relation", 

113 extra={ 

114 "user_id": user_id, 

115 "relation": "can_provision_users", 

116 "resource": resource, 

117 }, 

118 ) 

119 return # Authorized via OpenFGA 

120 

121 except Exception as e: 

122 # Log error but continue to role-based fallback 

123 from mcp_server_langgraph.observability.telemetry import logger 

124 

125 logger.warning( 

126 f"OpenFGA check failed for SCIM authorization, falling back to roles: {e}", 

127 extra={"user_id": user_id, "resource": resource}, 

128 ) 

129 

130 # FALLBACK: Check for admin or SCIM provisioner role 

131 if "admin" in user_roles or "scim-provisioner" in user_roles: 131 ↛ 132line 131 didn't jump to line 132 because the condition on line 131 was never true

132 return # Authorized 

133 

134 # Deny access 

135 raise HTTPException( 

136 status_code=status.HTTP_403_FORBIDDEN, 

137 detail=( 

138 "SCIM identity management operations require admin privileges, SCIM provisioner role, " 

139 f"or can_provision_users OpenFGA relation. Your roles: {user_roles}." 

140 ), 

141 ) 

142 

143 

144# User Endpoints 

145 

146 

@router.post("/Users", status_code=status.HTTP_201_CREATED)
async def create_user(
    user_data: dict[str, Any],
    current_user: dict[str, Any] = Depends(get_current_user),
    keycloak: KeycloakClient = Depends(get_keycloak_client),
    openfga: OpenFGAClient = Depends(get_openfga_client),
) -> SCIMUser:
    """
    Create a new user (SCIM 2.0)

    Validates the SCIM payload, creates the user in Keycloak, optionally sets
    the supplied password, then reads the user back and returns it in SCIM form.

    Example:
    ```json
    {
        "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
        "userName": "alice@example.com",
        "name": {
            "givenName": "Alice",
            "familyName": "Smith"
        },
        "emails": [{
            "value": "alice@example.com",
            "primary": true
        }],
        "active": true
    }
    ```

    Raises:
        HTTPException: 403 if the caller is not authorized, 400 if the payload
            fails validation (ValueError), 500 for any other failure.
    """
    # SECURITY FIX (CWE-862): Require admin or SCIM provisioner role
    await _require_admin_or_scim_role(current_user, openfga=openfga, resource="scim:users")

    try:
        # Validate SCIM schema and convert to the Keycloak representation
        scim_user = validate_scim_user(user_data)
        keycloak_user = user_to_keycloak(scim_user)

        # Create in Keycloak
        user_id = await keycloak.create_user(keycloak_user)

        # Set password if provided
        if scim_user.password:
            await keycloak.set_user_password(user_id, scim_user.password, temporary=False)

        # TODO: sync the new user to OpenFGA (assign default roles).
        # sync_user_to_openfga requires a KeycloakUser, not just user_id, so the
        # sync is skipped until proper user retrieval is implemented.

        # Get created user and convert back to SCIM
        created_user = await keycloak.get_user(user_id)
        if not created_user:
            raise HTTPException(status_code=500, detail="Failed to retrieve created user")

        return keycloak_to_scim_user(created_user)

    except HTTPException:
        # Deliberate HTTP errors raised above must reach the client unchanged
        # rather than being re-wrapped by the generic handler below.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to create user: {e!s}") from e

213 

214 

@router.get("/Users/{user_id}")
async def get_user(
    user_id: str,
    current_user: dict[str, Any] = Depends(get_current_user),
    keycloak: KeycloakClient = Depends(get_keycloak_client),
) -> SCIMUser:
    """
    Get user by ID (SCIM 2.0)

    Looks the user up in Keycloak and returns it converted to SCIM format.

    Raises:
        HTTPException: 404 when Keycloak returns no user for ``user_id``,
            500 when the lookup or conversion fails.
    """
    # NOTE(review): unlike the write endpoints, this handler only requires an
    # authenticated user and does not call _require_admin_or_scim_role.
    # Confirm that is intended.
    try:
        record = await keycloak.get_user(user_id)
        if record:
            return keycloak_to_scim_user(record)
        raise HTTPException(status_code=404, detail=f"User {user_id} not found")
    except HTTPException:
        # Let the 404 above through untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get user: {e!s}")

241 

242 

@router.put("/Users/{user_id}", response_model=SCIMUser)
async def replace_user(
    user_id: str,
    user_data: dict[str, Any],
    current_user: dict[str, Any] = Depends(get_current_user),
    keycloak: KeycloakClient = Depends(get_keycloak_client),
    openfga: OpenFGAClient = Depends(get_openfga_client),
) -> SCIMUser | JSONResponse:
    """
    Replace user (SCIM 2.0 PUT)

    Validates the payload, pushes the full replacement to Keycloak, then reads
    the user back and returns it in SCIM format.

    Returns:
        The updated SCIMUser, or a SCIM error response: 400 (invalidValue) on
        validation failure, 404 (notFound) if the user cannot be read back,
        500 (internalError) on any other failure.
    """
    # SECURITY FIX (CWE-862): Require admin or SCIM provisioner role
    await _require_admin_or_scim_role(current_user, openfga=openfga, resource="scim:users")

    try:
        replacement = user_to_keycloak(validate_scim_user(user_data))
        await keycloak.update_user(user_id, replacement)

        refreshed = await keycloak.get_user(user_id)
        if refreshed:
            return keycloak_to_scim_user(refreshed)
        return scim_error(404, f"User {user_id} not found", "notFound")

    except ValueError as e:
        return scim_error(400, str(e), "invalidValue")
    except Exception as e:
        return scim_error(500, f"Failed to update user: {e!s}", "internalError")

281 

282 

@router.patch("/Users/{user_id}", response_model=SCIMUser)
async def update_user(
    user_id: str,
    patch_request: SCIMPatchRequest,
    current_user: dict[str, Any] = Depends(get_current_user),
    keycloak: KeycloakClient = Depends(get_keycloak_client),
    openfga: OpenFGAClient = Depends(get_openfga_client),
) -> SCIMUser | JSONResponse:
    """
    Update user with PATCH operations (SCIM 2.0)

    Currently handles two operation shapes; all others are ignored:
    - path ``active`` (any op): copies the value to Keycloak ``enabled``
    - path ``emails`` with op ``replace``: uses the first entry's ``value`` as
      the Keycloak ``email``

    Example:
    ```json
    {
        "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
        "Operations": [
            {
                "op": "replace",
                "path": "active",
                "value": false
            }
        ]
    }
    ```

    Returns:
        The updated SCIMUser, or a SCIM error response: 400 (invalidValue) for
        a malformed ``emails`` value, 404 (notFound) if the user does not
        exist, 500 (internalError) on any other failure.
    """
    # SECURITY FIX (CWE-862): Require admin or SCIM provisioner role
    await _require_admin_or_scim_role(current_user, openfga=openfga, resource="scim:users")

    try:
        # Get current user
        keycloak_user = await keycloak.get_user(user_id)

        if not keycloak_user:
            return scim_error(404, f"User {user_id} not found", "notFound")

        # Apply PATCH operations
        for operation in patch_request.Operations:
            if operation.path == "active":
                keycloak_user["enabled"] = operation.value
            elif operation.path == "emails" and operation.op == "replace" and operation.value:
                # The value is client-supplied; a wrong shape is a client error
                # (400), not an internal failure (500).
                try:
                    primary_email = operation.value[0]["value"]
                except (KeyError, IndexError, TypeError):
                    return scim_error(
                        400,
                        "PATCH 'emails' value must be a non-empty list of objects with a 'value' attribute",
                        "invalidValue",
                    )
                keycloak_user["email"] = primary_email
            # Add more PATCH operation handlers as needed

        # Update in Keycloak
        await keycloak.update_user(user_id, keycloak_user)

        # Get updated user
        updated_user = await keycloak.get_user(user_id)
        if not updated_user:
            return scim_error(404, f"User {user_id} not found", "notFound")

        return keycloak_to_scim_user(updated_user)

    except Exception as e:
        return scim_error(500, f"Failed to patch user: {e!s}", "internalError")

341 

342 

@router.delete("/Users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
    user_id: str,
    current_user: dict[str, Any] = Depends(get_current_user),
    keycloak: KeycloakClient = Depends(get_keycloak_client),
    openfga: OpenFGAClient = Depends(get_openfga_client),
) -> None:
    """
    Delete (deactivate) user (SCIM 2.0)

    Performs a soft delete: the Keycloak user is updated with ``enabled`` set
    to False, and the OpenFGA tuples for ``user:<user_id>`` are removed.

    Raises:
        HTTPException: 403 if the caller is not authorized, 500 if either the
            Keycloak update or the OpenFGA cleanup fails.
    """
    # SECURITY FIX (CWE-862): Require admin or SCIM provisioner role
    await _require_admin_or_scim_role(current_user, openfga=openfga, resource="scim:users")

    deactivation = {"enabled": False}
    fga_object = f"user:{user_id}"
    try:
        await keycloak.update_user(user_id, deactivation)
        await openfga.delete_tuples_for_object(fga_object)
    except Exception as e:
        # A 204 route cannot return an error body, so failures are raised.
        raise HTTPException(status_code=500, detail=f"Failed to delete user: {e!s}")

368 

369 

def _parse_user_filter(expression: str) -> dict[str, str] | None:
    """Translate a supported SCIM filter into Keycloak search params; None if unsupported."""
    import re

    # Supported clause: <userName|email> <eq|sw> "<value>". The closing quote is
    # required so a truncated expression is not accepted as a search term.
    clause = re.search(r'\b(userName|email)\s+(eq|sw)\s+"([^"]*)"', expression)
    if clause is None:
        return None

    attribute, operator, value = clause.groups()
    params = {"username" if attribute == "userName" else "email": value}
    if operator == "eq":
        # Without the "exact" flag the search is a prefix search, which is what "sw" needs.
        params["exact"] = "true"
    return params


@router.get("/Users", response_model=SCIMListResponse)
async def list_users(
    filter: str | None = Query(None, description="SCIM filter expression"),
    startIndex: int = Query(1, ge=1, description="1-based start index"),
    count: int = Query(100, ge=1, le=1000, description="Number of results"),
    current_user: dict[str, Any] = Depends(get_current_user),
    keycloak: KeycloakClient = Depends(get_keycloak_client),
) -> SCIMListResponse | JSONResponse:
    """
    List/search users (SCIM 2.0)

    Supports SCIM filtering with 'eq' (equals) and 'sw' (startsWith) operators:
    - userName eq "alice" - exact match
    - userName sw "ali" - prefix match
    - email eq "alice@example.com" - exact match
    - email sw "alice@" - prefix match

    Args:
        filter: Optional SCIM filter expression (see above).
        startIndex: 1-based index of the first result; converted to a 0-based offset.
        count: Maximum number of results requested.

    Returns:
        A SCIMListResponse for the requested page, or a SCIM error response:
        400 (invalidFilter) when a filter is supplied but cannot be honoured,
        500 (internalError) on any other failure.
    """
    # NOTE(review): unlike the write endpoints, this handler only requires an
    # authenticated user and does not call _require_admin_or_scim_role.
    # Confirm that is intended.
    try:
        query: dict[str, str] = {}
        if filter:
            # SECURITY (CWE-20): a filter that cannot be parsed must NOT fall
            # through with an empty query, because that would return every user.
            # Reject it with a SCIM invalidFilter error instead.
            parsed = _parse_user_filter(filter)
            if parsed is None:
                return scim_error(
                    400,
                    'Unsupported or malformed filter; supported: userName/email with eq or sw and a quoted value',
                    "invalidFilter",
                )
            query = parsed

        # Get users from Keycloak
        users = await keycloak.search_users(query=query, first=startIndex - 1, max=count)

        # Convert to SCIM format
        scim_users = [keycloak_to_scim_user(user) for user in users]

        # NOTE: totalResults reports the size of this page, not the total
        # number of matches, because only one page is fetched here.
        return SCIMListResponse(
            totalResults=len(scim_users),
            startIndex=startIndex,
            itemsPerPage=len(scim_users),
            Resources=scim_users,
        )

    except Exception as e:
        return scim_error(500, f"Failed to list users: {e!s}", "internalError")

446 

447 

448# Group Endpoints 

449 

450 

@router.post("/Groups", response_model=SCIMGroup, status_code=status.HTTP_201_CREATED)
async def create_group(
    group_data: dict[str, Any],
    current_user: dict[str, Any] = Depends(get_current_user),
    keycloak: KeycloakClient = Depends(get_keycloak_client),
    openfga: OpenFGAClient = Depends(get_openfga_client),
) -> SCIMGroup | JSONResponse:
    """
    Create a new group (SCIM 2.0)

    Validates the payload, creates the group in Keycloak under its
    ``displayName``, adds each listed member, then reads the group back.

    Example:
    ```json
    {
        "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
        "displayName": "Engineering",
        "members": [
            {"value": "user-id-123", "display": "Alice Smith"}
        ]
    }
    ```

    Returns:
        The created SCIMGroup (members echoed from the request), or a SCIM
        error response: 400 (invalidValue) on validation failure, 500
        (internalError) on any other failure.
    """
    # SECURITY FIX (CWE-862): Require admin or SCIM provisioner role
    # NOTE(review): the check uses the "scim:users" resource for a group
    # operation. Confirm this is intended.
    await _require_admin_or_scim_role(current_user, openfga=openfga, resource="scim:users")

    try:
        scim_group = validate_scim_group(group_data)

        group_id = await keycloak.create_group({"name": scim_group.displayName})

        for member in scim_group.members:
            await keycloak.add_user_to_group(member.value, group_id)

        created_group = await keycloak.get_group(group_id)
        if created_group:
            return SCIMGroup(
                id=created_group["id"],
                displayName=created_group["name"],
                members=scim_group.members,
                meta={
                    "resourceType": "Group",
                    "created": created_group.get("createdTimestamp"),
                },
            )
        return scim_error(500, "Failed to retrieve created group", "internalError")

    except ValueError as e:
        return scim_error(400, str(e), "invalidValue")
    except Exception as e:
        return scim_error(500, f"Failed to create group: {e!s}", "internalError")

509 

510 

@router.get("/Groups/{group_id}", response_model=SCIMGroup)
async def get_group(
    group_id: str,
    current_user: dict[str, Any] = Depends(get_current_user),
    keycloak: KeycloakClient = Depends(get_keycloak_client),
) -> SCIMGroup | JSONResponse:
    """
    Get group by ID (SCIM 2.0)

    Returns:
        The SCIMGroup with its current Keycloak members, or a SCIM error
        response: 404 (notFound) if the group does not exist, 500
        (internalError) on any other failure.
    """
    try:
        group = await keycloak.get_group(group_id)
        if not group:
            return scim_error(404, f"Group {group_id} not found", "notFound")

        # Map each Keycloak member to a SCIM member entry.
        scim_members = []
        for member in await keycloak.get_group_members(group_id):
            scim_members.append(SCIMMember(value=member["id"], display=member.get("username"), reference=None))

        return SCIMGroup(
            id=group["id"],
            displayName=group["name"],
            members=scim_members,
            meta={"resourceType": "Group"},
        )

    except Exception as e:
        return scim_error(500, f"Failed to get group: {e!s}", "internalError")