3 Commits

Author SHA1 Message Date
c868e8a7fa chore: bump version to 1.4.0 and update changelog
All checks were successful
Build and Push Docker Image / build (push) Successful in 11s
2026-01-12 12:14:49 -05:00
734cc0a525 feat: add attachment download via proxy endpoint
Add GET /api/v1/attachments/{id} endpoint for downloading attachments
through the MCP proxy. This complements the existing upload endpoint and
enables complete attachment workflows via the proxy API.
2026-01-12 12:13:23 -05:00
a7c87128ef feat: replace MCP attachment tool with proxy endpoint
All checks were successful
Build and Push Docker Image / build (push) Successful in 8s
The MCP tool approach was impractical because it required the LLM to
generate large base64 strings token-by-token, causing timeouts.

Changes:
- Remove upload_attachment MCP tool
- Add POST /api/v1/attachments endpoint for multipart/form-data uploads
- Update proxy documentation to show both endpoints
- Uses existing GristClient.upload_attachment() method
- Requires write permission in session token
2026-01-03 20:26:36 -05:00
12 changed files with 410 additions and 213 deletions

View File

@@ -5,30 +5,81 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.4.0] - 2026-01-12
### Added
#### Attachment Download via Proxy
- **`GET /api/v1/attachments/{id}`**: New HTTP endpoint for downloading attachments
- Returns binary content with appropriate `Content-Type` and `Content-Disposition` headers
- Requires read permission in session token
- Complements the existing upload endpoint for complete attachment workflows
#### Usage
```bash
# Get session token with read permission
TOKEN=$(curl -s ... | jq -r '.token')
# Download attachment
curl -H "Authorization: Bearer $TOKEN" \
https://example.com/api/v1/attachments/42 \
-o downloaded.pdf
```
```python
# Python example
import requests
response = requests.get(
f'{base_url}/api/v1/attachments/42',
headers={'Authorization': f'Bearer {token}'}
)
with open('downloaded.pdf', 'wb') as f:
f.write(response.content)
```
## [1.3.0] - 2026-01-03 ## [1.3.0] - 2026-01-03
### Added ### Added
#### Attachment Upload #### Attachment Upload via Proxy
- **`upload_attachment` MCP tool**: Upload files to Grist documents - **`POST /api/v1/attachments`**: New HTTP endpoint for file uploads
- Base64-encoded content input (required for JSON-based MCP protocol) - Uses `multipart/form-data` for efficient binary transfer (no base64 overhead)
- Automatic MIME type detection from filename - Automatic MIME type detection from filename
- Returns attachment ID for linking to records via `update_records` - Returns attachment ID for linking to records via `update_records`
- Requires write permission in session token
#### Usage #### Usage
```python ```bash
# 1. Upload attachment # Get session token with write permission
result = upload_attachment( TOKEN=$(curl -s ... | jq -r '.token')
document="accounting",
filename="invoice.pdf",
content_base64="JVBERi0xLjQK..."
)
# Returns: {"attachment_id": 42, "filename": "invoice.pdf", "size_bytes": 31395}
# 2. Link to record # Upload file
update_records(document="accounting", table="Bills", records=[ curl -X POST \
{"id": 1, "fields": {"Attachment": [42]}} -H "Authorization: Bearer $TOKEN" \
]) -F "file=@invoice.pdf" \
https://example.com/api/v1/attachments
# Returns: {"success": true, "data": {"attachment_id": 42, "filename": "invoice.pdf", "size_bytes": 31395}}
```
```python
# Python example
import requests
response = requests.post(
f'{proxy_url.replace("/proxy", "/attachments")}',
headers={'Authorization': f'Bearer {token}'},
files={'file': open('invoice.pdf', 'rb')}
)
attachment_id = response.json()['data']['attachment_id']
# Link to record via proxy
requests.post(proxy_url, headers={'Authorization': f'Bearer {token}'}, json={
'method': 'update_records',
'table': 'Bills',
'records': [{'id': 1, 'fields': {'Attachment': [attachment_id]}}]
})
``` ```
## [1.2.0] - 2026-01-02 ## [1.2.0] - 2026-01-02

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "grist-mcp" name = "grist-mcp"
version = "1.3.0" version = "1.4.0"
description = "MCP server for AI agents to interact with Grist documents" description = "MCP server for AI agents to interact with Grist documents"
requires-python = ">=3.14" requires-python = ">=3.14"
dependencies = [ dependencies = [

View File

@@ -149,6 +149,38 @@ class GristClient:
"size_bytes": len(content), "size_bytes": len(content),
} }
async def download_attachment(self, attachment_id: int) -> dict:
"""Download an attachment by ID.
Args:
attachment_id: The ID of the attachment to download.
Returns:
Dict with content (bytes), content_type, and filename.
"""
import re
async with httpx.AsyncClient(timeout=self._timeout) as client:
response = await client.get(
f"{self._base_url}/attachments/{attachment_id}/download",
headers=self._headers,
)
response.raise_for_status()
# Extract filename from Content-Disposition header
content_disp = response.headers.get("content-disposition", "")
filename = None
if "filename=" in content_disp:
match = re.search(r'filename="?([^";]+)"?', content_disp)
if match:
filename = match.group(1)
return {
"content": response.content,
"content_type": response.headers.get("content-type", "application/octet-stream"),
"filename": filename,
}
# Schema operations # Schema operations
async def create_table(self, table_id: str, columns: list[dict]) -> str: async def create_table(self, table_id: str, columns: list[dict]) -> str:

View File

@@ -14,6 +14,7 @@ from grist_mcp.config import Config, load_config
from grist_mcp.auth import Authenticator, AuthError from grist_mcp.auth import Authenticator, AuthError
from grist_mcp.session import SessionTokenManager from grist_mcp.session import SessionTokenManager
from grist_mcp.proxy import parse_proxy_request, dispatch_proxy_request, ProxyError from grist_mcp.proxy import parse_proxy_request, dispatch_proxy_request, ProxyError
from grist_mcp.grist_client import GristClient
from grist_mcp.logging import setup_logging from grist_mcp.logging import setup_logging
@@ -59,6 +60,62 @@ async def send_json_response(send: Send, status: int, data: dict) -> None:
}) })
def _parse_multipart(content_type: str, body: bytes) -> tuple[str | None, bytes | None]:
"""Parse multipart/form-data to extract uploaded file.
Returns (filename, content) or (None, None) if parsing fails.
"""
import re
# Extract boundary from content-type
match = re.search(r'boundary=([^\s;]+)', content_type)
if not match:
return None, None
boundary = match.group(1).encode()
if boundary.startswith(b'"') and boundary.endswith(b'"'):
boundary = boundary[1:-1]
# Split by boundary
parts = body.split(b'--' + boundary)
for part in parts:
if b'Content-Disposition' not in part:
continue
# Split headers from content
if b'\r\n\r\n' in part:
header_section, content = part.split(b'\r\n\r\n', 1)
elif b'\n\n' in part:
header_section, content = part.split(b'\n\n', 1)
else:
continue
headers = header_section.decode('utf-8', errors='replace')
# Check if this is a file upload
if 'filename=' not in headers:
continue
# Extract filename
filename_match = re.search(r'filename="([^"]+)"', headers)
if not filename_match:
filename_match = re.search(r"filename=([^\s;]+)", headers)
if not filename_match:
continue
filename = filename_match.group(1)
# Remove trailing boundary marker and whitespace
content = content.rstrip()
if content.endswith(b'--'):
content = content[:-2].rstrip()
return filename, content
return None, None
CONFIG_TEMPLATE = """\ CONFIG_TEMPLATE = """\
# grist-mcp configuration # grist-mcp configuration
# #
@@ -229,6 +286,144 @@ def create_app(config: Config):
"code": e.code, "code": e.code,
}) })
async def handle_attachments(scope: Scope, receive: Receive, send: Send) -> None:
"""Handle file attachment uploads via multipart/form-data."""
# Extract token
token = _get_bearer_token(scope)
if not token:
await send_json_response(send, 401, {
"success": False,
"error": "Missing Authorization header",
"code": "INVALID_TOKEN",
})
return
# Validate session token
session = token_manager.validate_token(token)
if session is None:
await send_json_response(send, 401, {
"success": False,
"error": "Invalid or expired token",
"code": "TOKEN_EXPIRED",
})
return
# Check write permission
if "write" not in session.permissions:
await send_json_response(send, 403, {
"success": False,
"error": "Write permission required for attachment upload",
"code": "UNAUTHORIZED",
})
return
# Get content-type header
headers = dict(scope.get("headers", []))
content_type = headers.get(b"content-type", b"").decode()
if not content_type.startswith("multipart/form-data"):
await send_json_response(send, 400, {
"success": False,
"error": "Content-Type must be multipart/form-data",
"code": "INVALID_REQUEST",
})
return
# Read request body
body = b""
while True:
message = await receive()
body += message.get("body", b"")
if not message.get("more_body", False):
break
# Parse multipart
filename, content = _parse_multipart(content_type, body)
if filename is None or content is None:
await send_json_response(send, 400, {
"success": False,
"error": "No file found in request",
"code": "INVALID_REQUEST",
})
return
# Upload to Grist
try:
doc = auth.get_document(session.document)
client = GristClient(doc)
result = await client.upload_attachment(filename, content)
await send_json_response(send, 200, {
"success": True,
"data": result,
})
except Exception as e:
await send_json_response(send, 500, {
"success": False,
"error": str(e),
"code": "GRIST_ERROR",
})
async def handle_attachment_download(
scope: Scope, receive: Receive, send: Send, attachment_id: int
) -> None:
"""Handle attachment download by ID."""
# Extract token
token = _get_bearer_token(scope)
if not token:
await send_json_response(send, 401, {
"success": False,
"error": "Missing Authorization header",
"code": "INVALID_TOKEN",
})
return
# Validate session token
session = token_manager.validate_token(token)
if session is None:
await send_json_response(send, 401, {
"success": False,
"error": "Invalid or expired token",
"code": "TOKEN_EXPIRED",
})
return
# Check read permission
if "read" not in session.permissions:
await send_json_response(send, 403, {
"success": False,
"error": "Read permission required for attachment download",
"code": "UNAUTHORIZED",
})
return
# Download from Grist
try:
doc = auth.get_document(session.document)
client = GristClient(doc)
result = await client.download_attachment(attachment_id)
# Build response headers
headers = [[b"content-type", result["content_type"].encode()]]
if result["filename"]:
disposition = f'attachment; filename="{result["filename"]}"'
headers.append([b"content-disposition", disposition.encode()])
await send({
"type": "http.response.start",
"status": 200,
"headers": headers,
})
await send({
"type": "http.response.body",
"body": result["content"],
})
except Exception as e:
await send_json_response(send, 500, {
"success": False,
"error": str(e),
"code": "GRIST_ERROR",
})
async def app(scope: Scope, receive: Receive, send: Send) -> None: async def app(scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] != "http": if scope["type"] != "http":
return return
@@ -244,6 +439,19 @@ def create_app(config: Config):
await handle_messages(scope, receive, send) await handle_messages(scope, receive, send)
elif path == "/api/v1/proxy" and method == "POST": elif path == "/api/v1/proxy" and method == "POST":
await handle_proxy(scope, receive, send) await handle_proxy(scope, receive, send)
elif path == "/api/v1/attachments" and method == "POST":
await handle_attachments(scope, receive, send)
elif path.startswith("/api/v1/attachments/") and method == "GET":
# Parse attachment ID from path: /api/v1/attachments/{id}
try:
attachment_id = int(path.split("/")[-1])
await handle_attachment_download(scope, receive, send, attachment_id)
except ValueError:
await send_json_response(send, 400, {
"success": False,
"error": "Invalid attachment ID",
"code": "INVALID_REQUEST",
})
else: else:
await handle_not_found(scope, receive, send) await handle_not_found(scope, receive, send)

View File

@@ -22,7 +22,6 @@ from grist_mcp.tools.read import sql_query as _sql_query
from grist_mcp.tools.write import add_records as _add_records from grist_mcp.tools.write import add_records as _add_records
from grist_mcp.tools.write import update_records as _update_records from grist_mcp.tools.write import update_records as _update_records
from grist_mcp.tools.write import delete_records as _delete_records from grist_mcp.tools.write import delete_records as _delete_records
from grist_mcp.tools.write import upload_attachment as _upload_attachment
from grist_mcp.tools.schema import create_table as _create_table from grist_mcp.tools.schema import create_table as _create_table
from grist_mcp.tools.schema import add_column as _add_column from grist_mcp.tools.schema import add_column as _add_column
from grist_mcp.tools.schema import modify_column as _modify_column from grist_mcp.tools.schema import modify_column as _modify_column
@@ -219,32 +218,6 @@ def create_server(
"required": ["document", "table", "column_id"], "required": ["document", "table", "column_id"],
}, },
), ),
Tool(
name="upload_attachment",
description="Upload a file attachment to a Grist document. Returns attachment ID for linking to records via update_records.",
inputSchema={
"type": "object",
"properties": {
"document": {
"type": "string",
"description": "Document name",
},
"filename": {
"type": "string",
"description": "Filename with extension (e.g., 'invoice.pdf')",
},
"content_base64": {
"type": "string",
"description": "File content as base64-encoded string",
},
"content_type": {
"type": "string",
"description": "MIME type (optional, auto-detected from filename)",
},
},
"required": ["document", "filename", "content_base64"],
},
),
Tool( Tool(
name="get_proxy_documentation", name="get_proxy_documentation",
description="Get complete documentation for the HTTP proxy API", description="Get complete documentation for the HTTP proxy API",
@@ -351,12 +324,6 @@ def create_server(
_current_agent, auth, arguments["document"], arguments["table"], _current_agent, auth, arguments["document"], arguments["table"],
arguments["column_id"], arguments["column_id"],
) )
elif name == "upload_attachment":
result = await _upload_attachment(
_current_agent, auth, arguments["document"],
arguments["filename"], arguments["content_base64"],
content_type=arguments.get("content_type"),
)
elif name == "get_proxy_documentation": elif name == "get_proxy_documentation":
result = await _get_proxy_documentation() result = await _get_proxy_documentation()
elif name == "request_session_token": elif name == "request_session_token":

View File

@@ -6,9 +6,43 @@ from grist_mcp.session import SessionTokenManager
PROXY_DOCUMENTATION = { PROXY_DOCUMENTATION = {
"description": "HTTP proxy API for bulk data operations. Use request_session_token to get a short-lived token, then call the proxy endpoint directly from scripts.", "description": "HTTP proxy API for bulk data operations. Use request_session_token to get a short-lived token, then call the proxy endpoint directly from scripts.",
"endpoint": "POST /api/v1/proxy", "endpoints": {
"endpoint_note": "The full URL is returned in the 'proxy_url' field of request_session_token response", "proxy": "POST /api/v1/proxy - JSON operations (CRUD, schema)",
"attachments_upload": "POST /api/v1/attachments - File uploads (multipart/form-data)",
"attachments_download": "GET /api/v1/attachments/{id} - File downloads (binary response)",
},
"endpoint_note": "The full URL is returned in the 'proxy_url' field of request_session_token response. Replace /proxy with /attachments for file operations.",
"authentication": "Bearer token in Authorization header", "authentication": "Bearer token in Authorization header",
"attachment_upload": {
"endpoint": "POST /api/v1/attachments",
"content_type": "multipart/form-data",
"permission": "write",
"description": "Upload file attachments to the document. Returns attachment_id for linking to records via update_records.",
"response": {"success": True, "data": {"attachment_id": 42, "filename": "invoice.pdf", "size_bytes": 31395}},
"example_curl": "curl -X POST -H 'Authorization: Bearer TOKEN' -F 'file=@invoice.pdf' URL/api/v1/attachments",
"example_python": """import requests
response = requests.post(
f'{proxy_url.replace("/proxy", "/attachments")}',
headers={'Authorization': f'Bearer {token}'},
files={'file': open('invoice.pdf', 'rb')}
)
attachment_id = response.json()['data']['attachment_id']
# Link to record: update_records with {'Attachment': [attachment_id]}""",
},
"attachment_download": {
"endpoint": "GET /api/v1/attachments/{attachment_id}",
"permission": "read",
"description": "Download attachment by ID. Returns binary content with appropriate Content-Type and Content-Disposition headers.",
"response_headers": ["Content-Type", "Content-Disposition"],
"example_curl": "curl -H 'Authorization: Bearer TOKEN' URL/api/v1/attachments/42 -o file.pdf",
"example_python": """import requests
response = requests.get(
f'{base_url}/api/v1/attachments/42',
headers={'Authorization': f'Bearer {token}'}
)
with open('downloaded.pdf', 'wb') as f:
f.write(response.content)""",
},
"request_format": { "request_format": {
"method": "Operation name (required)", "method": "Operation name (required)",
"table": "Table name (required for most operations)", "table": "Table name (required for most operations)",

View File

@@ -1,7 +1,4 @@
"""Write tools - create, update, delete records, upload attachments.""" """Write tools - create, update, delete records."""
import base64
import mimetypes
from grist_mcp.auth import Agent, Authenticator, Permission from grist_mcp.auth import Agent, Authenticator, Permission
from grist_mcp.grist_client import GristClient from grist_mcp.grist_client import GristClient
@@ -62,50 +59,3 @@ async def delete_records(
await client.delete_records(table, record_ids) await client.delete_records(table, record_ids)
return {"deleted": True} return {"deleted": True}
async def upload_attachment(
agent: Agent,
auth: Authenticator,
document: str,
filename: str,
content_base64: str,
content_type: str | None = None,
client: GristClient | None = None,
) -> dict:
"""Upload a file attachment to a document.
Args:
agent: The authenticated agent.
auth: Authenticator for permission checks.
document: Document name.
filename: Filename with extension.
content_base64: File content as base64-encoded string.
content_type: MIME type (auto-detected from filename if omitted).
client: Optional GristClient instance.
Returns:
Dict with attachment_id, filename, and size_bytes.
Raises:
ValueError: If content_base64 is not valid base64.
"""
auth.authorize(agent, document, Permission.WRITE)
# Decode base64 content
try:
content = base64.b64decode(content_base64)
except Exception:
raise ValueError("Invalid base64 encoding")
# Auto-detect MIME type if not provided
if content_type is None:
content_type, _ = mimetypes.guess_type(filename)
if content_type is None:
content_type = "application/octet-stream"
if client is None:
doc = auth.get_document(document)
client = GristClient(doc)
return await client.upload_attachment(filename, content, content_type)

View File

@@ -236,3 +236,59 @@ async def test_upload_attachment_default_content_type(client, httpx_mock: HTTPXM
assert result["attachment_id"] == 99 assert result["attachment_id"] == 99
assert result["size_bytes"] == 3 assert result["size_bytes"] == 3
@pytest.mark.asyncio
async def test_download_attachment(client, httpx_mock: HTTPXMock):
httpx_mock.add_response(
url="https://grist.example.com/api/docs/abc123/attachments/42/download",
method="GET",
content=b"PDF content here",
headers={
"content-type": "application/pdf",
"content-disposition": 'attachment; filename="invoice.pdf"',
},
)
result = await client.download_attachment(42)
assert result["content"] == b"PDF content here"
assert result["content_type"] == "application/pdf"
assert result["filename"] == "invoice.pdf"
@pytest.mark.asyncio
async def test_download_attachment_no_filename(client, httpx_mock: HTTPXMock):
httpx_mock.add_response(
url="https://grist.example.com/api/docs/abc123/attachments/99/download",
method="GET",
content=b"binary data",
headers={
"content-type": "application/octet-stream",
},
)
result = await client.download_attachment(99)
assert result["content"] == b"binary data"
assert result["content_type"] == "application/octet-stream"
assert result["filename"] is None
@pytest.mark.asyncio
async def test_download_attachment_unquoted_filename(client, httpx_mock: HTTPXMock):
httpx_mock.add_response(
url="https://grist.example.com/api/docs/abc123/attachments/55/download",
method="GET",
content=b"image data",
headers={
"content-type": "image/png",
"content-disposition": "attachment; filename=photo.png",
},
)
result = await client.download_attachment(55)
assert result["content"] == b"image data"
assert result["content_type"] == "image/png"
assert result["filename"] == "photo.png"

View File

@@ -52,14 +52,13 @@ tokens:
assert "add_column" in tool_names assert "add_column" in tool_names
assert "modify_column" in tool_names assert "modify_column" in tool_names
assert "delete_column" in tool_names assert "delete_column" in tool_names
assert "upload_attachment" in tool_names
# Session tools (always registered) # Session tools (always registered)
assert "get_proxy_documentation" in tool_names assert "get_proxy_documentation" in tool_names
assert "request_session_token" in tool_names assert "request_session_token" in tool_names
# Should have all 15 tools # Should have all 14 tools
assert len(result.root.tools) == 15 assert len(result.root.tools) == 14
@pytest.mark.asyncio @pytest.mark.asyncio

View File

@@ -39,12 +39,16 @@ async def test_get_proxy_documentation_returns_complete_spec():
result = await get_proxy_documentation() result = await get_proxy_documentation()
assert "description" in result assert "description" in result
assert "endpoint" in result assert "endpoints" in result
assert result["endpoint"] == "POST /api/v1/proxy" assert "proxy" in result["endpoints"]
assert "attachments_upload" in result["endpoints"]
assert "attachments_download" in result["endpoints"]
assert "authentication" in result assert "authentication" in result
assert "methods" in result assert "methods" in result
assert "add_records" in result["methods"] assert "add_records" in result["methods"]
assert "get_records" in result["methods"] assert "get_records" in result["methods"]
assert "attachment_upload" in result
assert "attachment_download" in result
assert "example_script" in result assert "example_script" in result

View File

@@ -1,9 +1,7 @@
import base64
import pytest import pytest
from unittest.mock import AsyncMock from unittest.mock import AsyncMock
from grist_mcp.tools.write import add_records, update_records, delete_records, upload_attachment from grist_mcp.tools.write import add_records, update_records, delete_records
from grist_mcp.auth import Authenticator, AuthError from grist_mcp.auth import Authenticator, AuthError
from grist_mcp.config import Config, Document, Token, TokenScope from grist_mcp.config import Config, Document, Token, TokenScope
@@ -96,105 +94,3 @@ async def test_delete_records(auth, mock_client):
) )
assert result == {"deleted": True} assert result == {"deleted": True}
# Upload attachment tests
@pytest.fixture
def mock_client_with_attachment():
client = AsyncMock()
client.upload_attachment.return_value = {
"attachment_id": 42,
"filename": "invoice.pdf",
"size_bytes": 1024,
}
return client
@pytest.mark.asyncio
async def test_upload_attachment_success(auth, mock_client_with_attachment):
agent = auth.authenticate("write-token")
content = b"PDF content"
content_base64 = base64.b64encode(content).decode()
result = await upload_attachment(
agent, auth, "budget",
filename="invoice.pdf",
content_base64=content_base64,
client=mock_client_with_attachment,
)
assert result == {
"attachment_id": 42,
"filename": "invoice.pdf",
"size_bytes": 1024,
}
mock_client_with_attachment.upload_attachment.assert_called_once_with(
"invoice.pdf", content, "application/pdf"
)
@pytest.mark.asyncio
async def test_upload_attachment_invalid_base64(auth, mock_client_with_attachment):
agent = auth.authenticate("write-token")
with pytest.raises(ValueError, match="Invalid base64 encoding"):
await upload_attachment(
agent, auth, "budget",
filename="test.txt",
content_base64="not-valid-base64!!!",
client=mock_client_with_attachment,
)
@pytest.mark.asyncio
async def test_upload_attachment_auth_required(auth, mock_client_with_attachment):
agent = auth.authenticate("read-token")
content_base64 = base64.b64encode(b"test").decode()
with pytest.raises(AuthError, match="Permission denied"):
await upload_attachment(
agent, auth, "budget",
filename="test.txt",
content_base64=content_base64,
client=mock_client_with_attachment,
)
@pytest.mark.asyncio
async def test_upload_attachment_mime_detection(auth, mock_client_with_attachment):
agent = auth.authenticate("write-token")
content = b"PNG content"
content_base64 = base64.b64encode(content).decode()
await upload_attachment(
agent, auth, "budget",
filename="image.png",
content_base64=content_base64,
client=mock_client_with_attachment,
)
# Should auto-detect image/png from filename
mock_client_with_attachment.upload_attachment.assert_called_once_with(
"image.png", content, "image/png"
)
@pytest.mark.asyncio
async def test_upload_attachment_explicit_content_type(auth, mock_client_with_attachment):
agent = auth.authenticate("write-token")
content = b"custom content"
content_base64 = base64.b64encode(content).decode()
await upload_attachment(
agent, auth, "budget",
filename="file.dat",
content_base64=content_base64,
content_type="application/custom",
client=mock_client_with_attachment,
)
# Should use explicit content type
mock_client_with_attachment.upload_attachment.assert_called_once_with(
"file.dat", content, "application/custom"
)

2
uv.lock generated
View File

@@ -153,7 +153,7 @@ wheels = [
[[package]] [[package]]
name = "grist-mcp" name = "grist-mcp"
version = "1.2.0" version = "1.3.0"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "httpx" }, { name = "httpx" },