Coverage for src / mcp_server_langgraph / auth / role_mapper.py: 87%
224 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Advanced Role Mapping Engine
4Provides flexible, configurable mapping of Keycloak roles/groups to OpenFGA tuples.
5Supports:
6- Simple role mappings
7- Group pattern matching (regex)
8- Conditional mappings based on attributes
9- Role hierarchies
10- Custom transformation rules
11"""
13import re
14from pathlib import Path
15from typing import Any
17import yaml
18from pydantic import BaseModel, ConfigDict, Field, field_validator
20from mcp_server_langgraph.auth.keycloak import KeycloakUser
21from mcp_server_langgraph.observability.telemetry import logger
23# ============================================================================
24# Pydantic Models for Type-Safe Role Mapping Configuration
25# ============================================================================
class OpenFGATuple(BaseModel):
    """
    Type-safe OpenFGA relationship tuple.

    Links a ``user`` to an ``object`` through a ``relation``. String fields
    have surrounding whitespace stripped, and values are re-validated when
    assigned after construction (see ``model_config``).
    """

    model_config = ConfigDict(
        frozen=False,
        validate_assignment=True,
        str_strip_whitespace=True,
        json_schema_extra={"example": {"user": "user:alice", "relation": "member", "object": "workspace:engineering"}},
    )

    user: str = Field(..., description="User ID (e.g., 'user:alice')")
    relation: str = Field(..., description="Relation type (e.g., 'member', 'admin')")
    object: str = Field(..., description="Object identifier (e.g., 'workspace:engineering')")

    def to_dict(self) -> dict[str, str]:
        """Return the tuple as a plain dictionary (kept for backward compatibility)."""
        as_dict: dict[str, str] = self.model_dump()
        return as_dict

    @classmethod
    def from_dict(cls, data: dict[str, str]) -> "OpenFGATuple":
        """Build an ``OpenFGATuple`` from a dictionary of field values."""
        return cls(**data)
class SimpleRoleMappingConfig(BaseModel):
    """
    Validated settings for a simple role mapping.

    Describes which Keycloak role (realm-level by default) produces which
    OpenFGA relation/object pair.
    """

    model_config = ConfigDict(frozen=False, validate_assignment=True, str_strip_whitespace=True)

    keycloak_role: str = Field(..., description="Keycloak role name")
    realm: bool = Field(default=True, description="Whether this is a realm role (vs client role)")
    openfga_relation: str = Field(..., description="OpenFGA relation to assign")
    openfga_object: str = Field(..., description="OpenFGA object to relate to")
class GroupMappingConfig(BaseModel):
    """
    Validated settings for a group pattern mapping.

    ``pattern`` is a regular expression applied to group paths;
    ``openfga_object_template`` is a format string that receives the
    extracted ``group_name``.
    """

    model_config = ConfigDict(frozen=False, validate_assignment=True, str_strip_whitespace=True)

    pattern: str = Field(..., description="Regex pattern to match groups")
    openfga_relation: str = Field(..., description="OpenFGA relation to assign")
    openfga_object_template: str = Field(..., description="Template for object name (e.g., 'workspace:{group_name}')")
class ConditionConfig(BaseModel):
    """
    Configuration for conditional mapping condition

    A condition names a user ``attribute``, a comparison ``operator`` and the
    ``value`` to compare against. The operator is restricted to the set
    checked in ``validate_operator``.
    """

    attribute: str = Field(..., description="User attribute to check")
    operator: str = Field("==", description="Comparison operator (==, !=, in, >=, <=)")
    value: Any = Field(..., description="Value to compare against")

    @field_validator("operator")
    @classmethod
    def validate_operator(cls, v: str) -> str:
        """
        Validate operator is supported

        Args:
            v: Operator string supplied in the configuration.

        Returns:
            The operator unchanged when it is supported.

        Raises:
            ValueError: If the operator is not one of ``==, !=, in, >=, <=``.
        """
        supported = {"==", "!=", "in", ">=", "<="}
        if v not in supported:
            # Sort before formatting: a set's iteration order depends on the
            # hash seed, which would make the message differ between runs.
            msg = f"Operator must be one of {sorted(supported)}, got: {v}"
            raise ValueError(msg)
        return v

    model_config = ConfigDict(frozen=False, validate_assignment=True, str_strip_whitespace=True)
class ConditionalMappingConfig(BaseModel):
    """
    Validated settings for a conditional mapping.

    Pairs a single ``condition`` with the list of tuple definitions that are
    emitted when that condition holds.
    """

    model_config = ConfigDict(frozen=False, validate_assignment=True)

    condition: ConditionConfig = Field(..., description="Condition to evaluate")
    openfga_tuples: list[dict[str, str]] = Field(..., description="Tuples to create if condition is met")
class MappingRule:
    """
    Common interface for all mapping rules.

    Subclasses must override ``applies_to`` and ``generate_tuples``; the base
    implementations only raise ``NotImplementedError``.
    """

    def __init__(self, config: dict[str, Any]) -> None:
        # Keep the raw configuration available to subclasses and callers.
        self.config = config

    def applies_to(self, user: KeycloakUser) -> bool:
        """Return whether this rule is relevant for ``user`` (abstract)."""
        raise NotImplementedError

    def generate_tuples(self, user: KeycloakUser) -> list[dict[str, str]]:
        """Return the OpenFGA tuples this rule yields for ``user`` (abstract)."""
        raise NotImplementedError
class SimpleRoleMapping(MappingRule):
    """One Keycloak role mapped directly onto one OpenFGA tuple."""

    def __init__(self, config: dict[str, Any]) -> None:
        super().__init__(config)
        # Run the raw dict through the Pydantic model, then cache its fields.
        validated = SimpleRoleMappingConfig(**config)
        self.mapping_config = validated
        self.keycloak_role = validated.keycloak_role
        self.is_realm_role = validated.realm
        self.openfga_relation = validated.openfga_relation
        self.openfga_object = validated.openfga_object

    def applies_to(self, user: KeycloakUser) -> bool:
        """Return True when the user holds the configured role."""
        if not self.is_realm_role:
            # Client role: a hit in any client's role list is enough.
            return any(self.keycloak_role in roles for roles in user.client_roles.values())
        return self.keycloak_role in user.realm_roles

    def generate_tuples(self, user: KeycloakUser) -> list[dict[str, str]]:
        """Return a single-element list with the mapped tuple, or an empty list."""
        if self.applies_to(user):
            # Validate through the Pydantic model, hand back a plain dict.
            mapped = OpenFGATuple(user=user.user_id, relation=self.openfga_relation, object=self.openfga_object)
            return [mapped.to_dict()]
        return []
class GroupMapping(MappingRule):
    """
    Pattern-based group mapping with regex

    Every group of the user that matches ``pattern`` (anchored at the start,
    via ``re.match``) yields one tuple whose object is built from
    ``openfga_object_template``.
    """

    def __init__(self, config: dict[str, Any]) -> None:
        """
        Validate the configuration and pre-compile the group pattern.

        Args:
            config: Raw mapping with ``pattern``, ``openfga_relation`` and
                ``openfga_object_template`` keys.
        """
        super().__init__(config)
        # Validate and store config as Pydantic model
        self.mapping_config = GroupMappingConfig(**config)
        self.pattern = re.compile(self.mapping_config.pattern)
        self.openfga_relation = self.mapping_config.openfga_relation
        self.openfga_object_template = self.mapping_config.openfga_object_template

    def applies_to(self, user: KeycloakUser) -> bool:
        """Check if user has any matching groups"""
        return any(self.pattern.match(group) for group in user.groups)

    def generate_tuples(self, user: KeycloakUser) -> list[dict[str, str]]:
        """
        Generate tuples for all matching groups

        The group name substituted into the template is the pattern's last
        capture group. When the pattern has no capture groups, or the last
        one did not take part in the match, the final path segment of the
        group is used instead.

        Args:
            user: Keycloak user whose ``groups`` are examined.

        Returns:
            One tuple dictionary per matching group (possibly empty).
        """
        tuples = []

        for group in user.groups:
            match = self.pattern.match(group)
            if not match:
                continue

            captured = match.groups()
            group_name = captured[-1] if captured else None
            if group_name is None:
                # An optional capture group that did not participate is
                # reported as None; formatting that would produce an object
                # such as "organization:None", so use the path leaf instead.
                group_name = group.strip("/").split("/")[-1]

            # Apply template
            openfga_object = self.openfga_object_template.format(group_name=group_name)

            # Create Pydantic tuple, then convert to dict
            tuple_obj = OpenFGATuple(user=user.user_id, relation=self.openfga_relation, object=openfga_object)
            tuples.append(tuple_obj.to_dict())

        return tuples
class ConditionalMapping(MappingRule):
    """
    Conditional mapping based on user attributes

    Emits the configured tuples only when a single attribute of the user
    satisfies the configured comparison.
    """

    def __init__(self, config: dict[str, Any]) -> None:
        """
        Validate the configuration and cache the condition parts.

        Args:
            config: Raw mapping with ``condition`` and ``openfga_tuples`` keys.
        """
        super().__init__(config)
        # Validate and store config as Pydantic model
        self.mapping_config = ConditionalMappingConfig(**config)
        self.condition = self.mapping_config.condition
        self.attribute = self.condition.attribute
        self.operator = self.condition.operator
        self.value = self.condition.value
        self.openfga_tuples = self.mapping_config.openfga_tuples

    def applies_to(self, user: KeycloakUser) -> bool:
        """
        Check if condition is met

        A list-valued attribute is reduced to its first element before the
        comparison. A missing attribute, an empty list, or a value that
        cannot be compared with the configured operator all count as
        "condition not met".

        Args:
            user: Keycloak user whose ``attributes`` are inspected.

        Returns:
            True when the attribute satisfies the condition, else False.
        """
        attr_value = user.attributes.get(self.attribute)

        # Handle list attributes: only the first element is compared.
        if isinstance(attr_value, list):
            attr_value = attr_value[0] if attr_value else None

        if attr_value is None:
            return False

        # Apply operator (already validated by Pydantic)
        if self.operator == "==":
            return attr_value == self.value  # type: ignore[no-any-return]
        if self.operator == "!=":
            return attr_value != self.value  # type: ignore[no-any-return]
        if self.operator == "in":
            try:
                return attr_value in self.value
            except TypeError:
                # Configured value is not a container (e.g. a number).
                logger.warning(f"Cannot apply 'in' to attribute '{self.attribute}': configured value is not a container")
                return False
        if self.operator in (">=", "<="):
            try:
                left, right = float(attr_value), float(self.value)
            except (TypeError, ValueError):
                # Non-numeric attribute or configured value: treat the
                # condition as unmet instead of raising out of the rule.
                logger.warning(f"Cannot compare attribute '{self.attribute}' numerically with operator '{self.operator}'")
                return False
            return left >= right if self.operator == ">=" else left <= right

        logger.warning(f"Unknown operator: {self.operator}")
        return False

    def generate_tuples(self, user: KeycloakUser) -> list[dict[str, str]]:
        """
        Generate tuples if condition is met

        Only the ``relation`` and ``object`` keys of each configured tuple
        are used; the tuple's user is always ``user.user_id``.

        Args:
            user: Keycloak user to evaluate.

        Returns:
            The configured tuples bound to this user, or an empty list when
            the condition is not met.

        Raises:
            KeyError: If a configured tuple lacks ``relation`` or ``object``.
        """
        if not self.applies_to(user):
            return []

        tuples = []
        for tuple_config in self.openfga_tuples:
            # Create Pydantic tuple, then convert to dict
            tuple_obj = OpenFGATuple(user=user.user_id, relation=tuple_config["relation"], object=tuple_config["object"])
            tuples.append(tuple_obj.to_dict())

        return tuples
class RoleMapper:
    """
    Advanced role mapping engine

    Loads configuration from YAML and applies mapping rules to generate
    OpenFGA tuples from Keycloak user data.
    """

    def __init__(self, config_path: str | None = None, config_dict: dict[str, Any] | None = None) -> None:
        """
        Initialize role mapper

        ``config_path`` takes precedence over ``config_dict``. When neither
        is given (or both are empty/falsy) the built-in default mapping is
        loaded.

        Args:
            config_path: Path to YAML configuration file
            config_dict: Configuration dictionary (alternative to file)
        """
        self.rules: list[MappingRule] = []
        self.hierarchies: dict[str, list[str]] = {}

        # Load configuration
        if config_path:
            self.load_from_file(config_path)
        elif config_dict:
            self.load_from_dict(config_dict)
        else:
            # Use default hardcoded mapping for backward compatibility
            self._load_default_config()

        logger.info(f"RoleMapper initialized with {len(self.rules)} rules")

    def load_from_file(self, config_path: str) -> None:
        """
        Load configuration from YAML file

        Falls back to the default configuration when the file is missing,
        cannot be parsed, does not contain a top-level mapping, or contains
        invalid rules. The failure is logged, not raised.

        Args:
            config_path: Path to the YAML file.
        """
        path = Path(config_path)

        if not path.exists():
            logger.warning(f"Role mapping config not found: {config_path}, using defaults")
            self._load_default_config()
            return

        try:
            # Explicit encoding so parsing does not depend on the platform locale.
            with path.open(encoding="utf-8") as f:
                config = yaml.safe_load(f)

            # An empty file parses to None; a list or scalar is equally unusable.
            if not isinstance(config, dict):
                msg = f"expected a mapping at the top level, got {type(config).__name__}"
                raise TypeError(msg)

            self.load_from_dict(config)
            logger.info(f"Loaded role mapping config from {config_path}")

        except Exception as e:
            logger.error(f"Failed to load role mapping config: {e}", exc_info=True)
            self._load_default_config()

    def load_from_dict(self, config: dict[str, Any]) -> None:
        """
        Load configuration from dictionary

        Rules are built into a temporary list and only installed once every
        entry has validated, so a failing entry leaves the previously loaded
        rules and hierarchies untouched. Sections that are absent or set to
        ``None`` are treated as empty.

        Args:
            config: Mapping with optional ``simple_mappings``,
                ``group_mappings``, ``conditional_mappings`` and
                ``hierarchies`` keys.
        """
        sections = (
            ("simple_mappings", SimpleRoleMapping),
            ("group_mappings", GroupMapping),
            ("conditional_mappings", ConditionalMapping),
        )

        rules: list[MappingRule] = []
        for section, rule_cls in sections:
            # "or []" tolerates a YAML key that is present with a null value.
            rules.extend(rule_cls(mapping) for mapping in config.get(section) or [])

        self.rules = rules

        # Load role hierarchies
        self.hierarchies = config.get("hierarchies") or {}

        logger.info(f"Loaded {len(self.rules)} mapping rules")

    def _load_default_config(self) -> None:
        """Load default hardcoded mapping for backward compatibility"""
        default_config = {
            "simple_mappings": [
                {"keycloak_role": "admin", "realm": True, "openfga_relation": "admin", "openfga_object": "system:global"},
                {"keycloak_role": "premium", "realm": True, "openfga_relation": "assignee", "openfga_object": "role:premium"},
                {"keycloak_role": "user", "realm": True, "openfga_relation": "assignee", "openfga_object": "role:user"},
            ],
            "group_mappings": [
                {
                    "pattern": "^/(?:.+/)?([^/]+)$",
                    "openfga_relation": "member",
                    "openfga_object_template": "organization:{group_name}",
                }
            ],
        }

        self.load_from_dict(default_config)
        logger.info("Using default role mapping configuration")

    async def map_user_to_tuples(self, user: KeycloakUser) -> list[dict[str, str]]:
        """
        Map Keycloak user to OpenFGA tuples

        Each rule is applied independently: a rule that raises is logged and
        skipped so the remaining rules still contribute. Duplicate tuples are
        dropped, keeping the first occurrence.

        Args:
            user: Keycloak user to map

        Returns:
            List of OpenFGA tuples
        """
        tuples = []
        seen_tuples: set[tuple[str, ...]] = set()  # Deduplicate

        # Apply all mapping rules
        for rule in self.rules:
            try:
                rule_tuples = rule.generate_tuples(user)
                for t in rule_tuples:
                    # Deduplicate using tuple representation
                    tuple_key = (t["user"], t["relation"], t["object"])
                    if tuple_key not in seen_tuples:
                        tuples.append(t)
                        seen_tuples.add(tuple_key)
            except Exception as e:
                logger.error(f"Error applying mapping rule: {e}", exc_info=True)

        # Apply role hierarchies
        tuples = self._apply_hierarchies(user, tuples)

        logger.info(
            f"Mapped user to {len(tuples)} OpenFGA tuples", extra={"username": user.username, "tuple_count": len(tuples)}
        )

        return tuples

    def _apply_hierarchies(self, user: KeycloakUser, tuples: list[dict[str, str]]) -> list[dict[str, str]]:
        """
        Apply role hierarchies to expand tuples

        For every ``assignee`` tuple on a ``role:<name>`` object, the roles
        listed under ``<name>`` in ``self.hierarchies`` are added as further
        ``assignee`` tuples. Only the directly listed roles are added; the
        added roles are not themselves expanded.

        Args:
            user: User the inherited tuples are created for.
            tuples: Tuples produced by the mapping rules (not modified).

        Returns:
            A new list with the original tuples followed by inherited ones,
            or ``tuples`` itself when no hierarchies are configured.
        """
        if not self.hierarchies:
            return tuples

        expanded_tuples = tuples.copy()
        seen_tuples = {(t["user"], t["relation"], t["object"]) for t in tuples}

        # Find all roles user has. dict.fromkeys de-duplicates while keeping
        # first-seen order, so the appended tuples come out in a stable order.
        user_roles = dict.fromkeys(
            t["object"].split(":")[-1]
            for t in tuples
            if t["relation"] == "assignee" and t["object"].startswith("role:")
        )

        # Expand with inherited roles
        for role in user_roles:
            for inherited_role in self.hierarchies.get(role) or []:
                inherited_object = f"role:{inherited_role}"
                tuple_key = (user.user_id, "assignee", inherited_object)
                if tuple_key not in seen_tuples:
                    expanded_tuples.append({"user": user.user_id, "relation": "assignee", "object": inherited_object})
                    seen_tuples.add(tuple_key)

        if len(expanded_tuples) > len(tuples):
            logger.info(f"Role hierarchies expanded {len(tuples)} to {len(expanded_tuples)} tuples")

        return expanded_tuples

    def add_rule(self, rule: MappingRule) -> None:
        """Dynamically add a mapping rule"""
        self.rules.append(rule)
        logger.info(f"Added new mapping rule: {type(rule).__name__}")

    def validate_config(self) -> list[str]:
        """
        Validate configuration

        Checks that each known rule type exposes its required attributes and
        that every hierarchy entry is a list that does not name its own role.
        Only direct self-references are detected as circular.

        Returns:
            List of validation errors (empty if valid)
        """
        errors = []

        # Check that all rules are valid
        for i, rule in enumerate(self.rules):
            try:
                # Try to access required attributes
                if isinstance(rule, SimpleRoleMapping):
                    _ = rule.keycloak_role
                    _ = rule.openfga_relation
                    _ = rule.openfga_object
                elif isinstance(rule, GroupMapping):
                    _ = rule.pattern
                    _ = rule.openfga_relation
                    _ = rule.openfga_object_template
                elif isinstance(rule, ConditionalMapping):
                    _ = rule.attribute
                    _ = rule.openfga_tuples
            except Exception as e:
                errors.append(f"Rule {i}: {e}")

        # Validate hierarchies
        for role, inherited in self.hierarchies.items():
            if not isinstance(inherited, list):
                errors.append(f"Hierarchy for '{role}': must be a list")  # type: ignore[unreachable]
                # Skip the membership test: it raises TypeError on None and
                # does substring matching on strings.
                continue

            # Check for circular dependencies
            if role in inherited:
                errors.append(f"Hierarchy for '{role}': circular dependency")

        return errors