diff --git a/api/database.py b/api/database.py index a64f943..b004a8b 100644 --- a/api/database.py +++ b/api/database.py @@ -105,7 +105,7 @@ def initialize_database(db_path: str = "data/jobs.db") -> None: job_id TEXT NOT NULL, date TEXT NOT NULL, model TEXT NOT NULL, - status TEXT NOT NULL CHECK(status IN ('pending', 'running', 'completed', 'failed')), + status TEXT NOT NULL CHECK(status IN ('pending', 'running', 'completed', 'failed', 'skipped')), started_at TEXT, completed_at TEXT, duration_seconds REAL, diff --git a/api/job_manager.py b/api/job_manager.py index 818e12c..ab9db05 100644 --- a/api/job_manager.py +++ b/api/job_manager.py @@ -420,14 +420,16 @@ class JobManager: SELECT COUNT(*) as total, SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed, - SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed + SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed, + SUM(CASE WHEN status = 'skipped' THEN 1 ELSE 0 END) as skipped FROM job_details WHERE job_id = ? """, (job_id,)) - total, completed, failed = cursor.fetchone() + total, completed, failed, skipped = cursor.fetchone() - if completed + failed == total: + # Job is done when all details are in terminal states + if completed + failed + skipped == total: # All done - determine final status if failed == 0: final_status = "completed" @@ -519,12 +521,14 @@ class JobManager: SELECT COUNT(*) as total, SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed, - SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed + SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed, + SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending, + SUM(CASE WHEN status = 'skipped' THEN 1 ELSE 0 END) as skipped FROM job_details WHERE job_id = ? """, (job_id,)) - total, completed, failed = cursor.fetchone() + total, completed, failed, pending, skipped = cursor.fetchone() # Get currently running model-day cursor.execute(""" @@ -559,6 +563,8 @@ class JobManager: "total_model_days": total, "completed": completed or 0, "failed": failed or 0, + "pending": pending or 0, + "skipped": skipped or 0, "current": current, "details": details } diff --git a/api/simulation_worker.py b/api/simulation_worker.py index 54724fc..8b08624 100644 --- a/api/simulation_worker.py +++ b/api/simulation_worker.py @@ -296,6 +296,80 @@ class SimulationWorker: return dates_to_process + def _filter_completed_dates_with_tracking( + self, + available_dates: List[str], + models: List[str] + ) -> tuple: + """ + Filter already-completed dates per model with skip tracking. + + Args: + available_dates: Dates with complete price data + models: Model signatures + + Returns: + Tuple of (dates_to_process, completion_skips) + - dates_to_process: Union of all dates needed by any model + - completion_skips: {model: {dates_to_skip_for_this_model}} + """ + if not available_dates: + return [], {} + + # Get completed dates from job_details history + start_date = available_dates[0] + end_date = available_dates[-1] + completed_dates = self.job_manager.get_completed_model_dates( + models, start_date, end_date + ) + + completion_skips = {} + dates_needed_by_any_model = set() + + for model in models: + model_completed = set(completed_dates.get(model, [])) + model_skips = set(available_dates) & model_completed + completion_skips[model] = model_skips + + # Track dates this model still needs + dates_needed_by_any_model.update( + set(available_dates) - model_skips + ) + + return sorted(list(dates_needed_by_any_model)), completion_skips + + def _mark_skipped_dates( + self, + price_skips: Set[str], + completion_skips: Dict[str, Set[str]], + models: List[str] + ) -> None: + """ + Update job_details status for all skipped dates. + + Args: + price_skips: Dates without complete price data (affects all models) + completion_skips: {model: {dates}} already completed per model + models: All model signatures in job + """ + # Price skips affect ALL models equally + for date in price_skips: + for model in models: + self.job_manager.update_job_detail_status( + self.job_id, date, model, + "skipped", + error="Incomplete price data" + ) + + # Completion skips are per-model + for model, skipped_dates in completion_skips.items(): + for date in skipped_dates: + self.job_manager.update_job_detail_status( + self.job_id, date, model, + "skipped", + error="Already completed" + ) + def _add_job_warnings(self, warnings: List[str]) -> None: """Store warnings in job metadata.""" self.job_manager.add_job_warnings(self.job_id, warnings) @@ -351,20 +425,38 @@ class SimulationWorker: # Get available dates after download available_dates = price_manager.get_available_trading_dates(start_date, end_date) - # Warn about skipped dates - skipped = set(requested_dates) - set(available_dates) - if skipped: - warnings.append(f"Skipped {len(skipped)} dates due to incomplete price data: {sorted(list(skipped))}") + # Step 1: Track dates skipped due to incomplete price data + price_skips = set(requested_dates) - set(available_dates) + + # Step 2: Filter already-completed model-days and track skips per model + dates_to_process, completion_skips = self._filter_completed_dates_with_tracking( + available_dates, models + ) + + # Step 3: Update job_details status for all skipped dates + self._mark_skipped_dates(price_skips, completion_skips, models) + + # Step 4: Build warnings + if price_skips: + warnings.append( + f"Skipped {len(price_skips)} dates due to incomplete price data: " + f"{sorted(list(price_skips))}" + ) logger.warning(f"Job {self.job_id}: {warnings[-1]}") - # Filter already-completed model-days (idempotent behavior) - available_dates = self._filter_completed_dates(available_dates, models) + # Count total completion skips across all models + total_completion_skips = sum(len(dates) for dates in completion_skips.values()) + if total_completion_skips > 0: + warnings.append( + f"Skipped {total_completion_skips} model-days already completed" + ) + logger.warning(f"Job {self.job_id}: {warnings[-1]}") # Update to running self.job_manager.update_job_status(self.job_id, "running") - logger.info(f"Job {self.job_id}: Starting execution - {len(available_dates)} dates, {len(models)} models") + logger.info(f"Job {self.job_id}: Starting execution - {len(dates_to_process)} dates, {len(models)} models") - return available_dates, warnings + return dates_to_process, warnings def get_job_info(self) -> Dict[str, Any]: """