From 406bb281b2dad1e53f865f1e8afc72eb522cb16b Mon Sep 17 00:00:00 2001
From: Bill
Date: Thu, 6 Nov 2025 21:24:45 -0500
Subject: [PATCH] fix: cleanup stale jobs on container restart to unblock new
 job creation

When a Docker container is shut down and restarted, jobs with status
'pending', 'downloading_data', or 'running' remain in the database,
preventing new jobs from starting due to concurrency control checks.

This commit adds automatic cleanup of stale jobs during FastAPI startup:

- New cleanup_stale_jobs() method in JobManager (api/job_manager.py:702-779)
- Integrated into FastAPI lifespan startup (api/main.py:164-168)
- Intelligent status determination based on completion percentage:
  - 'partial' if any model-days completed (preserves progress data)
  - 'failed' if no progress made
- Detailed error messages with original status and completion counts
- Marks incomplete job_details as 'failed' with clear error messages
- Deployment-aware: skips cleanup in DEV mode when the DB is reset
- Comprehensive logging at warning level for visibility

Testing:
- 6 new unit tests covering all cleanup scenarios
  (tests/unit/test_job_manager.py:451-609)
- All 30 existing job_manager tests still pass
- Tests cover the pending, running, downloading_data, partial-progress,
  no-stale-jobs, and multiple-stale-jobs scenarios

Resolves an issue where container restarts left stale jobs blocking the
can_start_new_job() concurrency check.
---
 CHANGELOG.md                   |  21 +++--
 api/job_manager.py             |  79 ++++++++++++++++
 api/main.py                    |  16 +++-
 tests/unit/test_job_manager.py | 160 +++++++++++++++++++++++++++++++++
 4 files changed, 269 insertions(+), 7 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 242e0c6..fd9ee89 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,13 +8,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
 ### Fixed
+- **Critical:** Fixed stale jobs blocking new jobs after Docker container restart
+  - Root cause: Jobs with status 'pending', 'downloading_data', or 'running' remained in the database after container shutdown, preventing new job creation
+  - Solution: Added `cleanup_stale_jobs()` method that runs on FastAPI startup to mark interrupted jobs as 'failed' or 'partial' based on completion percentage
+  - Intelligent status determination: Uses existing progress tracking (completed/total model-days) to distinguish between failed (0% complete) and partial (>0% complete)
+  - Detailed error messages include original status and completion counts (e.g., "Job interrupted by container restart (was running, 3/10 model-days completed)")
+  - Incomplete job_details automatically marked as 'failed' with clear error messages
+  - Deployment-aware: Skips cleanup in DEV mode when the database is reset, always runs in PROD mode
+  - Comprehensive test coverage: 6 new unit tests covering all cleanup scenarios
+  - Locations: `api/job_manager.py:702-779`, `api/main.py:164-168`, `tests/unit/test_job_manager.py:451-609`
 - Fixed Pydantic validation errors when using DeepSeek models via OpenRouter
-- Root cause: LangChain's `parse_tool_call()` has a bug where it sometimes returns `args` as JSON string instead of parsed dict object
-- Solution: Added `ToolCallArgsParsingWrapper` that:
-  1. Patches `parse_tool_call()` to detect and fix string args by parsing them to dict
-  2. Normalizes non-standard tool_call formats (e.g., `{name, args, id}` → `{function: {name, arguments}, id}`)
-- The wrapper is defensive and only acts when needed, ensuring compatibility with all AI providers
-- Fixes validation error: `tool_calls.0.args: Input should be a valid dictionary [type=dict_type, input_value='...', input_type=str]`
+  - Root cause: LangChain's `parse_tool_call()` has a bug where it sometimes returns `args` as JSON string instead of parsed dict object
+  - Solution: Added `ToolCallArgsParsingWrapper` that:
+    1. Patches `parse_tool_call()` to detect and fix string args by parsing them to dict
+    2. Normalizes non-standard tool_call formats (e.g., `{name, args, id}` → `{function: {name, arguments}, id}`)
+  - The wrapper is defensive and only acts when needed, ensuring compatibility with all AI providers
+  - Fixes validation error: `tool_calls.0.args: Input should be a valid dictionary [type=dict_type, input_value='...', input_type=str]`
 
 ## [0.4.1] - 2025-11-06
 
diff --git a/api/job_manager.py b/api/job_manager.py
index e403a48..5eb7142 100644
--- a/api/job_manager.py
+++ b/api/job_manager.py
@@ -699,6 +699,85 @@ class JobManager:
         finally:
             conn.close()
 
+    def cleanup_stale_jobs(self) -> Dict[str, int]:
+        """
+        Clean up stale jobs from container restarts.
+
+        Marks jobs with status 'pending', 'downloading_data', or 'running' as
+        'failed' or 'partial' based on completion percentage.
+
+        Called on application startup to reset interrupted jobs.
+
+        Returns:
+            Dict with jobs_cleaned count and details
+        """
+        conn = get_db_connection(self.db_path)
+        cursor = conn.cursor()
+
+        try:
+            # Find all stale jobs
+            cursor.execute("""
+                SELECT job_id, status
+                FROM jobs
+                WHERE status IN ('pending', 'downloading_data', 'running')
+            """)
+
+            stale_jobs = cursor.fetchall()
+            cleaned_count = 0
+
+            for job_id, original_status in stale_jobs:
+                # Get progress to determine if partially completed
+                cursor.execute("""
+                    SELECT
+                        COUNT(*) as total,
+                        SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
+                        SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
+                    FROM job_details
+                    WHERE job_id = ?
+                """, (job_id,))
+
+                total, completed, failed = cursor.fetchone()
+                completed = completed or 0
+                failed = failed or 0
+
+                # Determine final status based on completion
+                if completed > 0:
+                    new_status = "partial"
+                    error_msg = f"Job interrupted by container restart (was {original_status}, {completed}/{total} model-days completed)"
+                else:
+                    new_status = "failed"
+                    error_msg = f"Job interrupted by container restart (was {original_status}, no progress made)"
+
+                # Mark incomplete job_details as failed
+                cursor.execute("""
+                    UPDATE job_details
+                    SET status = 'failed', error = 'Container restarted before completion'
+                    WHERE job_id = ? AND status IN ('pending', 'running')
+                """, (job_id,))
+
+                # Update job status
+                updated_at = datetime.utcnow().isoformat() + "Z"
+                cursor.execute("""
+                    UPDATE jobs
+                    SET status = ?, error = ?, completed_at = ?, updated_at = ?
+                    WHERE job_id = ?
+ """, (new_status, error_msg, updated_at, updated_at, job_id)) + + logger.warning(f"Cleaned up stale job {job_id}: {original_status} → {new_status} ({completed}/{total} completed)") + cleaned_count += 1 + + conn.commit() + + if cleaned_count > 0: + logger.warning(f"⚠️ Cleaned up {cleaned_count} stale job(s) from previous container session") + else: + logger.info("✅ No stale jobs found") + + return {"jobs_cleaned": cleaned_count} + + finally: + conn.close() + def cleanup_old_jobs(self, days: int = 30) -> Dict[str, int]: """ Delete jobs older than threshold. diff --git a/api/main.py b/api/main.py index 134e2d0..61a1038 100644 --- a/api/main.py +++ b/api/main.py @@ -134,25 +134,39 @@ def create_app( @asynccontextmanager async def lifespan(app: FastAPI): """Initialize database on startup, cleanup on shutdown if needed""" - from tools.deployment_config import is_dev_mode, get_db_path + from tools.deployment_config import is_dev_mode, get_db_path, should_preserve_dev_data from api.database import initialize_dev_database, initialize_database # Startup - use closure to access db_path from create_app scope logger.info("🚀 FastAPI application starting...") logger.info("📊 Initializing database...") + should_cleanup_stale_jobs = False + if is_dev_mode(): # Initialize dev database (reset unless PRESERVE_DEV_DATA=true) logger.info(" 🔧 DEV mode detected - initializing dev database") dev_db_path = get_db_path(db_path) initialize_dev_database(dev_db_path) log_dev_mode_startup_warning() + + # Only cleanup stale jobs if preserving dev data (otherwise DB is fresh) + if should_preserve_dev_data(): + should_cleanup_stale_jobs = True else: # Ensure production database schema exists logger.info(" 🏭 PROD mode - ensuring database schema exists") initialize_database(db_path) + should_cleanup_stale_jobs = True logger.info("✅ Database initialized") + + # Clean up stale jobs from previous container session + if should_cleanup_stale_jobs: + logger.info("🧹 Checking for stale jobs from previous session...") + job_manager = JobManager(get_db_path(db_path) if is_dev_mode() else db_path) + job_manager.cleanup_stale_jobs() + logger.info("🌐 API server ready to accept requests") yield diff --git a/tests/unit/test_job_manager.py b/tests/unit/test_job_manager.py index 96ab0c4..d721490 100644 --- a/tests/unit/test_job_manager.py +++ b/tests/unit/test_job_manager.py @@ -448,4 +448,164 @@ class TestJobWarnings: assert stored_warnings == warnings +@pytest.mark.unit +class TestStaleJobCleanup: + """Test cleanup of stale jobs from container restarts.""" + + def test_cleanup_stale_pending_job(self, clean_db): + """Should mark pending job as failed with no progress.""" + from api.job_manager import JobManager + + manager = JobManager(db_path=clean_db) + job_id = manager.create_job( + config_path="configs/test.json", + date_range=["2025-01-16", "2025-01-17"], + models=["gpt-5"] + ) + + # Job is pending - simulate container restart + result = manager.cleanup_stale_jobs() + + assert result["jobs_cleaned"] == 1 + job = manager.get_job(job_id) + assert job["status"] == "failed" + assert "container restart" in job["error"].lower() + assert "pending" in job["error"] + assert "no progress" in job["error"] + + def test_cleanup_stale_running_job_with_partial_progress(self, clean_db): + """Should mark running job as partial if some model-days completed.""" + from api.job_manager import JobManager + + manager = JobManager(db_path=clean_db) + job_id = manager.create_job( + config_path="configs/test.json", + date_range=["2025-01-16", "2025-01-17"], + 
models=["gpt-5"] + ) + + # Mark job as running and complete one model-day + manager.update_job_status(job_id, "running") + manager.update_job_detail_status(job_id, "2025-01-16", "gpt-5", "completed") + + # Simulate container restart + result = manager.cleanup_stale_jobs() + + assert result["jobs_cleaned"] == 1 + job = manager.get_job(job_id) + assert job["status"] == "partial" + assert "container restart" in job["error"].lower() + assert "1/2" in job["error"] # 1 out of 2 model-days completed + + def test_cleanup_stale_downloading_data_job(self, clean_db): + """Should mark downloading_data job as failed.""" + from api.job_manager import JobManager + + manager = JobManager(db_path=clean_db) + job_id = manager.create_job( + config_path="configs/test.json", + date_range=["2025-01-16"], + models=["gpt-5"] + ) + + # Mark as downloading data + manager.update_job_status(job_id, "downloading_data") + + # Simulate container restart + result = manager.cleanup_stale_jobs() + + assert result["jobs_cleaned"] == 1 + job = manager.get_job(job_id) + assert job["status"] == "failed" + assert "downloading_data" in job["error"] + + def test_cleanup_marks_incomplete_job_details_as_failed(self, clean_db): + """Should mark incomplete job_details as failed.""" + from api.job_manager import JobManager + + manager = JobManager(db_path=clean_db) + job_id = manager.create_job( + config_path="configs/test.json", + date_range=["2025-01-16", "2025-01-17"], + models=["gpt-5"] + ) + + # Mark job as running, one detail running, one pending + manager.update_job_status(job_id, "running") + manager.update_job_detail_status(job_id, "2025-01-16", "gpt-5", "running") + + # Simulate container restart + manager.cleanup_stale_jobs() + + # Check job_details were marked as failed + progress = manager.get_job_progress(job_id) + assert progress["failed"] == 2 # Both model-days marked failed + assert progress["pending"] == 0 + + details = manager.get_job_details(job_id) + for detail in details: + assert detail["status"] == "failed" + assert "container restarted" in detail["error"].lower() + + def test_cleanup_no_stale_jobs(self, clean_db): + """Should report 0 cleaned jobs when none are stale.""" + from api.job_manager import JobManager + + manager = JobManager(db_path=clean_db) + job_id = manager.create_job( + config_path="configs/test.json", + date_range=["2025-01-16"], + models=["gpt-5"] + ) + + # Complete the job + manager.update_job_detail_status(job_id, "2025-01-16", "gpt-5", "completed") + + # Simulate container restart + result = manager.cleanup_stale_jobs() + + assert result["jobs_cleaned"] == 0 + job = manager.get_job(job_id) + assert job["status"] == "completed" + + def test_cleanup_multiple_stale_jobs(self, clean_db): + """Should clean up multiple stale jobs.""" + from api.job_manager import JobManager + + manager = JobManager(db_path=clean_db) + + # Create first job + job1_id = manager.create_job( + config_path="configs/test.json", + date_range=["2025-01-16"], + models=["gpt-5"] + ) + manager.update_job_status(job1_id, "running") + manager.update_job_status(job1_id, "completed") + + # Create second job (pending) + job2_id = manager.create_job( + config_path="configs/test.json", + date_range=["2025-01-17"], + models=["gpt-5"] + ) + + # Create third job (running) + manager.update_job_status(job2_id, "completed") + job3_id = manager.create_job( + config_path="configs/test.json", + date_range=["2025-01-18"], + models=["gpt-5"] + ) + manager.update_job_status(job3_id, "running") + + # Simulate container restart + result = 
+
+        assert result["jobs_cleaned"] == 2  # job2 (pending) and job3 (running)
+        assert manager.get_job(job1_id)["status"] == "completed"
+        assert manager.get_job(job2_id)["status"] == "failed"
+        assert manager.get_job(job3_id)["status"] == "failed"
+
 
 # Coverage target: 95%+ for api/job_manager.py
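
For context (not part of the patch above): the commit message refers to the can_start_new_job() concurrency check that stale rows were blocking, but that function is not shown in the diff. Below is a minimal, hypothetical sketch of what such a check could look like, assuming it simply counts jobs in active states against a limit; the function body, the max_concurrent_jobs parameter, and the direct sqlite3 usage are illustrative assumptions, not the actual implementation in api/job_manager.py.

    # Hypothetical sketch of an active-job concurrency check (assumed, not the
    # real can_start_new_job() from api/job_manager.py). It illustrates why rows
    # stuck in 'pending', 'downloading_data', or 'running' block new jobs until
    # cleanup_stale_jobs() re-labels them on startup.
    import sqlite3

    ACTIVE_STATUSES = ("pending", "downloading_data", "running")

    def can_start_new_job(db_path: str, max_concurrent_jobs: int = 1) -> bool:
        """Return True if fewer than max_concurrent_jobs jobs are in an active state."""
        placeholders = ",".join("?" for _ in ACTIVE_STATUSES)
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()
            cursor.execute(
                f"SELECT COUNT(*) FROM jobs WHERE status IN ({placeholders})",
                ACTIVE_STATUSES,
            )
            (active_count,) = cursor.fetchone()
            # A job interrupted by a container restart still counts as active here,
            # which is exactly the situation cleanup_stale_jobs() resolves on startup.
            return active_count < max_concurrent_jobs
        finally:
            conn.close()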