mirror of
https://github.com/Xe138/AI-Trader.git
synced 2026-04-02 01:27:24 -04:00
873 lines
27 KiB
Python
873 lines
27 KiB
Python
import os
|
||
import json
|
||
import numpy as np
|
||
import pandas as pd
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Tuple
|
||
import sys
|
||
|
||
# Add project root directory to Python path to allow running this file from subdirectories
|
||
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||
if project_root not in sys.path:
|
||
sys.path.insert(0, project_root)
|
||
|
||
from tools.price_tools import (
|
||
get_yesterday_date,
|
||
get_open_prices,
|
||
get_yesterday_open_and_close_price,
|
||
get_today_init_position,
|
||
get_latest_position,
|
||
all_nasdaq_100_symbols
|
||
)
|
||
from tools.general_tools import get_config_value
|
||
|
||
|
||
def calculate_portfolio_value(positions: Dict[str, float], prices: Dict[str, Optional[float]], cash: float = 0.0) -> float:
|
||
"""
|
||
Calculate total portfolio value
|
||
|
||
Args:
|
||
positions: Position dictionary in format {symbol: shares}
|
||
prices: Price dictionary in format {symbol_price: price}
|
||
cash: Cash balance
|
||
|
||
Returns:
|
||
Total portfolio value
|
||
"""
|
||
total_value = cash
|
||
|
||
for symbol, shares in positions.items():
|
||
if symbol == "CASH":
|
||
continue
|
||
price_key = f'{symbol}_price'
|
||
price = prices.get(price_key)
|
||
if price is not None and shares > 0:
|
||
total_value += shares * price
|
||
|
||
return total_value
|
||
|
||
|
||
def get_available_date_range(modelname: str) -> Tuple[str, str]:
|
||
"""
|
||
Get available data date range
|
||
|
||
Args:
|
||
modelname: Model name
|
||
|
||
Returns:
|
||
Tuple of (earliest date, latest date) in YYYY-MM-DD format
|
||
"""
|
||
base_dir = Path(__file__).resolve().parents[1]
|
||
position_file = base_dir / "data" / "agent_data" / modelname / "position" / "position.jsonl"
|
||
|
||
if not position_file.exists():
|
||
return "", ""
|
||
|
||
dates = []
|
||
|
||
with position_file.open("r", encoding="utf-8") as f:
|
||
for line in f:
|
||
if not line.strip():
|
||
continue
|
||
try:
|
||
doc = json.loads(line)
|
||
date = doc.get("date")
|
||
if date:
|
||
dates.append(date)
|
||
except Exception:
|
||
continue
|
||
|
||
if not dates:
|
||
return "", ""
|
||
|
||
dates.sort()
|
||
return dates[0], dates[-1]
|
||
|
||
|
||
def get_daily_portfolio_values(modelname: str, start_date: Optional[str] = None, end_date: Optional[str] = None) -> Dict[str, float]:
|
||
"""
|
||
Get daily portfolio values
|
||
|
||
Args:
|
||
modelname: Model name
|
||
start_date: Start date in YYYY-MM-DD format, uses earliest date if None
|
||
end_date: End date in YYYY-MM-DD format, uses latest date if None
|
||
|
||
Returns:
|
||
Dictionary of daily portfolio values in format {date: portfolio_value}
|
||
"""
|
||
base_dir = Path(__file__).resolve().parents[1]
|
||
position_file = base_dir / "data" / "agent_data" / modelname / "position" / "position.jsonl"
|
||
merged_file = base_dir / "data" / "merged.jsonl"
|
||
|
||
if not position_file.exists() or not merged_file.exists():
|
||
return {}
|
||
|
||
# Get available date range if not specified
|
||
if start_date is None or end_date is None:
|
||
earliest_date, latest_date = get_available_date_range(modelname)
|
||
if not earliest_date or not latest_date:
|
||
return {}
|
||
|
||
if start_date is None:
|
||
start_date = earliest_date
|
||
if end_date is None:
|
||
end_date = latest_date
|
||
|
||
# Read position data
|
||
position_data = []
|
||
with position_file.open("r", encoding="utf-8") as f:
|
||
for line in f:
|
||
if not line.strip():
|
||
continue
|
||
try:
|
||
doc = json.loads(line)
|
||
position_data.append(doc)
|
||
except Exception:
|
||
continue
|
||
|
||
# Read price data
|
||
price_data = {}
|
||
with merged_file.open("r", encoding="utf-8") as f:
|
||
for line in f:
|
||
if not line.strip():
|
||
continue
|
||
try:
|
||
doc = json.loads(line)
|
||
meta = doc.get("Meta Data", {})
|
||
symbol = meta.get("2. Symbol")
|
||
if symbol:
|
||
price_data[symbol] = doc.get("Time Series (Daily)", {})
|
||
except Exception:
|
||
continue
|
||
|
||
# Calculate daily portfolio values
|
||
daily_values = {}
|
||
|
||
# Group position data by date
|
||
positions_by_date = {}
|
||
for record in position_data:
|
||
date = record.get("date")
|
||
if date:
|
||
if date not in positions_by_date:
|
||
positions_by_date[date] = []
|
||
positions_by_date[date].append(record)
|
||
|
||
# For each date, sort records by id and take latest position
|
||
for date, records in positions_by_date.items():
|
||
if start_date and date < start_date:
|
||
continue
|
||
if end_date and date > end_date:
|
||
continue
|
||
|
||
# Sort by id and take latest position
|
||
latest_record = max(records, key=lambda x: x.get("id", 0))
|
||
positions = latest_record.get("positions", {})
|
||
|
||
# Get daily prices
|
||
daily_prices = {}
|
||
for symbol in all_nasdaq_100_symbols:
|
||
if symbol in price_data:
|
||
symbol_prices = price_data[symbol]
|
||
if date in symbol_prices:
|
||
price_info = symbol_prices[date]
|
||
buy_price = price_info.get("1. buy price")
|
||
sell_price = price_info.get("4. sell price")
|
||
# Use closing (sell) price to calculate value
|
||
if sell_price is not None:
|
||
daily_prices[f'{symbol}_price'] = float(sell_price)
|
||
|
||
# Calculate portfolio value
|
||
cash = positions.get("CASH", 0.0)
|
||
portfolio_value = calculate_portfolio_value(positions, daily_prices, cash)
|
||
daily_values[date] = portfolio_value
|
||
|
||
return daily_values
|
||
|
||
|
||
def calculate_daily_returns(portfolio_values: Dict[str, float]) -> List[float]:
|
||
"""
|
||
Calculate daily returns
|
||
|
||
Args:
|
||
portfolio_values: Daily portfolio value dictionary
|
||
|
||
Returns:
|
||
List of daily returns
|
||
"""
|
||
if len(portfolio_values) < 2:
|
||
return []
|
||
|
||
# Sort by date
|
||
sorted_dates = sorted(portfolio_values.keys())
|
||
returns = []
|
||
|
||
for i in range(1, len(sorted_dates)):
|
||
prev_date = sorted_dates[i-1]
|
||
curr_date = sorted_dates[i]
|
||
|
||
prev_value = portfolio_values[prev_date]
|
||
curr_value = portfolio_values[curr_date]
|
||
|
||
if prev_value > 0:
|
||
daily_return = (curr_value - prev_value) / prev_value
|
||
returns.append(daily_return)
|
||
|
||
return returns
|
||
|
||
|
||
def calculate_sharpe_ratio(returns: List[float], risk_free_rate: float = 0.02) -> float:
|
||
"""
|
||
Calculate Sharpe ratio
|
||
|
||
Args:
|
||
returns: List of returns
|
||
risk_free_rate: Risk-free rate (annualized)
|
||
|
||
Returns:
|
||
Sharpe ratio
|
||
"""
|
||
if not returns or len(returns) < 2:
|
||
return 0.0
|
||
|
||
returns_array = np.array(returns)
|
||
|
||
# Calculate annualized return and volatility
|
||
mean_return = np.mean(returns_array)
|
||
std_return = np.std(returns_array, ddof=1)
|
||
|
||
# Assume 252 trading days per year
|
||
annualized_return = mean_return * 252
|
||
annualized_volatility = std_return * np.sqrt(252)
|
||
|
||
if annualized_volatility == 0:
|
||
return 0.0
|
||
|
||
# Calculate Sharpe ratio
|
||
sharpe_ratio = (annualized_return - risk_free_rate) / annualized_volatility
|
||
|
||
return sharpe_ratio
|
||
|
||
|
||
def calculate_max_drawdown(portfolio_values: Dict[str, float]) -> Tuple[float, str, str]:
|
||
"""
|
||
Calculate maximum drawdown
|
||
|
||
Args:
|
||
portfolio_values: Daily portfolio value dictionary
|
||
|
||
Returns:
|
||
Tuple of (maximum drawdown percentage, drawdown start date, drawdown end date)
|
||
"""
|
||
if not portfolio_values:
|
||
return 0.0, "", ""
|
||
|
||
# Sort by date
|
||
sorted_dates = sorted(portfolio_values.keys())
|
||
values = [portfolio_values[date] for date in sorted_dates]
|
||
|
||
max_drawdown = 0.0
|
||
peak_value = values[0]
|
||
peak_date = sorted_dates[0]
|
||
drawdown_start_date = ""
|
||
drawdown_end_date = ""
|
||
|
||
for i, (date, value) in enumerate(zip(sorted_dates, values)):
|
||
if value > peak_value:
|
||
peak_value = value
|
||
peak_date = date
|
||
|
||
drawdown = (peak_value - value) / peak_value
|
||
if drawdown > max_drawdown:
|
||
max_drawdown = drawdown
|
||
drawdown_start_date = peak_date
|
||
drawdown_end_date = date
|
||
|
||
return max_drawdown, drawdown_start_date, drawdown_end_date
|
||
|
||
|
||
def calculate_cumulative_return(portfolio_values: Dict[str, float]) -> float:
|
||
"""
|
||
Calculate cumulative return
|
||
|
||
Args:
|
||
portfolio_values: Daily portfolio value dictionary
|
||
|
||
Returns:
|
||
Cumulative return
|
||
"""
|
||
if not portfolio_values:
|
||
return 0.0
|
||
|
||
# Sort by date
|
||
sorted_dates = sorted(portfolio_values.keys())
|
||
initial_value = portfolio_values[sorted_dates[0]]
|
||
final_value = portfolio_values[sorted_dates[-1]]
|
||
|
||
if initial_value == 0:
|
||
return 0.0
|
||
|
||
cumulative_return = (final_value - initial_value) / initial_value
|
||
return cumulative_return
|
||
|
||
|
||
def calculate_annualized_return(portfolio_values: Dict[str, float]) -> float:
|
||
"""
|
||
Calculate annualized return
|
||
|
||
Args:
|
||
portfolio_values: Daily portfolio value dictionary
|
||
|
||
Returns:
|
||
Annualized return
|
||
"""
|
||
if not portfolio_values:
|
||
return 0.0
|
||
|
||
# Sort by date
|
||
sorted_dates = sorted(portfolio_values.keys())
|
||
initial_value = portfolio_values[sorted_dates[0]]
|
||
final_value = portfolio_values[sorted_dates[-1]]
|
||
|
||
if initial_value == 0:
|
||
return 0.0
|
||
|
||
# Calculate investment days
|
||
start_date = datetime.strptime(sorted_dates[0], "%Y-%m-%d")
|
||
end_date = datetime.strptime(sorted_dates[-1], "%Y-%m-%d")
|
||
days = (end_date - start_date).days
|
||
|
||
if days == 0:
|
||
return 0.0
|
||
|
||
# Calculate annualized return
|
||
total_return = (final_value - initial_value) / initial_value
|
||
annualized_return = (1 + total_return) ** (365 / days) - 1
|
||
|
||
return annualized_return
|
||
|
||
|
||
def calculate_volatility(returns: List[float]) -> float:
|
||
"""
|
||
Calculate annualized volatility
|
||
|
||
Args:
|
||
returns: List of returns
|
||
|
||
Returns:
|
||
Annualized volatility
|
||
"""
|
||
if not returns or len(returns) < 2:
|
||
return 0.0
|
||
|
||
returns_array = np.array(returns)
|
||
daily_volatility = np.std(returns_array, ddof=1)
|
||
|
||
# Annualize volatility (assuming 252 trading days)
|
||
annualized_volatility = daily_volatility * np.sqrt(252)
|
||
|
||
return annualized_volatility
|
||
|
||
|
||
def calculate_win_rate(returns: List[float]) -> float:
|
||
"""
|
||
Calculate win rate
|
||
|
||
Args:
|
||
returns: List of returns
|
||
|
||
Returns:
|
||
Win rate (percentage of positive return days)
|
||
"""
|
||
if not returns:
|
||
return 0.0
|
||
|
||
positive_days = sum(1 for r in returns if r > 0)
|
||
total_days = len(returns)
|
||
|
||
return positive_days / total_days
|
||
|
||
|
||
def calculate_profit_loss_ratio(returns: List[float]) -> float:
|
||
"""
|
||
Calculate profit/loss ratio
|
||
|
||
Args:
|
||
returns: List of returns
|
||
|
||
Returns:
|
||
Profit/loss ratio (average profit / average loss)
|
||
"""
|
||
if not returns:
|
||
return 0.0
|
||
|
||
positive_returns = [r for r in returns if r > 0]
|
||
negative_returns = [r for r in returns if r < 0]
|
||
|
||
if not positive_returns or not negative_returns:
|
||
return 0.0
|
||
|
||
avg_profit = np.mean(positive_returns)
|
||
avg_loss = abs(np.mean(negative_returns))
|
||
|
||
if avg_loss == 0:
|
||
return 0.0
|
||
|
||
return avg_profit / avg_loss
|
||
|
||
|
||
def calculate_all_metrics(modelname: str, start_date: Optional[str] = None, end_date: Optional[str] = None) -> Dict[str, any]:
|
||
"""
|
||
Calculate all performance metrics
|
||
|
||
Args:
|
||
modelname: Model name
|
||
start_date: Start date in YYYY-MM-DD format, uses earliest date if None
|
||
end_date: End date in YYYY-MM-DD format, uses latest date if None
|
||
|
||
Returns:
|
||
Dictionary containing all metrics
|
||
"""
|
||
# Get available date range if not specified
|
||
if start_date is None or end_date is None:
|
||
earliest_date, latest_date = get_available_date_range(modelname)
|
||
if not earliest_date or not latest_date:
|
||
return {
|
||
"error": "Unable to get available data date range",
|
||
"portfolio_values": {},
|
||
"daily_returns": [],
|
||
"sharpe_ratio": 0.0,
|
||
"max_drawdown": 0.0,
|
||
"max_drawdown_start": "",
|
||
"max_drawdown_end": "",
|
||
"cumulative_return": 0.0,
|
||
"annualized_return": 0.0,
|
||
"volatility": 0.0,
|
||
"win_rate": 0.0,
|
||
"profit_loss_ratio": 0.0,
|
||
"total_trading_days": 0,
|
||
"start_date": "",
|
||
"end_date": ""
|
||
}
|
||
|
||
if start_date is None:
|
||
start_date = earliest_date
|
||
if end_date is None:
|
||
end_date = latest_date
|
||
|
||
# 获取每日投资组合价值
|
||
portfolio_values = get_daily_portfolio_values(modelname, start_date, end_date)
|
||
|
||
if not portfolio_values:
|
||
return {
|
||
"error": "Unable to get portfolio data",
|
||
"portfolio_values": {},
|
||
"daily_returns": [],
|
||
"sharpe_ratio": 0.0,
|
||
"max_drawdown": 0.0,
|
||
"max_drawdown_start": "",
|
||
"max_drawdown_end": "",
|
||
"cumulative_return": 0.0,
|
||
"annualized_return": 0.0,
|
||
"volatility": 0.0,
|
||
"win_rate": 0.0,
|
||
"profit_loss_ratio": 0.0,
|
||
"total_trading_days": 0,
|
||
"start_date": "",
|
||
"end_date": ""
|
||
}
|
||
|
||
# Calculate daily returns
|
||
daily_returns = calculate_daily_returns(portfolio_values)
|
||
|
||
# Calculate various metrics
|
||
sharpe_ratio = calculate_sharpe_ratio(daily_returns)
|
||
max_drawdown, drawdown_start, drawdown_end = calculate_max_drawdown(portfolio_values)
|
||
cumulative_return = calculate_cumulative_return(portfolio_values)
|
||
annualized_return = calculate_annualized_return(portfolio_values)
|
||
volatility = calculate_volatility(daily_returns)
|
||
win_rate = calculate_win_rate(daily_returns)
|
||
profit_loss_ratio = calculate_profit_loss_ratio(daily_returns)
|
||
|
||
# Get date range
|
||
sorted_dates = sorted(portfolio_values.keys())
|
||
start_date_actual = sorted_dates[0] if sorted_dates else ""
|
||
end_date_actual = sorted_dates[-1] if sorted_dates else ""
|
||
|
||
return {
|
||
"portfolio_values": portfolio_values,
|
||
"daily_returns": daily_returns,
|
||
"sharpe_ratio": round(sharpe_ratio, 4),
|
||
"max_drawdown": round(max_drawdown, 4),
|
||
"max_drawdown_start": drawdown_start,
|
||
"max_drawdown_end": drawdown_end,
|
||
"cumulative_return": round(cumulative_return, 4),
|
||
"annualized_return": round(annualized_return, 4),
|
||
"volatility": round(volatility, 4),
|
||
"win_rate": round(win_rate, 4),
|
||
"profit_loss_ratio": round(profit_loss_ratio, 4),
|
||
"total_trading_days": len(portfolio_values),
|
||
"start_date": start_date_actual,
|
||
"end_date": end_date_actual
|
||
}
|
||
|
||
|
||
def print_performance_report(metrics: Dict[str, any]) -> None:
|
||
"""
|
||
Print performance report
|
||
|
||
Args:
|
||
metrics: Dictionary containing all metrics
|
||
"""
|
||
print("=" * 60)
|
||
print("Portfolio Performance Report")
|
||
print("=" * 60)
|
||
|
||
if "error" in metrics:
|
||
print(f"Error: {metrics['error']}")
|
||
return
|
||
|
||
print(f"Analysis Period: {metrics['start_date']} to {metrics['end_date']}")
|
||
print(f"Trading Days: {metrics['total_trading_days']}")
|
||
print()
|
||
|
||
print("Return Metrics:")
|
||
print(f" Cumulative Return: {metrics['cumulative_return']:.2%}")
|
||
print(f" Annualized Return: {metrics['annualized_return']:.2%}")
|
||
print(f" Annualized Volatility: {metrics['volatility']:.2%}")
|
||
print()
|
||
|
||
print("Risk Metrics:")
|
||
print(f" Sharpe Ratio: {metrics['sharpe_ratio']:.4f}")
|
||
print(f" Maximum Drawdown: {metrics['max_drawdown']:.2%}")
|
||
if metrics['max_drawdown_start'] and metrics['max_drawdown_end']:
|
||
print(f" Drawdown Period: {metrics['max_drawdown_start']} to {metrics['max_drawdown_end']}")
|
||
print()
|
||
|
||
print("Trading Statistics:")
|
||
print(f" Win Rate: {metrics['win_rate']:.2%}")
|
||
print(f" Profit/Loss Ratio: {metrics['profit_loss_ratio']:.4f}")
|
||
print()
|
||
|
||
# Show portfolio value changes
|
||
portfolio_values = metrics['portfolio_values']
|
||
if portfolio_values:
|
||
sorted_dates = sorted(portfolio_values.keys())
|
||
initial_value = portfolio_values[sorted_dates[0]]
|
||
final_value = portfolio_values[sorted_dates[-1]]
|
||
|
||
print("Portfolio Value:")
|
||
print(f" Initial Value: ${initial_value:,.2f}")
|
||
print(f" Final Value: ${final_value:,.2f}")
|
||
print(f" Value Change: ${final_value - initial_value:,.2f}")
|
||
|
||
|
||
def get_next_id(filepath: Path) -> int:
|
||
"""
|
||
Get next ID number
|
||
|
||
Args:
|
||
filepath: JSONL file path
|
||
|
||
Returns:
|
||
Next ID number
|
||
"""
|
||
if not filepath.exists():
|
||
return 0
|
||
|
||
max_id = -1
|
||
with filepath.open("r", encoding="utf-8") as f:
|
||
for line in f:
|
||
if not line.strip():
|
||
continue
|
||
try:
|
||
data = json.loads(line)
|
||
current_id = data.get("id", -1)
|
||
if current_id > max_id:
|
||
max_id = current_id
|
||
except Exception:
|
||
continue
|
||
|
||
return max_id + 1
|
||
|
||
|
||
def save_metrics_to_jsonl(metrics: Dict[str, any], modelname: str, output_dir: Optional[str] = None) -> str:
|
||
"""
|
||
Incrementally save metrics to JSONL format
|
||
|
||
Args:
|
||
metrics: Dictionary containing all metrics
|
||
modelname: Model name
|
||
output_dir: Output directory, defaults to data/agent_data/{modelname}/metrics/
|
||
|
||
Returns:
|
||
Path to saved file
|
||
"""
|
||
base_dir = Path(__file__).resolve().parents[1]
|
||
|
||
if output_dir is None:
|
||
output_dir = base_dir / "data" / "agent_data" / modelname / "metrics"
|
||
else:
|
||
output_dir = Path(output_dir)
|
||
|
||
# Create directory if it doesn't exist
|
||
output_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
# Use fixed filename
|
||
filename = "performance_metrics.jsonl"
|
||
filepath = output_dir / filename
|
||
|
||
# Get next ID number
|
||
next_id = get_next_id(filepath)
|
||
|
||
# Prepare data to save
|
||
save_data = {
|
||
"id": next_id,
|
||
"model_name": modelname,
|
||
"analysis_period": {
|
||
"start_date": metrics.get("start_date", ""),
|
||
"end_date": metrics.get("end_date", ""),
|
||
"total_trading_days": metrics.get("total_trading_days", 0)
|
||
},
|
||
"performance_metrics": {
|
||
"sharpe_ratio": metrics.get("sharpe_ratio", 0.0),
|
||
"max_drawdown": metrics.get("max_drawdown", 0.0),
|
||
"max_drawdown_period": {
|
||
"start_date": metrics.get("max_drawdown_start", ""),
|
||
"end_date": metrics.get("max_drawdown_end", "")
|
||
},
|
||
"cumulative_return": metrics.get("cumulative_return", 0.0),
|
||
"annualized_return": metrics.get("annualized_return", 0.0),
|
||
"volatility": metrics.get("volatility", 0.0),
|
||
"win_rate": metrics.get("win_rate", 0.0),
|
||
"profit_loss_ratio": metrics.get("profit_loss_ratio", 0.0)
|
||
},
|
||
"portfolio_summary": {}
|
||
}
|
||
|
||
# Add portfolio value summary
|
||
portfolio_values = metrics.get("portfolio_values", {})
|
||
if portfolio_values:
|
||
sorted_dates = sorted(portfolio_values.keys())
|
||
initial_value = portfolio_values[sorted_dates[0]]
|
||
final_value = portfolio_values[sorted_dates[-1]]
|
||
|
||
save_data["portfolio_summary"] = {
|
||
"initial_value": initial_value,
|
||
"final_value": final_value,
|
||
"value_change": final_value - initial_value,
|
||
"value_change_percent": ((final_value - initial_value) / initial_value) if initial_value > 0 else 0.0
|
||
}
|
||
|
||
# Incrementally save to JSONL file (append mode)
|
||
with filepath.open("a", encoding="utf-8") as f:
|
||
f.write(json.dumps(save_data, ensure_ascii=False) + "\n")
|
||
|
||
return str(filepath)
|
||
|
||
|
||
def get_latest_metrics(modelname: str, output_dir: Optional[str] = None) -> Optional[Dict[str, any]]:
|
||
"""
|
||
Get latest performance metrics record
|
||
|
||
Args:
|
||
modelname: Model name
|
||
output_dir: Output directory, defaults to data/agent_data/{modelname}/metrics/
|
||
|
||
Returns:
|
||
Latest metrics record, or None if no records exist
|
||
"""
|
||
base_dir = Path(__file__).resolve().parents[1]
|
||
|
||
if output_dir is None:
|
||
output_dir = base_dir / "data" / "agent_data" / modelname / "metrics"
|
||
else:
|
||
output_dir = Path(output_dir)
|
||
|
||
filepath = output_dir / "performance_metrics.jsonl"
|
||
|
||
if not filepath.exists():
|
||
return None
|
||
|
||
latest_record = None
|
||
max_id = -1
|
||
|
||
with filepath.open("r", encoding="utf-8") as f:
|
||
for line in f:
|
||
if not line.strip():
|
||
continue
|
||
try:
|
||
data = json.loads(line)
|
||
current_id = data.get("id", -1)
|
||
if current_id > max_id:
|
||
max_id = current_id
|
||
latest_record = data
|
||
except Exception:
|
||
continue
|
||
|
||
return latest_record
|
||
|
||
|
||
def get_metrics_history(modelname: str, output_dir: Optional[str] = None, limit: Optional[int] = None) -> List[Dict[str, any]]:
|
||
"""
|
||
Get performance metrics history
|
||
|
||
Args:
|
||
modelname: Model name
|
||
output_dir: Output directory, defaults to data/agent_data/{modelname}/metrics/
|
||
limit: Limit number of records returned, None returns all records
|
||
|
||
Returns:
|
||
List of metrics records, sorted by ID
|
||
"""
|
||
base_dir = Path(__file__).resolve().parents[1]
|
||
|
||
if output_dir is None:
|
||
output_dir = base_dir / "data" / "agent_data" / modelname / "metrics"
|
||
else:
|
||
output_dir = Path(output_dir)
|
||
|
||
filepath = output_dir / "performance_metrics.jsonl"
|
||
|
||
if not filepath.exists():
|
||
return []
|
||
|
||
records = []
|
||
|
||
with filepath.open("r", encoding="utf-8") as f:
|
||
for line in f:
|
||
if not line.strip():
|
||
continue
|
||
try:
|
||
data = json.loads(line)
|
||
records.append(data)
|
||
except Exception:
|
||
continue
|
||
|
||
# Sort by ID
|
||
records.sort(key=lambda x: x.get("id", 0))
|
||
|
||
# Return latest records if limit specified
|
||
if limit is not None and limit > 0:
|
||
records = records[-limit:]
|
||
|
||
return records
|
||
|
||
|
||
def print_metrics_summary(modelname: str, output_dir: Optional[str] = None) -> None:
|
||
"""
|
||
Print performance metrics summary
|
||
|
||
Args:
|
||
modelname: Model name
|
||
output_dir: Output directory
|
||
"""
|
||
print(f"📊 Model '{modelname}' Performance Metrics Summary")
|
||
print("=" * 60)
|
||
|
||
# Get history records
|
||
history = get_metrics_history(modelname, output_dir)
|
||
|
||
if not history:
|
||
print("❌ No history records found")
|
||
return
|
||
|
||
print(f"📈 Total Records: {len(history)}")
|
||
|
||
# Show latest record
|
||
latest = history[-1]
|
||
print(f"🕒 Latest Record (ID: {latest['id']}):")
|
||
print(f" Analysis Period: {latest['analysis_period']['start_date']} to {latest['analysis_period']['end_date']}")
|
||
print(f" Trading Days: {latest['analysis_period']['total_trading_days']}")
|
||
|
||
metrics = latest['performance_metrics']
|
||
print(f" Sharpe Ratio: {metrics['sharpe_ratio']}")
|
||
print(f" Maximum Drawdown: {metrics['max_drawdown']:.2%}")
|
||
print(f" Cumulative Return: {metrics['cumulative_return']:.2%}")
|
||
print(f" Annualized Return: {metrics['annualized_return']:.2%}")
|
||
|
||
# Show trends (if multiple records exist)
|
||
if len(history) > 1:
|
||
print(f"\n📊 Trend Analysis (Last {min(5, len(history))} Records):")
|
||
|
||
recent_records = history[-5:] if len(history) >= 5 else history
|
||
|
||
print("ID | Time | Cum Ret | Ann Ret | Sharpe")
|
||
print("-" * 70)
|
||
|
||
for record in recent_records:
|
||
metrics = record['performance_metrics']
|
||
print(f"{record['id']:2d} | {metrics['cumulative_return']:8.2%} | {metrics['annualized_return']:8.2%} | {metrics['sharpe_ratio']:8.4f}")
|
||
|
||
|
||
def calculate_and_save_metrics(modelname: str, start_date: Optional[str] = None, end_date: Optional[str] = None, output_dir: Optional[str] = None, print_report: bool = True) -> Dict[str, any]:
|
||
"""
|
||
Entry function to calculate all metrics and save in JSONL format
|
||
|
||
Args:
|
||
modelname: Model name (SIGNATURE)
|
||
start_date: Start date in YYYY-MM-DD format, uses earliest date if None
|
||
end_date: End date in YYYY-MM-DD format, uses latest date if None
|
||
output_dir: Output directory, defaults to data/agent_data/{modelname}/metrics/
|
||
print_report: Whether to print report
|
||
|
||
Returns:
|
||
Dictionary containing all metrics and saved file path
|
||
"""
|
||
print(f"Analyzing model: {modelname}")
|
||
|
||
# Show date range to be used if not specified
|
||
if start_date is None or end_date is None:
|
||
earliest_date, latest_date = get_available_date_range(modelname)
|
||
if earliest_date and latest_date:
|
||
if start_date is None:
|
||
start_date = earliest_date
|
||
print(f"Using default start date: {start_date}")
|
||
if end_date is None:
|
||
end_date = latest_date
|
||
print(f"Using default end date: {end_date}")
|
||
else:
|
||
print("❌ Unable to get available data date range")
|
||
|
||
# Calculate all metrics
|
||
metrics = calculate_all_metrics(modelname, start_date, end_date)
|
||
|
||
if "error" in metrics:
|
||
print(f"Error: {metrics['error']}")
|
||
return metrics
|
||
|
||
# Save in JSONL format
|
||
try:
|
||
saved_file = save_metrics_to_jsonl(metrics, modelname, output_dir)
|
||
print(f"Metrics saved to: {saved_file}")
|
||
metrics["saved_file"] = saved_file
|
||
|
||
# Get ID of just saved record
|
||
latest_record = get_latest_metrics(modelname, output_dir)
|
||
if latest_record:
|
||
metrics["record_id"] = latest_record["id"]
|
||
print(f"Record ID: {latest_record['id']}")
|
||
except Exception as e:
|
||
print(f"Error saving file: {e}")
|
||
metrics["save_error"] = str(e)
|
||
|
||
# Print report
|
||
if print_report:
|
||
print_performance_report(metrics)
|
||
|
||
return metrics
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# Test code
|
||
# 测试代码
|
||
modelname = get_config_value("SIGNATURE")
|
||
if modelname is None:
|
||
print("错误: 未设置 SIGNATURE 环境变量")
|
||
print("请设置环境变量 SIGNATURE,例如: export SIGNATURE=claude-3.7-sonnet")
|
||
sys.exit(1)
|
||
|
||
# 使用入口函数计算和保存指标
|
||
result = calculate_and_save_metrics(modelname)
|