import os import json import numpy as np import pandas as pd from datetime import datetime, timedelta from pathlib import Path from typing import Dict, List, Optional, Tuple import sys # Add project root directory to Python path to allow running this file from subdirectories project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if project_root not in sys.path: sys.path.insert(0, project_root) from tools.price_tools import ( get_yesterday_date, get_open_prices, get_yesterday_open_and_close_price, get_today_init_position, get_latest_position, all_nasdaq_100_symbols ) from tools.general_tools import get_config_value def calculate_portfolio_value(positions: Dict[str, float], prices: Dict[str, Optional[float]], cash: float = 0.0) -> float: """ Calculate total portfolio value Args: positions: Position dictionary in format {symbol: shares} prices: Price dictionary in format {symbol_price: price} cash: Cash balance Returns: Total portfolio value """ total_value = cash for symbol, shares in positions.items(): if symbol == "CASH": continue price_key = f'{symbol}_price' price = prices.get(price_key) if price is not None and shares > 0: total_value += shares * price return total_value def get_available_date_range(modelname: str) -> Tuple[str, str]: """ Get available data date range Args: modelname: Model name Returns: Tuple of (earliest date, latest date) in YYYY-MM-DD format """ base_dir = Path(__file__).resolve().parents[1] position_file = base_dir / "data" / "agent_data" / modelname / "position" / "position.jsonl" if not position_file.exists(): return "", "" dates = [] with position_file.open("r", encoding="utf-8") as f: for line in f: if not line.strip(): continue try: doc = json.loads(line) date = doc.get("date") if date: dates.append(date) except Exception: continue if not dates: return "", "" dates.sort() return dates[0], dates[-1] def get_daily_portfolio_values(modelname: str, start_date: Optional[str] = None, end_date: Optional[str] = None) -> Dict[str, float]: """ Get daily portfolio values Args: modelname: Model name start_date: Start date in YYYY-MM-DD format, uses earliest date if None end_date: End date in YYYY-MM-DD format, uses latest date if None Returns: Dictionary of daily portfolio values in format {date: portfolio_value} """ base_dir = Path(__file__).resolve().parents[1] position_file = base_dir / "data" / "agent_data" / modelname / "position" / "position.jsonl" merged_file = base_dir / "data" / "merged.jsonl" if not position_file.exists() or not merged_file.exists(): return {} # Get available date range if not specified if start_date is None or end_date is None: earliest_date, latest_date = get_available_date_range(modelname) if not earliest_date or not latest_date: return {} if start_date is None: start_date = earliest_date if end_date is None: end_date = latest_date # Read position data position_data = [] with position_file.open("r", encoding="utf-8") as f: for line in f: if not line.strip(): continue try: doc = json.loads(line) position_data.append(doc) except Exception: continue # Read price data price_data = {} with merged_file.open("r", encoding="utf-8") as f: for line in f: if not line.strip(): continue try: doc = json.loads(line) meta = doc.get("Meta Data", {}) symbol = meta.get("2. Symbol") if symbol: price_data[symbol] = doc.get("Time Series (Daily)", {}) except Exception: continue # Calculate daily portfolio values daily_values = {} # Group position data by date positions_by_date = {} for record in position_data: date = record.get("date") if date: if date not in positions_by_date: positions_by_date[date] = [] positions_by_date[date].append(record) # For each date, sort records by id and take latest position for date, records in positions_by_date.items(): if start_date and date < start_date: continue if end_date and date > end_date: continue # Sort by id and take latest position latest_record = max(records, key=lambda x: x.get("id", 0)) positions = latest_record.get("positions", {}) # Get daily prices daily_prices = {} for symbol in all_nasdaq_100_symbols: if symbol in price_data: symbol_prices = price_data[symbol] if date in symbol_prices: price_info = symbol_prices[date] buy_price = price_info.get("1. buy price") sell_price = price_info.get("4. sell price") # Use closing (sell) price to calculate value if sell_price is not None: daily_prices[f'{symbol}_price'] = float(sell_price) # Calculate portfolio value cash = positions.get("CASH", 0.0) portfolio_value = calculate_portfolio_value(positions, daily_prices, cash) daily_values[date] = portfolio_value return daily_values def calculate_daily_returns(portfolio_values: Dict[str, float]) -> List[float]: """ Calculate daily returns Args: portfolio_values: Daily portfolio value dictionary Returns: List of daily returns """ if len(portfolio_values) < 2: return [] # Sort by date sorted_dates = sorted(portfolio_values.keys()) returns = [] for i in range(1, len(sorted_dates)): prev_date = sorted_dates[i-1] curr_date = sorted_dates[i] prev_value = portfolio_values[prev_date] curr_value = portfolio_values[curr_date] if prev_value > 0: daily_return = (curr_value - prev_value) / prev_value returns.append(daily_return) return returns def calculate_sharpe_ratio(returns: List[float], risk_free_rate: float = 0.02) -> float: """ Calculate Sharpe ratio Args: returns: List of returns risk_free_rate: Risk-free rate (annualized) Returns: Sharpe ratio """ if not returns or len(returns) < 2: return 0.0 returns_array = np.array(returns) # Calculate annualized return and volatility mean_return = np.mean(returns_array) std_return = np.std(returns_array, ddof=1) # Assume 252 trading days per year annualized_return = mean_return * 252 annualized_volatility = std_return * np.sqrt(252) if annualized_volatility == 0: return 0.0 # Calculate Sharpe ratio sharpe_ratio = (annualized_return - risk_free_rate) / annualized_volatility return sharpe_ratio def calculate_max_drawdown(portfolio_values: Dict[str, float]) -> Tuple[float, str, str]: """ Calculate maximum drawdown Args: portfolio_values: Daily portfolio value dictionary Returns: Tuple of (maximum drawdown percentage, drawdown start date, drawdown end date) """ if not portfolio_values: return 0.0, "", "" # Sort by date sorted_dates = sorted(portfolio_values.keys()) values = [portfolio_values[date] for date in sorted_dates] max_drawdown = 0.0 peak_value = values[0] peak_date = sorted_dates[0] drawdown_start_date = "" drawdown_end_date = "" for i, (date, value) in enumerate(zip(sorted_dates, values)): if value > peak_value: peak_value = value peak_date = date drawdown = (peak_value - value) / peak_value if drawdown > max_drawdown: max_drawdown = drawdown drawdown_start_date = peak_date drawdown_end_date = date return max_drawdown, drawdown_start_date, drawdown_end_date def calculate_cumulative_return(portfolio_values: Dict[str, float]) -> float: """ Calculate cumulative return Args: portfolio_values: Daily portfolio value dictionary Returns: Cumulative return """ if not portfolio_values: return 0.0 # Sort by date sorted_dates = sorted(portfolio_values.keys()) initial_value = portfolio_values[sorted_dates[0]] final_value = portfolio_values[sorted_dates[-1]] if initial_value == 0: return 0.0 cumulative_return = (final_value - initial_value) / initial_value return cumulative_return def calculate_annualized_return(portfolio_values: Dict[str, float]) -> float: """ Calculate annualized return Args: portfolio_values: Daily portfolio value dictionary Returns: Annualized return """ if not portfolio_values: return 0.0 # Sort by date sorted_dates = sorted(portfolio_values.keys()) initial_value = portfolio_values[sorted_dates[0]] final_value = portfolio_values[sorted_dates[-1]] if initial_value == 0: return 0.0 # Calculate investment days start_date = datetime.strptime(sorted_dates[0], "%Y-%m-%d") end_date = datetime.strptime(sorted_dates[-1], "%Y-%m-%d") days = (end_date - start_date).days if days == 0: return 0.0 # Calculate annualized return total_return = (final_value - initial_value) / initial_value annualized_return = (1 + total_return) ** (365 / days) - 1 return annualized_return def calculate_volatility(returns: List[float]) -> float: """ Calculate annualized volatility Args: returns: List of returns Returns: Annualized volatility """ if not returns or len(returns) < 2: return 0.0 returns_array = np.array(returns) daily_volatility = np.std(returns_array, ddof=1) # Annualize volatility (assuming 252 trading days) annualized_volatility = daily_volatility * np.sqrt(252) return annualized_volatility def calculate_win_rate(returns: List[float]) -> float: """ Calculate win rate Args: returns: List of returns Returns: Win rate (percentage of positive return days) """ if not returns: return 0.0 positive_days = sum(1 for r in returns if r > 0) total_days = len(returns) return positive_days / total_days def calculate_profit_loss_ratio(returns: List[float]) -> float: """ Calculate profit/loss ratio Args: returns: List of returns Returns: Profit/loss ratio (average profit / average loss) """ if not returns: return 0.0 positive_returns = [r for r in returns if r > 0] negative_returns = [r for r in returns if r < 0] if not positive_returns or not negative_returns: return 0.0 avg_profit = np.mean(positive_returns) avg_loss = abs(np.mean(negative_returns)) if avg_loss == 0: return 0.0 return avg_profit / avg_loss def calculate_all_metrics(modelname: str, start_date: Optional[str] = None, end_date: Optional[str] = None) -> Dict[str, any]: """ Calculate all performance metrics Args: modelname: Model name start_date: Start date in YYYY-MM-DD format, uses earliest date if None end_date: End date in YYYY-MM-DD format, uses latest date if None Returns: Dictionary containing all metrics """ # Get available date range if not specified if start_date is None or end_date is None: earliest_date, latest_date = get_available_date_range(modelname) if not earliest_date or not latest_date: return { "error": "Unable to get available data date range", "portfolio_values": {}, "daily_returns": [], "sharpe_ratio": 0.0, "max_drawdown": 0.0, "max_drawdown_start": "", "max_drawdown_end": "", "cumulative_return": 0.0, "annualized_return": 0.0, "volatility": 0.0, "win_rate": 0.0, "profit_loss_ratio": 0.0, "total_trading_days": 0, "start_date": "", "end_date": "" } if start_date is None: start_date = earliest_date if end_date is None: end_date = latest_date # 获取每日投资组合价值 portfolio_values = get_daily_portfolio_values(modelname, start_date, end_date) if not portfolio_values: return { "error": "Unable to get portfolio data", "portfolio_values": {}, "daily_returns": [], "sharpe_ratio": 0.0, "max_drawdown": 0.0, "max_drawdown_start": "", "max_drawdown_end": "", "cumulative_return": 0.0, "annualized_return": 0.0, "volatility": 0.0, "win_rate": 0.0, "profit_loss_ratio": 0.0, "total_trading_days": 0, "start_date": "", "end_date": "" } # Calculate daily returns daily_returns = calculate_daily_returns(portfolio_values) # Calculate various metrics sharpe_ratio = calculate_sharpe_ratio(daily_returns) max_drawdown, drawdown_start, drawdown_end = calculate_max_drawdown(portfolio_values) cumulative_return = calculate_cumulative_return(portfolio_values) annualized_return = calculate_annualized_return(portfolio_values) volatility = calculate_volatility(daily_returns) win_rate = calculate_win_rate(daily_returns) profit_loss_ratio = calculate_profit_loss_ratio(daily_returns) # Get date range sorted_dates = sorted(portfolio_values.keys()) start_date_actual = sorted_dates[0] if sorted_dates else "" end_date_actual = sorted_dates[-1] if sorted_dates else "" return { "portfolio_values": portfolio_values, "daily_returns": daily_returns, "sharpe_ratio": round(sharpe_ratio, 4), "max_drawdown": round(max_drawdown, 4), "max_drawdown_start": drawdown_start, "max_drawdown_end": drawdown_end, "cumulative_return": round(cumulative_return, 4), "annualized_return": round(annualized_return, 4), "volatility": round(volatility, 4), "win_rate": round(win_rate, 4), "profit_loss_ratio": round(profit_loss_ratio, 4), "total_trading_days": len(portfolio_values), "start_date": start_date_actual, "end_date": end_date_actual } def print_performance_report(metrics: Dict[str, any]) -> None: """ Print performance report Args: metrics: Dictionary containing all metrics """ print("=" * 60) print("Portfolio Performance Report") print("=" * 60) if "error" in metrics: print(f"Error: {metrics['error']}") return print(f"Analysis Period: {metrics['start_date']} to {metrics['end_date']}") print(f"Trading Days: {metrics['total_trading_days']}") print() print("Return Metrics:") print(f" Cumulative Return: {metrics['cumulative_return']:.2%}") print(f" Annualized Return: {metrics['annualized_return']:.2%}") print(f" Annualized Volatility: {metrics['volatility']:.2%}") print() print("Risk Metrics:") print(f" Sharpe Ratio: {metrics['sharpe_ratio']:.4f}") print(f" Maximum Drawdown: {metrics['max_drawdown']:.2%}") if metrics['max_drawdown_start'] and metrics['max_drawdown_end']: print(f" Drawdown Period: {metrics['max_drawdown_start']} to {metrics['max_drawdown_end']}") print() print("Trading Statistics:") print(f" Win Rate: {metrics['win_rate']:.2%}") print(f" Profit/Loss Ratio: {metrics['profit_loss_ratio']:.4f}") print() # Show portfolio value changes portfolio_values = metrics['portfolio_values'] if portfolio_values: sorted_dates = sorted(portfolio_values.keys()) initial_value = portfolio_values[sorted_dates[0]] final_value = portfolio_values[sorted_dates[-1]] print("Portfolio Value:") print(f" Initial Value: ${initial_value:,.2f}") print(f" Final Value: ${final_value:,.2f}") print(f" Value Change: ${final_value - initial_value:,.2f}") def get_next_id(filepath: Path) -> int: """ Get next ID number Args: filepath: JSONL file path Returns: Next ID number """ if not filepath.exists(): return 0 max_id = -1 with filepath.open("r", encoding="utf-8") as f: for line in f: if not line.strip(): continue try: data = json.loads(line) current_id = data.get("id", -1) if current_id > max_id: max_id = current_id except Exception: continue return max_id + 1 def save_metrics_to_jsonl(metrics: Dict[str, any], modelname: str, output_dir: Optional[str] = None) -> str: """ Incrementally save metrics to JSONL format Args: metrics: Dictionary containing all metrics modelname: Model name output_dir: Output directory, defaults to data/agent_data/{modelname}/metrics/ Returns: Path to saved file """ base_dir = Path(__file__).resolve().parents[1] if output_dir is None: output_dir = base_dir / "data" / "agent_data" / modelname / "metrics" else: output_dir = Path(output_dir) # Create directory if it doesn't exist output_dir.mkdir(parents=True, exist_ok=True) # Use fixed filename filename = "performance_metrics.jsonl" filepath = output_dir / filename # Get next ID number next_id = get_next_id(filepath) # Prepare data to save save_data = { "id": next_id, "model_name": modelname, "analysis_period": { "start_date": metrics.get("start_date", ""), "end_date": metrics.get("end_date", ""), "total_trading_days": metrics.get("total_trading_days", 0) }, "performance_metrics": { "sharpe_ratio": metrics.get("sharpe_ratio", 0.0), "max_drawdown": metrics.get("max_drawdown", 0.0), "max_drawdown_period": { "start_date": metrics.get("max_drawdown_start", ""), "end_date": metrics.get("max_drawdown_end", "") }, "cumulative_return": metrics.get("cumulative_return", 0.0), "annualized_return": metrics.get("annualized_return", 0.0), "volatility": metrics.get("volatility", 0.0), "win_rate": metrics.get("win_rate", 0.0), "profit_loss_ratio": metrics.get("profit_loss_ratio", 0.0) }, "portfolio_summary": {} } # Add portfolio value summary portfolio_values = metrics.get("portfolio_values", {}) if portfolio_values: sorted_dates = sorted(portfolio_values.keys()) initial_value = portfolio_values[sorted_dates[0]] final_value = portfolio_values[sorted_dates[-1]] save_data["portfolio_summary"] = { "initial_value": initial_value, "final_value": final_value, "value_change": final_value - initial_value, "value_change_percent": ((final_value - initial_value) / initial_value) if initial_value > 0 else 0.0 } # Incrementally save to JSONL file (append mode) with filepath.open("a", encoding="utf-8") as f: f.write(json.dumps(save_data, ensure_ascii=False) + "\n") return str(filepath) def get_latest_metrics(modelname: str, output_dir: Optional[str] = None) -> Optional[Dict[str, any]]: """ Get latest performance metrics record Args: modelname: Model name output_dir: Output directory, defaults to data/agent_data/{modelname}/metrics/ Returns: Latest metrics record, or None if no records exist """ base_dir = Path(__file__).resolve().parents[1] if output_dir is None: output_dir = base_dir / "data" / "agent_data" / modelname / "metrics" else: output_dir = Path(output_dir) filepath = output_dir / "performance_metrics.jsonl" if not filepath.exists(): return None latest_record = None max_id = -1 with filepath.open("r", encoding="utf-8") as f: for line in f: if not line.strip(): continue try: data = json.loads(line) current_id = data.get("id", -1) if current_id > max_id: max_id = current_id latest_record = data except Exception: continue return latest_record def get_metrics_history(modelname: str, output_dir: Optional[str] = None, limit: Optional[int] = None) -> List[Dict[str, any]]: """ Get performance metrics history Args: modelname: Model name output_dir: Output directory, defaults to data/agent_data/{modelname}/metrics/ limit: Limit number of records returned, None returns all records Returns: List of metrics records, sorted by ID """ base_dir = Path(__file__).resolve().parents[1] if output_dir is None: output_dir = base_dir / "data" / "agent_data" / modelname / "metrics" else: output_dir = Path(output_dir) filepath = output_dir / "performance_metrics.jsonl" if not filepath.exists(): return [] records = [] with filepath.open("r", encoding="utf-8") as f: for line in f: if not line.strip(): continue try: data = json.loads(line) records.append(data) except Exception: continue # Sort by ID records.sort(key=lambda x: x.get("id", 0)) # Return latest records if limit specified if limit is not None and limit > 0: records = records[-limit:] return records def print_metrics_summary(modelname: str, output_dir: Optional[str] = None) -> None: """ Print performance metrics summary Args: modelname: Model name output_dir: Output directory """ print(f"📊 Model '{modelname}' Performance Metrics Summary") print("=" * 60) # Get history records history = get_metrics_history(modelname, output_dir) if not history: print("❌ No history records found") return print(f"📈 Total Records: {len(history)}") # Show latest record latest = history[-1] print(f"🕒 Latest Record (ID: {latest['id']}):") print(f" Analysis Period: {latest['analysis_period']['start_date']} to {latest['analysis_period']['end_date']}") print(f" Trading Days: {latest['analysis_period']['total_trading_days']}") metrics = latest['performance_metrics'] print(f" Sharpe Ratio: {metrics['sharpe_ratio']}") print(f" Maximum Drawdown: {metrics['max_drawdown']:.2%}") print(f" Cumulative Return: {metrics['cumulative_return']:.2%}") print(f" Annualized Return: {metrics['annualized_return']:.2%}") # Show trends (if multiple records exist) if len(history) > 1: print(f"\n📊 Trend Analysis (Last {min(5, len(history))} Records):") recent_records = history[-5:] if len(history) >= 5 else history print("ID | Time | Cum Ret | Ann Ret | Sharpe") print("-" * 70) for record in recent_records: metrics = record['performance_metrics'] print(f"{record['id']:2d} | {metrics['cumulative_return']:8.2%} | {metrics['annualized_return']:8.2%} | {metrics['sharpe_ratio']:8.4f}") def calculate_and_save_metrics(modelname: str, start_date: Optional[str] = None, end_date: Optional[str] = None, output_dir: Optional[str] = None, print_report: bool = True) -> Dict[str, any]: """ Entry function to calculate all metrics and save in JSONL format Args: modelname: Model name (SIGNATURE) start_date: Start date in YYYY-MM-DD format, uses earliest date if None end_date: End date in YYYY-MM-DD format, uses latest date if None output_dir: Output directory, defaults to data/agent_data/{modelname}/metrics/ print_report: Whether to print report Returns: Dictionary containing all metrics and saved file path """ print(f"Analyzing model: {modelname}") # Show date range to be used if not specified if start_date is None or end_date is None: earliest_date, latest_date = get_available_date_range(modelname) if earliest_date and latest_date: if start_date is None: start_date = earliest_date print(f"Using default start date: {start_date}") if end_date is None: end_date = latest_date print(f"Using default end date: {end_date}") else: print("❌ Unable to get available data date range") # Calculate all metrics metrics = calculate_all_metrics(modelname, start_date, end_date) if "error" in metrics: print(f"Error: {metrics['error']}") return metrics # Save in JSONL format try: saved_file = save_metrics_to_jsonl(metrics, modelname, output_dir) print(f"Metrics saved to: {saved_file}") metrics["saved_file"] = saved_file # Get ID of just saved record latest_record = get_latest_metrics(modelname, output_dir) if latest_record: metrics["record_id"] = latest_record["id"] print(f"Record ID: {latest_record['id']}") except Exception as e: print(f"Error saving file: {e}") metrics["save_error"] = str(e) # Print report if print_report: print_performance_report(metrics) return metrics if __name__ == "__main__": # Test code # 测试代码 modelname = get_config_value("SIGNATURE") if modelname is None: print("错误: 未设置 SIGNATURE 环境变量") print("请设置环境变量 SIGNATURE,例如: export SIGNATURE=claude-3.7-sonnet") sys.exit(1) # 使用入口函数计算和保存指标 result = calculate_and_save_metrics(modelname)