Implement complete MCP server for parsing Primavera P6 XER files and exposing schedule data through MCP tools. All 4 user stories complete. Tools implemented: - load_xer: Parse XER files into SQLite database - list_activities: Query activities with pagination and filtering - get_activity: Get activity details by ID - list_relationships: Query activity dependencies - get_predecessors/get_successors: Query activity relationships - get_project_summary: Project overview with counts - list_milestones: Query milestone activities - get_critical_path: Query driving path activities Features: - Tab-delimited XER format parsing with pluggable table handlers - In-memory SQLite database for fast queries - Pagination with 100-item default limit - Multi-project file support with project selection - ISO8601 date formatting - NO_FILE_LOADED error handling for all query tools Test coverage: 81 tests (contract, integration, unit)
128 lines
4.0 KiB
Python
128 lines
4.0 KiB
Python
"""XER file parser."""
|
|
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
|
|
from xer_mcp.errors import FileNotFoundError, ParseError
|
|
from xer_mcp.parser.table_handlers import TABLE_HANDLERS
|
|
|
|
|
|
@dataclass
|
|
class ParsedXer:
|
|
"""Container for parsed XER data."""
|
|
|
|
projects: list[dict] = field(default_factory=list)
|
|
tasks: list[dict] = field(default_factory=list)
|
|
taskpreds: list[dict] = field(default_factory=list)
|
|
projwbs: list[dict] = field(default_factory=list)
|
|
calendars: list[dict] = field(default_factory=list)
|
|
|
|
|
|
class XerParser:
|
|
"""Parser for Primavera P6 XER files.
|
|
|
|
XER files are tab-delimited with the following structure:
|
|
- ERMHDR line: header with version info
|
|
- %T lines: table name declarations
|
|
- %F lines: field (column) names
|
|
- %R lines: data rows
|
|
"""
|
|
|
|
def parse(self, file_path: Path | str) -> ParsedXer:
|
|
"""Parse an XER file and return structured data.
|
|
|
|
Args:
|
|
file_path: Path to the XER file
|
|
|
|
Returns:
|
|
ParsedXer containing all parsed tables
|
|
|
|
Raises:
|
|
FileNotFoundError: If file doesn't exist
|
|
ParseError: If file is invalid or cannot be parsed
|
|
"""
|
|
path = Path(file_path)
|
|
if not path.exists():
|
|
raise FileNotFoundError(str(path))
|
|
|
|
try:
|
|
content = path.read_text(encoding="utf-8", errors="replace")
|
|
except OSError as e:
|
|
raise ParseError(f"Cannot read file: {e}") from e
|
|
|
|
return self._parse_content(content)
|
|
|
|
def _parse_content(self, content: str) -> ParsedXer:
|
|
"""Parse XER content string."""
|
|
lines = content.split("\n")
|
|
if not lines:
|
|
raise ParseError("Empty file")
|
|
|
|
# Check for ERMHDR line
|
|
first_line = lines[0].strip()
|
|
if not first_line.startswith("ERMHDR"):
|
|
raise ParseError("Invalid XER file: missing ERMHDR header")
|
|
|
|
result = ParsedXer()
|
|
current_table: str | None = None
|
|
current_fields: list[str] = []
|
|
|
|
for line in lines[1:]:
|
|
line = line.rstrip("\r\n")
|
|
if not line:
|
|
continue
|
|
|
|
parts = line.split("\t")
|
|
if not parts:
|
|
continue
|
|
|
|
marker = parts[0]
|
|
|
|
if marker == "%T":
|
|
# Table declaration
|
|
if len(parts) < 2:
|
|
continue
|
|
current_table = parts[1]
|
|
current_fields = []
|
|
|
|
elif marker == "%F":
|
|
# Field names
|
|
current_fields = parts[1:]
|
|
|
|
elif marker == "%R":
|
|
# Data row
|
|
if current_table and current_fields:
|
|
values = parts[1:]
|
|
row_data = self._parse_row(current_table, current_fields, values)
|
|
if row_data:
|
|
self._add_to_result(result, current_table, row_data)
|
|
|
|
# Validate we got at least some data
|
|
if not result.projects:
|
|
raise ParseError("No PROJECT data found in XER file")
|
|
|
|
return result
|
|
|
|
def _parse_row(self, table_name: str, fields: list[str], values: list[str]) -> dict | None:
|
|
"""Parse a single data row using the appropriate handler."""
|
|
handler_class = TABLE_HANDLERS.get(table_name)
|
|
if handler_class is None:
|
|
# Unknown table, skip
|
|
return None
|
|
|
|
handler = handler_class()
|
|
return handler.parse_row(fields, values)
|
|
|
|
def _add_to_result(self, result: ParsedXer, table_name: str, row_data: dict) -> None:
|
|
"""Add parsed row to the appropriate result list."""
|
|
if table_name == "PROJECT":
|
|
result.projects.append(row_data)
|
|
elif table_name == "TASK":
|
|
result.tasks.append(row_data)
|
|
elif table_name == "TASKPRED":
|
|
result.taskpreds.append(row_data)
|
|
elif table_name == "PROJWBS":
|
|
result.projwbs.append(row_data)
|
|
elif table_name == "CALENDAR":
|
|
result.calendars.append(row_data)
|