Merge master into feature/session-proxy
This commit is contained in:
120
src/grist_mcp/logging.py
Normal file
120
src/grist_mcp/logging.py
Normal file
@@ -0,0 +1,120 @@
|
||||
"""Logging configuration and utilities."""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
__all__ = [
|
||||
"setup_logging",
|
||||
"get_logger",
|
||||
"truncate_token",
|
||||
"extract_stats",
|
||||
"format_tool_log",
|
||||
]
|
||||
|
||||
|
||||
def setup_logging() -> None:
|
||||
"""Configure logging based on LOG_LEVEL environment variable.
|
||||
|
||||
Valid levels: DEBUG, INFO, WARNING, ERROR (default: INFO)
|
||||
"""
|
||||
level_name = os.environ.get("LOG_LEVEL", "INFO").upper()
|
||||
level = getattr(logging, level_name, None)
|
||||
|
||||
if not isinstance(level, int):
|
||||
level = logging.INFO
|
||||
|
||||
logger = logging.getLogger("grist_mcp")
|
||||
logger.setLevel(level)
|
||||
logger.propagate = False # Prevent duplicate logs to root logger
|
||||
|
||||
# Only add handler if not already configured
|
||||
if not logger.handlers:
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(logging.Formatter("%(message)s"))
|
||||
logger.addHandler(handler)
|
||||
|
||||
|
||||
def extract_stats(tool_name: str, arguments: dict, result: dict) -> str:
|
||||
"""Extract meaningful stats from tool call based on tool type."""
|
||||
if tool_name == "list_documents":
|
||||
count = len(result.get("documents", []))
|
||||
return f"{count} docs"
|
||||
|
||||
if tool_name == "list_tables":
|
||||
count = len(result.get("tables", []))
|
||||
return f"{count} tables"
|
||||
|
||||
if tool_name == "describe_table":
|
||||
count = len(result.get("columns", []))
|
||||
return f"{count} columns"
|
||||
|
||||
if tool_name == "get_records":
|
||||
count = len(result.get("records", []))
|
||||
return f"{count} records"
|
||||
|
||||
if tool_name == "sql_query":
|
||||
count = len(result.get("records", []))
|
||||
return f"{count} rows"
|
||||
|
||||
if tool_name == "add_records":
|
||||
count = len(arguments.get("records", []))
|
||||
return f"{count} records"
|
||||
|
||||
if tool_name == "update_records":
|
||||
count = len(arguments.get("records", []))
|
||||
return f"{count} records"
|
||||
|
||||
if tool_name == "delete_records":
|
||||
count = len(arguments.get("record_ids", []))
|
||||
return f"{count} records"
|
||||
|
||||
if tool_name == "create_table":
|
||||
count = len(arguments.get("columns", []))
|
||||
return f"{count} columns"
|
||||
|
||||
if tool_name in ("add_column", "modify_column", "delete_column"):
|
||||
return "1 column"
|
||||
|
||||
return "-"
|
||||
|
||||
|
||||
def truncate_token(token: str) -> str:
|
||||
"""Truncate token to show first 3 and last 3 chars.
|
||||
|
||||
Tokens 8 chars or shorter show *** for security.
|
||||
"""
|
||||
if len(token) <= 8:
|
||||
return "***"
|
||||
return f"{token[:3]}...{token[-3:]}"
|
||||
|
||||
|
||||
def format_tool_log(
|
||||
agent_name: str,
|
||||
token: str,
|
||||
tool: str,
|
||||
document: str | None,
|
||||
stats: str,
|
||||
status: str,
|
||||
duration_ms: int,
|
||||
error_message: str | None = None,
|
||||
) -> str:
|
||||
"""Format a tool call log line.
|
||||
|
||||
Format: YYYY-MM-DD HH:MM:SS | agent (token) | tool | doc | stats | status | duration
|
||||
"""
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
truncated = truncate_token(token)
|
||||
doc = document if document else "-"
|
||||
|
||||
line = f"{timestamp} | {agent_name} ({truncated}) | {tool} | {doc} | {stats} | {status} | {duration_ms}ms"
|
||||
|
||||
if error_message:
|
||||
line += f"\n {error_message}"
|
||||
|
||||
return line
|
||||
|
||||
|
||||
def get_logger(name: str) -> logging.Logger:
|
||||
"""Get a child logger under the grist_mcp namespace."""
|
||||
return logging.getLogger(f"grist_mcp.{name}")
|
||||
@@ -1,6 +1,7 @@
|
||||
"""Main entry point for the MCP server with SSE transport."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from typing import Any
|
||||
@@ -13,6 +14,7 @@ from grist_mcp.config import Config, load_config
|
||||
from grist_mcp.auth import Authenticator, AuthError
|
||||
from grist_mcp.session import SessionTokenManager
|
||||
from grist_mcp.proxy import parse_proxy_request, dispatch_proxy_request, ProxyError
|
||||
from grist_mcp.logging import setup_logging
|
||||
|
||||
|
||||
Scope = dict[str, Any]
|
||||
@@ -260,12 +262,29 @@ def _print_mcp_config(external_port: int, tokens: list) -> None:
|
||||
print()
|
||||
|
||||
|
||||
class UvicornAccessFilter(logging.Filter):
|
||||
"""Suppress uvicorn access logs unless LOG_LEVEL is DEBUG.
|
||||
|
||||
At INFO level, only grist_mcp tool logs are shown.
|
||||
At DEBUG level, all HTTP requests are visible.
|
||||
"""
|
||||
|
||||
def filter(self, record: logging.LogRecord) -> bool:
|
||||
# Only show uvicorn access logs at DEBUG level
|
||||
return os.environ.get("LOG_LEVEL", "INFO").upper() == "DEBUG"
|
||||
|
||||
|
||||
def main():
|
||||
"""Run the SSE server."""
|
||||
port = int(os.environ.get("PORT", "3000"))
|
||||
external_port = int(os.environ.get("EXTERNAL_PORT", str(port)))
|
||||
config_path = os.environ.get("CONFIG_PATH", "/app/config.yaml")
|
||||
|
||||
setup_logging()
|
||||
|
||||
# Suppress uvicorn access logs at INFO level (only show tool logs)
|
||||
logging.getLogger("uvicorn.access").addFilter(UvicornAccessFilter())
|
||||
|
||||
if not _ensure_config(config_path):
|
||||
return
|
||||
|
||||
@@ -278,7 +297,13 @@ def main():
|
||||
_print_mcp_config(external_port, config.tokens)
|
||||
|
||||
app = create_app(config)
|
||||
uvicorn.run(app, host="0.0.0.0", port=port)
|
||||
|
||||
# Configure uvicorn logging to reduce health check noise
|
||||
log_config = uvicorn.config.LOGGING_CONFIG
|
||||
log_config["formatters"]["default"]["fmt"] = "%(message)s"
|
||||
log_config["formatters"]["access"]["fmt"] = "%(message)s"
|
||||
|
||||
uvicorn.run(app, host="0.0.0.0", port=port, log_config=log_config)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
"""MCP server setup and tool registration."""
|
||||
|
||||
import json
|
||||
import time
|
||||
|
||||
from mcp.server import Server
|
||||
from mcp.types import Tool, TextContent
|
||||
@@ -9,6 +10,9 @@ from grist_mcp.auth import Authenticator, Agent, AuthError
|
||||
from grist_mcp.session import SessionTokenManager
|
||||
from grist_mcp.tools.session import get_proxy_documentation as _get_proxy_documentation
|
||||
from grist_mcp.tools.session import request_session_token as _request_session_token
|
||||
from grist_mcp.logging import get_logger, extract_stats, format_tool_log
|
||||
|
||||
logger = get_logger("server")
|
||||
|
||||
from grist_mcp.tools.discovery import list_documents as _list_documents
|
||||
from grist_mcp.tools.read import list_tables as _list_tables
|
||||
@@ -239,6 +243,22 @@ def create_server(auth: Authenticator, agent: Agent, token_manager: SessionToken
|
||||
|
||||
@server.call_tool()
|
||||
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
|
||||
start_time = time.time()
|
||||
document = arguments.get("document")
|
||||
|
||||
# Log arguments at DEBUG level
|
||||
logger.debug(
|
||||
format_tool_log(
|
||||
agent_name=_current_agent.name,
|
||||
token=_current_agent.token,
|
||||
tool=name,
|
||||
document=document,
|
||||
stats=f"args: {json.dumps(arguments)}",
|
||||
status="started",
|
||||
duration_ms=0,
|
||||
)
|
||||
)
|
||||
|
||||
try:
|
||||
if name == "list_documents":
|
||||
result = await _list_documents(_current_agent)
|
||||
@@ -311,11 +331,53 @@ def create_server(auth: Authenticator, agent: Agent, token_manager: SessionToken
|
||||
else:
|
||||
return [TextContent(type="text", text=f"Unknown tool: {name}")]
|
||||
|
||||
duration_ms = int((time.time() - start_time) * 1000)
|
||||
stats = extract_stats(name, arguments, result)
|
||||
|
||||
logger.info(
|
||||
format_tool_log(
|
||||
agent_name=_current_agent.name,
|
||||
token=_current_agent.token,
|
||||
tool=name,
|
||||
document=document,
|
||||
stats=stats,
|
||||
status="success",
|
||||
duration_ms=duration_ms,
|
||||
)
|
||||
)
|
||||
|
||||
return [TextContent(type="text", text=json.dumps(result))]
|
||||
|
||||
except AuthError as e:
|
||||
duration_ms = int((time.time() - start_time) * 1000)
|
||||
logger.warning(
|
||||
format_tool_log(
|
||||
agent_name=_current_agent.name,
|
||||
token=_current_agent.token,
|
||||
tool=name,
|
||||
document=document,
|
||||
stats="-",
|
||||
status="auth_error",
|
||||
duration_ms=duration_ms,
|
||||
error_message=str(e),
|
||||
)
|
||||
)
|
||||
return [TextContent(type="text", text=f"Authorization error: {e}")]
|
||||
|
||||
except Exception as e:
|
||||
duration_ms = int((time.time() - start_time) * 1000)
|
||||
logger.error(
|
||||
format_tool_log(
|
||||
agent_name=_current_agent.name,
|
||||
token=_current_agent.token,
|
||||
tool=name,
|
||||
document=document,
|
||||
stats="-",
|
||||
status="error",
|
||||
duration_ms=duration_ms,
|
||||
error_message=str(e),
|
||||
)
|
||||
)
|
||||
return [TextContent(type="text", text=f"Error: {e}")]
|
||||
|
||||
return server
|
||||
|
||||
Reference in New Issue
Block a user