diff --git a/api/model_day_executor.py b/api/model_day_executor.py index 07bf888..dfecf47 100644 --- a/api/model_day_executor.py +++ b/api/model_day_executor.py @@ -4,13 +4,12 @@ Single model-day execution engine. This module provides: - Isolated execution of one model for one trading day - Runtime config management per execution -- Result persistence to SQLite (positions, holdings, reasoning) +- Result persistence to SQLite (trading_days, actions, holdings) - Automatic status updates via JobManager - Cleanup of temporary resources -NOTE: This module uses the OLD positions table schema. -It is being replaced by the new trading_days schema. -Some position-related code is commented out during migration. +NOTE: Uses new trading_days schema exclusively. +All data persistence is handled by BaseAgent. """ import logging @@ -95,22 +94,17 @@ class ModelDayExecutor: Process: 1. Update job_detail status to 'running' - 2. Create trading session + 2. Create trading_day record with P&L metrics 3. Initialize and run trading agent - 4. Store reasoning logs with summaries - 5. Update session summary - 6. Write results to SQLite - 7. Update job_detail status to 'completed' or 'failed' - 8. Cleanup runtime config + 4. Agent writes actions and updates trading_day + 5. Update job_detail status to 'completed' or 'failed' + 6. Cleanup runtime config SQLite writes: - - trading_sessions: Session metadata and summary - - reasoning_logs: Conversation history with summaries - - positions: Trading position record (linked to session) - - holdings: Portfolio holdings breakdown - - tool_usage: Tool usage statistics (if available) + - trading_days: Complete day record with P&L, reasoning, holdings + - actions: Trade execution ledger + - holdings: Ending positions snapshot """ - conn = None try: # Update status to running self.job_manager.update_job_detail_status( @@ -120,15 +114,6 @@ class ModelDayExecutor: "running" ) - # Create trading session at start - conn = get_db_connection(self.db_path) - cursor = conn.cursor() - session_id = self._create_trading_session(cursor) - conn.commit() - - # Initialize starting position if this is first day - self._initialize_starting_position(cursor, session_id) - conn.commit() # Set environment variable for agent to use isolated config os.environ["RUNTIME_ENV_PATH"] = self.runtime_config_path @@ -145,10 +130,10 @@ class ModelDayExecutor: signature=self.model_sig, today_date=self.date, # Current trading day job_id=self.job_id, - session_id=session_id, + session_id=0, # Deprecated, kept for compatibility trading_day_id=trading_day_id ) - logger.info(f"[DEBUG] ModelDayExecutor: Created ContextInjector with signature={self.model_sig}, date={self.date}, job_id={self.job_id}, session_id={session_id}") + logger.info(f"[DEBUG] ModelDayExecutor: Created ContextInjector with signature={self.model_sig}, date={self.date}, job_id={self.job_id}, trading_day_id={trading_day_id}") logger.info(f"[DEBUG] ModelDayExecutor: Calling await agent.set_context()") await agent.set_context(context_injector) logger.info(f"[DEBUG] ModelDayExecutor: set_context() completed") @@ -157,22 +142,11 @@ class ModelDayExecutor: logger.info(f"Running trading session for {self.model_sig} on {self.date}") session_result = await agent.run_trading_session(self.date) - # Get conversation history - conversation = agent.get_conversation_history() - - # Store reasoning logs with summaries - await self._store_reasoning_logs(cursor, session_id, conversation, agent) - - # Update session summary - await self._update_session_summary(cursor, session_id, conversation, agent) - - # Commit and close connection - conn.commit() - conn.close() - conn = None # Mark as closed - - # Note: Positions are written by trade tools (buy/sell) or no_trade_record - # No need to write positions here - that was creating duplicate/corrupt records + # Note: All data persistence is handled by BaseAgent: + # - trading_days record created with P&L metrics + # - actions recorded during trading + # - holdings snapshot saved at end of day + # - reasoning stored in trading_days.reasoning_full # Update status to completed self.job_manager.update_job_detail_status( @@ -189,7 +163,6 @@ class ModelDayExecutor: "job_id": self.job_id, "date": self.date, "model": self.model_sig, - "session_id": session_id, "session_result": session_result } @@ -197,9 +170,6 @@ class ModelDayExecutor: error_msg = f"Execution failed: {str(e)}" logger.error(f"{self.model_sig} on {self.date}: {error_msg}", exc_info=True) - if conn: - conn.rollback() - # Update status to failed self.job_manager.update_job_detail_status( self.job_id, @@ -218,8 +188,6 @@ class ModelDayExecutor: } finally: - if conn: - conn.close() # Always cleanup runtime config self.runtime_manager.cleanup_runtime_config(self.runtime_config_path) @@ -292,150 +260,6 @@ class ModelDayExecutor: return agent - def _create_trading_session(self, cursor) -> int: - """ - Create trading session record. - Args: - cursor: Database cursor - Returns: - session_id (int) - """ - from datetime import datetime - started_at = datetime.utcnow().isoformat() + "Z" - - cursor.execute(""" - INSERT INTO trading_sessions ( - job_id, date, model, started_at - ) - VALUES (?, ?, ?, ?) - """, (self.job_id, self.date, self.model_sig, started_at)) - - return cursor.lastrowid - - def _initialize_starting_position(self, cursor, session_id: int) -> None: - """ - Initialize starting position if no prior positions exist for this job+model. - - Creates action_id=0 position with initial_cash and zero stock holdings. - - Args: - cursor: Database cursor - session_id: Trading session ID - """ - # Check if any positions exist for this job+model - cursor.execute(""" - SELECT COUNT(*) FROM positions - WHERE job_id = ? AND model = ? - """, (self.job_id, self.model_sig)) - - if cursor.fetchone()[0] > 0: - # Positions already exist, no initialization needed - return - - # Load config to get initial_cash - import json - with open(self.config_path, 'r') as f: - config = json.load(f) - - agent_config = config.get("agent_config", {}) - initial_cash = agent_config.get("initial_cash", 10000.0) - - # Create initial position record - from datetime import datetime - created_at = datetime.utcnow().isoformat() + "Z" - - cursor.execute(""" - INSERT INTO positions ( - job_id, date, model, action_id, action_type, - cash, portfolio_value, session_id, created_at - ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) - """, ( - self.job_id, self.date, self.model_sig, 0, "no_trade", - initial_cash, initial_cash, session_id, created_at - )) - - logger.info(f"Initialized starting position for {self.model_sig} with ${initial_cash}") - - async def _store_reasoning_logs( - self, - cursor, - session_id: int, - conversation: List[Dict[str, Any]], - agent: Any - ) -> None: - """ - Store reasoning logs with AI-generated summaries. - - Args: - cursor: Database cursor - session_id: Trading session ID - conversation: List of messages from agent - agent: BaseAgent instance for summary generation - """ - for idx, message in enumerate(conversation): - summary = None - - # Generate summary for assistant messages - if message["role"] == "assistant": - summary = await agent.generate_summary(message["content"]) - - cursor.execute(""" - INSERT INTO reasoning_logs ( - session_id, message_index, role, content, - summary, tool_name, tool_input, timestamp - ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) - """, ( - session_id, - idx, - message["role"], - message["content"], - summary, - message.get("tool_name"), - message.get("tool_input"), - message["timestamp"] - )) - - async def _update_session_summary( - self, - cursor, - session_id: int, - conversation: List[Dict[str, Any]], - agent: Any - ) -> None: - """ - Update session with overall summary. - - Args: - cursor: Database cursor - session_id: Trading session ID - conversation: List of messages from agent - agent: BaseAgent instance for summary generation - """ - from datetime import datetime - - # Concatenate all assistant messages - assistant_messages = [ - msg["content"] - for msg in conversation - if msg["role"] == "assistant" - ] - - combined_content = "\n\n".join(assistant_messages) - - # Generate session summary (longer: 500 chars) - session_summary = await agent.generate_summary(combined_content, max_length=500) - - completed_at = datetime.utcnow().isoformat() + "Z" - - cursor.execute(""" - UPDATE trading_sessions - SET session_summary = ?, - completed_at = ?, - total_messages = ? - WHERE id = ? - """, (session_summary, completed_at, len(conversation), session_id)) diff --git a/tests/integration/test_model_day_executor_new_schema.py b/tests/integration/test_model_day_executor_new_schema.py new file mode 100644 index 0000000..e97de6f --- /dev/null +++ b/tests/integration/test_model_day_executor_new_schema.py @@ -0,0 +1,149 @@ +"""Test model_day_executor uses new schema exclusively.""" + +import pytest +from api.model_day_executor import ModelDayExecutor +from api.database import Database + + +@pytest.mark.asyncio +async def test_executor_writes_only_to_new_schema(tmp_path, monkeypatch): + """Verify executor writes to trading_days, not old tables.""" + + # Create test database + db_path = str(tmp_path / "test.db") + db = Database(db_path) + + # Create jobs and job_details tables (required by ModelDayExecutor) + db.connection.execute(""" + CREATE TABLE IF NOT EXISTS jobs ( + job_id TEXT PRIMARY KEY, + config_path TEXT NOT NULL, + status TEXT NOT NULL, + date_range TEXT NOT NULL, + models TEXT NOT NULL, + created_at TEXT NOT NULL, + started_at TEXT, + updated_at TEXT, + completed_at TEXT + ) + """) + + db.connection.execute(""" + CREATE TABLE IF NOT EXISTS job_details ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id TEXT NOT NULL, + date TEXT NOT NULL, + model TEXT NOT NULL, + status TEXT NOT NULL, + created_at TEXT, + updated_at TEXT, + started_at TEXT, + completed_at TEXT, + duration_seconds REAL, + error TEXT, + FOREIGN KEY (job_id) REFERENCES jobs(job_id) + ) + """) + + # Create job records (prerequisite) + db.connection.execute(""" + INSERT INTO jobs (job_id, status, created_at, config_path, date_range, models) + VALUES ('test-job-123', 'running', '2025-01-15T10:00:00Z', 'test_config.json', + '{"start": "2025-01-15", "end": "2025-01-15"}', '["test-model"]') + """) + + db.connection.execute(""" + INSERT INTO job_details (job_id, date, model, status) + VALUES ('test-job-123', '2025-01-15', 'test-model', 'pending') + """) + + db.connection.commit() + + # Create test config + config_path = str(tmp_path / "config.json") + import json + with open(config_path, 'w') as f: + json.dump({ + "models": [{ + "signature": "test-model", + "basemodel": "gpt-3.5-turbo", + "enabled": True + }], + "agent_config": { + "stock_symbols": ["AAPL"], + "initial_cash": 10000.0, + "max_steps": 10 + }, + "log_config": {"log_path": str(tmp_path / "logs")} + }, f) + + # Mock agent initialization and execution + from unittest.mock import AsyncMock, MagicMock, patch + mock_agent = MagicMock() + + # Mock agent to create trading_day record when run + async def mock_run_trading_session(date): + # Simulate BaseAgent creating trading_day record + trading_day_id = db.create_trading_day( + job_id='test-job-123', + model='test-model', + date='2025-01-15', + starting_cash=10000.0, + starting_portfolio_value=10000.0, + daily_profit=0.0, + daily_return_pct=0.0, + ending_cash=10000.0, + ending_portfolio_value=10000.0, + days_since_last_trading=0 + ) + db.connection.commit() + return {"success": True} + + mock_agent.run_trading_session = mock_run_trading_session + mock_agent.get_conversation_history = MagicMock(return_value=[]) + mock_agent.initialize = AsyncMock() + mock_agent.set_context = AsyncMock() + + async def mock_init_agent(self): + return mock_agent + + monkeypatch.setattr('api.model_day_executor.ModelDayExecutor._initialize_agent', + mock_init_agent) + + # Mock get_config_value to return None for TRADING_DAY_ID (not yet implemented) + monkeypatch.setattr('tools.general_tools.get_config_value', + lambda key: None if key == 'TRADING_DAY_ID' else 'test-value') + + # Execute + executor = ModelDayExecutor( + job_id='test-job-123', + date='2025-01-15', + model_sig='test-model', + config_path=config_path, + db_path=db_path + ) + + result = await executor.execute_async() + + # Verify: trading_days record exists + cursor = db.connection.execute(""" + SELECT COUNT(*) FROM trading_days + WHERE job_id = ? AND date = ? AND model = ? + """, ('test-job-123', '2025-01-15', 'test-model')) + + count = cursor.fetchone()[0] + assert count == 1, "Should have exactly one trading_days record" + + # Verify: NO trading_sessions records + cursor = db.connection.execute(""" + SELECT name FROM sqlite_master + WHERE type='table' AND name='trading_sessions' + """) + assert cursor.fetchone() is None, "trading_sessions table should not exist" + + # Verify: NO reasoning_logs records + cursor = db.connection.execute(""" + SELECT name FROM sqlite_master + WHERE type='table' AND name='reasoning_logs' + """) + assert cursor.fetchone() is None, "reasoning_logs table should not exist"