web3-copilot / src /tools /chart_data_tool.py
Priyansh Saxena
Fix langchain imports: use langchain.memory for ConversationBufferWindowMemory, use langchain_core.tools for BaseTool (langchain 0.3.27 compatibility)
a68baeb
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Dict, Any, List, Optional
import json
import asyncio
from src.utils.logger import get_logger
logger = get_logger(__name__)
class ChartDataInput(BaseModel):
"""Input schema for chart data requests"""
chart_type: str = Field(description="Chart type: price_chart, market_overview, defi_tvl, portfolio_pie, gas_tracker")
symbol: Optional[str] = Field(default=None, description="Asset symbol (e.g., bitcoin, ethereum)")
timeframe: Optional[str] = Field(default="30d", description="Time range: 1d, 7d, 30d, 90d, 365d")
protocols: Optional[List[str]] = Field(default=None, description="DeFi protocol names")
network: Optional[str] = Field(default="ethereum", description="Blockchain network")
class ChartDataTool(BaseTool):
"""
Chart Data Provider Tool
This tool provides structured data that can be used to create charts.
Instead of returning HTML, it returns clean JSON data for visualization.
"""
name: str = "chart_data_provider"
description: str = """Provides structured data for creating cryptocurrency charts.
Returns JSON data in this format:
{{
"chart_type": "price_chart|market_overview|defi_tvl|portfolio_pie|gas_tracker",
"data": {{...}},
"config": {{...}}
}}
Chart types:
- price_chart: Bitcoin/crypto price and volume data
- market_overview: Top cryptocurrencies market data
- defi_tvl: DeFi protocol TVL comparison
- portfolio_pie: Portfolio allocation breakdown
- gas_tracker: Gas fees across networks
"""
args_schema: type[ChartDataInput] = ChartDataInput
def _run(self, chart_type: str, symbol: str = None, timeframe: str = "30d",
protocols: List[str] = None, network: str = "ethereum") -> str:
"""Synchronous execution"""
return asyncio.run(self._arun(chart_type, symbol, timeframe, protocols, network))
async def _arun(self, chart_type: str, symbol: str = None, timeframe: str = "30d",
protocols: List[str] = None, network: str = "ethereum") -> str:
"""Asynchronous execution with proper session cleanup"""
try:
logger.info(f"Providing {chart_type} data for {symbol or 'general'}")
if chart_type == "price_chart":
if not symbol:
symbol = "bitcoin" # Default symbol
days = {"1d": 1, "7d": 7, "30d": 30, "90d": 90, "365d": 365}.get(timeframe, 30)
result = await self._get_price_chart_data(symbol, days)
elif chart_type == "market_overview":
result = await self._get_market_overview_data()
elif chart_type == "defi_tvl":
result = await self._get_defi_tvl_data(protocols)
elif chart_type == "portfolio_pie":
result = await self._get_portfolio_data()
elif chart_type == "gas_tracker":
result = await self._get_gas_tracker_data(network)
else:
result = json.dumps({
"chart_type": "error",
"error": f"Unknown chart type: {chart_type}",
"available_types": ["price_chart", "market_overview", "defi_tvl", "portfolio_pie", "gas_tracker"]
})
return result
except Exception as e:
logger.error(f"Chart data generation failed: {e}")
return json.dumps({
"chart_type": "error",
"error": str(e),
"message": "Failed to generate chart data"
})
finally:
# Ensure session cleanup
await self.cleanup()
async def _get_price_chart_data(self, symbol: str, days: int) -> str:
"""Get price chart data with fallback for API failures"""
cryptocompare_tool = None
try:
# First try to get real data from CryptoCompare (we have this API key)
from src.tools.cryptocompare_tool import CryptoCompareTool
cryptocompare_tool = CryptoCompareTool()
# Map common symbols to CryptoCompare format
symbol_map = {
"btc": "BTC", "bitcoin": "BTC",
"eth": "ethereum", "ethereum": "ETH",
"sol": "SOL", "solana": "SOL",
"ada": "ADA", "cardano": "ADA",
"bnb": "BNB", "binance": "BNB",
"matic": "MATIC", "polygon": "MATIC",
"avax": "AVAX", "avalanche": "AVAX",
"dot": "DOT", "polkadot": "DOT",
"link": "LINK", "chainlink": "LINK",
"uni": "UNI", "uniswap": "UNI"
}
crypto_symbol = symbol_map.get(symbol.lower(), symbol.upper())
try:
# Use CryptoCompare for price data
query = f"{crypto_symbol} price historical {days} days"
data_result = await cryptocompare_tool._arun(query, {"type": "price_history", "days": days})
if data_result and not data_result.startswith("❌") and not data_result.startswith("⚠️"):
return json.dumps({
"chart_type": "price_chart",
"data": {
"source": "cryptocompare",
"raw_data": data_result,
"symbol": crypto_symbol,
"name": symbol.title()
},
"config": {
"title": f"{symbol.title()} Price Analysis ({days} days)",
"timeframe": f"{days}d",
"currency": "USD"
}
})
else:
raise Exception("No valid price data from CryptoCompare")
except Exception as api_error:
logger.error(f"CryptoCompare price data failed: {api_error}")
# Fallback to mock data on any API error
logger.info(f"Using fallback mock data for {symbol}")
return await self._get_mock_price_data(symbol, days)
except Exception as e:
logger.error(f"Price chart data generation failed: {e}")
# Final fallback to mock data
return await self._get_mock_price_data(symbol, days)
finally:
# Cleanup CryptoCompare tool session
if cryptocompare_tool and hasattr(cryptocompare_tool, 'cleanup'):
try:
await cryptocompare_tool.cleanup()
except Exception:
pass # Ignore cleanup errors
async def _get_mock_price_data(self, symbol: str, days: int) -> str:
"""Fallback mock price data"""
import time
import random
base_price = 35000 if symbol.lower() == "bitcoin" else 1800 if symbol.lower() == "ethereum" else 100
base_timestamp = int(time.time() * 1000) - (days * 24 * 60 * 60 * 1000)
price_data = []
volume_data = []
for i in range(days):
timestamp = base_timestamp + (i * 24 * 60 * 60 * 1000)
price_change = random.uniform(-0.05, 0.05)
price = base_price * (1 + price_change * i / days)
price += random.uniform(-price*0.02, price*0.02)
volume = random.uniform(1000000000, 5000000000)
price_data.append([timestamp, round(price, 2)])
volume_data.append([timestamp, int(volume)])
return json.dumps({
"chart_type": "price_chart",
"data": {
"prices": price_data,
"total_volumes": volume_data,
"symbol": symbol.upper(),
"name": symbol.title()
},
"config": {
"title": f"{symbol.title()} Price Analysis ({days} days)",
"timeframe": f"{days}d",
"currency": "USD"
}
})
async def _get_market_overview_data(self) -> str:
"""Get market overview data using CryptoCompare API"""
cryptocompare_tool = None
try:
from src.tools.cryptocompare_tool import CryptoCompareTool
cryptocompare_tool = CryptoCompareTool()
# Get market overview using CryptoCompare
query = "top cryptocurrencies market cap overview"
data_result = await cryptocompare_tool._arun(query, {"type": "market_overview"})
if data_result and not data_result.startswith("❌") and not data_result.startswith("⚠️"):
return json.dumps({
"chart_type": "market_overview",
"data": {
"source": "cryptocompare",
"raw_data": data_result
},
"config": {
"title": "Top Cryptocurrencies Market Overview",
"currency": "USD"
}
})
else:
raise Exception("No valid market data from CryptoCompare")
except Exception as e:
logger.error(f"Market overview API failed: {e}")
return await self._get_mock_market_data()
finally:
# Cleanup CryptoCompare tool session
if cryptocompare_tool and hasattr(cryptocompare_tool, 'cleanup'):
try:
await cryptocompare_tool.cleanup()
except Exception:
pass # Ignore cleanup errors
async def _get_mock_market_data(self) -> str:
"""Fallback mock market data"""
return json.dumps({
"chart_type": "market_overview",
"data": {
"coins": [
{"name": "Bitcoin", "symbol": "BTC", "current_price": 35000, "market_cap_rank": 1, "price_change_percentage_24h": 2.5},
{"name": "Ethereum", "symbol": "ETH", "current_price": 1800, "market_cap_rank": 2, "price_change_percentage_24h": -1.2},
{"name": "Cardano", "symbol": "ADA", "current_price": 0.25, "market_cap_rank": 3, "price_change_percentage_24h": 3.1},
{"name": "Solana", "symbol": "SOL", "current_price": 22.5, "market_cap_rank": 4, "price_change_percentage_24h": -2.8},
{"name": "Polygon", "symbol": "MATIC", "current_price": 0.52, "market_cap_rank": 5, "price_change_percentage_24h": 1.9}
]
},
"config": {
"title": "Top Cryptocurrencies Market Overview",
"currency": "USD"
}
})
async def _get_defi_tvl_data(self, protocols: List[str]) -> str:
"""Get real DeFi TVL data from DeFiLlama API"""
try:
from src.tools.defillama_tool import DeFiLlamaTool
defillama = DeFiLlamaTool()
# Get protocols data
data = await defillama.make_request(f"{defillama._base_url}/protocols")
if not data:
logger.warning("DeFiLlama API failed, using fallback")
return await self._get_mock_defi_data(protocols)
# Filter for requested protocols or top protocols
if protocols:
filtered_protocols = []
for protocol_name in protocols:
for protocol in data:
if protocol_name.lower() in protocol.get("name", "").lower():
filtered_protocols.append(protocol)
break
protocols_data = filtered_protocols[:8] # Limit to 8
else:
# Get top protocols by TVL (filter out None values)
valid_protocols = [p for p in data if p.get("tvl") is not None and p.get("tvl", 0) > 0]
protocols_data = sorted(valid_protocols, key=lambda x: x.get("tvl", 0), reverse=True)[:8]
if not protocols_data:
return await self._get_mock_defi_data(protocols)
# Format TVL data
tvl_data = []
for protocol in protocols_data:
tvl_data.append({
"name": protocol.get("name", "Unknown"),
"tvl": protocol.get("tvl", 0),
"change_1d": protocol.get("change_1d", 0),
"chain": protocol.get("chain", "Multi-chain"),
"category": protocol.get("category", "DeFi")
})
return json.dumps({
"chart_type": "defi_tvl",
"data": {"protocols": tvl_data},
"config": {
"title": "DeFi Protocols by Total Value Locked",
"currency": "USD"
}
})
except Exception as e:
logger.error(f"DeFi TVL API failed: {e}")
return await self._get_mock_defi_data(protocols)
async def _get_mock_defi_data(self, protocols: List[str]) -> str:
"""Fallback mock DeFi data"""
import random
protocol_names = protocols or ["Uniswap", "Aave", "Compound", "Curve", "MakerDAO"]
tvl_data = []
for protocol in protocol_names[:5]:
tvl = random.uniform(500000000, 5000000000)
change = random.uniform(-10, 15)
tvl_data.append({
"name": protocol,
"tvl": tvl,
"change_1d": change,
"chain": "Ethereum",
"category": "DeFi"
})
return json.dumps({
"chart_type": "defi_tvl",
"data": {"protocols": tvl_data},
"config": {
"title": "DeFi Protocols by Total Value Locked",
"currency": "USD"
}
})
async def _get_portfolio_data(self) -> str:
"""Get portfolio allocation data"""
return json.dumps({
"chart_type": "portfolio_pie",
"data": {
"allocations": [
{"name": "Bitcoin", "symbol": "BTC", "value": 40, "color": "#f7931a"},
{"name": "Ethereum", "symbol": "ETH", "value": 30, "color": "#627eea"},
{"name": "Cardano", "symbol": "ADA", "value": 15, "color": "#0033ad"},
{"name": "Solana", "symbol": "SOL", "value": 10, "color": "#9945ff"},
{"name": "Other", "symbol": "OTHER", "value": 5, "color": "#666666"}
]
},
"config": {
"title": "Sample Portfolio Allocation",
"currency": "Percentage"
}
})
async def _get_gas_data(self, network: str) -> str:
"""Get gas fee data"""
import random
import time
# Generate 24 hours of gas data
gas_data = []
base_timestamp = int(time.time() * 1000) - (24 * 60 * 60 * 1000)
for i in range(24):
timestamp = base_timestamp + (i * 60 * 60 * 1000)
gas_price = random.uniform(20, 100) if network == "ethereum" else random.uniform(1, 10)
gas_data.append([timestamp, round(gas_price, 2)])
return json.dumps({
"chart_type": "gas_tracker",
"data": {
"gas_prices": gas_data,
"network": network.title()
},
"config": {
"title": f"{network.title()} Gas Fee Tracker (24h)",
"unit": "Gwei"
}
})
def _parse_timeframe(self, timeframe: str) -> int:
"""Convert timeframe string to days"""
timeframe_map = {
"1d": 1, "7d": 7, "30d": 30, "90d": 90, "365d": 365, "1y": 365
}
return timeframe_map.get(timeframe, 30)
async def cleanup(self):
"""Cleanup method for session management"""
# ChartDataTool creates temporary tools that may have sessions
# Since we don't maintain persistent references, sessions should auto-close
# But we can force garbage collection to ensure cleanup
import gc
gc.collect()