Compare commits

..

8 Commits

Author SHA1 Message Date
e590cdc13b fix: prevent already-completed simulations from re-running
Previously, when re-running a job with some model-days already completed:
- _prepare_data() marked them as "skipped" with error="Already completed"
- But _execute_date() didn't check the skip list before launching executors
- ModelDayExecutor would start, change status to "running", and never complete
- Job would hang with status="running" and pending count > 0

Fixed by:
- _prepare_data() now returns completion_skips: {model: {dates}}
- _execute_date() receives completion_skips and filters out already-completed models
- Skipped model-days are not submitted to ThreadPoolExecutor
- Job completes correctly, skipped model-days remain with status="skipped"

This ensures idempotent job behavior - re-running a job only executes
model-days that haven't completed yet.

Fixes #73
2025-11-03 00:03:57 -05:00
c74747d1d4 fix: revert **kwargs approach - FastMCP doesn't support it
Root cause: FastMCP uses inspect module to generate tool schemas from function
signatures. **kwargs prevents FastMCP from determining parameter types, causing
tool registration to fail.

Fix: Keep explicit parameters with defaults (signature=None, today_date=None, etc.)
but document in docstring that they are auto-injected.

This preserves:
- ContextInjector always overrides values (defense-in-depth from v0.3.0-alpha.40)
- FastMCP can generate proper tool schema
- Parameters visible to AI, but with clear documentation they're automatic

Trade-off: AI can still see the parameters, but documentation instructs not to provide them.
Combined with ContextInjector override, AI-provided values are ignored anyway.

Fixes TradeTools service crash on startup.
2025-11-02 23:41:00 -05:00
96f6b78a93 refactor: hide context parameters from AI model tool schema
Prevent AI hallucination of runtime parameters by hiding them from the tool schema.

Architecture:
- Public tool functions (buy/sell) only expose symbol and amount to AI
- Use **kwargs to accept hidden parameters (signature, job_id, today_date, session_id)
- Internal _impl functions contain the actual business logic
- ContextInjector injects parameters into kwargs (invisible to AI)

Benefits:
- AI cannot see or hallucinate signature/job_id/session_id parameters
- Cleaner tool schema focuses on trading-relevant parameters only
- Defense-in-depth: ContextInjector still overrides any provided values
- More maintainable: clear separation of public API vs internal implementation

Example AI sees:
  buy(symbol: str, amount: int) -> dict

Actual execution:
  buy(symbol="AAPL", amount=10, signature="gpt-5", job_id="...", ...)

Fixes #TBD
2025-11-02 23:34:07 -05:00
6c395f740d fix: always override context parameters in ContextInjector
Root cause: AI models were hallucinating signature/job_id/today_date values
and passing them in tool calls. The ContextInjector was checking
"if param not in request.args" before injecting, which failed when AI
provided (incorrect) values.

Fix: Always override context parameters, never trust AI-provided values.

Evidence from logs:
- ContextInjector had correct values (self.signature=gpt-5, job_id=6dabd9e6...)
- But AI was passing signature=None or hallucinated values like "fundamental-bot-v1"
- After injection, args showed the AI's (wrong) values, not the interceptor's

This ensures runtime context is ALWAYS injected regardless of what the AI sends.

Fixes #TBD
2025-11-02 23:30:49 -05:00
618943b278 debug: add self attribute logging to ContextInjector.__call__
Log ContextInjector instance ID and attribute values at entry to __call__()
to diagnose why attributes appear as None during tool invocation despite
being set correctly during set_context().

This will reveal whether:
- Multiple ContextInjector instances exist
- Attributes are being overwritten/cleared
- Wrong instance is being invoked
2025-11-02 23:17:52 -05:00
1c19eea29a debug: add comprehensive diagnostic logging for ContextInjector flow
Add instrumentation at component boundaries to trace where ContextInjector values become None:
- ModelDayExecutor: Log ContextInjector creation and set_context() invocation
- BaseAgent.set_context(): Log entry, client creation, tool reload, completion
- Includes object IDs to verify instance identity across boundaries

Part of systematic debugging investigation for issue #TBD.
2025-11-02 23:05:40 -05:00
e968434062 fix: reload tools after context injection and prevent database locking
Critical fixes for ContextInjector and database concurrency:

1. ContextInjector Not Working:
   - Made set_context() async to reload tools after recreating MCP client
   - Tools from old client (without interceptor) were still being used
   - Now tools are reloaded from new client with interceptor active
   - This ensures buy/sell calls properly receive injected parameters

2. Database Locking:
   - Closed main connection before _write_results_to_db() opens new one
   - SQLite doesn't handle concurrent write connections well
   - Prevents "database is locked" error during position writes

Changes:
- agent/base_agent/base_agent.py:
  - async def set_context() instead of def set_context()
  - Added: self.tools = await self.client.get_tools()
- api/model_day_executor.py:
  - await agent.set_context(context_injector)
  - conn.close() before _write_results_to_db()

Root Cause:
When recreating the MCP client with tool_interceptors, the old tools
were still cached in self.tools and being passed to the AI agent.
The interceptor was never invoked, so job_id/signature/date were missing.
2025-11-02 22:42:17 -05:00
4c1d23a7c8 fix: correct get_db_path() usage to pass base database path
The get_db_path() function requires a base_db_path argument
to properly resolve PROD vs DEV database paths. Updated all
calls to pass "data/jobs.db" as the base path.

Changes:
- agent_tools/tool_trade.py: Fix 3 occurrences (lines 33, 113, 236)
- tools/price_tools.py: Fix 2 occurrences in new database functions
- Remove unused get_db_path import from tool_trade.py

This fixes TypeError when running simulations:
  get_db_path() missing 1 required positional argument: 'base_db_path'

The get_db_connection() function internally calls get_db_path()
to resolve the correct database path based on DEPLOYMENT_MODE.
2025-11-02 22:26:45 -05:00
6 changed files with 104 additions and 43 deletions

View File

@@ -221,7 +221,7 @@ class BaseAgent:
print(f"✅ Agent {self.signature} initialization completed")
def set_context(self, context_injector: "ContextInjector") -> None:
async def set_context(self, context_injector: "ContextInjector") -> None:
"""
Inject ContextInjector after initialization.
@@ -232,14 +232,24 @@ class BaseAgent:
context_injector: Configured ContextInjector instance with
correct signature, today_date, job_id, session_id
"""
print(f"[DEBUG] set_context() ENTRY: Received context_injector with signature={context_injector.signature}, date={context_injector.today_date}, job_id={context_injector.job_id}, session_id={context_injector.session_id}")
self.context_injector = context_injector
print(f"[DEBUG] set_context(): Set self.context_injector, id={id(self.context_injector)}")
# Recreate MCP client with the interceptor
# Note: We need to recreate because MultiServerMCPClient doesn't have add_interceptor()
print(f"[DEBUG] set_context(): Creating new MCP client with interceptor, id={id(context_injector)}")
self.client = MultiServerMCPClient(
self.mcp_config,
tool_interceptors=[context_injector]
)
print(f"[DEBUG] set_context(): MCP client created")
# CRITICAL: Reload tools from new client so they use the interceptor
print(f"[DEBUG] set_context(): Reloading tools...")
self.tools = await self.client.get_tools()
print(f"[DEBUG] set_context(): Tools reloaded, count={len(self.tools)}")
print(f"✅ Context injected: signature={context_injector.signature}, "
f"date={context_injector.today_date}, job_id={context_injector.job_id}, "

View File

@@ -49,14 +49,16 @@ class ContextInjector:
"""
# Inject context parameters for trade tools
if request.name in ["buy", "sell"]:
# Add signature and today_date to args if not present
if "signature" not in request.args:
request.args["signature"] = self.signature
if "today_date" not in request.args:
request.args["today_date"] = self.today_date
if "job_id" not in request.args and self.job_id:
# Debug: Log self attributes BEFORE injection
print(f"[ContextInjector.__call__] ENTRY: id={id(self)}, self.signature={self.signature}, self.today_date={self.today_date}, self.job_id={self.job_id}, self.session_id={self.session_id}")
print(f"[ContextInjector.__call__] Args BEFORE injection: {request.args}")
# ALWAYS inject/override context parameters (don't trust AI-provided values)
request.args["signature"] = self.signature
request.args["today_date"] = self.today_date
if self.job_id:
request.args["job_id"] = self.job_id
if "session_id" not in request.args and self.session_id:
if self.session_id:
request.args["session_id"] = self.session_id
# Debug logging

View File

@@ -7,7 +7,6 @@ project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, project_root)
from tools.price_tools import get_open_prices
import json
from tools.deployment_config import get_db_path
from api.database import get_db_connection
from datetime import datetime
mcp = FastMCP("TradeTools")
@@ -30,7 +29,7 @@ def get_current_position_from_db(job_id: str, model: str, date: str) -> Tuple[Di
Raises:
Exception: If database query fails
"""
db_path = get_db_path()
db_path = "data/jobs.db"
conn = get_db_connection(db_path)
cursor = conn.cursor()
@@ -83,24 +82,13 @@ def get_current_position_from_db(job_id: str, model: str, date: str) -> Tuple[Di
conn.close()
@mcp.tool()
def buy(symbol: str, amount: int, signature: str = None, today_date: str = None,
job_id: str = None, session_id: int = None) -> Dict[str, Any]:
def _buy_impl(symbol: str, amount: int, signature: str = None, today_date: str = None,
job_id: str = None, session_id: int = None) -> Dict[str, Any]:
"""
Buy stock function - writes to SQLite database.
Internal buy implementation - accepts injected context parameters.
Args:
symbol: Stock symbol (e.g., "AAPL", "MSFT")
amount: Number of shares to buy (positive integer)
signature: Model signature (injected by ContextInjector)
today_date: Trading date YYYY-MM-DD (injected by ContextInjector)
job_id: Job UUID (injected by ContextInjector)
session_id: Trading session ID (injected by ContextInjector)
Returns:
Dict[str, Any]:
- Success: {"CASH": amount, symbol: quantity, ...}
- Failure: {"error": message, ...}
This function is not exposed to the AI model. It receives runtime context
(signature, today_date, job_id, session_id) from the ContextInjector.
"""
# Validate required parameters
if not job_id:
@@ -110,7 +98,7 @@ def buy(symbol: str, amount: int, signature: str = None, today_date: str = None,
if not today_date:
return {"error": "Missing required parameter: today_date"}
db_path = get_db_path()
db_path = "data/jobs.db"
conn = get_db_connection(db_path)
cursor = conn.cursor()
@@ -207,8 +195,29 @@ def buy(symbol: str, amount: int, signature: str = None, today_date: str = None,
@mcp.tool()
def sell(symbol: str, amount: int, signature: str = None, today_date: str = None,
job_id: str = None, session_id: int = None) -> Dict[str, Any]:
def buy(symbol: str, amount: int, signature: str = None, today_date: str = None,
job_id: str = None, session_id: int = None) -> Dict[str, Any]:
"""
Buy stock shares.
Args:
symbol: Stock symbol (e.g., "AAPL", "MSFT", "GOOGL")
amount: Number of shares to buy (positive integer)
Returns:
Dict[str, Any]:
- Success: {"CASH": remaining_cash, "SYMBOL": shares, ...}
- Failure: {"error": error_message, ...}
Note: signature, today_date, job_id, session_id are automatically injected by the system.
Do not provide these parameters - they will be added automatically.
"""
# Delegate to internal implementation
return _buy_impl(symbol, amount, signature, today_date, job_id, session_id)
def _sell_impl(symbol: str, amount: int, signature: str = None, today_date: str = None,
job_id: str = None, session_id: int = None) -> Dict[str, Any]:
"""
Sell stock function - writes to SQLite database.
@@ -233,7 +242,7 @@ def sell(symbol: str, amount: int, signature: str = None, today_date: str = None
if not today_date:
return {"error": "Missing required parameter: today_date"}
db_path = get_db_path()
db_path = "data/jobs.db"
conn = get_db_connection(db_path)
cursor = conn.cursor()
@@ -328,6 +337,28 @@ def sell(symbol: str, amount: int, signature: str = None, today_date: str = None
conn.close()
@mcp.tool()
def sell(symbol: str, amount: int, signature: str = None, today_date: str = None,
job_id: str = None, session_id: int = None) -> Dict[str, Any]:
"""
Sell stock shares.
Args:
symbol: Stock symbol (e.g., "AAPL", "MSFT", "GOOGL")
amount: Number of shares to sell (positive integer)
Returns:
Dict[str, Any]:
- Success: {"CASH": remaining_cash, "SYMBOL": shares, ...}
- Failure: {"error": error_message, ...}
Note: signature, today_date, job_id, session_id are automatically injected by the system.
Do not provide these parameters - they will be added automatically.
"""
# Delegate to internal implementation
return _sell_impl(symbol, amount, signature, today_date, job_id, session_id)
if __name__ == "__main__":
port = int(os.getenv("TRADE_HTTP_PORT", "8002"))
mcp.run(transport="streamable-http", port=port)

View File

@@ -140,7 +140,10 @@ class ModelDayExecutor:
job_id=self.job_id,
session_id=session_id
)
agent.set_context(context_injector)
logger.info(f"[DEBUG] ModelDayExecutor: Created ContextInjector with signature={self.model_sig}, date={self.date}, job_id={self.job_id}, session_id={session_id}")
logger.info(f"[DEBUG] ModelDayExecutor: Calling await agent.set_context()")
await agent.set_context(context_injector)
logger.info(f"[DEBUG] ModelDayExecutor: set_context() completed")
# Run trading session
logger.info(f"Running trading session for {self.model_sig} on {self.date}")
@@ -155,10 +158,13 @@ class ModelDayExecutor:
# Update session summary
await self._update_session_summary(cursor, session_id, conversation, agent)
# Store positions (pass session_id)
self._write_results_to_db(agent, session_id)
# Commit and close connection before _write_results_to_db opens a new one
conn.commit()
conn.close()
conn = None # Mark as closed
# Store positions (pass session_id) - this opens its own connection
self._write_results_to_db(agent, session_id)
# Update status to completed
self.job_manager.update_job_detail_status(

View File

@@ -90,7 +90,7 @@ class SimulationWorker:
logger.info(f"Starting job {self.job_id}: {len(date_range)} dates, {len(models)} models")
# NEW: Prepare price data (download if needed)
available_dates, warnings = self._prepare_data(date_range, models, config_path)
available_dates, warnings, completion_skips = self._prepare_data(date_range, models, config_path)
if not available_dates:
error_msg = "No trading dates available after price data preparation"
@@ -100,7 +100,7 @@ class SimulationWorker:
# Execute available dates only
for date in available_dates:
logger.info(f"Processing date {date} with {len(models)} models")
self._execute_date(date, models, config_path)
self._execute_date(date, models, config_path, completion_skips)
# Job completed - determine final status
progress = self.job_manager.get_job_progress(self.job_id)
@@ -145,7 +145,8 @@ class SimulationWorker:
"error": error_msg
}
def _execute_date(self, date: str, models: List[str], config_path: str) -> None:
def _execute_date(self, date: str, models: List[str], config_path: str,
completion_skips: Dict[str, Set[str]] = None) -> None:
"""
Execute all models for a single date in parallel.
@@ -153,14 +154,24 @@ class SimulationWorker:
date: Trading date (YYYY-MM-DD)
models: List of model signatures to execute
config_path: Path to configuration file
completion_skips: {model: {dates}} of already-completed model-days to skip
Uses ThreadPoolExecutor to run all models concurrently for this date.
Waits for all models to complete before returning.
Skips models that have already completed this date.
"""
if completion_skips is None:
completion_skips = {}
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit all model executions for this date
futures = []
for model in models:
# Skip if this model-day was already completed
if date in completion_skips.get(model, set()):
logger.debug(f"Skipping {model} on {date} (already completed)")
continue
future = executor.submit(
self._execute_model_day,
date,
@@ -397,7 +408,10 @@ class SimulationWorker:
config_path: Path to configuration file
Returns:
Tuple of (available_dates, warnings)
Tuple of (available_dates, warnings, completion_skips)
- available_dates: Dates to process
- warnings: Warning messages
- completion_skips: {model: {dates}} of already-completed model-days
"""
from api.price_data_manager import PriceDataManager
@@ -456,7 +470,7 @@ class SimulationWorker:
self.job_manager.update_job_status(self.job_id, "running")
logger.info(f"Job {self.job_id}: Starting execution - {len(dates_to_process)} dates, {len(models)} models")
return dates_to_process, warnings
return dates_to_process, warnings, completion_skips
def get_job_info(self) -> Dict[str, Any]:
"""

View File

@@ -320,12 +320,11 @@ def get_today_init_position_from_db(
If no position exists: {"CASH": 10000.0} (initial cash)
"""
import logging
from tools.deployment_config import get_db_path
from api.database import get_db_connection
logger = logging.getLogger(__name__)
db_path = get_db_path()
db_path = "data/jobs.db"
conn = get_db_connection(db_path)
cursor = conn.cursor()
@@ -385,14 +384,13 @@ def add_no_trade_record_to_db(
session_id: Trading session ID
"""
import logging
from tools.deployment_config import get_db_path
from api.database import get_db_connection
from agent_tools.tool_trade import get_current_position_from_db
from datetime import datetime
logger = logging.getLogger(__name__)
db_path = get_db_path()
db_path = "data/jobs.db"
conn = get_db_connection(db_path)
cursor = conn.cursor()