mirror of
https://github.com/Xe138/AI-Trader.git
synced 2026-04-01 17:17:24 -04:00
Add automatic schema migration to handle existing databases that don't have the simulation_run_id column in the positions table. Problem: - v0.3.0-alpha.3 databases lack simulation_run_id column - CREATE TABLE IF NOT EXISTS doesn't add new columns to existing tables - Index creation fails with "no such column: simulation_run_id" Solution: - Add _migrate_schema() function to detect and migrate old schemas - Check if positions table exists and inspect its columns - ALTER TABLE to add simulation_run_id if missing - Run migration before creating indexes This allows seamless upgrades from alpha.3 to alpha.4 without manual database deletion or migration scripts. Fixes docker compose startup error: sqlite3.OperationalError: no such column: simulation_run_id Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
414 lines
13 KiB
Python
"""
|
|
Database utilities and schema management for AI-Trader API.
|
|
|
|
This module provides:
|
|
- SQLite connection management
|
|
- Database schema initialization (6 tables)
|
|
- ACID-compliant transaction support
|
|
"""
|
|
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
import os
|
|
|
|
|
|
def get_db_connection(db_path: str = "data/jobs.db") -> sqlite3.Connection:
    """
    Open a SQLite connection configured for the API layer.

    Args:
        db_path: Path to the SQLite database file.

    Returns:
        A ready-to-use sqlite3.Connection.

    Configuration applied:
        - PRAGMA foreign_keys = ON for referential integrity
        - sqlite3.Row row factory for dict-like column access
        - check_same_thread=False for FastAPI async compatibility
    """
    # Make sure the directory that holds the database file exists.
    Path(db_path).parent.mkdir(parents=True, exist_ok=True)

    connection = sqlite3.connect(db_path, check_same_thread=False)
    connection.execute("PRAGMA foreign_keys = ON")
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def initialize_database(db_path: str = "data/jobs.db") -> None:
    """
    Create all database tables with enhanced schema.

    Tables created:
    1. jobs - High-level job metadata and status
    2. job_details - Per model-day execution tracking
    3. positions - Trading positions and P&L metrics
    4. holdings - Portfolio holdings per position
    5. reasoning_logs - AI decision logs (optional, for detail=full)
    6. tool_usage - Tool usage statistics
    7. price_data - Historical OHLCV price data (replaces merged.jsonl)
    8. price_data_coverage - Downloaded date range tracking per symbol
    9. simulation_runs - Simulation run tracking for soft delete

    Args:
        db_path: Path to SQLite database file
    """
    conn = get_db_connection(db_path)
    # try/finally guarantees the connection is released even when a DDL
    # statement or the schema migration raises (e.g. the alpha.3 -> alpha.4
    # "no such column" failure) -- previously the connection leaked here.
    try:
        cursor = conn.cursor()

        # Table 1: Jobs - Job metadata and lifecycle
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS jobs (
                job_id TEXT PRIMARY KEY,
                config_path TEXT NOT NULL,
                status TEXT NOT NULL CHECK(status IN ('pending', 'running', 'completed', 'partial', 'failed')),
                date_range TEXT NOT NULL,
                models TEXT NOT NULL,
                created_at TEXT NOT NULL,
                started_at TEXT,
                updated_at TEXT,
                completed_at TEXT,
                total_duration_seconds REAL,
                error TEXT
            )
        """)

        # Table 2: Job Details - Per model-day execution
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS job_details (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                job_id TEXT NOT NULL,
                date TEXT NOT NULL,
                model TEXT NOT NULL,
                status TEXT NOT NULL CHECK(status IN ('pending', 'running', 'completed', 'failed')),
                started_at TEXT,
                completed_at TEXT,
                duration_seconds REAL,
                error TEXT,
                FOREIGN KEY (job_id) REFERENCES jobs(job_id) ON DELETE CASCADE
            )
        """)

        # Table 3: Positions - Trading positions and P&L
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS positions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                job_id TEXT NOT NULL,
                date TEXT NOT NULL,
                model TEXT NOT NULL,
                action_id INTEGER NOT NULL,
                action_type TEXT CHECK(action_type IN ('buy', 'sell', 'no_trade')),
                symbol TEXT,
                amount INTEGER,
                price REAL,
                cash REAL NOT NULL,
                portfolio_value REAL NOT NULL,
                daily_profit REAL,
                daily_return_pct REAL,
                cumulative_profit REAL,
                cumulative_return_pct REAL,
                simulation_run_id TEXT,
                created_at TEXT NOT NULL,
                FOREIGN KEY (job_id) REFERENCES jobs(job_id) ON DELETE CASCADE,
                FOREIGN KEY (simulation_run_id) REFERENCES simulation_runs(run_id) ON DELETE SET NULL
            )
        """)

        # Table 4: Holdings - Portfolio holdings
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS holdings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                position_id INTEGER NOT NULL,
                symbol TEXT NOT NULL,
                quantity INTEGER NOT NULL,
                FOREIGN KEY (position_id) REFERENCES positions(id) ON DELETE CASCADE
            )
        """)

        # Table 5: Reasoning Logs - AI decision logs (optional)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS reasoning_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                job_id TEXT NOT NULL,
                date TEXT NOT NULL,
                model TEXT NOT NULL,
                step_number INTEGER NOT NULL,
                timestamp TEXT NOT NULL,
                role TEXT CHECK(role IN ('user', 'assistant', 'tool')),
                content TEXT,
                tool_name TEXT,
                FOREIGN KEY (job_id) REFERENCES jobs(job_id) ON DELETE CASCADE
            )
        """)

        # Table 6: Tool Usage - Tool usage statistics
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS tool_usage (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                job_id TEXT NOT NULL,
                date TEXT NOT NULL,
                model TEXT NOT NULL,
                tool_name TEXT NOT NULL,
                call_count INTEGER NOT NULL DEFAULT 1,
                total_duration_seconds REAL,
                FOREIGN KEY (job_id) REFERENCES jobs(job_id) ON DELETE CASCADE
            )
        """)

        # Table 7: Price Data - OHLCV price data (replaces merged.jsonl)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS price_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                symbol TEXT NOT NULL,
                date TEXT NOT NULL,
                open REAL NOT NULL,
                high REAL NOT NULL,
                low REAL NOT NULL,
                close REAL NOT NULL,
                volume INTEGER NOT NULL,
                created_at TEXT NOT NULL,
                UNIQUE(symbol, date)
            )
        """)

        # Table 8: Price Data Coverage - Track downloaded date ranges per symbol
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS price_data_coverage (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                symbol TEXT NOT NULL,
                start_date TEXT NOT NULL,
                end_date TEXT NOT NULL,
                downloaded_at TEXT NOT NULL,
                source TEXT DEFAULT 'alpha_vantage',
                UNIQUE(symbol, start_date, end_date)
            )
        """)

        # Table 9: Simulation Runs - Track simulation runs for soft delete
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS simulation_runs (
                run_id TEXT PRIMARY KEY,
                job_id TEXT NOT NULL,
                model TEXT NOT NULL,
                start_date TEXT NOT NULL,
                end_date TEXT NOT NULL,
                status TEXT NOT NULL CHECK(status IN ('active', 'superseded')),
                created_at TEXT NOT NULL,
                superseded_at TEXT,
                FOREIGN KEY (job_id) REFERENCES jobs(job_id) ON DELETE CASCADE
            )
        """)

        # Migrate pre-existing databases BEFORE index creation, because the
        # positions indexes reference simulation_run_id which older schemas lack.
        _migrate_schema(cursor)

        # Create indexes for performance
        _create_indexes(cursor)

        conn.commit()
    finally:
        conn.close()
|
|
|
|
|
|
def _migrate_schema(cursor: sqlite3.Cursor) -> None:
|
|
"""Migrate existing database schema to latest version."""
|
|
# Check if positions table exists and has simulation_run_id column
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='positions'")
|
|
if cursor.fetchone():
|
|
cursor.execute("PRAGMA table_info(positions)")
|
|
columns = [row[1] for row in cursor.fetchall()]
|
|
|
|
if 'simulation_run_id' not in columns:
|
|
# Add simulation_run_id column to existing positions table
|
|
cursor.execute("""
|
|
ALTER TABLE positions ADD COLUMN simulation_run_id TEXT
|
|
""")
|
|
|
|
|
|
def _create_indexes(cursor: sqlite3.Cursor) -> None:
|
|
"""Create database indexes for query performance."""
|
|
|
|
# Jobs table indexes
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)
|
|
""")
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_jobs_created_at ON jobs(created_at DESC)
|
|
""")
|
|
|
|
# Job details table indexes
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_job_details_job_id ON job_details(job_id)
|
|
""")
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_job_details_status ON job_details(status)
|
|
""")
|
|
cursor.execute("""
|
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_job_details_unique
|
|
ON job_details(job_id, date, model)
|
|
""")
|
|
|
|
# Positions table indexes
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_positions_job_id ON positions(job_id)
|
|
""")
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_positions_date ON positions(date)
|
|
""")
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_positions_model ON positions(model)
|
|
""")
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_positions_date_model ON positions(date, model)
|
|
""")
|
|
cursor.execute("""
|
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_positions_unique
|
|
ON positions(job_id, date, model, action_id)
|
|
""")
|
|
|
|
# Holdings table indexes
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_holdings_position_id ON holdings(position_id)
|
|
""")
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_holdings_symbol ON holdings(symbol)
|
|
""")
|
|
|
|
# Reasoning logs table indexes
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_reasoning_logs_job_date_model
|
|
ON reasoning_logs(job_id, date, model)
|
|
""")
|
|
|
|
# Tool usage table indexes
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_tool_usage_job_date_model
|
|
ON tool_usage(job_id, date, model)
|
|
""")
|
|
|
|
# Price data table indexes
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_price_data_symbol_date ON price_data(symbol, date)
|
|
""")
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_price_data_date ON price_data(date)
|
|
""")
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_price_data_symbol ON price_data(symbol)
|
|
""")
|
|
|
|
# Price data coverage table indexes
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_coverage_symbol ON price_data_coverage(symbol)
|
|
""")
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_coverage_dates ON price_data_coverage(start_date, end_date)
|
|
""")
|
|
|
|
# Simulation runs table indexes
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_runs_job_model ON simulation_runs(job_id, model)
|
|
""")
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_runs_status ON simulation_runs(status)
|
|
""")
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_runs_dates ON simulation_runs(start_date, end_date)
|
|
""")
|
|
|
|
# Positions table - add index for simulation_run_id
|
|
cursor.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_positions_run_id ON positions(simulation_run_id)
|
|
""")
|
|
|
|
|
|
def drop_all_tables(db_path: str = "data/jobs.db") -> None:
    """
    Drop all database tables. USE WITH CAUTION.

    Intended for testing and development resets only.

    Args:
        db_path: Path to SQLite database file
    """
    conn = get_db_connection(db_path)
    cursor = conn.cursor()

    # Order matters: child tables are dropped before the tables they
    # reference so foreign-key constraints are never violated.
    drop_order = (
        'tool_usage',
        'reasoning_logs',
        'holdings',
        'positions',
        'simulation_runs',
        'job_details',
        'jobs',
        'price_data_coverage',
        'price_data',
    )

    for table in drop_order:
        cursor.execute(f"DROP TABLE IF EXISTS {table}")

    conn.commit()
    conn.close()
|
|
|
|
|
|
def vacuum_database(db_path: str = "data/jobs.db") -> None:
    """
    Reclaim disk space after deletions.

    Should be run periodically after cleanup operations.

    Args:
        db_path: Path to SQLite database file
    """
    conn = get_db_connection(db_path)
    # VACUUM can fail (e.g. inside an open transaction or when disk space is
    # short); try/finally ensures the connection is closed either way --
    # previously it leaked on failure.
    try:
        conn.execute("VACUUM")
    finally:
        conn.close()
|
|
|
|
|
|
def get_database_stats(db_path: str = "data/jobs.db") -> dict:
    """
    Get database statistics for monitoring.

    Args:
        db_path: Path to SQLite database file

    Returns:
        Dictionary with table row counts and database size, e.g.:

        {
            "database_size_mb": 12.5,
            "jobs": 150,
            "job_details": 3000,
            "positions": 15000,
            "holdings": 45000,
            "reasoning_logs": 300000,
            "tool_usage": 12000,
            ...
        }

    Raises:
        sqlite3.OperationalError: if an expected table does not exist
            (run initialize_database first).
    """
    conn = get_db_connection(db_path)
    # try/finally ensures the connection is closed even when a COUNT query
    # fails (e.g. a table is missing) -- previously it leaked on error.
    try:
        cursor = conn.cursor()
        stats = {}

        # Get database file size
        if os.path.exists(db_path):
            size_bytes = os.path.getsize(db_path)
            stats["database_size_mb"] = round(size_bytes / (1024 * 1024), 2)
        else:
            stats["database_size_mb"] = 0

        # Row counts per table. Table names come from this fixed list, not
        # user input, so the f-string interpolation is injection-safe.
        tables = ['jobs', 'job_details', 'positions', 'holdings', 'reasoning_logs', 'tool_usage',
                  'price_data', 'price_data_coverage', 'simulation_runs']
        for table in tables:
            cursor.execute(f"SELECT COUNT(*) FROM {table}")
            stats[table] = cursor.fetchone()[0]
    finally:
        conn.close()

    return stats
|