AI-Trader/tests/unit/test_database_schema.py
Bill 14cf88f642 test: improve test coverage from 61% to 84.81%
Major improvements:
- Fixed all 42 broken tests (database connection leaks)
- Added db_connection() context manager for proper cleanup
- Created comprehensive test suites for undertested modules

New test coverage:
- tools/general_tools.py: 26 tests (97% coverage)
- tools/price_tools.py: 11 tests (validates NASDAQ symbols, date handling)
- api/price_data_manager.py: 12 tests (85% coverage)
- api/routes/results_v2.py: 3 tests (98% coverage)
- agent/reasoning_summarizer.py: 2 tests (87% coverage)
- api/routes/period_metrics.py: 2 edge case tests (100% coverage)
- agent/mock_provider: 1 test (100% coverage)

Database fixes:
- Added db_connection() context manager to prevent leaks
- Updated 16+ test files to use context managers
- Fixed drop_all_tables() to match new schema
- Added CHECK constraint for action_type
- Added ON DELETE CASCADE to trading_days foreign key
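
As a rough illustration of the database fixes above, a context manager along these lines would close the connection even when a test fails partway through. This is only a sketch: the real db_connection() lives in api/database.py and is not shown here, so the name db_connection_sketch, the foreign_keys pragma, the actions table, and the allowed action_type values are all hypothetical.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def db_connection_sketch(db_path):
    """Yield a SQLite connection and always close it on exit (hypothetical helper)."""
    conn = sqlite3.connect(db_path)
    try:
        # Assumption: foreign keys are switched on so ON DELETE CASCADE is enforced.
        conn.execute("PRAGMA foreign_keys = ON")
        yield conn
    finally:
        # Runs on normal exit and on exceptions, which is what prevents leaks.
        conn.close()


def demo(db_path=":memory:"):
    """Show a CHECK constraint rejecting a bad value inside the managed block."""
    with db_connection_sketch(db_path) as conn:
        # Hypothetical table and allowed values, for illustration only.
        conn.execute(
            "CREATE TABLE actions ("
            " id INTEGER PRIMARY KEY,"
            " action_type TEXT CHECK (action_type IN ('buy', 'sell', 'hold'))"
            ")"
        )
        conn.execute("INSERT INTO actions (action_type) VALUES ('buy')")
        try:
            conn.execute("INSERT INTO actions (action_type) VALUES ('bogus')")
            rejected = False
        except sqlite3.IntegrityError:
            rejected = True
    # The connection is already closed at this point.
    return rejected, conn
```

Calling demo() returns whether the invalid row was rejected, along with the (now closed) connection, so a test can assert both the constraint and the cleanup.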

Test improvements:
- Updated SQL INSERT statements with all required fields
- Fixed date parameter handling in API integration tests
- Added edge case tests for validation functions
- Fixed import errors across test suite

Results:
- Total coverage: 84.81% (was 61%)
- Tests passing: 406 (was 364 with 42 failures)
- Total lines covered: 6364 of 7504

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-07 21:02:38 -05:00


import pytest
import sqlite3
from api.database import initialize_database, get_db_connection, db_connection


def test_jobs_table_allows_downloading_data_status(tmp_path):
    """Test that jobs table accepts downloading_data status."""
    db_path = str(tmp_path / "test.db")
    initialize_database(db_path)

    with db_connection(db_path) as conn:
        cursor = conn.cursor()

        # Should not raise constraint violation
        cursor.execute("""
            INSERT INTO jobs (job_id, config_path, status, date_range, models, created_at)
            VALUES ('test-123', 'config.json', 'downloading_data', '[]', '[]', '2025-11-01T00:00:00Z')
        """)
        conn.commit()

        # Verify it was inserted
        cursor.execute("SELECT status FROM jobs WHERE job_id = 'test-123'")
        result = cursor.fetchone()
        assert result[0] == "downloading_data"


def test_jobs_table_has_warnings_column(tmp_path):
    """Test that jobs table has warnings TEXT column."""
    db_path = str(tmp_path / "test.db")
    initialize_database(db_path)

    with db_connection(db_path) as conn:
        cursor = conn.cursor()

        # Insert job with warnings
        cursor.execute("""
            INSERT INTO jobs (job_id, config_path, status, date_range, models, created_at, warnings)
            VALUES ('test-456', 'config.json', 'completed', '[]', '[]', '2025-11-01T00:00:00Z', '["Warning 1", "Warning 2"]')
        """)
        conn.commit()

        # Verify warnings can be retrieved
        cursor.execute("SELECT warnings FROM jobs WHERE job_id = 'test-456'")
        result = cursor.fetchone()
        assert result[0] == '["Warning 1", "Warning 2"]'