From 1f41e9d7caf1ea89d0c73d9dbc419ef9c8d01c63 Mon Sep 17 00:00:00 2001 From: Bill Date: Sun, 2 Nov 2025 09:35:58 -0500 Subject: [PATCH 1/2] feat: add skip status tracking for job orchestration Implement skip status tracking to fix jobs hanging when dates are filtered out. Jobs now properly complete when all model-days reach terminal states (completed/failed/skipped). Changes: - database.py: Add 'skipped' status to job_details CHECK constraint - job_manager.py: Update completion logic to count skipped as done - job_manager.py: Add skipped count to progress tracking - simulation_worker.py: Implement skip tracking with per-model granularity - simulation_worker.py: Add _filter_completed_dates_with_tracking() - simulation_worker.py: Add _mark_skipped_dates() - simulation_worker.py: Update _prepare_data() to use skip tracking - simulation_worker.py: Improve warning messages to distinguish skip types Skip reasons: - "Already completed" - Position data exists from previous job - "Incomplete price data" - Missing prices (weekends/holidays/future) The implementation correctly handles multi-model scenarios where different models have different completion states for the same date. --- api/database.py | 2 +- api/job_manager.py | 16 ++++-- api/simulation_worker.py | 108 ++++++++++++++++++++++++++++++++++++--- 3 files changed, 112 insertions(+), 14 deletions(-) diff --git a/api/database.py b/api/database.py index a64f943..b004a8b 100644 --- a/api/database.py +++ b/api/database.py @@ -105,7 +105,7 @@ def initialize_database(db_path: str = "data/jobs.db") -> None: job_id TEXT NOT NULL, date TEXT NOT NULL, model TEXT NOT NULL, - status TEXT NOT NULL CHECK(status IN ('pending', 'running', 'completed', 'failed')), + status TEXT NOT NULL CHECK(status IN ('pending', 'running', 'completed', 'failed', 'skipped')), started_at TEXT, completed_at TEXT, duration_seconds REAL, diff --git a/api/job_manager.py b/api/job_manager.py index 818e12c..ab9db05 100644 --- a/api/job_manager.py +++ b/api/job_manager.py @@ -420,14 +420,16 @@ class JobManager: SELECT COUNT(*) as total, SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed, - SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed + SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed, + SUM(CASE WHEN status = 'skipped' THEN 1 ELSE 0 END) as skipped FROM job_details WHERE job_id = ? """, (job_id,)) - total, completed, failed = cursor.fetchone() + total, completed, failed, skipped = cursor.fetchone() - if completed + failed == total: + # Job is done when all details are in terminal states + if completed + failed + skipped == total: # All done - determine final status if failed == 0: final_status = "completed" @@ -519,12 +521,14 @@ class JobManager: SELECT COUNT(*) as total, SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed, - SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed + SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed, + SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending, + SUM(CASE WHEN status = 'skipped' THEN 1 ELSE 0 END) as skipped FROM job_details WHERE job_id = ? """, (job_id,)) - total, completed, failed = cursor.fetchone() + total, completed, failed, pending, skipped = cursor.fetchone() # Get currently running model-day cursor.execute(""" @@ -559,6 +563,8 @@ class JobManager: "total_model_days": total, "completed": completed or 0, "failed": failed or 0, + "pending": pending or 0, + "skipped": skipped or 0, "current": current, "details": details } diff --git a/api/simulation_worker.py b/api/simulation_worker.py index 54724fc..8b08624 100644 --- a/api/simulation_worker.py +++ b/api/simulation_worker.py @@ -296,6 +296,80 @@ class SimulationWorker: return dates_to_process + def _filter_completed_dates_with_tracking( + self, + available_dates: List[str], + models: List[str] + ) -> tuple: + """ + Filter already-completed dates per model with skip tracking. + + Args: + available_dates: Dates with complete price data + models: Model signatures + + Returns: + Tuple of (dates_to_process, completion_skips) + - dates_to_process: Union of all dates needed by any model + - completion_skips: {model: {dates_to_skip_for_this_model}} + """ + if not available_dates: + return [], {} + + # Get completed dates from job_details history + start_date = available_dates[0] + end_date = available_dates[-1] + completed_dates = self.job_manager.get_completed_model_dates( + models, start_date, end_date + ) + + completion_skips = {} + dates_needed_by_any_model = set() + + for model in models: + model_completed = set(completed_dates.get(model, [])) + model_skips = set(available_dates) & model_completed + completion_skips[model] = model_skips + + # Track dates this model still needs + dates_needed_by_any_model.update( + set(available_dates) - model_skips + ) + + return sorted(list(dates_needed_by_any_model)), completion_skips + + def _mark_skipped_dates( + self, + price_skips: Set[str], + completion_skips: Dict[str, Set[str]], + models: List[str] + ) -> None: + """ + Update job_details status for all skipped dates. + + Args: + price_skips: Dates without complete price data (affects all models) + completion_skips: {model: {dates}} already completed per model + models: All model signatures in job + """ + # Price skips affect ALL models equally + for date in price_skips: + for model in models: + self.job_manager.update_job_detail_status( + self.job_id, date, model, + "skipped", + error="Incomplete price data" + ) + + # Completion skips are per-model + for model, skipped_dates in completion_skips.items(): + for date in skipped_dates: + self.job_manager.update_job_detail_status( + self.job_id, date, model, + "skipped", + error="Already completed" + ) + def _add_job_warnings(self, warnings: List[str]) -> None: """Store warnings in job metadata.""" self.job_manager.add_job_warnings(self.job_id, warnings) @@ -351,20 +425,38 @@ class SimulationWorker: # Get available dates after download available_dates = price_manager.get_available_trading_dates(start_date, end_date) - # Warn about skipped dates - skipped = set(requested_dates) - set(available_dates) - if skipped: - warnings.append(f"Skipped {len(skipped)} dates due to incomplete price data: {sorted(list(skipped))}") + # Step 1: Track dates skipped due to incomplete price data + price_skips = set(requested_dates) - set(available_dates) + + # Step 2: Filter already-completed model-days and track skips per model + dates_to_process, completion_skips = self._filter_completed_dates_with_tracking( + available_dates, models + ) + + # Step 3: Update job_details status for all skipped dates + self._mark_skipped_dates(price_skips, completion_skips, models) + + # Step 4: Build warnings + if price_skips: + warnings.append( + f"Skipped {len(price_skips)} dates due to incomplete price data: " + f"{sorted(list(price_skips))}" + ) logger.warning(f"Job {self.job_id}: {warnings[-1]}") - # Filter already-completed model-days (idempotent behavior) - available_dates = self._filter_completed_dates(available_dates, models) + # Count total completion skips across all models + total_completion_skips = sum(len(dates) for dates in completion_skips.values()) + if total_completion_skips > 0: + warnings.append( + f"Skipped {total_completion_skips} model-days already completed" + ) + logger.warning(f"Job {self.job_id}: {warnings[-1]}") # Update to running self.job_manager.update_job_status(self.job_id, "running") - logger.info(f"Job {self.job_id}: Starting execution - {len(available_dates)} dates, {len(models)} models") + logger.info(f"Job {self.job_id}: Starting execution - {len(dates_to_process)} dates, {len(models)} models") - return available_dates, warnings + return dates_to_process, warnings def get_job_info(self) -> Dict[str, Any]: """ From 68aaa013b0c2e823c4524948c5a774148d1ddb13 Mon Sep 17 00:00:00 2001 From: Bill Date: Sun, 2 Nov 2025 09:49:50 -0500 Subject: [PATCH 2/2] fix: handle 'skipped' status in job_detail_status updates - Add 'skipped' to terminal states in update_job_detail_status() - Ensures skipped dates properly: - Update status and completed_at timestamp - Store skip reason in error field - Trigger job completion checks - Add comprehensive test suite (11 tests) covering: - Database schema validation - Job completion with skipped dates - Progress tracking with skip counts - Multi-model skip handling - Skip reason storage Bug was discovered via TDD - created tests first, which revealed that skipped status wasn't being handled in the terminal state block at line 397. All 11 tests passing. --- api/job_manager.py | 2 +- tests/unit/test_job_skip_status.py | 349 +++++++++++++++++++++++++++++ 2 files changed, 350 insertions(+), 1 deletion(-) create mode 100644 tests/unit/test_job_skip_status.py diff --git a/api/job_manager.py b/api/job_manager.py index ab9db05..e403a48 100644 --- a/api/job_manager.py +++ b/api/job_manager.py @@ -394,7 +394,7 @@ class JobManager: WHERE job_id = ? AND status = 'pending' """, (updated_at, updated_at, job_id)) - elif status in ("completed", "failed"): + elif status in ("completed", "failed", "skipped"): # Calculate duration for detail cursor.execute(""" SELECT started_at FROM job_details diff --git a/tests/unit/test_job_skip_status.py b/tests/unit/test_job_skip_status.py new file mode 100644 index 0000000..e51fb87 --- /dev/null +++ b/tests/unit/test_job_skip_status.py @@ -0,0 +1,349 @@ +""" +Tests for job skip status tracking functionality. + +Tests the skip status feature that marks dates as skipped when they: +1. Have incomplete price data (weekends/holidays) +2. Are already completed from a previous job run + +Tests also verify that jobs complete properly when all dates are in +terminal states (completed/failed/skipped). +""" + +import pytest +import tempfile +from pathlib import Path + +from api.job_manager import JobManager +from api.database import initialize_database + + +@pytest.fixture +def temp_db(): + """Create temporary database for testing.""" + with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f: + db_path = f.name + + initialize_database(db_path) + yield db_path + + Path(db_path).unlink(missing_ok=True) + + +@pytest.fixture +def job_manager(temp_db): + """Create JobManager with temporary database.""" + return JobManager(db_path=temp_db) + + +class TestSkipStatusDatabase: + """Test that database accepts 'skipped' status.""" + + def test_skipped_status_allowed_in_job_details(self, job_manager): + """Test job_details accepts 'skipped' status without constraint violation.""" + # Create job + job_id = job_manager.create_job( + config_path="test_config.json", + date_range=["2025-10-01", "2025-10-02"], + models=["test-model"] + ) + + # Mark a detail as skipped - should not raise constraint violation + job_manager.update_job_detail_status( + job_id=job_id, + date="2025-10-01", + model="test-model", + status="skipped", + error="Test skip reason" + ) + + # Verify status was set + details = job_manager.get_job_details(job_id) + assert len(details) == 2 + skipped_detail = next(d for d in details if d["date"] == "2025-10-01") + assert skipped_detail["status"] == "skipped" + assert skipped_detail["error"] == "Test skip reason" + + +class TestJobCompletionWithSkipped: + """Test that jobs complete when skipped dates are counted.""" + + def test_job_completes_with_all_dates_skipped(self, job_manager): + """Test job transitions to completed when all dates are skipped.""" + # Create job with 3 dates + job_id = job_manager.create_job( + config_path="test_config.json", + date_range=["2025-10-01", "2025-10-02", "2025-10-03"], + models=["test-model"] + ) + + # Mark all as skipped + for date in ["2025-10-01", "2025-10-02", "2025-10-03"]: + job_manager.update_job_detail_status( + job_id=job_id, + date=date, + model="test-model", + status="skipped", + error="Incomplete price data" + ) + + # Verify job completed + job = job_manager.get_job(job_id) + assert job["status"] == "completed" + assert job["completed_at"] is not None + + def test_job_completes_with_mixed_completed_and_skipped(self, job_manager): + """Test job completes when some dates completed, some skipped.""" + job_id = job_manager.create_job( + config_path="test_config.json", + date_range=["2025-10-01", "2025-10-02", "2025-10-03"], + models=["test-model"] + ) + + # Mark some completed, some skipped + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-01", model="test-model", + status="completed" + ) + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-02", model="test-model", + status="skipped", error="Already completed" + ) + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-03", model="test-model", + status="skipped", error="Incomplete price data" + ) + + # Verify job completed + job = job_manager.get_job(job_id) + assert job["status"] == "completed" + + def test_job_partial_with_mixed_completed_failed_skipped(self, job_manager): + """Test job status 'partial' when some failed, some completed, some skipped.""" + job_id = job_manager.create_job( + config_path="test_config.json", + date_range=["2025-10-01", "2025-10-02", "2025-10-03"], + models=["test-model"] + ) + + # Mix of statuses + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-01", model="test-model", + status="completed" + ) + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-02", model="test-model", + status="failed", error="Execution error" + ) + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-03", model="test-model", + status="skipped", error="Incomplete price data" + ) + + # Verify job status is partial + job = job_manager.get_job(job_id) + assert job["status"] == "partial" + + def test_job_remains_running_with_pending_dates(self, job_manager): + """Test job stays running when some dates are still pending.""" + job_id = job_manager.create_job( + config_path="test_config.json", + date_range=["2025-10-01", "2025-10-02", "2025-10-03"], + models=["test-model"] + ) + + # Only mark some as terminal states + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-01", model="test-model", + status="completed" + ) + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-02", model="test-model", + status="skipped", error="Already completed" + ) + # Leave 2025-10-03 as pending + + # Verify job still running (not completed) + job = job_manager.get_job(job_id) + assert job["status"] == "pending" # Not yet marked as running + assert job["completed_at"] is None + + +class TestProgressTrackingWithSkipped: + """Test progress tracking includes skipped counts.""" + + def test_progress_includes_skipped_count(self, job_manager): + """Test get_job_progress returns skipped count.""" + job_id = job_manager.create_job( + config_path="test_config.json", + date_range=["2025-10-01", "2025-10-02", "2025-10-03", "2025-10-04"], + models=["test-model"] + ) + + # Set various statuses + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-01", model="test-model", + status="completed" + ) + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-02", model="test-model", + status="skipped", error="Already completed" + ) + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-03", model="test-model", + status="skipped", error="Incomplete price data" + ) + # Leave 2025-10-04 pending + + # Check progress + progress = job_manager.get_job_progress(job_id) + + assert progress["total_model_days"] == 4 + assert progress["completed"] == 1 + assert progress["failed"] == 0 + assert progress["pending"] == 1 + assert progress["skipped"] == 2 + + def test_progress_all_skipped(self, job_manager): + """Test progress when all dates are skipped.""" + job_id = job_manager.create_job( + config_path="test_config.json", + date_range=["2025-10-01", "2025-10-02"], + models=["test-model"] + ) + + # Mark all as skipped + for date in ["2025-10-01", "2025-10-02"]: + job_manager.update_job_detail_status( + job_id=job_id, date=date, model="test-model", + status="skipped", error="Incomplete price data" + ) + + progress = job_manager.get_job_progress(job_id) + + assert progress["skipped"] == 2 + assert progress["completed"] == 0 + assert progress["pending"] == 0 + assert progress["failed"] == 0 + + +class TestMultiModelSkipHandling: + """Test skip status with multiple models having different completion states.""" + + def test_different_models_different_skip_states(self, job_manager): + """Test that different models can have different skip states for same date.""" + job_id = job_manager.create_job( + config_path="test_config.json", + date_range=["2025-10-01", "2025-10-02"], + models=["model-a", "model-b"] + ) + + # Model A: 10/1 skipped (already completed), 10/2 completed + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-01", model="model-a", + status="skipped", error="Already completed" + ) + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-02", model="model-a", + status="completed" + ) + + # Model B: both dates completed + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-01", model="model-b", + status="completed" + ) + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-02", model="model-b", + status="completed" + ) + + # Verify details + details = job_manager.get_job_details(job_id) + + model_a_10_01 = next( + d for d in details + if d["model"] == "model-a" and d["date"] == "2025-10-01" + ) + model_b_10_01 = next( + d for d in details + if d["model"] == "model-b" and d["date"] == "2025-10-01" + ) + + assert model_a_10_01["status"] == "skipped" + assert model_a_10_01["error"] == "Already completed" + assert model_b_10_01["status"] == "completed" + assert model_b_10_01["error"] is None + + def test_job_completes_with_per_model_skips(self, job_manager): + """Test job completes when different models have different skip patterns.""" + job_id = job_manager.create_job( + config_path="test_config.json", + date_range=["2025-10-01", "2025-10-02"], + models=["model-a", "model-b"] + ) + + # Model A: one skipped, one completed + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-01", model="model-a", + status="skipped", error="Already completed" + ) + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-02", model="model-a", + status="completed" + ) + + # Model B: both completed + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-01", model="model-b", + status="completed" + ) + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-02", model="model-b", + status="completed" + ) + + # Job should complete + job = job_manager.get_job(job_id) + assert job["status"] == "completed" + + # Progress should show mixed counts + progress = job_manager.get_job_progress(job_id) + assert progress["completed"] == 3 + assert progress["skipped"] == 1 + assert progress["total_model_days"] == 4 + + +class TestSkipReasons: + """Test that skip reasons are properly stored and retrievable.""" + + def test_skip_reason_already_completed(self, job_manager): + """Test 'Already completed' skip reason is stored.""" + job_id = job_manager.create_job( + config_path="test_config.json", + date_range=["2025-10-01"], + models=["test-model"] + ) + + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-01", model="test-model", + status="skipped", error="Already completed" + ) + + details = job_manager.get_job_details(job_id) + assert details[0]["error"] == "Already completed" + + def test_skip_reason_incomplete_price_data(self, job_manager): + """Test 'Incomplete price data' skip reason is stored.""" + job_id = job_manager.create_job( + config_path="test_config.json", + date_range=["2025-10-04"], + models=["test-model"] + ) + + job_manager.update_job_detail_status( + job_id=job_id, date="2025-10-04", model="test-model", + status="skipped", error="Incomplete price data" + ) + + details = job_manager.get_job_details(job_id) + assert details[0]["error"] == "Incomplete price data"