Coverage for src/mcp_server_langgraph/core/context_manager.py: 97%

149 statements  

coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Context Management for Agentic Workflows 

3 

4Implements Anthropic's context engineering best practices: 

5- Compaction: Summarize conversation history when approaching limits 

6- Just-in-time loading: Load context dynamically as needed 

7- Token efficiency: Minimize context while maximizing signal 

8 

9References: 

10- https://www.anthropic.com/engineering/effective-context-engineering-for-ai-agents 
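
Example (illustrative sketch; call from an async context with default settings):
    from mcp_server_langgraph.core.context_manager import compact_if_needed

    messages = await compact_if_needed(messages)  # summarizes only when over the token threshold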

11""" 

12 

13from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage 

14from pydantic import BaseModel, Field 

15 

16from mcp_server_langgraph.llm.factory import create_summarization_model 

17from mcp_server_langgraph.observability.telemetry import logger, metrics, tracer 

18from mcp_server_langgraph.utils.response_optimizer import count_tokens 

19 

20 

21class CompactionResult(BaseModel): 

22 """Result of conversation compaction operation.""" 

23 

24 compacted_messages: list[BaseMessage] = Field(description="Compacted conversation messages") 

25 original_token_count: int = Field(description="Token count before compaction") 

26 compacted_token_count: int = Field(description="Token count after compaction") 

27 messages_summarized: int = Field(description="Number of messages summarized") 

28 compression_ratio: float = Field(ge=0.0, le=1.0, description="Compression achieved (1.0 = no compression)") 

29 

30 

31class ContextManager: 

32 """ 

33 Manages conversation context following Anthropic's best practices. 

34 

35 Key strategies: 

36 1. Compaction: Summarize old messages when approaching token limits 

37 2. Structured note-taking: Maintain persistent architectural decisions 

38 3. Progressive disclosure: Keep recent messages, summarize older ones 

39 

40 Implementation follows "Long-Horizon Task Techniques" from Anthropic's guide. 
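
    Example (illustrative sketch; assumes settings provide a summarization model):
        manager = ContextManager()
        if manager.needs_compaction(messages):
            result = await manager.compact_conversation(messages)
            messages = result.compacted_messages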

41 """ 

42 

43 def __init__( # type: ignore[no-untyped-def] 

44 self, 

45 compaction_threshold: int = 8000, 

46 target_after_compaction: int = 4000, 

47 recent_message_count: int = 5, 

48 settings=None, 

49 ): 

50 """ 

51 Initialize context manager. 

52 

53 Args: 

54 compaction_threshold: Token count that triggers compaction (default: 8000) 

55 target_after_compaction: Target token count after compaction (default: 4000) 

56 recent_message_count: Number of recent messages to keep uncompacted (default: 5) 

57 settings: Application settings (if None, uses global settings) 
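
        Example (illustrative; tighter budgets for latency-sensitive paths):
            manager = ContextManager(
                compaction_threshold=4000,
                target_after_compaction=2000,
                recent_message_count=3,
            )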

58 """ 

59 self.compaction_threshold = compaction_threshold 

60 self.target_after_compaction = target_after_compaction 

61 self.recent_message_count = recent_message_count 

62 

63 # Initialize dedicated summarization LLM (lighter/cheaper model) 

64 if settings is None: 

65 from mcp_server_langgraph.core.config import settings as global_settings 

66 

67 settings = global_settings 

68 

69 self.settings = settings 

70 self.llm = create_summarization_model(settings) 

71 logger.info( 

72 "ContextManager initialized", 

73 extra={ 

74 "compaction_threshold": compaction_threshold, 

75 "target_after_compaction": target_after_compaction, 

76 "recent_message_count": recent_message_count, 

77 "model": settings.model_name, 

78 }, 

79 ) 

80 

81 def needs_compaction(self, messages: list[BaseMessage]) -> bool: 

82 """ 

83 Check if conversation needs compaction. 

84 

85 Args: 

86 messages: Conversation messages 

87 

88 Returns: 

89 True if token count exceeds threshold 
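
        Example (illustrative; exact token counts depend on the configured model):
            if manager.needs_compaction(messages):
                ...  # compact before the next LLM call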

90 """ 

91 # Use model-aware token counting 

92 model_name = self.settings.model_name 

93 total_tokens = sum(count_tokens(self._message_to_text(msg), model=model_name) for msg in messages) 

94 

95 with tracer.start_as_current_span("context.check_compaction") as span: 

96 span.set_attribute("message.count", len(messages)) 

97 span.set_attribute("token.count", total_tokens) 

98 span.set_attribute("needs.compaction", total_tokens > self.compaction_threshold) 

99 

100 if total_tokens > self.compaction_threshold: 

101 logger.info( 

102 "Compaction needed", 

103 extra={ 

104 "total_tokens": total_tokens, 

105 "threshold": self.compaction_threshold, 

106 "message_count": len(messages), 

107 }, 

108 ) 

109 return True 

110 

111 return False 

112 

113 async def compact_conversation(self, messages: list[BaseMessage], preserve_system: bool = True) -> CompactionResult: 

114 """ 

115 Compact conversation by summarizing older messages. 

116 

117 Strategy (Anthropic's "Compaction" technique): 

118 1. Keep system messages (architectural context) 

119 2. Keep recent N messages (working context) 

120 3. Summarize older messages (historical context) 

121 

122 Args: 

123 messages: Full conversation history 

124 preserve_system: Keep system messages intact (default: True) 

125 

126 Returns: 

127 CompactionResult with compacted messages and metrics 
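
        Example (illustrative sketch; run inside an async function):
            result = await manager.compact_conversation(messages)
            result.compression_ratio  # e.g. ~0.5 if token count was roughly halved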

128 """ 

129 with tracer.start_as_current_span("context.compact") as span: 

130 # Use model-aware token counting 

131 model_name = self.settings.model_name 

132 original_tokens = sum(count_tokens(self._message_to_text(msg), model=model_name) for msg in messages) 

133 

134 span.set_attribute("message.count.original", len(messages)) 

135 span.set_attribute("token.count.original", original_tokens) 

136 

137 # Separate messages by type 

138 system_messages = [msg for msg in messages if isinstance(msg, SystemMessage)] if preserve_system else [] 

139 other_messages = [msg for msg in messages if not isinstance(msg, SystemMessage)] 

140 

141 # Keep recent messages 

142 recent_messages = other_messages[-self.recent_message_count :] 

143 older_messages = other_messages[: -self.recent_message_count] 

144 

145 if not older_messages: 

146 # Nothing to compact 

147 logger.info("No older messages to compact") 

148 return CompactionResult( 

149 compacted_messages=messages, 

150 original_token_count=original_tokens, 

151 compacted_token_count=original_tokens, 

152 messages_summarized=0, 

153 compression_ratio=1.0, 

154 ) 

155 

156 # Summarize older messages 

157 summary_text = await self._summarize_messages(older_messages) 

158 summary_message = SystemMessage( 

159 content=f"<conversation_summary>\n{summary_text}\n</conversation_summary>\n\n" 

160 f"[Previous {len(older_messages)} messages summarized to preserve context]" 

161 ) 

162 

163 # Reconstruct conversation: system + summary + recent 

164 compacted_messages = system_messages + [summary_message] + recent_messages 

165 

166 compacted_tokens = sum(count_tokens(self._message_to_text(msg), model=model_name) for msg in compacted_messages) 

167 

168 # Calculate compression ratio (clamped to max 1.0) 

169 # In rare cases, summary may be longer than original, so clamp to prevent validation errors 

170 compression_ratio = min(1.0, compacted_tokens / original_tokens) if original_tokens > 0 else 1.0 

171 

172 span.set_attribute("message.count.compacted", len(compacted_messages)) 

173 span.set_attribute("token.count.compacted", compacted_tokens) 

174 span.set_attribute("compression.ratio", compression_ratio) 

175 

176 metrics.successful_calls.add(1, {"operation": "compact_conversation"}) 

177 

178 logger.info( 

179 "Conversation compacted", 

180 extra={ 

181 "original_messages": len(messages), 

182 "compacted_messages": len(compacted_messages), 

183 "original_tokens": original_tokens, 

184 "compacted_tokens": compacted_tokens, 

185 "compression_ratio": compression_ratio, 

186 "messages_summarized": len(older_messages), 

187 }, 

188 ) 

189 

190 return CompactionResult( 

191 compacted_messages=compacted_messages, 

192 original_token_count=original_tokens, 

193 compacted_token_count=compacted_tokens, 

194 messages_summarized=len(older_messages), 

195 compression_ratio=compression_ratio, 

196 ) 

197 

198 async def _summarize_messages(self, messages: list[BaseMessage]) -> str: 

199 """ 

200 Summarize a sequence of messages preserving key information. 

201 

202 Follows Anthropic's guidance: 

203 "Preserve architectural decisions and critical details 

204 while discarding redundant outputs" 

205 

206 Args: 

207 messages: Messages to summarize 

208 

209 Returns: 

210 Summary text preserving key information 

211 """ 

212 with tracer.start_as_current_span("context.summarize"): 

213 # Format conversation for summarization 

214 conversation_text = "\n\n".join([f"{self._get_role_label(msg)}: {msg.content}" for msg in messages]) 

215 

216 # Summarization prompt using XML structure (Anthropic best practice) 

217 summarization_prompt = f"""<task> 

218Summarize the following conversation segment, preserving critical information. 

219</task> 

220 

221<instructions> 

222Focus on: 

2231. Key decisions made 

2242. Important facts or data discovered 

2253. User preferences or requirements 

2264. Action items or next steps 

2275. Any errors or issues encountered 

228 

229Omit: 

230- Redundant greetings or small talk 

231- Repetitive information 

232- Verbose explanations already acted upon 

233</instructions> 

234 

235<conversation_to_summarize> 

236{conversation_text} 

237</conversation_to_summarize> 

238 

239<output_format> 

240Provide a concise summary in 3-5 bullet points. 

241Focus on high-signal information that maintains conversation context. 

242</output_format>""" 

243 

244 try: 

245 # BUGFIX: Wrap prompt in HumanMessage to avoid string-to-character-list iteration 

246 response = await self.llm.ainvoke([HumanMessage(content=summarization_prompt)]) 

247 content = response.content if hasattr(response, "content") else str(response) 

248 summary = str(content) if not isinstance(content, str) else content 

249 

250 logger.info("Messages summarized", extra={"message_count": len(messages), "summary_length": len(summary)}) 

251 

252 return summary 

253 

254 except Exception as e: 

255 logger.error(f"Summarization failed: {e}", exc_info=True) 

256 metrics.failed_calls.add(1, {"operation": "summarize_messages"}) 

257 

258 # Fallback: Simple concatenation with truncation 

259 fallback_summary = f"Previous conversation ({len(messages)} messages): " + conversation_text[:500] + "..." 

260 return fallback_summary 

261 

262 def count_tokens(self, text: str) -> int: 

263 """ 

264 Count tokens in text using LiteLLM model-aware counting. 

265 

266 Args: 

267 text: Text to count tokens for 

268 

269 Returns: 

270 Number of tokens 
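
        Example (illustrative; the result varies by model tokenizer):
            manager.count_tokens("hello world")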

271 """ 

272 return count_tokens(text, model=self.settings.model_name) 

273 

274 def _message_to_text(self, message: BaseMessage) -> str: 

275 """Convert message to text for token counting.""" 

276 if hasattr(message, "content"): 276 ↛ 278line 276 didn't jump to line 278 because the condition on line 276 was always true

277 return str(message.content) 

278 return str(message) 

279 

280 def _get_role_label(self, message: BaseMessage) -> str: 

281 """Get role label for message formatting.""" 

282 if isinstance(message, HumanMessage): 

283 return "User" 

284 elif isinstance(message, AIMessage): 

285 return "Assistant" 

286 elif isinstance(message, SystemMessage): 286 ↛ 289line 286 didn't jump to line 289 because the condition on line 286 was always true

287 return "System" 

288 else: 

289 return "Message" 

290 

291 def extract_key_information(self, messages: list[BaseMessage]) -> dict[str, list[str]]: 

292 """ 

293 Extract and categorize key information from conversation. 

294 

295 Implements "Structured Note-Taking" pattern from Anthropic's guide. 

296 This is a rule-based fallback method. 

297 

298 Args: 

299 messages: Conversation messages 

300 

301 Returns: 

302 Dictionary with categorized key information 
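
        Example (illustrative; matching is simple keyword search, so expect some noise):
            info = manager.extract_key_information(messages)
            info["issues"]  # messages mentioning "error", "bug", "failed", ...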

303 """ 

304 key_info = { # type: ignore[var-annotated] 

305 "decisions": [], 

306 "requirements": [], 

307 "facts": [], 

308 "action_items": [], 

309 "issues": [], 

310 "preferences": [], 

311 } 

312 

313 # Keyword-based extraction for all 6 categories 

314 # Expanded from 10 to ~35 keywords for better coverage 

315 for msg in messages: 

316 msg_content = msg.content if hasattr(msg, "content") else "" 

317 content = msg_content.lower() if isinstance(msg_content, str) else "" 

318 

319 # Decisions (voting, choosing, selecting) 

320 if any( 

321 keyword in content 

322 for keyword in [ 

323 "decided", 

324 "agreed", 

325 "chose", 

326 "selected", 

327 "picked", 

328 "opted", 

329 "determined", 

330 "concluded", 

331 ] 

332 ) and isinstance(msg_content, str): 

333 key_info["decisions"].append(msg_content[:200]) 

334 

335 # Requirements (obligations, constraints) 

336 if any( 

337 keyword in content 

338 for keyword in [ 

339 "need", 

340 "require", 

341 "must", 

342 "should", 

343 "have to", 

344 "necessary", 

345 "essential", 

346 "mandatory", 

347 ] 

348 ) and isinstance(msg_content, str): 

349 key_info["requirements"].append(msg_content[:200]) 

350 

351 # Facts (statements of truth, data points) 

352 if any( 

353 keyword in content 

354 for keyword in [ 

355 " is ", 

356 " are ", 

357 " was ", 

358 " were ", 

359 "according to", 

360 "version", 

361 "default", 

362 "by default", 

363 "currently", 

364 ] 

365 ) and isinstance(msg_content, str): 

366 key_info["facts"].append(msg_content[:200]) 

367 

368 # Action Items (tasks, TODOs) 

369 if any( 

370 keyword in content 

371 for keyword in [ 

372 "todo", 

373 "to-do", 

374 "please", 

375 "need to", 

376 "should", 

377 "will", 

378 "let's", 

379 "we'll", 

380 "add", 

381 "fix", 

382 "update", 

383 "refactor", 

384 ] 

385 ) and isinstance(msg_content, str): 

386 key_info["action_items"].append(msg_content[:200]) 

387 

388 # Issues (problems, bugs, errors) 

389 if any( 

390 keyword in content 

391 for keyword in [ 

392 "error", 

393 "issue", 

394 "problem", 

395 "failed", 

396 "bug", 

397 "broken", 

398 "crash", 

399 "exception", 

400 ] 

401 ) and isinstance(msg_content, str): 

402 key_info["issues"].append(msg_content[:200]) 

403 

404 # Preferences (likes, dislikes, choices) 

405 if any( 

406 keyword in content 

407 for keyword in [ 

408 "prefer", 

409 "like", 

410 "favorite", 

411 "dislike", 

412 "hate", 

413 "love", 

414 "enjoy", 

415 "rather", 

416 ] 

417 ) and isinstance(msg_content, str): 

418 key_info["preferences"].append(msg_content[:200]) 

419 

420 return key_info 

421 

422 async def extract_key_information_llm(self, messages: list[BaseMessage]) -> dict[str, list[str]]: 

423 """ 

424 Extract and categorize key information using LLM. 

425 

426 Enhanced version of extract_key_information with LLM-based extraction. 

427 Implements Anthropic's "Structured Note-Taking" pattern with 6 categories. 

428 

429 Args: 

430 messages: Conversation messages 

431 

432 Returns: 

433 Dictionary with categorized key information 
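
        Example (illustrative sketch; falls back to the rule-based extractor on LLM errors):
            info = await manager.extract_key_information_llm(messages)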

434 """ 

435 with tracer.start_as_current_span("context.extract_key_info_llm"): 

436 # Format conversation 

437 conversation_text = "\n\n".join([f"{self._get_role_label(msg)}: {msg.content}" for msg in messages]) 

438 

439 # Extraction prompt with XML structure (Anthropic best practice) 

440 extraction_prompt = f"""<task> 

441Extract and categorize key information from this conversation. 

442</task> 

443 

444<categories> 

4451. **Decisions**: Choices made, agreements reached, directions chosen 

4462. **Requirements**: Needs, must-haves, constraints, specifications 

4473. **Facts**: Important factual information discovered or confirmed 

4484. **Action Items**: Tasks to do, next steps, follow-ups 

4495. **Issues**: Problems encountered, errors, blockers 

4506. **Preferences**: User preferences, settings, customizations 

451</categories> 

452 

453<conversation> 

454{conversation_text} 

455</conversation> 

456 

457<instructions> 

458For each category, list the key items found. 

459If a category has no items, write "None". 

460Be concise (1-2 sentences per item). 

461Focus on information that should be remembered long-term. 

462</instructions> 

463 

464<output_format> 

465DECISIONS: 

466- [Decision 1] 

467- [Decision 2] 

468 

469REQUIREMENTS: 

470- [Requirement 1] 

471 

472FACTS: 

473- [Fact 1] 

474 

475ACTION_ITEMS: 

476- [Item 1] 

477 

478ISSUES: 

479- [Issue 1 or None] 

480 

481PREFERENCES: 

482- [Preference 1 or None] 

483</output_format>""" 

484 

485 try: 

486 # BUGFIX: Wrap prompt in HumanMessage to avoid string-to-character-list iteration 

487 response = await self.llm.ainvoke([HumanMessage(content=extraction_prompt)]) 

488 response_content = response.content if hasattr(response, "content") else str(response) 

489 extraction_text = response_content if isinstance(response_content, str) else str(response_content) 

490 

491 # Parse response 

492 key_info = self._parse_extraction_response(extraction_text) 

493 

494 logger.info( 

495 "Key information extracted with LLM", 

496 extra={ 

497 "decisions": len(key_info.get("decisions", [])), 

498 "requirements": len(key_info.get("requirements", [])), 

499 "facts": len(key_info.get("facts", [])), 

500 "action_items": len(key_info.get("action_items", [])), 

501 "issues": len(key_info.get("issues", [])), 

502 "preferences": len(key_info.get("preferences", [])), 

503 }, 

504 ) 

505 

506 metrics.successful_calls.add(1, {"operation": "extract_key_info_llm"}) 

507 

508 return key_info 

509 

510 except Exception as e: 

511 logger.error(f"LLM-based extraction failed: {e}", exc_info=True) 

512 metrics.failed_calls.add(1, {"operation": "extract_key_info_llm"}) 

513 # Fallback to rule-based 

514 return self.extract_key_information(messages) 

515 

516 def _parse_extraction_response(self, text: str) -> dict[str, list[str]]: 

517 """ 

518 Parse LLM extraction response into structured dict. 

519 

520 Args: 

521 text: LLM response text 

522 

523 Returns: 

524 Dictionary with categorized information 
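
        Example (illustrative; "None" items are dropped):
            self._parse_extraction_response("DECISIONS:\n- Use Postgres\nISSUES:\n- None")
            # -> {"decisions": ["Use Postgres"], "issues": [], ...}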

525 """ 

526 categories = { # type: ignore[var-annotated] 

527 "decisions": [], 

528 "requirements": [], 

529 "facts": [], 

530 "action_items": [], 

531 "issues": [], 

532 "preferences": [], 

533 } 

534 

535 # Extract each section 

536 current_category = None 

537 

538 for line in text.split("\n"): 

539 line = line.strip() 

540 

541 # Check for category headers 

542 if line.upper().startswith("DECISIONS:"): 

543 current_category = "decisions" 

544 elif line.upper().startswith("REQUIREMENTS:"): 

545 current_category = "requirements" 

546 elif line.upper().startswith("FACTS:"): 

547 current_category = "facts" 

548 elif line.upper().startswith("ACTION_ITEMS:") or line.upper().startswith("ACTION ITEMS:"): 

549 current_category = "action_items" 

550 elif line.upper().startswith("ISSUES:"): 

551 current_category = "issues" 

552 elif line.upper().startswith("PREFERENCES:"): 

553 current_category = "preferences" 

554 # Check for bullet points 

555 elif line.startswith("-") and current_category: 

556 item = line[1:].strip() 

557 if item and item.lower() != "none": 

558 categories[current_category].append(item) 

559 

560 return categories 

561 

562 

563# Convenience functions for easy import 

564async def compact_if_needed(messages: list[BaseMessage], context_manager: ContextManager | None = None) -> list[BaseMessage]: 

565 """ 

566 Compact conversation if needed, otherwise return unchanged. 

567 

568 Args: 

569 messages: Conversation messages 

570 context_manager: ContextManager instance (creates new if None) 

571 

572 Returns: 

573 Original or compacted messages 
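
    Example (illustrative sketch; call from an async context):
        messages = await compact_if_needed(messages)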

574 """ 

575 if context_manager is None: 575 ↛ 578line 575 didn't jump to line 578 because the condition on line 575 was always true

576 context_manager = ContextManager() 

577 

578 if context_manager.needs_compaction(messages): 

579 result = await context_manager.compact_conversation(messages) 

580 return result.compacted_messages 

581 

582 return messages