Spaces:
Sleeping
Sleeping
| from langchain_core.tools import tool | |
| import datetime | |
| import requests | |
| import openai | |
| import os | |
| import tempfile | |
| import pandas as pd | |
| from urllib.parse import urlparse, parse_qs | |
| from openai import OpenAI | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound, VideoUnavailable | |
| from pytube import extract | |
| from openai import OpenAI | |
| from bs4 import BeautifulSoup | |
| from io import BytesIO | |
| from PyPDF2 import PdfReader | |
class GroqKeyManager:
    """Round-robin provider of Groq API keys read from the environment.

    Keys are read once, at construction time, from ``GROQ_API_KEY``,
    ``GROQ_API_KEY_1`` and ``GROQ_API_KEY_2``. Variables that are unset or
    empty are skipped, so the rotation never hands out a missing key while
    at least one real key is configured.
    """

    # Environment variables consulted, in rotation order.
    _ENV_VARS = ("GROQ_API_KEY", "GROQ_API_KEY_1", "GROQ_API_KEY_2")

    def __init__(self):
        candidates = (os.getenv(name) for name in self._ENV_VARS)
        # Drop None / "" so callers never receive an unusable key mid-rotation.
        self.api_keys = [key for key in candidates if key]
        self.key_index = 0

    def get_next_key(self):
        """Return the next configured key, wrapping around after the last one.

        Returns:
            The next API key, or ``None`` when none of the environment
            variables is set (guarding against a modulo-by-zero on the
            empty key list).
        """
        if not self.api_keys:
            return None
        api_key = self.api_keys[self.key_index]
        self.key_index = (self.key_index + 1) % len(self.api_keys)
        return api_key
# Module-level key manager, created once at import time. The Groq-backed
# functions in this file call groq_key_manager.get_next_key() so that
# successive requests rotate through the configured keys.
groq_key_manager = GroqKeyManager()
def add(a: float, b: float) -> float:
    """Return the sum of two numbers.

    Args:
        a (float): first addend
        b (float): second addend

    Returns:
        float: a plus b
    """
    total = a + b
    return total
def subtract(a: float, b: float) -> float:
    """Subtract the second number from the first.

    Args:
        a (float): number to subtract from
        b (float): number to subtract

    Returns:
        float: a minus b
    """
    # The return annotation is float (not int) to match the float inputs
    # and the sibling arithmetic helpers.
    return a - b
def multiply(a: float, b: float) -> float:
    """Return the product of two numbers.

    Args:
        a (float): first factor
        b (float): second factor

    Returns:
        float: a times b
    """
    product = a * b
    return product
def divide(a: float, b: float) -> float:
    """Return the quotient of two numbers.

    Args:
        a (float): dividend
        b (float): divisor

    Returns:
        float: a divided by b

    Raises:
        ValueError: if b is zero
    """
    divisor_is_zero = b == 0
    if divisor_is_zero:
        message = "Cannot divide by zero."
        raise ValueError(message)
    quotient = a / b
    return quotient
def power(a: float, b: float) -> float:
    """Raise the first number to the power of the second.

    Args:
        a (float): base
        b (float): exponent

    Returns:
        float: a raised to the power b
    """
    # Two-argument pow() is the function form of the ** operator.
    return pow(a, b)
# The five basic arithmetic helpers defined above, grouped into one list.
calculator_basic = [add, subtract, multiply, divide, power]
def current_date(_) -> str:
    """Return today's date (local time) formatted as YYYY-MM-DD.

    Args:
        _: ignored; the function takes one positional argument it never uses.

    Returns:
        str: the current date, e.g. "2024-05-17"
    """
    now = datetime.datetime.now()
    return f"{now:%Y-%m-%d}"
def day_of_week(_) -> str:
    """Return the name of the current weekday (local time), e.g. Monday.

    Args:
        _: ignored; the function takes one positional argument it never uses.

    Returns:
        str: the full weekday name as produced by the "%A" format code
    """
    now = datetime.datetime.now()
    return f"{now:%A}"
def days_until(date_str: str) -> str:
    """Report how many days lie between today and a given date.

    Args:
        date_str (str): target date in YYYY-MM-DD format

    Returns:
        str: "<n> days until <date_str>" (n is negative for past dates), or
        "Error parsing date: ..." when the input cannot be parsed.
    """
    try:
        target = datetime.datetime.strptime(date_str, "%Y-%m-%d").date()
        remaining = target - datetime.date.today()
    except Exception as err:
        return f"Error parsing date: {str(err)}"
    return f"{remaining.days} days until {date_str}"
# The three date helpers defined above, grouped into one list.
datetime_tools = [current_date, day_of_week, days_until]
def transcribe_audio(audio_file: str, file_extension: str) -> str:
    """Download an audio file and transcribe it to text.

    The audio is fetched over HTTP, written to a uniquely named temporary
    file, and sent to the transcription endpoint of Groq's OpenAI-compatible
    API. The temporary file is removed afterwards.

    Args:
        audio_file (str): URL of the audio file (.mp3, .m4a, etc.)
        file_extension (str): file extension of the audio, e.g. mp3
            (dots are stripped, so ".mp3" works too)

    Returns:
        str: The transcribed text, or "transcribe_audio failed: ..." if any
        step raised.
    """
    tmp_path = None
    try:
        # A timeout keeps a stalled server from hanging the call forever.
        response = requests.get(audio_file, timeout=60)
        response.raise_for_status()

        # Keep the real extension on the temp file name, since it is the
        # name passed along with the upload. A unique temp file (instead of
        # a fixed "tmp.<ext>" in the working directory) avoids collisions
        # between concurrent calls.
        suffix = "." + file_extension.replace(".", "")
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp_file:
            tmp_path = tmp_file.name
            tmp_file.write(response.content)

        # Rotate to the next Groq API key and talk to Groq through the
        # OpenAI-compatible client.
        api_key = groq_key_manager.get_next_key()
        client = OpenAI(
            base_url="https://api.groq.com/openai/v1",
            api_key=api_key,
        )
        with open(tmp_path, "rb") as audio_content:
            transcription = client.audio.transcriptions.create(
                model="whisper-large-v3-turbo",
                file=audio_content,
            )
        return transcription.text
    except Exception as e:
        return f"transcribe_audio failed: {e}"
    finally:
        # Best-effort cleanup: a failure to delete the temp file must not
        # mask the transcription result or error.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
def _extract_youtube_video_id(youtube_url):
    """Return the video id from a YouTube URL, or raise ValueError."""
    parsed = urlparse(youtube_url)

    # Standard form: https://www.youtube.com/watch?v=<id>
    ids = parse_qs(parsed.query).get("v")
    if ids:
        return ids[0]

    host = (parsed.hostname or "").lower()
    segments = [part for part in parsed.path.split("/") if part]

    # Short links: https://youtu.be/<id>
    if host == "youtu.be" and segments:
        return segments[0]

    # Path-based forms: /shorts/<id>, /embed/<id>, /live/<id>, /v/<id>
    on_youtube = host in ("youtube.com", "youtube-nocookie.com") or host.endswith(
        (".youtube.com", ".youtube-nocookie.com")
    )
    if on_youtube and len(segments) >= 2 and segments[0] in ("shorts", "embed", "live", "v"):
        return segments[1]

    raise ValueError("no video id found in URL")


def _snippet_text(snippet):
    """Return the text of one transcript entry, whether it is a dict or an object."""
    # NOTE(review): newer youtube-transcript-api releases appear to return
    # snippet objects with a .text attribute instead of dicts; confirm
    # against the installed version.
    if isinstance(snippet, dict):
        return snippet["text"]
    return snippet.text


def transcribe_youtube(youtube_url: str) -> str:
    """Fetch the English transcript of a YouTube video.

    Args:
        youtube_url (str): the video's URL; watch?v=, youtu.be, /shorts/,
            /embed/ and /live/ forms are accepted

    Returns:
        str: The transcript text, one entry per line. On failure one of
        "invalid YouTube URL", "transcript unavailable: ..." or
        "transcribe_youtube failed: ..." is returned instead.
    """
    try:
        video_id = _extract_youtube_video_id(youtube_url)
    except Exception:
        return "invalid YouTube URL"
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        transcript = transcript_list.find_transcript(['en']).fetch()
        # keep only the text of each entry
        return '\n'.join(_snippet_text(entry) for entry in transcript)
    except (TranscriptsDisabled, NoTranscriptFound, VideoUnavailable) as e:
        return f"transcript unavailable: {str(e)}"
    except Exception as e:
        return f"transcribe_youtube failed: {e}"
def query_image(query: str, image_url: str) -> str:
    """Answer a question about an image using a Vision Language Model.

    Args:
        query (str): the question about the image, e.g. how many animals are on the image?
        image_url (str): the image's URL

    Returns:
        str: the model's text answer, or "query_image failed: ..." if any
        step raised.
    """
    try:
        next_key = groq_key_manager.get_next_key()
        groq_client = OpenAI(
            base_url="https://api.groq.com/openai/v1",
            api_key=next_key,
        )

        # A single user turn carrying both the question and the image.
        text_part = {"type": "input_text", "text": query}
        image_part = {"type": "input_image", "image_url": image_url}
        user_turn = {"role": "user", "content": [text_part, image_part]}

        answer = groq_client.responses.create(
            model="meta-llama/llama-4-scout-17b-16e-instruct",
            input=[user_turn],
        )
        return answer.output_text
    except Exception as err:
        return f"query_image failed: {err}"
def webpage_content(url: str) -> str:
    """Fetch the text of a webpage or PDF file.

    Args:
        url (str): The URL of the webpage or PDF to fetch.

    Returns:
        str: Extracted text. For PDFs, the text of every page joined by
        newlines; for HTML, the text of <body> (or of the whole document
        when there is no body). On failure, "webpage_content failed: ...".
    """
    try:
        # A timeout keeps a stalled server from hanging the call forever.
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        # Header values are case-insensitive, and some servers send PDFs
        # with a generic content type, so also check the PDF magic bytes.
        content_type = response.headers.get("Content-Type", "").lower()
        is_pdf = "pdf" in content_type or response.content.startswith(b"%PDF")

        if is_pdf:
            reader = PdfReader(BytesIO(response.content))
            # extract_text() may yield nothing for a page; treat as empty.
            return "\n".join(page.extract_text() or "" for page in reader.pages)

        soup = BeautifulSoup(response.text, "html.parser")
        body = soup.body
        return body.get_text(separator="\n", strip=True) if body else soup.get_text(strip=True)
    except Exception as e:
        return f"webpage_content failed: {e}"
def read_excel(file_url: str) -> str:
    """Read an Excel file from a URL and return its content as CSV text.

    Only the sheet that pandas.read_excel loads by default (the first one)
    is converted.

    Args:
        file_url (str): URL to the Excel file (.xlsx, .xls)

    Returns:
        str: Content of the sheet as CSV text without the index column, or
        "read_excel failed: ..." if the download or parsing raised.
    """
    try:
        # A timeout keeps a stalled server from hanging the call forever.
        response = requests.get(file_url, timeout=30)
        response.raise_for_status()
        # Parse straight from memory; no temporary file is needed.
        df = pd.read_excel(BytesIO(response.content))
        return df.to_csv(index=False)
    except Exception as e:
        return f"read_excel failed: {str(e)}"