Merge feature/job-skip-status: Add skip status tracking for jobs

This merge brings comprehensive skip status tracking to the job orchestration system:

Features:
- Single 'skipped' status in job_details with granular error messages
- Per-model skip tracking (different models can skip different dates)
- Job completion when all dates are in terminal states (completed/failed/skipped)
- Progress tracking includes skip counts
- Warning messages distinguish between skip reasons:
  - "Incomplete price data" (weekends/holidays without data)
  - "Already completed" (idempotent re-runs)

Implementation:
- Modified database schema to accept 'skipped' status
- Updated JobManager completion logic to count skipped dates
- Enhanced SimulationWorker to track and mark skipped dates
- Added comprehensive test suite (11 tests, all passing)

Bug fixes:
- Fixed update_job_detail_status to handle 'skipped' as terminal state

This resolves the issue where jobs would hang at "running" status when
all remaining dates were filtered out due to incomplete data or prior completion.

Commits merged:
- feat: add skip status tracking for job orchestration
- fix: handle 'skipped' status in job_detail_status updates
This commit is contained in:
2025-11-02 10:03:40 -05:00
4 changed files with 462 additions and 15 deletions

View File

@@ -105,7 +105,7 @@ def initialize_database(db_path: str = "data/jobs.db") -> None:
job_id TEXT NOT NULL,
date TEXT NOT NULL,
model TEXT NOT NULL,
status TEXT NOT NULL CHECK(status IN ('pending', 'running', 'completed', 'failed')),
status TEXT NOT NULL CHECK(status IN ('pending', 'running', 'completed', 'failed', 'skipped')),
started_at TEXT,
completed_at TEXT,
duration_seconds REAL,

View File

@@ -394,7 +394,7 @@ class JobManager:
WHERE job_id = ? AND status = 'pending'
""", (updated_at, updated_at, job_id))
elif status in ("completed", "failed"):
elif status in ("completed", "failed", "skipped"):
# Calculate duration for detail
cursor.execute("""
SELECT started_at FROM job_details
@@ -420,14 +420,16 @@ class JobManager:
SELECT
COUNT(*) as total,
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
SUM(CASE WHEN status = 'skipped' THEN 1 ELSE 0 END) as skipped
FROM job_details
WHERE job_id = ?
""", (job_id,))
total, completed, failed = cursor.fetchone()
total, completed, failed, skipped = cursor.fetchone()
if completed + failed == total:
# Job is done when all details are in terminal states
if completed + failed + skipped == total:
# All done - determine final status
if failed == 0:
final_status = "completed"
@@ -519,12 +521,14 @@ class JobManager:
SELECT
COUNT(*) as total,
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
SUM(CASE WHEN status = 'skipped' THEN 1 ELSE 0 END) as skipped
FROM job_details
WHERE job_id = ?
""", (job_id,))
total, completed, failed = cursor.fetchone()
total, completed, failed, pending, skipped = cursor.fetchone()
# Get currently running model-day
cursor.execute("""
@@ -559,6 +563,8 @@ class JobManager:
"total_model_days": total,
"completed": completed or 0,
"failed": failed or 0,
"pending": pending or 0,
"skipped": skipped or 0,
"current": current,
"details": details
}

View File

@@ -296,6 +296,80 @@ class SimulationWorker:
return dates_to_process
def _filter_completed_dates_with_tracking(
self,
available_dates: List[str],
models: List[str]
) -> tuple:
"""
Filter already-completed dates per model with skip tracking.
Args:
available_dates: Dates with complete price data
models: Model signatures
Returns:
Tuple of (dates_to_process, completion_skips)
- dates_to_process: Union of all dates needed by any model
- completion_skips: {model: {dates_to_skip_for_this_model}}
"""
if not available_dates:
return [], {}
# Get completed dates from job_details history
start_date = available_dates[0]
end_date = available_dates[-1]
completed_dates = self.job_manager.get_completed_model_dates(
models, start_date, end_date
)
completion_skips = {}
dates_needed_by_any_model = set()
for model in models:
model_completed = set(completed_dates.get(model, []))
model_skips = set(available_dates) & model_completed
completion_skips[model] = model_skips
# Track dates this model still needs
dates_needed_by_any_model.update(
set(available_dates) - model_skips
)
return sorted(list(dates_needed_by_any_model)), completion_skips
def _mark_skipped_dates(
self,
price_skips: Set[str],
completion_skips: Dict[str, Set[str]],
models: List[str]
) -> None:
"""
Update job_details status for all skipped dates.
Args:
price_skips: Dates without complete price data (affects all models)
completion_skips: {model: {dates}} already completed per model
models: All model signatures in job
"""
# Price skips affect ALL models equally
for date in price_skips:
for model in models:
self.job_manager.update_job_detail_status(
self.job_id, date, model,
"skipped",
error="Incomplete price data"
)
# Completion skips are per-model
for model, skipped_dates in completion_skips.items():
for date in skipped_dates:
self.job_manager.update_job_detail_status(
self.job_id, date, model,
"skipped",
error="Already completed"
)
def _add_job_warnings(self, warnings: List[str]) -> None:
"""Store warnings in job metadata."""
self.job_manager.add_job_warnings(self.job_id, warnings)
@@ -351,20 +425,38 @@ class SimulationWorker:
# Get available dates after download
available_dates = price_manager.get_available_trading_dates(start_date, end_date)
# Warn about skipped dates
skipped = set(requested_dates) - set(available_dates)
if skipped:
warnings.append(f"Skipped {len(skipped)} dates due to incomplete price data: {sorted(list(skipped))}")
# Step 1: Track dates skipped due to incomplete price data
price_skips = set(requested_dates) - set(available_dates)
# Step 2: Filter already-completed model-days and track skips per model
dates_to_process, completion_skips = self._filter_completed_dates_with_tracking(
available_dates, models
)
# Step 3: Update job_details status for all skipped dates
self._mark_skipped_dates(price_skips, completion_skips, models)
# Step 4: Build warnings
if price_skips:
warnings.append(
f"Skipped {len(price_skips)} dates due to incomplete price data: "
f"{sorted(list(price_skips))}"
)
logger.warning(f"Job {self.job_id}: {warnings[-1]}")
# Filter already-completed model-days (idempotent behavior)
available_dates = self._filter_completed_dates(available_dates, models)
# Count total completion skips across all models
total_completion_skips = sum(len(dates) for dates in completion_skips.values())
if total_completion_skips > 0:
warnings.append(
f"Skipped {total_completion_skips} model-days already completed"
)
logger.warning(f"Job {self.job_id}: {warnings[-1]}")
# Update to running
self.job_manager.update_job_status(self.job_id, "running")
logger.info(f"Job {self.job_id}: Starting execution - {len(available_dates)} dates, {len(models)} models")
logger.info(f"Job {self.job_id}: Starting execution - {len(dates_to_process)} dates, {len(models)} models")
return available_dates, warnings
return dates_to_process, warnings
def get_job_info(self) -> Dict[str, Any]:
"""

View File

@@ -0,0 +1,349 @@
"""
Tests for job skip status tracking functionality.
Tests the skip status feature that marks dates as skipped when they:
1. Have incomplete price data (weekends/holidays)
2. Are already completed from a previous job run
Tests also verify that jobs complete properly when all dates are in
terminal states (completed/failed/skipped).
"""
import pytest
import tempfile
from pathlib import Path
from api.job_manager import JobManager
from api.database import initialize_database
@pytest.fixture
def temp_db():
    """Yield the path of a freshly initialized throwaway database file."""
    handle = tempfile.NamedTemporaryFile(suffix='.db', delete=False)
    handle.close()
    db_path = handle.name
    initialize_database(db_path)
    yield db_path
    # Teardown: remove the database file after the test finishes.
    Path(db_path).unlink(missing_ok=True)
@pytest.fixture
def job_manager(temp_db):
    """Provide a JobManager wired to the throwaway test database."""
    manager = JobManager(db_path=temp_db)
    return manager
class TestSkipStatusDatabase:
    """Verify the job_details table accepts the 'skipped' status."""

    def test_skipped_status_allowed_in_job_details(self, job_manager):
        """Marking a detail 'skipped' must not violate the CHECK constraint."""
        job_id = job_manager.create_job(
            config_path="test_config.json",
            date_range=["2025-10-01", "2025-10-02"],
            models=["test-model"]
        )

        # Should insert cleanly rather than raising a constraint violation.
        job_manager.update_job_detail_status(
            job_id=job_id,
            date="2025-10-01",
            model="test-model",
            status="skipped",
            error="Test skip reason"
        )

        details = job_manager.get_job_details(job_id)
        assert len(details) == 2
        by_date = {d["date"]: d for d in details}
        assert by_date["2025-10-01"]["status"] == "skipped"
        assert by_date["2025-10-01"]["error"] == "Test skip reason"
class TestJobCompletionWithSkipped:
    """Jobs must reach a terminal status once skipped dates are counted."""

    def test_job_completes_with_all_dates_skipped(self, job_manager):
        """A job whose dates are all skipped transitions to 'completed'."""
        dates = ["2025-10-01", "2025-10-02", "2025-10-03"]
        job_id = job_manager.create_job(
            config_path="test_config.json",
            date_range=dates,
            models=["test-model"]
        )

        for day in dates:
            job_manager.update_job_detail_status(
                job_id=job_id,
                date=day,
                model="test-model",
                status="skipped",
                error="Incomplete price data"
            )

        job = job_manager.get_job(job_id)
        assert job["status"] == "completed"
        assert job["completed_at"] is not None

    def test_job_completes_with_mixed_completed_and_skipped(self, job_manager):
        """A mix of completed and skipped details still finishes the job."""
        job_id = job_manager.create_job(
            config_path="test_config.json",
            date_range=["2025-10-01", "2025-10-02", "2025-10-03"],
            models=["test-model"]
        )

        job_manager.update_job_detail_status(
            job_id=job_id, date="2025-10-01", model="test-model",
            status="completed"
        )
        for day, reason in [("2025-10-02", "Already completed"),
                            ("2025-10-03", "Incomplete price data")]:
            job_manager.update_job_detail_status(
                job_id=job_id, date=day, model="test-model",
                status="skipped", error=reason
            )

        assert job_manager.get_job(job_id)["status"] == "completed"

    def test_job_partial_with_mixed_completed_failed_skipped(self, job_manager):
        """A failed detail among completed/skipped yields 'partial' status."""
        job_id = job_manager.create_job(
            config_path="test_config.json",
            date_range=["2025-10-01", "2025-10-02", "2025-10-03"],
            models=["test-model"]
        )

        job_manager.update_job_detail_status(
            job_id=job_id, date="2025-10-01", model="test-model",
            status="completed"
        )
        job_manager.update_job_detail_status(
            job_id=job_id, date="2025-10-02", model="test-model",
            status="failed", error="Execution error"
        )
        job_manager.update_job_detail_status(
            job_id=job_id, date="2025-10-03", model="test-model",
            status="skipped", error="Incomplete price data"
        )

        assert job_manager.get_job(job_id)["status"] == "partial"

    def test_job_remains_running_with_pending_dates(self, job_manager):
        """The job must not finish while any detail is still pending."""
        job_id = job_manager.create_job(
            config_path="test_config.json",
            date_range=["2025-10-01", "2025-10-02", "2025-10-03"],
            models=["test-model"]
        )

        job_manager.update_job_detail_status(
            job_id=job_id, date="2025-10-01", model="test-model",
            status="completed"
        )
        job_manager.update_job_detail_status(
            job_id=job_id, date="2025-10-02", model="test-model",
            status="skipped", error="Already completed"
        )
        # 2025-10-03 is intentionally left in 'pending'.

        job = job_manager.get_job(job_id)
        assert job["status"] == "pending"  # never advanced to running
        assert job["completed_at"] is None
class TestProgressTrackingWithSkipped:
    """Progress reporting must include skipped counts."""

    def test_progress_includes_skipped_count(self, job_manager):
        """get_job_progress exposes completed/failed/pending/skipped tallies."""
        job_id = job_manager.create_job(
            config_path="test_config.json",
            date_range=["2025-10-01", "2025-10-02", "2025-10-03", "2025-10-04"],
            models=["test-model"]
        )

        job_manager.update_job_detail_status(
            job_id=job_id, date="2025-10-01", model="test-model",
            status="completed"
        )
        for day, reason in [("2025-10-02", "Already completed"),
                            ("2025-10-03", "Incomplete price data")]:
            job_manager.update_job_detail_status(
                job_id=job_id, date=day, model="test-model",
                status="skipped", error=reason
            )
        # 2025-10-04 stays pending.

        progress = job_manager.get_job_progress(job_id)
        expected = {
            "total_model_days": 4,
            "completed": 1,
            "failed": 0,
            "pending": 1,
            "skipped": 2,
        }
        for field, count in expected.items():
            assert progress[field] == count

    def test_progress_all_skipped(self, job_manager):
        """Counts are correct when every model-day is skipped."""
        dates = ["2025-10-01", "2025-10-02"]
        job_id = job_manager.create_job(
            config_path="test_config.json",
            date_range=dates,
            models=["test-model"]
        )

        for day in dates:
            job_manager.update_job_detail_status(
                job_id=job_id, date=day, model="test-model",
                status="skipped", error="Incomplete price data"
            )

        progress = job_manager.get_job_progress(job_id)
        assert progress["skipped"] == 2
        assert progress["completed"] == 0
        assert progress["pending"] == 0
        assert progress["failed"] == 0
class TestMultiModelSkipHandling:
    """Skip status with multiple models in differing completion states."""

    def test_different_models_different_skip_states(self, job_manager):
        """The same date may be skipped for one model, completed for another."""
        job_id = job_manager.create_job(
            config_path="test_config.json",
            date_range=["2025-10-01", "2025-10-02"],
            models=["model-a", "model-b"]
        )

        # (date, model, status, error) outcomes to record.
        outcomes = [
            ("2025-10-01", "model-a", "skipped", "Already completed"),
            ("2025-10-02", "model-a", "completed", None),
            ("2025-10-01", "model-b", "completed", None),
            ("2025-10-02", "model-b", "completed", None),
        ]
        for day, model, status, err in outcomes:
            if err is None:
                job_manager.update_job_detail_status(
                    job_id=job_id, date=day, model=model, status=status
                )
            else:
                job_manager.update_job_detail_status(
                    job_id=job_id, date=day, model=model,
                    status=status, error=err
                )

        by_key = {
            (d["model"], d["date"]): d
            for d in job_manager.get_job_details(job_id)
        }
        assert by_key[("model-a", "2025-10-01")]["status"] == "skipped"
        assert by_key[("model-a", "2025-10-01")]["error"] == "Already completed"
        assert by_key[("model-b", "2025-10-01")]["status"] == "completed"
        assert by_key[("model-b", "2025-10-01")]["error"] is None

    def test_job_completes_with_per_model_skips(self, job_manager):
        """Per-model skip patterns still allow the job to finish."""
        job_id = job_manager.create_job(
            config_path="test_config.json",
            date_range=["2025-10-01", "2025-10-02"],
            models=["model-a", "model-b"]
        )

        outcomes = [
            ("2025-10-01", "model-a", "skipped", "Already completed"),
            ("2025-10-02", "model-a", "completed", None),
            ("2025-10-01", "model-b", "completed", None),
            ("2025-10-02", "model-b", "completed", None),
        ]
        for day, model, status, err in outcomes:
            if err is None:
                job_manager.update_job_detail_status(
                    job_id=job_id, date=day, model=model, status=status
                )
            else:
                job_manager.update_job_detail_status(
                    job_id=job_id, date=day, model=model,
                    status=status, error=err
                )

        assert job_manager.get_job(job_id)["status"] == "completed"

        progress = job_manager.get_job_progress(job_id)
        assert progress["completed"] == 3
        assert progress["skipped"] == 1
        assert progress["total_model_days"] == 4
class TestSkipReasons:
    """Skip reasons must round-trip through job_details storage."""

    def test_skip_reason_already_completed(self, job_manager):
        """The 'Already completed' reason is persisted verbatim."""
        job_id = job_manager.create_job(
            config_path="test_config.json",
            date_range=["2025-10-01"],
            models=["test-model"]
        )
        job_manager.update_job_detail_status(
            job_id=job_id, date="2025-10-01", model="test-model",
            status="skipped", error="Already completed"
        )

        first_detail = job_manager.get_job_details(job_id)[0]
        assert first_detail["error"] == "Already completed"

    def test_skip_reason_incomplete_price_data(self, job_manager):
        """The 'Incomplete price data' reason is persisted verbatim."""
        job_id = job_manager.create_job(
            config_path="test_config.json",
            date_range=["2025-10-04"],
            models=["test-model"]
        )
        job_manager.update_job_detail_status(
            job_id=job_id, date="2025-10-04", model="test-model",
            status="skipped", error="Incomplete price data"
        )

        first_detail = job_manager.get_job_details(job_id)[0]
        assert first_detail["error"] == "Incomplete price data"