Coverage for src/mcp_server_langgraph/core/context_manager.py: 97%
149 statements
coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Context Management for Agentic Workflows
4Implements Anthropic's context engineering best practices:
5- Compaction: Summarize conversation history when approaching limits
6- Just-in-time loading: Load context dynamically as needed
7- Token efficiency: Minimize context while maximizing signal
9References:
10- https://www.anthropic.com/engineering/effective-context-engineering-for-ai-agents
11"""
13from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage
14from pydantic import BaseModel, Field
16from mcp_server_langgraph.llm.factory import create_summarization_model
17from mcp_server_langgraph.observability.telemetry import logger, metrics, tracer
18from mcp_server_langgraph.utils.response_optimizer import count_tokens


class CompactionResult(BaseModel):
    """Result of conversation compaction operation."""

    compacted_messages: list[BaseMessage] = Field(description="Compacted conversation messages")
    original_token_count: int = Field(description="Token count before compaction")
    compacted_token_count: int = Field(description="Token count after compaction")
    messages_summarized: int = Field(description="Number of messages summarized")
    compression_ratio: float = Field(ge=0.0, le=1.0, description="Compression achieved (1.0 = no compression)")
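
# Example (illustrative numbers, not measured output): compacting a 9,200-token
# history down to 3,900 tokens yields compression_ratio ≈ 0.42, i.e. the
# compacted conversation costs roughly 42% of the original token budget.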


class ContextManager:
    """
    Manages conversation context following Anthropic's best practices.

    Key strategies:
    1. Compaction: Summarize old messages when approaching token limits
    2. Structured note-taking: Maintain persistent architectural decisions
    3. Progressive disclosure: Keep recent messages, summarize older ones

    Implementation follows "Long-Horizon Task Techniques" from Anthropic's guide.
    """

    def __init__(  # type: ignore[no-untyped-def]
        self,
        compaction_threshold: int = 8000,
        target_after_compaction: int = 4000,
        recent_message_count: int = 5,
        settings=None,
    ):
        """
        Initialize context manager.

        Args:
            compaction_threshold: Token count that triggers compaction (default: 8000)
            target_after_compaction: Target token count after compaction (default: 4000)
            recent_message_count: Number of recent messages to keep uncompacted (default: 5)
            settings: Application settings (if None, uses global settings)
        """
        self.compaction_threshold = compaction_threshold
        self.target_after_compaction = target_after_compaction
        self.recent_message_count = recent_message_count

        # Initialize dedicated summarization LLM (lighter/cheaper model)
        if settings is None:
            from mcp_server_langgraph.core.config import settings as global_settings

            settings = global_settings

        self.settings = settings
        self.llm = create_summarization_model(settings)
        logger.info(
            "ContextManager initialized",
            extra={
                "compaction_threshold": compaction_threshold,
                "target_after_compaction": target_after_compaction,
                "recent_message_count": recent_message_count,
                "model": settings.model_name,
            },
        )
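
    # Note (observation from this module): target_after_compaction is stored and
    # logged but not referenced again in this file; the compacted size is driven
    # by recent_message_count and the summary length instead.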

    def needs_compaction(self, messages: list[BaseMessage]) -> bool:
        """
        Check if conversation needs compaction.

        Args:
            messages: Conversation messages

        Returns:
            True if token count exceeds threshold
        """
        # Use model-aware token counting
        model_name = self.settings.model_name
        total_tokens = sum(count_tokens(self._message_to_text(msg), model=model_name) for msg in messages)

        with tracer.start_as_current_span("context.check_compaction") as span:
            span.set_attribute("message.count", len(messages))
            span.set_attribute("token.count", total_tokens)
            span.set_attribute("needs.compaction", total_tokens > self.compaction_threshold)

            if total_tokens > self.compaction_threshold:
                logger.info(
                    "Compaction needed",
                    extra={
                        "total_tokens": total_tokens,
                        "threshold": self.compaction_threshold,
                        "message_count": len(messages),
                    },
                )
                return True

            return False
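
    # Example (figures illustrative): with the default threshold of 8000, a
    # history totalling 8,500 model-aware tokens triggers compaction; one
    # totalling 7,900 does not.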

    async def compact_conversation(self, messages: list[BaseMessage], preserve_system: bool = True) -> CompactionResult:
        """
        Compact conversation by summarizing older messages.

        Strategy (Anthropic's "Compaction" technique):
        1. Keep system messages (architectural context)
        2. Keep recent N messages (working context)
        3. Summarize older messages (historical context)

        Args:
            messages: Full conversation history
            preserve_system: Keep system messages intact (default: True)

        Returns:
            CompactionResult with compacted messages and metrics
        """
        with tracer.start_as_current_span("context.compact") as span:
            # Use model-aware token counting
            model_name = self.settings.model_name
            original_tokens = sum(count_tokens(self._message_to_text(msg), model=model_name) for msg in messages)

            span.set_attribute("message.count.original", len(messages))
            span.set_attribute("token.count.original", original_tokens)

            # Separate messages by type
            system_messages = [msg for msg in messages if isinstance(msg, SystemMessage)] if preserve_system else []
            other_messages = [msg for msg in messages if not isinstance(msg, SystemMessage)]

            # Keep recent messages
            recent_messages = other_messages[-self.recent_message_count :]
            older_messages = other_messages[: -self.recent_message_count]

            if not older_messages:
                # Nothing to compact
                logger.info("No older messages to compact")
                return CompactionResult(
                    compacted_messages=messages,
                    original_token_count=original_tokens,
                    compacted_token_count=original_tokens,
                    messages_summarized=0,
                    compression_ratio=1.0,
                )

            # Summarize older messages
            summary_text = await self._summarize_messages(older_messages)
            summary_message = SystemMessage(
                content=f"<conversation_summary>\n{summary_text}\n</conversation_summary>\n\n"
                f"[Previous {len(older_messages)} messages summarized to preserve context]"
            )

            # Reconstruct conversation: system + summary + recent
            compacted_messages = system_messages + [summary_message] + recent_messages

            compacted_tokens = sum(count_tokens(self._message_to_text(msg), model=model_name) for msg in compacted_messages)

            # Calculate compression ratio (clamped to max 1.0)
            # In rare cases, summary may be longer than original, so clamp to prevent validation errors
            compression_ratio = min(1.0, compacted_tokens / original_tokens) if original_tokens > 0 else 1.0

            span.set_attribute("message.count.compacted", len(compacted_messages))
            span.set_attribute("token.count.compacted", compacted_tokens)
            span.set_attribute("compression.ratio", compression_ratio)

            metrics.successful_calls.add(1, {"operation": "compact_conversation"})

            logger.info(
                "Conversation compacted",
                extra={
                    "original_messages": len(messages),
                    "compacted_messages": len(compacted_messages),
                    "original_tokens": original_tokens,
                    "compacted_tokens": compacted_tokens,
                    "compression_ratio": compression_ratio,
                    "messages_summarized": len(older_messages),
                },
            )

            return CompactionResult(
                compacted_messages=compacted_messages,
                original_token_count=original_tokens,
                compacted_token_count=compacted_tokens,
                messages_summarized=len(older_messages),
                compression_ratio=compression_ratio,
            )
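
    # Shape of the result (counts illustrative): with preserve_system=True and
    # the default recent_message_count=5, a history of one system message plus
    # 39 turns compacts to [system message] + [one <conversation_summary>
    # SystemMessage] + [last 5 turns], folding the oldest 34 turns into a
    # single summary.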

    async def _summarize_messages(self, messages: list[BaseMessage]) -> str:
        """
        Summarize a sequence of messages preserving key information.

        Follows Anthropic's guidance:
        "Preserve architectural decisions and critical details
        while discarding redundant outputs"

        Args:
            messages: Messages to summarize

        Returns:
            Summary text preserving key information
        """
        with tracer.start_as_current_span("context.summarize"):
            # Format conversation for summarization
            conversation_text = "\n\n".join([f"{self._get_role_label(msg)}: {msg.content}" for msg in messages])

            # Summarization prompt using XML structure (Anthropic best practice)
            summarization_prompt = f"""<task>
Summarize the following conversation segment, preserving critical information.
</task>

<instructions>
Focus on:
1. Key decisions made
2. Important facts or data discovered
3. User preferences or requirements
4. Action items or next steps
5. Any errors or issues encountered

Omit:
- Redundant greetings or small talk
- Repetitive information
- Verbose explanations already acted upon
</instructions>

<conversation_to_summarize>
{conversation_text}
</conversation_to_summarize>

<output_format>
Provide a concise summary in 3-5 bullet points.
Focus on high-signal information that maintains conversation context.
</output_format>"""

            try:
                # BUGFIX: Wrap prompt in HumanMessage to avoid string-to-character-list iteration
                response = await self.llm.ainvoke([HumanMessage(content=summarization_prompt)])
                content = response.content if hasattr(response, "content") else str(response)
                summary = content if isinstance(content, str) else str(content)

                logger.info("Messages summarized", extra={"message_count": len(messages), "summary_length": len(summary)})

                return summary

            except Exception as e:
                logger.error(f"Summarization failed: {e}", exc_info=True)
                metrics.failed_calls.add(1, {"operation": "summarize_messages"})

                # Fallback: Simple concatenation with truncation
                fallback_summary = f"Previous conversation ({len(messages)} messages): " + conversation_text[:500] + "..."
                return fallback_summary

    def count_tokens(self, text: str) -> int:
        """
        Count tokens in text using LiteLLM model-aware counting.

        Args:
            text: Text to count tokens for

        Returns:
            Number of tokens
        """
        return count_tokens(text, model=self.settings.model_name)

    def _message_to_text(self, message: BaseMessage) -> str:
        """Convert message to text for token counting."""
        if hasattr(message, "content"):
            return str(message.content)
        return str(message)

    def _get_role_label(self, message: BaseMessage) -> str:
        """Get role label for message formatting."""
        if isinstance(message, HumanMessage):
            return "User"
        elif isinstance(message, AIMessage):
            return "Assistant"
        elif isinstance(message, SystemMessage):
            return "System"
        else:
            return "Message"

    def extract_key_information(self, messages: list[BaseMessage]) -> dict[str, list[str]]:
        """
        Extract and categorize key information from conversation.

        Implements "Structured Note-Taking" pattern from Anthropic's guide.
        This is a rule-based fallback method.

        Args:
            messages: Conversation messages

        Returns:
            Dictionary with categorized key information
        """
        key_info = {  # type: ignore[var-annotated]
            "decisions": [],
            "requirements": [],
            "facts": [],
            "action_items": [],
            "issues": [],
            "preferences": [],
        }

        # Keyword-based extraction for all 6 categories
        # Expanded from 10 to ~35 keywords for better coverage
        for msg in messages:
            msg_content = msg.content if hasattr(msg, "content") else ""
            if not isinstance(msg_content, str):
                # Non-string content (e.g. structured tool payloads) has no keywords to match
                continue
            content = msg_content.lower()

            # Decisions (voting, choosing, selecting)
            if any(
                keyword in content
                for keyword in ["decided", "agreed", "chose", "selected", "picked", "opted", "determined", "concluded"]
            ):
                key_info["decisions"].append(msg_content[:200])

            # Requirements (obligations, constraints)
            if any(
                keyword in content
                for keyword in ["need", "require", "must", "should", "have to", "necessary", "essential", "mandatory"]
            ):
                key_info["requirements"].append(msg_content[:200])

            # Facts (statements of truth, data points)
            if any(
                keyword in content
                for keyword in [" is ", " are ", " was ", " were ", "according to", "version", "default", "by default", "currently"]
            ):
                key_info["facts"].append(msg_content[:200])

            # Action items (tasks, TODOs)
            if any(
                keyword in content
                for keyword in ["todo", "to-do", "please", "need to", "should", "will", "let's", "we'll", "add", "fix", "update", "refactor"]
            ):
                key_info["action_items"].append(msg_content[:200])

            # Issues (problems, bugs, errors)
            if any(
                keyword in content
                for keyword in ["error", "issue", "problem", "failed", "bug", "broken", "crash", "exception"]
            ):
                key_info["issues"].append(msg_content[:200])

            # Preferences (likes, dislikes, choices)
            if any(
                keyword in content
                for keyword in ["prefer", "like", "favorite", "dislike", "hate", "love", "enjoy", "rather"]
            ):
                key_info["preferences"].append(msg_content[:200])

        return key_info
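
    # Example: "We must fix the login bug" matches "must" (requirements), "fix"
    # (action_items), and "bug" (issues); a single message can land in several
    # categories, truncated to its first 200 characters in each.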

    async def extract_key_information_llm(self, messages: list[BaseMessage]) -> dict[str, list[str]]:
        """
        Extract and categorize key information using LLM.

        Enhanced version of extract_key_information with LLM-based extraction.
        Implements Anthropic's "Structured Note-Taking" pattern with 6 categories.

        Args:
            messages: Conversation messages

        Returns:
            Dictionary with categorized key information
        """
        with tracer.start_as_current_span("context.extract_key_info_llm"):
            # Format conversation
            conversation_text = "\n\n".join([f"{self._get_role_label(msg)}: {msg.content}" for msg in messages])

            # Extraction prompt with XML structure (Anthropic best practice)
            extraction_prompt = f"""<task>
Extract and categorize key information from this conversation.
</task>

<categories>
1. **Decisions**: Choices made, agreements reached, directions chosen
2. **Requirements**: Needs, must-haves, constraints, specifications
3. **Facts**: Important factual information discovered or confirmed
4. **Action Items**: Tasks to do, next steps, follow-ups
5. **Issues**: Problems encountered, errors, blockers
6. **Preferences**: User preferences, settings, customizations
</categories>

<conversation>
{conversation_text}
</conversation>

<instructions>
For each category, list the key items found.
If a category has no items, write "None".
Be concise (1-2 sentences per item).
Focus on information that should be remembered long-term.
</instructions>

<output_format>
DECISIONS:
- [Decision 1]
- [Decision 2]

REQUIREMENTS:
- [Requirement 1]

FACTS:
- [Fact 1]

ACTION_ITEMS:
- [Item 1]

ISSUES:
- [Issue 1 or None]

PREFERENCES:
- [Preference 1 or None]
</output_format>"""

            try:
                # BUGFIX: Wrap prompt in HumanMessage to avoid string-to-character-list iteration
                response = await self.llm.ainvoke([HumanMessage(content=extraction_prompt)])
                response_content = response.content if hasattr(response, "content") else str(response)
                extraction_text = response_content if isinstance(response_content, str) else str(response_content)

                # Parse response
                key_info = self._parse_extraction_response(extraction_text)

                logger.info(
                    "Key information extracted with LLM",
                    extra={
                        "decisions": len(key_info.get("decisions", [])),
                        "requirements": len(key_info.get("requirements", [])),
                        "facts": len(key_info.get("facts", [])),
                        "action_items": len(key_info.get("action_items", [])),
                        "issues": len(key_info.get("issues", [])),
                        "preferences": len(key_info.get("preferences", [])),
                    },
                )

                metrics.successful_calls.add(1, {"operation": "extract_key_info_llm"})

                return key_info

            except Exception as e:
                logger.error(f"LLM-based extraction failed: {e}", exc_info=True)
                metrics.failed_calls.add(1, {"operation": "extract_key_info_llm"})
                # Fallback to rule-based
                return self.extract_key_information(messages)

    def _parse_extraction_response(self, text: str) -> dict[str, list[str]]:
        """
        Parse LLM extraction response into structured dict.

        Args:
            text: LLM response text

        Returns:
            Dictionary with categorized information
        """
        categories = {  # type: ignore[var-annotated]
            "decisions": [],
            "requirements": [],
            "facts": [],
            "action_items": [],
            "issues": [],
            "preferences": [],
        }

        # Extract each section
        current_category = None

        for line in text.split("\n"):
            line = line.strip()

            # Check for category headers
            if line.upper().startswith("DECISIONS:"):
                current_category = "decisions"
            elif line.upper().startswith("REQUIREMENTS:"):
                current_category = "requirements"
            elif line.upper().startswith("FACTS:"):
                current_category = "facts"
            elif line.upper().startswith("ACTION_ITEMS:") or line.upper().startswith("ACTION ITEMS:"):
                current_category = "action_items"
            elif line.upper().startswith("ISSUES:"):
                current_category = "issues"
            elif line.upper().startswith("PREFERENCES:"):
                current_category = "preferences"
            # Check for bullet points
            elif line.startswith("-") and current_category:
                item = line[1:].strip()
                if item and item.lower() != "none":
                    categories[current_category].append(item)

        return categories
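
# Parsing sketch for _parse_extraction_response (the reply text below is
# illustrative, not a recorded model output):
#
#     ContextManager()._parse_extraction_response(
#         "DECISIONS:\n- Use PostgreSQL\n\nISSUES:\n- None"
#     )
#     # -> decisions == ["Use PostgreSQL"]; issues stays [] because bare
#     #    "None" items are filtered out; the other categories remain empty.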


# Convenience function for easy import
async def compact_if_needed(messages: list[BaseMessage], context_manager: ContextManager | None = None) -> list[BaseMessage]:
    """
    Compact conversation if needed, otherwise return unchanged.

    Args:
        messages: Conversation messages
        context_manager: ContextManager instance (creates a new one if None)

    Returns:
        Original or compacted messages
    """
    if context_manager is None:
        context_manager = ContextManager()

    if context_manager.needs_compaction(messages):
        result = await context_manager.compact_conversation(messages)
        return result.compacted_messages

    return messages
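

# Usage sketch: apply compaction transparently before each model invocation.
# The helper below is illustrative, not part of the module's public API; it
# assumes an asyncio context.
async def _example_handle_turn(history: list[BaseMessage]) -> list[BaseMessage]:
    """Compact the running history if it exceeds the configured threshold."""
    # Under the 8000-token default threshold the history passes through
    # unchanged; above it, older turns collapse into one summary message.
    return await compact_if_needed(history)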