Coverage for src / mcp_server_langgraph / auth / role_mapper.py: 87%

224 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Advanced Role Mapping Engine 

3 

4Provides flexible, configurable mapping of Keycloak roles/groups to OpenFGA tuples. 

5Supports: 

6- Simple role mappings 

7- Group pattern matching (regex) 

8- Conditional mappings based on attributes 

9- Role hierarchies 

10- Custom transformation rules 

11""" 

12 

13import re 

14from pathlib import Path 

15from typing import Any 

16 

17import yaml 

18from pydantic import BaseModel, ConfigDict, Field, field_validator 

19 

20from mcp_server_langgraph.auth.keycloak import KeycloakUser 

21from mcp_server_langgraph.observability.telemetry import logger 

22 

23# ============================================================================ 

24# Pydantic Models for Type-Safe Role Mapping Configuration 

25# ============================================================================ 

26 

27 

28class OpenFGATuple(BaseModel): 

29 """ 

30 Type-safe OpenFGA tuple structure 

31 

32 Represents a relationship tuple in OpenFGA. 

33 """ 

34 

35 user: str = Field(..., description="User ID (e.g., 'user:alice')") 

36 relation: str = Field(..., description="Relation type (e.g., 'member', 'admin')") 

37 object: str = Field(..., description="Object identifier (e.g., 'workspace:engineering')") 

38 

39 model_config = ConfigDict( 

40 frozen=False, 

41 validate_assignment=True, 

42 str_strip_whitespace=True, 

43 json_schema_extra={"example": {"user": "user:alice", "relation": "member", "object": "workspace:engineering"}}, 

44 ) 

45 

46 def to_dict(self) -> dict[str, str]: 

47 """Convert to dictionary for backward compatibility""" 

48 return self.model_dump() 

49 

50 @classmethod 

51 def from_dict(cls, data: dict[str, str]) -> "OpenFGATuple": 

52 """Create OpenFGATuple from dictionary""" 

53 return cls(**data) 

54 

55 

56class SimpleRoleMappingConfig(BaseModel): 

57 """ 

58 Configuration for simple role mapping 

59 """ 

60 

61 keycloak_role: str = Field(..., description="Keycloak role name") 

62 realm: bool = Field(True, description="Whether this is a realm role (vs client role)") 

63 openfga_relation: str = Field(..., description="OpenFGA relation to assign") 

64 openfga_object: str = Field(..., description="OpenFGA object to relate to") 

65 

66 model_config = ConfigDict(frozen=False, validate_assignment=True, str_strip_whitespace=True) 

67 

68 

69class GroupMappingConfig(BaseModel): 

70 """ 

71 Configuration for group pattern mapping 

72 """ 

73 

74 pattern: str = Field(..., description="Regex pattern to match groups") 

75 openfga_relation: str = Field(..., description="OpenFGA relation to assign") 

76 openfga_object_template: str = Field(..., description="Template for object name (e.g., 'workspace:{group_name}')") 

77 

78 model_config = ConfigDict(frozen=False, validate_assignment=True, str_strip_whitespace=True) 

79 

80 

81class ConditionConfig(BaseModel): 

82 """ 

83 Configuration for conditional mapping condition 

84 """ 

85 

86 attribute: str = Field(..., description="User attribute to check") 

87 operator: str = Field("==", description="Comparison operator (==, !=, in, >=, <=)") 

88 value: Any = Field(..., description="Value to compare against") 

89 

90 @field_validator("operator") 

91 @classmethod 

92 def validate_operator(cls, v: str) -> str: 

93 """Validate operator is supported""" 

94 supported = {"==", "!=", "in", ">=", "<="} 

95 if v not in supported: 95 ↛ 96line 95 didn't jump to line 96 because the condition on line 95 was never true

96 msg = f"Operator must be one of {supported}, got: {v}" 

97 raise ValueError(msg) 

98 return v 

99 

100 model_config = ConfigDict(frozen=False, validate_assignment=True, str_strip_whitespace=True) 

101 

102 

103class ConditionalMappingConfig(BaseModel): 

104 """ 

105 Configuration for conditional mapping 

106 """ 

107 

108 condition: ConditionConfig = Field(..., description="Condition to evaluate") 

109 openfga_tuples: list[dict[str, str]] = Field(..., description="Tuples to create if condition is met") 

110 

111 model_config = ConfigDict(frozen=False, validate_assignment=True) 

112 

113 

114class MappingRule: 

115 """Base class for mapping rules""" 

116 

117 def __init__(self, config: dict[str, Any]) -> None: 

118 self.config = config 

119 

120 def applies_to(self, user: KeycloakUser) -> bool: 

121 """Check if this rule applies to the user""" 

122 raise NotImplementedError 

123 

124 def generate_tuples(self, user: KeycloakUser) -> list[dict[str, str]]: 

125 """Generate OpenFGA tuples for this rule""" 

126 raise NotImplementedError 

127 

128 

129class SimpleRoleMapping(MappingRule): 

130 """Simple 1:1 role to tuple mapping""" 

131 

132 def __init__(self, config: dict[str, Any]) -> None: 

133 super().__init__(config) 

134 # Validate and store config as Pydantic model 

135 self.mapping_config = SimpleRoleMappingConfig(**config) 

136 self.keycloak_role = self.mapping_config.keycloak_role 

137 self.is_realm_role = self.mapping_config.realm 

138 self.openfga_relation = self.mapping_config.openfga_relation 

139 self.openfga_object = self.mapping_config.openfga_object 

140 

141 def applies_to(self, user: KeycloakUser) -> bool: 

142 """Check if user has the role""" 

143 if self.is_realm_role: 

144 return self.keycloak_role in user.realm_roles 

145 else: 

146 # Check client roles 

147 return any(self.keycloak_role in client_roles for client_roles in user.client_roles.values()) 

148 

149 def generate_tuples(self, user: KeycloakUser) -> list[dict[str, str]]: 

150 """Generate tuple if role matches""" 

151 if not self.applies_to(user): 

152 return [] 

153 

154 # Create Pydantic tuple, then convert to dict for backward compatibility 

155 tuple_obj = OpenFGATuple(user=user.user_id, relation=self.openfga_relation, object=self.openfga_object) 

156 return [tuple_obj.to_dict()] 

157 

158 

159class GroupMapping(MappingRule): 

160 """Pattern-based group mapping with regex""" 

161 

162 def __init__(self, config: dict[str, Any]) -> None: 

163 super().__init__(config) 

164 # Validate and store config as Pydantic model 

165 self.mapping_config = GroupMappingConfig(**config) 

166 self.pattern = re.compile(self.mapping_config.pattern) 

167 self.openfga_relation = self.mapping_config.openfga_relation 

168 self.openfga_object_template = self.mapping_config.openfga_object_template 

169 

170 def applies_to(self, user: KeycloakUser) -> bool: 

171 """Check if user has any matching groups""" 

172 return any(self.pattern.match(group) for group in user.groups) 

173 

174 def generate_tuples(self, user: KeycloakUser) -> list[dict[str, str]]: 

175 """Generate tuples for all matching groups""" 

176 tuples = [] 

177 

178 for group in user.groups: 

179 match = self.pattern.match(group) 

180 if match: 

181 # Extract group name from pattern 

182 group_name = match.groups()[-1] if match.groups() else group.strip("/").split("/")[-1] 

183 

184 # Apply template 

185 openfga_object = self.openfga_object_template.format(group_name=group_name) 

186 

187 # Create Pydantic tuple, then convert to dict 

188 tuple_obj = OpenFGATuple(user=user.user_id, relation=self.openfga_relation, object=openfga_object) 

189 tuples.append(tuple_obj.to_dict()) 

190 

191 return tuples 

192 

193 

194class ConditionalMapping(MappingRule): 

195 """Conditional mapping based on user attributes""" 

196 

197 def __init__(self, config: dict[str, Any]) -> None: 

198 super().__init__(config) 

199 # Validate and store config as Pydantic model 

200 self.mapping_config = ConditionalMappingConfig(**config) 

201 self.condition = self.mapping_config.condition 

202 self.attribute = self.condition.attribute 

203 self.operator = self.condition.operator 

204 self.value = self.condition.value 

205 self.openfga_tuples = self.mapping_config.openfga_tuples 

206 

207 def applies_to(self, user: KeycloakUser) -> bool: 

208 """Check if condition is met""" 

209 attr_value = user.attributes.get(self.attribute) 

210 

211 if attr_value is None: 

212 return False 

213 

214 # Handle list attributes 

215 if isinstance(attr_value, list): 215 ↛ 218line 215 didn't jump to line 218 because the condition on line 215 was always true

216 attr_value = attr_value[0] if attr_value else None 

217 

218 if attr_value is None: 218 ↛ 219line 218 didn't jump to line 219 because the condition on line 218 was never true

219 return False 

220 

221 # Apply operator (already validated by Pydantic) 

222 if self.operator == "==": 

223 return attr_value == self.value # type: ignore[no-any-return] 

224 elif self.operator == "!=": 

225 return attr_value != self.value # type: ignore[no-any-return] 

226 elif self.operator == "in": 

227 return attr_value in self.value 

228 elif self.operator == ">=": 228 ↛ 230line 228 didn't jump to line 230 because the condition on line 228 was always true

229 return float(attr_value) >= float(self.value) 

230 elif self.operator == "<=": 

231 return float(attr_value) <= float(self.value) 

232 else: 

233 logger.warning(f"Unknown operator: {self.operator}") 

234 return False 

235 

236 def generate_tuples(self, user: KeycloakUser) -> list[dict[str, str]]: 

237 """Generate tuples if condition is met""" 

238 if not self.applies_to(user): 

239 return [] 

240 

241 tuples = [] 

242 for tuple_config in self.openfga_tuples: 

243 # Create Pydantic tuple, then convert to dict 

244 tuple_obj = OpenFGATuple(user=user.user_id, relation=tuple_config["relation"], object=tuple_config["object"]) 

245 tuples.append(tuple_obj.to_dict()) 

246 

247 return tuples 

248 

249 

250class RoleMapper: 

251 """ 

252 Advanced role mapping engine 

253 

254 Loads configuration from YAML and applies mapping rules to generate 

255 OpenFGA tuples from Keycloak user data. 

256 """ 

257 

258 def __init__(self, config_path: str | None = None, config_dict: dict[str, Any] | None = None) -> None: 

259 """ 

260 Initialize role mapper 

261 

262 Args: 

263 config_path: Path to YAML configuration file 

264 config_dict: Configuration dictionary (alternative to file) 

265 """ 

266 self.rules: list[MappingRule] = [] 

267 self.hierarchies: dict[str, list[str]] = {} 

268 

269 # Load configuration 

270 if config_path: 

271 self.load_from_file(config_path) 

272 elif config_dict: 

273 self.load_from_dict(config_dict) 

274 else: 

275 # Use default hardcoded mapping for backward compatibility 

276 self._load_default_config() 

277 

278 logger.info(f"RoleMapper initialized with {len(self.rules)} rules") 

279 

280 def load_from_file(self, config_path: str) -> None: 

281 """Load configuration from YAML file""" 

282 path = Path(config_path) 

283 

284 if not path.exists(): 284 ↛ 285line 284 didn't jump to line 285 because the condition on line 284 was never true

285 logger.warning(f"Role mapping config not found: {config_path}, using defaults") 

286 self._load_default_config() 

287 return 

288 

289 try: 

290 with open(path) as f: 

291 config = yaml.safe_load(f) 

292 

293 self.load_from_dict(config) 

294 logger.info(f"Loaded role mapping config from {config_path}") 

295 

296 except Exception as e: 

297 logger.error(f"Failed to load role mapping config: {e}", exc_info=True) 

298 self._load_default_config() 

299 

300 def load_from_dict(self, config: dict[str, Any]) -> None: 

301 """Load configuration from dictionary""" 

302 self.rules = [] 

303 

304 # Load simple mappings 

305 for mapping in config.get("simple_mappings", []): 

306 self.rules.append(SimpleRoleMapping(mapping)) 

307 

308 # Load group mappings 

309 for mapping in config.get("group_mappings", []): 

310 self.rules.append(GroupMapping(mapping)) 

311 

312 # Load conditional mappings 

313 for mapping in config.get("conditional_mappings", []): 

314 self.rules.append(ConditionalMapping(mapping)) 

315 

316 # Load role hierarchies 

317 self.hierarchies = config.get("hierarchies", {}) 

318 

319 logger.info(f"Loaded {len(self.rules)} mapping rules") 

320 

321 def _load_default_config(self) -> None: 

322 """Load default hardcoded mapping for backward compatibility""" 

323 default_config = { 

324 "simple_mappings": [ 

325 {"keycloak_role": "admin", "realm": True, "openfga_relation": "admin", "openfga_object": "system:global"}, 

326 {"keycloak_role": "premium", "realm": True, "openfga_relation": "assignee", "openfga_object": "role:premium"}, 

327 {"keycloak_role": "user", "realm": True, "openfga_relation": "assignee", "openfga_object": "role:user"}, 

328 ], 

329 "group_mappings": [ 

330 { 

331 "pattern": "^/(?:.+/)?([^/]+)$", 

332 "openfga_relation": "member", 

333 "openfga_object_template": "organization:{group_name}", 

334 } 

335 ], 

336 } 

337 

338 self.load_from_dict(default_config) 

339 logger.info("Using default role mapping configuration") 

340 

341 async def map_user_to_tuples(self, user: KeycloakUser) -> list[dict[str, str]]: 

342 """ 

343 Map Keycloak user to OpenFGA tuples 

344 

345 Args: 

346 user: Keycloak user to map 

347 

348 Returns: 

349 List of OpenFGA tuples 

350 """ 

351 tuples = [] 

352 seen_tuples: set[tuple[str, ...]] = set() # Deduplicate 

353 

354 # Apply all mapping rules 

355 for rule in self.rules: 

356 try: 

357 rule_tuples = rule.generate_tuples(user) 

358 for t in rule_tuples: 

359 # Deduplicate using tuple representation 

360 tuple_key = (t["user"], t["relation"], t["object"]) 

361 if tuple_key not in seen_tuples: 

362 tuples.append(t) 

363 seen_tuples.add(tuple_key) 

364 except Exception as e: 

365 logger.error(f"Error applying mapping rule: {e}", exc_info=True) 

366 

367 # Apply role hierarchies 

368 tuples = self._apply_hierarchies(user, tuples) 

369 

370 logger.info( 

371 f"Mapped user to {len(tuples)} OpenFGA tuples", extra={"username": user.username, "tuple_count": len(tuples)} 

372 ) 

373 

374 return tuples 

375 

376 def _apply_hierarchies(self, user: KeycloakUser, tuples: list[dict[str, str]]) -> list[dict[str, str]]: 

377 """Apply role hierarchies to expand tuples""" 

378 if not self.hierarchies: 

379 return tuples 

380 

381 expanded_tuples = tuples.copy() 

382 seen_tuples = {(t["user"], t["relation"], t["object"]) for t in tuples} 

383 

384 # Find all roles user has 

385 user_roles = set() 

386 for tuple_data in tuples: 

387 if tuple_data["relation"] == "assignee" and tuple_data["object"].startswith("role:"): 

388 role_name = tuple_data["object"].split(":")[-1] 

389 user_roles.add(role_name) 

390 

391 # Expand with inherited roles 

392 for role in user_roles: 

393 if role in self.hierarchies: 393 ↛ 392line 393 didn't jump to line 392 because the condition on line 393 was always true

394 inherited_roles = self.hierarchies[role] 

395 for inherited_role in inherited_roles: 

396 tuple_key = (user.user_id, "assignee", f"role:{inherited_role}") 

397 if tuple_key not in seen_tuples: 397 ↛ 395line 397 didn't jump to line 395 because the condition on line 397 was always true

398 expanded_tuples.append( 

399 {"user": user.user_id, "relation": "assignee", "object": f"role:{inherited_role}"} 

400 ) 

401 seen_tuples.add(tuple_key) 

402 

403 if len(expanded_tuples) > len(tuples): 

404 logger.info(f"Role hierarchies expanded {len(tuples)} to {len(expanded_tuples)} tuples") 

405 

406 return expanded_tuples 

407 

408 def add_rule(self, rule: MappingRule) -> None: 

409 """Dynamically add a mapping rule""" 

410 self.rules.append(rule) 

411 logger.info(f"Added new mapping rule: {type(rule).__name__}") 

412 

413 def validate_config(self) -> list[str]: 

414 """ 

415 Validate configuration 

416 

417 Returns: 

418 List of validation errors (empty if valid) 

419 """ 

420 errors = [] 

421 

422 # Check that all rules are valid 

423 for i, rule in enumerate(self.rules): 

424 try: 

425 # Try to access required attributes 

426 if isinstance(rule, SimpleRoleMapping): 426 ↛ 430line 426 didn't jump to line 430 because the condition on line 426 was always true

427 _ = rule.keycloak_role 

428 _ = rule.openfga_relation 

429 _ = rule.openfga_object 

430 elif isinstance(rule, GroupMapping): 

431 _ = rule.pattern 

432 _ = rule.openfga_relation 

433 _ = rule.openfga_object_template 

434 elif isinstance(rule, ConditionalMapping): 

435 _ = rule.attribute 

436 _ = rule.openfga_tuples 

437 except Exception as e: 

438 errors.append(f"Rule {i}: {e}") 

439 

440 # Validate hierarchies 

441 for role, inherited in self.hierarchies.items(): 

442 if not isinstance(inherited, list): 

443 errors.append(f"Hierarchy for '{role}': must be a list") # type: ignore[unreachable] 

444 

445 # Check for circular dependencies 

446 if role in inherited: 

447 errors.append(f"Hierarchy for '{role}': circular dependency") 

448 

449 return errors