Spaces:
Sleeping
Sleeping
| # std lib | |
| import base64 | |
| from typing import Optional | |
| import os | |
| import requests | |
| from pathlib import Path | |
| # 3rd party imports | |
| import pandas as pd | |
| from langchain_core.messages import HumanMessage | |
| from langchain_openai import ChatOpenAI | |
| from tavily import TavilyClient | |
| import wikipedia | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
# Credentials come from the environment; a missing variable yields None.
openai_token = os.environ.get("HF_FINAL_ASSIGNMENT_OPENAI")
tavily_api_key = os.environ.get("HF_FINAL_ASSIGNMENT_TAVILY")

# Module-level clients used by the tool functions defined in this file.
tavily_client = TavilyClient(api_key=tavily_api_key)
vision_llm = ChatOpenAI(
    model="gpt-5.2",
    api_key=openai_token,
    temperature=0,
)
def extract_text_from_image(img_path: str) -> str:
    """
    Extract text from an image file using a multimodal model.

    Use this method only for image files.

    Args:
        img_path: A local image file path (string).

    Returns:
        The model's reply with surrounding whitespace removed, or an empty
        string if the file cannot be read or the model call fails (the
        error is printed).
    """
    import mimetypes

    try:
        # Declare the file's actual media type in the data URL instead of
        # always claiming PNG; fall back to PNG when it cannot be guessed.
        mime_type, _ = mimetypes.guess_type(img_path)
        if not mime_type or not mime_type.startswith("image/"):
            mime_type = "image/png"

        # Read the image and encode it as base64.
        with open(img_path, "rb") as image_file:
            image_base64 = base64.b64encode(image_file.read()).decode("utf-8")

        # Prompt made of the instruction plus the inlined image.
        message = [
            HumanMessage(
                content=[
                    {
                        "type": "text",
                        "text": (
                            "Extract all the text from this image. "
                            "Return only the extracted text, no explanations."
                        ),
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{mime_type};base64,{image_base64}"
                        },
                    },
                ]
            )
        ]

        # Call the vision-capable model.
        response = vision_llm.invoke(message)
        return response.content.strip()
    except Exception as e:
        # Best-effort tool: report the problem and return an empty result.
        error_msg = f"Error extracting text: {str(e)}"
        print(error_msg)
        return ""
def tavily_search(query: str) -> dict:
    """Search the web with Tavily (advanced search depth) and return the client's response."""
    return tavily_client.search(query=query, search_depth="advanced")
def wikipedia_get_suggested_title_for_query(query: str) -> str:
    """
    Ask Wikipedia for a suggested page title for a query.

    Returns the suggestion, or an empty string when there is none or the
    lookup fails (the error is printed).
    """
    try:
        suggestion = wikipedia.suggest(query)
    except Exception as e:
        print(f"Error getting Wikipedia suggestion: {str(e)}")
        return ""
    return suggestion or ""
def wikipedia_search_pages(query: str):
    """
    Search Wikipedia for a query.

    Returns the matching page titles joined into one string, one title per
    line, or an empty string if the search fails (the error is printed).
    """
    try:
        titles = wikipedia.search(query)
        joined_titles = "\n".join(titles)
    except Exception as e:
        print(f"Error searching Wikipedia: {str(e)}")
        return ""
    return joined_titles
def wikipedia_get_page_summary(page_title: str, lang: str = "en") -> str:
    """
    Get the summary of a Wikipedia page given its title.

    Args:
        page_title: Title of the page to summarise.
        lang: Accepted by the signature but not used by the lookup.

    Returns:
        The page summary, or an empty string if the lookup fails (the
        error is printed).
    """
    try:
        return wikipedia.summary(page_title)
    except Exception as e:
        print(f"Error getting Wikipedia page summary: {str(e)}")
        return ""
def wikipedia_get_page_full_content(page_title: str):
    """
    Get the full text content of a Wikipedia page given its title.

    The page is loaded with ``wikipedia.page(page_title)`` and its
    ``content`` attribute is returned.

    Returns:
        The page content, or an empty string if the page cannot be loaded
        (the error is printed).
    """
    try:
        full_text = wikipedia.page(page_title).content
    except Exception as e:
        print(f"Error getting Wikipedia page content: {str(e)}")
        return ""
    return full_text
def _extract_youtube_video_id(video_url: str) -> str:
    """Return the video ID found in a YouTube URL, or the input itself when it is already a bare ID."""
    from urllib.parse import parse_qs, urlparse

    candidate = video_url.strip()
    try:
        parsed = urlparse(candidate)
    except ValueError:
        parsed = None

    if parsed is not None:
        host = (parsed.hostname or "").lower()
        path_parts = [part for part in parsed.path.split("/") if part]

        # Short links: https://youtu.be/<id>
        if (host == "youtu.be" or host.endswith(".youtu.be")) and path_parts:
            return path_parts[0]

        if "youtube" in host:
            # Watch links: the ID is the "v" query parameter, wherever it
            # appears among the other parameters.
            ids = parse_qs(parsed.query).get("v")
            if ids:
                return ids[0]
            # Path-style links: /embed/<id>, /shorts/<id>, /live/<id>, /v/<id>
            if len(path_parts) >= 2 and path_parts[0] in {
                "embed",
                "shorts",
                "live",
                "v",
            }:
                return path_parts[1]

    # Fallback keeps the previous behaviour (text after the last "v=") but
    # drops trailing query parameters and fragments; a bare ID passes through.
    return candidate.split("v=")[-1].split("&")[0].split("#")[0]


def youtube_get_transcript_of_video(video_url: str):
    """
    Get the transcript of a YouTube video given its URL.

    The transcript API expects a video ID rather than a URL, so the ID is
    extracted first. Supported inputs are watch URLs (with or without extra
    query parameters), youtu.be short links, /embed/, /shorts/, /live/ and
    /v/ URLs, and bare video IDs.

    Args:
        video_url: URL (or ID) of the video.

    Returns:
        Whatever ``YouTubeTranscriptApi().fetch(video_id)`` returns.
        Errors raised by the transcript API are not caught here.
    """
    ytt_api = YouTubeTranscriptApi()
    video_id = _extract_youtube_video_id(video_url)
    return ytt_api.fetch(video_id)
def chessboard_image_to_text_description_to_fen_notation(
    image_path: str, color_to_move: str
) -> str:
    """
    Ask the vision model to describe a chessboard image and derive its FEN.

    Args:
        image_path: A local image file path (string) representing the chessboard position.
        color_to_move: A string indicating which color is to move ("white" or "black").

    Returns:
        The model's reply to the prompt (which asks for a table, a textual
        description and the FEN notation) with surrounding whitespace
        removed, or an empty string if the file cannot be read or the model
        call fails (the error is printed).
    """
    import mimetypes

    try:
        # Declare the file's actual media type in the data URL instead of
        # always claiming PNG; fall back to PNG when it cannot be guessed.
        mime_type, _ = mimetypes.guess_type(image_path)
        if not mime_type or not mime_type.startswith("image/"):
            mime_type = "image/png"

        # Read the image and encode it as base64.
        with open(image_path, "rb") as image_file:
            image_base64 = base64.b64encode(image_file.read()).decode("utf-8")

        # Prompt: instructions, side to move, then the inlined image.
        message = [
            HumanMessage(
                content=[
                    {
                        "type": "text",
                        "text": (
                            "Draw a 8x8 table representing the chessboard."
                            "Describe the chess position rank by rank from rank 8 to rank 1. "
                            "For each rank, list what occupies each square from file a to file h. "
                            "One square at a time, complete the table with the piece occupying that square if any, or with '1' if the square is empty. "
                            "Once the table is complete, provide a textual description of the chessboard : uppercase letters for white pieces, lowercase letters for black pieces, and '1' for empty squares. "
                            "the values '1' in the table are helpful to determine the number of consecutive empty squares in a row, which is necessary to determine the FEN notation. "
                            "Based on this description, determine the FEN notation of the position."
                            "Reminder: for the FEN notation, start counting from rank 8 to rank 1, and for each rank, count from file a to file h."
                            "And if it is white to move, the FEN notation should end with 'w', and if it is black to move, the FEN notation should end with 'b'."
                            "Finally, the FEN notation should finish with the string '- - 0 1'"
                        ),
                    },
                    {
                        "type": "text",
                        "text": (f"It is {color_to_move} to move in this position."),
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{mime_type};base64,{image_base64}"
                        },
                    },
                ]
            )
        ]

        # Call the vision-capable model.
        response = vision_llm.invoke(message)
        description = response.content.strip()
        print(f"Extracted table description: {description}")
        return description
    except Exception as e:
        # Best-effort tool: report the problem and return an empty result.
        error_msg = f"Error extracting text: {str(e)}"
        print(error_msg)
        return ""
def chessboard_get_fen_notation(image_path: str, color_to_move: str) -> str:
    """
    Ask the vision model to convert a chessboard image into Forsyth-Edwards Notation (FEN).

    Args:
        image_path: A local image file path (string) representing the chessboard position.
        color_to_move: A string indicating which color is to move ("white" or "black").

    Returns:
        The model's reply to the prompt (which asks for a rank-by-rank
        description followed by the FEN) with surrounding whitespace
        removed, or an empty string if the file cannot be read or the model
        call fails (the error is printed).
    """
    import mimetypes

    try:
        # Declare the file's actual media type in the data URL instead of
        # always claiming PNG; fall back to PNG when it cannot be guessed.
        mime_type, _ = mimetypes.guess_type(image_path)
        if not mime_type or not mime_type.startswith("image/"):
            mime_type = "image/png"

        # Read the image and encode it as base64.
        with open(image_path, "rb") as image_file:
            image_base64 = base64.b64encode(image_file.read()).decode("utf-8")

        # Prompt: instructions, side to move, then the inlined image.
        message = [
            HumanMessage(
                content=[
                    {
                        "type": "text",
                        "text": (
                            "Describe the chess position rank by rank from rank 8 to rank 1. "
                            "For each rank, list what occupies each square from a to h. "
                            "Then convert your description to FEN notation."
                            "Reminder: for the FEN notation, start counting from rank 8 to rank 1, and for each rank, count from file a to file h."
                            "And if it is white to move, the FEN notation should end with 'w', and if it is black to move, the FEN notation should end with 'b'."
                            "Finally, the FEN notation should finish with the string '- - 0 1'"
                        ),
                    },
                    {
                        "type": "text",
                        "text": (f"It is {color_to_move} to move in this position."),
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{mime_type};base64,{image_base64}"
                        },
                    },
                ]
            )
        ]

        # Call the vision-capable model.
        response = vision_llm.invoke(message)
        fen_reply = response.content.strip()
        print(f"Extracted FEN notation: {fen_reply}")
        return fen_reply
    except Exception as e:
        # Best-effort tool: report the problem and return an empty result.
        error_msg = f"Error extracting text: {str(e)}"
        print(error_msg)
        return ""
def get_best_next_move_from_fen(fen: str):
    """
    Query the Lichess cloud-eval API for the best next move in a position.

    Args:
        fen: A string representing the chess position in Forsyth-Edwards
            Notation (FEN). Surrounding whitespace is ignored.

    Returns:
        The first move of the first principal variation reported by the
        API, or an empty string when the FEN is blank, the API answers with
        a non-200 status, no variation is available, or the request fails
        (problems are printed).
    """
    # Nothing to evaluate: skip the network round trip.
    if not isinstance(fen, str) or not fen.strip():
        print("Error fetching best move from Lichess API: empty FEN")
        return ""

    try:
        # Pass the FEN via `params` so its spaces and slashes are URL-encoded,
        # and bound the wait so a stalled connection cannot hang the caller.
        response = requests.get(
            "https://lichess.org/api/cloud-eval",
            params={"fen": fen.strip()},
            timeout=10,
        )
        if response.status_code != 200:
            print(f"Error fetching best move from Lichess API: {response.status_code}")
            return ""

        # "pvs" is the list of principal variations (best move sequences).
        pvs = response.json().get("pvs", [])
        if pvs and isinstance(pvs, list):
            moves = pvs[0].get("moves", "").split()
            if moves:
                return moves[0]
        # No variation available: return the same empty result as the
        # other failure paths instead of an implicit None.
        return ""
    except Exception as e:
        print(f"Exception occurred while fetching best move from Lichess API: {str(e)}")
        return ""
def execute_python_code_with_subprocess(code: str) -> str:
    """
    Execute Python code in a subprocess and return its output as a string.

    NOTE(security): the code runs with the same interpreter and privileges
    as this process; a subprocess is not a sandbox. Only pass code you are
    prepared to run on this machine.

    Args:
        code: A string containing the Python code to execute.

    Returns:
        The stripped standard output when the code exits with status 0.
        When it exits with a non-zero status, any captured standard output
        followed by an error line with the exit code and the captured
        standard error (e.g. the traceback). A fixed error message is
        returned on timeout (60 seconds) or if the process cannot be run.
    """
    import subprocess
    import sys

    try:
        # Run the code in a child interpreter and capture both streams.
        result = subprocess.run(
            [sys.executable, "-c", code],
            capture_output=True,
            text=True,
            timeout=60,  # Prevent a runaway script from hanging the caller
        )
    except subprocess.TimeoutExpired:
        return "Error: Code execution timed out."
    except Exception as e:
        return f"Error executing code: {str(e)}"

    stdout = result.stdout.strip()
    if result.returncode == 0:
        return stdout

    # Surface the failure instead of returning an empty string: keep any
    # partial output and append the exit status and the traceback.
    stderr = result.stderr.strip()
    error_line = f"Error: process exited with code {result.returncode}."
    return "\n".join(part for part in (stdout, error_line, stderr) if part)
def transcribe_audio_file(audio_file_path: str) -> str:
    """
    Transcribe an audio file to text using OpenAI's gpt-4o-transcribe model.

    Args:
        audio_file_path: A string representing the local path to the audio file.

    Returns:
        The transcribed text with surrounding whitespace removed, or a
        string starting with "Error transcribing audio:" if anything fails
        (client setup, reading the file, or the API call).
    """
    try:
        # Import and client construction live inside the try block so that
        # a missing package or an invalid key is reported through the same
        # error string as every other failure instead of raising.
        from openai import OpenAI

        client = OpenAI(api_key=openai_token)
        with open(audio_file_path, "rb") as audio_file:
            transcript = client.audio.transcriptions.create(
                model="gpt-4o-transcribe", file=audio_file, response_format="text"
            )
        return transcript.strip()
    except Exception as e:
        return f"Error transcribing audio: {str(e)}"
def read_excel_file(file_path: str) -> str:
    """
    Read an Excel file and return its content as a string.

    The workbook is loaded with ``pd.read_excel`` using its default
    arguments and rendered with ``DataFrame.to_string(index=False)``.

    Args:
        file_path: A string representing the local path to the Excel file.

    Returns:
        The table rendered as text, or a string starting with
        "Error reading Excel file:" if reading fails.
    """
    try:
        df = pd.read_excel(file_path)
        # Log the actual shape; the message previously printed df.head().
        print(f"Excel file read successfully. DataFrame shape: {df.shape}")
        return df.to_string(index=False)
    except Exception as e:
        return f"Error reading Excel file: {str(e)}"
def divide(a: float, b: float) -> float:
    """Return the quotient of a divided by b."""
    quotient = a / b
    return quotient
def multiply(a: float, b: float) -> float:
    """Return the product of a and b."""
    product = a * b
    return product
def add(a: float, b: float) -> float:
    """Return the sum of a and b."""
    total = a + b
    return total
def subtract(a: float, b: float) -> float:
    """Return a minus b."""
    difference = a - b
    return difference
# Registry of tool callables exposed by this module, in registration order.
# chessboard_get_fen_notation is defined in this module but not registered.
tools = [
    # Image text extraction
    extract_text_from_image,
    # Arithmetic
    divide,
    multiply,
    add,
    subtract,
    # Web and Wikipedia lookups
    tavily_search,
    wikipedia_get_suggested_title_for_query,
    wikipedia_search_pages,
    wikipedia_get_page_summary,
    wikipedia_get_page_full_content,
    # Video transcript
    youtube_get_transcript_of_video,
    # Chess
    get_best_next_move_from_fen,
    chessboard_image_to_text_description_to_fen_notation,
    # Code execution, audio and spreadsheets
    execute_python_code_with_subprocess,
    transcribe_audio_file,
    read_excel_file,
]
def select_tools_for_input(input_file: Optional[str]):
    """
    Choose the tools to use based on the extension of the input file.

    Args:
        input_file: Path of the attached file, or None/empty when there is
            no attachment.

    Returns:
        A new list of tool callables: spreadsheet tools for .xls/.xlsx,
        code-execution tools for .py, image tools for common image
        extensions, and the general-purpose set otherwise.
    """
    extension = Path(input_file).suffix.lower() if input_file else ""
    arithmetic_tools = [add, subtract, multiply, divide]

    # Spreadsheet input
    if extension in (".xls", ".xlsx"):
        print("Selecting tools for Excel file input.")
        return [
            read_excel_file,
            execute_python_code_with_subprocess,
            *arithmetic_tools,
        ]

    # Python source input
    if extension == ".py":
        print("Selecting tools for Python code input.")
        return [execute_python_code_with_subprocess, *arithmetic_tools]

    # Image input
    if extension in (".png", ".jpg", ".jpeg", ".webp", ".bmp", ".gif"):
        return [
            extract_text_from_image,
            chessboard_image_to_text_description_to_fen_notation,
            get_best_next_move_from_fen,
        ]

    # General fallback
    return [
        tavily_search,
        wikipedia_get_suggested_title_for_query,
        wikipedia_search_pages,
        wikipedia_get_page_summary,
        wikipedia_get_page_full_content,
        youtube_get_transcript_of_video,
        get_best_next_move_from_fen,
        chessboard_image_to_text_description_to_fen_notation,
        execute_python_code_with_subprocess,
        transcribe_audio_file,
        read_excel_file,
        *arithmetic_tools,
    ]