feat(tools): add request_session_token tool
Add MCP tool for agents to request short-lived session tokens for HTTP proxy access. The tool validates that agents can only request permissions they already have (no privilege escalation). - Validates document access and each requested permission - Creates session token via SessionTokenManager - Returns token metadata including proxy URL and expiration - Includes tests for success case and permission denial scenarios
This commit is contained in:
@@ -1,5 +1,9 @@
|
||||
"""Session token tools for HTTP proxy access."""
|
||||
|
||||
from grist_mcp.auth import Agent, Authenticator, AuthError, Permission
|
||||
from grist_mcp.session import SessionTokenManager
|
||||
|
||||
|
||||
PROXY_DOCUMENTATION = {
|
||||
"description": "HTTP proxy API for bulk data operations. Use request_session_token to get a short-lived token, then call the proxy endpoint directly from scripts.",
|
||||
"endpoint": "POST /api/v1/proxy",
|
||||
@@ -104,3 +108,41 @@ print(response.json())
|
||||
async def get_proxy_documentation() -> dict:
|
||||
"""Return complete documentation for the HTTP proxy API."""
|
||||
return PROXY_DOCUMENTATION
|
||||
|
||||
|
||||
async def request_session_token(
|
||||
agent: Agent,
|
||||
auth: Authenticator,
|
||||
token_manager: SessionTokenManager,
|
||||
document: str,
|
||||
permissions: list[str],
|
||||
ttl_seconds: int = 300,
|
||||
) -> dict:
|
||||
"""Request a short-lived session token for HTTP proxy access.
|
||||
|
||||
The token can only grant permissions the agent already has.
|
||||
"""
|
||||
# Verify agent has access to the document
|
||||
# Check each requested permission
|
||||
for perm_str in permissions:
|
||||
try:
|
||||
perm = Permission(perm_str)
|
||||
except ValueError:
|
||||
raise AuthError(f"Invalid permission: {perm_str}")
|
||||
auth.authorize(agent, document, perm)
|
||||
|
||||
# Create the session token
|
||||
session = token_manager.create_token(
|
||||
agent_name=agent.name,
|
||||
document=document,
|
||||
permissions=permissions,
|
||||
ttl_seconds=ttl_seconds,
|
||||
)
|
||||
|
||||
return {
|
||||
"token": session.token,
|
||||
"document": session.document,
|
||||
"permissions": session.permissions,
|
||||
"expires_at": session.expires_at.isoformat(),
|
||||
"proxy_url": "/api/v1/proxy",
|
||||
}
|
||||
|
||||
@@ -1,5 +1,37 @@
|
||||
import pytest
|
||||
from grist_mcp.tools.session import get_proxy_documentation
|
||||
from grist_mcp.tools.session import get_proxy_documentation, request_session_token
|
||||
from grist_mcp.auth import Authenticator, Agent, AuthError
|
||||
from grist_mcp.config import Config, Document, Token, TokenScope
|
||||
from grist_mcp.session import SessionTokenManager
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_config():
|
||||
return Config(
|
||||
documents={
|
||||
"sales": Document(
|
||||
url="https://grist.example.com",
|
||||
doc_id="abc123",
|
||||
api_key="key",
|
||||
),
|
||||
},
|
||||
tokens=[
|
||||
Token(
|
||||
token="agent-token",
|
||||
name="test-agent",
|
||||
scope=[
|
||||
TokenScope(document="sales", permissions=["read", "write"]),
|
||||
],
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def auth_and_agent(sample_config):
|
||||
auth = Authenticator(sample_config)
|
||||
agent = auth.authenticate("agent-token")
|
||||
return auth, agent
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -14,3 +46,77 @@ async def test_get_proxy_documentation_returns_complete_spec():
|
||||
assert "add_records" in result["methods"]
|
||||
assert "get_records" in result["methods"]
|
||||
assert "example_script" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_request_session_token_creates_valid_token(auth_and_agent):
|
||||
auth, agent = auth_and_agent
|
||||
manager = SessionTokenManager()
|
||||
|
||||
result = await request_session_token(
|
||||
agent=agent,
|
||||
auth=auth,
|
||||
token_manager=manager,
|
||||
document="sales",
|
||||
permissions=["read", "write"],
|
||||
ttl_seconds=300,
|
||||
)
|
||||
|
||||
assert "token" in result
|
||||
assert result["token"].startswith("sess_")
|
||||
assert result["document"] == "sales"
|
||||
assert result["permissions"] == ["read", "write"]
|
||||
assert "expires_at" in result
|
||||
assert result["proxy_url"] == "/api/v1/proxy"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_request_session_token_rejects_unauthorized_document(sample_config):
|
||||
auth = Authenticator(sample_config)
|
||||
agent = auth.authenticate("agent-token")
|
||||
manager = SessionTokenManager()
|
||||
|
||||
with pytest.raises(AuthError, match="Document not in scope"):
|
||||
await request_session_token(
|
||||
agent=agent,
|
||||
auth=auth,
|
||||
token_manager=manager,
|
||||
document="unauthorized_doc",
|
||||
permissions=["read"],
|
||||
ttl_seconds=300,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_request_session_token_rejects_unauthorized_permission(sample_config):
|
||||
auth = Authenticator(sample_config)
|
||||
agent = auth.authenticate("agent-token")
|
||||
manager = SessionTokenManager()
|
||||
|
||||
# Agent has read/write on sales, but not schema
|
||||
with pytest.raises(AuthError, match="Permission denied"):
|
||||
await request_session_token(
|
||||
agent=agent,
|
||||
auth=auth,
|
||||
token_manager=manager,
|
||||
document="sales",
|
||||
permissions=["read", "schema"], # schema not granted
|
||||
ttl_seconds=300,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_request_session_token_rejects_invalid_permission(sample_config):
|
||||
auth = Authenticator(sample_config)
|
||||
agent = auth.authenticate("agent-token")
|
||||
manager = SessionTokenManager()
|
||||
|
||||
with pytest.raises(AuthError, match="Invalid permission"):
|
||||
await request_session_token(
|
||||
agent=agent,
|
||||
auth=auth,
|
||||
token_manager=manager,
|
||||
document="sales",
|
||||
permissions=["read", "invalid_perm"],
|
||||
ttl_seconds=300,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user