mirror of
https://github.com/Xe138/AI-Trader.git
synced 2026-04-13 13:47:23 -04:00
feat(worker): integrate data preparation into run() method
Call _prepare_data before executing trades: - Download missing data if needed - Filter completed dates - Store warnings - Handle empty date scenarios Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -65,12 +65,13 @@ class SimulationWorker:
|
|||||||
|
|
||||||
Process:
|
Process:
|
||||||
1. Get job details (dates, models, config)
|
1. Get job details (dates, models, config)
|
||||||
2. For each date sequentially:
|
2. Prepare data (download if needed)
|
||||||
|
3. For each date sequentially:
|
||||||
a. Execute all models in parallel
|
a. Execute all models in parallel
|
||||||
b. Wait for all to complete
|
b. Wait for all to complete
|
||||||
c. Update progress
|
c. Update progress
|
||||||
3. Determine final job status
|
4. Determine final job status
|
||||||
4. Update job with final status
|
5. Store warnings if any
|
||||||
|
|
||||||
Error Handling:
|
Error Handling:
|
||||||
- Individual model failures: Mark detail as failed, continue with others
|
- Individual model failures: Mark detail as failed, continue with others
|
||||||
@@ -88,8 +89,16 @@ class SimulationWorker:
|
|||||||
|
|
||||||
logger.info(f"Starting job {self.job_id}: {len(date_range)} dates, {len(models)} models")
|
logger.info(f"Starting job {self.job_id}: {len(date_range)} dates, {len(models)} models")
|
||||||
|
|
||||||
# Execute date-by-date (sequential)
|
# NEW: Prepare price data (download if needed)
|
||||||
for date in date_range:
|
available_dates, warnings = self._prepare_data(date_range, models, config_path)
|
||||||
|
|
||||||
|
if not available_dates:
|
||||||
|
error_msg = "No trading dates available after price data preparation"
|
||||||
|
self.job_manager.update_job_status(self.job_id, "failed", error=error_msg)
|
||||||
|
return {"success": False, "error": error_msg}
|
||||||
|
|
||||||
|
# Execute available dates only
|
||||||
|
for date in available_dates:
|
||||||
logger.info(f"Processing date {date} with {len(models)} models")
|
logger.info(f"Processing date {date} with {len(models)} models")
|
||||||
self._execute_date(date, models, config_path)
|
self._execute_date(date, models, config_path)
|
||||||
|
|
||||||
@@ -103,6 +112,10 @@ class SimulationWorker:
|
|||||||
else:
|
else:
|
||||||
final_status = "failed"
|
final_status = "failed"
|
||||||
|
|
||||||
|
# Add warnings if any dates were skipped
|
||||||
|
if warnings:
|
||||||
|
self._add_job_warnings(warnings)
|
||||||
|
|
||||||
# Note: Job status is already updated by model_day_executor's detail status updates
|
# Note: Job status is already updated by model_day_executor's detail status updates
|
||||||
# We don't need to explicitly call update_job_status here as it's handled automatically
|
# We don't need to explicitly call update_job_status here as it's handled automatically
|
||||||
# by the status transition logic in JobManager.update_job_detail_status
|
# by the status transition logic in JobManager.update_job_detail_status
|
||||||
@@ -115,7 +128,8 @@ class SimulationWorker:
|
|||||||
"status": final_status,
|
"status": final_status,
|
||||||
"total_model_days": progress["total_model_days"],
|
"total_model_days": progress["total_model_days"],
|
||||||
"completed": progress["completed"],
|
"completed": progress["completed"],
|
||||||
"failed": progress["failed"]
|
"failed": progress["failed"],
|
||||||
|
"warnings": warnings
|
||||||
}
|
}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
100
tests/integration/test_async_download.py
Normal file
100
tests/integration/test_async_download.py
Normal file
@@ -0,0 +1,100 @@
|
|||||||
|
import pytest
|
||||||
|
import time
|
||||||
|
from api.database import initialize_database
|
||||||
|
from api.job_manager import JobManager
|
||||||
|
from api.simulation_worker import SimulationWorker
|
||||||
|
from unittest.mock import Mock, patch
|
||||||
|
|
||||||
|
def test_worker_prepares_data_before_execution(tmp_path):
|
||||||
|
"""Test that worker calls _prepare_data before executing trades."""
|
||||||
|
db_path = str(tmp_path / "test.db")
|
||||||
|
initialize_database(db_path)
|
||||||
|
job_manager = JobManager(db_path=db_path)
|
||||||
|
|
||||||
|
# Create job
|
||||||
|
job_id = job_manager.create_job(
|
||||||
|
config_path="configs/default_config.json",
|
||||||
|
date_range=["2025-10-01"],
|
||||||
|
models=["gpt-5"]
|
||||||
|
)
|
||||||
|
|
||||||
|
worker = SimulationWorker(job_id=job_id, db_path=db_path)
|
||||||
|
|
||||||
|
# Mock _prepare_data to track call
|
||||||
|
original_prepare = worker._prepare_data
|
||||||
|
prepare_called = []
|
||||||
|
|
||||||
|
def mock_prepare(*args, **kwargs):
|
||||||
|
prepare_called.append(True)
|
||||||
|
return (["2025-10-01"], []) # Return available dates, no warnings
|
||||||
|
|
||||||
|
worker._prepare_data = mock_prepare
|
||||||
|
|
||||||
|
# Mock _execute_date to avoid actual execution
|
||||||
|
worker._execute_date = Mock()
|
||||||
|
|
||||||
|
# Run worker
|
||||||
|
result = worker.run()
|
||||||
|
|
||||||
|
# Verify _prepare_data was called
|
||||||
|
assert len(prepare_called) == 1
|
||||||
|
assert result["success"] is True
|
||||||
|
|
||||||
|
def test_worker_handles_no_available_dates(tmp_path):
|
||||||
|
"""Test worker fails gracefully when no dates are available."""
|
||||||
|
db_path = str(tmp_path / "test.db")
|
||||||
|
initialize_database(db_path)
|
||||||
|
job_manager = JobManager(db_path=db_path)
|
||||||
|
|
||||||
|
job_id = job_manager.create_job(
|
||||||
|
config_path="configs/default_config.json",
|
||||||
|
date_range=["2025-10-01"],
|
||||||
|
models=["gpt-5"]
|
||||||
|
)
|
||||||
|
|
||||||
|
worker = SimulationWorker(job_id=job_id, db_path=db_path)
|
||||||
|
|
||||||
|
# Mock _prepare_data to return empty dates
|
||||||
|
worker._prepare_data = Mock(return_value=([], []))
|
||||||
|
|
||||||
|
# Run worker
|
||||||
|
result = worker.run()
|
||||||
|
|
||||||
|
# Should fail with descriptive error
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "No trading dates available" in result["error"]
|
||||||
|
|
||||||
|
# Job should be marked as failed
|
||||||
|
job = job_manager.get_job(job_id)
|
||||||
|
assert job["status"] == "failed"
|
||||||
|
|
||||||
|
def test_worker_stores_warnings(tmp_path):
|
||||||
|
"""Test worker stores warnings from prepare_data."""
|
||||||
|
db_path = str(tmp_path / "test.db")
|
||||||
|
initialize_database(db_path)
|
||||||
|
job_manager = JobManager(db_path=db_path)
|
||||||
|
|
||||||
|
job_id = job_manager.create_job(
|
||||||
|
config_path="configs/default_config.json",
|
||||||
|
date_range=["2025-10-01"],
|
||||||
|
models=["gpt-5"]
|
||||||
|
)
|
||||||
|
|
||||||
|
worker = SimulationWorker(job_id=job_id, db_path=db_path)
|
||||||
|
|
||||||
|
# Mock _prepare_data to return warnings
|
||||||
|
warnings = ["Rate limited", "Skipped 1 date"]
|
||||||
|
worker._prepare_data = Mock(return_value=(["2025-10-01"], warnings))
|
||||||
|
worker._execute_date = Mock()
|
||||||
|
|
||||||
|
# Run worker
|
||||||
|
result = worker.run()
|
||||||
|
|
||||||
|
# Verify warnings in result
|
||||||
|
assert result["warnings"] == warnings
|
||||||
|
|
||||||
|
# Verify warnings stored in database
|
||||||
|
import json
|
||||||
|
job = job_manager.get_job(job_id)
|
||||||
|
stored_warnings = json.loads(job["warnings"])
|
||||||
|
assert stored_warnings == warnings
|
||||||
Reference in New Issue
Block a user