Compare commits

..

8 Commits

Author SHA1 Message Date
fcfdf36c1c fix: use 'no_trade' action type for initial position
Changed action_type from 'init' to 'no_trade' in _initialize_starting_position()
to comply with database CHECK constraint that only allows 'buy', 'sell', or 'no_trade'.

Fixes sqlite3.IntegrityError during position initialization.
2025-11-02 21:49:57 -05:00
019c84fca8 refactor: migrate trade tools from file-based to SQLite position storage
Complete rewrite of position management in MCP trade tools:

**Trade Tools (agent_tools/tool_trade.py)**
- Replace file-based position.jsonl reads with SQLite queries
- Add get_current_position_from_db() to query positions and holdings tables
- Rewrite buy() and sell() to write directly to database
- Calculate portfolio value and P&L metrics in tools
- Accept job_id and session_id parameters via ContextInjector
- Return errors with proper context for debugging
- Use deployment-aware database path resolution

**Context Injection (agent/context_injector.py)**
- Add job_id and session_id to constructor
- Inject job_id and session_id into buy/sell tool calls
- Support optional parameters (None in standalone mode)

**BaseAgent (agent/base_agent/base_agent.py)**
- Read JOB_ID from runtime config
- Pass job_id to ContextInjector during initialization
- Enable automatic context injection for API mode

**ModelDayExecutor (api/model_day_executor.py)**
- Add _initialize_starting_position() method
- Create initial position record before agent runs
- Load initial_cash from config
- Update context_injector.session_id after session creation
- Link positions to sessions automatically

**Architecture Changes:**
- Eliminates file-based position tracking entirely
- Single source of truth: SQLite database
- Positions automatically linked to trading sessions
- Concurrent execution safe (no file system conflicts)
- Deployment mode aware (prod vs dev databases)

This completes the migration to database-only position storage.
File-based position.jsonl is no longer used or created.

Fixes context injection errors in concurrent simulations.
2025-11-02 21:36:57 -05:00
8521f685c7 fix: improve error handling in buy/sell functions
Add proper exception handling around get_latest_position() calls in both
buy() and sell() functions. Previously, exceptions were caught but code
continued execution with undefined variables, causing "variable referenced
before assignment" errors.

Now returns error dict with context when position lookup fails.

Related to context injection implementation for concurrent simulations.
2025-11-02 20:38:55 -05:00
bf12e981fe debug: add logging to trace parameter injection 2025-11-02 20:29:35 -05:00
a16bac5d08 fix: use 'args' instead of 'arguments' in MCPToolCallRequest
MCPToolCallRequest has 'args' attribute, not 'arguments'. Fixed
attribute name to match the actual API.
2025-11-02 20:21:43 -05:00
81b92e293a fix: make ContextInjector async to match ToolCallInterceptor protocol
The interceptor __call__ method must be async and follow the proper
signature: async __call__(request, handler) -> result

Previous implementation was synchronous and had wrong signature, causing
'object function can't be used in await expression' error.
2025-11-02 20:11:20 -05:00
b1b486dcc4 fix: inject signature and today_date into trade tool calls for concurrent simulations
Resolves issue where MCP trade tools couldn't access SIGNATURE and TODAY_DATE
during concurrent API simulations, causing "SIGNATURE environment variable is
not set" errors.

Problem:
- MCP services run as separate HTTP processes
- Multiple simulations execute concurrently via ThreadPoolExecutor
- Environment variables from executor process not accessible to MCP services

Solution:
- Add ContextInjector that implements ToolCallInterceptor
- Automatically injects signature and today_date into buy/sell tool calls
- Trade tools accept optional parameters, falling back to config/env
- BaseAgent creates interceptor and updates today_date per session

Changes:
- agent/context_injector.py: New interceptor for context injection
- agent/base_agent/base_agent.py: Create and use ContextInjector
- agent_tools/tool_trade.py: Add optional signature/today_date parameters

Benefits:
- Supports concurrent multi-model simulations
- Maintains backward compatibility with CLI mode
- AI model unaware of injected parameters
2025-11-02 20:01:32 -05:00
1bdfefae35 refactor: remove duplicate MCP service log files
Remove redundant log file creation for MCP services since output is
already captured by Docker logs. This simplifies deployment by removing
unnecessary volume mounts and file management.

Changes:
- Remove logs/ directory creation from Dockerfile
- Remove logs/ volume mount from docker-compose.yml
- Update start_mcp_services.py to send output to DEVNULL
- Update documentation to reflect changes (DOCKER.md, docs/DOCKER.md)
- Update .env.example to remove logs/ from volume description

Users can still view MCP service output via 'docker logs' or
'docker-compose logs -f'. Trading session logs in data/agent_data/
remain unchanged.
2025-11-02 19:57:17 -05:00
10 changed files with 482 additions and 221 deletions

View File

@@ -35,7 +35,7 @@ MAX_SIMULATION_DAYS=30
AUTO_DOWNLOAD_PRICE_DATA=true
# Data Volume Configuration
# Base directory for all persistent data (will contain data/, logs/, configs/ subdirectories)
# Base directory for all persistent data (will contain data/ and configs/ subdirectories)
# Use relative paths (./volumes) or absolute paths (/home/user/ai-trader-volumes)
# Defaults to current directory (.) if not set
VOLUME_PATH=.

View File

@@ -154,10 +154,9 @@ docker-compose up
### Volume Mounts
Docker Compose mounts three volumes for persistent data. By default, these are stored in the project directory:
Docker Compose mounts two volumes for persistent data. By default, these are stored in the project directory:
- `./data:/app/data` - Price data and trading records
- `./logs:/app/logs` - MCP service logs
- `./configs:/app/configs` - Configuration files (allows editing configs without rebuilding)
### Custom Volume Location
@@ -174,7 +173,6 @@ VOLUME_PATH=./volumes
This will store data in:
- `/home/user/trading-data/data/`
- `/home/user/trading-data/logs/`
- `/home/user/trading-data/configs/`
**Note:** The directory structure is automatically created. You'll need to copy your existing configs:
@@ -190,7 +188,7 @@ To reset all trading data:
```bash
docker-compose down
rm -rf ${VOLUME_PATH:-.}/data/agent_data/* ${VOLUME_PATH:-.}/logs/*
rm -rf ${VOLUME_PATH:-.}/data/agent_data/*
docker-compose up
```
@@ -217,8 +215,7 @@ docker pull ghcr.io/xe138/ai-trader-server:latest
```bash
docker run --env-file .env \
-v $(pwd)/data:/app/data \
-v $(pwd)/logs:/app/logs \
-p 8000-8003:8000-8003 \
-p 8080:8080 \
ghcr.io/xe138/ai-trader-server:latest
```
@@ -234,9 +231,9 @@ docker pull ghcr.io/xe138/ai-trader-server:v1.0.0
**Symptom:** Container exits immediately or errors about ports
**Solutions:**
- Check ports 8000-8003 not already in use: `lsof -i :8000-8003`
- View container logs: `docker-compose logs`
- Check MCP service logs: `cat logs/math.log`
- Check if API port 8080 is already in use: `lsof -i :8080`
- Verify MCP services started by checking Docker logs for service startup messages
### Missing API Keys
@@ -258,12 +255,12 @@ docker pull ghcr.io/xe138/ai-trader-server:v1.0.0
### Permission Issues
**Symptom:** Cannot write to data or logs directories
**Symptom:** Cannot write to data directory
**Solutions:**
- Ensure directories writable: `chmod -R 755 data logs`
- Ensure data directory is writable: `chmod -R 755 data`
- Check volume mount permissions
- May need to create directories first: `mkdir -p data logs`
- May need to create directory first: `mkdir -p data`
### Container Keeps Restarting
@@ -298,13 +295,12 @@ docker buildx build --platform linux/amd64,linux/arm64 -t ai-trader-server .
docker stats ai-trader-server
```
### Access MCP Services Directly
### Access API Directly
Services exposed on host:
- Math: http://localhost:8000
- Search: http://localhost:8001
- Trade: http://localhost:8002
- Price: http://localhost:8003
API server exposed on host:
- REST API: http://localhost:8080
MCP services run internally and are not exposed to the host.
## Development Workflow

View File

@@ -27,7 +27,7 @@ WORKDIR /app
COPY . .
# Create necessary directories
RUN mkdir -p data logs data/agent_data
RUN mkdir -p data data/agent_data
# Make entrypoint executable
RUN chmod +x entrypoint.sh

View File

@@ -29,6 +29,7 @@ from tools.deployment_config import (
log_api_key_warning,
get_deployment_mode
)
from agent.context_injector import ContextInjector
# Load environment variables
load_dotenv()
@@ -124,6 +125,9 @@ class BaseAgent:
self.tools: Optional[List] = None
self.model: Optional[ChatOpenAI] = None
self.agent: Optional[Any] = None
# Context injector for MCP tools
self.context_injector: Optional[ContextInjector] = None
# Data paths
self.data_path = os.path.join(self.base_log_path, self.signature)
@@ -169,16 +173,32 @@ class BaseAgent:
print("⚠️ OpenAI base URL not set, using default")
try:
# Create MCP client
self.client = MultiServerMCPClient(self.mcp_config)
# Get job_id from runtime config if available (API mode)
from tools.general_tools import get_config_value
job_id = get_config_value("JOB_ID") # Returns None if not in API mode
# Create context injector for injecting signature and today_date into tool calls
self.context_injector = ContextInjector(
signature=self.signature,
today_date=self.init_date, # Will be updated per trading session
job_id=job_id # Will be None in standalone mode, populated in API mode
)
# Create MCP client with interceptor
self.client = MultiServerMCPClient(
self.mcp_config,
tool_interceptors=[self.context_injector]
)
# Get tools
self.tools = await self.client.get_tools()
if not self.tools:
raw_tools = await self.client.get_tools()
if not raw_tools:
print("⚠️ Warning: No MCP tools loaded. MCP services may not be running.")
print(f" MCP configuration: {self.mcp_config}")
self.tools = []
else:
print(f"✅ Loaded {len(self.tools)} MCP tools")
print(f"✅ Loaded {len(raw_tools)} MCP tools")
self.tools = raw_tools
except Exception as e:
raise RuntimeError(
f"❌ Failed to initialize MCP client: {e}\n"
@@ -336,6 +356,10 @@ Summary:"""
"""
print(f"📈 Starting trading session: {today_date}")
# Update context injector with current trading date
if self.context_injector:
self.context_injector.today_date = today_date
# Clear conversation history for new trading day
self.clear_conversation_history()

66
agent/context_injector.py Normal file
View File

@@ -0,0 +1,66 @@
"""
Tool interceptor for injecting runtime context into MCP tool calls.
This interceptor automatically injects `signature` and `today_date` parameters
into buy/sell tool calls to support concurrent multi-model simulations.
"""
from typing import Any, Callable, Awaitable
class ContextInjector:
"""
Intercepts tool calls to inject runtime context (signature, today_date).
Usage:
interceptor = ContextInjector(signature="gpt-5", today_date="2025-10-01")
client = MultiServerMCPClient(config, tool_interceptors=[interceptor])
"""
def __init__(self, signature: str, today_date: str, job_id: str = None, session_id: int = None):
"""
Initialize context injector.
Args:
signature: Model signature to inject
today_date: Trading date to inject
job_id: Job UUID to inject (optional)
session_id: Trading session ID to inject (optional, updated during execution)
"""
self.signature = signature
self.today_date = today_date
self.job_id = job_id
self.session_id = session_id
async def __call__(
self,
request: Any, # MCPToolCallRequest
handler: Callable[[Any], Awaitable[Any]]
) -> Any: # MCPToolCallResult
"""
Intercept tool call and inject context parameters.
Args:
request: Tool call request containing name and arguments
handler: Async callable to execute the actual tool
Returns:
Result from handler after injecting context
"""
# Inject context parameters for trade tools
if request.name in ["buy", "sell"]:
# Add signature and today_date to args if not present
if "signature" not in request.args:
request.args["signature"] = self.signature
if "today_date" not in request.args:
request.args["today_date"] = self.today_date
if "job_id" not in request.args and self.job_id:
request.args["job_id"] = self.job_id
if "session_id" not in request.args and self.session_id:
request.args["session_id"] = self.session_id
# Debug logging
print(f"[ContextInjector] Tool: {request.name}, Args after injection: {request.args}")
# Call the actual tool handler
return await handler(request)

View File

@@ -52,10 +52,6 @@ class MCPServiceManager:
}
}
# Create logs directory
self.log_dir = Path('logs')
self.log_dir.mkdir(exist_ok=True)
# Set signal handlers
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
@@ -77,27 +73,23 @@ class MCPServiceManager:
return False
try:
# Start service process
log_file = self.log_dir / f"{service_id}.log"
# Set PYTHONPATH to /app so services can import from tools module
env = os.environ.copy()
env['PYTHONPATH'] = str(Path.cwd())
with open(log_file, 'w') as f:
process = subprocess.Popen(
[sys.executable, str(script_path)],
stdout=f,
stderr=subprocess.STDOUT,
cwd=Path.cwd(), # Use current working directory (/app)
env=env # Pass environment with PYTHONPATH
)
# Start service process (output goes to Docker logs)
process = subprocess.Popen(
[sys.executable, str(script_path)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
cwd=Path.cwd(), # Use current working directory (/app)
env=env # Pass environment with PYTHONPATH
)
self.services[service_id] = {
'process': process,
'name': service_name,
'port': port,
'log_file': log_file
'port': port
}
print(f"{service_name} service started (PID: {process.pid}, Port: {port})")
@@ -167,15 +159,14 @@ class MCPServiceManager:
print(f"{service['name']} service running normally")
else:
print(f"{service['name']} service failed to start")
print(f" Please check logs: {service['log_file']}")
print(f" Check Docker logs for details: docker logs ai-trader-server")
def print_service_info(self):
"""Print service information"""
print("\n📋 Service information:")
for service_id, service in self.services.items():
print(f" - {service['name']}: http://localhost:{service['port']} (PID: {service['process'].pid})")
print(f"\n📁 Log files location: {self.log_dir.absolute()}")
print("\n🛑 Press Ctrl+C to stop all services")
def keep_alive(self):

View File

@@ -1,197 +1,333 @@
from fastmcp import FastMCP
import sys
import os
from typing import Dict, List, Optional, Any
from typing import Dict, List, Optional, Any, Tuple
# Add project root directory to Python path
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, project_root)
from tools.price_tools import get_yesterday_date, get_open_prices, get_yesterday_open_and_close_price, get_latest_position, get_yesterday_profit
from tools.price_tools import get_open_prices
import json
from tools.general_tools import get_config_value,write_config_value
from tools.deployment_config import get_db_path
from api.database import get_db_connection
from datetime import datetime
mcp = FastMCP("TradeTools")
def get_current_position_from_db(job_id: str, model: str, date: str) -> Tuple[Dict[str, float], int]:
"""
Query current position from SQLite database.
Args:
job_id: Job UUID
model: Model signature
date: Trading date (YYYY-MM-DD)
Returns:
Tuple of (position_dict, next_action_id)
- position_dict: {symbol: quantity, "CASH": amount}
- next_action_id: Next available action_id for this job+model
Raises:
Exception: If database query fails
"""
db_path = get_db_path()
conn = get_db_connection(db_path)
cursor = conn.cursor()
try:
# Get most recent position on or before this date
cursor.execute("""
SELECT p.id, p.cash
FROM positions p
WHERE p.job_id = ? AND p.model = ? AND p.date <= ?
ORDER BY p.date DESC, p.action_id DESC
LIMIT 1
""", (job_id, model, date))
position_row = cursor.fetchone()
if not position_row:
# No position found - this shouldn't happen if ModelDayExecutor initializes properly
raise Exception(f"No position found for job_id={job_id}, model={model}, date={date}")
position_id = position_row[0]
cash = position_row[1]
# Build position dict starting with CASH
position_dict = {"CASH": cash}
# Get holdings for this position
cursor.execute("""
SELECT symbol, quantity
FROM holdings
WHERE position_id = ?
""", (position_id,))
for row in cursor.fetchall():
symbol = row[0]
quantity = row[1]
position_dict[symbol] = quantity
# Get next action_id
cursor.execute("""
SELECT COALESCE(MAX(action_id), -1) + 1 as next_action_id
FROM positions
WHERE job_id = ? AND model = ?
""", (job_id, model))
next_action_id = cursor.fetchone()[0]
return position_dict, next_action_id
finally:
conn.close()
@mcp.tool()
def buy(symbol: str, amount: int) -> Dict[str, Any]:
def buy(symbol: str, amount: int, signature: str = None, today_date: str = None,
job_id: str = None, session_id: int = None) -> Dict[str, Any]:
"""
Buy stock function
This function simulates stock buying operations, including the following steps:
1. Get current position and operation ID
2. Get stock opening price for the day
3. Validate buy conditions (sufficient cash)
4. Update position (increase stock quantity, decrease cash)
5. Record transaction to position.jsonl file
Buy stock function - writes to SQLite database.
Args:
symbol: Stock symbol, such as "AAPL", "MSFT", etc.
amount: Buy quantity, must be a positive integer, indicating how many shares to buy
symbol: Stock symbol (e.g., "AAPL", "MSFT")
amount: Number of shares to buy (positive integer)
signature: Model signature (injected by ContextInjector)
today_date: Trading date YYYY-MM-DD (injected by ContextInjector)
job_id: Job UUID (injected by ContextInjector)
session_id: Trading session ID (injected by ContextInjector)
Returns:
Dict[str, Any]:
- Success: Returns new position dictionary (containing stock quantity and cash balance)
- Failure: Returns {"error": error message, ...} dictionary
Raises:
ValueError: Raised when SIGNATURE environment variable is not set
Example:
>>> result = buy("AAPL", 10)
>>> print(result) # {"AAPL": 110, "MSFT": 5, "CASH": 5000.0, ...}
- Success: {"CASH": amount, symbol: quantity, ...}
- Failure: {"error": message, ...}
"""
# Step 1: Get environment variables and basic information
# Get signature (model name) from environment variable, used to determine data storage path
signature = get_config_value("SIGNATURE")
if signature is None:
raise ValueError("SIGNATURE environment variable is not set")
# Get current trading date from environment variable
today_date = get_config_value("TODAY_DATE")
# Step 2: Get current latest position and operation ID
# get_latest_position returns two values: position dictionary and current maximum operation ID
# This ID is used to ensure each operation has a unique identifier
try:
current_position, current_action_id = get_latest_position(today_date, signature)
except Exception as e:
print(e)
print(current_position, current_action_id)
print(today_date, signature)
# Step 3: Get stock opening price for the day
# Use get_open_prices function to get the opening price of specified stock for the day
# If stock symbol does not exist or price data is missing, KeyError exception will be raised
try:
this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price']
except KeyError:
# Stock symbol does not exist or price data is missing, return error message
return {"error": f"Symbol {symbol} not found! This action will not be allowed.", "symbol": symbol, "date": today_date}
# Validate required parameters
if not job_id:
return {"error": "Missing required parameter: job_id"}
if not signature:
return {"error": "Missing required parameter: signature"}
if not today_date:
return {"error": "Missing required parameter: today_date"}
# Step 4: Validate buy conditions
# Calculate cash required for purchase: stock price × buy quantity
try:
cash_left = current_position["CASH"] - this_symbol_price * amount
except Exception as e:
print(current_position, "CASH", this_symbol_price, amount)
db_path = get_db_path()
conn = get_db_connection(db_path)
cursor = conn.cursor()
# Check if cash balance is sufficient for purchase
if cash_left < 0:
# Insufficient cash, return error message
return {"error": "Insufficient cash! This action will not be allowed.", "required_cash": this_symbol_price * amount, "cash_available": current_position.get("CASH", 0), "symbol": symbol, "date": today_date}
else:
# Step 5: Execute buy operation, update position
# Create a copy of current position to avoid directly modifying original data
try:
# Step 1: Get current position
current_position, next_action_id = get_current_position_from_db(job_id, signature, today_date)
# Step 2: Get stock price
try:
this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price']
except KeyError:
return {"error": f"Symbol {symbol} not found on {today_date}", "symbol": symbol, "date": today_date}
# Step 3: Validate sufficient cash
cash_required = this_symbol_price * amount
cash_available = current_position.get("CASH", 0)
cash_left = cash_available - cash_required
if cash_left < 0:
return {
"error": "Insufficient cash",
"required_cash": cash_required,
"cash_available": cash_available,
"symbol": symbol,
"date": today_date
}
# Step 4: Calculate new position
new_position = current_position.copy()
# Decrease cash balance
new_position["CASH"] = cash_left
# Increase stock position quantity
new_position[symbol] += amount
# Step 6: Record transaction to position.jsonl file
# Build file path: {project_root}/data/agent_data/{signature}/position/position.jsonl
# Use append mode ("a") to write new transaction record
# Each operation ID increments by 1, ensuring uniqueness of operation sequence
position_file_path = os.path.join(project_root, "data", "agent_data", signature, "position", "position.jsonl")
with open(position_file_path, "a") as f:
# Write JSON format transaction record, containing date, operation ID, transaction details and updated position
print(f"Writing to position.jsonl: {json.dumps({'date': today_date, 'id': current_action_id + 1, 'this_action':{'action':'buy','symbol':symbol,'amount':amount},'positions': new_position})}")
f.write(json.dumps({"date": today_date, "id": current_action_id + 1, "this_action":{"action":"buy","symbol":symbol,"amount":amount},"positions": new_position}) + "\n")
# Step 7: Return updated position
write_config_value("IF_TRADE", True)
print("IF_TRADE", get_config_value("IF_TRADE"))
new_position[symbol] = new_position.get(symbol, 0) + amount
# Step 5: Calculate portfolio value and P&L
portfolio_value = cash_left
for sym, qty in new_position.items():
if sym != "CASH":
try:
price = get_open_prices(today_date, [sym])[f'{sym}_price']
portfolio_value += qty * price
except KeyError:
pass # Symbol price not available, skip
# Get previous portfolio value for P&L calculation
cursor.execute("""
SELECT portfolio_value
FROM positions
WHERE job_id = ? AND model = ? AND date < ?
ORDER BY date DESC, action_id DESC
LIMIT 1
""", (job_id, signature, today_date))
row = cursor.fetchone()
previous_value = row[0] if row else 10000.0 # Default initial value
daily_profit = portfolio_value - previous_value
daily_return_pct = (daily_profit / previous_value * 100) if previous_value > 0 else 0
# Step 6: Write to positions table
created_at = datetime.utcnow().isoformat() + "Z"
cursor.execute("""
INSERT INTO positions (
job_id, date, model, action_id, action_type, symbol,
amount, price, cash, portfolio_value, daily_profit,
daily_return_pct, session_id, created_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
job_id, today_date, signature, next_action_id, "buy", symbol,
amount, this_symbol_price, cash_left, portfolio_value, daily_profit,
daily_return_pct, session_id, created_at
))
position_id = cursor.lastrowid
# Step 7: Write to holdings table
for sym, qty in new_position.items():
if sym != "CASH":
cursor.execute("""
INSERT INTO holdings (position_id, symbol, quantity)
VALUES (?, ?, ?)
""", (position_id, sym, qty))
conn.commit()
print(f"[buy] {signature} bought {amount} shares of {symbol} at ${this_symbol_price}")
return new_position
except Exception as e:
conn.rollback()
return {"error": f"Trade failed: {str(e)}", "symbol": symbol, "date": today_date}
finally:
conn.close()
@mcp.tool()
def sell(symbol: str, amount: int) -> Dict[str, Any]:
def sell(symbol: str, amount: int, signature: str = None, today_date: str = None,
job_id: str = None, session_id: int = None) -> Dict[str, Any]:
"""
Sell stock function
This function simulates stock selling operations, including the following steps:
1. Get current position and operation ID
2. Get stock opening price for the day
3. Validate sell conditions (position exists, sufficient quantity)
4. Update position (decrease stock quantity, increase cash)
5. Record transaction to position.jsonl file
Sell stock function - writes to SQLite database.
Args:
symbol: Stock symbol, such as "AAPL", "MSFT", etc.
amount: Sell quantity, must be a positive integer, indicating how many shares to sell
symbol: Stock symbol (e.g., "AAPL", "MSFT")
amount: Number of shares to sell (positive integer)
signature: Model signature (injected by ContextInjector)
today_date: Trading date YYYY-MM-DD (injected by ContextInjector)
job_id: Job UUID (injected by ContextInjector)
session_id: Trading session ID (injected by ContextInjector)
Returns:
Dict[str, Any]:
- Success: Returns new position dictionary (containing stock quantity and cash balance)
- Failure: Returns {"error": error message, ...} dictionary
Raises:
ValueError: Raised when SIGNATURE environment variable is not set
Example:
>>> result = sell("AAPL", 10)
>>> print(result) # {"AAPL": 90, "MSFT": 5, "CASH": 15000.0, ...}
- Success: {"CASH": amount, symbol: quantity, ...}
- Failure: {"error": message, ...}
"""
# Step 1: Get environment variables and basic information
# Get signature (model name) from environment variable, used to determine data storage path
signature = get_config_value("SIGNATURE")
if signature is None:
raise ValueError("SIGNATURE environment variable is not set")
# Get current trading date from environment variable
today_date = get_config_value("TODAY_DATE")
# Step 2: Get current latest position and operation ID
# get_latest_position returns two values: position dictionary and current maximum operation ID
# This ID is used to ensure each operation has a unique identifier
current_position, current_action_id = get_latest_position(today_date, signature)
# Step 3: Get stock opening price for the day
# Use get_open_prices function to get the opening price of specified stock for the day
# If stock symbol does not exist or price data is missing, KeyError exception will be raised
# Validate required parameters
if not job_id:
return {"error": "Missing required parameter: job_id"}
if not signature:
return {"error": "Missing required parameter: signature"}
if not today_date:
return {"error": "Missing required parameter: today_date"}
db_path = get_db_path()
conn = get_db_connection(db_path)
cursor = conn.cursor()
try:
this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price']
except KeyError:
# Stock symbol does not exist or price data is missing, return error message
return {"error": f"Symbol {symbol} not found! This action will not be allowed.", "symbol": symbol, "date": today_date}
# Step 1: Get current position
current_position, next_action_id = get_current_position_from_db(job_id, signature, today_date)
# Step 4: Validate sell conditions
# Check if holding this stock
if symbol not in current_position:
return {"error": f"No position for {symbol}! This action will not be allowed.", "symbol": symbol, "date": today_date}
# Step 2: Validate position exists
if symbol not in current_position:
return {"error": f"No position for {symbol}", "symbol": symbol, "date": today_date}
# Check if position quantity is sufficient for selling
if current_position[symbol] < amount:
return {"error": "Insufficient shares! This action will not be allowed.", "have": current_position.get(symbol, 0), "want_to_sell": amount, "symbol": symbol, "date": today_date}
if current_position[symbol] < amount:
return {
"error": "Insufficient shares",
"have": current_position[symbol],
"want_to_sell": amount,
"symbol": symbol,
"date": today_date
}
# Step 5: Execute sell operation, update position
# Create a copy of current position to avoid directly modifying original data
new_position = current_position.copy()
# Decrease stock position quantity
new_position[symbol] -= amount
# Increase cash balance: sell price × sell quantity
# Use get method to ensure CASH field exists, default to 0 if not present
new_position["CASH"] = new_position.get("CASH", 0) + this_symbol_price * amount
# Step 3: Get stock price
try:
this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price']
except KeyError:
return {"error": f"Symbol {symbol} not found on {today_date}", "symbol": symbol, "date": today_date}
# Step 6: Record transaction to position.jsonl file
# Build file path: {project_root}/data/agent_data/{signature}/position/position.jsonl
# Use append mode ("a") to write new transaction record
# Each operation ID increments by 1, ensuring uniqueness of operation sequence
position_file_path = os.path.join(project_root, "data", "agent_data", signature, "position", "position.jsonl")
with open(position_file_path, "a") as f:
# Write JSON format transaction record, containing date, operation ID and updated position
print(f"Writing to position.jsonl: {json.dumps({'date': today_date, 'id': current_action_id + 1, 'this_action':{'action':'sell','symbol':symbol,'amount':amount},'positions': new_position})}")
f.write(json.dumps({"date": today_date, "id": current_action_id + 1, "this_action":{"action":"sell","symbol":symbol,"amount":amount},"positions": new_position}) + "\n")
# Step 4: Calculate new position
new_position = current_position.copy()
new_position[symbol] -= amount
new_position["CASH"] = new_position.get("CASH", 0) + (this_symbol_price * amount)
# Step 5: Calculate portfolio value and P&L
portfolio_value = new_position["CASH"]
for sym, qty in new_position.items():
if sym != "CASH":
try:
price = get_open_prices(today_date, [sym])[f'{sym}_price']
portfolio_value += qty * price
except KeyError:
pass
# Get previous portfolio value
cursor.execute("""
SELECT portfolio_value
FROM positions
WHERE job_id = ? AND model = ? AND date < ?
ORDER BY date DESC, action_id DESC
LIMIT 1
""", (job_id, signature, today_date))
row = cursor.fetchone()
previous_value = row[0] if row else 10000.0
daily_profit = portfolio_value - previous_value
daily_return_pct = (daily_profit / previous_value * 100) if previous_value > 0 else 0
# Step 6: Write to positions table
created_at = datetime.utcnow().isoformat() + "Z"
cursor.execute("""
INSERT INTO positions (
job_id, date, model, action_id, action_type, symbol,
amount, price, cash, portfolio_value, daily_profit,
daily_return_pct, session_id, created_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
job_id, today_date, signature, next_action_id, "sell", symbol,
amount, this_symbol_price, new_position["CASH"], portfolio_value, daily_profit,
daily_return_pct, session_id, created_at
))
position_id = cursor.lastrowid
# Step 7: Write to holdings table
for sym, qty in new_position.items():
if sym != "CASH":
cursor.execute("""
INSERT INTO holdings (position_id, symbol, quantity)
VALUES (?, ?, ?)
""", (position_id, sym, qty))
conn.commit()
print(f"[sell] {signature} sold {amount} shares of {symbol} at ${this_symbol_price}")
return new_position
except Exception as e:
conn.rollback()
return {"error": f"Trade failed: {str(e)}", "symbol": symbol, "date": today_date}
finally:
conn.close()
# Step 7: Return updated position
write_config_value("IF_TRADE", True)
return new_position
if __name__ == "__main__":
# new_result = buy("AAPL", 1)
# print(new_result)
# new_result = sell("AAPL", 1)
# print(new_result)
port = int(os.getenv("TRADE_HTTP_PORT", "8002"))
mcp.run(transport="streamable-http", port=port)

View File

@@ -122,12 +122,20 @@ class ModelDayExecutor:
session_id = self._create_trading_session(cursor)
conn.commit()
# Initialize starting position if this is first day
self._initialize_starting_position(cursor, session_id)
conn.commit()
# Set environment variable for agent to use isolated config
os.environ["RUNTIME_ENV_PATH"] = self.runtime_config_path
# Initialize agent
agent = await self._initialize_agent()
# Update context injector with session_id
if hasattr(agent, 'context_injector') and agent.context_injector:
agent.context_injector.session_id = session_id
# Run trading session
logger.info(f"Running trading session for {self.model_sig} on {self.date}")
session_result = await agent.run_trading_session(self.date)
@@ -287,6 +295,51 @@ class ModelDayExecutor:
return cursor.lastrowid
def _initialize_starting_position(self, cursor, session_id: int) -> None:
"""
Initialize starting position if no prior positions exist for this job+model.
Creates action_id=0 position with initial_cash and zero stock holdings.
Args:
cursor: Database cursor
session_id: Trading session ID
"""
# Check if any positions exist for this job+model
cursor.execute("""
SELECT COUNT(*) FROM positions
WHERE job_id = ? AND model = ?
""", (self.job_id, self.model_sig))
if cursor.fetchone()[0] > 0:
# Positions already exist, no initialization needed
return
# Load config to get initial_cash
import json
with open(self.config_path, 'r') as f:
config = json.load(f)
agent_config = config.get("agent_config", {})
initial_cash = agent_config.get("initial_cash", 10000.0)
# Create initial position record
from datetime import datetime
created_at = datetime.utcnow().isoformat() + "Z"
cursor.execute("""
INSERT INTO positions (
job_id, date, model, action_id, action_type,
cash, portfolio_value, session_id, created_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
self.job_id, self.date, self.model_sig, 0, "no_trade",
initial_cash, initial_cash, session_id, created_at
))
logger.info(f"Initialized starting position for {self.model_sig} with ${initial_cash}")
async def _store_reasoning_logs(
self,
cursor,

View File

@@ -7,7 +7,6 @@ services:
container_name: ai-trader-server
volumes:
- ${VOLUME_PATH:-.}/data:/app/data
- ${VOLUME_PATH:-.}/logs:/app/logs
# User configs mounted to /app/user-configs (default config baked into image)
- ${VOLUME_PATH:-.}/configs:/app/user-configs
environment:

View File

@@ -112,10 +112,9 @@ docker-compose up
### Volume Mounts
Docker Compose mounts three volumes for persistent data. By default, these are stored in the project directory:
Docker Compose mounts two volumes for persistent data. By default, these are stored in the project directory:
- `./data:/app/data` - Price data and trading records
- `./logs:/app/logs` - MCP service logs
- `./configs:/app/configs` - Configuration files (allows editing configs without rebuilding)
### Custom Volume Location
@@ -132,7 +131,6 @@ VOLUME_PATH=./volumes
This will store data in:
- `/home/user/trading-data/data/`
- `/home/user/trading-data/logs/`
- `/home/user/trading-data/configs/`
**Note:** The directory structure is automatically created. You'll need to copy your existing configs:
@@ -148,7 +146,7 @@ To reset all trading data:
```bash
docker-compose down
rm -rf ${VOLUME_PATH:-.}/data/agent_data/* ${VOLUME_PATH:-.}/logs/*
rm -rf ${VOLUME_PATH:-.}/data/agent_data/*
docker-compose up
```
@@ -175,8 +173,7 @@ docker pull ghcr.io/xe138/ai-trader-server:latest
```bash
docker run --env-file .env \
-v $(pwd)/data:/app/data \
-v $(pwd)/logs:/app/logs \
-p 8000-8003:8000-8003 \
-p 8080:8080 \
ghcr.io/xe138/ai-trader-server:latest
```
@@ -192,9 +189,9 @@ docker pull ghcr.io/xe138/ai-trader-server:v1.0.0
**Symptom:** Container exits immediately or errors about ports
**Solutions:**
- Check ports 8000-8003 not already in use: `lsof -i :8000-8003`
- View container logs: `docker-compose logs`
- Check MCP service logs: `cat logs/math.log`
- Check if API port 8080 is already in use: `lsof -i :8080`
- Verify MCP services started by checking Docker logs for service startup messages
### Missing API Keys
@@ -216,12 +213,12 @@ docker pull ghcr.io/xe138/ai-trader-server:v1.0.0
### Permission Issues
**Symptom:** Cannot write to data or logs directories
**Symptom:** Cannot write to data directory
**Solutions:**
- Ensure directories writable: `chmod -R 755 data logs`
- Ensure data directory is writable: `chmod -R 755 data`
- Check volume mount permissions
- May need to create directories first: `mkdir -p data logs`
- May need to create directory first: `mkdir -p data`
### Container Keeps Restarting
@@ -256,13 +253,12 @@ docker buildx build --platform linux/amd64,linux/arm64 -t ai-trader-server .
docker stats ai-trader-server
```
### Access MCP Services Directly
### Access API Directly
Services exposed on host:
- Math: http://localhost:8000
- Search: http://localhost:8001
- Trade: http://localhost:8002
- Price: http://localhost:8003
API server exposed on host:
- REST API: http://localhost:8080
MCP services run internally and are not exposed to the host.
## Development Workflow