refactor: remove old schema writes from model_day_executor

Removed methods that wrote to deprecated tables:
- _create_trading_session (wrote to trading_sessions)
- _initialize_starting_position (wrote to old positions table)
- _store_reasoning_logs (wrote to reasoning_logs)
- _update_session_summary (updated trading_sessions)

All data persistence now handled by BaseAgent using new schema:
- trading_days: Day-centric records with P&L metrics
- actions: Trade execution ledger
- holdings: End-of-day position snapshots

Changes:
- Removed session_id from execute flow (deprecated)
- Updated docstrings to reflect new schema
- Simplified execute_async() - no more duplicate writes
- Added integration test verifying only new schema tables used
This commit is contained in:
2025-11-04 09:38:01 -05:00
parent e7fe0ab51b
commit 94381e7f25
2 changed files with 166 additions and 193 deletions

View File

@@ -4,13 +4,12 @@ Single model-day execution engine.
This module provides:
- Isolated execution of one model for one trading day
- Runtime config management per execution
- Result persistence to SQLite (positions, holdings, reasoning)
- Result persistence to SQLite (trading_days, actions, holdings)
- Automatic status updates via JobManager
- Cleanup of temporary resources
NOTE: This module uses the OLD positions table schema.
It is being replaced by the new trading_days schema.
Some position-related code is commented out during migration.
NOTE: Uses new trading_days schema exclusively.
All data persistence is handled by BaseAgent.
"""
import logging
@@ -95,22 +94,17 @@ class ModelDayExecutor:
Process:
1. Update job_detail status to 'running'
2. Create trading session
2. Create trading_day record with P&L metrics
3. Initialize and run trading agent
4. Store reasoning logs with summaries
5. Update session summary
6. Write results to SQLite
7. Update job_detail status to 'completed' or 'failed'
8. Cleanup runtime config
4. Agent writes actions and updates trading_day
5. Update job_detail status to 'completed' or 'failed'
6. Cleanup runtime config
SQLite writes:
- trading_sessions: Session metadata and summary
- reasoning_logs: Conversation history with summaries
- positions: Trading position record (linked to session)
- holdings: Portfolio holdings breakdown
- tool_usage: Tool usage statistics (if available)
- trading_days: Complete day record with P&L, reasoning, holdings
- actions: Trade execution ledger
- holdings: Ending positions snapshot
"""
conn = None
try:
# Update status to running
self.job_manager.update_job_detail_status(
@@ -120,15 +114,6 @@ class ModelDayExecutor:
"running"
)
# Create trading session at start
conn = get_db_connection(self.db_path)
cursor = conn.cursor()
session_id = self._create_trading_session(cursor)
conn.commit()
# Initialize starting position if this is first day
self._initialize_starting_position(cursor, session_id)
conn.commit()
# Set environment variable for agent to use isolated config
os.environ["RUNTIME_ENV_PATH"] = self.runtime_config_path
@@ -145,10 +130,10 @@ class ModelDayExecutor:
signature=self.model_sig,
today_date=self.date, # Current trading day
job_id=self.job_id,
session_id=session_id,
session_id=0, # Deprecated, kept for compatibility
trading_day_id=trading_day_id
)
logger.info(f"[DEBUG] ModelDayExecutor: Created ContextInjector with signature={self.model_sig}, date={self.date}, job_id={self.job_id}, session_id={session_id}")
logger.info(f"[DEBUG] ModelDayExecutor: Created ContextInjector with signature={self.model_sig}, date={self.date}, job_id={self.job_id}, trading_day_id={trading_day_id}")
logger.info(f"[DEBUG] ModelDayExecutor: Calling await agent.set_context()")
await agent.set_context(context_injector)
logger.info(f"[DEBUG] ModelDayExecutor: set_context() completed")
@@ -157,22 +142,11 @@ class ModelDayExecutor:
logger.info(f"Running trading session for {self.model_sig} on {self.date}")
session_result = await agent.run_trading_session(self.date)
# Get conversation history
conversation = agent.get_conversation_history()
# Store reasoning logs with summaries
await self._store_reasoning_logs(cursor, session_id, conversation, agent)
# Update session summary
await self._update_session_summary(cursor, session_id, conversation, agent)
# Commit and close connection
conn.commit()
conn.close()
conn = None # Mark as closed
# Note: Positions are written by trade tools (buy/sell) or no_trade_record
# No need to write positions here - that was creating duplicate/corrupt records
# Note: All data persistence is handled by BaseAgent:
# - trading_days record created with P&L metrics
# - actions recorded during trading
# - holdings snapshot saved at end of day
# - reasoning stored in trading_days.reasoning_full
# Update status to completed
self.job_manager.update_job_detail_status(
@@ -189,7 +163,6 @@ class ModelDayExecutor:
"job_id": self.job_id,
"date": self.date,
"model": self.model_sig,
"session_id": session_id,
"session_result": session_result
}
@@ -197,9 +170,6 @@ class ModelDayExecutor:
error_msg = f"Execution failed: {str(e)}"
logger.error(f"{self.model_sig} on {self.date}: {error_msg}", exc_info=True)
if conn:
conn.rollback()
# Update status to failed
self.job_manager.update_job_detail_status(
self.job_id,
@@ -218,8 +188,6 @@ class ModelDayExecutor:
}
finally:
if conn:
conn.close()
# Always cleanup runtime config
self.runtime_manager.cleanup_runtime_config(self.runtime_config_path)
@@ -292,150 +260,6 @@ class ModelDayExecutor:
return agent
def _create_trading_session(self, cursor) -> int:
"""
Create trading session record.
Args:
cursor: Database cursor
Returns:
session_id (int)
"""
from datetime import datetime
started_at = datetime.utcnow().isoformat() + "Z"
cursor.execute("""
INSERT INTO trading_sessions (
job_id, date, model, started_at
)
VALUES (?, ?, ?, ?)
""", (self.job_id, self.date, self.model_sig, started_at))
return cursor.lastrowid
def _initialize_starting_position(self, cursor, session_id: int) -> None:
"""
Initialize starting position if no prior positions exist for this job+model.
Creates action_id=0 position with initial_cash and zero stock holdings.
Args:
cursor: Database cursor
session_id: Trading session ID
"""
# Check if any positions exist for this job+model
cursor.execute("""
SELECT COUNT(*) FROM positions
WHERE job_id = ? AND model = ?
""", (self.job_id, self.model_sig))
if cursor.fetchone()[0] > 0:
# Positions already exist, no initialization needed
return
# Load config to get initial_cash
import json
with open(self.config_path, 'r') as f:
config = json.load(f)
agent_config = config.get("agent_config", {})
initial_cash = agent_config.get("initial_cash", 10000.0)
# Create initial position record
from datetime import datetime
created_at = datetime.utcnow().isoformat() + "Z"
cursor.execute("""
INSERT INTO positions (
job_id, date, model, action_id, action_type,
cash, portfolio_value, session_id, created_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
self.job_id, self.date, self.model_sig, 0, "no_trade",
initial_cash, initial_cash, session_id, created_at
))
logger.info(f"Initialized starting position for {self.model_sig} with ${initial_cash}")
async def _store_reasoning_logs(
self,
cursor,
session_id: int,
conversation: List[Dict[str, Any]],
agent: Any
) -> None:
"""
Store reasoning logs with AI-generated summaries.
Args:
cursor: Database cursor
session_id: Trading session ID
conversation: List of messages from agent
agent: BaseAgent instance for summary generation
"""
for idx, message in enumerate(conversation):
summary = None
# Generate summary for assistant messages
if message["role"] == "assistant":
summary = await agent.generate_summary(message["content"])
cursor.execute("""
INSERT INTO reasoning_logs (
session_id, message_index, role, content,
summary, tool_name, tool_input, timestamp
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
session_id,
idx,
message["role"],
message["content"],
summary,
message.get("tool_name"),
message.get("tool_input"),
message["timestamp"]
))
async def _update_session_summary(
self,
cursor,
session_id: int,
conversation: List[Dict[str, Any]],
agent: Any
) -> None:
"""
Update session with overall summary.
Args:
cursor: Database cursor
session_id: Trading session ID
conversation: List of messages from agent
agent: BaseAgent instance for summary generation
"""
from datetime import datetime
# Concatenate all assistant messages
assistant_messages = [
msg["content"]
for msg in conversation
if msg["role"] == "assistant"
]
combined_content = "\n\n".join(assistant_messages)
# Generate session summary (longer: 500 chars)
session_summary = await agent.generate_summary(combined_content, max_length=500)
completed_at = datetime.utcnow().isoformat() + "Z"
cursor.execute("""
UPDATE trading_sessions
SET session_summary = ?,
completed_at = ?,
total_messages = ?
WHERE id = ?
""", (session_summary, completed_at, len(conversation), session_id))