Compare commits

70 Commits

Author SHA1 Message Date
5b0ade3d63 chore(deps): update docker/metadata-action action to v6 2026-03-28 22:03:09 +00:00
540e57ec81 Merge pull request 'chore(deps): update actions/checkout digest to de0fac2' (#4) from renovate/actions-checkout-digest into master
Reviewed-on: #4
2026-03-28 16:26:53 -04:00
d1e1043896 Merge pull request 'chore(deps): pin dependencies' (#2) from renovate/pin-dependencies into master
Reviewed-on: #2
2026-03-28 16:25:58 -04:00
6521078b6a chore(deps): pin dependencies 2026-03-26 22:12:00 +00:00
2f0a24aceb chore(deps): update actions/checkout digest to de0fac2 2026-02-04 16:03:19 +00:00
77bf95817d chore: update uv.lock
All checks were successful
Build and Push Docker Image / build (push) Successful in 8s
2026-01-26 15:23:21 -05:00
29a72ab005 docs: update changelog for v1.5.0 column label support 2026-01-26 15:19:20 -05:00
33bb464102 feat: add label parameter to add_column and modify_column tools
Allow setting a human-readable display label for columns, separate from
the column_id used in formulas and API calls. The label defaults to the
column_id if not provided.
2026-01-26 15:18:11 -05:00
d4e793224b chore: update uv.lock 2026-01-14 18:04:00 -05:00
bf8f301ded chore: bump version to 1.4.1 and update changelog
Document the reference column filter support feature.
2026-01-14 17:57:33 -05:00
a97930848b feat: normalize filter values to array format for Grist API
The Grist API requires all filter values to be arrays. This change adds
automatic normalization of filter values in get_records, wrapping single
values in lists before sending to the API.

This fixes 400 errors when filtering on Ref columns with single integer IDs.

Changes:
- Add filters.py module with normalize_filter function
- Update get_records to normalize filters before API call
- Add Orders table with Ref column to mock Grist server
- Add filter validation to mock server (rejects non-array values)
- Fix shell script shebangs for portability (#!/usr/bin/env bash)
2026-01-14 17:56:18 -05:00
c868e8a7fa chore: bump version to 1.4.0 and update changelog
2026-01-12 12:14:49 -05:00
734cc0a525 feat: add attachment download via proxy endpoint
Add GET /api/v1/attachments/{id} endpoint for downloading attachments
through the MCP proxy. This complements the existing upload endpoint and
enables complete attachment workflows via the proxy API.
2026-01-12 12:13:23 -05:00
a7c87128ef feat: replace MCP attachment tool with proxy endpoint
The MCP tool approach was impractical because it required the LLM to
generate large base64 strings token-by-token, causing timeouts.

Changes:
- Remove upload_attachment MCP tool
- Add POST /api/v1/attachments endpoint for multipart/form-data uploads
- Update proxy documentation to show both endpoints
- Uses existing GristClient.upload_attachment() method
- Requires write permission in session token
2026-01-03 20:26:36 -05:00
848cfd684f feat: add upload_attachment MCP tool
Add support for uploading file attachments to Grist documents:

- GristClient.upload_attachment() method using multipart/form-data
- upload_attachment tool function with base64 decoding and MIME detection
- Tool registration in server.py
- Comprehensive unit tests (7 new tests)

Returns attachment ID for linking to records via update_records.

Bumps version to 1.3.0.
2026-01-03 19:59:47 -05:00
ea175d55a2 Add attachment upload feature design 2026-01-03 19:50:01 -05:00
db12fca615 Merge pull request 'chore(deps): update actions/checkout action to v6' (#3) from renovate/actions-checkout-6.x into master
Reviewed-on: #3
2026-01-02 17:19:43 -05:00
d540105d09 docs(proxy): clarify proxy_url usage in documentation
2026-01-02 15:01:33 -05:00
d40ae0b238 feat(main): use GRIST_MCP_URL in startup config output 2026-01-02 14:58:55 -05:00
2a60de1bf1 docs: add GRIST_MCP_URL to environment variables 2026-01-02 14:56:02 -05:00
ba45de4582 fix(session): include full proxy URL from GRIST_MCP_URL env var 2026-01-02 14:54:25 -05:00
d176b03d56 chore: bump version to 1.2.0
2026-01-02 14:43:50 -05:00
50c5cfbab1 Merge master into feature/session-proxy 2026-01-02 14:40:37 -05:00
8484536aae fix(integration): add auth headers and fix mock server routes 2026-01-02 14:36:25 -05:00
b3bfdf97c2 fix(test): increase sleep duration for flaky expiry test 2026-01-02 14:24:10 -05:00
eabddee737 docs: update CHANGELOG for session proxy feature 2026-01-02 14:20:45 -05:00
3d1ac1fe60 test(integration): add session proxy integration test 2026-01-02 14:17:59 -05:00
ed1d14a4d4 feat(main): add /api/v1/proxy HTTP endpoint 2026-01-02 14:16:24 -05:00
80e93ab3d9 test(proxy): add permission denial test 2026-01-02 14:08:58 -05:00
7073182f9e feat(proxy): add method dispatch 2026-01-02 14:07:47 -05:00
caa435d972 feat(proxy): add request parsing 2026-01-02 13:57:38 -05:00
ba88ba01f3 feat(server): register session token tools
Add get_proxy_documentation and request_session_token tools to the MCP
server. The create_server function now accepts an optional token_manager
parameter (SessionTokenManager | None) to maintain backward compatibility.

When token_manager is None, request_session_token returns an error
message instead of creating tokens.
2026-01-02 13:51:47 -05:00
fb6d4af973 feat(tools): add request_session_token tool
Add MCP tool for agents to request short-lived session tokens for HTTP
proxy access. The tool validates that agents can only request permissions
they already have (no privilege escalation).

- Validates document access and each requested permission
- Creates session token via SessionTokenManager
- Returns token metadata including proxy URL and expiration
- Includes tests for success case and permission denial scenarios
2026-01-02 13:45:07 -05:00
a7bb11d765 feat(tools): add get_proxy_documentation tool
Add a new MCP tool that returns complete documentation for the HTTP
proxy API. This enables agents to get all the information they need
to construct valid proxy requests when writing scripts.

The tool is stateless and returns a static documentation dict
describing endpoints, methods, authentication, and example usage.
2026-01-02 13:39:02 -05:00
c65ec0489c test(session): add tests for invalid and expired tokens 2026-01-02 13:34:52 -05:00
681cb0f67c feat(session): add token validation 2026-01-02 13:31:18 -05:00
3c97ad407c feat(session): cap TTL at 1 hour maximum 2026-01-02 13:27:30 -05:00
110f87e53f docs: add logging configuration to README
2026-01-02 13:24:38 -05:00
b310ee10a9 feat(session): add SessionTokenManager with token creation
Add SessionTokenManager class that creates short-lived session tokens
for HTTP proxy access. Each token includes agent identity, document
scope, permissions, and expiration time.
2026-01-02 13:22:53 -05:00
f48dafc88f fix(logging): suppress uvicorn access logs and prevent duplicate logging
2026-01-02 13:20:35 -05:00
d80eac4a0d fix(logging): properly suppress health checks at INFO level
2026-01-02 13:14:52 -05:00
4923d3110c docs: add session proxy implementation plan 2026-01-02 13:04:49 -05:00
58807ddbd0 Merge branch 'feature/logging-improvements'
2026-01-02 13:04:48 -05:00
09b6be14df chore: bump version to 1.1.0 and update changelog 2026-01-02 13:03:57 -05:00
4e75709c4e chore: update uv.lock for version 1.0.0 2026-01-02 13:02:40 -05:00
0cdf06546c docs: add get_proxy_documentation tool to design
Dedicated tool returns complete API spec so agents can write
reusable scripts before requesting session tokens.
2026-01-02 12:58:37 -05:00
c6fbadecfc chore(logging): add module exports 2026-01-02 12:52:40 -05:00
3cf2400232 docs: add session token proxy design
Enables agents to delegate bulk data operations to scripts,
bypassing LLM generation time for data-intensive operations.
Scripts authenticate via short-lived session tokens requested
through MCP, then call a simplified HTTP proxy endpoint.
2026-01-02 12:52:02 -05:00
1eb64803be feat(logging): suppress health checks at INFO level 2026-01-02 12:51:36 -05:00
38ccaa9cb8 feat(logging): initialize logging on server startup 2026-01-02 12:49:59 -05:00
51e90abd2d feat(logging): add tool call logging to server 2026-01-02 12:47:45 -05:00
d6fb3f4ef0 feat(logging): add get_logger helper 2026-01-02 12:45:17 -05:00
163b48f1f4 feat(logging): add setup_logging with LOG_LEVEL support 2026-01-02 12:41:40 -05:00
a668baa4d0 feat(logging): add log line formatter 2026-01-02 12:37:19 -05:00
69a65a68a6 feat(logging): add stats extraction for all tools 2026-01-02 12:35:18 -05:00
ff7dff7571 feat(logging): add token truncation helper 2026-01-02 12:30:09 -05:00
77027d762e docs: add logging improvements implementation plan 2026-01-02 12:20:26 -05:00
210cfabb52 chore: add .worktrees to gitignore 2026-01-02 12:17:11 -05:00
a31b2652bb docs: add logging improvements design 2026-01-02 12:15:59 -05:00
f79ae5546f chore(deps): update actions/checkout action to v6 2026-01-02 05:20:49 +00:00
16691e1d21 Merge pull request 'chore: Configure Renovate' (#1) from renovate/configure into master
Reviewed-on: #1
2026-01-01 14:08:06 -05:00
204d00caf4 feat: add host_header config for Docker networking
When Grist validates the Host header (common with reverse proxy setups),
internal Docker networking fails because requests arrive with
Host: container-name instead of the external domain.

The new host_header config option allows overriding the Host header
sent to Grist while still connecting via internal Docker hostnames.
2026-01-01 14:06:31 -05:00
ca03d22b97 fix: handle missing config file gracefully in Docker
2026-01-01 12:51:25 -05:00
107db82c52 docs: update README with step-by-step first-time setup 2026-01-01 12:10:17 -05:00
4b89837b43 chore: remove logging and resource limits from prod config 2026-01-01 12:07:47 -05:00
5aaa943010 Add renovate.json 2026-01-01 16:50:55 +00:00
c8cea249bc chore: use ghcr.io image for production deployment
- Update prod docker-compose to pull from ghcr.io/xe138/grist-mcp-server
- Remove debug step from Gitea workflow
2026-01-01 11:48:39 -05:00
ae894ff52e debug: add environment diagnostics
2026-01-01 11:29:06 -05:00
7b7eea2f67 fix: use ubuntu-docker runner with Docker CLI 2026-01-01 11:20:21 -05:00
0f2544c960 fix: use git clone instead of actions/checkout for host runner
Some checks failed
Build and Push Docker Image / build (push) Failing after 1s
2026-01-01 11:15:15 -05:00
43 changed files with 5375 additions and 208 deletions


@@ -10,27 +10,12 @@ env:
IMAGE_NAME: bill/grist-mcp
jobs:
test:
runs-on: linux_x64
steps:
- uses: actions/checkout@v4
- name: Run tests
run: echo "Tests would run here"
build:
needs: test
runs-on: linux_x64
runs-on: ubuntu-docker
steps:
- uses: actions/checkout@v4
- name: Debug environment
- name: Checkout repository
run: |
echo "PATH: $PATH"
echo "Which docker: $(which docker || echo 'not found')"
echo "Docker version: $(docker --version || echo 'docker not available')"
ls -la /usr/bin/docker || echo "/usr/bin/docker not found"
ls -la /var/run/docker.sock || echo "docker.sock not found"
git clone --depth 1 --branch ${GITHUB_REF_NAME} ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY}.git .
- name: Extract version from tag
id: version


@@ -18,10 +18,10 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
- name: Log in to Container Registry
uses: docker/login-action@v3
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
@@ -29,7 +29,7 @@ jobs:
- name: Extract metadata for Docker
id: meta
uses: docker/metadata-action@v5
uses: docker/metadata-action@030e881283bb7a6894de51c315a6bfe6a94e05cf # v6
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
@@ -38,10 +38,10 @@ jobs:
type=raw,value=latest,enable=${{ !contains(github.ref, '-alpha') && !contains(github.ref, '-beta') && !contains(github.ref, '-rc') }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # v3
- name: Build and push Docker image
uses: docker/build-push-action@v6
uses: docker/build-push-action@10e90e3645eae34f1e60eeb005ba3a3d33f178e8 # v6
with:
context: .
push: true

.gitignore

@@ -6,3 +6,4 @@ __pycache__/
*.egg-info/
dist/
.pytest_cache/
.worktrees/


@@ -5,6 +5,157 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.5.0] - 2026-01-26
### Added
#### Column Label Support
- **`add_column`**: New optional `label` parameter for setting display name
- **`modify_column`**: New optional `label` parameter for updating display name
Labels are human-readable names shown in Grist column headers, separate from the `column_id` used in formulas and API calls. If not provided, Grist defaults the label to the column ID.
#### Usage
```python
# Create column with display label
add_column(document="crm", table="Contacts", column_id="first_name", column_type="Text", label="First Name")
# Update existing column's label
modify_column(document="crm", table="Contacts", column_id="first_name", label="Given Name")
```
## [1.4.1] - 2026-01-14
### Added
#### Reference Column Filter Support
- **Filter normalization**: `get_records` now automatically normalizes filter values to array format
- Fixes 400 errors when filtering on `Ref:*` (reference/foreign key) columns
- Single values are wrapped in arrays before sending to Grist API
#### Usage
```python
# Before: Failed with 400 Bad Request
get_records(document="accounting", table="TransactionLines", filter={"Transaction": 44})
# After: Works - filter normalized to {"Transaction": [44]}
get_records(document="accounting", table="TransactionLines", filter={"Transaction": 44})
# Multiple values also supported
get_records(document="accounting", table="TransactionLines", filter={"Transaction": [44, 45, 46]})
```
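The normalization rule is small enough to sketch. This mirrors the behavior described above; the actual `normalize_filter` in `filters.py` may differ in detail:

```python
def normalize_filter(filters: dict) -> dict:
    """Wrap scalar filter values in lists, as the Grist API expects.

    {"Transaction": 44}   -> {"Transaction": [44]}
    {"Transaction": [44]} -> unchanged
    """
    return {
        key: list(value) if isinstance(value, (list, tuple)) else [value]
        for key, value in filters.items()
    }
```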
### Fixed
- Shell script shebangs updated to `#!/usr/bin/env bash` for portability across environments
## [1.4.0] - 2026-01-12
### Added
#### Attachment Download via Proxy
- **`GET /api/v1/attachments/{id}`**: New HTTP endpoint for downloading attachments
- Returns binary content with appropriate `Content-Type` and `Content-Disposition` headers
- Requires read permission in session token
- Complements the existing upload endpoint for complete attachment workflows
#### Usage
```bash
# Get session token with read permission
TOKEN=$(curl -s ... | jq -r '.token')

# Download attachment
curl -H "Authorization: Bearer $TOKEN" \
  https://example.com/api/v1/attachments/42 \
  -o downloaded.pdf
```
```python
# Python example
import requests

response = requests.get(
    f'{base_url}/api/v1/attachments/42',
    headers={'Authorization': f'Bearer {token}'}
)
with open('downloaded.pdf', 'wb') as f:
    f.write(response.content)
```
## [1.3.0] - 2026-01-03
### Added
#### Attachment Upload via Proxy
- **`POST /api/v1/attachments`**: New HTTP endpoint for file uploads
- Uses `multipart/form-data` for efficient binary transfer (no base64 overhead)
- Automatic MIME type detection from filename
- Returns attachment ID for linking to records via `update_records`
- Requires write permission in session token
#### Usage
```bash
# Get session token with write permission
TOKEN=$(curl -s ... | jq -r '.token')
# Upload file
curl -X POST \
-H "Authorization: Bearer $TOKEN" \
-F "file=@invoice.pdf" \
https://example.com/api/v1/attachments
# Returns: {"success": true, "data": {"attachment_id": 42, "filename": "invoice.pdf", "size_bytes": 31395}}
```
```python
# Python example
import requests

response = requests.post(
    proxy_url.replace('/proxy', '/attachments'),
    headers={'Authorization': f'Bearer {token}'},
    files={'file': open('invoice.pdf', 'rb')}
)
attachment_id = response.json()['data']['attachment_id']

# Link to record via proxy
requests.post(proxy_url, headers={'Authorization': f'Bearer {token}'}, json={
    'method': 'update_records',
    'table': 'Bills',
    'records': [{'id': 1, 'fields': {'Attachment': [attachment_id]}}]
})
```
## [1.2.0] - 2026-01-02
### Added
#### Session Token Proxy
- **Session token proxy**: Agents can request short-lived tokens for bulk operations
- `get_proxy_documentation` MCP tool: returns complete proxy API spec
- `request_session_token` MCP tool: creates scoped session tokens with TTL (max 1 hour)
- `POST /api/v1/proxy` HTTP endpoint: accepts session tokens for direct API access
- Supports all 11 Grist operations (read, write, schema) via HTTP
## [1.1.0] - 2026-01-02
### Added
#### Logging
- **Tool Call Logging**: Human-readable logs for every MCP tool call with agent identity, document, stats, and duration
- **Token Truncation**: Secure token display in logs (first/last 3 chars only)
- **Stats Extraction**: Meaningful operation stats per tool (e.g., "42 records", "3 tables")
- **LOG_LEVEL Support**: Configure logging verbosity via environment variable (DEBUG, INFO, WARNING, ERROR)
- **Health Check Suppression**: `/health` requests logged at DEBUG level to reduce noise
#### Log Format
```
2026-01-02 10:15:23 | agent-name (abc...xyz) | get_records | sales | 42 records | success | 125ms
```
- Pipe-delimited format for easy parsing
- Multi-line error details with indentation
- Duration tracking in milliseconds
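Because the format is pipe-delimited, a line splits cleanly back into its seven fields (field order as documented above):

```python
# Split one log line back into its fields.
line = "2026-01-02 10:15:23 | agent-name (abc...xyz) | get_records | sales | 42 records | success | 125ms"
timestamp, agent, tool, document, stats, status, duration = line.split(" | ")
```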
## [1.0.0] - 2026-01-01
Initial release of grist-mcp, an MCP server for AI agents to interact with Grist spreadsheets.


@@ -1,8 +1,8 @@
# Stage 1: Builder
FROM python:3.14-slim AS builder
FROM python:3.14-slim@sha256:fb83750094b46fd6b8adaa80f66e2302ecbe45d513f6cece637a841e1025b4ca AS builder
# Install uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv
COPY --from=ghcr.io/astral-sh/uv:latest@sha256:c4f5de312ee66d46810635ffc5df34a1973ba753e7241ce3a08ef979ddd7bea5 /uv /usr/local/bin/uv
WORKDIR /app
@@ -20,7 +20,7 @@ RUN uv sync --frozen --no-dev
# Stage 2: Runtime
FROM python:3.14-slim
FROM python:3.14-slim@sha256:fb83750094b46fd6b8adaa80f66e2302ecbe45d513f6cece637a841e1025b4ca
# Create non-root user
RUN useradd --create-home --shell /bin/bash appuser

README.md

@@ -15,50 +15,33 @@ grist-mcp is a [Model Context Protocol (MCP)](https://modelcontextprotocol.io/)
- **Security**: Token-based authentication with per-document permission scopes (read, write, schema)
- **Multi-tenant**: Support multiple Grist instances and documents
## Requirements
## Quick Start (Docker)
- Python 3.14+
### Prerequisites
- Docker and Docker Compose
- Access to one or more Grist documents with API keys
## Installation
### 1. Create configuration directory
```bash
# Clone the repository
git clone https://github.com/your-org/grist-mcp.git
cd grist-mcp
# Install with uv
uv sync --dev
mkdir grist-mcp && cd grist-mcp
```
## Configuration
Create a `config.yaml` file based on the example:
### 2. Download configuration files
```bash
# Download docker-compose.yml
curl -O https://raw.githubusercontent.com/Xe138/grist-mcp-server/master/deploy/prod/docker-compose.yml
# Download example config
curl -O https://raw.githubusercontent.com/Xe138/grist-mcp-server/master/config.yaml.example
cp config.yaml.example config.yaml
```
### Configuration Structure
### 3. Generate tokens
```yaml
# Document definitions
documents:
my-document:
url: https://docs.getgrist.com # Grist instance URL
doc_id: abcd1234 # Document ID from URL
api_key: ${GRIST_API_KEY} # API key (supports env vars)
# Agent tokens with access scopes
tokens:
- token: your-secret-token # Unique token for this agent
name: my-agent # Human-readable name
scope:
- document: my-document
permissions: [read, write] # Allowed: read, write, schema
```
### Generating Tokens
Generate a secure token for your agent:
```bash
python -c "import secrets; print(secrets.token_urlsafe(32))"
@@ -66,34 +49,53 @@ python -c "import secrets; print(secrets.token_urlsafe(32))"
openssl rand -base64 32
```
### Environment Variables
### 4. Configure config.yaml
- `CONFIG_PATH`: Path to config file (default: `/app/config.yaml`)
- `GRIST_MCP_TOKEN`: Agent token for authentication
- Config file supports `${VAR}` syntax for API keys
Edit `config.yaml` to define your Grist documents and agent tokens:
## Usage
```yaml
# Document definitions
documents:
my-document: # Friendly name (used in token scopes)
url: https://docs.getgrist.com # Your Grist instance URL
doc_id: abcd1234efgh5678 # Document ID from the URL
api_key: your-grist-api-key # Grist API key (or use ${ENV_VAR} syntax)
### Running the Server
The server uses SSE (Server-Sent Events) transport over HTTP:
```bash
# Set your agent token
export GRIST_MCP_TOKEN="your-agent-token"
# Run with custom config path (defaults to port 3000)
CONFIG_PATH=./config.yaml uv run python -m grist_mcp.main
# Or specify a custom port
PORT=8080 CONFIG_PATH=./config.yaml uv run python -m grist_mcp.main
# Agent tokens with access scopes
tokens:
- token: your-generated-token-here # The token you generated in step 3
name: my-agent # Human-readable name
scope:
- document: my-document # Must match a document name above
permissions: [read, write] # Allowed: read, write, schema
```
The server exposes two endpoints:
- `http://localhost:3000/sse` - SSE connection endpoint
- `http://localhost:3000/messages` - Message posting endpoint
**Finding your Grist document ID**: Open your Grist document in a browser. The URL will look like:
`https://docs.getgrist.com/abcd1234efgh5678/My-Document` - the document ID is `abcd1234efgh5678`.
### MCP Client Configuration
**Getting a Grist API key**: In Grist, go to Profile Settings → API → Create API Key.
### 5. Create .env file
Create a `.env` file with your agent token:
```bash
# .env
GRIST_MCP_TOKEN=your-generated-token-here
PORT=3000
```
The `GRIST_MCP_TOKEN` must match one of the tokens defined in `config.yaml`.
### 6. Start the server
```bash
docker compose up -d
```
The server will be available at `http://localhost:3000`.
### 7. Configure your MCP client
Add to your MCP client configuration (e.g., Claude Desktop):
@@ -101,24 +103,13 @@ Add to your MCP client configuration (e.g., Claude Desktop):
{
"mcpServers": {
"grist": {
"type": "sse",
"url": "http://localhost:3000/sse"
}
}
}
```
For remote deployments, use the server's public URL:
```json
{
"mcpServers": {
"grist": {
"url": "https://your-server.example.com/sse"
}
}
}
```
## Available Tools
### Discovery
@@ -149,6 +140,105 @@ For remote deployments, use the server's public URL:
| `modify_column` | Change a column's type or formula |
| `delete_column` | Remove a column from a table |
## Configuration Reference
### Environment Variables
| Variable | Description | Default |
|----------|-------------|---------|
| `PORT` | Server port | `3000` |
| `GRIST_MCP_TOKEN` | Agent authentication token (required) | - |
| `CONFIG_PATH` | Path to config file inside container | `/app/config.yaml` |
| `LOG_LEVEL` | Logging verbosity (`DEBUG`, `INFO`, `WARNING`, `ERROR`) | `INFO` |
| `GRIST_MCP_URL` | Public URL of this server (for session proxy tokens) | - |
### config.yaml Structure
```yaml
# Document definitions (each is self-contained)
documents:
  budget-2024:
    url: https://work.getgrist.com
    doc_id: mK7xB2pQ9mN4v
    api_key: ${GRIST_WORK_API_KEY}  # Supports environment variable substitution
  personal-tracker:
    url: https://docs.getgrist.com
    doc_id: pN0zE5sT2qP7x
    api_key: ${GRIST_PERSONAL_API_KEY}

# Agent tokens with access scopes
tokens:
  - token: your-secure-token-here
    name: finance-agent
    scope:
      - document: budget-2024
        permissions: [read, write]  # Can read and write
  - token: another-token-here
    name: readonly-agent
    scope:
      - document: budget-2024
        permissions: [read]  # Read only
      - document: personal-tracker
        permissions: [read, write, schema]  # Full access
```
### Permission Levels
- `read`: Query tables and records, run SQL queries
- `write`: Add, update, delete records
- `schema`: Create tables, add/modify/delete columns
## Logging
### Configuration
Set the `LOG_LEVEL` environment variable to control logging verbosity:
| Level | Description |
|-------|-------------|
| `DEBUG` | Show all logs including HTTP requests and tool arguments |
| `INFO` | Show tool calls with stats (default) |
| `WARNING` | Show only auth errors and warnings |
| `ERROR` | Show only errors |
```bash
# In .env or docker-compose.yml
LOG_LEVEL=INFO
```
### Log Format
At `INFO` level, each tool call produces a single log line:
```
2026-01-02 10:15:23 | agent-name (abc...xyz) | get_records | sales | 42 records | success | 125ms
```
| Field | Description |
|-------|-------------|
| Timestamp | `YYYY-MM-DD HH:MM:SS` |
| Agent | Agent name with truncated token |
| Tool | MCP tool name |
| Document | Document name (or `-` for list_documents) |
| Stats | Operation result (e.g., `42 records`, `3 tables`) |
| Status | `success`, `auth_error`, or `error` |
| Duration | Execution time in milliseconds |
Errors include details on a second indented line:
```
2026-01-02 10:15:23 | agent-name (abc...xyz) | add_records | sales | - | error | 89ms
Grist API error: Invalid column 'foo'
```
### Production Recommendations
- Use `LOG_LEVEL=INFO` for normal operation (default)
- Use `LOG_LEVEL=DEBUG` for troubleshooting (shows HTTP traffic)
- Use `LOG_LEVEL=WARNING` for minimal logging
## Security
- **Token-based auth**: Each agent has a unique token with specific document access
@@ -159,10 +249,30 @@ For remote deployments, use the server's public URL:
## Development
### Running Tests
### Requirements
- Python 3.14+
- uv package manager
### Local Setup
```bash
uv run pytest -v
# Clone the repository
git clone https://github.com/Xe138/grist-mcp-server.git
cd grist-mcp-server
# Install dependencies
uv sync --dev
# Run tests
make test-unit
```
### Running Locally
```bash
export GRIST_MCP_TOKEN="your-agent-token"
CONFIG_PATH=./config.yaml uv run python -m grist_mcp.main
```
### Project Structure
@@ -170,7 +280,6 @@ uv run pytest -v
```
grist-mcp/
├── src/grist_mcp/
│ ├── __init__.py
│ ├── main.py # Entry point
│ ├── server.py # MCP server setup and tool registration
│ ├── config.py # Configuration loading
@@ -182,73 +291,13 @@ grist-mcp/
│ ├── write.py # Write operations
│ └── schema.py # Schema operations
├── tests/
├── config.yaml.example
└── pyproject.toml
```
## Docker Deployment
### Prerequisites
- Docker and Docker Compose
### Quick Start
```bash
# 1. Copy example files
cp .env.example .env
cp config.yaml.example config.yaml
# 2. Edit .env with your tokens and API keys
# - Set GRIST_MCP_TOKEN to a secure agent token
# - Set your Grist API keys
# 3. Edit config.yaml with your document settings
# - Configure your Grist documents
# - Set up token scopes and permissions
# 4. Start the server
docker compose up -d
```
### Environment Variables
| Variable | Description | Default |
|----------|-------------|---------|
| `PORT` | Server port | `3000` |
| `GRIST_MCP_TOKEN` | Agent authentication token (required) | - |
| `CONFIG_PATH` | Path to config file inside container | `/app/config.yaml` |
| `GRIST_*_API_KEY` | Grist API keys referenced in config.yaml | - |
### Using Prebuilt Images
To use a prebuilt image from a container registry:
```yaml
# docker-compose.yaml
services:
grist-mcp:
image: your-registry/grist-mcp:latest
ports:
- "${PORT:-3000}:3000"
volumes:
- ./config.yaml:/app/config.yaml:ro
env_file:
- .env
restart: unless-stopped
```
### Building Locally
```bash
# Build the image
docker build -t grist-mcp .
# Run directly
docker run -p 3000:3000 \
-v $(pwd)/config.yaml:/app/config.yaml:ro \
--env-file .env \
grist-mcp
│ ├── unit/ # Unit tests
│ └── integration/ # Integration tests
├── deploy/
│ ├── dev/ # Development docker-compose
│ ├── test/ # Test docker-compose
│ └── prod/ # Production docker-compose
└── config.yaml.example
```
## License


@@ -23,6 +23,14 @@ documents:
doc_id: pN0zE5sT2qP7x
api_key: ${GRIST_PERSONAL_API_KEY}
# Docker networking example: connect via internal hostname,
# but send the external domain in the Host header
docker-grist:
url: http://grist:8080
doc_id: abc123
api_key: ${GRIST_API_KEY}
host_header: grist.example.com # Required when Grist validates Host header
# Agent tokens with access scopes
tokens:
- token: REPLACE_WITH_GENERATED_TOKEN


@@ -1,9 +1,7 @@
# Production environment - resource limits, logging, restart policy
# Production environment
services:
grist-mcp:
build:
context: ../..
dockerfile: Dockerfile
image: ghcr.io/xe138/grist-mcp-server:latest@sha256:2ef22bfac6cfbcbbfc513f61eaea3414b3a531d79e9d1d39bf6757cc9e27ea9a
ports:
- "${PORT:-3000}:3000"
volumes:
@@ -12,18 +10,6 @@ services:
- CONFIG_PATH=/app/config.yaml
- EXTERNAL_PORT=${PORT:-3000}
restart: unless-stopped
deploy:
resources:
limits:
memory: 512M
cpus: "1"
reservations:
memory: 128M
logging:
driver: "json-file"
options:
max-size: "50m"
max-file: "5"
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:3000/health')"]
interval: 30s


@@ -0,0 +1,96 @@
# Logging Improvements Design
## Overview
Improve MCP server logging to provide meaningful operational visibility. Replace generic HTTP request logs with application-level context including agent identity, tool usage, document access, and operation stats.
## Current State
Logs show only uvicorn HTTP requests with no application context:
```
INFO: 172.20.0.2:43254 - "POST /messages?session_id=... HTTP/1.1" 202 Accepted
INFO: 127.0.0.1:41508 - "GET /health HTTP/1.1" 200 OK
```
## Desired State
Human-readable single-line format with full context:
```
2025-01-02 10:15:23 | dev-agent (abc...xyz) | get_records | sales | 42 records | success | 125ms
2025-01-02 10:15:24 | dev-agent (abc...xyz) | update_records | sales | 3 records | success | 89ms
2025-01-02 10:15:25 | dev-agent (abc...xyz) | add_records | inventory | 5 records | error | 89ms
Grist API error: Invalid column 'foo'
```
## Design Decisions
| Decision | Choice |
|----------|--------|
| Log format | Human-readable single-line (pipe-delimited) |
| Configuration | Environment variable only (`LOG_LEVEL`) |
| Log levels | Standard (DEBUG/INFO/WARNING/ERROR) |
| Health checks | DEBUG level only (suppressed at INFO) |
| Error details | Multi-line (indented on second line) |
## Log Format
```
YYYY-MM-DD HH:MM:SS | <agent_name> (<token_truncated>) | <tool> | <document> | <stats> | <status> | <duration>
```
**Token truncation:** First 3 and last 3 characters (e.g., `abc...xyz`). Tokens <=8 chars show `***`.
**Document field:** Shows `-` for tools without a document (e.g., `list_documents`).
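A minimal sketch of the truncation rule described above (the real helper in `logging.py` may differ in detail):

```python
def truncate_token(token: str) -> str:
    """Show only the first and last 3 characters; fully mask short tokens."""
    if len(token) <= 8:
        return "***"
    return f"{token[:3]}...{token[-3:]}"
```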
## Log Levels
| Level | Events |
|-------|--------|
| ERROR | Unhandled exceptions, Grist API failures |
| WARNING | Auth errors (invalid token, permission denied) |
| INFO | Tool calls (one line per call with stats) |
| DEBUG | Health checks, detailed arguments, full results |
**Environment variable:** `LOG_LEVEL` (default: `INFO`)
## Stats Per Tool
| Tool | Stats |
|------|-------|
| `list_documents` | `N docs` |
| `list_tables` | `N tables` |
| `describe_table` | `N columns` |
| `get_records` | `N records` |
| `sql_query` | `N rows` |
| `add_records` | `N records` |
| `update_records` | `N records` |
| `delete_records` | `N records` |
| `create_table` | `N columns` |
| `add_column` | `1 column` |
| `modify_column` | `1 column` |
| `delete_column` | `1 column` |
## Files Changed
| File | Change |
|------|--------|
| `src/grist_mcp/logging.py` | New - logging setup, formatters, stats extraction |
| `src/grist_mcp/main.py` | Call `setup_logging()`, configure uvicorn logger |
| `src/grist_mcp/server.py` | Wrap `call_tool` with logging |
| `tests/unit/test_logging.py` | New - unit tests for logging module |
Tool implementations in `tools/` remain unchanged - logging is handled at the server layer.
## Testing
**Unit tests:**
- `test_setup_logging_default_level`
- `test_setup_logging_from_env`
- `test_token_truncation`
- `test_extract_stats`
- `test_format_log_line`
- `test_error_multiline_format`
**Manual verification:**
- Run `make dev-up`, make tool calls, verify log format
- Test with `LOG_LEVEL=DEBUG` for verbose output


@@ -0,0 +1,821 @@
# Logging Improvements Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** Add informative application-level logging that shows agent identity, tool usage, document access, and operation stats.
**Architecture:** New `logging.py` module provides setup and formatting. `server.py` wraps tool calls with timing and stats extraction. `main.py` initializes logging and configures uvicorn to suppress health check noise.
**Tech Stack:** Python `logging` stdlib, custom `Formatter`, uvicorn log config
---
### Task 1: Token Truncation Helper
**Files:**
- Create: `src/grist_mcp/logging.py`
- Test: `tests/unit/test_logging.py`
**Step 1: Write the failing test**
Create `tests/unit/test_logging.py`:
```python
"""Unit tests for logging module."""
import pytest
from grist_mcp.logging import truncate_token
class TestTruncateToken:
def test_normal_token_shows_prefix_suffix(self):
token = "abcdefghijklmnop"
assert truncate_token(token) == "abc...nop"
def test_short_token_shows_asterisks(self):
token = "abcdefgh" # 8 chars
assert truncate_token(token) == "***"
def test_very_short_token_shows_asterisks(self):
token = "abc"
assert truncate_token(token) == "***"
```
**Step 2: Run test to verify it fails**
Run: `uv run pytest tests/unit/test_logging.py -v`
Expected: FAIL with "No module named 'grist_mcp.logging'"
**Step 3: Write minimal implementation**
Create `src/grist_mcp/logging.py`:
```python
"""Logging configuration and utilities."""
def truncate_token(token: str) -> str:
"""Truncate token to show first 3 and last 3 chars.
Tokens 8 chars or shorter show *** for security.
"""
if len(token) <= 8:
return "***"
return f"{token[:3]}...{token[-3:]}"
```
**Step 4: Run test to verify it passes**
Run: `uv run pytest tests/unit/test_logging.py -v`
Expected: PASS (3 tests)
**Step 5: Commit**
```bash
git add src/grist_mcp/logging.py tests/unit/test_logging.py
git commit -m "feat(logging): add token truncation helper"
```
---
### Task 2: Stats Extraction Function
**Files:**
- Modify: `src/grist_mcp/logging.py`
- Modify: `tests/unit/test_logging.py`
**Step 1: Write the failing tests**
Add to `tests/unit/test_logging.py`:
```python
from grist_mcp.logging import truncate_token, extract_stats
class TestExtractStats:
def test_list_documents(self):
result = {"documents": [{"name": "a"}, {"name": "b"}, {"name": "c"}]}
assert extract_stats("list_documents", {}, result) == "3 docs"
def test_list_tables(self):
result = {"tables": ["Orders", "Products"]}
assert extract_stats("list_tables", {}, result) == "2 tables"
def test_describe_table(self):
result = {"columns": [{"id": "A"}, {"id": "B"}]}
assert extract_stats("describe_table", {}, result) == "2 columns"
def test_get_records(self):
result = {"records": [{"id": 1}, {"id": 2}]}
assert extract_stats("get_records", {}, result) == "2 records"
def test_sql_query(self):
result = {"records": [{"a": 1}, {"a": 2}, {"a": 3}]}
assert extract_stats("sql_query", {}, result) == "3 rows"
def test_add_records_from_args(self):
args = {"records": [{"a": 1}, {"a": 2}]}
assert extract_stats("add_records", args, {"ids": [1, 2]}) == "2 records"
def test_update_records_from_args(self):
args = {"records": [{"id": 1, "fields": {}}, {"id": 2, "fields": {}}]}
assert extract_stats("update_records", args, {}) == "2 records"
def test_delete_records_from_args(self):
args = {"record_ids": [1, 2, 3]}
assert extract_stats("delete_records", args, {}) == "3 records"
def test_create_table(self):
args = {"columns": [{"id": "A"}, {"id": "B"}]}
assert extract_stats("create_table", args, {}) == "2 columns"
def test_single_column_ops(self):
assert extract_stats("add_column", {}, {}) == "1 column"
assert extract_stats("modify_column", {}, {}) == "1 column"
assert extract_stats("delete_column", {}, {}) == "1 column"
def test_unknown_tool(self):
assert extract_stats("unknown_tool", {}, {}) == "-"
```
**Step 2: Run test to verify it fails**
Run: `uv run pytest tests/unit/test_logging.py::TestExtractStats -v`
Expected: FAIL with "cannot import name 'extract_stats'"
**Step 3: Write minimal implementation**
Add to `src/grist_mcp/logging.py`:
```python
def extract_stats(tool_name: str, arguments: dict, result: dict) -> str:
"""Extract meaningful stats from tool call based on tool type."""
if tool_name == "list_documents":
count = len(result.get("documents", []))
return f"{count} docs"
if tool_name == "list_tables":
count = len(result.get("tables", []))
return f"{count} tables"
if tool_name == "describe_table":
count = len(result.get("columns", []))
return f"{count} columns"
if tool_name == "get_records":
count = len(result.get("records", []))
return f"{count} records"
if tool_name == "sql_query":
count = len(result.get("records", []))
return f"{count} rows"
if tool_name == "add_records":
count = len(arguments.get("records", []))
return f"{count} records"
if tool_name == "update_records":
count = len(arguments.get("records", []))
return f"{count} records"
if tool_name == "delete_records":
count = len(arguments.get("record_ids", []))
return f"{count} records"
if tool_name == "create_table":
count = len(arguments.get("columns", []))
return f"{count} columns"
if tool_name in ("add_column", "modify_column", "delete_column"):
return "1 column"
return "-"
```
**Step 4: Run test to verify it passes**
Run: `uv run pytest tests/unit/test_logging.py -v`
Expected: PASS (all tests)
**Step 5: Commit**
```bash
git add src/grist_mcp/logging.py tests/unit/test_logging.py
git commit -m "feat(logging): add stats extraction for all tools"
```
---
### Task 3: Log Line Formatter
**Files:**
- Modify: `src/grist_mcp/logging.py`
- Modify: `tests/unit/test_logging.py`
**Step 1: Write the failing tests**
Add to `tests/unit/test_logging.py`:
```python
from grist_mcp.logging import truncate_token, extract_stats, format_tool_log
class TestFormatToolLog:
def test_success_format(self):
line = format_tool_log(
agent_name="dev-agent",
token="abcdefghijklmnop",
tool="get_records",
document="sales",
stats="42 records",
status="success",
duration_ms=125,
)
assert "dev-agent" in line
assert "abc...nop" in line
assert "get_records" in line
assert "sales" in line
assert "42 records" in line
assert "success" in line
assert "125ms" in line
# Check pipe-delimited format
assert line.count("|") == 6
def test_no_document(self):
line = format_tool_log(
agent_name="dev-agent",
token="abcdefghijklmnop",
tool="list_documents",
document=None,
stats="3 docs",
status="success",
duration_ms=45,
)
assert "| - |" in line
def test_error_format(self):
line = format_tool_log(
agent_name="dev-agent",
token="abcdefghijklmnop",
tool="add_records",
document="inventory",
stats="5 records",
status="error",
duration_ms=89,
error_message="Grist API error: Invalid column 'foo'",
)
assert "error" in line
assert "\n Grist API error: Invalid column 'foo'" in line
```
**Step 2: Run test to verify it fails**
Run: `uv run pytest tests/unit/test_logging.py::TestFormatToolLog -v`
Expected: FAIL with "cannot import name 'format_tool_log'"
**Step 3: Write minimal implementation**
Add to `src/grist_mcp/logging.py`:
```python
from datetime import datetime
def format_tool_log(
agent_name: str,
token: str,
tool: str,
document: str | None,
stats: str,
status: str,
duration_ms: int,
error_message: str | None = None,
) -> str:
"""Format a tool call log line.
Format: YYYY-MM-DD HH:MM:SS | agent (token) | tool | doc | stats | status | duration
"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
truncated = truncate_token(token)
doc = document if document else "-"
line = f"{timestamp} | {agent_name} ({truncated}) | {tool} | {doc} | {stats} | {status} | {duration_ms}ms"
if error_message:
line += f"\n {error_message}"
return line
```
**Step 4: Run test to verify it passes**
Run: `uv run pytest tests/unit/test_logging.py -v`
Expected: PASS (all tests)
**Step 5: Commit**
```bash
git add src/grist_mcp/logging.py tests/unit/test_logging.py
git commit -m "feat(logging): add log line formatter"
```
---
### Task 4: Setup Logging Function
**Files:**
- Modify: `src/grist_mcp/logging.py`
- Modify: `tests/unit/test_logging.py`
**Step 1: Write the failing tests**
Add to `tests/unit/test_logging.py`:
```python
import logging
import os
class TestSetupLogging:
def test_default_level_is_info(self, monkeypatch):
monkeypatch.delenv("LOG_LEVEL", raising=False)
from grist_mcp.logging import setup_logging
setup_logging()
logger = logging.getLogger("grist_mcp")
assert logger.level == logging.INFO
def test_respects_log_level_env(self, monkeypatch):
monkeypatch.setenv("LOG_LEVEL", "DEBUG")
from grist_mcp.logging import setup_logging
setup_logging()
logger = logging.getLogger("grist_mcp")
assert logger.level == logging.DEBUG
def test_invalid_level_defaults_to_info(self, monkeypatch):
monkeypatch.setenv("LOG_LEVEL", "INVALID")
from grist_mcp.logging import setup_logging
setup_logging()
logger = logging.getLogger("grist_mcp")
assert logger.level == logging.INFO
```
**Step 2: Run test to verify it fails**
Run: `uv run pytest tests/unit/test_logging.py::TestSetupLogging -v`
Expected: FAIL with "cannot import name 'setup_logging'"
**Step 3: Write minimal implementation**
Add to `src/grist_mcp/logging.py`:
```python
import logging
import os
def setup_logging() -> None:
"""Configure logging based on LOG_LEVEL environment variable.
Valid levels: DEBUG, INFO, WARNING, ERROR (default: INFO)
"""
level_name = os.environ.get("LOG_LEVEL", "INFO").upper()
level = getattr(logging, level_name, None)
if not isinstance(level, int):
level = logging.INFO
logger = logging.getLogger("grist_mcp")
logger.setLevel(level)
# Only add handler if not already configured
if not logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(handler)
```
**Step 4: Run test to verify it passes**
Run: `uv run pytest tests/unit/test_logging.py -v`
Expected: PASS (all tests)
**Step 5: Commit**
```bash
git add src/grist_mcp/logging.py tests/unit/test_logging.py
git commit -m "feat(logging): add setup_logging with LOG_LEVEL support"
```
---
### Task 5: Get Logger Helper
**Files:**
- Modify: `src/grist_mcp/logging.py`
- Modify: `tests/unit/test_logging.py`
**Step 1: Write the failing test**
Add to `tests/unit/test_logging.py`:
```python
class TestGetLogger:
def test_returns_child_logger(self):
from grist_mcp.logging import get_logger
logger = get_logger("server")
assert logger.name == "grist_mcp.server"
def test_inherits_parent_level(self, monkeypatch):
monkeypatch.setenv("LOG_LEVEL", "WARNING")
from grist_mcp.logging import setup_logging, get_logger
setup_logging()
logger = get_logger("test")
# Child inherits from parent when level is NOTSET
assert logger.getEffectiveLevel() == logging.WARNING
```
**Step 2: Run test to verify it fails**
Run: `uv run pytest tests/unit/test_logging.py::TestGetLogger -v`
Expected: FAIL with "cannot import name 'get_logger'"
**Step 3: Write minimal implementation**
Add to `src/grist_mcp/logging.py`:
```python
def get_logger(name: str) -> logging.Logger:
"""Get a child logger under the grist_mcp namespace."""
return logging.getLogger(f"grist_mcp.{name}")
```
**Step 4: Run test to verify it passes**
Run: `uv run pytest tests/unit/test_logging.py -v`
Expected: PASS (all tests)
**Step 5: Commit**
```bash
git add src/grist_mcp/logging.py tests/unit/test_logging.py
git commit -m "feat(logging): add get_logger helper"
```
---
### Task 6: Integrate Logging into Server
**Files:**
- Modify: `src/grist_mcp/server.py`
**Step 1: Add logging imports and logger**
At the top of `src/grist_mcp/server.py`, add imports:
```python
import time
from grist_mcp.logging import get_logger, extract_stats, format_tool_log
logger = get_logger("server")
```
**Step 2: Wrap call_tool with logging**
Replace the `call_tool` function body (lines 209-276) with this logged version:
```python
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
start_time = time.time()
document = arguments.get("document")
# Log arguments at DEBUG level
logger.debug(
format_tool_log(
agent_name=_current_agent.name,
token=_current_agent.token,
tool=name,
document=document,
stats=f"args: {json.dumps(arguments)}",
status="started",
duration_ms=0,
)
)
try:
if name == "list_documents":
result = await _list_documents(_current_agent)
elif name == "list_tables":
result = await _list_tables(_current_agent, auth, arguments["document"])
elif name == "describe_table":
result = await _describe_table(
_current_agent, auth, arguments["document"], arguments["table"]
)
elif name == "get_records":
result = await _get_records(
_current_agent, auth, arguments["document"], arguments["table"],
filter=arguments.get("filter"),
sort=arguments.get("sort"),
limit=arguments.get("limit"),
)
elif name == "sql_query":
result = await _sql_query(
_current_agent, auth, arguments["document"], arguments["query"]
)
elif name == "add_records":
result = await _add_records(
_current_agent, auth, arguments["document"], arguments["table"],
arguments["records"],
)
elif name == "update_records":
result = await _update_records(
_current_agent, auth, arguments["document"], arguments["table"],
arguments["records"],
)
elif name == "delete_records":
result = await _delete_records(
_current_agent, auth, arguments["document"], arguments["table"],
arguments["record_ids"],
)
elif name == "create_table":
result = await _create_table(
_current_agent, auth, arguments["document"], arguments["table_id"],
arguments["columns"],
)
elif name == "add_column":
result = await _add_column(
_current_agent, auth, arguments["document"], arguments["table"],
arguments["column_id"], arguments["column_type"],
formula=arguments.get("formula"),
)
elif name == "modify_column":
result = await _modify_column(
_current_agent, auth, arguments["document"], arguments["table"],
arguments["column_id"],
type=arguments.get("type"),
formula=arguments.get("formula"),
)
elif name == "delete_column":
result = await _delete_column(
_current_agent, auth, arguments["document"], arguments["table"],
arguments["column_id"],
)
else:
return [TextContent(type="text", text=f"Unknown tool: {name}")]
duration_ms = int((time.time() - start_time) * 1000)
stats = extract_stats(name, arguments, result)
logger.info(
format_tool_log(
agent_name=_current_agent.name,
token=_current_agent.token,
tool=name,
document=document,
stats=stats,
status="success",
duration_ms=duration_ms,
)
)
return [TextContent(type="text", text=json.dumps(result))]
except AuthError as e:
duration_ms = int((time.time() - start_time) * 1000)
logger.warning(
format_tool_log(
agent_name=_current_agent.name,
token=_current_agent.token,
tool=name,
document=document,
stats="-",
status="auth_error",
duration_ms=duration_ms,
error_message=str(e),
)
)
return [TextContent(type="text", text=f"Authorization error: {e}")]
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
logger.error(
format_tool_log(
agent_name=_current_agent.name,
token=_current_agent.token,
tool=name,
document=document,
stats="-",
status="error",
duration_ms=duration_ms,
error_message=str(e),
)
)
return [TextContent(type="text", text=f"Error: {e}")]
```
**Step 3: Run tests to verify nothing broke**
Run: `uv run pytest tests/unit/ -v`
Expected: PASS (all tests)
**Step 4: Commit**
```bash
git add src/grist_mcp/server.py
git commit -m "feat(logging): add tool call logging to server"
```
---
### Task 7: Initialize Logging in Main
**Files:**
- Modify: `src/grist_mcp/main.py`
**Step 1: Add logging setup to main()**
Add import at top of `src/grist_mcp/main.py`:
```python
from grist_mcp.logging import setup_logging
```
**Step 2: Call setup_logging at start of main()**
In the `main()` function, add as the first line after the port/config variables:
```python
def main():
"""Run the SSE server."""
port = int(os.environ.get("PORT", "3000"))
external_port = int(os.environ.get("EXTERNAL_PORT", str(port)))
config_path = os.environ.get("CONFIG_PATH", "/app/config.yaml")
setup_logging() # <-- Add this line
if not _ensure_config(config_path):
```
**Step 3: Run tests to verify nothing broke**
Run: `uv run pytest tests/unit/ -v`
Expected: PASS (all tests)
**Step 4: Commit**
```bash
git add src/grist_mcp/main.py
git commit -m "feat(logging): initialize logging on server startup"
```
---
### Task 8: Suppress Health Check Noise
**Files:**
- Modify: `src/grist_mcp/main.py`
**Step 1: Configure uvicorn to use custom log config**
Replace the `uvicorn.run` call in `main()` with:
```python
# Configure uvicorn logging to reduce health check noise
log_config = uvicorn.config.LOGGING_CONFIG
log_config["formatters"]["default"]["fmt"] = "%(message)s"
log_config["formatters"]["access"]["fmt"] = "%(message)s"
uvicorn.run(app, host="0.0.0.0", port=port, log_config=log_config)
```
**Step 2: Add health check filter**
Create a filter class and apply it. Add before the `main()` function:
```python
class HealthCheckFilter(logging.Filter):
    """Suppress health check requests unless DEBUG logging is enabled."""
    def filter(self, record: logging.LogRecord) -> bool:
        if "/health" in record.getMessage():
            # Mutating record.levelno here is unreliable: the logger has
            # already accepted the record at INFO, so drop it outright
            # unless the app is running at DEBUG.
            return logging.getLogger("grist_mcp").isEnabledFor(logging.DEBUG)
        return True
```
Add import at top:
```python
import logging
```
**Step 3: Apply filter in main()**
After `setup_logging()` call, add:
```python
setup_logging()
# Add health check filter to uvicorn access logger
logging.getLogger("uvicorn.access").addFilter(HealthCheckFilter())
```
**Step 4: Run tests to verify nothing broke**
Run: `uv run pytest tests/unit/ -v`
Expected: PASS (all tests)
**Step 5: Commit**
```bash
git add src/grist_mcp/main.py
git commit -m "feat(logging): suppress health checks at INFO level"
```
---
### Task 9: Manual Verification
**Step 1: Start development environment**
Run: `make dev-up`
**Step 2: Make some tool calls**
Use Claude Code or another MCP client to call some tools (list_documents, get_records, etc.)
**Step 3: Verify log format**
Check docker logs show the expected format:
```
2026-01-02 10:15:23 | dev-agent (abc...xyz) | get_records | sales | 42 records | success | 125ms
```
**Step 4: Test DEBUG level**
Restart with `LOG_LEVEL=DEBUG` and verify:
- Health checks appear
- Detailed args appear for each call
**Step 5: Clean up**
Run: `make dev-down`
---
### Task 10: Update Module Exports
**Files:**
- Modify: `src/grist_mcp/logging.py`
**Step 1: Add __all__ export list**
At the top of `src/grist_mcp/logging.py` (after imports), add:
```python
__all__ = [
"setup_logging",
"get_logger",
"truncate_token",
"extract_stats",
"format_tool_log",
]
```
**Step 2: Run all tests**
Run: `uv run pytest tests/unit/ -v`
Expected: PASS (all tests)
**Step 3: Final commit**
```bash
git add src/grist_mcp/logging.py
git commit -m "chore(logging): add module exports"
```
---
## Summary
After completing all tasks, the logging module provides:
- `LOG_LEVEL` environment variable support (DEBUG/INFO/WARNING/ERROR)
- Human-readable pipe-delimited log format
- Token truncation for security
- Stats extraction per tool type
- Health check suppression at INFO level
- Multi-line error details
The implementation follows TDD with frequent commits, keeping each change small and verifiable.


@@ -0,0 +1,310 @@
# Session Token Proxy Design
## Problem
When an agent needs to insert, update, or query thousands of records, the LLM must generate all that JSON in its response. This is slow regardless of how fast the actual API call is. The LLM generation time is the bottleneck.
## Solution
Add a "session token" mechanism that lets agents delegate bulk data operations to scripts that call grist-mcp directly over HTTP, bypassing LLM generation entirely.
## Flow
```
1. Agent calls MCP tool:
request_session_token(document="sales", permissions=["write"], ttl_seconds=300)
2. Server generates token, stores in memory:
{"sess_abc123...": {document: "sales", permissions: ["write"], expires: <timestamp>}}
3. Server returns token to agent:
{"token": "sess_abc123...", "expires_in": 300, "proxy_url": "/api/v1/proxy"}
4. Agent spawns script with token:
python bulk_insert.py --token sess_abc123... --file data.csv
5. Script calls grist-mcp HTTP endpoint:
POST /api/v1/proxy
Authorization: Bearer sess_abc123...
{"table": "Orders", "method": "add_records", "records": [...]}
6. Server validates token, executes against Grist, returns result
```
## Design Decisions
| Decision | Choice | Rationale |
|----------|--------|-----------|
| Token scope | Single document + permission level | Simpler than multi-doc; matches existing permission model |
| Token storage | In-memory dict | Appropriate for short-lived tokens; restart invalidates (acceptable) |
| HTTP interface | Wrapped endpoint `/api/v1/proxy` | Simpler than mirroring Grist API paths |
| Request format | Discrete fields (table, method, etc.) | Scripts don't need to know Grist internals or doc IDs |
| Document in request | Implicit from token | Token is scoped to one document; no need to specify |
| Server architecture | Single process, add routes | Already running HTTP for SSE; just add routes |
## MCP Tool: get_proxy_documentation
Returns complete documentation for the HTTP proxy API. Agents call this when writing scripts that will use the proxy.
**Input schema**:
```json
{
"type": "object",
"properties": {},
"required": []
}
```
**Response**:
```json
{
"description": "HTTP proxy API for bulk data operations. Use request_session_token to get a short-lived token, then call the proxy endpoint directly from scripts.",
"endpoint": "POST /api/v1/proxy",
"authentication": "Bearer token in Authorization header",
"request_format": {
"method": "Operation name (required)",
"table": "Table name (required for most operations)",
"...": "Additional fields vary by method"
},
"methods": {
"get_records": {
"description": "Fetch records from a table",
"fields": {"table": "string", "filter": "object (optional)", "sort": "string (optional)", "limit": "integer (optional)"}
},
"sql_query": {
"description": "Run a read-only SQL query",
"fields": {"query": "string"}
},
"list_tables": {
"description": "List all tables in the document",
"fields": {}
},
"describe_table": {
"description": "Get column information for a table",
"fields": {"table": "string"}
},
"add_records": {
"description": "Add records to a table",
"fields": {"table": "string", "records": "array of objects"}
},
"update_records": {
"description": "Update existing records",
"fields": {"table": "string", "records": "array of {id, fields}"}
},
"delete_records": {
"description": "Delete records by ID",
"fields": {"table": "string", "record_ids": "array of integers"}
},
"create_table": {
"description": "Create a new table",
"fields": {"table_id": "string", "columns": "array of {id, type}"}
},
"add_column": {
"description": "Add a column to a table",
"fields": {"table": "string", "column_id": "string", "column_type": "string", "formula": "string (optional)"}
},
"modify_column": {
"description": "Modify a column's type or formula",
"fields": {"table": "string", "column_id": "string", "type": "string (optional)", "formula": "string (optional)"}
},
"delete_column": {
"description": "Delete a column",
"fields": {"table": "string", "column_id": "string"}
}
},
"response_format": {
"success": {"success": true, "data": "..."},
"error": {"success": false, "error": "message", "code": "ERROR_CODE"}
},
"error_codes": ["UNAUTHORIZED", "INVALID_TOKEN", "TOKEN_EXPIRED", "INVALID_REQUEST", "GRIST_ERROR"],
"example_script": "#!/usr/bin/env python3\nimport requests\nimport sys\n\ntoken = sys.argv[1]\nhost = sys.argv[2]\n\nresponse = requests.post(\n f'{host}/api/v1/proxy',\n headers={'Authorization': f'Bearer {token}'},\n json={'method': 'add_records', 'table': 'Orders', 'records': [{'item': 'Widget', 'qty': 100}]}\n)\nprint(response.json())"
}
```
## MCP Tool: request_session_token
**Input schema**:
```json
{
"type": "object",
"properties": {
"document": {
"type": "string",
"description": "Document name to grant access to"
},
"permissions": {
"type": "array",
"items": {"type": "string", "enum": ["read", "write", "schema"]},
"description": "Permission levels to grant (cannot exceed agent's permissions)"
},
"ttl_seconds": {
"type": "integer",
"description": "Token lifetime in seconds (max 3600, default 300)"
}
},
"required": ["document", "permissions"]
}
```
**Response**:
```json
{
"token": "sess_a1b2c3d4...",
"document": "sales",
"permissions": ["write"],
"expires_at": "2025-01-02T15:30:00Z",
"proxy_url": "/api/v1/proxy"
}
```
**Validation**:
- Agent must have access to the requested document
- Requested permissions cannot exceed agent's permissions for that document
- TTL capped at 3600 seconds (1 hour), default 300 seconds (5 minutes)
## Proxy Endpoint
**Endpoint**: `POST /api/v1/proxy`
**Authentication**: `Authorization: Bearer <session_token>`
**Request body** - method determines required fields:
```python
# Read operations
{"method": "get_records", "table": "Orders", "filter": {...}, "sort": "date", "limit": 1000}
{"method": "sql_query", "query": "SELECT * FROM Orders WHERE amount > 100"}
{"method": "list_tables"}
{"method": "describe_table", "table": "Orders"}
# Write operations
{"method": "add_records", "table": "Orders", "records": [{...}, {...}]}
{"method": "update_records", "table": "Orders", "records": [{"id": 1, "fields": {...}}]}
{"method": "delete_records", "table": "Orders", "record_ids": [1, 2, 3]}
# Schema operations
{"method": "create_table", "table_id": "NewTable", "columns": [{...}]}
{"method": "add_column", "table": "Orders", "column_id": "status", "column_type": "Text"}
{"method": "modify_column", "table": "Orders", "column_id": "status", "type": "Choice"}
{"method": "delete_column", "table": "Orders", "column_id": "old_field"}
```
**Response format**:
```json
{"success": true, "data": {...}}
{"success": false, "error": "Permission denied", "code": "UNAUTHORIZED"}
```
**Error codes**:
- `UNAUTHORIZED` - Permission denied for this operation
- `INVALID_TOKEN` - Token format invalid or not found
- `TOKEN_EXPIRED` - Token has expired
- `INVALID_REQUEST` - Malformed request body
- `GRIST_ERROR` - Error from Grist API
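Scripts can handle the response envelope and error codes uniformly; a minimal client-side sketch (the `ProxyError` and `unwrap` names are illustrative, not part of the design):

```python
class ProxyError(Exception):
    """Raised when the proxy returns success: false."""
    def __init__(self, code: str, message: str):
        super().__init__(f"{code}: {message}")
        self.code = code

def unwrap(response: dict):
    """Return the data payload, or raise ProxyError carrying the error code."""
    if response.get("success"):
        return response.get("data")
    raise ProxyError(response.get("code", "UNKNOWN"), response.get("error", ""))
```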
## Implementation Architecture
### New Files
**`src/grist_mcp/session.py`** - Session token management:
```python
@dataclass
class SessionToken:
token: str
document: str
permissions: list[str]
agent_name: str
created_at: datetime
expires_at: datetime
class SessionTokenManager:
def __init__(self):
self._tokens: dict[str, SessionToken] = {}
def create_token(self, agent: Agent, document: str,
permissions: list[str], ttl_seconds: int) -> SessionToken:
"""Create a new session token. Validates permissions against agent's scope."""
...
def validate_token(self, token: str) -> SessionToken | None:
"""Validate token and return session info. Returns None if invalid/expired."""
# Also cleans up this token if expired
...
def cleanup_expired(self) -> int:
"""Remove all expired tokens. Returns count removed."""
...
```
**`src/grist_mcp/proxy.py`** - HTTP proxy handler:
```python
async def handle_proxy(
scope: Scope,
receive: Receive,
send: Send,
token_manager: SessionTokenManager,
auth: Authenticator
) -> None:
"""Handle POST /api/v1/proxy requests."""
# 1. Extract Bearer token from Authorization header
# 2. Validate session token
# 3. Parse request body (method, table, etc.)
# 4. Check permissions for requested method
# 5. Build GristClient for the token's document
# 6. Dispatch to appropriate tool function
# 7. Return JSON response
```
### Modified Files
**`src/grist_mcp/main.py`**:
- Import `SessionTokenManager` and `handle_proxy`
- Instantiate `SessionTokenManager` in `create_app()`
- Add route: `elif path == "/api/v1/proxy" and method == "POST"`
- Pass `token_manager` to `create_server()`
**`src/grist_mcp/server.py`**:
- Accept `token_manager` parameter in `create_server()`
- Add `get_proxy_documentation` tool to `list_tools()` (no parameters, returns static docs)
- Add `request_session_token` tool to `list_tools()`
- Add handlers in `call_tool()` for both tools
## Security
1. **No privilege escalation** - Session token can only grant permissions the agent already has for the document. Validated at token creation.
2. **Short-lived by default** - 5 minute default TTL, 1 hour maximum cap.
3. **Token format** - Prefixed with `sess_` to distinguish from agent tokens. Generated with `secrets.token_urlsafe(32)`.
4. **Lazy cleanup** - Expired tokens removed during validation. No background task needed.
5. **Audit logging** - Token creation and proxy requests logged with agent name, document, method.
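Point 3 maps directly onto the stdlib:

```python
import secrets

def new_session_token() -> str:
    # "sess_" prefix distinguishes session tokens from long-lived agent tokens
    return f"sess_{secrets.token_urlsafe(32)}"
```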
## Testing
### Unit Tests
**`tests/unit/test_session.py`**:
- Token creation with valid permissions
- Token creation fails when exceeding agent permissions
- Token validation succeeds for valid token
- Token validation fails for expired token
- Token validation fails for unknown token
- TTL capping at maximum
- Cleanup removes expired tokens
**`tests/unit/test_proxy.py`**:
- Request parsing for each method type
- Error response for invalid token
- Error response for expired token
- Error response for permission denied
- Error response for malformed request
- Successful dispatch to each tool function (mocked)
### Integration Tests
**`tests/integration/test_session_proxy.py`**:
- Full flow: MCP token request → HTTP proxy call → Grist operation
- Verify data actually written to Grist
- Verify token expiry prevents access

File diff suppressed because it is too large


@@ -0,0 +1,187 @@
# Attachment Upload Feature Design
**Date:** 2026-01-03
**Status:** Approved
## Summary
Add an `upload_attachment` MCP tool to upload files to Grist documents and receive an attachment ID for linking to records.
## Design Decisions
| Decision | Choice | Rationale |
|----------|--------|-----------|
| Content encoding | Base64 string | MCP tools use JSON; binary must be encoded |
| Batch support | Single file only | YAGNI; caller can loop if needed |
| Linking behavior | Upload only, return ID | Single responsibility; use existing `update_records` to link |
| Download support | Not included | YAGNI; can add later if needed |
| Permission level | Write | Attachments are data, not schema |
| Proxy support | MCP tool only | Reduces scope; scripts can use Grist API directly |
## Tool Interface
### Input Schema
```json
{
  "type": "object",
  "properties": {
    "document": {
      "type": "string",
      "description": "Document name"
    },
    "filename": {
      "type": "string",
      "description": "Filename with extension (e.g., 'invoice.pdf')"
    },
    "content_base64": {
      "type": "string",
      "description": "File content as base64-encoded string"
    },
    "content_type": {
      "type": "string",
      "description": "MIME type (optional, auto-detected from filename if omitted)"
    }
  },
  "required": ["document", "filename", "content_base64"]
}
```
### Response
```json
{
  "attachment_id": 42,
  "filename": "invoice.pdf",
  "size_bytes": 30720
}
```
### Usage Example
```python
# 1. Upload attachment
result = upload_attachment(
    document="accounting",
    filename="Invoice-001.pdf",
    content_base64="JVBERi0xLjQK...",
)

# 2. Link to record via existing update_records tool
update_records("Bills", [{
    "id": 1,
    "fields": {"Attachment": [result["attachment_id"]]},
}])
```
## Implementation
### Files to Modify
1. **`src/grist_mcp/grist_client.py`** - Add `upload_attachment()` method
2. **`src/grist_mcp/tools/write.py`** - Add tool function
3. **`src/grist_mcp/server.py`** - Register tool
### GristClient Method
```python
async def upload_attachment(
    self,
    filename: str,
    content: bytes,
    content_type: str | None = None,
) -> dict:
    """Upload a file attachment. Returns attachment metadata."""
    if content_type is None:
        content_type = "application/octet-stream"
    files = {"upload": (filename, content, content_type)}
    async with httpx.AsyncClient(timeout=self._timeout) as client:
        response = await client.post(
            f"{self._base_url}/attachments",
            headers=self._headers,
            files=files,
        )
        response.raise_for_status()
    # Grist returns a list of attachment IDs
    attachment_ids = response.json()
    return {
        "attachment_id": attachment_ids[0],
        "filename": filename,
        "size_bytes": len(content),
    }
```
### Tool Function
```python
import base64
import mimetypes


async def upload_attachment(
    agent: Agent,
    auth: Authenticator,
    document: str,
    filename: str,
    content_base64: str,
    content_type: str | None = None,
    client: GristClient | None = None,
) -> dict:
    """Upload a file attachment to a document."""
    auth.authorize(agent, document, Permission.WRITE)

    # Decode base64; validate=True rejects non-alphabet characters,
    # which b64decode would otherwise silently discard
    try:
        content = base64.b64decode(content_base64, validate=True)
    except ValueError as exc:
        raise ValueError("Invalid base64 encoding") from exc

    # Auto-detect MIME type if not provided
    if content_type is None:
        content_type, _ = mimetypes.guess_type(filename)
        if content_type is None:
            content_type = "application/octet-stream"

    if client is None:
        doc = auth.get_document(document)
        client = GristClient(doc)
    return await client.upload_attachment(filename, content, content_type)
```
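The auto-detection step relies on the standard-library `mimetypes` table, keyed on file extension; for example:

```python
import mimetypes

guessed, _ = mimetypes.guess_type("invoice.pdf")
print(guessed)  # application/pdf

# Unknown extensions yield None, which triggers the octet-stream fallback.
fallback, _ = mimetypes.guess_type("file.xyz123")
print(fallback if fallback is not None else "application/octet-stream")
```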
## Error Handling
| Error | Cause | Response |
|-------|-------|----------|
| Invalid base64 | Malformed content_base64 | `ValueError: Invalid base64 encoding` |
| Authorization | Agent lacks write permission | `AuthError` (existing pattern) |
| Grist API error | Upload fails | `httpx.HTTPStatusError` (existing pattern) |
## Testing
### Unit Tests
**`tests/unit/test_tools_write.py`:**
- `test_upload_attachment_success` - Valid base64, returns attachment_id
- `test_upload_attachment_invalid_base64` - Raises ValueError
- `test_upload_attachment_auth_required` - Verifies write permission check
- `test_upload_attachment_mime_detection` - Auto-detects type from filename
**`tests/unit/test_grist_client.py`:**
- `test_upload_attachment_api_call` - Correct multipart request format
- `test_upload_attachment_with_explicit_content_type` - Passes through MIME type
### Mock Approach
Mock `httpx.AsyncClient` responses; no Grist server needed for unit tests.
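One way to follow this approach without touching `httpx` directly is to stub the client method itself with `unittest.mock.AsyncMock` and use the tool's `client` injection point; names here mirror the design above, not a fixed test API:

```python
import asyncio
from unittest.mock import AsyncMock

# Stub GristClient.upload_attachment with a canned response.
mock_client = AsyncMock()
mock_client.upload_attachment.return_value = {
    "attachment_id": 42,
    "filename": "invoice.pdf",
    "size_bytes": 3,
}

async def exercise():
    # The tool accepts an injected client, so a test can pass the mock in.
    return await mock_client.upload_attachment("invoice.pdf", b"abc")

result = asyncio.run(exercise())
print(result["attachment_id"])  # 42
mock_client.upload_attachment.assert_awaited_once_with("invoice.pdf", b"abc")
```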
## Future Considerations
Not included in this implementation (YAGNI):
- Batch upload (multiple files)
- Download attachment
- Proxy API support
- Size limit validation (rely on Grist's limits)
These can be added if real use cases emerge.


@@ -1,6 +1,6 @@
[project]
name = "grist-mcp"
version = "1.0.0"
version = "1.4.1"
description = "MCP server for AI agents to interact with Grist documents"
requires-python = ">=3.14"
dependencies = [
@@ -28,3 +28,6 @@ build-backend = "hatchling.build"
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests/unit", "tests/integration"]
markers = [
"integration: marks tests as integration tests (require Docker containers)",
]

renovate.json (new file)

@@ -0,0 +1,3 @@
{
  "$schema": "https://docs.renovatebot.com/renovate-schema.json"
}


@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
# scripts/get-test-instance-id.sh
# Generate a unique instance ID from git branch for parallel test isolation


@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
# scripts/run-integration-tests.sh
# Run integration tests with branch isolation and dynamic port discovery
set -e


@@ -14,6 +14,7 @@ class Document:
url: str
doc_id: str
api_key: str
host_header: str | None = None # Override Host header for Docker networking
@dataclass
@@ -78,6 +79,7 @@ def load_config(config_path: str) -> Config:
url=doc_data["url"],
doc_id=doc_data["doc_id"],
api_key=doc_data["api_key"],
host_header=doc_data.get("host_header"),
)
# Parse tokens


@@ -17,6 +17,8 @@ class GristClient:
self._doc = document
self._base_url = f"{document.url.rstrip('/')}/api/docs/{document.doc_id}"
self._headers = {"Authorization": f"Bearer {document.api_key}"}
if document.host_header:
self._headers["Host"] = document.host_header
self._timeout = timeout
async def _request(self, method: str, path: str, **kwargs) -> dict:
@@ -114,6 +116,71 @@ class GristClient:
"""Delete records by ID."""
await self._request("POST", f"/tables/{table}/data/delete", json=record_ids)
async def upload_attachment(
self,
filename: str,
content: bytes,
content_type: str = "application/octet-stream",
) -> dict:
"""Upload a file attachment. Returns attachment metadata.
Args:
filename: Name for the uploaded file.
content: File content as bytes.
content_type: MIME type of the file.
Returns:
Dict with attachment_id, filename, and size_bytes.
"""
files = {"upload": (filename, content, content_type)}
async with httpx.AsyncClient(timeout=self._timeout) as client:
response = await client.post(
f"{self._base_url}/attachments",
headers=self._headers,
files=files,
)
response.raise_for_status()
# Grist returns list of attachment IDs
attachment_ids = response.json()
return {
"attachment_id": attachment_ids[0],
"filename": filename,
"size_bytes": len(content),
}
async def download_attachment(self, attachment_id: int) -> dict:
"""Download an attachment by ID.
Args:
attachment_id: The ID of the attachment to download.
Returns:
Dict with content (bytes), content_type, and filename.
"""
import re
async with httpx.AsyncClient(timeout=self._timeout) as client:
response = await client.get(
f"{self._base_url}/attachments/{attachment_id}/download",
headers=self._headers,
)
response.raise_for_status()
# Extract filename from Content-Disposition header
content_disp = response.headers.get("content-disposition", "")
filename = None
if "filename=" in content_disp:
match = re.search(r'filename="?([^";]+)"?', content_disp)
if match:
filename = match.group(1)
return {
"content": response.content,
"content_type": response.headers.get("content-type", "application/octet-stream"),
"filename": filename,
}
# Schema operations
async def create_table(self, table_id: str, columns: list[dict]) -> str:
@@ -136,11 +203,14 @@ class GristClient:
column_id: str,
column_type: str,
formula: str | None = None,
label: str | None = None,
) -> str:
"""Add a column to a table. Returns column ID."""
fields = {"type": column_type}
if formula:
fields["formula"] = formula
if label:
fields["label"] = label
payload = {"columns": [{"id": column_id, "fields": fields}]}
data = await self._request("POST", f"/tables/{table}/columns", json=payload)
@@ -152,13 +222,16 @@ class GristClient:
column_id: str,
type: str | None = None,
formula: str | None = None,
label: str | None = None,
) -> None:
"""Modify a column's type or formula."""
"""Modify a column's type, formula, or label."""
fields = {}
if type is not None:
fields["type"] = type
if formula is not None:
fields["formula"] = formula
if label is not None:
fields["label"] = label
payload = {"columns": [{"id": column_id, "fields": fields}]}
await self._request("PATCH", f"/tables/{table}/columns", json=payload)

src/grist_mcp/logging.py (new file)

@@ -0,0 +1,120 @@
"""Logging configuration and utilities."""
import logging
import os
from datetime import datetime
__all__ = [
"setup_logging",
"get_logger",
"truncate_token",
"extract_stats",
"format_tool_log",
]
def setup_logging() -> None:
"""Configure logging based on LOG_LEVEL environment variable.
Valid levels: DEBUG, INFO, WARNING, ERROR (default: INFO)
"""
level_name = os.environ.get("LOG_LEVEL", "INFO").upper()
level = getattr(logging, level_name, None)
if not isinstance(level, int):
level = logging.INFO
logger = logging.getLogger("grist_mcp")
logger.setLevel(level)
logger.propagate = False # Prevent duplicate logs to root logger
# Only add handler if not already configured
if not logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(handler)
def extract_stats(tool_name: str, arguments: dict, result: dict) -> str:
"""Extract meaningful stats from tool call based on tool type."""
if tool_name == "list_documents":
count = len(result.get("documents", []))
return f"{count} docs"
if tool_name == "list_tables":
count = len(result.get("tables", []))
return f"{count} tables"
if tool_name == "describe_table":
count = len(result.get("columns", []))
return f"{count} columns"
if tool_name == "get_records":
count = len(result.get("records", []))
return f"{count} records"
if tool_name == "sql_query":
count = len(result.get("records", []))
return f"{count} rows"
if tool_name == "add_records":
count = len(arguments.get("records", []))
return f"{count} records"
if tool_name == "update_records":
count = len(arguments.get("records", []))
return f"{count} records"
if tool_name == "delete_records":
count = len(arguments.get("record_ids", []))
return f"{count} records"
if tool_name == "create_table":
count = len(arguments.get("columns", []))
return f"{count} columns"
if tool_name in ("add_column", "modify_column", "delete_column"):
return "1 column"
return "-"
def truncate_token(token: str) -> str:
"""Truncate token to show first 3 and last 3 chars.
Tokens 8 chars or shorter show *** for security.
"""
if len(token) <= 8:
return "***"
return f"{token[:3]}...{token[-3:]}"
def format_tool_log(
agent_name: str,
token: str,
tool: str,
document: str | None,
stats: str,
status: str,
duration_ms: int,
error_message: str | None = None,
) -> str:
"""Format a tool call log line.
Format: YYYY-MM-DD HH:MM:SS | agent (token) | tool | doc | stats | status | duration
"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
truncated = truncate_token(token)
doc = document if document else "-"
line = f"{timestamp} | {agent_name} ({truncated}) | {tool} | {doc} | {stats} | {status} | {duration_ms}ms"
if error_message:
line += f"\n {error_message}"
return line
def get_logger(name: str) -> logging.Logger:
"""Get a child logger under the grist_mcp namespace."""
return logging.getLogger(f"grist_mcp.{name}")
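The truncation rule in `truncate_token` above can be checked in isolation; re-stated here as a standalone snippet:

```python
def truncate_token(token: str) -> str:
    # Mirrors the helper above: tokens of 8 chars or fewer are fully masked.
    if len(token) <= 8:
        return "***"
    return f"{token[:3]}...{token[-3:]}"

print(truncate_token("shortpw"))             # ***
print(truncate_token("secret-token-12345"))  # sec...345
```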


@@ -1,6 +1,7 @@
"""Main entry point for the MCP server with SSE transport."""
import json
import logging
import os
import sys
from typing import Any
@@ -11,6 +12,10 @@ from mcp.server.sse import SseServerTransport
from grist_mcp.server import create_server
from grist_mcp.config import Config, load_config
from grist_mcp.auth import Authenticator, AuthError
from grist_mcp.session import SessionTokenManager
from grist_mcp.proxy import parse_proxy_request, dispatch_proxy_request, ProxyError
from grist_mcp.grist_client import GristClient
from grist_mcp.logging import setup_logging
Scope = dict[str, Any]
@@ -41,6 +46,76 @@ async def send_error(send: Send, status: int, message: str) -> None:
})
async def send_json_response(send: Send, status: int, data: dict) -> None:
"""Send a JSON response."""
body = json.dumps(data).encode()
await send({
"type": "http.response.start",
"status": status,
"headers": [[b"content-type", b"application/json"]],
})
await send({
"type": "http.response.body",
"body": body,
})
def _parse_multipart(content_type: str, body: bytes) -> tuple[str | None, bytes | None]:
"""Parse multipart/form-data to extract uploaded file.
Returns (filename, content) or (None, None) if parsing fails.
"""
import re
# Extract boundary from content-type
match = re.search(r'boundary=([^\s;]+)', content_type)
if not match:
return None, None
boundary = match.group(1).encode()
if boundary.startswith(b'"') and boundary.endswith(b'"'):
boundary = boundary[1:-1]
# Split by boundary
parts = body.split(b'--' + boundary)
for part in parts:
if b'Content-Disposition' not in part:
continue
# Split headers from content
if b'\r\n\r\n' in part:
header_section, content = part.split(b'\r\n\r\n', 1)
elif b'\n\n' in part:
header_section, content = part.split(b'\n\n', 1)
else:
continue
headers = header_section.decode('utf-8', errors='replace')
# Check if this is a file upload
if 'filename=' not in headers:
continue
# Extract filename
filename_match = re.search(r'filename="([^"]+)"', headers)
if not filename_match:
filename_match = re.search(r"filename=([^\s;]+)", headers)
if not filename_match:
continue
filename = filename_match.group(1)
        # Strip only the trailing CRLF that separates the content from the
        # next boundary; a blanket rstrip() would corrupt binary payloads
        # that legitimately end in whitespace bytes
        if content.endswith(b'\r\n'):
            content = content[:-2]
        elif content.endswith(b'\n'):
            content = content[:-1]
return filename, content
return None, None
CONFIG_TEMPLATE = """\
# grist-mcp configuration
#
@@ -74,25 +149,42 @@ def _ensure_config(config_path: str) -> bool:
# Check if path is a directory (Docker creates this when mounting missing file)
if os.path.isdir(path):
os.rmdir(path)
print(f"ERROR: Config path is a directory: {path}")
print()
print("This usually means the config file doesn't exist on the host.")
print("Please create the config file before starting the container:")
print()
print(f" mkdir -p $(dirname {config_path})")
print(f" cat > {config_path} << 'EOF'")
print(CONFIG_TEMPLATE)
print("EOF")
print()
return False
if os.path.exists(path):
return True
# Create template config
with open(path, "w") as f:
f.write(CONFIG_TEMPLATE)
print(f"Created template configuration at: {path}")
print()
print("Please edit this file to configure your Grist documents and agent tokens,")
print("then restart the server.")
try:
with open(path, "w") as f:
f.write(CONFIG_TEMPLATE)
print(f"Created template configuration at: {path}")
print()
print("Please edit this file to configure your Grist documents and agent tokens,")
print("then restart the server.")
except PermissionError:
print(f"ERROR: Cannot create config file at: {path}")
print()
print("Please create the config file manually before starting the container.")
print()
return False
def create_app(config: Config):
"""Create the ASGI application."""
auth = Authenticator(config)
token_manager = SessionTokenManager()
proxy_base_url = os.environ.get("GRIST_MCP_URL")
sse = SseServerTransport("/messages")
@@ -110,7 +202,7 @@ def create_app(config: Config):
return
# Create a server instance for this authenticated connection
server = create_server(auth, agent)
server = create_server(auth, agent, token_manager, proxy_base_url)
async with sse.connect_sse(scope, receive, send) as streams:
await server.run(
@@ -142,6 +234,196 @@ def create_app(config: Config):
"body": b'{"error":"Not found"}',
})
async def handle_proxy(scope: Scope, receive: Receive, send: Send) -> None:
# Extract token
token = _get_bearer_token(scope)
if not token:
await send_json_response(send, 401, {
"success": False,
"error": "Missing Authorization header",
"code": "INVALID_TOKEN",
})
return
# Validate session token
session = token_manager.validate_token(token)
if session is None:
await send_json_response(send, 401, {
"success": False,
"error": "Invalid or expired token",
"code": "TOKEN_EXPIRED",
})
return
# Read request body
body = b""
while True:
message = await receive()
body += message.get("body", b"")
if not message.get("more_body", False):
break
try:
request_data = json.loads(body)
except json.JSONDecodeError:
await send_json_response(send, 400, {
"success": False,
"error": "Invalid JSON",
"code": "INVALID_REQUEST",
})
return
# Parse and dispatch
try:
request = parse_proxy_request(request_data)
result = await dispatch_proxy_request(request, session, auth)
await send_json_response(send, 200, result)
except ProxyError as e:
status = 403 if e.code == "UNAUTHORIZED" else 400
await send_json_response(send, status, {
"success": False,
"error": e.message,
"code": e.code,
})
async def handle_attachments(scope: Scope, receive: Receive, send: Send) -> None:
"""Handle file attachment uploads via multipart/form-data."""
# Extract token
token = _get_bearer_token(scope)
if not token:
await send_json_response(send, 401, {
"success": False,
"error": "Missing Authorization header",
"code": "INVALID_TOKEN",
})
return
# Validate session token
session = token_manager.validate_token(token)
if session is None:
await send_json_response(send, 401, {
"success": False,
"error": "Invalid or expired token",
"code": "TOKEN_EXPIRED",
})
return
# Check write permission
if "write" not in session.permissions:
await send_json_response(send, 403, {
"success": False,
"error": "Write permission required for attachment upload",
"code": "UNAUTHORIZED",
})
return
# Get content-type header
headers = dict(scope.get("headers", []))
content_type = headers.get(b"content-type", b"").decode()
if not content_type.startswith("multipart/form-data"):
await send_json_response(send, 400, {
"success": False,
"error": "Content-Type must be multipart/form-data",
"code": "INVALID_REQUEST",
})
return
# Read request body
body = b""
while True:
message = await receive()
body += message.get("body", b"")
if not message.get("more_body", False):
break
# Parse multipart
filename, content = _parse_multipart(content_type, body)
if filename is None or content is None:
await send_json_response(send, 400, {
"success": False,
"error": "No file found in request",
"code": "INVALID_REQUEST",
})
return
# Upload to Grist
try:
doc = auth.get_document(session.document)
client = GristClient(doc)
result = await client.upload_attachment(filename, content)
await send_json_response(send, 200, {
"success": True,
"data": result,
})
except Exception as e:
await send_json_response(send, 500, {
"success": False,
"error": str(e),
"code": "GRIST_ERROR",
})
async def handle_attachment_download(
scope: Scope, receive: Receive, send: Send, attachment_id: int
) -> None:
"""Handle attachment download by ID."""
# Extract token
token = _get_bearer_token(scope)
if not token:
await send_json_response(send, 401, {
"success": False,
"error": "Missing Authorization header",
"code": "INVALID_TOKEN",
})
return
# Validate session token
session = token_manager.validate_token(token)
if session is None:
await send_json_response(send, 401, {
"success": False,
"error": "Invalid or expired token",
"code": "TOKEN_EXPIRED",
})
return
# Check read permission
if "read" not in session.permissions:
await send_json_response(send, 403, {
"success": False,
"error": "Read permission required for attachment download",
"code": "UNAUTHORIZED",
})
return
# Download from Grist
try:
doc = auth.get_document(session.document)
client = GristClient(doc)
result = await client.download_attachment(attachment_id)
# Build response headers
headers = [[b"content-type", result["content_type"].encode()]]
if result["filename"]:
disposition = f'attachment; filename="{result["filename"]}"'
headers.append([b"content-disposition", disposition.encode()])
await send({
"type": "http.response.start",
"status": 200,
"headers": headers,
})
await send({
"type": "http.response.body",
"body": result["content"],
})
except Exception as e:
await send_json_response(send, 500, {
"success": False,
"error": str(e),
"code": "GRIST_ERROR",
})
async def app(scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] != "http":
return
@@ -155,6 +437,21 @@ def create_app(config: Config):
await handle_sse(scope, receive, send)
elif path == "/messages" and method == "POST":
await handle_messages(scope, receive, send)
elif path == "/api/v1/proxy" and method == "POST":
await handle_proxy(scope, receive, send)
elif path == "/api/v1/attachments" and method == "POST":
await handle_attachments(scope, receive, send)
elif path.startswith("/api/v1/attachments/") and method == "GET":
# Parse attachment ID from path: /api/v1/attachments/{id}
try:
attachment_id = int(path.split("/")[-1])
await handle_attachment_download(scope, receive, send, attachment_id)
except ValueError:
await send_json_response(send, 400, {
"success": False,
"error": "Invalid attachment ID",
"code": "INVALID_REQUEST",
})
else:
await handle_not_found(scope, receive, send)
@@ -163,23 +460,47 @@ def create_app(config: Config):
def _print_mcp_config(external_port: int, tokens: list) -> None:
"""Print Claude Code MCP configuration."""
# Use GRIST_MCP_URL if set, otherwise fall back to localhost
base_url = os.environ.get("GRIST_MCP_URL")
if base_url:
sse_url = f"{base_url.rstrip('/')}/sse"
else:
sse_url = f"http://localhost:{external_port}/sse"
print()
print("Claude Code MCP configuration (copy-paste to add):")
for t in tokens:
config = (
f'{{"type": "sse", "url": "http://localhost:{external_port}/sse", '
f'{{"type": "sse", "url": "{sse_url}", '
f'"headers": {{"Authorization": "Bearer {t.token}"}}}}'
)
print(f" claude mcp add-json grist-{t.name} '{config}'")
print()
class UvicornAccessFilter(logging.Filter):
"""Suppress uvicorn access logs unless LOG_LEVEL is DEBUG.
At INFO level, only grist_mcp tool logs are shown.
At DEBUG level, all HTTP requests are visible.
"""
def filter(self, record: logging.LogRecord) -> bool:
# Only show uvicorn access logs at DEBUG level
return os.environ.get("LOG_LEVEL", "INFO").upper() == "DEBUG"
def main():
"""Run the SSE server."""
port = int(os.environ.get("PORT", "3000"))
external_port = int(os.environ.get("EXTERNAL_PORT", str(port)))
config_path = os.environ.get("CONFIG_PATH", "/app/config.yaml")
setup_logging()
# Suppress uvicorn access logs at INFO level (only show tool logs)
logging.getLogger("uvicorn.access").addFilter(UvicornAccessFilter())
if not _ensure_config(config_path):
return
@@ -192,7 +513,13 @@ def main():
_print_mcp_config(external_port, config.tokens)
app = create_app(config)
uvicorn.run(app, host="0.0.0.0", port=port)
# Configure uvicorn logging to reduce health check noise
log_config = uvicorn.config.LOGGING_CONFIG
log_config["formatters"]["default"]["fmt"] = "%(message)s"
log_config["formatters"]["access"]["fmt"] = "%(message)s"
uvicorn.run(app, host="0.0.0.0", port=port, log_config=log_config)
if __name__ == "__main__":

src/grist_mcp/proxy.py (new file)

@@ -0,0 +1,192 @@
"""HTTP proxy handler for session token access."""
from dataclasses import dataclass
from typing import Any
from grist_mcp.auth import Authenticator
from grist_mcp.grist_client import GristClient
from grist_mcp.session import SessionToken
class ProxyError(Exception):
"""Error during proxy request processing."""
def __init__(self, message: str, code: str):
self.message = message
self.code = code
super().__init__(message)
@dataclass
class ProxyRequest:
"""Parsed proxy request."""
method: str
table: str | None = None
records: list[dict] | None = None
record_ids: list[int] | None = None
filter: dict | None = None
sort: str | None = None
limit: int | None = None
query: str | None = None
table_id: str | None = None
columns: list[dict] | None = None
column_id: str | None = None
column_type: str | None = None
formula: str | None = None
type: str | None = None
METHODS_REQUIRING_TABLE = {
"get_records", "describe_table", "add_records", "update_records",
"delete_records", "add_column", "modify_column", "delete_column",
}
def parse_proxy_request(body: dict[str, Any]) -> ProxyRequest:
"""Parse and validate a proxy request body."""
if "method" not in body:
raise ProxyError("Missing required field: method", "INVALID_REQUEST")
method = body["method"]
if method in METHODS_REQUIRING_TABLE and "table" not in body:
raise ProxyError(f"Missing required field 'table' for method '{method}'", "INVALID_REQUEST")
return ProxyRequest(
method=method,
table=body.get("table"),
records=body.get("records"),
record_ids=body.get("record_ids"),
filter=body.get("filter"),
sort=body.get("sort"),
limit=body.get("limit"),
query=body.get("query"),
table_id=body.get("table_id"),
columns=body.get("columns"),
column_id=body.get("column_id"),
column_type=body.get("column_type"),
formula=body.get("formula"),
type=body.get("type"),
)
# Map methods to required permissions
METHOD_PERMISSIONS = {
"list_tables": "read",
"describe_table": "read",
"get_records": "read",
"sql_query": "read",
"add_records": "write",
"update_records": "write",
"delete_records": "write",
"create_table": "schema",
"add_column": "schema",
"modify_column": "schema",
"delete_column": "schema",
}
async def dispatch_proxy_request(
request: ProxyRequest,
session: SessionToken,
auth: Authenticator,
client: GristClient | None = None,
) -> dict[str, Any]:
"""Dispatch a proxy request to the appropriate handler."""
# Check permission
required_perm = METHOD_PERMISSIONS.get(request.method)
if required_perm is None:
raise ProxyError(f"Unknown method: {request.method}", "INVALID_REQUEST")
if required_perm not in session.permissions:
raise ProxyError(
f"Permission '{required_perm}' required for {request.method}",
"UNAUTHORIZED",
)
# Create client if not provided
if client is None:
doc = auth.get_document(session.document)
client = GristClient(doc)
# Dispatch to appropriate method
try:
if request.method == "list_tables":
data = await client.list_tables()
return {"success": True, "data": {"tables": data}}
elif request.method == "describe_table":
data = await client.describe_table(request.table)
return {"success": True, "data": {"table": request.table, "columns": data}}
elif request.method == "get_records":
data = await client.get_records(
request.table,
filter=request.filter,
sort=request.sort,
limit=request.limit,
)
return {"success": True, "data": {"records": data}}
elif request.method == "sql_query":
if request.query is None:
raise ProxyError("Missing required field: query", "INVALID_REQUEST")
data = await client.sql_query(request.query)
return {"success": True, "data": {"records": data}}
elif request.method == "add_records":
if request.records is None:
raise ProxyError("Missing required field: records", "INVALID_REQUEST")
data = await client.add_records(request.table, request.records)
return {"success": True, "data": {"record_ids": data}}
elif request.method == "update_records":
if request.records is None:
raise ProxyError("Missing required field: records", "INVALID_REQUEST")
await client.update_records(request.table, request.records)
return {"success": True, "data": {"updated": len(request.records)}}
elif request.method == "delete_records":
if request.record_ids is None:
raise ProxyError("Missing required field: record_ids", "INVALID_REQUEST")
await client.delete_records(request.table, request.record_ids)
return {"success": True, "data": {"deleted": len(request.record_ids)}}
elif request.method == "create_table":
if request.table_id is None or request.columns is None:
raise ProxyError("Missing required fields: table_id, columns", "INVALID_REQUEST")
data = await client.create_table(request.table_id, request.columns)
return {"success": True, "data": {"table_id": data}}
elif request.method == "add_column":
if request.column_id is None or request.column_type is None:
raise ProxyError("Missing required fields: column_id, column_type", "INVALID_REQUEST")
await client.add_column(
request.table, request.column_id, request.column_type,
formula=request.formula,
)
return {"success": True, "data": {"column_id": request.column_id}}
elif request.method == "modify_column":
if request.column_id is None:
raise ProxyError("Missing required field: column_id", "INVALID_REQUEST")
await client.modify_column(
request.table, request.column_id,
type=request.type,
formula=request.formula,
)
return {"success": True, "data": {"column_id": request.column_id}}
elif request.method == "delete_column":
if request.column_id is None:
raise ProxyError("Missing required field: column_id", "INVALID_REQUEST")
await client.delete_column(request.table, request.column_id)
return {"success": True, "data": {"deleted": request.column_id}}
else:
raise ProxyError(f"Unknown method: {request.method}", "INVALID_REQUEST")
except ProxyError:
raise
except Exception as e:
raise ProxyError(str(e), "GRIST_ERROR")
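The permission gate at the top of `dispatch_proxy_request` reduces to a dictionary lookup; a condensed standalone sketch (using `PermissionError` in place of `ProxyError` so the snippet is self-contained):

```python
# Subset of METHOD_PERMISSIONS above, enough to show the gate.
METHOD_PERMISSIONS = {
    "get_records": "read",
    "add_records": "write",
    "create_table": "schema",
}

def check_permission(method: str, granted: set[str]) -> None:
    required = METHOD_PERMISSIONS.get(method)
    if required is None:
        raise ValueError(f"Unknown method: {method}")
    if required not in granted:
        raise PermissionError(f"Permission '{required}' required for {method}")

check_permission("get_records", {"read"})  # passes silently
try:
    check_permission("create_table", {"read", "write"})
except PermissionError as exc:
    print(exc)  # Permission 'schema' required for create_table
```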


@@ -1,11 +1,18 @@
"""MCP server setup and tool registration."""
import json
import time
from mcp.server import Server
from mcp.types import Tool, TextContent
from grist_mcp.auth import Authenticator, Agent, AuthError
from grist_mcp.session import SessionTokenManager
from grist_mcp.tools.session import get_proxy_documentation as _get_proxy_documentation
from grist_mcp.tools.session import request_session_token as _request_session_token
from grist_mcp.logging import get_logger, extract_stats, format_tool_log
logger = get_logger("server")
from grist_mcp.tools.discovery import list_documents as _list_documents
from grist_mcp.tools.read import list_tables as _list_tables
@@ -21,18 +28,26 @@ from grist_mcp.tools.schema import modify_column as _modify_column
from grist_mcp.tools.schema import delete_column as _delete_column
def create_server(auth: Authenticator, agent: Agent) -> Server:
def create_server(
auth: Authenticator,
agent: Agent,
token_manager: SessionTokenManager | None = None,
proxy_base_url: str | None = None,
) -> Server:
"""Create and configure the MCP server for an authenticated agent.
Args:
auth: Authenticator instance for permission checks.
agent: The authenticated agent for this server instance.
token_manager: Optional session token manager for HTTP proxy access.
proxy_base_url: Base URL for the proxy endpoint (e.g., "https://example.com").
Returns:
Configured MCP Server instance.
"""
server = Server("grist-mcp")
_current_agent = agent
_proxy_base_url = proxy_base_url
@server.list_tools()
async def list_tools() -> list[Tool]:
@@ -171,13 +186,14 @@ def create_server(auth: Authenticator, agent: Agent) -> Server:
"column_id": {"type": "string"},
"column_type": {"type": "string"},
"formula": {"type": "string"},
"label": {"type": "string", "description": "Display label for the column"},
},
"required": ["document", "table", "column_id", "column_type"],
},
),
Tool(
name="modify_column",
description="Modify a column's type or formula",
description="Modify a column's type, formula, or label",
inputSchema={
"type": "object",
"properties": {
@@ -186,6 +202,7 @@ def create_server(auth: Authenticator, agent: Agent) -> Server:
"column_id": {"type": "string"},
"type": {"type": "string"},
"formula": {"type": "string"},
"label": {"type": "string", "description": "Display label for the column"},
},
"required": ["document", "table", "column_id"],
},
@@ -203,10 +220,54 @@ def create_server(auth: Authenticator, agent: Agent) -> Server:
"required": ["document", "table", "column_id"],
},
),
Tool(
name="get_proxy_documentation",
description="Get complete documentation for the HTTP proxy API",
inputSchema={"type": "object", "properties": {}, "required": []},
),
Tool(
name="request_session_token",
description="Request a short-lived token for direct HTTP API access. Use this to delegate bulk data operations to scripts.",
inputSchema={
"type": "object",
"properties": {
"document": {
"type": "string",
"description": "Document name to grant access to",
},
"permissions": {
"type": "array",
"items": {"type": "string", "enum": ["read", "write", "schema"]},
"description": "Permission levels to grant",
},
"ttl_seconds": {
"type": "integer",
"description": "Token lifetime in seconds (max 3600, default 300)",
},
},
"required": ["document", "permissions"],
},
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
start_time = time.time()
document = arguments.get("document")
# Log arguments at DEBUG level
logger.debug(
format_tool_log(
agent_name=_current_agent.name,
token=_current_agent.token,
tool=name,
document=document,
stats=f"args: {json.dumps(arguments)}",
status="started",
duration_ms=0,
)
)
try:
if name == "list_documents":
result = await _list_documents(_current_agent)
@@ -252,6 +313,7 @@ def create_server(auth: Authenticator, agent: Agent) -> Server:
_current_agent, auth, arguments["document"], arguments["table"],
arguments["column_id"], arguments["column_type"],
formula=arguments.get("formula"),
label=arguments.get("label"),
)
elif name == "modify_column":
result = await _modify_column(
@@ -259,20 +321,75 @@ def create_server(auth: Authenticator, agent: Agent) -> Server:
arguments["column_id"],
type=arguments.get("type"),
formula=arguments.get("formula"),
label=arguments.get("label"),
)
elif name == "delete_column":
result = await _delete_column(
_current_agent, auth, arguments["document"], arguments["table"],
arguments["column_id"],
)
elif name == "get_proxy_documentation":
result = await _get_proxy_documentation()
elif name == "request_session_token":
if token_manager is None:
return [TextContent(type="text", text="Session tokens not enabled")]
result = await _request_session_token(
_current_agent, auth, token_manager,
arguments["document"],
arguments["permissions"],
ttl_seconds=arguments.get("ttl_seconds", 300),
proxy_base_url=_proxy_base_url,
)
else:
return [TextContent(type="text", text=f"Unknown tool: {name}")]
duration_ms = int((time.time() - start_time) * 1000)
stats = extract_stats(name, arguments, result)
logger.info(
format_tool_log(
agent_name=_current_agent.name,
token=_current_agent.token,
tool=name,
document=document,
stats=stats,
status="success",
duration_ms=duration_ms,
)
)
return [TextContent(type="text", text=json.dumps(result))]
except AuthError as e:
duration_ms = int((time.time() - start_time) * 1000)
logger.warning(
format_tool_log(
agent_name=_current_agent.name,
token=_current_agent.token,
tool=name,
document=document,
stats="-",
status="auth_error",
duration_ms=duration_ms,
error_message=str(e),
)
)
return [TextContent(type="text", text=f"Authorization error: {e}")]
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
logger.error(
format_tool_log(
agent_name=_current_agent.name,
token=_current_agent.token,
tool=name,
document=document,
stats="-",
status="error",
duration_ms=duration_ms,
error_message=str(e),
)
)
return [TextContent(type="text", text=f"Error: {e}")]
return server

src/grist_mcp/session.py

@@ -0,0 +1,73 @@
"""Session token management for HTTP proxy access."""
import secrets
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
MAX_TTL_SECONDS = 3600 # 1 hour
DEFAULT_TTL_SECONDS = 300 # 5 minutes
@dataclass
class SessionToken:
"""A short-lived session token for proxy access."""
token: str
document: str
permissions: list[str]
agent_name: str
created_at: datetime
expires_at: datetime
class SessionTokenManager:
"""Manages creation and validation of session tokens."""
def __init__(self):
self._tokens: dict[str, SessionToken] = {}
def create_token(
self,
agent_name: str,
document: str,
permissions: list[str],
ttl_seconds: int = DEFAULT_TTL_SECONDS,
) -> SessionToken:
"""Create a new session token.
TTL is capped at MAX_TTL_SECONDS (1 hour).
"""
now = datetime.now(timezone.utc)
token_str = f"sess_{secrets.token_urlsafe(32)}"
# Cap TTL at maximum
effective_ttl = min(ttl_seconds, MAX_TTL_SECONDS)
session = SessionToken(
token=token_str,
document=document,
permissions=permissions,
agent_name=agent_name,
created_at=now,
expires_at=now + timedelta(seconds=effective_ttl),
)
self._tokens[token_str] = session
return session
def validate_token(self, token: str) -> SessionToken | None:
"""Validate a session token.
Returns the SessionToken if valid and not expired, None otherwise.
Also removes expired tokens lazily.
"""
session = self._tokens.get(token)
if session is None:
return None
now = datetime.now(timezone.utc)
if session.expires_at < now:
# Token expired, remove it
del self._tokens[token]
return None
return session
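The TTL-capping and lazy-expiry behavior above can be sketched standalone with the stdlib only (`mint_token` is a hypothetical helper for illustration, not part of the module):

```python
import secrets
from datetime import datetime, timedelta, timezone

MAX_TTL_SECONDS = 3600  # mirrors grist_mcp.session


def mint_token(ttl_seconds: int) -> tuple[str, datetime]:
    """Mirror SessionTokenManager.create_token: the effective TTL never exceeds one hour."""
    now = datetime.now(timezone.utc)
    effective_ttl = min(ttl_seconds, MAX_TTL_SECONDS)
    return f"sess_{secrets.token_urlsafe(32)}", now + timedelta(seconds=effective_ttl)


# A request for two hours is silently capped to one
token, expires_at = mint_token(7200)
```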


@@ -0,0 +1,37 @@
"""Filter normalization for Grist API queries."""
from typing import Any
def normalize_filter_value(value: Any) -> list:
"""Ensure a filter value is a list.
Grist API expects filter values to be arrays.
Args:
value: Single value or list of values.
Returns:
Value wrapped in list, or original list if already a list.
"""
if isinstance(value, list):
return value
return [value]
def normalize_filter(filter: dict | None) -> dict | None:
"""Normalize filter values to array format for Grist API.
Grist expects all filter values to be arrays. This function
wraps single values in lists.
Args:
filter: Filter dict with column names as keys.
Returns:
Normalized filter dict, or None if input was None.
"""
if not filter:
return filter
return {key: normalize_filter_value(value) for key, value in filter.items()}


@@ -2,6 +2,7 @@
from grist_mcp.auth import Agent, Authenticator, Permission
from grist_mcp.grist_client import GristClient
from grist_mcp.tools.filters import normalize_filter
async def list_tables(
@@ -56,7 +57,10 @@ async def get_records(
doc = auth.get_document(document)
client = GristClient(doc)
# Normalize filter values to array format for Grist API
normalized_filter = normalize_filter(filter)
records = await client.get_records(table, filter=normalized_filter, sort=sort, limit=limit)
return {"records": records}


@@ -31,6 +31,7 @@ async def add_column(
column_id: str,
column_type: str,
formula: str | None = None,
label: str | None = None,
client: GristClient | None = None,
) -> dict:
"""Add a column to a table."""
@@ -40,7 +41,9 @@ async def add_column(
doc = auth.get_document(document)
client = GristClient(doc)
created_id = await client.add_column(
table, column_id, column_type, formula=formula, label=label
)
return {"column_id": created_id}
@@ -52,16 +55,17 @@ async def modify_column(
column_id: str,
type: str | None = None,
formula: str | None = None,
label: str | None = None,
client: GristClient | None = None,
) -> dict:
"""Modify a column's type, formula, or label."""
auth.authorize(agent, document, Permission.SCHEMA)
if client is None:
doc = auth.get_document(document)
client = GristClient(doc)
await client.modify_column(table, column_id, type=type, formula=formula, label=label)
return {"modified": True}


@@ -0,0 +1,192 @@
"""Session token tools for HTTP proxy access."""
from grist_mcp.auth import Agent, Authenticator, AuthError, Permission
from grist_mcp.session import SessionTokenManager
PROXY_DOCUMENTATION = {
"description": "HTTP proxy API for bulk data operations. Use request_session_token to get a short-lived token, then call the proxy endpoint directly from scripts.",
"endpoints": {
"proxy": "POST /api/v1/proxy - JSON operations (CRUD, schema)",
"attachments_upload": "POST /api/v1/attachments - File uploads (multipart/form-data)",
"attachments_download": "GET /api/v1/attachments/{id} - File downloads (binary response)",
},
"endpoint_note": "The full URL is returned in the 'proxy_url' field of request_session_token response. Replace /proxy with /attachments for file operations.",
"authentication": "Bearer token in Authorization header",
"attachment_upload": {
"endpoint": "POST /api/v1/attachments",
"content_type": "multipart/form-data",
"permission": "write",
"description": "Upload file attachments to the document. Returns attachment_id for linking to records via update_records.",
"response": {"success": True, "data": {"attachment_id": 42, "filename": "invoice.pdf", "size_bytes": 31395}},
"example_curl": "curl -X POST -H 'Authorization: Bearer TOKEN' -F 'file=@invoice.pdf' URL/api/v1/attachments",
"example_python": """import requests
response = requests.post(
f'{proxy_url.replace("/proxy", "/attachments")}',
headers={'Authorization': f'Bearer {token}'},
files={'file': open('invoice.pdf', 'rb')}
)
attachment_id = response.json()['data']['attachment_id']
# Link to record: update_records with {'Attachment': [attachment_id]}""",
},
"attachment_download": {
"endpoint": "GET /api/v1/attachments/{attachment_id}",
"permission": "read",
"description": "Download attachment by ID. Returns binary content with appropriate Content-Type and Content-Disposition headers.",
"response_headers": ["Content-Type", "Content-Disposition"],
"example_curl": "curl -H 'Authorization: Bearer TOKEN' URL/api/v1/attachments/42 -o file.pdf",
"example_python": """import requests
response = requests.get(
f'{base_url}/api/v1/attachments/42',
headers={'Authorization': f'Bearer {token}'}
)
with open('downloaded.pdf', 'wb') as f:
f.write(response.content)""",
},
"request_format": {
"method": "Operation name (required)",
"table": "Table name (required for most operations)",
},
"methods": {
"get_records": {
"description": "Fetch records from a table",
"fields": {
"table": "string",
"filter": "object (optional)",
"sort": "string (optional)",
"limit": "integer (optional)",
},
},
"sql_query": {
"description": "Run a read-only SQL query",
"fields": {"query": "string"},
},
"list_tables": {
"description": "List all tables in the document",
"fields": {},
},
"describe_table": {
"description": "Get column information for a table",
"fields": {"table": "string"},
},
"add_records": {
"description": "Add records to a table",
"fields": {"table": "string", "records": "array of objects"},
},
"update_records": {
"description": "Update existing records",
"fields": {"table": "string", "records": "array of {id, fields}"},
},
"delete_records": {
"description": "Delete records by ID",
"fields": {"table": "string", "record_ids": "array of integers"},
},
"create_table": {
"description": "Create a new table",
"fields": {"table_id": "string", "columns": "array of {id, type}"},
},
"add_column": {
"description": "Add a column to a table",
"fields": {
"table": "string",
"column_id": "string",
"column_type": "string",
"formula": "string (optional)",
},
},
"modify_column": {
"description": "Modify a column's type or formula",
"fields": {
"table": "string",
"column_id": "string",
"type": "string (optional)",
"formula": "string (optional)",
},
},
"delete_column": {
"description": "Delete a column",
"fields": {"table": "string", "column_id": "string"},
},
},
"response_format": {
"success": {"success": True, "data": "..."},
"error": {"success": False, "error": "message", "code": "ERROR_CODE"},
},
"error_codes": [
"UNAUTHORIZED",
"INVALID_TOKEN",
"TOKEN_EXPIRED",
"INVALID_REQUEST",
"GRIST_ERROR",
],
"example_script": """#!/usr/bin/env python3
import requests
import sys
# Use token and proxy_url from request_session_token response
token = sys.argv[1]
proxy_url = sys.argv[2]
response = requests.post(
proxy_url,
headers={'Authorization': f'Bearer {token}'},
json={
'method': 'add_records',
'table': 'Orders',
'records': [{'item': 'Widget', 'qty': 100}]
}
)
print(response.json())
""",
}
async def get_proxy_documentation() -> dict:
"""Return complete documentation for the HTTP proxy API."""
return PROXY_DOCUMENTATION
async def request_session_token(
agent: Agent,
auth: Authenticator,
token_manager: SessionTokenManager,
document: str,
permissions: list[str],
ttl_seconds: int = 300,
proxy_base_url: str | None = None,
) -> dict:
"""Request a short-lived session token for HTTP proxy access.
The token can only grant permissions the agent already has.
"""
# Verify agent has access to the document
# Check each requested permission
for perm_str in permissions:
try:
perm = Permission(perm_str)
except ValueError:
raise AuthError(f"Invalid permission: {perm_str}")
auth.authorize(agent, document, perm)
# Create the session token
session = token_manager.create_token(
agent_name=agent.name,
document=document,
permissions=permissions,
ttl_seconds=ttl_seconds,
)
# Build proxy URL - use base URL if provided, otherwise just path
proxy_path = "/api/v1/proxy"
if proxy_base_url:
proxy_url = f"{proxy_base_url.rstrip('/')}{proxy_path}"
else:
proxy_url = proxy_path
return {
"token": session.token,
"document": session.document,
"permissions": session.permissions,
"expires_at": session.expires_at.isoformat(),
"proxy_url": proxy_url,
}


@@ -1,4 +1,4 @@
FROM python:3.14-slim@sha256:fb83750094b46fd6b8adaa80f66e2302ecbe45d513f6cece637a841e1025b4ca
WORKDIR /app


@@ -35,6 +35,18 @@ MOCK_TABLES = {
{"id": 2, "fields": {"Title": "Deploy", "Done": False}},
],
},
"Orders": {
"columns": [
{"id": "OrderNum", "fields": {"type": "Int"}},
{"id": "Customer", "fields": {"type": "Ref:People"}},
{"id": "Amount", "fields": {"type": "Numeric"}},
],
"records": [
{"id": 1, "fields": {"OrderNum": 1001, "Customer": 1, "Amount": 100.0}},
{"id": 2, "fields": {"OrderNum": 1002, "Customer": 2, "Amount": 200.0}},
{"id": 3, "fields": {"OrderNum": 1003, "Customer": 1, "Amount": 150.0}},
],
},
}
# Track requests for test assertions
@@ -93,12 +105,40 @@ async def get_records(request):
"""GET /api/docs/{doc_id}/tables/{table_id}/records"""
doc_id = request.path_params["doc_id"]
table_id = request.path_params["table_id"]
filter_param = request.query_params.get("filter")
log_request("GET", f"/api/docs/{doc_id}/tables/{table_id}/records?filter={filter_param}")
if table_id not in MOCK_TABLES:
return JSONResponse({"error": "Table not found"}, status_code=404)
records = MOCK_TABLES[table_id]["records"]
# Apply filtering if provided
if filter_param:
try:
filters = json.loads(filter_param)
# Validate filter format: all values must be arrays (Grist API requirement)
for key, values in filters.items():
if not isinstance(values, list):
return JSONResponse(
{"error": f"Filter values must be arrays, got {type(values).__name__} for '{key}'"},
status_code=400
)
# Apply filters: record matches if field value is in the filter list
filtered_records = []
for record in records:
match = True
for key, allowed_values in filters.items():
if record["fields"].get(key) not in allowed_values:
match = False
break
if match:
filtered_records.append(record)
records = filtered_records
except json.JSONDecodeError:
return JSONResponse({"error": "Invalid filter JSON"}, status_code=400)
return JSONResponse({"records": records})
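For context, this is how a normalized filter reaches the handler on the wire: the client JSON-encodes the dict into a `filter` query parameter (a sketch; the real encoding lives in `GristClient`):

```python
import json
from urllib.parse import parse_qs, urlencode

filter_dict = {"Customer": [1]}  # already normalized to array form
query = urlencode({"filter": json.dumps(filter_dict)})
url = f"/api/docs/test-doc/tables/Orders/records?{query}"

# The mock server decodes it back the same way
decoded = json.loads(parse_qs(query)["filter"][0])
```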
async def add_records(request):
@@ -178,6 +218,15 @@ async def modify_column(request):
return JSONResponse({})
async def modify_columns(request):
"""PATCH /api/docs/{doc_id}/tables/{table_id}/columns - batch modify columns"""
doc_id = request.path_params["doc_id"]
table_id = request.path_params["table_id"]
body = await request.json()
log_request("PATCH", f"/api/docs/{doc_id}/tables/{table_id}/columns", body)
return JSONResponse({})
async def delete_column(request):
"""DELETE /api/docs/{doc_id}/tables/{table_id}/columns/{col_id}"""
doc_id = request.path_params["doc_id"]
@@ -199,6 +248,7 @@ app = Starlette(
Route("/api/docs/{doc_id}/tables", endpoint=create_tables, methods=["POST"]),
Route("/api/docs/{doc_id}/tables/{table_id}/columns", endpoint=get_table_columns),
Route("/api/docs/{doc_id}/tables/{table_id}/columns", endpoint=add_column, methods=["POST"]),
Route("/api/docs/{doc_id}/tables/{table_id}/columns", endpoint=modify_columns, methods=["PATCH"]),
Route("/api/docs/{doc_id}/tables/{table_id}/columns/{col_id}", endpoint=modify_column, methods=["PATCH"]),
Route("/api/docs/{doc_id}/tables/{table_id}/columns/{col_id}", endpoint=delete_column, methods=["DELETE"]),
Route("/api/docs/{doc_id}/tables/{table_id}/records", endpoint=get_records),


@@ -9,12 +9,14 @@ from mcp.client.sse import sse_client
GRIST_MCP_URL = os.environ.get("GRIST_MCP_URL", "http://localhost:3000")
GRIST_MCP_TOKEN = os.environ.get("GRIST_MCP_TOKEN", "test-token")
@asynccontextmanager
async def create_mcp_session():
"""Create and yield an MCP session."""
headers = {"Authorization": f"Bearer {GRIST_MCP_TOKEN}"}
async with sse_client(f"{GRIST_MCP_URL}/sse", headers=headers) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
yield session
@@ -44,12 +46,14 @@ async def test_mcp_protocol_compliance(services_ready):
"add_column",
"modify_column",
"delete_column",
"get_proxy_documentation",
"request_session_token",
]
for expected in expected_tools:
assert expected in tool_names, f"Missing tool: {expected}"
assert len(result.tools) == 14, f"Expected 14 tools, got {len(result.tools)}"
# Test 3: All tools have descriptions
for tool in result.tools:


@@ -0,0 +1,52 @@
"""Integration tests for session token proxy."""
import os
import pytest
import httpx
GRIST_MCP_URL = os.environ.get("GRIST_MCP_URL", "http://localhost:3000")
GRIST_MCP_TOKEN = os.environ.get("GRIST_MCP_TOKEN")
@pytest.fixture
def mcp_client():
"""Client for MCP SSE endpoint."""
return httpx.Client(
base_url=GRIST_MCP_URL,
headers={"Authorization": f"Bearer {GRIST_MCP_TOKEN}"},
)
@pytest.fixture
def proxy_client():
"""Client for proxy endpoint (session token set per-test)."""
return httpx.Client(base_url=GRIST_MCP_URL)
@pytest.mark.integration
def test_full_session_proxy_flow(mcp_client, proxy_client):
"""Test: request token via MCP, use token to call proxy."""
# This test requires a running grist-mcp server with proper config
# Skip if not configured
if not GRIST_MCP_TOKEN:
pytest.skip("GRIST_MCP_TOKEN not set")
# Step 1: Request session token (would be via MCP in real usage)
# For integration test, we test the proxy endpoint directly
# This is a placeholder - full MCP integration would use SSE
# Step 2: Use proxy endpoint
# Note: Need a valid session token to test this fully
# For now, verify endpoint exists and rejects bad tokens
response = proxy_client.post(
"/api/v1/proxy",
headers={"Authorization": "Bearer invalid_token"},
json={"method": "list_tables"},
)
assert response.status_code == 401
data = response.json()
assert data["success"] is False
assert data["code"] in ["INVALID_TOKEN", "TOKEN_EXPIRED"]


@@ -12,12 +12,14 @@ from mcp.client.sse import sse_client
GRIST_MCP_URL = os.environ.get("GRIST_MCP_URL", "http://localhost:3000")
MOCK_GRIST_URL = os.environ.get("MOCK_GRIST_URL", "http://localhost:8484")
GRIST_MCP_TOKEN = os.environ.get("GRIST_MCP_TOKEN", "test-token")
@asynccontextmanager
async def create_mcp_session():
"""Create and yield an MCP session."""
headers = {"Authorization": f"Bearer {GRIST_MCP_TOKEN}"}
async with sse_client(f"{GRIST_MCP_URL}/sse", headers=headers) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
yield session
@@ -88,6 +90,36 @@ async def test_all_tools(services_ready):
log = get_mock_request_log()
assert any("/records" in entry["path"] and entry["method"] == "GET" for entry in log)
# Test get_records with Ref column filter
# This tests that single values are normalized to arrays for the Grist API
clear_mock_request_log()
result = await client.call_tool(
"get_records",
{"document": "test-doc", "table": "Orders", "filter": {"Customer": 1}}
)
data = json.loads(result.content[0].text)
assert "records" in data
# Should return only orders for Customer 1 (orders 1 and 3)
assert len(data["records"]) == 2
for record in data["records"]:
assert record["Customer"] == 1
log = get_mock_request_log()
# Verify the filter was sent as array format
filter_requests = [e for e in log if "/records" in e["path"] and "filter=" in e["path"]]
assert len(filter_requests) >= 1
# The filter value should be [1] not 1
assert "[1]" in filter_requests[0]["path"]
# Test get_records with multiple filter values
clear_mock_request_log()
result = await client.call_tool(
"get_records",
{"document": "test-doc", "table": "Orders", "filter": {"Customer": [1, 2]}}
)
data = json.loads(result.content[0].text)
assert "records" in data
assert len(data["records"]) == 3 # All 3 orders (customers 1 and 2)
# Test sql_query
clear_mock_request_log()
result = await client.call_tool(
@@ -194,7 +226,7 @@ async def test_all_tools(services_ready):
data = json.loads(result.content[0].text)
assert "modified" in data
log = get_mock_request_log()
patch_cols = [e for e in log if e["method"] == "PATCH" and "/columns" in e["path"]]
assert len(patch_cols) >= 1
# Test delete_column


@@ -0,0 +1,89 @@
"""Unit tests for filter normalization."""
import pytest
from grist_mcp.tools.filters import normalize_filter, normalize_filter_value
class TestNormalizeFilterValue:
"""Tests for normalize_filter_value function."""
def test_int_becomes_list(self):
assert normalize_filter_value(5) == [5]
def test_string_becomes_list(self):
assert normalize_filter_value("foo") == ["foo"]
def test_float_becomes_list(self):
assert normalize_filter_value(3.14) == [3.14]
def test_list_unchanged(self):
assert normalize_filter_value([1, 2, 3]) == [1, 2, 3]
def test_empty_list_unchanged(self):
assert normalize_filter_value([]) == []
def test_single_item_list_unchanged(self):
assert normalize_filter_value([42]) == [42]
def test_mixed_type_list_unchanged(self):
assert normalize_filter_value([1, "foo", 3.14]) == [1, "foo", 3.14]
class TestNormalizeFilter:
"""Tests for normalize_filter function."""
def test_none_returns_none(self):
assert normalize_filter(None) is None
def test_empty_dict_returns_empty_dict(self):
assert normalize_filter({}) == {}
def test_single_int_value_wrapped(self):
result = normalize_filter({"Transaction": 44})
assert result == {"Transaction": [44]}
def test_single_string_value_wrapped(self):
result = normalize_filter({"Status": "active"})
assert result == {"Status": ["active"]}
def test_list_value_unchanged(self):
result = normalize_filter({"Transaction": [44, 45, 46]})
assert result == {"Transaction": [44, 45, 46]}
def test_mixed_columns_all_normalized(self):
"""Both ref and non-ref columns are normalized to arrays."""
result = normalize_filter({
"Transaction": 44, # Ref column (int)
"Debit": 500, # Non-ref column (int)
"Memo": "test", # Non-ref column (str)
})
assert result == {
"Transaction": [44],
"Debit": [500],
"Memo": ["test"],
}
def test_multiple_values_list_unchanged(self):
"""Filter with multiple values passes through."""
result = normalize_filter({
"Status": ["pending", "active"],
"Priority": [1, 2, 3],
})
assert result == {
"Status": ["pending", "active"],
"Priority": [1, 2, 3],
}
def test_mixed_single_and_list_values(self):
"""Mix of single values and lists."""
result = normalize_filter({
"Transaction": 44, # Single int
"Status": ["open", "closed"], # List
"Amount": 100.50, # Single float
})
assert result == {
"Transaction": [44],
"Status": ["open", "closed"],
"Amount": [100.50],
}


@@ -155,6 +155,27 @@ async def test_add_column(client, httpx_mock: HTTPXMock):
col_id = await client.add_column("Table1", "NewCol", "Text", formula=None)
assert col_id == "NewCol"
request = httpx_mock.get_request()
import json
payload = json.loads(request.content)
assert payload == {"columns": [{"id": "NewCol", "fields": {"type": "Text"}}]}
@pytest.mark.asyncio
async def test_add_column_with_label(client, httpx_mock: HTTPXMock):
httpx_mock.add_response(
url="https://grist.example.com/api/docs/abc123/tables/Table1/columns",
method="POST",
json={"columns": [{"id": "first_name"}]},
)
col_id = await client.add_column("Table1", "first_name", "Text", label="First Name")
assert col_id == "first_name"
request = httpx_mock.get_request()
import json
payload = json.loads(request.content)
assert payload == {"columns": [{"id": "first_name", "fields": {"type": "Text", "label": "First Name"}}]}
@pytest.mark.asyncio
@@ -169,6 +190,22 @@ async def test_modify_column(client, httpx_mock: HTTPXMock):
await client.modify_column("Table1", "Amount", type="Int", formula="$Price * $Qty")
@pytest.mark.asyncio
async def test_modify_column_with_label(client, httpx_mock: HTTPXMock):
httpx_mock.add_response(
url="https://grist.example.com/api/docs/abc123/tables/Table1/columns",
method="PATCH",
json={},
)
await client.modify_column("Table1", "Col1", label="Column One")
request = httpx_mock.get_request()
import json
payload = json.loads(request.content)
assert payload == {"columns": [{"id": "Col1", "fields": {"label": "Column One"}}]}
@pytest.mark.asyncio
async def test_delete_column(client, httpx_mock: HTTPXMock):
httpx_mock.add_response(
@@ -196,3 +233,99 @@ def test_sql_validation_rejects_multiple_statements(client):
def test_sql_validation_allows_trailing_semicolon(client):
# Should not raise
client._validate_sql_query("SELECT * FROM users;")
# Attachment tests
@pytest.mark.asyncio
async def test_upload_attachment(client, httpx_mock: HTTPXMock):
httpx_mock.add_response(
url="https://grist.example.com/api/docs/abc123/attachments",
method="POST",
json=[42],
)
result = await client.upload_attachment(
filename="invoice.pdf",
content=b"PDF content here",
content_type="application/pdf",
)
assert result == {
"attachment_id": 42,
"filename": "invoice.pdf",
"size_bytes": 16,
}
@pytest.mark.asyncio
async def test_upload_attachment_default_content_type(client, httpx_mock: HTTPXMock):
httpx_mock.add_response(
url="https://grist.example.com/api/docs/abc123/attachments",
method="POST",
json=[99],
)
result = await client.upload_attachment(
filename="data.bin",
content=b"\x00\x01\x02",
)
assert result["attachment_id"] == 99
assert result["size_bytes"] == 3
@pytest.mark.asyncio
async def test_download_attachment(client, httpx_mock: HTTPXMock):
httpx_mock.add_response(
url="https://grist.example.com/api/docs/abc123/attachments/42/download",
method="GET",
content=b"PDF content here",
headers={
"content-type": "application/pdf",
"content-disposition": 'attachment; filename="invoice.pdf"',
},
)
result = await client.download_attachment(42)
assert result["content"] == b"PDF content here"
assert result["content_type"] == "application/pdf"
assert result["filename"] == "invoice.pdf"
@pytest.mark.asyncio
async def test_download_attachment_no_filename(client, httpx_mock: HTTPXMock):
httpx_mock.add_response(
url="https://grist.example.com/api/docs/abc123/attachments/99/download",
method="GET",
content=b"binary data",
headers={
"content-type": "application/octet-stream",
},
)
result = await client.download_attachment(99)
assert result["content"] == b"binary data"
assert result["content_type"] == "application/octet-stream"
assert result["filename"] is None
@pytest.mark.asyncio
async def test_download_attachment_unquoted_filename(client, httpx_mock: HTTPXMock):
httpx_mock.add_response(
url="https://grist.example.com/api/docs/abc123/attachments/55/download",
method="GET",
content=b"image data",
headers={
"content-type": "image/png",
"content-disposition": "attachment; filename=photo.png",
},
)
result = await client.download_attachment(55)
assert result["content"] == b"image data"
assert result["content_type"] == "image/png"
assert result["filename"] == "photo.png"

tests/unit/test_logging.py

@@ -0,0 +1,170 @@
"""Unit tests for logging module."""
import logging
from grist_mcp.logging import truncate_token, extract_stats, format_tool_log
class TestTruncateToken:
def test_normal_token_shows_prefix_suffix(self):
token = "abcdefghijklmnop"
assert truncate_token(token) == "abc...nop"
def test_short_token_shows_asterisks(self):
token = "abcdefgh" # 8 chars
assert truncate_token(token) == "***"
def test_very_short_token_shows_asterisks(self):
token = "abc"
assert truncate_token(token) == "***"
def test_empty_token_shows_asterisks(self):
assert truncate_token("") == "***"
def test_boundary_token_shows_prefix_suffix(self):
token = "abcdefghi" # 9 chars - first to show truncation
assert truncate_token(token) == "abc...ghi"
class TestExtractStats:
def test_list_documents(self):
result = {"documents": [{"name": "a"}, {"name": "b"}, {"name": "c"}]}
assert extract_stats("list_documents", {}, result) == "3 docs"
def test_list_tables(self):
result = {"tables": ["Orders", "Products"]}
assert extract_stats("list_tables", {}, result) == "2 tables"
def test_describe_table(self):
result = {"columns": [{"id": "A"}, {"id": "B"}]}
assert extract_stats("describe_table", {}, result) == "2 columns"
def test_get_records(self):
result = {"records": [{"id": 1}, {"id": 2}]}
assert extract_stats("get_records", {}, result) == "2 records"
def test_sql_query(self):
result = {"records": [{"a": 1}, {"a": 2}, {"a": 3}]}
assert extract_stats("sql_query", {}, result) == "3 rows"
def test_add_records_from_args(self):
args = {"records": [{"a": 1}, {"a": 2}]}
assert extract_stats("add_records", args, {"ids": [1, 2]}) == "2 records"
def test_update_records_from_args(self):
args = {"records": [{"id": 1, "fields": {}}, {"id": 2, "fields": {}}]}
assert extract_stats("update_records", args, {}) == "2 records"
def test_delete_records_from_args(self):
args = {"record_ids": [1, 2, 3]}
assert extract_stats("delete_records", args, {}) == "3 records"
def test_create_table(self):
args = {"columns": [{"id": "A"}, {"id": "B"}]}
assert extract_stats("create_table", args, {}) == "2 columns"
def test_single_column_ops(self):
assert extract_stats("add_column", {}, {}) == "1 column"
assert extract_stats("modify_column", {}, {}) == "1 column"
assert extract_stats("delete_column", {}, {}) == "1 column"
def test_empty_result_returns_zero(self):
assert extract_stats("list_documents", {}, {"documents": []}) == "0 docs"
def test_unknown_tool(self):
assert extract_stats("unknown_tool", {}, {}) == "-"
class TestFormatToolLog:
def test_success_format(self):
line = format_tool_log(
agent_name="dev-agent",
token="abcdefghijklmnop",
tool="get_records",
document="sales",
stats="42 records",
status="success",
duration_ms=125,
)
assert "dev-agent" in line
assert "abc...nop" in line
assert "get_records" in line
assert "sales" in line
assert "42 records" in line
assert "success" in line
assert "125ms" in line
# Check pipe-delimited format
assert line.count("|") == 6
def test_no_document(self):
line = format_tool_log(
agent_name="dev-agent",
token="abcdefghijklmnop",
tool="list_documents",
document=None,
stats="3 docs",
status="success",
duration_ms=45,
)
assert "| - |" in line
def test_error_format(self):
line = format_tool_log(
agent_name="dev-agent",
token="abcdefghijklmnop",
tool="add_records",
document="inventory",
stats="5 records",
status="error",
duration_ms=89,
error_message="Grist API error: Invalid column 'foo'",
)
assert "error" in line
assert "\n Grist API error: Invalid column 'foo'" in line
class TestSetupLogging:
def test_default_level_is_info(self, monkeypatch):
monkeypatch.delenv("LOG_LEVEL", raising=False)
from grist_mcp.logging import setup_logging
setup_logging()
logger = logging.getLogger("grist_mcp")
assert logger.level == logging.INFO
def test_respects_log_level_env(self, monkeypatch):
monkeypatch.setenv("LOG_LEVEL", "DEBUG")
from grist_mcp.logging import setup_logging
setup_logging()
logger = logging.getLogger("grist_mcp")
assert logger.level == logging.DEBUG
def test_invalid_level_defaults_to_info(self, monkeypatch):
monkeypatch.setenv("LOG_LEVEL", "INVALID")
from grist_mcp.logging import setup_logging
setup_logging()
logger = logging.getLogger("grist_mcp")
assert logger.level == logging.INFO
class TestGetLogger:
def test_returns_child_logger(self):
from grist_mcp.logging import get_logger
logger = get_logger("server")
assert logger.name == "grist_mcp.server"
def test_inherits_parent_level(self, monkeypatch):
monkeypatch.setenv("LOG_LEVEL", "WARNING")
from grist_mcp.logging import setup_logging, get_logger
setup_logging()
logger = get_logger("test")
# Child inherits from parent when level is NOTSET
assert logger.getEffectiveLevel() == logging.WARNING

tests/unit/test_proxy.py

@@ -0,0 +1,98 @@
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
import pytest
from grist_mcp.proxy import parse_proxy_request, ProxyRequest, ProxyError, dispatch_proxy_request
from grist_mcp.session import SessionToken
@pytest.fixture
def mock_session():
return SessionToken(
token="sess_test",
document="sales",
permissions=["read", "write"],
agent_name="test-agent",
created_at=datetime.now(timezone.utc),
expires_at=datetime.max.replace(tzinfo=timezone.utc),  # far-future so the session stays valid during the test
)
@pytest.fixture
def mock_auth():
auth = MagicMock()
doc = MagicMock()
doc.url = "https://grist.example.com"
doc.doc_id = "abc123"
doc.api_key = "key"
auth.get_document.return_value = doc
return auth
def test_parse_proxy_request_valid_add_records():
body = {
"method": "add_records",
"table": "Orders",
"records": [{"item": "Widget", "qty": 10}],
}
request = parse_proxy_request(body)
assert request.method == "add_records"
assert request.table == "Orders"
assert request.records == [{"item": "Widget", "qty": 10}]
def test_parse_proxy_request_missing_method():
body = {"table": "Orders"}
with pytest.raises(ProxyError) as exc_info:
parse_proxy_request(body)
assert exc_info.value.code == "INVALID_REQUEST"
assert "method" in str(exc_info.value)
@pytest.mark.asyncio
async def test_dispatch_add_records(mock_session, mock_auth):
request = ProxyRequest(
method="add_records",
table="Orders",
records=[{"item": "Widget"}],
)
mock_client = AsyncMock()
mock_client.add_records.return_value = [1, 2, 3]
result = await dispatch_proxy_request(
request, mock_session, mock_auth, client=mock_client
)
assert result["success"] is True
assert result["data"]["record_ids"] == [1, 2, 3]
mock_client.add_records.assert_called_once_with("Orders", [{"item": "Widget"}])
@pytest.mark.asyncio
async def test_dispatch_denies_without_permission(mock_auth):
# Session only has read permission
session = SessionToken(
token="sess_test",
document="sales",
permissions=["read"], # No write
agent_name="test-agent",
created_at=datetime.now(timezone.utc),
expires_at=datetime.max.replace(tzinfo=timezone.utc),  # keep the session unexpired for the test
)
request = ProxyRequest(
method="add_records", # Requires write
table="Orders",
records=[{"item": "Widget"}],
)
with pytest.raises(ProxyError) as exc_info:
await dispatch_proxy_request(request, session, mock_auth)
assert exc_info.value.code == "UNAUTHORIZED"
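The two dispatch tests imply a permission gate that runs before any Grist client call (which is why the denied case never touches `mock_client`). A sketch of that gate — `ProxyError`'s constructor and the method-to-permission table are assumptions, not the real `grist_mcp.proxy` code:

```python
# Sketch of the permission check dispatch_proxy_request presumably applies.
class ProxyError(Exception):
    def __init__(self, code: str, message: str):
        super().__init__(message)
        self.code = code


# Which session permission each proxy method requires (assumed mapping).
REQUIRED_PERMISSION = {
    "get_records": "read",
    "add_records": "write",
    "update_records": "write",
    "delete_records": "write",
}


def check_permission(method: str, session_permissions: list[str]) -> None:
    required = REQUIRED_PERMISSION.get(method)
    if required is None:
        raise ProxyError("INVALID_REQUEST", f"Unknown method: {method}")
    if required not in session_permissions:
        raise ProxyError("UNAUTHORIZED", f"{method} requires {required!r} permission")
```

Raising before dispatching keeps a read-only session from ever reaching the write path, matching `test_dispatch_denies_without_permission`.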


@@ -53,5 +53,48 @@ tokens:
assert "modify_column" in tool_names
assert "delete_column" in tool_names
-# Should have all 12 tools
-assert len(result.root.tools) == 12
+# Session tools (always registered)
+assert "get_proxy_documentation" in tool_names
+assert "request_session_token" in tool_names
+# Should have all 14 tools
+assert len(result.root.tools) == 14
@pytest.mark.asyncio
async def test_create_server_registers_session_tools(tmp_path):
from grist_mcp.session import SessionTokenManager
config_file = tmp_path / "config.yaml"
config_file.write_text("""
documents:
test-doc:
url: https://grist.example.com
doc_id: abc123
api_key: test-key
tokens:
- token: valid-token
name: test-agent
scope:
- document: test-doc
permissions: [read, write, schema]
""")
config = load_config(str(config_file))
auth = Authenticator(config)
agent = auth.authenticate("valid-token")
token_manager = SessionTokenManager()
server = create_server(auth, agent, token_manager)
# Get the list_tools handler and call it
handler = server.request_handlers.get(ListToolsRequest)
assert handler is not None
req = ListToolsRequest(method="tools/list")
result = await handler(req)
tool_names = [t.name for t in result.root.tools]
assert "get_proxy_documentation" in tool_names
assert "request_session_token" in tool_names


@@ -0,0 +1,81 @@
import pytest
from datetime import datetime, timedelta, timezone
from grist_mcp.session import SessionTokenManager, SessionToken
def test_create_token_returns_valid_session_token():
manager = SessionTokenManager()
token = manager.create_token(
agent_name="test-agent",
document="sales",
permissions=["read", "write"],
ttl_seconds=300,
)
assert token.token.startswith("sess_")
assert len(token.token) > 20
assert token.document == "sales"
assert token.permissions == ["read", "write"]
assert token.agent_name == "test-agent"
assert token.expires_at > datetime.now(timezone.utc)
assert token.expires_at < datetime.now(timezone.utc) + timedelta(seconds=310)
def test_create_token_caps_ttl_at_maximum():
manager = SessionTokenManager()
# Request 2 hours, should be capped at 1 hour
token = manager.create_token(
agent_name="test-agent",
document="sales",
permissions=["read"],
ttl_seconds=7200,
)
# Should be capped at 3600 seconds (1 hour)
max_expires = datetime.now(timezone.utc) + timedelta(seconds=3610)
assert token.expires_at < max_expires
def test_validate_token_returns_session_for_valid_token():
manager = SessionTokenManager()
created = manager.create_token(
agent_name="test-agent",
document="sales",
permissions=["read"],
ttl_seconds=300,
)
session = manager.validate_token(created.token)
assert session is not None
assert session.document == "sales"
assert session.agent_name == "test-agent"
def test_validate_token_returns_none_for_unknown_token():
manager = SessionTokenManager()
session = manager.validate_token("sess_unknown_token")
assert session is None
def test_validate_token_returns_none_for_expired_token():
manager = SessionTokenManager()
created = manager.create_token(
agent_name="test-agent",
document="sales",
permissions=["read"],
ttl_seconds=1,
)
# Wait for expiry
import time
time.sleep(1.5)
session = manager.validate_token(created.token)
assert session is None
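Taken together, these tests specify a `SessionTokenManager` with opaque `sess_`-prefixed tokens, a one-hour TTL cap, and expiry-aware validation. A minimal in-memory sketch consistent with them — `MAX_TTL_SECONDS`, the token length, and the dataclass fields are assumptions based only on what the tests assert:

```python
# Minimal in-memory SessionTokenManager sketch, inferred from the tests above.
import secrets
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

MAX_TTL_SECONDS = 3600  # requests above this are capped at one hour


@dataclass
class SessionToken:
    token: str
    document: str
    permissions: list
    agent_name: str
    created_at: datetime
    expires_at: datetime


class SessionTokenManager:
    def __init__(self) -> None:
        self._tokens: dict[str, SessionToken] = {}

    def create_token(self, agent_name, document, permissions, ttl_seconds):
        ttl = min(ttl_seconds, MAX_TTL_SECONDS)  # enforce the TTL cap
        now = datetime.now(timezone.utc)
        tok = SessionToken(
            token="sess_" + secrets.token_urlsafe(24),
            document=document,
            permissions=permissions,
            agent_name=agent_name,
            created_at=now,
            expires_at=now + timedelta(seconds=ttl),
        )
        self._tokens[tok.token] = tok
        return tok

    def validate_token(self, token: str):
        session = self._tokens.get(token)
        if session is None:
            return None
        if session.expires_at <= datetime.now(timezone.utc):
            self._tokens.pop(token, None)  # drop expired tokens eagerly
            return None
        return session
```

`secrets.token_urlsafe(24)` yields a 32-character suffix, comfortably clearing the `len(token.token) > 20` assertion while staying unguessable.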


@@ -75,6 +75,45 @@ async def test_get_records(agent, auth, mock_client):
assert result == {"records": [{"id": 1, "Name": "Alice"}]}
@pytest.mark.asyncio
async def test_get_records_normalizes_filter(agent, auth, mock_client):
"""Test that filter values are normalized to array format for Grist API."""
mock_client.get_records.return_value = [{"id": 1, "Customer": 5}]
await get_records(
agent, auth, "budget", "Orders",
filter={"Customer": 5, "Status": "active"},
client=mock_client,
)
# Verify filter was normalized: single values wrapped in lists
mock_client.get_records.assert_called_once_with(
"Orders",
filter={"Customer": [5], "Status": ["active"]},
sort=None,
limit=None,
)
@pytest.mark.asyncio
async def test_get_records_preserves_list_filter(agent, auth, mock_client):
"""Test that filter values already in list format are preserved."""
mock_client.get_records.return_value = []
await get_records(
agent, auth, "budget", "Orders",
filter={"Customer": [5, 6, 7]},
client=mock_client,
)
mock_client.get_records.assert_called_once_with(
"Orders",
filter={"Customer": [5, 6, 7]},
sort=None,
limit=None,
)
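The normalization these two tests assert is small enough to sketch in full. This is a plausible `normalize_filter`, matching the tested behaviour; the real `grist_mcp.filters` implementation may differ in details:

```python
def normalize_filter(filter):
    """Wrap scalar filter values in single-element lists.

    The Grist /records API requires every filter value to be a JSON array,
    so {"Customer": 5} becomes {"Customer": [5]}; existing lists pass
    through unchanged. This is what fixes 400 errors when filtering on
    Ref columns with single integer IDs.
    """
    if filter is None:
        return None
    return {
        col: value if isinstance(value, list) else [value]
        for col, value in filter.items()
    }
```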
@pytest.mark.asyncio
async def test_sql_query(agent, auth, mock_client):
result = await sql_query(agent, auth, "budget", "SELECT * FROM Table1", client=mock_client)


@@ -81,6 +81,25 @@ async def test_add_column(auth, mock_client):
)
assert result == {"column_id": "NewCol"}
mock_client.add_column.assert_called_once_with(
"Table1", "NewCol", "Text", formula=None, label=None
)
@pytest.mark.asyncio
async def test_add_column_with_label(auth, mock_client):
agent = auth.authenticate("schema-token")
result = await add_column(
agent, auth, "budget", "Table1", "first_name", "Text",
label="First Name",
client=mock_client,
)
assert result == {"column_id": "NewCol"}
mock_client.add_column.assert_called_once_with(
"Table1", "first_name", "Text", formula=None, label="First Name"
)
@pytest.mark.asyncio
@@ -95,6 +114,25 @@ async def test_modify_column(auth, mock_client):
)
assert result == {"modified": True}
mock_client.modify_column.assert_called_once_with(
"Table1", "Col1", type="Int", formula="$A + $B", label=None
)
@pytest.mark.asyncio
async def test_modify_column_with_label(auth, mock_client):
agent = auth.authenticate("schema-token")
result = await modify_column(
agent, auth, "budget", "Table1", "Col1",
label="Column One",
client=mock_client,
)
assert result == {"modified": True}
mock_client.modify_column.assert_called_once_with(
"Table1", "Col1", type=None, formula=None, label="Column One"
)
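The label tests above exercise the new `label` parameter end to end. A hedged sketch of how a column payload might be assembled — the `fields` shape follows Grist's /columns API, but `build_column_payload` is a hypothetical helper and the defaulting rule is inferred from the changelog ("the label defaults to the column_id if not provided"):

```python
def build_column_payload(column_id, col_type, formula=None, label=None):
    """Assemble a Grist column payload (hypothetical helper)."""
    fields = {"type": col_type}
    if formula is not None:
        fields["isFormula"] = True
        fields["formula"] = formula
    # Human-readable display label, falling back to the raw column id.
    fields["label"] = label if label is not None else column_id
    return {"id": column_id, "fields": fields}
```

This keeps `column_id` stable for formulas and API calls while letting the UI show "First Name" instead of `first_name`.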
@pytest.mark.asyncio


@@ -0,0 +1,126 @@
import pytest
from grist_mcp.tools.session import get_proxy_documentation, request_session_token
from grist_mcp.auth import Authenticator, Agent, AuthError
from grist_mcp.config import Config, Document, Token, TokenScope
from grist_mcp.session import SessionTokenManager
@pytest.fixture
def sample_config():
return Config(
documents={
"sales": Document(
url="https://grist.example.com",
doc_id="abc123",
api_key="key",
),
},
tokens=[
Token(
token="agent-token",
name="test-agent",
scope=[
TokenScope(document="sales", permissions=["read", "write"]),
],
),
],
)
@pytest.fixture
def auth_and_agent(sample_config):
auth = Authenticator(sample_config)
agent = auth.authenticate("agent-token")
return auth, agent
@pytest.mark.asyncio
async def test_get_proxy_documentation_returns_complete_spec():
result = await get_proxy_documentation()
assert "description" in result
assert "endpoints" in result
assert "proxy" in result["endpoints"]
assert "attachments_upload" in result["endpoints"]
assert "attachments_download" in result["endpoints"]
assert "authentication" in result
assert "methods" in result
assert "add_records" in result["methods"]
assert "get_records" in result["methods"]
assert "attachment_upload" in result
assert "attachment_download" in result
assert "example_script" in result
@pytest.mark.asyncio
async def test_request_session_token_creates_valid_token(auth_and_agent):
auth, agent = auth_and_agent
manager = SessionTokenManager()
result = await request_session_token(
agent=agent,
auth=auth,
token_manager=manager,
document="sales",
permissions=["read", "write"],
ttl_seconds=300,
)
assert "token" in result
assert result["token"].startswith("sess_")
assert result["document"] == "sales"
assert result["permissions"] == ["read", "write"]
assert "expires_at" in result
assert result["proxy_url"] == "/api/v1/proxy"
@pytest.mark.asyncio
async def test_request_session_token_rejects_unauthorized_document(sample_config):
auth = Authenticator(sample_config)
agent = auth.authenticate("agent-token")
manager = SessionTokenManager()
with pytest.raises(AuthError, match="Document not in scope"):
await request_session_token(
agent=agent,
auth=auth,
token_manager=manager,
document="unauthorized_doc",
permissions=["read"],
ttl_seconds=300,
)
@pytest.mark.asyncio
async def test_request_session_token_rejects_unauthorized_permission(sample_config):
auth = Authenticator(sample_config)
agent = auth.authenticate("agent-token")
manager = SessionTokenManager()
# Agent has read/write on sales, but not schema
with pytest.raises(AuthError, match="Permission denied"):
await request_session_token(
agent=agent,
auth=auth,
token_manager=manager,
document="sales",
permissions=["read", "schema"], # schema not granted
ttl_seconds=300,
)
@pytest.mark.asyncio
async def test_request_session_token_rejects_invalid_permission(sample_config):
auth = Authenticator(sample_config)
agent = auth.authenticate("agent-token")
manager = SessionTokenManager()
with pytest.raises(AuthError, match="Invalid permission"):
await request_session_token(
agent=agent,
auth=auth,
token_manager=manager,
document="sales",
permissions=["read", "invalid_perm"],
ttl_seconds=300,
)
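The three rejection tests fix both the error messages and an implied validation order: unknown permission names first, then document scope, then per-document grants. A sketch of that logic — `VALID_PERMISSIONS`, the `agent_scope` shape, and the standalone helper are all assumptions inferred from the tests:

```python
VALID_PERMISSIONS = {"read", "write", "schema"}


class AuthError(Exception):
    pass


def validate_session_request(agent_scope: dict, document: str, permissions: list) -> None:
    """agent_scope maps document name -> granted permissions (assumed shape)."""
    # 1. Reject permission names the system does not know at all.
    for perm in permissions:
        if perm not in VALID_PERMISSIONS:
            raise AuthError(f"Invalid permission: {perm}")
    # 2. Reject documents outside the agent's configured scope.
    if document not in agent_scope:
        raise AuthError(f"Document not in scope: {document}")
    # 3. Reject valid permissions the agent was never granted on this document.
    granted = agent_scope[document]
    for perm in permissions:
        if perm not in granted:
            raise AuthError(f"Permission denied: {perm} not granted for {document}")
```

Checking in this order means a session token can never widen an agent's static scope — it can only narrow it.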

uv.lock generated

@@ -153,7 +153,7 @@ wheels = [
[[package]]
name = "grist-mcp"
version = "0.1.0"
version = "1.4.1"
source = { editable = "." }
dependencies = [
{ name = "httpx" },