Compare commits

...

2 Commits

Author SHA1 Message Date
e590cdc13b fix: prevent already-completed simulations from re-running
Previously, when re-running a job with some model-days already completed:
- _prepare_data() marked them as "skipped" with error="Already completed"
- But _execute_date() didn't check the skip list before launching executors
- ModelDayExecutor would start, change status to "running", and never complete
- Job would hang with status="running" and pending count > 0

Fixed by:
- _prepare_data() now returns completion_skips: {model: {dates}}
- _execute_date() receives completion_skips and filters out already-completed models
- Skipped model-days are not submitted to ThreadPoolExecutor
- Job completes correctly, skipped model-days remain with status="skipped"

This ensures idempotent job behavior - re-running a job only executes
model-days that haven't completed yet.

Fixes #73
2025-11-03 00:03:57 -05:00
c74747d1d4 fix: revert **kwargs approach - FastMCP doesn't support it
Root cause: FastMCP uses inspect module to generate tool schemas from function
signatures. **kwargs prevents FastMCP from determining parameter types, causing
tool registration to fail.

Fix: Keep explicit parameters with defaults (signature=None, today_date=None, etc.)
but document in docstring that they are auto-injected.

This preserves:
- ContextInjector always overrides values (defense-in-depth from v0.3.0-alpha.40)
- FastMCP can generate proper tool schema
- Parameters visible to AI, but with clear documentation they're automatic

Trade-off: AI can still see the parameters, but documentation instructs not to provide them.
Combined with ContextInjector override, AI-provided values are ignored anyway.

Fixes TradeTools service crash on startup.
2025-11-02 23:41:00 -05:00
2 changed files with 29 additions and 19 deletions

View File

@@ -195,7 +195,8 @@ def _buy_impl(symbol: str, amount: int, signature: str = None, today_date: str =
@mcp.tool()
def buy(symbol: str, amount: int, **kwargs) -> Dict[str, Any]:
def buy(symbol: str, amount: int, signature: str = None, today_date: str = None,
job_id: str = None, session_id: int = None) -> Dict[str, Any]:
"""
Buy stock shares.
@@ -207,13 +208,10 @@ def buy(symbol: str, amount: int, **kwargs) -> Dict[str, Any]:
Dict[str, Any]:
- Success: {"CASH": remaining_cash, "SYMBOL": shares, ...}
- Failure: {"error": error_message, ...}
"""
# Extract injected parameters (added by ContextInjector, hidden from AI)
signature = kwargs.get("signature")
today_date = kwargs.get("today_date")
job_id = kwargs.get("job_id")
session_id = kwargs.get("session_id")
Note: signature, today_date, job_id, session_id are automatically injected by the system.
Do not provide these parameters - they will be added automatically.
"""
# Delegate to internal implementation
return _buy_impl(symbol, amount, signature, today_date, job_id, session_id)
@@ -340,7 +338,8 @@ def _sell_impl(symbol: str, amount: int, signature: str = None, today_date: str
@mcp.tool()
def sell(symbol: str, amount: int, **kwargs) -> Dict[str, Any]:
def sell(symbol: str, amount: int, signature: str = None, today_date: str = None,
job_id: str = None, session_id: int = None) -> Dict[str, Any]:
"""
Sell stock shares.
@@ -352,13 +351,10 @@ def sell(symbol: str, amount: int, **kwargs) -> Dict[str, Any]:
Dict[str, Any]:
- Success: {"CASH": remaining_cash, "SYMBOL": shares, ...}
- Failure: {"error": error_message, ...}
"""
# Extract injected parameters (added by ContextInjector, hidden from AI)
signature = kwargs.get("signature")
today_date = kwargs.get("today_date")
job_id = kwargs.get("job_id")
session_id = kwargs.get("session_id")
Note: signature, today_date, job_id, session_id are automatically injected by the system.
Do not provide these parameters - they will be added automatically.
"""
# Delegate to internal implementation
return _sell_impl(symbol, amount, signature, today_date, job_id, session_id)

View File

@@ -90,7 +90,7 @@ class SimulationWorker:
logger.info(f"Starting job {self.job_id}: {len(date_range)} dates, {len(models)} models")
# NEW: Prepare price data (download if needed)
available_dates, warnings = self._prepare_data(date_range, models, config_path)
available_dates, warnings, completion_skips = self._prepare_data(date_range, models, config_path)
if not available_dates:
error_msg = "No trading dates available after price data preparation"
@@ -100,7 +100,7 @@ class SimulationWorker:
# Execute available dates only
for date in available_dates:
logger.info(f"Processing date {date} with {len(models)} models")
self._execute_date(date, models, config_path)
self._execute_date(date, models, config_path, completion_skips)
# Job completed - determine final status
progress = self.job_manager.get_job_progress(self.job_id)
@@ -145,7 +145,8 @@ class SimulationWorker:
"error": error_msg
}
def _execute_date(self, date: str, models: List[str], config_path: str) -> None:
def _execute_date(self, date: str, models: List[str], config_path: str,
completion_skips: Dict[str, Set[str]] = None) -> None:
"""
Execute all models for a single date in parallel.
@@ -153,14 +154,24 @@ class SimulationWorker:
date: Trading date (YYYY-MM-DD)
models: List of model signatures to execute
config_path: Path to configuration file
completion_skips: {model: {dates}} of already-completed model-days to skip
Uses ThreadPoolExecutor to run all models concurrently for this date.
Waits for all models to complete before returning.
Skips models that have already completed this date.
"""
if completion_skips is None:
completion_skips = {}
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit all model executions for this date
futures = []
for model in models:
# Skip if this model-day was already completed
if date in completion_skips.get(model, set()):
logger.debug(f"Skipping {model} on {date} (already completed)")
continue
future = executor.submit(
self._execute_model_day,
date,
@@ -397,7 +408,10 @@ class SimulationWorker:
config_path: Path to configuration file
Returns:
Tuple of (available_dates, warnings)
Tuple of (available_dates, warnings, completion_skips)
- available_dates: Dates to process
- warnings: Warning messages
- completion_skips: {model: {dates}} of already-completed model-days
"""
from api.price_data_manager import PriceDataManager
@@ -456,7 +470,7 @@ class SimulationWorker:
self.job_manager.update_job_status(self.job_id, "running")
logger.info(f"Job {self.job_id}: Starting execution - {len(dates_to_process)} dates, {len(models)} models")
return dates_to_process, warnings
return dates_to_process, warnings, completion_skips
def get_job_info(self) -> Dict[str, Any]:
"""