From 555f0e7b66b3571ee7a7626a435e8c367f0e6272 Mon Sep 17 00:00:00 2001 From: Bill Date: Sun, 2 Nov 2025 18:03:41 -0500 Subject: [PATCH] feat: store reasoning logs with sessions in model_day_executor - Add _create_trading_session() method to create session records - Add async _store_reasoning_logs() to store conversation with AI summaries - Add async _update_session_summary() to generate overall session summary - Modify execute() -> execute_async() with async workflow - Add execute_sync() wrapper and keep execute() as sync entry point - Update _write_results_to_db() to accept and use session_id parameter - Modify positions INSERT to include session_id foreign key - Remove old reasoning_logs code block (obsolete schema) - Add comprehensive unit tests for all new functionality All tests pass. Session-based reasoning storage now integrated. --- api/model_day_executor.py | 195 ++++++++++--- .../unit/test_model_day_executor_reasoning.py | 266 ++++++++++++++++++ 2 files changed, 429 insertions(+), 32 deletions(-) create mode 100644 tests/unit/test_model_day_executor_reasoning.py diff --git a/api/model_day_executor.py b/api/model_day_executor.py index 1b2d5e6..9e9e503 100644 --- a/api/model_day_executor.py +++ b/api/model_day_executor.py @@ -82,26 +82,31 @@ class ModelDayExecutor: logger.info(f"Initialized executor for {model_sig} on {date} (job: {job_id})") - def execute(self) -> Dict[str, Any]: + async def execute_async(self) -> Dict[str, Any]: """ - Execute trading session and persist results. + Execute trading session and persist results (async version). Returns: Result dict with success status and metadata Process: 1. Update job_detail status to 'running' - 2. Initialize and run trading agent - 3. Write results to SQLite - 4. Update job_detail status to 'completed' or 'failed' - 5. Cleanup runtime config + 2. Create trading session + 3. Initialize and run trading agent + 4. Store reasoning logs with summaries + 5. Update session summary + 6. Write results to SQLite + 7. Update job_detail status to 'completed' or 'failed' + 8. Cleanup runtime config SQLite writes: - - positions: Trading position record + - trading_sessions: Session metadata and summary + - reasoning_logs: Conversation history with summaries + - positions: Trading position record (linked to session) - holdings: Portfolio holdings breakdown - - reasoning_logs: AI reasoning steps (if available) - tool_usage: Tool usage statistics (if available) """ + conn = None try: # Update status to running self.job_manager.update_job_detail_status( @@ -111,6 +116,12 @@ class ModelDayExecutor: "running" ) + # Create trading session at start + conn = get_db_connection(self.db_path) + cursor = conn.cursor() + session_id = self._create_trading_session(cursor) + conn.commit() + # Set environment variable for agent to use isolated config os.environ["RUNTIME_ENV_PATH"] = self.runtime_config_path @@ -119,10 +130,21 @@ class ModelDayExecutor: # Run trading session logger.info(f"Running trading session for {self.model_sig} on {self.date}") - session_result = asyncio.run(agent.run_trading_session(self.date)) + session_result = await agent.run_trading_session(self.date) - # Persist results to SQLite - self._write_results_to_db(agent, session_result) + # Get conversation history + conversation = agent.get_conversation_history() + + # Store reasoning logs with summaries + await self._store_reasoning_logs(cursor, session_id, conversation, agent) + + # Update session summary + await self._update_session_summary(cursor, session_id, conversation, agent) + + # Store positions (pass session_id) + self._write_results_to_db(agent, session_id) + + conn.commit() # Update status to completed self.job_manager.update_job_detail_status( @@ -139,6 +161,7 @@ class ModelDayExecutor: "job_id": self.job_id, "date": self.date, "model": self.model_sig, + "session_id": session_id, "session_result": session_result } @@ -146,6 +169,9 @@ class ModelDayExecutor: error_msg = f"Execution failed: {str(e)}" logger.error(f"{self.model_sig} on {self.date}: {error_msg}", exc_info=True) + if conn: + conn.rollback() + # Update status to failed self.job_manager.update_job_detail_status( self.job_id, @@ -164,9 +190,25 @@ class ModelDayExecutor: } finally: + if conn: + conn.close() # Always cleanup runtime config self.runtime_manager.cleanup_runtime_config(self.runtime_config_path) + def execute_sync(self) -> Dict[str, Any]: + """Synchronous wrapper for execute_async().""" + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + return loop.run_until_complete(self.execute_async()) + + def execute(self) -> Dict[str, Any]: + """Execute model-day simulation (sync entry point).""" + return self.execute_sync() + def _initialize_agent(self): """ Initialize trading agent with config. @@ -219,18 +261,120 @@ class ModelDayExecutor: return agent - def _write_results_to_db(self, agent, session_result: Dict[str, Any]) -> None: + def _create_trading_session(self, cursor) -> int: + """ + Create trading session record. + + Args: + cursor: Database cursor + + Returns: + session_id (int) + """ + from datetime import datetime + + started_at = datetime.utcnow().isoformat() + "Z" + + cursor.execute(""" + INSERT INTO trading_sessions ( + job_id, date, model, started_at + ) + VALUES (?, ?, ?, ?) + """, (self.job_id, self.date, self.model_sig, started_at)) + + return cursor.lastrowid + + async def _store_reasoning_logs( + self, + cursor, + session_id: int, + conversation: List[Dict[str, Any]], + agent: Any + ) -> None: + """ + Store reasoning logs with AI-generated summaries. + + Args: + cursor: Database cursor + session_id: Trading session ID + conversation: List of messages from agent + agent: BaseAgent instance for summary generation + """ + for idx, message in enumerate(conversation): + summary = None + + # Generate summary for assistant messages + if message["role"] == "assistant": + summary = await agent.generate_summary(message["content"]) + + cursor.execute(""" + INSERT INTO reasoning_logs ( + session_id, message_index, role, content, + summary, tool_name, tool_input, timestamp + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, ( + session_id, + idx, + message["role"], + message["content"], + summary, + message.get("tool_name"), + message.get("tool_input"), + message["timestamp"] + )) + + async def _update_session_summary( + self, + cursor, + session_id: int, + conversation: List[Dict[str, Any]], + agent: Any + ) -> None: + """ + Update session with overall summary. + + Args: + cursor: Database cursor + session_id: Trading session ID + conversation: List of messages from agent + agent: BaseAgent instance for summary generation + """ + from datetime import datetime + + # Concatenate all assistant messages + assistant_messages = [ + msg["content"] + for msg in conversation + if msg["role"] == "assistant" + ] + + combined_content = "\n\n".join(assistant_messages) + + # Generate session summary (longer: 500 chars) + session_summary = await agent.generate_summary(combined_content, max_length=500) + + completed_at = datetime.utcnow().isoformat() + "Z" + + cursor.execute(""" + UPDATE trading_sessions + SET session_summary = ?, + completed_at = ?, + total_messages = ? + WHERE id = ? + """, (session_summary, completed_at, len(conversation), session_id)) + + def _write_results_to_db(self, agent, session_id: int) -> None: """ Write execution results to SQLite. Args: agent: Trading agent instance - session_result: Result from run_trading_session() + session_id: Trading session ID (for linking positions) Writes to: - - positions: Position record with action and P&L + - positions: Position record with action and P&L (linked to session) - holdings: Current portfolio holdings - - reasoning_logs: AI reasoning steps (if available) - tool_usage: Tool usage stats (if available) """ conn = get_db_connection(self.db_path) @@ -282,13 +426,14 @@ class ModelDayExecutor: cursor.execute(""" INSERT INTO positions ( job_id, date, model, action_id, action_type, symbol, - amount, price, cash, portfolio_value, daily_profit, daily_return_pct, created_at + amount, price, cash, portfolio_value, daily_profit, daily_return_pct, + session_id, created_at ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( self.job_id, self.date, self.model_sig, action_id, action_type, symbol, amount, price, cash, total_value, - daily_profit, daily_return_pct, created_at + daily_profit, daily_return_pct, session_id, created_at )) position_id = cursor.lastrowid @@ -300,20 +445,6 @@ class ModelDayExecutor: VALUES (?, ?, ?) """, (position_id, symbol, float(quantity))) - # Insert reasoning logs (if available) - if hasattr(agent, 'get_reasoning_steps'): - reasoning_steps = agent.get_reasoning_steps() - for step in reasoning_steps: - cursor.execute(""" - INSERT INTO reasoning_logs ( - job_id, date, model, step_number, timestamp, content - ) - VALUES (?, ?, ?, ?, ?, ?) - """, ( - self.job_id, self.date, self.model_sig, - step.get("step"), created_at, step.get("reasoning") - )) - # Insert tool usage (if available) if hasattr(agent, 'get_tool_usage') and hasattr(agent, 'get_tool_usage'): tool_usage = agent.get_tool_usage() diff --git a/tests/unit/test_model_day_executor_reasoning.py b/tests/unit/test_model_day_executor_reasoning.py new file mode 100644 index 0000000..d30f687 --- /dev/null +++ b/tests/unit/test_model_day_executor_reasoning.py @@ -0,0 +1,266 @@ +"""Tests for reasoning log storage in model_day_executor.""" + +import pytest +import sqlite3 +from api.model_day_executor import ModelDayExecutor +from api.database import initialize_database, get_db_connection + + +@pytest.fixture +def test_db(tmp_path): + """Create test database with job record.""" + db_path = str(tmp_path / "test.db") + initialize_database(db_path) + + # Create a job record to satisfy foreign key constraint + conn = get_db_connection(db_path) + cursor = conn.cursor() + cursor.execute(""" + INSERT INTO jobs (job_id, config_path, status, date_range, models, created_at) + VALUES ('test-job', 'configs/default_config.json', 'running', '["2025-01-01"]', '["test-model"]', '2025-01-01T00:00:00Z') + """) + conn.commit() + conn.close() + + return db_path + + +def test_create_trading_session(test_db): + """Should create trading session record.""" + executor = ModelDayExecutor( + job_id="test-job", + date="2025-01-01", + model_sig="test-model", + config_path="configs/default_config.json", + db_path=test_db + ) + + conn = get_db_connection(test_db) + cursor = conn.cursor() + + session_id = executor._create_trading_session(cursor) + conn.commit() + + # Verify session created + cursor.execute("SELECT * FROM trading_sessions WHERE id = ?", (session_id,)) + session = cursor.fetchone() + + assert session is not None + assert session['job_id'] == "test-job" + assert session['date'] == "2025-01-01" + assert session['model'] == "test-model" + assert session['started_at'] is not None + + conn.close() + + +@pytest.mark.asyncio +async def test_store_reasoning_logs(test_db): + """Should store conversation with summaries.""" + from agent.mock_provider.mock_langchain_model import MockChatModel + from agent.base_agent.base_agent import BaseAgent + + executor = ModelDayExecutor( + job_id="test-job", + date="2025-01-01", + model_sig="test-model", + config_path="configs/default_config.json", + db_path=test_db + ) + + # Create mock agent + agent = BaseAgent( + signature="test-model", + basemodel="mock", + stock_symbols=["AAPL"], + init_date="2025-01-01" + ) + agent.model = MockChatModel(model="test", signature="test") + + # Create conversation + conversation = [ + {"role": "user", "content": "Analyze market", "timestamp": "2025-01-01T10:00:00Z"}, + {"role": "assistant", "content": "Bought AAPL 10 shares based on strong earnings", "timestamp": "2025-01-01T10:05:00Z"} + ] + + conn = get_db_connection(test_db) + cursor = conn.cursor() + session_id = executor._create_trading_session(cursor) + + await executor._store_reasoning_logs(cursor, session_id, conversation, agent) + conn.commit() + + # Verify logs stored + cursor.execute("SELECT * FROM reasoning_logs WHERE session_id = ? ORDER BY message_index", (session_id,)) + logs = cursor.fetchall() + + assert len(logs) == 2 + assert logs[0]['role'] == 'user' + assert logs[0]['content'] == 'Analyze market' + assert logs[0]['summary'] is None # No summary for user messages + + assert logs[1]['role'] == 'assistant' + assert logs[1]['content'] == 'Bought AAPL 10 shares based on strong earnings' + assert logs[1]['summary'] is not None # Summary generated for assistant + + conn.close() + + +@pytest.mark.asyncio +async def test_update_session_summary(test_db): + """Should update session with overall summary.""" + from agent.mock_provider.mock_langchain_model import MockChatModel + from agent.base_agent.base_agent import BaseAgent + + executor = ModelDayExecutor( + job_id="test-job", + date="2025-01-01", + model_sig="test-model", + config_path="configs/default_config.json", + db_path=test_db + ) + + # Create mock agent + agent = BaseAgent( + signature="test-model", + basemodel="mock", + stock_symbols=["AAPL"], + init_date="2025-01-01" + ) + agent.model = MockChatModel(model="test", signature="test") + + # Create conversation + conversation = [ + {"role": "user", "content": "Analyze market", "timestamp": "2025-01-01T10:00:00Z"}, + {"role": "assistant", "content": "Bought AAPL 10 shares", "timestamp": "2025-01-01T10:05:00Z"}, + {"role": "assistant", "content": "Sold MSFT 5 shares", "timestamp": "2025-01-01T10:10:00Z"} + ] + + conn = get_db_connection(test_db) + cursor = conn.cursor() + session_id = executor._create_trading_session(cursor) + + await executor._update_session_summary(cursor, session_id, conversation, agent) + conn.commit() + + # Verify session updated + cursor.execute("SELECT * FROM trading_sessions WHERE id = ?", (session_id,)) + session = cursor.fetchone() + + assert session['session_summary'] is not None + assert len(session['session_summary']) > 0 + assert session['completed_at'] is not None + assert session['total_messages'] == 3 + + conn.close() + + +@pytest.mark.asyncio +async def test_store_reasoning_logs_with_tool_messages(test_db): + """Should store tool messages with tool_name and tool_input.""" + from agent.mock_provider.mock_langchain_model import MockChatModel + from agent.base_agent.base_agent import BaseAgent + + executor = ModelDayExecutor( + job_id="test-job", + date="2025-01-01", + model_sig="test-model", + config_path="configs/default_config.json", + db_path=test_db + ) + + # Create mock agent + agent = BaseAgent( + signature="test-model", + basemodel="mock", + stock_symbols=["AAPL"], + init_date="2025-01-01" + ) + agent.model = MockChatModel(model="test", signature="test") + + # Create conversation with tool message + conversation = [ + {"role": "user", "content": "Get price", "timestamp": "2025-01-01T10:00:00Z"}, + { + "role": "tool", + "content": "AAPL: $150.00", + "tool_name": "get_price", + "tool_input": '{"symbol": "AAPL"}', + "timestamp": "2025-01-01T10:01:00Z" + }, + {"role": "assistant", "content": "AAPL is $150", "timestamp": "2025-01-01T10:02:00Z"} + ] + + conn = get_db_connection(test_db) + cursor = conn.cursor() + session_id = executor._create_trading_session(cursor) + + await executor._store_reasoning_logs(cursor, session_id, conversation, agent) + conn.commit() + + # Verify tool message stored correctly + cursor.execute("SELECT * FROM reasoning_logs WHERE session_id = ? AND role = 'tool'", (session_id,)) + tool_log = cursor.fetchone() + + assert tool_log is not None + assert tool_log['tool_name'] == 'get_price' + assert tool_log['tool_input'] == '{"symbol": "AAPL"}' + assert tool_log['content'] == 'AAPL: $150.00' + assert tool_log['summary'] is None # No summary for tool messages + + conn.close() + + +def test_write_results_includes_session_id(test_db): + """Should include session_id when writing positions.""" + from agent.mock_provider.mock_langchain_model import MockChatModel + from agent.base_agent.base_agent import BaseAgent + + executor = ModelDayExecutor( + job_id="test-job", + date="2025-01-01", + model_sig="test-model", + config_path="configs/default_config.json", + db_path=test_db + ) + + # Create mock agent with positions + agent = BaseAgent( + signature="test-model", + basemodel="mock", + stock_symbols=["AAPL"], + init_date="2025-01-01" + ) + agent.model = MockChatModel(model="test", signature="test") + + # Mock positions data + agent.positions = {"AAPL": 10, "CASH": 8500.0} + agent.last_trade = {"action": "buy", "symbol": "AAPL", "amount": 10, "price": 150.0} + agent.current_prices = {"AAPL": 150.0} + + # Add required methods + agent.get_positions = lambda: agent.positions + agent.get_last_trade = lambda: agent.last_trade + agent.get_current_prices = lambda: agent.current_prices + + conn = get_db_connection(test_db) + cursor = conn.cursor() + + # Create session + session_id = executor._create_trading_session(cursor) + conn.commit() + + # Write results + executor._write_results_to_db(agent, session_id) + + # Verify position has session_id + cursor.execute("SELECT * FROM positions WHERE job_id = ? AND model = ?", + ("test-job", "test-model")) + position = cursor.fetchone() + + assert position is not None + assert position['session_id'] == session_id + assert position['action_type'] == 'buy' + assert position['symbol'] == 'AAPL' + + conn.close()