Coverage for src / mcp_server_langgraph / infrastructure / database.py: 41%
37 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
"""
Database Connection Infrastructure

Provides reusable database connection functionality with retry logic.
Separates I/O concerns from business logic for better testability.
"""
8import asyncio
9from typing import Protocol
11import asyncpg
13from mcp_server_langgraph.utils.retry import retry_with_backoff
class DatabaseConfig(Protocol):
    """Protocol for database configuration.

    Structural type: any object that exposes the attribute declared below
    satisfies it, without having to inherit from this class.
    """

    # Connection URL for PostgreSQL. Presumably the GDPR data store, judging
    # by the attribute name -- confirm against the callers.
    gdpr_postgres_url: str
async def check_database_connectivity(postgres_url: str, timeout: float = 5.0) -> tuple[bool, str]:
    """
    Check if PostgreSQL database is accessible.

    Opens one connection with ``asyncpg.connect`` (bounded by ``timeout`` via
    ``asyncio.wait_for``), closes it again, and reports the outcome as a value.
    Every ``Exception`` raised while connecting or closing is converted into a
    ``(False, message)`` result instead of propagating.

    Args:
        postgres_url: PostgreSQL connection URL
        timeout: Connection timeout in seconds (default: 5.0)

    Returns:
        Tuple of (is_healthy, message)

    Example:
        >>> is_healthy, message = await check_database_connectivity(
        ...     "postgresql://user:pass@localhost:5432/db"
        ... )
        >>> if is_healthy:
        ...     print("Database is accessible")
    """
    try:
        # Try to connect to PostgreSQL (with timeout)
        conn = await asyncio.wait_for(asyncpg.connect(postgres_url), timeout=timeout)
        await conn.close()
    except (asyncio.TimeoutError, TimeoutError):
        # Before Python 3.11 asyncio.TimeoutError is a separate class from the
        # builtin TimeoutError, so both are listed to keep the timeout message
        # on every supported interpreter.
        return False, f"PostgreSQL connection timeout ({timeout}s)"
    except asyncpg.InvalidPasswordError as e:
        # Must precede PostgresError: this handler is the more specific one.
        return False, f"PostgreSQL authentication failed: {e}"
    except asyncpg.PostgresError as e:
        # Check if it's a missing database error
        if "does not exist" in str(e):
            return False, f"PostgreSQL database does not exist: {e}"
        return False, f"PostgreSQL error: {e}"
    except ValueError as e:
        return False, f"Invalid PostgreSQL connection string: {e}"
    except Exception as e:
        # Catch-all boundary: a health check reports failures, it never raises.
        return False, f"PostgreSQL connection failed: {e}"

    return True, "PostgreSQL database accessible"
async def create_connection_pool(
    postgres_url: str,
    min_size: int = 2,
    max_size: int = 10,
    command_timeout: float = 60.0,
    max_retries: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 8.0,
) -> asyncpg.Pool:
    """
    Create PostgreSQL connection pool with retry logic.

    Wraps ``asyncpg.create_pool`` in a small helper and runs that helper
    through ``retry_with_backoff`` so transient connection failures are retried.

    Args:
        postgres_url: PostgreSQL connection URL
        min_size: Minimum pool size (default: 2)
        max_size: Maximum pool size (default: 10)
        command_timeout: Command timeout in seconds (default: 60.0)
        max_retries: Maximum retry attempts (default: 3)
        initial_delay: Initial retry delay in seconds (default: 1.0)
        max_delay: Maximum retry delay in seconds (default: 8.0)

    Returns:
        asyncpg.Pool instance

    Raises:
        RuntimeError: If ``asyncpg.create_pool`` yields ``None`` (raised inside
            the retried helper).
        asyncpg.PostgresError: If connection fails after retries. This assumes
            ``retry_with_backoff`` re-raises the last error once retries are
            exhausted -- confirm against its implementation.

    Example:
        >>> pool = await create_connection_pool(
        ...     "postgresql://postgres:postgres@localhost:5432/gdpr"
        ... )
        >>> # Use pool for queries
        >>> await pool.close()
    """

    async def _create_pool() -> asyncpg.Pool:
        """Helper to create connection pool"""
        pool = await asyncpg.create_pool(
            postgres_url,
            min_size=min_size,
            max_size=max_size,
            command_timeout=command_timeout,
        )
        if pool is None:
            msg = "Failed to create connection pool"
            raise RuntimeError(msg)
        return pool

    # Create connection pool with retry logic.
    # The retry count and delays come from the caller's arguments; the
    # exponential base (2.0), jitter and max_timeout (60.0) are fixed here and
    # are not exposed as parameters.
    pool = await retry_with_backoff(
        _create_pool,
        max_retries=max_retries,
        initial_delay=initial_delay,
        max_delay=max_delay,
        exponential_base=2.0,
        jitter=True,
        max_timeout=60.0,
    )

    return pool