fix: add migration logic for warnings column and update tests

Critical fixes identified in code review:

1. Add warnings column migration to _migrate_schema()
   - Checks if warnings column exists in jobs table
   - Adds column via ALTER TABLE if missing
   - Ensures existing databases get new column on upgrade

2. Document CHECK constraint limitation
   - Added docstring explaining ALTER TABLE cannot add CHECK constraints
   - Notes that "downloading_data" status requires fresh DB or manual migration

3. Add comprehensive migration tests
   - test_migration_adds_warnings_column: Verifies warnings column migration
   - test_migration_adds_simulation_run_id_column: Tests existing migration
   - Both tests include cleanup to prevent cross-test contamination

4. Update test fixtures and expectations
   - Updated clean_db fixture to delete from all 9 tables
   - Fixed table count assertions (6 -> 9 tables)
   - Updated expected columns in schema tests

All 21 database tests now pass.
This commit is contained in:
2025-11-01 23:17:25 -04:00
parent 711ae5df73
commit baa44c208a
3 changed files with 158 additions and 5 deletions

View File

@@ -286,7 +286,13 @@ def cleanup_dev_database(db_path: str = "data/trading_dev.db", data_path: str =
def _migrate_schema(cursor: sqlite3.Cursor) -> None:
"""Migrate existing database schema to latest version."""
"""
Migrate existing database schema to latest version.
Note: Cannot add CHECK constraints to existing columns via ALTER TABLE.
The "downloading_data" status in jobs table requires a fresh database
or manual migration if upgrading from an older schema version.
"""
# Check if positions table exists and has simulation_run_id column
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='positions'")
if cursor.fetchone():
@@ -299,6 +305,18 @@ def _migrate_schema(cursor: sqlite3.Cursor) -> None:
ALTER TABLE positions ADD COLUMN simulation_run_id TEXT
""")
# Check if jobs table exists and has warnings column
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='jobs'")
if cursor.fetchone():
cursor.execute("PRAGMA table_info(jobs)")
columns = [row[1] for row in cursor.fetchall()]
if 'warnings' not in columns:
# Add warnings column to existing jobs table
cursor.execute("""
ALTER TABLE jobs ADD COLUMN warnings TEXT
""")
def _create_indexes(cursor: sqlite3.Cursor) -> None:
"""Create database indexes for query performance."""

View File

@@ -56,8 +56,11 @@ def clean_db(test_db_path):
cursor.execute("DELETE FROM reasoning_logs")
cursor.execute("DELETE FROM holdings")
cursor.execute("DELETE FROM positions")
cursor.execute("DELETE FROM simulation_runs")
cursor.execute("DELETE FROM job_details")
cursor.execute("DELETE FROM jobs")
cursor.execute("DELETE FROM price_data_coverage")
cursor.execute("DELETE FROM price_data")
conn.commit()
conn.close()

View File

@@ -90,7 +90,7 @@ class TestSchemaInitialization:
"""Test database schema initialization."""
def test_initialize_database_creates_all_tables(self, clean_db):
"""Should create all 6 tables."""
"""Should create all 9 tables."""
conn = get_db_connection(clean_db)
cursor = conn.cursor()
@@ -109,7 +109,10 @@ class TestSchemaInitialization:
'jobs',
'positions',
'reasoning_logs',
'tool_usage'
'tool_usage',
'price_data',
'price_data_coverage',
'simulation_runs'
]
assert sorted(tables) == sorted(expected_tables)
@@ -135,7 +138,8 @@ class TestSchemaInitialization:
'updated_at': 'TEXT',
'completed_at': 'TEXT',
'total_duration_seconds': 'REAL',
'error': 'TEXT'
'error': 'TEXT',
'warnings': 'TEXT'
}
for col_name, col_type in expected_columns.items():
@@ -367,7 +371,7 @@ class TestUtilityFunctions:
conn = get_db_connection(test_db_path)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
assert cursor.fetchone()[0] == 6
assert cursor.fetchone()[0] == 9 # Updated to reflect all tables
conn.close()
# Drop all tables
@@ -438,6 +442,134 @@ class TestUtilityFunctions:
assert stats["database_size_mb"] > 0
@pytest.mark.unit
class TestSchemaMigration:
"""Test database schema migration functionality."""
def test_migration_adds_warnings_column(self, test_db_path):
"""Should add warnings column to existing jobs table without it."""
from api.database import drop_all_tables
# Start with a clean slate
drop_all_tables(test_db_path)
# Create database without warnings column (simulate old schema)
conn = get_db_connection(test_db_path)
cursor = conn.cursor()
# Create jobs table without warnings column (old schema)
cursor.execute("""
CREATE TABLE jobs (
job_id TEXT PRIMARY KEY,
config_path TEXT NOT NULL,
status TEXT NOT NULL CHECK(status IN ('pending', 'downloading_data', 'running', 'completed', 'partial', 'failed')),
date_range TEXT NOT NULL,
models TEXT NOT NULL,
created_at TEXT NOT NULL,
started_at TEXT,
updated_at TEXT,
completed_at TEXT,
total_duration_seconds REAL,
error TEXT
)
""")
conn.commit()
# Verify warnings column doesn't exist
cursor.execute("PRAGMA table_info(jobs)")
columns = [row[1] for row in cursor.fetchall()]
assert 'warnings' not in columns
conn.close()
# Run initialize_database which should trigger migration
initialize_database(test_db_path)
# Verify warnings column was added
conn = get_db_connection(test_db_path)
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(jobs)")
columns = [row[1] for row in cursor.fetchall()]
assert 'warnings' in columns
# Verify we can insert and query warnings
cursor.execute("""
INSERT INTO jobs (job_id, config_path, status, date_range, models, created_at, warnings)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", ("test-job", "configs/test.json", "completed", "[]", "[]", "2025-01-20T00:00:00Z", "Test warning"))
conn.commit()
cursor.execute("SELECT warnings FROM jobs WHERE job_id = ?", ("test-job",))
result = cursor.fetchone()
assert result[0] == "Test warning"
conn.close()
# Clean up after test - drop all tables so we don't affect other tests
drop_all_tables(test_db_path)
def test_migration_adds_simulation_run_id_column(self, test_db_path):
"""Should add simulation_run_id column to existing positions table without it."""
from api.database import drop_all_tables
# Start with a clean slate
drop_all_tables(test_db_path)
# Create database without simulation_run_id column (simulate old schema)
conn = get_db_connection(test_db_path)
cursor = conn.cursor()
# Create jobs table first (for foreign key)
cursor.execute("""
CREATE TABLE jobs (
job_id TEXT PRIMARY KEY,
config_path TEXT NOT NULL,
status TEXT NOT NULL CHECK(status IN ('pending', 'downloading_data', 'running', 'completed', 'partial', 'failed')),
date_range TEXT NOT NULL,
models TEXT NOT NULL,
created_at TEXT NOT NULL
)
""")
# Create positions table without simulation_run_id column (old schema)
cursor.execute("""
CREATE TABLE positions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id TEXT NOT NULL,
date TEXT NOT NULL,
model TEXT NOT NULL,
action_id INTEGER NOT NULL,
cash REAL NOT NULL,
portfolio_value REAL NOT NULL,
created_at TEXT NOT NULL,
FOREIGN KEY (job_id) REFERENCES jobs(job_id) ON DELETE CASCADE
)
""")
conn.commit()
# Verify simulation_run_id column doesn't exist
cursor.execute("PRAGMA table_info(positions)")
columns = [row[1] for row in cursor.fetchall()]
assert 'simulation_run_id' not in columns
conn.close()
# Run initialize_database which should trigger migration
initialize_database(test_db_path)
# Verify simulation_run_id column was added
conn = get_db_connection(test_db_path)
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(positions)")
columns = [row[1] for row in cursor.fetchall()]
assert 'simulation_run_id' in columns
conn.close()
# Clean up after test - drop all tables so we don't affect other tests
drop_all_tables(test_db_path)
@pytest.mark.unit
class TestCheckConstraints:
"""Test CHECK constraints on table columns."""