Files
AI-Trader/scripts/migrate_price_data.py
Bill 76b946449e feat: implement date range API and on-demand downloads (WIP phase 2)
Phase 2 progress - API integration complete.

API Changes:
- Replace date_range (List[str]) with start_date/end_date (str)
- Add automatic end_date defaulting to start_date for single day
- Add date format validation
- Integrate PriceDataManager for on-demand downloads
- Add rate limit handling (trusts provider, no pre-config)
- Validate date ranges with configurable max days (MAX_SIMULATION_DAYS)

New Modules:
- api/date_utils.py - Date validation and expansion utilities
- scripts/migrate_price_data.py - Migration script for merged.jsonl

API Flow:
1. Validate date range (start <= end, at most MAX_SIMULATION_DAYS days [default 30], not in the future)
2. Check missing price data coverage
3. Download missing data if AUTO_DOWNLOAD_PRICE_DATA=true
4. Priority-based download (maximize date completion)
5. Create job with available trading dates
6. Graceful handling of partial data (rate limits)

Configuration:
- AUTO_DOWNLOAD_PRICE_DATA (default: true)
- MAX_SIMULATION_DAYS (default: 30)
- No rate limit configuration needed

Still TODO:
- Update tools/price_tools.py to read from database
- Implement simulation run tracking
- Update .env.example
- Comprehensive testing
- Documentation updates

Breaking Changes:
- API request format changed (date_range -> start_date/end_date)
- This completes v0.3.0 preparation
2025-10-31 16:40:50 -04:00

167 lines
5.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Migration script: Import merged.jsonl price data into SQLite database.
This script:
1. Reads existing merged.jsonl file
2. Parses OHLCV data for each symbol/date
3. Inserts into price_data table
4. Tracks coverage in price_data_coverage table
Run this once to migrate from jsonl to database.
"""
import json
import sys
from pathlib import Path
from datetime import datetime
from collections import defaultdict
# Add the project root (the parent of the directory containing this script)
# to the front of sys.path so the project-local `api` package imported below
# can be resolved when this file is run directly as a script.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from api.database import get_db_connection, initialize_database
def migrate_merged_jsonl(
    jsonl_path: str = "data/merged.jsonl",
    db_path: str = "data/jobs.db"
) -> None:
    """
    Migrate price data from merged.jsonl to SQLite database.

    Every non-blank line of the input is parsed as one JSON object. The
    symbol is read from ``record["Meta Data"]["2. Symbol"]`` and the per-date
    OHLCV values from ``record["Time Series (Daily)"]``. Each date is written
    to the ``price_data`` table with INSERT OR REPLACE, and the earliest and
    latest imported date per symbol are written to ``price_data_coverage``.

    Lines that are not valid JSON, lines without a symbol or time series, and
    individual dates whose values cannot be converted to numbers are reported
    on stdout and skipped; they do not abort the migration.

    Args:
        jsonl_path: Path to merged.jsonl file
        db_path: Path to SQLite database

    Returns:
        None. If ``jsonl_path`` does not exist, a notice is printed and the
        function returns before the database is touched.
    """
    # Function-scope import: the module only imports `datetime` itself.
    from datetime import timezone

    jsonl_file = Path(jsonl_path)
    if not jsonl_file.exists():
        print(f"⚠️ merged.jsonl not found at {jsonl_path}")
        print(" No price data to migrate. Skipping migration.")
        return

    print(f"📊 Migrating price data from {jsonl_path} to {db_path}")

    # Ensure database is initialized
    initialize_database(db_path)
    conn = get_db_connection(db_path)

    # Track what we're importing
    total_records = 0
    symbols_processed = set()
    symbol_date_ranges = defaultdict(lambda: {"min": None, "max": None})

    # datetime.utcnow() is deprecated; take an aware UTC time and drop the
    # tzinfo so the stored text keeps the "<naive ISO timestamp>Z" format.
    created_at = (
        datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    )

    # try/finally guarantees the connection is closed even when reading the
    # file or a database statement raises part-way through.
    try:
        cursor = conn.cursor()

        print("Reading merged.jsonl...")
        with open(jsonl_file, 'r', encoding="utf-8") as f:
            for line_num, line in enumerate(f, 1):
                if not line.strip():
                    continue

                try:
                    record = json.loads(line)
                except json.JSONDecodeError as e:
                    print(f"⚠️ Line {line_num}: JSON decode error: {e}")
                    continue

                # A line can be valid JSON without being an object (e.g. a
                # list or a number); such a line has no .get() to call.
                if not isinstance(record, dict):
                    print(f"⚠️ Line {line_num}: Expected a JSON object, skipping")
                    continue

                # Extract metadata
                meta = record.get("Meta Data", {})
                symbol = meta.get("2. Symbol")
                if not symbol:
                    print(f"⚠️ Line {line_num}: No symbol found, skipping")
                    continue
                symbols_processed.add(symbol)

                # Extract time series data
                time_series = record.get("Time Series (Daily)", {})
                if not time_series:
                    print(f"⚠️ {symbol}: No time series data, skipping")
                    continue

                # Insert each date's data
                for date, ohlcv in time_series.items():
                    try:
                        # Parse OHLCV values. The "buy price"/"sell price"
                        # keys take precedence over "open"/"close" when
                        # present and truthy.
                        open_price = float(ohlcv.get("1. buy price") or ohlcv.get("1. open", 0))
                        high_price = float(ohlcv.get("2. high", 0))
                        low_price = float(ohlcv.get("3. low", 0))
                        close_price = float(ohlcv.get("4. sell price") or ohlcv.get("4. close", 0))
                        volume = int(ohlcv.get("5. volume", 0))
                    except (ValueError, KeyError, TypeError, AttributeError) as e:
                        # TypeError: a field is JSON null (float(None)).
                        # AttributeError: the entry itself is not an object.
                        print(f"⚠️ {symbol} {date}: Failed to parse OHLCV data: {e}")
                        continue

                    # Insert or replace price data
                    cursor.execute("""
                        INSERT OR REPLACE INTO price_data
                        (symbol, date, open, high, low, close, volume, created_at)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    """, (symbol, date, open_price, high_price, low_price, close_price, volume, created_at))
                    total_records += 1

                    # Track date range for this symbol. Dates are compared as
                    # strings, exactly as they appear in the input.
                    bounds = symbol_date_ranges[symbol]
                    if bounds["min"] is None or date < bounds["min"]:
                        bounds["min"] = date
                    if bounds["max"] is None or date > bounds["max"]:
                        bounds["max"] = date

                    # Commit every 1000 records for progress. Checked right
                    # after the increment so it fires at every multiple of
                    # 1000 and never when nothing has been imported yet.
                    if total_records % 1000 == 0:
                        conn.commit()
                        print(f" Imported {total_records} records...")

        # Final commit
        conn.commit()
        print(f"\n✓ Imported {total_records} price records for {len(symbols_processed)} symbols")

        # Update coverage tracking
        print("\nUpdating coverage tracking...")
        for symbol, date_range in symbol_date_ranges.items():
            if date_range["min"] and date_range["max"]:
                cursor.execute("""
                    INSERT OR REPLACE INTO price_data_coverage
                    (symbol, start_date, end_date, downloaded_at, source)
                    VALUES (?, ?, ?, ?, 'migrated_from_jsonl')
                """, (symbol, date_range["min"], date_range["max"], created_at))
        conn.commit()
    finally:
        conn.close()

    print(f"✓ Coverage tracking updated for {len(symbol_date_ranges)} symbols")
    print("\n✅ Migration complete!")
    print(f"\nSymbols migrated: {', '.join(sorted(symbols_processed))}")
if __name__ == "__main__":
    # Script entry point: read the optional --jsonl and --db paths from the
    # command line and run the migration with them.
    import argparse

    # One row per command-line option: (flag, default value, help text).
    option_specs = (
        (
            "--jsonl",
            "data/merged.jsonl",
            "Path to merged.jsonl file (default: data/merged.jsonl)",
        ),
        (
            "--db",
            "data/jobs.db",
            "Path to SQLite database (default: data/jobs.db)",
        ),
    )

    cli = argparse.ArgumentParser(description="Migrate merged.jsonl to SQLite database")
    for flag, fallback, description in option_specs:
        cli.add_argument(flag, default=fallback, help=description)

    options = cli.parse_args()
    migrate_merged_jsonl(jsonl_path=options.jsonl, db_path=options.db)