mirror of
https://github.com/Xe138/AI-Trader.git
synced 2026-04-02 01:27:24 -04:00
Compare commits
1 Commits
v0.3.0-alp
...
v0.3.0-alp
| Author | SHA1 | Date | |
|---|---|---|---|
| e590cdc13b |
@@ -90,7 +90,7 @@ class SimulationWorker:
|
||||
logger.info(f"Starting job {self.job_id}: {len(date_range)} dates, {len(models)} models")
|
||||
|
||||
# NEW: Prepare price data (download if needed)
|
||||
available_dates, warnings = self._prepare_data(date_range, models, config_path)
|
||||
available_dates, warnings, completion_skips = self._prepare_data(date_range, models, config_path)
|
||||
|
||||
if not available_dates:
|
||||
error_msg = "No trading dates available after price data preparation"
|
||||
@@ -100,7 +100,7 @@ class SimulationWorker:
|
||||
# Execute available dates only
|
||||
for date in available_dates:
|
||||
logger.info(f"Processing date {date} with {len(models)} models")
|
||||
self._execute_date(date, models, config_path)
|
||||
self._execute_date(date, models, config_path, completion_skips)
|
||||
|
||||
# Job completed - determine final status
|
||||
progress = self.job_manager.get_job_progress(self.job_id)
|
||||
@@ -145,7 +145,8 @@ class SimulationWorker:
|
||||
"error": error_msg
|
||||
}
|
||||
|
||||
def _execute_date(self, date: str, models: List[str], config_path: str) -> None:
|
||||
def _execute_date(self, date: str, models: List[str], config_path: str,
|
||||
completion_skips: Dict[str, Set[str]] = None) -> None:
|
||||
"""
|
||||
Execute all models for a single date in parallel.
|
||||
|
||||
@@ -153,14 +154,24 @@ class SimulationWorker:
|
||||
date: Trading date (YYYY-MM-DD)
|
||||
models: List of model signatures to execute
|
||||
config_path: Path to configuration file
|
||||
completion_skips: {model: {dates}} of already-completed model-days to skip
|
||||
|
||||
Uses ThreadPoolExecutor to run all models concurrently for this date.
|
||||
Waits for all models to complete before returning.
|
||||
Skips models that have already completed this date.
|
||||
"""
|
||||
if completion_skips is None:
|
||||
completion_skips = {}
|
||||
|
||||
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
|
||||
# Submit all model executions for this date
|
||||
futures = []
|
||||
for model in models:
|
||||
# Skip if this model-day was already completed
|
||||
if date in completion_skips.get(model, set()):
|
||||
logger.debug(f"Skipping {model} on {date} (already completed)")
|
||||
continue
|
||||
|
||||
future = executor.submit(
|
||||
self._execute_model_day,
|
||||
date,
|
||||
@@ -397,7 +408,10 @@ class SimulationWorker:
|
||||
config_path: Path to configuration file
|
||||
|
||||
Returns:
|
||||
Tuple of (available_dates, warnings)
|
||||
Tuple of (available_dates, warnings, completion_skips)
|
||||
- available_dates: Dates to process
|
||||
- warnings: Warning messages
|
||||
- completion_skips: {model: {dates}} of already-completed model-days
|
||||
"""
|
||||
from api.price_data_manager import PriceDataManager
|
||||
|
||||
@@ -456,7 +470,7 @@ class SimulationWorker:
|
||||
self.job_manager.update_job_status(self.job_id, "running")
|
||||
logger.info(f"Job {self.job_id}: Starting execution - {len(dates_to_process)} dates, {len(models)} models")
|
||||
|
||||
return dates_to_process, warnings
|
||||
return dates_to_process, warnings, completion_skips
|
||||
|
||||
def get_job_info(self) -> Dict[str, Any]:
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user