Coverage for src / mcp_server_langgraph / auth / openfga.py: 68%

235 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2OpenFGA integration for fine-grained relationship-based access control 

3 

4Enhanced with resilience patterns (ADR-0026): 

5- Circuit breaker for OpenFGA failures (fail-open by default) 

6- Retry logic with exponential backoff 

7- Timeout enforcement (5s for auth operations) 

8- Bulkhead isolation (50 concurrent auth checks max) 

9""" 

10 

11from typing import Any 

12 

13from openfga_sdk import ClientConfiguration, OpenFgaClient 

14from openfga_sdk.client.models import ClientCheckRequest, ClientTuple, ClientWriteRequest 

15from pydantic import BaseModel, ConfigDict, Field 

16 

17from mcp_server_langgraph.core.exceptions import OpenFGAError, OpenFGATimeoutError, OpenFGAUnavailableError 

18from mcp_server_langgraph.observability.telemetry import logger, metrics, tracer 

19from mcp_server_langgraph.resilience import circuit_breaker, retry_with_backoff, with_bulkhead, with_timeout 

20 

21 

22class OpenFGAConfig(BaseModel): 

23 """ 

24 Type-safe OpenFGA configuration 

25 

26 Configuration for OpenFGA authorization service. 

27 """ 

28 

29 api_url: str = Field(default="http://localhost:8080", description="OpenFGA server API URL") 

30 store_id: str | None = Field(default=None, description="Authorization store ID") 

31 model_id: str | None = Field(default=None, description="Authorization model ID") 

32 

33 model_config = ConfigDict( 

34 frozen=False, 

35 validate_assignment=True, 

36 str_strip_whitespace=True, 

37 json_schema_extra={"example": {"api_url": "http://localhost:8080", "store_id": "01H...", "model_id": "01H..."}}, 

38 ) 

39 

40 def to_dict(self) -> dict[str, Any]: 

41 """Convert to dictionary for backward compatibility""" 

42 return self.model_dump(exclude_none=True) 

43 

44 @classmethod 

45 def from_dict(cls, data: dict[str, Any]) -> "OpenFGAConfig": 

46 """Create OpenFGAConfig from dictionary""" 

47 return cls(**data) 

48 

49 

50class OpenFGAClient: 

51 """ 

52 OpenFGA client for relationship-based authorization 

53 

54 Implements Zanzibar-style authorization with fine-grained permissions 

55 based on relationships between users, resources, and roles. 

56 """ 

57 

58 def __init__( 

59 self, 

60 config: OpenFGAConfig | None = None, 

61 api_url: str | None = None, 

62 store_id: str | None = None, 

63 model_id: str | None = None, 

64 ): 

65 """ 

66 Initialize OpenFGA client (lazy async initialization pattern) 

67 

68 NOTE: This __init__ is sync-safe and doesn't create async resources. 

69 The actual OpenFgaClient is created lazily on first async method call. 

70 

71 This prevents "RuntimeError: no running event loop" when instantiating 

72 from synchronous code or module imports. 

73 

74 Args: 

75 config: OpenFGAConfig instance (recommended) 

76 api_url: OpenFGA server URL (legacy, use config instead) 

77 store_id: Authorization store ID (legacy, use config instead) 

78 model_id: Authorization model ID (legacy, use config instead) 

79 """ 

80 # Support both new config-based and legacy parameter-based initialization 

81 if config is None: 

82 config = OpenFGAConfig(api_url=api_url or "http://localhost:8080", store_id=store_id, model_id=model_id) 

83 

84 self.config = config 

85 self.api_url = config.api_url 

86 self.store_id = config.store_id 

87 self.model_id = config.model_id 

88 

89 # Lazy initialization: Store configuration, don't create OpenFgaClient yet 

90 # This prevents creating aiohttp resources which require an event loop 

91 self._client: OpenFgaClient | None = None 

92 self._initialized = False 

93 

94 logger.info("OpenFGA client wrapper created (lazy init)", extra={"api_url": config.api_url}) 

95 

96 async def _ensure_initialized(self) -> None: 

97 """ 

98 Ensure OpenFgaClient is initialized (lazy async initialization) 

99 

100 This method creates the actual OpenFgaClient on first async call. 

101 Called by all async methods before performing operations. 

102 """ 

103 if not self._initialized: 

104 configuration = ClientConfiguration( 

105 api_url=self.config.api_url, 

106 store_id=self.config.store_id, 

107 authorization_model_id=self.config.model_id, 

108 ) 

109 self._client = OpenFgaClient(configuration) 

110 self._initialized = True 

111 logger.info("OpenFGA SDK client initialized", extra={"api_url": self.config.api_url}) 

112 

113 @property 

114 def client(self) -> OpenFgaClient: 

115 """ 

116 Get OpenFgaClient instance 

117 

118 NOTE: This property should only be accessed from async methods 

119 after calling _ensure_initialized(). 

120 """ 

121 if self._client is None: 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true

122 msg = ( 

123 "OpenFgaClient not initialized. " 

124 "Call await _ensure_initialized() before accessing client. " 

125 "This is a bug - all async methods should call _ensure_initialized()." 

126 ) 

127 raise RuntimeError(msg) 

128 return self._client 

129 

130 def _circuit_breaker_fallback( 

131 self, user: str, relation: str, object: str, context: dict[str, Any] | None = None, critical: bool = True 

132 ) -> bool: 

133 """ 

134 Circuit breaker fallback for check_permission. 

135 

136 Security policy: 

137 - critical=True (default): Fail-closed (deny access) when circuit opens 

138 - critical=False: Fail-open (allow access) when circuit opens 

139 

140 Args: 

141 user: User identifier 

142 relation: Relation to check 

143 object: Object identifier 

144 context: Additional contextual data 

145 critical: If True, fail-closed; if False, fail-open 

146 

147 Returns: 

148 False for critical resources (fail-closed), True for non-critical (fail-open) 

149 """ 

150 if critical: 

151 logger.warning( 

152 "OpenFGA circuit breaker open: DENYING access to critical resource", 

153 extra={"user": user, "relation": relation, "object": object, "critical": critical}, 

154 ) 

155 return False # Fail-closed for critical resources 

156 else: 

157 logger.warning( 

158 "OpenFGA circuit breaker open: ALLOWING access to non-critical resource", 

159 extra={"user": user, "relation": relation, "object": object, "critical": critical}, 

160 ) 

161 return True # Fail-open for non-critical resources 

162 

163 @circuit_breaker( 

164 name="openfga", 

165 fail_max=10, 

166 timeout=30, 

167 fallback=lambda self, *args, **kwargs: self._circuit_breaker_fallback(*args, **kwargs), 

168 ) 

169 @retry_with_backoff() # Uses global config (prod: 3 attempts, test: 1 attempt for fast tests) 

170 @with_timeout(operation_type="auth") 

171 @with_bulkhead(resource_type="openfga") 

172 async def check_permission( 

173 self, user: str, relation: str, object: str, context: dict[str, Any] | None = None, critical: bool = True 

174 ) -> bool: 

175 """ 

176 Check if user has permission via relationship (with resilience protection). 

177 

178 Protected by: 

179 - Circuit breaker: Fail-closed (deny) by default when OpenFGA is down (10 failures → open, 30s timeout) 

180 - Retry logic: Configurable via global resilience config (default: 3 attempts with exponential backoff) 

181 - Timeout: 5s timeout for auth operations 

182 - Bulkhead: Limit to 50 concurrent auth checks 

183 

184 Args: 

185 user: User identifier (e.g., "user:123") 

186 relation: Relation to check (e.g., "can_read", "can_execute") 

187 object: Object identifier (e.g., "tool:chat", "resource:conversation_123") 

188 context: Additional contextual data for dynamic checks 

189 critical: If True (default), fail-closed when circuit opens; if False, fail-open 

190 

191 Returns: 

192 True if user has permission, False otherwise 

193 When circuit breaker is open: 

194 - False if critical=True (fail-closed, secure by default) 

195 - True if critical=False (fail-open, prefer availability) 

196 

197 Raises: 

198 CircuitBreakerOpenError: If circuit breaker is open (uses fallback for return value) 

199 OpenFGATimeoutError: If check exceeds 5s timeout 

200 OpenFGAError: For other OpenFGA errors 

201 """ 

202 await self._ensure_initialized() # Lazy initialization 

203 with tracer.start_as_current_span("openfga.check") as span: 

204 span.set_attribute("user", user) 

205 span.set_attribute("relation", relation) 

206 span.set_attribute("object", object) 

207 

208 try: 

209 request = ClientCheckRequest(user=user, relation=relation, object=object, contextual_tuples=[]) 

210 

211 response = await self.client.check(request) 

212 allowed = response.allowed 

213 

214 span.set_attribute("allowed", allowed) 

215 

216 logger.info( 

217 "Permission check", extra={"user": user, "relation": relation, "object": object, "allowed": allowed} 

218 ) 

219 

220 # Track metrics 

221 if allowed: 

222 metrics.successful_calls.add(1, {"operation": "check_permission"}) 

223 else: 

224 metrics.authz_failures.add(1, {"relation": relation}) 

225 

226 return allowed # type: ignore[no-any-return] 

227 

228 except Exception as e: 

229 error_msg = str(e).lower() 

230 

231 if "timeout" in error_msg or "timed out" in error_msg: 231 ↛ 232line 231 didn't jump to line 232 because the condition on line 231 was never true

232 raise OpenFGATimeoutError( 

233 message=f"OpenFGA check timed out: {e}", 

234 metadata={"user": user, "relation": relation, "object": object}, 

235 cause=e, 

236 ) 

237 elif "unavailable" in error_msg or "connection" in error_msg: 237 ↛ 244line 237 didn't jump to line 244 because the condition on line 237 was always true

238 raise OpenFGAUnavailableError( 

239 message=f"OpenFGA service unavailable: {e}", 

240 metadata={"user": user, "relation": relation, "object": object}, 

241 cause=e, 

242 ) 

243 else: 

244 logger.error( 

245 f"OpenFGA check failed: {e}", 

246 extra={"user": user, "relation": relation, "object": object}, 

247 exc_info=True, 

248 ) 

249 span.record_exception(e) 

250 metrics.failed_calls.add(1, {"operation": "check_permission"}) 

251 

252 raise OpenFGAError( 

253 message=f"OpenFGA error: {e}", 

254 metadata={"user": user, "relation": relation, "object": object}, 

255 cause=e, 

256 ) 

257 

258 @circuit_breaker(name="openfga") 

259 @retry_with_backoff() # Uses global config (prod: 3 attempts, test: 1 attempt for fast tests) 

260 @with_timeout(operation_type="auth") 

261 async def write_tuples(self, tuples: list[dict[str, str]]) -> None: 

262 """ 

263 Write relationship tuples to OpenFGA (with resilience protection). 

264 

265 Protected by: 

266 - Circuit breaker: Fail fast if OpenFGA is down 

267 - Retry logic: Configurable via global resilience config (default: 3 attempts, writes are idempotent) 

268 - Timeout: 5s timeout for auth operations 

269 

270 Args: 

271 tuples: List of relationship tuples 

272 Each tuple: {"user": "user:123", "relation": "member", "object": "org:acme"} 

273 

274 Raises: 

275 CircuitBreakerOpenError: If circuit breaker is open 

276 OpenFGAError: For OpenFGA errors 

277 """ 

278 await self._ensure_initialized() # Lazy initialization 

279 with tracer.start_as_current_span("openfga.write_tuples"): 

280 try: 

281 client_tuples = [ClientTuple(user=t["user"], relation=t["relation"], object=t["object"]) for t in tuples] 

282 

283 request = ClientWriteRequest(writes=client_tuples) 

284 await self.client.write(request) 

285 

286 logger.info("Tuples written to OpenFGA", extra={"count": len(tuples)}) 

287 

288 metrics.successful_calls.add(1, {"operation": "write_tuples"}) 

289 

290 except Exception as e: 

291 logger.error(f"Failed to write tuples: {e}", exc_info=True) 

292 metrics.failed_calls.add(1, {"operation": "write_tuples"}) 

293 

294 raise OpenFGAError( 

295 message=f"Failed to write tuples: {e}", 

296 metadata={"tuple_count": len(tuples)}, 

297 cause=e, 

298 ) 

299 

300 async def delete_tuples(self, tuples: list[dict[str, str]]) -> None: 

301 """ 

302 Delete relationship tuples from OpenFGA 

303 

304 Args: 

305 tuples: List of relationship tuples to delete 

306 """ 

307 await self._ensure_initialized() # Lazy initialization 

308 with tracer.start_as_current_span("openfga.delete_tuples"): 

309 try: 

310 client_tuples = [ClientTuple(user=t["user"], relation=t["relation"], object=t["object"]) for t in tuples] 

311 

312 request = ClientWriteRequest(deletes=client_tuples) 

313 await self.client.write(request) 

314 

315 logger.info("Tuples deleted from OpenFGA", extra={"count": len(tuples)}) 

316 

317 except Exception as e: 

318 logger.error(f"Failed to delete tuples: {e}", exc_info=True) 

319 raise 

320 

321 async def delete_tuples_for_object(self, object_id: str) -> None: 

322 """ 

323 Delete all tuples related to an object (helper for cleanup operations). 

324 

325 This implements proper cleanup for resource deletion, ensuring no orphaned 

326 authorization tuples remain in OpenFGA (important for GDPR compliance). 

327 

328 Args: 

329 object_id: Object identifier (e.g., "service_principal:batch-job") 

330 

331 Implementation: 

332 1. Extracts object type from object_id 

333 2. Gets model definition to find all available relations 

334 3. Expands each relation to find all users with permissions 

335 4. Deletes tuples in batches (100 per batch) with retry logic 

336 """ 

337 await self._ensure_initialized() # Lazy initialization 

338 logger.info(f"Deleting tuples for object: {object_id}") 

339 

340 # Extract object type from object_id (e.g., "service_principal:batch-job" -> "service_principal") 

341 object_type = object_id.split(":")[0] if ":" in object_id else object_id 

342 

343 # Get authorization model to determine available relations for this type 

344 model_def = OpenFGAAuthorizationModel.get_model_definition() 

345 type_defs = model_def.get("type_definitions", []) 

346 

347 # Find the type definition for this object type 

348 type_def = next((t for t in type_defs if t.get("type") == object_type), None) 

349 

350 if not type_def: 

351 logger.warning(f"Type '{object_type}' not found in authorization model, skipping cleanup") 

352 return 

353 

354 # Get all relations for this type 

355 relations = type_def.get("relations", {}) 

356 if not relations: 

357 logger.info(f"No relations defined for type '{object_type}', nothing to clean up") 

358 return 

359 

360 # Collect all tuples to delete across all relations 

361 tuples_to_delete: list[dict[str, str]] = [] 

362 

363 for relation_name in relations: 

364 try: 

365 # Expand relation to find all users with this permission 

366 expansion = await self.expand_relation(relation=relation_name, object=object_id) 

367 

368 # Extract users from expansion tree 

369 users = _extract_users_from_expansion(expansion) 

370 

371 # Build tuples for deletion 

372 for user in users: 

373 tuples_to_delete.append({"user": user, "relation": relation_name, "object": object_id}) 

374 

375 logger.debug( 

376 f"Found {len(users)} users with '{relation_name}' relation to {object_id}", 

377 extra={"object_id": object_id, "relation": relation_name, "user_count": len(users)}, 

378 ) 

379 

380 except Exception as e: 

381 # Log but continue with other relations 

382 logger.warning( 

383 f"Error expanding relation '{relation_name}' for {object_id}: {e}", 

384 extra={"object_id": object_id, "relation": relation_name, "error": str(e)}, 

385 ) 

386 

387 # Delete tuples in batches (100 per batch as per user requirement) 

388 if not tuples_to_delete: 

389 logger.info(f"No tuples found for {object_id}, nothing to delete") 

390 return 

391 

392 batch_size = 100 

393 total_batches = (len(tuples_to_delete) + batch_size - 1) // batch_size 

394 

395 logger.info( 

396 f"Deleting {len(tuples_to_delete)} tuples for {object_id} in {total_batches} batch(es)", 

397 extra={"object_id": object_id, "tuple_count": len(tuples_to_delete), "batch_count": total_batches}, 

398 ) 

399 

400 for i in range(0, len(tuples_to_delete), batch_size): 

401 batch = tuples_to_delete[i : i + batch_size] 

402 batch_num = i // batch_size + 1 

403 

404 try: 

405 await self.delete_tuples(batch) 

406 logger.info( 

407 f"Deleted batch {batch_num}/{total_batches} ({len(batch)} tuples)", 

408 extra={"object_id": object_id, "batch": batch_num, "tuples_in_batch": len(batch)}, 

409 ) 

410 except Exception as e: 

411 logger.error( 

412 f"Failed to delete batch {batch_num}/{total_batches} for {object_id}: {e}", 

413 extra={"object_id": object_id, "batch": batch_num, "error": str(e)}, 

414 ) 

415 # Note: Continue with next batch even if one fails 

416 # This ensures partial cleanup is better than no cleanup 

417 

418 async def list_objects(self, user: str, relation: str, object_type: str) -> list[str]: 

419 """ 

420 List all objects of a type that user has relation to 

421 

422 Args: 

423 user: User identifier 

424 relation: Relation to check 

425 object_type: Type of objects to list (e.g., "tool", "conversation") 

426 

427 Returns: 

428 List of object identifiers 

429 """ 

430 await self._ensure_initialized() # Lazy initialization 

431 with tracer.start_as_current_span("openfga.list_objects"): 

432 try: 

433 response = await self.client.list_objects(user=user, relation=relation, type=object_type) 

434 

435 objects = response.objects or [] 

436 

437 logger.info( 

438 "Objects listed", 

439 extra={"user": user, "relation": relation, "object_type": object_type, "count": len(objects)}, 

440 ) 

441 

442 return objects 

443 

444 except Exception as e: 

445 logger.error(f"Failed to list objects: {e}", exc_info=True) 

446 raise 

447 

448 async def expand_relation(self, relation: str, object: str) -> dict[str, Any]: 

449 """ 

450 Expand a relation to see all users with access 

451 

452 Args: 

453 relation: Relation to expand 

454 object: Object identifier 

455 

456 Returns: 

457 Tree structure showing all users with access 

458 """ 

459 await self._ensure_initialized() # Lazy initialization 

460 with tracer.start_as_current_span("openfga.expand"): 

461 try: 

462 response = await self.client.expand(relation=relation, object=object) 

463 

464 return response.tree.model_dump() if response.tree else {} 

465 

466 except Exception as e: 

467 logger.error(f"Failed to expand relation: {e}", exc_info=True) 

468 raise 

469 

470 

471def _extract_users_from_expansion(expansion: dict[str, Any]) -> list[str]: 

472 """ 

473 Extract all user IDs from an OpenFGA expansion tree. 

474 

475 Recursively traverses the expansion tree to find all leaf nodes containing users. 

476 

477 Args: 

478 expansion: Expansion tree from OpenFGA expand() call 

479 

480 Returns: 

481 List of user IDs (e.g., ["user:alice", "user:bob"]) 

482 

483 Example expansion structures: 

484 Simple leaf: {"leaf": {"users": {"users": ["user:alice"]}}} 

485 Union: {"union": {"nodes": [{"leaf": ...}, {"leaf": ...}]}} 

486 Empty: {} 

487 """ 

488 if not expansion: 

489 return [] 

490 

491 users: list[str] = [] 

492 

493 # Handle leaf nodes (direct user lists) 

494 if "leaf" in expansion: 

495 leaf = expansion["leaf"] 

496 if isinstance(leaf, dict) and "users" in leaf: 496 ↛ 504line 496 didn't jump to line 504 because the condition on line 496 was always true

497 user_data = leaf["users"] 

498 if isinstance(user_data, dict) and "users" in user_data: 498 ↛ 504line 498 didn't jump to line 504 because the condition on line 498 was always true

499 user_list = user_data["users"] 

500 if isinstance(user_list, list): 500 ↛ 504line 500 didn't jump to line 504 because the condition on line 500 was always true

501 users.extend(user_list) 

502 

503 # Handle union nodes (multiple children) 

504 if "union" in expansion: 

505 union = expansion["union"] 

506 if isinstance(union, dict) and "nodes" in union: 506 ↛ 513line 506 didn't jump to line 513 because the condition on line 506 was always true

507 nodes = union["nodes"] 

508 if isinstance(nodes, list): 508 ↛ 513line 508 didn't jump to line 513 because the condition on line 508 was always true

509 for node in nodes: 

510 users.extend(_extract_users_from_expansion(node)) 

511 

512 # Handle intersection nodes (all children must be true) 

513 if "intersection" in expansion: 513 ↛ 514line 513 didn't jump to line 514 because the condition on line 513 was never true

514 intersection = expansion["intersection"] 

515 if isinstance(intersection, dict) and "nodes" in intersection: 

516 nodes = intersection["nodes"] 

517 if isinstance(nodes, list): 

518 for node in nodes: 

519 users.extend(_extract_users_from_expansion(node)) 

520 

521 # Handle difference nodes (exclusion) 

522 if "difference" in expansion: 522 ↛ 523line 522 didn't jump to line 523 because the condition on line 522 was never true

523 difference = expansion["difference"] 

524 if isinstance(difference, dict): 

525 # Base users 

526 if "base" in difference: 

527 users.extend(_extract_users_from_expansion(difference["base"])) 

528 # Subtract users are excluded, so we don't add them 

529 

530 return list(set(users)) # Deduplicate 

531 

532 

533class OpenFGAAuthorizationModel: 

534 """ 

535 Authorization model definition for the agent system 

536 

537 Defines types, relations, and permissions for the system. 

538 """ 

539 

540 @staticmethod 

541 def get_model_definition() -> dict[str, Any]: 

542 """ 

543 Get the authorization model definition 

544 

545 This defines: 

546 - user: Individual users 

547 - organization: Organizations that users belong to 

548 - tool: AI tools (chat, search, etc.) 

549 - conversation: Conversation threads 

550 - role: Roles that grant permissions 

551 - service_principal: Service accounts for machine-to-machine auth (ADR-0033) 

552 

553 Relations: 

554 - member: User is a member of organization 

555 - owner: User owns a resource 

556 - viewer: User can view a resource 

557 - executor: User can execute a tool 

558 - admin: User has admin privileges 

559 - acts_as: Service principal acts as user (permission inheritance, ADR-0039) 

560 """ 

561 return { 

562 "schema_version": "1.1", 

563 "type_definitions": [ 

564 {"type": "user", "relations": {}, "metadata": {"relations": {}}}, 

565 { 

566 "type": "organization", 

567 "relations": {"member": {"this": {}}, "admin": {"this": {}}}, 

568 "metadata": { 

569 "relations": { 

570 "member": {"directly_related_user_types": [{"type": "user"}]}, 

571 "admin": {"directly_related_user_types": [{"type": "user"}]}, 

572 } 

573 }, 

574 }, 

575 { 

576 "type": "tool", 

577 "relations": { 

578 "owner": {"this": {}}, 

579 "executor": { 

580 "union": { 

581 "child": [ 

582 {"this": {}}, 

583 {"computedUserset": {"relation": "owner"}}, 

584 { 

585 "tupleToUserset": { 

586 "tupleset": {"relation": "organization"}, 

587 "computedUserset": {"relation": "member"}, 

588 } 

589 }, 

590 ] 

591 } 

592 }, 

593 "organization": {"this": {}}, 

594 }, 

595 "metadata": { 

596 "relations": { 

597 "owner": {"directly_related_user_types": [{"type": "user"}]}, 

598 "executor": {"directly_related_user_types": [{"type": "user"}]}, 

599 "organization": {"directly_related_user_types": [{"type": "organization"}]}, 

600 } 

601 }, 

602 }, 

603 { 

604 "type": "conversation", 

605 "relations": { 

606 "owner": {"this": {}}, 

607 "viewer": {"union": {"child": [{"this": {}}, {"computedUserset": {"relation": "owner"}}]}}, 

608 "editor": {"union": {"child": [{"this": {}}, {"computedUserset": {"relation": "owner"}}]}}, 

609 }, 

610 "metadata": { 

611 "relations": { 

612 "owner": {"directly_related_user_types": [{"type": "user"}]}, 

613 "viewer": {"directly_related_user_types": [{"type": "user"}]}, 

614 "editor": {"directly_related_user_types": [{"type": "user"}]}, 

615 } 

616 }, 

617 }, 

618 { 

619 "type": "role", 

620 "relations": {"assignee": {"this": {}}}, 

621 "metadata": {"relations": {"assignee": {"directly_related_user_types": [{"type": "user"}]}}}, 

622 }, 

623 { 

624 "type": "service_principal", 

625 "relations": { 

626 "owner": {"this": {}}, 

627 "acts_as": {"this": {}}, 

628 "viewer": {"computedUserset": {"relation": "owner"}}, 

629 "editor": {"computedUserset": {"relation": "owner"}}, 

630 }, 

631 "metadata": { 

632 "relations": { 

633 "owner": {"directly_related_user_types": [{"type": "user"}]}, 

634 "acts_as": {"directly_related_user_types": [{"type": "user"}]}, 

635 "viewer": {"directly_related_user_types": [{"type": "user"}]}, 

636 "editor": {"directly_related_user_types": [{"type": "user"}]}, 

637 } 

638 }, 

639 }, 

640 ], 

641 } 

642 

643 

644async def initialize_openfga_store(client: OpenFGAClient) -> str: 

645 """ 

646 Initialize OpenFGA store with authorization model 

647 

648 Args: 

649 client: OpenFGA client instance 

650 

651 Returns: 

652 Store ID 

653 """ 

654 await client._ensure_initialized() # Lazy initialization 

655 with tracer.start_as_current_span("openfga.initialize_store"): 

656 try: 

657 # Create store 

658 store = await client.client.create_store(body={"name": "langgraph-agent-store"}) 

659 store_id = store.id 

660 

661 logger.info("OpenFGA store created", extra={"store_id": store_id}) 

662 

663 # Update client configuration 

664 client.store_id = store_id 

665 client.client.store_id = store_id 

666 

667 # Write authorization model 

668 model_def = OpenFGAAuthorizationModel.get_model_definition() 

669 model_response = await client.client.write_authorization_model(body=model_def) 

670 model_id = model_response.authorization_model_id 

671 

672 logger.info("OpenFGA authorization model created", extra={"model_id": model_id}) 

673 

674 # Update client with model ID 

675 client.model_id = model_id 

676 client.client.authorization_model_id = model_id 

677 

678 return store_id # type: ignore[no-any-return] 

679 

680 except Exception as e: 

681 logger.error(f"Failed to initialize OpenFGA store: {e}", exc_info=True) 

682 raise 

683 

684 

685async def seed_sample_data(client: OpenFGAClient) -> None: 

686 """ 

687 Seed sample relationship data for testing 

688 """ 

689 sample_tuples = [ 

690 # Organization memberships 

691 {"user": "user:alice", "relation": "member", "object": "organization:acme"}, 

692 {"user": "user:bob", "relation": "member", "object": "organization:acme"}, 

693 {"user": "user:alice", "relation": "admin", "object": "organization:acme"}, 

694 # Tool permissions 

695 {"user": "user:alice", "relation": "executor", "object": "tool:chat"}, 

696 {"user": "user:bob", "relation": "executor", "object": "tool:chat"}, 

697 {"user": "organization:acme", "relation": "organization", "object": "tool:chat"}, 

698 # Conversation ownership 

699 {"user": "user:alice", "relation": "owner", "object": "conversation:thread_1"}, 

700 {"user": "user:bob", "relation": "viewer", "object": "conversation:thread_1"}, 

701 # Role assignments 

702 {"user": "user:alice", "relation": "assignee", "object": "role:premium"}, 

703 {"user": "user:bob", "relation": "assignee", "object": "role:standard"}, 

704 ] 

705 

706 await client.write_tuples(sample_tuples) 

707 logger.info("Sample OpenFGA data seeded") 

708 

709 

710async def check_permission( 

711 user_id: str, 

712 relation: str, 

713 object: str, 

714 openfga_client: OpenFGAClient, 

715) -> bool: 

716 """ 

717 Check if user has permission with support for service principal inheritance. 

718 

719 This function implements permission checking with acts_as relationship support (ADR-0039). 

720 Service principals (user_id starting with "service:") can inherit permissions from 

721 associated users via the acts_as relationship. 

722 

723 Args: 

724 user_id: User or service principal ID (e.g., "user:alice" or "service:batch-job") 

725 relation: Relation to check (e.g., "viewer", "editor", "executor") 

726 object: Object to check permission on (e.g., "conversation:thread1") 

727 openfga_client: OpenFGA client instance 

728 

729 Returns: 

730 True if user/service has permission (directly or inherited), False otherwise 

731 

732 Example: 

733 >>> # Direct permission check 

734 >>> allowed = await check_permission("user:alice", "viewer", "conversation:1", openfga) 

735 >>> # Service principal with inherited permission 

736 >>> allowed = await check_permission("service:batch-job", "viewer", "conversation:1", openfga) 

737 """ 

738 # 1. Direct permission check 

739 has_direct_permission = await openfga_client.check_permission( 

740 user=user_id, 

741 relation=relation, 

742 object=object, 

743 ) 

744 

745 if has_direct_permission: 

746 return True 

747 

748 # 2. If service principal, check inherited permissions via acts_as 

749 if user_id.startswith("service:"): 

750 # List all users this service principal acts as 

751 try: 

752 # Query for acts_as relationships 

753 associated_users = await openfga_client.list_objects( 

754 user=user_id, 

755 relation="acts_as", 

756 object_type="user", 

757 ) 

758 

759 # Check if any associated user has the permission 

760 for associated_user in associated_users: 

761 user_has_permission = await openfga_client.check_permission( 

762 user=associated_user, 

763 relation=relation, 

764 object=object, 

765 ) 

766 

767 if user_has_permission: 

768 # Log inherited access for audit trail 

769 logger.info( 

770 f"{user_id} accessed {object} via inherited permission from {associated_user}", 

771 extra={ 

772 "service_principal": user_id, 

773 "associated_user": associated_user, 

774 "resource": object, 

775 "relation": relation, 

776 "permission_type": "inherited", 

777 }, 

778 ) 

779 return True 

780 

781 except Exception as e: 

782 logger.warning( 

783 f"Error checking acts_as relationships for {user_id}: {e}", 

784 exc_info=True, 

785 ) 

786 # Continue with denial if acts_as check fails 

787 

788 return False