| import threading |
| import pandas as pd |
| from dataclasses import dataclass |
|
|
| from ..core.logging import logger |
| from ..core.decorators import atomic_method |
| from ..core.callbacks import suppress_cost_logs |
| from ..core.registry import MODEL_REGISTRY |
| from .model_configs import LLMConfig |
| from ..models.base_model import BaseLLM |
|
|
| def get_openai_model_cost() -> dict: |
| import json |
| from importlib.resources import files |
| |
| |
| |
| json_path = files('litellm') / 'model_prices_and_context_window_backup.json' |
| model_cost = json.loads(json_path.read_text(encoding="utf-8")) |
| return model_cost |
|
|
| def infer_litellm_company_from_model(model: str) -> str: |
|
|
| if "/" in model: |
| company = model.split("/")[0] |
| else: |
| if "claude" in model or "anthropic" in model: |
| company = "anthropic" |
| elif "gemini" in model: |
| company = "gemini" |
| elif "deepseek" in model: |
| company = "deepseek" |
| elif "openrouter" in model: |
| company = "openrouter" |
| elif "azure" in model.lower(): |
| company = "azure" |
| else: |
| company = "openai" |
| return company |
|
|
|
|
| @dataclass |
| class Cost: |
| input_tokens: int |
| output_tokens: int |
| input_cost: float |
| output_cost: float |
|
|
|
|
| class CostManager: |
|
|
| def __init__(self): |
|
|
| self.total_input_tokens = {} |
| self.total_output_tokens = {} |
| self.total_tokens = {} |
|
|
| self.total_input_cost = {} |
| self.total_output_cost = {} |
| self.total_cost = {} |
|
|
| self._lock = threading.Lock() |
|
|
| def compute_total_cost(self): |
| total_tokens, total_cost = 0, 0.0 |
| for _, value in self.total_tokens.items(): |
| total_tokens += value |
| for _, value in self.total_cost.items(): |
| total_cost += value |
| return total_tokens, total_cost |
|
|
| @atomic_method |
| def update_cost(self, cost: Cost, model: str): |
|
|
| self.total_input_tokens[model] = self.total_input_tokens.get(model, 0) + cost.input_tokens |
| self.total_output_tokens[model] = self.total_output_tokens.get(model, 0) + cost.output_tokens |
| current_total_tokens = cost.input_tokens + cost.output_tokens |
| self.total_tokens[model] = self.total_tokens.get(model, 0) + current_total_tokens |
|
|
| self.total_input_cost[model] = self.total_input_cost.get(model, 0.0) + cost.input_cost |
| self.total_output_cost[model] = self.total_output_cost.get(model, 0.0) + cost.output_cost |
| current_total_cost = cost.input_cost + cost.output_cost |
| self.total_cost[model] = self.total_cost.get(model, 0.0) + current_total_cost |
| |
| total_tokens, total_cost = self.compute_total_cost() |
| if not suppress_cost_logs.get(): |
| logger.info(f"Total cost: ${total_cost:.3f} | Total tokens: {total_tokens} | Current cost: ${current_total_cost:.3f} | Current tokens: {current_total_tokens}") |
|
|
| def display_cost(self): |
|
|
| data = { |
| "Model": [], |
| "Total Cost (USD)": [], |
| "Total Input Cost (USD)": [], |
| "Total Output Cost (USD)": [], |
| "Total Tokens": [], |
| "Total Input Tokens": [], |
| "Total Output Tokens": [], |
| } |
|
|
| for model in self.total_tokens.keys(): |
|
|
| data["Model"].append(model) |
| data["Total Cost (USD)"].append(round(self.total_cost[model], 4)) |
| data["Total Input Cost (USD)"].append(round(self.total_input_cost[model], 4)) |
| data["Total Output Cost (USD)"].append(round(self.total_output_cost[model], 4)) |
|
|
| data["Total Tokens"].append(self.total_tokens[model]) |
| data["Total Input Tokens"].append(self.total_input_tokens[model]) |
| data["Total Output Tokens"].append(self.total_output_tokens[model]) |
| |
| |
| df = pd.DataFrame(data) |
| if len(df) > 1: |
| summary = { |
| "Model": "TOTAL", |
| "Total Cost (USD)": df["Total Cost (USD)"].sum(), |
| "Total Input Cost (USD)": df["Total Input Cost (USD)"].sum(), |
| "Total Output Cost (USD)": df["Total Output Cost (USD)"].sum(), |
| "Total Tokens": df["Total Tokens"].sum(), |
| "Total Input Tokens": df["Total Input Tokens"].sum(), |
| "Total Output Tokens": df["Total Output Tokens"].sum(), |
| } |
| df = df._append(summary, ignore_index=True) |
| |
| print(df.to_string(index=False)) |
|
|
| def get_total_cost(self): |
| |
| total_cost = 0.0 |
| for model in self.total_cost.keys(): |
| total_cost += self.total_cost[model] |
| return total_cost |
|
|
|
|
| cost_manager = CostManager() |
|
|
|
|
| def create_llm_instance(llm_config: LLMConfig) -> BaseLLM: |
|
|
| llm_cls = MODEL_REGISTRY.get_model(llm_config.llm_type) |
| llm = llm_cls(config=llm_config) |
| return llm |
|
|