From bddf4d8b72480c83b5fb25b132e5b72229b70d97 Mon Sep 17 00:00:00 2001
From: Bill
Date: Fri, 31 Oct 2025 16:37:14 -0400
Subject: [PATCH] feat: add price data management infrastructure (WIP)

Phase 1 of v0.3.0 date range and on-demand download implementation.

Database changes:
- Add price_data table (OHLCV data, replaces merged.jsonl)
- Add price_data_coverage table (track downloaded date ranges)
- Add simulation_runs table (soft delete support)
- Add simulation_run_id to positions table
- Add comprehensive indexes for new tables

New modules:
- api/price_data_manager.py
  - Priority-based download manager
  - Coverage gap detection
  - Smart download prioritization (maximize date completion)
  - Rate limit handling with retry logic
  - Alpha Vantage integration

Configuration:
- configs/nasdaq100_symbols.json - NASDAQ 100 constituent list

Next steps (not yet implemented):
- Migration script for merged.jsonl -> price_data
- Update API models (start_date/end_date)
- Update tools/price_tools.py to read from database
- Simulation run tracking implementation
- API integration
- Tests and documentation

This is work in progress for the v0.3.0 release.
---
 api/database.py                |  94 +++++-
 api/price_data_manager.py      | 537 +++++++++++++++++++++++++++++++++
 configs/nasdaq100_symbols.json |  18 ++
 3 files changed, 646 insertions(+), 3 deletions(-)
 create mode 100644 api/price_data_manager.py
 create mode 100644 configs/nasdaq100_symbols.json

diff --git a/api/database.py b/api/database.py
index 7341c23..86aa34c 100644
--- a/api/database.py
+++ b/api/database.py
@@ -50,6 +50,9 @@ def initialize_database(db_path: str = "data/jobs.db") -> None:
     4. holdings - Portfolio holdings per position
     5. reasoning_logs - AI decision logs (optional, for detail=full)
     6. tool_usage - Tool usage statistics
+    7. price_data - Historical OHLCV price data (replaces merged.jsonl)
+    8. price_data_coverage - Downloaded date range tracking per symbol
+    9. simulation_runs - Simulation run tracking for soft delete
 
     Args:
         db_path: Path to SQLite database file
@@ -108,8 +111,10 @@ def initialize_database(db_path: str = "data/jobs.db") -> None:
             daily_return_pct REAL,
             cumulative_profit REAL,
             cumulative_return_pct REAL,
+            simulation_run_id TEXT,
             created_at TEXT NOT NULL,
-            FOREIGN KEY (job_id) REFERENCES jobs(job_id) ON DELETE CASCADE
+            FOREIGN KEY (job_id) REFERENCES jobs(job_id) ON DELETE CASCADE,
+            FOREIGN KEY (simulation_run_id) REFERENCES simulation_runs(run_id) ON DELETE SET NULL
         )
     """)
 
@@ -154,6 +159,50 @@ def initialize_database(db_path: str = "data/jobs.db") -> None:
         )
     """)
 
+    # Table 7: Price Data - OHLCV price data (replaces merged.jsonl)
+    cursor.execute("""
+        CREATE TABLE IF NOT EXISTS price_data (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            symbol TEXT NOT NULL,
+            date TEXT NOT NULL,
+            open REAL NOT NULL,
+            high REAL NOT NULL,
+            low REAL NOT NULL,
+            close REAL NOT NULL,
+            volume INTEGER NOT NULL,
+            created_at TEXT NOT NULL,
+            UNIQUE(symbol, date)
+        )
+    """)
+
+    # Table 8: Price Data Coverage - Track downloaded date ranges per symbol
+    cursor.execute("""
+        CREATE TABLE IF NOT EXISTS price_data_coverage (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            symbol TEXT NOT NULL,
+            start_date TEXT NOT NULL,
+            end_date TEXT NOT NULL,
+            downloaded_at TEXT NOT NULL,
+            source TEXT DEFAULT 'alpha_vantage',
+            UNIQUE(symbol, start_date, end_date)
+        )
+    """)
+
+    # Table 9: Simulation Runs - Track simulation runs for soft delete
+    cursor.execute("""
+        CREATE TABLE IF NOT EXISTS simulation_runs (
+            run_id TEXT PRIMARY KEY,
+            job_id TEXT NOT NULL,
+            model TEXT NOT NULL,
+            start_date TEXT NOT NULL,
+            end_date TEXT NOT NULL,
+            status TEXT NOT NULL CHECK(status IN ('active', 'superseded')),
+            created_at TEXT NOT NULL,
+            superseded_at TEXT,
+            FOREIGN KEY (job_id) REFERENCES jobs(job_id) ON DELETE CASCADE
+        )
+    """)
+
     # Create indexes for performance
     _create_indexes(cursor)
 
@@ -222,6 +271,41 @@ def _create_indexes(cursor: sqlite3.Cursor) -> None:
         ON tool_usage(job_id, date, model)
     """)
 
+    # Price data table indexes
+    cursor.execute("""
+        CREATE INDEX IF NOT EXISTS idx_price_data_symbol_date ON price_data(symbol, date)
+    """)
+    cursor.execute("""
+        CREATE INDEX IF NOT EXISTS idx_price_data_date ON price_data(date)
+    """)
+    cursor.execute("""
+        CREATE INDEX IF NOT EXISTS idx_price_data_symbol ON price_data(symbol)
+    """)
+
+    # Price data coverage table indexes
+    cursor.execute("""
+        CREATE INDEX IF NOT EXISTS idx_coverage_symbol ON price_data_coverage(symbol)
+    """)
+    cursor.execute("""
+        CREATE INDEX IF NOT EXISTS idx_coverage_dates ON price_data_coverage(start_date, end_date)
+    """)
+
+    # Simulation runs table indexes
+    cursor.execute("""
+        CREATE INDEX IF NOT EXISTS idx_runs_job_model ON simulation_runs(job_id, model)
+    """)
+    cursor.execute("""
+        CREATE INDEX IF NOT EXISTS idx_runs_status ON simulation_runs(status)
+    """)
+    cursor.execute("""
+        CREATE INDEX IF NOT EXISTS idx_runs_dates ON simulation_runs(start_date, end_date)
+    """)
+
+    # Positions table - add index for simulation_run_id
+    cursor.execute("""
+        CREATE INDEX IF NOT EXISTS idx_positions_run_id ON positions(simulation_run_id)
+    """)
+
 
 def drop_all_tables(db_path: str = "data/jobs.db") -> None:
     """
@@ -240,8 +324,11 @@ def drop_all_tables(db_path: str = "data/jobs.db") -> None:
         'reasoning_logs',
         'holdings',
         'positions',
+        'simulation_runs',
         'job_details',
-        'jobs'
+        'jobs',
+        'price_data_coverage',
+        'price_data'
     ]
 
     for table in tables:
@@ -296,7 +383,8 @@ def get_database_stats(db_path: str = "data/jobs.db") -> dict:
         stats["database_size_mb"] = 0
 
     # Get row counts for each table
-    tables = ['jobs', 'job_details', 'positions', 'holdings', 'reasoning_logs', 'tool_usage']
+    tables = ['jobs', 'job_details', 'positions', 'holdings', 'reasoning_logs', 'tool_usage',
+              'price_data', 'price_data_coverage', 'simulation_runs']
 
     for table in tables:
         cursor.execute(f"SELECT COUNT(*) FROM {table}")
diff --git a/api/price_data_manager.py b/api/price_data_manager.py
new file mode 100644
index 0000000..230ecff
--- /dev/null
+++ b/api/price_data_manager.py
@@ -0,0 +1,537 @@
+"""
+Price data management for on-demand downloads and coverage tracking.
+
+This module provides:
+- Coverage gap detection
+- Priority-based download ordering
+- Rate limit handling with retry logic
+- Price data storage and retrieval
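+
+Intended usage (sketch only; assumes the schema created by api.database has
+been initialized and an Alpha Vantage API key is configured):
+
+    manager = PriceDataManager()
+    missing = manager.get_missing_coverage("2025-01-20", "2025-01-24")
+    if missing:
+        requested = manager._expand_date_range("2025-01-20", "2025-01-24")
+        manager.download_missing_data_prioritized(missing, requested)
+    trading_dates = manager.get_available_trading_dates("2025-01-20", "2025-01-24")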
+"""
+
+import logging
+import json
+import os
+import time
+import requests
+from pathlib import Path
+from typing import List, Dict, Set, Tuple, Optional, Callable, Any
+from datetime import datetime, timedelta
+from collections import defaultdict
+
+from api.database import get_db_connection
+
+logger = logging.getLogger(__name__)
+
+
+class RateLimitError(Exception):
+    """Raised when API rate limit is hit."""
+    pass
+
+
+class PriceDataManager:
+    """
+    Manages price data availability, downloads, and coverage tracking.
+
+    Responsibilities:
+    - Check which dates/symbols have price data
+    - Download missing data from Alpha Vantage
+    - Track downloaded date ranges per symbol
+    - Prioritize downloads to maximize date completion
+    - Handle rate limiting gracefully
+    """
+
+    def __init__(
+        self,
+        db_path: str = "data/jobs.db",
+        symbols_config: str = "configs/nasdaq100_symbols.json",
+        api_key: Optional[str] = None
+    ):
+        """
+        Initialize PriceDataManager.
+
+        Args:
+            db_path: Path to SQLite database
+            symbols_config: Path to NASDAQ 100 symbols configuration
+            api_key: Alpha Vantage API key (defaults to env var)
+        """
+        self.db_path = db_path
+        self.symbols_config = symbols_config
+        self.api_key = api_key or os.getenv("ALPHAADVANTAGE_API_KEY")
+
+        # Load symbols list
+        self.symbols = self._load_symbols()
+
+        logger.info(f"Initialized PriceDataManager with {len(self.symbols)} symbols")
+
+    def _load_symbols(self) -> List[str]:
+        """Load NASDAQ 100 symbols from config file."""
+        config_path = Path(self.symbols_config)
+
+        if not config_path.exists():
+            logger.warning(f"Symbols config not found: {config_path}. Using default list.")
+            # Fall back to a minimal list
+            return ["AAPL", "MSFT", "GOOGL", "AMZN", "NVDA"]
+
+        with open(config_path, 'r') as f:
+            config = json.load(f)
+
+        return config.get("symbols", [])
+
+    def get_available_dates(self) -> Set[str]:
+        """
+        Get all dates that have price data in the database.
+
+        Returns:
+            Set of dates (YYYY-MM-DD) with data
+        """
+        conn = get_db_connection(self.db_path)
+        cursor = conn.cursor()
+
+        cursor.execute("SELECT DISTINCT date FROM price_data ORDER BY date")
+        dates = {row[0] for row in cursor.fetchall()}
+
+        conn.close()
+
+        return dates
+
+    def get_symbol_dates(self, symbol: str) -> Set[str]:
+        """
+        Get all dates that have data for a specific symbol.
+
+        Args:
+            symbol: Stock symbol
+
+        Returns:
+            Set of dates with data for this symbol
+        """
+        conn = get_db_connection(self.db_path)
+        cursor = conn.cursor()
+
+        cursor.execute(
+            "SELECT date FROM price_data WHERE symbol = ? ORDER BY date",
+            (symbol,)
+        )
+        dates = {row[0] for row in cursor.fetchall()}
+
+        conn.close()
+
+        return dates
+
+    def get_missing_coverage(
+        self,
+        start_date: str,
+        end_date: str
+    ) -> Dict[str, Set[str]]:
+        """
+        Identify which symbols are missing data for which dates in range.
+
+        Args:
+            start_date: Start date (YYYY-MM-DD)
+            end_date: End date (YYYY-MM-DD)
+
+        Returns:
+            Dict mapping symbol to set of missing dates; symbols with full
+            coverage are omitted.
+            Example: {"AAPL": {"2025-01-20", "2025-01-21"}}
+        """
+        # Generate all dates in range
+        requested_dates = self._expand_date_range(start_date, end_date)
+
+        missing = {}
+
+        for symbol in self.symbols:
+            symbol_dates = self.get_symbol_dates(symbol)
+            missing_dates = requested_dates - symbol_dates
+
+            if missing_dates:
+                missing[symbol] = missing_dates
+
+        return missing
+
+    def _expand_date_range(self, start_date: str, end_date: str) -> Set[str]:
+        """
+        Expand a date range into the set of all calendar dates it contains.
+
+        Args:
+            start_date: Start date (YYYY-MM-DD)
+            end_date: End date (YYYY-MM-DD)
+
+        Returns:
+            Set of all dates in range (inclusive)
+        """
+        start = datetime.strptime(start_date, "%Y-%m-%d")
+        end = datetime.strptime(end_date, "%Y-%m-%d")
+
+        dates = set()
+        current = start
+
+        while current <= end:
+            dates.add(current.strftime("%Y-%m-%d"))
+            current += timedelta(days=1)
+
+        return dates
+
ORDER BY date", + (symbol,) + ) + dates = {row[0] for row in cursor.fetchall()} + + conn.close() + + return dates + + def get_missing_coverage( + self, + start_date: str, + end_date: str + ) -> Dict[str, Set[str]]: + """ + Identify which symbols are missing data for which dates in range. + + Args: + start_date: Start date (YYYY-MM-DD) + end_date: End date (YYYY-MM-DD) + + Returns: + Dict mapping symbol to set of missing dates + Example: {"AAPL": {"2025-01-20", "2025-01-21"}, "MSFT": set()} + """ + # Generate all dates in range + requested_dates = self._expand_date_range(start_date, end_date) + + missing = {} + + for symbol in self.symbols: + symbol_dates = self.get_symbol_dates(symbol) + missing_dates = requested_dates - symbol_dates + + if missing_dates: + missing[symbol] = missing_dates + + return missing + + def _expand_date_range(self, start_date: str, end_date: str) -> Set[str]: + """ + Expand date range into set of all dates. + + Args: + start_date: Start date (YYYY-MM-DD) + end_date: End date (YYYY-MM-DD) + + Returns: + Set of all dates in range (inclusive) + """ + start = datetime.strptime(start_date, "%Y-%m-%d") + end = datetime.strptime(end_date, "%Y-%m-%d") + + dates = set() + current = start + + while current <= end: + dates.add(current.strftime("%Y-%m-%d")) + current += timedelta(days=1) + + return dates + + def prioritize_downloads( + self, + missing_coverage: Dict[str, Set[str]], + requested_dates: Set[str] + ) -> List[str]: + """ + Prioritize symbol downloads to maximize date completion. + + Strategy: Download symbols that complete the most requested dates first. + + Args: + missing_coverage: Dict of symbol -> missing dates + requested_dates: Set of dates we want to simulate + + Returns: + List of symbols in priority order (highest impact first) + """ + # Calculate impact score for each symbol + impacts = [] + + for symbol, missing_dates in missing_coverage.items(): + # Impact = number of requested dates this symbol would complete + impact = len(missing_dates & requested_dates) + + if impact > 0: + impacts.append((symbol, impact)) + + # Sort by impact (descending) + impacts.sort(key=lambda x: x[1], reverse=True) + + # Return symbols in priority order + prioritized = [symbol for symbol, _ in impacts] + + logger.info(f"Prioritized {len(prioritized)} symbols for download") + if prioritized: + logger.debug(f"Top 5 symbols: {prioritized[:5]}") + + return prioritized + + def download_missing_data_prioritized( + self, + missing_coverage: Dict[str, Set[str]], + requested_dates: Set[str], + progress_callback: Optional[Callable] = None + ) -> Dict[str, Any]: + """ + Download data in priority order until rate limited. 
+        # Calculate impact score for each symbol
+        impacts = []
+
+        for symbol, missing_dates in missing_coverage.items():
+            # Impact = number of requested dates this symbol would complete
+            impact = len(missing_dates & requested_dates)
+
+            if impact > 0:
+                impacts.append((symbol, impact))
+
+        # Sort by impact (descending)
+        impacts.sort(key=lambda x: x[1], reverse=True)
+
+        # Return symbols in priority order
+        prioritized = [symbol for symbol, _ in impacts]
+
+        logger.info(f"Prioritized {len(prioritized)} symbols for download")
+        if prioritized:
+            logger.debug(f"Top 5 symbols: {prioritized[:5]}")
+
+        return prioritized
+
+    def download_missing_data_prioritized(
+        self,
+        missing_coverage: Dict[str, Set[str]],
+        requested_dates: Set[str],
+        progress_callback: Optional[Callable] = None
+    ) -> Dict[str, Any]:
+        """
+        Download data in priority order until rate limited.
+
+        Args:
+            missing_coverage: Dict of symbol -> missing dates
+            requested_dates: Set of dates being requested
+            progress_callback: Optional callback for progress updates
+
+        Returns:
+            {
+                "success": True/False,
+                "downloaded": ["AAPL", "MSFT", ...],
+                "failed": ["GOOGL", ...],
+                "rate_limited": True/False,
+                "dates_completed": ["2025-01-20", ...],
+                "partial_dates": {"2025-01-21": 75}
+            }
+        """
+        if not self.api_key:
+            raise ValueError("ALPHAADVANTAGE_API_KEY not configured")
+
+        # Prioritize downloads
+        prioritized_symbols = self.prioritize_downloads(missing_coverage, requested_dates)
+
+        if not prioritized_symbols:
+            logger.info("No downloads needed - all data available")
+            return {
+                "success": True,
+                "downloaded": [],
+                "failed": [],
+                "rate_limited": False,
+                "dates_completed": sorted(requested_dates),
+                "partial_dates": {}
+            }
+
+        logger.info(f"Starting priority download of {len(prioritized_symbols)} symbols")
+
+        downloaded = []
+        failed = []
+        rate_limited = False
+
+        # Download in priority order
+        for i, symbol in enumerate(prioritized_symbols):
+            try:
+                # Progress callback
+                if progress_callback:
+                    progress_callback({
+                        "current": i + 1,
+                        "total": len(prioritized_symbols),
+                        "symbol": symbol,
+                        "phase": "downloading"
+                    })
+
+                # Download symbol data
+                logger.info(f"Downloading {symbol} ({i+1}/{len(prioritized_symbols)})")
+                data = self._download_symbol(symbol)
+
+                # Store in database
+                stored_dates = self._store_symbol_data(symbol, data, requested_dates)
+
+                # Update coverage tracking
+                if stored_dates:
+                    self._update_coverage(symbol, min(stored_dates), max(stored_dates))
+
+                downloaded.append(symbol)
+                logger.info(f"✓ Downloaded {symbol} - {len(stored_dates)} dates stored")
+
+            except RateLimitError as e:
+                # Hit rate limit - stop downloading
+                logger.warning(f"Rate limit hit after {len(downloaded)} downloads: {e}")
+                rate_limited = True
+                # Current and remaining symbols were not downloaded; extend
+                # rather than assign so earlier failures are not lost
+                failed.extend(prioritized_symbols[i:])
+                break
+
+            except Exception as e:
+                # Other error - log and continue
+                logger.error(f"Failed to download {symbol}: {e}")
+                failed.append(symbol)
+                continue
+
+        # Analyze coverage
+        coverage_analysis = self._analyze_coverage(requested_dates)
+
+        result = {
+            "success": len(downloaded) > 0 or len(requested_dates) == len(coverage_analysis["completed_dates"]),
+            "downloaded": downloaded,
+            "failed": failed,
+            "rate_limited": rate_limited,
+            "dates_completed": coverage_analysis["completed_dates"],
+            "partial_dates": coverage_analysis["partial_dates"]
+        }
+
+        logger.info(
+            f"Download complete: {len(downloaded)} symbols downloaded, "
+            f"{len(failed)} failed/skipped, rate_limited={rate_limited}"
+        )
+
+        return result
+
+    def _download_symbol(self, symbol: str, retries: int = 3) -> Dict:
+        """
+        Download full price history for a symbol.
+
+        Args:
+            symbol: Stock symbol
+            retries: Number of retry attempts for transient errors
+
+        Returns:
+            JSON response from Alpha Vantage
+
+        Raises:
+            RateLimitError: If rate limit is hit
+            ValueError: If download fails after retries
+        """
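+        # Retry policy, summarized: rate limits are never retried (they abort
+        # the whole download pass); HTTP 5xx responses back off for
+        # 2 ** attempt seconds (1s, then 2s with the default retries=3);
+        # network errors retry after a flat 2s.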
+        for attempt in range(retries):
+            try:
+                response = requests.get(
+                    "https://www.alphavantage.co/query",
+                    params={
+                        "function": "TIME_SERIES_DAILY",
+                        "symbol": symbol,
+                        "outputsize": "full",  # Get full history
+                        "apikey": self.api_key
+                    },
+                    timeout=30
+                )
+
+                if response.status_code == 200:
+                    data = response.json()
+
+                    # Check for API error messages
+                    if "Error Message" in data:
+                        raise ValueError(f"API error: {data['Error Message']}")
+
+                    # Check for rate limit in response body
+                    if "Note" in data:
+                        note = data["Note"]
+                        if "call frequency" in note.lower() or "rate limit" in note.lower():
+                            raise RateLimitError(note)
+                        # Other notes are warnings, continue
+                        logger.warning(f"{symbol}: {note}")
+
+                    if "Information" in data:
+                        info = data["Information"]
+                        if "premium" in info.lower() or "limit" in info.lower():
+                            raise RateLimitError(info)
+
+                    # Validate response has time series data
+                    if "Time Series (Daily)" not in data:
+                        raise ValueError(f"No time series data in response for {symbol}")
+
+                    return data
+
+                elif response.status_code == 429:
+                    raise RateLimitError("HTTP 429: Too Many Requests")
+
+                elif response.status_code >= 500:
+                    # Server error - retry with backoff
+                    if attempt < retries - 1:
+                        wait_time = (2 ** attempt)
+                        logger.warning(f"Server error {response.status_code}. Retrying in {wait_time}s...")
+                        time.sleep(wait_time)
+                        continue
+                    raise ValueError(f"Server error: {response.status_code}")
+
+                else:
+                    raise ValueError(f"HTTP {response.status_code}: {response.text[:200]}")
+
+            except RateLimitError:
+                raise  # Don't retry rate limits
+            except requests.RequestException as e:
+                if attempt < retries - 1:
+                    logger.warning(f"Request failed: {e}. Retrying...")
+                    time.sleep(2)
+                    continue
+                raise ValueError(f"Request failed after {retries} attempts: {e}")
+
+        raise ValueError(f"Failed to download {symbol} after {retries} attempts")
+
+    def _store_symbol_data(
+        self,
+        symbol: str,
+        data: Dict,
+        requested_dates: Set[str]
+    ) -> List[str]:
+        """
+        Store downloaded price data in the database.
+
+        Args:
+            symbol: Stock symbol
+            data: Alpha Vantage API response
+            requested_dates: Only store dates in this set
+
+        Returns:
+            List of dates actually stored
+        """
+        time_series = data.get("Time Series (Daily)", {})
+
+        if not time_series:
+            logger.warning(f"No time series data for {symbol}")
+            return []
+
+        conn = get_db_connection(self.db_path)
+        cursor = conn.cursor()
+
+        stored_dates = []
+        created_at = datetime.utcnow().isoformat() + "Z"
+
+        for date, ohlcv in time_series.items():
+            # Only store requested dates
+            if date not in requested_dates:
+                continue
+
+            try:
+                cursor.execute("""
+                    INSERT OR REPLACE INTO price_data
+                    (symbol, date, open, high, low, close, volume, created_at)
+                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+                """, (
+                    symbol,
+                    date,
+                    float(ohlcv.get("1. open", 0)),
+                    float(ohlcv.get("2. high", 0)),
+                    float(ohlcv.get("3. low", 0)),
+                    float(ohlcv.get("4. close", 0)),
+                    int(ohlcv.get("5. volume", 0)),
+                    created_at
+                ))
+                stored_dates.append(date)
+            except Exception as e:
+                logger.error(f"Failed to store {symbol} {date}: {e}")
+                continue
+
+        conn.commit()
+        conn.close()
+
+        return stored_dates
+
+    def _update_coverage(self, symbol: str, start_date: str, end_date: str) -> None:
+        """
+        Update coverage tracking for a symbol.
+
+        Args:
+            symbol: Stock symbol
+            start_date: Start of date range downloaded
+            end_date: End of date range downloaded
+        """
+        conn = get_db_connection(self.db_path)
+        cursor = conn.cursor()
+
+        downloaded_at = datetime.utcnow().isoformat() + "Z"
+
+        cursor.execute("""
+            INSERT OR REPLACE INTO price_data_coverage
+            (symbol, start_date, end_date, downloaded_at, source)
+            VALUES (?, ?, ?, ?, 'alpha_vantage')
+        """, (symbol, start_date, end_date, downloaded_at))
+
+        conn.commit()
+        conn.close()
+
+    def _analyze_coverage(self, requested_dates: Set[str]) -> Dict[str, Any]:
+        """
+        Analyze which requested dates have complete/partial coverage.
+
+        Args:
+            requested_dates: Set of dates requested
+
+        Returns:
+            {
+                "completed_dates": ["2025-01-20", ...],  # All symbols available
+                "partial_dates": {"2025-01-21": 75, ...}  # Date -> symbol count
+            }
+        """
+        conn = get_db_connection(self.db_path)
+        cursor = conn.cursor()
+
+        total_symbols = len(self.symbols)
+        completed_dates = []
+        partial_dates = {}
+
+        for date in sorted(requested_dates):
+            # Count symbols available for this date
+            cursor.execute(
+                "SELECT COUNT(DISTINCT symbol) FROM price_data WHERE date = ?",
+                (date,)
+            )
+            count = cursor.fetchone()[0]
+
+            if count == total_symbols:
+                completed_dates.append(date)
+            elif count > 0:
+                partial_dates[date] = count
+
+        conn.close()
+
+        return {
+            "completed_dates": completed_dates,
+            "partial_dates": partial_dates
+        }
+
+    def get_available_trading_dates(
+        self,
+        start_date: str,
+        end_date: str
+    ) -> List[str]:
+        """
+        Get trading dates with complete data in range.
+
+        Args:
+            start_date: Start date (YYYY-MM-DD)
+            end_date: End date (YYYY-MM-DD)
+
+        Returns:
+            Sorted list of dates with complete data (all symbols)
+        """
+        requested_dates = self._expand_date_range(start_date, end_date)
+        analysis = self._analyze_coverage(requested_dates)
+
+        return sorted(analysis["completed_dates"])
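+
+
+if __name__ == "__main__":
+    # Minimal smoke-test sketch of the intended flow. Assumes the schema from
+    # api.database has been initialized and ALPHAADVANTAGE_API_KEY is set;
+    # the date range below is illustrative only.
+    logging.basicConfig(level=logging.INFO)
+
+    manager = PriceDataManager()
+    start, end = "2025-01-20", "2025-01-24"
+
+    missing = manager.get_missing_coverage(start, end)
+    print(f"{len(missing)} symbols have gaps in {start}..{end}")
+
+    if missing:
+        requested = manager._expand_date_range(start, end)
+        result = manager.download_missing_data_prioritized(missing, requested)
+        print(f"downloaded={len(result['downloaded'])}, rate_limited={result['rate_limited']}")
+
+    print("complete trading dates:", manager.get_available_trading_dates(start, end))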
diff --git a/configs/nasdaq100_symbols.json b/configs/nasdaq100_symbols.json
new file mode 100644
index 0000000..4d769b5
--- /dev/null
+++ b/configs/nasdaq100_symbols.json
@@ -0,0 +1,18 @@
+{
+  "symbols": [
+    "NVDA", "MSFT", "AAPL", "GOOG", "GOOGL", "AMZN", "META", "AVGO", "TSLA",
+    "NFLX", "PLTR", "COST", "ASML", "AMD", "CSCO", "AZN", "TMUS", "MU", "LIN",
+    "PEP", "SHOP", "APP", "INTU", "AMAT", "LRCX", "PDD", "QCOM", "ARM", "INTC",
+    "BKNG", "AMGN", "TXN", "ISRG", "GILD", "KLAC", "PANW", "ADBE", "HON",
+    "CRWD", "CEG", "ADI", "ADP", "DASH", "CMCSA", "VRTX", "MELI", "SBUX",
+    "CDNS", "ORLY", "SNPS", "MSTR", "MDLZ", "ABNB", "MRVL", "CTAS", "TRI",
+    "MAR", "MNST", "CSX", "ADSK", "PYPL", "FTNT", "AEP", "WDAY", "REGN", "ROP",
+    "NXPI", "DDOG", "AXON", "ROST", "IDXX", "EA", "PCAR", "FAST", "EXC", "TTWO",
+    "XEL", "ZS", "PAYX", "WBD", "BKR", "CPRT", "CCEP", "FANG", "TEAM", "CHTR",
+    "KDP", "MCHP", "GEHC", "VRSK", "CTSH", "CSGP", "KHC", "ODFL", "DXCM", "TTD",
+    "ON", "BIIB", "LULU", "CDW", "GFS", "QQQ"
+  ],
+  "description": "NASDAQ 100 constituent stocks plus QQQ ETF",
+  "last_updated": "2025-10-31",
+  "total_symbols": 102
+}