From e590cdc13b1c7d39e76e5918598900920c3f00d9 Mon Sep 17 00:00:00 2001 From: Bill Date: Mon, 3 Nov 2025 00:03:57 -0500 Subject: [PATCH] fix: prevent already-completed simulations from re-running Previously, when re-running a job with some model-days already completed: - _prepare_data() marked them as "skipped" with error="Already completed" - But _execute_date() didn't check the skip list before launching executors - ModelDayExecutor would start, change status to "running", and never complete - Job would hang with status="running" and pending count > 0 Fixed by: - _prepare_data() now returns completion_skips: {model: {dates}} - _execute_date() receives completion_skips and filters out already-completed models - Skipped model-days are not submitted to ThreadPoolExecutor - Job completes correctly, skipped model-days remain with status="skipped" This ensures idempotent job behavior - re-running a job only executes model-days that haven't completed yet. Fixes #73 --- api/simulation_worker.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/api/simulation_worker.py b/api/simulation_worker.py index 8b08624..cfed550 100644 --- a/api/simulation_worker.py +++ b/api/simulation_worker.py @@ -90,7 +90,7 @@ class SimulationWorker: logger.info(f"Starting job {self.job_id}: {len(date_range)} dates, {len(models)} models") # NEW: Prepare price data (download if needed) - available_dates, warnings = self._prepare_data(date_range, models, config_path) + available_dates, warnings, completion_skips = self._prepare_data(date_range, models, config_path) if not available_dates: error_msg = "No trading dates available after price data preparation" @@ -100,7 +100,7 @@ class SimulationWorker: # Execute available dates only for date in available_dates: logger.info(f"Processing date {date} with {len(models)} models") - self._execute_date(date, models, config_path) + self._execute_date(date, models, config_path, completion_skips) # Job completed - determine final status progress = self.job_manager.get_job_progress(self.job_id) @@ -145,7 +145,8 @@ class SimulationWorker: "error": error_msg } - def _execute_date(self, date: str, models: List[str], config_path: str) -> None: + def _execute_date(self, date: str, models: List[str], config_path: str, + completion_skips: Dict[str, Set[str]] = None) -> None: """ Execute all models for a single date in parallel. @@ -153,14 +154,24 @@ class SimulationWorker: date: Trading date (YYYY-MM-DD) models: List of model signatures to execute config_path: Path to configuration file + completion_skips: {model: {dates}} of already-completed model-days to skip Uses ThreadPoolExecutor to run all models concurrently for this date. Waits for all models to complete before returning. + Skips models that have already completed this date. """ + if completion_skips is None: + completion_skips = {} + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: # Submit all model executions for this date futures = [] for model in models: + # Skip if this model-day was already completed + if date in completion_skips.get(model, set()): + logger.debug(f"Skipping {model} on {date} (already completed)") + continue + future = executor.submit( self._execute_model_day, date, @@ -397,7 +408,10 @@ class SimulationWorker: config_path: Path to configuration file Returns: - Tuple of (available_dates, warnings) + Tuple of (available_dates, warnings, completion_skips) + - available_dates: Dates to process + - warnings: Warning messages + - completion_skips: {model: {dates}} of already-completed model-days """ from api.price_data_manager import PriceDataManager @@ -456,7 +470,7 @@ class SimulationWorker: self.job_manager.update_job_status(self.job_id, "running") logger.info(f"Job {self.job_id}: Starting execution - {len(dates_to_process)} dates, {len(models)} models") - return dates_to_process, warnings + return dates_to_process, warnings, completion_skips def get_job_info(self) -> Dict[str, Any]: """