Coverage for src / mcp_server_langgraph / api / service_principals.py: 94%
107 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Service Principal API Endpoints
4REST API for managing service principals including creation, listing,
5secret rotation, and deletion.
7See ADR-0033 for service principal design decisions.
8"""
10from typing import Any
12from fastapi import APIRouter, Depends, HTTPException, status
13from pydantic import BaseModel, Field
15from mcp_server_langgraph.auth.middleware import get_current_user
16from mcp_server_langgraph.auth.openfga import OpenFGAClient
17from mcp_server_langgraph.auth.service_principal import ServicePrincipalManager
18from mcp_server_langgraph.core.dependencies import get_openfga_client, get_service_principal_manager
20router = APIRouter(
21 prefix="/api/v1/service-principals",
22 tags=["Service Principals"],
23)
26# Request/Response Models
29class CreateServicePrincipalRequest(BaseModel):
30 """Request to create a new service principal"""
32 name: str = Field(
33 ...,
34 description="Human-readable name for the service",
35 pattern=r"^[a-zA-Z0-9 _-]{1,50}$", # SECURITY: Prevent CWE-20 injection into Keycloak/OpenFGA
36 min_length=1,
37 max_length=50,
38 )
39 description: str = Field(..., description="Purpose/description of the service")
40 authentication_mode: str = Field(
41 default="client_credentials",
42 description="Authentication mode: 'client_credentials' or 'service_account_user'",
43 )
44 associated_user_id: str | None = Field(None, description="User to act as for permission inheritance (e.g., 'user:alice')")
45 inherit_permissions: bool = Field(default=False, description="Whether to inherit permissions from associated user")
48class ServicePrincipalResponse(BaseModel):
49 """Response containing service principal details"""
51 service_id: str
52 name: str
53 description: str
54 authentication_mode: str
55 associated_user_id: str | None
56 owner_user_id: str | None
57 inherit_permissions: bool
58 enabled: bool
59 created_at: str | None
62class CreateServicePrincipalResponse(ServicePrincipalResponse):
63 """Response when creating service principal (includes secret)"""
65 client_secret: str = Field(..., description="Client secret (save securely, won't be shown again)")
66 message: str = Field(default="Service principal created successfully. Save the client_secret securely.")
69class RotateSecretResponse(BaseModel):
70 """Response when rotating service principal secret"""
72 service_id: str
73 client_secret: str = Field(..., description="New client secret")
74 message: str = Field(default="Secret rotated successfully. Update your service configuration.")
77# Authorization Helpers
80async def _validate_user_association_permission(
81 current_user: dict[str, Any],
82 target_user_id: str,
83 openfga: Any | None = None,
84) -> None:
85 """
86 Validate that the current user has permission to create service principals
87 that act as the target user.
89 SECURITY (OpenAI Codex Finding #6):
90 Enhanced to support OpenFGA relation-based delegation for fine-grained control.
92 Authorization rules (checked in order):
93 1. Users can create SPs that act as themselves (self-service)
94 2. Admin users can create SPs for any user
95 3. OpenFGA: can_manage_service_principals relation (delegation)
96 4. All other cases are denied
98 Args:
99 current_user: The authenticated user making the request
100 target_user_id: The user ID to associate with the service principal
101 openfga: Optional OpenFGA client for relation-based delegation
103 Raises:
104 HTTPException: 403 Forbidden if user is not authorized
106 References:
107 - OpenAI Codex Finding #6: Ad-hoc role lists → OpenFGA relations
108 - CWE-863: Incorrect Authorization (was CWE-269)
109 """
110 user_id = current_user["user_id"]
111 user_roles = current_user.get("roles", [])
113 # Rule 1: Users can create SPs for themselves
114 if user_id == target_user_id:
115 return # Authorized (self-service)
117 # Rule 2: Admin users can create SPs for anyone
118 if "admin" in user_roles:
119 return # Authorized (admin override)
121 # Rule 3 (ENHANCEMENT - OpenAI Codex Finding #6): Check OpenFGA delegation
122 if openfga is not None:
123 try:
124 # Check can_manage_service_principals relation
125 authorized = await openfga.check_permission(
126 user=user_id, relation="can_manage_service_principals", object=target_user_id, context=None
127 )
129 if authorized: 129 ↛ 130line 129 didn't jump to line 130 because the condition on line 129 was never true
130 from mcp_server_langgraph.observability.telemetry import logger
132 logger.info(
133 "Service Principal authorization granted via OpenFGA delegation",
134 extra={
135 "user_id": user_id,
136 "target_user_id": target_user_id,
137 "relation": "can_manage_service_principals",
138 },
139 )
140 return # Authorized via OpenFGA delegation
142 except Exception as e:
143 # Log error but continue to denial
144 from mcp_server_langgraph.observability.telemetry import logger
146 logger.warning(
147 f"OpenFGA check failed for Service Principal authorization: {e}",
148 extra={"user_id": user_id, "target_user_id": target_user_id},
149 )
151 # Rule 4: All other cases are denied
152 raise HTTPException(
153 status_code=status.HTTP_403_FORBIDDEN,
154 detail=(
155 f"You are not authorized to create service principals that act as '{target_user_id}'. "
156 f"You can create SPs for yourself ('{user_id}'), or with admin privileges, "
157 f"or can_manage_service_principals OpenFGA relation."
158 ),
159 )
162# API Endpoints
165@router.post("/", status_code=status.HTTP_201_CREATED)
166async def create_service_principal(
167 request: CreateServicePrincipalRequest,
168 current_user: dict[str, Any] = Depends(get_current_user),
169 sp_manager: ServicePrincipalManager = Depends(get_service_principal_manager),
170 openfga: OpenFGAClient = Depends(get_openfga_client),
171) -> CreateServicePrincipalResponse:
172 """
173 Create a new service principal
175 Creates a service principal with the specified authentication mode.
176 The calling user becomes the owner of the service principal.
178 Returns the created service principal with credentials (client_secret).
179 **Save the client_secret securely** - it will not be shown again.
181 Example:
182 ```json
183 {
184 "name": "Batch ETL Job",
185 "description": "Nightly data processing",
186 "authentication_mode": "client_credentials",
187 "associated_user_id": "user:alice",
188 "inherit_permissions": true
189 }
190 ```
191 """
192 # Validate authentication mode
193 if request.authentication_mode not in ["client_credentials", "service_account_user"]:
194 raise HTTPException(
195 status_code=status.HTTP_400_BAD_REQUEST,
196 detail="Invalid authentication_mode. Must be 'client_credentials' or 'service_account_user'",
197 )
199 # SECURITY FIX (CWE-269): Validate user association authorization
200 # Prevent privilege escalation by validating that the caller has permission
201 # to create service principals that act as the specified user
202 if request.associated_user_id and request.inherit_permissions:
203 await _validate_user_association_permission(
204 current_user=current_user,
205 target_user_id=request.associated_user_id,
206 openfga=openfga,
207 )
209 # Generate service ID from name
210 service_id = request.name.lower().replace(" ", "-").replace("_", "-")
212 # Validate service ID doesn't already exist
213 existing = await sp_manager.get_service_principal(service_id)
214 if existing:
215 raise HTTPException(
216 status_code=status.HTTP_409_CONFLICT,
217 detail=f"Service principal with ID '{service_id}' already exists",
218 )
220 # Create service principal
221 sp = await sp_manager.create_service_principal(
222 service_id=service_id,
223 name=request.name,
224 description=request.description,
225 authentication_mode=request.authentication_mode,
226 associated_user_id=request.associated_user_id,
227 owner_user_id=current_user["user_id"],
228 inherit_permissions=request.inherit_permissions,
229 )
231 # Ensure client_secret is set (should always be present after creation)
232 if sp.client_secret is None:
233 raise HTTPException(
234 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
235 detail="Failed to generate client secret",
236 )
238 return CreateServicePrincipalResponse(
239 service_id=sp.service_id,
240 name=sp.name,
241 description=sp.description,
242 authentication_mode=sp.authentication_mode,
243 associated_user_id=sp.associated_user_id,
244 owner_user_id=sp.owner_user_id,
245 inherit_permissions=sp.inherit_permissions,
246 enabled=sp.enabled,
247 created_at=sp.created_at,
248 client_secret=sp.client_secret,
249 )
252@router.get("/")
253async def list_service_principals(
254 current_user: dict[str, Any] = Depends(get_current_user),
255 sp_manager: ServicePrincipalManager = Depends(get_service_principal_manager),
256) -> list[ServicePrincipalResponse]:
257 """
258 List service principals owned by the current user
260 Returns all service principals where the current user is the owner.
261 Does not include client secrets.
262 """
263 sps = await sp_manager.list_service_principals(owner_user_id=current_user["user_id"])
265 return [
266 ServicePrincipalResponse(
267 service_id=sp.service_id,
268 name=sp.name,
269 description=sp.description,
270 authentication_mode=sp.authentication_mode,
271 associated_user_id=sp.associated_user_id,
272 owner_user_id=sp.owner_user_id,
273 inherit_permissions=sp.inherit_permissions,
274 enabled=sp.enabled,
275 created_at=sp.created_at,
276 )
277 for sp in sps
278 ]
281@router.get("/{service_id}")
282async def get_service_principal(
283 service_id: str,
284 current_user: dict[str, Any] = Depends(get_current_user),
285 sp_manager: ServicePrincipalManager = Depends(get_service_principal_manager),
286) -> ServicePrincipalResponse:
287 """
288 Get details of a specific service principal
290 Returns service principal details if the current user is the owner.
291 """
292 sp = await sp_manager.get_service_principal(service_id)
294 if not sp:
295 raise HTTPException(
296 status_code=status.HTTP_404_NOT_FOUND,
297 detail=f"Service principal '{service_id}' not found",
298 )
300 # Verify ownership
301 if sp.owner_user_id != current_user["user_id"]:
302 raise HTTPException(
303 status_code=status.HTTP_403_FORBIDDEN,
304 detail="You do not have permission to view this service principal",
305 )
307 return ServicePrincipalResponse(
308 service_id=sp.service_id,
309 name=sp.name,
310 description=sp.description,
311 authentication_mode=sp.authentication_mode,
312 associated_user_id=sp.associated_user_id,
313 owner_user_id=sp.owner_user_id,
314 inherit_permissions=sp.inherit_permissions,
315 enabled=sp.enabled,
316 created_at=sp.created_at,
317 )
320@router.post("/{service_id}/rotate-secret")
321async def rotate_service_principal_secret(
322 service_id: str,
323 current_user: dict[str, Any] = Depends(get_current_user),
324 sp_manager: ServicePrincipalManager = Depends(get_service_principal_manager),
325) -> RotateSecretResponse:
326 """
327 Rotate service principal secret
329 Generates a new client secret for the service principal.
330 The old secret will be invalidated immediately.
332 **Save the new client_secret securely** - update your service configuration
333 before the old secret expires.
334 """
335 # Get service principal
336 sp = await sp_manager.get_service_principal(service_id)
338 if not sp:
339 raise HTTPException(
340 status_code=status.HTTP_404_NOT_FOUND,
341 detail=f"Service principal '{service_id}' not found",
342 )
344 # Verify ownership
345 if sp.owner_user_id != current_user["user_id"]:
346 raise HTTPException(
347 status_code=status.HTTP_403_FORBIDDEN,
348 detail="You do not have permission to rotate this service principal's secret",
349 )
351 # Rotate secret
352 new_secret = await sp_manager.rotate_secret(service_id)
354 return RotateSecretResponse(
355 service_id=service_id,
356 client_secret=new_secret,
357 )
360@router.delete("/{service_id}", status_code=status.HTTP_204_NO_CONTENT)
361async def delete_service_principal(
362 service_id: str,
363 current_user: dict[str, Any] = Depends(get_current_user),
364 sp_manager: ServicePrincipalManager = Depends(get_service_principal_manager),
365) -> None:
366 """
367 Delete a service principal
369 Permanently deletes the service principal from Keycloak and OpenFGA.
370 This action cannot be undone.
371 """
372 # Get service principal
373 sp = await sp_manager.get_service_principal(service_id)
375 if not sp:
376 raise HTTPException(
377 status_code=status.HTTP_404_NOT_FOUND,
378 detail=f"Service principal '{service_id}' not found",
379 )
381 # Verify ownership
382 if sp.owner_user_id != current_user["user_id"]:
383 raise HTTPException(
384 status_code=status.HTTP_403_FORBIDDEN,
385 detail="You do not have permission to delete this service principal",
386 )
388 # Delete service principal
389 await sp_manager.delete_service_principal(service_id)
391 return None
394@router.post("/{service_id}/associate-user")
395async def associate_service_principal_with_user(
396 service_id: str,
397 user_id: str,
398 inherit_permissions: bool = True,
399 current_user: dict[str, Any] = Depends(get_current_user),
400 sp_manager: ServicePrincipalManager = Depends(get_service_principal_manager),
401 openfga: OpenFGAClient = Depends(get_openfga_client),
402) -> ServicePrincipalResponse:
403 """
404 Associate service principal with a user for permission inheritance
406 Links a service principal to a user, optionally enabling permission inheritance.
407 When inherit_permissions is true, the service principal can act on behalf of
408 the user and inherit all their permissions.
409 """
410 # Get service principal
411 sp = await sp_manager.get_service_principal(service_id)
413 if not sp:
414 raise HTTPException(
415 status_code=status.HTTP_404_NOT_FOUND,
416 detail=f"Service principal '{service_id}' not found",
417 )
419 # Verify ownership
420 if sp.owner_user_id != current_user["user_id"]:
421 raise HTTPException(
422 status_code=status.HTTP_403_FORBIDDEN,
423 detail="You do not have permission to modify this service principal",
424 )
426 # SECURITY FIX (CWE-269): Validate user association authorization
427 # Prevent privilege escalation by validating permission to associate with target user
428 if inherit_permissions: 428 ↛ 436line 428 didn't jump to line 436 because the condition on line 428 was always true
429 await _validate_user_association_permission(
430 current_user=current_user,
431 target_user_id=user_id,
432 openfga=openfga,
433 )
435 # Associate with user
436 await sp_manager.associate_with_user(
437 service_id=service_id,
438 user_id=user_id,
439 inherit_permissions=inherit_permissions,
440 )
442 # Return updated service principal
443 updated_sp = await sp_manager.get_service_principal(service_id)
445 if not updated_sp:
446 raise HTTPException(
447 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
448 detail="Failed to retrieve updated service principal",
449 )
451 return ServicePrincipalResponse(
452 service_id=updated_sp.service_id,
453 name=updated_sp.name,
454 description=updated_sp.description,
455 authentication_mode=updated_sp.authentication_mode,
456 associated_user_id=updated_sp.associated_user_id,
457 owner_user_id=updated_sp.owner_user_id,
458 inherit_permissions=updated_sp.inherit_permissions,
459 enabled=updated_sp.enabled,
460 created_at=updated_sp.created_at,
461 )