feat: add price data management infrastructure (WIP)

Phase 1 of the v0.3.0 date range and on-demand download implementation.

Database changes:
- Add price_data table (OHLCV data, replaces merged.jsonl)
- Add price_data_coverage table (track downloaded date ranges)
- Add simulation_runs table (soft delete support)
- Add simulation_run_id to positions table
- Add comprehensive indexes for new tables

New modules:
- api/price_data_manager.py - Priority-based download manager (usage sketched below)
  - Coverage gap detection
  - Smart download prioritization (maximize date completion)
  - Rate limit handling with retry logic
  - Alpha Vantage integration
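
A minimal sketch of the intended call flow (class and method names are taken from the module below; the driver script itself is hypothetical and assumes ALPHAADVANTAGE_API_KEY is set in the environment):

from api.price_data_manager import PriceDataManager

manager = PriceDataManager(db_path="data/jobs.db")
requested = {"2025-01-20", "2025-01-21", "2025-01-22"}  # YYYY-MM-DD strings
missing = manager.get_missing_coverage("2025-01-20", "2025-01-22")
result = manager.download_missing_data_prioritized(missing, requested)
if result["rate_limited"]:
    print("partial coverage:", result["partial_dates"])
print("fully covered:", result["dates_completed"])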

Configuration:
- configs/nasdaq100_symbols.json - NASDAQ 100 constituent list (expected shape below)
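
For reference, _load_symbols() expects a top-level "symbols" key; a hypothetical excerpt (the real file lists all NASDAQ 100 constituents):

{"symbols": ["AAPL", "MSFT", "GOOGL", "AMZN", "NVDA"]}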

Next steps (not yet implemented):
- Migration script for merged.jsonl -> price_data
- Update API models (start_date/end_date)
- Update tools/price_tools.py to read from database
- Simulation run tracking implementation
- API integration
- Tests and documentation

This is work in progress for the v0.3.0 release.
commit bddf4d8b72 (parent 8e7e80807b)
Date: 2025-10-31 16:37:14 -04:00
3 changed files with 646 additions and 3 deletions

api/database.py

@@ -50,6 +50,9 @@ def initialize_database(db_path: str = "data/jobs.db") -> None:
4. holdings - Portfolio holdings per position
5. reasoning_logs - AI decision logs (optional, for detail=full)
6. tool_usage - Tool usage statistics
7. price_data - Historical OHLCV price data (replaces merged.jsonl)
8. price_data_coverage - Downloaded date range tracking per symbol
9. simulation_runs - Simulation run tracking for soft delete
Args:
db_path: Path to SQLite database file
@@ -108,8 +111,10 @@ def initialize_database(db_path: str = "data/jobs.db") -> None:
daily_return_pct REAL,
cumulative_profit REAL,
cumulative_return_pct REAL,
simulation_run_id TEXT,
created_at TEXT NOT NULL,
FOREIGN KEY (job_id) REFERENCES jobs(job_id) ON DELETE CASCADE,
FOREIGN KEY (simulation_run_id) REFERENCES simulation_runs(run_id) ON DELETE SET NULL
)
""")
@@ -154,6 +159,50 @@ def initialize_database(db_path: str = "data/jobs.db") -> None:
)
""")
# Table 7: Price Data - OHLCV price data (replaces merged.jsonl)
cursor.execute("""
CREATE TABLE IF NOT EXISTS price_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT NOT NULL,
date TEXT NOT NULL,
open REAL NOT NULL,
high REAL NOT NULL,
low REAL NOT NULL,
close REAL NOT NULL,
volume INTEGER NOT NULL,
created_at TEXT NOT NULL,
UNIQUE(symbol, date)
)
""")
# Table 8: Price Data Coverage - Track downloaded date ranges per symbol
cursor.execute("""
CREATE TABLE IF NOT EXISTS price_data_coverage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT NOT NULL,
start_date TEXT NOT NULL,
end_date TEXT NOT NULL,
downloaded_at TEXT NOT NULL,
source TEXT DEFAULT 'alpha_vantage',
UNIQUE(symbol, start_date, end_date)
)
""")
# Table 9: Simulation Runs - Track simulation runs for soft delete
cursor.execute("""
CREATE TABLE IF NOT EXISTS simulation_runs (
run_id TEXT PRIMARY KEY,
job_id TEXT NOT NULL,
model TEXT NOT NULL,
start_date TEXT NOT NULL,
end_date TEXT NOT NULL,
status TEXT NOT NULL CHECK(status IN ('active', 'superseded')),
created_at TEXT NOT NULL,
superseded_at TEXT,
FOREIGN KEY (job_id) REFERENCES jobs(job_id) ON DELETE CASCADE
)
""")
# Create indexes for performance
_create_indexes(cursor)
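
To illustrate the intended read path against the new schema, a minimal sketch (the helper below is hypothetical; per the commit message, pointing tools/price_tools.py at the database is still future work):

import sqlite3

def get_close(db_path: str, symbol: str, date: str):
    """Hypothetical helper: fetch one close price from price_data."""
    conn = sqlite3.connect(db_path)
    try:
        row = conn.execute(
            "SELECT close FROM price_data WHERE symbol = ? AND date = ?",
            (symbol, date),
        ).fetchone()
        return row[0] if row else None
    finally:
        conn.close()
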
@@ -222,6 +271,41 @@ def _create_indexes(cursor: sqlite3.Cursor) -> None:
ON tool_usage(job_id, date, model)
""")
# Price data table indexes
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_price_data_symbol_date ON price_data(symbol, date)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_price_data_date ON price_data(date)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_price_data_symbol ON price_data(symbol)
""")
# Price data coverage table indexes
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_coverage_symbol ON price_data_coverage(symbol)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_coverage_dates ON price_data_coverage(start_date, end_date)
""")
# Simulation runs table indexes
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_runs_job_model ON simulation_runs(job_id, model)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_runs_status ON simulation_runs(status)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_runs_dates ON simulation_runs(start_date, end_date)
""")
# Positions table - add index for simulation_run_id
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_positions_run_id ON positions(simulation_run_id)
""")
def drop_all_tables(db_path: str = "data/jobs.db") -> None:
"""
@@ -240,8 +324,11 @@ def drop_all_tables(db_path: str = "data/jobs.db") -> None:
'reasoning_logs',
'holdings',
'positions',
'simulation_runs',
'job_details',
'jobs',
'price_data_coverage',
'price_data'
]
for table in tables:
@@ -296,7 +383,8 @@ def get_database_stats(db_path: str = "data/jobs.db") -> dict:
stats["database_size_mb"] = 0
# Get row counts for each table
tables = ['jobs', 'job_details', 'positions', 'holdings', 'reasoning_logs', 'tool_usage',
'price_data', 'price_data_coverage', 'simulation_runs']
for table in tables:
cursor.execute(f"SELECT COUNT(*) FROM {table}")

api/price_data_manager.py Normal file

@@ -0,0 +1,537 @@
"""
Price data management for on-demand downloads and coverage tracking.
This module provides:
- Coverage gap detection
- Priority-based download ordering
- Rate limit handling with retry logic
- Price data storage and retrieval
"""
import logging
import json
import os
import time
import requests
from pathlib import Path
from typing import List, Dict, Set, Tuple, Optional, Callable, Any
from datetime import datetime, timedelta
from collections import defaultdict
from api.database import get_db_connection
logger = logging.getLogger(__name__)
class RateLimitError(Exception):
"""Raised when API rate limit is hit."""
pass
class PriceDataManager:
"""
Manages price data availability, downloads, and coverage tracking.
Responsibilities:
- Check which dates/symbols have price data
- Download missing data from Alpha Vantage
- Track downloaded date ranges per symbol
- Prioritize downloads to maximize date completion
- Handle rate limiting gracefully
"""
def __init__(
self,
db_path: str = "data/jobs.db",
symbols_config: str = "configs/nasdaq100_symbols.json",
api_key: Optional[str] = None
):
"""
Initialize PriceDataManager.
Args:
db_path: Path to SQLite database
symbols_config: Path to NASDAQ 100 symbols configuration
api_key: Alpha Vantage API key (defaults to env var)
"""
self.db_path = db_path
self.symbols_config = symbols_config
self.api_key = api_key or os.getenv("ALPHAADVANTAGE_API_KEY")
# Load symbols list
self.symbols = self._load_symbols()
logger.info(f"Initialized PriceDataManager with {len(self.symbols)} symbols")
def _load_symbols(self) -> List[str]:
"""Load NASDAQ 100 symbols from config file."""
config_path = Path(self.symbols_config)
if not config_path.exists():
logger.warning(f"Symbols config not found: {config_path}. Using default list.")
# Fallback to a minimal list
return ["AAPL", "MSFT", "GOOGL", "AMZN", "NVDA"]
with open(config_path, 'r') as f:
config = json.load(f)
return config.get("symbols", [])
def get_available_dates(self) -> Set[str]:
"""
Get all dates that have price data in database.
Returns:
Set of dates (YYYY-MM-DD) with data
"""
conn = get_db_connection(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT DISTINCT date FROM price_data ORDER BY date")
dates = {row[0] for row in cursor.fetchall()}
conn.close()
return dates
def get_symbol_dates(self, symbol: str) -> Set[str]:
"""
Get all dates that have data for a specific symbol.
Args:
symbol: Stock symbol
Returns:
Set of dates with data for this symbol
"""
conn = get_db_connection(self.db_path)
cursor = conn.cursor()
cursor.execute(
"SELECT date FROM price_data WHERE symbol = ? ORDER BY date",
(symbol,)
)
dates = {row[0] for row in cursor.fetchall()}
conn.close()
return dates
def get_missing_coverage(
self,
start_date: str,
end_date: str
) -> Dict[str, Set[str]]:
"""
Identify which symbols are missing data for which dates in range.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
Dict mapping symbol to set of missing dates
Example: {"AAPL": {"2025-01-20", "2025-01-21"}, "MSFT": set()}
"""
# Generate all dates in range
requested_dates = self._expand_date_range(start_date, end_date)
missing = {}
for symbol in self.symbols:
symbol_dates = self.get_symbol_dates(symbol)
missing_dates = requested_dates - symbol_dates
if missing_dates:
missing[symbol] = missing_dates
return missing
def _expand_date_range(self, start_date: str, end_date: str) -> Set[str]:
"""
Expand date range into set of all dates.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
Set of all dates in range (inclusive)
"""
start = datetime.strptime(start_date, "%Y-%m-%d")
end = datetime.strptime(end_date, "%Y-%m-%d")
dates = set()
current = start
while current <= end:
dates.add(current.strftime("%Y-%m-%d"))
current += timedelta(days=1)
return dates
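# Note: this expands calendar days, not trading days, so weekends and
# market holidays always show up as "missing" in get_missing_coverage().
# They drop out downstream: _analyze_coverage() only reports dates that
# actually have price_data rows.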
def prioritize_downloads(
self,
missing_coverage: Dict[str, Set[str]],
requested_dates: Set[str]
) -> List[str]:
"""
Prioritize symbol downloads to maximize date completion.
Strategy: download the symbols whose missing data covers the most requested dates first.
Args:
missing_coverage: Dict of symbol -> missing dates
requested_dates: Set of dates we want to simulate
Returns:
List of symbols in priority order (highest impact first)
"""
# Calculate impact score for each symbol
impacts = []
for symbol, missing_dates in missing_coverage.items():
# Impact = number of requested dates this symbol is missing data for;
# each download advances those dates toward full coverage
impact = len(missing_dates & requested_dates)
if impact > 0:
impacts.append((symbol, impact))
# Sort by impact (descending)
impacts.sort(key=lambda x: x[1], reverse=True)
# Return symbols in priority order
prioritized = [symbol for symbol, _ in impacts]
logger.info(f"Prioritized {len(prioritized)} symbols for download")
if prioritized:
logger.debug(f"Top 5 symbols: {prioritized[:5]}")
return prioritized
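# Worked example (hypothetical numbers): if requested_dates spans five
# days and AAPL is missing all five while MSFT is missing only two, AAPL
# scores impact 5 and sorts first; downloading it advances more of the
# requested dates toward full coverage per API call.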
def download_missing_data_prioritized(
self,
missing_coverage: Dict[str, Set[str]],
requested_dates: Set[str],
progress_callback: Optional[Callable] = None
) -> Dict[str, Any]:
"""
Download data in priority order until rate limited.
Args:
missing_coverage: Dict of symbol -> missing dates
requested_dates: Set of dates being requested
progress_callback: Optional callback for progress updates
Returns:
{
"success": True/False,
"downloaded": ["AAPL", "MSFT", ...],
"failed": ["GOOGL", ...],
"rate_limited": True/False,
"dates_completed": ["2025-01-20", ...],
"partial_dates": {"2025-01-21": 75}
}
"""
if not self.api_key:
raise ValueError("ALPHAADVANTAGE_API_KEY not configured")
# Prioritize downloads
prioritized_symbols = self.prioritize_downloads(missing_coverage, requested_dates)
if not prioritized_symbols:
logger.info("No downloads needed - all data available")
return {
"success": True,
"downloaded": [],
"failed": [],
"rate_limited": False,
"dates_completed": sorted(requested_dates),
"partial_dates": {}
}
logger.info(f"Starting priority download of {len(prioritized_symbols)} symbols")
downloaded = []
failed = []
rate_limited = False
# Download in priority order
for i, symbol in enumerate(prioritized_symbols):
try:
# Progress callback
if progress_callback:
progress_callback({
"current": i + 1,
"total": len(prioritized_symbols),
"symbol": symbol,
"phase": "downloading"
})
# Download symbol data
logger.info(f"Downloading {symbol} ({i+1}/{len(prioritized_symbols)})")
data = self._download_symbol(symbol)
# Store in database
stored_dates = self._store_symbol_data(symbol, data, requested_dates)
# Update coverage tracking
if stored_dates:
self._update_coverage(symbol, min(stored_dates), max(stored_dates))
downloaded.append(symbol)
logger.info(f"✓ Downloaded {symbol} - {len(stored_dates)} dates stored")
except RateLimitError as e:
# Hit rate limit - stop downloading
logger.warning(f"Rate limit hit after {len(downloaded)} downloads: {e}")
rate_limited = True
failed = prioritized_symbols[i:] # Rest are undownloaded
break
except Exception as e:
# Other error - log and continue
logger.error(f"Failed to download {symbol}: {e}")
failed.append(symbol)
continue
# Analyze coverage
coverage_analysis = self._analyze_coverage(requested_dates)
result = {
"success": len(downloaded) > 0 or len(requested_dates) == len(coverage_analysis["completed_dates"]),
"downloaded": downloaded,
"failed": failed,
"rate_limited": rate_limited,
"dates_completed": coverage_analysis["completed_dates"],
"partial_dates": coverage_analysis["partial_dates"]
}
logger.info(
f"Download complete: {len(downloaded)} symbols downloaded, "
f"{len(failed)} failed/skipped, rate_limited={rate_limited}"
)
return result
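# Example progress_callback (hypothetical); the dict keys match the
# payload emitted in the download loop above:
#
# def log_progress(update: dict) -> None:
#     print(f"{update['phase']}: {update['symbol']} "
#           f"({update['current']}/{update['total']})")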
def _download_symbol(self, symbol: str, retries: int = 3) -> Dict:
"""
Download full price history for a symbol.
Args:
symbol: Stock symbol
retries: Number of retry attempts for transient errors
Returns:
JSON response from Alpha Vantage
Raises:
RateLimitError: If rate limit is hit
ValueError: If download fails after retries
"""
for attempt in range(retries):
try:
response = requests.get(
"https://www.alphavantage.co/query",
params={
"function": "TIME_SERIES_DAILY",
"symbol": symbol,
"outputsize": "full", # Get full history
"apikey": self.api_key
},
timeout=30
)
if response.status_code == 200:
data = response.json()
# Check for API error messages
if "Error Message" in data:
raise ValueError(f"API error: {data['Error Message']}")
# Check for rate limit in response body
if "Note" in data:
note = data["Note"]
if "call frequency" in note.lower() or "rate limit" in note.lower():
raise RateLimitError(note)
# Other notes are warnings, continue
logger.warning(f"{symbol}: {note}")
if "Information" in data:
info = data["Information"]
if "premium" in info.lower() or "limit" in info.lower():
raise RateLimitError(info)
# Validate response has time series data
if "Time Series (Daily)" not in data:
raise ValueError(f"No time series data in response for {symbol}")
return data
elif response.status_code == 429:
raise RateLimitError("HTTP 429: Too Many Requests")
elif response.status_code >= 500:
# Server error - retry with backoff
if attempt < retries - 1:
wait_time = (2 ** attempt)
logger.warning(f"Server error {response.status_code}. Retrying in {wait_time}s...")
time.sleep(wait_time)
continue
raise ValueError(f"Server error: {response.status_code}")
else:
raise ValueError(f"HTTP {response.status_code}: {response.text[:200]}")
except RateLimitError:
raise # Don't retry rate limits
except requests.RequestException as e:
if attempt < retries - 1:
logger.warning(f"Request failed: {e}. Retrying...")
time.sleep(2)
continue
raise ValueError(f"Request failed after {retries} attempts: {e}")
raise ValueError(f"Failed to download {symbol} after {retries} attempts")
def _store_symbol_data(
self,
symbol: str,
data: Dict,
requested_dates: Set[str]
) -> List[str]:
"""
Store downloaded price data in database.
Args:
symbol: Stock symbol
data: Alpha Vantage API response
requested_dates: Only store dates in this set
Returns:
List of dates actually stored
"""
time_series = data.get("Time Series (Daily)", {})
if not time_series:
logger.warning(f"No time series data for {symbol}")
return []
conn = get_db_connection(self.db_path)
cursor = conn.cursor()
stored_dates = []
created_at = datetime.utcnow().isoformat() + "Z"
for date, ohlcv in time_series.items():
# Only store requested dates
if date not in requested_dates:
continue
try:
cursor.execute("""
INSERT OR REPLACE INTO price_data
(symbol, date, open, high, low, close, volume, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
symbol,
date,
float(ohlcv.get("1. open", 0)),
float(ohlcv.get("2. high", 0)),
float(ohlcv.get("3. low", 0)),
float(ohlcv.get("4. close", 0)),
int(ohlcv.get("5. volume", 0)),
created_at
))
stored_dates.append(date)
except Exception as e:
logger.error(f"Failed to store {symbol} {date}: {e}")
continue
conn.commit()
conn.close()
return stored_dates
def _update_coverage(self, symbol: str, start_date: str, end_date: str) -> None:
"""
Update coverage tracking for a symbol.
Args:
symbol: Stock symbol
start_date: Start of date range downloaded
end_date: End of date range downloaded
"""
conn = get_db_connection(self.db_path)
cursor = conn.cursor()
downloaded_at = datetime.utcnow().isoformat() + "Z"
cursor.execute("""
INSERT OR REPLACE INTO price_data_coverage
(symbol, start_date, end_date, downloaded_at, source)
VALUES (?, ?, ?, ?, 'alpha_vantage')
""", (symbol, start_date, end_date, downloaded_at))
conn.commit()
conn.close()
def _analyze_coverage(self, requested_dates: Set[str]) -> Dict[str, Any]:
"""
Analyze which requested dates have complete/partial coverage.
Args:
requested_dates: Set of dates requested
Returns:
{
"completed_dates": ["2025-01-20", ...], # All symbols available
"partial_dates": {"2025-01-21": 75, ...} # Date -> symbol count
}
"""
conn = get_db_connection(self.db_path)
cursor = conn.cursor()
total_symbols = len(self.symbols)
completed_dates = []
partial_dates = {}
for date in sorted(requested_dates):
# Count symbols available for this date
cursor.execute(
"SELECT COUNT(DISTINCT symbol) FROM price_data WHERE date = ?",
(date,)
)
count = cursor.fetchone()[0]
if count == total_symbols:
completed_dates.append(date)
elif count > 0:
partial_dates[date] = count
conn.close()
return {
"completed_dates": completed_dates,
"partial_dates": partial_dates
}
def get_available_trading_dates(
self,
start_date: str,
end_date: str
) -> List[str]:
"""
Get trading dates with complete data in range.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
Sorted list of dates with complete data (all symbols)
"""
requested_dates = self._expand_date_range(start_date, end_date)
analysis = self._analyze_coverage(requested_dates)
return sorted(analysis["completed_dates"])
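# Example (hypothetical driver): after downloads, ask which dates a
# simulation can actually run on.
#
# manager = PriceDataManager()
# trading_dates = manager.get_available_trading_dates("2025-01-02", "2025-01-31")
# print(f"{len(trading_dates)} fully covered trading dates")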