Coverage for src / mcp_server_langgraph / monitoring / budget_monitor.py: 81%
261 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Budget Monitor and Alert System
4Monitors LLM spending against budgets and triggers alerts when thresholds are exceeded.
6Features:
- Budget tracking (daily, weekly, monthly, quarterly, yearly)
8- Configurable alert thresholds (e.g., 75%, 90%)
9- Multi-channel alerts (logging, webhooks, email)
10- Budget forecasting
11- Automatic budget rollover
Example:
    >>> from decimal import Decimal
    >>> from mcp_server_langgraph.monitoring.budget_monitor import BudgetMonitor, BudgetPeriod
    >>> monitor = BudgetMonitor()
    >>> await monitor.create_budget(
    ...     id="dev_team_monthly",
    ...     name="Development Team Monthly",
    ...     limit_usd=Decimal("1000.00"),
    ...     period=BudgetPeriod.MONTHLY,
    ... )
    >>> await monitor.check_budget("dev_team_monthly")
22"""
24import asyncio
25import logging
26import smtplib
27from datetime import datetime, timedelta, UTC
28from decimal import Decimal
29from email.mime.multipart import MIMEMultipart
30from email.mime.text import MIMEText
31from enum import Enum
32from typing import TYPE_CHECKING, Any, Optional
34import httpx
35from pydantic import BaseModel, ConfigDict, Field, field_serializer
37if TYPE_CHECKING:
38 from mcp_server_langgraph.monitoring.cost_tracker import CostMetricsCollector
# ==============================================================================
# Configuration
# ==============================================================================

# Module-scoped logger: records are emitted under this module's dotted name so
# handlers/levels can be configured per module.
logger = logging.getLogger(name=__name__)
class BudgetPeriod(str, Enum):
    """
    Length of a budget's recurring accounting window.

    Because the enum mixes in ``str``, each member compares equal to its
    lowercase value (``BudgetPeriod.MONTHLY == "monthly"``) and can be
    constructed from it (``BudgetPeriod("monthly")``).
    """

    # Declaration order is significant for iteration; keep it stable.
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"
    QUARTERLY = "quarterly"
    YEARLY = "yearly"
class AlertLevel(str, Enum):
    """
    Severity attached to a budget alert, from least to most severe.

    Members are ``str`` subclasses, so ``AlertLevel.WARNING == "warning"``;
    the string value is what gets passed to the alert transports.
    """

    # Ordered by increasing severity.
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
# ==============================================================================
# Data Models
# ==============================================================================


class Budget(BaseModel):
    """
    Configuration of a single spending budget.

    Monetary values and thresholds are ``Decimal`` to avoid float rounding.
    The field serializers below emit them (and the dates) as strings so a
    dumped model is JSON-compatible without losing precision.
    """

    id: str = Field(description="Unique budget identifier")
    name: str = Field(description="Budget name")
    # Strictly positive (``gt=0``), so callers can divide by it safely.
    limit_usd: Decimal = Field(description="Budget limit in USD", gt=0)
    period: BudgetPeriod = Field(description="Budget period")
    start_date: datetime = Field(description="Budget start date")
    end_date: datetime | None = Field(default=None, description="Budget end date (optional)")
    # Fractions of limit_usd: Decimal("0.75") means "alert at 75% utilization".
    alert_thresholds: list[Decimal] = Field(
        default=[Decimal("0.75"), Decimal("0.90")],
        description="Alert thresholds as percentages (e.g., 0.75 = 75%)",
    )
    enabled: bool = Field(default=True, description="Whether budget monitoring is enabled")
    metadata: dict[str, Any] = Field(default_factory=dict, description="Additional metadata")

    model_config = ConfigDict()

    @field_serializer("limit_usd")
    def serialize_limit(self, value: Decimal) -> str:
        """Emit the limit as its exact decimal string (e.g. ``"1000.00"``)."""
        as_text = str(value)
        return as_text

    @field_serializer("alert_thresholds")
    def serialize_thresholds(self, value: list[Decimal]) -> list[str]:
        """Emit every threshold as its exact decimal string."""
        return list(map(str, value))

    @field_serializer("start_date", "end_date")
    def serialize_dates(self, value: datetime | None) -> str | None:
        """Emit dates in ISO 8601 form; an absent end date stays ``None``."""
        if value is None:
            return None
        return value.isoformat()
class BudgetStatus(BaseModel):
    """
    Point-in-time snapshot of a budget's spend for one period.

    Decimal fields are serialized as strings and datetimes as ISO 8601 so the
    dumped model is JSON-compatible without losing precision.
    """

    budget_id: str
    budget_name: str
    limit_usd: Decimal
    spent_usd: Decimal
    remaining_usd: Decimal
    # Fraction of the limit consumed: Decimal("0.5") means 50%.
    utilization: Decimal
    period_start: datetime
    period_end: datetime
    is_exceeded: bool
    days_remaining: int
    # None when there is not yet enough history to extrapolate.
    projected_end_of_period_spend: Decimal | None = None

    model_config = ConfigDict()

    @field_serializer("limit_usd", "spent_usd", "remaining_usd", "utilization", "projected_end_of_period_spend")
    def serialize_decimals(self, value: Decimal | None) -> str | None:
        """Emit Decimals as exact strings; a missing projection stays ``None``."""
        if value is None:
            return None
        return str(value)

    @field_serializer("period_start", "period_end")
    def serialize_dates(self, value: datetime) -> str:
        """Emit period boundaries in ISO 8601 form."""
        iso_text = value.isoformat()
        return iso_text
class BudgetAlert(BaseModel):
    """
    A single alert raised when a budget crosses one of its thresholds.

    ``utilization`` and ``threshold`` are fractions of the budget limit.
    They serialize as exact decimal strings, and ``timestamp`` serializes as
    ISO 8601.
    """

    alert_id: str
    budget_id: str
    budget_name: str
    level: AlertLevel
    utilization: Decimal
    threshold: Decimal
    message: str
    timestamp: datetime
    # Starts False; nothing in this model flips it.
    acknowledged: bool = False

    model_config = ConfigDict()

    @field_serializer("utilization", "threshold")
    def serialize_decimals(self, value: Decimal) -> str:
        """Emit the Decimal as its exact string form."""
        as_text = str(value)
        return as_text

    @field_serializer("timestamp")
    def serialize_timestamp(self, value: datetime) -> str:
        """Emit the alert time in ISO 8601 form."""
        return datetime.isoformat(value)
# ==============================================================================
# Budget Monitor
# ==============================================================================


class BudgetMonitor:
    """
    Monitor LLM spending against budgets and trigger alerts.

    Spend is always measured over the budget's *current* calendar-aligned
    period, computed in UTC: the day, the Monday-based week, the month, the
    quarter or the year. The measured window therefore moves forward by
    itself when a new period starts.

    The set of thresholds that have already fired is NOT cleared
    automatically. Call ``reset_budget`` to re-arm alerts for a new period.

    Features:
    - Budget tracking by period
    - Configurable alert thresholds
    - Multi-level alerts (info, warning, critical)
    - Budget forecasting
    - Alert delivery via logging, SMTP email and HTTP webhooks
    """

    def __init__(
        self,
        cost_collector: Optional["CostMetricsCollector"] = None,
        smtp_host: str | None = None,
        smtp_port: int = 587,
        smtp_username: str | None = None,
        smtp_password: str | None = None,
        email_from: str | None = None,
        email_to: list[str] | None = None,
        webhook_url: str | None = None,
    ) -> None:
        """
        Initialize budget monitor.

        Args:
            cost_collector: CostMetricsCollector instance for querying actual spend.
                A fresh CostMetricsCollector is created when omitted.
            smtp_host: SMTP server hostname for email alerts
            smtp_port: SMTP server port (default: 587; STARTTLS is always used)
            smtp_username: SMTP authentication username
            smtp_password: SMTP authentication password
            email_from: From email address for alerts
            email_to: List of recipient email addresses
            webhook_url: Webhook URL for POST notifications
        """
        # The module-level import of CostMetricsCollector is TYPE_CHECKING-only.
        # Importing it here, at call time, presumably avoids an import cycle
        # (TODO confirm).
        from mcp_server_langgraph.monitoring.cost_tracker import CostMetricsCollector

        # Budget definitions keyed by budget id.
        self._budgets: dict[str, Budget] = {}
        # Every alert created by this monitor, in creation order.
        self._alerts: list[BudgetAlert] = []
        # Per budget id: thresholds already alerted on (or covered by a higher alert).
        self._alerted_thresholds: dict[str, set[Decimal]] = {}
        # Guards the three collections above. asyncio.Lock is NOT reentrant.
        self._lock = asyncio.Lock()
        self._cost_collector = cost_collector if cost_collector is not None else CostMetricsCollector()

        # Alert transport configuration
        self._smtp_host = smtp_host
        self._smtp_port = smtp_port
        self._smtp_username = smtp_username
        self._smtp_password = smtp_password
        self._email_from = email_from
        self._email_to = email_to or []
        self._webhook_url = webhook_url

    async def create_budget(
        self,
        id: str,
        name: str,
        limit_usd: Decimal,
        period: BudgetPeriod,
        start_date: datetime | None = None,
        alert_thresholds: list[Decimal] | None = None,
    ) -> Budget:
        """
        Create (or replace) a budget and reset its alert state.

        Args:
            id: Budget identifier. Reusing an id replaces the previous budget.
            name: Budget name
            limit_usd: Budget limit in USD (must be > 0)
            period: Budget period
            start_date: Start date (defaults to now, UTC)
            alert_thresholds: Alert thresholds as fractions of the limit.
                ``None`` means [0.75, 0.90]. An explicit empty list means no
                threshold alerts.

        Returns:
            Created Budget

        Example:
            >>> budget = await monitor.create_budget(
            ...     id="dev_team_monthly",
            ...     name="Development Team - Monthly",
            ...     limit_usd=Decimal("1000.00"),
            ...     period=BudgetPeriod.MONTHLY
            ... )
        """
        if start_date is None:
            start_date = datetime.now(UTC)
        if alert_thresholds is None:
            alert_thresholds = [Decimal("0.75"), Decimal("0.90")]

        budget = Budget(
            id=id,
            name=name,
            limit_usd=limit_usd,
            period=period,
            start_date=start_date,
            alert_thresholds=alert_thresholds,
        )

        async with self._lock:
            self._budgets[id] = budget
            self._alerted_thresholds[id] = set()

        # Use the validated enum's value: formatting a str-mixin Enum directly
        # renders "BudgetPeriod.MONTHLY" on newer Pythons.
        logger.info(f"Created budget: {name} (${limit_usd} {budget.period.value})")
        return budget

    async def get_budget(self, budget_id: str) -> Budget | None:
        """Get budget by ID, or None if it does not exist."""
        async with self._lock:
            return self._budgets.get(budget_id)

    async def get_period_spend(self, budget_id: str) -> Decimal:
        """
        Get total spending for the budget's current period.

        Queries the CostMetricsCollector and sums ``estimated_cost_usd`` for
        records whose timestamp falls in the half-open interval
        [period_start, period_end) returned by ``_calculate_period_boundaries``.

        Args:
            budget_id: Budget identifier

        Returns:
            Total spend in USD for current period (``Decimal("0.00")`` for an
            unknown budget)
        """
        async with self._lock:
            budget = self._budgets.get(budget_id)

        if budget is None:
            return Decimal("0.00")

        now = datetime.now(UTC)
        period_start, period_end = self._calculate_period_boundaries(budget, now)

        # NOTE(review): this requests the collector's "day" window and then
        # filters locally by the budget period. If get_records(period="day")
        # only returns the last day of records, longer periods under-count
        # spend. Confirm against CostMetricsCollector.get_records.
        all_records = await self._cost_collector.get_records(period="day")

        # Half-open interval: a record stamped exactly at period_end belongs to
        # the next period, not to this one.
        period_records = [record for record in all_records if period_start <= record.timestamp < period_end]

        total_cost = sum(
            (record.estimated_cost_usd for record in period_records),
            Decimal("0.00"),
        )

        logger.debug(
            f"Budget {budget_id}: {len(period_records)} records in period "
            f"{period_start} to {period_end}, total cost: ${total_cost}"
        )

        return total_cost

    async def get_budget_status(self, budget_id: str) -> BudgetStatus | None:
        """
        Get current budget status.

        The reported period is the same calendar-aligned window that
        ``get_period_spend`` sums over. Utilization, days remaining and the
        projection therefore all describe one consistent period.

        Args:
            budget_id: Budget identifier

        Returns:
            BudgetStatus with current utilization and projections, or None if
            the budget does not exist. The projection is None until at least
            one full day of the period has elapsed.
        """
        budget = await self.get_budget(budget_id)
        if budget is None:
            return None

        spent = await self.get_period_spend(budget_id)
        remaining = budget.limit_usd - spent
        utilization = spent / budget.limit_usd if budget.limit_usd > 0 else Decimal("0")

        # Same boundaries get_period_spend uses (previously start_date + an
        # approximate length, which disagreed with the spend window).
        now = datetime.now(UTC)
        period_start, period_end = self._calculate_period_boundaries(budget, now)

        # Whole days left; period_end is always after now, so this is >= 0.
        days_remaining = (period_end - now).days

        # Linear extrapolation of the average daily burn over the whole period.
        days_elapsed = (now - period_start).days
        projected_spend: Decimal | None = None
        if days_elapsed > 0:
            daily_burn_rate = spent / Decimal(days_elapsed)
            total_period_days = (period_end - period_start).days
            projected_spend = daily_burn_rate * Decimal(total_period_days)

        return BudgetStatus(
            budget_id=budget.id,
            budget_name=budget.name,
            limit_usd=budget.limit_usd,
            spent_usd=spent,
            remaining_usd=remaining,
            utilization=utilization,
            period_start=period_start,
            period_end=period_end,
            is_exceeded=spent > budget.limit_usd,
            days_remaining=days_remaining,
            projected_end_of_period_spend=projected_spend,
        )

    async def check_budget(self, budget_id: str) -> BudgetAlert | None:
        """
        Check budget and trigger an alert if a new threshold was crossed.

        Only the highest crossed threshold produces an alert. Every lower
        crossed threshold is marked as alerted at the same time, so a later
        check never emits a redundant lower-threshold alert.

        Args:
            budget_id: Budget identifier

        Returns:
            BudgetAlert if a not-yet-alerted threshold was exceeded, None otherwise
            (also None for unknown or disabled budgets)

        Example:
            >>> alert = await monitor.check_budget("dev_team_monthly")
            >>> if alert:
            ...     print(f"Alert: {alert.message}")
        """
        budget = await self.get_budget(budget_id)
        if budget is None or not budget.enabled:
            return None

        spent = await self.get_period_spend(budget_id)
        utilization = spent / budget.limit_usd if budget.limit_usd > 0 else Decimal("0")

        crossed = sorted((t for t in budget.alert_thresholds if utilization >= t), reverse=True)
        if not crossed:
            return None
        threshold = crossed[0]

        # Claim the alert while holding the lock, but release the lock before
        # _create_alert. That method re-acquires the same non-reentrant
        # asyncio.Lock, so holding it here would deadlock.
        async with self._lock:
            alerted = self._alerted_thresholds.setdefault(budget_id, set())
            should_alert = threshold not in alerted
            alerted.update(crossed)

        if not should_alert:
            return None

        # Determine alert level based on utilization
        # Critical: >= 90% (aligns with standard budget alert thresholds)
        # Warning: >= 75%
        # Info: < 75%
        if utilization >= Decimal("0.90"):
            level = AlertLevel.CRITICAL
        elif utilization >= Decimal("0.75"):
            level = AlertLevel.WARNING
        else:
            level = AlertLevel.INFO

        alert = await self._create_alert(
            budget=budget,
            level=level,
            utilization=utilization,
            threshold=threshold,
        )

        await self.send_alert(
            level=level.value,
            message=alert.message,
            budget_id=budget_id,
            utilization=float(utilization),
        )

        return alert

    async def _create_alert(
        self,
        budget: Budget,
        level: AlertLevel,
        utilization: Decimal,
        threshold: Decimal,
    ) -> BudgetAlert:
        """Create a budget alert and append it to the alert history."""
        import uuid

        now = datetime.now(UTC)
        # The timestamp alone has 1-second resolution. The random suffix keeps
        # ids unique when several alerts for one budget land in the same second.
        alert_id = f"alert_{budget.id}_{int(now.timestamp())}_{uuid.uuid4().hex[:8]}"

        message = (
            f"Budget '{budget.name}' at {utilization * 100:.1f}% "
            f"(threshold: {threshold * 100:.0f}%, limit: ${budget.limit_usd})"
        )

        alert = BudgetAlert(
            alert_id=alert_id,
            budget_id=budget.id,
            budget_name=budget.name,
            level=level,
            utilization=utilization,
            threshold=threshold,
            message=message,
            timestamp=now,
        )

        async with self._lock:
            self._alerts.append(alert)

        return alert

    async def send_alert(
        self,
        level: str,
        message: str,
        budget_id: str,
        utilization: float,
    ) -> None:
        """
        Send budget alert notification via multiple channels.

        Supports:
        - Logging (always enabled)
        - Email via SMTP (if configured)
        - Generic webhooks for Slack/Teams/custom (if configured)

        Transport failures are logged with a traceback and never propagate, so
        one failing channel does not block the others.

        Args:
            level: Alert level (info, warning, critical); unknown values log at WARNING
            message: Alert message
            budget_id: Budget identifier
            utilization: Spend / limit as a fraction (e.g. 0.95 for 95%)
        """
        # 1. Log alert (always enabled)
        log_level = {
            "info": logging.INFO,
            "warning": logging.WARNING,
            "critical": logging.CRITICAL,
        }.get(level, logging.WARNING)

        logger.log(log_level, "BUDGET ALERT [%s]: %s", level.upper(), message)

        # 2. Send email alert (if configured)
        if self._smtp_host and self._email_from and self._email_to:
            try:
                await self._send_email_alert(level, message, budget_id, utilization)
            except Exception as e:
                logger.exception("Failed to send email alert: %s", e)

        # 3. Send webhook notification (if configured)
        if self._webhook_url:
            try:
                await self._send_webhook_alert(level, message, budget_id, utilization)
            except Exception as e:
                logger.exception("Failed to send webhook alert: %s", e)

    async def _send_email_alert(self, level: str, message: str, budget_id: str, utilization: float) -> None:
        """
        Send email alert via SMTP.

        Builds a multipart/alternative message (plain text plus HTML). The
        blocking SMTP exchange runs in a worker thread so it does not stall
        the event loop.

        Args:
            level: Alert level (info, warning, critical)
            message: Alert message
            budget_id: Budget identifier
            utilization: Spend / limit as a fraction (0.95 renders as "95.0%")
        """
        from html import escape

        # Guard: Skip if SMTP not configured
        if not self._smtp_host or not self._email_from or not self._email_to:
            logger.warning("Email alerts not configured - skipping email notification")
            return

        level_label = level.upper()
        # One timestamp for both bodies so they agree.
        timestamp = datetime.now(UTC).isoformat()
        subject = f"[{level_label}] Budget Alert: {budget_id}"
        heading_color = "red" if level == "critical" else "orange"

        # Escape interpolated text: the message embeds the budget name, which
        # is caller-supplied and must not be able to inject markup.
        html_body = f"""
        <html>
        <body>
            <h2 style="color: {heading_color};">Budget Alert</h2>
            <p><strong>Level:</strong> {escape(level_label)}</p>
            <p><strong>Budget ID:</strong> {escape(budget_id)}</p>
            <p><strong>Utilization:</strong> {utilization:.1%}</p>
            <p><strong>Message:</strong> {escape(message)}</p>
            <p><strong>Timestamp:</strong> {timestamp}</p>
        </body>
        </html>
        """

        # Plain-text fallback for clients that do not render HTML.
        text_body = f"""
Budget Alert [{level_label}]

Budget ID: {budget_id}
Utilization: {utilization:.1%}
Message: {message}
Timestamp: {timestamp}
        """

        msg = MIMEMultipart("alternative")
        msg["Subject"] = subject
        msg["From"] = self._email_from
        msg["To"] = ", ".join(self._email_to)

        msg.attach(MIMEText(text_body, "plain"))
        msg.attach(MIMEText(html_body, "html"))

        await asyncio.to_thread(self._send_smtp, msg)

        logger.info(f"Email alert sent to {len(self._email_to)} recipients")

    def _send_smtp(self, msg: MIMEMultipart) -> None:
        """
        Send SMTP message (blocking, meant to be called via asyncio.to_thread).

        Upgrades the connection with STARTTLS and logs in only when both a
        username and a password are configured. All socket operations are
        bounded by a timeout so an unresponsive server cannot pin a worker
        thread forever.

        Raises:
            RuntimeError: If no SMTP host is configured.
            smtplib.SMTPException / OSError: On connection, TLS, auth or send failure.
        """
        # Explicit check rather than ``assert``: asserts are stripped under ``python -O``.
        if not self._smtp_host:
            raise RuntimeError("smtp_host must be configured to send emails")
        with smtplib.SMTP(self._smtp_host, self._smtp_port, timeout=30.0) as server:
            server.starttls()
            if self._smtp_username and self._smtp_password:
                server.login(self._smtp_username, self._smtp_password)
            server.send_message(msg)

    async def _send_webhook_alert(self, level: str, message: str, budget_id: str, utilization: float) -> None:
        """
        Send webhook notification via HTTP POST with a JSON payload.

        Raises:
            httpx.HTTPError: On transport failure or a non-2xx response.
        """
        # Guard: Skip if webhook not configured
        if not self._webhook_url:
            logger.warning("Webhook URL not configured - skipping webhook notification")
            return

        payload = {
            "alert_type": "budget",
            "level": level,
            "budget_id": budget_id,
            "message": message,
            "utilization": utilization,
            "timestamp": datetime.now(UTC).isoformat(),
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                self._webhook_url,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=10.0,
            )
            response.raise_for_status()

        # Log only the host: webhook URLs (e.g. Slack incoming webhooks) usually
        # embed a secret token in the path.
        logger.info(f"Webhook alert sent to {httpx.URL(self._webhook_url).host}")

    @staticmethod
    def _first_of_month_after(month_start: datetime, months: int) -> datetime:
        """Return the first day of the month ``months`` after ``month_start`` (which must be on day 1)."""
        zero_based_month = month_start.month - 1 + months
        return month_start.replace(
            year=month_start.year + zero_based_month // 12,
            month=zero_based_month % 12 + 1,
        )

    def _calculate_period_boundaries(self, budget: Budget, current_time: datetime) -> tuple[datetime, datetime]:
        """
        Calculate the boundaries of the budget period that contains ``current_time``.

        Periods are calendar-aligned in ``current_time``'s timezone. They do
        not depend on ``budget.start_date``.

        - DAILY: midnight to the next midnight
        - WEEKLY: Monday 00:00 to the following Monday 00:00
        - MONTHLY: the 1st of the month to the 1st of the next month
        - QUARTERLY: Jan/Apr/Jul/Oct 1st to the first day of the next quarter
        - YEARLY (and any other value): Jan 1st to Jan 1st of the next year

        Args:
            budget: Budget configuration (only ``period`` is read)
            current_time: Current timestamp

        Returns:
            Tuple of (period_start, period_end). period_end is the start of
            the next period, so the period is the half-open interval [start, end).
        """
        midnight = current_time.replace(hour=0, minute=0, second=0, microsecond=0)

        if budget.period == BudgetPeriod.DAILY:
            period_start = midnight
            period_end = period_start + timedelta(days=1)
        elif budget.period == BudgetPeriod.WEEKLY:
            period_start = midnight - timedelta(days=current_time.weekday())
            period_end = period_start + timedelta(weeks=1)
        elif budget.period == BudgetPeriod.MONTHLY:
            period_start = midnight.replace(day=1)
            period_end = self._first_of_month_after(period_start, 1)
        elif budget.period == BudgetPeriod.QUARTERLY:
            quarter_first_month = ((current_time.month - 1) // 3) * 3 + 1
            period_start = midnight.replace(month=quarter_first_month, day=1)
            # Real quarter length (90-92 days), not a fixed 90 days.
            period_end = self._first_of_month_after(period_start, 3)
        else:  # BudgetPeriod.YEARLY
            period_start = midnight.replace(month=1, day=1)
            period_end = period_start.replace(year=period_start.year + 1)

        return period_start, period_end

    def _calculate_period_end(self, start_date: datetime, period: BudgetPeriod) -> datetime:
        """
        Calculate an approximate end date for a period beginning at ``start_date``.

        Uses fixed lengths: 1 day, 1 week, 30 days (monthly), 90 days
        (quarterly) and 365 days (yearly). For exact calendar-aligned
        boundaries use ``_calculate_period_boundaries``.
        """
        if period == BudgetPeriod.DAILY:
            return start_date + timedelta(days=1)
        elif period == BudgetPeriod.WEEKLY:
            return start_date + timedelta(weeks=1)
        elif period == BudgetPeriod.MONTHLY:
            # Add one month (approximate)
            return start_date + timedelta(days=30)
        elif period == BudgetPeriod.QUARTERLY:
            return start_date + timedelta(days=90)
        else:  # BudgetPeriod.YEARLY
            return start_date + timedelta(days=365)

    async def reset_budget(self, budget_id: str) -> None:
        """
        Reset budget for new period.

        Clears the alerted thresholds so alerts can trigger again. Unknown
        budget ids are ignored apart from the log line.

        Args:
            budget_id: Budget identifier
        """
        async with self._lock:
            if budget_id in self._alerted_thresholds:
                self._alerted_thresholds[budget_id].clear()

        logger.info(f"Reset budget: {budget_id}")

    async def get_all_budgets(self) -> list[Budget]:
        """Get a snapshot list of all budgets."""
        async with self._lock:
            return list(self._budgets.values())

    async def get_alerts(
        self,
        budget_id: str | None = None,
        acknowledged: bool | None = None,
    ) -> list[BudgetAlert]:
        """
        Get budget alerts with optional filtering.

        Args:
            budget_id: Filter by budget (optional; ``None`` means all budgets)
            acknowledged: Filter by acknowledgment status (optional)

        Returns:
            List of BudgetAlerts in creation order (a copy; safe to mutate)
        """
        async with self._lock:
            alerts = self._alerts.copy()

        if budget_id is not None:
            alerts = [a for a in alerts if a.budget_id == budget_id]

        if acknowledged is not None:
            alerts = [a for a in alerts if a.acknowledged == acknowledged]

        return alerts
# ==============================================================================
# Singleton Instance
# ==============================================================================

_monitor_instance: BudgetMonitor | None = None


def get_budget_monitor() -> BudgetMonitor:
    """
    Return the process-wide BudgetMonitor, creating it on first call.

    The shared instance is constructed with default arguments, so it has no
    SMTP or webhook transport configured. Creation is a plain lazy
    module-global assignment with no locking.
    """
    global _monitor_instance
    instance = _monitor_instance
    if instance is None:
        instance = BudgetMonitor()
        _monitor_instance = instance
    return instance