Coverage for src / mcp_server_langgraph / auth / service_principal.py: 82%
103 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Service Principal Management
4Manages service principal lifecycle including creation, user association,
5permission inheritance, secret rotation, and OpenFGA synchronization.
7Service principals enable machine-to-machine authentication with two modes:
81. Client Credentials (OAuth2) - Keycloak clients with service accounts
92. Service Account Users - Special user accounts marked as services
11See ADR-0033 for architectural decisions.
12"""
import logging
import secrets
from dataclasses import dataclass, field
from datetime import datetime, UTC
from typing import Any

from mcp_server_langgraph.auth.keycloak import KeycloakClient
from mcp_server_langgraph.auth.openfga import OpenFGAClient
22@dataclass
23class ServicePrincipal:
24 """Service principal identity and configuration"""
26 service_id: str # e.g., "batch-etl-job"
27 name: str
28 description: str
29 authentication_mode: str # "client_credentials" or "service_account_user"
30 associated_user_id: str | None = None # e.g., "user:alice"
31 owner_user_id: str | None = None # e.g., "user:bob"
32 inherit_permissions: bool = False
33 enabled: bool = True
34 created_at: str | None = None
35 client_secret: str | None = None # Only returned on creation/rotation
38class ServicePrincipalManager:
39 """Manage service principal lifecycle"""
41 def __init__(self, keycloak_client: KeycloakClient, openfga_client: OpenFGAClient | None):
42 """
43 Initialize service principal manager
45 Args:
46 keycloak_client: Keycloak client for user/client management
47 openfga_client: OpenFGA client for authorization tuples (None if disabled)
48 """
49 self.keycloak = keycloak_client
50 self.openfga = openfga_client
52 async def create_service_principal(
53 self,
54 service_id: str,
55 name: str,
56 description: str,
57 authentication_mode: str = "client_credentials",
58 associated_user_id: str | None = None,
59 owner_user_id: str | None = None,
60 inherit_permissions: bool = False,
61 ) -> ServicePrincipal:
62 """
63 Create service principal in Keycloak and OpenFGA
65 Args:
66 service_id: Unique identifier for service (e.g., "batch-etl-job")
67 name: Human-readable name
68 description: Purpose/description
69 authentication_mode: "client_credentials" or "service_account_user"
70 associated_user_id: Optional user to act as (e.g., "user:alice")
71 owner_user_id: User who owns this service principal
72 inherit_permissions: Whether to inherit permissions from associated user
74 Returns:
75 ServicePrincipal with generated credentials
77 Raises:
78 ValueError: If authentication_mode is invalid
79 """
80 created_at = datetime.now(UTC).isoformat()
82 if authentication_mode == "client_credentials":
83 client_secret = await self._create_client_credentials_service(
84 service_id=service_id,
85 name=name,
86 description=description,
87 associated_user_id=associated_user_id,
88 owner_user_id=owner_user_id,
89 inherit_permissions=inherit_permissions,
90 )
92 elif authentication_mode == "service_account_user":
93 client_secret = await self._create_service_account_user(
94 service_id=service_id,
95 name=name,
96 description=description,
97 associated_user_id=associated_user_id,
98 owner_user_id=owner_user_id,
99 inherit_permissions=inherit_permissions,
100 )
102 else:
103 msg = f"Invalid authentication mode: {authentication_mode}"
104 raise ValueError(msg)
106 # Sync to OpenFGA
107 await self._sync_to_openfga(
108 service_id=service_id,
109 associated_user_id=associated_user_id,
110 owner_user_id=owner_user_id,
111 inherit_permissions=inherit_permissions,
112 )
114 return ServicePrincipal(
115 service_id=service_id,
116 name=name,
117 description=description,
118 authentication_mode=authentication_mode,
119 associated_user_id=associated_user_id,
120 owner_user_id=owner_user_id,
121 inherit_permissions=inherit_permissions,
122 enabled=True,
123 created_at=created_at,
124 client_secret=client_secret,
125 )
127 async def _create_client_credentials_service(
128 self,
129 service_id: str,
130 name: str,
131 description: str,
132 associated_user_id: str | None,
133 owner_user_id: str | None,
134 inherit_permissions: bool,
135 ) -> str:
136 """Create Keycloak client with service account enabled"""
137 # Generate secure client secret
138 client_secret = secrets.token_urlsafe(32)
140 client_config = {
141 "clientId": service_id,
142 "name": name,
143 "description": description,
144 "enabled": True,
145 "serviceAccountsEnabled": True, # Enable service accounts
146 "standardFlowEnabled": False, # No authorization code flow
147 "directAccessGrantsEnabled": False, # No ROPC
148 "implicitFlowEnabled": False, # No implicit flow
149 "publicClient": False, # Confidential client
150 "clientAuthenticatorType": "client-secret",
151 "secret": client_secret,
152 "attributes": {
153 "associatedUserId": associated_user_id or "",
154 "inheritPermissions": str(inherit_permissions).lower(),
155 "owner": owner_user_id or "",
156 "purpose": description,
157 "createdAt": datetime.now(UTC).isoformat(),
158 },
159 }
161 await self.keycloak.create_client(client_config)
162 return client_secret
164 async def _create_service_account_user(
165 self,
166 service_id: str,
167 name: str,
168 description: str,
169 associated_user_id: str | None,
170 owner_user_id: str | None,
171 inherit_permissions: bool,
172 ) -> str:
173 """Create Keycloak user marked as service account"""
174 # Generate secure password
175 password = secrets.token_urlsafe(32)
177 user_config = {
178 "username": f"svc_{service_id}",
179 "enabled": True,
180 "email": f"svc-{service_id}@example.com",
181 "emailVerified": True,
182 "attributes": {
183 "serviceAccount": "true", # Mark as service account
184 "associatedUserId": associated_user_id or "",
185 "inheritPermissions": str(inherit_permissions).lower(),
186 "owner": owner_user_id or "",
187 "purpose": description,
188 "createdAt": datetime.now(UTC).isoformat(),
189 },
190 "credentials": [{"type": "password", "value": password, "temporary": False}],
191 "realmRoles": ["service-principal"],
192 }
194 await self.keycloak.create_user(user_config)
195 return password
197 async def _sync_to_openfga(
198 self,
199 service_id: str,
200 associated_user_id: str | None,
201 owner_user_id: str | None,
202 inherit_permissions: bool,
203 ) -> None:
204 """
205 Sync service principal relationships to OpenFGA
207 Gracefully handles disabled OpenFGA (when self.openfga is None).
208 """
209 # Guard: Skip OpenFGA sync if client is not available
210 if self.openfga is None:
211 return
213 tuples = []
215 # Permission inheritance via acts_as
216 if inherit_permissions and associated_user_id:
217 tuples.append(
218 {
219 "user": f"service:{service_id}",
220 "relation": "acts_as",
221 "object": associated_user_id,
222 }
223 )
225 # Ownership
226 if owner_user_id:
227 tuples.append(
228 {
229 "user": owner_user_id,
230 "relation": "owner",
231 "object": f"service_principal:{service_id}",
232 }
233 )
235 if tuples:
236 await self.openfga.write_tuples(tuples)
238 async def associate_with_user(
239 self,
240 service_id: str,
241 user_id: str,
242 inherit_permissions: bool = True,
243 ) -> None:
244 """
245 Associate service principal with user for permission inheritance
247 Args:
248 service_id: Service principal identifier
249 user_id: User to associate with (e.g., "user:alice")
250 inherit_permissions: Whether to inherit user's permissions
251 """
252 # Update Keycloak attributes
253 await self.keycloak.update_client_attributes(
254 service_id,
255 {
256 "associatedUserId": user_id,
257 "inheritPermissions": str(inherit_permissions).lower(),
258 },
259 )
261 # Create OpenFGA tuple for permission inheritance
262 if inherit_permissions and self.openfga is not None:
263 await self.openfga.write_tuples(
264 [
265 {
266 "user": f"service:{service_id}",
267 "relation": "acts_as",
268 "object": user_id,
269 }
270 ]
271 )
273 async def rotate_secret(self, service_id: str) -> str:
274 """
275 Rotate service principal secret
277 Args:
278 service_id: Service principal identifier
280 Returns:
281 New client secret (store securely, won't be shown again)
282 """
283 new_secret = secrets.token_urlsafe(32)
285 # Update in Keycloak
286 await self.keycloak.update_client_secret(service_id, new_secret)
288 return new_secret
290 async def list_service_principals(self, owner_user_id: str | None = None) -> list[ServicePrincipal]:
291 """
292 List all service principals, optionally filtered by owner
294 Args:
295 owner_user_id: Optional filter by owner (e.g., "user:bob")
297 Returns:
298 List of ServicePrincipal objects
299 """
300 service_principals = []
302 # Query Keycloak for clients with serviceAccountsEnabled
303 clients = await self.keycloak.get_clients(query={"serviceAccountsEnabled": True})
305 for client in clients:
306 attrs = client.get("attributes", {})
308 # Filter by owner if specified
309 if owner_user_id and attrs.get("owner") != owner_user_id:
310 continue
312 service_principals.append(
313 ServicePrincipal(
314 service_id=client["clientId"],
315 name=client.get("name", ""),
316 description=client.get("description", ""),
317 authentication_mode="client_credentials",
318 associated_user_id=attrs.get("associatedUserId") or None,
319 owner_user_id=attrs.get("owner") or None,
320 inherit_permissions=attrs.get("inheritPermissions") == "true",
321 enabled=client.get("enabled", True),
322 created_at=attrs.get("createdAt"),
323 client_secret=None, # Never return secrets in list
324 )
325 )
327 # Also query for service account users
328 users = await self.keycloak.get_users(query={"serviceAccount": "true"})
330 for user in users: 330 ↛ 331line 330 didn't jump to line 331 because the loop on line 330 never started
331 attrs = user.get("attributes", {})
333 # Filter by owner if specified
334 if owner_user_id and attrs.get("owner") != owner_user_id:
335 continue
337 # Extract service_id from username (format: svc_<service_id>)
338 username = user.get("username", "")
339 service_id = username.replace("svc_", "") if username.startswith("svc_") else username
341 service_principals.append(
342 ServicePrincipal(
343 service_id=service_id,
344 name=user.get("firstName", "") or user.get("username", ""),
345 description=attrs.get("purpose", ""),
346 authentication_mode="service_account_user",
347 associated_user_id=attrs.get("associatedUserId") or None,
348 owner_user_id=attrs.get("owner") or None,
349 inherit_permissions=attrs.get("inheritPermissions") == "true",
350 enabled=user.get("enabled", True),
351 created_at=attrs.get("createdAt"),
352 client_secret=None,
353 )
354 )
356 return service_principals
358 async def get_service_principal(self, service_id: str) -> ServicePrincipal | None:
359 """
360 Get specific service principal by ID
362 Args:
363 service_id: Service principal identifier
365 Returns:
366 ServicePrincipal or None if not found
367 """
368 # Try to find as client first
369 try:
370 client = await self.keycloak.get_client(service_id)
371 if client and client.get("serviceAccountsEnabled"): 371 ↛ 372line 371 didn't jump to line 372 because the condition on line 371 was never true
372 attrs = client.get("attributes", {})
373 return ServicePrincipal(
374 service_id=client["clientId"],
375 name=client.get("name", ""),
376 description=client.get("description", ""),
377 authentication_mode="client_credentials",
378 associated_user_id=attrs.get("associatedUserId") or None,
379 owner_user_id=attrs.get("owner") or None,
380 inherit_permissions=attrs.get("inheritPermissions") == "true",
381 enabled=client.get("enabled", True),
382 created_at=attrs.get("createdAt"),
383 )
384 except Exception:
385 pass
387 # Try to find as service account user
388 try:
389 user = await self.keycloak.get_user_by_username(f"svc_{service_id}")
390 if user: 390 ↛ 407line 390 didn't jump to line 407 because the condition on line 390 was always true
391 attrs = user.get("attributes", {}) # type: ignore[attr-defined]
392 if attrs.get("serviceAccount") == "true": 392 ↛ 407line 392 didn't jump to line 407 because the condition on line 392 was always true
393 return ServicePrincipal(
394 service_id=service_id,
395 name=user.get("firstName", "") or user.get("username", ""), # type: ignore[attr-defined]
396 description=attrs.get("purpose", ""),
397 authentication_mode="service_account_user",
398 associated_user_id=attrs.get("associatedUserId") or None,
399 owner_user_id=attrs.get("owner") or None,
400 inherit_permissions=attrs.get("inheritPermissions") == "true",
401 enabled=user.get("enabled", True), # type: ignore[attr-defined]
402 created_at=attrs.get("createdAt"),
403 )
404 except Exception:
405 pass
407 return None
409 async def delete_service_principal(self, service_id: str) -> None:
410 """
411 Delete service principal from Keycloak and OpenFGA
413 Args:
414 service_id: Service principal identifier
415 """
416 # Remove from Keycloak (try both client and user)
417 try:
418 await self.keycloak.delete_client(service_id)
419 except Exception:
420 try:
421 await self.keycloak.delete_user(f"svc_{service_id}")
422 except Exception:
423 pass # May not exist
425 # Remove from OpenFGA (if available)
426 if self.openfga is not None:
427 await self.openfga.delete_tuples_for_object(f"service_principal:{service_id}")