mirror of
https://github.com/Xe138/AI-Trader.git
synced 2026-04-02 01:27:24 -04:00
Previously, when re-running a job with some model-days already completed:
- _prepare_data() marked them as "skipped" with error="Already completed"
- But _execute_date() didn't check the skip list before launching executors
- ModelDayExecutor would start, change status to "running", and never complete
- Job would hang with status="running" and pending count > 0
Fixed by:
- _prepare_data() now returns completion_skips: {model: {dates}}
- _execute_date() receives completion_skips and filters out already-completed models
- Skipped model-days are not submitted to ThreadPoolExecutor
- Job completes correctly, skipped model-days remain with status="skipped"
This ensures idempotent job behavior - re-running a job only executes
model-days that haven't completed yet.
Fixes #73
483 lines
17 KiB
Python
"""
|
|
Simulation job orchestration worker.
|
|
|
|
This module provides:
|
|
- Job execution orchestration
|
|
- Date-sequential, model-parallel execution
|
|
- Progress tracking and status updates
|
|
- Error handling and recovery
|
|
"""
|
|
|
|
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional, Set
|
|
|
|
from api.job_manager import JobManager
|
|
from api.model_day_executor import ModelDayExecutor
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SimulationWorker:
|
|
"""
|
|
Orchestrates execution of a simulation job.
|
|
|
|
Responsibilities:
|
|
- Execute all model-day combinations for a job
|
|
- Date-sequential execution (one date at a time)
|
|
- Model-parallel execution (all models for a date run concurrently)
|
|
- Update job status throughout execution
|
|
- Handle failures gracefully
|
|
|
|
Execution Strategy:
|
|
For each date in job.date_range:
|
|
Execute all models in parallel using ThreadPoolExecutor
|
|
Wait for all models to complete before moving to next date
|
|
|
|
Status Transitions:
|
|
pending → running → completed (all succeeded)
|
|
→ partial (some failed)
|
|
→ failed (job-level error)
|
|
"""
|
|
|
|
def __init__(self, job_id: str, db_path: str = "data/jobs.db", max_workers: int = 4):
|
|
"""
|
|
Initialize SimulationWorker.
|
|
|
|
Args:
|
|
job_id: Job UUID to execute
|
|
db_path: Path to SQLite database
|
|
max_workers: Maximum concurrent model executions per date
|
|
"""
|
|
self.job_id = job_id
|
|
self.db_path = db_path
|
|
self.max_workers = max_workers
|
|
self.job_manager = JobManager(db_path=db_path)
|
|
|
|
logger.info(f"Initialized worker for job {job_id}")
|
|
|
|
    def run(self) -> Dict[str, Any]:
        """
        Execute the simulation job end to end.

        Returns:
            Result dict: on success {"success", "job_id", "status",
            "total_model_days", "completed", "failed", "warnings"};
            on job-level failure {"success": False, "job_id", "error"}.

        Process:
            1. Get job details (dates, models, config)
            2. Prepare data (download if needed; see _prepare_data)
            3. For each available date sequentially:
               a. Execute all models in parallel
               b. Wait for all to complete
            4. Determine final job status from progress counters
            5. Store warnings if any

        Error Handling:
            - Individual model failures: handled inside _execute_date; the
              detail is marked failed and the job continues
            - Job-level errors: caught below, entire job marked failed
        """
        try:
            # Get job info
            job = self.job_manager.get_job(self.job_id)
            if not job:
                raise ValueError(f"Job {self.job_id} not found")

            date_range = job["date_range"]
            models = job["models"]
            config_path = job["config_path"]

            logger.info(f"Starting job {self.job_id}: {len(date_range)} dates, {len(models)} models")

            # Prepare price data (download if needed). completion_skips maps
            # model -> dates already completed in a prior run; _execute_date
            # must not resubmit those (idempotent re-runs, fixes #73).
            available_dates, warnings, completion_skips = self._prepare_data(date_range, models, config_path)

            if not available_dates:
                error_msg = "No trading dates available after price data preparation"
                self.job_manager.update_job_status(self.job_id, "failed", error=error_msg)
                return {"success": False, "error": error_msg}

            # Date-sequential execution: all models for one date finish
            # before the next date starts.
            for date in available_dates:
                logger.info(f"Processing date {date} with {len(models)} models")
                self._execute_date(date, models, config_path, completion_skips)

            # Job completed - determine final status from per-detail counters.
            progress = self.job_manager.get_job_progress(self.job_id)

            if progress["failed"] == 0:
                final_status = "completed"
            elif progress["completed"] > 0:
                final_status = "partial"
            else:
                final_status = "failed"

            # Add warnings if any dates were skipped
            if warnings:
                self._add_job_warnings(warnings)

            # Note: Job status is already updated by model_day_executor's detail status updates
            # We don't need to explicitly call update_job_status here as it's handled automatically
            # by the status transition logic in JobManager.update_job_detail_status

            logger.info(f"Job {self.job_id} finished with status: {final_status}")

            return {
                "success": True,
                "job_id": self.job_id,
                "status": final_status,
                "total_model_days": progress["total_model_days"],
                "completed": progress["completed"],
                "failed": progress["failed"],
                "warnings": warnings
            }

        except Exception as e:
            error_msg = f"Job execution failed: {str(e)}"
            logger.error(f"Job {self.job_id}: {error_msg}", exc_info=True)

            # Update job to failed
            self.job_manager.update_job_status(self.job_id, "failed", error=error_msg)

            return {
                "success": False,
                "job_id": self.job_id,
                "error": error_msg
            }
|
|
|
|
def _execute_date(self, date: str, models: List[str], config_path: str,
|
|
completion_skips: Dict[str, Set[str]] = None) -> None:
|
|
"""
|
|
Execute all models for a single date in parallel.
|
|
|
|
Args:
|
|
date: Trading date (YYYY-MM-DD)
|
|
models: List of model signatures to execute
|
|
config_path: Path to configuration file
|
|
completion_skips: {model: {dates}} of already-completed model-days to skip
|
|
|
|
Uses ThreadPoolExecutor to run all models concurrently for this date.
|
|
Waits for all models to complete before returning.
|
|
Skips models that have already completed this date.
|
|
"""
|
|
if completion_skips is None:
|
|
completion_skips = {}
|
|
|
|
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
|
|
# Submit all model executions for this date
|
|
futures = []
|
|
for model in models:
|
|
# Skip if this model-day was already completed
|
|
if date in completion_skips.get(model, set()):
|
|
logger.debug(f"Skipping {model} on {date} (already completed)")
|
|
continue
|
|
|
|
future = executor.submit(
|
|
self._execute_model_day,
|
|
date,
|
|
model,
|
|
config_path
|
|
)
|
|
futures.append(future)
|
|
|
|
# Wait for all to complete
|
|
for future in as_completed(futures):
|
|
try:
|
|
result = future.result()
|
|
if result["success"]:
|
|
logger.debug(f"Completed {result['model']} on {result['date']}")
|
|
else:
|
|
logger.warning(f"Failed {result['model']} on {result['date']}: {result.get('error')}")
|
|
except Exception as e:
|
|
logger.error(f"Exception in model execution: {e}", exc_info=True)
|
|
|
|
def _execute_model_day(self, date: str, model: str, config_path: str) -> Dict[str, Any]:
|
|
"""
|
|
Execute a single model for a single date.
|
|
|
|
Args:
|
|
date: Trading date (YYYY-MM-DD)
|
|
model: Model signature
|
|
config_path: Path to configuration file
|
|
|
|
Returns:
|
|
Execution result dict
|
|
"""
|
|
try:
|
|
executor = ModelDayExecutor(
|
|
job_id=self.job_id,
|
|
date=date,
|
|
model_sig=model,
|
|
config_path=config_path,
|
|
db_path=self.db_path
|
|
)
|
|
|
|
result = executor.execute()
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to execute {model} on {date}: {e}", exc_info=True)
|
|
return {
|
|
"success": False,
|
|
"job_id": self.job_id,
|
|
"date": date,
|
|
"model": model,
|
|
"error": str(e)
|
|
}
|
|
|
|
def _download_price_data(
|
|
self,
|
|
price_manager,
|
|
missing_coverage: Dict[str, Set[str]],
|
|
requested_dates: List[str],
|
|
warnings: List[str]
|
|
) -> None:
|
|
"""Download missing price data with progress logging."""
|
|
logger.info(f"Job {self.job_id}: Starting prioritized download...")
|
|
|
|
requested_dates_set = set(requested_dates)
|
|
|
|
download_result = price_manager.download_missing_data_prioritized(
|
|
missing_coverage,
|
|
requested_dates_set
|
|
)
|
|
|
|
downloaded = len(download_result["downloaded"])
|
|
failed = len(download_result["failed"])
|
|
total = downloaded + failed
|
|
|
|
logger.info(
|
|
f"Job {self.job_id}: Download complete - "
|
|
f"{downloaded}/{total} symbols succeeded"
|
|
)
|
|
|
|
if download_result["rate_limited"]:
|
|
msg = f"Rate limit reached - downloaded {downloaded}/{total} symbols"
|
|
warnings.append(msg)
|
|
logger.warning(f"Job {self.job_id}: {msg}")
|
|
|
|
if failed > 0 and not download_result["rate_limited"]:
|
|
msg = f"{failed} symbols failed to download"
|
|
warnings.append(msg)
|
|
logger.warning(f"Job {self.job_id}: {msg}")
|
|
|
|
def _filter_completed_dates(
|
|
self,
|
|
available_dates: List[str],
|
|
models: List[str]
|
|
) -> List[str]:
|
|
"""
|
|
Filter out dates that are already completed for all models.
|
|
|
|
Implements idempotent job behavior - skip model-days that already
|
|
have completed data.
|
|
|
|
Args:
|
|
available_dates: List of dates with complete price data
|
|
models: List of model signatures
|
|
|
|
Returns:
|
|
List of dates that need processing
|
|
"""
|
|
if not available_dates:
|
|
return []
|
|
|
|
# Get completed dates from job_manager
|
|
start_date = available_dates[0]
|
|
end_date = available_dates[-1]
|
|
|
|
completed_dates = self.job_manager.get_completed_model_dates(
|
|
models,
|
|
start_date,
|
|
end_date
|
|
)
|
|
|
|
# Build list of dates that need processing
|
|
dates_to_process = []
|
|
for date in available_dates:
|
|
# Check if any model needs this date
|
|
needs_processing = False
|
|
for model in models:
|
|
if date not in completed_dates.get(model, []):
|
|
needs_processing = True
|
|
break
|
|
|
|
if needs_processing:
|
|
dates_to_process.append(date)
|
|
|
|
return dates_to_process
|
|
|
|
def _filter_completed_dates_with_tracking(
|
|
self,
|
|
available_dates: List[str],
|
|
models: List[str]
|
|
) -> tuple:
|
|
"""
|
|
Filter already-completed dates per model with skip tracking.
|
|
|
|
Args:
|
|
available_dates: Dates with complete price data
|
|
models: Model signatures
|
|
|
|
Returns:
|
|
Tuple of (dates_to_process, completion_skips)
|
|
- dates_to_process: Union of all dates needed by any model
|
|
- completion_skips: {model: {dates_to_skip_for_this_model}}
|
|
"""
|
|
if not available_dates:
|
|
return [], {}
|
|
|
|
# Get completed dates from job_details history
|
|
start_date = available_dates[0]
|
|
end_date = available_dates[-1]
|
|
completed_dates = self.job_manager.get_completed_model_dates(
|
|
models, start_date, end_date
|
|
)
|
|
|
|
completion_skips = {}
|
|
dates_needed_by_any_model = set()
|
|
|
|
for model in models:
|
|
model_completed = set(completed_dates.get(model, []))
|
|
model_skips = set(available_dates) & model_completed
|
|
completion_skips[model] = model_skips
|
|
|
|
# Track dates this model still needs
|
|
dates_needed_by_any_model.update(
|
|
set(available_dates) - model_skips
|
|
)
|
|
|
|
return sorted(list(dates_needed_by_any_model)), completion_skips
|
|
|
|
def _mark_skipped_dates(
|
|
self,
|
|
price_skips: Set[str],
|
|
completion_skips: Dict[str, Set[str]],
|
|
models: List[str]
|
|
) -> None:
|
|
"""
|
|
Update job_details status for all skipped dates.
|
|
|
|
Args:
|
|
price_skips: Dates without complete price data (affects all models)
|
|
completion_skips: {model: {dates}} already completed per model
|
|
models: All model signatures in job
|
|
"""
|
|
# Price skips affect ALL models equally
|
|
for date in price_skips:
|
|
for model in models:
|
|
self.job_manager.update_job_detail_status(
|
|
self.job_id, date, model,
|
|
"skipped",
|
|
error="Incomplete price data"
|
|
)
|
|
|
|
# Completion skips are per-model
|
|
for model, skipped_dates in completion_skips.items():
|
|
for date in skipped_dates:
|
|
self.job_manager.update_job_detail_status(
|
|
self.job_id, date, model,
|
|
"skipped",
|
|
error="Already completed"
|
|
)
|
|
|
|
def _add_job_warnings(self, warnings: List[str]) -> None:
|
|
"""Store warnings in job metadata."""
|
|
self.job_manager.add_job_warnings(self.job_id, warnings)
|
|
|
|
    def _prepare_data(
        self,
        requested_dates: List[str],
        models: List[str],
        config_path: str
    ) -> tuple:
        """
        Prepare price data for simulation.

        Steps:
            1. Update job status to "downloading_data"
            2. Check what data is missing
            3. Download missing data (with rate limit handling)
            4. Determine available trading dates
            5. Filter out already-completed model-days (idempotent)
            6. Update job status to "running"

        Args:
            requested_dates: All dates requested for simulation (assumed
                non-empty and ascending — [0]/[-1] are used as the query
                window; TODO confirm callers guarantee this)
            models: Model signatures to simulate
            config_path: Path to configuration file (not used in this
                method's body)

        Returns:
            Tuple of (available_dates, warnings, completion_skips)
            - available_dates: Dates to process
            - warnings: Warning messages
            - completion_skips: {model: {dates}} of already-completed model-days
        """
        # Function-local import — presumably to avoid a module-level import
        # cycle with api.price_data_manager; TODO confirm.
        from api.price_data_manager import PriceDataManager

        warnings = []

        # Update status
        self.job_manager.update_job_status(self.job_id, "downloading_data")
        logger.info(f"Job {self.job_id}: Checking price data availability...")

        # Initialize price manager
        price_manager = PriceDataManager(db_path=self.db_path)

        # Check missing coverage over the requested window
        start_date = requested_dates[0]
        end_date = requested_dates[-1]
        missing_coverage = price_manager.get_missing_coverage(start_date, end_date)

        # Download if needed
        if missing_coverage:
            logger.info(f"Job {self.job_id}: Missing data for {len(missing_coverage)} symbols")
            self._download_price_data(price_manager, missing_coverage, requested_dates, warnings)
        else:
            logger.info(f"Job {self.job_id}: All price data available")

        # Get available dates after download
        available_dates = price_manager.get_available_trading_dates(start_date, end_date)

        # Step 1: Track dates skipped due to incomplete price data
        price_skips = set(requested_dates) - set(available_dates)

        # Step 2: Filter already-completed model-days and track skips per model
        dates_to_process, completion_skips = self._filter_completed_dates_with_tracking(
            available_dates, models
        )

        # Step 3: Update job_details status for all skipped dates, so no
        # skipped model-day is left "pending" (a pending skip used to hang
        # the job — see #73)
        self._mark_skipped_dates(price_skips, completion_skips, models)

        # Step 4: Build warnings
        if price_skips:
            warnings.append(
                f"Skipped {len(price_skips)} dates due to incomplete price data: "
                f"{sorted(list(price_skips))}"
            )
            logger.warning(f"Job {self.job_id}: {warnings[-1]}")

        # Count total completion skips across all models
        total_completion_skips = sum(len(dates) for dates in completion_skips.values())
        if total_completion_skips > 0:
            warnings.append(
                f"Skipped {total_completion_skips} model-days already completed"
            )
            logger.warning(f"Job {self.job_id}: {warnings[-1]}")

        # Update to running
        self.job_manager.update_job_status(self.job_id, "running")
        logger.info(f"Job {self.job_id}: Starting execution - {len(dates_to_process)} dates, {len(models)} models")

        return dates_to_process, warnings, completion_skips
|
|
|
|
def get_job_info(self) -> Dict[str, Any]:
|
|
"""
|
|
Get job information.
|
|
|
|
Returns:
|
|
Job data dict
|
|
"""
|
|
return self.job_manager.get_job(self.job_id)
|