Coverage for src / mcp_server_langgraph / api / api_keys.py: 100%

67 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2API Key Management Endpoints 

3 

4REST API for managing API keys including creation, listing, rotation, and revocation. 

5Also provides validation endpoint for Kong API key→JWT exchange. 

6 

7See ADR-0034 for API key to JWT exchange pattern. 

8""" 

9 

10from typing import Annotated, Any 

11 

12from fastapi import APIRouter, Depends, Header, HTTPException, status 

13from pydantic import BaseModel, Field 

14 

15from mcp_server_langgraph.auth.api_keys import APIKeyManager 

16from mcp_server_langgraph.auth.keycloak import KeycloakClient 

17from mcp_server_langgraph.auth.middleware import get_current_user 

18from mcp_server_langgraph.core.dependencies import get_api_key_manager, get_keycloak_client 

19 

20# Type aliases for FastAPI dependencies (FAST002) 

21CurrentUser = Annotated[dict[str, Any], Depends(get_current_user)] 

22APIKeyManagerDep = Annotated[APIKeyManager, Depends(get_api_key_manager)] 

23KeycloakClientDep = Annotated[KeycloakClient, Depends(get_keycloak_client)] 

24 

25router = APIRouter( 

26 prefix="/api/v1/api-keys", 

27 tags=["API Keys"], 

28) 

29 

30 

31# Request/Response Models 

32 

33 

34class CreateAPIKeyRequest(BaseModel): 

35 """Request to create a new API key""" 

36 

37 name: str = Field(..., description="Human-readable name for the API key") 

38 expires_days: int = Field(default=365, description="Days until expiration (default: 365)") 

39 

40 

41class APIKeyResponse(BaseModel): 

42 """Response containing API key metadata""" 

43 

44 key_id: str 

45 name: str 

46 created: str 

47 expires_at: str 

48 last_used: str | None = None 

49 

50 

51class CreateAPIKeyResponse(APIKeyResponse): 

52 """Response when creating API key (includes the key itself)""" 

53 

54 api_key: str = Field(..., description="API key (save securely, won't be shown again)") 

55 message: str = Field(default="API key created successfully. Save it securely - it will not be shown again.") 

56 

57 

58class RotateAPIKeyResponse(BaseModel): 

59 """Response when rotating API key""" 

60 

61 key_id: str 

62 new_api_key: str = Field(..., description="New API key") 

63 message: str = Field(default="API key rotated successfully. Update your client configuration.") 

64 

65 

66class ValidateAPIKeyResponse(BaseModel): 

67 """Response when validating API key (for Kong plugin)""" 

68 

69 access_token: str = Field(..., description="JWT access token") 

70 expires_in: int = Field(..., description="Token expiration in seconds") 

71 user_id: str 

72 username: str 

73 

74 

75# User-Facing API Endpoints 

76 

77 

78@router.post("/", status_code=status.HTTP_201_CREATED) 

79async def create_api_key( 

80 request: CreateAPIKeyRequest, 

81 current_user: CurrentUser, 

82 api_key_manager: APIKeyManagerDep, 

83) -> CreateAPIKeyResponse: 

84 """ 

85 Create a new API key for the current user 

86 

87 Creates a cryptographically secure API key with bcrypt hashing. 

88 **Save the returned api_key securely** - it will not be shown again. 

89 

90 Maximum 5 keys per user. Revoke an existing key before creating more. 

91 

92 Example: 

93 ```json 

94 { 

95 "name": "Production API Key", 

96 "expires_days": 365 

97 } 

98 ``` 

99 """ 

100 try: 

101 result = await api_key_manager.create_api_key( 

102 user_id=current_user.get("keycloak_id") or current_user["user_id"], # Use UUID for Keycloak 

103 name=request.name, 

104 expires_days=request.expires_days, 

105 ) 

106 

107 return CreateAPIKeyResponse( 

108 key_id=result["key_id"], 

109 api_key=result["api_key"], 

110 name=result["name"], 

111 created=result.get("created", ""), 

112 expires_at=result["expires_at"], 

113 ) 

114 

115 except ValueError as e: 

116 raise HTTPException( 

117 status_code=status.HTTP_400_BAD_REQUEST, 

118 detail=str(e), 

119 ) 

120 

121 

122@router.get("/") 

123async def list_api_keys( 

124 current_user: CurrentUser, 

125 api_key_manager: APIKeyManagerDep, 

126) -> list[APIKeyResponse]: 

127 """ 

128 List all API keys for the current user 

129 

130 Returns metadata for all keys (name, created, expires, last_used). 

131 Does not include the actual API keys. 

132 """ 

133 keys = await api_key_manager.list_api_keys(current_user.get("keycloak_id") or current_user["user_id"]) 

134 

135 return [ 

136 APIKeyResponse( 

137 key_id=key["key_id"], 

138 name=key["name"], 

139 created=key["created"], 

140 expires_at=key["expires_at"], 

141 last_used=key.get("last_used"), 

142 ) 

143 for key in keys 

144 ] 

145 

146 

147@router.post("/{key_id}/rotate") 

148async def rotate_api_key( 

149 key_id: str, 

150 current_user: CurrentUser, 

151 api_key_manager: APIKeyManagerDep, 

152) -> RotateAPIKeyResponse: 

153 """ 

154 Rotate an API key 

155 

156 Generates a new API key while keeping the same key_id. 

157 The old key is invalidated immediately. 

158 

159 **Save the new_api_key securely** - update your client configuration. 

160 """ 

161 try: 

162 result = await api_key_manager.rotate_api_key( 

163 user_id=current_user.get("keycloak_id") or current_user["user_id"], # Use UUID for Keycloak 

164 key_id=key_id, 

165 ) 

166 

167 return RotateAPIKeyResponse( 

168 key_id=result["key_id"], 

169 new_api_key=result["new_api_key"], 

170 ) 

171 

172 except ValueError as e: 

173 raise HTTPException( 

174 status_code=status.HTTP_404_NOT_FOUND, 

175 detail=str(e), 

176 ) 

177 

178 

179@router.delete("/{key_id}", status_code=status.HTTP_204_NO_CONTENT) 

180async def revoke_api_key( 

181 key_id: str, 

182 current_user: CurrentUser, 

183 api_key_manager: APIKeyManagerDep, 

184) -> None: 

185 """ 

186 Revoke an API key 

187 

188 Permanently deletes the API key. This action cannot be undone. 

189 Any clients using this key will immediately lose access. 

190 """ 

191 await api_key_manager.revoke_api_key( 

192 user_id=current_user.get("keycloak_id") or current_user["user_id"], # Use UUID for Keycloak 

193 key_id=key_id, 

194 ) 

195 

196 return None 

197 

198 

199# Internal Endpoint for Kong Plugin 

200 

201 

202@router.post("/validate", include_in_schema=False) 

203async def validate_api_key( 

204 api_key_manager: APIKeyManagerDep, 

205 keycloak: KeycloakClientDep, 

206 api_key: Annotated[str | None, Header(alias="X-API-Key")] = None, 

207) -> ValidateAPIKeyResponse: 

208 """ 

209 Validate API key and return JWT (internal endpoint for Kong plugin) 

210 

211 This endpoint is called by the Kong API key→JWT exchange plugin. 

212 It validates the API key and issues a JWT for the associated user. 

213 

214 **This endpoint is not intended for direct client use.** 

215 """ 

216 if not api_key: 

217 raise HTTPException( 

218 status_code=status.HTTP_400_BAD_REQUEST, 

219 detail="Missing X-API-Key header", 

220 ) 

221 

222 # Validate API key 

223 user_info = await api_key_manager.validate_and_get_user(api_key) 

224 

225 if not user_info: 

226 raise HTTPException( 

227 status_code=status.HTTP_401_UNAUTHORIZED, 

228 detail="Invalid or expired API key", 

229 ) 

230 

231 # Issue JWT for this user 

232 try: 

233 # Exchange for JWT using Keycloak 

234 # This simulates a user login to get a JWT 

235 # Use keycloak_id (UUID) for Admin API, fallback to user_id for backward compatibility 

236 keycloak_user_id = user_info.get("keycloak_id") or user_info["user_id"] 

237 token_response = await keycloak.issue_token_for_user(keycloak_user_id) 

238 

239 return ValidateAPIKeyResponse( 

240 access_token=token_response["access_token"], 

241 expires_in=token_response.get("expires_in", 900), # Default 15 min 

242 user_id=user_info["user_id"], 

243 username=user_info["username"], 

244 ) 

245 

246 except Exception as e: 

247 raise HTTPException( 

248 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, 

249 detail=f"Failed to issue JWT: {e!s}", 

250 )