Spaces:
Build error
Build error
| """Custom tools go in here""" | |
| from google import genai | |
| from se_agents.tools import Tool | |
| from se_agents.agent import Agent | |
| from se_agents.runner import Runner | |
| from openai import OpenAI | |
| import types | |
| import requests | |
| from config import Config | |
| from langchain_community.document_loaders import ( | |
| UnstructuredExcelLoader, | |
| TextLoader, | |
| PyPDFLoader, | |
| ) | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_openai import OpenAIEmbeddings | |
| from langchain_community.vectorstores import Chroma | |
| from openai import Client | |
| from pathlib import Path | |
| from utils import read_file_content | |
| from google.genai import types | |
class YoutubeVideoInterpreter(Tool):
    """Tool that forwards a question about a YouTube video to a Gemini model."""

    def __init__(self):
        """Register the tool with its two required string parameters."""
        parameter_schema = {
            "query": {
                "type": "string",
                "description": "question about the video",
                "required": True,
            },
            "video_url": {
                "type": "string",
                "description": "the url of the video",
                "required": True,
            },
        }
        super().__init__(
            name="youtube_video_interpreter",
            description="Given a certain youtube video url, it analyzes the video and response any question asked by the user",
            parameters=parameter_schema,
        )

    def execute(self, **kwargs) -> str:
        """Ask Gemini a question about the given video and return its answer.

        Keyword Args:
            query: The question about the video.
            video_url: The URL of the video, sent to the model as a file URI.

        Returns:
            The ``text`` attribute of the model response.
        """
        # _process_parameters comes from the Tool base class; presumably it
        # validates/normalises kwargs -- verify against se_agents.
        arguments = self._process_parameters(**kwargs)
        question = arguments.get("query")
        url = arguments.get("video_url")

        gemini = genai.Client(api_key=Config.get_gemini_api_key())

        # The video reference goes first, followed by the user's question.
        video_part = types.Part(file_data=types.FileData(file_uri=url))
        question_part = types.Part(text=question)
        request_content = types.Content(parts=[video_part, question_part])

        answer = gemini.models.generate_content(
            model="models/gemini-2.0-flash",
            contents=request_content,
        )
        return answer.text
class TaskFileDownloader(Tool):
    """Tool that downloads the complementary file attached to a task."""

    # Seconds to wait for the files endpoint; without a timeout a stalled
    # server would block the tool (and the agent) indefinitely.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self):
        """Register the tool with its two required string parameters."""
        super().__init__(
            name="task_file_downloader",
            description="Given a certain Taks id, it downloads the complementary file, outputs the path of the file",
            parameters={
                "task_id": {
                    "type": "string",
                    "description": "the id of the task",
                    "required": True,
                },
                "complementary_file": {
                    "type": "string",
                    "description": "the name with extension of the file",
                    "required": True,
                },
            },
        )

    def execute(self, **kwargs) -> str:
        """Download the task's file and return the path it was saved to.

        Keyword Args:
            task_id: The id of the task whose file is fetched from
                ``{default_api_url}/files/{task_id}``.
            complementary_file: The file name; only the text after its last
                ``.`` is used, as the extension of the saved file.

        Returns:
            The path of the saved file on success, otherwise a message
            describing the failure (non-200 status or request error).
        """
        params = self._process_parameters(**kwargs)
        task_id = params.get("task_id")
        complementary_file_ext = params.get("complementary_file").split(".")[-1]

        try:
            response = requests.get(
                f"{Config.get_default_api_url()}/files/{task_id}",
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
        except requests.RequestException as exc:
            # Report connection problems the same way as a bad status code:
            # as a string the agent can read, not an unhandled exception.
            return f"Failed to retrieve file. Error: {exc}"

        # Verify the request was successful
        if response.status_code != 200:
            return f"Failed to retrieve file. Status code: {response.status_code}"

        # Build the destination once so the written and returned paths agree.
        target_path = (
            f"{Config.get_task_file_folder()}/"
            f"{task_id}_complementary_file.{complementary_file_ext}"
        )
        # Make sure the destination folder exists before writing into it.
        Path(target_path).parent.mkdir(parents=True, exist_ok=True)
        with open(target_path, "wb") as file:
            file.write(response.content)
        return target_path
class RetriveInfoTaskFile(Tool):
    """Tool that answers a query about a task file via embedding retrieval."""

    def __init__(self):
        """Register the tool and map supported file suffixes to loaders."""
        super().__init__(
            name="retrive_info_task_file",
            description="Given a certain Taks file path, outputs info related to the file",
            parameters={
                "complementary_file": {
                    "type": "string",
                    "description": "the path with extension of the file",
                    "required": True,
                },
                "query": {
                    "type": "string",
                    "description": "question about the file",
                    "required": True,
                },
            },
        )
        # Lower-case file suffix -> document loader class used to read it.
        self.retriver = {
            ".xlsx": UnstructuredExcelLoader,
            ".pdf": PyPDFLoader,
            ".txt": TextLoader,
        }

    def execute(self, **kwargs) -> str:
        """Return the file chunks most relevant to the query.

        Keyword Args:
            complementary_file: Path (with extension) of the file to search.
            query: The question about the file.

        Returns:
            Up to three matching chunks joined by blank lines, or a message
            when the format is unsupported, the file is missing, or nothing
            relevant is found.
        """
        # Local import keeps this block self-contained; uuid is stdlib.
        import uuid

        params = self._process_parameters(**kwargs)
        complementary_file = params.get("complementary_file")
        query = params.get("query")
        complementary_file_path = Path(complementary_file)

        # Validate file format (case-insensitively, so ".PDF" is accepted)
        suffix = complementary_file_path.suffix.lower()
        if suffix not in self.retriver:
            return f"Unsupported file format: {complementary_file_path.suffix}"
        if not complementary_file_path.exists():
            return "Complementary file does not exist"

        # Load and process the document. The path is passed as a string
        # because not every loader version accepts a Path object.
        loader = self.retriver[suffix](str(complementary_file_path))
        documents = loader.load()
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000, chunk_overlap=100
        )
        docs = text_splitter.split_documents(documents)
        if not docs:
            # Nothing to index (e.g. an empty file); skip the embedding call.
            return "No relevant information found."

        # Embed and retrieve relevant information.
        # NOTE(review): OpenAIEmbeddings() is created without an explicit key,
        # unlike the other tools that pass Config.get_openai_api_key() --
        # presumably it relies on the environment; confirm.
        embeddings = OpenAIEmbeddings()
        # A unique collection per call stops chunks from previously queried
        # files (kept in Chroma's default collection) leaking into results.
        vectorstore = Chroma.from_documents(
            docs,
            embeddings,
            collection_name=f"task_file_{uuid.uuid4().hex}",
        )
        try:
            retriever = vectorstore.as_retriever()
            results = retriever.invoke(query)
        finally:
            # Drop the throwaway collection so memory does not grow per call.
            vectorstore.delete_collection()

        # Return formatted results or a fallback message
        if not results:
            return "No relevant information found."
        return "\n\n".join(doc.page_content for doc in results[:3])
class SpeechToText(Tool):
    """Tool that transcribes an audio file into a sibling ``.txt`` file."""

    def __init__(self):
        """Register the tool with its single required string parameter."""
        super().__init__(
            name="speech_to_text",
            description="Given a certain audio file path, outputs the path to a .txt file with the transcription",
            parameters={
                "audio_file_path": {
                    "type": "string",
                    "description": "the name with extension of the file",
                    "required": True,
                },
            },
        )

    async def execute(self, **kwargs) -> str:
        """Transcribe the audio file and save the text next to it.

        Keyword Args:
            audio_file_path: Path of the audio file to transcribe.

        Returns:
            A message with the path of the written ``.txt`` file, or
            ``"Audio file does not exist"`` when the input path is missing.
        """
        params = self._process_parameters(**kwargs)
        audio_path = Path(params.get("audio_file_path"))

        # Guard clause: no client is created for a file that does not exist.
        if not audio_path.exists():
            return "Audio file does not exist"

        # NOTE(review): this is the synchronous OpenAI client, so the request
        # blocks inside this coroutine -- confirm that suits the runner.
        client = OpenAI(api_key=Config.get_openai_api_key())
        with open(audio_path, "rb") as audio_file:
            transcription = client.audio.transcriptions.create(
                model="gpt-4o-transcribe", file=audio_file
            )

        # Write transcription to a .txt file. UTF-8 is explicit because the
        # platform default encoding may not represent every transcribed
        # character.
        txt_file_path = audio_path.with_suffix(".txt")
        with open(txt_file_path, "w", encoding="utf-8") as txt_file:
            txt_file.write(transcription.text)
        return f"Transcription saved to {txt_file_path}"
class CodeInterpreter(Tool):
    """Tool that returns the content of a code file."""

    def __init__(self):
        """Register the tool with its single required string parameter."""
        super().__init__(
            name="code_interpreter",
            description="Given a certain code file path, outputs the content of the file",
            parameters={
                "code_file_path": {
                    "type": "string",
                    "description": "the name with extension of the file",
                    "required": True,
                },
            },
        )

    async def execute(self, **kwargs) -> str:
        """Read the code file and return its content.

        Keyword Args:
            code_file_path: Path of the code file to read.

        Returns:
            Whatever ``read_file_content`` returns for the path, or
            ``"Code file does not exist"`` when the path is missing.
        """
        params = self._process_parameters(**kwargs)
        code_file_path = params.get("code_file_path")

        # No OpenAI client is built here: reading a local file needs neither
        # the network nor an API key.
        if not Path(code_file_path).exists():
            return "Code file does not exist"
        return read_file_content(code_file_path)