mirror of
https://github.com/Xe138/AI-Trader.git
synced 2026-04-01 17:17:24 -04:00
Delegate to JobManager.add_job_warnings for storing warnings. Co-Authored-By: Claude <noreply@anthropic.com>
297 lines
9.7 KiB
Python
297 lines
9.7 KiB
Python
"""
|
|
Simulation job orchestration worker.
|
|
|
|
This module provides:
|
|
- Job execution orchestration
|
|
- Date-sequential, model-parallel execution
|
|
- Progress tracking and status updates
|
|
- Error handling and recovery
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, Any, List, Set
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
from api.job_manager import JobManager
|
|
from api.model_day_executor import ModelDayExecutor
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SimulationWorker:
|
|
"""
|
|
Orchestrates execution of a simulation job.
|
|
|
|
Responsibilities:
|
|
- Execute all model-day combinations for a job
|
|
- Date-sequential execution (one date at a time)
|
|
- Model-parallel execution (all models for a date run concurrently)
|
|
- Update job status throughout execution
|
|
- Handle failures gracefully
|
|
|
|
Execution Strategy:
|
|
For each date in job.date_range:
|
|
Execute all models in parallel using ThreadPoolExecutor
|
|
Wait for all models to complete before moving to next date
|
|
|
|
Status Transitions:
|
|
pending → running → completed (all succeeded)
|
|
→ partial (some failed)
|
|
→ failed (job-level error)
|
|
"""
|
|
|
|
def __init__(self, job_id: str, db_path: str = "data/jobs.db", max_workers: int = 4):
|
|
"""
|
|
Initialize SimulationWorker.
|
|
|
|
Args:
|
|
job_id: Job UUID to execute
|
|
db_path: Path to SQLite database
|
|
max_workers: Maximum concurrent model executions per date
|
|
"""
|
|
self.job_id = job_id
|
|
self.db_path = db_path
|
|
self.max_workers = max_workers
|
|
self.job_manager = JobManager(db_path=db_path)
|
|
|
|
logger.info(f"Initialized worker for job {job_id}")
|
|
|
|
def run(self) -> Dict[str, Any]:
|
|
"""
|
|
Execute the simulation job.
|
|
|
|
Returns:
|
|
Result dict with success status and summary
|
|
|
|
Process:
|
|
1. Get job details (dates, models, config)
|
|
2. For each date sequentially:
|
|
a. Execute all models in parallel
|
|
b. Wait for all to complete
|
|
c. Update progress
|
|
3. Determine final job status
|
|
4. Update job with final status
|
|
|
|
Error Handling:
|
|
- Individual model failures: Mark detail as failed, continue with others
|
|
- Job-level errors: Mark entire job as failed
|
|
"""
|
|
try:
|
|
# Get job info
|
|
job = self.job_manager.get_job(self.job_id)
|
|
if not job:
|
|
raise ValueError(f"Job {self.job_id} not found")
|
|
|
|
date_range = job["date_range"]
|
|
models = job["models"]
|
|
config_path = job["config_path"]
|
|
|
|
logger.info(f"Starting job {self.job_id}: {len(date_range)} dates, {len(models)} models")
|
|
|
|
# Execute date-by-date (sequential)
|
|
for date in date_range:
|
|
logger.info(f"Processing date {date} with {len(models)} models")
|
|
self._execute_date(date, models, config_path)
|
|
|
|
# Job completed - determine final status
|
|
progress = self.job_manager.get_job_progress(self.job_id)
|
|
|
|
if progress["failed"] == 0:
|
|
final_status = "completed"
|
|
elif progress["completed"] > 0:
|
|
final_status = "partial"
|
|
else:
|
|
final_status = "failed"
|
|
|
|
# Note: Job status is already updated by model_day_executor's detail status updates
|
|
# We don't need to explicitly call update_job_status here as it's handled automatically
|
|
# by the status transition logic in JobManager.update_job_detail_status
|
|
|
|
logger.info(f"Job {self.job_id} finished with status: {final_status}")
|
|
|
|
return {
|
|
"success": True,
|
|
"job_id": self.job_id,
|
|
"status": final_status,
|
|
"total_model_days": progress["total_model_days"],
|
|
"completed": progress["completed"],
|
|
"failed": progress["failed"]
|
|
}
|
|
|
|
except Exception as e:
|
|
error_msg = f"Job execution failed: {str(e)}"
|
|
logger.error(f"Job {self.job_id}: {error_msg}", exc_info=True)
|
|
|
|
# Update job to failed
|
|
self.job_manager.update_job_status(self.job_id, "failed", error=error_msg)
|
|
|
|
return {
|
|
"success": False,
|
|
"job_id": self.job_id,
|
|
"error": error_msg
|
|
}
|
|
|
|
def _execute_date(self, date: str, models: List[str], config_path: str) -> None:
|
|
"""
|
|
Execute all models for a single date in parallel.
|
|
|
|
Args:
|
|
date: Trading date (YYYY-MM-DD)
|
|
models: List of model signatures to execute
|
|
config_path: Path to configuration file
|
|
|
|
Uses ThreadPoolExecutor to run all models concurrently for this date.
|
|
Waits for all models to complete before returning.
|
|
"""
|
|
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
|
|
# Submit all model executions for this date
|
|
futures = []
|
|
for model in models:
|
|
future = executor.submit(
|
|
self._execute_model_day,
|
|
date,
|
|
model,
|
|
config_path
|
|
)
|
|
futures.append(future)
|
|
|
|
# Wait for all to complete
|
|
for future in as_completed(futures):
|
|
try:
|
|
result = future.result()
|
|
if result["success"]:
|
|
logger.debug(f"Completed {result['model']} on {result['date']}")
|
|
else:
|
|
logger.warning(f"Failed {result['model']} on {result['date']}: {result.get('error')}")
|
|
except Exception as e:
|
|
logger.error(f"Exception in model execution: {e}", exc_info=True)
|
|
|
|
def _execute_model_day(self, date: str, model: str, config_path: str) -> Dict[str, Any]:
|
|
"""
|
|
Execute a single model for a single date.
|
|
|
|
Args:
|
|
date: Trading date (YYYY-MM-DD)
|
|
model: Model signature
|
|
config_path: Path to configuration file
|
|
|
|
Returns:
|
|
Execution result dict
|
|
"""
|
|
try:
|
|
executor = ModelDayExecutor(
|
|
job_id=self.job_id,
|
|
date=date,
|
|
model_sig=model,
|
|
config_path=config_path,
|
|
db_path=self.db_path
|
|
)
|
|
|
|
result = executor.execute()
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to execute {model} on {date}: {e}", exc_info=True)
|
|
return {
|
|
"success": False,
|
|
"job_id": self.job_id,
|
|
"date": date,
|
|
"model": model,
|
|
"error": str(e)
|
|
}
|
|
|
|
def _download_price_data(
|
|
self,
|
|
price_manager,
|
|
missing_coverage: Dict[str, Set[str]],
|
|
requested_dates: List[str],
|
|
warnings: List[str]
|
|
) -> None:
|
|
"""Download missing price data with progress logging."""
|
|
logger.info(f"Job {self.job_id}: Starting prioritized download...")
|
|
|
|
requested_dates_set = set(requested_dates)
|
|
|
|
download_result = price_manager.download_missing_data_prioritized(
|
|
missing_coverage,
|
|
requested_dates_set
|
|
)
|
|
|
|
downloaded = len(download_result["downloaded"])
|
|
failed = len(download_result["failed"])
|
|
total = downloaded + failed
|
|
|
|
logger.info(
|
|
f"Job {self.job_id}: Download complete - "
|
|
f"{downloaded}/{total} symbols succeeded"
|
|
)
|
|
|
|
if download_result["rate_limited"]:
|
|
msg = f"Rate limit reached - downloaded {downloaded}/{total} symbols"
|
|
warnings.append(msg)
|
|
logger.warning(f"Job {self.job_id}: {msg}")
|
|
|
|
if failed > 0 and not download_result["rate_limited"]:
|
|
msg = f"{failed} symbols failed to download"
|
|
warnings.append(msg)
|
|
logger.warning(f"Job {self.job_id}: {msg}")
|
|
|
|
def _filter_completed_dates(
|
|
self,
|
|
available_dates: List[str],
|
|
models: List[str]
|
|
) -> List[str]:
|
|
"""
|
|
Filter out dates that are already completed for all models.
|
|
|
|
Implements idempotent job behavior - skip model-days that already
|
|
have completed data.
|
|
|
|
Args:
|
|
available_dates: List of dates with complete price data
|
|
models: List of model signatures
|
|
|
|
Returns:
|
|
List of dates that need processing
|
|
"""
|
|
if not available_dates:
|
|
return []
|
|
|
|
# Get completed dates from job_manager
|
|
start_date = available_dates[0]
|
|
end_date = available_dates[-1]
|
|
|
|
completed_dates = self.job_manager.get_completed_model_dates(
|
|
models,
|
|
start_date,
|
|
end_date
|
|
)
|
|
|
|
# Build list of dates that need processing
|
|
dates_to_process = []
|
|
for date in available_dates:
|
|
# Check if any model needs this date
|
|
needs_processing = False
|
|
for model in models:
|
|
if date not in completed_dates.get(model, []):
|
|
needs_processing = True
|
|
break
|
|
|
|
if needs_processing:
|
|
dates_to_process.append(date)
|
|
|
|
return dates_to_process
|
|
|
|
def _add_job_warnings(self, warnings: List[str]) -> None:
|
|
"""Store warnings in job metadata."""
|
|
self.job_manager.add_job_warnings(self.job_id, warnings)
|
|
|
|
def get_job_info(self) -> Dict[str, Any]:
|
|
"""
|
|
Get job information.
|
|
|
|
Returns:
|
|
Job data dict
|
|
"""
|
|
return self.job_manager.get_job(self.job_id)
|