# FFmpeg Worker Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** Build a dockerized FFmpeg worker with REST API for job submission and status polling.
**Architecture:** FastAPI app with in-memory job queue, single background worker processing jobs sequentially, real-time FFmpeg progress parsing via `-progress pipe:1`.
**Tech Stack:** Python 3.14, FastAPI, Pydantic, asyncio, ffmpeg/ffprobe CLI
---
## Task 1: Project Setup
**Files:**
- Create: `requirements.txt`
- Create: `app/__init__.py`
- Create: `app/main.py`
**Step 1: Create requirements.txt**
```txt
fastapi==0.115.0
uvicorn[standard]==0.32.0
pydantic==2.9.0
```
**Step 2: Create app package**
Create `app/__init__.py` (empty file).
**Step 3: Create minimal FastAPI app**
Create `app/main.py`:
```python
from fastapi import FastAPI

app = FastAPI(title="FFmpeg Worker", version="1.0.0")


@app.get("/health")
async def health() -> dict[str, str]:
    return {"status": "ok"}
```
**Step 4: Test manually**
Run: `cd /home/bballou/ffmpeg-worker && uvicorn app.main:app --reload`
In another terminal: `curl http://localhost:8000/health`
Expected: `{"status":"ok"}`
Stop the server (Ctrl+C).
**Step 5: Commit**
```bash
git add requirements.txt app/
git commit -m "feat: initial FastAPI setup with health endpoint"
```
---
## Task 2: Job Models
**Files:**
- Create: `app/models.py`
- Create: `tests/__init__.py`
- Create: `tests/test_models.py`
**Step 1: Create tests package**
Create `tests/__init__.py` (empty file).
**Step 2: Write the failing test**
Create `tests/test_models.py`:
```python
from datetime import datetime

from app.models import CreateJobRequest, Job, JobResponse, JobStatus


def test_job_creation():
    job = Job(command="-i input.mp4 output.mp4")
    assert job.id.startswith("job_")
    assert len(job.id) == 20  # "job_" + 16 hex chars
    assert job.status == JobStatus.QUEUED
    assert job.command == "-i input.mp4 output.mp4"
    assert isinstance(job.created_at, datetime)
    assert job.started_at is None
    assert job.completed_at is None
    assert job.progress is None
    assert job.output_files == []
    assert job.error is None


def test_create_job_request():
    request = CreateJobRequest(command="-i test.mp4 out.mp4")
    assert request.command == "-i test.mp4 out.mp4"


def test_job_response_from_job():
    job = Job(command="-i input.mp4 output.mp4")
    response = JobResponse.model_validate(job.model_dump())
    assert response.id == job.id
    assert response.status == JobStatus.QUEUED
```
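The 20-character id asserted above comes straight from Python's `secrets` module; a quick standalone check of the arithmetic:

```python
import secrets

# token_hex(8) draws 8 random bytes and renders them as 16 hex characters,
# so "job_" (4 chars) + 16 chars = 20
job_id = f"job_{secrets.token_hex(8)}"
print(len(job_id))  # 20
```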
**Step 3: Run test to verify it fails**
Run: `cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_models.py -v`
Expected: FAIL with ModuleNotFoundError (app.models doesn't exist)
**Step 4: Write the implementation**
Create `app/models.py`:
```python
import secrets
from datetime import datetime, timezone
from enum import Enum

from pydantic import BaseModel, Field


class JobStatus(str, Enum):
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


class Progress(BaseModel):
    frame: int = 0
    fps: float = 0.0
    time: str = "00:00:00.00"
    bitrate: str = "0kbits/s"
    percent: float | None = None


class Job(BaseModel):
    id: str = Field(default_factory=lambda: f"job_{secrets.token_hex(8)}")
    status: JobStatus = JobStatus.QUEUED
    command: str
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    started_at: datetime | None = None
    completed_at: datetime | None = None
    progress: Progress | None = None
    output_files: list[str] = Field(default_factory=list)
    error: str | None = None


class CreateJobRequest(BaseModel):
    command: str


class JobResponse(BaseModel):
    id: str
    status: JobStatus
    command: str
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    progress: Progress | None = None
    output_files: list[str] = Field(default_factory=list)
    error: str | None = None
```
**Step 5: Run test to verify it passes**
Run: `cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_models.py -v`
Expected: PASS (3 tests)
**Step 6: Commit**
```bash
git add app/models.py tests/
git commit -m "feat: add job models with Pydantic"
```
---
## Task 3: Job Store
**Files:**
- Create: `app/store.py`
- Create: `tests/test_store.py`
**Step 1: Write the failing test**
Create `tests/test_store.py`:
```python
from app.models import Job, JobStatus
from app.store import JobStore


def test_add_and_get_job():
    store = JobStore()
    job = Job(command="-i input.mp4 output.mp4")
    store.add(job)
    retrieved = store.get(job.id)
    assert retrieved is not None
    assert retrieved.id == job.id
    assert retrieved.command == job.command


def test_get_nonexistent_job():
    store = JobStore()
    result = store.get("nonexistent")
    assert result is None


def test_list_all_jobs():
    store = JobStore()
    job1 = Job(command="-i a.mp4 b.mp4")
    job2 = Job(command="-i c.mp4 d.mp4")
    store.add(job1)
    store.add(job2)
    jobs = store.list_all()
    assert len(jobs) == 2
    assert job1 in jobs
    assert job2 in jobs


def test_list_jobs_by_status():
    store = JobStore()
    job1 = Job(command="-i a.mp4 b.mp4")
    job2 = Job(command="-i c.mp4 d.mp4")
    job2.status = JobStatus.RUNNING
    store.add(job1)
    store.add(job2)
    queued = store.list_by_status(JobStatus.QUEUED)
    running = store.list_by_status(JobStatus.RUNNING)
    assert len(queued) == 1
    assert len(running) == 1
    assert queued[0].id == job1.id
    assert running[0].id == job2.id
```
**Step 2: Run test to verify it fails**
Run: `cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_store.py -v`
Expected: FAIL with ModuleNotFoundError
**Step 3: Write the implementation**
Create `app/store.py`:
```python
from app.models import Job, JobStatus


class JobStore:
    def __init__(self) -> None:
        self._jobs: dict[str, Job] = {}

    def add(self, job: Job) -> None:
        self._jobs[job.id] = job

    def get(self, job_id: str) -> Job | None:
        return self._jobs.get(job_id)

    def list_all(self) -> list[Job]:
        return list(self._jobs.values())

    def list_by_status(self, status: JobStatus) -> list[Job]:
        return [job for job in self._jobs.values() if job.status == status]
```
**Step 4: Run test to verify it passes**
Run: `cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_store.py -v`
Expected: PASS (4 tests)
**Step 5: Commit**
```bash
git add app/store.py tests/test_store.py
git commit -m "feat: add in-memory job store"
```
---
## Task 4: API Endpoints
**Files:**
- Modify: `app/main.py`
- Create: `tests/test_api.py`
**Step 1: Write the failing tests**
Create `tests/test_api.py`:
```python
import pytest
from fastapi.testclient import TestClient

from app.main import app, job_store

client = TestClient(app)


@pytest.fixture(autouse=True)
def clear_store():
    job_store._jobs.clear()
    yield
    job_store._jobs.clear()


def test_health():
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json() == {"status": "ok"}


def test_create_job():
    response = client.post("/jobs", json={"command": "-i input.mp4 output.mp4"})
    assert response.status_code == 201
    data = response.json()
    assert data["id"].startswith("job_")
    assert data["status"] == "queued"
    assert data["command"] == "-i input.mp4 output.mp4"


def test_get_job():
    # Create a job first
    create_response = client.post("/jobs", json={"command": "-i test.mp4 out.mp4"})
    job_id = create_response.json()["id"]
    # Get the job
    response = client.get(f"/jobs/{job_id}")
    assert response.status_code == 200
    data = response.json()
    assert data["id"] == job_id
    assert data["status"] == "queued"


def test_get_nonexistent_job():
    response = client.get("/jobs/nonexistent")
    assert response.status_code == 404


def test_list_jobs():
    client.post("/jobs", json={"command": "-i a.mp4 b.mp4"})
    client.post("/jobs", json={"command": "-i c.mp4 d.mp4"})
    response = client.get("/jobs")
    assert response.status_code == 200
    data = response.json()
    assert len(data) == 2


def test_list_jobs_filter_by_status():
    client.post("/jobs", json={"command": "-i a.mp4 b.mp4"})
    response = client.get("/jobs?status=queued")
    assert response.status_code == 200
    data = response.json()
    assert len(data) == 1
    assert data[0]["status"] == "queued"
```
**Step 2: Run test to verify it fails**
Run: `cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_api.py -v`
Expected: FAIL (job_store not defined, endpoints missing)
**Step 3: Write the implementation**
Replace `app/main.py`:
```python
from fastapi import FastAPI, HTTPException, status

from app.models import CreateJobRequest, Job, JobResponse, JobStatus
from app.store import JobStore

app = FastAPI(title="FFmpeg Worker", version="1.0.0")
job_store = JobStore()


@app.get("/health")
async def health() -> dict[str, str]:
    return {"status": "ok"}


@app.post("/jobs", status_code=status.HTTP_201_CREATED)
async def create_job(request: CreateJobRequest) -> JobResponse:
    job = Job(command=request.command)
    job_store.add(job)
    return JobResponse.model_validate(job.model_dump())


@app.get("/jobs/{job_id}")
async def get_job(job_id: str) -> JobResponse:
    job = job_store.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return JobResponse.model_validate(job.model_dump())


@app.get("/jobs")
async def list_jobs(status: JobStatus | None = None) -> list[JobResponse]:
    if status is not None:
        jobs = job_store.list_by_status(status)
    else:
        jobs = job_store.list_all()
    return [JobResponse.model_validate(job.model_dump()) for job in jobs]
```
**Step 4: Run test to verify it passes**
Run: `cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_api.py -v`
Expected: PASS (6 tests)
**Step 5: Commit**
```bash
git add app/main.py tests/test_api.py
git commit -m "feat: add job CRUD API endpoints"
```
---
## Task 5: FFmpeg Command Parser
**Files:**
- Create: `app/ffmpeg.py`
- Create: `tests/test_ffmpeg.py`
**Step 1: Write the failing tests**
Create `tests/test_ffmpeg.py`:
```python
import pytest

from app.ffmpeg import parse_command, resolve_paths


def test_parse_simple_command():
    command = "-i input.mp4 output.mp4"
    args = parse_command(command)
    assert args == ["-i", "input.mp4", "output.mp4"]


def test_parse_command_with_options():
    command = "-i input.mp4 -c:v libx264 -crf 23 output.mp4"
    args = parse_command(command)
    assert args == ["-i", "input.mp4", "-c:v", "libx264", "-crf", "23", "output.mp4"]


def test_parse_command_with_quotes():
    command = '-i "input file.mp4" output.mp4'
    args = parse_command(command)
    assert args == ["-i", "input file.mp4", "output.mp4"]


def test_resolve_paths():
    args = ["-i", "input/video.mp4", "-c:v", "libx264", "output/result.mp4"]
    data_path = "/data"
    resolved = resolve_paths(args, data_path)
    assert resolved == [
        "-i", "/data/input/video.mp4",
        "-c:v", "libx264",
        "/data/output/result.mp4",
    ]


def test_resolve_paths_preserves_absolute():
    args = ["-i", "/already/absolute.mp4", "output.mp4"]
    data_path = "/data"
    resolved = resolve_paths(args, data_path)
    assert resolved == ["-i", "/already/absolute.mp4", "/data/output.mp4"]


def test_resolve_paths_skips_options():
    args = ["-c:v", "libx264", "-preset", "fast"]
    data_path = "/data"
    resolved = resolve_paths(args, data_path)
    # Options and their values should not be resolved as paths
    assert resolved == ["-c:v", "libx264", "-preset", "fast"]
```
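The quoting behavior tested above is exactly what the standard library's `shlex.split` provides, which is why `parse_command` can be a one-liner:

```python
import shlex

# shlex honors shell-style quoting, so a filename with spaces survives as one token
print(shlex.split('-i "input file.mp4" output.mp4'))
# ['-i', 'input file.mp4', 'output.mp4']
```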
**Step 2: Run test to verify it fails**
Run: `cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_ffmpeg.py -v`
Expected: FAIL with ModuleNotFoundError
**Step 3: Write the implementation**
Create `app/ffmpeg.py`:
```python
import shlex
from pathlib import Path

# FFmpeg options that take a value (not exhaustive, covers common ones)
OPTIONS_WITH_VALUES = {
    "-c", "-c:v", "-c:a", "-b", "-b:v", "-b:a", "-r", "-s", "-ar", "-ac",
    "-f", "-t", "-ss", "-to", "-vf", "-af", "-filter:v", "-filter:a",
    "-preset", "-crf", "-qp", "-profile", "-level", "-pix_fmt", "-map",
    "-metadata", "-disposition", "-threads", "-filter_complex",
}


def parse_command(command: str) -> list[str]:
    """Parse an FFmpeg command string into an argument list."""
    return shlex.split(command)


def resolve_paths(args: list[str], data_path: str) -> list[str]:
    """Resolve relative paths against the data directory."""
    resolved = []
    skip_next = False
    for arg in args:
        if skip_next:
            # Option value (e.g. a codec name) - pass through untouched
            resolved.append(arg)
            skip_next = False
            continue
        if arg.startswith("-"):
            resolved.append(arg)
            if arg in OPTIONS_WITH_VALUES:
                skip_next = True
            continue
        # This looks like a file path - resolve if relative
        path = Path(arg)
        if not path.is_absolute():
            resolved.append(str(Path(data_path) / arg))
        else:
            resolved.append(arg)
    return resolved
```
**Step 4: Run test to verify it passes**
Run: `cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_ffmpeg.py -v`
Expected: PASS (6 tests)
**Step 5: Commit**
```bash
git add app/ffmpeg.py tests/test_ffmpeg.py
git commit -m "feat: add FFmpeg command parser with path resolution"
```
---
## Task 6: FFmpeg Progress Parser
**Files:**
- Modify: `app/ffmpeg.py`
- Modify: `tests/test_ffmpeg.py`
**Step 1: Write the failing tests**
Add to `tests/test_ffmpeg.py`:
```python
from app.ffmpeg import extract_output_path, parse_progress


def test_parse_progress():
    output = """frame=1234
fps=30.24
total_size=5678900
out_time_ms=83450000
bitrate=1250.5kbits/s
progress=continue
"""
    progress = parse_progress(output, duration_seconds=120.0)
    assert progress.frame == 1234
    assert progress.fps == 30.24
    assert progress.time == "00:01:23.45"
    assert progress.bitrate == "1250.5kbits/s"
    assert progress.percent == pytest.approx(69.54, rel=0.01)


def test_parse_progress_no_duration():
    output = "frame=100\nfps=25.0\nout_time_ms=4000000\nbitrate=500kbits/s\n"
    progress = parse_progress(output, duration_seconds=None)
    assert progress.frame == 100
    assert progress.percent is None


def test_extract_output_path():
    args = ["-i", "input.mp4", "-c:v", "libx264", "output.mp4"]
    output_path = extract_output_path(args)
    assert output_path == "output.mp4"


def test_extract_output_path_complex():
    args = ["-i", "a.mp4", "-i", "b.mp4", "-filter_complex", "[0:v][1:v]concat", "out.mp4"]
    output_path = extract_output_path(args)
    assert output_path == "out.mp4"
```
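The expected time string above hinges on one FFmpeg quirk worth calling out: the `out_time_ms` field is in microseconds despite its name. The conversion the implementation needs, worked through standalone:

```python
out_time_ms = 83_450_000                   # microseconds, despite the "ms" in the name
total_seconds = out_time_ms / 1_000_000    # 83.45 seconds
hours = int(total_seconds // 3600)
minutes = int((total_seconds % 3600) // 60)
seconds = total_seconds % 60
# 05.2f pads to two integer digits with two decimals, e.g. 23.45 -> "23.45"
print(f"{hours:02d}:{minutes:02d}:{seconds:05.2f}")  # 00:01:23.45
```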
**Step 2: Run test to verify it fails**
Run: `cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_ffmpeg.py::test_parse_progress -v`
Expected: FAIL (parse_progress not defined)
**Step 3: Write the implementation**
Add to `app/ffmpeg.py`:
```python
from app.models import Progress


def parse_progress(output: str, duration_seconds: float | None) -> Progress:
    """Parse FFmpeg progress output into a Progress model."""
    data: dict[str, str] = {}
    for line in output.strip().split("\n"):
        if "=" in line:
            key, value = line.split("=", 1)
            data[key.strip()] = value.strip()
    frame = int(data.get("frame", 0))
    fps = float(data.get("fps", 0.0))
    out_time_ms = int(data.get("out_time_ms", 0))
    bitrate = data.get("bitrate", "0kbits/s")
    # Convert out_time_ms (microseconds) to HH:MM:SS.mm format
    total_seconds = out_time_ms / 1_000_000
    hours = int(total_seconds // 3600)
    minutes = int((total_seconds % 3600) // 60)
    seconds = total_seconds % 60
    time_str = f"{hours:02d}:{minutes:02d}:{seconds:05.2f}"
    # Calculate percent if duration is known
    percent = None
    if duration_seconds and duration_seconds > 0:
        percent = (total_seconds / duration_seconds) * 100
        percent = min(percent, 100.0)  # cap at 100%
    return Progress(
        frame=frame,
        fps=fps,
        time=time_str,
        bitrate=bitrate,
        percent=percent,
    )


def extract_output_path(args: list[str]) -> str | None:
    """Extract the output file path from FFmpeg arguments (last non-option argument)."""
    # Work backwards to find the last argument that isn't an option or option value
    i = len(args) - 1
    while i >= 0:
        arg = args[i]
        # Skip options themselves
        if arg.startswith("-"):
            i -= 1
            continue
        # Skip values that belong to the preceding option
        if i > 0 and args[i - 1] in OPTIONS_WITH_VALUES:
            i -= 1
            continue
        # Accept anything that looks like a file path (has an extension or a slash)
        if "." in arg or "/" in arg:
            return arg
        i -= 1
    return None
```
Also update the imports at the top of `app/ffmpeg.py`:
```python
import shlex
from pathlib import Path
from app.models import Progress
```
**Step 4: Run test to verify it passes**
Run: `cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_ffmpeg.py -v`
Expected: PASS (10 tests)
**Step 5: Commit**
```bash
git add app/ffmpeg.py tests/test_ffmpeg.py
git commit -m "feat: add FFmpeg progress parser and output path extraction"
```
---
## Task 7: Job Queue
**Files:**
- Create: `app/queue.py`
- Create: `tests/test_queue.py`
**Step 1: Write the failing tests**
Create `tests/test_queue.py`:
```python
import pytest

from app.queue import JobQueue


@pytest.mark.asyncio
async def test_enqueue_and_dequeue():
    queue = JobQueue()
    await queue.enqueue("job_123")
    job_id = await queue.dequeue()
    assert job_id == "job_123"


@pytest.mark.asyncio
async def test_queue_ordering():
    queue = JobQueue()
    await queue.enqueue("job_1")
    await queue.enqueue("job_2")
    await queue.enqueue("job_3")
    assert await queue.dequeue() == "job_1"
    assert await queue.dequeue() == "job_2"
    assert await queue.dequeue() == "job_3"


@pytest.mark.asyncio
async def test_queue_size():
    queue = JobQueue()
    assert queue.size() == 0
    await queue.enqueue("job_1")
    assert queue.size() == 1
    await queue.enqueue("job_2")
    assert queue.size() == 2
    await queue.dequeue()
    assert queue.size() == 1
```
**Step 2: Add test dependencies to requirements**
Update `requirements.txt` (`httpx` is also listed because FastAPI's `TestClient` depends on it):
```txt
fastapi==0.115.0
uvicorn[standard]==0.32.0
pydantic==2.9.0
httpx==0.27.0
pytest==8.3.0
pytest-asyncio==0.24.0
```
**Step 3: Run test to verify it fails**
Run: `cd /home/bballou/ffmpeg-worker && pip install -r requirements.txt && python -m pytest tests/test_queue.py -v`
Expected: FAIL with ModuleNotFoundError
**Step 4: Write the implementation**
Create `app/queue.py`:
```python
import asyncio


class JobQueue:
    def __init__(self) -> None:
        self._queue: asyncio.Queue[str] = asyncio.Queue()

    async def enqueue(self, job_id: str) -> None:
        await self._queue.put(job_id)

    async def dequeue(self) -> str:
        return await self._queue.get()

    def size(self) -> int:
        return self._queue.qsize()
```
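`asyncio.Queue` already guarantees the FIFO ordering the tests demand, so the wrapper stays thin; a minimal standalone sketch of that behavior:

```python
import asyncio


async def demo() -> list[str]:
    q: asyncio.Queue[str] = asyncio.Queue()
    for jid in ("job_1", "job_2", "job_3"):
        await q.put(jid)
    # Items come back in insertion order (FIFO)
    return [await q.get() for _ in range(q.qsize())]


print(asyncio.run(demo()))  # ['job_1', 'job_2', 'job_3']
```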
**Step 5: Run test to verify it passes**
Run: `cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_queue.py -v`
Expected: PASS (3 tests)
**Step 6: Commit**
```bash
git add app/queue.py tests/test_queue.py requirements.txt
git commit -m "feat: add async job queue"
```
---
## Task 8: FFmpeg Runner
**Files:**
- Modify: `app/ffmpeg.py`
- Modify: `tests/test_ffmpeg.py`
**Step 1: Write the failing test**
Add to `tests/test_ffmpeg.py`:
```python
from unittest.mock import AsyncMock, MagicMock, patch


@pytest.mark.asyncio
async def test_get_duration():
    from app.ffmpeg import get_duration

    # Mock ffprobe output; patch() substitutes an AsyncMock for the coroutine function
    mock_process = MagicMock()
    mock_process.communicate = AsyncMock(return_value=(b"120.5\n", b""))
    mock_process.returncode = 0
    with patch("asyncio.create_subprocess_exec", return_value=mock_process):
        duration = await get_duration("/data/input.mp4")
    assert duration == 120.5


@pytest.mark.asyncio
async def test_get_duration_failure():
    from app.ffmpeg import get_duration

    mock_process = MagicMock()
    mock_process.communicate = AsyncMock(return_value=(b"", b"error"))
    mock_process.returncode = 1
    with patch("asyncio.create_subprocess_exec", return_value=mock_process):
        duration = await get_duration("/data/input.mp4")
    assert duration is None
```
**Step 2: Run test to verify it fails**
Run: `cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_ffmpeg.py::test_get_duration -v`
Expected: FAIL (get_duration not defined)
**Step 3: Write the implementation**
Add to `app/ffmpeg.py`:
```python
import asyncio


async def get_duration(input_path: str) -> float | None:
    """Get the duration of a media file using ffprobe."""
    try:
        process = await asyncio.create_subprocess_exec(
            "ffprobe",
            "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            input_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await process.communicate()
        if process.returncode == 0:
            return float(stdout.decode().strip())
    except (ValueError, OSError):
        pass
    return None
```
**Step 4: Run test to verify it passes**
Run: `cd /home/bballou/ffmpeg-worker && python -m pytest tests/test_ffmpeg.py::test_get_duration tests/test_ffmpeg.py::test_get_duration_failure -v`
Expected: PASS (2 tests)
**Step 5: Commit**
```bash
git add app/ffmpeg.py tests/test_ffmpeg.py
git commit -m "feat: add ffprobe duration detection"
```
---
## Task 9: Worker Loop
**Files:**
- Modify: `app/queue.py`
- Modify: `app/main.py`
**Step 1: Rewrite queue.py with the worker loop**
Replace `app/queue.py` (the `JobQueue` class is unchanged; `run_ffmpeg` and `worker_loop` are new):
```python
import asyncio
import os
from datetime import datetime, timezone

from app.ffmpeg import (
    extract_output_path,
    get_duration,
    parse_command,
    parse_progress,
    resolve_paths,
)
from app.models import Job, JobStatus, Progress
from app.store import JobStore


class JobQueue:
    def __init__(self) -> None:
        self._queue: asyncio.Queue[str] = asyncio.Queue()

    async def enqueue(self, job_id: str) -> None:
        await self._queue.put(job_id)

    async def dequeue(self) -> str:
        return await self._queue.get()

    def size(self) -> int:
        return self._queue.qsize()


async def run_ffmpeg(job: Job, data_path: str, timeout: int) -> None:
    """Execute the FFmpeg command and update the job with progress."""
    args = parse_command(job.command)
    resolved_args = resolve_paths(args, data_path)
    # Find the first input file so we can probe its duration
    input_path = None
    for i, arg in enumerate(resolved_args):
        if arg == "-i" and i + 1 < len(resolved_args):
            input_path = resolved_args[i + 1]
            break
    duration = None
    if input_path:
        duration = await get_duration(input_path)
    # Build the FFmpeg command with machine-readable progress on stdout
    ffmpeg_args = ["ffmpeg", "-y", "-progress", "pipe:1", "-nostats"] + resolved_args
    process = await asyncio.create_subprocess_exec(
        *ffmpeg_args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # Read progress output
    progress_buffer = ""
    try:
        async with asyncio.timeout(timeout):
            while True:
                line = await process.stdout.readline()
                if not line:
                    break
                progress_buffer += line.decode()
                # Parse progress when we have a complete block
                if "progress=" in progress_buffer:
                    job.progress = parse_progress(progress_buffer, duration)
                    progress_buffer = ""
            _, stderr = await process.communicate()
    except asyncio.TimeoutError:
        process.kill()
        await process.wait()
        raise TimeoutError(f"FFmpeg timed out after {timeout} seconds")
    if process.returncode != 0:
        raise RuntimeError(stderr.decode())
    # Report the output path relative to the data directory
    output_path = extract_output_path(args)
    if output_path:
        job.output_files = [output_path]


async def worker_loop(job_queue: JobQueue, job_store: JobStore) -> None:
    """Main worker loop that processes jobs from the queue."""
    data_path = os.environ.get("DATA_PATH", "/data")
    timeout = int(os.environ.get("FFMPEG_TIMEOUT", "3600"))
    while True:
        job_id = await job_queue.dequeue()
        job = job_store.get(job_id)
        if job is None:
            continue
        job.status = JobStatus.RUNNING
        job.started_at = datetime.now(timezone.utc)
        job.progress = Progress()
        try:
            await run_ffmpeg(job, data_path, timeout)
            job.status = JobStatus.COMPLETED
        except Exception as e:
            job.status = JobStatus.FAILED
            job.error = str(e)
        finally:
            job.completed_at = datetime.now(timezone.utc)
```
**Step 2: Wire up worker in main.py**
Replace `app/main.py`:
```python
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, status

from app.models import CreateJobRequest, Job, JobResponse, JobStatus
from app.queue import JobQueue, worker_loop
from app.store import JobStore

job_store = JobStore()
job_queue = JobQueue()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start the worker on startup
    worker_task = asyncio.create_task(worker_loop(job_queue, job_store))
    yield
    # Cancel the worker on shutdown
    worker_task.cancel()
    try:
        await worker_task
    except asyncio.CancelledError:
        pass


app = FastAPI(title="FFmpeg Worker", version="1.0.0", lifespan=lifespan)


@app.get("/health")
async def health() -> dict[str, str]:
    return {"status": "ok"}


@app.post("/jobs", status_code=status.HTTP_201_CREATED)
async def create_job(request: CreateJobRequest) -> JobResponse:
    job = Job(command=request.command)
    job_store.add(job)
    await job_queue.enqueue(job.id)
    return JobResponse.model_validate(job.model_dump())


@app.get("/jobs/{job_id}")
async def get_job(job_id: str) -> JobResponse:
    job = job_store.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return JobResponse.model_validate(job.model_dump())


@app.get("/jobs")
async def list_jobs(status: JobStatus | None = None) -> list[JobResponse]:
    if status is not None:
        jobs = job_store.list_by_status(status)
    else:
        jobs = job_store.list_all()
    return [JobResponse.model_validate(job.model_dump()) for job in jobs]
```
**Step 3: Run all tests**
Run: `cd /home/bballou/ffmpeg-worker && python -m pytest tests/ -v`
Expected: All tests PASS
**Step 4: Commit**
```bash
git add app/queue.py app/main.py
git commit -m "feat: add worker loop with FFmpeg execution"
```
---
## Task 10: Dockerfile
**Files:**
- Create: `Dockerfile`
- Create: `docker-compose.yml`
- Create: `.dockerignore`
**Step 1: Create Dockerfile**
Create `Dockerfile`:
```dockerfile
FROM python:3.14-slim

# Install FFmpeg
RUN apt-get update && \
    apt-get install -y --no-install-recommends ffmpeg && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY app/ ./app/

# Create data directory
RUN mkdir -p /data

ENV DATA_PATH=/data
ENV FFMPEG_TIMEOUT=3600
ENV HOST=0.0.0.0
ENV PORT=8000

EXPOSE 8000

CMD ["sh", "-c", "uvicorn app.main:app --host $HOST --port $PORT"]
```
**Step 2: Create docker-compose.yml**
Create `docker-compose.yml`:
```yaml
services:
  ffmpeg-worker:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./data:/data
    environment:
      - DATA_PATH=/data
      - FFMPEG_TIMEOUT=3600
```
**Step 3: Create .dockerignore**
Create `.dockerignore`:
```
__pycache__/
*.pyc
.git/
.gitignore
tests/
docs/
data/
*.md
.env
.venv/
```
**Step 4: Create data directory**
```bash
mkdir -p /home/bballou/ffmpeg-worker/data
echo "data/" >> /home/bballou/ffmpeg-worker/.gitignore
```
**Step 5: Commit**
```bash
git add Dockerfile docker-compose.yml .dockerignore .gitignore
git commit -m "feat: add Docker configuration"
```
---
## Task 11: Integration Test
**Files:**
- Create: `tests/test_integration.py`
**Step 1: Write integration test**
Create `tests/test_integration.py`:
```python
"""
Integration test - requires Docker, FFmpeg, and the `requests` package.
Run with: pytest tests/test_integration.py -v -s
"""
import os
import subprocess
import time

import pytest
import requests

WORKER_URL = "http://localhost:8000"


@pytest.fixture(scope="module")
def docker_compose():
    """Start docker-compose for integration tests."""
    # Skip if not running integration tests
    if os.environ.get("SKIP_INTEGRATION"):
        pytest.skip("Skipping integration tests")
    # Create a test video file
    data_dir = os.path.join(os.path.dirname(__file__), "..", "data")
    os.makedirs(data_dir, exist_ok=True)
    test_input = os.path.join(data_dir, "test_input.mp4")
    # Generate a test video with FFmpeg (2 seconds of color bars)
    subprocess.run([
        "ffmpeg", "-y", "-f", "lavfi", "-i", "testsrc=duration=2:size=320x240:rate=30",
        "-c:v", "libx264", "-pix_fmt", "yuv420p", test_input,
    ], check=True, capture_output=True)
    # Start docker-compose
    subprocess.run(["docker-compose", "up", "-d", "--build"], check=True)
    # Wait for the service to be ready
    for _ in range(30):
        try:
            response = requests.get(f"{WORKER_URL}/health")
            if response.status_code == 200:
                break
        except requests.ConnectionError:
            pass
        time.sleep(1)
    else:
        pytest.fail("Service did not start in time")
    yield
    # Cleanup
    subprocess.run(["docker-compose", "down"], check=True)


def test_full_workflow(docker_compose):
    """Test the complete job submission and processing workflow."""
    # Submit a job
    response = requests.post(
        f"{WORKER_URL}/jobs",
        json={"command": "-i test_input.mp4 -c:v libx264 -crf 28 test_output.mp4"},
    )
    assert response.status_code == 201
    job = response.json()
    job_id = job["id"]
    assert job["status"] == "queued"
    # Poll for completion
    for _ in range(60):
        response = requests.get(f"{WORKER_URL}/jobs/{job_id}")
        assert response.status_code == 200
        job = response.json()
        if job["status"] in ("completed", "failed"):
            break
        time.sleep(1)
    assert job["status"] == "completed", f"Job failed: {job.get('error')}"
    assert "test_output.mp4" in job["output_files"]
```
**Step 2: Commit**
```bash
git add tests/test_integration.py
git commit -m "feat: add integration test"
```
---
## Summary
Tasks 1-11 build the complete FFmpeg worker:
1. Project setup with FastAPI
2. Job models (Pydantic)
3. In-memory job store
4. REST API endpoints
5. FFmpeg command parser
6. Progress parser
7. Async job queue
8. FFmpeg runner with ffprobe
9. Worker loop integration
10. Docker configuration
11. Integration test
All tasks follow TDD with specific file paths, complete code, and exact commands.