CiscsoPonce's picture
Initial Deploy (Clean)
a2cbcac
from __future__ import annotations
from datetime import datetime
from pathlib import Path
from typing import List, Tuple, Dict, Any
import matplotlib.pyplot as plt
from .engine import DEFAULT_CASH
def calculate_trade_execution(
signals_df, dates: List, prices: List[float], starting_cash: float
) -> Tuple[List, List[float], List, List[float]]:
"""Calculate executed buy/sell points to overlay on portfolio chart."""
buy_dates, buy_values, sell_dates, sell_values = [], [], [], []
current_shares = 0
current_cash = starting_cash
for i, date in enumerate(dates):
matching_signals = signals_df[signals_df["date"].dt.date == date]
if matching_signals.empty:
continue
signal_row = matching_signals.iloc[0]
current_price = prices[i]
if signal_row["trading_signal"] == "BUY":
position_percent = signal_row["position_size"] / 100.0
target_cash = current_cash * position_percent
shares_bought = int(target_cash / current_price) if current_cash > current_price else 0
if shares_bought > 0:
current_shares += shares_bought
current_cash -= shares_bought * current_price
buy_dates.append(date)
buy_values.append(current_shares * current_price + current_cash)
elif signal_row["trading_signal"] == "SELL":
position_percent = signal_row["position_size"] / 100.0
shares_sold = int(current_shares * position_percent) if current_shares > 0 else 0
if shares_sold > 0:
current_shares -= shares_sold
current_cash += shares_sold * current_price
sell_dates.append(date)
sell_values.append(current_shares * current_price + current_cash)
return buy_dates, buy_values, sell_dates, sell_values
def _extract_dates_prices(cerebro) -> Tuple[List, List[float]]:
data_feed = cerebro.datas[0]
data_len = len(data_feed.close.array)
dates, prices = [], []
for i in range(data_len):
dt_val = data_feed.datetime.array[i]
date_obj = datetime.fromordinal(int(dt_val)).date()
dates.append(date_obj)
prices.append(data_feed.close.array[i])
return dates, prices
def plot_single_stock(symbol: str, primo_cerebro, buyhold_cerebro, output_dir: str, filename: str | None = None) -> Path:
"""Create single-stock portfolio comparison chart and save it."""
dates, prices = _extract_dates_prices(primo_cerebro)
primo_strategy = primo_cerebro.runstrats[0][0]
primo_portfolio = getattr(primo_strategy, "portfolio_values", [])
buyhold_portfolio = [DEFAULT_CASH]
if prices:
buyhold_shares = int(DEFAULT_CASH / prices[0])
buyhold_cash_left = DEFAULT_CASH - (buyhold_shares * prices[0])
for price in prices[1:]:
buyhold_portfolio.append(buyhold_shares * price + buyhold_cash_left)
if len(primo_portfolio) != len(dates):
if len(primo_portfolio) < len(dates):
last_value = primo_portfolio[-1] if primo_portfolio else DEFAULT_CASH
primo_portfolio.extend([last_value] * (len(dates) - len(primo_portfolio)))
else:
primo_portfolio = primo_portfolio[: len(dates)]
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 10), gridspec_kw={"height_ratios": [2, 1]})
ax1.plot(dates, primo_portfolio, color="blue", linewidth=2, label="PrimoAgent Portfolio")
ax1.plot(dates, buyhold_portfolio, color="red", linewidth=2, label="Buy & Hold Portfolio")
ax1.set_ylabel("Portfolio Value ($)")
signals_df = getattr(primo_strategy, "signals_df", None)
if signals_df is not None:
buy_dates, buy_values, sell_dates, sell_values = calculate_trade_execution(
signals_df, dates, prices, DEFAULT_CASH
)
if buy_dates:
ax1.scatter(buy_dates, buy_values, color="green", marker="^", s=100, alpha=0.8, label="BUY Executed", zorder=5)
if sell_dates:
ax1.scatter(sell_dates, sell_values, color="red", marker="v", s=100, alpha=0.8, label="SELL Executed", zorder=5)
ax1.legend(loc="upper left")
ax1.set_title(f"{symbol}: PrimoAgent vs Buy & Hold Performance")
ax1.grid(True, alpha=0.3)
if signals_df is not None:
buy_volumes, sell_volumes = [], []
current_shares, current_cash = 0, DEFAULT_CASH
for i, date in enumerate(dates):
matching_signals = signals_df[signals_df["date"].dt.date == date]
if not matching_signals.empty:
signal_row = matching_signals.iloc[0]
price = prices[i]
if signal_row["trading_signal"] == "BUY":
position_percent = signal_row["position_size"] / 100.0
target_cash = current_cash * position_percent
shares_bought = int(target_cash / price) if current_cash > price else 0
if shares_bought > 0:
current_shares += shares_bought
current_cash -= shares_bought * price
buy_volumes.append(shares_bought)
sell_volumes.append(0)
else:
buy_volumes.append(0)
sell_volumes.append(0)
elif signal_row["trading_signal"] == "SELL":
position_percent = signal_row["position_size"] / 100.0
shares_sold = int(current_shares * position_percent) if current_shares > 0 else 0
if shares_sold > 0:
current_shares -= shares_sold
current_cash += shares_sold * price
buy_volumes.append(0)
sell_volumes.append(-shares_sold)
else:
buy_volumes.append(0)
sell_volumes.append(0)
else:
buy_volumes.append(0)
sell_volumes.append(0)
else:
buy_volumes.append(0)
sell_volumes.append(0)
ax2.bar(dates, buy_volumes, color="green", alpha=0.7, label="BUY Shares")
ax2.bar(dates, sell_volumes, color="red", alpha=0.7, label="SELL Shares")
ax2.set_ylabel("Number of Shares")
ax2.set_xlabel("Date")
ax2.set_title("Trading Volume")
ax2.legend()
ax2.grid(True, alpha=0.3)
ax2.axhline(y=0, color="black", linewidth=0.5)
plt.tight_layout()
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
save_path = output_path / (filename or f"single_backtest_{symbol}.png")
plt.savefig(save_path, dpi=300, bbox_inches="tight")
plt.close()
return save_path
def plot_returns_bar_chart(all_results: Dict[str, Dict[str, Any]], save_path: Path) -> None:
"""Create simple bar chart showing returns for all stocks and strategies."""
fig, ax = plt.subplots(figsize=(12, 8))
symbols = sorted(all_results.keys())
primo_returns = [all_results[s]["primo"]["Cumulative Return [%]"] for s in symbols]
buyhold_returns = [all_results[s]["buyhold"]["Cumulative Return [%]"] for s in symbols]
x = range(len(symbols))
width = 0.35
bars1 = ax.bar([i - width / 2 for i in x], primo_returns, width, label="PrimoAgent", color="#1f77b4", alpha=0.8)
bars2 = ax.bar([i + width / 2 for i in x], buyhold_returns, width, label="Buy & Hold", color="#ff7f0e", alpha=0.8)
ax.set_xlabel("Stocks")
ax.set_ylabel("Cumulative Return (%)")
ax.set_title("Performance Comparison: PrimoAgent vs Buy & Hold")
ax.set_xticks(list(x))
ax.set_xticklabels(symbols)
ax.legend()
ax.grid(True, alpha=0.3, axis="y")
ax.axhline(y=0, color="black", linewidth=0.5)
for bar, value in zip(bars1, primo_returns):
height = bar.get_height()
ax.text(
bar.get_x() + bar.get_width() / 2.0,
height + (0.5 if height >= 0 else -1.5),
f"{value:.1f}%",
ha="center",
va="bottom" if height >= 0 else "top",
fontsize=10,
fontweight="bold",
)
for bar, value in zip(bars2, buyhold_returns):
height = bar.get_height()
ax.text(
bar.get_x() + bar.get_width() / 2.0,
height + (0.5 if height >= 0 else -1.5),
f"{value:.1f}%",
ha="center",
va="bottom" if height >= 0 else "top",
fontsize=10,
fontweight="bold",
)
plt.tight_layout()
save_path.parent.mkdir(parents=True, exist_ok=True)
plt.savefig(save_path, dpi=300, bbox_inches="tight")
plt.close()