Coverage for src / mcp_server_langgraph / api / api_keys.py: 100%
67 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
"""
API Key Management Endpoints

REST API for managing API keys including creation, listing, rotation, and revocation.
Also provides a validation endpoint for the Kong API key→JWT exchange.

See ADR-0034 for the API key to JWT exchange pattern.
"""
10from typing import Annotated, Any
12from fastapi import APIRouter, Depends, Header, HTTPException, status
13from pydantic import BaseModel, Field
15from mcp_server_langgraph.auth.api_keys import APIKeyManager
16from mcp_server_langgraph.auth.keycloak import KeycloakClient
17from mcp_server_langgraph.auth.middleware import get_current_user
18from mcp_server_langgraph.core.dependencies import get_api_key_manager, get_keycloak_client
# Annotated dependency aliases (FAST002). Declaring them once keeps the
# endpoint signatures below short and the injected providers in one place.
CurrentUser = Annotated[dict[str, Any], Depends(get_current_user)]
APIKeyManagerDep = Annotated[APIKeyManager, Depends(get_api_key_manager)]
KeycloakClientDep = Annotated[KeycloakClient, Depends(get_keycloak_client)]
# Router for every endpoint in this module: all paths are relative to
# /api/v1/api-keys and are tagged "API Keys".
router = APIRouter(prefix="/api/v1/api-keys", tags=["API Keys"])
31# Request/Response Models
class CreateAPIKeyRequest(BaseModel):
    """Request to create a new API key"""

    # Required label supplied by the caller.
    name: str = Field(
        ...,
        description="Human-readable name for the API key",
    )
    # Key lifetime in days; 365 is used when the field is omitted.
    expires_days: int = Field(
        default=365,
        description="Days until expiration (default: 365)",
    )
class APIKeyResponse(BaseModel):
    """Response containing API key metadata"""

    # Four required string fields; the key secret is not part of this model.
    key_id: str
    name: str
    created: str
    expires_at: str
    # Optional: stays None unless a value is provided.
    last_used: str | None = None
class CreateAPIKeyResponse(APIKeyResponse):
    """Response when creating API key (includes the key itself)"""

    # Extends the inherited metadata fields with the key secret itself.
    api_key: str = Field(
        ...,
        description="API key (save securely, won't be shown again)",
    )
    # Fixed reminder text returned unless the caller overrides it.
    message: str = Field(
        default="API key created successfully. Save it securely - it will not be shown again.",
    )
class RotateAPIKeyResponse(BaseModel):
    """Response when rotating API key"""

    # Identifier of the rotated key.
    key_id: str
    # Replacement secret produced by the rotation.
    new_api_key: str = Field(
        ...,
        description="New API key",
    )
    # Fixed reminder text returned unless the caller overrides it.
    message: str = Field(
        default="API key rotated successfully. Update your client configuration.",
    )
class ValidateAPIKeyResponse(BaseModel):
    """Response when validating API key (for Kong plugin)"""

    # Token and its lifetime, as returned by the API key -> JWT exchange.
    access_token: str = Field(
        ...,
        description="JWT access token",
    )
    expires_in: int = Field(
        ...,
        description="Token expiration in seconds",
    )
    # Identity of the user the validated key belongs to.
    user_id: str
    username: str
75# User-Facing API Endpoints
@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_api_key(
    request: CreateAPIKeyRequest,
    current_user: CurrentUser,
    api_key_manager: APIKeyManagerDep,
) -> CreateAPIKeyResponse:
    """
    Create a new API key for the current user

    Creates a cryptographically secure API key with bcrypt hashing.
    **Save the returned api_key securely** - it will not be shown again.

    Maximum 5 keys per user. Revoke an existing key before creating more.

    Example:
    ```json
    {
        "name": "Production API Key",
        "expires_days": 365
    }
    ```
    """
    # Use UUID for Keycloak; fall back to user_id when keycloak_id is
    # missing or empty.
    owner_id = current_user.get("keycloak_id") or current_user["user_id"]

    # Only the manager call is guarded. A ValueError raised by it is
    # reported to the client as 400 Bad Request; a ValueError raised while
    # building the response below is a server-side problem and must not be
    # disguised as a client error.
    try:
        result = await api_key_manager.create_api_key(
            user_id=owner_id,
            name=request.name,
            expires_days=request.expires_days,
        )
    except ValueError as e:
        # Chain the original exception so the cause survives in tracebacks.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e

    return CreateAPIKeyResponse(
        key_id=result["key_id"],
        api_key=result["api_key"],
        name=result["name"],
        # "created" is optional in the manager result; default to "".
        created=result.get("created", ""),
        expires_at=result["expires_at"],
    )
@router.get("/")
async def list_api_keys(
    current_user: CurrentUser,
    api_key_manager: APIKeyManagerDep,
) -> list[APIKeyResponse]:
    """
    List all API keys for the current user

    Returns metadata for all keys (name, created, expires, last_used).
    Does not include the actual API keys.
    """
    # Prefer keycloak_id; fall back to user_id when it is missing or empty.
    owner_id = current_user.get("keycloak_id") or current_user["user_id"]
    stored_keys = await api_key_manager.list_api_keys(owner_id)

    # Convert each stored record into the response model, preserving order.
    summaries: list[APIKeyResponse] = []
    for entry in stored_keys:
        summary = APIKeyResponse(
            key_id=entry["key_id"],
            name=entry["name"],
            created=entry["created"],
            expires_at=entry["expires_at"],
            last_used=entry.get("last_used"),
        )
        summaries.append(summary)
    return summaries
@router.post("/{key_id}/rotate")
async def rotate_api_key(
    key_id: str,
    current_user: CurrentUser,
    api_key_manager: APIKeyManagerDep,
) -> RotateAPIKeyResponse:
    """
    Rotate an API key

    Generates a new API key while keeping the same key_id.
    The old key is invalidated immediately.

    **Save the new_api_key securely** - update your client configuration.
    """
    # Use UUID for Keycloak; fall back to user_id when keycloak_id is
    # missing or empty.
    owner_id = current_user.get("keycloak_id") or current_user["user_id"]

    # Only the manager call is guarded. A ValueError raised by it is
    # reported as 404 Not Found; a ValueError raised while building the
    # response below must not be misreported as a missing key.
    try:
        result = await api_key_manager.rotate_api_key(
            user_id=owner_id,
            key_id=key_id,
        )
    except ValueError as e:
        # Chain the original exception so the cause survives in tracebacks.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=str(e),
        ) from e

    return RotateAPIKeyResponse(
        key_id=result["key_id"],
        new_api_key=result["new_api_key"],
    )
@router.delete("/{key_id}", status_code=status.HTTP_204_NO_CONTENT)
async def revoke_api_key(
    key_id: str,
    current_user: CurrentUser,
    api_key_manager: APIKeyManagerDep,
) -> None:
    """
    Revoke an API key

    Permanently deletes the API key. This action cannot be undone.
    Any clients using this key will immediately lose access.
    """
    # Use UUID for Keycloak; fall back to user_id when keycloak_id is
    # missing or empty.
    owner_id = current_user.get("keycloak_id") or current_user["user_id"]

    # Nothing is returned: the route is declared with 204 No Content.
    await api_key_manager.revoke_api_key(user_id=owner_id, key_id=key_id)
199# Internal Endpoint for Kong Plugin
@router.post("/validate", include_in_schema=False)
async def validate_api_key(
    api_key_manager: APIKeyManagerDep,
    keycloak: KeycloakClientDep,
    api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
) -> ValidateAPIKeyResponse:
    """
    Validate API key and return JWT (internal endpoint for Kong plugin)

    This endpoint is called by the Kong API key→JWT exchange plugin.
    It validates the API key and issues a JWT for the associated user.

    **This endpoint is not intended for direct client use.**

    Raises:
        HTTPException: 400 when the X-API-Key header is missing or empty,
            401 when the key manager returns no user for the key, and
            500 when issuing the token or building the response fails.
    """
    if not api_key:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Missing X-API-Key header",
        )

    # Validate API key
    user_info = await api_key_manager.validate_and_get_user(api_key)

    if not user_info:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired API key",
        )

    # Issue JWT for this user
    try:
        # Exchange for JWT using Keycloak
        # This simulates a user login to get a JWT
        # Use keycloak_id (UUID) for Admin API, fallback to user_id for backward compatibility
        keycloak_user_id = user_info.get("keycloak_id") or user_info["user_id"]
        token_response = await keycloak.issue_token_for_user(keycloak_user_id)

        return ValidateAPIKeyResponse(
            access_token=token_response["access_token"],
            expires_in=token_response.get("expires_in", 900),  # Default 15 min
            user_id=user_info["user_id"],
            username=user_info["username"],
        )

    except Exception as e:
        # Any failure above maps to a 500. Chain the original exception so
        # the root cause stays visible in server-side tracebacks.
        # NOTE(review): the detail string echoes the exception text back to
        # the caller - confirm that is acceptable for this internal endpoint.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to issue JWT: {e!s}",
        ) from e