From 5e5354e2af01ced9fff96557524082d3e60add99 Mon Sep 17 00:00:00 2001 From: Bill Date: Sat, 1 Nov 2025 23:49:24 -0400 Subject: [PATCH] feat(worker): integrate data preparation into run() method Call _prepare_data before executing trades: - Download missing data if needed - Filter completed dates - Store warnings - Handle empty date scenarios Co-Authored-By: Claude --- api/simulation_worker.py | 26 ++++-- tests/integration/test_async_download.py | 100 +++++++++++++++++++++++ 2 files changed, 120 insertions(+), 6 deletions(-) create mode 100644 tests/integration/test_async_download.py diff --git a/api/simulation_worker.py b/api/simulation_worker.py index 852353f..54724fc 100644 --- a/api/simulation_worker.py +++ b/api/simulation_worker.py @@ -65,12 +65,13 @@ class SimulationWorker: Process: 1. Get job details (dates, models, config) - 2. For each date sequentially: + 2. Prepare data (download if needed) + 3. For each date sequentially: a. Execute all models in parallel b. Wait for all to complete c. Update progress - 3. Determine final job status - 4. Update job with final status + 4. Determine final job status + 5. Store warnings if any Error Handling: - Individual model failures: Mark detail as failed, continue with others @@ -88,8 +89,16 @@ class SimulationWorker: logger.info(f"Starting job {self.job_id}: {len(date_range)} dates, {len(models)} models") - # Execute date-by-date (sequential) - for date in date_range: + # NEW: Prepare price data (download if needed) + available_dates, warnings = self._prepare_data(date_range, models, config_path) + + if not available_dates: + error_msg = "No trading dates available after price data preparation" + self.job_manager.update_job_status(self.job_id, "failed", error=error_msg) + return {"success": False, "error": error_msg} + + # Execute available dates only + for date in available_dates: logger.info(f"Processing date {date} with {len(models)} models") self._execute_date(date, models, config_path) @@ -103,6 +112,10 @@ class SimulationWorker: else: final_status = "failed" + # Add warnings if any dates were skipped + if warnings: + self._add_job_warnings(warnings) + # Note: Job status is already updated by model_day_executor's detail status updates # We don't need to explicitly call update_job_status here as it's handled automatically # by the status transition logic in JobManager.update_job_detail_status @@ -115,7 +128,8 @@ class SimulationWorker: "status": final_status, "total_model_days": progress["total_model_days"], "completed": progress["completed"], - "failed": progress["failed"] + "failed": progress["failed"], + "warnings": warnings } except Exception as e: diff --git a/tests/integration/test_async_download.py b/tests/integration/test_async_download.py new file mode 100644 index 0000000..b10eca9 --- /dev/null +++ b/tests/integration/test_async_download.py @@ -0,0 +1,100 @@ +import pytest +import time +from api.database import initialize_database +from api.job_manager import JobManager +from api.simulation_worker import SimulationWorker +from unittest.mock import Mock, patch + +def test_worker_prepares_data_before_execution(tmp_path): + """Test that worker calls _prepare_data before executing trades.""" + db_path = str(tmp_path / "test.db") + initialize_database(db_path) + job_manager = JobManager(db_path=db_path) + + # Create job + job_id = job_manager.create_job( + config_path="configs/default_config.json", + date_range=["2025-10-01"], + models=["gpt-5"] + ) + + worker = SimulationWorker(job_id=job_id, db_path=db_path) + + # Mock _prepare_data to track call + original_prepare = worker._prepare_data + prepare_called = [] + + def mock_prepare(*args, **kwargs): + prepare_called.append(True) + return (["2025-10-01"], []) # Return available dates, no warnings + + worker._prepare_data = mock_prepare + + # Mock _execute_date to avoid actual execution + worker._execute_date = Mock() + + # Run worker + result = worker.run() + + # Verify _prepare_data was called + assert len(prepare_called) == 1 + assert result["success"] is True + +def test_worker_handles_no_available_dates(tmp_path): + """Test worker fails gracefully when no dates are available.""" + db_path = str(tmp_path / "test.db") + initialize_database(db_path) + job_manager = JobManager(db_path=db_path) + + job_id = job_manager.create_job( + config_path="configs/default_config.json", + date_range=["2025-10-01"], + models=["gpt-5"] + ) + + worker = SimulationWorker(job_id=job_id, db_path=db_path) + + # Mock _prepare_data to return empty dates + worker._prepare_data = Mock(return_value=([], [])) + + # Run worker + result = worker.run() + + # Should fail with descriptive error + assert result["success"] is False + assert "No trading dates available" in result["error"] + + # Job should be marked as failed + job = job_manager.get_job(job_id) + assert job["status"] == "failed" + +def test_worker_stores_warnings(tmp_path): + """Test worker stores warnings from prepare_data.""" + db_path = str(tmp_path / "test.db") + initialize_database(db_path) + job_manager = JobManager(db_path=db_path) + + job_id = job_manager.create_job( + config_path="configs/default_config.json", + date_range=["2025-10-01"], + models=["gpt-5"] + ) + + worker = SimulationWorker(job_id=job_id, db_path=db_path) + + # Mock _prepare_data to return warnings + warnings = ["Rate limited", "Skipped 1 date"] + worker._prepare_data = Mock(return_value=(["2025-10-01"], warnings)) + worker._execute_date = Mock() + + # Run worker + result = worker.run() + + # Verify warnings in result + assert result["warnings"] == warnings + + # Verify warnings stored in database + import json + job = job_manager.get_job(job_id) + stored_warnings = json.loads(job["warnings"]) + assert stored_warnings == warnings