From 716de523d8933ed64f04e9e89560fc1789b830f6 Mon Sep 17 00:00:00 2001 From: Bill Date: Wed, 3 Dec 2025 14:30:32 -0500 Subject: [PATCH] feat: add authentication and authorization --- src/grist_mcp/auth.py | 72 +++++++++++++++++++++++++++++++++++++ tests/test_auth.py | 84 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 src/grist_mcp/auth.py create mode 100644 tests/test_auth.py diff --git a/src/grist_mcp/auth.py b/src/grist_mcp/auth.py new file mode 100644 index 0000000..92d3741 --- /dev/null +++ b/src/grist_mcp/auth.py @@ -0,0 +1,72 @@ +"""Authentication and authorization.""" + +from dataclasses import dataclass +from enum import Enum + +from grist_mcp.config import Config, Token + + +class Permission(Enum): + """Document permission levels.""" + READ = "read" + WRITE = "write" + SCHEMA = "schema" + + +class AuthError(Exception): + """Authentication or authorization error.""" + pass + + +@dataclass +class Agent: + """An authenticated agent.""" + token: str + name: str + _token_obj: Token + + +class Authenticator: + """Handles token validation and permission checking.""" + + def __init__(self, config: Config): + self._config = config + self._token_map = {t.token: t for t in config.tokens} + + def authenticate(self, token: str) -> Agent: + """Validate token and return Agent object.""" + token_obj = self._token_map.get(token) + if token_obj is None: + raise AuthError("Invalid token") + + return Agent( + token=token, + name=token_obj.name, + _token_obj=token_obj, + ) + + def authorize(self, agent: Agent, document: str, permission: Permission) -> None: + """Check if agent has permission on document. Raises AuthError if not.""" + # Find the scope entry for this document + scope_entry = None + for scope in agent._token_obj.scope: + if scope.document == document: + scope_entry = scope + break + + if scope_entry is None: + raise AuthError("Document not in scope") + + if permission.value not in scope_entry.permissions: + raise AuthError("Permission denied") + + def get_accessible_documents(self, agent: Agent) -> list[dict]: + """Return list of documents agent can access with their permissions.""" + return [ + {"name": scope.document, "permissions": scope.permissions} + for scope in agent._token_obj.scope + ] + + def get_document(self, document_name: str): + """Get document config by name.""" + return self._config.documents.get(document_name) diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..d261806 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,84 @@ +import pytest +from grist_mcp.auth import Authenticator, AuthError, Permission +from grist_mcp.config import Config, Document, Token, TokenScope + + +@pytest.fixture +def sample_config(): + return Config( + documents={ + "budget": Document( + url="https://grist.example.com", + doc_id="abc123", + api_key="doc-api-key", + ), + "expenses": Document( + url="https://grist.example.com", + doc_id="def456", + api_key="doc-api-key", + ), + }, + tokens=[ + Token( + token="valid-token", + name="test-agent", + scope=[ + TokenScope(document="budget", permissions=["read", "write"]), + TokenScope(document="expenses", permissions=["read"]), + ], + ), + ], + ) + + +def test_authenticate_valid_token(sample_config): + auth = Authenticator(sample_config) + agent = auth.authenticate("valid-token") + + assert agent.name == "test-agent" + assert agent.token == "valid-token" + + +def test_authenticate_invalid_token(sample_config): + auth = Authenticator(sample_config) + + with pytest.raises(AuthError, match="Invalid token"): + auth.authenticate("bad-token") + + +def test_authorize_allowed_document_and_permission(sample_config): + auth = Authenticator(sample_config) + agent = auth.authenticate("valid-token") + + # Should not raise + auth.authorize(agent, "budget", Permission.READ) + auth.authorize(agent, "budget", Permission.WRITE) + auth.authorize(agent, "expenses", Permission.READ) + + +def test_authorize_denied_document(sample_config): + auth = Authenticator(sample_config) + agent = auth.authenticate("valid-token") + + with pytest.raises(AuthError, match="Document not in scope"): + auth.authorize(agent, "unknown-doc", Permission.READ) + + +def test_authorize_denied_permission(sample_config): + auth = Authenticator(sample_config) + agent = auth.authenticate("valid-token") + + # expenses only has read permission + with pytest.raises(AuthError, match="Permission denied"): + auth.authorize(agent, "expenses", Permission.WRITE) + + +def test_get_accessible_documents(sample_config): + auth = Authenticator(sample_config) + agent = auth.authenticate("valid-token") + + docs = auth.get_accessible_documents(agent) + + assert len(docs) == 2 + assert {"name": "budget", "permissions": ["read", "write"]} in docs + assert {"name": "expenses", "permissions": ["read"]} in docs