feat: add driving flag to relationship query responses

Add computed driving flag to all relationship queries (list_relationships,
get_predecessors, get_successors). A relationship is marked as driving when
the predecessor's early end date plus lag determines the successor's early
start date.

Changes:
- Add early_start_date and early_end_date columns to activities schema
- Parse early dates from TASK table in XER files
- Implement is_driving_relationship() helper with 24hr tolerance for
  calendar gaps
- Update all relationship queries to compute and return driving flag
- Add contract and unit tests for driving flag functionality
- Update spec, contracts, and documentation
This commit is contained in:
2026-01-07 07:21:58 -05:00
parent 2255b65ef6
commit af8cdc1d31
17 changed files with 654 additions and 324 deletions

View File

@@ -76,10 +76,11 @@ def load_parsed_data(parsed: ParsedXer, project_id: str) -> None:
"""
INSERT INTO activities (
task_id, proj_id, wbs_id, task_code, task_name, task_type,
target_start_date, target_end_date, act_start_date, act_end_date,
target_start_date, target_end_date, early_start_date, early_end_date,
act_start_date, act_end_date,
total_float_hr_cnt, driving_path_flag, status_code
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
task["task_id"],
@@ -90,6 +91,8 @@ def load_parsed_data(parsed: ParsedXer, project_id: str) -> None:
task["task_type"],
task["target_start_date"],
task["target_end_date"],
task.get("early_start_date"),
task.get("early_end_date"),
task["act_start_date"],
task["act_end_date"],
task["total_float_hr_cnt"],

View File

@@ -1,8 +1,54 @@
"""Database query functions for XER data."""
from datetime import datetime, timedelta
from xer_mcp.db import db
def is_driving_relationship(
pred_early_end: str | None,
succ_early_start: str | None,
lag_hours: float,
pred_type: str,
) -> bool:
"""Determine if a relationship is driving the successor's early start.
A relationship is "driving" when the predecessor's completion (plus lag)
determines the successor's early start date. This is computed by comparing
dates with a tolerance for overnight gaps and calendar differences.
Args:
pred_early_end: Predecessor's early end date (ISO format)
succ_early_start: Successor's early start date (ISO format)
lag_hours: Lag duration in hours (can be negative)
pred_type: Relationship type (FS, SS, FF, SF)
Returns:
True if the relationship is driving, False otherwise
"""
if pred_early_end is None or succ_early_start is None:
return False
try:
pred_end = datetime.fromisoformat(pred_early_end)
succ_start = datetime.fromisoformat(succ_early_start)
except (ValueError, TypeError):
return False
# For FS (Finish-to-Start): pred_end + lag should equal succ_start
if pred_type == "FS":
expected_start = pred_end + timedelta(hours=lag_hours)
# Allow tolerance of 24 hours for overnight gaps and calendar differences
diff = abs((succ_start - expected_start).total_seconds())
return diff <= 24 * 3600 # 24 hours tolerance
# For SS (Start-to-Start): would need pred_early_start, not implemented
# For FF (Finish-to-Finish): would need succ_early_end, not implemented
# For SF (Start-to-Finish): complex case, not implemented
# Default to False for non-FS relationships for now
return False
def query_activities(
limit: int = 100,
offset: int = 0,
@@ -161,14 +207,15 @@ def query_relationships(
cur.execute("SELECT COUNT(*) FROM relationships")
total = cur.fetchone()[0]
# Get paginated results with activity names
# Get paginated results with activity names and early dates for driving computation
query = """
SELECT r.task_pred_id, r.task_id, a1.task_name,
r.pred_task_id, a2.task_name,
r.pred_type, r.lag_hr_cnt
SELECT r.task_pred_id, r.task_id, succ.task_name,
r.pred_task_id, pred.task_name,
r.pred_type, r.lag_hr_cnt,
pred.early_end_date, succ.early_start_date
FROM relationships r
LEFT JOIN activities a1 ON r.task_id = a1.task_id
LEFT JOIN activities a2 ON r.pred_task_id = a2.task_id
LEFT JOIN activities succ ON r.task_id = succ.task_id
LEFT JOIN activities pred ON r.pred_task_id = pred.task_id
ORDER BY r.task_pred_id
LIMIT ? OFFSET ?
"""
@@ -183,18 +230,25 @@ def query_relationships(
return pred_type[3:]
return pred_type
relationships = [
{
relationships = []
for row in rows:
pred_type = format_pred_type(row[5])
driving = is_driving_relationship(
pred_early_end=row[7],
succ_early_start=row[8],
lag_hours=row[6] or 0.0,
pred_type=pred_type,
)
relationships.append({
"task_pred_id": row[0],
"task_id": row[1],
"task_name": row[2],
"pred_task_id": row[3],
"pred_task_name": row[4],
"pred_type": format_pred_type(row[5]),
"pred_type": pred_type,
"lag_hr_cnt": row[6],
}
for row in rows
]
"driving": driving,
})
return relationships, total
@@ -206,15 +260,24 @@ def get_predecessors(activity_id: str) -> list[dict]:
activity_id: The task_id to find predecessors for
Returns:
List of predecessor activity dicts with relationship info
List of predecessor activity dicts with relationship info and driving flag
"""
# Get successor's early start for driving calculation
with db.cursor() as cur:
cur.execute(
"SELECT early_start_date FROM activities WHERE task_id = ?",
(activity_id,),
)
succ_row = cur.fetchone()
succ_early_start = succ_row[0] if succ_row else None
query = """
SELECT a.task_id, a.task_code, a.task_name,
r.pred_type, r.lag_hr_cnt
SELECT pred.task_id, pred.task_code, pred.task_name,
r.pred_type, r.lag_hr_cnt, pred.early_end_date
FROM relationships r
JOIN activities a ON r.pred_task_id = a.task_id
JOIN activities pred ON r.pred_task_id = pred.task_id
WHERE r.task_id = ?
ORDER BY a.task_code
ORDER BY pred.task_code
"""
with db.cursor() as cur:
@@ -226,16 +289,25 @@ def get_predecessors(activity_id: str) -> list[dict]:
return pred_type[3:]
return pred_type
return [
{
result = []
for row in rows:
pred_type = format_pred_type(row[3])
driving = is_driving_relationship(
pred_early_end=row[5],
succ_early_start=succ_early_start,
lag_hours=row[4] or 0.0,
pred_type=pred_type,
)
result.append({
"task_id": row[0],
"task_code": row[1],
"task_name": row[2],
"relationship_type": format_pred_type(row[3]),
"relationship_type": pred_type,
"lag_hr_cnt": row[4],
}
for row in rows
]
"driving": driving,
})
return result
def get_successors(activity_id: str) -> list[dict]:
@@ -245,15 +317,24 @@ def get_successors(activity_id: str) -> list[dict]:
activity_id: The task_id to find successors for
Returns:
List of successor activity dicts with relationship info
List of successor activity dicts with relationship info and driving flag
"""
# Get predecessor's early end for driving calculation
with db.cursor() as cur:
cur.execute(
"SELECT early_end_date FROM activities WHERE task_id = ?",
(activity_id,),
)
pred_row = cur.fetchone()
pred_early_end = pred_row[0] if pred_row else None
query = """
SELECT a.task_id, a.task_code, a.task_name,
r.pred_type, r.lag_hr_cnt
SELECT succ.task_id, succ.task_code, succ.task_name,
r.pred_type, r.lag_hr_cnt, succ.early_start_date
FROM relationships r
JOIN activities a ON r.task_id = a.task_id
JOIN activities succ ON r.task_id = succ.task_id
WHERE r.pred_task_id = ?
ORDER BY a.task_code
ORDER BY succ.task_code
"""
with db.cursor() as cur:
@@ -265,16 +346,25 @@ def get_successors(activity_id: str) -> list[dict]:
return pred_type[3:]
return pred_type
return [
{
result = []
for row in rows:
pred_type = format_pred_type(row[3])
driving = is_driving_relationship(
pred_early_end=pred_early_end,
succ_early_start=row[5],
lag_hours=row[4] or 0.0,
pred_type=pred_type,
)
result.append({
"task_id": row[0],
"task_code": row[1],
"task_name": row[2],
"relationship_type": format_pred_type(row[3]),
"relationship_type": pred_type,
"lag_hr_cnt": row[4],
}
for row in rows
]
"driving": driving,
})
return result
def get_project_summary(project_id: str) -> dict | None:

View File

@@ -21,6 +21,8 @@ CREATE TABLE IF NOT EXISTS activities (
task_type TEXT NOT NULL,
target_start_date TEXT,
target_end_date TEXT,
early_start_date TEXT,
early_end_date TEXT,
act_start_date TEXT,
act_end_date TEXT,
total_float_hr_cnt REAL,

View File

@@ -36,6 +36,8 @@ class TaskHandler(TableHandler):
"status_code": data.get("status_code") or None,
"target_start_date": convert_date(data.get("target_start_date")),
"target_end_date": convert_date(data.get("target_end_date")),
"early_start_date": convert_date(data.get("early_start_date")),
"early_end_date": convert_date(data.get("early_end_date")),
"act_start_date": convert_date(data.get("act_start_date")),
"act_end_date": convert_date(data.get("act_end_date")),
"total_float_hr_cnt": total_float,