feat: implement XER MCP Server with 9 schedule query tools
Implement complete MCP server for parsing Primavera P6 XER files and exposing schedule data through MCP tools. All 4 user stories complete. Tools implemented: - load_xer: Parse XER files into SQLite database - list_activities: Query activities with pagination and filtering - get_activity: Get activity details by ID - list_relationships: Query activity dependencies - get_predecessors/get_successors: Query activity relationships - get_project_summary: Project overview with counts - list_milestones: Query milestone activities - get_critical_path: Query driving path activities Features: - Tab-delimited XER format parsing with pluggable table handlers - In-memory SQLite database for fast queries - Pagination with 100-item default limit - Multi-project file support with project selection - ISO8601 date formatting - NO_FILE_LOADED error handling for all query tools Test coverage: 81 tests (contract, integration, unit)
This commit is contained in:
127
src/xer_mcp/parser/xer_parser.py
Normal file
127
src/xer_mcp/parser/xer_parser.py
Normal file
@@ -0,0 +1,127 @@
|
||||
"""XER file parser."""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
from xer_mcp.errors import FileNotFoundError, ParseError
|
||||
from xer_mcp.parser.table_handlers import TABLE_HANDLERS
|
||||
|
||||
|
||||
@dataclass
|
||||
class ParsedXer:
|
||||
"""Container for parsed XER data."""
|
||||
|
||||
projects: list[dict] = field(default_factory=list)
|
||||
tasks: list[dict] = field(default_factory=list)
|
||||
taskpreds: list[dict] = field(default_factory=list)
|
||||
projwbs: list[dict] = field(default_factory=list)
|
||||
calendars: list[dict] = field(default_factory=list)
|
||||
|
||||
|
||||
class XerParser:
|
||||
"""Parser for Primavera P6 XER files.
|
||||
|
||||
XER files are tab-delimited with the following structure:
|
||||
- ERMHDR line: header with version info
|
||||
- %T lines: table name declarations
|
||||
- %F lines: field (column) names
|
||||
- %R lines: data rows
|
||||
"""
|
||||
|
||||
def parse(self, file_path: Path | str) -> ParsedXer:
|
||||
"""Parse an XER file and return structured data.
|
||||
|
||||
Args:
|
||||
file_path: Path to the XER file
|
||||
|
||||
Returns:
|
||||
ParsedXer containing all parsed tables
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If file doesn't exist
|
||||
ParseError: If file is invalid or cannot be parsed
|
||||
"""
|
||||
path = Path(file_path)
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(str(path))
|
||||
|
||||
try:
|
||||
content = path.read_text(encoding="utf-8", errors="replace")
|
||||
except OSError as e:
|
||||
raise ParseError(f"Cannot read file: {e}") from e
|
||||
|
||||
return self._parse_content(content)
|
||||
|
||||
def _parse_content(self, content: str) -> ParsedXer:
|
||||
"""Parse XER content string."""
|
||||
lines = content.split("\n")
|
||||
if not lines:
|
||||
raise ParseError("Empty file")
|
||||
|
||||
# Check for ERMHDR line
|
||||
first_line = lines[0].strip()
|
||||
if not first_line.startswith("ERMHDR"):
|
||||
raise ParseError("Invalid XER file: missing ERMHDR header")
|
||||
|
||||
result = ParsedXer()
|
||||
current_table: str | None = None
|
||||
current_fields: list[str] = []
|
||||
|
||||
for line in lines[1:]:
|
||||
line = line.rstrip("\r\n")
|
||||
if not line:
|
||||
continue
|
||||
|
||||
parts = line.split("\t")
|
||||
if not parts:
|
||||
continue
|
||||
|
||||
marker = parts[0]
|
||||
|
||||
if marker == "%T":
|
||||
# Table declaration
|
||||
if len(parts) < 2:
|
||||
continue
|
||||
current_table = parts[1]
|
||||
current_fields = []
|
||||
|
||||
elif marker == "%F":
|
||||
# Field names
|
||||
current_fields = parts[1:]
|
||||
|
||||
elif marker == "%R":
|
||||
# Data row
|
||||
if current_table and current_fields:
|
||||
values = parts[1:]
|
||||
row_data = self._parse_row(current_table, current_fields, values)
|
||||
if row_data:
|
||||
self._add_to_result(result, current_table, row_data)
|
||||
|
||||
# Validate we got at least some data
|
||||
if not result.projects:
|
||||
raise ParseError("No PROJECT data found in XER file")
|
||||
|
||||
return result
|
||||
|
||||
def _parse_row(self, table_name: str, fields: list[str], values: list[str]) -> dict | None:
|
||||
"""Parse a single data row using the appropriate handler."""
|
||||
handler_class = TABLE_HANDLERS.get(table_name)
|
||||
if handler_class is None:
|
||||
# Unknown table, skip
|
||||
return None
|
||||
|
||||
handler = handler_class()
|
||||
return handler.parse_row(fields, values)
|
||||
|
||||
def _add_to_result(self, result: ParsedXer, table_name: str, row_data: dict) -> None:
|
||||
"""Add parsed row to the appropriate result list."""
|
||||
if table_name == "PROJECT":
|
||||
result.projects.append(row_data)
|
||||
elif table_name == "TASK":
|
||||
result.tasks.append(row_data)
|
||||
elif table_name == "TASKPRED":
|
||||
result.taskpreds.append(row_data)
|
||||
elif table_name == "PROJWBS":
|
||||
result.projwbs.append(row_data)
|
||||
elif table_name == "CALENDAR":
|
||||
result.calendars.append(row_data)
|
||||
Reference in New Issue
Block a user