feat: add authentication and authorization
This commit is contained in:
72
src/grist_mcp/auth.py
Normal file
72
src/grist_mcp/auth.py
Normal file
@@ -0,0 +1,72 @@
|
|||||||
|
"""Authentication and authorization."""
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
from grist_mcp.config import Config, Token
|
||||||
|
|
||||||
|
|
||||||
|
class Permission(Enum):
|
||||||
|
"""Document permission levels."""
|
||||||
|
READ = "read"
|
||||||
|
WRITE = "write"
|
||||||
|
SCHEMA = "schema"
|
||||||
|
|
||||||
|
|
||||||
|
class AuthError(Exception):
|
||||||
|
"""Authentication or authorization error."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Agent:
|
||||||
|
"""An authenticated agent."""
|
||||||
|
token: str
|
||||||
|
name: str
|
||||||
|
_token_obj: Token
|
||||||
|
|
||||||
|
|
||||||
|
class Authenticator:
|
||||||
|
"""Handles token validation and permission checking."""
|
||||||
|
|
||||||
|
def __init__(self, config: Config):
|
||||||
|
self._config = config
|
||||||
|
self._token_map = {t.token: t for t in config.tokens}
|
||||||
|
|
||||||
|
def authenticate(self, token: str) -> Agent:
|
||||||
|
"""Validate token and return Agent object."""
|
||||||
|
token_obj = self._token_map.get(token)
|
||||||
|
if token_obj is None:
|
||||||
|
raise AuthError("Invalid token")
|
||||||
|
|
||||||
|
return Agent(
|
||||||
|
token=token,
|
||||||
|
name=token_obj.name,
|
||||||
|
_token_obj=token_obj,
|
||||||
|
)
|
||||||
|
|
||||||
|
def authorize(self, agent: Agent, document: str, permission: Permission) -> None:
|
||||||
|
"""Check if agent has permission on document. Raises AuthError if not."""
|
||||||
|
# Find the scope entry for this document
|
||||||
|
scope_entry = None
|
||||||
|
for scope in agent._token_obj.scope:
|
||||||
|
if scope.document == document:
|
||||||
|
scope_entry = scope
|
||||||
|
break
|
||||||
|
|
||||||
|
if scope_entry is None:
|
||||||
|
raise AuthError("Document not in scope")
|
||||||
|
|
||||||
|
if permission.value not in scope_entry.permissions:
|
||||||
|
raise AuthError("Permission denied")
|
||||||
|
|
||||||
|
def get_accessible_documents(self, agent: Agent) -> list[dict]:
|
||||||
|
"""Return list of documents agent can access with their permissions."""
|
||||||
|
return [
|
||||||
|
{"name": scope.document, "permissions": scope.permissions}
|
||||||
|
for scope in agent._token_obj.scope
|
||||||
|
]
|
||||||
|
|
||||||
|
def get_document(self, document_name: str):
|
||||||
|
"""Get document config by name."""
|
||||||
|
return self._config.documents.get(document_name)
|
||||||
84
tests/test_auth.py
Normal file
84
tests/test_auth.py
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
import pytest
|
||||||
|
from grist_mcp.auth import Authenticator, AuthError, Permission
|
||||||
|
from grist_mcp.config import Config, Document, Token, TokenScope
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def sample_config():
|
||||||
|
return Config(
|
||||||
|
documents={
|
||||||
|
"budget": Document(
|
||||||
|
url="https://grist.example.com",
|
||||||
|
doc_id="abc123",
|
||||||
|
api_key="doc-api-key",
|
||||||
|
),
|
||||||
|
"expenses": Document(
|
||||||
|
url="https://grist.example.com",
|
||||||
|
doc_id="def456",
|
||||||
|
api_key="doc-api-key",
|
||||||
|
),
|
||||||
|
},
|
||||||
|
tokens=[
|
||||||
|
Token(
|
||||||
|
token="valid-token",
|
||||||
|
name="test-agent",
|
||||||
|
scope=[
|
||||||
|
TokenScope(document="budget", permissions=["read", "write"]),
|
||||||
|
TokenScope(document="expenses", permissions=["read"]),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_authenticate_valid_token(sample_config):
|
||||||
|
auth = Authenticator(sample_config)
|
||||||
|
agent = auth.authenticate("valid-token")
|
||||||
|
|
||||||
|
assert agent.name == "test-agent"
|
||||||
|
assert agent.token == "valid-token"
|
||||||
|
|
||||||
|
|
||||||
|
def test_authenticate_invalid_token(sample_config):
|
||||||
|
auth = Authenticator(sample_config)
|
||||||
|
|
||||||
|
with pytest.raises(AuthError, match="Invalid token"):
|
||||||
|
auth.authenticate("bad-token")
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorize_allowed_document_and_permission(sample_config):
|
||||||
|
auth = Authenticator(sample_config)
|
||||||
|
agent = auth.authenticate("valid-token")
|
||||||
|
|
||||||
|
# Should not raise
|
||||||
|
auth.authorize(agent, "budget", Permission.READ)
|
||||||
|
auth.authorize(agent, "budget", Permission.WRITE)
|
||||||
|
auth.authorize(agent, "expenses", Permission.READ)
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorize_denied_document(sample_config):
|
||||||
|
auth = Authenticator(sample_config)
|
||||||
|
agent = auth.authenticate("valid-token")
|
||||||
|
|
||||||
|
with pytest.raises(AuthError, match="Document not in scope"):
|
||||||
|
auth.authorize(agent, "unknown-doc", Permission.READ)
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorize_denied_permission(sample_config):
|
||||||
|
auth = Authenticator(sample_config)
|
||||||
|
agent = auth.authenticate("valid-token")
|
||||||
|
|
||||||
|
# expenses only has read permission
|
||||||
|
with pytest.raises(AuthError, match="Permission denied"):
|
||||||
|
auth.authorize(agent, "expenses", Permission.WRITE)
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_accessible_documents(sample_config):
|
||||||
|
auth = Authenticator(sample_config)
|
||||||
|
agent = auth.authenticate("valid-token")
|
||||||
|
|
||||||
|
docs = auth.get_accessible_documents(agent)
|
||||||
|
|
||||||
|
assert len(docs) == 2
|
||||||
|
assert {"name": "budget", "permissions": ["read", "write"]} in docs
|
||||||
|
assert {"name": "expenses", "permissions": ["read"]} in docs
|
||||||
Reference in New Issue
Block a user