diff --git a/src/grist_mcp/tools/session.py b/src/grist_mcp/tools/session.py index f1b957e..bd5569f 100644 --- a/src/grist_mcp/tools/session.py +++ b/src/grist_mcp/tools/session.py @@ -1,5 +1,9 @@ """Session token tools for HTTP proxy access.""" +from grist_mcp.auth import Agent, Authenticator, AuthError, Permission +from grist_mcp.session import SessionTokenManager + + PROXY_DOCUMENTATION = { "description": "HTTP proxy API for bulk data operations. Use request_session_token to get a short-lived token, then call the proxy endpoint directly from scripts.", "endpoint": "POST /api/v1/proxy", @@ -104,3 +108,41 @@ print(response.json()) async def get_proxy_documentation() -> dict: """Return complete documentation for the HTTP proxy API.""" return PROXY_DOCUMENTATION + + +async def request_session_token( + agent: Agent, + auth: Authenticator, + token_manager: SessionTokenManager, + document: str, + permissions: list[str], + ttl_seconds: int = 300, +) -> dict: + """Request a short-lived session token for HTTP proxy access. + + The token can only grant permissions the agent already has. + """ + # Verify agent has access to the document + # Check each requested permission + for perm_str in permissions: + try: + perm = Permission(perm_str) + except ValueError: + raise AuthError(f"Invalid permission: {perm_str}") + auth.authorize(agent, document, perm) + + # Create the session token + session = token_manager.create_token( + agent_name=agent.name, + document=document, + permissions=permissions, + ttl_seconds=ttl_seconds, + ) + + return { + "token": session.token, + "document": session.document, + "permissions": session.permissions, + "expires_at": session.expires_at.isoformat(), + "proxy_url": "/api/v1/proxy", + } diff --git a/tests/unit/test_tools_session.py b/tests/unit/test_tools_session.py index d1d3078..25c9731 100644 --- a/tests/unit/test_tools_session.py +++ b/tests/unit/test_tools_session.py @@ -1,5 +1,37 @@ import pytest -from grist_mcp.tools.session import get_proxy_documentation +from grist_mcp.tools.session import get_proxy_documentation, request_session_token +from grist_mcp.auth import Authenticator, Agent, AuthError +from grist_mcp.config import Config, Document, Token, TokenScope +from grist_mcp.session import SessionTokenManager + + +@pytest.fixture +def sample_config(): + return Config( + documents={ + "sales": Document( + url="https://grist.example.com", + doc_id="abc123", + api_key="key", + ), + }, + tokens=[ + Token( + token="agent-token", + name="test-agent", + scope=[ + TokenScope(document="sales", permissions=["read", "write"]), + ], + ), + ], + ) + + +@pytest.fixture +def auth_and_agent(sample_config): + auth = Authenticator(sample_config) + agent = auth.authenticate("agent-token") + return auth, agent @pytest.mark.asyncio @@ -14,3 +46,77 @@ async def test_get_proxy_documentation_returns_complete_spec(): assert "add_records" in result["methods"] assert "get_records" in result["methods"] assert "example_script" in result + + +@pytest.mark.asyncio +async def test_request_session_token_creates_valid_token(auth_and_agent): + auth, agent = auth_and_agent + manager = SessionTokenManager() + + result = await request_session_token( + agent=agent, + auth=auth, + token_manager=manager, + document="sales", + permissions=["read", "write"], + ttl_seconds=300, + ) + + assert "token" in result + assert result["token"].startswith("sess_") + assert result["document"] == "sales" + assert result["permissions"] == ["read", "write"] + assert "expires_at" in result + assert result["proxy_url"] == "/api/v1/proxy" + + +@pytest.mark.asyncio +async def test_request_session_token_rejects_unauthorized_document(sample_config): + auth = Authenticator(sample_config) + agent = auth.authenticate("agent-token") + manager = SessionTokenManager() + + with pytest.raises(AuthError, match="Document not in scope"): + await request_session_token( + agent=agent, + auth=auth, + token_manager=manager, + document="unauthorized_doc", + permissions=["read"], + ttl_seconds=300, + ) + + +@pytest.mark.asyncio +async def test_request_session_token_rejects_unauthorized_permission(sample_config): + auth = Authenticator(sample_config) + agent = auth.authenticate("agent-token") + manager = SessionTokenManager() + + # Agent has read/write on sales, but not schema + with pytest.raises(AuthError, match="Permission denied"): + await request_session_token( + agent=agent, + auth=auth, + token_manager=manager, + document="sales", + permissions=["read", "schema"], # schema not granted + ttl_seconds=300, + ) + + +@pytest.mark.asyncio +async def test_request_session_token_rejects_invalid_permission(sample_config): + auth = Authenticator(sample_config) + agent = auth.authenticate("agent-token") + manager = SessionTokenManager() + + with pytest.raises(AuthError, match="Invalid permission"): + await request_session_token( + agent=agent, + auth=auth, + token_manager=manager, + document="sales", + permissions=["read", "invalid_perm"], + ttl_seconds=300, + )