Coverage for src/mcp_server_langgraph/schedulers/cleanup.py: 93%

92 statements  

coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Data Cleanup Scheduler 

3 

4Scheduled background jobs for data retention policy enforcement. 

5Runs daily at configured time (default: 3 AM UTC). 

6""" 

7 

8from datetime import datetime, UTC 

9from typing import Any 

10 

11from apscheduler.schedulers.asyncio import AsyncIOScheduler 

12from apscheduler.triggers.cron import CronTrigger 

13 

14from mcp_server_langgraph.auth.session import SessionStore 

15from mcp_server_langgraph.compliance.retention import DataRetentionService 

16from mcp_server_langgraph.integrations.alerting import Alert, AlertCategory, AlertingService, AlertSeverity 

17from mcp_server_langgraph.observability.telemetry import logger 

18 

19 

20class CleanupScheduler: 

21 """ 

22 Scheduler for automated data retention cleanup 

23 

24 Features: 

25 - Daily execution at configured time 

26 - Graceful error handling 

27 - Metrics tracking 

28 - Dry-run support for testing 

29 """ 

30 

31 def __init__( 

32 self, 

33 session_store: SessionStore | None = None, 

34 config_path: str = "config/retention_policies.yaml", 

35 dry_run: bool = False, 

36 ): 

37 """ 

38 Initialize cleanup scheduler 

39 

40 Args: 

41 session_store: Session storage backend 

42 config_path: Path to retention policies configuration 

43 dry_run: If True, simulate cleanups without actually deleting 

44 """ 

45 self.session_store = session_store 

46 self.config_path = config_path 

47 self.dry_run = dry_run 

48 self.scheduler = AsyncIOScheduler() 

49 self.retention_service: DataRetentionService | None = None 

50 

51 async def start(self) -> None: 

52 """ 

53 Start the cleanup scheduler 

54 

55 Loads configuration and schedules daily cleanup job. 

56 """ 

57 logger.info( 

58 "Starting data retention cleanup scheduler", 

59 extra={"config_path": self.config_path, "dry_run": self.dry_run}, 

60 ) 

61 

62 # Initialize retention service 

63 self.retention_service = DataRetentionService( 

64 config_path=self.config_path, 

65 session_store=self.session_store, 

66 dry_run=self.dry_run, 

67 ) 

68 

69 # Get cleanup schedule from configuration 

70 config = self.retention_service.config 

71 schedule = config.get("global", {}).get("cleanup_schedule", "0 3 * * *") 
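        # Assumed (illustrative) shape of config/retention_policies.yaml for the keys
        # this module reads; only "global.cleanup_schedule" (here) and
        # "notifications.enabled" (in _run_cleanup) are looked up, everything else
        # in that file is outside this scheduler's concern:
        #
        #   global:
        #     cleanup_schedule: "0 3 * * *"   # five-field cron, interpreted in UTC
        #   notifications:
        #     enabled: true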

        # Parse cron expression (default: 0 3 * * * = daily at 3 AM)
        parts = schedule.split()
        if len(parts) == 5:
            minute, hour, day, month, day_of_week = parts

            # Create cron trigger
            trigger = CronTrigger(
                minute=minute,
                hour=hour,
                day=day,
                month=month,
                day_of_week=day_of_week,
                timezone="UTC",
            )

            # Add scheduled job
            self.scheduler.add_job(
                self._run_cleanup,
                trigger=trigger,
                id="data_retention_cleanup",
                name="Data Retention Cleanup",
                replace_existing=True,
                max_instances=1,  # Prevent overlapping runs
            )

            logger.info(
                "Scheduled data retention cleanup",
                extra={"schedule": schedule, "next_run": self._get_next_run_time()},
            )

        else:
            logger.error(f"Invalid cron schedule: {schedule}", extra={"schedule": schedule})

        # Start scheduler
        self.scheduler.start()
        logger.info("Cleanup scheduler started")

    async def stop(self) -> None:
        """Stop the cleanup scheduler"""
        logger.info("Stopping data retention cleanup scheduler")
        self.scheduler.shutdown(wait=True)
        logger.info("Cleanup scheduler stopped")

    async def _run_cleanup(self) -> None:
        """
        Execute data retention cleanup

        This is the main scheduled job that runs all retention policies.
        """
        logger.info(
            "Executing scheduled data retention cleanup",
            extra={"timestamp": datetime.now(UTC).isoformat(), "dry_run": self.dry_run},
        )

        try:
            if not self.retention_service:
                logger.error("Retention service not initialized")
                return

            # Run all cleanup policies
            results = await self.retention_service.run_all_cleanups()

            # Log summary
            total_deleted = sum(r.deleted_count for r in results)
            total_archived = sum(r.archived_count for r in results)
            total_errors = sum(len(r.errors) for r in results)

            log_level = "error" if total_errors > 0 else "info"
            log_method = getattr(logger, log_level)

            log_method(
                "Data retention cleanup completed",
                extra={
                    "total_deleted": total_deleted,
                    "total_archived": total_archived,
                    "total_errors": total_errors,
                    "policies_executed": len(results),
                    "dry_run": self.dry_run,
                },
            )

            # Send notification if configured
            if self.retention_service.config.get("notifications", {}).get("enabled", True):
                await self._send_cleanup_notification(results)

        except Exception as e:
            logger.error(f"Data retention cleanup failed: {e}", exc_info=True)

    async def _send_cleanup_notification(self, results: list[Any]) -> None:
        """
        Send notification about cleanup execution

        Args:
            results: List of RetentionResults
        """
        total_deleted = sum(r.deleted_count for r in results)

        logger.info(
            "Cleanup notification",
            extra={
                "results_count": len(results),
                "total_deleted": total_deleted,
            },
        )

        # Send notification to operations team
        try:
            alerting_service = AlertingService()
            await alerting_service.initialize()

            # Determine severity based on deleted count
            if total_deleted > 1000:  # coverage note: this branch was never taken in the measured run
                severity = AlertSeverity.WARNING  # type: ignore[attr-defined]
                title = "Large Data Cleanup Executed"
            else:
                severity = AlertSeverity.INFO
                title = "Data Cleanup Completed"

            alert = Alert(
                title=title,
                description=f"Data retention cleanup processed {len(results)} data types, deleted {total_deleted} records",
                severity=severity,
                category=AlertCategory.INFRASTRUCTURE,
                source="cleanup_scheduler",
                metadata={
                    "results_count": len(results),
                    "total_deleted": total_deleted,
                    "data_types": [r.data_type for r in results],
                },
            )

            await alerting_service.send_alert(alert)
            logger.info("Cleanup notification sent", extra={"alert_id": alert.alert_id})

        except Exception as e:
            logger.error(f"Failed to send cleanup notification: {e}", exc_info=True)

    def _get_next_run_time(self) -> str:
        """Get next scheduled run time"""
        try:
            job = self.scheduler.get_job("data_retention_cleanup")
            if job and hasattr(job, "next_run_time") and job.next_run_time:
                return job.next_run_time.isoformat()  # type: ignore[no-any-return]
        except Exception:
            pass
        return "Not scheduled"

    async def run_now(self) -> None:
        """
        Manually trigger cleanup immediately (for testing/admin)

        Runs the same cleanup job as the scheduled trigger; results are logged
        rather than returned.
        """
        logger.info("Manual cleanup triggered")
        await self._run_cleanup()

# Global scheduler instance
_cleanup_scheduler: CleanupScheduler | None = None


async def start_cleanup_scheduler(
    session_store: SessionStore | None = None,
    config_path: str = "config/retention_policies.yaml",
    dry_run: bool = False,
) -> None:
    """
    Start the global cleanup scheduler

    Args:
        session_store: Session storage backend
        config_path: Path to retention policies
        dry_run: Test mode without actual deletions

    Example:
        # At application startup
        await start_cleanup_scheduler(
            session_store=session_store,
            dry_run=False
        )
    """

    global _cleanup_scheduler

    if _cleanup_scheduler is not None:
        logger.warning("Cleanup scheduler already running")
        return

    _cleanup_scheduler = CleanupScheduler(
        session_store=session_store,
        config_path=config_path,
        dry_run=dry_run,
    )

    await _cleanup_scheduler.start()


async def stop_cleanup_scheduler() -> None:
    """Stop the global cleanup scheduler"""
    global _cleanup_scheduler

    if _cleanup_scheduler is None:
        logger.warning("Cleanup scheduler not running")
        return

    await _cleanup_scheduler.stop()
    _cleanup_scheduler = None


def get_cleanup_scheduler() -> CleanupScheduler | None:
    """Get the global cleanup scheduler instance"""
    return _cleanup_scheduler
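# Usage sketch (illustrative only, not part of the module above): wiring the
# global helpers into an application's startup/shutdown path. The "main"
# coroutine and the dry_run choice here are hypothetical.
#
#   import asyncio
#
#   async def main() -> None:
#       await start_cleanup_scheduler(
#           config_path="config/retention_policies.yaml",
#           dry_run=True,  # simulate deletions while validating policies
#       )
#       scheduler = get_cleanup_scheduler()
#       if scheduler is not None:
#           await scheduler.run_now()  # trigger one cleanup immediately
#       await stop_cleanup_scheduler()
#
#   asyncio.run(main())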