diff --git a/agent/base_agent/base_agent.py b/agent/base_agent/base_agent.py index b63b2e9..323b295 100644 --- a/agent/base_agent/base_agent.py +++ b/agent/base_agent/base_agent.py @@ -173,10 +173,15 @@ class BaseAgent: print("⚠️ OpenAI base URL not set, using default") try: + # Get job_id from runtime config if available (API mode) + from tools.general_tools import get_config_value + job_id = get_config_value("JOB_ID") # Returns None if not in API mode + # Create context injector for injecting signature and today_date into tool calls self.context_injector = ContextInjector( signature=self.signature, - today_date=self.init_date # Will be updated per trading session + today_date=self.init_date, # Will be updated per trading session + job_id=job_id # Will be None in standalone mode, populated in API mode ) # Create MCP client with interceptor diff --git a/agent/context_injector.py b/agent/context_injector.py index ee6b6b5..1518b51 100644 --- a/agent/context_injector.py +++ b/agent/context_injector.py @@ -17,16 +17,20 @@ class ContextInjector: client = MultiServerMCPClient(config, tool_interceptors=[interceptor]) """ - def __init__(self, signature: str, today_date: str): + def __init__(self, signature: str, today_date: str, job_id: str = None, session_id: int = None): """ Initialize context injector. Args: signature: Model signature to inject today_date: Trading date to inject + job_id: Job UUID to inject (optional) + session_id: Trading session ID to inject (optional, updated during execution) """ self.signature = signature self.today_date = today_date + self.job_id = job_id + self.session_id = session_id async def __call__( self, @@ -43,13 +47,17 @@ class ContextInjector: Returns: Result from handler after injecting context """ - # Inject signature and today_date for trade tools + # Inject context parameters for trade tools if request.name in ["buy", "sell"]: # Add signature and today_date to args if not present if "signature" not in request.args: request.args["signature"] = self.signature if "today_date" not in request.args: request.args["today_date"] = self.today_date + if "job_id" not in request.args and self.job_id: + request.args["job_id"] = self.job_id + if "session_id" not in request.args and self.session_id: + request.args["session_id"] = self.session_id # Debug logging print(f"[ContextInjector] Tool: {request.name}, Args after injection: {request.args}") diff --git a/agent_tools/tool_trade.py b/agent_tools/tool_trade.py index 4c777e8..f25405c 100644 --- a/agent_tools/tool_trade.py +++ b/agent_tools/tool_trade.py @@ -1,208 +1,333 @@ from fastmcp import FastMCP import sys import os -from typing import Dict, List, Optional, Any +from typing import Dict, List, Optional, Any, Tuple # Add project root directory to Python path project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, project_root) -from tools.price_tools import get_yesterday_date, get_open_prices, get_yesterday_open_and_close_price, get_latest_position, get_yesterday_profit +from tools.price_tools import get_open_prices import json -from tools.general_tools import get_config_value,write_config_value +from tools.deployment_config import get_db_path +from api.database import get_db_connection +from datetime import datetime mcp = FastMCP("TradeTools") - -@mcp.tool() -def buy(symbol: str, amount: int, signature: str = None, today_date: str = None) -> Dict[str, Any]: +def get_current_position_from_db(job_id: str, model: str, date: str) -> Tuple[Dict[str, float], int]: """ - Buy stock function - - This function simulates stock buying operations, including the following steps: - 1. Get current position and operation ID - 2. Get stock opening price for the day - 3. Validate buy conditions (sufficient cash) - 4. Update position (increase stock quantity, decrease cash) - 5. Record transaction to position.jsonl file + Query current position from SQLite database. Args: - symbol: Stock symbol, such as "AAPL", "MSFT", etc. - amount: Buy quantity, must be a positive integer, indicating how many shares to buy - signature: Model signature (optional, will use config/env if not provided) - today_date: Trading date (optional, will use config/env if not provided) + job_id: Job UUID + model: Model signature + date: Trading date (YYYY-MM-DD) + + Returns: + Tuple of (position_dict, next_action_id) + - position_dict: {symbol: quantity, "CASH": amount} + - next_action_id: Next available action_id for this job+model + + Raises: + Exception: If database query fails + """ + db_path = get_db_path() + conn = get_db_connection(db_path) + cursor = conn.cursor() + + try: + # Get most recent position on or before this date + cursor.execute(""" + SELECT p.id, p.cash + FROM positions p + WHERE p.job_id = ? AND p.model = ? AND p.date <= ? + ORDER BY p.date DESC, p.action_id DESC + LIMIT 1 + """, (job_id, model, date)) + + position_row = cursor.fetchone() + + if not position_row: + # No position found - this shouldn't happen if ModelDayExecutor initializes properly + raise Exception(f"No position found for job_id={job_id}, model={model}, date={date}") + + position_id = position_row[0] + cash = position_row[1] + + # Build position dict starting with CASH + position_dict = {"CASH": cash} + + # Get holdings for this position + cursor.execute(""" + SELECT symbol, quantity + FROM holdings + WHERE position_id = ? + """, (position_id,)) + + for row in cursor.fetchall(): + symbol = row[0] + quantity = row[1] + position_dict[symbol] = quantity + + # Get next action_id + cursor.execute(""" + SELECT COALESCE(MAX(action_id), -1) + 1 as next_action_id + FROM positions + WHERE job_id = ? AND model = ? + """, (job_id, model)) + + next_action_id = cursor.fetchone()[0] + + return position_dict, next_action_id + + finally: + conn.close() + + +@mcp.tool() +def buy(symbol: str, amount: int, signature: str = None, today_date: str = None, + job_id: str = None, session_id: int = None) -> Dict[str, Any]: + """ + Buy stock function - writes to SQLite database. + + Args: + symbol: Stock symbol (e.g., "AAPL", "MSFT") + amount: Number of shares to buy (positive integer) + signature: Model signature (injected by ContextInjector) + today_date: Trading date YYYY-MM-DD (injected by ContextInjector) + job_id: Job UUID (injected by ContextInjector) + session_id: Trading session ID (injected by ContextInjector) Returns: Dict[str, Any]: - - Success: Returns new position dictionary (containing stock quantity and cash balance) - - Failure: Returns {"error": error message, ...} dictionary - - Raises: - ValueError: Raised when SIGNATURE environment variable is not set - - Example: - >>> result = buy("AAPL", 10) - >>> print(result) # {"AAPL": 110, "MSFT": 5, "CASH": 5000.0, ...} + - Success: {"CASH": amount, symbol: quantity, ...} + - Failure: {"error": message, ...} """ - # Step 1: Get environment variables and basic information - # Get signature (model name) from parameter or fallback to config/env - print(f"[buy] Received signature parameter: {signature}") - if signature is None: - signature = get_config_value("SIGNATURE") - print(f"[buy] Signature from config: {signature}") - if signature is None: - raise ValueError("SIGNATURE not provided and environment variable is not set") + # Validate required parameters + if not job_id: + return {"error": "Missing required parameter: job_id"} + if not signature: + return {"error": "Missing required parameter: signature"} + if not today_date: + return {"error": "Missing required parameter: today_date"} - # Get current trading date from parameter or fallback to config/env - if today_date is None: - today_date = get_config_value("TODAY_DATE") - - # Step 2: Get current latest position and operation ID - # get_latest_position returns two values: position dictionary and current maximum operation ID - # This ID is used to ensure each operation has a unique identifier - try: - current_position, current_action_id = get_latest_position(today_date, signature) - except Exception as e: - return {"error": f"Failed to get position: {str(e)}", "signature": signature, "date": today_date} - # Step 3: Get stock opening price for the day - # Use get_open_prices function to get the opening price of specified stock for the day - # If stock symbol does not exist or price data is missing, KeyError exception will be raised - try: - this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price'] - except KeyError: - # Stock symbol does not exist or price data is missing, return error message - return {"error": f"Symbol {symbol} not found! This action will not be allowed.", "symbol": symbol, "date": today_date} + db_path = get_db_path() + conn = get_db_connection(db_path) + cursor = conn.cursor() - # Step 4: Validate buy conditions - # Calculate cash required for purchase: stock price × buy quantity try: - cash_left = current_position["CASH"] - this_symbol_price * amount - except Exception as e: - print(current_position, "CASH", this_symbol_price, amount) + # Step 1: Get current position + current_position, next_action_id = get_current_position_from_db(job_id, signature, today_date) - # Check if cash balance is sufficient for purchase - if cash_left < 0: - # Insufficient cash, return error message - return {"error": "Insufficient cash! This action will not be allowed.", "required_cash": this_symbol_price * amount, "cash_available": current_position.get("CASH", 0), "symbol": symbol, "date": today_date} - else: - # Step 5: Execute buy operation, update position - # Create a copy of current position to avoid directly modifying original data + # Step 2: Get stock price + try: + this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price'] + except KeyError: + return {"error": f"Symbol {symbol} not found on {today_date}", "symbol": symbol, "date": today_date} + + # Step 3: Validate sufficient cash + cash_required = this_symbol_price * amount + cash_available = current_position.get("CASH", 0) + cash_left = cash_available - cash_required + + if cash_left < 0: + return { + "error": "Insufficient cash", + "required_cash": cash_required, + "cash_available": cash_available, + "symbol": symbol, + "date": today_date + } + + # Step 4: Calculate new position new_position = current_position.copy() - - # Decrease cash balance new_position["CASH"] = cash_left - - # Increase stock position quantity - new_position[symbol] += amount - - # Step 6: Record transaction to position.jsonl file - # Build file path: {project_root}/data/agent_data/{signature}/position/position.jsonl - # Use append mode ("a") to write new transaction record - # Each operation ID increments by 1, ensuring uniqueness of operation sequence - position_file_path = os.path.join(project_root, "data", "agent_data", signature, "position", "position.jsonl") - with open(position_file_path, "a") as f: - # Write JSON format transaction record, containing date, operation ID, transaction details and updated position - print(f"Writing to position.jsonl: {json.dumps({'date': today_date, 'id': current_action_id + 1, 'this_action':{'action':'buy','symbol':symbol,'amount':amount},'positions': new_position})}") - f.write(json.dumps({"date": today_date, "id": current_action_id + 1, "this_action":{"action":"buy","symbol":symbol,"amount":amount},"positions": new_position}) + "\n") - # Step 7: Return updated position - write_config_value("IF_TRADE", True) - print("IF_TRADE", get_config_value("IF_TRADE")) + new_position[symbol] = new_position.get(symbol, 0) + amount + + # Step 5: Calculate portfolio value and P&L + portfolio_value = cash_left + for sym, qty in new_position.items(): + if sym != "CASH": + try: + price = get_open_prices(today_date, [sym])[f'{sym}_price'] + portfolio_value += qty * price + except KeyError: + pass # Symbol price not available, skip + + # Get previous portfolio value for P&L calculation + cursor.execute(""" + SELECT portfolio_value + FROM positions + WHERE job_id = ? AND model = ? AND date < ? + ORDER BY date DESC, action_id DESC + LIMIT 1 + """, (job_id, signature, today_date)) + + row = cursor.fetchone() + previous_value = row[0] if row else 10000.0 # Default initial value + + daily_profit = portfolio_value - previous_value + daily_return_pct = (daily_profit / previous_value * 100) if previous_value > 0 else 0 + + # Step 6: Write to positions table + created_at = datetime.utcnow().isoformat() + "Z" + + cursor.execute(""" + INSERT INTO positions ( + job_id, date, model, action_id, action_type, symbol, + amount, price, cash, portfolio_value, daily_profit, + daily_return_pct, session_id, created_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + job_id, today_date, signature, next_action_id, "buy", symbol, + amount, this_symbol_price, cash_left, portfolio_value, daily_profit, + daily_return_pct, session_id, created_at + )) + + position_id = cursor.lastrowid + + # Step 7: Write to holdings table + for sym, qty in new_position.items(): + if sym != "CASH": + cursor.execute(""" + INSERT INTO holdings (position_id, symbol, quantity) + VALUES (?, ?, ?) + """, (position_id, sym, qty)) + + conn.commit() + print(f"[buy] {signature} bought {amount} shares of {symbol} at ${this_symbol_price}") return new_position -@mcp.tool() -def sell(symbol: str, amount: int, signature: str = None, today_date: str = None) -> Dict[str, Any]: - """ - Sell stock function + except Exception as e: + conn.rollback() + return {"error": f"Trade failed: {str(e)}", "symbol": symbol, "date": today_date} - This function simulates stock selling operations, including the following steps: - 1. Get current position and operation ID - 2. Get stock opening price for the day - 3. Validate sell conditions (position exists, sufficient quantity) - 4. Update position (decrease stock quantity, increase cash) - 5. Record transaction to position.jsonl file + finally: + conn.close() + + +@mcp.tool() +def sell(symbol: str, amount: int, signature: str = None, today_date: str = None, + job_id: str = None, session_id: int = None) -> Dict[str, Any]: + """ + Sell stock function - writes to SQLite database. Args: - symbol: Stock symbol, such as "AAPL", "MSFT", etc. - amount: Sell quantity, must be a positive integer, indicating how many shares to sell - signature: Model signature (optional, will use config/env if not provided) - today_date: Trading date (optional, will use config/env if not provided) + symbol: Stock symbol (e.g., "AAPL", "MSFT") + amount: Number of shares to sell (positive integer) + signature: Model signature (injected by ContextInjector) + today_date: Trading date YYYY-MM-DD (injected by ContextInjector) + job_id: Job UUID (injected by ContextInjector) + session_id: Trading session ID (injected by ContextInjector) Returns: Dict[str, Any]: - - Success: Returns new position dictionary (containing stock quantity and cash balance) - - Failure: Returns {"error": error message, ...} dictionary - - Raises: - ValueError: Raised when SIGNATURE environment variable is not set - - Example: - >>> result = sell("AAPL", 10) - >>> print(result) # {"AAPL": 90, "MSFT": 5, "CASH": 15000.0, ...} + - Success: {"CASH": amount, symbol: quantity, ...} + - Failure: {"error": message, ...} """ - # Step 1: Get environment variables and basic information - # Get signature (model name) from parameter or fallback to config/env - if signature is None: - signature = get_config_value("SIGNATURE") - if signature is None: - raise ValueError("SIGNATURE not provided and environment variable is not set") + # Validate required parameters + if not job_id: + return {"error": "Missing required parameter: job_id"} + if not signature: + return {"error": "Missing required parameter: signature"} + if not today_date: + return {"error": "Missing required parameter: today_date"} + + db_path = get_db_path() + conn = get_db_connection(db_path) + cursor = conn.cursor() - # Get current trading date from parameter or fallback to config/env - if today_date is None: - today_date = get_config_value("TODAY_DATE") - - # Step 2: Get current latest position and operation ID - # get_latest_position returns two values: position dictionary and current maximum operation ID - # This ID is used to ensure each operation has a unique identifier try: - current_position, current_action_id = get_latest_position(today_date, signature) + # Step 1: Get current position + current_position, next_action_id = get_current_position_from_db(job_id, signature, today_date) + + # Step 2: Validate position exists + if symbol not in current_position: + return {"error": f"No position for {symbol}", "symbol": symbol, "date": today_date} + + if current_position[symbol] < amount: + return { + "error": "Insufficient shares", + "have": current_position[symbol], + "want_to_sell": amount, + "symbol": symbol, + "date": today_date + } + + # Step 3: Get stock price + try: + this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price'] + except KeyError: + return {"error": f"Symbol {symbol} not found on {today_date}", "symbol": symbol, "date": today_date} + + # Step 4: Calculate new position + new_position = current_position.copy() + new_position[symbol] -= amount + new_position["CASH"] = new_position.get("CASH", 0) + (this_symbol_price * amount) + + # Step 5: Calculate portfolio value and P&L + portfolio_value = new_position["CASH"] + for sym, qty in new_position.items(): + if sym != "CASH": + try: + price = get_open_prices(today_date, [sym])[f'{sym}_price'] + portfolio_value += qty * price + except KeyError: + pass + + # Get previous portfolio value + cursor.execute(""" + SELECT portfolio_value + FROM positions + WHERE job_id = ? AND model = ? AND date < ? + ORDER BY date DESC, action_id DESC + LIMIT 1 + """, (job_id, signature, today_date)) + + row = cursor.fetchone() + previous_value = row[0] if row else 10000.0 + + daily_profit = portfolio_value - previous_value + daily_return_pct = (daily_profit / previous_value * 100) if previous_value > 0 else 0 + + # Step 6: Write to positions table + created_at = datetime.utcnow().isoformat() + "Z" + + cursor.execute(""" + INSERT INTO positions ( + job_id, date, model, action_id, action_type, symbol, + amount, price, cash, portfolio_value, daily_profit, + daily_return_pct, session_id, created_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + job_id, today_date, signature, next_action_id, "sell", symbol, + amount, this_symbol_price, new_position["CASH"], portfolio_value, daily_profit, + daily_return_pct, session_id, created_at + )) + + position_id = cursor.lastrowid + + # Step 7: Write to holdings table + for sym, qty in new_position.items(): + if sym != "CASH": + cursor.execute(""" + INSERT INTO holdings (position_id, symbol, quantity) + VALUES (?, ?, ?) + """, (position_id, sym, qty)) + + conn.commit() + print(f"[sell] {signature} sold {amount} shares of {symbol} at ${this_symbol_price}") + return new_position + except Exception as e: - return {"error": f"Failed to get position: {str(e)}", "signature": signature, "date": today_date} - - # Step 3: Get stock opening price for the day - # Use get_open_prices function to get the opening price of specified stock for the day - # If stock symbol does not exist or price data is missing, KeyError exception will be raised - try: - this_symbol_price = get_open_prices(today_date, [symbol])[f'{symbol}_price'] - except KeyError: - # Stock symbol does not exist or price data is missing, return error message - return {"error": f"Symbol {symbol} not found! This action will not be allowed.", "symbol": symbol, "date": today_date} + conn.rollback() + return {"error": f"Trade failed: {str(e)}", "symbol": symbol, "date": today_date} - # Step 4: Validate sell conditions - # Check if holding this stock - if symbol not in current_position: - return {"error": f"No position for {symbol}! This action will not be allowed.", "symbol": symbol, "date": today_date} + finally: + conn.close() - # Check if position quantity is sufficient for selling - if current_position[symbol] < amount: - return {"error": "Insufficient shares! This action will not be allowed.", "have": current_position.get(symbol, 0), "want_to_sell": amount, "symbol": symbol, "date": today_date} - - # Step 5: Execute sell operation, update position - # Create a copy of current position to avoid directly modifying original data - new_position = current_position.copy() - - # Decrease stock position quantity - new_position[symbol] -= amount - - # Increase cash balance: sell price × sell quantity - # Use get method to ensure CASH field exists, default to 0 if not present - new_position["CASH"] = new_position.get("CASH", 0) + this_symbol_price * amount - - # Step 6: Record transaction to position.jsonl file - # Build file path: {project_root}/data/agent_data/{signature}/position/position.jsonl - # Use append mode ("a") to write new transaction record - # Each operation ID increments by 1, ensuring uniqueness of operation sequence - position_file_path = os.path.join(project_root, "data", "agent_data", signature, "position", "position.jsonl") - with open(position_file_path, "a") as f: - # Write JSON format transaction record, containing date, operation ID and updated position - print(f"Writing to position.jsonl: {json.dumps({'date': today_date, 'id': current_action_id + 1, 'this_action':{'action':'sell','symbol':symbol,'amount':amount},'positions': new_position})}") - f.write(json.dumps({"date": today_date, "id": current_action_id + 1, "this_action":{"action":"sell","symbol":symbol,"amount":amount},"positions": new_position}) + "\n") - - # Step 7: Return updated position - write_config_value("IF_TRADE", True) - return new_position if __name__ == "__main__": - # new_result = buy("AAPL", 1) - # print(new_result) - # new_result = sell("AAPL", 1) - # print(new_result) port = int(os.getenv("TRADE_HTTP_PORT", "8002")) mcp.run(transport="streamable-http", port=port) diff --git a/api/model_day_executor.py b/api/model_day_executor.py index 95620e9..3fba7b4 100644 --- a/api/model_day_executor.py +++ b/api/model_day_executor.py @@ -122,12 +122,20 @@ class ModelDayExecutor: session_id = self._create_trading_session(cursor) conn.commit() + # Initialize starting position if this is first day + self._initialize_starting_position(cursor, session_id) + conn.commit() + # Set environment variable for agent to use isolated config os.environ["RUNTIME_ENV_PATH"] = self.runtime_config_path # Initialize agent agent = await self._initialize_agent() + # Update context injector with session_id + if hasattr(agent, 'context_injector') and agent.context_injector: + agent.context_injector.session_id = session_id + # Run trading session logger.info(f"Running trading session for {self.model_sig} on {self.date}") session_result = await agent.run_trading_session(self.date) @@ -287,6 +295,51 @@ class ModelDayExecutor: return cursor.lastrowid + def _initialize_starting_position(self, cursor, session_id: int) -> None: + """ + Initialize starting position if no prior positions exist for this job+model. + + Creates action_id=0 position with initial_cash and zero stock holdings. + + Args: + cursor: Database cursor + session_id: Trading session ID + """ + # Check if any positions exist for this job+model + cursor.execute(""" + SELECT COUNT(*) FROM positions + WHERE job_id = ? AND model = ? + """, (self.job_id, self.model_sig)) + + if cursor.fetchone()[0] > 0: + # Positions already exist, no initialization needed + return + + # Load config to get initial_cash + import json + with open(self.config_path, 'r') as f: + config = json.load(f) + + agent_config = config.get("agent_config", {}) + initial_cash = agent_config.get("initial_cash", 10000.0) + + # Create initial position record + from datetime import datetime + created_at = datetime.utcnow().isoformat() + "Z" + + cursor.execute(""" + INSERT INTO positions ( + job_id, date, model, action_id, action_type, + cash, portfolio_value, session_id, created_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + self.job_id, self.date, self.model_sig, 0, "init", + initial_cash, initial_cash, session_id, created_at + )) + + logger.info(f"Initialized starting position for {self.model_sig} with ${initial_cash}") + async def _store_reasoning_logs( self, cursor,