mirror of
https://github.com/Xe138/AI-Trader.git
synced 2026-04-01 17:17:24 -04:00
refactor: migrate trade tools from file-based to SQLite position storage
Complete rewrite of position management in MCP trade tools: **Trade Tools (agent_tools/tool_trade.py)** - Replace file-based position.jsonl reads with SQLite queries - Add get_current_position_from_db() to query positions and holdings tables - Rewrite buy() and sell() to write directly to database - Calculate portfolio value and P&L metrics in tools - Accept job_id and session_id parameters via ContextInjector - Return errors with proper context for debugging - Use deployment-aware database path resolution **Context Injection (agent/context_injector.py)** - Add job_id and session_id to constructor - Inject job_id and session_id into buy/sell tool calls - Support optional parameters (None in standalone mode) **BaseAgent (agent/base_agent/base_agent.py)** - Read JOB_ID from runtime config - Pass job_id to ContextInjector during initialization - Enable automatic context injection for API mode **ModelDayExecutor (api/model_day_executor.py)** - Add _initialize_starting_position() method - Create initial position record before agent runs - Load initial_cash from config - Update context_injector.session_id after session creation - Link positions to sessions automatically **Architecture Changes:** - Eliminates file-based position tracking entirely - Single source of truth: SQLite database - Positions automatically linked to trading sessions - Concurrent execution safe (no file system conflicts) - Deployment mode aware (prod vs dev databases) This completes the migration to database-only position storage. File-based position.jsonl is no longer used or created. Fixes context injection errors in concurrent simulations.
This commit is contained in:
@@ -173,10 +173,15 @@ class BaseAgent:
|
||||
print("⚠️ OpenAI base URL not set, using default")
|
||||
|
||||
try:
|
||||
# Get job_id from runtime config if available (API mode)
|
||||
from tools.general_tools import get_config_value
|
||||
job_id = get_config_value("JOB_ID") # Returns None if not in API mode
|
||||
|
||||
# Create context injector for injecting signature and today_date into tool calls
|
||||
self.context_injector = ContextInjector(
|
||||
signature=self.signature,
|
||||
today_date=self.init_date # Will be updated per trading session
|
||||
today_date=self.init_date, # Will be updated per trading session
|
||||
job_id=job_id # Will be None in standalone mode, populated in API mode
|
||||
)
|
||||
|
||||
# Create MCP client with interceptor
|
||||
|
||||
@@ -17,16 +17,20 @@ class ContextInjector:
|
||||
client = MultiServerMCPClient(config, tool_interceptors=[interceptor])
|
||||
"""
|
||||
|
||||
def __init__(self, signature: str, today_date: str):
|
||||
def __init__(self, signature: str, today_date: str, job_id: str = None, session_id: int = None):
|
||||
"""
|
||||
Initialize context injector.
|
||||
|
||||
Args:
|
||||
signature: Model signature to inject
|
||||
today_date: Trading date to inject
|
||||
job_id: Job UUID to inject (optional)
|
||||
session_id: Trading session ID to inject (optional, updated during execution)
|
||||
"""
|
||||
self.signature = signature
|
||||
self.today_date = today_date
|
||||
self.job_id = job_id
|
||||
self.session_id = session_id
|
||||
|
||||
async def __call__(
|
||||
self,
|
||||
@@ -43,13 +47,17 @@ class ContextInjector:
|
||||
Returns:
|
||||
Result from handler after injecting context
|
||||
"""
|
||||
# Inject signature and today_date for trade tools
|
||||
# Inject context parameters for trade tools
|
||||
if request.name in ["buy", "sell"]:
|
||||
# Add signature and today_date to args if not present
|
||||
if "signature" not in request.args:
|
||||
request.args["signature"] = self.signature
|
||||
if "today_date" not in request.args:
|
||||
request.args["today_date"] = self.today_date
|
||||
if "job_id" not in request.args and self.job_id:
|
||||
request.args["job_id"] = self.job_id
|
||||
if "session_id" not in request.args and self.session_id:
|
||||
request.args["session_id"] = self.session_id
|
||||
|
||||
# Debug logging
|
||||
print(f"[ContextInjector] Tool: {request.name}, Args after injection: {request.args}")
|
||||
|
||||
@@ -1,208 +1,333 @@
|
||||
from fastmcp import FastMCP
|
||||
import sys
|
||||
import os
|
||||
from typing import Dict, List, Optional, Any
|
||||
from typing import Dict, List, Optional, Any, Tuple
|
||||
# Add project root directory to Python path
|
||||
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
sys.path.insert(0, project_root)
|
||||
from tools.price_tools import get_yesterday_date, get_open_prices, get_yesterday_open_and_close_price, get_latest_position, get_yesterday_profit
|
||||
from tools.price_tools import get_open_prices
|
||||
import json
|
||||
from tools.general_tools import get_config_value,write_config_value
|
||||
from tools.deployment_config import get_db_path
|
||||
from api.database import get_db_connection
|
||||
from datetime import datetime
|
||||
mcp = FastMCP("TradeTools")
|
||||
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def buy(symbol: str, amount: int, signature: str = None, today_date: str = None) -> Dict[str, Any]:
|
||||
def get_current_position_from_db(job_id: str, model: str, date: str) -> Tuple[Dict[str, float], int]:
|
||||
"""
|
||||
Buy stock function
|
||||
|
||||
This function simulates stock buying operations, including the following steps:
|
||||
1. Get current position and operation ID
|
||||
2. Get stock opening price for the day
|
||||
3. Validate buy conditions (sufficient cash)
|
||||
4. Update position (increase stock quantity, decrease cash)
|
||||
5. Record transaction to position.jsonl file
|
||||
Query current position from SQLite database.
|
||||
|
||||
Args:
|
||||
symbol: Stock symbol, such as "AAPL", "MSFT", etc.
|
||||
amount: Buy quantity, must be a positive integer, indicating how many shares to buy
|
||||
signature: Model signature (optional, will use config/env if not provided)
|
||||
today_date: Trading date (optional, will use config/env if not provided)
|
||||
job_id: Job UUID
|
||||
model: Model signature
|
||||
date: Trading date (YYYY-MM-DD)
|
||||
|
||||
Returns:
|
||||
Tuple of (position_dict, next_action_id)
|
||||
- position_dict: {symbol: quantity, "CASH": amount}
|
||||
- next_action_id: Next available action_id for this job+model
|
||||
|
||||
Raises:
|
||||
Exception: If database query fails
|
||||
"""
|
||||
db_path = get_db_path()
|
||||
conn = get_db_connection(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
# Get most recent position on or before this date
|
||||
cursor.execute("""
|
||||
SELECT p.id, p.cash
|
||||
FROM positions p
|
||||
WHERE p.job_id = ? AND p.model = ? AND p.date <= ?
|
||||
ORDER BY p.date DESC, p.action_id DESC
|
||||
LIMIT 1
|
||||
""", (job_id, model, date))
|
||||
|
||||
position_row = cursor.fetchone()
|
||||
|
||||
if not position_row:
|
||||
# No position found - this shouldn't happen if ModelDayExecutor initializes properly
|
||||
raise Exception(f"No position found for job_id={job_id}, model={model}, date={date}")
|
||||
|
||||
position_id = position_row[0]
|
||||
cash = position_row[1]
|
||||
|
||||
# Build position dict starting with CASH
|
||||
position_dict = {"CASH": cash}
|
||||
|
||||
# Get holdings for this position
|
||||
cursor.execute("""
|
||||
SELECT symbol, quantity
|
||||
FROM holdings
|
||||
WHERE position_id = ?
|
||||
""", (position_id,))
|
||||
|
||||
for row in cursor.fetchall():
|
||||
symbol = row[0]
|
||||
quantity = row[1]
|
||||
position_dict[symbol] = quantity
|
||||
|
||||
# Get next action_id
|
||||
cursor.execute("""
|
||||
SELECT COALESCE(MAX(action_id), -1) + 1 as next_action_id
|
||||
FROM positions
|
||||
WHERE job_id = ? AND model = ?
|
||||
""", (job_id, model))
|
||||
|
||||
next_action_id = cursor.fetchone()[0]
|
||||
|
||||
return position_dict, next_action_id
|
||||
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def buy(symbol: str, amount: int, signature: str = None, today_date: str = None,
|
||||
job_id: str = None, session_id: int = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Buy stock function - writes to SQLite database.
|
||||
|
||||
Args:
|
||||
symbol: Stock symbol (e.g., "AAPL", "MSFT")
|
||||
amount: Number of shares to buy (positive integer)
|
||||
signature: Model signature (injected by ContextInjector)
|
||||
today_date: Trading date YYYY-MM-DD (injected by ContextInjector)
|
||||
job_id: Job UUID (injected by ContextInjector)
|
||||
session_id: Trading session ID (injected by ContextInjector)
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]:
|
||||
- Success: Returns new position dictionary (containing stock quantity and cash balance)
|
||||
- Failure: Returns {"error": error message, ...} dictionary
|
||||
|
||||
Raises:
|
||||
ValueError: Raised when SIGNATURE environment variable is not set
|
||||
|
||||
Example:
|
||||
>>> result = buy("AAPL", 10)
|
||||
>>> print(result) # {"AAPL": 110, "MSFT": 5, "CASH": 5000.0, ...}
|
||||
- Success: {"CASH": amount, symbol: quantity, ...}
|
||||
- Failure: {"error": message, ...}
|
||||
"""
|
||||
# Step 1: Get environment variables and basic information
|
||||
# Get signature (model name) from parameter or fallback to config/env
|
||||
print(f"[buy] Received signature parameter: {signature}")
|
||||
if signature is None:
|
||||
signature = get_config_value("SIGNATURE")
|
||||
print(f"[buy] Signature from config: {signature}")
|
||||
if signature is None:
|
||||
raise ValueError("SIGNATURE not provided and environment variable is not set")
|
||||
# Validate required parameters
|
||||
if not job_id:
|
||||
return {"error": "Missing required parameter: job_id"}
|
||||
if not signature:
|
||||
return {"error": "Missing required parameter: signature"}
|
||||
if not today_date:
|
||||
return {"error": "Missing required parameter: today_date"}
|
||||
|
||||
# Get current trading date from parameter or fallback to config/env
|
||||
if today_date is None:
|
||||
today_date = get_config_value("TODAY_DATE")
|
||||
db_path = get_db_path()
|
||||
conn = get_db_connection(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Step 2: Get current latest position and operation ID
|
||||
# get_latest_position returns two values: position dictionary and current maximum operation ID
|
||||
# This ID is used to ensure each operation has a unique identifier
|
||||
try:
|
||||
current_position, current_action_id = get_latest_position(today_date, signature)
|
||||
except Exception as e:
|
||||
return {"error": f"Failed to get position: {str(e)}", "signature": signature, "date": today_date}
|
||||
# Step 3: Get stock opening price for the day
|
||||
# Use get_open_prices function to get the opening price of specified stock for the day
|
||||
# If stock symbol does not exist or price data is missing, KeyError exception will be raised
|
||||
# Step 1: Get current position
|
||||
current_position, next_action_id = get_current_position_from_db(job_id, signature, today_date)
|
||||
|
||||
# Step 2: Get stock price
|
||||
try:
|
||||
this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price']
|
||||
except KeyError:
|
||||
# Stock symbol does not exist or price data is missing, return error message
|
||||
return {"error": f"Symbol {symbol} not found! This action will not be allowed.", "symbol": symbol, "date": today_date}
|
||||
return {"error": f"Symbol {symbol} not found on {today_date}", "symbol": symbol, "date": today_date}
|
||||
|
||||
# Step 4: Validate buy conditions
|
||||
# Calculate cash required for purchase: stock price × buy quantity
|
||||
try:
|
||||
cash_left = current_position["CASH"] - this_symbol_price * amount
|
||||
except Exception as e:
|
||||
print(current_position, "CASH", this_symbol_price, amount)
|
||||
# Step 3: Validate sufficient cash
|
||||
cash_required = this_symbol_price * amount
|
||||
cash_available = current_position.get("CASH", 0)
|
||||
cash_left = cash_available - cash_required
|
||||
|
||||
# Check if cash balance is sufficient for purchase
|
||||
if cash_left < 0:
|
||||
# Insufficient cash, return error message
|
||||
return {"error": "Insufficient cash! This action will not be allowed.", "required_cash": this_symbol_price * amount, "cash_available": current_position.get("CASH", 0), "symbol": symbol, "date": today_date}
|
||||
else:
|
||||
# Step 5: Execute buy operation, update position
|
||||
# Create a copy of current position to avoid directly modifying original data
|
||||
return {
|
||||
"error": "Insufficient cash",
|
||||
"required_cash": cash_required,
|
||||
"cash_available": cash_available,
|
||||
"symbol": symbol,
|
||||
"date": today_date
|
||||
}
|
||||
|
||||
# Step 4: Calculate new position
|
||||
new_position = current_position.copy()
|
||||
|
||||
# Decrease cash balance
|
||||
new_position["CASH"] = cash_left
|
||||
new_position[symbol] = new_position.get(symbol, 0) + amount
|
||||
|
||||
# Increase stock position quantity
|
||||
new_position[symbol] += amount
|
||||
# Step 5: Calculate portfolio value and P&L
|
||||
portfolio_value = cash_left
|
||||
for sym, qty in new_position.items():
|
||||
if sym != "CASH":
|
||||
try:
|
||||
price = get_open_prices(today_date, [sym])[f'{sym}_price']
|
||||
portfolio_value += qty * price
|
||||
except KeyError:
|
||||
pass # Symbol price not available, skip
|
||||
|
||||
# Step 6: Record transaction to position.jsonl file
|
||||
# Build file path: {project_root}/data/agent_data/{signature}/position/position.jsonl
|
||||
# Use append mode ("a") to write new transaction record
|
||||
# Each operation ID increments by 1, ensuring uniqueness of operation sequence
|
||||
position_file_path = os.path.join(project_root, "data", "agent_data", signature, "position", "position.jsonl")
|
||||
with open(position_file_path, "a") as f:
|
||||
# Write JSON format transaction record, containing date, operation ID, transaction details and updated position
|
||||
print(f"Writing to position.jsonl: {json.dumps({'date': today_date, 'id': current_action_id + 1, 'this_action':{'action':'buy','symbol':symbol,'amount':amount},'positions': new_position})}")
|
||||
f.write(json.dumps({"date": today_date, "id": current_action_id + 1, "this_action":{"action":"buy","symbol":symbol,"amount":amount},"positions": new_position}) + "\n")
|
||||
# Step 7: Return updated position
|
||||
write_config_value("IF_TRADE", True)
|
||||
print("IF_TRADE", get_config_value("IF_TRADE"))
|
||||
# Get previous portfolio value for P&L calculation
|
||||
cursor.execute("""
|
||||
SELECT portfolio_value
|
||||
FROM positions
|
||||
WHERE job_id = ? AND model = ? AND date < ?
|
||||
ORDER BY date DESC, action_id DESC
|
||||
LIMIT 1
|
||||
""", (job_id, signature, today_date))
|
||||
|
||||
row = cursor.fetchone()
|
||||
previous_value = row[0] if row else 10000.0 # Default initial value
|
||||
|
||||
daily_profit = portfolio_value - previous_value
|
||||
daily_return_pct = (daily_profit / previous_value * 100) if previous_value > 0 else 0
|
||||
|
||||
# Step 6: Write to positions table
|
||||
created_at = datetime.utcnow().isoformat() + "Z"
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO positions (
|
||||
job_id, date, model, action_id, action_type, symbol,
|
||||
amount, price, cash, portfolio_value, daily_profit,
|
||||
daily_return_pct, session_id, created_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
job_id, today_date, signature, next_action_id, "buy", symbol,
|
||||
amount, this_symbol_price, cash_left, portfolio_value, daily_profit,
|
||||
daily_return_pct, session_id, created_at
|
||||
))
|
||||
|
||||
position_id = cursor.lastrowid
|
||||
|
||||
# Step 7: Write to holdings table
|
||||
for sym, qty in new_position.items():
|
||||
if sym != "CASH":
|
||||
cursor.execute("""
|
||||
INSERT INTO holdings (position_id, symbol, quantity)
|
||||
VALUES (?, ?, ?)
|
||||
""", (position_id, sym, qty))
|
||||
|
||||
conn.commit()
|
||||
print(f"[buy] {signature} bought {amount} shares of {symbol} at ${this_symbol_price}")
|
||||
return new_position
|
||||
|
||||
@mcp.tool()
|
||||
def sell(symbol: str, amount: int, signature: str = None, today_date: str = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Sell stock function
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
return {"error": f"Trade failed: {str(e)}", "symbol": symbol, "date": today_date}
|
||||
|
||||
This function simulates stock selling operations, including the following steps:
|
||||
1. Get current position and operation ID
|
||||
2. Get stock opening price for the day
|
||||
3. Validate sell conditions (position exists, sufficient quantity)
|
||||
4. Update position (decrease stock quantity, increase cash)
|
||||
5. Record transaction to position.jsonl file
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def sell(symbol: str, amount: int, signature: str = None, today_date: str = None,
|
||||
job_id: str = None, session_id: int = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Sell stock function - writes to SQLite database.
|
||||
|
||||
Args:
|
||||
symbol: Stock symbol, such as "AAPL", "MSFT", etc.
|
||||
amount: Sell quantity, must be a positive integer, indicating how many shares to sell
|
||||
signature: Model signature (optional, will use config/env if not provided)
|
||||
today_date: Trading date (optional, will use config/env if not provided)
|
||||
symbol: Stock symbol (e.g., "AAPL", "MSFT")
|
||||
amount: Number of shares to sell (positive integer)
|
||||
signature: Model signature (injected by ContextInjector)
|
||||
today_date: Trading date YYYY-MM-DD (injected by ContextInjector)
|
||||
job_id: Job UUID (injected by ContextInjector)
|
||||
session_id: Trading session ID (injected by ContextInjector)
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]:
|
||||
- Success: Returns new position dictionary (containing stock quantity and cash balance)
|
||||
- Failure: Returns {"error": error message, ...} dictionary
|
||||
|
||||
Raises:
|
||||
ValueError: Raised when SIGNATURE environment variable is not set
|
||||
|
||||
Example:
|
||||
>>> result = sell("AAPL", 10)
|
||||
>>> print(result) # {"AAPL": 90, "MSFT": 5, "CASH": 15000.0, ...}
|
||||
- Success: {"CASH": amount, symbol: quantity, ...}
|
||||
- Failure: {"error": message, ...}
|
||||
"""
|
||||
# Step 1: Get environment variables and basic information
|
||||
# Get signature (model name) from parameter or fallback to config/env
|
||||
if signature is None:
|
||||
signature = get_config_value("SIGNATURE")
|
||||
if signature is None:
|
||||
raise ValueError("SIGNATURE not provided and environment variable is not set")
|
||||
# Validate required parameters
|
||||
if not job_id:
|
||||
return {"error": "Missing required parameter: job_id"}
|
||||
if not signature:
|
||||
return {"error": "Missing required parameter: signature"}
|
||||
if not today_date:
|
||||
return {"error": "Missing required parameter: today_date"}
|
||||
|
||||
# Get current trading date from parameter or fallback to config/env
|
||||
if today_date is None:
|
||||
today_date = get_config_value("TODAY_DATE")
|
||||
db_path = get_db_path()
|
||||
conn = get_db_connection(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Step 2: Get current latest position and operation ID
|
||||
# get_latest_position returns two values: position dictionary and current maximum operation ID
|
||||
# This ID is used to ensure each operation has a unique identifier
|
||||
try:
|
||||
current_position, current_action_id = get_latest_position(today_date, signature)
|
||||
except Exception as e:
|
||||
return {"error": f"Failed to get position: {str(e)}", "signature": signature, "date": today_date}
|
||||
# Step 1: Get current position
|
||||
current_position, next_action_id = get_current_position_from_db(job_id, signature, today_date)
|
||||
|
||||
# Step 3: Get stock opening price for the day
|
||||
# Use get_open_prices function to get the opening price of specified stock for the day
|
||||
# If stock symbol does not exist or price data is missing, KeyError exception will be raised
|
||||
# Step 2: Validate position exists
|
||||
if symbol not in current_position:
|
||||
return {"error": f"No position for {symbol}", "symbol": symbol, "date": today_date}
|
||||
|
||||
if current_position[symbol] < amount:
|
||||
return {
|
||||
"error": "Insufficient shares",
|
||||
"have": current_position[symbol],
|
||||
"want_to_sell": amount,
|
||||
"symbol": symbol,
|
||||
"date": today_date
|
||||
}
|
||||
|
||||
# Step 3: Get stock price
|
||||
try:
|
||||
this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price']
|
||||
except KeyError:
|
||||
# Stock symbol does not exist or price data is missing, return error message
|
||||
return {"error": f"Symbol {symbol} not found! This action will not be allowed.", "symbol": symbol, "date": today_date}
|
||||
return {"error": f"Symbol {symbol} not found on {today_date}", "symbol": symbol, "date": today_date}
|
||||
|
||||
# Step 4: Validate sell conditions
|
||||
# Check if holding this stock
|
||||
if symbol not in current_position:
|
||||
return {"error": f"No position for {symbol}! This action will not be allowed.", "symbol": symbol, "date": today_date}
|
||||
|
||||
# Check if position quantity is sufficient for selling
|
||||
if current_position[symbol] < amount:
|
||||
return {"error": "Insufficient shares! This action will not be allowed.", "have": current_position.get(symbol, 0), "want_to_sell": amount, "symbol": symbol, "date": today_date}
|
||||
|
||||
# Step 5: Execute sell operation, update position
|
||||
# Create a copy of current position to avoid directly modifying original data
|
||||
# Step 4: Calculate new position
|
||||
new_position = current_position.copy()
|
||||
|
||||
# Decrease stock position quantity
|
||||
new_position[symbol] -= amount
|
||||
new_position["CASH"] = new_position.get("CASH", 0) + (this_symbol_price * amount)
|
||||
|
||||
# Increase cash balance: sell price × sell quantity
|
||||
# Use get method to ensure CASH field exists, default to 0 if not present
|
||||
new_position["CASH"] = new_position.get("CASH", 0) + this_symbol_price * amount
|
||||
# Step 5: Calculate portfolio value and P&L
|
||||
portfolio_value = new_position["CASH"]
|
||||
for sym, qty in new_position.items():
|
||||
if sym != "CASH":
|
||||
try:
|
||||
price = get_open_prices(today_date, [sym])[f'{sym}_price']
|
||||
portfolio_value += qty * price
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# Step 6: Record transaction to position.jsonl file
|
||||
# Build file path: {project_root}/data/agent_data/{signature}/position/position.jsonl
|
||||
# Use append mode ("a") to write new transaction record
|
||||
# Each operation ID increments by 1, ensuring uniqueness of operation sequence
|
||||
position_file_path = os.path.join(project_root, "data", "agent_data", signature, "position", "position.jsonl")
|
||||
with open(position_file_path, "a") as f:
|
||||
# Write JSON format transaction record, containing date, operation ID and updated position
|
||||
print(f"Writing to position.jsonl: {json.dumps({'date': today_date, 'id': current_action_id + 1, 'this_action':{'action':'sell','symbol':symbol,'amount':amount},'positions': new_position})}")
|
||||
f.write(json.dumps({"date": today_date, "id": current_action_id + 1, "this_action":{"action":"sell","symbol":symbol,"amount":amount},"positions": new_position}) + "\n")
|
||||
# Get previous portfolio value
|
||||
cursor.execute("""
|
||||
SELECT portfolio_value
|
||||
FROM positions
|
||||
WHERE job_id = ? AND model = ? AND date < ?
|
||||
ORDER BY date DESC, action_id DESC
|
||||
LIMIT 1
|
||||
""", (job_id, signature, today_date))
|
||||
|
||||
# Step 7: Return updated position
|
||||
write_config_value("IF_TRADE", True)
|
||||
row = cursor.fetchone()
|
||||
previous_value = row[0] if row else 10000.0
|
||||
|
||||
daily_profit = portfolio_value - previous_value
|
||||
daily_return_pct = (daily_profit / previous_value * 100) if previous_value > 0 else 0
|
||||
|
||||
# Step 6: Write to positions table
|
||||
created_at = datetime.utcnow().isoformat() + "Z"
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO positions (
|
||||
job_id, date, model, action_id, action_type, symbol,
|
||||
amount, price, cash, portfolio_value, daily_profit,
|
||||
daily_return_pct, session_id, created_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
job_id, today_date, signature, next_action_id, "sell", symbol,
|
||||
amount, this_symbol_price, new_position["CASH"], portfolio_value, daily_profit,
|
||||
daily_return_pct, session_id, created_at
|
||||
))
|
||||
|
||||
position_id = cursor.lastrowid
|
||||
|
||||
# Step 7: Write to holdings table
|
||||
for sym, qty in new_position.items():
|
||||
if sym != "CASH":
|
||||
cursor.execute("""
|
||||
INSERT INTO holdings (position_id, symbol, quantity)
|
||||
VALUES (?, ?, ?)
|
||||
""", (position_id, sym, qty))
|
||||
|
||||
conn.commit()
|
||||
print(f"[sell] {signature} sold {amount} shares of {symbol} at ${this_symbol_price}")
|
||||
return new_position
|
||||
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
return {"error": f"Trade failed: {str(e)}", "symbol": symbol, "date": today_date}
|
||||
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# new_result = buy("AAPL", 1)
|
||||
# print(new_result)
|
||||
# new_result = sell("AAPL", 1)
|
||||
# print(new_result)
|
||||
port = int(os.getenv("TRADE_HTTP_PORT", "8002"))
|
||||
mcp.run(transport="streamable-http", port=port)
|
||||
|
||||
@@ -122,12 +122,20 @@ class ModelDayExecutor:
|
||||
session_id = self._create_trading_session(cursor)
|
||||
conn.commit()
|
||||
|
||||
# Initialize starting position if this is first day
|
||||
self._initialize_starting_position(cursor, session_id)
|
||||
conn.commit()
|
||||
|
||||
# Set environment variable for agent to use isolated config
|
||||
os.environ["RUNTIME_ENV_PATH"] = self.runtime_config_path
|
||||
|
||||
# Initialize agent
|
||||
agent = await self._initialize_agent()
|
||||
|
||||
# Update context injector with session_id
|
||||
if hasattr(agent, 'context_injector') and agent.context_injector:
|
||||
agent.context_injector.session_id = session_id
|
||||
|
||||
# Run trading session
|
||||
logger.info(f"Running trading session for {self.model_sig} on {self.date}")
|
||||
session_result = await agent.run_trading_session(self.date)
|
||||
@@ -287,6 +295,51 @@ class ModelDayExecutor:
|
||||
|
||||
return cursor.lastrowid
|
||||
|
||||
def _initialize_starting_position(self, cursor, session_id: int) -> None:
|
||||
"""
|
||||
Initialize starting position if no prior positions exist for this job+model.
|
||||
|
||||
Creates action_id=0 position with initial_cash and zero stock holdings.
|
||||
|
||||
Args:
|
||||
cursor: Database cursor
|
||||
session_id: Trading session ID
|
||||
"""
|
||||
# Check if any positions exist for this job+model
|
||||
cursor.execute("""
|
||||
SELECT COUNT(*) FROM positions
|
||||
WHERE job_id = ? AND model = ?
|
||||
""", (self.job_id, self.model_sig))
|
||||
|
||||
if cursor.fetchone()[0] > 0:
|
||||
# Positions already exist, no initialization needed
|
||||
return
|
||||
|
||||
# Load config to get initial_cash
|
||||
import json
|
||||
with open(self.config_path, 'r') as f:
|
||||
config = json.load(f)
|
||||
|
||||
agent_config = config.get("agent_config", {})
|
||||
initial_cash = agent_config.get("initial_cash", 10000.0)
|
||||
|
||||
# Create initial position record
|
||||
from datetime import datetime
|
||||
created_at = datetime.utcnow().isoformat() + "Z"
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO positions (
|
||||
job_id, date, model, action_id, action_type,
|
||||
cash, portfolio_value, session_id, created_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
self.job_id, self.date, self.model_sig, 0, "init",
|
||||
initial_cash, initial_cash, session_id, created_at
|
||||
))
|
||||
|
||||
logger.info(f"Initialized starting position for {self.model_sig} with ${initial_cash}")
|
||||
|
||||
async def _store_reasoning_logs(
|
||||
self,
|
||||
cursor,
|
||||
|
||||
Reference in New Issue
Block a user