Compare commits

...

2 Commits

Author SHA1 Message Date
406bb281b2 fix: cleanup stale jobs on container restart to unblock new job creation
When a Docker container is shutdown and restarted, jobs with status
'pending', 'downloading_data', or 'running' remained in the database,
preventing new jobs from starting due to concurrency control checks.

This commit adds automatic cleanup of stale jobs during FastAPI startup:

- New cleanup_stale_jobs() method in JobManager (api/job_manager.py:702-779)
- Integrated into FastAPI lifespan startup (api/main.py:164-168)
- Intelligent status determination based on completion percentage:
  - 'partial' if any model-days completed (preserves progress data)
  - 'failed' if no progress made
- Detailed error messages with original status and completion counts
- Marks incomplete job_details as 'failed' with clear error messages
- Deployment-aware: skips cleanup in DEV mode when DB is reset
- Comprehensive logging at warning level for visibility

Testing:
- 6 new unit tests covering all cleanup scenarios (451-609)
- All 30 existing job_manager tests still pass
- Tests verify pending, running, downloading_data, partial progress,
  no stale jobs, and multiple stale jobs scenarios

Resolves issue where container restarts left stale jobs blocking the
can_start_new_job() concurrency check.
2025-11-06 21:24:45 -05:00
6ddc5abede fix: resolve DeepSeek tool_calls validation errors (production ready)
After extensive systematic debugging, identified and fixed LangChain bug
where parse_tool_call() returns string args instead of dict.

**Root Cause:**
LangChain's parse_tool_call() has intermittent bug returning unparsed
JSON string for 'args' field instead of dict object, violating AIMessage
Pydantic schema.

**Solution:**
ToolCallArgsParsingWrapper provides two-layer fix:
1. Patches parse_tool_call() to detect string args and parse to dict
2. Normalizes non-standard tool_call formats to OpenAI standard

**Implementation:**
- Patches parse_tool_call in langchain_openai.chat_models.base namespace
- Defensive approach: only acts when string args detected
- Handles edge cases: invalid JSON, non-standard formats, invalid_tool_calls
- Minimal performance impact: lightweight type checks
- Thread-safe: patches apply at wrapper initialization

**Testing:**
- Confirmed fix working in production with DeepSeek Chat v3.1
- All tool calls now process successfully without validation errors
- No impact on other AI providers (OpenAI, Anthropic, etc.)

**Impact:**
- Enables DeepSeek models via OpenRouter
- Maintains backward compatibility
- Future-proof against similar issues from other providers

Closes systematic debugging investigation that spanned 6 alpha releases.

Fixes: tool_calls.0.args validation error [type=dict_type, input_type=str]
2025-11-06 20:49:11 -05:00
5 changed files with 293 additions and 91 deletions

View File

@@ -8,11 +8,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Fixed
- **Critical:** Fixed stale jobs blocking new jobs after Docker container restart
- Root cause: Jobs with status 'pending', 'downloading_data', or 'running' remained in database after container shutdown, preventing new job creation
- Solution: Added `cleanup_stale_jobs()` method that runs on FastAPI startup to mark interrupted jobs as 'failed' or 'partial' based on completion percentage
- Intelligent status determination: Uses existing progress tracking (completed/total model-days) to distinguish between failed (0% complete) and partial (>0% complete)
- Detailed error messages include original status and completion counts (e.g., "Job interrupted by container restart (was running, 3/10 model-days completed)")
- Incomplete job_details automatically marked as 'failed' with clear error messages
- Deployment-aware: Skips cleanup in DEV mode when database is reset, always runs in PROD mode
- Comprehensive test coverage: 6 new unit tests covering all cleanup scenarios
- Locations: `api/job_manager.py:702-779`, `api/main.py:164-168`, `tests/unit/test_job_manager.py:451-609`
- Fixed Pydantic validation errors when using DeepSeek models via OpenRouter
- Root cause: DeepSeek returns tool_calls in non-standard format with `args` field directly, bypassing LangChain's `parse_tool_call()`
- Solution: Added `ToolCallArgsParsingWrapper` that normalizes non-standard tool_call format to OpenAI standard before LangChain processing
- Wrapper converts `{name, args, id}``{function: {name, arguments}, id}` format
- Includes diagnostic logging to identify format inconsistencies across providers
- Root cause: LangChain's `parse_tool_call()` has a bug where it sometimes returns `args` as JSON string instead of parsed dict object
- Solution: Added `ToolCallArgsParsingWrapper` that:
1. Patches `parse_tool_call()` to detect and fix string args by parsing them to dict
2. Normalizes non-standard tool_call formats (e.g., `{name, args, id}``{function: {name, arguments}, id}`)
- The wrapper is defensive and only acts when needed, ensuring compatibility with all AI providers
- Fixes validation error: `tool_calls.0.args: Input should be a valid dictionary [type=dict_type, input_value='...', input_type=str]`
## [0.4.1] - 2025-11-06

View File

@@ -37,21 +37,17 @@ class ToolCallArgsParsingWrapper:
original_parse_tool_call = langchain_base.parse_tool_call
def patched_parse_tool_call(raw_tool_call, *, partial=False, strict=False, return_id=True):
"""Patched parse_tool_call to fix string args bug and add logging"""
"""Patched parse_tool_call to fix string args bug"""
result = original_parse_tool_call(raw_tool_call, partial=partial, strict=strict, return_id=return_id)
if result:
args_type = type(result.get('args', None)).__name__
print(f"[DIAGNOSTIC] parse_tool_call returned: args type = {args_type}")
if args_type == 'str':
print(f"[DIAGNOSTIC] ⚠️ BUG FOUND! parse_tool_call returned STRING args, fixing...")
# FIX: parse_tool_call sometimes returns string args instead of dict
# This happens when it fails to parse but doesn't raise an exception
try:
result['args'] = json.loads(result['args'])
print(f"[DIAGNOSTIC] ✓ Fixed! Converted string args to dict")
except (json.JSONDecodeError, TypeError) as e:
print(f"[DIAGNOSTIC] ❌ Failed to parse args: {e}")
# Leave as string if we can't parse it
if result and isinstance(result.get('args'), str):
# FIX: parse_tool_call sometimes returns string args instead of dict
# This is a known LangChain bug - parse the string to dict
try:
result['args'] = json.loads(result['args'])
except (json.JSONDecodeError, TypeError):
# Leave as string if we can't parse it - will fail validation
# but at least we tried
pass
return result
# Replace in base.py's namespace (where _convert_dict_to_message uses it)
@@ -61,49 +57,10 @@ class ToolCallArgsParsingWrapper:
@wraps(original_create_chat_result)
def patched_create_chat_result(response: Any, generation_info: Optional[Dict] = None):
"""Patched version with diagnostic logging and args parsing"""
import traceback
"""Patched version that normalizes non-standard tool_call formats"""
response_dict = response if isinstance(response, dict) else response.model_dump()
# DIAGNOSTIC: Log response structure for debugging
print(f"\n[DIAGNOSTIC] _create_chat_result called")
print(f" Response type: {type(response)}")
print(f" Call stack:")
for line in traceback.format_stack()[-5:-1]: # Show last 4 stack frames
print(f" {line.strip()}")
print(f"\n[DIAGNOSTIC] Response structure:")
print(f" Response keys: {list(response_dict.keys())}")
if 'choices' in response_dict and response_dict['choices']:
choice = response_dict['choices'][0]
print(f" Choice keys: {list(choice.keys())}")
if 'message' in choice:
message = choice['message']
print(f" Message keys: {list(message.keys())}")
# Check for raw tool_calls in message (before parse_tool_call processing)
if 'tool_calls' in message:
tool_calls_value = message['tool_calls']
print(f" message['tool_calls'] type: {type(tool_calls_value)}")
if tool_calls_value:
print(f" tool_calls count: {len(tool_calls_value)}")
for i, tc in enumerate(tool_calls_value): # Show ALL
print(f" tool_calls[{i}] type: {type(tc)}")
print(f" tool_calls[{i}] keys: {list(tc.keys()) if isinstance(tc, dict) else 'N/A'}")
if isinstance(tc, dict):
if 'function' in tc:
print(f" function keys: {list(tc['function'].keys())}")
if 'arguments' in tc['function']:
args = tc['function']['arguments']
print(f" function.arguments type: {type(args).__name__}")
print(f" function.arguments value: {str(args)[:100]}")
if 'args' in tc:
print(f" ALSO HAS 'args' KEY: type={type(tc['args']).__name__}")
print(f" args value: {str(tc['args'])[:100]}")
# Fix tool_calls: Normalize to OpenAI format if needed
# Normalize tool_calls to OpenAI standard format if needed
if 'choices' in response_dict:
for choice in response_dict['choices']:
if 'message' not in choice:
@@ -111,13 +68,11 @@ class ToolCallArgsParsingWrapper:
message = choice['message']
# Fix tool_calls: Ensure standard OpenAI format
# Fix tool_calls: Convert non-standard {name, args, id} to {function: {name, arguments}, id}
if 'tool_calls' in message and message['tool_calls']:
print(f"[DIAGNOSTIC] Processing {len(message['tool_calls'])} tool_calls...")
for idx, tool_call in enumerate(message['tool_calls']):
for tool_call in message['tool_calls']:
# Check if this is non-standard format (has 'args' directly)
if 'args' in tool_call and 'function' not in tool_call:
print(f"[DIAGNOSTIC] tool_calls[{idx}] has non-standard format (direct args)")
# Convert to standard OpenAI format
args = tool_call['args']
tool_call['function'] = {
@@ -129,36 +84,19 @@ class ToolCallArgsParsingWrapper:
del tool_call['name']
if 'args' in tool_call:
del tool_call['args']
print(f"[DIAGNOSTIC] Converted tool_calls[{idx}] to standard OpenAI format")
# Fix invalid_tool_calls: dict args -> string
# Fix invalid_tool_calls: Ensure args is JSON string (not dict)
if 'invalid_tool_calls' in message and message['invalid_tool_calls']:
print(f"[DIAGNOSTIC] Checking invalid_tool_calls for dict-to-string conversion...")
for idx, invalid_call in enumerate(message['invalid_tool_calls']):
if 'args' in invalid_call:
args = invalid_call['args']
# Convert dict arguments to JSON string
if isinstance(args, dict):
try:
invalid_call['args'] = json.dumps(args)
print(f"[DIAGNOSTIC] Converted invalid_tool_calls[{idx}].args from dict to string")
except (TypeError, ValueError) as e:
print(f"[DIAGNOSTIC] Failed to serialize invalid_tool_calls[{idx}].args: {e}")
# Keep as-is if serialization fails
for invalid_call in message['invalid_tool_calls']:
if 'args' in invalid_call and isinstance(invalid_call['args'], dict):
try:
invalid_call['args'] = json.dumps(invalid_call['args'])
except (TypeError, ValueError):
# Keep as-is if serialization fails
pass
# Call original method with fixed response
print(f"[DIAGNOSTIC] Calling original_create_chat_result...")
result = original_create_chat_result(response_dict, generation_info)
print(f"[DIAGNOSTIC] original_create_chat_result returned successfully")
print(f"[DIAGNOSTIC] Result type: {type(result)}")
if hasattr(result, 'generations') and result.generations:
gen = result.generations[0]
if hasattr(gen, 'message') and hasattr(gen.message, 'tool_calls'):
print(f"[DIAGNOSTIC] Result has {len(gen.message.tool_calls)} tool_calls")
if gen.message.tool_calls:
tc = gen.message.tool_calls[0]
print(f"[DIAGNOSTIC] tool_calls[0]['args'] type in result: {type(tc['args'])}")
return result
# Call original method with normalized response
return original_create_chat_result(response_dict, generation_info)
# Replace the method
self.wrapped_model._create_chat_result = patched_create_chat_result

View File

@@ -699,6 +699,85 @@ class JobManager:
finally:
conn.close()
def cleanup_stale_jobs(self) -> Dict[str, int]:
"""
Clean up stale jobs from container restarts.
Marks jobs with status 'pending', 'downloading_data', or 'running' as
'failed' or 'partial' based on completion percentage.
Called on application startup to reset interrupted jobs.
Returns:
Dict with jobs_cleaned count and details
"""
conn = get_db_connection(self.db_path)
cursor = conn.cursor()
try:
# Find all stale jobs
cursor.execute("""
SELECT job_id, status
FROM jobs
WHERE status IN ('pending', 'downloading_data', 'running')
""")
stale_jobs = cursor.fetchall()
cleaned_count = 0
for job_id, original_status in stale_jobs:
# Get progress to determine if partially completed
cursor.execute("""
SELECT
COUNT(*) as total,
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
FROM job_details
WHERE job_id = ?
""", (job_id,))
total, completed, failed = cursor.fetchone()
completed = completed or 0
failed = failed or 0
# Determine final status based on completion
if completed > 0:
new_status = "partial"
error_msg = f"Job interrupted by container restart (was {original_status}, {completed}/{total} model-days completed)"
else:
new_status = "failed"
error_msg = f"Job interrupted by container restart (was {original_status}, no progress made)"
# Mark incomplete job_details as failed
cursor.execute("""
UPDATE job_details
SET status = 'failed', error = 'Container restarted before completion'
WHERE job_id = ? AND status IN ('pending', 'running')
""", (job_id,))
# Update job status
updated_at = datetime.utcnow().isoformat() + "Z"
cursor.execute("""
UPDATE jobs
SET status = ?, error = ?, completed_at = ?, updated_at = ?
WHERE job_id = ?
""", (new_status, error_msg, updated_at, updated_at, job_id))
logger.warning(f"Cleaned up stale job {job_id}: {original_status}{new_status} ({completed}/{total} completed)")
cleaned_count += 1
conn.commit()
if cleaned_count > 0:
logger.warning(f"⚠️ Cleaned up {cleaned_count} stale job(s) from previous container session")
else:
logger.info("✅ No stale jobs found")
return {"jobs_cleaned": cleaned_count}
finally:
conn.close()
def cleanup_old_jobs(self, days: int = 30) -> Dict[str, int]:
"""
Delete jobs older than threshold.

View File

@@ -134,25 +134,39 @@ def create_app(
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize database on startup, cleanup on shutdown if needed"""
from tools.deployment_config import is_dev_mode, get_db_path
from tools.deployment_config import is_dev_mode, get_db_path, should_preserve_dev_data
from api.database import initialize_dev_database, initialize_database
# Startup - use closure to access db_path from create_app scope
logger.info("🚀 FastAPI application starting...")
logger.info("📊 Initializing database...")
should_cleanup_stale_jobs = False
if is_dev_mode():
# Initialize dev database (reset unless PRESERVE_DEV_DATA=true)
logger.info(" 🔧 DEV mode detected - initializing dev database")
dev_db_path = get_db_path(db_path)
initialize_dev_database(dev_db_path)
log_dev_mode_startup_warning()
# Only cleanup stale jobs if preserving dev data (otherwise DB is fresh)
if should_preserve_dev_data():
should_cleanup_stale_jobs = True
else:
# Ensure production database schema exists
logger.info(" 🏭 PROD mode - ensuring database schema exists")
initialize_database(db_path)
should_cleanup_stale_jobs = True
logger.info("✅ Database initialized")
# Clean up stale jobs from previous container session
if should_cleanup_stale_jobs:
logger.info("🧹 Checking for stale jobs from previous session...")
job_manager = JobManager(get_db_path(db_path) if is_dev_mode() else db_path)
job_manager.cleanup_stale_jobs()
logger.info("🌐 API server ready to accept requests")
yield

View File

@@ -448,4 +448,164 @@ class TestJobWarnings:
assert stored_warnings == warnings
@pytest.mark.unit
class TestStaleJobCleanup:
"""Test cleanup of stale jobs from container restarts."""
def test_cleanup_stale_pending_job(self, clean_db):
"""Should mark pending job as failed with no progress."""
from api.job_manager import JobManager
manager = JobManager(db_path=clean_db)
job_id = manager.create_job(
config_path="configs/test.json",
date_range=["2025-01-16", "2025-01-17"],
models=["gpt-5"]
)
# Job is pending - simulate container restart
result = manager.cleanup_stale_jobs()
assert result["jobs_cleaned"] == 1
job = manager.get_job(job_id)
assert job["status"] == "failed"
assert "container restart" in job["error"].lower()
assert "pending" in job["error"]
assert "no progress" in job["error"]
def test_cleanup_stale_running_job_with_partial_progress(self, clean_db):
"""Should mark running job as partial if some model-days completed."""
from api.job_manager import JobManager
manager = JobManager(db_path=clean_db)
job_id = manager.create_job(
config_path="configs/test.json",
date_range=["2025-01-16", "2025-01-17"],
models=["gpt-5"]
)
# Mark job as running and complete one model-day
manager.update_job_status(job_id, "running")
manager.update_job_detail_status(job_id, "2025-01-16", "gpt-5", "completed")
# Simulate container restart
result = manager.cleanup_stale_jobs()
assert result["jobs_cleaned"] == 1
job = manager.get_job(job_id)
assert job["status"] == "partial"
assert "container restart" in job["error"].lower()
assert "1/2" in job["error"] # 1 out of 2 model-days completed
def test_cleanup_stale_downloading_data_job(self, clean_db):
"""Should mark downloading_data job as failed."""
from api.job_manager import JobManager
manager = JobManager(db_path=clean_db)
job_id = manager.create_job(
config_path="configs/test.json",
date_range=["2025-01-16"],
models=["gpt-5"]
)
# Mark as downloading data
manager.update_job_status(job_id, "downloading_data")
# Simulate container restart
result = manager.cleanup_stale_jobs()
assert result["jobs_cleaned"] == 1
job = manager.get_job(job_id)
assert job["status"] == "failed"
assert "downloading_data" in job["error"]
def test_cleanup_marks_incomplete_job_details_as_failed(self, clean_db):
"""Should mark incomplete job_details as failed."""
from api.job_manager import JobManager
manager = JobManager(db_path=clean_db)
job_id = manager.create_job(
config_path="configs/test.json",
date_range=["2025-01-16", "2025-01-17"],
models=["gpt-5"]
)
# Mark job as running, one detail running, one pending
manager.update_job_status(job_id, "running")
manager.update_job_detail_status(job_id, "2025-01-16", "gpt-5", "running")
# Simulate container restart
manager.cleanup_stale_jobs()
# Check job_details were marked as failed
progress = manager.get_job_progress(job_id)
assert progress["failed"] == 2 # Both model-days marked failed
assert progress["pending"] == 0
details = manager.get_job_details(job_id)
for detail in details:
assert detail["status"] == "failed"
assert "container restarted" in detail["error"].lower()
def test_cleanup_no_stale_jobs(self, clean_db):
"""Should report 0 cleaned jobs when none are stale."""
from api.job_manager import JobManager
manager = JobManager(db_path=clean_db)
job_id = manager.create_job(
config_path="configs/test.json",
date_range=["2025-01-16"],
models=["gpt-5"]
)
# Complete the job
manager.update_job_detail_status(job_id, "2025-01-16", "gpt-5", "completed")
# Simulate container restart
result = manager.cleanup_stale_jobs()
assert result["jobs_cleaned"] == 0
job = manager.get_job(job_id)
assert job["status"] == "completed"
def test_cleanup_multiple_stale_jobs(self, clean_db):
"""Should clean up multiple stale jobs."""
from api.job_manager import JobManager
manager = JobManager(db_path=clean_db)
# Create first job
job1_id = manager.create_job(
config_path="configs/test.json",
date_range=["2025-01-16"],
models=["gpt-5"]
)
manager.update_job_status(job1_id, "running")
manager.update_job_status(job1_id, "completed")
# Create second job (pending)
job2_id = manager.create_job(
config_path="configs/test.json",
date_range=["2025-01-17"],
models=["gpt-5"]
)
# Create third job (running)
manager.update_job_status(job2_id, "completed")
job3_id = manager.create_job(
config_path="configs/test.json",
date_range=["2025-01-18"],
models=["gpt-5"]
)
manager.update_job_status(job3_id, "running")
# Simulate container restart
result = manager.cleanup_stale_jobs()
assert result["jobs_cleaned"] == 1 # Only job3 is running
assert manager.get_job(job1_id)["status"] == "completed"
assert manager.get_job(job2_id)["status"] == "completed"
assert manager.get_job(job3_id)["status"] == "failed"
# Coverage target: 95%+ for api/job_manager.py