Coverage for src/mcp_server_langgraph/monitoring/prometheus_client.py: 86%
202 statements
coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Prometheus client service for querying metrics.
4Provides a high-level interface for querying Prometheus metrics with:
5- Uptime/downtime calculations
6- Response time percentiles (p50, p95, p99)
7- Error rate calculations
8- SLA compliance metrics
9- Custom PromQL queries
10- Time-range aggregations
12Resolves production TODOs:
13- monitoring/sla.py:157 - Query actual downtime
14- monitoring/sla.py:235 - Query actual response times
15- monitoring/sla.py:300 - Query actual error rate
16- core/compliance/evidence.py:419 - Uptime data for SOC2 evidence
17"""
import logging
import ssl
from dataclasses import dataclass
from datetime import datetime
from typing import Any

import certifi
import httpx
from pydantic import BaseModel, Field

from mcp_server_langgraph.core.config import settings

logger = logging.getLogger(__name__)
class PrometheusConfig(BaseModel):
    """Prometheus client configuration"""

    url: str = Field(default="http://prometheus:9090", description="Prometheus server URL")
    timeout: int = Field(default=30, description="Query timeout in seconds")
    retry_attempts: int = Field(default=3, description="Number of retry attempts")
    retry_backoff: float = Field(default=1.0, description="Retry backoff multiplier")
@dataclass
class MetricValue:
    """Single metric value with timestamp"""

    timestamp: datetime
    value: float

    @classmethod
    def from_prometheus(cls, data: list[float | str]) -> "MetricValue":
        """Parse from Prometheus response format: [timestamp, value]"""
        return cls(timestamp=datetime.fromtimestamp(data[0]), value=float(data[1]))  # type: ignore[arg-type]
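# For reference, the Prometheus HTTP API returns each instant sample as a
# [unix_timestamp, "value"] pair, with the value encoded as a string. A
# hypothetical example of what from_prometheus() produces from such a pair:
#
#     MetricValue.from_prometheus([1712000000.0, "0.95"])
#     # -> MetricValue(timestamp=datetime(2024, 4, 1, ...), value=0.95)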
@dataclass
class QueryResult:
    """Result of a Prometheus query"""

    metric: dict[str, str]  # Label key-value pairs
    values: list[MetricValue]

    def get_latest_value(self) -> float | None:
        """Get the most recent value"""
        return self.values[-1].value if self.values else None

    def get_average(self) -> float | None:
        """Calculate average across all values"""
        if not self.values:
            return None
        return sum(v.value for v in self.values) / len(self.values)
class PrometheusClient:
    """
    High-level Prometheus query client.

    Usage:
        client = PrometheusClient()
        await client.initialize()

        # Query uptime
        uptime_pct = await client.query_uptime(service="mcp-server", timerange="30d")

        # Query percentiles
        response_times = await client.query_percentiles(
            metric="http_request_duration_seconds",
            percentiles=[50, 95, 99],
            timerange="1h",
        )

        # Query error rate
        error_rate = await client.query_error_rate(timerange="5m")

        await client.close()
    """
    def __init__(self, config: PrometheusConfig | None = None) -> None:
        self.config = config or self._load_config_from_settings()
        self.client: httpx.AsyncClient | None = None
        self._initialized = False

    def _load_config_from_settings(self) -> PrometheusConfig:
        """Load configuration from application settings"""
        return PrometheusConfig(
            url=getattr(settings, "prometheus_url", "http://prometheus:9090"),
            timeout=getattr(settings, "prometheus_timeout", 30),
            retry_attempts=getattr(settings, "prometheus_retry_attempts", 3),
        )
    async def initialize(self) -> None:
        """Initialize HTTP client with cross-platform SSL support."""
        if self._initialized:
            return

        # Use certifi CA bundle for cross-platform SSL verification.
        # CI environments (especially isolated Python builds) may lack system certs.
        ssl_context = ssl.create_default_context()
        try:
            ssl_context.load_verify_locations(certifi.where())
        except FileNotFoundError:
            logger.warning("certifi CA bundle not found, falling back to system certs")

        self.client = httpx.AsyncClient(
            timeout=self.config.timeout,
            follow_redirects=True,
            verify=ssl_context,
        )

        self._initialized = True
        logger.info(f"Prometheus client initialized: {self.config.url}")
    async def close(self) -> None:
        """Close HTTP client"""
        if self.client:
            await self.client.aclose()
        logger.info("Prometheus client closed")
    async def query(self, promql: str, time: datetime | None = None) -> list[QueryResult]:
        """
        Execute instant query.

        Args:
            promql: PromQL query string
            time: Optional evaluation timestamp (defaults to now)

        Returns:
            List of query results
        """
        if not self._initialized:
            await self.initialize()

        params = {"query": promql}
        if time:
            params["time"] = time.timestamp()  # type: ignore[assignment]

        url = f"{self.config.url}/api/v1/query"

        try:
            response = await self.client.get(url, params=params)  # type: ignore[union-attr]
            response.raise_for_status()

            data = response.json()

            if data.get("status") != "success":
                msg = f"Prometheus query failed: {data.get('error', 'Unknown error')}"
                raise ValueError(msg)

            return self._parse_query_result(data["data"]["result"])

        except Exception as e:
            logger.error(f"Prometheus query failed: {e}", exc_info=True, extra={"query": promql})
            raise
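    # For reference, GET /api/v1/query responds with JSON shaped like the
    # sample below (label and value contents are illustrative); this is the
    # payload that _parse_query_result() consumes:
    #
    #   {
    #       "status": "success",
    #       "data": {
    #           "resultType": "vector",
    #           "result": [
    #               {"metric": {"job": "mcp-server-langgraph"},
    #                "value": [1712000000.0, "1"]}
    #           ]
    #       }
    #   }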
    async def query_range(
        self,
        promql: str,
        start: datetime,
        end: datetime,
        step: str = "1m",
    ) -> list[QueryResult]:
        """
        Execute range query.

        Args:
            promql: PromQL query string
            start: Range start time
            end: Range end time
            step: Query resolution step (e.g., "1m", "5m", "1h")

        Returns:
            List of query results with time series
        """
        if not self._initialized:
            await self.initialize()

        params: dict[str, str | float] = {
            "query": promql,
            "start": start.timestamp(),
            "end": end.timestamp(),
            "step": step,
        }

        url = f"{self.config.url}/api/v1/query_range"

        try:
            if self.client is not None:
                response = await self.client.get(url, params=params)
                response.raise_for_status()
            else:
                msg = "Prometheus client not initialized"
                raise ValueError(msg)

            data = response.json()

            if data.get("status") != "success":
                msg = f"Prometheus range query failed: {data.get('error', 'Unknown error')}"
                raise ValueError(msg)

            return self._parse_range_result(data["data"]["result"])

        except Exception as e:
            logger.error(f"Prometheus range query failed: {e}", exc_info=True, extra={"query": promql})
            raise
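    # A minimal range-query sketch (the query, window, and step below are
    # illustrative, not values used elsewhere in this module):
    #
    #     from datetime import timedelta
    #     now = datetime.now()
    #     series = await client.query_range(
    #         'rate(http_requests_total{job="mcp-server-langgraph"}[5m])',
    #         start=now - timedelta(hours=1),
    #         end=now,
    #         step="1m",
    #     )
    #     # each QueryResult.values then holds one MetricValue per step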
    def _parse_query_result(self, result: list[dict]) -> list[QueryResult]:  # type: ignore[type-arg]
        """Parse instant query result"""
        parsed = []
        for item in result:
            metric = item.get("metric", {})
            value_data = item.get("value", [])

            if value_data:
                values = [MetricValue.from_prometheus(value_data)]
                parsed.append(QueryResult(metric=metric, values=values))

        return parsed
    def _parse_range_result(self, result: list[dict]) -> list[QueryResult]:  # type: ignore[type-arg]
        """Parse range query result"""
        parsed = []
        for item in result:
            metric = item.get("metric", {})
            values_data = item.get("values", [])

            values = [MetricValue.from_prometheus(v) for v in values_data]
            parsed.append(QueryResult(metric=metric, values=values))

        return parsed
    async def query_uptime(
        self,
        service: str = "mcp-server-langgraph",
        timerange: str = "30d",
    ) -> float:
        """
        Calculate service uptime percentage.

        Args:
            service: Service name (label filter)
            timerange: Time range (e.g., "1h", "24h", "30d")

        Returns:
            Uptime percentage (0-100)

        Resolves: monitoring/sla.py:157, core/compliance/evidence.py:419
        """
        # Query: (total time - downtime) / total time * 100
        # Assuming 'up' metric where 0 = down, 1 = up
        promql = f'avg_over_time(up{{job="{service}"}}[{timerange}]) * 100'

        try:
            results = await self.query(promql)

            if results and results[0].values:
                uptime_pct = results[0].get_latest_value()
                logger.info(f"Uptime queried: {uptime_pct:.2f}% over {timerange}", extra={"service": service})
                return uptime_pct  # type: ignore[return-value]

            # Fallback: assume 100% if no data
            logger.warning(f"No uptime data found for {service}, assuming 100%")
            return 100.0

        except Exception as e:
            logger.error(f"Failed to query uptime: {e}", exc_info=True)
            # Return conservative estimate
            return 99.0
    async def query_downtime(
        self,
        service: str = "mcp-server-langgraph",
        timerange: str = "30d",
    ) -> float:
        """
        Calculate total downtime in seconds.

        Args:
            service: Service name
            timerange: Time range

        Returns:
            Total downtime in seconds

        Resolves: monitoring/sla.py:157
        """
        uptime_pct = await self.query_uptime(service=service, timerange=timerange)

        # Calculate total time in seconds
        if timerange.endswith("d"):
            total_seconds = int(timerange[:-1]) * 86400
        elif timerange.endswith("h"):
            total_seconds = int(timerange[:-1]) * 3600
        elif timerange.endswith("m"):
            total_seconds = int(timerange[:-1]) * 60
        else:
            msg = f"Unsupported timerange format: {timerange}"
            raise ValueError(msg)

        downtime_seconds = total_seconds * (100 - uptime_pct) / 100

        logger.info(f"Downtime calculated: {downtime_seconds:.2f}s over {timerange}", extra={"service": service})

        return downtime_seconds
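    # Worked example of the downtime formula (illustrative numbers): a "30d"
    # window spans 30 * 86400 = 2,592,000 seconds, so a measured uptime of
    # 99.9% yields 2,592,000 * (100 - 99.9) / 100 ≈ 2,592 seconds (about 43
    # minutes) of downtime.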
    async def query_percentiles(
        self,
        metric: str,
        percentiles: list[int] | None = None,
        timerange: str = "1h",
        label_filters: dict[str, str] | None = None,
    ) -> dict[int, float]:
        """
        Query metric percentiles (p50, p95, p99, etc.).

        Args:
            metric: Metric name (e.g., "http_request_duration_seconds")
            percentiles: List of percentiles to query (default: [50, 95, 99])
            timerange: Time range
            label_filters: Additional label filters

        Returns:
            Dictionary mapping percentile to value

        Resolves: monitoring/sla.py:235
        """
        if percentiles is None:
            percentiles = [50, 95, 99]
        # Build label filter string
        label_str = ""
        if label_filters:
            label_parts = [f'{k}="{v}"' for k, v in label_filters.items()]
            label_str = "{" + ", ".join(label_parts) + "}"

        results = {}

        for p in percentiles:
            # Query using histogram_quantile
            quantile = p / 100.0
            promql = f"histogram_quantile({quantile}, rate({metric}_bucket{label_str}[{timerange}]))"

            try:
                query_results = await self.query(promql)

                if query_results and query_results[0].values:
                    value = query_results[0].get_latest_value()
                    results[p] = value
                else:
                    results[p] = 0.0

            except Exception as e:
                logger.error(f"Failed to query p{p}: {e}", exc_info=True)
                results[p] = 0.0

        logger.info(f"Percentiles queried: {results}", extra={"metric": metric, "timerange": timerange})

        return results  # type: ignore[return-value]
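    # For illustration, with label_filters={"job": "mcp-server-langgraph"} and
    # the default metric from get_sla_metrics(), the p95 iteration above builds:
    #
    #   histogram_quantile(0.95, rate(http_request_duration_seconds_bucket{job="mcp-server-langgraph"}[1h]))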
    async def query_error_rate(
        self,
        timerange: str = "5m",
        service: str = "mcp-server-langgraph",
    ) -> float:
        """
        Calculate error rate (errors per second / total requests per second).

        Args:
            timerange: Time range
            service: Service name

        Returns:
            Error rate as percentage (0-100)

        Resolves: monitoring/sla.py:300
        """
        # Query error requests
        error_promql = f'rate(http_requests_total{{job="{service}", status=~"5.."}}[{timerange}])'

        # Query total requests
        total_promql = f'rate(http_requests_total{{job="{service}"}}[{timerange}])'

        try:
            error_results = await self.query(error_promql)
            total_results = await self.query(total_promql)

            error_rate_value = 0.0
            total_rate_value = 0.0

            if error_results and error_results[0].values:
                error_rate_value = error_results[0].get_latest_value()  # type: ignore[assignment]

            if total_results and total_results[0].values:
                total_rate_value = total_results[0].get_latest_value()  # type: ignore[assignment]

            error_pct = error_rate_value / total_rate_value * 100 if total_rate_value > 0 else 0.0

            logger.info(f"Error rate: {error_pct:.2f}% over {timerange}", extra={"service": service})

            return error_pct

        except Exception as e:
            logger.error(f"Failed to query error rate: {e}", exc_info=True)
            return 0.0
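    # Worked example of the error-rate math (illustrative rates): if the 5xx
    # rate is 0.5 req/s and the total rate is 100 req/s, the reported error
    # rate is 0.5 / 100 * 100 = 0.5%.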
    async def query_request_rate(
        self,
        timerange: str = "5m",
        service: str = "mcp-server-langgraph",
    ) -> float:
        """
        Query requests per second.

        Args:
            timerange: Time range
            service: Service name

        Returns:
            Requests per second
        """
        promql = f'rate(http_requests_total{{job="{service}"}}[{timerange}])'

        try:
            results = await self.query(promql)

            if results and results[0].values:
                rps = results[0].get_latest_value()
                logger.info(f"Request rate: {rps:.2f} req/s", extra={"service": service})
                return rps  # type: ignore[return-value]

            return 0.0

        except Exception as e:
            logger.error(f"Failed to query request rate: {e}", exc_info=True)
            return 0.0
    async def query_custom(
        self,
        promql: str,
        timerange: str | None = None,
    ) -> list[QueryResult]:
        """
        Execute custom PromQL query.

        Args:
            promql: Custom PromQL query
            timerange: Optional time range for rate/increase queries

        Returns:
            Query results
        """
        if timerange and "[" not in promql and ("rate(" in promql or "increase(" in promql):
            # Auto-inject the time range into the first rate()/increase() call,
            # e.g. rate(http_requests_total) -> rate(http_requests_total[5m])
            promql = promql.replace(")", f"[{timerange}])", 1)

        return await self.query(promql)
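    # For example (an illustrative call, not one made elsewhere in this module),
    #
    #     await client.query_custom("rate(http_requests_total)", timerange="5m")
    #
    # rewrites the query to rate(http_requests_total[5m]) before executing it;
    # queries that already contain a [window] selector are passed through untouched.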
    async def get_sla_metrics(
        self,
        service: str = "mcp-server-langgraph",
        timerange: str = "30d",
    ) -> dict[str, Any]:
        """
        Get comprehensive SLA metrics.

        Returns:
            Dictionary with uptime, downtime, error_rate, response_times
        """
        uptime_pct = await self.query_uptime(service=service, timerange=timerange)
        downtime_sec = await self.query_downtime(service=service, timerange=timerange)
        error_rate = await self.query_error_rate(service=service, timerange="5m")
        response_times = await self.query_percentiles(
            metric="http_request_duration_seconds",
            percentiles=[50, 95, 99],
            timerange="1h",
        )

        return {
            "uptime_percentage": uptime_pct,
            "downtime_seconds": downtime_sec,
            "error_rate_percentage": error_rate,
            "response_times": {
                "p50_seconds": response_times.get(50, 0),
                "p95_seconds": response_times.get(95, 0),
                "p99_seconds": response_times.get(99, 0),
            },
            "timerange": timerange,
            "service": service,
        }
# Global Prometheus client instance
_prometheus_client: PrometheusClient | None = None


async def get_prometheus_client() -> PrometheusClient:
    """Get or create global Prometheus client instance"""
    global _prometheus_client

    if _prometheus_client is None:
        _prometheus_client = PrometheusClient()
        await _prometheus_client.initialize()

    return _prometheus_client
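# Minimal end-to-end sketch (assumes a Prometheus server is reachable at the
# configured URL; the service name and printed fields are illustrative):
#
#     import asyncio
#
#     async def main() -> None:
#         client = await get_prometheus_client()
#         sla = await client.get_sla_metrics(service="mcp-server-langgraph", timerange="30d")
#         print(sla["uptime_percentage"], sla["response_times"]["p95_seconds"])
#         await client.close()
#
#     asyncio.run(main())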