mirror of
https://github.com/Xe138/AI-Trader.git
synced 2026-04-02 01:27:24 -04:00
Compare commits
12 Commits
v0.3.0-alp
...
v0.3.0-alp
| Author | SHA1 | Date | |
|---|---|---|---|
| 618943b278 | |||
| 1c19eea29a | |||
| e968434062 | |||
| 4c1d23a7c8 | |||
| 027b4bd8e4 | |||
| 7a734d265b | |||
| fcfdf36c1c | |||
| 019c84fca8 | |||
| 8521f685c7 | |||
| bf12e981fe | |||
| a16bac5d08 | |||
| 81b92e293a |
@@ -173,16 +173,13 @@ class BaseAgent:
|
||||
print("⚠️ OpenAI base URL not set, using default")
|
||||
|
||||
try:
|
||||
# Create context injector for injecting signature and today_date into tool calls
|
||||
self.context_injector = ContextInjector(
|
||||
signature=self.signature,
|
||||
today_date=self.init_date # Will be updated per trading session
|
||||
)
|
||||
# Context injector will be set later via set_context() method
|
||||
self.context_injector = None
|
||||
|
||||
# Create MCP client with interceptor
|
||||
# Create MCP client without interceptors initially
|
||||
self.client = MultiServerMCPClient(
|
||||
self.mcp_config,
|
||||
tool_interceptors=[self.context_injector]
|
||||
tool_interceptors=[]
|
||||
)
|
||||
|
||||
# Get tools
|
||||
@@ -224,6 +221,40 @@ class BaseAgent:
|
||||
|
||||
print(f"✅ Agent {self.signature} initialization completed")
|
||||
|
||||
async def set_context(self, context_injector: "ContextInjector") -> None:
|
||||
"""
|
||||
Inject ContextInjector after initialization.
|
||||
|
||||
This allows the ContextInjector to be created with the correct
|
||||
trading day date and session_id after the agent is initialized.
|
||||
|
||||
Args:
|
||||
context_injector: Configured ContextInjector instance with
|
||||
correct signature, today_date, job_id, session_id
|
||||
"""
|
||||
print(f"[DEBUG] set_context() ENTRY: Received context_injector with signature={context_injector.signature}, date={context_injector.today_date}, job_id={context_injector.job_id}, session_id={context_injector.session_id}")
|
||||
|
||||
self.context_injector = context_injector
|
||||
print(f"[DEBUG] set_context(): Set self.context_injector, id={id(self.context_injector)}")
|
||||
|
||||
# Recreate MCP client with the interceptor
|
||||
# Note: We need to recreate because MultiServerMCPClient doesn't have add_interceptor()
|
||||
print(f"[DEBUG] set_context(): Creating new MCP client with interceptor, id={id(context_injector)}")
|
||||
self.client = MultiServerMCPClient(
|
||||
self.mcp_config,
|
||||
tool_interceptors=[context_injector]
|
||||
)
|
||||
print(f"[DEBUG] set_context(): MCP client created")
|
||||
|
||||
# CRITICAL: Reload tools from new client so they use the interceptor
|
||||
print(f"[DEBUG] set_context(): Reloading tools...")
|
||||
self.tools = await self.client.get_tools()
|
||||
print(f"[DEBUG] set_context(): Tools reloaded, count={len(self.tools)}")
|
||||
|
||||
print(f"✅ Context injected: signature={context_injector.signature}, "
|
||||
f"date={context_injector.today_date}, job_id={context_injector.job_id}, "
|
||||
f"session_id={context_injector.session_id}")
|
||||
|
||||
def _capture_message(self, role: str, content: str, tool_name: str = None, tool_input: str = None) -> None:
|
||||
"""
|
||||
Capture a message in conversation history.
|
||||
@@ -424,18 +455,32 @@ Summary:"""
|
||||
await self._handle_trading_result(today_date)
|
||||
|
||||
async def _handle_trading_result(self, today_date: str) -> None:
|
||||
"""Handle trading results"""
|
||||
"""Handle trading results with database writes."""
|
||||
from tools.price_tools import add_no_trade_record_to_db
|
||||
|
||||
if_trade = get_config_value("IF_TRADE")
|
||||
|
||||
if if_trade:
|
||||
write_config_value("IF_TRADE", False)
|
||||
print("✅ Trading completed")
|
||||
else:
|
||||
print("📊 No trading, maintaining positions")
|
||||
try:
|
||||
add_no_trade_record(today_date, self.signature)
|
||||
except NameError as e:
|
||||
print(f"❌ NameError: {e}")
|
||||
raise
|
||||
|
||||
# Get context from runtime config
|
||||
job_id = get_config_value("JOB_ID")
|
||||
session_id = self.context_injector.session_id if self.context_injector else None
|
||||
|
||||
if not job_id or not session_id:
|
||||
raise ValueError("Missing JOB_ID or session_id for no-trade record")
|
||||
|
||||
# Write no-trade record to database
|
||||
add_no_trade_record_to_db(
|
||||
today_date,
|
||||
self.signature,
|
||||
job_id,
|
||||
session_id
|
||||
)
|
||||
|
||||
write_config_value("IF_TRADE", False)
|
||||
|
||||
def register_agent(self) -> None:
|
||||
|
||||
@@ -5,7 +5,7 @@ This interceptor automatically injects `signature` and `today_date` parameters
|
||||
into buy/sell tool calls to support concurrent multi-model simulations.
|
||||
"""
|
||||
|
||||
from typing import Any, Dict
|
||||
from typing import Any, Callable, Awaitable
|
||||
|
||||
|
||||
class ContextInjector:
|
||||
@@ -17,34 +17,53 @@ class ContextInjector:
|
||||
client = MultiServerMCPClient(config, tool_interceptors=[interceptor])
|
||||
"""
|
||||
|
||||
def __init__(self, signature: str, today_date: str):
|
||||
def __init__(self, signature: str, today_date: str, job_id: str = None, session_id: int = None):
|
||||
"""
|
||||
Initialize context injector.
|
||||
|
||||
Args:
|
||||
signature: Model signature to inject
|
||||
today_date: Trading date to inject
|
||||
job_id: Job UUID to inject (optional)
|
||||
session_id: Trading session ID to inject (optional, updated during execution)
|
||||
"""
|
||||
self.signature = signature
|
||||
self.today_date = today_date
|
||||
self.job_id = job_id
|
||||
self.session_id = session_id
|
||||
|
||||
def __call__(self, tool_name: str, tool_input: Dict[str, Any]) -> Dict[str, Any]:
|
||||
async def __call__(
|
||||
self,
|
||||
request: Any, # MCPToolCallRequest
|
||||
handler: Callable[[Any], Awaitable[Any]]
|
||||
) -> Any: # MCPToolCallResult
|
||||
"""
|
||||
Intercept tool call and inject context parameters.
|
||||
|
||||
Args:
|
||||
tool_name: Name of the tool being called
|
||||
tool_input: Original tool input parameters
|
||||
request: Tool call request containing name and arguments
|
||||
handler: Async callable to execute the actual tool
|
||||
|
||||
Returns:
|
||||
Modified tool input with injected context
|
||||
Result from handler after injecting context
|
||||
"""
|
||||
# Only inject for trade tools (buy/sell)
|
||||
if tool_name in ["buy", "sell"]:
|
||||
# Inject signature and today_date if not already provided
|
||||
if "signature" not in tool_input:
|
||||
tool_input["signature"] = self.signature
|
||||
if "today_date" not in tool_input:
|
||||
tool_input["today_date"] = self.today_date
|
||||
# Inject context parameters for trade tools
|
||||
if request.name in ["buy", "sell"]:
|
||||
# Debug: Log self attributes BEFORE injection
|
||||
print(f"[ContextInjector.__call__] ENTRY: id={id(self)}, self.signature={self.signature}, self.today_date={self.today_date}, self.job_id={self.job_id}, self.session_id={self.session_id}")
|
||||
|
||||
return tool_input
|
||||
# Add signature and today_date to args if not present
|
||||
if "signature" not in request.args:
|
||||
request.args["signature"] = self.signature
|
||||
if "today_date" not in request.args:
|
||||
request.args["today_date"] = self.today_date
|
||||
if "job_id" not in request.args and self.job_id:
|
||||
request.args["job_id"] = self.job_id
|
||||
if "session_id" not in request.args and self.session_id:
|
||||
request.args["session_id"] = self.session_id
|
||||
|
||||
# Debug logging
|
||||
print(f"[ContextInjector] Tool: {request.name}, Args after injection: {request.args}")
|
||||
|
||||
# Call the actual tool handler
|
||||
return await handler(request)
|
||||
|
||||
@@ -1,205 +1,332 @@
|
||||
from fastmcp import FastMCP
|
||||
import sys
|
||||
import os
|
||||
from typing import Dict, List, Optional, Any
|
||||
from typing import Dict, List, Optional, Any, Tuple
|
||||
# Add project root directory to Python path
|
||||
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
sys.path.insert(0, project_root)
|
||||
from tools.price_tools import get_yesterday_date, get_open_prices, get_yesterday_open_and_close_price, get_latest_position, get_yesterday_profit
|
||||
from tools.price_tools import get_open_prices
|
||||
import json
|
||||
from tools.general_tools import get_config_value,write_config_value
|
||||
from api.database import get_db_connection
|
||||
from datetime import datetime
|
||||
mcp = FastMCP("TradeTools")
|
||||
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def buy(symbol: str, amount: int, signature: str = None, today_date: str = None) -> Dict[str, Any]:
|
||||
def get_current_position_from_db(job_id: str, model: str, date: str) -> Tuple[Dict[str, float], int]:
|
||||
"""
|
||||
Buy stock function
|
||||
|
||||
This function simulates stock buying operations, including the following steps:
|
||||
1. Get current position and operation ID
|
||||
2. Get stock opening price for the day
|
||||
3. Validate buy conditions (sufficient cash)
|
||||
4. Update position (increase stock quantity, decrease cash)
|
||||
5. Record transaction to position.jsonl file
|
||||
Query current position from SQLite database.
|
||||
|
||||
Args:
|
||||
symbol: Stock symbol, such as "AAPL", "MSFT", etc.
|
||||
amount: Buy quantity, must be a positive integer, indicating how many shares to buy
|
||||
signature: Model signature (optional, will use config/env if not provided)
|
||||
today_date: Trading date (optional, will use config/env if not provided)
|
||||
job_id: Job UUID
|
||||
model: Model signature
|
||||
date: Trading date (YYYY-MM-DD)
|
||||
|
||||
Returns:
|
||||
Tuple of (position_dict, next_action_id)
|
||||
- position_dict: {symbol: quantity, "CASH": amount}
|
||||
- next_action_id: Next available action_id for this job+model
|
||||
|
||||
Raises:
|
||||
Exception: If database query fails
|
||||
"""
|
||||
db_path = "data/jobs.db"
|
||||
conn = get_db_connection(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
# Get most recent position on or before this date
|
||||
cursor.execute("""
|
||||
SELECT p.id, p.cash
|
||||
FROM positions p
|
||||
WHERE p.job_id = ? AND p.model = ? AND p.date <= ?
|
||||
ORDER BY p.date DESC, p.action_id DESC
|
||||
LIMIT 1
|
||||
""", (job_id, model, date))
|
||||
|
||||
position_row = cursor.fetchone()
|
||||
|
||||
if not position_row:
|
||||
# No position found - this shouldn't happen if ModelDayExecutor initializes properly
|
||||
raise Exception(f"No position found for job_id={job_id}, model={model}, date={date}")
|
||||
|
||||
position_id = position_row[0]
|
||||
cash = position_row[1]
|
||||
|
||||
# Build position dict starting with CASH
|
||||
position_dict = {"CASH": cash}
|
||||
|
||||
# Get holdings for this position
|
||||
cursor.execute("""
|
||||
SELECT symbol, quantity
|
||||
FROM holdings
|
||||
WHERE position_id = ?
|
||||
""", (position_id,))
|
||||
|
||||
for row in cursor.fetchall():
|
||||
symbol = row[0]
|
||||
quantity = row[1]
|
||||
position_dict[symbol] = quantity
|
||||
|
||||
# Get next action_id
|
||||
cursor.execute("""
|
||||
SELECT COALESCE(MAX(action_id), -1) + 1 as next_action_id
|
||||
FROM positions
|
||||
WHERE job_id = ? AND model = ?
|
||||
""", (job_id, model))
|
||||
|
||||
next_action_id = cursor.fetchone()[0]
|
||||
|
||||
return position_dict, next_action_id
|
||||
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def buy(symbol: str, amount: int, signature: str = None, today_date: str = None,
|
||||
job_id: str = None, session_id: int = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Buy stock function - writes to SQLite database.
|
||||
|
||||
Args:
|
||||
symbol: Stock symbol (e.g., "AAPL", "MSFT")
|
||||
amount: Number of shares to buy (positive integer)
|
||||
signature: Model signature (injected by ContextInjector)
|
||||
today_date: Trading date YYYY-MM-DD (injected by ContextInjector)
|
||||
job_id: Job UUID (injected by ContextInjector)
|
||||
session_id: Trading session ID (injected by ContextInjector)
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]:
|
||||
- Success: Returns new position dictionary (containing stock quantity and cash balance)
|
||||
- Failure: Returns {"error": error message, ...} dictionary
|
||||
|
||||
Raises:
|
||||
ValueError: Raised when SIGNATURE environment variable is not set
|
||||
|
||||
Example:
|
||||
>>> result = buy("AAPL", 10)
|
||||
>>> print(result) # {"AAPL": 110, "MSFT": 5, "CASH": 5000.0, ...}
|
||||
- Success: {"CASH": amount, symbol: quantity, ...}
|
||||
- Failure: {"error": message, ...}
|
||||
"""
|
||||
# Step 1: Get environment variables and basic information
|
||||
# Get signature (model name) from parameter or fallback to config/env
|
||||
if signature is None:
|
||||
signature = get_config_value("SIGNATURE")
|
||||
if signature is None:
|
||||
raise ValueError("SIGNATURE not provided and environment variable is not set")
|
||||
# Validate required parameters
|
||||
if not job_id:
|
||||
return {"error": "Missing required parameter: job_id"}
|
||||
if not signature:
|
||||
return {"error": "Missing required parameter: signature"}
|
||||
if not today_date:
|
||||
return {"error": "Missing required parameter: today_date"}
|
||||
|
||||
# Get current trading date from parameter or fallback to config/env
|
||||
if today_date is None:
|
||||
today_date = get_config_value("TODAY_DATE")
|
||||
|
||||
# Step 2: Get current latest position and operation ID
|
||||
# get_latest_position returns two values: position dictionary and current maximum operation ID
|
||||
# This ID is used to ensure each operation has a unique identifier
|
||||
try:
|
||||
current_position, current_action_id = get_latest_position(today_date, signature)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print(current_position, current_action_id)
|
||||
print(today_date, signature)
|
||||
# Step 3: Get stock opening price for the day
|
||||
# Use get_open_prices function to get the opening price of specified stock for the day
|
||||
# If stock symbol does not exist or price data is missing, KeyError exception will be raised
|
||||
try:
|
||||
this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price']
|
||||
except KeyError:
|
||||
# Stock symbol does not exist or price data is missing, return error message
|
||||
return {"error": f"Symbol {symbol} not found! This action will not be allowed.", "symbol": symbol, "date": today_date}
|
||||
db_path = "data/jobs.db"
|
||||
conn = get_db_connection(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Step 4: Validate buy conditions
|
||||
# Calculate cash required for purchase: stock price × buy quantity
|
||||
try:
|
||||
cash_left = current_position["CASH"] - this_symbol_price * amount
|
||||
except Exception as e:
|
||||
print(current_position, "CASH", this_symbol_price, amount)
|
||||
# Step 1: Get current position
|
||||
current_position, next_action_id = get_current_position_from_db(job_id, signature, today_date)
|
||||
|
||||
# Check if cash balance is sufficient for purchase
|
||||
if cash_left < 0:
|
||||
# Insufficient cash, return error message
|
||||
return {"error": "Insufficient cash! This action will not be allowed.", "required_cash": this_symbol_price * amount, "cash_available": current_position.get("CASH", 0), "symbol": symbol, "date": today_date}
|
||||
else:
|
||||
# Step 5: Execute buy operation, update position
|
||||
# Create a copy of current position to avoid directly modifying original data
|
||||
# Step 2: Get stock price
|
||||
try:
|
||||
this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price']
|
||||
except KeyError:
|
||||
return {"error": f"Symbol {symbol} not found on {today_date}", "symbol": symbol, "date": today_date}
|
||||
|
||||
# Step 3: Validate sufficient cash
|
||||
cash_required = this_symbol_price * amount
|
||||
cash_available = current_position.get("CASH", 0)
|
||||
cash_left = cash_available - cash_required
|
||||
|
||||
if cash_left < 0:
|
||||
return {
|
||||
"error": "Insufficient cash",
|
||||
"required_cash": cash_required,
|
||||
"cash_available": cash_available,
|
||||
"symbol": symbol,
|
||||
"date": today_date
|
||||
}
|
||||
|
||||
# Step 4: Calculate new position
|
||||
new_position = current_position.copy()
|
||||
|
||||
# Decrease cash balance
|
||||
new_position["CASH"] = cash_left
|
||||
|
||||
# Increase stock position quantity
|
||||
new_position[symbol] += amount
|
||||
|
||||
# Step 6: Record transaction to position.jsonl file
|
||||
# Build file path: {project_root}/data/agent_data/{signature}/position/position.jsonl
|
||||
# Use append mode ("a") to write new transaction record
|
||||
# Each operation ID increments by 1, ensuring uniqueness of operation sequence
|
||||
position_file_path = os.path.join(project_root, "data", "agent_data", signature, "position", "position.jsonl")
|
||||
with open(position_file_path, "a") as f:
|
||||
# Write JSON format transaction record, containing date, operation ID, transaction details and updated position
|
||||
print(f"Writing to position.jsonl: {json.dumps({'date': today_date, 'id': current_action_id + 1, 'this_action':{'action':'buy','symbol':symbol,'amount':amount},'positions': new_position})}")
|
||||
f.write(json.dumps({"date": today_date, "id": current_action_id + 1, "this_action":{"action":"buy","symbol":symbol,"amount":amount},"positions": new_position}) + "\n")
|
||||
# Step 7: Return updated position
|
||||
write_config_value("IF_TRADE", True)
|
||||
print("IF_TRADE", get_config_value("IF_TRADE"))
|
||||
new_position[symbol] = new_position.get(symbol, 0) + amount
|
||||
|
||||
# Step 5: Calculate portfolio value and P&L
|
||||
portfolio_value = cash_left
|
||||
for sym, qty in new_position.items():
|
||||
if sym != "CASH":
|
||||
try:
|
||||
price = get_open_prices(today_date, [sym])[f'{sym}_price']
|
||||
portfolio_value += qty * price
|
||||
except KeyError:
|
||||
pass # Symbol price not available, skip
|
||||
|
||||
# Get previous portfolio value for P&L calculation
|
||||
cursor.execute("""
|
||||
SELECT portfolio_value
|
||||
FROM positions
|
||||
WHERE job_id = ? AND model = ? AND date < ?
|
||||
ORDER BY date DESC, action_id DESC
|
||||
LIMIT 1
|
||||
""", (job_id, signature, today_date))
|
||||
|
||||
row = cursor.fetchone()
|
||||
previous_value = row[0] if row else 10000.0 # Default initial value
|
||||
|
||||
daily_profit = portfolio_value - previous_value
|
||||
daily_return_pct = (daily_profit / previous_value * 100) if previous_value > 0 else 0
|
||||
|
||||
# Step 6: Write to positions table
|
||||
created_at = datetime.utcnow().isoformat() + "Z"
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO positions (
|
||||
job_id, date, model, action_id, action_type, symbol,
|
||||
amount, price, cash, portfolio_value, daily_profit,
|
||||
daily_return_pct, session_id, created_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
job_id, today_date, signature, next_action_id, "buy", symbol,
|
||||
amount, this_symbol_price, cash_left, portfolio_value, daily_profit,
|
||||
daily_return_pct, session_id, created_at
|
||||
))
|
||||
|
||||
position_id = cursor.lastrowid
|
||||
|
||||
# Step 7: Write to holdings table
|
||||
for sym, qty in new_position.items():
|
||||
if sym != "CASH":
|
||||
cursor.execute("""
|
||||
INSERT INTO holdings (position_id, symbol, quantity)
|
||||
VALUES (?, ?, ?)
|
||||
""", (position_id, sym, qty))
|
||||
|
||||
conn.commit()
|
||||
print(f"[buy] {signature} bought {amount} shares of {symbol} at ${this_symbol_price}")
|
||||
return new_position
|
||||
|
||||
@mcp.tool()
|
||||
def sell(symbol: str, amount: int, signature: str = None, today_date: str = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Sell stock function
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
return {"error": f"Trade failed: {str(e)}", "symbol": symbol, "date": today_date}
|
||||
|
||||
This function simulates stock selling operations, including the following steps:
|
||||
1. Get current position and operation ID
|
||||
2. Get stock opening price for the day
|
||||
3. Validate sell conditions (position exists, sufficient quantity)
|
||||
4. Update position (decrease stock quantity, increase cash)
|
||||
5. Record transaction to position.jsonl file
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def sell(symbol: str, amount: int, signature: str = None, today_date: str = None,
|
||||
job_id: str = None, session_id: int = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Sell stock function - writes to SQLite database.
|
||||
|
||||
Args:
|
||||
symbol: Stock symbol, such as "AAPL", "MSFT", etc.
|
||||
amount: Sell quantity, must be a positive integer, indicating how many shares to sell
|
||||
signature: Model signature (optional, will use config/env if not provided)
|
||||
today_date: Trading date (optional, will use config/env if not provided)
|
||||
symbol: Stock symbol (e.g., "AAPL", "MSFT")
|
||||
amount: Number of shares to sell (positive integer)
|
||||
signature: Model signature (injected by ContextInjector)
|
||||
today_date: Trading date YYYY-MM-DD (injected by ContextInjector)
|
||||
job_id: Job UUID (injected by ContextInjector)
|
||||
session_id: Trading session ID (injected by ContextInjector)
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]:
|
||||
- Success: Returns new position dictionary (containing stock quantity and cash balance)
|
||||
- Failure: Returns {"error": error message, ...} dictionary
|
||||
|
||||
Raises:
|
||||
ValueError: Raised when SIGNATURE environment variable is not set
|
||||
|
||||
Example:
|
||||
>>> result = sell("AAPL", 10)
|
||||
>>> print(result) # {"AAPL": 90, "MSFT": 5, "CASH": 15000.0, ...}
|
||||
- Success: {"CASH": amount, symbol: quantity, ...}
|
||||
- Failure: {"error": message, ...}
|
||||
"""
|
||||
# Step 1: Get environment variables and basic information
|
||||
# Get signature (model name) from parameter or fallback to config/env
|
||||
if signature is None:
|
||||
signature = get_config_value("SIGNATURE")
|
||||
if signature is None:
|
||||
raise ValueError("SIGNATURE not provided and environment variable is not set")
|
||||
# Validate required parameters
|
||||
if not job_id:
|
||||
return {"error": "Missing required parameter: job_id"}
|
||||
if not signature:
|
||||
return {"error": "Missing required parameter: signature"}
|
||||
if not today_date:
|
||||
return {"error": "Missing required parameter: today_date"}
|
||||
|
||||
db_path = "data/jobs.db"
|
||||
conn = get_db_connection(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Get current trading date from parameter or fallback to config/env
|
||||
if today_date is None:
|
||||
today_date = get_config_value("TODAY_DATE")
|
||||
|
||||
# Step 2: Get current latest position and operation ID
|
||||
# get_latest_position returns two values: position dictionary and current maximum operation ID
|
||||
# This ID is used to ensure each operation has a unique identifier
|
||||
current_position, current_action_id = get_latest_position(today_date, signature)
|
||||
|
||||
# Step 3: Get stock opening price for the day
|
||||
# Use get_open_prices function to get the opening price of specified stock for the day
|
||||
# If stock symbol does not exist or price data is missing, KeyError exception will be raised
|
||||
try:
|
||||
this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price']
|
||||
except KeyError:
|
||||
# Stock symbol does not exist or price data is missing, return error message
|
||||
return {"error": f"Symbol {symbol} not found! This action will not be allowed.", "symbol": symbol, "date": today_date}
|
||||
# Step 1: Get current position
|
||||
current_position, next_action_id = get_current_position_from_db(job_id, signature, today_date)
|
||||
|
||||
# Step 4: Validate sell conditions
|
||||
# Check if holding this stock
|
||||
if symbol not in current_position:
|
||||
return {"error": f"No position for {symbol}! This action will not be allowed.", "symbol": symbol, "date": today_date}
|
||||
# Step 2: Validate position exists
|
||||
if symbol not in current_position:
|
||||
return {"error": f"No position for {symbol}", "symbol": symbol, "date": today_date}
|
||||
|
||||
# Check if position quantity is sufficient for selling
|
||||
if current_position[symbol] < amount:
|
||||
return {"error": "Insufficient shares! This action will not be allowed.", "have": current_position.get(symbol, 0), "want_to_sell": amount, "symbol": symbol, "date": today_date}
|
||||
if current_position[symbol] < amount:
|
||||
return {
|
||||
"error": "Insufficient shares",
|
||||
"have": current_position[symbol],
|
||||
"want_to_sell": amount,
|
||||
"symbol": symbol,
|
||||
"date": today_date
|
||||
}
|
||||
|
||||
# Step 5: Execute sell operation, update position
|
||||
# Create a copy of current position to avoid directly modifying original data
|
||||
new_position = current_position.copy()
|
||||
|
||||
# Decrease stock position quantity
|
||||
new_position[symbol] -= amount
|
||||
|
||||
# Increase cash balance: sell price × sell quantity
|
||||
# Use get method to ensure CASH field exists, default to 0 if not present
|
||||
new_position["CASH"] = new_position.get("CASH", 0) + this_symbol_price * amount
|
||||
# Step 3: Get stock price
|
||||
try:
|
||||
this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price']
|
||||
except KeyError:
|
||||
return {"error": f"Symbol {symbol} not found on {today_date}", "symbol": symbol, "date": today_date}
|
||||
|
||||
# Step 6: Record transaction to position.jsonl file
|
||||
# Build file path: {project_root}/data/agent_data/{signature}/position/position.jsonl
|
||||
# Use append mode ("a") to write new transaction record
|
||||
# Each operation ID increments by 1, ensuring uniqueness of operation sequence
|
||||
position_file_path = os.path.join(project_root, "data", "agent_data", signature, "position", "position.jsonl")
|
||||
with open(position_file_path, "a") as f:
|
||||
# Write JSON format transaction record, containing date, operation ID and updated position
|
||||
print(f"Writing to position.jsonl: {json.dumps({'date': today_date, 'id': current_action_id + 1, 'this_action':{'action':'sell','symbol':symbol,'amount':amount},'positions': new_position})}")
|
||||
f.write(json.dumps({"date": today_date, "id": current_action_id + 1, "this_action":{"action":"sell","symbol":symbol,"amount":amount},"positions": new_position}) + "\n")
|
||||
# Step 4: Calculate new position
|
||||
new_position = current_position.copy()
|
||||
new_position[symbol] -= amount
|
||||
new_position["CASH"] = new_position.get("CASH", 0) + (this_symbol_price * amount)
|
||||
|
||||
# Step 5: Calculate portfolio value and P&L
|
||||
portfolio_value = new_position["CASH"]
|
||||
for sym, qty in new_position.items():
|
||||
if sym != "CASH":
|
||||
try:
|
||||
price = get_open_prices(today_date, [sym])[f'{sym}_price']
|
||||
portfolio_value += qty * price
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# Get previous portfolio value
|
||||
cursor.execute("""
|
||||
SELECT portfolio_value
|
||||
FROM positions
|
||||
WHERE job_id = ? AND model = ? AND date < ?
|
||||
ORDER BY date DESC, action_id DESC
|
||||
LIMIT 1
|
||||
""", (job_id, signature, today_date))
|
||||
|
||||
row = cursor.fetchone()
|
||||
previous_value = row[0] if row else 10000.0
|
||||
|
||||
daily_profit = portfolio_value - previous_value
|
||||
daily_return_pct = (daily_profit / previous_value * 100) if previous_value > 0 else 0
|
||||
|
||||
# Step 6: Write to positions table
|
||||
created_at = datetime.utcnow().isoformat() + "Z"
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO positions (
|
||||
job_id, date, model, action_id, action_type, symbol,
|
||||
amount, price, cash, portfolio_value, daily_profit,
|
||||
daily_return_pct, session_id, created_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
job_id, today_date, signature, next_action_id, "sell", symbol,
|
||||
amount, this_symbol_price, new_position["CASH"], portfolio_value, daily_profit,
|
||||
daily_return_pct, session_id, created_at
|
||||
))
|
||||
|
||||
position_id = cursor.lastrowid
|
||||
|
||||
# Step 7: Write to holdings table
|
||||
for sym, qty in new_position.items():
|
||||
if sym != "CASH":
|
||||
cursor.execute("""
|
||||
INSERT INTO holdings (position_id, symbol, quantity)
|
||||
VALUES (?, ?, ?)
|
||||
""", (position_id, sym, qty))
|
||||
|
||||
conn.commit()
|
||||
print(f"[sell] {signature} sold {amount} shares of {symbol} at ${this_symbol_price}")
|
||||
return new_position
|
||||
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
return {"error": f"Trade failed: {str(e)}", "symbol": symbol, "date": today_date}
|
||||
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
# Step 7: Return updated position
|
||||
write_config_value("IF_TRADE", True)
|
||||
return new_position
|
||||
|
||||
if __name__ == "__main__":
|
||||
# new_result = buy("AAPL", 1)
|
||||
# print(new_result)
|
||||
# new_result = sell("AAPL", 1)
|
||||
# print(new_result)
|
||||
port = int(os.getenv("TRADE_HTTP_PORT", "8002"))
|
||||
mcp.run(transport="streamable-http", port=port)
|
||||
|
||||
@@ -122,12 +122,29 @@ class ModelDayExecutor:
|
||||
session_id = self._create_trading_session(cursor)
|
||||
conn.commit()
|
||||
|
||||
# Initialize starting position if this is first day
|
||||
self._initialize_starting_position(cursor, session_id)
|
||||
conn.commit()
|
||||
|
||||
# Set environment variable for agent to use isolated config
|
||||
os.environ["RUNTIME_ENV_PATH"] = self.runtime_config_path
|
||||
|
||||
# Initialize agent
|
||||
# Initialize agent (without context)
|
||||
agent = await self._initialize_agent()
|
||||
|
||||
# Create and inject context with correct values
|
||||
from agent.context_injector import ContextInjector
|
||||
context_injector = ContextInjector(
|
||||
signature=self.model_sig,
|
||||
today_date=self.date, # Current trading day
|
||||
job_id=self.job_id,
|
||||
session_id=session_id
|
||||
)
|
||||
logger.info(f"[DEBUG] ModelDayExecutor: Created ContextInjector with signature={self.model_sig}, date={self.date}, job_id={self.job_id}, session_id={session_id}")
|
||||
logger.info(f"[DEBUG] ModelDayExecutor: Calling await agent.set_context()")
|
||||
await agent.set_context(context_injector)
|
||||
logger.info(f"[DEBUG] ModelDayExecutor: set_context() completed")
|
||||
|
||||
# Run trading session
|
||||
logger.info(f"Running trading session for {self.model_sig} on {self.date}")
|
||||
session_result = await agent.run_trading_session(self.date)
|
||||
@@ -141,10 +158,13 @@ class ModelDayExecutor:
|
||||
# Update session summary
|
||||
await self._update_session_summary(cursor, session_id, conversation, agent)
|
||||
|
||||
# Store positions (pass session_id)
|
||||
self._write_results_to_db(agent, session_id)
|
||||
|
||||
# Commit and close connection before _write_results_to_db opens a new one
|
||||
conn.commit()
|
||||
conn.close()
|
||||
conn = None # Mark as closed
|
||||
|
||||
# Store positions (pass session_id) - this opens its own connection
|
||||
self._write_results_to_db(agent, session_id)
|
||||
|
||||
# Update status to completed
|
||||
self.job_manager.update_job_detail_status(
|
||||
@@ -287,6 +307,51 @@ class ModelDayExecutor:
|
||||
|
||||
return cursor.lastrowid
|
||||
|
||||
def _initialize_starting_position(self, cursor, session_id: int) -> None:
|
||||
"""
|
||||
Initialize starting position if no prior positions exist for this job+model.
|
||||
|
||||
Creates action_id=0 position with initial_cash and zero stock holdings.
|
||||
|
||||
Args:
|
||||
cursor: Database cursor
|
||||
session_id: Trading session ID
|
||||
"""
|
||||
# Check if any positions exist for this job+model
|
||||
cursor.execute("""
|
||||
SELECT COUNT(*) FROM positions
|
||||
WHERE job_id = ? AND model = ?
|
||||
""", (self.job_id, self.model_sig))
|
||||
|
||||
if cursor.fetchone()[0] > 0:
|
||||
# Positions already exist, no initialization needed
|
||||
return
|
||||
|
||||
# Load config to get initial_cash
|
||||
import json
|
||||
with open(self.config_path, 'r') as f:
|
||||
config = json.load(f)
|
||||
|
||||
agent_config = config.get("agent_config", {})
|
||||
initial_cash = agent_config.get("initial_cash", 10000.0)
|
||||
|
||||
# Create initial position record
|
||||
from datetime import datetime
|
||||
created_at = datetime.utcnow().isoformat() + "Z"
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO positions (
|
||||
job_id, date, model, action_id, action_type,
|
||||
cash, portfolio_value, session_id, created_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
self.job_id, self.date, self.model_sig, 0, "no_trade",
|
||||
initial_cash, initial_cash, session_id, created_at
|
||||
))
|
||||
|
||||
logger.info(f"Initialized starting position for {self.model_sig} with ${initial_cash}")
|
||||
|
||||
async def _store_reasoning_logs(
|
||||
self,
|
||||
cursor,
|
||||
|
||||
476
docs/plans/2025-02-11-database-position-tracking-design.md
Normal file
476
docs/plans/2025-02-11-database-position-tracking-design.md
Normal file
@@ -0,0 +1,476 @@
|
||||
# Database-Only Position Tracking Design
|
||||
|
||||
**Date:** 2025-02-11
|
||||
**Status:** Approved
|
||||
**Version:** 1.0
|
||||
|
||||
## Problem Statement
|
||||
|
||||
Two critical issues prevent simulations from running:
|
||||
|
||||
1. **ContextInjector receives None values**: The ContextInjector shows `{'signature': None, 'today_date': None, 'job_id': None, 'session_id': None}` when injecting parameters into trade tool calls, causing trade validation to fail.
|
||||
|
||||
2. **File-based position tracking still in use**: System prompt builder and no-trade handler attempt to read/write position.jsonl files that no longer exist after SQLite migration.
|
||||
|
||||
## Root Cause Analysis
|
||||
|
||||
### Issue 1: ContextInjector Initialization Timing
|
||||
|
||||
**Problem Chain:**
|
||||
- `BaseAgent.__init__()` creates `ContextInjector` with `self.init_date`
|
||||
- `init_date` is the START of simulation date range (e.g., "2025-10-13"), not current trading day ("2025-10-01")
|
||||
- Runtime config contains correct values (`TODAY_DATE="2025-10-01"`, `SIGNATURE="gpt-5"`, `JOB_ID="dc488e87..."`), but BaseAgent doesn't use them during initialization
|
||||
- ContextInjector is created before the trading session, so it doesn't know the correct date
|
||||
|
||||
**Evidence:**
|
||||
```
|
||||
ai-trader-app | [ContextInjector] Tool: buy, Args after injection: {'symbol': 'MSFT', 'amount': 1, 'signature': None, 'today_date': None, 'job_id': None, 'session_id': None}
|
||||
```
|
||||
|
||||
### Issue 2: Mixed Storage Architecture
|
||||
|
||||
**Problem Chain:**
|
||||
- Trade tools (tool_trade.py) query/write to SQLite database
|
||||
- System prompt builder calls `get_today_init_position()` which reads position.jsonl files
|
||||
- No-trade handler calls `add_no_trade_record()` which writes to position.jsonl files
|
||||
- Files don't exist because we migrated to database-only storage
|
||||
|
||||
**Evidence:**
|
||||
```
|
||||
FileNotFoundError: [Errno 2] No such file or directory: '/app/data/agent_data/gpt-5/position/position.jsonl'
|
||||
```
|
||||
|
||||
## Design Solution
|
||||
|
||||
### Architecture Principles
|
||||
|
||||
1. **Database-only position storage**: All position queries and writes go through SQLite
|
||||
2. **Lazy context injection**: Create ContextInjector after runtime config is written and session is created
|
||||
3. **Real-time database queries**: System prompt builder queries database directly, no file caching
|
||||
4. **Clean initialization order**: Config → Database → Agent → Context → Session
|
||||
|
||||
### Component Changes
|
||||
|
||||
#### 1. ContextInjector Lifecycle Refactor
|
||||
|
||||
**BaseAgent Changes:**
|
||||
|
||||
Remove ContextInjector creation from `__init__()`:
|
||||
```python
|
||||
# OLD (in __init__)
|
||||
self.context_injector = ContextInjector(
|
||||
signature=self.signature,
|
||||
today_date=self.init_date, # WRONG: uses start date
|
||||
job_id=job_id
|
||||
)
|
||||
self.client = MultiServerMCPClient(
|
||||
self.mcp_config,
|
||||
tool_interceptors=[self.context_injector]
|
||||
)
|
||||
|
||||
# NEW (in __init__)
|
||||
self.context_injector = None
|
||||
self.client = MultiServerMCPClient(
|
||||
self.mcp_config,
|
||||
tool_interceptors=[] # Empty initially
|
||||
)
|
||||
```
|
||||
|
||||
Add new method `set_context()`:
|
||||
```python
|
||||
def set_context(self, context_injector: ContextInjector) -> None:
|
||||
"""Inject ContextInjector after initialization.
|
||||
|
||||
Args:
|
||||
context_injector: Configured ContextInjector instance
|
||||
"""
|
||||
self.context_injector = context_injector
|
||||
self.client.add_interceptor(context_injector)
|
||||
```
|
||||
|
||||
**ModelDayExecutor Changes:**
|
||||
|
||||
Create and inject ContextInjector after agent initialization:
|
||||
```python
|
||||
async def execute_async(self) -> Dict[str, Any]:
|
||||
# ... create session, initialize position ...
|
||||
|
||||
# Set RUNTIME_ENV_PATH
|
||||
os.environ["RUNTIME_ENV_PATH"] = self.runtime_config_path
|
||||
|
||||
# Initialize agent (without context)
|
||||
agent = await self._initialize_agent()
|
||||
|
||||
# Create context injector with correct values
|
||||
context_injector = ContextInjector(
|
||||
signature=self.model_sig,
|
||||
today_date=self.date, # CORRECT: current trading day
|
||||
job_id=self.job_id,
|
||||
session_id=session_id
|
||||
)
|
||||
|
||||
# Inject context into agent
|
||||
agent.set_context(context_injector)
|
||||
|
||||
# Run trading session
|
||||
session_result = await agent.run_trading_session(self.date)
|
||||
```
|
||||
|
||||
#### 2. Database Position Query Functions
|
||||
|
||||
**New Functions (tools/price_tools.py):**
|
||||
|
||||
```python
|
||||
def get_today_init_position_from_db(
|
||||
today_date: str,
|
||||
modelname: str,
|
||||
job_id: str
|
||||
) -> Dict[str, float]:
|
||||
"""
|
||||
Query yesterday's position from database.
|
||||
|
||||
Args:
|
||||
today_date: Current trading date (YYYY-MM-DD)
|
||||
modelname: Model signature
|
||||
job_id: Job UUID
|
||||
|
||||
Returns:
|
||||
Position dict: {"AAPL": 50, "MSFT": 30, "CASH": 5000.0}
|
||||
If no position exists: {"CASH": 10000.0} (initial cash)
|
||||
"""
|
||||
from tools.deployment_config import get_db_path
|
||||
from api.database import get_db_connection
|
||||
|
||||
db_path = get_db_path()
|
||||
conn = get_db_connection(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
# Get most recent position before today
|
||||
cursor.execute("""
|
||||
SELECT p.id, p.cash
|
||||
FROM positions p
|
||||
WHERE p.job_id = ? AND p.model = ? AND p.date < ?
|
||||
ORDER BY p.date DESC, p.action_id DESC
|
||||
LIMIT 1
|
||||
""", (job_id, modelname, today_date))
|
||||
|
||||
row = cursor.fetchone()
|
||||
|
||||
if not row:
|
||||
# First day - return initial cash
|
||||
return {"CASH": 10000.0} # TODO: Read from config
|
||||
|
||||
position_id, cash = row
|
||||
position_dict = {"CASH": cash}
|
||||
|
||||
# Get holdings for this position
|
||||
cursor.execute("""
|
||||
SELECT symbol, quantity
|
||||
FROM holdings
|
||||
WHERE position_id = ?
|
||||
""", (position_id,))
|
||||
|
||||
for symbol, quantity in cursor.fetchall():
|
||||
position_dict[symbol] = quantity
|
||||
|
||||
return position_dict
|
||||
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def add_no_trade_record_to_db(
|
||||
today_date: str,
|
||||
modelname: str,
|
||||
job_id: str,
|
||||
session_id: int
|
||||
) -> None:
|
||||
"""
|
||||
Create no-trade position record in database.
|
||||
|
||||
Args:
|
||||
today_date: Current trading date (YYYY-MM-DD)
|
||||
modelname: Model signature
|
||||
job_id: Job UUID
|
||||
session_id: Trading session ID
|
||||
"""
|
||||
from tools.deployment_config import get_db_path
|
||||
from api.database import get_db_connection
|
||||
from agent_tools.tool_trade import get_current_position_from_db
|
||||
from datetime import datetime
|
||||
|
||||
db_path = get_db_path()
|
||||
conn = get_db_connection(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
# Get current position
|
||||
current_position, next_action_id = get_current_position_from_db(
|
||||
job_id, modelname, today_date
|
||||
)
|
||||
|
||||
# Calculate portfolio value
|
||||
# (Reuse logic from tool_trade.py)
|
||||
cash = current_position.get("CASH", 0.0)
|
||||
portfolio_value = cash
|
||||
|
||||
# Add stock values
|
||||
for symbol, qty in current_position.items():
|
||||
if symbol != "CASH":
|
||||
try:
|
||||
from tools.price_tools import get_open_prices
|
||||
price = get_open_prices(today_date, [symbol])[f'{symbol}_price']
|
||||
portfolio_value += qty * price
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# Get previous value for P&L
|
||||
cursor.execute("""
|
||||
SELECT portfolio_value
|
||||
FROM positions
|
||||
WHERE job_id = ? AND model = ? AND date < ?
|
||||
ORDER BY date DESC, action_id DESC
|
||||
LIMIT 1
|
||||
""", (job_id, modelname, today_date))
|
||||
|
||||
row = cursor.fetchone()
|
||||
previous_value = row[0] if row else 10000.0
|
||||
|
||||
daily_profit = portfolio_value - previous_value
|
||||
daily_return_pct = (daily_profit / previous_value * 100) if previous_value > 0 else 0
|
||||
|
||||
# Insert position record
|
||||
created_at = datetime.utcnow().isoformat() + "Z"
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO positions (
|
||||
job_id, date, model, action_id, action_type,
|
||||
cash, portfolio_value, daily_profit, daily_return_pct,
|
||||
session_id, created_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
job_id, today_date, modelname, next_action_id, "no_trade",
|
||||
cash, portfolio_value, daily_profit, daily_return_pct,
|
||||
session_id, created_at
|
||||
))
|
||||
|
||||
position_id = cursor.lastrowid
|
||||
|
||||
# Insert holdings (unchanged from previous position)
|
||||
for symbol, qty in current_position.items():
|
||||
if symbol != "CASH":
|
||||
cursor.execute("""
|
||||
INSERT INTO holdings (position_id, symbol, quantity)
|
||||
VALUES (?, ?, ?)
|
||||
""", (position_id, symbol, qty))
|
||||
|
||||
conn.commit()
|
||||
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
raise
|
||||
finally:
|
||||
conn.close()
|
||||
```
|
||||
|
||||
#### 3. System Prompt Builder Updates
|
||||
|
||||
**Modified Function (prompts/agent_prompt.py):**
|
||||
|
||||
```python
|
||||
def get_agent_system_prompt(today_date: str, signature: str) -> str:
|
||||
"""Build system prompt with database position queries."""
|
||||
from tools.general_tools import get_config_value
|
||||
|
||||
print(f"signature: {signature}")
|
||||
print(f"today_date: {today_date}")
|
||||
|
||||
# Get job_id from runtime config
|
||||
job_id = get_config_value("JOB_ID")
|
||||
if not job_id:
|
||||
raise ValueError("JOB_ID not found in runtime config")
|
||||
|
||||
# Query database for yesterday's position
|
||||
today_init_position = get_today_init_position_from_db(
|
||||
today_date, signature, job_id
|
||||
)
|
||||
|
||||
# Get prices (unchanged)
|
||||
yesterday_buy_prices, yesterday_sell_prices = get_yesterday_open_and_close_price(
|
||||
today_date, all_nasdaq_100_symbols
|
||||
)
|
||||
today_buy_price = get_open_prices(today_date, all_nasdaq_100_symbols)
|
||||
yesterday_profit = get_yesterday_profit(
|
||||
today_date, yesterday_buy_prices, yesterday_sell_prices, today_init_position
|
||||
)
|
||||
|
||||
return agent_system_prompt.format(
|
||||
date=today_date,
|
||||
positions=today_init_position,
|
||||
STOP_SIGNAL=STOP_SIGNAL,
|
||||
yesterday_close_price=yesterday_sell_prices,
|
||||
today_buy_price=today_buy_price,
|
||||
yesterday_profit=yesterday_profit
|
||||
)
|
||||
```
|
||||
|
||||
#### 4. No-Trade Handler Updates
|
||||
|
||||
**Modified Method (agent/base_agent/base_agent.py):**
|
||||
|
||||
```python
|
||||
async def _handle_trading_result(self, today_date: str) -> None:
|
||||
"""Handle trading results with database writes."""
|
||||
from tools.general_tools import get_config_value
|
||||
from tools.price_tools import add_no_trade_record_to_db
|
||||
|
||||
if_trade = get_config_value("IF_TRADE")
|
||||
|
||||
if if_trade:
|
||||
write_config_value("IF_TRADE", False)
|
||||
print("✅ Trading completed")
|
||||
else:
|
||||
print("📊 No trading, maintaining positions")
|
||||
|
||||
# Get context from runtime config
|
||||
job_id = get_config_value("JOB_ID")
|
||||
session_id = self.context_injector.session_id if self.context_injector else None
|
||||
|
||||
if not job_id or not session_id:
|
||||
raise ValueError("Missing JOB_ID or session_id for no-trade record")
|
||||
|
||||
# Write no-trade record to database
|
||||
add_no_trade_record_to_db(
|
||||
today_date,
|
||||
self.signature,
|
||||
job_id,
|
||||
session_id
|
||||
)
|
||||
|
||||
write_config_value("IF_TRADE", False)
|
||||
```
|
||||
|
||||
### Data Flow Summary
|
||||
|
||||
**Complete Execution Sequence:**
|
||||
|
||||
1. `ModelDayExecutor.__init__()`:
|
||||
- Create runtime config file with TODAY_DATE, SIGNATURE, JOB_ID
|
||||
|
||||
2. `ModelDayExecutor.execute_async()`:
|
||||
- Create trading_sessions record → get session_id
|
||||
- Initialize starting position (if first day)
|
||||
- Set RUNTIME_ENV_PATH environment variable
|
||||
- Initialize agent (without ContextInjector)
|
||||
- Create ContextInjector(date, model_sig, job_id, session_id)
|
||||
- Call agent.set_context(context_injector)
|
||||
- Run trading session
|
||||
|
||||
3. `BaseAgent.run_trading_session()`:
|
||||
- Build system prompt → queries database for yesterday's position
|
||||
- AI agent analyzes and decides
|
||||
- Calls buy/sell tools → ContextInjector injects parameters
|
||||
- Trade tools write to database
|
||||
- If no trade: add_no_trade_record_to_db()
|
||||
|
||||
4. Position Query Flow:
|
||||
- System prompt needs yesterday's position
|
||||
- `get_today_init_position_from_db(today_date, signature, job_id)`
|
||||
- Query: `SELECT positions + holdings WHERE job_id=? AND model=? AND date<? ORDER BY date DESC, action_id DESC LIMIT 1`
|
||||
- Reconstruct position dict from results
|
||||
- Return to system prompt builder
|
||||
|
||||
### Testing Strategy
|
||||
|
||||
**Critical Test Cases:**
|
||||
|
||||
1. **First Trading Day**
|
||||
- No previous position in database
|
||||
- Returns `{"CASH": 10000.0}`
|
||||
- System prompt shows available cash
|
||||
- Initial position created with action_id=0
|
||||
|
||||
2. **Subsequent Trading Days**
|
||||
- Query finds previous position
|
||||
- System prompt shows yesterday's holdings
|
||||
- Action_id increments properly
|
||||
|
||||
3. **No-Trade Days**
|
||||
- Agent outputs `<FINISH_SIGNAL>` without trading
|
||||
- `add_no_trade_record_to_db()` creates record
|
||||
- Holdings unchanged
|
||||
- Portfolio value calculated
|
||||
|
||||
4. **ContextInjector Values**
|
||||
- All parameters non-None
|
||||
- Debug log shows correct injection
|
||||
- Trade tools validate successfully
|
||||
|
||||
**Edge Cases:**
|
||||
|
||||
- Multiple models, same job (different signatures)
|
||||
- Date gaps (weekends) - query finds Friday on Monday
|
||||
- Mid-simulation restart - resumes from last position
|
||||
- Empty holdings (only CASH)
|
||||
|
||||
**Validation Points:**
|
||||
|
||||
- Log ContextInjector values at injection
|
||||
- Log database query results
|
||||
- Verify initial position created
|
||||
- Check session_id links positions
|
||||
|
||||
## Implementation Checklist
|
||||
|
||||
### Phase 1: ContextInjector Refactor
|
||||
- [ ] Remove ContextInjector creation from BaseAgent.__init__()
|
||||
- [ ] Add BaseAgent.set_context() method
|
||||
- [ ] Update ModelDayExecutor to create and inject ContextInjector
|
||||
- [ ] Add debug logging for injected values
|
||||
|
||||
### Phase 2: Database Position Functions
|
||||
- [ ] Implement get_today_init_position_from_db()
|
||||
- [ ] Implement add_no_trade_record_to_db()
|
||||
- [ ] Add database error handling
|
||||
- [ ] Add logging for query results
|
||||
|
||||
### Phase 3: Integration
|
||||
- [ ] Update get_agent_system_prompt() to use database queries
|
||||
- [ ] Update _handle_trading_result() to use database writes
|
||||
- [ ] Remove/deprecate old file-based functions
|
||||
- [ ] Test first trading day scenario
|
||||
- [ ] Test subsequent trading days
|
||||
- [ ] Test no-trade scenario
|
||||
|
||||
### Phase 4: Validation
|
||||
- [ ] Run full simulation and verify ContextInjector logs
|
||||
- [ ] Verify initial cash appears in system prompt
|
||||
- [ ] Verify trades execute successfully
|
||||
- [ ] Verify no-trade records created
|
||||
- [ ] Check database for correct position records
|
||||
|
||||
## Rollback Plan
|
||||
|
||||
If issues arise:
|
||||
1. Revert ContextInjector changes (keep in __init__)
|
||||
2. Temporarily pass correct date via environment variable
|
||||
3. Keep file-based functions as fallback
|
||||
4. Debug database queries in isolation
|
||||
|
||||
## Success Criteria
|
||||
|
||||
1. ContextInjector logs show all non-None values
|
||||
2. System prompt displays initial $10,000 cash
|
||||
3. Trade tools successfully execute buy/sell operations
|
||||
4. No FileNotFoundError exceptions
|
||||
5. Database contains correct position records
|
||||
6. AI agent can complete full trading day
|
||||
|
||||
## Notes
|
||||
|
||||
- File-based functions marked as deprecated but not removed (backward compatibility)
|
||||
- Database queries use deployment_config for automatic prod/dev resolution
|
||||
- Initial cash value should eventually be read from config, not hardcoded
|
||||
- Consider adding database connection pooling for performance
|
||||
@@ -68,14 +68,24 @@ When you think your task is complete, output
|
||||
def get_agent_system_prompt(today_date: str, signature: str) -> str:
|
||||
print(f"signature: {signature}")
|
||||
print(f"today_date: {today_date}")
|
||||
|
||||
# Get job_id from runtime config
|
||||
job_id = get_config_value("JOB_ID")
|
||||
if not job_id:
|
||||
raise ValueError("JOB_ID not found in runtime config")
|
||||
|
||||
# Query database for yesterday's position
|
||||
from tools.price_tools import get_today_init_position_from_db
|
||||
today_init_position = get_today_init_position_from_db(today_date, signature, job_id)
|
||||
|
||||
# Get yesterday's buy and sell prices
|
||||
yesterday_buy_prices, yesterday_sell_prices = get_yesterday_open_and_close_price(today_date, all_nasdaq_100_symbols)
|
||||
today_buy_price = get_open_prices(today_date, all_nasdaq_100_symbols)
|
||||
today_init_position = get_today_init_position(today_date, signature)
|
||||
yesterday_profit = get_yesterday_profit(today_date, yesterday_buy_prices, yesterday_sell_prices, today_init_position)
|
||||
|
||||
return agent_system_prompt.format(
|
||||
date=today_date,
|
||||
positions=today_init_position,
|
||||
date=today_date,
|
||||
positions=today_init_position,
|
||||
STOP_SIGNAL=STOP_SIGNAL,
|
||||
yesterday_close_price=yesterday_sell_prices,
|
||||
today_buy_price=today_buy_price,
|
||||
|
||||
@@ -299,7 +299,172 @@ def add_no_trade_record(today_date: str, modelname: str):
|
||||
|
||||
with position_file.open("a", encoding="utf-8") as f:
|
||||
f.write(json.dumps(save_item) + "\n")
|
||||
return
|
||||
return
|
||||
|
||||
|
||||
def get_today_init_position_from_db(
|
||||
today_date: str,
|
||||
modelname: str,
|
||||
job_id: str
|
||||
) -> Dict[str, float]:
|
||||
"""
|
||||
Query yesterday's position from SQLite database.
|
||||
|
||||
Args:
|
||||
today_date: Current trading date (YYYY-MM-DD)
|
||||
modelname: Model signature
|
||||
job_id: Job UUID
|
||||
|
||||
Returns:
|
||||
Position dict: {"AAPL": 50, "MSFT": 30, "CASH": 5000.0}
|
||||
If no position exists: {"CASH": 10000.0} (initial cash)
|
||||
"""
|
||||
import logging
|
||||
from api.database import get_db_connection
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
db_path = "data/jobs.db"
|
||||
conn = get_db_connection(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
# Get most recent position before today
|
||||
cursor.execute("""
|
||||
SELECT p.id, p.cash
|
||||
FROM positions p
|
||||
WHERE p.job_id = ? AND p.model = ? AND p.date < ?
|
||||
ORDER BY p.date DESC, p.action_id DESC
|
||||
LIMIT 1
|
||||
""", (job_id, modelname, today_date))
|
||||
|
||||
row = cursor.fetchone()
|
||||
|
||||
if not row:
|
||||
# First day - return initial cash
|
||||
logger.info(f"No previous position found for {modelname}, returning initial cash")
|
||||
return {"CASH": 10000.0}
|
||||
|
||||
position_id, cash = row
|
||||
position_dict = {"CASH": cash}
|
||||
|
||||
# Get holdings for this position
|
||||
cursor.execute("""
|
||||
SELECT symbol, quantity
|
||||
FROM holdings
|
||||
WHERE position_id = ?
|
||||
""", (position_id,))
|
||||
|
||||
for symbol, quantity in cursor.fetchall():
|
||||
position_dict[symbol] = quantity
|
||||
|
||||
logger.debug(f"Loaded position for {modelname}: {position_dict}")
|
||||
return position_dict
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Database error in get_today_init_position_from_db: {e}")
|
||||
raise
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def add_no_trade_record_to_db(
|
||||
today_date: str,
|
||||
modelname: str,
|
||||
job_id: str,
|
||||
session_id: int
|
||||
) -> None:
|
||||
"""
|
||||
Create no-trade position record in SQLite database.
|
||||
|
||||
Args:
|
||||
today_date: Current trading date (YYYY-MM-DD)
|
||||
modelname: Model signature
|
||||
job_id: Job UUID
|
||||
session_id: Trading session ID
|
||||
"""
|
||||
import logging
|
||||
from api.database import get_db_connection
|
||||
from agent_tools.tool_trade import get_current_position_from_db
|
||||
from datetime import datetime
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
db_path = "data/jobs.db"
|
||||
conn = get_db_connection(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
# Get current position
|
||||
current_position, next_action_id = get_current_position_from_db(
|
||||
job_id, modelname, today_date
|
||||
)
|
||||
|
||||
# Calculate portfolio value
|
||||
cash = current_position.get("CASH", 0.0)
|
||||
portfolio_value = cash
|
||||
|
||||
# Add stock values
|
||||
for symbol, qty in current_position.items():
|
||||
if symbol != "CASH":
|
||||
try:
|
||||
price = get_open_prices(today_date, [symbol])[f'{symbol}_price']
|
||||
portfolio_value += qty * price
|
||||
except KeyError:
|
||||
logger.warning(f"Price not found for {symbol} on {today_date}")
|
||||
pass
|
||||
|
||||
# Get previous value for P&L
|
||||
cursor.execute("""
|
||||
SELECT portfolio_value
|
||||
FROM positions
|
||||
WHERE job_id = ? AND model = ? AND date < ?
|
||||
ORDER BY date DESC, action_id DESC
|
||||
LIMIT 1
|
||||
""", (job_id, modelname, today_date))
|
||||
|
||||
row = cursor.fetchone()
|
||||
previous_value = row[0] if row else 10000.0
|
||||
|
||||
daily_profit = portfolio_value - previous_value
|
||||
daily_return_pct = (daily_profit / previous_value * 100) if previous_value > 0 else 0
|
||||
|
||||
# Insert position record
|
||||
created_at = datetime.utcnow().isoformat() + "Z"
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO positions (
|
||||
job_id, date, model, action_id, action_type,
|
||||
cash, portfolio_value, daily_profit, daily_return_pct,
|
||||
session_id, created_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
job_id, today_date, modelname, next_action_id, "no_trade",
|
||||
cash, portfolio_value, daily_profit, daily_return_pct,
|
||||
session_id, created_at
|
||||
))
|
||||
|
||||
position_id = cursor.lastrowid
|
||||
|
||||
# Insert holdings (unchanged from previous position)
|
||||
for symbol, qty in current_position.items():
|
||||
if symbol != "CASH":
|
||||
cursor.execute("""
|
||||
INSERT INTO holdings (position_id, symbol, quantity)
|
||||
VALUES (?, ?, ?)
|
||||
""", (position_id, symbol, qty))
|
||||
|
||||
conn.commit()
|
||||
logger.info(f"Created no-trade record for {modelname} on {today_date}")
|
||||
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
logger.error(f"Database error in add_no_trade_record_to_db: {e}")
|
||||
raise
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
today_date = get_config_value("TODAY_DATE")
|
||||
|
||||
Reference in New Issue
Block a user