Spaces:
Running
Running
| from __future__ import annotations | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import List, Tuple, Dict, Any | |
| import matplotlib.pyplot as plt | |
| from .engine import DEFAULT_CASH | |
def calculate_trade_execution(
    signals_df, dates: List, prices: List[float], starting_cash: float
) -> Tuple[List, List[float], List, List[float]]:
    """Calculate executed buy/sell points to overlay on portfolio chart.

    Replays the signals against the price series using a whole-share cash
    account that starts with ``starting_cash`` and no shares, and records the
    total account value (shares * price + cash) right after every trade that
    actually moved at least one share.

    Args:
        signals_df: DataFrame with a datetime-like ``date`` column (accessed
            through ``.dt.date``), a ``trading_signal`` column compared
            against ``"BUY"`` / ``"SELL"``, and a ``position_size`` column
            read as a percentage (it is divided by 100).
        dates: Calendar dates of the bars, compared by equality with the
            signal dates.
        prices: Price of each bar; ``prices[i]`` belongs to ``dates[i]``.
        starting_cash: Cash available before the first bar.

    Returns:
        ``(buy_dates, buy_values, sell_dates, sell_values)`` where each
        ``*_values[k]`` is the account value after the trade on
        ``*_dates[k]``.
    """
    buy_dates, buy_values, sell_dates, sell_values = [], [], [], []

    # Index the first signal row of every day once, instead of rebuilding a
    # boolean mask over the whole frame for each bar. ``setdefault`` keeps
    # the earliest row, matching the previous ``matching.iloc[0]`` choice.
    first_row_by_day = {}
    for row_pos, signal_day in enumerate(signals_df["date"].dt.date):
        first_row_by_day.setdefault(signal_day, row_pos)

    current_shares = 0
    current_cash = starting_cash
    for i, date in enumerate(dates):
        row_pos = first_row_by_day.get(date)
        if row_pos is None:
            continue

        signal_row = signals_df.iloc[row_pos]
        current_price = prices[i]
        action = signal_row["trading_signal"]

        if action == "BUY":
            position_percent = signal_row["position_size"] / 100.0
            target_cash = current_cash * position_percent
            # Only whole shares are traded; skip when cash cannot cover one.
            shares_bought = int(target_cash / current_price) if current_cash > current_price else 0
            if shares_bought > 0:
                current_shares += shares_bought
                current_cash -= shares_bought * current_price
                buy_dates.append(date)
                buy_values.append(current_shares * current_price + current_cash)
        elif action == "SELL":
            position_percent = signal_row["position_size"] / 100.0
            shares_sold = int(current_shares * position_percent) if current_shares > 0 else 0
            if shares_sold > 0:
                current_shares -= shares_sold
                current_cash += shares_sold * current_price
                sell_dates.append(date)
                sell_values.append(current_shares * current_price + current_cash)

    return buy_dates, buy_values, sell_dates, sell_values
| def _extract_dates_prices(cerebro) -> Tuple[List, List[float]]: | |
| data_feed = cerebro.datas[0] | |
| data_len = len(data_feed.close.array) | |
| dates, prices = [], [] | |
| for i in range(data_len): | |
| dt_val = data_feed.datetime.array[i] | |
| date_obj = datetime.fromordinal(int(dt_val)).date() | |
| dates.append(date_obj) | |
| prices.append(data_feed.close.array[i]) | |
| return dates, prices | |
def _trade_volumes(signals_df, dates, prices, starting_cash):
    """Return per-bar (buy_volumes, sell_volumes); sells are negative counts."""
    # Map each signal day to its first row once, so the per-bar lookup is a
    # dict hit rather than a fresh filter over the whole frame.
    first_row_by_day = {}
    for row_pos, signal_day in enumerate(signals_df["date"].dt.date):
        first_row_by_day.setdefault(signal_day, row_pos)

    buy_volumes, sell_volumes = [], []
    current_shares, current_cash = 0, starting_cash
    for i, date in enumerate(dates):
        bought = sold = 0
        row_pos = first_row_by_day.get(date)
        if row_pos is not None:
            signal_row = signals_df.iloc[row_pos]
            price = prices[i]
            action = signal_row["trading_signal"]
            if action == "BUY":
                position_percent = signal_row["position_size"] / 100.0
                target_cash = current_cash * position_percent
                shares = int(target_cash / price) if current_cash > price else 0
                if shares > 0:
                    current_shares += shares
                    current_cash -= shares * price
                    bought = shares
            elif action == "SELL":
                position_percent = signal_row["position_size"] / 100.0
                shares = int(current_shares * position_percent) if current_shares > 0 else 0
                if shares > 0:
                    current_shares -= shares
                    current_cash += shares * price
                    sold = shares
        # Exactly one entry per bar so the lists line up with ``dates``.
        buy_volumes.append(bought)
        sell_volumes.append(-sold)
    return buy_volumes, sell_volumes


def plot_single_stock(symbol: str, primo_cerebro, buyhold_cerebro, output_dir: str, filename: str | None = None) -> Path:
    """Create single-stock portfolio comparison chart and save it.

    The top panel plots the strategy's recorded ``portfolio_values`` against
    a buy-and-hold baseline computed here from the first feed's closes
    (whole shares bought with ``DEFAULT_CASH`` at the first price), and
    overlays markers for simulated BUY/SELL executions when the strategy
    exposes a ``signals_df``. The bottom panel shows the number of shares
    bought (positive) or sold (negative) on each bar.

    Args:
        symbol: Ticker used in the chart title and the default file name.
        primo_cerebro: Cerebro-like object; its first data feed supplies
            dates/prices and ``runstrats[0][0]`` is the strategy instance.
        buyhold_cerebro: Not read by this function; kept for interface
            compatibility with existing callers.
        output_dir: Directory for the image; created if missing.
        filename: Optional file name; defaults to
            ``single_backtest_{symbol}.png``.

    Returns:
        Path of the written image.
    """
    dates, prices = _extract_dates_prices(primo_cerebro)
    primo_strategy = primo_cerebro.runstrats[0][0]
    # Copy: the padding below must not grow the strategy's own list.
    primo_portfolio = list(getattr(primo_strategy, "portfolio_values", []))

    # Baseline stays empty for an empty feed so it always matches ``dates``
    # in length (a lone starting value would make ``plot`` raise).
    buyhold_portfolio = []
    if prices:
        buyhold_shares = int(DEFAULT_CASH / prices[0])
        buyhold_cash_left = DEFAULT_CASH - (buyhold_shares * prices[0])
        buyhold_portfolio.append(DEFAULT_CASH)
        buyhold_portfolio.extend(
            buyhold_shares * price + buyhold_cash_left for price in prices[1:]
        )

    # Align the strategy curve with the date axis: pad with the last known
    # value (or the starting cash) when short, truncate when long.
    shortfall = len(dates) - len(primo_portfolio)
    if shortfall > 0:
        last_value = primo_portfolio[-1] if primo_portfolio else DEFAULT_CASH
        primo_portfolio.extend([last_value] * shortfall)
    elif shortfall < 0:
        primo_portfolio = primo_portfolio[: len(dates)]

    signals_df = getattr(primo_strategy, "signals_df", None)

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 10), gridspec_kw={"height_ratios": [2, 1]})
    try:
        ax1.plot(dates, primo_portfolio, color="blue", linewidth=2, label="PrimoAgent Portfolio")
        ax1.plot(dates, buyhold_portfolio, color="red", linewidth=2, label="Buy & Hold Portfolio")
        ax1.set_ylabel("Portfolio Value ($)")

        if signals_df is not None:
            buy_dates, buy_values, sell_dates, sell_values = calculate_trade_execution(
                signals_df, dates, prices, DEFAULT_CASH
            )
            if buy_dates:
                ax1.scatter(buy_dates, buy_values, color="green", marker="^", s=100, alpha=0.8, label="BUY Executed", zorder=5)
            if sell_dates:
                ax1.scatter(sell_dates, sell_values, color="red", marker="v", s=100, alpha=0.8, label="SELL Executed", zorder=5)

        ax1.legend(loc="upper left")
        ax1.set_title(f"{symbol}: PrimoAgent vs Buy & Hold Performance")
        ax1.grid(True, alpha=0.3)

        if signals_df is not None:
            buy_volumes, sell_volumes = _trade_volumes(signals_df, dates, prices, DEFAULT_CASH)
            ax2.bar(dates, buy_volumes, color="green", alpha=0.7, label="BUY Shares")
            ax2.bar(dates, sell_volumes, color="red", alpha=0.7, label="SELL Shares")

        ax2.set_ylabel("Number of Shares")
        ax2.set_xlabel("Date")
        ax2.set_title("Trading Volume")
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        ax2.axhline(y=0, color="black", linewidth=0.5)

        fig.tight_layout()
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        save_path = output_path / (filename or f"single_backtest_{symbol}.png")
        fig.savefig(save_path, dpi=300, bbox_inches="tight")
    finally:
        # Close this specific figure even if plotting or saving failed, so
        # it does not linger in pyplot's global figure registry.
        plt.close(fig)
    return save_path
def plot_returns_bar_chart(all_results: Dict[str, Dict[str, Any]], save_path: Path) -> None:
    """Draw a grouped bar chart of cumulative returns per symbol and save it.

    For every key of ``all_results`` (in sorted order) two bars are drawn
    side by side: the ``"primo"`` and the ``"buyhold"`` entry's
    ``"Cumulative Return [%]"`` value. Each bar is annotated with its value
    formatted to one decimal place. The parent directory of ``save_path`` is
    created if needed and the figure is closed after saving.
    """
    fig, ax = plt.subplots(figsize=(12, 8))

    tickers = sorted(all_results.keys())
    primo_returns = [all_results[t]["primo"]["Cumulative Return [%]"] for t in tickers]
    buyhold_returns = [all_results[t]["buyhold"]["Cumulative Return [%]"] for t in tickers]

    slots = range(len(tickers))
    width = 0.35
    half_width = width / 2
    primo_bars = ax.bar(
        [slot - half_width for slot in slots],
        primo_returns,
        width,
        label="PrimoAgent",
        color="#1f77b4",
        alpha=0.8,
    )
    buyhold_bars = ax.bar(
        [slot + half_width for slot in slots],
        buyhold_returns,
        width,
        label="Buy & Hold",
        color="#ff7f0e",
        alpha=0.8,
    )

    ax.set_xlabel("Stocks")
    ax.set_ylabel("Cumulative Return (%)")
    ax.set_title("Performance Comparison: PrimoAgent vs Buy & Hold")
    ax.set_xticks(list(slots))
    ax.set_xticklabels(tickers)
    ax.legend()
    ax.grid(True, alpha=0.3, axis="y")
    ax.axhline(y=0, color="black", linewidth=0.5)

    # Annotate both bar groups with one shared loop: labels sit just above
    # positive bars and just below negative ones.
    for bar_group, values in ((primo_bars, primo_returns), (buyhold_bars, buyhold_returns)):
        for rect, value in zip(bar_group, values):
            bar_height = rect.get_height()
            if bar_height >= 0:
                nudge, anchor = 0.5, "bottom"
            else:
                nudge, anchor = -1.5, "top"
            ax.text(
                rect.get_x() + rect.get_width() / 2.0,
                bar_height + nudge,
                f"{value:.1f}%",
                ha="center",
                va=anchor,
                fontsize=10,
                fontweight="bold",
            )

    plt.tight_layout()
    save_path.parent.mkdir(parents=True, exist_ok=True)
    plt.savefig(save_path, dpi=300, bbox_inches="tight")
    plt.close()