feat: add skip status tracking for job orchestration

Implement skip status tracking to fix jobs hanging when dates are
filtered out. Jobs now properly complete when all model-days reach
terminal states (completed/failed/skipped).

Changes:
- database.py: Add 'skipped' status to job_details CHECK constraint
- job_manager.py: Update completion logic to count skipped as done
- job_manager.py: Add skipped count to progress tracking
- simulation_worker.py: Implement skip tracking with per-model granularity
- simulation_worker.py: Add _filter_completed_dates_with_tracking()
- simulation_worker.py: Add _mark_skipped_dates()
- simulation_worker.py: Update _prepare_data() to use skip tracking
- simulation_worker.py: Improve warning messages to distinguish skip types

Skip reasons:
- "Already completed" - Position data exists from previous job
- "Incomplete price data" - Missing prices (weekends/holidays/future)

The implementation correctly handles multi-model scenarios where different
models have different completion states for the same date.
This commit is contained in:
2025-11-02 09:35:58 -05:00
parent aa4958bd9c
commit 1f41e9d7ca
3 changed files with 112 additions and 14 deletions

View File

@@ -105,7 +105,7 @@ def initialize_database(db_path: str = "data/jobs.db") -> None:
job_id TEXT NOT NULL,
date TEXT NOT NULL,
model TEXT NOT NULL,
status TEXT NOT NULL CHECK(status IN ('pending', 'running', 'completed', 'failed')),
status TEXT NOT NULL CHECK(status IN ('pending', 'running', 'completed', 'failed', 'skipped')),
started_at TEXT,
completed_at TEXT,
duration_seconds REAL,

View File

@@ -420,14 +420,16 @@ class JobManager:
SELECT
COUNT(*) as total,
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
SUM(CASE WHEN status = 'skipped' THEN 1 ELSE 0 END) as skipped
FROM job_details
WHERE job_id = ?
""", (job_id,))
total, completed, failed = cursor.fetchone()
total, completed, failed, skipped = cursor.fetchone()
if completed + failed == total:
# Job is done when all details are in terminal states
if completed + failed + skipped == total:
# All done - determine final status
if failed == 0:
final_status = "completed"
@@ -519,12 +521,14 @@ class JobManager:
SELECT
COUNT(*) as total,
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
SUM(CASE WHEN status = 'skipped' THEN 1 ELSE 0 END) as skipped
FROM job_details
WHERE job_id = ?
""", (job_id,))
total, completed, failed = cursor.fetchone()
total, completed, failed, pending, skipped = cursor.fetchone()
# Get currently running model-day
cursor.execute("""
@@ -559,6 +563,8 @@ class JobManager:
"total_model_days": total,
"completed": completed or 0,
"failed": failed or 0,
"pending": pending or 0,
"skipped": skipped or 0,
"current": current,
"details": details
}

View File

@@ -296,6 +296,80 @@ class SimulationWorker:
return dates_to_process
def _filter_completed_dates_with_tracking(
self,
available_dates: List[str],
models: List[str]
) -> tuple:
"""
Filter already-completed dates per model with skip tracking.
Args:
available_dates: Dates with complete price data
models: Model signatures
Returns:
Tuple of (dates_to_process, completion_skips)
- dates_to_process: Union of all dates needed by any model
- completion_skips: {model: {dates_to_skip_for_this_model}}
"""
if not available_dates:
return [], {}
# Get completed dates from job_details history
start_date = available_dates[0]
end_date = available_dates[-1]
completed_dates = self.job_manager.get_completed_model_dates(
models, start_date, end_date
)
completion_skips = {}
dates_needed_by_any_model = set()
for model in models:
model_completed = set(completed_dates.get(model, []))
model_skips = set(available_dates) & model_completed
completion_skips[model] = model_skips
# Track dates this model still needs
dates_needed_by_any_model.update(
set(available_dates) - model_skips
)
return sorted(list(dates_needed_by_any_model)), completion_skips
def _mark_skipped_dates(
self,
price_skips: Set[str],
completion_skips: Dict[str, Set[str]],
models: List[str]
) -> None:
"""
Update job_details status for all skipped dates.
Args:
price_skips: Dates without complete price data (affects all models)
completion_skips: {model: {dates}} already completed per model
models: All model signatures in job
"""
# Price skips affect ALL models equally
for date in price_skips:
for model in models:
self.job_manager.update_job_detail_status(
self.job_id, date, model,
"skipped",
error="Incomplete price data"
)
# Completion skips are per-model
for model, skipped_dates in completion_skips.items():
for date in skipped_dates:
self.job_manager.update_job_detail_status(
self.job_id, date, model,
"skipped",
error="Already completed"
)
def _add_job_warnings(self, warnings: List[str]) -> None:
"""Store warnings in job metadata."""
self.job_manager.add_job_warnings(self.job_id, warnings)
@@ -351,20 +425,38 @@ class SimulationWorker:
# Get available dates after download
available_dates = price_manager.get_available_trading_dates(start_date, end_date)
# Warn about skipped dates
skipped = set(requested_dates) - set(available_dates)
if skipped:
warnings.append(f"Skipped {len(skipped)} dates due to incomplete price data: {sorted(list(skipped))}")
# Step 1: Track dates skipped due to incomplete price data
price_skips = set(requested_dates) - set(available_dates)
# Step 2: Filter already-completed model-days and track skips per model
dates_to_process, completion_skips = self._filter_completed_dates_with_tracking(
available_dates, models
)
# Step 3: Update job_details status for all skipped dates
self._mark_skipped_dates(price_skips, completion_skips, models)
# Step 4: Build warnings
if price_skips:
warnings.append(
f"Skipped {len(price_skips)} dates due to incomplete price data: "
f"{sorted(list(price_skips))}"
)
logger.warning(f"Job {self.job_id}: {warnings[-1]}")
# Filter already-completed model-days (idempotent behavior)
available_dates = self._filter_completed_dates(available_dates, models)
# Count total completion skips across all models
total_completion_skips = sum(len(dates) for dates in completion_skips.values())
if total_completion_skips > 0:
warnings.append(
f"Skipped {total_completion_skips} model-days already completed"
)
logger.warning(f"Job {self.job_id}: {warnings[-1]}")
# Update to running
self.job_manager.update_job_status(self.job_id, "running")
logger.info(f"Job {self.job_id}: Starting execution - {len(available_dates)} dates, {len(models)} models")
logger.info(f"Job {self.job_id}: Starting execution - {len(dates_to_process)} dates, {len(models)} models")
return available_dates, warnings
return dates_to_process, warnings
def get_job_info(self) -> Dict[str, Any]:
"""