26 Commits

Author SHA1 Message Date
848cfd684f feat: add upload_attachment MCP tool
All checks were successful
Build and Push Docker Image / build (push) Successful in 24s
Add support for uploading file attachments to Grist documents:

- GristClient.upload_attachment() method using multipart/form-data
- upload_attachment tool function with base64 decoding and MIME detection
- Tool registration in server.py
- Comprehensive unit tests (7 new tests)

Returns attachment ID for linking to records via update_records.

Bumps version to 1.3.0.
2026-01-03 19:59:47 -05:00
ea175d55a2 Add attachment upload feature design 2026-01-03 19:50:01 -05:00
db12fca615 Merge pull request 'chore(deps): update actions/checkout action to v6' (#3) from renovate/actions-checkout-6.x into master
Reviewed-on: #3
2026-01-02 17:19:43 -05:00
d540105d09 docs(proxy): clarify proxy_url usage in documentation
All checks were successful
Build and Push Docker Image / build (push) Successful in 21s
2026-01-02 15:01:33 -05:00
d40ae0b238 feat(main): use GRIST_MCP_URL in startup config output 2026-01-02 14:58:55 -05:00
2a60de1bf1 docs: add GRIST_MCP_URL to environment variables 2026-01-02 14:56:02 -05:00
ba45de4582 fix(session): include full proxy URL from GRIST_MCP_URL env var 2026-01-02 14:54:25 -05:00
d176b03d56 chore: bump version to 1.2.0
All checks were successful
Build and Push Docker Image / build (push) Successful in 21s
2026-01-02 14:43:50 -05:00
50c5cfbab1 Merge master into feature/session-proxy 2026-01-02 14:40:37 -05:00
8484536aae fix(integration): add auth headers and fix mock server routes 2026-01-02 14:36:25 -05:00
b3bfdf97c2 fix(test): increase sleep duration for flaky expiry test 2026-01-02 14:24:10 -05:00
eabddee737 docs: update CHANGELOG for session proxy feature 2026-01-02 14:20:45 -05:00
3d1ac1fe60 test(integration): add session proxy integration test 2026-01-02 14:17:59 -05:00
ed1d14a4d4 feat(main): add /api/v1/proxy HTTP endpoint 2026-01-02 14:16:24 -05:00
80e93ab3d9 test(proxy): add permission denial test 2026-01-02 14:08:58 -05:00
7073182f9e feat(proxy): add method dispatch 2026-01-02 14:07:47 -05:00
caa435d972 feat(proxy): add request parsing 2026-01-02 13:57:38 -05:00
ba88ba01f3 feat(server): register session token tools
Add get_proxy_documentation and request_session_token tools to the MCP
server. The create_server function now accepts an optional token_manager
parameter (SessionTokenManager | None) to maintain backward compatibility.

When token_manager is None, request_session_token returns an error
message instead of creating tokens.
2026-01-02 13:51:47 -05:00
fb6d4af973 feat(tools): add request_session_token tool
Add MCP tool for agents to request short-lived session tokens for HTTP
proxy access. The tool validates that agents can only request permissions
they already have (no privilege escalation).

- Validates document access and each requested permission
- Creates session token via SessionTokenManager
- Returns token metadata including proxy URL and expiration
- Includes tests for success case and permission denial scenarios
2026-01-02 13:45:07 -05:00
a7bb11d765 feat(tools): add get_proxy_documentation tool
Add a new MCP tool that returns complete documentation for the HTTP
proxy API. This enables agents to get all the information they need
to construct valid proxy requests when writing scripts.

The tool is stateless and returns a static documentation dict
describing endpoints, methods, authentication, and example usage.
2026-01-02 13:39:02 -05:00
c65ec0489c test(session): add tests for invalid and expired tokens 2026-01-02 13:34:52 -05:00
681cb0f67c feat(session): add token validation 2026-01-02 13:31:18 -05:00
3c97ad407c feat(session): cap TTL at 1 hour maximum 2026-01-02 13:27:30 -05:00
b310ee10a9 feat(session): add SessionTokenManager with token creation
Add SessionTokenManager class that creates short-lived session tokens
for HTTP proxy access. Each token includes agent identity, document
scope, permissions, and expiration time.
2026-01-02 13:22:53 -05:00
4923d3110c docs: add session proxy implementation plan 2026-01-02 13:04:49 -05:00
f79ae5546f chore(deps): update actions/checkout action to v6 2026-01-02 05:20:49 +00:00
24 changed files with 2939 additions and 14 deletions

View File

@@ -18,7 +18,7 @@ jobs:
steps: steps:
- name: Checkout repository - name: Checkout repository
uses: actions/checkout@v4 uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6
- name: Log in to Container Registry - name: Log in to Container Registry
uses: docker/login-action@v3 uses: docker/login-action@v3

View File

@@ -5,6 +5,43 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.3.0] - 2026-01-03
### Added
#### Attachment Upload
- **`upload_attachment` MCP tool**: Upload files to Grist documents
- Base64-encoded content input (required for JSON-based MCP protocol)
- Automatic MIME type detection from filename
- Returns attachment ID for linking to records via `update_records`
#### Usage
```python
# 1. Upload attachment
result = upload_attachment(
document="accounting",
filename="invoice.pdf",
content_base64="JVBERi0xLjQK..."
)
# Returns: {"attachment_id": 42, "filename": "invoice.pdf", "size_bytes": 31395}
# 2. Link to record
update_records(document="accounting", table="Bills", records=[
{"id": 1, "fields": {"Attachment": [42]}}
])
```
## [1.2.0] - 2026-01-02
### Added
#### Session Token Proxy
- **Session token proxy**: Agents can request short-lived tokens for bulk operations
- `get_proxy_documentation` MCP tool: returns complete proxy API spec
- `request_session_token` MCP tool: creates scoped session tokens with TTL (max 1 hour)
- `POST /api/v1/proxy` HTTP endpoint: accepts session tokens for direct API access
- Supports all 11 Grist operations (read, write, schema) via HTTP
## [1.1.0] - 2026-01-02 ## [1.1.0] - 2026-01-02
### Added ### Added

View File

@@ -150,6 +150,7 @@ Add to your MCP client configuration (e.g., Claude Desktop):
| `GRIST_MCP_TOKEN` | Agent authentication token (required) | - | | `GRIST_MCP_TOKEN` | Agent authentication token (required) | - |
| `CONFIG_PATH` | Path to config file inside container | `/app/config.yaml` | | `CONFIG_PATH` | Path to config file inside container | `/app/config.yaml` |
| `LOG_LEVEL` | Logging verbosity (`DEBUG`, `INFO`, `WARNING`, `ERROR`) | `INFO` | | `LOG_LEVEL` | Logging verbosity (`DEBUG`, `INFO`, `WARNING`, `ERROR`) | `INFO` |
| `GRIST_MCP_URL` | Public URL of this server (for session proxy tokens) | - |
### config.yaml Structure ### config.yaml Structure

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,187 @@
# Attachment Upload Feature Design
**Date:** 2026-01-03
**Status:** Approved
## Summary
Add an `upload_attachment` MCP tool to upload files to Grist documents and receive an attachment ID for linking to records.
## Design Decisions
| Decision | Choice | Rationale |
|----------|--------|-----------|
| Content encoding | Base64 string | MCP tools use JSON; binary must be encoded |
| Batch support | Single file only | YAGNI; caller can loop if needed |
| Linking behavior | Upload only, return ID | Single responsibility; use existing `update_records` to link |
| Download support | Not included | YAGNI; can add later if needed |
| Permission level | Write | Attachments are data, not schema |
| Proxy support | MCP tool only | Reduces scope; scripts can use Grist API directly |
## Tool Interface
### Input Schema
```json
{
"type": "object",
"properties": {
"document": {
"type": "string",
"description": "Document name"
},
"filename": {
"type": "string",
"description": "Filename with extension (e.g., 'invoice.pdf')"
},
"content_base64": {
"type": "string",
"description": "File content as base64-encoded string"
},
"content_type": {
"type": "string",
"description": "MIME type (optional, auto-detected from filename if omitted)"
}
},
"required": ["document", "filename", "content_base64"]
}
```
### Response
```json
{
"attachment_id": 42,
"filename": "invoice.pdf",
"size_bytes": 30720
}
```
### Usage Example
```python
# 1. Upload attachment
result = upload_attachment(
document="accounting",
filename="Invoice-001.pdf",
content_base64="JVBERi0xLjQK..."
)
# 2. Link to record via existing update_records tool
update_records("Bills", [{
"id": 1,
"fields": {"Attachment": [result["attachment_id"]]}
}])
```
## Implementation
### Files to Modify
1. **`src/grist_mcp/grist_client.py`** - Add `upload_attachment()` method
2. **`src/grist_mcp/tools/write.py`** - Add tool function
3. **`src/grist_mcp/server.py`** - Register tool
### GristClient Method
```python
async def upload_attachment(
self,
filename: str,
content: bytes,
content_type: str | None = None
) -> dict:
"""Upload a file attachment. Returns attachment metadata."""
if content_type is None:
content_type = "application/octet-stream"
files = {"upload": (filename, content, content_type)}
async with httpx.AsyncClient(timeout=self._timeout) as client:
response = await client.post(
f"{self._base_url}/attachments",
headers=self._headers,
files=files,
)
response.raise_for_status()
# Grist returns list of attachment IDs
attachment_ids = response.json()
return {
"attachment_id": attachment_ids[0],
"filename": filename,
"size_bytes": len(content),
}
```
### Tool Function
```python
import base64
import mimetypes
async def upload_attachment(
agent: Agent,
auth: Authenticator,
document: str,
filename: str,
content_base64: str,
content_type: str | None = None,
client: GristClient | None = None,
) -> dict:
"""Upload a file attachment to a document."""
auth.authorize(agent, document, Permission.WRITE)
# Decode base64
try:
content = base64.b64decode(content_base64)
except Exception:
raise ValueError("Invalid base64 encoding")
# Auto-detect MIME type if not provided
if content_type is None:
content_type, _ = mimetypes.guess_type(filename)
if content_type is None:
content_type = "application/octet-stream"
if client is None:
doc = auth.get_document(document)
client = GristClient(doc)
return await client.upload_attachment(filename, content, content_type)
```
## Error Handling
| Error | Cause | Response |
|-------|-------|----------|
| Invalid base64 | Malformed content_base64 | `ValueError: Invalid base64 encoding` |
| Authorization | Agent lacks write permission | `AuthError` (existing pattern) |
| Grist API error | Upload fails | `httpx.HTTPStatusError` (existing pattern) |
## Testing
### Unit Tests
**`tests/unit/test_tools_write.py`:**
- `test_upload_attachment_success` - Valid base64, returns attachment_id
- `test_upload_attachment_invalid_base64` - Raises ValueError
- `test_upload_attachment_auth_required` - Verifies write permission check
- `test_upload_attachment_mime_detection` - Auto-detects type from filename
**`tests/unit/test_grist_client.py`:**
- `test_upload_attachment_api_call` - Correct multipart request format
- `test_upload_attachment_with_explicit_content_type` - Passes through MIME type
### Mock Approach
Mock `httpx.AsyncClient` responses; no Grist server needed for unit tests.
## Future Considerations
Not included in this implementation (YAGNI):
- Batch upload (multiple files)
- Download attachment
- Proxy API support
- Size limit validation (rely on Grist's limits)
These can be added if real use cases emerge.

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "grist-mcp" name = "grist-mcp"
version = "1.1.0" version = "1.3.0"
description = "MCP server for AI agents to interact with Grist documents" description = "MCP server for AI agents to interact with Grist documents"
requires-python = ">=3.14" requires-python = ">=3.14"
dependencies = [ dependencies = [
@@ -28,3 +28,6 @@ build-backend = "hatchling.build"
[tool.pytest.ini_options] [tool.pytest.ini_options]
asyncio_mode = "auto" asyncio_mode = "auto"
testpaths = ["tests/unit", "tests/integration"] testpaths = ["tests/unit", "tests/integration"]
markers = [
"integration: marks tests as integration tests (require Docker containers)",
]

View File

@@ -116,6 +116,39 @@ class GristClient:
"""Delete records by ID.""" """Delete records by ID."""
await self._request("POST", f"/tables/{table}/data/delete", json=record_ids) await self._request("POST", f"/tables/{table}/data/delete", json=record_ids)
async def upload_attachment(
self,
filename: str,
content: bytes,
content_type: str = "application/octet-stream",
) -> dict:
"""Upload a file attachment. Returns attachment metadata.
Args:
filename: Name for the uploaded file.
content: File content as bytes.
content_type: MIME type of the file.
Returns:
Dict with attachment_id, filename, and size_bytes.
"""
files = {"upload": (filename, content, content_type)}
async with httpx.AsyncClient(timeout=self._timeout) as client:
response = await client.post(
f"{self._base_url}/attachments",
headers=self._headers,
files=files,
)
response.raise_for_status()
# Grist returns list of attachment IDs
attachment_ids = response.json()
return {
"attachment_id": attachment_ids[0],
"filename": filename,
"size_bytes": len(content),
}
# Schema operations # Schema operations
async def create_table(self, table_id: str, columns: list[dict]) -> str: async def create_table(self, table_id: str, columns: list[dict]) -> str:

View File

@@ -12,6 +12,8 @@ from mcp.server.sse import SseServerTransport
from grist_mcp.server import create_server from grist_mcp.server import create_server
from grist_mcp.config import Config, load_config from grist_mcp.config import Config, load_config
from grist_mcp.auth import Authenticator, AuthError from grist_mcp.auth import Authenticator, AuthError
from grist_mcp.session import SessionTokenManager
from grist_mcp.proxy import parse_proxy_request, dispatch_proxy_request, ProxyError
from grist_mcp.logging import setup_logging from grist_mcp.logging import setup_logging
@@ -43,6 +45,20 @@ async def send_error(send: Send, status: int, message: str) -> None:
}) })
async def send_json_response(send: Send, status: int, data: dict) -> None:
"""Send a JSON response."""
body = json.dumps(data).encode()
await send({
"type": "http.response.start",
"status": status,
"headers": [[b"content-type", b"application/json"]],
})
await send({
"type": "http.response.body",
"body": body,
})
CONFIG_TEMPLATE = """\ CONFIG_TEMPLATE = """\
# grist-mcp configuration # grist-mcp configuration
# #
@@ -110,6 +126,8 @@ def _ensure_config(config_path: str) -> bool:
def create_app(config: Config): def create_app(config: Config):
"""Create the ASGI application.""" """Create the ASGI application."""
auth = Authenticator(config) auth = Authenticator(config)
token_manager = SessionTokenManager()
proxy_base_url = os.environ.get("GRIST_MCP_URL")
sse = SseServerTransport("/messages") sse = SseServerTransport("/messages")
@@ -127,7 +145,7 @@ def create_app(config: Config):
return return
# Create a server instance for this authenticated connection # Create a server instance for this authenticated connection
server = create_server(auth, agent) server = create_server(auth, agent, token_manager, proxy_base_url)
async with sse.connect_sse(scope, receive, send) as streams: async with sse.connect_sse(scope, receive, send) as streams:
await server.run( await server.run(
@@ -159,6 +177,58 @@ def create_app(config: Config):
"body": b'{"error":"Not found"}', "body": b'{"error":"Not found"}',
}) })
async def handle_proxy(scope: Scope, receive: Receive, send: Send) -> None:
# Extract token
token = _get_bearer_token(scope)
if not token:
await send_json_response(send, 401, {
"success": False,
"error": "Missing Authorization header",
"code": "INVALID_TOKEN",
})
return
# Validate session token
session = token_manager.validate_token(token)
if session is None:
await send_json_response(send, 401, {
"success": False,
"error": "Invalid or expired token",
"code": "TOKEN_EXPIRED",
})
return
# Read request body
body = b""
while True:
message = await receive()
body += message.get("body", b"")
if not message.get("more_body", False):
break
try:
request_data = json.loads(body)
except json.JSONDecodeError:
await send_json_response(send, 400, {
"success": False,
"error": "Invalid JSON",
"code": "INVALID_REQUEST",
})
return
# Parse and dispatch
try:
request = parse_proxy_request(request_data)
result = await dispatch_proxy_request(request, session, auth)
await send_json_response(send, 200, result)
except ProxyError as e:
status = 403 if e.code == "UNAUTHORIZED" else 400
await send_json_response(send, status, {
"success": False,
"error": e.message,
"code": e.code,
})
async def app(scope: Scope, receive: Receive, send: Send) -> None: async def app(scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] != "http": if scope["type"] != "http":
return return
@@ -172,6 +242,8 @@ def create_app(config: Config):
await handle_sse(scope, receive, send) await handle_sse(scope, receive, send)
elif path == "/messages" and method == "POST": elif path == "/messages" and method == "POST":
await handle_messages(scope, receive, send) await handle_messages(scope, receive, send)
elif path == "/api/v1/proxy" and method == "POST":
await handle_proxy(scope, receive, send)
else: else:
await handle_not_found(scope, receive, send) await handle_not_found(scope, receive, send)
@@ -180,11 +252,18 @@ def create_app(config: Config):
def _print_mcp_config(external_port: int, tokens: list) -> None: def _print_mcp_config(external_port: int, tokens: list) -> None:
"""Print Claude Code MCP configuration.""" """Print Claude Code MCP configuration."""
# Use GRIST_MCP_URL if set, otherwise fall back to localhost
base_url = os.environ.get("GRIST_MCP_URL")
if base_url:
sse_url = f"{base_url.rstrip('/')}/sse"
else:
sse_url = f"http://localhost:{external_port}/sse"
print() print()
print("Claude Code MCP configuration (copy-paste to add):") print("Claude Code MCP configuration (copy-paste to add):")
for t in tokens: for t in tokens:
config = ( config = (
f'{{"type": "sse", "url": "http://localhost:{external_port}/sse", ' f'{{"type": "sse", "url": "{sse_url}", '
f'"headers": {{"Authorization": "Bearer {t.token}"}}}}' f'"headers": {{"Authorization": "Bearer {t.token}"}}}}'
) )
print(f" claude mcp add-json grist-{t.name} '{config}'") print(f" claude mcp add-json grist-{t.name} '{config}'")

192
src/grist_mcp/proxy.py Normal file
View File

@@ -0,0 +1,192 @@
"""HTTP proxy handler for session token access."""
from dataclasses import dataclass
from typing import Any
from grist_mcp.auth import Authenticator
from grist_mcp.grist_client import GristClient
from grist_mcp.session import SessionToken
class ProxyError(Exception):
"""Error during proxy request processing."""
def __init__(self, message: str, code: str):
self.message = message
self.code = code
super().__init__(message)
@dataclass
class ProxyRequest:
"""Parsed proxy request."""
method: str
table: str | None = None
records: list[dict] | None = None
record_ids: list[int] | None = None
filter: dict | None = None
sort: str | None = None
limit: int | None = None
query: str | None = None
table_id: str | None = None
columns: list[dict] | None = None
column_id: str | None = None
column_type: str | None = None
formula: str | None = None
type: str | None = None
METHODS_REQUIRING_TABLE = {
"get_records", "describe_table", "add_records", "update_records",
"delete_records", "add_column", "modify_column", "delete_column",
}
def parse_proxy_request(body: dict[str, Any]) -> ProxyRequest:
"""Parse and validate a proxy request body."""
if "method" not in body:
raise ProxyError("Missing required field: method", "INVALID_REQUEST")
method = body["method"]
if method in METHODS_REQUIRING_TABLE and "table" not in body:
raise ProxyError(f"Missing required field 'table' for method '{method}'", "INVALID_REQUEST")
return ProxyRequest(
method=method,
table=body.get("table"),
records=body.get("records"),
record_ids=body.get("record_ids"),
filter=body.get("filter"),
sort=body.get("sort"),
limit=body.get("limit"),
query=body.get("query"),
table_id=body.get("table_id"),
columns=body.get("columns"),
column_id=body.get("column_id"),
column_type=body.get("column_type"),
formula=body.get("formula"),
type=body.get("type"),
)
# Map methods to required permissions
METHOD_PERMISSIONS = {
"list_tables": "read",
"describe_table": "read",
"get_records": "read",
"sql_query": "read",
"add_records": "write",
"update_records": "write",
"delete_records": "write",
"create_table": "schema",
"add_column": "schema",
"modify_column": "schema",
"delete_column": "schema",
}
async def dispatch_proxy_request(
request: ProxyRequest,
session: SessionToken,
auth: Authenticator,
client: GristClient | None = None,
) -> dict[str, Any]:
"""Dispatch a proxy request to the appropriate handler."""
# Check permission
required_perm = METHOD_PERMISSIONS.get(request.method)
if required_perm is None:
raise ProxyError(f"Unknown method: {request.method}", "INVALID_REQUEST")
if required_perm not in session.permissions:
raise ProxyError(
f"Permission '{required_perm}' required for {request.method}",
"UNAUTHORIZED",
)
# Create client if not provided
if client is None:
doc = auth.get_document(session.document)
client = GristClient(doc)
# Dispatch to appropriate method
try:
if request.method == "list_tables":
data = await client.list_tables()
return {"success": True, "data": {"tables": data}}
elif request.method == "describe_table":
data = await client.describe_table(request.table)
return {"success": True, "data": {"table": request.table, "columns": data}}
elif request.method == "get_records":
data = await client.get_records(
request.table,
filter=request.filter,
sort=request.sort,
limit=request.limit,
)
return {"success": True, "data": {"records": data}}
elif request.method == "sql_query":
if request.query is None:
raise ProxyError("Missing required field: query", "INVALID_REQUEST")
data = await client.sql_query(request.query)
return {"success": True, "data": {"records": data}}
elif request.method == "add_records":
if request.records is None:
raise ProxyError("Missing required field: records", "INVALID_REQUEST")
data = await client.add_records(request.table, request.records)
return {"success": True, "data": {"record_ids": data}}
elif request.method == "update_records":
if request.records is None:
raise ProxyError("Missing required field: records", "INVALID_REQUEST")
await client.update_records(request.table, request.records)
return {"success": True, "data": {"updated": len(request.records)}}
elif request.method == "delete_records":
if request.record_ids is None:
raise ProxyError("Missing required field: record_ids", "INVALID_REQUEST")
await client.delete_records(request.table, request.record_ids)
return {"success": True, "data": {"deleted": len(request.record_ids)}}
elif request.method == "create_table":
if request.table_id is None or request.columns is None:
raise ProxyError("Missing required fields: table_id, columns", "INVALID_REQUEST")
data = await client.create_table(request.table_id, request.columns)
return {"success": True, "data": {"table_id": data}}
elif request.method == "add_column":
if request.column_id is None or request.column_type is None:
raise ProxyError("Missing required fields: column_id, column_type", "INVALID_REQUEST")
await client.add_column(
request.table, request.column_id, request.column_type,
formula=request.formula,
)
return {"success": True, "data": {"column_id": request.column_id}}
elif request.method == "modify_column":
if request.column_id is None:
raise ProxyError("Missing required field: column_id", "INVALID_REQUEST")
await client.modify_column(
request.table, request.column_id,
type=request.type,
formula=request.formula,
)
return {"success": True, "data": {"column_id": request.column_id}}
elif request.method == "delete_column":
if request.column_id is None:
raise ProxyError("Missing required field: column_id", "INVALID_REQUEST")
await client.delete_column(request.table, request.column_id)
return {"success": True, "data": {"deleted": request.column_id}}
else:
raise ProxyError(f"Unknown method: {request.method}", "INVALID_REQUEST")
except ProxyError:
raise
except Exception as e:
raise ProxyError(str(e), "GRIST_ERROR")

View File

@@ -7,6 +7,9 @@ from mcp.server import Server
from mcp.types import Tool, TextContent from mcp.types import Tool, TextContent
from grist_mcp.auth import Authenticator, Agent, AuthError from grist_mcp.auth import Authenticator, Agent, AuthError
from grist_mcp.session import SessionTokenManager
from grist_mcp.tools.session import get_proxy_documentation as _get_proxy_documentation
from grist_mcp.tools.session import request_session_token as _request_session_token
from grist_mcp.logging import get_logger, extract_stats, format_tool_log from grist_mcp.logging import get_logger, extract_stats, format_tool_log
logger = get_logger("server") logger = get_logger("server")
@@ -19,24 +22,33 @@ from grist_mcp.tools.read import sql_query as _sql_query
from grist_mcp.tools.write import add_records as _add_records from grist_mcp.tools.write import add_records as _add_records
from grist_mcp.tools.write import update_records as _update_records from grist_mcp.tools.write import update_records as _update_records
from grist_mcp.tools.write import delete_records as _delete_records from grist_mcp.tools.write import delete_records as _delete_records
from grist_mcp.tools.write import upload_attachment as _upload_attachment
from grist_mcp.tools.schema import create_table as _create_table from grist_mcp.tools.schema import create_table as _create_table
from grist_mcp.tools.schema import add_column as _add_column from grist_mcp.tools.schema import add_column as _add_column
from grist_mcp.tools.schema import modify_column as _modify_column from grist_mcp.tools.schema import modify_column as _modify_column
from grist_mcp.tools.schema import delete_column as _delete_column from grist_mcp.tools.schema import delete_column as _delete_column
def create_server(auth: Authenticator, agent: Agent) -> Server: def create_server(
auth: Authenticator,
agent: Agent,
token_manager: SessionTokenManager | None = None,
proxy_base_url: str | None = None,
) -> Server:
"""Create and configure the MCP server for an authenticated agent. """Create and configure the MCP server for an authenticated agent.
Args: Args:
auth: Authenticator instance for permission checks. auth: Authenticator instance for permission checks.
agent: The authenticated agent for this server instance. agent: The authenticated agent for this server instance.
token_manager: Optional session token manager for HTTP proxy access.
proxy_base_url: Base URL for the proxy endpoint (e.g., "https://example.com").
Returns: Returns:
Configured MCP Server instance. Configured MCP Server instance.
""" """
server = Server("grist-mcp") server = Server("grist-mcp")
_current_agent = agent _current_agent = agent
_proxy_base_url = proxy_base_url
@server.list_tools() @server.list_tools()
async def list_tools() -> list[Tool]: async def list_tools() -> list[Tool]:
@@ -207,6 +219,60 @@ def create_server(auth: Authenticator, agent: Agent) -> Server:
"required": ["document", "table", "column_id"], "required": ["document", "table", "column_id"],
}, },
), ),
Tool(
name="upload_attachment",
description="Upload a file attachment to a Grist document. Returns attachment ID for linking to records via update_records.",
inputSchema={
"type": "object",
"properties": {
"document": {
"type": "string",
"description": "Document name",
},
"filename": {
"type": "string",
"description": "Filename with extension (e.g., 'invoice.pdf')",
},
"content_base64": {
"type": "string",
"description": "File content as base64-encoded string",
},
"content_type": {
"type": "string",
"description": "MIME type (optional, auto-detected from filename)",
},
},
"required": ["document", "filename", "content_base64"],
},
),
Tool(
name="get_proxy_documentation",
description="Get complete documentation for the HTTP proxy API",
inputSchema={"type": "object", "properties": {}, "required": []},
),
Tool(
name="request_session_token",
description="Request a short-lived token for direct HTTP API access. Use this to delegate bulk data operations to scripts.",
inputSchema={
"type": "object",
"properties": {
"document": {
"type": "string",
"description": "Document name to grant access to",
},
"permissions": {
"type": "array",
"items": {"type": "string", "enum": ["read", "write", "schema"]},
"description": "Permission levels to grant",
},
"ttl_seconds": {
"type": "integer",
"description": "Token lifetime in seconds (max 3600, default 300)",
},
},
"required": ["document", "permissions"],
},
),
] ]
@server.call_tool() @server.call_tool()
@@ -285,6 +351,24 @@ def create_server(auth: Authenticator, agent: Agent) -> Server:
_current_agent, auth, arguments["document"], arguments["table"], _current_agent, auth, arguments["document"], arguments["table"],
arguments["column_id"], arguments["column_id"],
) )
elif name == "upload_attachment":
result = await _upload_attachment(
_current_agent, auth, arguments["document"],
arguments["filename"], arguments["content_base64"],
content_type=arguments.get("content_type"),
)
elif name == "get_proxy_documentation":
result = await _get_proxy_documentation()
elif name == "request_session_token":
if token_manager is None:
return [TextContent(type="text", text="Session tokens not enabled")]
result = await _request_session_token(
_current_agent, auth, token_manager,
arguments["document"],
arguments["permissions"],
ttl_seconds=arguments.get("ttl_seconds", 300),
proxy_base_url=_proxy_base_url,
)
else: else:
return [TextContent(type="text", text=f"Unknown tool: {name}")] return [TextContent(type="text", text=f"Unknown tool: {name}")]

73
src/grist_mcp/session.py Normal file
View File

@@ -0,0 +1,73 @@
"""Session token management for HTTP proxy access."""
import secrets
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
MAX_TTL_SECONDS = 3600 # 1 hour
DEFAULT_TTL_SECONDS = 300 # 5 minutes
@dataclass
class SessionToken:
"""A short-lived session token for proxy access."""
token: str
document: str
permissions: list[str]
agent_name: str
created_at: datetime
expires_at: datetime
class SessionTokenManager:
"""Manages creation and validation of session tokens."""
def __init__(self):
self._tokens: dict[str, SessionToken] = {}
def create_token(
self,
agent_name: str,
document: str,
permissions: list[str],
ttl_seconds: int = DEFAULT_TTL_SECONDS,
) -> SessionToken:
"""Create a new session token.
TTL is capped at MAX_TTL_SECONDS (1 hour).
"""
now = datetime.now(timezone.utc)
token_str = f"sess_{secrets.token_urlsafe(32)}"
# Cap TTL at maximum
effective_ttl = min(ttl_seconds, MAX_TTL_SECONDS)
session = SessionToken(
token=token_str,
document=document,
permissions=permissions,
agent_name=agent_name,
created_at=now,
expires_at=now + timedelta(seconds=effective_ttl),
)
self._tokens[token_str] = session
return session
def validate_token(self, token: str) -> SessionToken | None:
"""Validate a session token.
Returns the SessionToken if valid and not expired, None otherwise.
Also removes expired tokens lazily.
"""
session = self._tokens.get(token)
if session is None:
return None
now = datetime.now(timezone.utc)
if session.expires_at < now:
# Token expired, remove it
del self._tokens[token]
return None
return session

View File

@@ -0,0 +1,158 @@
"""Session token tools for HTTP proxy access."""
from grist_mcp.auth import Agent, Authenticator, AuthError, Permission
from grist_mcp.session import SessionTokenManager
PROXY_DOCUMENTATION = {
"description": "HTTP proxy API for bulk data operations. Use request_session_token to get a short-lived token, then call the proxy endpoint directly from scripts.",
"endpoint": "POST /api/v1/proxy",
"endpoint_note": "The full URL is returned in the 'proxy_url' field of request_session_token response",
"authentication": "Bearer token in Authorization header",
"request_format": {
"method": "Operation name (required)",
"table": "Table name (required for most operations)",
},
"methods": {
"get_records": {
"description": "Fetch records from a table",
"fields": {
"table": "string",
"filter": "object (optional)",
"sort": "string (optional)",
"limit": "integer (optional)",
},
},
"sql_query": {
"description": "Run a read-only SQL query",
"fields": {"query": "string"},
},
"list_tables": {
"description": "List all tables in the document",
"fields": {},
},
"describe_table": {
"description": "Get column information for a table",
"fields": {"table": "string"},
},
"add_records": {
"description": "Add records to a table",
"fields": {"table": "string", "records": "array of objects"},
},
"update_records": {
"description": "Update existing records",
"fields": {"table": "string", "records": "array of {id, fields}"},
},
"delete_records": {
"description": "Delete records by ID",
"fields": {"table": "string", "record_ids": "array of integers"},
},
"create_table": {
"description": "Create a new table",
"fields": {"table_id": "string", "columns": "array of {id, type}"},
},
"add_column": {
"description": "Add a column to a table",
"fields": {
"table": "string",
"column_id": "string",
"column_type": "string",
"formula": "string (optional)",
},
},
"modify_column": {
"description": "Modify a column's type or formula",
"fields": {
"table": "string",
"column_id": "string",
"type": "string (optional)",
"formula": "string (optional)",
},
},
"delete_column": {
"description": "Delete a column",
"fields": {"table": "string", "column_id": "string"},
},
},
"response_format": {
"success": {"success": True, "data": "..."},
"error": {"success": False, "error": "message", "code": "ERROR_CODE"},
},
"error_codes": [
"UNAUTHORIZED",
"INVALID_TOKEN",
"TOKEN_EXPIRED",
"INVALID_REQUEST",
"GRIST_ERROR",
],
"example_script": """#!/usr/bin/env python3
import requests
import sys
# Use token and proxy_url from request_session_token response
token = sys.argv[1]
proxy_url = sys.argv[2]
response = requests.post(
proxy_url,
headers={'Authorization': f'Bearer {token}'},
json={
'method': 'add_records',
'table': 'Orders',
'records': [{'item': 'Widget', 'qty': 100}]
}
)
print(response.json())
""",
}
async def get_proxy_documentation() -> dict:
"""Return complete documentation for the HTTP proxy API."""
return PROXY_DOCUMENTATION
async def request_session_token(
agent: Agent,
auth: Authenticator,
token_manager: SessionTokenManager,
document: str,
permissions: list[str],
ttl_seconds: int = 300,
proxy_base_url: str | None = None,
) -> dict:
"""Request a short-lived session token for HTTP proxy access.
The token can only grant permissions the agent already has.
"""
# Verify agent has access to the document
# Check each requested permission
for perm_str in permissions:
try:
perm = Permission(perm_str)
except ValueError:
raise AuthError(f"Invalid permission: {perm_str}")
auth.authorize(agent, document, perm)
# Create the session token
session = token_manager.create_token(
agent_name=agent.name,
document=document,
permissions=permissions,
ttl_seconds=ttl_seconds,
)
# Build proxy URL - use base URL if provided, otherwise just path
proxy_path = "/api/v1/proxy"
if proxy_base_url:
proxy_url = f"{proxy_base_url.rstrip('/')}{proxy_path}"
else:
proxy_url = proxy_path
return {
"token": session.token,
"document": session.document,
"permissions": session.permissions,
"expires_at": session.expires_at.isoformat(),
"proxy_url": proxy_url,
}

View File

@@ -1,4 +1,7 @@
"""Write tools - create, update, delete records.""" """Write tools - create, update, delete records, upload attachments."""
import base64
import mimetypes
from grist_mcp.auth import Agent, Authenticator, Permission from grist_mcp.auth import Agent, Authenticator, Permission
from grist_mcp.grist_client import GristClient from grist_mcp.grist_client import GristClient
@@ -59,3 +62,50 @@ async def delete_records(
await client.delete_records(table, record_ids) await client.delete_records(table, record_ids)
return {"deleted": True} return {"deleted": True}
async def upload_attachment(
agent: Agent,
auth: Authenticator,
document: str,
filename: str,
content_base64: str,
content_type: str | None = None,
client: GristClient | None = None,
) -> dict:
"""Upload a file attachment to a document.
Args:
agent: The authenticated agent.
auth: Authenticator for permission checks.
document: Document name.
filename: Filename with extension.
content_base64: File content as base64-encoded string.
content_type: MIME type (auto-detected from filename if omitted).
client: Optional GristClient instance.
Returns:
Dict with attachment_id, filename, and size_bytes.
Raises:
ValueError: If content_base64 is not valid base64.
"""
auth.authorize(agent, document, Permission.WRITE)
# Decode base64 content
try:
content = base64.b64decode(content_base64)
except Exception:
raise ValueError("Invalid base64 encoding")
# Auto-detect MIME type if not provided
if content_type is None:
content_type, _ = mimetypes.guess_type(filename)
if content_type is None:
content_type = "application/octet-stream"
if client is None:
doc = auth.get_document(document)
client = GristClient(doc)
return await client.upload_attachment(filename, content, content_type)

View File

@@ -178,6 +178,15 @@ async def modify_column(request):
return JSONResponse({}) return JSONResponse({})
async def modify_columns(request):
"""PATCH /api/docs/{doc_id}/tables/{table_id}/columns - batch modify columns"""
doc_id = request.path_params["doc_id"]
table_id = request.path_params["table_id"]
body = await request.json()
log_request("PATCH", f"/api/docs/{doc_id}/tables/{table_id}/columns", body)
return JSONResponse({})
async def delete_column(request): async def delete_column(request):
"""DELETE /api/docs/{doc_id}/tables/{table_id}/columns/{col_id}""" """DELETE /api/docs/{doc_id}/tables/{table_id}/columns/{col_id}"""
doc_id = request.path_params["doc_id"] doc_id = request.path_params["doc_id"]
@@ -199,6 +208,7 @@ app = Starlette(
Route("/api/docs/{doc_id}/tables", endpoint=create_tables, methods=["POST"]), Route("/api/docs/{doc_id}/tables", endpoint=create_tables, methods=["POST"]),
Route("/api/docs/{doc_id}/tables/{table_id}/columns", endpoint=get_table_columns), Route("/api/docs/{doc_id}/tables/{table_id}/columns", endpoint=get_table_columns),
Route("/api/docs/{doc_id}/tables/{table_id}/columns", endpoint=add_column, methods=["POST"]), Route("/api/docs/{doc_id}/tables/{table_id}/columns", endpoint=add_column, methods=["POST"]),
Route("/api/docs/{doc_id}/tables/{table_id}/columns", endpoint=modify_columns, methods=["PATCH"]),
Route("/api/docs/{doc_id}/tables/{table_id}/columns/{col_id}", endpoint=modify_column, methods=["PATCH"]), Route("/api/docs/{doc_id}/tables/{table_id}/columns/{col_id}", endpoint=modify_column, methods=["PATCH"]),
Route("/api/docs/{doc_id}/tables/{table_id}/columns/{col_id}", endpoint=delete_column, methods=["DELETE"]), Route("/api/docs/{doc_id}/tables/{table_id}/columns/{col_id}", endpoint=delete_column, methods=["DELETE"]),
Route("/api/docs/{doc_id}/tables/{table_id}/records", endpoint=get_records), Route("/api/docs/{doc_id}/tables/{table_id}/records", endpoint=get_records),

View File

@@ -9,12 +9,14 @@ from mcp.client.sse import sse_client
GRIST_MCP_URL = os.environ.get("GRIST_MCP_URL", "http://localhost:3000") GRIST_MCP_URL = os.environ.get("GRIST_MCP_URL", "http://localhost:3000")
GRIST_MCP_TOKEN = os.environ.get("GRIST_MCP_TOKEN", "test-token")
@asynccontextmanager @asynccontextmanager
async def create_mcp_session(): async def create_mcp_session():
"""Create and yield an MCP session.""" """Create and yield an MCP session."""
async with sse_client(f"{GRIST_MCP_URL}/sse") as (read_stream, write_stream): headers = {"Authorization": f"Bearer {GRIST_MCP_TOKEN}"}
async with sse_client(f"{GRIST_MCP_URL}/sse", headers=headers) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session: async with ClientSession(read_stream, write_stream) as session:
await session.initialize() await session.initialize()
yield session yield session
@@ -44,12 +46,14 @@ async def test_mcp_protocol_compliance(services_ready):
"add_column", "add_column",
"modify_column", "modify_column",
"delete_column", "delete_column",
"get_proxy_documentation",
"request_session_token",
] ]
for expected in expected_tools: for expected in expected_tools:
assert expected in tool_names, f"Missing tool: {expected}" assert expected in tool_names, f"Missing tool: {expected}"
assert len(result.tools) == 12, f"Expected 12 tools, got {len(result.tools)}" assert len(result.tools) == 14, f"Expected 14 tools, got {len(result.tools)}"
# Test 3: All tools have descriptions # Test 3: All tools have descriptions
for tool in result.tools: for tool in result.tools:

View File

@@ -0,0 +1,52 @@
"""Integration tests for session token proxy."""
import os
import pytest
import httpx
GRIST_MCP_URL = os.environ.get("GRIST_MCP_URL", "http://localhost:3000")
GRIST_MCP_TOKEN = os.environ.get("GRIST_MCP_TOKEN")
@pytest.fixture
def mcp_client():
"""Client for MCP SSE endpoint."""
return httpx.Client(
base_url=GRIST_MCP_URL,
headers={"Authorization": f"Bearer {GRIST_MCP_TOKEN}"},
)
@pytest.fixture
def proxy_client():
"""Client for proxy endpoint (session token set per-test)."""
return httpx.Client(base_url=GRIST_MCP_URL)
@pytest.mark.integration
def test_full_session_proxy_flow(mcp_client, proxy_client):
"""Test: request token via MCP, use token to call proxy."""
# This test requires a running grist-mcp server with proper config
# Skip if not configured
if not GRIST_MCP_TOKEN:
pytest.skip("GRIST_MCP_TOKEN not set")
# Step 1: Request session token (would be via MCP in real usage)
# For integration test, we test the proxy endpoint directly
# This is a placeholder - full MCP integration would use SSE
# Step 2: Use proxy endpoint
# Note: Need a valid session token to test this fully
# For now, verify endpoint exists and rejects bad tokens
response = proxy_client.post(
"/api/v1/proxy",
headers={"Authorization": "Bearer invalid_token"},
json={"method": "list_tables"},
)
assert response.status_code == 401
data = response.json()
assert data["success"] is False
assert data["code"] in ["INVALID_TOKEN", "TOKEN_EXPIRED"]

View File

@@ -12,12 +12,14 @@ from mcp.client.sse import sse_client
GRIST_MCP_URL = os.environ.get("GRIST_MCP_URL", "http://localhost:3000") GRIST_MCP_URL = os.environ.get("GRIST_MCP_URL", "http://localhost:3000")
MOCK_GRIST_URL = os.environ.get("MOCK_GRIST_URL", "http://localhost:8484") MOCK_GRIST_URL = os.environ.get("MOCK_GRIST_URL", "http://localhost:8484")
GRIST_MCP_TOKEN = os.environ.get("GRIST_MCP_TOKEN", "test-token")
@asynccontextmanager @asynccontextmanager
async def create_mcp_session(): async def create_mcp_session():
"""Create and yield an MCP session.""" """Create and yield an MCP session."""
async with sse_client(f"{GRIST_MCP_URL}/sse") as (read_stream, write_stream): headers = {"Authorization": f"Bearer {GRIST_MCP_TOKEN}"}
async with sse_client(f"{GRIST_MCP_URL}/sse", headers=headers) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session: async with ClientSession(read_stream, write_stream) as session:
await session.initialize() await session.initialize()
yield session yield session
@@ -194,7 +196,7 @@ async def test_all_tools(services_ready):
data = json.loads(result.content[0].text) data = json.loads(result.content[0].text)
assert "modified" in data assert "modified" in data
log = get_mock_request_log() log = get_mock_request_log()
patch_cols = [e for e in log if e["method"] == "PATCH" and "/columns/" in e["path"]] patch_cols = [e for e in log if e["method"] == "PATCH" and "/columns" in e["path"]]
assert len(patch_cols) >= 1 assert len(patch_cols) >= 1
# Test delete_column # Test delete_column

View File

@@ -196,3 +196,43 @@ def test_sql_validation_rejects_multiple_statements(client):
def test_sql_validation_allows_trailing_semicolon(client): def test_sql_validation_allows_trailing_semicolon(client):
# Should not raise # Should not raise
client._validate_sql_query("SELECT * FROM users;") client._validate_sql_query("SELECT * FROM users;")
# Attachment tests
@pytest.mark.asyncio
async def test_upload_attachment(client, httpx_mock: HTTPXMock):
httpx_mock.add_response(
url="https://grist.example.com/api/docs/abc123/attachments",
method="POST",
json=[42],
)
result = await client.upload_attachment(
filename="invoice.pdf",
content=b"PDF content here",
content_type="application/pdf",
)
assert result == {
"attachment_id": 42,
"filename": "invoice.pdf",
"size_bytes": 16,
}
@pytest.mark.asyncio
async def test_upload_attachment_default_content_type(client, httpx_mock: HTTPXMock):
httpx_mock.add_response(
url="https://grist.example.com/api/docs/abc123/attachments",
method="POST",
json=[99],
)
result = await client.upload_attachment(
filename="data.bin",
content=b"\x00\x01\x02",
)
assert result["attachment_id"] == 99
assert result["size_bytes"] == 3

98
tests/unit/test_proxy.py Normal file
View File

@@ -0,0 +1,98 @@
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
import pytest
from grist_mcp.proxy import parse_proxy_request, ProxyRequest, ProxyError, dispatch_proxy_request
from grist_mcp.session import SessionToken
@pytest.fixture
def mock_session():
return SessionToken(
token="sess_test",
document="sales",
permissions=["read", "write"],
agent_name="test-agent",
created_at=datetime.now(timezone.utc),
expires_at=datetime.now(timezone.utc),
)
@pytest.fixture
def mock_auth():
auth = MagicMock()
doc = MagicMock()
doc.url = "https://grist.example.com"
doc.doc_id = "abc123"
doc.api_key = "key"
auth.get_document.return_value = doc
return auth
def test_parse_proxy_request_valid_add_records():
body = {
"method": "add_records",
"table": "Orders",
"records": [{"item": "Widget", "qty": 10}],
}
request = parse_proxy_request(body)
assert request.method == "add_records"
assert request.table == "Orders"
assert request.records == [{"item": "Widget", "qty": 10}]
def test_parse_proxy_request_missing_method():
body = {"table": "Orders"}
with pytest.raises(ProxyError) as exc_info:
parse_proxy_request(body)
assert exc_info.value.code == "INVALID_REQUEST"
assert "method" in str(exc_info.value)
@pytest.mark.asyncio
async def test_dispatch_add_records(mock_session, mock_auth):
request = ProxyRequest(
method="add_records",
table="Orders",
records=[{"item": "Widget"}],
)
mock_client = AsyncMock()
mock_client.add_records.return_value = [1, 2, 3]
result = await dispatch_proxy_request(
request, mock_session, mock_auth, client=mock_client
)
assert result["success"] is True
assert result["data"]["record_ids"] == [1, 2, 3]
mock_client.add_records.assert_called_once_with("Orders", [{"item": "Widget"}])
@pytest.mark.asyncio
async def test_dispatch_denies_without_permission(mock_auth):
# Session only has read permission
session = SessionToken(
token="sess_test",
document="sales",
permissions=["read"], # No write
agent_name="test-agent",
created_at=datetime.now(timezone.utc),
expires_at=datetime.now(timezone.utc),
)
request = ProxyRequest(
method="add_records", # Requires write
table="Orders",
records=[{"item": "Widget"}],
)
with pytest.raises(ProxyError) as exc_info:
await dispatch_proxy_request(request, session, mock_auth)
assert exc_info.value.code == "UNAUTHORIZED"

View File

@@ -52,6 +52,50 @@ tokens:
assert "add_column" in tool_names assert "add_column" in tool_names
assert "modify_column" in tool_names assert "modify_column" in tool_names
assert "delete_column" in tool_names assert "delete_column" in tool_names
assert "upload_attachment" in tool_names
# Should have all 12 tools # Session tools (always registered)
assert len(result.root.tools) == 12 assert "get_proxy_documentation" in tool_names
assert "request_session_token" in tool_names
# Should have all 15 tools
assert len(result.root.tools) == 15
@pytest.mark.asyncio
async def test_create_server_registers_session_tools(tmp_path):
from grist_mcp.session import SessionTokenManager
config_file = tmp_path / "config.yaml"
config_file.write_text("""
documents:
test-doc:
url: https://grist.example.com
doc_id: abc123
api_key: test-key
tokens:
- token: valid-token
name: test-agent
scope:
- document: test-doc
permissions: [read, write, schema]
""")
config = load_config(str(config_file))
auth = Authenticator(config)
agent = auth.authenticate("valid-token")
token_manager = SessionTokenManager()
server = create_server(auth, agent, token_manager)
# Get the list_tools handler and call it
handler = server.request_handlers.get(ListToolsRequest)
assert handler is not None
req = ListToolsRequest(method="tools/list")
result = await handler(req)
tool_names = [t.name for t in result.root.tools]
assert "get_proxy_documentation" in tool_names
assert "request_session_token" in tool_names

View File

@@ -0,0 +1,81 @@
import pytest
from datetime import datetime, timedelta, timezone
from grist_mcp.session import SessionTokenManager, SessionToken
def test_create_token_returns_valid_session_token():
manager = SessionTokenManager()
token = manager.create_token(
agent_name="test-agent",
document="sales",
permissions=["read", "write"],
ttl_seconds=300,
)
assert token.token.startswith("sess_")
assert len(token.token) > 20
assert token.document == "sales"
assert token.permissions == ["read", "write"]
assert token.agent_name == "test-agent"
assert token.expires_at > datetime.now(timezone.utc)
assert token.expires_at < datetime.now(timezone.utc) + timedelta(seconds=310)
def test_create_token_caps_ttl_at_maximum():
manager = SessionTokenManager()
# Request 2 hours, should be capped at 1 hour
token = manager.create_token(
agent_name="test-agent",
document="sales",
permissions=["read"],
ttl_seconds=7200,
)
# Should be capped at 3600 seconds (1 hour)
max_expires = datetime.now(timezone.utc) + timedelta(seconds=3610)
assert token.expires_at < max_expires
def test_validate_token_returns_session_for_valid_token():
manager = SessionTokenManager()
created = manager.create_token(
agent_name="test-agent",
document="sales",
permissions=["read"],
ttl_seconds=300,
)
session = manager.validate_token(created.token)
assert session is not None
assert session.document == "sales"
assert session.agent_name == "test-agent"
def test_validate_token_returns_none_for_unknown_token():
manager = SessionTokenManager()
session = manager.validate_token("sess_unknown_token")
assert session is None
def test_validate_token_returns_none_for_expired_token():
manager = SessionTokenManager()
created = manager.create_token(
agent_name="test-agent",
document="sales",
permissions=["read"],
ttl_seconds=1,
)
# Wait for expiry
import time
time.sleep(1.5)
session = manager.validate_token(created.token)
assert session is None

View File

@@ -0,0 +1,122 @@
import pytest
from grist_mcp.tools.session import get_proxy_documentation, request_session_token
from grist_mcp.auth import Authenticator, Agent, AuthError
from grist_mcp.config import Config, Document, Token, TokenScope
from grist_mcp.session import SessionTokenManager
@pytest.fixture
def sample_config():
return Config(
documents={
"sales": Document(
url="https://grist.example.com",
doc_id="abc123",
api_key="key",
),
},
tokens=[
Token(
token="agent-token",
name="test-agent",
scope=[
TokenScope(document="sales", permissions=["read", "write"]),
],
),
],
)
@pytest.fixture
def auth_and_agent(sample_config):
auth = Authenticator(sample_config)
agent = auth.authenticate("agent-token")
return auth, agent
@pytest.mark.asyncio
async def test_get_proxy_documentation_returns_complete_spec():
result = await get_proxy_documentation()
assert "description" in result
assert "endpoint" in result
assert result["endpoint"] == "POST /api/v1/proxy"
assert "authentication" in result
assert "methods" in result
assert "add_records" in result["methods"]
assert "get_records" in result["methods"]
assert "example_script" in result
@pytest.mark.asyncio
async def test_request_session_token_creates_valid_token(auth_and_agent):
auth, agent = auth_and_agent
manager = SessionTokenManager()
result = await request_session_token(
agent=agent,
auth=auth,
token_manager=manager,
document="sales",
permissions=["read", "write"],
ttl_seconds=300,
)
assert "token" in result
assert result["token"].startswith("sess_")
assert result["document"] == "sales"
assert result["permissions"] == ["read", "write"]
assert "expires_at" in result
assert result["proxy_url"] == "/api/v1/proxy"
@pytest.mark.asyncio
async def test_request_session_token_rejects_unauthorized_document(sample_config):
auth = Authenticator(sample_config)
agent = auth.authenticate("agent-token")
manager = SessionTokenManager()
with pytest.raises(AuthError, match="Document not in scope"):
await request_session_token(
agent=agent,
auth=auth,
token_manager=manager,
document="unauthorized_doc",
permissions=["read"],
ttl_seconds=300,
)
@pytest.mark.asyncio
async def test_request_session_token_rejects_unauthorized_permission(sample_config):
auth = Authenticator(sample_config)
agent = auth.authenticate("agent-token")
manager = SessionTokenManager()
# Agent has read/write on sales, but not schema
with pytest.raises(AuthError, match="Permission denied"):
await request_session_token(
agent=agent,
auth=auth,
token_manager=manager,
document="sales",
permissions=["read", "schema"], # schema not granted
ttl_seconds=300,
)
@pytest.mark.asyncio
async def test_request_session_token_rejects_invalid_permission(sample_config):
auth = Authenticator(sample_config)
agent = auth.authenticate("agent-token")
manager = SessionTokenManager()
with pytest.raises(AuthError, match="Invalid permission"):
await request_session_token(
agent=agent,
auth=auth,
token_manager=manager,
document="sales",
permissions=["read", "invalid_perm"],
ttl_seconds=300,
)

View File

@@ -1,7 +1,9 @@
import base64
import pytest import pytest
from unittest.mock import AsyncMock from unittest.mock import AsyncMock
from grist_mcp.tools.write import add_records, update_records, delete_records from grist_mcp.tools.write import add_records, update_records, delete_records, upload_attachment
from grist_mcp.auth import Authenticator, AuthError from grist_mcp.auth import Authenticator, AuthError
from grist_mcp.config import Config, Document, Token, TokenScope from grist_mcp.config import Config, Document, Token, TokenScope
@@ -94,3 +96,105 @@ async def test_delete_records(auth, mock_client):
) )
assert result == {"deleted": True} assert result == {"deleted": True}
# Upload attachment tests
@pytest.fixture
def mock_client_with_attachment():
client = AsyncMock()
client.upload_attachment.return_value = {
"attachment_id": 42,
"filename": "invoice.pdf",
"size_bytes": 1024,
}
return client
@pytest.mark.asyncio
async def test_upload_attachment_success(auth, mock_client_with_attachment):
agent = auth.authenticate("write-token")
content = b"PDF content"
content_base64 = base64.b64encode(content).decode()
result = await upload_attachment(
agent, auth, "budget",
filename="invoice.pdf",
content_base64=content_base64,
client=mock_client_with_attachment,
)
assert result == {
"attachment_id": 42,
"filename": "invoice.pdf",
"size_bytes": 1024,
}
mock_client_with_attachment.upload_attachment.assert_called_once_with(
"invoice.pdf", content, "application/pdf"
)
@pytest.mark.asyncio
async def test_upload_attachment_invalid_base64(auth, mock_client_with_attachment):
agent = auth.authenticate("write-token")
with pytest.raises(ValueError, match="Invalid base64 encoding"):
await upload_attachment(
agent, auth, "budget",
filename="test.txt",
content_base64="not-valid-base64!!!",
client=mock_client_with_attachment,
)
@pytest.mark.asyncio
async def test_upload_attachment_auth_required(auth, mock_client_with_attachment):
agent = auth.authenticate("read-token")
content_base64 = base64.b64encode(b"test").decode()
with pytest.raises(AuthError, match="Permission denied"):
await upload_attachment(
agent, auth, "budget",
filename="test.txt",
content_base64=content_base64,
client=mock_client_with_attachment,
)
@pytest.mark.asyncio
async def test_upload_attachment_mime_detection(auth, mock_client_with_attachment):
agent = auth.authenticate("write-token")
content = b"PNG content"
content_base64 = base64.b64encode(content).decode()
await upload_attachment(
agent, auth, "budget",
filename="image.png",
content_base64=content_base64,
client=mock_client_with_attachment,
)
# Should auto-detect image/png from filename
mock_client_with_attachment.upload_attachment.assert_called_once_with(
"image.png", content, "image/png"
)
@pytest.mark.asyncio
async def test_upload_attachment_explicit_content_type(auth, mock_client_with_attachment):
agent = auth.authenticate("write-token")
content = b"custom content"
content_base64 = base64.b64encode(content).decode()
await upload_attachment(
agent, auth, "budget",
filename="file.dat",
content_base64=content_base64,
content_type="application/custom",
client=mock_client_with_attachment,
)
# Should use explicit content type
mock_client_with_attachment.upload_attachment.assert_called_once_with(
"file.dat", content, "application/custom"
)

2
uv.lock generated
View File

@@ -153,7 +153,7 @@ wheels = [
[[package]] [[package]]
name = "grist-mcp" name = "grist-mcp"
version = "1.1.0" version = "1.2.0"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "httpx" }, { name = "httpx" },