Compare commits

...

4 Commits

Author SHA1 Message Date
019c84fca8 refactor: migrate trade tools from file-based to SQLite position storage
Complete rewrite of position management in MCP trade tools:

**Trade Tools (agent_tools/tool_trade.py)**
- Replace file-based position.jsonl reads with SQLite queries
- Add get_current_position_from_db() to query positions and holdings tables
- Rewrite buy() and sell() to write directly to database
- Calculate portfolio value and P&L metrics in tools
- Accept job_id and session_id parameters via ContextInjector
- Return errors with proper context for debugging
- Use deployment-aware database path resolution

**Context Injection (agent/context_injector.py)**
- Add job_id and session_id to constructor
- Inject job_id and session_id into buy/sell tool calls
- Support optional parameters (None in standalone mode)

**BaseAgent (agent/base_agent/base_agent.py)**
- Read JOB_ID from runtime config
- Pass job_id to ContextInjector during initialization
- Enable automatic context injection for API mode

**ModelDayExecutor (api/model_day_executor.py)**
- Add _initialize_starting_position() method
- Create initial position record before agent runs
- Load initial_cash from config
- Update context_injector.session_id after session creation
- Link positions to sessions automatically

**Architecture Changes:**
- Eliminates file-based position tracking entirely
- Single source of truth: SQLite database
- Positions automatically linked to trading sessions
- Concurrent execution safe (no file system conflicts)
- Deployment mode aware (prod vs dev databases)

This completes the migration to database-only position storage.
File-based position.jsonl is no longer used or created.

Fixes context injection errors in concurrent simulations.
2025-11-02 21:36:57 -05:00
8521f685c7 fix: improve error handling in buy/sell functions
Add proper exception handling around get_latest_position() calls in both
buy() and sell() functions. Previously, exceptions were caught but code
continued execution with undefined variables, causing "variable referenced
before assignment" errors.

Now returns error dict with context when position lookup fails.

Related to context injection implementation for concurrent simulations.
2025-11-02 20:38:55 -05:00
bf12e981fe debug: add logging to trace parameter injection 2025-11-02 20:29:35 -05:00
a16bac5d08 fix: use 'args' instead of 'arguments' in MCPToolCallRequest
MCPToolCallRequest has 'args' attribute, not 'arguments'. Fixed
attribute name to match the actual API.
2025-11-02 20:21:43 -05:00
4 changed files with 367 additions and 170 deletions

View File

@@ -173,10 +173,15 @@ class BaseAgent:
print("⚠️ OpenAI base URL not set, using default")
try:
# Get job_id from runtime config if available (API mode)
from tools.general_tools import get_config_value
job_id = get_config_value("JOB_ID") # Returns None if not in API mode
# Create context injector for injecting signature and today_date into tool calls
self.context_injector = ContextInjector(
signature=self.signature,
today_date=self.init_date # Will be updated per trading session
today_date=self.init_date, # Will be updated per trading session
job_id=job_id # Will be None in standalone mode, populated in API mode
)
# Create MCP client with interceptor

View File

@@ -17,16 +17,20 @@ class ContextInjector:
client = MultiServerMCPClient(config, tool_interceptors=[interceptor])
"""
def __init__(self, signature: str, today_date: str):
def __init__(self, signature: str, today_date: str, job_id: str = None, session_id: int = None):
"""
Initialize context injector.
Args:
signature: Model signature to inject
today_date: Trading date to inject
job_id: Job UUID to inject (optional)
session_id: Trading session ID to inject (optional, updated during execution)
"""
self.signature = signature
self.today_date = today_date
self.job_id = job_id
self.session_id = session_id
async def __call__(
self,
@@ -43,13 +47,20 @@ class ContextInjector:
Returns:
Result from handler after injecting context
"""
# Inject signature and today_date for trade tools
# Inject context parameters for trade tools
if request.name in ["buy", "sell"]:
# Add signature and today_date to arguments if not present
if "signature" not in request.arguments:
request.arguments["signature"] = self.signature
if "today_date" not in request.arguments:
request.arguments["today_date"] = self.today_date
# Add signature and today_date to args if not present
if "signature" not in request.args:
request.args["signature"] = self.signature
if "today_date" not in request.args:
request.args["today_date"] = self.today_date
if "job_id" not in request.args and self.job_id:
request.args["job_id"] = self.job_id
if "session_id" not in request.args and self.session_id:
request.args["session_id"] = self.session_id
# Debug logging
print(f"[ContextInjector] Tool: {request.name}, Args after injection: {request.args}")
# Call the actual tool handler
return await handler(request)

View File

@@ -1,205 +1,333 @@
from fastmcp import FastMCP
import sys
import os
from typing import Dict, List, Optional, Any
from typing import Dict, List, Optional, Any, Tuple
# Add project root directory to Python path
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, project_root)
from tools.price_tools import get_yesterday_date, get_open_prices, get_yesterday_open_and_close_price, get_latest_position, get_yesterday_profit
from tools.price_tools import get_open_prices
import json
from tools.general_tools import get_config_value,write_config_value
from tools.deployment_config import get_db_path
from api.database import get_db_connection
from datetime import datetime
mcp = FastMCP("TradeTools")
@mcp.tool()
def buy(symbol: str, amount: int, signature: str = None, today_date: str = None) -> Dict[str, Any]:
def get_current_position_from_db(job_id: str, model: str, date: str) -> Tuple[Dict[str, float], int]:
"""
Buy stock function
This function simulates stock buying operations, including the following steps:
1. Get current position and operation ID
2. Get stock opening price for the day
3. Validate buy conditions (sufficient cash)
4. Update position (increase stock quantity, decrease cash)
5. Record transaction to position.jsonl file
Query current position from SQLite database.
Args:
symbol: Stock symbol, such as "AAPL", "MSFT", etc.
amount: Buy quantity, must be a positive integer, indicating how many shares to buy
signature: Model signature (optional, will use config/env if not provided)
today_date: Trading date (optional, will use config/env if not provided)
job_id: Job UUID
model: Model signature
date: Trading date (YYYY-MM-DD)
Returns:
Tuple of (position_dict, next_action_id)
- position_dict: {symbol: quantity, "CASH": amount}
- next_action_id: Next available action_id for this job+model
Raises:
Exception: If database query fails
"""
db_path = get_db_path()
conn = get_db_connection(db_path)
cursor = conn.cursor()
try:
# Get most recent position on or before this date
cursor.execute("""
SELECT p.id, p.cash
FROM positions p
WHERE p.job_id = ? AND p.model = ? AND p.date <= ?
ORDER BY p.date DESC, p.action_id DESC
LIMIT 1
""", (job_id, model, date))
position_row = cursor.fetchone()
if not position_row:
# No position found - this shouldn't happen if ModelDayExecutor initializes properly
raise Exception(f"No position found for job_id={job_id}, model={model}, date={date}")
position_id = position_row[0]
cash = position_row[1]
# Build position dict starting with CASH
position_dict = {"CASH": cash}
# Get holdings for this position
cursor.execute("""
SELECT symbol, quantity
FROM holdings
WHERE position_id = ?
""", (position_id,))
for row in cursor.fetchall():
symbol = row[0]
quantity = row[1]
position_dict[symbol] = quantity
# Get next action_id
cursor.execute("""
SELECT COALESCE(MAX(action_id), -1) + 1 as next_action_id
FROM positions
WHERE job_id = ? AND model = ?
""", (job_id, model))
next_action_id = cursor.fetchone()[0]
return position_dict, next_action_id
finally:
conn.close()
@mcp.tool()
def buy(symbol: str, amount: int, signature: str = None, today_date: str = None,
job_id: str = None, session_id: int = None) -> Dict[str, Any]:
"""
Buy stock function - writes to SQLite database.
Args:
symbol: Stock symbol (e.g., "AAPL", "MSFT")
amount: Number of shares to buy (positive integer)
signature: Model signature (injected by ContextInjector)
today_date: Trading date YYYY-MM-DD (injected by ContextInjector)
job_id: Job UUID (injected by ContextInjector)
session_id: Trading session ID (injected by ContextInjector)
Returns:
Dict[str, Any]:
- Success: Returns new position dictionary (containing stock quantity and cash balance)
- Failure: Returns {"error": error message, ...} dictionary
Raises:
ValueError: Raised when SIGNATURE environment variable is not set
Example:
>>> result = buy("AAPL", 10)
>>> print(result) # {"AAPL": 110, "MSFT": 5, "CASH": 5000.0, ...}
- Success: {"CASH": amount, symbol: quantity, ...}
- Failure: {"error": message, ...}
"""
# Step 1: Get environment variables and basic information
# Get signature (model name) from parameter or fallback to config/env
if signature is None:
signature = get_config_value("SIGNATURE")
if signature is None:
raise ValueError("SIGNATURE not provided and environment variable is not set")
# Validate required parameters
if not job_id:
return {"error": "Missing required parameter: job_id"}
if not signature:
return {"error": "Missing required parameter: signature"}
if not today_date:
return {"error": "Missing required parameter: today_date"}
# Get current trading date from parameter or fallback to config/env
if today_date is None:
today_date = get_config_value("TODAY_DATE")
# Step 2: Get current latest position and operation ID
# get_latest_position returns two values: position dictionary and current maximum operation ID
# This ID is used to ensure each operation has a unique identifier
try:
current_position, current_action_id = get_latest_position(today_date, signature)
except Exception as e:
print(e)
print(current_position, current_action_id)
print(today_date, signature)
# Step 3: Get stock opening price for the day
# Use get_open_prices function to get the opening price of specified stock for the day
# If stock symbol does not exist or price data is missing, KeyError exception will be raised
try:
this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price']
except KeyError:
# Stock symbol does not exist or price data is missing, return error message
return {"error": f"Symbol {symbol} not found! This action will not be allowed.", "symbol": symbol, "date": today_date}
db_path = get_db_path()
conn = get_db_connection(db_path)
cursor = conn.cursor()
# Step 4: Validate buy conditions
# Calculate cash required for purchase: stock price × buy quantity
try:
cash_left = current_position["CASH"] - this_symbol_price * amount
except Exception as e:
print(current_position, "CASH", this_symbol_price, amount)
# Step 1: Get current position
current_position, next_action_id = get_current_position_from_db(job_id, signature, today_date)
# Check if cash balance is sufficient for purchase
if cash_left < 0:
# Insufficient cash, return error message
return {"error": "Insufficient cash! This action will not be allowed.", "required_cash": this_symbol_price * amount, "cash_available": current_position.get("CASH", 0), "symbol": symbol, "date": today_date}
else:
# Step 5: Execute buy operation, update position
# Create a copy of current position to avoid directly modifying original data
# Step 2: Get stock price
try:
this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price']
except KeyError:
return {"error": f"Symbol {symbol} not found on {today_date}", "symbol": symbol, "date": today_date}
# Step 3: Validate sufficient cash
cash_required = this_symbol_price * amount
cash_available = current_position.get("CASH", 0)
cash_left = cash_available - cash_required
if cash_left < 0:
return {
"error": "Insufficient cash",
"required_cash": cash_required,
"cash_available": cash_available,
"symbol": symbol,
"date": today_date
}
# Step 4: Calculate new position
new_position = current_position.copy()
# Decrease cash balance
new_position["CASH"] = cash_left
# Increase stock position quantity
new_position[symbol] += amount
# Step 6: Record transaction to position.jsonl file
# Build file path: {project_root}/data/agent_data/{signature}/position/position.jsonl
# Use append mode ("a") to write new transaction record
# Each operation ID increments by 1, ensuring uniqueness of operation sequence
position_file_path = os.path.join(project_root, "data", "agent_data", signature, "position", "position.jsonl")
with open(position_file_path, "a") as f:
# Write JSON format transaction record, containing date, operation ID, transaction details and updated position
print(f"Writing to position.jsonl: {json.dumps({'date': today_date, 'id': current_action_id + 1, 'this_action':{'action':'buy','symbol':symbol,'amount':amount},'positions': new_position})}")
f.write(json.dumps({"date": today_date, "id": current_action_id + 1, "this_action":{"action":"buy","symbol":symbol,"amount":amount},"positions": new_position}) + "\n")
# Step 7: Return updated position
write_config_value("IF_TRADE", True)
print("IF_TRADE", get_config_value("IF_TRADE"))
new_position[symbol] = new_position.get(symbol, 0) + amount
# Step 5: Calculate portfolio value and P&L
portfolio_value = cash_left
for sym, qty in new_position.items():
if sym != "CASH":
try:
price = get_open_prices(today_date, [sym])[f'{sym}_price']
portfolio_value += qty * price
except KeyError:
pass # Symbol price not available, skip
# Get previous portfolio value for P&L calculation
cursor.execute("""
SELECT portfolio_value
FROM positions
WHERE job_id = ? AND model = ? AND date < ?
ORDER BY date DESC, action_id DESC
LIMIT 1
""", (job_id, signature, today_date))
row = cursor.fetchone()
previous_value = row[0] if row else 10000.0 # Default initial value
daily_profit = portfolio_value - previous_value
daily_return_pct = (daily_profit / previous_value * 100) if previous_value > 0 else 0
# Step 6: Write to positions table
created_at = datetime.utcnow().isoformat() + "Z"
cursor.execute("""
INSERT INTO positions (
job_id, date, model, action_id, action_type, symbol,
amount, price, cash, portfolio_value, daily_profit,
daily_return_pct, session_id, created_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
job_id, today_date, signature, next_action_id, "buy", symbol,
amount, this_symbol_price, cash_left, portfolio_value, daily_profit,
daily_return_pct, session_id, created_at
))
position_id = cursor.lastrowid
# Step 7: Write to holdings table
for sym, qty in new_position.items():
if sym != "CASH":
cursor.execute("""
INSERT INTO holdings (position_id, symbol, quantity)
VALUES (?, ?, ?)
""", (position_id, sym, qty))
conn.commit()
print(f"[buy] {signature} bought {amount} shares of {symbol} at ${this_symbol_price}")
return new_position
@mcp.tool()
def sell(symbol: str, amount: int, signature: str = None, today_date: str = None) -> Dict[str, Any]:
"""
Sell stock function
except Exception as e:
conn.rollback()
return {"error": f"Trade failed: {str(e)}", "symbol": symbol, "date": today_date}
This function simulates stock selling operations, including the following steps:
1. Get current position and operation ID
2. Get stock opening price for the day
3. Validate sell conditions (position exists, sufficient quantity)
4. Update position (decrease stock quantity, increase cash)
5. Record transaction to position.jsonl file
finally:
conn.close()
@mcp.tool()
def sell(symbol: str, amount: int, signature: str = None, today_date: str = None,
job_id: str = None, session_id: int = None) -> Dict[str, Any]:
"""
Sell stock function - writes to SQLite database.
Args:
symbol: Stock symbol, such as "AAPL", "MSFT", etc.
amount: Sell quantity, must be a positive integer, indicating how many shares to sell
signature: Model signature (optional, will use config/env if not provided)
today_date: Trading date (optional, will use config/env if not provided)
symbol: Stock symbol (e.g., "AAPL", "MSFT")
amount: Number of shares to sell (positive integer)
signature: Model signature (injected by ContextInjector)
today_date: Trading date YYYY-MM-DD (injected by ContextInjector)
job_id: Job UUID (injected by ContextInjector)
session_id: Trading session ID (injected by ContextInjector)
Returns:
Dict[str, Any]:
- Success: Returns new position dictionary (containing stock quantity and cash balance)
- Failure: Returns {"error": error message, ...} dictionary
Raises:
ValueError: Raised when SIGNATURE environment variable is not set
Example:
>>> result = sell("AAPL", 10)
>>> print(result) # {"AAPL": 90, "MSFT": 5, "CASH": 15000.0, ...}
- Success: {"CASH": amount, symbol: quantity, ...}
- Failure: {"error": message, ...}
"""
# Step 1: Get environment variables and basic information
# Get signature (model name) from parameter or fallback to config/env
if signature is None:
signature = get_config_value("SIGNATURE")
if signature is None:
raise ValueError("SIGNATURE not provided and environment variable is not set")
# Validate required parameters
if not job_id:
return {"error": "Missing required parameter: job_id"}
if not signature:
return {"error": "Missing required parameter: signature"}
if not today_date:
return {"error": "Missing required parameter: today_date"}
db_path = get_db_path()
conn = get_db_connection(db_path)
cursor = conn.cursor()
# Get current trading date from parameter or fallback to config/env
if today_date is None:
today_date = get_config_value("TODAY_DATE")
# Step 2: Get current latest position and operation ID
# get_latest_position returns two values: position dictionary and current maximum operation ID
# This ID is used to ensure each operation has a unique identifier
current_position, current_action_id = get_latest_position(today_date, signature)
# Step 3: Get stock opening price for the day
# Use get_open_prices function to get the opening price of specified stock for the day
# If stock symbol does not exist or price data is missing, KeyError exception will be raised
try:
this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price']
except KeyError:
# Stock symbol does not exist or price data is missing, return error message
return {"error": f"Symbol {symbol} not found! This action will not be allowed.", "symbol": symbol, "date": today_date}
# Step 1: Get current position
current_position, next_action_id = get_current_position_from_db(job_id, signature, today_date)
# Step 4: Validate sell conditions
# Check if holding this stock
if symbol not in current_position:
return {"error": f"No position for {symbol}! This action will not be allowed.", "symbol": symbol, "date": today_date}
# Step 2: Validate position exists
if symbol not in current_position:
return {"error": f"No position for {symbol}", "symbol": symbol, "date": today_date}
# Check if position quantity is sufficient for selling
if current_position[symbol] < amount:
return {"error": "Insufficient shares! This action will not be allowed.", "have": current_position.get(symbol, 0), "want_to_sell": amount, "symbol": symbol, "date": today_date}
if current_position[symbol] < amount:
return {
"error": "Insufficient shares",
"have": current_position[symbol],
"want_to_sell": amount,
"symbol": symbol,
"date": today_date
}
# Step 5: Execute sell operation, update position
# Create a copy of current position to avoid directly modifying original data
new_position = current_position.copy()
# Decrease stock position quantity
new_position[symbol] -= amount
# Increase cash balance: sell price × sell quantity
# Use get method to ensure CASH field exists, default to 0 if not present
new_position["CASH"] = new_position.get("CASH", 0) + this_symbol_price * amount
# Step 3: Get stock price
try:
this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price']
except KeyError:
return {"error": f"Symbol {symbol} not found on {today_date}", "symbol": symbol, "date": today_date}
# Step 6: Record transaction to position.jsonl file
# Build file path: {project_root}/data/agent_data/{signature}/position/position.jsonl
# Use append mode ("a") to write new transaction record
# Each operation ID increments by 1, ensuring uniqueness of operation sequence
position_file_path = os.path.join(project_root, "data", "agent_data", signature, "position", "position.jsonl")
with open(position_file_path, "a") as f:
# Write JSON format transaction record, containing date, operation ID and updated position
print(f"Writing to position.jsonl: {json.dumps({'date': today_date, 'id': current_action_id + 1, 'this_action':{'action':'sell','symbol':symbol,'amount':amount},'positions': new_position})}")
f.write(json.dumps({"date": today_date, "id": current_action_id + 1, "this_action":{"action":"sell","symbol":symbol,"amount":amount},"positions": new_position}) + "\n")
# Step 4: Calculate new position
new_position = current_position.copy()
new_position[symbol] -= amount
new_position["CASH"] = new_position.get("CASH", 0) + (this_symbol_price * amount)
# Step 5: Calculate portfolio value and P&L
portfolio_value = new_position["CASH"]
for sym, qty in new_position.items():
if sym != "CASH":
try:
price = get_open_prices(today_date, [sym])[f'{sym}_price']
portfolio_value += qty * price
except KeyError:
pass
# Get previous portfolio value
cursor.execute("""
SELECT portfolio_value
FROM positions
WHERE job_id = ? AND model = ? AND date < ?
ORDER BY date DESC, action_id DESC
LIMIT 1
""", (job_id, signature, today_date))
row = cursor.fetchone()
previous_value = row[0] if row else 10000.0
daily_profit = portfolio_value - previous_value
daily_return_pct = (daily_profit / previous_value * 100) if previous_value > 0 else 0
# Step 6: Write to positions table
created_at = datetime.utcnow().isoformat() + "Z"
cursor.execute("""
INSERT INTO positions (
job_id, date, model, action_id, action_type, symbol,
amount, price, cash, portfolio_value, daily_profit,
daily_return_pct, session_id, created_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
job_id, today_date, signature, next_action_id, "sell", symbol,
amount, this_symbol_price, new_position["CASH"], portfolio_value, daily_profit,
daily_return_pct, session_id, created_at
))
position_id = cursor.lastrowid
# Step 7: Write to holdings table
for sym, qty in new_position.items():
if sym != "CASH":
cursor.execute("""
INSERT INTO holdings (position_id, symbol, quantity)
VALUES (?, ?, ?)
""", (position_id, sym, qty))
conn.commit()
print(f"[sell] {signature} sold {amount} shares of {symbol} at ${this_symbol_price}")
return new_position
except Exception as e:
conn.rollback()
return {"error": f"Trade failed: {str(e)}", "symbol": symbol, "date": today_date}
finally:
conn.close()
# Step 7: Return updated position
write_config_value("IF_TRADE", True)
return new_position
if __name__ == "__main__":
# new_result = buy("AAPL", 1)
# print(new_result)
# new_result = sell("AAPL", 1)
# print(new_result)
port = int(os.getenv("TRADE_HTTP_PORT", "8002"))
mcp.run(transport="streamable-http", port=port)

View File

@@ -122,12 +122,20 @@ class ModelDayExecutor:
session_id = self._create_trading_session(cursor)
conn.commit()
# Initialize starting position if this is first day
self._initialize_starting_position(cursor, session_id)
conn.commit()
# Set environment variable for agent to use isolated config
os.environ["RUNTIME_ENV_PATH"] = self.runtime_config_path
# Initialize agent
agent = await self._initialize_agent()
# Update context injector with session_id
if hasattr(agent, 'context_injector') and agent.context_injector:
agent.context_injector.session_id = session_id
# Run trading session
logger.info(f"Running trading session for {self.model_sig} on {self.date}")
session_result = await agent.run_trading_session(self.date)
@@ -287,6 +295,51 @@ class ModelDayExecutor:
return cursor.lastrowid
def _initialize_starting_position(self, cursor, session_id: int) -> None:
"""
Initialize starting position if no prior positions exist for this job+model.
Creates action_id=0 position with initial_cash and zero stock holdings.
Args:
cursor: Database cursor
session_id: Trading session ID
"""
# Check if any positions exist for this job+model
cursor.execute("""
SELECT COUNT(*) FROM positions
WHERE job_id = ? AND model = ?
""", (self.job_id, self.model_sig))
if cursor.fetchone()[0] > 0:
# Positions already exist, no initialization needed
return
# Load config to get initial_cash
import json
with open(self.config_path, 'r') as f:
config = json.load(f)
agent_config = config.get("agent_config", {})
initial_cash = agent_config.get("initial_cash", 10000.0)
# Create initial position record
from datetime import datetime
created_at = datetime.utcnow().isoformat() + "Z"
cursor.execute("""
INSERT INTO positions (
job_id, date, model, action_id, action_type,
cash, portfolio_value, session_id, created_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
self.job_id, self.date, self.model_sig, 0, "init",
initial_cash, initial_cash, session_id, created_at
))
logger.info(f"Initialized starting position for {self.model_sig} with ${initial_cash}")
async def _store_reasoning_logs(
self,
cursor,