Coverage for src / mcp_server_langgraph / observability / json_logger.py: 96%

65 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Structured JSON Logging with OpenTelemetry Integration 

3 

4Provides JSON-formatted logging with automatic trace context injection, 

5compatible with centralized log aggregation platforms (ELK, Datadog, Splunk, CloudWatch). 

6""" 

7 

8import json 

9import logging 

10from datetime import datetime, UTC 

11from typing import TYPE_CHECKING, Any 

12 

13from opentelemetry import trace 

14 

15# Type annotation for conditional base class 

16if TYPE_CHECKING: 

17 JsonFormatterBase: type[logging.Formatter] = logging.Formatter 

18else: 

19 try: 

20 from pythonjsonlogger.json import JsonFormatter 

21 

22 JsonFormatterBase = JsonFormatter 

23 except (ImportError, AttributeError): 

24 # Fallback if pythonjsonlogger is not available 

25 JsonFormatterBase = logging.Formatter 

26 

27 

28class CustomJSONFormatter(JsonFormatterBase): # type: ignore[valid-type, misc] 

29 """ 

30 Enhanced JSON formatter with OpenTelemetry trace context injection 

31 

32 Features: 

33 - Automatic trace_id and span_id injection from OpenTelemetry 

34 - ISO 8601 timestamp formatting 

35 - Exception stack trace capture 

36 - Custom field support via extra parameter 

37 - Hostname/service name injection 

38 """ 

39 

40 def __init__( 

41 self, 

42 fmt: str | None = None, 

43 datefmt: str | None = None, 

44 style: str = "%", 

45 service_name: str = "mcp-server-langgraph", 

46 include_hostname: bool = True, 

47 indent: int | None = None, 

48 ): 

49 """ 

50 Initialize JSON formatter 

51 

52 Args: 

53 fmt: Log message format (supports standard logging format keys) 

54 datefmt: Date format (ISO 8601 if not specified) 

55 style: Format style (%, {, $) 

56 service_name: Service name to include in logs 

57 include_hostname: Whether to include hostname 

58 indent: JSON indentation for pretty-printing (None for compact) 

59 """ 

60 super().__init__(fmt=fmt, datefmt=datefmt, style=style) 

61 self.service_name = service_name 

62 self.include_hostname = include_hostname 

63 self.indent = indent 

64 

65 def add_fields( 

66 self, 

67 log_record: dict[str, Any], 

68 record: logging.LogRecord, 

69 message_dict: dict[str, Any], 

70 ) -> None: 

71 """ 

72 Add custom fields to log record 

73 

74 Injects: 

75 - timestamp (ISO 8601) 

76 - level (INFO, ERROR, etc.) 

77 - logger name 

78 - trace_id and span_id from OpenTelemetry 

79 - service name 

80 - hostname (optional) 

81 - exception details (if present) 

82 """ 

83 super().add_fields(log_record, record, message_dict) 

84 

85 # Add timestamp in ISO 8601 format with milliseconds 

86 if "timestamp" not in log_record: 

87 now = datetime.now(UTC) 

88 log_record["timestamp"] = now.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" 

89 

90 # Add log level 

91 log_record["level"] = record.levelname 

92 

93 # Add logger name 

94 log_record["logger"] = record.name 

95 

96 # Add service name 

97 log_record["service"] = self.service_name 

98 

99 # Add hostname if enabled 

100 if self.include_hostname: 

101 import socket 

102 

103 try: 

104 log_record["hostname"] = socket.gethostname() 

105 except Exception: 

106 log_record["hostname"] = "unknown" 

107 

108 # Add OpenTelemetry trace context 

109 span = trace.get_current_span() 

110 if span: 

111 span_context = span.get_span_context() 

112 if span_context and span_context.is_valid: 

113 log_record["trace_id"] = format(span_context.trace_id, "032x") 

114 log_record["span_id"] = format(span_context.span_id, "016x") 

115 log_record["trace_flags"] = f"{span_context.trace_flags:02x}" 

116 

117 # Add exception info if present 

118 if record.exc_info: 

119 # exc_info can be True (capture current exception) or a tuple (type, value, traceback) 

120 # When logging.LogRecord is created with exc_info=True, it should auto-capture 

121 # But in tests, it might just be True. Handle both cases. 

122 if isinstance(record.exc_info, tuple) and len(record.exc_info) == 3: 

123 log_record["exception"] = { 

124 "type": record.exc_info[0].__name__ if record.exc_info[0] else None, 

125 "message": str(record.exc_info[1]) if record.exc_info[1] else None, 

126 "stacktrace": self.formatException(record.exc_info), 

127 } 

128 # Note: exc_info=True case is rare and often indicates a logging misconfiguration 

129 # The tuple case above handles normal exception logging 

130 

131 # Add process and thread info 

132 log_record["process"] = { 

133 "pid": record.process, 

134 "name": record.processName, 

135 } 

136 log_record["thread"] = { 

137 "id": record.thread, 

138 "name": record.threadName, 

139 } 

140 

141 # Add file location 

142 log_record["location"] = { 

143 "file": record.pathname, 

144 "line": record.lineno, 

145 "function": record.funcName, 

146 } 

147 

148 def format(self, record: logging.LogRecord) -> str: 

149 """ 

150 Format log record as JSON string 

151 

152 Args: 

153 record: Log record to format 

154 

155 Returns: 

156 JSON-formatted log string 

157 """ 

158 message_dict = {} 

159 # Always get the formatted message (combines msg + args) 

160 message_dict["message"] = record.getMessage() 

161 

162 # Merge any extra fields passed via extra parameter 

163 if hasattr(record, "__dict__"): 163 ↛ 194line 163 didn't jump to line 194 because the condition on line 163 was always true

164 for key, value in record.__dict__.items(): 

165 # Skip standard logging attributes 

166 if key not in [ 

167 "name", 

168 "msg", 

169 "args", 

170 "created", 

171 "filename", 

172 "funcName", 

173 "levelname", 

174 "levelno", 

175 "lineno", 

176 "module", 

177 "msecs", 

178 "message", 

179 "pathname", 

180 "process", 

181 "processName", 

182 "relativeCreated", 

183 "thread", 

184 "threadName", 

185 "exc_info", 

186 "exc_text", 

187 "stack_info", 

188 "taskName", 

189 ]: 

190 # Add custom extra fields 

191 if not key.startswith("_"): 

192 message_dict[key] = value 

193 

194 log_record: dict[str, Any] = {} 

195 self.add_fields(log_record, record, message_dict) 

196 

197 # Serialize to JSON 

198 return json.dumps(log_record, default=str, indent=self.indent) 

199 

200 

201def setup_json_logging( 

202 logger: logging.Logger, 

203 level: int = logging.INFO, 

204 service_name: str = "mcp-server-langgraph", 

205 include_hostname: bool = True, 

206 indent: int | None = None, 

207) -> logging.Logger: 

208 """ 

209 Configure logger with JSON formatting 

210 

211 Args: 

212 logger: Logger instance to configure 

213 level: Logging level 

214 service_name: Service name for log entries 

215 include_hostname: Include hostname in logs 

216 indent: JSON indentation (None for compact, 2 for pretty-print) 

217 

218 Returns: 

219 Configured logger instance 

220 

221 Example: 

222 >>> logger = logging.getLogger(__name__) 

223 >>> setup_json_logging(logger, level=logging.INFO) 

224 >>> logger.info("User logged in", extra={"user_id": "alice", "ip": "1.2.3.4"}) 

225 """ 

226 # Remove existing handlers 

227 logger.handlers = [] 

228 

229 # Create JSON formatter 

230 formatter = CustomJSONFormatter( 

231 service_name=service_name, 

232 include_hostname=include_hostname, 

233 indent=indent, 

234 ) 

235 

236 # Create console handler 

237 handler = logging.StreamHandler() 

238 handler.setFormatter(formatter) 

239 handler.setLevel(level) 

240 

241 # Add handler to logger 

242 logger.addHandler(handler) 

243 logger.setLevel(level) 

244 

245 return logger 

246 

247 

248# Helper function for logging with extra context 

249def log_with_context( 

250 logger: logging.Logger, 

251 level: int, 

252 message: str, 

253 **context: Any, 

254) -> None: 

255 """ 

256 Log message with additional context fields 

257 

258 Args: 

259 logger: Logger instance 

260 level: Log level (logging.INFO, logging.ERROR, etc.) 

261 message: Log message 

262 **context: Additional context fields (user_id, ip_address, etc.) 

263 

264 Example: 

265 >>> log_with_context( 

266 ... logger, 

267 ... logging.INFO, 

268 ... "User action", 

269 ... user_id="alice", 

270 ... action="login", 

271 ... ip="1.2.3.4" 

272 ... ) 

273 """ 

274 logger.log(level, message, extra=context)