Coverage for src / mcp_server_langgraph / api / service_principals.py: 94%

107 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Service Principal API Endpoints 

3 

4REST API for managing service principals including creation, listing, 

5secret rotation, and deletion. 

6 

7See ADR-0033 for service principal design decisions. 

8""" 

9 

10from typing import Any 

11 

12from fastapi import APIRouter, Depends, HTTPException, status 

13from pydantic import BaseModel, Field 

14 

15from mcp_server_langgraph.auth.middleware import get_current_user 

16from mcp_server_langgraph.auth.openfga import OpenFGAClient 

17from mcp_server_langgraph.auth.service_principal import ServicePrincipalManager 

18from mcp_server_langgraph.core.dependencies import get_openfga_client, get_service_principal_manager 

19 

20router = APIRouter( 

21 prefix="/api/v1/service-principals", 

22 tags=["Service Principals"], 

23) 

24 

25 

26# Request/Response Models 

27 

28 

29class CreateServicePrincipalRequest(BaseModel): 

30 """Request to create a new service principal""" 

31 

32 name: str = Field( 

33 ..., 

34 description="Human-readable name for the service", 

35 pattern=r"^[a-zA-Z0-9 _-]{1,50}$", # SECURITY: Prevent CWE-20 injection into Keycloak/OpenFGA 

36 min_length=1, 

37 max_length=50, 

38 ) 

39 description: str = Field(..., description="Purpose/description of the service") 

40 authentication_mode: str = Field( 

41 default="client_credentials", 

42 description="Authentication mode: 'client_credentials' or 'service_account_user'", 

43 ) 

44 associated_user_id: str | None = Field(None, description="User to act as for permission inheritance (e.g., 'user:alice')") 

45 inherit_permissions: bool = Field(default=False, description="Whether to inherit permissions from associated user") 

46 

47 

48class ServicePrincipalResponse(BaseModel): 

49 """Response containing service principal details""" 

50 

51 service_id: str 

52 name: str 

53 description: str 

54 authentication_mode: str 

55 associated_user_id: str | None 

56 owner_user_id: str | None 

57 inherit_permissions: bool 

58 enabled: bool 

59 created_at: str | None 

60 

61 

62class CreateServicePrincipalResponse(ServicePrincipalResponse): 

63 """Response when creating service principal (includes secret)""" 

64 

65 client_secret: str = Field(..., description="Client secret (save securely, won't be shown again)") 

66 message: str = Field(default="Service principal created successfully. Save the client_secret securely.") 

67 

68 

69class RotateSecretResponse(BaseModel): 

70 """Response when rotating service principal secret""" 

71 

72 service_id: str 

73 client_secret: str = Field(..., description="New client secret") 

74 message: str = Field(default="Secret rotated successfully. Update your service configuration.") 

75 

76 

77# Authorization Helpers 

78 

79 

80async def _validate_user_association_permission( 

81 current_user: dict[str, Any], 

82 target_user_id: str, 

83 openfga: Any | None = None, 

84) -> None: 

85 """ 

86 Validate that the current user has permission to create service principals 

87 that act as the target user. 

88 

89 SECURITY (OpenAI Codex Finding #6): 

90 Enhanced to support OpenFGA relation-based delegation for fine-grained control. 

91 

92 Authorization rules (checked in order): 

93 1. Users can create SPs that act as themselves (self-service) 

94 2. Admin users can create SPs for any user 

95 3. OpenFGA: can_manage_service_principals relation (delegation) 

96 4. All other cases are denied 

97 

98 Args: 

99 current_user: The authenticated user making the request 

100 target_user_id: The user ID to associate with the service principal 

101 openfga: Optional OpenFGA client for relation-based delegation 

102 

103 Raises: 

104 HTTPException: 403 Forbidden if user is not authorized 

105 

106 References: 

107 - OpenAI Codex Finding #6: Ad-hoc role lists → OpenFGA relations 

108 - CWE-863: Incorrect Authorization (was CWE-269) 

109 """ 

110 user_id = current_user["user_id"] 

111 user_roles = current_user.get("roles", []) 

112 

113 # Rule 1: Users can create SPs for themselves 

114 if user_id == target_user_id: 

115 return # Authorized (self-service) 

116 

117 # Rule 2: Admin users can create SPs for anyone 

118 if "admin" in user_roles: 

119 return # Authorized (admin override) 

120 

121 # Rule 3 (ENHANCEMENT - OpenAI Codex Finding #6): Check OpenFGA delegation 

122 if openfga is not None: 

123 try: 

124 # Check can_manage_service_principals relation 

125 authorized = await openfga.check_permission( 

126 user=user_id, relation="can_manage_service_principals", object=target_user_id, context=None 

127 ) 

128 

129 if authorized: 129 ↛ 130line 129 didn't jump to line 130 because the condition on line 129 was never true

130 from mcp_server_langgraph.observability.telemetry import logger 

131 

132 logger.info( 

133 "Service Principal authorization granted via OpenFGA delegation", 

134 extra={ 

135 "user_id": user_id, 

136 "target_user_id": target_user_id, 

137 "relation": "can_manage_service_principals", 

138 }, 

139 ) 

140 return # Authorized via OpenFGA delegation 

141 

142 except Exception as e: 

143 # Log error but continue to denial 

144 from mcp_server_langgraph.observability.telemetry import logger 

145 

146 logger.warning( 

147 f"OpenFGA check failed for Service Principal authorization: {e}", 

148 extra={"user_id": user_id, "target_user_id": target_user_id}, 

149 ) 

150 

151 # Rule 4: All other cases are denied 

152 raise HTTPException( 

153 status_code=status.HTTP_403_FORBIDDEN, 

154 detail=( 

155 f"You are not authorized to create service principals that act as '{target_user_id}'. " 

156 f"You can create SPs for yourself ('{user_id}'), or with admin privileges, " 

157 f"or can_manage_service_principals OpenFGA relation." 

158 ), 

159 ) 

160 

161 

162# API Endpoints 

163 

164 

165@router.post("/", status_code=status.HTTP_201_CREATED) 

166async def create_service_principal( 

167 request: CreateServicePrincipalRequest, 

168 current_user: dict[str, Any] = Depends(get_current_user), 

169 sp_manager: ServicePrincipalManager = Depends(get_service_principal_manager), 

170 openfga: OpenFGAClient = Depends(get_openfga_client), 

171) -> CreateServicePrincipalResponse: 

172 """ 

173 Create a new service principal 

174 

175 Creates a service principal with the specified authentication mode. 

176 The calling user becomes the owner of the service principal. 

177 

178 Returns the created service principal with credentials (client_secret). 

179 **Save the client_secret securely** - it will not be shown again. 

180 

181 Example: 

182 ```json 

183 { 

184 "name": "Batch ETL Job", 

185 "description": "Nightly data processing", 

186 "authentication_mode": "client_credentials", 

187 "associated_user_id": "user:alice", 

188 "inherit_permissions": true 

189 } 

190 ``` 

191 """ 

192 # Validate authentication mode 

193 if request.authentication_mode not in ["client_credentials", "service_account_user"]: 

194 raise HTTPException( 

195 status_code=status.HTTP_400_BAD_REQUEST, 

196 detail="Invalid authentication_mode. Must be 'client_credentials' or 'service_account_user'", 

197 ) 

198 

199 # SECURITY FIX (CWE-269): Validate user association authorization 

200 # Prevent privilege escalation by validating that the caller has permission 

201 # to create service principals that act as the specified user 

202 if request.associated_user_id and request.inherit_permissions: 

203 await _validate_user_association_permission( 

204 current_user=current_user, 

205 target_user_id=request.associated_user_id, 

206 openfga=openfga, 

207 ) 

208 

209 # Generate service ID from name 

210 service_id = request.name.lower().replace(" ", "-").replace("_", "-") 

211 

212 # Validate service ID doesn't already exist 

213 existing = await sp_manager.get_service_principal(service_id) 

214 if existing: 

215 raise HTTPException( 

216 status_code=status.HTTP_409_CONFLICT, 

217 detail=f"Service principal with ID '{service_id}' already exists", 

218 ) 

219 

220 # Create service principal 

221 sp = await sp_manager.create_service_principal( 

222 service_id=service_id, 

223 name=request.name, 

224 description=request.description, 

225 authentication_mode=request.authentication_mode, 

226 associated_user_id=request.associated_user_id, 

227 owner_user_id=current_user["user_id"], 

228 inherit_permissions=request.inherit_permissions, 

229 ) 

230 

231 # Ensure client_secret is set (should always be present after creation) 

232 if sp.client_secret is None: 

233 raise HTTPException( 

234 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, 

235 detail="Failed to generate client secret", 

236 ) 

237 

238 return CreateServicePrincipalResponse( 

239 service_id=sp.service_id, 

240 name=sp.name, 

241 description=sp.description, 

242 authentication_mode=sp.authentication_mode, 

243 associated_user_id=sp.associated_user_id, 

244 owner_user_id=sp.owner_user_id, 

245 inherit_permissions=sp.inherit_permissions, 

246 enabled=sp.enabled, 

247 created_at=sp.created_at, 

248 client_secret=sp.client_secret, 

249 ) 

250 

251 

252@router.get("/") 

253async def list_service_principals( 

254 current_user: dict[str, Any] = Depends(get_current_user), 

255 sp_manager: ServicePrincipalManager = Depends(get_service_principal_manager), 

256) -> list[ServicePrincipalResponse]: 

257 """ 

258 List service principals owned by the current user 

259 

260 Returns all service principals where the current user is the owner. 

261 Does not include client secrets. 

262 """ 

263 sps = await sp_manager.list_service_principals(owner_user_id=current_user["user_id"]) 

264 

265 return [ 

266 ServicePrincipalResponse( 

267 service_id=sp.service_id, 

268 name=sp.name, 

269 description=sp.description, 

270 authentication_mode=sp.authentication_mode, 

271 associated_user_id=sp.associated_user_id, 

272 owner_user_id=sp.owner_user_id, 

273 inherit_permissions=sp.inherit_permissions, 

274 enabled=sp.enabled, 

275 created_at=sp.created_at, 

276 ) 

277 for sp in sps 

278 ] 

279 

280 

281@router.get("/{service_id}") 

282async def get_service_principal( 

283 service_id: str, 

284 current_user: dict[str, Any] = Depends(get_current_user), 

285 sp_manager: ServicePrincipalManager = Depends(get_service_principal_manager), 

286) -> ServicePrincipalResponse: 

287 """ 

288 Get details of a specific service principal 

289 

290 Returns service principal details if the current user is the owner. 

291 """ 

292 sp = await sp_manager.get_service_principal(service_id) 

293 

294 if not sp: 

295 raise HTTPException( 

296 status_code=status.HTTP_404_NOT_FOUND, 

297 detail=f"Service principal '{service_id}' not found", 

298 ) 

299 

300 # Verify ownership 

301 if sp.owner_user_id != current_user["user_id"]: 

302 raise HTTPException( 

303 status_code=status.HTTP_403_FORBIDDEN, 

304 detail="You do not have permission to view this service principal", 

305 ) 

306 

307 return ServicePrincipalResponse( 

308 service_id=sp.service_id, 

309 name=sp.name, 

310 description=sp.description, 

311 authentication_mode=sp.authentication_mode, 

312 associated_user_id=sp.associated_user_id, 

313 owner_user_id=sp.owner_user_id, 

314 inherit_permissions=sp.inherit_permissions, 

315 enabled=sp.enabled, 

316 created_at=sp.created_at, 

317 ) 

318 

319 

320@router.post("/{service_id}/rotate-secret") 

321async def rotate_service_principal_secret( 

322 service_id: str, 

323 current_user: dict[str, Any] = Depends(get_current_user), 

324 sp_manager: ServicePrincipalManager = Depends(get_service_principal_manager), 

325) -> RotateSecretResponse: 

326 """ 

327 Rotate service principal secret 

328 

329 Generates a new client secret for the service principal. 

330 The old secret will be invalidated immediately. 

331 

332 **Save the new client_secret securely** - update your service configuration 

333 before the old secret expires. 

334 """ 

335 # Get service principal 

336 sp = await sp_manager.get_service_principal(service_id) 

337 

338 if not sp: 

339 raise HTTPException( 

340 status_code=status.HTTP_404_NOT_FOUND, 

341 detail=f"Service principal '{service_id}' not found", 

342 ) 

343 

344 # Verify ownership 

345 if sp.owner_user_id != current_user["user_id"]: 

346 raise HTTPException( 

347 status_code=status.HTTP_403_FORBIDDEN, 

348 detail="You do not have permission to rotate this service principal's secret", 

349 ) 

350 

351 # Rotate secret 

352 new_secret = await sp_manager.rotate_secret(service_id) 

353 

354 return RotateSecretResponse( 

355 service_id=service_id, 

356 client_secret=new_secret, 

357 ) 

358 

359 

360@router.delete("/{service_id}", status_code=status.HTTP_204_NO_CONTENT) 

361async def delete_service_principal( 

362 service_id: str, 

363 current_user: dict[str, Any] = Depends(get_current_user), 

364 sp_manager: ServicePrincipalManager = Depends(get_service_principal_manager), 

365) -> None: 

366 """ 

367 Delete a service principal 

368 

369 Permanently deletes the service principal from Keycloak and OpenFGA. 

370 This action cannot be undone. 

371 """ 

372 # Get service principal 

373 sp = await sp_manager.get_service_principal(service_id) 

374 

375 if not sp: 

376 raise HTTPException( 

377 status_code=status.HTTP_404_NOT_FOUND, 

378 detail=f"Service principal '{service_id}' not found", 

379 ) 

380 

381 # Verify ownership 

382 if sp.owner_user_id != current_user["user_id"]: 

383 raise HTTPException( 

384 status_code=status.HTTP_403_FORBIDDEN, 

385 detail="You do not have permission to delete this service principal", 

386 ) 

387 

388 # Delete service principal 

389 await sp_manager.delete_service_principal(service_id) 

390 

391 return None 

392 

393 

394@router.post("/{service_id}/associate-user") 

395async def associate_service_principal_with_user( 

396 service_id: str, 

397 user_id: str, 

398 inherit_permissions: bool = True, 

399 current_user: dict[str, Any] = Depends(get_current_user), 

400 sp_manager: ServicePrincipalManager = Depends(get_service_principal_manager), 

401 openfga: OpenFGAClient = Depends(get_openfga_client), 

402) -> ServicePrincipalResponse: 

403 """ 

404 Associate service principal with a user for permission inheritance 

405 

406 Links a service principal to a user, optionally enabling permission inheritance. 

407 When inherit_permissions is true, the service principal can act on behalf of 

408 the user and inherit all their permissions. 

409 """ 

410 # Get service principal 

411 sp = await sp_manager.get_service_principal(service_id) 

412 

413 if not sp: 

414 raise HTTPException( 

415 status_code=status.HTTP_404_NOT_FOUND, 

416 detail=f"Service principal '{service_id}' not found", 

417 ) 

418 

419 # Verify ownership 

420 if sp.owner_user_id != current_user["user_id"]: 

421 raise HTTPException( 

422 status_code=status.HTTP_403_FORBIDDEN, 

423 detail="You do not have permission to modify this service principal", 

424 ) 

425 

426 # SECURITY FIX (CWE-269): Validate user association authorization 

427 # Prevent privilege escalation by validating permission to associate with target user 

428 if inherit_permissions: 428 ↛ 436line 428 didn't jump to line 436 because the condition on line 428 was always true

429 await _validate_user_association_permission( 

430 current_user=current_user, 

431 target_user_id=user_id, 

432 openfga=openfga, 

433 ) 

434 

435 # Associate with user 

436 await sp_manager.associate_with_user( 

437 service_id=service_id, 

438 user_id=user_id, 

439 inherit_permissions=inherit_permissions, 

440 ) 

441 

442 # Return updated service principal 

443 updated_sp = await sp_manager.get_service_principal(service_id) 

444 

445 if not updated_sp: 

446 raise HTTPException( 

447 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, 

448 detail="Failed to retrieve updated service principal", 

449 ) 

450 

451 return ServicePrincipalResponse( 

452 service_id=updated_sp.service_id, 

453 name=updated_sp.name, 

454 description=updated_sp.description, 

455 authentication_mode=updated_sp.authentication_mode, 

456 associated_user_id=updated_sp.associated_user_id, 

457 owner_user_id=updated_sp.owner_user_id, 

458 inherit_permissions=updated_sp.inherit_permissions, 

459 enabled=updated_sp.enabled, 

460 created_at=updated_sp.created_at, 

461 )