feat(worker): add _prepare_data method

Orchestrate data preparation phase:
- Check missing data
- Download if needed
- Filter completed dates
- Update job status

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-11-01 23:43:49 -04:00
parent 445183d5bf
commit 8c3e08a29b
2 changed files with 162 additions and 0 deletions

View File

@@ -286,6 +286,72 @@ class SimulationWorker:
"""Store warnings in job metadata."""
self.job_manager.add_job_warnings(self.job_id, warnings)
def _prepare_data(
self,
requested_dates: List[str],
models: List[str],
config_path: str
) -> tuple:
"""
Prepare price data for simulation.
Steps:
1. Update job status to "downloading_data"
2. Check what data is missing
3. Download missing data (with rate limit handling)
4. Determine available trading dates
5. Filter out already-completed model-days (idempotent)
6. Update job status to "running"
Args:
requested_dates: All dates requested for simulation
models: Model signatures to simulate
config_path: Path to configuration file
Returns:
Tuple of (available_dates, warnings)
"""
from api.price_data_manager import PriceDataManager
warnings = []
# Update status
self.job_manager.update_job_status(self.job_id, "downloading_data")
logger.info(f"Job {self.job_id}: Checking price data availability...")
# Initialize price manager
price_manager = PriceDataManager(db_path=self.db_path)
# Check missing coverage
start_date = requested_dates[0]
end_date = requested_dates[-1]
missing_coverage = price_manager.get_missing_coverage(start_date, end_date)
# Download if needed
if missing_coverage:
logger.info(f"Job {self.job_id}: Missing data for {len(missing_coverage)} symbols")
self._download_price_data(price_manager, missing_coverage, requested_dates, warnings)
else:
logger.info(f"Job {self.job_id}: All price data available")
# Get available dates after download
available_dates = price_manager.get_available_trading_dates(start_date, end_date)
# Warn about skipped dates
skipped = set(requested_dates) - set(available_dates)
if skipped:
warnings.append(f"Skipped {len(skipped)} dates due to incomplete price data: {sorted(list(skipped))}")
logger.warning(f"Job {self.job_id}: {warnings[-1]}")
# Filter already-completed model-days (idempotent behavior)
available_dates = self._filter_completed_dates(available_dates, models)
# Update to running
self.job_manager.update_job_status(self.job_id, "running")
logger.info(f"Job {self.job_id}: Starting execution - {len(available_dates)} dates, {len(models)} models")
return available_dates, warnings
def get_job_info(self) -> Dict[str, Any]:
"""
Get job information.