Coverage for src / mcp_server_langgraph / scim / schema.py: 61%
181 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2SCIM 2.0 Schema Validation
4Implements SCIM 2.0 schema validation according to RFC 7643.
5Validates User and Group resources against SCIM Core Schema and Enterprise Extension.
7See ADR-0038 for SCIM implementation approach.
9References:
10- RFC 7643: SCIM Core Schema
11- RFC 7644: SCIM Protocol
12"""
14from enum import Enum
15from typing import Any
17from pydantic import BaseModel, ConfigDict, Field, field_validator
19# SCIM Schema URNs
20SCIM_USER_SCHEMA = "urn:ietf:params:scim:schemas:core:2.0:User"
21SCIM_GROUP_SCHEMA = "urn:ietf:params:scim:schemas:core:2.0:Group"
22SCIM_ENTERPRISE_USER_SCHEMA = "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User"
25class SCIMEmail(BaseModel):
26 """SCIM email address"""
28 value: str
29 type: str | None = "work" # work, home, other
30 primary: bool = False
33class SCIMName(BaseModel):
34 """SCIM user name"""
36 formatted: str | None = None
37 familyName: str | None = None
38 givenName: str | None = None
39 middleName: str | None = None
40 honorificPrefix: str | None = None
41 honorificSuffix: str | None = None
44class SCIMPhoneNumber(BaseModel):
45 """SCIM phone number"""
47 value: str
48 type: str | None = "work"
49 primary: bool = False
52class SCIMAddress(BaseModel):
53 """SCIM address"""
55 formatted: str | None = None
56 streetAddress: str | None = None
57 locality: str | None = None
58 region: str | None = None
59 postalCode: str | None = None
60 country: str | None = None
61 type: str | None = "work"
62 primary: bool = False
65class SCIMGroupMembership(BaseModel):
66 """SCIM group membership"""
68 model_config = ConfigDict(
69 populate_by_name=True,
70 # Fix JSON schema to avoid $ref conflicts
71 json_schema_extra=lambda schema, model: schema.update(
72 {"properties": {k if k != "$ref" else "ref": v for k, v in schema.get("properties", {}).items()}}
73 ),
74 )
76 value: str # Group ID
77 # Use 'reference' as field name, serialize as '$ref' for SCIM compliance
78 reference: str | None = Field(None, serialization_alias="$ref", validation_alias="$ref")
79 display: str | None = None
80 type: str | None = "direct"
83class SCIMEnterpriseUser(BaseModel):
84 """SCIM Enterprise User Extension (RFC 7643 Section 4.3)"""
86 employeeNumber: str | None = None
87 costCenter: str | None = None
88 organization: str | None = None
89 division: str | None = None
90 department: str | None = None
91 manager: dict[str, str] | None = None # {value: manager_id, ref: url, displayName: name}
94class SCIMUser(BaseModel):
95 """
96 SCIM 2.0 User Resource
98 Core schema with optional Enterprise extension.
99 """
101 schemas: list[str] = Field(default=[SCIM_USER_SCHEMA])
102 id: str | None = None
103 externalId: str | None = None
104 userName: str
105 name: SCIMName | None = None
106 displayName: str | None = None
107 nickName: str | None = None
108 profileUrl: str | None = None
109 title: str | None = None
110 userType: str | None = None
111 preferredLanguage: str | None = None
112 locale: str | None = None
113 timezone: str | None = None
114 active: bool = True
115 password: str | None = None
116 emails: list[SCIMEmail] = []
117 phoneNumbers: list[SCIMPhoneNumber] = []
118 addresses: list[SCIMAddress] = []
119 groups: list[SCIMGroupMembership] = []
121 # Meta
122 meta: dict[str, Any] | None = None
124 # Enterprise extension
125 enterpriseUser: SCIMEnterpriseUser | None = Field(None, alias="urn:ietf:params:scim:schemas:extension:enterprise:2.0:User")
127 @field_validator("schemas")
128 @classmethod
129 def validate_schemas(cls, v: list[str]) -> list[str]:
130 """Validate that required schemas are present"""
131 if SCIM_USER_SCHEMA not in v: 131 ↛ 132line 131 didn't jump to line 132 because the condition on line 131 was never true
132 msg = f"Missing required schema: {SCIM_USER_SCHEMA}"
133 raise ValueError(msg)
134 return v
136 @field_validator("emails")
137 @classmethod
138 def validate_primary_email(cls, v: list[SCIMEmail]) -> list[SCIMEmail]:
139 """Ensure at most one primary email"""
140 primary_count = sum(1 for email in v if email.primary)
141 if primary_count > 1: 141 ↛ 142line 141 didn't jump to line 142 because the condition on line 141 was never true
142 msg = "Only one email can be marked as primary"
143 raise ValueError(msg)
144 return v
147class SCIMMember(BaseModel):
148 """SCIM group member"""
150 model_config = ConfigDict(
151 populate_by_name=True,
152 # Fix JSON schema to avoid $ref conflicts
153 json_schema_extra=lambda schema, model: schema.update(
154 {"properties": {k if k != "$ref" else "ref": v for k, v in schema.get("properties", {}).items()}}
155 ),
156 )
158 value: str # User ID
159 # Use 'reference' as field name, serialize as '$ref' for SCIM compliance
160 reference: str | None = Field(None, serialization_alias="$ref", validation_alias="$ref")
161 display: str | None = None
162 type: str | None = "User"
165class SCIMGroup(BaseModel):
166 """SCIM 2.0 Group Resource"""
168 schemas: list[str] = Field(default=[SCIM_GROUP_SCHEMA])
169 id: str | None = None
170 displayName: str
171 members: list[SCIMMember] = []
172 meta: dict[str, Any] | None = None
174 @field_validator("schemas")
175 @classmethod
176 def validate_schemas(cls, v: list[str]) -> list[str]:
177 """Validate that required schemas are present"""
178 if SCIM_GROUP_SCHEMA not in v:
179 msg = f"Missing required schema: {SCIM_GROUP_SCHEMA}"
180 raise ValueError(msg)
181 return v
184class SCIMPatchOp(str, Enum):
185 """SCIM PATCH operation types"""
187 ADD = "add"
188 REMOVE = "remove"
189 REPLACE = "replace"
192class SCIMPatchOperation(BaseModel):
193 """SCIM PATCH operation"""
195 op: SCIMPatchOp
196 path: str | None = None
197 value: Any = None
200class SCIMPatchRequest(BaseModel):
201 """SCIM PATCH request"""
203 schemas: list[str] = Field(default=["urn:ietf:params:scim:api:messages:2.0:PatchOp"])
204 Operations: list[SCIMPatchOperation]
207class SCIMListResponse(BaseModel):
208 """SCIM List Response"""
210 schemas: list[str] = Field(default=["urn:ietf:params:scim:api:messages:2.0:ListResponse"])
211 totalResults: int
212 startIndex: int = 1
213 itemsPerPage: int
214 Resources: list[Any] # Can be Users or Groups
217class SCIMError(BaseModel):
218 """SCIM Error Response"""
220 schemas: list[str] = Field(default=["urn:ietf:params:scim:api:messages:2.0:Error"])
221 status: int
222 scimType: str | None = None
223 detail: str | None = None
226def validate_scim_user(data: dict[str, Any]) -> SCIMUser:
227 """
228 Validate SCIM user data
230 Args:
231 data: Raw user data
233 Returns:
234 Validated SCIMUser object
236 Raises:
237 ValueError: If validation fails
238 """
239 try:
240 return SCIMUser(**data)
241 except Exception as e:
242 msg = f"Invalid SCIM user data: {e!s}"
243 raise ValueError(msg)
246def validate_scim_group(data: dict[str, Any]) -> SCIMGroup:
247 """
248 Validate SCIM group data
250 Args:
251 data: Raw group data
253 Returns:
254 Validated SCIMGroup object
256 Raises:
257 ValueError: If validation fails
258 """
259 try:
260 return SCIMGroup(**data)
261 except Exception as e:
262 msg = f"Invalid SCIM group data: {e!s}"
263 raise ValueError(msg)
266def user_to_keycloak(scim_user: SCIMUser) -> dict[str, Any]:
267 """
268 Convert SCIM user to Keycloak user representation
270 Args:
271 scim_user: SCIM user object
273 Returns:
274 Keycloak user representation
275 """
276 attributes: dict[str, str] = {}
277 keycloak_user: dict[str, Any] = {
278 "username": scim_user.userName,
279 "enabled": scim_user.active,
280 "emailVerified": False,
281 "attributes": attributes,
282 }
284 # Name
285 if scim_user.name:
286 if scim_user.name.givenName:
287 keycloak_user["firstName"] = scim_user.name.givenName
288 if scim_user.name.familyName:
289 keycloak_user["lastName"] = scim_user.name.familyName
291 # Email (use primary or first)
292 if scim_user.emails:
293 primary_email = next((e for e in scim_user.emails if e.primary), None)
294 email = primary_email or scim_user.emails[0]
295 keycloak_user["email"] = email.value
296 keycloak_user["emailVerified"] = True
298 # Optional attributes
299 if scim_user.displayName:
300 attributes["displayName"] = scim_user.displayName
301 if scim_user.title:
302 attributes["title"] = scim_user.title
303 if scim_user.userType:
304 attributes["userType"] = scim_user.userType
306 # Enterprise extension
307 if scim_user.enterpriseUser:
308 ent = scim_user.enterpriseUser
309 if ent.department:
310 attributes["department"] = ent.department
311 if ent.organization:
312 attributes["organization"] = ent.organization
313 if ent.employeeNumber:
314 attributes["employeeNumber"] = ent.employeeNumber
315 if ent.costCenter:
316 attributes["costCenter"] = ent.costCenter
318 # External ID
319 if scim_user.externalId:
320 attributes["externalId"] = scim_user.externalId
322 return keycloak_user
325def keycloak_to_scim_user(keycloak_user: dict[str, Any]) -> SCIMUser:
326 """
327 Convert Keycloak user to SCIM user representation
329 Args:
330 keycloak_user: Keycloak user object
332 Returns:
333 SCIM user object
334 """
335 attributes = keycloak_user.get("attributes", {})
337 # Build name
338 name = SCIMName(
339 givenName=keycloak_user.get("firstName"),
340 familyName=keycloak_user.get("lastName"),
341 )
343 # Build emails
344 emails = []
345 if keycloak_user.get("email"): 345 ↛ 355line 345 didn't jump to line 355 because the condition on line 345 was always true
346 emails.append(
347 SCIMEmail(
348 value=keycloak_user["email"],
349 primary=True,
350 type="work",
351 )
352 )
354 # Build enterprise extension if attributes present
355 enterprise_user = None
356 if any(k in attributes for k in ["department", "organization", "employeeNumber"]): 356 ↛ 357line 356 didn't jump to line 357 because the condition on line 356 was never true
357 enterprise_user = SCIMEnterpriseUser(
358 department=attributes.get("department"),
359 organization=attributes.get("organization"),
360 employeeNumber=attributes.get("employeeNumber"),
361 costCenter=attributes.get("costCenter"),
362 )
364 schemas = [SCIM_USER_SCHEMA]
365 if enterprise_user: 365 ↛ 366line 365 didn't jump to line 366 because the condition on line 365 was never true
366 schemas.append(SCIM_ENTERPRISE_USER_SCHEMA)
368 user_dict = {
369 "schemas": schemas,
370 "id": keycloak_user["id"],
371 "userName": keycloak_user["username"],
372 "name": name,
373 "displayName": attributes.get("displayName"),
374 "title": attributes.get("title"),
375 "active": keycloak_user.get("enabled", True),
376 "emails": emails,
377 "externalId": attributes.get("externalId"),
378 "meta": {
379 "resourceType": "User",
380 "created": keycloak_user.get("createdTimestamp"),
381 "lastModified": keycloak_user.get("createdTimestamp"), # Keycloak doesn't track modification time
382 },
383 }
385 if enterprise_user: 385 ↛ 386line 385 didn't jump to line 386 because the condition on line 385 was never true
386 user_dict["urn:ietf:params:scim:schemas:extension:enterprise:2.0:User"] = enterprise_user
388 return SCIMUser(**user_dict)