ffmpeg-worker/docs/plans/2025-11-30-ffmpeg-worker-implementation.md
2025-11-30 14:59:27 -05:00

FFmpeg Worker Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Build a dockerized FFmpeg worker with REST API for job submission and status polling.

Architecture: FastAPI app with in-memory job queue, single background worker processing jobs sequentially, real-time FFmpeg progress parsing via -progress pipe:1.

Tech Stack: Python 3.14, FastAPI, Pydantic, asyncio, ffmpeg/ffprobe CLI
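
Illustration (not a plan step): the architecture above can be sketched with nothing but asyncio. This is a minimal sketch, assuming a stand-in for the FFmpeg call; the names worker and main are hypothetical and do not appear in the plan's code. It shows why one worker task reading from one queue yields strictly sequential, first-in first-out processing.

```python
import asyncio


async def worker(queue: asyncio.Queue, done: list) -> None:
    """Process job IDs one at a time, in submission order."""
    while True:
        job_id = await queue.get()
        await asyncio.sleep(0)  # stand-in for running FFmpeg (assumption)
        done.append(job_id)
        queue.task_done()


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    done: list = []
    task = asyncio.create_task(worker(queue, done))
    for job_id in ("job_a", "job_b", "job_c"):
        await queue.put(job_id)
    await queue.join()  # wait until every queued job has been processed
    task.cancel()       # stop the worker, as the app would on shutdown
    try:
        await task
    except asyncio.CancelledError:
        pass
    return done


print(asyncio.run(main()))
```

Because only one task ever calls queue.get(), no two jobs can overlap, which is the property the plan relies on.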


Task 1: Project Setup

Files:

  • Create: requirements.txt
  • Create: app/__init__.py
  • Create: app/main.py

Step 1: Create requirements.txt

fastapi==0.115.0
uvicorn[standard]==0.32.0
pydantic==2.9.0

Step 2: Create app package

Create app/__init__.py (empty file).

Step 3: Create minimal FastAPI app

Create app/main.py:

from fastapi import FastAPI

app = FastAPI(title="FFmpeg Worker", version="1.0.0")


@app.get("/health")
async def health() -> dict[str, str]:
    return {"status": "ok"}

Step 4: Test manually

Run: cd /home/bballou/ffmpeg-worker && pip install -r requirements.txt && uvicorn app.main:app --reload

In another terminal: curl http://localhost:8000/health

Expected: {"status":"ok"}

Stop the server (Ctrl+C).

Step 5: Commit

git add requirements.txt app/
git commit -m "feat: initial FastAPI setup with health endpoint"

Task 2: Job Models

Files:

  • Create: app/models.py
  • Create: tests/__init__.py
  • Create: tests/test_models.py

Step 1: Create tests package

Create tests/__init__.py (empty file).

Step 2: Write the failing test

Create tests/test_models.py:

from datetime import datetime

from app.models import Job, JobStatus, CreateJobRequest, JobResponse


def test_job_creation():
    job = Job(command="-i input.mp4 output.mp4")

    assert job.id.startswith("job_")
    assert len(job.id) == 20  # job_ + 16 hex chars
    assert job.status == JobStatus.QUEUED
    assert job.command == "-i input.mp4 output.mp4"
    assert isinstance(job.created_at, datetime)
    assert job.started_at is None
    assert job.completed_at is None
    assert job.progress is None
    assert job.output_files == []
    assert job.error is None


def test_create_job_request():
    request = CreateJobRequest(command="-i test.mp4 out.mp4")
    assert request.command == "-i test.mp4 out.mp4"


def test_job_response_from_job():
    job = Job(command="-i input.mp4 output.mp4")
    response = JobResponse.model_validate(job.model_dump())

    assert response.id == job.id
    assert response.status == JobStatus.QUEUED

Step 3: Run test to verify it fails

Run: cd /home/bballou/ffmpeg-worker && pip install pytest==8.3.0 && python -m pytest tests/test_models.py -v

Expected: FAIL with ModuleNotFoundError (app.models doesn't exist)

Step 4: Write the implementation

Create app/models.py:

import secrets
from datetime import datetime, timezone
from enum import Enum

from pydantic import BaseModel, Field


class JobStatus(str, Enum):
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


class Progress(BaseModel):
    frame: int = 0
    fps: float = 0.0
    time: str = "00:00:00.00"
    bitrate: str = "0kbits/s"
    percent: float | None = None


class Job(BaseModel):
    id: str = Field(default_factory=lambda: f"job_{secrets.token_hex(8)}")
    status: JobStatus = JobStatus.QUEUED
    command: str
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    started_at: datetime | None = None
    completed_at: datetime | None = None
    progress: Progress | None = None
    output_files: list[str] = Field(default_factory=list)
    error: str | None = None


class CreateJobRequest(BaseModel):
    command: str


class JobResponse(BaseModel):
    id: str
    status: JobStatus
    command: str
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    progress: Progress | None = None
    output_files: list[str] = Field(default_factory=list)
    error: str | None = None

Step 5: Run test to verify it passes

Run: cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_models.py -v

Expected: PASS (3 tests)

Step 6: Commit

git add app/models.py tests/
git commit -m "feat: add job models with Pydantic"
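
Illustration (not a plan step): the length check in test_job_creation follows from how the ID is built. A minimal sketch, using the same secrets.token_hex call as the Job model; the helper name new_job_id is hypothetical.

```python
import secrets


def new_job_id() -> str:
    # token_hex(8) draws 8 random bytes and renders them as 16 hex characters,
    # so the full ID is 4 ("job_") + 16 = 20 characters long
    return f"job_{secrets.token_hex(8)}"


job_id = new_job_id()
print(job_id, len(job_id))
```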

Task 3: Job Store

Files:

  • Create: app/store.py
  • Create: tests/test_store.py

Step 1: Write the failing test

Create tests/test_store.py:

import pytest

from app.models import Job, JobStatus
from app.store import JobStore


def test_add_and_get_job():
    store = JobStore()
    job = Job(command="-i input.mp4 output.mp4")

    store.add(job)
    retrieved = store.get(job.id)

    assert retrieved is not None
    assert retrieved.id == job.id
    assert retrieved.command == job.command


def test_get_nonexistent_job():
    store = JobStore()

    result = store.get("nonexistent")

    assert result is None


def test_list_all_jobs():
    store = JobStore()
    job1 = Job(command="-i a.mp4 b.mp4")
    job2 = Job(command="-i c.mp4 d.mp4")

    store.add(job1)
    store.add(job2)
    jobs = store.list_all()

    assert len(jobs) == 2
    assert job1 in jobs
    assert job2 in jobs


def test_list_jobs_by_status():
    store = JobStore()
    job1 = Job(command="-i a.mp4 b.mp4")
    job2 = Job(command="-i c.mp4 d.mp4")
    job2.status = JobStatus.RUNNING

    store.add(job1)
    store.add(job2)

    queued = store.list_by_status(JobStatus.QUEUED)
    running = store.list_by_status(JobStatus.RUNNING)

    assert len(queued) == 1
    assert len(running) == 1
    assert queued[0].id == job1.id
    assert running[0].id == job2.id

Step 2: Run test to verify it fails

Run: cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_store.py -v

Expected: FAIL with ModuleNotFoundError

Step 3: Write the implementation

Create app/store.py:

from app.models import Job, JobStatus


class JobStore:
    def __init__(self) -> None:
        self._jobs: dict[str, Job] = {}

    def add(self, job: Job) -> None:
        self._jobs[job.id] = job

    def get(self, job_id: str) -> Job | None:
        return self._jobs.get(job_id)

    def list_all(self) -> list[Job]:
        return list(self._jobs.values())

    def list_by_status(self, status: JobStatus) -> list[Job]:
        return [job for job in self._jobs.values() if job.status == status]

Step 4: Run test to verify it passes

Run: cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_store.py -v

Expected: PASS (4 tests)

Step 5: Commit

git add app/store.py tests/test_store.py
git commit -m "feat: add in-memory job store"

Task 4: API Endpoints

Files:

  • Modify: app/main.py
  • Create: tests/test_api.py

Step 1: Write the failing tests

Create tests/test_api.py:

import pytest
from fastapi.testclient import TestClient

from app.main import app, job_store


@pytest.fixture(autouse=True)
def clear_store():
    job_store._jobs.clear()
    yield
    job_store._jobs.clear()


client = TestClient(app)


def test_health():
    response = client.get("/health")

    assert response.status_code == 200
    assert response.json() == {"status": "ok"}


def test_create_job():
    response = client.post("/jobs", json={"command": "-i input.mp4 output.mp4"})

    assert response.status_code == 201
    data = response.json()
    assert data["id"].startswith("job_")
    assert data["status"] == "queued"
    assert data["command"] == "-i input.mp4 output.mp4"


def test_get_job():
    # Create a job first
    create_response = client.post("/jobs", json={"command": "-i test.mp4 out.mp4"})
    job_id = create_response.json()["id"]

    # Get the job
    response = client.get(f"/jobs/{job_id}")

    assert response.status_code == 200
    data = response.json()
    assert data["id"] == job_id
    assert data["status"] == "queued"


def test_get_nonexistent_job():
    response = client.get("/jobs/nonexistent")

    assert response.status_code == 404


def test_list_jobs():
    client.post("/jobs", json={"command": "-i a.mp4 b.mp4"})
    client.post("/jobs", json={"command": "-i c.mp4 d.mp4"})

    response = client.get("/jobs")

    assert response.status_code == 200
    data = response.json()
    assert len(data) == 2


def test_list_jobs_filter_by_status():
    client.post("/jobs", json={"command": "-i a.mp4 b.mp4"})

    response = client.get("/jobs?status=queued")

    assert response.status_code == 200
    data = response.json()
    assert len(data) == 1
    assert data[0]["status"] == "queued"

Step 2: Run test to verify it fails

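Run: cd /home/bballou/ffmpeg-worker && pip install httpx==0.27.2 && python -m pytest tests/test_api.py -v

Expected: FAIL with ImportError (app.main does not export job_store yet, and the /jobs endpoints are missing). httpx is installed first because fastapi.testclient requires it.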

Step 3: Write the implementation

Replace app/main.py:

from fastapi import FastAPI, HTTPException, status

from app.models import CreateJobRequest, Job, JobResponse, JobStatus
from app.store import JobStore

app = FastAPI(title="FFmpeg Worker", version="1.0.0")

job_store = JobStore()


@app.get("/health")
async def health() -> dict[str, str]:
    return {"status": "ok"}


@app.post("/jobs", status_code=status.HTTP_201_CREATED)
async def create_job(request: CreateJobRequest) -> JobResponse:
    job = Job(command=request.command)
    job_store.add(job)
    return JobResponse.model_validate(job.model_dump())


@app.get("/jobs/{job_id}")
async def get_job(job_id: str) -> JobResponse:
    job = job_store.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return JobResponse.model_validate(job.model_dump())


@app.get("/jobs")
async def list_jobs(status: JobStatus | None = None) -> list[JobResponse]:
    if status is not None:
        jobs = job_store.list_by_status(status)
    else:
        jobs = job_store.list_all()
    return [JobResponse.model_validate(job.model_dump()) for job in jobs]

Step 4: Run test to verify it passes

Run: cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_api.py -v

Expected: PASS (6 tests)

Step 5: Commit

git add app/main.py tests/test_api.py
git commit -m "feat: add job CRUD API endpoints"
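
Illustration (not a plan step): a client of these endpoints would typically submit a job and then poll GET /jobs/{job_id} until the status is completed or failed. The sketch below is a minimal version under stated assumptions: poll_until_done and fake_fetch are hypothetical names, and the HTTP call is replaced by an injectable fetch function so the loop can run without a server.

```python
import time
from typing import Callable

TERMINAL = {"completed", "failed"}  # statuses after which polling stops


def poll_until_done(
    fetch: Callable[[str], dict],
    job_id: str,
    interval: float = 1.0,
    max_attempts: int = 60,
) -> dict:
    """Call fetch(job_id) until the job reaches a terminal status."""
    for _ in range(max_attempts):
        job = fetch(job_id)
        if job["status"] in TERMINAL:
            return job
        time.sleep(interval)
    raise TimeoutError(f"{job_id} did not finish after {max_attempts} polls")


# Stand-in for GET /jobs/{job_id}: reports queued, running, then completed.
_states = iter(["queued", "running", "completed"])


def fake_fetch(job_id: str) -> dict:
    return {"id": job_id, "status": next(_states)}


print(poll_until_done(fake_fetch, "job_0123456789abcdef", interval=0.0))
```

Against a running worker, fetch would wrap a real HTTP GET and return the decoded JSON body; that wiring is left out here on purpose.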

Task 5: FFmpeg Command Parser

Files:

  • Create: app/ffmpeg.py
  • Create: tests/test_ffmpeg.py

Step 1: Write the failing tests

Create tests/test_ffmpeg.py:

import pytest

from app.ffmpeg import parse_command, resolve_paths


def test_parse_simple_command():
    command = "-i input.mp4 output.mp4"

    args = parse_command(command)

    assert args == ["-i", "input.mp4", "output.mp4"]


def test_parse_command_with_options():
    command = "-i input.mp4 -c:v libx264 -crf 23 output.mp4"

    args = parse_command(command)

    assert args == ["-i", "input.mp4", "-c:v", "libx264", "-crf", "23", "output.mp4"]


def test_parse_command_with_quotes():
    command = '-i "input file.mp4" output.mp4'

    args = parse_command(command)

    assert args == ["-i", "input file.mp4", "output.mp4"]


def test_resolve_paths():
    args = ["-i", "input/video.mp4", "-c:v", "libx264", "output/result.mp4"]
    data_path = "/data"

    resolved = resolve_paths(args, data_path)

    assert resolved == [
        "-i", "/data/input/video.mp4",
        "-c:v", "libx264",
        "/data/output/result.mp4"
    ]


def test_resolve_paths_preserves_absolute():
    args = ["-i", "/already/absolute.mp4", "output.mp4"]
    data_path = "/data"

    resolved = resolve_paths(args, data_path)

    assert resolved == ["-i", "/already/absolute.mp4", "/data/output.mp4"]


def test_resolve_paths_skips_options():
    args = ["-c:v", "libx264", "-preset", "fast"]
    data_path = "/data"

    resolved = resolve_paths(args, data_path)

    # Options and their values should not be resolved as paths
    assert resolved == ["-c:v", "libx264", "-preset", "fast"]

Step 2: Run test to verify it fails

Run: cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_ffmpeg.py -v

Expected: FAIL with ModuleNotFoundError

Step 3: Write the implementation

Create app/ffmpeg.py:

import shlex
from pathlib import Path

# FFmpeg options that take a value (not exhaustive, covers common ones)
OPTIONS_WITH_VALUES = {
    "-c", "-c:v", "-c:a", "-b", "-b:v", "-b:a", "-r", "-s", "-ar", "-ac",
    "-f", "-t", "-ss", "-to", "-vf", "-af", "-filter:v", "-filter:a",
    "-preset", "-crf", "-qp", "-profile", "-level", "-pix_fmt", "-map",
    "-metadata", "-disposition", "-threads", "-filter_complex",
}


def parse_command(command: str) -> list[str]:
    """Parse FFmpeg command string into argument list."""
    return shlex.split(command)


def resolve_paths(args: list[str], data_path: str) -> list[str]:
    """Resolve relative paths against the data directory."""
    resolved = []
    skip_next = False

    for arg in args:
        if skip_next:
            resolved.append(arg)
            skip_next = False
            continue

        # Options pass through unchanged; a known value-taking option
        # also shields its value from being resolved as a path
        if arg.startswith("-"):
            resolved.append(arg)
            skip_next = arg in OPTIONS_WITH_VALUES
            continue

        # This looks like a file path - resolve if relative
        path = Path(arg)
        if not path.is_absolute():
            resolved.append(str(Path(data_path) / arg))
        else:
            resolved.append(arg)

    return resolved

Step 4: Run test to verify it passes

Run: cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_ffmpeg.py -v

Expected: PASS (6 tests)

Step 5: Commit

git add app/ffmpeg.py tests/test_ffmpeg.py
git commit -m "feat: add FFmpeg command parser with path resolution"
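
Illustration (not a plan step): parse_command delegates to shlex.split rather than str.split because FFmpeg commands often contain quoted arguments. A minimal sketch with a made-up command string shows the difference.

```python
import shlex

# Hypothetical command with a quoted file name, filter, and metadata value
command = '-i "my input.mp4" -vf "scale=1280:-2" -metadata title="My Video" out.mp4'

# Naive whitespace splitting breaks quoted arguments apart
print(command.split())

# shlex.split applies shell-style quoting rules and strips the quotes
print(shlex.split(command))
```

The second list keeps "my input.mp4" and "title=My Video" as single arguments, which is what FFmpeg needs to receive.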

Task 6: FFmpeg Progress Parser

Files:

  • Modify: app/ffmpeg.py
  • Modify: tests/test_ffmpeg.py

Step 1: Write the failing tests

Add to tests/test_ffmpeg.py:

from app.ffmpeg import parse_progress, extract_output_path


def test_parse_progress():
    output = """frame=1234
fps=30.24
total_size=5678900
out_time_ms=83450000
bitrate=1250.5kbits/s
progress=continue
"""
    progress = parse_progress(output, duration_seconds=120.0)

    assert progress.frame == 1234
    assert progress.fps == 30.24
    assert progress.time == "00:01:23.45"
    assert progress.bitrate == "1250.5kbits/s"
    assert progress.percent == pytest.approx(69.54, rel=0.01)


def test_parse_progress_no_duration():
    output = "frame=100\nfps=25.0\nout_time_ms=4000000\nbitrate=500kbits/s\n"

    progress = parse_progress(output, duration_seconds=None)

    assert progress.frame == 100
    assert progress.percent is None


def test_extract_output_path():
    args = ["-i", "input.mp4", "-c:v", "libx264", "output.mp4"]

    output_path = extract_output_path(args)

    assert output_path == "output.mp4"


def test_extract_output_path_complex():
    args = ["-i", "a.mp4", "-i", "b.mp4", "-filter_complex", "[0:v][1:v]concat", "out.mp4"]

    output_path = extract_output_path(args)

    assert output_path == "out.mp4"

Step 2: Run test to verify it fails

Run: cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_ffmpeg.py::test_parse_progress -v

Expected: FAIL (parse_progress not defined)

Step 3: Write the implementation

Add to app/ffmpeg.py:

from app.models import Progress


def parse_progress(output: str, duration_seconds: float | None) -> Progress:
    """Parse FFmpeg progress output into Progress model."""
    data: dict[str, str] = {}
    for line in output.strip().split("\n"):
        if "=" in line:
            key, value = line.split("=", 1)
            data[key.strip()] = value.strip()

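    def _number(key: str, cast: type[int] | type[float]) -> int | float:
        # FFmpeg can report "N/A" for a field it cannot compute yet
        try:
            return cast(data.get(key, 0))
        except ValueError:
            return cast(0)

    frame = _number("frame", int)
    fps = _number("fps", float)
    out_time_ms = _number("out_time_ms", int)
    bitrate = data.get("bitrate", "0kbits/s")

    # Despite its name, out_time_ms is reported in microseconds.
    # Convert it to HH:MM:SS.ss format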
    total_seconds = out_time_ms / 1_000_000
    hours = int(total_seconds // 3600)
    minutes = int((total_seconds % 3600) // 60)
    seconds = total_seconds % 60
    time_str = f"{hours:02d}:{minutes:02d}:{seconds:05.2f}"

    # Calculate percent if duration is known
    percent = None
    if duration_seconds and duration_seconds > 0:
        percent = (total_seconds / duration_seconds) * 100
        percent = min(percent, 100.0)  # Cap at 100%

    return Progress(
        frame=frame,
        fps=fps,
        time=time_str,
        bitrate=bitrate,
        percent=percent,
    )


def extract_output_path(args: list[str]) -> str | None:
    """Extract output file path from FFmpeg arguments (last non-option argument)."""
    # Work backwards to find the last argument that isn't an option or option value
    i = len(args) - 1
    while i >= 0:
        arg = args[i]
        # Skip if it's an option
        if arg.startswith("-"):
            i -= 1
            continue
        # Check if previous arg is an option that takes a value
        if i > 0 and args[i - 1] in OPTIONS_WITH_VALUES:
            i -= 1
            continue
        # Check if it looks like a file path (has extension or contains /)
        if "." in arg or "/" in arg:
            return arg
        i -= 1
    return None

Also update the imports at the top of app/ffmpeg.py:

import shlex
from pathlib import Path

from app.models import Progress

Step 4: Run test to verify it passes

Run: cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_ffmpeg.py -v

Expected: PASS (10 tests)

Step 5: Commit

git add app/ffmpeg.py tests/test_ffmpeg.py
git commit -m "feat: add FFmpeg progress parser and output path extraction"
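
Illustration (not a plan step): with -progress pipe:1, FFmpeg writes key=value lines in blocks, and each block ends with a progress=continue or progress=end line. The sketch below groups a stream of lines into one dict per block. It is a minimal sketch; iter_progress_blocks is a hypothetical helper, not part of the plan's code.

```python
from typing import Dict, Iterable, Iterator


def iter_progress_blocks(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per progress block; a block ends at its progress= line."""
    block: Dict[str, str] = {}
    for line in lines:
        key, sep, value = line.strip().partition("=")
        if not sep:
            continue  # ignore blank or malformed lines
        block[key] = value
        if key == "progress":
            yield block
            block = {}


sample = [
    "frame=30", "out_time_ms=1000000", "progress=continue",
    "frame=60", "out_time_ms=2000000", "progress=end",
]
for block in iter_progress_blocks(sample):
    print(block["frame"], block["progress"])
```

Each yielded dict holds the same fields that parse_progress reads, so a block could be handed to a parser as soon as its progress= line arrives.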

Task 7: Job Queue

Files:

  • Create: app/queue.py
  • Create: tests/test_queue.py

Step 1: Write the failing tests

Create tests/test_queue.py:

import pytest
import asyncio

from app.queue import JobQueue


@pytest.mark.asyncio
async def test_enqueue_and_dequeue():
    queue = JobQueue()

    await queue.enqueue("job_123")
    job_id = await queue.dequeue()

    assert job_id == "job_123"


@pytest.mark.asyncio
async def test_queue_ordering():
    queue = JobQueue()

    await queue.enqueue("job_1")
    await queue.enqueue("job_2")
    await queue.enqueue("job_3")

    assert await queue.dequeue() == "job_1"
    assert await queue.dequeue() == "job_2"
    assert await queue.dequeue() == "job_3"


@pytest.mark.asyncio
async def test_queue_size():
    queue = JobQueue()

    assert queue.size() == 0
    await queue.enqueue("job_1")
    assert queue.size() == 1
    await queue.enqueue("job_2")
    assert queue.size() == 2
    await queue.dequeue()
    assert queue.size() == 1

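Step 2: Add test dependencies to requirements

Update requirements.txt to (httpx is required by fastapi.testclient; requests is used by the integration test in Task 11):

fastapi==0.115.0
uvicorn[standard]==0.32.0
pydantic==2.9.0
pytest==8.3.0
pytest-asyncio==0.24.0
httpx==0.27.2
requests==2.32.3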

Step 3: Run test to verify it fails

Run: cd /home/bballou/ffmpeg-worker && pip install -r requirements.txt && python -m pytest tests/test_queue.py -v

Expected: FAIL with ModuleNotFoundError

Step 4: Write the implementation

Create app/queue.py:

import asyncio


class JobQueue:
    def __init__(self) -> None:
        self._queue: asyncio.Queue[str] = asyncio.Queue()

    async def enqueue(self, job_id: str) -> None:
        await self._queue.put(job_id)

    async def dequeue(self) -> str:
        return await self._queue.get()

    def size(self) -> int:
        return self._queue.qsize()

Step 5: Run test to verify it passes

Run: cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_queue.py -v

Expected: PASS (3 tests)

Step 6: Commit

git add app/queue.py tests/test_queue.py requirements.txt
git commit -m "feat: add async job queue"

Task 8: FFmpeg Runner

Files:

  • Modify: app/ffmpeg.py
  • Modify: tests/test_ffmpeg.py

Step 1: Write the failing test

Add to tests/test_ffmpeg.py:

import asyncio
from unittest.mock import AsyncMock, patch, MagicMock

from app.models import Job


@pytest.mark.asyncio
async def test_get_duration():
    from app.ffmpeg import get_duration

    # Mock ffprobe output
    mock_process = MagicMock()
    mock_process.communicate = AsyncMock(return_value=(b"120.5\n", b""))
    mock_process.returncode = 0

    with patch("asyncio.create_subprocess_exec", return_value=mock_process):
        duration = await get_duration("/data/input.mp4")

    assert duration == 120.5


@pytest.mark.asyncio
async def test_get_duration_failure():
    from app.ffmpeg import get_duration

    mock_process = MagicMock()
    mock_process.communicate = AsyncMock(return_value=(b"", b"error"))
    mock_process.returncode = 1

    with patch("asyncio.create_subprocess_exec", return_value=mock_process):
        duration = await get_duration("/data/input.mp4")

    assert duration is None

Step 2: Run test to verify it fails

Run: cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_ffmpeg.py::test_get_duration -v

Expected: FAIL (get_duration not defined)

Step 3: Write the implementation

Add to app/ffmpeg.py:

import asyncio


async def get_duration(input_path: str) -> float | None:
    """Get duration of media file using ffprobe."""
    try:
        process = await asyncio.create_subprocess_exec(
            "ffprobe",
            "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            input_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await process.communicate()

        if process.returncode == 0:
            return float(stdout.decode().strip())
    except (ValueError, OSError):
        pass
    return None

Step 4: Run test to verify it passes

Run: cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_ffmpeg.py::test_get_duration tests/test_ffmpeg.py::test_get_duration_failure -v

Expected: PASS (2 tests)

Step 5: Commit

git add app/ffmpeg.py tests/test_ffmpeg.py
git commit -m "feat: add ffprobe duration detection"
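
Illustration (not a plan step): get_duration treats anything it cannot convert to a float as "duration unknown". The sketch below isolates that parsing step. It is a minimal sketch; parse_duration is a hypothetical helper, and the extra rejection of non-positive or non-finite values is an assumption that goes beyond what the plan's code does.

```python
import math
from typing import Optional


def parse_duration(stdout: bytes) -> Optional[float]:
    """Return the duration in seconds, or None if it is missing or unusable."""
    text = stdout.decode(errors="replace").strip()
    try:
        value = float(text)
    except ValueError:
        return None  # empty output or a non-numeric value such as "N/A"
    # Assumption: a zero, negative, or non-finite duration is as good as unknown
    if not math.isfinite(value) or value <= 0:
        return None
    return value


print(parse_duration(b"120.5\n"))
print(parse_duration(b"N/A\n"))
```

Returning None keeps percent at None in parse_progress, so clients still see frame, fps, and time even when the total length is unknown.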

Task 9: Worker Loop

Files:

  • Modify: app/queue.py
  • Modify: app/main.py

Step 1: Add worker loop to queue.py

Replace app/queue.py (the JobQueue class is unchanged; run_ffmpeg and worker_loop are new):

import asyncio
import os
from datetime import datetime, timezone

from app.ffmpeg import (
    parse_command,
    resolve_paths,
    get_duration,
    parse_progress,
    extract_output_path,
)
from app.models import Job, JobStatus, Progress
from app.store import JobStore


class JobQueue:
    def __init__(self) -> None:
        self._queue: asyncio.Queue[str] = asyncio.Queue()

    async def enqueue(self, job_id: str) -> None:
        await self._queue.put(job_id)

    async def dequeue(self) -> str:
        return await self._queue.get()

    def size(self) -> int:
        return self._queue.qsize()


async def run_ffmpeg(job: Job, data_path: str, timeout: int) -> None:
    """Execute FFmpeg command and update job with progress."""
    args = parse_command(job.command)
    resolved_args = resolve_paths(args, data_path)

    # Find input file for duration
    input_path = None
    for i, arg in enumerate(resolved_args):
        if arg == "-i" and i + 1 < len(resolved_args):
            input_path = resolved_args[i + 1]
            break

    duration = None
    if input_path:
        duration = await get_duration(input_path)

    # Build FFmpeg command with progress output
    ffmpeg_args = ["ffmpeg", "-y", "-progress", "pipe:1", "-nostats"] + resolved_args

    process = await asyncio.create_subprocess_exec(
        *ffmpeg_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    # Drain stderr in the background so a full pipe cannot stall FFmpeg
    stderr_task = asyncio.create_task(process.stderr.read())

    # Read progress output
    progress_buffer = ""
    try:
        async with asyncio.timeout(timeout):
            while True:
                line = await process.stdout.readline()
                if not line:
                    break
                progress_buffer += line.decode()

                # Parse progress when we get a complete block
                if "progress=" in progress_buffer:
                    job.progress = parse_progress(progress_buffer, duration)
                    progress_buffer = ""

            stderr = await stderr_task
            await process.wait()
    except asyncio.TimeoutError:
        process.kill()
        await process.wait()
        stderr_task.cancel()
        raise TimeoutError(f"FFmpeg timed out after {timeout} seconds")

    if process.returncode != 0:
        raise RuntimeError(stderr.decode())

    # Extract output path
    output_path = extract_output_path(args)
    if output_path:
        job.output_files = [output_path]


async def worker_loop(job_queue: JobQueue, job_store: JobStore) -> None:
    """Main worker loop that processes jobs from the queue."""
    data_path = os.environ.get("DATA_PATH", "/data")
    timeout = int(os.environ.get("FFMPEG_TIMEOUT", "3600"))

    while True:
        job_id = await job_queue.dequeue()
        job = job_store.get(job_id)

        if job is None:
            continue

        job.status = JobStatus.RUNNING
        job.started_at = datetime.now(timezone.utc)
        job.progress = Progress()

        try:
            await run_ffmpeg(job, data_path, timeout)
            job.status = JobStatus.COMPLETED
        except Exception as e:
            job.status = JobStatus.FAILED
            job.error = str(e)
        finally:
            job.completed_at = datetime.now(timezone.utc)

Step 2: Wire up worker in main.py

Replace app/main.py:

import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, status

from app.models import CreateJobRequest, Job, JobResponse, JobStatus
from app.queue import JobQueue, worker_loop
from app.store import JobStore

job_store = JobStore()
job_queue = JobQueue()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start worker on startup
    worker_task = asyncio.create_task(worker_loop(job_queue, job_store))
    yield
    # Cancel worker on shutdown
    worker_task.cancel()
    try:
        await worker_task
    except asyncio.CancelledError:
        pass


app = FastAPI(title="FFmpeg Worker", version="1.0.0", lifespan=lifespan)


@app.get("/health")
async def health() -> dict[str, str]:
    return {"status": "ok"}


@app.post("/jobs", status_code=status.HTTP_201_CREATED)
async def create_job(request: CreateJobRequest) -> JobResponse:
    job = Job(command=request.command)
    job_store.add(job)
    await job_queue.enqueue(job.id)
    return JobResponse.model_validate(job.model_dump())


@app.get("/jobs/{job_id}")
async def get_job(job_id: str) -> JobResponse:
    job = job_store.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return JobResponse.model_validate(job.model_dump())


@app.get("/jobs")
async def list_jobs(status: JobStatus | None = None) -> list[JobResponse]:
    if status is not None:
        jobs = job_store.list_by_status(status)
    else:
        jobs = job_store.list_all()
    return [JobResponse.model_validate(job.model_dump()) for job in jobs]

Step 3: Run all tests

Run: cd /home/bballou/ffmpeg-worker && python -m pytest tests/ -v

Expected: All tests PASS

Step 4: Commit

git add app/queue.py app/main.py
git commit -m "feat: add worker loop with FFmpeg execution"
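
Illustration (not a plan step): the timeout handling in run_ffmpeg follows a kill-then-reap pattern. The sketch below shows that pattern in isolation. It is a minimal sketch under stated assumptions: run_with_timeout and demo are hypothetical names, a sleeping Python child stands in for FFmpeg, and asyncio.wait_for is used in place of the plan's asyncio.timeout so the example also runs on Python versions before 3.11.

```python
import asyncio
import sys


async def run_with_timeout(args: list, timeout: float) -> int:
    """Run a command; kill it and raise TimeoutError if it exceeds the limit."""
    process = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL,
    )
    try:
        return await asyncio.wait_for(process.wait(), timeout)
    except asyncio.TimeoutError:
        process.kill()        # stop the child immediately
        await process.wait()  # reap it so no zombie process is left behind
        raise TimeoutError(f"command timed out after {timeout} seconds") from None


async def demo() -> str:
    # Stand-in for a long FFmpeg run: a child process that sleeps for 30 seconds
    slow = [sys.executable, "-c", "import time; time.sleep(30)"]
    try:
        await run_with_timeout(slow, timeout=0.5)
    except TimeoutError as exc:
        return str(exc)
    return "finished"


print(asyncio.run(demo()))
```

Waiting on the process after kill() matters: without it the child's exit status is never collected, and the worker would leak one defunct process per timed-out job.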

Task 10: Dockerfile

Files:

  • Create: Dockerfile
  • Create: docker-compose.yml
  • Create: .dockerignore

Step 1: Create Dockerfile

Create Dockerfile:

FROM python:3.14-slim

# Install FFmpeg
RUN apt-get update && \
    apt-get install -y --no-install-recommends ffmpeg && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY app/ ./app/

# Create data directory
RUN mkdir -p /data

ENV DATA_PATH=/data
ENV FFMPEG_TIMEOUT=3600
ENV HOST=0.0.0.0
ENV PORT=8000

EXPOSE 8000

CMD ["sh", "-c", "uvicorn app.main:app --host $HOST --port $PORT"]

Step 2: Create docker-compose.yml

Create docker-compose.yml:

services:
  ffmpeg-worker:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./data:/data
    environment:
      - DATA_PATH=/data
      - FFMPEG_TIMEOUT=3600

Step 3: Create .dockerignore

Create .dockerignore:

__pycache__/
*.pyc
.git/
.gitignore
tests/
docs/
data/
*.md
.env
.venv/

Step 4: Create data directory

mkdir -p /home/bballou/ffmpeg-worker/data
echo "data/" >> /home/bballou/ffmpeg-worker/.gitignore

Step 5: Commit

git add Dockerfile docker-compose.yml .dockerignore .gitignore
git commit -m "feat: add Docker configuration"
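
Illustration (not a plan step): the container is configured through DATA_PATH and FFMPEG_TIMEOUT, with the defaults that worker_loop falls back to. The sketch below reads both values from a mapping so the behaviour can be checked without touching the real environment. It is a minimal sketch; load_settings is a hypothetical helper, and rejecting a non-positive timeout is an assumption, not something the plan's code does.

```python
import os
from typing import Mapping


def load_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Read worker settings, falling back to the defaults used in the plan."""
    timeout = int(env.get("FFMPEG_TIMEOUT", "3600"))
    if timeout <= 0:
        # Assumption: a zero or negative timeout is a configuration mistake
        raise ValueError("FFMPEG_TIMEOUT must be a positive number of seconds")
    return {"data_path": env.get("DATA_PATH", "/data"), "timeout": timeout}


print(load_settings({}))
print(load_settings({"DATA_PATH": "/media", "FFMPEG_TIMEOUT": "120"}))
```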

Task 11: Integration Test

Files:

  • Create: tests/test_integration.py

Step 1: Write integration test

Create tests/test_integration.py:

"""
Integration test - requires Docker, FFmpeg on the host, and the requests package.
Run with: pytest tests/test_integration.py -v -s
"""
import pytest
import subprocess
import time
import requests
import os

WORKER_URL = "http://localhost:8000"


@pytest.fixture(scope="module")
def docker_compose():
    """Start docker-compose for integration tests."""
    # Skip if not running integration tests
    if os.environ.get("SKIP_INTEGRATION"):
        pytest.skip("Skipping integration tests")

    # Create a test video file
    data_dir = os.path.join(os.path.dirname(__file__), "..", "data")
    os.makedirs(data_dir, exist_ok=True)
    test_input = os.path.join(data_dir, "test_input.mp4")

    # Generate test video with FFmpeg (2 seconds of the testsrc test pattern)
    subprocess.run([
        "ffmpeg", "-y", "-f", "lavfi", "-i", "testsrc=duration=2:size=320x240:rate=30",
        "-c:v", "libx264", "-pix_fmt", "yuv420p", test_input
    ], check=True, capture_output=True)

    # Start docker-compose
    subprocess.run(["docker-compose", "up", "-d", "--build"], check=True)

    # Wait for service to be ready
    for _ in range(30):
        try:
            response = requests.get(f"{WORKER_URL}/health")
            if response.status_code == 200:
                break
        except requests.ConnectionError:
            pass
        time.sleep(1)
    else:
        pytest.fail("Service did not start in time")

    yield

    # Cleanup
    subprocess.run(["docker-compose", "down"], check=True)


def test_full_workflow(docker_compose):
    """Test complete job submission and processing workflow."""
    # Submit job
    response = requests.post(
        f"{WORKER_URL}/jobs",
        json={"command": "-i test_input.mp4 -c:v libx264 -crf 28 test_output.mp4"}
    )
    assert response.status_code == 201
    job = response.json()
    job_id = job["id"]
    assert job["status"] == "queued"

    # Poll for completion
    for _ in range(60):
        response = requests.get(f"{WORKER_URL}/jobs/{job_id}")
        assert response.status_code == 200
        job = response.json()

        if job["status"] in ("completed", "failed"):
            break
        time.sleep(1)

    assert job["status"] == "completed", f"Job failed: {job.get('error')}"
    assert "test_output.mp4" in job["output_files"]

Step 2: Commit

git add tests/test_integration.py
git commit -m "feat: add integration test"

Summary

Tasks 1-11 build the complete FFmpeg worker:

  1. Project setup with FastAPI
  2. Job models (Pydantic)
  3. In-memory job store
  4. REST API endpoints
  5. FFmpeg command parser
  6. Progress parser
  7. Async job queue
  8. FFmpeg runner with ffprobe
  9. Worker loop integration
  10. Docker configuration
  11. Integration test

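Tasks 2-8 follow TDD (failing test first, then implementation); Task 1 is verified manually, and Tasks 9-11 rely on the existing suite and the integration test. Every task gives specific file paths, complete code, and exact commands.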