def _prepare_data(
    self,
    requested_dates: List[str],
    models: List[str],
    config_path: str
) -> tuple:
    """
    Prepare price data for simulation.

    Steps:
    1. Update job status to "downloading_data"
    2. Check what data is missing for the requested range
    3. Download missing data (delegated to ``_download_price_data``)
    4. Determine available trading dates
    5. Filter out already-completed model-days (idempotent re-runs)
    6. Update job status to "running"

    Args:
        requested_dates: All dates requested for simulation. Must be
            non-empty. The first and last elements are used as the range
            bounds, so the list is expected to be in chronological order.
        models: Model signatures to simulate
        config_path: Path to configuration file. Not read by this method;
            kept as part of the call signature.

    Returns:
        Tuple of (available_dates, warnings)

    Raises:
        ValueError: If ``requested_dates`` is empty.
    """
    # Fail fast, before any job state is touched: indexing an empty list
    # below would otherwise raise a bare IndexError after the job had
    # already been moved to "downloading_data".
    if not requested_dates:
        raise ValueError("requested_dates must contain at least one date")

    # Imported at call time, as in the original patch (this also lets tests
    # patch the class on its defining module).
    from api.price_data_manager import PriceDataManager

    warnings = []

    # Update status
    self.job_manager.update_job_status(self.job_id, "downloading_data")
    logger.info(f"Job {self.job_id}: Checking price data availability...")

    # Initialize price manager
    price_manager = PriceDataManager(db_path=self.db_path)

    # Check missing coverage across the whole requested range
    start_date = requested_dates[0]
    end_date = requested_dates[-1]
    missing_coverage = price_manager.get_missing_coverage(start_date, end_date)

    # Download if needed; the helper receives the shared warnings list
    if missing_coverage:
        logger.info(f"Job {self.job_id}: Missing data for {len(missing_coverage)} symbols")
        self._download_price_data(price_manager, missing_coverage, requested_dates, warnings)
    else:
        logger.info(f"Job {self.job_id}: All price data available")

    # Get available dates after download
    available_dates = price_manager.get_available_trading_dates(start_date, end_date)

    # Warn about requested dates that are not in the available set
    skipped = set(requested_dates) - set(available_dates)
    if skipped:
        warnings.append(f"Skipped {len(skipped)} dates due to incomplete price data: {sorted(skipped)}")
        logger.warning(f"Job {self.job_id}: {warnings[-1]}")

    # Filter already-completed model-days (idempotent behavior)
    available_dates = self._filter_completed_dates(available_dates, models)

    # Update to running
    self.job_manager.update_job_status(self.job_id, "running")
    logger.info(f"Job {self.job_id}: Starting execution - {len(available_dates)} dates, {len(models)} models")

    return available_dates, warnings
def test_prepare_data_no_missing_data(self, clean_db, monkeypatch):
    """Test prepare_data when all data is available.

    With no missing coverage reported, the download step must be skipped,
    the requested date must come back with no warnings, and the job must
    end in the "running" status.
    """
    from api.simulation_worker import SimulationWorker
    from api.job_manager import JobManager
    from api.database import initialize_database

    db_path = clean_db
    initialize_database(db_path)
    job_manager = JobManager(db_path=db_path)

    # Create job
    job_id = job_manager.create_job(
        config_path="config.json",
        date_range=["2025-10-01"],
        models=["gpt-5"]
    )

    worker = SimulationWorker(job_id=job_id, db_path=db_path)

    # Stand-in for PriceDataManager: nothing missing, one date available
    mock_price_manager = Mock()
    mock_price_manager.get_missing_coverage.return_value = {}
    mock_price_manager.get_available_trading_dates.return_value = ["2025-10-01"]

    # _prepare_data imports PriceDataManager at call time, so patching the
    # attribute on the defining module is what the method picks up.
    def mock_pdm_init(db_path):
        return mock_price_manager

    monkeypatch.setattr("api.price_data_manager.PriceDataManager", mock_pdm_init)

    # Report no completed model-days (presumably what
    # _filter_completed_dates consults - confirm against the worker).
    worker.job_manager.get_completed_model_dates = Mock(return_value={})

    # Execute
    available_dates, warnings = worker._prepare_data(
        requested_dates=["2025-10-01"],
        models=["gpt-5"],
        config_path="config.json"
    )

    # Verify results
    assert available_dates == ["2025-10-01"]
    assert warnings == []

    # Coverage was checked for the requested range and no download happened
    mock_price_manager.get_missing_coverage.assert_called_once_with(
        "2025-10-01", "2025-10-01"
    )
    mock_price_manager.download_missing_data_prioritized.assert_not_called()

    # Verify status was updated to running
    job = job_manager.get_job(job_id)
    assert job["status"] == "running"


def test_prepare_data_with_download(self, clean_db, monkeypatch):
    """Test prepare_data when data needs downloading.

    With missing coverage reported, the download must be triggered exactly
    once, the requested date must still be returned, and the job must end
    in the "running" status.
    """
    from api.simulation_worker import SimulationWorker
    from api.job_manager import JobManager
    from api.database import initialize_database

    db_path = clean_db
    initialize_database(db_path)
    job_manager = JobManager(db_path=db_path)

    job_id = job_manager.create_job(
        config_path="config.json",
        date_range=["2025-10-01"],
        models=["gpt-5"]
    )

    worker = SimulationWorker(job_id=job_id, db_path=db_path)

    # Stand-in for PriceDataManager: one symbol missing, download succeeds
    mock_price_manager = Mock()
    mock_price_manager.get_missing_coverage.return_value = {"AAPL": {"2025-10-01"}}
    mock_price_manager.download_missing_data_prioritized.return_value = {
        "downloaded": ["AAPL"],
        "failed": [],
        "rate_limited": False
    }
    mock_price_manager.get_available_trading_dates.return_value = ["2025-10-01"]

    def mock_pdm_init(db_path):
        return mock_price_manager

    monkeypatch.setattr("api.price_data_manager.PriceDataManager", mock_pdm_init)
    worker.job_manager.get_completed_model_dates = Mock(return_value={})

    # Execute. The warnings list is filled by the download helper, whose
    # behaviour is not under test here, so only the dates are asserted.
    available_dates, _ = worker._prepare_data(
        requested_dates=["2025-10-01"],
        models=["gpt-5"],
        config_path="config.json"
    )

    # Verify the returned dates (previously bound but never checked)
    assert available_dates == ["2025-10-01"]

    # Verify download was called
    mock_price_manager.download_missing_data_prioritized.assert_called_once()

    # Verify final status (intermediate transitions are not observed here)
    job = job_manager.get_job(job_id)
    assert job["status"] == "running"


# Coverage target: 90%+ for api/simulation_worker.py