Coverage for src/mcp_server_langgraph/monitoring/prometheus_client.py: 86%

202 statements  

coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Prometheus client service for querying metrics. 

3 

4Provides a high-level interface for querying Prometheus metrics with: 

5- Uptime/downtime calculations 

6- Response time percentiles (p50, p95, p99) 

7- Error rate calculations 

8- SLA compliance metrics 

9- Custom PromQL queries 

10- Time-range aggregations 

11 

12Resolves production TODOs: 

13- monitoring/sla.py:157 - Query actual downtime 

14- monitoring/sla.py:235 - Query actual response times 

15- monitoring/sla.py:300 - Query actual error rate 

16- core/compliance/evidence.py:419 - Uptime data for SOC2 evidence 

17""" 

18 

19import logging 

20import ssl 

21from dataclasses import dataclass 

22from datetime import datetime 

23from typing import Any 

24 

25import certifi 

26import httpx 

27from pydantic import BaseModel, Field 

28 

29from mcp_server_langgraph.core.config import settings 

30 

31logger = logging.getLogger(__name__) 

32 

33 

34class PrometheusConfig(BaseModel): 

35 """Prometheus client configuration""" 

36 

37 url: str = Field(default="http://prometheus:9090", description="Prometheus server URL") 

38 timeout: int = Field(default=30, description="Query timeout in seconds") 

39 retry_attempts: int = Field(default=3, description="Number of retry attempts") 

40 retry_backoff: float = Field(default=1.0, description="Retry backoff multiplier") 

41 

42 

43@dataclass 

44class MetricValue: 

45 """Single metric value with timestamp""" 

46 

47 timestamp: datetime 

48 value: float 

49 

50 @classmethod 

51 def from_prometheus(cls, data: list[float | str]) -> "MetricValue": 

52 """Parse from Prometheus response format: [timestamp, value]""" 

53 return cls(timestamp=datetime.fromtimestamp(float(data[0])), value=float(data[1])) 

54 
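# Example (illustrative): the Prometheus HTTP API returns each sample as a
# [unix_timestamp, "value-as-string"] pair, so a call such as
#     MetricValue.from_prometheus([1733184000.0, "0.982"])
# yields a MetricValue with value=0.982 and a datetime for that timestamp.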

55 

56@dataclass 

57class QueryResult: 

58 """Result of a Prometheus query""" 

59 

60 metric: dict[str, str] # Label key-value pairs 

61 values: list[MetricValue] 

62 

63 def get_latest_value(self) -> float | None: 

64 """Get the most recent value""" 

65 return self.values[-1].value if self.values else None 

66 

67 def get_average(self) -> float | None: 

68 """Calculate average across all values""" 

69 if not self.values: 

70 return None 

71 return sum(v.value for v in self.values) / len(self.values) 

72 
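# Example (illustrative): given three samples with values 1.0, 0.0 and 1.0,
#     result = QueryResult(metric={"job": "mcp-server-langgraph"}, values=[...])
#     result.get_latest_value()  # -> 1.0 (most recent sample)
#     result.get_average()       # -> ~0.67 (mean of all samples)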

73 

74class PrometheusClient: 

75 """ 

76 High-level Prometheus query client. 

77 

78 Usage: 

79 client = PrometheusClient() 

80 await client.initialize() 

81 

82 # Query uptime 

83 uptime_pct = await client.query_uptime(service="mcp-server", timerange="30d") 

84 

85 # Query percentiles 

86 response_times = await client.query_percentiles( 

87 metric="http_request_duration_seconds", 

88 percentiles=[50, 95, 99], 

89 timerange="1h" 

90 ) 

91 

92 # Query error rate 

93 error_rate = await client.query_error_rate(timerange="5m") 

94 

95 await client.close() 

96 """ 

97 

98 def __init__(self, config: PrometheusConfig | None = None) -> None: 

99 self.config = config or self._load_config_from_settings() 

100 self.client: httpx.AsyncClient | None = None 

101 self._initialized = False 

102 

103 def _load_config_from_settings(self) -> PrometheusConfig: 

104 """Load configuration from application settings""" 

105 return PrometheusConfig( 

106 url=getattr(settings, "prometheus_url", "http://prometheus:9090"), 

107 timeout=getattr(settings, "prometheus_timeout", 30), 

108 retry_attempts=getattr(settings, "prometheus_retry_attempts", 3), 

109 ) 

110 

111 async def initialize(self) -> None: 

112 """Initialize HTTP client with cross-platform SSL support.""" 

113 if self._initialized: 

114 return 

115 

116 # Use certifi CA bundle for cross-platform SSL verification 

117 # CI environments (especially isolated Python builds) may lack system certs 

118 ssl_context = ssl.create_default_context() 

119 try: 

120 ssl_context.load_verify_locations(certifi.where()) 

121 except FileNotFoundError: 

122 logger.warning("certifi CA bundle not found, falling back to system certs") 

123 

124 self.client = httpx.AsyncClient( 

125 timeout=self.config.timeout, 

126 follow_redirects=True, 

127 verify=ssl_context, 

128 ) 

129 

130 self._initialized = True 

131 logger.info(f"Prometheus client initialized: {self.config.url}") 

132 

133 async def close(self) -> None: 

134 """Close HTTP client""" 

135 if self.client:  [135 ↛ 137: line 135 didn't jump to line 137 because the condition on line 135 was always true]

136 await self.client.aclose() 

137 logger.info("Prometheus client closed") 

138 

139 async def query(self, promql: str, time: datetime | None = None) -> list[QueryResult]: 

140 """ 

141 Execute instant query. 

142 

143 Args: 

144 promql: PromQL query string 

145 time: Optional evaluation timestamp (defaults to now) 

146 

147 Returns: 

148 List of query results 

149 """ 

150 if not self._initialized: 

151 await self.initialize() 

152 

153 params = {"query": promql} 

154 if time:  [154 ↛ 155: line 154 didn't jump to line 155 because the condition on line 154 was never true]

155 params["time"] = time.timestamp() # type: ignore[assignment] 

156 

157 url = f"{self.config.url}/api/v1/query" 

158 

159 try: 

160 response = await self.client.get(url, params=params) # type: ignore[union-attr] 

161 response.raise_for_status() 

162 

163 data = response.json() 

164 

165 if data.get("status") != "success": 

166 msg = f"Prometheus query failed: {data.get('error', 'Unknown error')}" 

167 raise ValueError(msg) 

168 

169 return self._parse_query_result(data["data"]["result"]) 

170 

171 except Exception as e: 

172 logger.error(f"Prometheus query failed: {e}", exc_info=True, extra={"query": promql}) 

173 raise 

174 
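# Illustrative shape of the payload consumed above (standard /api/v1/query response):
#     {"status": "success",
#      "data": {"resultType": "vector",
#               "result": [{"metric": {"job": "mcp-server-langgraph"},
#                           "value": [1733184000.0, "1"]}]}}
# _parse_query_result() receives the "result" list and wraps each entry in a QueryResult.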

175 async def query_range( 

176 self, 

177 promql: str, 

178 start: datetime, 

179 end: datetime, 

180 step: str = "1m", 

181 ) -> list[QueryResult]: 

182 """ 

183 Execute range query. 

184 

185 Args: 

186 promql: PromQL query string 

187 start: Range start time 

188 end: Range end time 

189 step: Query resolution step (e.g., "1m", "5m", "1h") 

190 

191 Returns: 

192 List of query results with time series 

193 """ 

194 if not self._initialized:  [194 ↛ 195: line 194 didn't jump to line 195 because the condition on line 194 was never true]

195 await self.initialize() 

196 

197 params: dict[str, str | float] = { 

198 "query": promql, 

199 "start": start.timestamp(), 

200 "end": end.timestamp(), 

201 "step": step, 

202 } 

203 

204 url = f"{self.config.url}/api/v1/query_range" 

205 

206 try: 

207 if self.client is not None: 

208 response = await self.client.get(url, params=params) 

209 response.raise_for_status() 

210 else: 

211 msg = "Prometheus client not initialized" 

212 raise ValueError(msg) 

213 

214 data = response.json() 

215 

216 if data.get("status") != "success":  [216 ↛ 217: line 216 didn't jump to line 217 because the condition on line 216 was never true]

217 msg = f"Prometheus range query failed: {data.get('error', 'Unknown error')}" 

218 raise ValueError(msg) 

219 

220 return self._parse_range_result(data["data"]["result"]) 

221 

222 except Exception as e: 

223 logger.error(f"Prometheus range query failed: {e}", exc_info=True, extra={"query": promql}) 

224 raise 

225 
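# Example call (illustrative; timedelta would need to be imported from datetime):
#     results = await client.query_range(
#         'up{job="mcp-server-langgraph"}',
#         start=datetime.now() - timedelta(hours=1),
#         end=datetime.now(),
#         step="1m",
#     )
# Each returned QueryResult then holds roughly one MetricValue per 1-minute step.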

226 def _parse_query_result(self, result: list[dict]) -> list[QueryResult]: # type: ignore[type-arg] 

227 """Parse instant query result""" 

228 parsed = [] 

229 for item in result: 

230 metric = item.get("metric", {}) 

231 value_data = item.get("value", []) 

232 

233 if value_data:  [233 ↛ 229: line 233 didn't jump to line 229 because the condition on line 233 was always true]

234 values = [MetricValue.from_prometheus(value_data)] 

235 parsed.append(QueryResult(metric=metric, values=values)) 

236 

237 return parsed 

238 

239 def _parse_range_result(self, result: list[dict]) -> list[QueryResult]: # type: ignore[type-arg] 

240 """Parse range query result""" 

241 parsed = [] 

242 for item in result: 

243 metric = item.get("metric", {}) 

244 values_data = item.get("values", []) 

245 

246 values = [MetricValue.from_prometheus(v) for v in values_data] 

247 parsed.append(QueryResult(metric=metric, values=values)) 

248 

249 return parsed 

250 

251 async def query_uptime( 

252 self, 

253 service: str = "mcp-server-langgraph", 

254 timerange: str = "30d", 

255 ) -> float: 

256 """ 

257 Calculate service uptime percentage. 

258 

259 Args: 

260 service: Service name (label filter) 

261 timerange: Time range (e.g., "1h", "24h", "30d") 

262 

263 Returns: 

264 Uptime percentage (0-100) 

265 

266 Resolves: monitoring/sla.py:157, core/compliance/evidence.py:419 

267 """ 

268 # Query: (total time - downtime) / total time * 100 

269 # Assuming 'up' metric where 0 = down, 1 = up 

270 

271 promql = f'avg_over_time(up{{job="{service}"}}[{timerange}]) * 100' 

272 

273 try: 

274 results = await self.query(promql) 

275 

276 if results and results[0].values: 

277 uptime_pct = results[0].get_latest_value() 

278 logger.info(f"Uptime queried: {uptime_pct:.2f}% over {timerange}", extra={"service": service}) 

279 return uptime_pct # type: ignore[return-value] 

280 

281 # Fallback: assume 100% if no data 

282 logger.warning(f"No uptime data found for {service}, assuming 100%") 

283 return 100.0 

284 

285 except Exception as e: 

286 logger.error(f"Failed to query uptime: {e}", exc_info=True) 

287 # Return conservative estimate 

288 return 99.0 

289 
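# Example (illustrative): with the defaults above, the generated query is
#     avg_over_time(up{job="mcp-server-langgraph"}[30d]) * 100
# i.e. the mean of the 0/1 'up' samples over the window, scaled to a percentage.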

290 async def query_downtime( 

291 self, 

292 service: str = "mcp-server-langgraph", 

293 timerange: str = "30d", 

294 ) -> float: 

295 """ 

296 Calculate total downtime in seconds. 

297 

298 Args: 

299 service: Service name 

300 timerange: Time range 

301 

302 Returns: 

303 Total downtime in seconds 

304 

305 Resolves: monitoring/sla.py:157 

306 """ 

307 uptime_pct = await self.query_uptime(service=service, timerange=timerange) 

308 

309 # Calculate total time in seconds 

310 if timerange.endswith("d"): 

311 total_seconds = int(timerange[:-1]) * 86400 

312 elif timerange.endswith("h"): 

313 total_seconds = int(timerange[:-1]) * 3600 

314 elif timerange.endswith("m"): 

315 total_seconds = int(timerange[:-1]) * 60 

316 else: 

317 msg = f"Unsupported timerange format: {timerange}" 

318 raise ValueError(msg) 

319 

320 downtime_seconds = total_seconds * (100 - uptime_pct) / 100 

321 

322 logger.info(f"Downtime calculated: {downtime_seconds:.2f}s over {timerange}", extra={"service": service}) 

323 

324 return downtime_seconds 

325 
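# Worked example (illustrative): timerange="30d" gives
#     total_seconds = 30 * 86400 = 2_592_000
# so an uptime of 99.9% yields
#     downtime_seconds = 2_592_000 * (100 - 99.9) / 100 = 2_592  (about 43 minutes)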

326 async def query_percentiles( 

327 self, 

328 metric: str, 

329 percentiles: list[int] | None = None, 

330 timerange: str = "1h", 

331 label_filters: dict[str, str] | None = None, 

332 ) -> dict[int, float]: 

333 """ 

334 Query metric percentiles (p50, p95, p99, etc.). 

335 

336 Args: 

337 metric: Metric name (e.g., "http_request_duration_seconds") 

338 percentiles: List of percentiles to query (default: [50, 95, 99]) 

339 timerange: Time range 

340 label_filters: Additional label filters 

341 

342 Returns: 

343 Dictionary mapping percentile to value 

344 

345 Resolves: monitoring/sla.py:235 

346 """ 

347 if percentiles is None:  [347 ↛ 348: line 347 didn't jump to line 348 because the condition on line 347 was never true]

348 percentiles = [50, 95, 99] 

349 

350 # Build label filter string 

351 label_str = "" 

352 if label_filters:  [352 ↛ 353: line 352 didn't jump to line 353 because the condition on line 352 was never true]

353 label_parts = [f'{k}="{v}"' for k, v in label_filters.items()] 

354 label_str = "{" + ", ".join(label_parts) + "}" 

355 

356 results = {} 

357 

358 for p in percentiles: 

359 # Query using histogram_quantile 

360 quantile = p / 100.0 

361 promql = f"histogram_quantile({quantile}, rate({metric}_bucket{label_str}[{timerange}]))" 

362 

363 try: 

364 query_results = await self.query(promql) 

365 

366 if query_results and query_results[0].values:  [366 ↛ 370: line 366 didn't jump to line 370 because the condition on line 366 was always true]

367 value = query_results[0].get_latest_value() 

368 results[p] = value 

369 else: 

370 results[p] = 0.0 

371 

372 except Exception as e: 

373 logger.error(f"Failed to query p{p}: {e}", exc_info=True) 

374 results[p] = 0.0 

375 

376 logger.info(f"Percentiles queried: {results}", extra={"metric": metric, "timerange": timerange}) 

377 

378 return results # type: ignore[return-value] 

379 
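# Example (illustrative): metric="http_request_duration_seconds", p=95,
# timerange="1h" and no label filters produce
#     histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[1h]))
# which assumes the metric is exported as a Prometheus histogram (i.e. *_bucket series exist).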

380 async def query_error_rate( 

381 self, 

382 timerange: str = "5m", 

383 service: str = "mcp-server-langgraph", 

384 ) -> float: 

385 """ 

386 Calculate error rate (errors per second / total requests per second). 

387 

388 Args: 

389 timerange: Time range 

390 service: Service name 

391 

392 Returns: 

393 Error rate as percentage (0-100) 

394 

395 Resolves: monitoring/sla.py:300 

396 """ 

397 # Query error requests 

398 error_promql = f'rate(http_requests_total{{job="{service}", status=~"5.."}}[{timerange}])' 

399 

400 # Query total requests 

401 total_promql = f'rate(http_requests_total{{job="{service}"}}[{timerange}])' 

402 

403 try: 

404 error_results = await self.query(error_promql) 

405 total_results = await self.query(total_promql) 

406 

407 error_rate_value = 0.0 

408 total_rate_value = 0.0 

409 

410 if error_results and error_results[0].values: 

411 error_rate_value = error_results[0].get_latest_value() # type: ignore[assignment] 

412 

413 if total_results and total_results[0].values: 

414 total_rate_value = total_results[0].get_latest_value() # type: ignore[assignment] 

415 

416 error_pct = error_rate_value / total_rate_value * 100 if total_rate_value > 0 else 0.0 

417 

418 logger.info(f"Error rate: {error_pct:.2f}% over {timerange}", extra={"service": service}) 

419 

420 return error_pct 

421 

422 except Exception as e: 

423 logger.error(f"Failed to query error rate: {e}", exc_info=True) 

424 return 0.0 

425 
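# Note: rate(http_requests_total{...}) returns one series per label set; the code
# above reads only the first returned series. An aggregated variant (an assumption,
# not the query the method currently issues) would look like:
#     sum(rate(http_requests_total{job="mcp-server-langgraph", status=~"5.."}[5m]))
#       / sum(rate(http_requests_total{job="mcp-server-langgraph"}[5m])) * 100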

426 async def query_request_rate( 

427 self, 

428 timerange: str = "5m", 

429 service: str = "mcp-server-langgraph", 

430 ) -> float: 

431 """ 

432 Query requests per second. 

433 

434 Args: 

435 timerange: Time range 

436 service: Service name 

437 

438 Returns: 

439 Requests per second 

440 """ 

441 promql = f'rate(http_requests_total{{job="{service}"}}[{timerange}])' 

442 

443 try: 

444 results = await self.query(promql) 

445 

446 if results and results[0].values: 

447 rps = results[0].get_latest_value() 

448 logger.info(f"Request rate: {rps:.2f} req/s", extra={"service": service}) 

449 return rps # type: ignore[return-value] 

450 

451 return 0.0 

452 

453 except Exception as e: 

454 logger.error(f"Failed to query request rate: {e}", exc_info=True) 

455 return 0.0 

456 

457 async def query_custom( 

458 self, 

459 promql: str, 

460 timerange: str | None = None, 

461 ) -> list[QueryResult]: 

462 """ 

463 Execute custom PromQL query. 

464 

465 Args: 

466 promql: Custom PromQL query 

467 timerange: Optional time range for rate/increase queries 

468 

469 Returns: 

470 Query results 

471 """ 

472 if timerange and "[" not in promql: 

473 # Auto-inject the timerange into the first selector; the f-string interpolates the value 

474 promql = promql.replace(")", f"[{timerange}])", 1) 

475 

476 return await self.query(promql) 

477 
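# Example (illustrative): query_custom('rate(http_requests_total)', timerange="5m")
# rewrites the query to
#     rate(http_requests_total[5m])
# before executing it; queries that already contain a range selector ("[") pass through unchanged.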

478 async def get_sla_metrics( 

479 self, 

480 service: str = "mcp-server-langgraph", 

481 timerange: str = "30d", 

482 ) -> dict[str, Any]: 

483 """ 

484 Get comprehensive SLA metrics. 

485 

486 Returns: 

487 Dictionary with uptime, downtime, error_rate, response_times 

488 """ 

489 uptime_pct = await self.query_uptime(service=service, timerange=timerange) 

490 downtime_sec = await self.query_downtime(service=service, timerange=timerange) 

491 error_rate = await self.query_error_rate(service=service, timerange="5m") 

492 response_times = await self.query_percentiles( 

493 metric="http_request_duration_seconds", 

494 percentiles=[50, 95, 99], 

495 timerange="1h", 

496 ) 

497 

498 return { 

499 "uptime_percentage": uptime_pct, 

500 "downtime_seconds": downtime_sec, 

501 "error_rate_percentage": error_rate, 

502 "response_times": { 

503 "p50_seconds": response_times.get(50, 0), 

504 "p95_seconds": response_times.get(95, 0), 

505 "p99_seconds": response_times.get(99, 0), 

506 }, 

507 "timerange": timerange, 

508 "service": service, 

509 } 

510 

511 

512# Global Prometheus client instance 

513_prometheus_client: PrometheusClient | None = None 

514 

515 

516async def get_prometheus_client() -> PrometheusClient: 

517 """Get or create global Prometheus client instance""" 

518 global _prometheus_client 

519 

520 if _prometheus_client is None: 

521 _prometheus_client = PrometheusClient() 

522 await _prometheus_client.initialize() 

523 

524 return _prometheus_client
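
# Usage sketch (illustrative; mirrors the class docstring and assumes a reachable
# Prometheus at the configured URL):
#
#     import asyncio
#
#     async def main() -> None:
#         client = await get_prometheus_client()
#         sla = await client.get_sla_metrics(service="mcp-server-langgraph", timerange="30d")
#         print(sla["uptime_percentage"], sla["response_times"]["p95_seconds"])
#         await client.close()
#
#     asyncio.run(main())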