Coverage for src/mcp_server_langgraph/schedulers/cleanup.py: 93%
92 statements
coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Data Cleanup Scheduler
4Scheduled background jobs for data retention policy enforcement.
5Runs daily at configured time (default: 3 AM UTC).
6"""
8from datetime import datetime, UTC
9from typing import Any
11from apscheduler.schedulers.asyncio import AsyncIOScheduler
12from apscheduler.triggers.cron import CronTrigger
14from mcp_server_langgraph.auth.session import SessionStore
15from mcp_server_langgraph.compliance.retention import DataRetentionService
16from mcp_server_langgraph.integrations.alerting import Alert, AlertCategory, AlertingService, AlertSeverity
17from mcp_server_langgraph.observability.telemetry import logger


class CleanupScheduler:
    """
    Scheduler for automated data retention cleanup

    Features:
    - Daily execution at configured time
    - Graceful error handling
    - Metrics tracking
    - Dry-run support for testing
    """

    def __init__(
        self,
        session_store: SessionStore | None = None,
        config_path: str = "config/retention_policies.yaml",
        dry_run: bool = False,
    ):
        """
        Initialize cleanup scheduler

        Args:
            session_store: Session storage backend
            config_path: Path to retention policies configuration
            dry_run: If True, simulate cleanups without actually deleting
        """
        self.session_store = session_store
        self.config_path = config_path
        self.dry_run = dry_run
        self.scheduler = AsyncIOScheduler()
        self.retention_service: DataRetentionService | None = None

    async def start(self) -> None:
        """
        Start the cleanup scheduler

        Loads configuration and schedules the daily cleanup job.
        """
        logger.info(
            "Starting data retention cleanup scheduler",
            extra={"config_path": self.config_path, "dry_run": self.dry_run},
        )

        # Initialize retention service
        self.retention_service = DataRetentionService(
            config_path=self.config_path,
            session_store=self.session_store,
            dry_run=self.dry_run,
        )

        # Get cleanup schedule from configuration
        config = self.retention_service.config
        schedule = config.get("global", {}).get("cleanup_schedule", "0 3 * * *")
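
        # Illustrative config shape (an assumption for this comment; the
        # authoritative schema lives in config/retention_policies.yaml):
        #
        #   global:
        #     cleanup_schedule: "0 3 * * *"  # minute hour day month day_of_week, UTC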

        # Parse cron expression (default: 0 3 * * * = daily at 3 AM)
        parts = schedule.split()
        if len(parts) == 5:
            minute, hour, day, month, day_of_week = parts

            # Create cron trigger
            trigger = CronTrigger(
                minute=minute,
                hour=hour,
                day=day,
                month=month,
                day_of_week=day_of_week,
                timezone="UTC",
            )

            # Add scheduled job
            self.scheduler.add_job(
                self._run_cleanup,
                trigger=trigger,
                id="data_retention_cleanup",
                name="Data Retention Cleanup",
                replace_existing=True,
                max_instances=1,  # Prevent overlapping runs
            )

            logger.info(
                "Scheduled data retention cleanup",
                extra={"schedule": schedule, "next_run": self._get_next_run_time()},
            )

        else:
            logger.error(f"Invalid cron schedule: {schedule}", extra={"schedule": schedule})

        # Start scheduler
        self.scheduler.start()
        logger.info("Cleanup scheduler started")

    async def stop(self) -> None:
        """Stop the cleanup scheduler"""
        logger.info("Stopping data retention cleanup scheduler")
        self.scheduler.shutdown(wait=True)
        logger.info("Cleanup scheduler stopped")

    async def _run_cleanup(self) -> None:
        """
        Execute data retention cleanup

        This is the main scheduled job that runs all retention policies.
        """
        logger.info(
            "Executing scheduled data retention cleanup",
            extra={"timestamp": datetime.now(UTC).isoformat(), "dry_run": self.dry_run},
        )

        try:
            if not self.retention_service:
                logger.error("Retention service not initialized")
                return

            # Run all cleanup policies
            results = await self.retention_service.run_all_cleanups()

            # Log summary
            total_deleted = sum(r.deleted_count for r in results)
            total_archived = sum(r.archived_count for r in results)
            total_errors = sum(len(r.errors) for r in results)

            log_level = "error" if total_errors > 0 else "info"
            log_method = getattr(logger, log_level)

            log_method(
                "Data retention cleanup completed",
                extra={
                    "total_deleted": total_deleted,
                    "total_archived": total_archived,
                    "total_errors": total_errors,
                    "policies_executed": len(results),
                    "dry_run": self.dry_run,
                },
            )

            # Send notification if configured
            if self.retention_service.config.get("notifications", {}).get("enabled", True):
                await self._send_cleanup_notification(results)

        except Exception as e:
            logger.error(f"Data retention cleanup failed: {e}", exc_info=True)
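
    # Note: results are typed loosely (list[Any]); this module assumes each
    # RetentionResult exposes deleted_count, archived_count, errors, and
    # data_type, per DataRetentionService.run_all_cleanups().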

    async def _send_cleanup_notification(self, results: list[Any]) -> None:
        """
        Send notification about cleanup execution

        Args:
            results: List of RetentionResults
        """
        total_deleted = sum(r.deleted_count for r in results)

        logger.info(
            "Cleanup notification",
            extra={
                "results_count": len(results),
                "total_deleted": total_deleted,
            },
        )

        # Send notification to operations team
        try:
            alerting_service = AlertingService()
            await alerting_service.initialize()

            # Determine severity based on deleted count
            if total_deleted > 1000:
                severity = AlertSeverity.WARNING  # type: ignore[attr-defined]
                title = "Large Data Cleanup Executed"
            else:
                severity = AlertSeverity.INFO
                title = "Data Cleanup Completed"

            alert = Alert(
                title=title,
                description=f"Data retention cleanup processed {len(results)} data types, deleted {total_deleted} records",
                severity=severity,
                category=AlertCategory.INFRASTRUCTURE,
                source="cleanup_scheduler",
                metadata={
                    "results_count": len(results),
                    "total_deleted": total_deleted,
                    "data_types": [r.data_type for r in results],
                },
            )

            await alerting_service.send_alert(alert)
            logger.info("Cleanup notification sent", extra={"alert_id": alert.alert_id})

        except Exception as e:
            logger.error(f"Failed to send cleanup notification: {e}", exc_info=True)

    def _get_next_run_time(self) -> str:
        """Get next scheduled run time"""
        try:
            job = self.scheduler.get_job("data_retention_cleanup")
            if job and hasattr(job, "next_run_time") and job.next_run_time:
                return job.next_run_time.isoformat()  # type: ignore[no-any-return]
        except Exception:
            pass
        return "Not scheduled"

    async def run_now(self) -> None:
        """
        Manually trigger cleanup immediately (for testing/admin use)

        Runs the same job as the scheduled trigger; results are logged
        rather than returned.
        """
        logger.info("Manual cleanup triggered")
        await self._run_cleanup()


# Global scheduler instance
_cleanup_scheduler: CleanupScheduler | None = None


async def start_cleanup_scheduler(
    session_store: SessionStore | None = None,
    config_path: str = "config/retention_policies.yaml",
    dry_run: bool = False,
) -> None:
    """
    Start the global cleanup scheduler

    Args:
        session_store: Session storage backend
        config_path: Path to retention policies
        dry_run: Test mode without actual deletions

    Example:
        # At application startup
        await start_cleanup_scheduler(
            session_store=session_store,
            dry_run=False
        )
    """
    global _cleanup_scheduler

    if _cleanup_scheduler is not None:
        logger.warning("Cleanup scheduler already running")
        return

    _cleanup_scheduler = CleanupScheduler(
        session_store=session_store,
        config_path=config_path,
        dry_run=dry_run,
    )

    await _cleanup_scheduler.start()


async def stop_cleanup_scheduler() -> None:
    """Stop the global cleanup scheduler"""
    global _cleanup_scheduler

    if _cleanup_scheduler is None:
        logger.warning("Cleanup scheduler not running")
        return

    await _cleanup_scheduler.stop()
    _cleanup_scheduler = None


def get_cleanup_scheduler() -> CleanupScheduler | None:
    """Get the global cleanup scheduler instance"""
    return _cleanup_scheduler
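

if __name__ == "__main__":  # pragma: no cover
    # Minimal smoke-test sketch (an illustration, not part of the service
    # wiring): start the scheduler in dry-run mode so nothing is deleted,
    # trigger one cleanup pass manually, then shut down. Assumes
    # config/retention_policies.yaml is readable from the working directory.
    import asyncio

    async def _demo() -> None:
        await start_cleanup_scheduler(dry_run=True)
        scheduler = get_cleanup_scheduler()
        if scheduler is not None:
            await scheduler.run_now()
        await stop_cleanup_scheduler()

    asyncio.run(_demo())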