feat: store reasoning logs with sessions in model_day_executor

- Add _create_trading_session() method to create session records
- Add async _store_reasoning_logs() to store conversation with AI summaries
- Add async _update_session_summary() to generate overall session summary
- Rename execute() to execute_async() and convert it to an async workflow
- Add execute_sync() wrapper and keep execute() as sync entry point
- Update _write_results_to_db() to accept and use session_id parameter
- Modify positions INSERT to include session_id foreign key
- Remove old reasoning_logs code block (obsolete schema)
- Add comprehensive unit tests for all new functionality

All tests pass. Session-based reasoning storage now integrated.
This commit is contained in:
2025-11-02 18:03:41 -05:00
parent f83e4caf41
commit 555f0e7b66
2 changed files with 429 additions and 32 deletions

View File

@@ -82,26 +82,31 @@ class ModelDayExecutor:
logger.info(f"Initialized executor for {model_sig} on {date} (job: {job_id})") logger.info(f"Initialized executor for {model_sig} on {date} (job: {job_id})")
def execute(self) -> Dict[str, Any]: async def execute_async(self) -> Dict[str, Any]:
""" """
Execute trading session and persist results. Execute trading session and persist results (async version).
Returns: Returns:
Result dict with success status and metadata Result dict with success status and metadata
Process: Process:
1. Update job_detail status to 'running' 1. Update job_detail status to 'running'
2. Initialize and run trading agent 2. Create trading session
3. Write results to SQLite 3. Initialize and run trading agent
4. Update job_detail status to 'completed' or 'failed' 4. Store reasoning logs with summaries
5. Cleanup runtime config 5. Update session summary
6. Write results to SQLite
7. Update job_detail status to 'completed' or 'failed'
8. Cleanup runtime config
SQLite writes: SQLite writes:
- positions: Trading position record - trading_sessions: Session metadata and summary
- reasoning_logs: Conversation history with summaries
- positions: Trading position record (linked to session)
- holdings: Portfolio holdings breakdown - holdings: Portfolio holdings breakdown
- reasoning_logs: AI reasoning steps (if available)
- tool_usage: Tool usage statistics (if available) - tool_usage: Tool usage statistics (if available)
""" """
conn = None
try: try:
# Update status to running # Update status to running
self.job_manager.update_job_detail_status( self.job_manager.update_job_detail_status(
@@ -111,6 +116,12 @@ class ModelDayExecutor:
"running" "running"
) )
# Create trading session at start
conn = get_db_connection(self.db_path)
cursor = conn.cursor()
session_id = self._create_trading_session(cursor)
conn.commit()
# Set environment variable for agent to use isolated config # Set environment variable for agent to use isolated config
os.environ["RUNTIME_ENV_PATH"] = self.runtime_config_path os.environ["RUNTIME_ENV_PATH"] = self.runtime_config_path
@@ -119,10 +130,21 @@ class ModelDayExecutor:
# Run trading session # Run trading session
logger.info(f"Running trading session for {self.model_sig} on {self.date}") logger.info(f"Running trading session for {self.model_sig} on {self.date}")
session_result = asyncio.run(agent.run_trading_session(self.date)) session_result = await agent.run_trading_session(self.date)
# Persist results to SQLite # Get conversation history
self._write_results_to_db(agent, session_result) conversation = agent.get_conversation_history()
# Store reasoning logs with summaries
await self._store_reasoning_logs(cursor, session_id, conversation, agent)
# Update session summary
await self._update_session_summary(cursor, session_id, conversation, agent)
# Store positions (pass session_id)
self._write_results_to_db(agent, session_id)
conn.commit()
# Update status to completed # Update status to completed
self.job_manager.update_job_detail_status( self.job_manager.update_job_detail_status(
@@ -139,6 +161,7 @@ class ModelDayExecutor:
"job_id": self.job_id, "job_id": self.job_id,
"date": self.date, "date": self.date,
"model": self.model_sig, "model": self.model_sig,
"session_id": session_id,
"session_result": session_result "session_result": session_result
} }
@@ -146,6 +169,9 @@ class ModelDayExecutor:
error_msg = f"Execution failed: {str(e)}" error_msg = f"Execution failed: {str(e)}"
logger.error(f"{self.model_sig} on {self.date}: {error_msg}", exc_info=True) logger.error(f"{self.model_sig} on {self.date}: {error_msg}", exc_info=True)
if conn:
conn.rollback()
# Update status to failed # Update status to failed
self.job_manager.update_job_detail_status( self.job_manager.update_job_detail_status(
self.job_id, self.job_id,
@@ -164,9 +190,25 @@ class ModelDayExecutor:
} }
finally: finally:
if conn:
conn.close()
# Always cleanup runtime config # Always cleanup runtime config
self.runtime_manager.cleanup_runtime_config(self.runtime_config_path) self.runtime_manager.cleanup_runtime_config(self.runtime_config_path)
def execute_sync(self) -> Dict[str, Any]:
    """
    Run execute_async() to completion from synchronous code.

    Reuses the thread's current event loop when one is available and still
    open; otherwise creates a fresh loop and installs it as the current one
    before running the coroutine.

    Returns:
        Result dict produced by execute_async()
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current event loop for this thread
        loop = None

    # A closed loop cannot run anything: run_until_complete() would raise
    # RuntimeError("Event loop is closed"), so replace it with a new one.
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    return loop.run_until_complete(self.execute_async())
def execute(self) -> Dict[str, Any]:
    """
    Synchronous entry point for the model-day simulation.

    Delegates to execute_sync() and hands its result back unchanged.
    """
    outcome = self.execute_sync()
    return outcome
def _initialize_agent(self): def _initialize_agent(self):
""" """
Initialize trading agent with config. Initialize trading agent with config.
@@ -219,18 +261,120 @@ class ModelDayExecutor:
return agent return agent
def _write_results_to_db(self, agent, session_result: Dict[str, Any]) -> None: def _create_trading_session(self, cursor) -> int:
"""
Create trading session record.
Args:
cursor: Database cursor
Returns:
session_id (int)
"""
from datetime import datetime
started_at = datetime.utcnow().isoformat() + "Z"
cursor.execute("""
INSERT INTO trading_sessions (
job_id, date, model, started_at
)
VALUES (?, ?, ?, ?)
""", (self.job_id, self.date, self.model_sig, started_at))
return cursor.lastrowid
async def _store_reasoning_logs(
self,
cursor,
session_id: int,
conversation: List[Dict[str, Any]],
agent: Any
) -> None:
"""
Store reasoning logs with AI-generated summaries.
Args:
cursor: Database cursor
session_id: Trading session ID
conversation: List of messages from agent
agent: BaseAgent instance for summary generation
"""
for idx, message in enumerate(conversation):
summary = None
# Generate summary for assistant messages
if message["role"] == "assistant":
summary = await agent.generate_summary(message["content"])
cursor.execute("""
INSERT INTO reasoning_logs (
session_id, message_index, role, content,
summary, tool_name, tool_input, timestamp
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
session_id,
idx,
message["role"],
message["content"],
summary,
message.get("tool_name"),
message.get("tool_input"),
message["timestamp"]
))
async def _update_session_summary(
self,
cursor,
session_id: int,
conversation: List[Dict[str, Any]],
agent: Any
) -> None:
"""
Update session with overall summary.
Args:
cursor: Database cursor
session_id: Trading session ID
conversation: List of messages from agent
agent: BaseAgent instance for summary generation
"""
from datetime import datetime
# Concatenate all assistant messages
assistant_messages = [
msg["content"]
for msg in conversation
if msg["role"] == "assistant"
]
combined_content = "\n\n".join(assistant_messages)
# Generate session summary (longer: 500 chars)
session_summary = await agent.generate_summary(combined_content, max_length=500)
completed_at = datetime.utcnow().isoformat() + "Z"
cursor.execute("""
UPDATE trading_sessions
SET session_summary = ?,
completed_at = ?,
total_messages = ?
WHERE id = ?
""", (session_summary, completed_at, len(conversation), session_id))
def _write_results_to_db(self, agent, session_id: int) -> None:
""" """
Write execution results to SQLite. Write execution results to SQLite.
Args: Args:
agent: Trading agent instance agent: Trading agent instance
session_result: Result from run_trading_session() session_id: Trading session ID (for linking positions)
Writes to: Writes to:
- positions: Position record with action and P&L - positions: Position record with action and P&L (linked to session)
- holdings: Current portfolio holdings - holdings: Current portfolio holdings
- reasoning_logs: AI reasoning steps (if available)
- tool_usage: Tool usage stats (if available) - tool_usage: Tool usage stats (if available)
""" """
conn = get_db_connection(self.db_path) conn = get_db_connection(self.db_path)
@@ -282,13 +426,14 @@ class ModelDayExecutor:
cursor.execute(""" cursor.execute("""
INSERT INTO positions ( INSERT INTO positions (
job_id, date, model, action_id, action_type, symbol, job_id, date, model, action_id, action_type, symbol,
amount, price, cash, portfolio_value, daily_profit, daily_return_pct, created_at amount, price, cash, portfolio_value, daily_profit, daily_return_pct,
session_id, created_at
) )
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", ( """, (
self.job_id, self.date, self.model_sig, action_id, action_type, self.job_id, self.date, self.model_sig, action_id, action_type,
symbol, amount, price, cash, total_value, symbol, amount, price, cash, total_value,
daily_profit, daily_return_pct, created_at daily_profit, daily_return_pct, session_id, created_at
)) ))
position_id = cursor.lastrowid position_id = cursor.lastrowid
@@ -300,20 +445,6 @@ class ModelDayExecutor:
VALUES (?, ?, ?) VALUES (?, ?, ?)
""", (position_id, symbol, float(quantity))) """, (position_id, symbol, float(quantity)))
# Insert reasoning logs (if available)
if hasattr(agent, 'get_reasoning_steps'):
reasoning_steps = agent.get_reasoning_steps()
for step in reasoning_steps:
cursor.execute("""
INSERT INTO reasoning_logs (
job_id, date, model, step_number, timestamp, content
)
VALUES (?, ?, ?, ?, ?, ?)
""", (
self.job_id, self.date, self.model_sig,
step.get("step"), created_at, step.get("reasoning")
))
# Insert tool usage (if available) # Insert tool usage (if available)
if hasattr(agent, 'get_tool_usage') and hasattr(agent, 'get_tool_usage'): if hasattr(agent, 'get_tool_usage') and hasattr(agent, 'get_tool_usage'):
tool_usage = agent.get_tool_usage() tool_usage = agent.get_tool_usage()

View File

@@ -0,0 +1,266 @@
"""Tests for reasoning log storage in model_day_executor."""
import pytest
import sqlite3
from api.model_day_executor import ModelDayExecutor
from api.database import initialize_database, get_db_connection
@pytest.fixture
def test_db(tmp_path):
    """
    Create an initialized test database containing one job record.

    Returns:
        Path (str) to the SQLite file created under pytest's tmp_path
    """
    db_path = str(tmp_path / "test.db")
    initialize_database(db_path)

    # Create a job record to satisfy foreign key constraint
    conn = get_db_connection(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO jobs (job_id, config_path, status, date_range, models, created_at)
            VALUES ('test-job', 'configs/default_config.json', 'running', '["2025-01-01"]', '["test-model"]', '2025-01-01T00:00:00Z')
        """)
        conn.commit()
    finally:
        # Release the handle even when seeding fails
        conn.close()

    return db_path
def test_create_trading_session(test_db):
    """Should create trading session record with job, date, model and start time."""
    executor = ModelDayExecutor(
        job_id="test-job",
        date="2025-01-01",
        model_sig="test-model",
        config_path="configs/default_config.json",
        db_path=test_db
    )

    conn = get_db_connection(test_db)
    try:
        cursor = conn.cursor()

        session_id = executor._create_trading_session(cursor)
        conn.commit()

        # Verify session created
        cursor.execute("SELECT * FROM trading_sessions WHERE id = ?", (session_id,))
        session = cursor.fetchone()

        assert session is not None
        assert session['job_id'] == "test-job"
        assert session['date'] == "2025-01-01"
        assert session['model'] == "test-model"
        assert session['started_at'] is not None
    finally:
        # Close the connection even when an assertion fails
        conn.close()
@pytest.mark.asyncio
async def test_store_reasoning_logs(test_db):
    """Should store every message in order, summarizing only assistant messages."""
    from agent.mock_provider.mock_langchain_model import MockChatModel
    from agent.base_agent.base_agent import BaseAgent

    executor = ModelDayExecutor(
        job_id="test-job",
        date="2025-01-01",
        model_sig="test-model",
        config_path="configs/default_config.json",
        db_path=test_db
    )

    # Create mock agent
    agent = BaseAgent(
        signature="test-model",
        basemodel="mock",
        stock_symbols=["AAPL"],
        init_date="2025-01-01"
    )
    agent.model = MockChatModel(model="test", signature="test")

    # Create conversation
    conversation = [
        {"role": "user", "content": "Analyze market", "timestamp": "2025-01-01T10:00:00Z"},
        {"role": "assistant", "content": "Bought AAPL 10 shares based on strong earnings", "timestamp": "2025-01-01T10:05:00Z"}
    ]

    conn = get_db_connection(test_db)
    try:
        cursor = conn.cursor()
        session_id = executor._create_trading_session(cursor)

        await executor._store_reasoning_logs(cursor, session_id, conversation, agent)
        conn.commit()

        # Verify logs stored
        cursor.execute("SELECT * FROM reasoning_logs WHERE session_id = ? ORDER BY message_index", (session_id,))
        logs = cursor.fetchall()

        assert len(logs) == 2
        assert logs[0]['role'] == 'user'
        assert logs[0]['content'] == 'Analyze market'
        assert logs[0]['summary'] is None  # No summary for user messages

        assert logs[1]['role'] == 'assistant'
        assert logs[1]['content'] == 'Bought AAPL 10 shares based on strong earnings'
        assert logs[1]['summary'] is not None  # Summary generated for assistant
    finally:
        # Close the connection even when an assertion fails
        conn.close()
@pytest.mark.asyncio
async def test_update_session_summary(test_db):
    """Should update session with summary, completion time and message count."""
    from agent.mock_provider.mock_langchain_model import MockChatModel
    from agent.base_agent.base_agent import BaseAgent

    executor = ModelDayExecutor(
        job_id="test-job",
        date="2025-01-01",
        model_sig="test-model",
        config_path="configs/default_config.json",
        db_path=test_db
    )

    # Create mock agent
    agent = BaseAgent(
        signature="test-model",
        basemodel="mock",
        stock_symbols=["AAPL"],
        init_date="2025-01-01"
    )
    agent.model = MockChatModel(model="test", signature="test")

    # Create conversation
    conversation = [
        {"role": "user", "content": "Analyze market", "timestamp": "2025-01-01T10:00:00Z"},
        {"role": "assistant", "content": "Bought AAPL 10 shares", "timestamp": "2025-01-01T10:05:00Z"},
        {"role": "assistant", "content": "Sold MSFT 5 shares", "timestamp": "2025-01-01T10:10:00Z"}
    ]

    conn = get_db_connection(test_db)
    try:
        cursor = conn.cursor()
        session_id = executor._create_trading_session(cursor)

        await executor._update_session_summary(cursor, session_id, conversation, agent)
        conn.commit()

        # Verify session updated
        cursor.execute("SELECT * FROM trading_sessions WHERE id = ?", (session_id,))
        session = cursor.fetchone()

        assert session['session_summary'] is not None
        assert len(session['session_summary']) > 0
        assert session['completed_at'] is not None
        # total_messages covers all three messages, including the user one
        assert session['total_messages'] == 3
    finally:
        # Close the connection even when an assertion fails
        conn.close()
@pytest.mark.asyncio
async def test_store_reasoning_logs_with_tool_messages(test_db):
    """Should store tool messages with tool_name and tool_input and no summary."""
    from agent.mock_provider.mock_langchain_model import MockChatModel
    from agent.base_agent.base_agent import BaseAgent

    executor = ModelDayExecutor(
        job_id="test-job",
        date="2025-01-01",
        model_sig="test-model",
        config_path="configs/default_config.json",
        db_path=test_db
    )

    # Create mock agent
    agent = BaseAgent(
        signature="test-model",
        basemodel="mock",
        stock_symbols=["AAPL"],
        init_date="2025-01-01"
    )
    agent.model = MockChatModel(model="test", signature="test")

    # Create conversation with tool message
    conversation = [
        {"role": "user", "content": "Get price", "timestamp": "2025-01-01T10:00:00Z"},
        {
            "role": "tool",
            "content": "AAPL: $150.00",
            "tool_name": "get_price",
            "tool_input": '{"symbol": "AAPL"}',
            "timestamp": "2025-01-01T10:01:00Z"
        },
        {"role": "assistant", "content": "AAPL is $150", "timestamp": "2025-01-01T10:02:00Z"}
    ]

    conn = get_db_connection(test_db)
    try:
        cursor = conn.cursor()
        session_id = executor._create_trading_session(cursor)

        await executor._store_reasoning_logs(cursor, session_id, conversation, agent)
        conn.commit()

        # Verify tool message stored correctly
        cursor.execute("SELECT * FROM reasoning_logs WHERE session_id = ? AND role = 'tool'", (session_id,))
        tool_log = cursor.fetchone()

        assert tool_log is not None
        assert tool_log['tool_name'] == 'get_price'
        assert tool_log['tool_input'] == '{"symbol": "AAPL"}'
        assert tool_log['content'] == 'AAPL: $150.00'
        assert tool_log['summary'] is None  # No summary for tool messages
    finally:
        # Close the connection even when an assertion fails
        conn.close()
def test_write_results_includes_session_id(test_db):
    """Should include session_id when writing positions."""
    from agent.mock_provider.mock_langchain_model import MockChatModel
    from agent.base_agent.base_agent import BaseAgent

    executor = ModelDayExecutor(
        job_id="test-job",
        date="2025-01-01",
        model_sig="test-model",
        config_path="configs/default_config.json",
        db_path=test_db
    )

    # Create mock agent with positions
    agent = BaseAgent(
        signature="test-model",
        basemodel="mock",
        stock_symbols=["AAPL"],
        init_date="2025-01-01"
    )
    agent.model = MockChatModel(model="test", signature="test")

    # Mock positions data
    agent.positions = {"AAPL": 10, "CASH": 8500.0}
    agent.last_trade = {"action": "buy", "symbol": "AAPL", "amount": 10, "price": 150.0}
    agent.current_prices = {"AAPL": 150.0}

    # Add required methods
    agent.get_positions = lambda: agent.positions
    agent.get_last_trade = lambda: agent.last_trade
    agent.get_current_prices = lambda: agent.current_prices

    conn = get_db_connection(test_db)
    try:
        cursor = conn.cursor()

        # Create session; commit so the row exists before results are written
        session_id = executor._create_trading_session(cursor)
        conn.commit()

        # Write results
        executor._write_results_to_db(agent, session_id)

        # Verify position has session_id
        cursor.execute("SELECT * FROM positions WHERE job_id = ? AND model = ?",
                       ("test-job", "test-model"))
        position = cursor.fetchone()

        assert position is not None
        assert position['session_id'] == session_id
        assert position['action_type'] == 'buy'
        assert position['symbol'] == 'AAPL'
    finally:
        # Close the connection even when an assertion fails
        conn.close()