Coverage for src / mcp_server_langgraph / scim / schema.py: 61%

181 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2SCIM 2.0 Schema Validation 

3 

4Implements SCIM 2.0 schema validation according to RFC 7643. 

5Validates User and Group resources against SCIM Core Schema and Enterprise Extension. 

6 

7See ADR-0038 for SCIM implementation approach. 

8 

9References: 

10- RFC 7643: SCIM Core Schema 

11- RFC 7644: SCIM Protocol 

12""" 

13 

14from enum import Enum 

15from typing import Any 

16 

17from pydantic import BaseModel, ConfigDict, Field, field_validator 

18 

19# SCIM Schema URNs 

20SCIM_USER_SCHEMA = "urn:ietf:params:scim:schemas:core:2.0:User" 

21SCIM_GROUP_SCHEMA = "urn:ietf:params:scim:schemas:core:2.0:Group" 

22SCIM_ENTERPRISE_USER_SCHEMA = "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User" 

23 

24 

25class SCIMEmail(BaseModel): 

26 """SCIM email address""" 

27 

28 value: str 

29 type: str | None = "work" # work, home, other 

30 primary: bool = False 

31 

32 

33class SCIMName(BaseModel): 

34 """SCIM user name""" 

35 

36 formatted: str | None = None 

37 familyName: str | None = None 

38 givenName: str | None = None 

39 middleName: str | None = None 

40 honorificPrefix: str | None = None 

41 honorificSuffix: str | None = None 

42 

43 

44class SCIMPhoneNumber(BaseModel): 

45 """SCIM phone number""" 

46 

47 value: str 

48 type: str | None = "work" 

49 primary: bool = False 

50 

51 

52class SCIMAddress(BaseModel): 

53 """SCIM address""" 

54 

55 formatted: str | None = None 

56 streetAddress: str | None = None 

57 locality: str | None = None 

58 region: str | None = None 

59 postalCode: str | None = None 

60 country: str | None = None 

61 type: str | None = "work" 

62 primary: bool = False 

63 

64 

65class SCIMGroupMembership(BaseModel): 

66 """SCIM group membership""" 

67 

68 model_config = ConfigDict( 

69 populate_by_name=True, 

70 # Fix JSON schema to avoid $ref conflicts 

71 json_schema_extra=lambda schema, model: schema.update( 

72 {"properties": {k if k != "$ref" else "ref": v for k, v in schema.get("properties", {}).items()}} 

73 ), 

74 ) 

75 

76 value: str # Group ID 

77 # Use 'reference' as field name, serialize as '$ref' for SCIM compliance 

78 reference: str | None = Field(None, serialization_alias="$ref", validation_alias="$ref") 

79 display: str | None = None 

80 type: str | None = "direct" 

81 

82 

83class SCIMEnterpriseUser(BaseModel): 

84 """SCIM Enterprise User Extension (RFC 7643 Section 4.3)""" 

85 

86 employeeNumber: str | None = None 

87 costCenter: str | None = None 

88 organization: str | None = None 

89 division: str | None = None 

90 department: str | None = None 

91 manager: dict[str, str] | None = None # {value: manager_id, ref: url, displayName: name} 

92 

93 

94class SCIMUser(BaseModel): 

95 """ 

96 SCIM 2.0 User Resource 

97 

98 Core schema with optional Enterprise extension. 

99 """ 

100 

101 schemas: list[str] = Field(default=[SCIM_USER_SCHEMA]) 

102 id: str | None = None 

103 externalId: str | None = None 

104 userName: str 

105 name: SCIMName | None = None 

106 displayName: str | None = None 

107 nickName: str | None = None 

108 profileUrl: str | None = None 

109 title: str | None = None 

110 userType: str | None = None 

111 preferredLanguage: str | None = None 

112 locale: str | None = None 

113 timezone: str | None = None 

114 active: bool = True 

115 password: str | None = None 

116 emails: list[SCIMEmail] = [] 

117 phoneNumbers: list[SCIMPhoneNumber] = [] 

118 addresses: list[SCIMAddress] = [] 

119 groups: list[SCIMGroupMembership] = [] 

120 

121 # Meta 

122 meta: dict[str, Any] | None = None 

123 

124 # Enterprise extension 

125 enterpriseUser: SCIMEnterpriseUser | None = Field(None, alias="urn:ietf:params:scim:schemas:extension:enterprise:2.0:User") 

126 

127 @field_validator("schemas") 

128 @classmethod 

129 def validate_schemas(cls, v: list[str]) -> list[str]: 

130 """Validate that required schemas are present""" 

131 if SCIM_USER_SCHEMA not in v: 131 ↛ 132line 131 didn't jump to line 132 because the condition on line 131 was never true

132 msg = f"Missing required schema: {SCIM_USER_SCHEMA}" 

133 raise ValueError(msg) 

134 return v 

135 

136 @field_validator("emails") 

137 @classmethod 

138 def validate_primary_email(cls, v: list[SCIMEmail]) -> list[SCIMEmail]: 

139 """Ensure at most one primary email""" 

140 primary_count = sum(1 for email in v if email.primary) 

141 if primary_count > 1: 141 ↛ 142line 141 didn't jump to line 142 because the condition on line 141 was never true

142 msg = "Only one email can be marked as primary" 

143 raise ValueError(msg) 

144 return v 

145 

146 

147class SCIMMember(BaseModel): 

148 """SCIM group member""" 

149 

150 model_config = ConfigDict( 

151 populate_by_name=True, 

152 # Fix JSON schema to avoid $ref conflicts 

153 json_schema_extra=lambda schema, model: schema.update( 

154 {"properties": {k if k != "$ref" else "ref": v for k, v in schema.get("properties", {}).items()}} 

155 ), 

156 ) 

157 

158 value: str # User ID 

159 # Use 'reference' as field name, serialize as '$ref' for SCIM compliance 

160 reference: str | None = Field(None, serialization_alias="$ref", validation_alias="$ref") 

161 display: str | None = None 

162 type: str | None = "User" 

163 

164 

165class SCIMGroup(BaseModel): 

166 """SCIM 2.0 Group Resource""" 

167 

168 schemas: list[str] = Field(default=[SCIM_GROUP_SCHEMA]) 

169 id: str | None = None 

170 displayName: str 

171 members: list[SCIMMember] = [] 

172 meta: dict[str, Any] | None = None 

173 

174 @field_validator("schemas") 

175 @classmethod 

176 def validate_schemas(cls, v: list[str]) -> list[str]: 

177 """Validate that required schemas are present""" 

178 if SCIM_GROUP_SCHEMA not in v: 

179 msg = f"Missing required schema: {SCIM_GROUP_SCHEMA}" 

180 raise ValueError(msg) 

181 return v 

182 

183 

184class SCIMPatchOp(str, Enum): 

185 """SCIM PATCH operation types""" 

186 

187 ADD = "add" 

188 REMOVE = "remove" 

189 REPLACE = "replace" 

190 

191 

192class SCIMPatchOperation(BaseModel): 

193 """SCIM PATCH operation""" 

194 

195 op: SCIMPatchOp 

196 path: str | None = None 

197 value: Any = None 

198 

199 

200class SCIMPatchRequest(BaseModel): 

201 """SCIM PATCH request""" 

202 

203 schemas: list[str] = Field(default=["urn:ietf:params:scim:api:messages:2.0:PatchOp"]) 

204 Operations: list[SCIMPatchOperation] 

205 

206 

207class SCIMListResponse(BaseModel): 

208 """SCIM List Response""" 

209 

210 schemas: list[str] = Field(default=["urn:ietf:params:scim:api:messages:2.0:ListResponse"]) 

211 totalResults: int 

212 startIndex: int = 1 

213 itemsPerPage: int 

214 Resources: list[Any] # Can be Users or Groups 

215 

216 

217class SCIMError(BaseModel): 

218 """SCIM Error Response""" 

219 

220 schemas: list[str] = Field(default=["urn:ietf:params:scim:api:messages:2.0:Error"]) 

221 status: int 

222 scimType: str | None = None 

223 detail: str | None = None 

224 

225 

226def validate_scim_user(data: dict[str, Any]) -> SCIMUser: 

227 """ 

228 Validate SCIM user data 

229 

230 Args: 

231 data: Raw user data 

232 

233 Returns: 

234 Validated SCIMUser object 

235 

236 Raises: 

237 ValueError: If validation fails 

238 """ 

239 try: 

240 return SCIMUser(**data) 

241 except Exception as e: 

242 msg = f"Invalid SCIM user data: {e!s}" 

243 raise ValueError(msg) 

244 

245 

246def validate_scim_group(data: dict[str, Any]) -> SCIMGroup: 

247 """ 

248 Validate SCIM group data 

249 

250 Args: 

251 data: Raw group data 

252 

253 Returns: 

254 Validated SCIMGroup object 

255 

256 Raises: 

257 ValueError: If validation fails 

258 """ 

259 try: 

260 return SCIMGroup(**data) 

261 except Exception as e: 

262 msg = f"Invalid SCIM group data: {e!s}" 

263 raise ValueError(msg) 

264 

265 

266def user_to_keycloak(scim_user: SCIMUser) -> dict[str, Any]: 

267 """ 

268 Convert SCIM user to Keycloak user representation 

269 

270 Args: 

271 scim_user: SCIM user object 

272 

273 Returns: 

274 Keycloak user representation 

275 """ 

276 attributes: dict[str, str] = {} 

277 keycloak_user: dict[str, Any] = { 

278 "username": scim_user.userName, 

279 "enabled": scim_user.active, 

280 "emailVerified": False, 

281 "attributes": attributes, 

282 } 

283 

284 # Name 

285 if scim_user.name: 

286 if scim_user.name.givenName: 

287 keycloak_user["firstName"] = scim_user.name.givenName 

288 if scim_user.name.familyName: 

289 keycloak_user["lastName"] = scim_user.name.familyName 

290 

291 # Email (use primary or first) 

292 if scim_user.emails: 

293 primary_email = next((e for e in scim_user.emails if e.primary), None) 

294 email = primary_email or scim_user.emails[0] 

295 keycloak_user["email"] = email.value 

296 keycloak_user["emailVerified"] = True 

297 

298 # Optional attributes 

299 if scim_user.displayName: 

300 attributes["displayName"] = scim_user.displayName 

301 if scim_user.title: 

302 attributes["title"] = scim_user.title 

303 if scim_user.userType: 

304 attributes["userType"] = scim_user.userType 

305 

306 # Enterprise extension 

307 if scim_user.enterpriseUser: 

308 ent = scim_user.enterpriseUser 

309 if ent.department: 

310 attributes["department"] = ent.department 

311 if ent.organization: 

312 attributes["organization"] = ent.organization 

313 if ent.employeeNumber: 

314 attributes["employeeNumber"] = ent.employeeNumber 

315 if ent.costCenter: 

316 attributes["costCenter"] = ent.costCenter 

317 

318 # External ID 

319 if scim_user.externalId: 

320 attributes["externalId"] = scim_user.externalId 

321 

322 return keycloak_user 

323 

324 

325def keycloak_to_scim_user(keycloak_user: dict[str, Any]) -> SCIMUser: 

326 """ 

327 Convert Keycloak user to SCIM user representation 

328 

329 Args: 

330 keycloak_user: Keycloak user object 

331 

332 Returns: 

333 SCIM user object 

334 """ 

335 attributes = keycloak_user.get("attributes", {}) 

336 

337 # Build name 

338 name = SCIMName( 

339 givenName=keycloak_user.get("firstName"), 

340 familyName=keycloak_user.get("lastName"), 

341 ) 

342 

343 # Build emails 

344 emails = [] 

345 if keycloak_user.get("email"): 345 ↛ 355line 345 didn't jump to line 355 because the condition on line 345 was always true

346 emails.append( 

347 SCIMEmail( 

348 value=keycloak_user["email"], 

349 primary=True, 

350 type="work", 

351 ) 

352 ) 

353 

354 # Build enterprise extension if attributes present 

355 enterprise_user = None 

356 if any(k in attributes for k in ["department", "organization", "employeeNumber"]): 356 ↛ 357line 356 didn't jump to line 357 because the condition on line 356 was never true

357 enterprise_user = SCIMEnterpriseUser( 

358 department=attributes.get("department"), 

359 organization=attributes.get("organization"), 

360 employeeNumber=attributes.get("employeeNumber"), 

361 costCenter=attributes.get("costCenter"), 

362 ) 

363 

364 schemas = [SCIM_USER_SCHEMA] 

365 if enterprise_user: 365 ↛ 366line 365 didn't jump to line 366 because the condition on line 365 was never true

366 schemas.append(SCIM_ENTERPRISE_USER_SCHEMA) 

367 

368 user_dict = { 

369 "schemas": schemas, 

370 "id": keycloak_user["id"], 

371 "userName": keycloak_user["username"], 

372 "name": name, 

373 "displayName": attributes.get("displayName"), 

374 "title": attributes.get("title"), 

375 "active": keycloak_user.get("enabled", True), 

376 "emails": emails, 

377 "externalId": attributes.get("externalId"), 

378 "meta": { 

379 "resourceType": "User", 

380 "created": keycloak_user.get("createdTimestamp"), 

381 "lastModified": keycloak_user.get("createdTimestamp"), # Keycloak doesn't track modification time 

382 }, 

383 } 

384 

385 if enterprise_user: 385 ↛ 386line 385 didn't jump to line 386 because the condition on line 385 was never true

386 user_dict["urn:ietf:params:scim:schemas:extension:enterprise:2.0:User"] = enterprise_user 

387 

388 return SCIMUser(**user_dict)