import json
from collections.abc import Mapping
from typing import Any, Dict, Type

from crewai.tools import BaseTool
from pydantic import BaseModel, Field

# Sentiment strength assumed when the sentiment payload does not report one.
_DEFAULT_SENTIMENT_STRENGTH = {"bullish": 0.7, "neutral": 0.0, "bearish": -0.7}

# Minimum |effective sentiment| before sentiment counts as agreeing with the trend.
_ALIGNMENT_THRESHOLD = 0.2


def _as_mapping(payload: Any, label: str) -> Mapping:
    """Return *payload* as a mapping, decoding it from JSON text if needed.

    Args:
        payload: A JSON string, or an already-decoded mapping.
        label: Argument name used in the error message.

    Raises:
        json.JSONDecodeError: If *payload* is a string that is not valid JSON.
        TypeError: If the (decoded) payload is not a JSON object.
    """
    if isinstance(payload, str):
        payload = json.loads(payload)
    if not isinstance(payload, Mapping):
        raise TypeError(
            f"{label} must be a JSON object, got {type(payload).__name__}"
        )
    return payload


def _fallback_confidence(sentiment_data: Mapping) -> float:
    """Estimate confidence from how many headlines/comments back the sentiment.

    Starts at 0.2 and adds 0.1 per source, capped at 1.0. A key that is
    missing or explicitly null counts as zero sources.
    """
    news_items = sentiment_data.get("news_headlines") or []
    reddit_items = sentiment_data.get("reddit_comments") or []
    return min(1.0, 0.2 + 0.1 * (len(news_items) + len(reddit_items)))


# ---------- Input Schema ----------
# Documented with comments rather than a docstring: pydantic copies a model's
# docstring into its generated JSON schema, which would alter the schema.
class AnalyticsInput(BaseModel):
    market_data: str = Field(..., description="Structured JSON string from MarketDataTool")
    historical_data: str = Field(..., description="Structured JSON string from HistoricalDataTool")
    sentiment_data: str = Field(..., description="Structured JSON string from SentimentTool")


# ---------- Tool ----------
class AnalyticsTool(BaseTool):
    name: str = "analytics_tool"
    description: str = (
        "Aggregates structured market, historical, and sentiment data to produce "
        "quantitative indicators including pct_change, volatility, trend, sentiment, "
        "sentiment_strength, confidence, alignment, and a composite score."
    )
    args_schema: Type[BaseModel] = AnalyticsInput

    def _run(self, market_data: str, historical_data: str, sentiment_data: str) -> Dict[str, Any]:
        """Combine market, historical and sentiment payloads into indicators.

        Args:
            market_data: JSON object (as text) providing ``latest_price``.
            historical_data: JSON object (as text) providing ``pct_change``,
                ``trend`` and optionally ``volatility_pct``.
            sentiment_data: JSON object (as text) providing ``sentiment`` and
                optionally ``sentiment_strength``, ``confidence``,
                ``news_headlines`` and ``reddit_comments``.

            Already-decoded mappings are accepted in place of JSON text.

        Returns:
            A dict of indicators (``price``, ``pct_change``, ``volatility_pct``,
            ``trend``, ``sentiment``, ``sentiment_strength``,
            ``sentiment_confidence``, ``effective_sentiment``, ``alignment``,
            ``composite_score``, ``summary``), or ``{"error": ...}`` when a
            required field is missing or the inputs cannot be processed.
            This method does not raise; failures are reported in the dict.
        """
        # The broad handler is deliberate: this is the tool boundary, and any
        # failure is reported as an error payload instead of propagating.
        try:
            market = _as_mapping(market_data, "market_data")
            historical = _as_mapping(historical_data, "historical_data")
            sentiment_info = _as_mapping(sentiment_data, "sentiment_data")

            # ------------------------------------------------------------
            # 1) Extract fields from structured tool outputs
            # ------------------------------------------------------------
            price = market.get("latest_price")
            pct_change = historical.get("pct_change")
            volatility = historical.get("volatility_pct")
            trend = historical.get("trend")
            sentiment = sentiment_info.get("sentiment")

            # volatility is optional; the other four are required.
            if price is None or pct_change is None or trend is None or sentiment is None:
                return {
                    "error": (
                        "Missing required fields in analytics input. "
                        "Ensure all tools returned structured JSON."
                    )
                }

            sentiment = sentiment.lower().strip()
            # Normalise the trend for comparison only, so "Upward" or
            # "upward " still matches; the reported value stays as received.
            trend_key = trend.strip().lower() if isinstance(trend, str) else trend

            # ------------------------------------------------------------
            # 2) Sentiment strength & confidence
            # ------------------------------------------------------------
            sentiment_strength = sentiment_info.get("sentiment_strength")
            if sentiment_strength is None:
                # Backwards-compatible default keyed on the sentiment label.
                sentiment_strength = _DEFAULT_SENTIMENT_STRENGTH.get(sentiment, 0.0)

            sentiment_confidence = sentiment_info.get("confidence")
            if sentiment_confidence is None:
                sentiment_confidence = _fallback_confidence(sentiment_info)

            # Strength scaled by how much we trust it.
            effective_sentiment = sentiment_strength * sentiment_confidence

            # ------------------------------------------------------------
            # 3) Alignment logic
            # ------------------------------------------------------------
            aligned = (
                (trend_key == "upward" and effective_sentiment > _ALIGNMENT_THRESHOLD)
                or (trend_key == "downward" and effective_sentiment < -_ALIGNMENT_THRESHOLD)
            )
            alignment = "aligned" if aligned else "divergent"

            # ------------------------------------------------------------
            # 4) Composite score
            # ------------------------------------------------------------
            volatility_penalty = volatility / 100 if volatility else 0
            raw_score = (
                (pct_change / 10)              # Trend effect
                + (effective_sentiment * 1.5)  # Strong weight for sentiment
                - volatility_penalty           # Penalize volatility
            )
            # Bound between [-1, 1]
            score = round(max(-1, min(1, raw_score)), 2)

            # ------------------------------------------------------------
            # 5) Final structured output
            # ------------------------------------------------------------
            strength_out = round(sentiment_strength, 3)
            confidence_out = round(sentiment_confidence, 3)
            return {
                "price": price,
                "pct_change": pct_change,
                "volatility_pct": volatility,
                "trend": trend,
                "sentiment": sentiment,
                "sentiment_strength": strength_out,
                "sentiment_confidence": confidence_out,
                "effective_sentiment": round(effective_sentiment, 3),
                "alignment": alignment,
                "composite_score": score,
                "summary": (
                    f"Trend={trend}, Sentiment={sentiment}, "
                    f"Strength={strength_out}, "
                    f"Confidence={confidence_out}, "
                    f"Alignment={alignment}, "
                    f"Score={score}"
                ),
            }

        except Exception as e:
            return {"error": f"AnalyticsTool failed: {str(e)}"}