Coverage for src / mcp_server_langgraph / auth / service_principal.py: 82%

103 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Service Principal Management 

3 

4Manages service principal lifecycle including creation, user association, 

5permission inheritance, secret rotation, and OpenFGA synchronization. 

6 

7Service principals enable machine-to-machine authentication with two modes: 

81. Client Credentials (OAuth2) - Keycloak clients with service accounts 

92. Service Account Users - Special user accounts marked as services 

10 

11See ADR-0033 for architectural decisions. 

12""" 

13 

import logging
import secrets
from dataclasses import dataclass, field
from datetime import datetime, UTC
from typing import Any

from mcp_server_langgraph.auth.keycloak import KeycloakClient
from mcp_server_langgraph.auth.openfga import OpenFGAClient

20 

21 

22@dataclass 

23class ServicePrincipal: 

24 """Service principal identity and configuration""" 

25 

26 service_id: str # e.g., "batch-etl-job" 

27 name: str 

28 description: str 

29 authentication_mode: str # "client_credentials" or "service_account_user" 

30 associated_user_id: str | None = None # e.g., "user:alice" 

31 owner_user_id: str | None = None # e.g., "user:bob" 

32 inherit_permissions: bool = False 

33 enabled: bool = True 

34 created_at: str | None = None 

35 client_secret: str | None = None # Only returned on creation/rotation 

36 

37 

38class ServicePrincipalManager: 

39 """Manage service principal lifecycle""" 

40 

41 def __init__(self, keycloak_client: KeycloakClient, openfga_client: OpenFGAClient | None): 

42 """ 

43 Initialize service principal manager 

44 

45 Args: 

46 keycloak_client: Keycloak client for user/client management 

47 openfga_client: OpenFGA client for authorization tuples (None if disabled) 

48 """ 

49 self.keycloak = keycloak_client 

50 self.openfga = openfga_client 

51 

52 async def create_service_principal( 

53 self, 

54 service_id: str, 

55 name: str, 

56 description: str, 

57 authentication_mode: str = "client_credentials", 

58 associated_user_id: str | None = None, 

59 owner_user_id: str | None = None, 

60 inherit_permissions: bool = False, 

61 ) -> ServicePrincipal: 

62 """ 

63 Create service principal in Keycloak and OpenFGA 

64 

65 Args: 

66 service_id: Unique identifier for service (e.g., "batch-etl-job") 

67 name: Human-readable name 

68 description: Purpose/description 

69 authentication_mode: "client_credentials" or "service_account_user" 

70 associated_user_id: Optional user to act as (e.g., "user:alice") 

71 owner_user_id: User who owns this service principal 

72 inherit_permissions: Whether to inherit permissions from associated user 

73 

74 Returns: 

75 ServicePrincipal with generated credentials 

76 

77 Raises: 

78 ValueError: If authentication_mode is invalid 

79 """ 

80 created_at = datetime.now(UTC).isoformat() 

81 

82 if authentication_mode == "client_credentials": 

83 client_secret = await self._create_client_credentials_service( 

84 service_id=service_id, 

85 name=name, 

86 description=description, 

87 associated_user_id=associated_user_id, 

88 owner_user_id=owner_user_id, 

89 inherit_permissions=inherit_permissions, 

90 ) 

91 

92 elif authentication_mode == "service_account_user": 

93 client_secret = await self._create_service_account_user( 

94 service_id=service_id, 

95 name=name, 

96 description=description, 

97 associated_user_id=associated_user_id, 

98 owner_user_id=owner_user_id, 

99 inherit_permissions=inherit_permissions, 

100 ) 

101 

102 else: 

103 msg = f"Invalid authentication mode: {authentication_mode}" 

104 raise ValueError(msg) 

105 

106 # Sync to OpenFGA 

107 await self._sync_to_openfga( 

108 service_id=service_id, 

109 associated_user_id=associated_user_id, 

110 owner_user_id=owner_user_id, 

111 inherit_permissions=inherit_permissions, 

112 ) 

113 

114 return ServicePrincipal( 

115 service_id=service_id, 

116 name=name, 

117 description=description, 

118 authentication_mode=authentication_mode, 

119 associated_user_id=associated_user_id, 

120 owner_user_id=owner_user_id, 

121 inherit_permissions=inherit_permissions, 

122 enabled=True, 

123 created_at=created_at, 

124 client_secret=client_secret, 

125 ) 

126 

127 async def _create_client_credentials_service( 

128 self, 

129 service_id: str, 

130 name: str, 

131 description: str, 

132 associated_user_id: str | None, 

133 owner_user_id: str | None, 

134 inherit_permissions: bool, 

135 ) -> str: 

136 """Create Keycloak client with service account enabled""" 

137 # Generate secure client secret 

138 client_secret = secrets.token_urlsafe(32) 

139 

140 client_config = { 

141 "clientId": service_id, 

142 "name": name, 

143 "description": description, 

144 "enabled": True, 

145 "serviceAccountsEnabled": True, # Enable service accounts 

146 "standardFlowEnabled": False, # No authorization code flow 

147 "directAccessGrantsEnabled": False, # No ROPC 

148 "implicitFlowEnabled": False, # No implicit flow 

149 "publicClient": False, # Confidential client 

150 "clientAuthenticatorType": "client-secret", 

151 "secret": client_secret, 

152 "attributes": { 

153 "associatedUserId": associated_user_id or "", 

154 "inheritPermissions": str(inherit_permissions).lower(), 

155 "owner": owner_user_id or "", 

156 "purpose": description, 

157 "createdAt": datetime.now(UTC).isoformat(), 

158 }, 

159 } 

160 

161 await self.keycloak.create_client(client_config) 

162 return client_secret 

163 

164 async def _create_service_account_user( 

165 self, 

166 service_id: str, 

167 name: str, 

168 description: str, 

169 associated_user_id: str | None, 

170 owner_user_id: str | None, 

171 inherit_permissions: bool, 

172 ) -> str: 

173 """Create Keycloak user marked as service account""" 

174 # Generate secure password 

175 password = secrets.token_urlsafe(32) 

176 

177 user_config = { 

178 "username": f"svc_{service_id}", 

179 "enabled": True, 

180 "email": f"svc-{service_id}@example.com", 

181 "emailVerified": True, 

182 "attributes": { 

183 "serviceAccount": "true", # Mark as service account 

184 "associatedUserId": associated_user_id or "", 

185 "inheritPermissions": str(inherit_permissions).lower(), 

186 "owner": owner_user_id or "", 

187 "purpose": description, 

188 "createdAt": datetime.now(UTC).isoformat(), 

189 }, 

190 "credentials": [{"type": "password", "value": password, "temporary": False}], 

191 "realmRoles": ["service-principal"], 

192 } 

193 

194 await self.keycloak.create_user(user_config) 

195 return password 

196 

197 async def _sync_to_openfga( 

198 self, 

199 service_id: str, 

200 associated_user_id: str | None, 

201 owner_user_id: str | None, 

202 inherit_permissions: bool, 

203 ) -> None: 

204 """ 

205 Sync service principal relationships to OpenFGA 

206 

207 Gracefully handles disabled OpenFGA (when self.openfga is None). 

208 """ 

209 # Guard: Skip OpenFGA sync if client is not available 

210 if self.openfga is None: 

211 return 

212 

213 tuples = [] 

214 

215 # Permission inheritance via acts_as 

216 if inherit_permissions and associated_user_id: 

217 tuples.append( 

218 { 

219 "user": f"service:{service_id}", 

220 "relation": "acts_as", 

221 "object": associated_user_id, 

222 } 

223 ) 

224 

225 # Ownership 

226 if owner_user_id: 

227 tuples.append( 

228 { 

229 "user": owner_user_id, 

230 "relation": "owner", 

231 "object": f"service_principal:{service_id}", 

232 } 

233 ) 

234 

235 if tuples: 

236 await self.openfga.write_tuples(tuples) 

237 

238 async def associate_with_user( 

239 self, 

240 service_id: str, 

241 user_id: str, 

242 inherit_permissions: bool = True, 

243 ) -> None: 

244 """ 

245 Associate service principal with user for permission inheritance 

246 

247 Args: 

248 service_id: Service principal identifier 

249 user_id: User to associate with (e.g., "user:alice") 

250 inherit_permissions: Whether to inherit user's permissions 

251 """ 

252 # Update Keycloak attributes 

253 await self.keycloak.update_client_attributes( 

254 service_id, 

255 { 

256 "associatedUserId": user_id, 

257 "inheritPermissions": str(inherit_permissions).lower(), 

258 }, 

259 ) 

260 

261 # Create OpenFGA tuple for permission inheritance 

262 if inherit_permissions and self.openfga is not None: 

263 await self.openfga.write_tuples( 

264 [ 

265 { 

266 "user": f"service:{service_id}", 

267 "relation": "acts_as", 

268 "object": user_id, 

269 } 

270 ] 

271 ) 

272 

273 async def rotate_secret(self, service_id: str) -> str: 

274 """ 

275 Rotate service principal secret 

276 

277 Args: 

278 service_id: Service principal identifier 

279 

280 Returns: 

281 New client secret (store securely, won't be shown again) 

282 """ 

283 new_secret = secrets.token_urlsafe(32) 

284 

285 # Update in Keycloak 

286 await self.keycloak.update_client_secret(service_id, new_secret) 

287 

288 return new_secret 

289 

290 async def list_service_principals(self, owner_user_id: str | None = None) -> list[ServicePrincipal]: 

291 """ 

292 List all service principals, optionally filtered by owner 

293 

294 Args: 

295 owner_user_id: Optional filter by owner (e.g., "user:bob") 

296 

297 Returns: 

298 List of ServicePrincipal objects 

299 """ 

300 service_principals = [] 

301 

302 # Query Keycloak for clients with serviceAccountsEnabled 

303 clients = await self.keycloak.get_clients(query={"serviceAccountsEnabled": True}) 

304 

305 for client in clients: 

306 attrs = client.get("attributes", {}) 

307 

308 # Filter by owner if specified 

309 if owner_user_id and attrs.get("owner") != owner_user_id: 

310 continue 

311 

312 service_principals.append( 

313 ServicePrincipal( 

314 service_id=client["clientId"], 

315 name=client.get("name", ""), 

316 description=client.get("description", ""), 

317 authentication_mode="client_credentials", 

318 associated_user_id=attrs.get("associatedUserId") or None, 

319 owner_user_id=attrs.get("owner") or None, 

320 inherit_permissions=attrs.get("inheritPermissions") == "true", 

321 enabled=client.get("enabled", True), 

322 created_at=attrs.get("createdAt"), 

323 client_secret=None, # Never return secrets in list 

324 ) 

325 ) 

326 

327 # Also query for service account users 

328 users = await self.keycloak.get_users(query={"serviceAccount": "true"}) 

329 

330 for user in users: 330 ↛ 331line 330 didn't jump to line 331 because the loop on line 330 never started

331 attrs = user.get("attributes", {}) 

332 

333 # Filter by owner if specified 

334 if owner_user_id and attrs.get("owner") != owner_user_id: 

335 continue 

336 

337 # Extract service_id from username (format: svc_<service_id>) 

338 username = user.get("username", "") 

339 service_id = username.replace("svc_", "") if username.startswith("svc_") else username 

340 

341 service_principals.append( 

342 ServicePrincipal( 

343 service_id=service_id, 

344 name=user.get("firstName", "") or user.get("username", ""), 

345 description=attrs.get("purpose", ""), 

346 authentication_mode="service_account_user", 

347 associated_user_id=attrs.get("associatedUserId") or None, 

348 owner_user_id=attrs.get("owner") or None, 

349 inherit_permissions=attrs.get("inheritPermissions") == "true", 

350 enabled=user.get("enabled", True), 

351 created_at=attrs.get("createdAt"), 

352 client_secret=None, 

353 ) 

354 ) 

355 

356 return service_principals 

357 

358 async def get_service_principal(self, service_id: str) -> ServicePrincipal | None: 

359 """ 

360 Get specific service principal by ID 

361 

362 Args: 

363 service_id: Service principal identifier 

364 

365 Returns: 

366 ServicePrincipal or None if not found 

367 """ 

368 # Try to find as client first 

369 try: 

370 client = await self.keycloak.get_client(service_id) 

371 if client and client.get("serviceAccountsEnabled"): 371 ↛ 372line 371 didn't jump to line 372 because the condition on line 371 was never true

372 attrs = client.get("attributes", {}) 

373 return ServicePrincipal( 

374 service_id=client["clientId"], 

375 name=client.get("name", ""), 

376 description=client.get("description", ""), 

377 authentication_mode="client_credentials", 

378 associated_user_id=attrs.get("associatedUserId") or None, 

379 owner_user_id=attrs.get("owner") or None, 

380 inherit_permissions=attrs.get("inheritPermissions") == "true", 

381 enabled=client.get("enabled", True), 

382 created_at=attrs.get("createdAt"), 

383 ) 

384 except Exception: 

385 pass 

386 

387 # Try to find as service account user 

388 try: 

389 user = await self.keycloak.get_user_by_username(f"svc_{service_id}") 

390 if user: 390 ↛ 407line 390 didn't jump to line 407 because the condition on line 390 was always true

391 attrs = user.get("attributes", {}) # type: ignore[attr-defined] 

392 if attrs.get("serviceAccount") == "true": 392 ↛ 407line 392 didn't jump to line 407 because the condition on line 392 was always true

393 return ServicePrincipal( 

394 service_id=service_id, 

395 name=user.get("firstName", "") or user.get("username", ""), # type: ignore[attr-defined] 

396 description=attrs.get("purpose", ""), 

397 authentication_mode="service_account_user", 

398 associated_user_id=attrs.get("associatedUserId") or None, 

399 owner_user_id=attrs.get("owner") or None, 

400 inherit_permissions=attrs.get("inheritPermissions") == "true", 

401 enabled=user.get("enabled", True), # type: ignore[attr-defined] 

402 created_at=attrs.get("createdAt"), 

403 ) 

404 except Exception: 

405 pass 

406 

407 return None 

408 

409 async def delete_service_principal(self, service_id: str) -> None: 

410 """ 

411 Delete service principal from Keycloak and OpenFGA 

412 

413 Args: 

414 service_id: Service principal identifier 

415 """ 

416 # Remove from Keycloak (try both client and user) 

417 try: 

418 await self.keycloak.delete_client(service_id) 

419 except Exception: 

420 try: 

421 await self.keycloak.delete_user(f"svc_{service_id}") 

422 except Exception: 

423 pass # May not exist 

424 

425 # Remove from OpenFGA (if available) 

426 if self.openfga is not None: 

427 await self.openfga.delete_tuples_for_object(f"service_principal:{service_id}")