Coverage for src/mcp_server_langgraph/llm/verifier.py: 92%

159 statements  

coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Work Verification Component for Agentic Workflows 

3 

4Implements Anthropic's "Verify Work" step in the agent loop: 

5- LLM-as-judge: Use another LLM to evaluate outputs 

6- Rules-based validation: Check against explicit criteria 

7- Iterative refinement: Provide feedback for improvement 

8 

9References: 

10- https://www.anthropic.com/engineering/building-agents-with-the-claude-agent-sdk 

11""" 

12 

13from enum import Enum 

14from typing import Any, Literal 

15 

16from langchain_core.messages import BaseMessage, HumanMessage 

17from pydantic import BaseModel, Field 

18 

19from mcp_server_langgraph.llm.factory import create_verification_model 

20from mcp_server_langgraph.observability.telemetry import logger, metrics, tracer 

21import contextlib 

22 

23 

24class VerificationCriterion(str, Enum): 

25 """Criteria for evaluating agent outputs.""" 

26 

27 ACCURACY = "accuracy" # Is the information correct? 

28 COMPLETENESS = "completeness" # Does it fully answer the question? 

29 CLARITY = "clarity" # Is it clear and well-structured? 

30 RELEVANCE = "relevance" # Is it relevant to the user's request? 

31 SAFETY = "safety" # Is it safe and appropriate? 

32 SOURCES = "sources" # Are sources cited when appropriate? 

33 

34 

35class VerificationResult(BaseModel): 

36 """Result of output verification.""" 

37 

38 passed: bool = Field(description="Whether verification passed") 

39 overall_score: float = Field(ge=0.0, le=1.0, description="Overall quality score (0-1)") 

40 criterion_scores: dict[str, float] = Field(default_factory=dict, description="Scores for individual criteria (0-1)") 

41 feedback: str = Field(description="Actionable feedback for improvement") 

42 requires_refinement: bool = Field(default=False, description="Whether output should be refined") 

43 critical_issues: list[str] = Field(default_factory=list, description="Critical issues that must be fixed") 

44 suggestions: list[str] = Field(default_factory=list, description="Optional suggestions for improvement") 

45 

46 

class OutputVerifier:
    """
    Verifies agent outputs using LLM-as-judge pattern.

    Implements "Work Verification" from Anthropic's Agent SDK guide:
    - Evaluates outputs against quality criteria
    - Provides actionable feedback
    - Supports iterative refinement
    """

    def __init__(  # type: ignore[no-untyped-def]
        self,
        criteria: list[VerificationCriterion] | None = None,
        quality_threshold: float = 0.7,
        settings=None,
    ):
        """
        Initialize output verifier.

        Args:
            criteria: Criteria to verify (default: all)
            quality_threshold: Minimum score to pass (default: 0.7)
            settings: Application settings (if None, uses global settings)
        """
        self.criteria = criteria or list(VerificationCriterion)
        self.quality_threshold = quality_threshold

        # Initialize dedicated LLM for verification (LLM-as-judge)
        if settings is None:
            from mcp_server_langgraph.core.config import settings as global_settings

            settings = global_settings

        self.llm = create_verification_model(settings)

        logger.info(
            "OutputVerifier initialized",
            extra={
                "criteria": [c.value for c in self.criteria],
                "quality_threshold": quality_threshold,
            },
        )

    async def verify_response(
        self,
        response: str,
        user_request: str,
        conversation_context: list[BaseMessage] | None = None,
        verification_mode: Literal["standard", "strict", "lenient"] = "standard",
    ) -> VerificationResult:
        """
        Verify agent response quality using LLM-as-judge.

        Args:
            response: Agent's response to verify
            user_request: Original user request
            conversation_context: Conversation history for context
            verification_mode: Strictness level (default: standard)

        Returns:
            VerificationResult with scores and feedback
        """
        with tracer.start_as_current_span("verifier.verify_response") as span:
            span.set_attribute("response.length", len(response))
            span.set_attribute("verification.mode", verification_mode)

            # Adjust threshold based on mode
            threshold = self._get_threshold_for_mode(verification_mode)

            # Build verification prompt using XML structure
            verification_prompt = self._build_verification_prompt(response, user_request, conversation_context)

            try:
                # Get LLM judgment
                # BUGFIX: Wrap prompt in HumanMessage to avoid string-to-character-list iteration
                llm_response = await self.llm.ainvoke([HumanMessage(content=verification_prompt)])

                # Get content and ensure it's a string
                content = llm_response.content if hasattr(llm_response, "content") else str(llm_response)
                judgment = str(content) if not isinstance(content, str) else content

                # Parse judgment into structured result
                result = self._parse_verification_judgment(judgment, threshold)

                span.set_attribute("verification.passed", result.passed)
                span.set_attribute("verification.overall_score", result.overall_score)

                metrics.successful_calls.add(1, {"operation": "verify_response", "passed": str(result.passed).lower()})

                logger.info(
                    "Response verified",
                    extra={
                        "passed": result.passed,
                        "overall_score": result.overall_score,
                        "requires_refinement": result.requires_refinement,
                        "critical_issues_count": len(result.critical_issues),
                    },
                )

                return result

            except Exception as e:
                logger.error(f"Verification failed: {e}", exc_info=True)
                metrics.failed_calls.add(1, {"operation": "verify_response"})
                span.record_exception(e)

                # Fallback: Return permissive result
                return VerificationResult(
                    passed=True,  # Fail-open on verification errors
                    overall_score=0.5,
                    feedback=f"Verification system unavailable. Response accepted by default. Error: {e!s}",
                    requires_refinement=False,
                )

    def _build_verification_prompt(
        self, response: str, user_request: str, conversation_context: list[BaseMessage] | None = None
    ) -> str:
        """
        Build verification prompt using XML structure (Anthropic best practice).

        Args:
            response: Response to verify
            user_request: Original request
            conversation_context: Conversation history

        Returns:
            Structured verification prompt
        """
        # Format conversation context if provided
        context_section = ""
        if conversation_context:
            context_text = "\n".join([f"{self._get_role(msg)}: {msg.content[:200]}..." for msg in conversation_context[-3:]])
            context_section = f"""<conversation_context>
{context_text}
</conversation_context>

"""

        # Build criteria section
        criteria_descriptions = {
            VerificationCriterion.ACCURACY: "Is the information factually correct?",
            VerificationCriterion.COMPLETENESS: "Does it fully address all aspects of the user's request?",
            VerificationCriterion.CLARITY: "Is it clear, well-organized, and easy to understand?",
            VerificationCriterion.RELEVANCE: "Is it directly relevant to what the user asked?",
            VerificationCriterion.SAFETY: "Is it safe, appropriate, and free from harmful content?",
            VerificationCriterion.SOURCES: "Are sources cited when making factual claims?",
        }

        criteria_text = "\n".join([f"- {criterion.value}: {criteria_descriptions[criterion]}" for criterion in self.criteria])

        prompt = f"""<task>
Evaluate the quality of an AI assistant's response to a user request.
</task>

<role>
You are a quality evaluator for AI assistant responses.
Your job is to provide objective, constructive feedback.
</role>

{context_section}<user_request>
{user_request}
</user_request>

<assistant_response>
{response}
</assistant_response>

<evaluation_criteria>
Evaluate the response on these criteria (score each 0.0-1.0):
{criteria_text}
</evaluation_criteria>

<instructions>
1. Evaluate each criterion independently with a score from 0.0 to 1.0
2. Calculate an overall score (average of all criteria)
3. Identify any critical issues that must be fixed
4. Provide actionable feedback for improvement
5. Suggest whether the response requires refinement
</instructions>

<output_format>
Provide your evaluation in this exact format:

SCORES:
- accuracy: [0.0-1.0]
- completeness: [0.0-1.0]
- clarity: [0.0-1.0]
- relevance: [0.0-1.0]
- safety: [0.0-1.0]
- sources: [0.0-1.0]

OVERALL: [0.0-1.0]

CRITICAL_ISSUES:
- [Issue 1, if any]
- [Issue 2, if any]

SUGGESTIONS:
- [Suggestion 1]
- [Suggestion 2]

REQUIRES_REFINEMENT: [yes/no]

FEEDBACK:
[Detailed, actionable feedback in 2-3 sentences]
</output_format>"""

        return prompt

    def _parse_verification_judgment(self, judgment: str, threshold: float) -> VerificationResult:  # noqa: C901
        """
        Parse LLM judgment into structured VerificationResult.

        Args:
            judgment: Raw LLM judgment text
            threshold: Quality threshold for passing

        Returns:
            Structured VerificationResult
        """
        # Extract scores using simple parsing (can be enhanced with regex)
        criterion_scores = {}
        overall_score = None  # Will be set from OVERALL or calculated
        critical_issues = []
        suggestions = []
        requires_refinement = False
        feedback = ""

        lines = judgment.split("\n")
        current_section = None

        for line in lines:
            line = line.strip()

            if line.startswith("SCORES:"):
                current_section = "scores"
            elif line.startswith("OVERALL:"):
                current_section = "overall"
                with contextlib.suppress(ValueError, IndexError):
                    overall_score = float(line.split(":")[1].strip())
            elif line.startswith("CRITICAL_ISSUES:"):
                current_section = "critical"
            elif line.startswith("SUGGESTIONS:"):
                current_section = "suggestions"
            elif line.startswith("REQUIRES_REFINEMENT:"):
                current_section = "refinement"
                requires_refinement = "yes" in line.lower()
            elif line.startswith("FEEDBACK:"):
                current_section = "feedback"
            elif current_section == "scores" and ":" in line:
                try:
                    criterion, score = line.split(":", 1)
                    criterion = criterion.strip(" -")
                    score = float(score.strip())  # type: ignore[assignment]
                    criterion_scores[criterion] = score
                except (ValueError, IndexError):
                    pass
            elif current_section == "critical" and line.startswith("-"):
                issue = line[1:].strip()
                # Filter out "None" or empty issues
                if issue and issue.lower() not in ["none", "n/a", "na"]:
                    critical_issues.append(issue)
            elif current_section == "suggestions" and line.startswith("-"):
                suggestion = line[1:].strip()
                if suggestion and suggestion.lower() not in ["none", "n/a", "na"]:  # coverage: partial branch, condition always true in tests
                    suggestions.append(suggestion)
            elif current_section == "feedback" and line:
                feedback += line + " "

        feedback = feedback.strip() or "No specific feedback provided."

        # Calculate overall score from criteria if not explicitly provided in OVERALL
        if overall_score is None:
            if criterion_scores:  # coverage: partial branch, condition never true in tests
                overall_score = sum(criterion_scores.values()) / len(criterion_scores)  # type: ignore[arg-type]
                logger.info("Calculated overall score from criterion scores")
            else:
                overall_score = 0.5  # Default fallback
                logger.warning("Failed to parse both overall score and criterion scores, using default")

        passed = overall_score >= threshold and len(critical_issues) == 0

        return VerificationResult(
            passed=passed,
            overall_score=overall_score,
            criterion_scores=criterion_scores,  # type: ignore[arg-type]
            feedback=feedback,
            requires_refinement=requires_refinement or not passed,
            critical_issues=critical_issues,
            suggestions=suggestions,
        )

    def _get_threshold_for_mode(self, mode: Literal["standard", "strict", "lenient"]) -> float:
        """Get quality threshold based on verification mode."""
        thresholds = {
            "strict": self.quality_threshold + 0.1,
            "standard": self.quality_threshold,
            "lenient": self.quality_threshold - 0.1,
        }
        # Round to avoid floating point precision issues in tests
        threshold = thresholds.get(mode, self.quality_threshold)
        return round(max(0.0, min(1.0, threshold)), 2)

    def _get_role(self, message: BaseMessage) -> str:
        """Get role label for message."""
        from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

        if isinstance(message, HumanMessage):
            return "User"
        elif isinstance(message, AIMessage):  # coverage: partial branch, condition always true in tests
            return "Assistant"
        elif isinstance(message, SystemMessage):
            return "System"
        else:
            return "Message"

    async def verify_with_rules(self, response: str, rules: dict[str, Any]) -> VerificationResult:
        """
        Verify response against explicit rules (rules-based validation).

        Alternative to LLM-as-judge for deterministic checks.

        Args:
            response: Response to verify
            rules: Dictionary of rules to check

        Returns:
            VerificationResult based on rule compliance

        Example rules:
            {
                "min_length": 50,
                "max_length": 2000,
                "required_keywords": ["example", "explanation"],
                "forbidden_keywords": ["sorry", "I don't know"],
                "must_include_code": True
            }
        """
        issues = []
        suggestions = []
        criterion_scores = {}

        # Check length constraints
        if "min_length" in rules and len(response) < rules["min_length"]:
            issues.append(f"Response too short (minimum: {rules['min_length']} characters)")
            criterion_scores["completeness"] = 0.3

        if "max_length" in rules and len(response) > rules["max_length"]:  # coverage: partial branch, condition never true in tests
            suggestions.append(f"Response could be more concise (maximum: {rules['max_length']} characters)")
            criterion_scores["clarity"] = 0.7

        # Check required keywords
        if "required_keywords" in rules:
            missing = [kw for kw in rules["required_keywords"] if kw.lower() not in response.lower()]
            if missing:
                issues.append(f"Missing required keywords: {', '.join(missing)}")
                criterion_scores["completeness"] = 0.5

        # Check forbidden keywords
        if "forbidden_keywords" in rules:
            found = [kw for kw in rules["forbidden_keywords"] if kw.lower() in response.lower()]
            if found:  # coverage: partial branch, condition always true in tests
                issues.append(f"Contains forbidden keywords: {', '.join(found)}")
                criterion_scores["quality"] = 0.4

        # Check code inclusion
        if rules.get("must_include_code") and "```" not in response:
            issues.append("Response must include code examples")
            criterion_scores["completeness"] = 0.6

        # Calculate overall score
        overall_score = (
            1.0 if not issues else (sum(criterion_scores.values()) / len(criterion_scores) if criterion_scores else 0.5)
        )
        passed = len(issues) == 0

        feedback = "All rule checks passed." if passed else f"Failed {len(issues)} rule check(s). " + "; ".join(issues)

        logger.info(
            "Rules-based verification completed",
            extra={
                "passed": passed,
                "issues_count": len(issues),
                "rules_checked": len(rules),
            },
        )

        return VerificationResult(
            passed=passed,
            overall_score=overall_score,
            criterion_scores=criterion_scores,
            feedback=feedback,
            requires_refinement=not passed,
            critical_issues=issues,
            suggestions=suggestions,
        )
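
# A minimal sketch of the rules format accepted by OutputVerifier.verify_with_rules(),
# mirroring the "Example rules" shown in its docstring. EXAMPLE_RULES and its specific
# limits and keywords are illustrative placeholders, not values the verifier requires.
EXAMPLE_RULES: dict[str, Any] = {
    "min_length": 50,
    "max_length": 2000,
    "required_keywords": ["example", "explanation"],
    "forbidden_keywords": ["sorry", "I don't know"],
    "must_include_code": True,
}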

# Convenience function for easy import
async def verify_output(
    response: str,
    user_request: str,
    conversation_context: list[BaseMessage] | None = None,
    verifier: OutputVerifier | None = None,
) -> VerificationResult:
    """
    Verify agent output (convenience function).

    Args:
        response: Response to verify
        user_request: Original user request
        conversation_context: Conversation history
        verifier: OutputVerifier instance (creates new if None)

    Returns:
        VerificationResult
    """
    if verifier is None:  # coverage: partial branch, condition always true in tests
        verifier = OutputVerifier()

    return await verifier.verify_response(response, user_request, conversation_context)
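
# A minimal usage sketch, assuming the application settings can supply a judge model
# via create_verification_model(). The request, response, and rules values below are
# illustrative placeholders only.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        verifier = OutputVerifier(quality_threshold=0.7)

        # LLM-as-judge path: evaluate a response against the configured criteria
        judged = await verify_output(
            response="Paris is the capital of France.",
            user_request="What is the capital of France?",
            verifier=verifier,
        )
        print(judged.passed, judged.overall_score, judged.feedback)

        # Rules-based path: deterministic checks, no LLM call involved
        ruled = await verifier.verify_with_rules(
            "Paris is the capital of France.",
            {"min_length": 10, "forbidden_keywords": ["I don't know"]},
        )
        print(ruled.passed, ruled.critical_issues)

    asyncio.run(_demo())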