Files
AI-Trader/docs/job-manager-specification.md
Bill fb9583b374 feat: transform to REST API service with SQLite persistence (v0.3.0)
Major architecture transformation from batch-only to API service with
database persistence for Windmill integration.

## REST API Implementation
- POST /simulate/trigger - Start simulation jobs
- GET /simulate/status/{job_id} - Monitor job progress
- GET /results - Query results with filters (job_id, date, model)
- GET /health - Service health checks

## Database Layer
- SQLite persistence with 6 tables (jobs, job_details, positions,
  holdings, reasoning_logs, tool_usage)
- Foreign key constraints with cascade deletes
- Replaces JSONL file storage

## Backend Components
- JobManager: Job lifecycle management with concurrency control
- RuntimeConfigManager: Thread-safe isolated runtime configs
- ModelDayExecutor: Single model-day execution engine
- SimulationWorker: Date-sequential, model-parallel orchestration

## Testing
- 102 unit and integration tests (85% coverage)
- Database: 98% coverage
- Job manager: 98% coverage
- API endpoints: 81% coverage
- Pydantic models: 100% coverage
- TDD approach throughout

## Docker Deployment
- Dual-mode: API server (persistent) + batch (one-time)
- Health checks with 30s interval
- Volume persistence for database and logs
- Separate entrypoints for each mode

## Validation Tools
- scripts/validate_docker_build.sh - Build validation
- scripts/test_api_endpoints.sh - Complete API testing
- scripts/test_batch_mode.sh - Batch mode validation
- DOCKER_API.md - Deployment guide
- TESTING_GUIDE.md - Testing procedures

## Configuration
- API_PORT environment variable (default: 8080)
- Backwards compatible with existing configs
- FastAPI, uvicorn, pydantic>=2.0 dependencies

Co-Authored-By: AI Assistant <noreply@example.com>
2025-10-31 11:47:10 -04:00

29 KiB
Raw Blame History

Job Manager & Database Specification

1. Overview

The Job Manager is responsible for:

  1. Job lifecycle management - Creating, tracking, updating job status
  2. Database operations - SQLite CRUD operations for jobs and job_details
  3. Concurrency control - Ensuring only one simulation runs at a time
  4. State persistence - Maintaining job state across API restarts

2. Database Schema

2.1 SQLite Database Location

data/jobs.db

Rationale: Co-located with simulation data for easy volume mounting

2.2 Table: jobs

Purpose: Track high-level job metadata and status

CREATE TABLE IF NOT EXISTS jobs (
    job_id TEXT PRIMARY KEY,
    config_path TEXT NOT NULL,
    status TEXT NOT NULL CHECK(status IN ('pending', 'running', 'completed', 'partial', 'failed')),
    date_range TEXT NOT NULL,  -- JSON array: ["2025-01-16", "2025-01-17"]
    models TEXT NOT NULL,      -- JSON array: ["claude-3.7-sonnet", "gpt-5"]
    created_at TEXT NOT NULL,  -- ISO 8601: "2025-01-20T14:30:00Z"
    started_at TEXT,           -- When first model-day started
    completed_at TEXT,         -- When last model-day finished
    total_duration_seconds REAL,
    error TEXT                 -- Top-level error message if job failed
);

-- Indexes for performance
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
CREATE INDEX IF NOT EXISTS idx_jobs_created_at ON jobs(created_at DESC);

Field Details:

  • job_id: UUID v4 (e.g., 550e8400-e29b-41d4-a716-446655440000)
  • status: Current job state
    • pending: Job created, not started yet
    • running: At least one model-day is executing
    • completed: All model-days succeeded
    • partial: Some model-days succeeded, some failed
    • failed: All model-days failed (rare edge case)
  • date_range: JSON string for easy querying
  • models: JSON string of enabled model signatures

2.3 Table: job_details

Purpose: Track individual model-day execution status

CREATE TABLE IF NOT EXISTS job_details (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id TEXT NOT NULL,
    date TEXT NOT NULL,        -- "2025-01-16"
    model TEXT NOT NULL,       -- "gpt-5"
    status TEXT NOT NULL CHECK(status IN ('pending', 'running', 'completed', 'failed')),
    started_at TEXT,
    completed_at TEXT,
    duration_seconds REAL,
    error TEXT,                -- Error message if this model-day failed
    FOREIGN KEY (job_id) REFERENCES jobs(job_id) ON DELETE CASCADE
);

-- Indexes
CREATE INDEX IF NOT EXISTS idx_job_details_job_id ON job_details(job_id);
CREATE INDEX IF NOT EXISTS idx_job_details_status ON job_details(status);
CREATE UNIQUE INDEX IF NOT EXISTS idx_job_details_unique ON job_details(job_id, date, model);

Field Details:

  • Each row represents one model-day (e.g., gpt-5 on 2025-01-16)
  • UNIQUE INDEX prevents duplicate execution entries
  • ON DELETE CASCADE ensures orphaned records are cleaned up

2.4 Example Data

jobs table:

job_id                                | config_path              | status    | date_range                        | models                          | created_at           | started_at           | completed_at         | total_duration_seconds
--------------------------------------|--------------------------|-----------|-----------------------------------|---------------------------------|----------------------|----------------------|----------------------|----------------------
550e8400-e29b-41d4-a716-446655440000 | configs/default_config.json | completed | ["2025-01-16","2025-01-17"]     | ["gpt-5","claude-3.7-sonnet"]  | 2025-01-20T14:25:00Z | 2025-01-20T14:25:10Z | 2025-01-20T14:29:45Z | 275.3

job_details table:

id | job_id                               | date       | model              | status    | started_at           | completed_at         | duration_seconds | error
---|--------------------------------------|------------|--------------------|-----------|----------------------|----------------------|------------------|------
1  | 550e8400-e29b-41d4-a716-446655440000 | 2025-01-16 | gpt-5              | completed | 2025-01-20T14:25:10Z | 2025-01-20T14:25:48Z | 38.2             | NULL
2  | 550e8400-e29b-41d4-a716-446655440000 | 2025-01-16 | claude-3.7-sonnet  | completed | 2025-01-20T14:25:10Z | 2025-01-20T14:25:55Z | 45.1             | NULL
3  | 550e8400-e29b-41d4-a716-446655440000 | 2025-01-17 | gpt-5              | completed | 2025-01-20T14:25:56Z | 2025-01-20T14:26:36Z | 40.0             | NULL
4  | 550e8400-e29b-41d4-a716-446655440000 | 2025-01-17 | claude-3.7-sonnet  | completed | 2025-01-20T14:25:56Z | 2025-01-20T14:26:42Z | 46.5             | NULL

3. Job Manager Class

3.1 File Structure

api/
├── job_manager.py      # Core JobManager class
├── database.py         # SQLite connection and utilities
└── models.py           # Pydantic models

3.2 JobManager Interface

# api/job_manager.py

from datetime import datetime
from typing import Optional, List, Dict, Tuple
import uuid
import json
from api.database import get_db_connection

class JobManager:
    """Manages simulation job lifecycle and database operations"""

    def __init__(self, db_path: str = "data/jobs.db"):
        self.db_path = db_path
        self._initialize_database()

    def _initialize_database(self) -> None:
        """Create tables if they don't exist"""
        conn = get_db_connection(self.db_path)
        # Execute CREATE TABLE statements from section 2.2 and 2.3
        conn.close()

    # ========== Job Creation ==========

    def create_job(
        self,
        config_path: str,
        date_range: List[str],
        models: List[str]
    ) -> str:
        """
        Create a new simulation job.

        Args:
            config_path: Path to config file
            date_range: List of trading dates to simulate
            models: List of model signatures to run

        Returns:
            job_id: UUID of created job

        Raises:
            ValueError: If another job is already running
        """
        # 1. Check if any jobs are currently running
        if not self.can_start_new_job():
            raise ValueError("Another simulation job is already running")

        # 2. Generate job ID
        job_id = str(uuid.uuid4())

        # 3. Create job record
        conn = get_db_connection(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            INSERT INTO jobs (
                job_id, config_path, status, date_range, models, created_at
            ) VALUES (?, ?, ?, ?, ?, ?)
        """, (
            job_id,
            config_path,
            "pending",
            json.dumps(date_range),
            json.dumps(models),
            datetime.utcnow().isoformat() + "Z"
        ))

        # 4. Create job_details records for each model-day
        for date in date_range:
            for model in models:
                cursor.execute("""
                    INSERT INTO job_details (
                        job_id, date, model, status
                    ) VALUES (?, ?, ?, ?)
                """, (job_id, date, model, "pending"))

        conn.commit()
        conn.close()

        return job_id

    # ========== Job Retrieval ==========

    def get_job(self, job_id: str) -> Optional[Dict]:
        """
        Get job metadata by ID.

        Returns:
            Job dict with keys: job_id, config_path, status, date_range (list),
            models (list), created_at, started_at, completed_at, total_duration_seconds

            Returns None if job not found.
        """
        conn = get_db_connection(self.db_path)
        cursor = conn.cursor()

        cursor.execute("SELECT * FROM jobs WHERE job_id = ?", (job_id,))
        row = cursor.fetchone()
        conn.close()

        if row is None:
            return None

        return {
            "job_id": row[0],
            "config_path": row[1],
            "status": row[2],
            "date_range": json.loads(row[3]),
            "models": json.loads(row[4]),
            "created_at": row[5],
            "started_at": row[6],
            "completed_at": row[7],
            "total_duration_seconds": row[8],
            "error": row[9]
        }

    def get_current_job(self) -> Optional[Dict]:
        """Get most recent job (for /simulate/current endpoint)"""
        conn = get_db_connection(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            SELECT * FROM jobs
            ORDER BY created_at DESC
            LIMIT 1
        """)
        row = cursor.fetchone()
        conn.close()

        if row is None:
            return None

        return self._row_to_job_dict(row)

    def get_running_jobs(self) -> List[Dict]:
        """Get all running or pending jobs"""
        conn = get_db_connection(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            SELECT * FROM jobs
            WHERE status IN ('pending', 'running')
            ORDER BY created_at DESC
        """)
        rows = cursor.fetchall()
        conn.close()

        return [self._row_to_job_dict(row) for row in rows]

    # ========== Job Status Updates ==========

    def update_job_status(
        self,
        job_id: str,
        status: str,
        error: Optional[str] = None
    ) -> None:
        """Update job status (pending → running → completed/partial/failed)"""
        conn = get_db_connection(self.db_path)
        cursor = conn.cursor()

        updates = {"status": status}

        if status == "running" and self.get_job(job_id)["status"] == "pending":
            updates["started_at"] = datetime.utcnow().isoformat() + "Z"

        if status in ("completed", "partial", "failed"):
            updates["completed_at"] = datetime.utcnow().isoformat() + "Z"
            # Calculate total duration
            job = self.get_job(job_id)
            if job["started_at"]:
                started = datetime.fromisoformat(job["started_at"].replace("Z", ""))
                completed = datetime.utcnow()
                updates["total_duration_seconds"] = (completed - started).total_seconds()

        if error:
            updates["error"] = error

        # Build dynamic UPDATE query
        set_clause = ", ".join([f"{k} = ?" for k in updates.keys()])
        values = list(updates.values()) + [job_id]

        cursor.execute(f"""
            UPDATE jobs
            SET {set_clause}
            WHERE job_id = ?
        """, values)

        conn.commit()
        conn.close()

    def update_job_detail_status(
        self,
        job_id: str,
        date: str,
        model: str,
        status: str,
        error: Optional[str] = None
    ) -> None:
        """Update individual model-day status"""
        conn = get_db_connection(self.db_path)
        cursor = conn.cursor()

        updates = {"status": status}

        # Get current detail status to determine if this is a status transition
        cursor.execute("""
            SELECT status, started_at FROM job_details
            WHERE job_id = ? AND date = ? AND model = ?
        """, (job_id, date, model))
        row = cursor.fetchone()

        if row:
            current_status = row[0]

            if status == "running" and current_status == "pending":
                updates["started_at"] = datetime.utcnow().isoformat() + "Z"

            if status in ("completed", "failed"):
                updates["completed_at"] = datetime.utcnow().isoformat() + "Z"
                # Calculate duration if started_at exists
                if row[1]:  # started_at
                    started = datetime.fromisoformat(row[1].replace("Z", ""))
                    completed = datetime.utcnow()
                    updates["duration_seconds"] = (completed - started).total_seconds()

        if error:
            updates["error"] = error

        # Build UPDATE query
        set_clause = ", ".join([f"{k} = ?" for k in updates.keys()])
        values = list(updates.values()) + [job_id, date, model]

        cursor.execute(f"""
            UPDATE job_details
            SET {set_clause}
            WHERE job_id = ? AND date = ? AND model = ?
        """, values)

        conn.commit()
        conn.close()

        # After updating detail, check if overall job status needs update
        self._update_job_status_from_details(job_id)

    def _update_job_status_from_details(self, job_id: str) -> None:
        """
        Recalculate job status based on job_details statuses.

        Logic:
        - If any detail is 'running' → job is 'running'
        - If all details are 'completed' → job is 'completed'
        - If some details are 'completed' and some 'failed' → job is 'partial'
        - If all details are 'failed' → job is 'failed'
        - If all details are 'pending' → job is 'pending'
        """
        conn = get_db_connection(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            SELECT status, COUNT(*)
            FROM job_details
            WHERE job_id = ?
            GROUP BY status
        """, (job_id,))

        status_counts = {row[0]: row[1] for row in cursor.fetchall()}
        conn.close()

        # Determine overall job status
        if status_counts.get("running", 0) > 0:
            new_status = "running"
        elif status_counts.get("pending", 0) > 0:
            # Some details still pending, job is either pending or running
            current_job = self.get_job(job_id)
            new_status = current_job["status"]  # Keep current status
        elif status_counts.get("failed", 0) > 0 and status_counts.get("completed", 0) > 0:
            new_status = "partial"
        elif status_counts.get("failed", 0) > 0:
            new_status = "failed"
        else:
            new_status = "completed"

        self.update_job_status(job_id, new_status)

    # ========== Job Progress ==========

    def get_job_progress(self, job_id: str) -> Dict:
        """
        Get detailed progress for a job.

        Returns:
            {
                "total_model_days": int,
                "completed": int,
                "failed": int,
                "current": {"date": str, "model": str} | None,
                "details": [
                    {"date": str, "model": str, "status": str, "duration_seconds": float | None, "error": str | None},
                    ...
                ]
            }
        """
        conn = get_db_connection(self.db_path)
        cursor = conn.cursor()

        # Get all details for this job
        cursor.execute("""
            SELECT date, model, status, started_at, completed_at, duration_seconds, error
            FROM job_details
            WHERE job_id = ?
            ORDER BY date ASC, model ASC
        """, (job_id,))

        rows = cursor.fetchall()
        conn.close()

        if not rows:
            return {
                "total_model_days": 0,
                "completed": 0,
                "failed": 0,
                "current": None,
                "details": []
            }

        total = len(rows)
        completed = sum(1 for row in rows if row[2] == "completed")
        failed = sum(1 for row in rows if row[2] == "failed")

        # Find currently running model-day
        current = None
        for row in rows:
            if row[2] == "running":
                current = {"date": row[0], "model": row[1]}
                break

        # Build details list
        details = []
        for row in rows:
            details.append({
                "date": row[0],
                "model": row[1],
                "status": row[2],
                "started_at": row[3],
                "completed_at": row[4],
                "duration_seconds": row[5],
                "error": row[6]
            })

        return {
            "total_model_days": total,
            "completed": completed,
            "failed": failed,
            "current": current,
            "details": details
        }

    # ========== Concurrency Control ==========

    def can_start_new_job(self) -> bool:
        """Check if a new job can be started (max 1 concurrent job)"""
        running_jobs = self.get_running_jobs()
        return len(running_jobs) == 0

    def find_job_by_date_range(self, date_range: List[str]) -> Optional[Dict]:
        """Find job with exact matching date range (for idempotency check)"""
        conn = get_db_connection(self.db_path)
        cursor = conn.cursor()

        # Query recent jobs (last 24 hours)
        cursor.execute("""
            SELECT * FROM jobs
            WHERE created_at > datetime('now', '-1 day')
            ORDER BY created_at DESC
        """)

        rows = cursor.fetchall()
        conn.close()

        # Check each job's date_range
        target_range = set(date_range)
        for row in rows:
            job_range = set(json.loads(row[3]))  # date_range column
            if job_range == target_range:
                return self._row_to_job_dict(row)

        return None

    # ========== Utility Methods ==========

    def _row_to_job_dict(self, row: tuple) -> Dict:
        """Convert DB row to job dictionary"""
        return {
            "job_id": row[0],
            "config_path": row[1],
            "status": row[2],
            "date_range": json.loads(row[3]),
            "models": json.loads(row[4]),
            "created_at": row[5],
            "started_at": row[6],
            "completed_at": row[7],
            "total_duration_seconds": row[8],
            "error": row[9]
        }

    def cleanup_old_jobs(self, days: int = 30) -> int:
        """
        Delete jobs older than specified days (cleanup maintenance).

        Returns:
            Number of jobs deleted
        """
        conn = get_db_connection(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            DELETE FROM jobs
            WHERE created_at < datetime('now', '-' || ? || ' days')
        """, (days,))

        deleted_count = cursor.rowcount
        conn.commit()
        conn.close()

        return deleted_count

4. Database Utility Module

# api/database.py

import sqlite3
from typing import Optional
import os

def get_db_connection(db_path: str = "data/jobs.db") -> sqlite3.Connection:
    """
    Get SQLite database connection.

    Ensures:
    - Database directory exists
    - Foreign keys are enabled
    - Row factory returns dict-like objects
    """
    # Ensure data directory exists
    os.makedirs(os.path.dirname(db_path), exist_ok=True)

    conn = sqlite3.connect(db_path, check_same_thread=False)
    conn.execute("PRAGMA foreign_keys = ON")  # Enable FK constraints
    conn.row_factory = sqlite3.Row  # Return rows as dict-like objects

    return conn

def initialize_database(db_path: str = "data/jobs.db") -> None:
    """Create database tables if they don't exist"""
    conn = get_db_connection(db_path)
    cursor = conn.cursor()

    # Create jobs table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS jobs (
            job_id TEXT PRIMARY KEY,
            config_path TEXT NOT NULL,
            status TEXT NOT NULL CHECK(status IN ('pending', 'running', 'completed', 'partial', 'failed')),
            date_range TEXT NOT NULL,
            models TEXT NOT NULL,
            created_at TEXT NOT NULL,
            started_at TEXT,
            completed_at TEXT,
            total_duration_seconds REAL,
            error TEXT
        )
    """)

    # Create indexes
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)
    """)
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_jobs_created_at ON jobs(created_at DESC)
    """)

    # Create job_details table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS job_details (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            job_id TEXT NOT NULL,
            date TEXT NOT NULL,
            model TEXT NOT NULL,
            status TEXT NOT NULL CHECK(status IN ('pending', 'running', 'completed', 'failed')),
            started_at TEXT,
            completed_at TEXT,
            duration_seconds REAL,
            error TEXT,
            FOREIGN KEY (job_id) REFERENCES jobs(job_id) ON DELETE CASCADE
        )
    """)

    # Create indexes
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_job_details_job_id ON job_details(job_id)
    """)
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_job_details_status ON job_details(status)
    """)
    cursor.execute("""
        CREATE UNIQUE INDEX IF NOT EXISTS idx_job_details_unique
        ON job_details(job_id, date, model)
    """)

    conn.commit()
    conn.close()

5. State Transitions

5.1 Job Status State Machine

pending ──────────────> running ──────────> completed
                          │                     │
                          │                     │
                          └────────────> partial
                          │                     │
                          └────────────> failed

Transition Logic:

  • pending → running: When first model-day starts executing
  • running → completed: When all model-days complete successfully
  • running → partial: When some model-days succeed, some fail
  • running → failed: When all model-days fail (rare)

5.2 Job Detail Status State Machine

pending ──────> running ──────> completed
                   │
                   └───────────> failed

Transition Logic:

  • pending → running: When worker starts executing that model-day
  • running → completed: When agent.run_trading_session() succeeds
  • running → failed: When agent.run_trading_session() raises exception after retries

6. Concurrency Scenarios

6.1 Scenario: Duplicate Trigger Requests

Timeline:

  1. Request A: POST /simulate/trigger → Job created with date_range=[2025-01-16, 2025-01-17]
  2. Request B (5 seconds later): POST /simulate/trigger → Same date range

Expected Behavior:

  • Request A: Returns {"job_id": "abc123", "status": "accepted"}
  • Request B: find_job_by_date_range() finds Job abc123
  • Request B: Returns {"job_id": "abc123", "status": "running", ...} (same job)

Code:

# In /simulate/trigger endpoint
existing_job = job_manager.find_job_by_date_range(date_range)
if existing_job:
    # Return existing job instead of creating duplicate
    return existing_job

6.2 Scenario: Concurrent Jobs with Different Dates

Timeline:

  1. Job A running: date_range=[2025-01-01 to 2025-01-10] (started 5 min ago)
  2. Request: POST /simulate/trigger with date_range=[2025-01-11 to 2025-01-15]

Expected Behavior:

  • can_start_new_job() returns False (Job A is still running)
  • Request returns 409 Conflict with details of Job A

6.3 Scenario: Job Cleanup on API Restart

Problem: API crashes while job is running. On restart, job stuck in "running" state.

Solution: On API startup, detect stale jobs and mark as failed:

# In api/main.py startup event
@app.on_event("startup")
async def startup_event():
    job_manager = JobManager()

    # Find jobs stuck in 'running' or 'pending' state
    stale_jobs = job_manager.get_running_jobs()

    for job in stale_jobs:
        # Mark as failed with explanation
        job_manager.update_job_status(
            job["job_id"],
            "failed",
            error="API restarted while job was running"
        )

7. Testing Strategy

7.1 Unit Tests

# tests/test_job_manager.py

import pytest
from api.job_manager import JobManager
import tempfile
import os

@pytest.fixture
def job_manager():
    # Use temporary database for tests
    temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db")
    temp_db.close()

    jm = JobManager(db_path=temp_db.name)
    yield jm

    # Cleanup
    os.unlink(temp_db.name)

def test_create_job(job_manager):
    job_id = job_manager.create_job(
        config_path="configs/test.json",
        date_range=["2025-01-16", "2025-01-17"],
        models=["gpt-5", "claude-3.7-sonnet"]
    )

    assert job_id is not None
    job = job_manager.get_job(job_id)
    assert job["status"] == "pending"
    assert job["date_range"] == ["2025-01-16", "2025-01-17"]

    # Check job_details created
    progress = job_manager.get_job_progress(job_id)
    assert progress["total_model_days"] == 4  # 2 dates × 2 models

def test_concurrent_job_blocked(job_manager):
    # Create first job
    job1_id = job_manager.create_job("configs/test.json", ["2025-01-16"], ["gpt-5"])

    # Try to create second job while first is pending
    with pytest.raises(ValueError, match="Another simulation job is already running"):
        job_manager.create_job("configs/test.json", ["2025-01-17"], ["gpt-5"])

    # Mark first job as completed
    job_manager.update_job_status(job1_id, "completed")

    # Now second job should be allowed
    job2_id = job_manager.create_job("configs/test.json", ["2025-01-17"], ["gpt-5"])
    assert job2_id is not None

def test_job_status_transitions(job_manager):
    job_id = job_manager.create_job("configs/test.json", ["2025-01-16"], ["gpt-5"])

    # Update job detail to running
    job_manager.update_job_detail_status(job_id, "2025-01-16", "gpt-5", "running")

    # Job should now be 'running'
    job = job_manager.get_job(job_id)
    assert job["status"] == "running"
    assert job["started_at"] is not None

    # Complete the detail
    job_manager.update_job_detail_status(job_id, "2025-01-16", "gpt-5", "completed")

    # Job should now be 'completed'
    job = job_manager.get_job(job_id)
    assert job["status"] == "completed"
    assert job["completed_at"] is not None

def test_partial_job_status(job_manager):
    job_id = job_manager.create_job(
        "configs/test.json",
        ["2025-01-16"],
        ["gpt-5", "claude-3.7-sonnet"]
    )

    # One model succeeds
    job_manager.update_job_detail_status(job_id, "2025-01-16", "gpt-5", "running")
    job_manager.update_job_detail_status(job_id, "2025-01-16", "gpt-5", "completed")

    # One model fails
    job_manager.update_job_detail_status(job_id, "2025-01-16", "claude-3.7-sonnet", "running")
    job_manager.update_job_detail_status(
        job_id, "2025-01-16", "claude-3.7-sonnet", "failed",
        error="API timeout"
    )

    # Job should be 'partial'
    job = job_manager.get_job(job_id)
    assert job["status"] == "partial"

    progress = job_manager.get_job_progress(job_id)
    assert progress["completed"] == 1
    assert progress["failed"] == 1

8. Performance Considerations

8.1 Database Indexing

  • idx_jobs_status: Fast filtering for running jobs
  • idx_jobs_created_at DESC: Fast retrieval of most recent job
  • idx_job_details_unique: Prevent duplicate model-day entries

8.2 Connection Pooling

For MVP, using sqlite3.connect() per operation is acceptable (low concurrency).

For higher concurrency (future), consider:

  • SQLAlchemy ORM with connection pooling
  • PostgreSQL for production deployments

8.3 Query Optimization

Avoid N+1 queries:

# BAD: Separate query for each job's progress
for job in jobs:
    progress = job_manager.get_job_progress(job["job_id"])

# GOOD: Join jobs and job_details in single query
SELECT
    jobs.*,
    COUNT(job_details.id) as total,
    SUM(CASE WHEN job_details.status = 'completed' THEN 1 ELSE 0 END) as completed
FROM jobs
LEFT JOIN job_details ON jobs.job_id = job_details.job_id
GROUP BY jobs.job_id

9. Error Handling

9.1 Database Errors

Scenario: SQLite database is locked or corrupted

Handling:

try:
    job_id = job_manager.create_job(...)
except sqlite3.OperationalError as e:
    # Database locked - retry with exponential backoff
    logger.error(f"Database error: {e}")
    raise HTTPException(status_code=503, detail="Database temporarily unavailable")
except sqlite3.IntegrityError as e:
    # Constraint violation (e.g., duplicate job_id)
    logger.error(f"Integrity error: {e}")
    raise HTTPException(status_code=400, detail="Invalid job data")

9.2 Foreign Key Violations

Scenario: Attempt to create job_detail for non-existent job

Prevention:

  • Always create job record before job_details records
  • Use transactions to ensure atomicity
def create_job(self, ...):
    conn = get_db_connection(self.db_path)
    try:
        cursor = conn.cursor()

        # Insert job
        cursor.execute("INSERT INTO jobs ...")

        # Insert job_details
        for date in date_range:
            for model in models:
                cursor.execute("INSERT INTO job_details ...")

        conn.commit()  # Atomic commit
    except Exception as e:
        conn.rollback()  # Rollback on any error
        raise
    finally:
        conn.close()

10. Migration Strategy

10.1 Schema Versioning

For future schema changes, use migration scripts:

data/
└── migrations/
    ├── 001_initial_schema.sql
    ├── 002_add_priority_column.sql
    └── ...

Track applied migrations in database:

CREATE TABLE IF NOT EXISTS schema_migrations (
    version INTEGER PRIMARY KEY,
    applied_at TEXT NOT NULL
);

10.2 Backward Compatibility

When adding columns:

  • Use ALTER TABLE ADD COLUMN ... DEFAULT ... for backward compatibility
  • Never remove columns (deprecate instead)
  • Version API responses to handle schema changes

Summary

The Job Manager provides:

  1. Robust job tracking with SQLite persistence
  2. Concurrency control ensuring single-job execution
  3. Granular progress monitoring at model-day level
  4. Flexible status handling (completed/partial/failed)
  5. Idempotency for duplicate trigger requests

Next specification: Background Worker Architecture