Coverage for src / mcp_server_langgraph / auth / openfga.py: 68%
235 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2OpenFGA integration for fine-grained relationship-based access control
4Enhanced with resilience patterns (ADR-0026):
5- Circuit breaker for OpenFGA failures (fail-open by default)
6- Retry logic with exponential backoff
7- Timeout enforcement (5s for auth operations)
8- Bulkhead isolation (50 concurrent auth checks max)
9"""
11from typing import Any
13from openfga_sdk import ClientConfiguration, OpenFgaClient
14from openfga_sdk.client.models import ClientCheckRequest, ClientTuple, ClientWriteRequest
15from pydantic import BaseModel, ConfigDict, Field
17from mcp_server_langgraph.core.exceptions import OpenFGAError, OpenFGATimeoutError, OpenFGAUnavailableError
18from mcp_server_langgraph.observability.telemetry import logger, metrics, tracer
19from mcp_server_langgraph.resilience import circuit_breaker, retry_with_backoff, with_bulkhead, with_timeout
22class OpenFGAConfig(BaseModel):
23 """
24 Type-safe OpenFGA configuration
26 Configuration for OpenFGA authorization service.
27 """
29 api_url: str = Field(default="http://localhost:8080", description="OpenFGA server API URL")
30 store_id: str | None = Field(default=None, description="Authorization store ID")
31 model_id: str | None = Field(default=None, description="Authorization model ID")
33 model_config = ConfigDict(
34 frozen=False,
35 validate_assignment=True,
36 str_strip_whitespace=True,
37 json_schema_extra={"example": {"api_url": "http://localhost:8080", "store_id": "01H...", "model_id": "01H..."}},
38 )
40 def to_dict(self) -> dict[str, Any]:
41 """Convert to dictionary for backward compatibility"""
42 return self.model_dump(exclude_none=True)
44 @classmethod
45 def from_dict(cls, data: dict[str, Any]) -> "OpenFGAConfig":
46 """Create OpenFGAConfig from dictionary"""
47 return cls(**data)
50class OpenFGAClient:
51 """
52 OpenFGA client for relationship-based authorization
54 Implements Zanzibar-style authorization with fine-grained permissions
55 based on relationships between users, resources, and roles.
56 """
58 def __init__(
59 self,
60 config: OpenFGAConfig | None = None,
61 api_url: str | None = None,
62 store_id: str | None = None,
63 model_id: str | None = None,
64 ):
65 """
66 Initialize OpenFGA client (lazy async initialization pattern)
68 NOTE: This __init__ is sync-safe and doesn't create async resources.
69 The actual OpenFgaClient is created lazily on first async method call.
71 This prevents "RuntimeError: no running event loop" when instantiating
72 from synchronous code or module imports.
74 Args:
75 config: OpenFGAConfig instance (recommended)
76 api_url: OpenFGA server URL (legacy, use config instead)
77 store_id: Authorization store ID (legacy, use config instead)
78 model_id: Authorization model ID (legacy, use config instead)
79 """
80 # Support both new config-based and legacy parameter-based initialization
81 if config is None:
82 config = OpenFGAConfig(api_url=api_url or "http://localhost:8080", store_id=store_id, model_id=model_id)
84 self.config = config
85 self.api_url = config.api_url
86 self.store_id = config.store_id
87 self.model_id = config.model_id
89 # Lazy initialization: Store configuration, don't create OpenFgaClient yet
90 # This prevents creating aiohttp resources which require an event loop
91 self._client: OpenFgaClient | None = None
92 self._initialized = False
94 logger.info("OpenFGA client wrapper created (lazy init)", extra={"api_url": config.api_url})
96 async def _ensure_initialized(self) -> None:
97 """
98 Ensure OpenFgaClient is initialized (lazy async initialization)
100 This method creates the actual OpenFgaClient on first async call.
101 Called by all async methods before performing operations.
102 """
103 if not self._initialized:
104 configuration = ClientConfiguration(
105 api_url=self.config.api_url,
106 store_id=self.config.store_id,
107 authorization_model_id=self.config.model_id,
108 )
109 self._client = OpenFgaClient(configuration)
110 self._initialized = True
111 logger.info("OpenFGA SDK client initialized", extra={"api_url": self.config.api_url})
113 @property
114 def client(self) -> OpenFgaClient:
115 """
116 Get OpenFgaClient instance
118 NOTE: This property should only be accessed from async methods
119 after calling _ensure_initialized().
120 """
121 if self._client is None: 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true
122 msg = (
123 "OpenFgaClient not initialized. "
124 "Call await _ensure_initialized() before accessing client. "
125 "This is a bug - all async methods should call _ensure_initialized()."
126 )
127 raise RuntimeError(msg)
128 return self._client
130 def _circuit_breaker_fallback(
131 self, user: str, relation: str, object: str, context: dict[str, Any] | None = None, critical: bool = True
132 ) -> bool:
133 """
134 Circuit breaker fallback for check_permission.
136 Security policy:
137 - critical=True (default): Fail-closed (deny access) when circuit opens
138 - critical=False: Fail-open (allow access) when circuit opens
140 Args:
141 user: User identifier
142 relation: Relation to check
143 object: Object identifier
144 context: Additional contextual data
145 critical: If True, fail-closed; if False, fail-open
147 Returns:
148 False for critical resources (fail-closed), True for non-critical (fail-open)
149 """
150 if critical:
151 logger.warning(
152 "OpenFGA circuit breaker open: DENYING access to critical resource",
153 extra={"user": user, "relation": relation, "object": object, "critical": critical},
154 )
155 return False # Fail-closed for critical resources
156 else:
157 logger.warning(
158 "OpenFGA circuit breaker open: ALLOWING access to non-critical resource",
159 extra={"user": user, "relation": relation, "object": object, "critical": critical},
160 )
161 return True # Fail-open for non-critical resources
163 @circuit_breaker(
164 name="openfga",
165 fail_max=10,
166 timeout=30,
167 fallback=lambda self, *args, **kwargs: self._circuit_breaker_fallback(*args, **kwargs),
168 )
169 @retry_with_backoff() # Uses global config (prod: 3 attempts, test: 1 attempt for fast tests)
170 @with_timeout(operation_type="auth")
171 @with_bulkhead(resource_type="openfga")
172 async def check_permission(
173 self, user: str, relation: str, object: str, context: dict[str, Any] | None = None, critical: bool = True
174 ) -> bool:
175 """
176 Check if user has permission via relationship (with resilience protection).
178 Protected by:
179 - Circuit breaker: Fail-closed (deny) by default when OpenFGA is down (10 failures → open, 30s timeout)
180 - Retry logic: Configurable via global resilience config (default: 3 attempts with exponential backoff)
181 - Timeout: 5s timeout for auth operations
182 - Bulkhead: Limit to 50 concurrent auth checks
184 Args:
185 user: User identifier (e.g., "user:123")
186 relation: Relation to check (e.g., "can_read", "can_execute")
187 object: Object identifier (e.g., "tool:chat", "resource:conversation_123")
188 context: Additional contextual data for dynamic checks
189 critical: If True (default), fail-closed when circuit opens; if False, fail-open
191 Returns:
192 True if user has permission, False otherwise
193 When circuit breaker is open:
194 - False if critical=True (fail-closed, secure by default)
195 - True if critical=False (fail-open, prefer availability)
197 Raises:
198 CircuitBreakerOpenError: If circuit breaker is open (uses fallback for return value)
199 OpenFGATimeoutError: If check exceeds 5s timeout
200 OpenFGAError: For other OpenFGA errors
201 """
202 await self._ensure_initialized() # Lazy initialization
203 with tracer.start_as_current_span("openfga.check") as span:
204 span.set_attribute("user", user)
205 span.set_attribute("relation", relation)
206 span.set_attribute("object", object)
208 try:
209 request = ClientCheckRequest(user=user, relation=relation, object=object, contextual_tuples=[])
211 response = await self.client.check(request)
212 allowed = response.allowed
214 span.set_attribute("allowed", allowed)
216 logger.info(
217 "Permission check", extra={"user": user, "relation": relation, "object": object, "allowed": allowed}
218 )
220 # Track metrics
221 if allowed:
222 metrics.successful_calls.add(1, {"operation": "check_permission"})
223 else:
224 metrics.authz_failures.add(1, {"relation": relation})
226 return allowed # type: ignore[no-any-return]
228 except Exception as e:
229 error_msg = str(e).lower()
231 if "timeout" in error_msg or "timed out" in error_msg: 231 ↛ 232line 231 didn't jump to line 232 because the condition on line 231 was never true
232 raise OpenFGATimeoutError(
233 message=f"OpenFGA check timed out: {e}",
234 metadata={"user": user, "relation": relation, "object": object},
235 cause=e,
236 )
237 elif "unavailable" in error_msg or "connection" in error_msg: 237 ↛ 244line 237 didn't jump to line 244 because the condition on line 237 was always true
238 raise OpenFGAUnavailableError(
239 message=f"OpenFGA service unavailable: {e}",
240 metadata={"user": user, "relation": relation, "object": object},
241 cause=e,
242 )
243 else:
244 logger.error(
245 f"OpenFGA check failed: {e}",
246 extra={"user": user, "relation": relation, "object": object},
247 exc_info=True,
248 )
249 span.record_exception(e)
250 metrics.failed_calls.add(1, {"operation": "check_permission"})
252 raise OpenFGAError(
253 message=f"OpenFGA error: {e}",
254 metadata={"user": user, "relation": relation, "object": object},
255 cause=e,
256 )
258 @circuit_breaker(name="openfga")
259 @retry_with_backoff() # Uses global config (prod: 3 attempts, test: 1 attempt for fast tests)
260 @with_timeout(operation_type="auth")
261 async def write_tuples(self, tuples: list[dict[str, str]]) -> None:
262 """
263 Write relationship tuples to OpenFGA (with resilience protection).
265 Protected by:
266 - Circuit breaker: Fail fast if OpenFGA is down
267 - Retry logic: Configurable via global resilience config (default: 3 attempts, writes are idempotent)
268 - Timeout: 5s timeout for auth operations
270 Args:
271 tuples: List of relationship tuples
272 Each tuple: {"user": "user:123", "relation": "member", "object": "org:acme"}
274 Raises:
275 CircuitBreakerOpenError: If circuit breaker is open
276 OpenFGAError: For OpenFGA errors
277 """
278 await self._ensure_initialized() # Lazy initialization
279 with tracer.start_as_current_span("openfga.write_tuples"):
280 try:
281 client_tuples = [ClientTuple(user=t["user"], relation=t["relation"], object=t["object"]) for t in tuples]
283 request = ClientWriteRequest(writes=client_tuples)
284 await self.client.write(request)
286 logger.info("Tuples written to OpenFGA", extra={"count": len(tuples)})
288 metrics.successful_calls.add(1, {"operation": "write_tuples"})
290 except Exception as e:
291 logger.error(f"Failed to write tuples: {e}", exc_info=True)
292 metrics.failed_calls.add(1, {"operation": "write_tuples"})
294 raise OpenFGAError(
295 message=f"Failed to write tuples: {e}",
296 metadata={"tuple_count": len(tuples)},
297 cause=e,
298 )
300 async def delete_tuples(self, tuples: list[dict[str, str]]) -> None:
301 """
302 Delete relationship tuples from OpenFGA
304 Args:
305 tuples: List of relationship tuples to delete
306 """
307 await self._ensure_initialized() # Lazy initialization
308 with tracer.start_as_current_span("openfga.delete_tuples"):
309 try:
310 client_tuples = [ClientTuple(user=t["user"], relation=t["relation"], object=t["object"]) for t in tuples]
312 request = ClientWriteRequest(deletes=client_tuples)
313 await self.client.write(request)
315 logger.info("Tuples deleted from OpenFGA", extra={"count": len(tuples)})
317 except Exception as e:
318 logger.error(f"Failed to delete tuples: {e}", exc_info=True)
319 raise
321 async def delete_tuples_for_object(self, object_id: str) -> None:
322 """
323 Delete all tuples related to an object (helper for cleanup operations).
325 This implements proper cleanup for resource deletion, ensuring no orphaned
326 authorization tuples remain in OpenFGA (important for GDPR compliance).
328 Args:
329 object_id: Object identifier (e.g., "service_principal:batch-job")
331 Implementation:
332 1. Extracts object type from object_id
333 2. Gets model definition to find all available relations
334 3. Expands each relation to find all users with permissions
335 4. Deletes tuples in batches (100 per batch) with retry logic
336 """
337 await self._ensure_initialized() # Lazy initialization
338 logger.info(f"Deleting tuples for object: {object_id}")
340 # Extract object type from object_id (e.g., "service_principal:batch-job" -> "service_principal")
341 object_type = object_id.split(":")[0] if ":" in object_id else object_id
343 # Get authorization model to determine available relations for this type
344 model_def = OpenFGAAuthorizationModel.get_model_definition()
345 type_defs = model_def.get("type_definitions", [])
347 # Find the type definition for this object type
348 type_def = next((t for t in type_defs if t.get("type") == object_type), None)
350 if not type_def:
351 logger.warning(f"Type '{object_type}' not found in authorization model, skipping cleanup")
352 return
354 # Get all relations for this type
355 relations = type_def.get("relations", {})
356 if not relations:
357 logger.info(f"No relations defined for type '{object_type}', nothing to clean up")
358 return
360 # Collect all tuples to delete across all relations
361 tuples_to_delete: list[dict[str, str]] = []
363 for relation_name in relations:
364 try:
365 # Expand relation to find all users with this permission
366 expansion = await self.expand_relation(relation=relation_name, object=object_id)
368 # Extract users from expansion tree
369 users = _extract_users_from_expansion(expansion)
371 # Build tuples for deletion
372 for user in users:
373 tuples_to_delete.append({"user": user, "relation": relation_name, "object": object_id})
375 logger.debug(
376 f"Found {len(users)} users with '{relation_name}' relation to {object_id}",
377 extra={"object_id": object_id, "relation": relation_name, "user_count": len(users)},
378 )
380 except Exception as e:
381 # Log but continue with other relations
382 logger.warning(
383 f"Error expanding relation '{relation_name}' for {object_id}: {e}",
384 extra={"object_id": object_id, "relation": relation_name, "error": str(e)},
385 )
387 # Delete tuples in batches (100 per batch as per user requirement)
388 if not tuples_to_delete:
389 logger.info(f"No tuples found for {object_id}, nothing to delete")
390 return
392 batch_size = 100
393 total_batches = (len(tuples_to_delete) + batch_size - 1) // batch_size
395 logger.info(
396 f"Deleting {len(tuples_to_delete)} tuples for {object_id} in {total_batches} batch(es)",
397 extra={"object_id": object_id, "tuple_count": len(tuples_to_delete), "batch_count": total_batches},
398 )
400 for i in range(0, len(tuples_to_delete), batch_size):
401 batch = tuples_to_delete[i : i + batch_size]
402 batch_num = i // batch_size + 1
404 try:
405 await self.delete_tuples(batch)
406 logger.info(
407 f"Deleted batch {batch_num}/{total_batches} ({len(batch)} tuples)",
408 extra={"object_id": object_id, "batch": batch_num, "tuples_in_batch": len(batch)},
409 )
410 except Exception as e:
411 logger.error(
412 f"Failed to delete batch {batch_num}/{total_batches} for {object_id}: {e}",
413 extra={"object_id": object_id, "batch": batch_num, "error": str(e)},
414 )
415 # Note: Continue with next batch even if one fails
416 # This ensures partial cleanup is better than no cleanup
418 async def list_objects(self, user: str, relation: str, object_type: str) -> list[str]:
419 """
420 List all objects of a type that user has relation to
422 Args:
423 user: User identifier
424 relation: Relation to check
425 object_type: Type of objects to list (e.g., "tool", "conversation")
427 Returns:
428 List of object identifiers
429 """
430 await self._ensure_initialized() # Lazy initialization
431 with tracer.start_as_current_span("openfga.list_objects"):
432 try:
433 response = await self.client.list_objects(user=user, relation=relation, type=object_type)
435 objects = response.objects or []
437 logger.info(
438 "Objects listed",
439 extra={"user": user, "relation": relation, "object_type": object_type, "count": len(objects)},
440 )
442 return objects
444 except Exception as e:
445 logger.error(f"Failed to list objects: {e}", exc_info=True)
446 raise
448 async def expand_relation(self, relation: str, object: str) -> dict[str, Any]:
449 """
450 Expand a relation to see all users with access
452 Args:
453 relation: Relation to expand
454 object: Object identifier
456 Returns:
457 Tree structure showing all users with access
458 """
459 await self._ensure_initialized() # Lazy initialization
460 with tracer.start_as_current_span("openfga.expand"):
461 try:
462 response = await self.client.expand(relation=relation, object=object)
464 return response.tree.model_dump() if response.tree else {}
466 except Exception as e:
467 logger.error(f"Failed to expand relation: {e}", exc_info=True)
468 raise
471def _extract_users_from_expansion(expansion: dict[str, Any]) -> list[str]:
472 """
473 Extract all user IDs from an OpenFGA expansion tree.
475 Recursively traverses the expansion tree to find all leaf nodes containing users.
477 Args:
478 expansion: Expansion tree from OpenFGA expand() call
480 Returns:
481 List of user IDs (e.g., ["user:alice", "user:bob"])
483 Example expansion structures:
484 Simple leaf: {"leaf": {"users": {"users": ["user:alice"]}}}
485 Union: {"union": {"nodes": [{"leaf": ...}, {"leaf": ...}]}}
486 Empty: {}
487 """
488 if not expansion:
489 return []
491 users: list[str] = []
493 # Handle leaf nodes (direct user lists)
494 if "leaf" in expansion:
495 leaf = expansion["leaf"]
496 if isinstance(leaf, dict) and "users" in leaf: 496 ↛ 504line 496 didn't jump to line 504 because the condition on line 496 was always true
497 user_data = leaf["users"]
498 if isinstance(user_data, dict) and "users" in user_data: 498 ↛ 504line 498 didn't jump to line 504 because the condition on line 498 was always true
499 user_list = user_data["users"]
500 if isinstance(user_list, list): 500 ↛ 504line 500 didn't jump to line 504 because the condition on line 500 was always true
501 users.extend(user_list)
503 # Handle union nodes (multiple children)
504 if "union" in expansion:
505 union = expansion["union"]
506 if isinstance(union, dict) and "nodes" in union: 506 ↛ 513line 506 didn't jump to line 513 because the condition on line 506 was always true
507 nodes = union["nodes"]
508 if isinstance(nodes, list): 508 ↛ 513line 508 didn't jump to line 513 because the condition on line 508 was always true
509 for node in nodes:
510 users.extend(_extract_users_from_expansion(node))
512 # Handle intersection nodes (all children must be true)
513 if "intersection" in expansion: 513 ↛ 514line 513 didn't jump to line 514 because the condition on line 513 was never true
514 intersection = expansion["intersection"]
515 if isinstance(intersection, dict) and "nodes" in intersection:
516 nodes = intersection["nodes"]
517 if isinstance(nodes, list):
518 for node in nodes:
519 users.extend(_extract_users_from_expansion(node))
521 # Handle difference nodes (exclusion)
522 if "difference" in expansion: 522 ↛ 523line 522 didn't jump to line 523 because the condition on line 522 was never true
523 difference = expansion["difference"]
524 if isinstance(difference, dict):
525 # Base users
526 if "base" in difference:
527 users.extend(_extract_users_from_expansion(difference["base"]))
528 # Subtract users are excluded, so we don't add them
530 return list(set(users)) # Deduplicate
533class OpenFGAAuthorizationModel:
534 """
535 Authorization model definition for the agent system
537 Defines types, relations, and permissions for the system.
538 """
540 @staticmethod
541 def get_model_definition() -> dict[str, Any]:
542 """
543 Get the authorization model definition
545 This defines:
546 - user: Individual users
547 - organization: Organizations that users belong to
548 - tool: AI tools (chat, search, etc.)
549 - conversation: Conversation threads
550 - role: Roles that grant permissions
551 - service_principal: Service accounts for machine-to-machine auth (ADR-0033)
553 Relations:
554 - member: User is a member of organization
555 - owner: User owns a resource
556 - viewer: User can view a resource
557 - executor: User can execute a tool
558 - admin: User has admin privileges
559 - acts_as: Service principal acts as user (permission inheritance, ADR-0039)
560 """
561 return {
562 "schema_version": "1.1",
563 "type_definitions": [
564 {"type": "user", "relations": {}, "metadata": {"relations": {}}},
565 {
566 "type": "organization",
567 "relations": {"member": {"this": {}}, "admin": {"this": {}}},
568 "metadata": {
569 "relations": {
570 "member": {"directly_related_user_types": [{"type": "user"}]},
571 "admin": {"directly_related_user_types": [{"type": "user"}]},
572 }
573 },
574 },
575 {
576 "type": "tool",
577 "relations": {
578 "owner": {"this": {}},
579 "executor": {
580 "union": {
581 "child": [
582 {"this": {}},
583 {"computedUserset": {"relation": "owner"}},
584 {
585 "tupleToUserset": {
586 "tupleset": {"relation": "organization"},
587 "computedUserset": {"relation": "member"},
588 }
589 },
590 ]
591 }
592 },
593 "organization": {"this": {}},
594 },
595 "metadata": {
596 "relations": {
597 "owner": {"directly_related_user_types": [{"type": "user"}]},
598 "executor": {"directly_related_user_types": [{"type": "user"}]},
599 "organization": {"directly_related_user_types": [{"type": "organization"}]},
600 }
601 },
602 },
603 {
604 "type": "conversation",
605 "relations": {
606 "owner": {"this": {}},
607 "viewer": {"union": {"child": [{"this": {}}, {"computedUserset": {"relation": "owner"}}]}},
608 "editor": {"union": {"child": [{"this": {}}, {"computedUserset": {"relation": "owner"}}]}},
609 },
610 "metadata": {
611 "relations": {
612 "owner": {"directly_related_user_types": [{"type": "user"}]},
613 "viewer": {"directly_related_user_types": [{"type": "user"}]},
614 "editor": {"directly_related_user_types": [{"type": "user"}]},
615 }
616 },
617 },
618 {
619 "type": "role",
620 "relations": {"assignee": {"this": {}}},
621 "metadata": {"relations": {"assignee": {"directly_related_user_types": [{"type": "user"}]}}},
622 },
623 {
624 "type": "service_principal",
625 "relations": {
626 "owner": {"this": {}},
627 "acts_as": {"this": {}},
628 "viewer": {"computedUserset": {"relation": "owner"}},
629 "editor": {"computedUserset": {"relation": "owner"}},
630 },
631 "metadata": {
632 "relations": {
633 "owner": {"directly_related_user_types": [{"type": "user"}]},
634 "acts_as": {"directly_related_user_types": [{"type": "user"}]},
635 "viewer": {"directly_related_user_types": [{"type": "user"}]},
636 "editor": {"directly_related_user_types": [{"type": "user"}]},
637 }
638 },
639 },
640 ],
641 }
644async def initialize_openfga_store(client: OpenFGAClient) -> str:
645 """
646 Initialize OpenFGA store with authorization model
648 Args:
649 client: OpenFGA client instance
651 Returns:
652 Store ID
653 """
654 await client._ensure_initialized() # Lazy initialization
655 with tracer.start_as_current_span("openfga.initialize_store"):
656 try:
657 # Create store
658 store = await client.client.create_store(body={"name": "langgraph-agent-store"})
659 store_id = store.id
661 logger.info("OpenFGA store created", extra={"store_id": store_id})
663 # Update client configuration
664 client.store_id = store_id
665 client.client.store_id = store_id
667 # Write authorization model
668 model_def = OpenFGAAuthorizationModel.get_model_definition()
669 model_response = await client.client.write_authorization_model(body=model_def)
670 model_id = model_response.authorization_model_id
672 logger.info("OpenFGA authorization model created", extra={"model_id": model_id})
674 # Update client with model ID
675 client.model_id = model_id
676 client.client.authorization_model_id = model_id
678 return store_id # type: ignore[no-any-return]
680 except Exception as e:
681 logger.error(f"Failed to initialize OpenFGA store: {e}", exc_info=True)
682 raise
685async def seed_sample_data(client: OpenFGAClient) -> None:
686 """
687 Seed sample relationship data for testing
688 """
689 sample_tuples = [
690 # Organization memberships
691 {"user": "user:alice", "relation": "member", "object": "organization:acme"},
692 {"user": "user:bob", "relation": "member", "object": "organization:acme"},
693 {"user": "user:alice", "relation": "admin", "object": "organization:acme"},
694 # Tool permissions
695 {"user": "user:alice", "relation": "executor", "object": "tool:chat"},
696 {"user": "user:bob", "relation": "executor", "object": "tool:chat"},
697 {"user": "organization:acme", "relation": "organization", "object": "tool:chat"},
698 # Conversation ownership
699 {"user": "user:alice", "relation": "owner", "object": "conversation:thread_1"},
700 {"user": "user:bob", "relation": "viewer", "object": "conversation:thread_1"},
701 # Role assignments
702 {"user": "user:alice", "relation": "assignee", "object": "role:premium"},
703 {"user": "user:bob", "relation": "assignee", "object": "role:standard"},
704 ]
706 await client.write_tuples(sample_tuples)
707 logger.info("Sample OpenFGA data seeded")
710async def check_permission(
711 user_id: str,
712 relation: str,
713 object: str,
714 openfga_client: OpenFGAClient,
715) -> bool:
716 """
717 Check if user has permission with support for service principal inheritance.
719 This function implements permission checking with acts_as relationship support (ADR-0039).
720 Service principals (user_id starting with "service:") can inherit permissions from
721 associated users via the acts_as relationship.
723 Args:
724 user_id: User or service principal ID (e.g., "user:alice" or "service:batch-job")
725 relation: Relation to check (e.g., "viewer", "editor", "executor")
726 object: Object to check permission on (e.g., "conversation:thread1")
727 openfga_client: OpenFGA client instance
729 Returns:
730 True if user/service has permission (directly or inherited), False otherwise
732 Example:
733 >>> # Direct permission check
734 >>> allowed = await check_permission("user:alice", "viewer", "conversation:1", openfga)
735 >>> # Service principal with inherited permission
736 >>> allowed = await check_permission("service:batch-job", "viewer", "conversation:1", openfga)
737 """
738 # 1. Direct permission check
739 has_direct_permission = await openfga_client.check_permission(
740 user=user_id,
741 relation=relation,
742 object=object,
743 )
745 if has_direct_permission:
746 return True
748 # 2. If service principal, check inherited permissions via acts_as
749 if user_id.startswith("service:"):
750 # List all users this service principal acts as
751 try:
752 # Query for acts_as relationships
753 associated_users = await openfga_client.list_objects(
754 user=user_id,
755 relation="acts_as",
756 object_type="user",
757 )
759 # Check if any associated user has the permission
760 for associated_user in associated_users:
761 user_has_permission = await openfga_client.check_permission(
762 user=associated_user,
763 relation=relation,
764 object=object,
765 )
767 if user_has_permission:
768 # Log inherited access for audit trail
769 logger.info(
770 f"{user_id} accessed {object} via inherited permission from {associated_user}",
771 extra={
772 "service_principal": user_id,
773 "associated_user": associated_user,
774 "resource": object,
775 "relation": relation,
776 "permission_type": "inherited",
777 },
778 )
779 return True
781 except Exception as e:
782 logger.warning(
783 f"Error checking acts_as relationships for {user_id}: {e}",
784 exc_info=True,
785 )
786 # Continue with denial if acts_as check fails
788 return False