feat: add job CRUD API endpoints

This commit is contained in:
2025-11-30 15:19:46 -05:00
parent 2c2864b566
commit 5e9ac68d12
3 changed files with 104 additions and 1 deletions

View File

@@ -1,8 +1,37 @@
from fastapi import FastAPI
from fastapi import FastAPI, HTTPException, status
from app.models import CreateJobRequest, Job, JobResponse, JobStatus
from app.store import JobStore
app = FastAPI(title="FFmpeg Worker", version="1.0.0")
job_store = JobStore()
@app.get("/health")
async def health() -> dict[str, str]:
return {"status": "ok"}
@app.post("/jobs", status_code=status.HTTP_201_CREATED)
async def create_job(request: CreateJobRequest) -> JobResponse:
job = Job(command=request.command)
job_store.add(job)
return JobResponse.model_validate(job.model_dump())
@app.get("/jobs/{job_id}")
async def get_job(job_id: str) -> JobResponse:
job = job_store.get(job_id)
if job is None:
raise HTTPException(status_code=404, detail="Job not found")
return JobResponse.model_validate(job.model_dump())
@app.get("/jobs")
async def list_jobs(status: JobStatus | None = None) -> list[JobResponse]:
if status is not None:
jobs = job_store.list_by_status(status)
else:
jobs = job_store.list_all()
return [JobResponse.model_validate(job.model_dump()) for job in jobs]

View File

@@ -2,3 +2,4 @@ fastapi==0.123.0
uvicorn[standard]==0.38.0
pydantic==2.12.5
pytest==8.3.0
httpx==0.28.1

73
tests/test_api.py Normal file
View File

@@ -0,0 +1,73 @@
import pytest
from fastapi.testclient import TestClient
from app.main import app, job_store
@pytest.fixture(autouse=True)
def clear_store():
job_store._jobs.clear()
yield
job_store._jobs.clear()
client = TestClient(app)
def test_health():
response = client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "ok"}
def test_create_job():
response = client.post("/jobs", json={"command": "-i input.mp4 output.mp4"})
assert response.status_code == 201
data = response.json()
assert data["id"].startswith("job_")
assert data["status"] == "queued"
assert data["command"] == "-i input.mp4 output.mp4"
def test_get_job():
# Create a job first
create_response = client.post("/jobs", json={"command": "-i test.mp4 out.mp4"})
job_id = create_response.json()["id"]
# Get the job
response = client.get(f"/jobs/{job_id}")
assert response.status_code == 200
data = response.json()
assert data["id"] == job_id
assert data["status"] == "queued"
def test_get_nonexistent_job():
response = client.get("/jobs/nonexistent")
assert response.status_code == 404
def test_list_jobs():
client.post("/jobs", json={"command": "-i a.mp4 b.mp4"})
client.post("/jobs", json={"command": "-i c.mp4 d.mp4"})
response = client.get("/jobs")
assert response.status_code == 200
data = response.json()
assert len(data) == 2
def test_list_jobs_filter_by_status():
client.post("/jobs", json={"command": "-i a.mp4 b.mp4"})
response = client.get("/jobs?status=queued")
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["status"] == "queued"