feat: implement replace_existing parameter to allow re-running completed simulations

Add a skip_completed parameter to JobManager.create_job() to control duplicate detection:
- When skip_completed=True (default), already-completed simulations are skipped, and a ValueError is raised if every requested simulation is already completed (existing behavior)
- When skip_completed=False, ALL requested simulations are included regardless of completion status; no duplicate check is performed

The API endpoint now derives the skip_completed argument from request.replace_existing (skip_completed = not replace_existing):
- replace_existing=false (default): skip_completed=True (skip duplicates)
- replace_existing=true: skip_completed=False (force re-run all simulations)

This allows users to force re-running completed simulations when needed.
This commit is contained in:
2025-11-07 13:39:51 -05:00
parent e5b83839ad
commit db1341e204
3 changed files with 80 additions and 27 deletions

View File

@@ -55,7 +55,8 @@ class JobManager:
config_path: str,
date_range: List[str],
models: List[str],
model_day_filter: Optional[List[tuple]] = None
model_day_filter: Optional[List[tuple]] = None,
skip_completed: bool = True
) -> Dict[str, Any]:
"""
Create new simulation job.
@@ -66,6 +67,8 @@ class JobManager:
models: List of model signatures to execute
model_day_filter: Optional list of (model, date) tuples to limit job_details.
If None, creates job_details for all model-date combinations.
skip_completed: If True (default), skips already-completed simulations.
If False, includes all requested simulations regardless of completion status.
Returns:
Dict with:
@@ -73,7 +76,7 @@ class JobManager:
- warnings: List of warning messages for skipped simulations
Raises:
ValueError: If another job is already running/pending or if all simulations are already completed
ValueError: If another job is already running/pending or if all simulations are already completed (when skip_completed=True)
"""
if not self.can_start_new_job():
raise ValueError("Another simulation job is already running or pending")
@@ -91,36 +94,42 @@ class JobManager:
else:
pairs_to_check = [(model, date) for date in date_range for model in models]
# Check for already-completed simulations
# Check for already-completed simulations (only if skip_completed=True)
skipped_pairs = []
pending_pairs = []
for model, date in pairs_to_check:
cursor.execute("""
SELECT COUNT(*)
FROM job_details
WHERE model = ? AND date = ? AND status = 'completed'
""", (model, date))
if skip_completed:
# Perform duplicate checking
for model, date in pairs_to_check:
cursor.execute("""
SELECT COUNT(*)
FROM job_details
WHERE model = ? AND date = ? AND status = 'completed'
""", (model, date))
count = cursor.fetchone()[0]
count = cursor.fetchone()[0]
if count > 0:
skipped_pairs.append((model, date))
logger.info(f"Skipping {model}/{date} - already completed in previous job")
else:
pending_pairs.append((model, date))
if count > 0:
skipped_pairs.append((model, date))
logger.info(f"Skipping {model}/{date} - already completed in previous job")
else:
pending_pairs.append((model, date))
# If all simulations are already completed, raise error
if len(pending_pairs) == 0:
warnings = [
f"Skipped {model}/{date} - already completed"
for model, date in skipped_pairs
]
raise ValueError(
f"All requested simulations are already completed. "
f"Skipped {len(skipped_pairs)} model-day pair(s). "
f"Details: {warnings}"
)
# If all simulations are already completed, raise error
if len(pending_pairs) == 0:
warnings = [
f"Skipped {model}/{date} - already completed"
for model, date in skipped_pairs
]
raise ValueError(
f"All requested simulations are already completed. "
f"Skipped {len(skipped_pairs)} model-day pair(s). "
f"Details: {warnings}"
)
else:
# skip_completed=False: include ALL pairs (no duplicate checking)
pending_pairs = pairs_to_check
logger.info(f"Including all {len(pending_pairs)} model-day pairs (skip_completed=False)")
# Insert job
cursor.execute("""

View File

@@ -284,7 +284,8 @@ def create_app(
config_path=config_path,
date_range=all_dates,
models=models_to_run,
model_day_filter=None # Worker will filter based on available data
model_day_filter=None, # Worker will filter based on available data
skip_completed=(not request.replace_existing) # Skip if replace_existing=False
)
job_id = result["job_id"]
warnings = result.get("warnings", [])