mirror of
https://github.com/Xe138/AI-Trader.git
synced 2026-04-01 17:17:24 -04:00
feat: add resume mode and idempotent behavior to /simulate/trigger endpoint
BREAKING CHANGE: end_date is now required and cannot be null/empty
New Features:
- Resume mode: Set start_date to null to continue from last completed date per model
- Idempotent by default: Skip already-completed dates with replace_existing=false
- Per-model independence: Each model resumes from its own last completed date
- Cold start handling: If a model has no completed dates in resume mode, only end_date is run for that model as a single-day simulation
API Changes:
- start_date: Now optional (null enables resume mode)
- end_date: Now REQUIRED (cannot be null or empty string)
- replace_existing: New optional field (default: false for idempotent behavior)
Implementation:
- Added JobManager.get_last_completed_date_for_model() method
- Added JobManager.get_completed_model_dates() method
- Updated create_job() to support model_day_filter for selective task creation
- Fixed a bug where start_date=None (resume mode) was passed to the price data checks; they now use the computed start date
Documentation:
- Updated API_REFERENCE.md with complete examples and behavior matrix
- Updated QUICK_START.md with resume mode examples
- Updated docs/user-guide/using-the-api.md
- Added CHANGELOG_NEW_API.md with migration guide
- Updated all integration tests for new schema
- Updated client library examples (Python, TypeScript)
Migration:
- Old: {"start_date": "2025-01-16"}
- New: {"start_date": "2025-01-16", "end_date": "2025-01-16"}
- Resume: {"start_date": null, "end_date": "2025-01-31"}
See CHANGELOG_NEW_API.md for complete details.
This commit is contained in:
@@ -54,7 +54,8 @@ class JobManager:
|
||||
self,
|
||||
config_path: str,
|
||||
date_range: List[str],
|
||||
models: List[str]
|
||||
models: List[str],
|
||||
model_day_filter: Optional[List[tuple]] = None
|
||||
) -> str:
|
||||
"""
|
||||
Create new simulation job.
|
||||
@@ -63,6 +64,8 @@ class JobManager:
|
||||
config_path: Path to configuration file
|
||||
date_range: List of dates to simulate (YYYY-MM-DD)
|
||||
models: List of model signatures to execute
|
||||
model_day_filter: Optional list of (model, date) tuples to limit job_details.
|
||||
If None, creates job_details for all model-date combinations.
|
||||
|
||||
Returns:
|
||||
job_id: UUID of created job
|
||||
@@ -95,9 +98,10 @@ class JobManager:
|
||||
created_at
|
||||
))
|
||||
|
||||
# Create job_details for each model-day combination
|
||||
for date in date_range:
|
||||
for model in models:
|
||||
# Create job_details based on filter
|
||||
if model_day_filter is not None:
|
||||
# Only create job_details for specified model-day pairs
|
||||
for model, date in model_day_filter:
|
||||
cursor.execute("""
|
||||
INSERT INTO job_details (
|
||||
job_id, date, model, status
|
||||
@@ -105,8 +109,21 @@ class JobManager:
|
||||
VALUES (?, ?, ?, ?)
|
||||
""", (job_id, date, model, "pending"))
|
||||
|
||||
logger.info(f"Created job {job_id} with {len(model_day_filter)} model-day tasks (filtered)")
|
||||
else:
|
||||
# Create job_details for all model-day combinations
|
||||
for date in date_range:
|
||||
for model in models:
|
||||
cursor.execute("""
|
||||
INSERT INTO job_details (
|
||||
job_id, date, model, status
|
||||
)
|
||||
VALUES (?, ?, ?, ?)
|
||||
""", (job_id, date, model, "pending"))
|
||||
|
||||
logger.info(f"Created job {job_id} with {len(date_range)} dates and {len(models)} models")
|
||||
|
||||
conn.commit()
|
||||
logger.info(f"Created job {job_id} with {len(date_range)} dates and {len(models)} models")
|
||||
|
||||
return job_id
|
||||
|
||||
@@ -585,6 +602,67 @@ class JobManager:
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def get_last_completed_date_for_model(self, model: str) -> Optional[str]:
    """
    Look up the most recent date on which a model has a completed task.

    Only job_details rows whose status is 'completed' are considered; the
    rows are sorted by their date column in descending order and the first
    one is taken.

    Args:
        model: Model signature

    Returns:
        Latest completed date (YYYY-MM-DD), or None when the model has no
        completed rows
    """
    conn = get_db_connection(self.db_path)
    cursor = conn.cursor()

    try:
        cursor.execute("""
            SELECT date
            FROM job_details
            WHERE model = ? AND status = 'completed'
            ORDER BY date DESC
            LIMIT 1
        """, (model,))

        latest = cursor.fetchone()
        if not latest:
            # No completed rows for this model.
            return None
        return latest[0]

    finally:
        # Release the connection on both the success and the error path.
        conn.close()
|
||||
|
||||
def get_completed_model_dates(self, models: List[str], start_date: str, end_date: str) -> Dict[str, List[str]]:
    """
    Collect, per model, the dates in a range that already have a completed task.

    One query is issued per model against job_details, restricted to rows
    whose status is 'completed' and whose date lies between start_date and
    end_date (both bounds inclusive).

    Args:
        models: List of model signatures
        start_date: Start date (YYYY-MM-DD)
        end_date: End date (YYYY-MM-DD)

    Returns:
        Dict mapping every requested model signature to its distinct
        completed dates in ascending order; a model with no completed rows
        maps to an empty list
    """
    conn = get_db_connection(self.db_path)
    cursor = conn.cursor()

    try:
        completed_by_model = {}

        for signature in models:
            cursor.execute("""
                SELECT DISTINCT date
                FROM job_details
                WHERE model = ? AND status = 'completed' AND date >= ? AND date <= ?
                ORDER BY date
            """, (signature, start_date, end_date))

            # An empty result set yields [], so every model gets an entry.
            completed_by_model[signature] = [record[0] for record in cursor.fetchall()]

        return completed_by_model

    finally:
        # Release the connection on both the success and the error path.
        conn.close()
|
||||
|
||||
def cleanup_old_jobs(self, days: int = 30) -> Dict[str, int]:
|
||||
"""
|
||||
Delete jobs older than threshold.
|
||||
|
||||
151
api/main.py
151
api/main.py
@@ -33,28 +33,36 @@ logger = logging.getLogger(__name__)
|
||||
# Pydantic models for request/response validation
|
||||
class SimulateTriggerRequest(BaseModel):
|
||||
"""Request body for POST /simulate/trigger."""
|
||||
start_date: str = Field(..., description="Start date for simulation (YYYY-MM-DD)")
|
||||
end_date: Optional[str] = Field(None, description="End date for simulation (YYYY-MM-DD). If not provided, simulates single day.")
|
||||
start_date: Optional[str] = Field(None, description="Start date for simulation (YYYY-MM-DD). If null/omitted, resumes from last completed date per model.")
|
||||
end_date: str = Field(..., description="End date for simulation (YYYY-MM-DD). Required.")
|
||||
models: Optional[List[str]] = Field(
|
||||
None,
|
||||
description="Optional: List of model signatures to simulate. If not provided, uses enabled models from config."
|
||||
)
|
||||
replace_existing: bool = Field(
|
||||
False,
|
||||
description="If true, replaces existing simulation data. If false (default), skips dates that already have data (idempotent)."
|
||||
)
|
||||
|
||||
@field_validator("start_date", "end_date")
|
||||
@classmethod
|
||||
def validate_date_format(cls, v):
|
||||
"""Validate date format."""
|
||||
if v is None:
|
||||
return v
|
||||
if v is None or v == "":
|
||||
return None
|
||||
try:
|
||||
datetime.strptime(v, "%Y-%m-%d")
|
||||
except ValueError:
|
||||
raise ValueError(f"Invalid date format: {v}. Expected YYYY-MM-DD")
|
||||
return v
|
||||
|
||||
def get_end_date(self) -> str:
|
||||
"""Get end date, defaulting to start_date if not provided."""
|
||||
return self.end_date or self.start_date
|
||||
@field_validator("end_date")
|
||||
@classmethod
|
||||
def validate_end_date_required(cls, v):
|
||||
"""Ensure end_date is not null or empty."""
|
||||
if v is None or v == "":
|
||||
raise ValueError("end_date is required and cannot be null or empty")
|
||||
return v
|
||||
|
||||
|
||||
class SimulateTriggerResponse(BaseModel):
|
||||
@@ -136,6 +144,12 @@ def create_app(
|
||||
Validates date range, downloads missing price data if needed,
|
||||
and creates job with available trading dates.
|
||||
|
||||
Supports:
|
||||
- Single date: start_date == end_date
|
||||
- Date range: start_date < end_date
|
||||
- Resume: start_date is null (each model resumes from its last completed date)
|
||||
- Idempotent: replace_existing=false skips already completed model-days
|
||||
|
||||
Raises:
|
||||
HTTPException 400: Validation errors, running job, or invalid dates
|
||||
HTTPException 503: Price data download failed
|
||||
@@ -151,12 +165,7 @@ def create_app(
|
||||
detail=f"Server configuration file not found: {config_path}"
|
||||
)
|
||||
|
||||
# Get end date (defaults to start_date for single day)
|
||||
end_date = request.get_end_date()
|
||||
|
||||
# Validate date range
|
||||
max_days = get_max_simulation_days()
|
||||
validate_date_range(request.start_date, end_date, max_days=max_days)
|
||||
end_date = request.end_date
|
||||
|
||||
# Determine which models to run
|
||||
import json
|
||||
@@ -180,13 +189,44 @@ def create_app(
|
||||
detail="No enabled models found in config. Either enable models in config or specify them in request."
|
||||
)
|
||||
|
||||
job_manager = JobManager(db_path=app.state.db_path)
|
||||
|
||||
# Handle resume logic (start_date is null)
|
||||
if request.start_date is None:
|
||||
# Resume mode: determine start date per model
|
||||
model_start_dates = {}
|
||||
|
||||
for model in models_to_run:
|
||||
last_date = job_manager.get_last_completed_date_for_model(model)
|
||||
|
||||
if last_date is None:
|
||||
# Cold start: use end_date as single-day simulation
|
||||
model_start_dates[model] = end_date
|
||||
else:
|
||||
# Resume from next day after last completed
|
||||
last_dt = datetime.strptime(last_date, "%Y-%m-%d")
|
||||
next_dt = last_dt + timedelta(days=1)
|
||||
model_start_dates[model] = next_dt.strftime("%Y-%m-%d")
|
||||
|
||||
# For validation purposes, use earliest start date
|
||||
earliest_start = min(model_start_dates.values())
|
||||
start_date = earliest_start
|
||||
else:
|
||||
# Explicit start date provided
|
||||
start_date = request.start_date
|
||||
model_start_dates = {model: start_date for model in models_to_run}
|
||||
|
||||
# Validate date range
|
||||
max_days = get_max_simulation_days()
|
||||
validate_date_range(start_date, end_date, max_days=max_days)
|
||||
|
||||
# Check price data and download if needed
|
||||
auto_download = os.getenv("AUTO_DOWNLOAD_PRICE_DATA", "true").lower() == "true"
|
||||
price_manager = PriceDataManager(db_path=app.state.db_path)
|
||||
|
||||
# Check what's missing
|
||||
# Check what's missing (use computed start_date, not request.start_date which may be None)
|
||||
missing_coverage = price_manager.get_missing_coverage(
|
||||
request.start_date,
|
||||
start_date,
|
||||
end_date
|
||||
)
|
||||
|
||||
@@ -203,7 +243,7 @@ def create_app(
|
||||
|
||||
logger.info(f"Downloading missing price data for {len(missing_coverage)} symbols")
|
||||
|
||||
requested_dates = set(expand_date_range(request.start_date, end_date))
|
||||
requested_dates = set(expand_date_range(start_date, end_date))
|
||||
|
||||
download_result = price_manager.download_missing_data_prioritized(
|
||||
missing_coverage,
|
||||
@@ -229,7 +269,7 @@ def create_app(
|
||||
|
||||
# Get available trading dates (after potential download)
|
||||
available_dates = price_manager.get_available_trading_dates(
|
||||
request.start_date,
|
||||
start_date,
|
||||
end_date
|
||||
)
|
||||
|
||||
@@ -237,11 +277,54 @@ def create_app(
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"No trading dates with complete price data in range "
|
||||
f"{request.start_date} to {end_date}. "
|
||||
f"{start_date} to {end_date}. "
|
||||
f"All symbols must have data for a date to be tradeable."
|
||||
)
|
||||
|
||||
job_manager = JobManager(db_path=app.state.db_path)
|
||||
# Handle idempotent behavior (skip already completed model-days)
|
||||
if not request.replace_existing:
|
||||
# Get existing completed dates per model
|
||||
completed_dates = job_manager.get_completed_model_dates(
|
||||
models_to_run,
|
||||
start_date,
|
||||
end_date
|
||||
)
|
||||
|
||||
# Build list of model-day tuples to simulate
|
||||
model_day_tasks = []
|
||||
for model in models_to_run:
|
||||
# Filter dates for this model
|
||||
model_start = model_start_dates[model]
|
||||
|
||||
for date in available_dates:
|
||||
# Skip if before model's start date
|
||||
if date < model_start:
|
||||
continue
|
||||
|
||||
# Skip if already completed (idempotent)
|
||||
if date in completed_dates.get(model, []):
|
||||
continue
|
||||
|
||||
model_day_tasks.append((model, date))
|
||||
|
||||
if not model_day_tasks:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="No new model-days to simulate. All requested dates are already completed. "
|
||||
"Use replace_existing=true to re-run."
|
||||
)
|
||||
|
||||
# Extract unique dates that will actually be run
|
||||
dates_to_run = sorted(list(set([date for _, date in model_day_tasks])))
|
||||
else:
|
||||
# Replace mode: run all model-date combinations
|
||||
dates_to_run = available_dates
|
||||
model_day_tasks = [
|
||||
(model, date)
|
||||
for model in models_to_run
|
||||
for date in available_dates
|
||||
if date >= model_start_dates[model]
|
||||
]
|
||||
|
||||
# Check if can start new job
|
||||
if not job_manager.can_start_new_job():
|
||||
@@ -250,11 +333,13 @@ def create_app(
|
||||
detail="Another simulation job is already running or pending. Please wait for it to complete."
|
||||
)
|
||||
|
||||
# Create job with available dates
|
||||
# Create job with dates that will be run
|
||||
# Pass model_day_tasks to only create job_details for tasks that will actually run
|
||||
job_id = job_manager.create_job(
|
||||
config_path=config_path,
|
||||
date_range=available_dates,
|
||||
models=models_to_run
|
||||
date_range=dates_to_run,
|
||||
models=models_to_run,
|
||||
model_day_filter=model_day_tasks
|
||||
)
|
||||
|
||||
# Start worker in background thread (only if not in test mode)
|
||||
@@ -266,12 +351,26 @@ def create_app(
|
||||
thread = threading.Thread(target=run_worker, daemon=True)
|
||||
thread.start()
|
||||
|
||||
logger.info(f"Triggered simulation job {job_id} with {len(available_dates)} dates")
|
||||
logger.info(f"Triggered simulation job {job_id} with {len(model_day_tasks)} model-day tasks")
|
||||
|
||||
# Build response message
|
||||
message = f"Simulation job created with {len(available_dates)} trading dates"
|
||||
total_model_days = len(model_day_tasks)
|
||||
message_parts = [f"Simulation job created with {total_model_days} model-day tasks"]
|
||||
|
||||
if request.start_date is None:
|
||||
message_parts.append("(resume mode)")
|
||||
|
||||
if not request.replace_existing:
|
||||
# Calculate how many were skipped
|
||||
total_possible = len(models_to_run) * len(available_dates)
|
||||
skipped = total_possible - total_model_days
|
||||
if skipped > 0:
|
||||
message_parts.append(f"({skipped} already completed, skipped)")
|
||||
|
||||
if download_info and download_info["rate_limited"]:
|
||||
message += " (rate limit reached - partial data)"
|
||||
message_parts.append("(rate limit reached - partial data)")
|
||||
|
||||
message = " ".join(message_parts)
|
||||
|
||||
# Get deployment mode info
|
||||
deployment_info = get_deployment_mode_dict()
|
||||
@@ -279,7 +378,7 @@ def create_app(
|
||||
response = SimulateTriggerResponse(
|
||||
job_id=job_id,
|
||||
status="pending",
|
||||
total_model_days=len(available_dates) * len(models_to_run),
|
||||
total_model_days=total_model_days,
|
||||
message=message,
|
||||
**deployment_info
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user