diff --git a/app/main.py b/app/main.py index 0cd5bf4..4d55907 100644 --- a/app/main.py +++ b/app/main.py @@ -1,11 +1,30 @@ +import asyncio +from contextlib import asynccontextmanager + from fastapi import FastAPI, HTTPException, status from app.models import CreateJobRequest, Job, JobResponse, JobStatus +from app.queue import JobQueue, worker_loop from app.store import JobStore -app = FastAPI(title="FFmpeg Worker", version="1.0.0") - job_store = JobStore() +job_queue = JobQueue() + + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Start worker on startup + worker_task = asyncio.create_task(worker_loop(job_queue, job_store)) + yield + # Cancel worker on shutdown + worker_task.cancel() + try: + await worker_task + except asyncio.CancelledError: + pass + + +app = FastAPI(title="FFmpeg Worker", version="1.0.0", lifespan=lifespan) @app.get("/health") @@ -17,6 +36,7 @@ async def health() -> dict[str, str]: async def create_job(request: CreateJobRequest) -> JobResponse: job = Job(command=request.command) job_store.add(job) + await job_queue.enqueue(job.id) return JobResponse.model_validate(job.model_dump()) diff --git a/app/queue.py b/app/queue.py index 65e33c6..ce9dade 100644 --- a/app/queue.py +++ b/app/queue.py @@ -1,4 +1,16 @@ import asyncio +import os +from datetime import datetime, timezone + +from app.ffmpeg import ( + parse_command, + resolve_paths, + get_duration, + parse_progress, + extract_output_path, +) +from app.models import Job, JobStatus, Progress +from app.store import JobStore class JobQueue: @@ -13,3 +25,84 @@ class JobQueue: def size(self) -> int: return self._queue.qsize() + + +async def run_ffmpeg(job: Job, data_path: str, timeout: int) -> None: + """Execute FFmpeg command and update job with progress.""" + args = parse_command(job.command) + resolved_args = resolve_paths(args, data_path) + + # Find input file for duration + input_path = None + for i, arg in enumerate(resolved_args): + if arg == "-i" and i + 1 < len(resolved_args): + input_path = resolved_args[i + 1] + break + + duration = None + if input_path: + duration = await get_duration(input_path) + + # Build FFmpeg command with progress output + ffmpeg_args = ["ffmpeg", "-y", "-progress", "pipe:1", "-nostats"] + resolved_args + + process = await asyncio.create_subprocess_exec( + *ffmpeg_args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + # Read progress output + progress_buffer = "" + try: + async with asyncio.timeout(timeout): + while True: + line = await process.stdout.readline() + if not line: + break + progress_buffer += line.decode() + + # Parse progress when we get a complete block + if "progress=" in progress_buffer: + job.progress = parse_progress(progress_buffer, duration) + progress_buffer = "" + + _, stderr = await process.communicate() + except asyncio.TimeoutError: + process.kill() + await process.wait() + raise TimeoutError(f"FFmpeg timed out after {timeout} seconds") + + if process.returncode != 0: + raise RuntimeError(stderr.decode()) + + # Extract output path + output_path = extract_output_path(args) + if output_path: + job.output_files = [output_path] + + +async def worker_loop(job_queue: "JobQueue", job_store: JobStore) -> None: + """Main worker loop that processes jobs from the queue.""" + data_path = os.environ.get("DATA_PATH", "/data") + timeout = int(os.environ.get("FFMPEG_TIMEOUT", "3600")) + + while True: + job_id = await job_queue.dequeue() + job = job_store.get(job_id) + + if job is None: + continue + + job.status = JobStatus.RUNNING + job.started_at = datetime.now(timezone.utc) + job.progress = Progress() + + try: + await run_ffmpeg(job, data_path, timeout) + job.status = JobStatus.COMPLETED + except Exception as e: + job.status = JobStatus.FAILED + job.error = str(e) + finally: + job.completed_at = datetime.now(timezone.utc)