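"""Minimal RAG pipeline over a single website.

Scrapes the configured page, chunks and embeds its text with a HuggingFace
embedding model, then answers questions about it interactively via
SambaNova's hosted DeepSeek-V3.1. Run directly with a filled-in config.yaml
next to this file; type "exit" or "quit" to leave the question loop.
"""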
import re
import sys
from dataclasses import dataclass
from pathlib import Path

import bs4
import numpy as np
import requests
import yaml
from langchain_huggingface import HuggingFaceEmbeddings
from sambanova import SambaNova

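# The config.yaml next to this script is expected to supply the keys read in
# main() below. A sketch of its shape (the example values are placeholders,
# not defaults shipped with this project):
#
#   sambanova_api_key: "<your-api-key>"
#   website: "https://example.com"
#   system_prompt: "You are a helpful assistant."
#   embedding_model: "sentence-transformers/all-MiniLM-L6-v2"
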
@dataclass
class DocumentChunk:
    """A chunk of scraped page text together with its embedding vector."""
    text: str
    source: str
    vector: np.ndarray

def load_config(path: Path) -> dict:
    with path.open("r", encoding="utf-8") as f:
        return yaml.safe_load(f)

def scrape_website(url: str) -> str:
    """Fetch a page and return its visible text, stripped of page chrome."""
    response = requests.get(url, timeout=15)
    response.raise_for_status()
    soup = bs4.BeautifulSoup(response.text, "html.parser")
    # Drop non-content elements before extracting text.
    for tag in soup(["script", "style", "header", "footer", "nav", "aside"]):
        tag.decompose()
    text = soup.get_text(separator="\n")
    # Collapse runs of blank lines.
    text = re.sub(r"\n{2,}", "\n", text).strip()
    return text

def split_into_chunks(text: str, chunk_size: int = 400, overlap: int = 100) -> list[str]:
    """Split text into roughly chunk_size-character chunks along sentence
    boundaries, carrying an overlapping tail into each new chunk."""
    sentences = [s.strip() for s in re.split(r"(?<=[\.\?\!])\s+", text) if s.strip()]
    chunks = []
    current = ""
    for sentence in sentences:
        if len(current) + len(sentence) + 1 > chunk_size and current:
            chunks.append(current.strip())
            # Seed the next chunk with the tail of this one so context that
            # spans a boundary is not lost. current[-overlap:] already yields
            # the whole string when overlap >= len(current), but overlap == 0
            # must be guarded or the slice would keep everything.
            current = current[-overlap:] if overlap > 0 else ""
        current += " " + sentence
    if current.strip():
        chunks.append(current.strip())
    return chunks

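# With the defaults, a chunk closes once it would exceed ~400 characters, and
# the last 100 characters of that chunk are carried into the next one, so
# sentences near a boundary keep some of their surrounding context.
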
def embed_texts(texts: list[str], embed_model: HuggingFaceEmbeddings | None = None) -> list[list[float]]:
    """Embed texts with the given model. Returns [] when there is nothing to
    embed or no model was supplied; callers treat [] as a fallback signal."""
    if not texts or embed_model is None:
        return []
    return embed_model.embed_documents(texts)

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    norm_a, norm_b = np.linalg.norm(a), np.linalg.norm(b)
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return float(np.dot(a, b) / (norm_a * norm_b))

def build_rag_corpus(config: dict, embed_model: HuggingFaceEmbeddings, url: str) -> list[DocumentChunk]:
    """Scrape a URL, chunk its text, and embed each chunk.

    `config` is accepted for future use and is currently unused."""
    print(f"Scraping website: {url}")
    page_text = scrape_website(url)
    chunks = split_into_chunks(page_text)
    print(f"Split content into {len(chunks)} chunks")
    embeddings = embed_texts(chunks, embed_model)
    return [
        DocumentChunk(text=chunk, source=url, vector=np.array(vector))
        for chunk, vector in zip(chunks, embeddings)
    ]

def retrieve_relevant_chunks(chunks: list[DocumentChunk], question: str, embed_model: HuggingFaceEmbeddings, top_k: int = 4) -> list[DocumentChunk]:
    """Rank chunks by cosine similarity to the question; return the top_k."""
    question_embeddings = embed_texts([question], embed_model)
    if not question_embeddings:
        # Embedding was skipped or failed; fall back to the first chunks.
        return chunks[:top_k]
    question_vector = np.array(question_embeddings[0])
    scored = [
        (chunk, cosine_similarity(question_vector, chunk.vector))
        for chunk in chunks
    ]
    scored.sort(key=lambda item: item[1], reverse=True)
    return [chunk for chunk, _ in scored[:top_k]]

def build_prompt(system_prompt: str, question: str, context_chunks: list[DocumentChunk]) -> str:
    """Assemble the system prompt, retrieved context, and question."""
    context_text = "\n---\n".join(chunk.text for chunk in context_chunks)
    return (
        f"{system_prompt}\n\n"
        "Use the following extracted website text to answer the question clearly.\n"
        f"Context:\n{context_text}\n\n"
        f"Question: {question}\n"
    )

def create_llm_client(config: dict) -> SambaNova:
    """Create a SambaNova client pointed at the hosted inference API."""
    return SambaNova(
        api_key=config.get("sambanova_api_key"),
        base_url="https://api.sambanova.ai/v1",
        timeout=30,
    )

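# Note: the endpoint at https://api.sambanova.ai/v1 is OpenAI-compatible, so
# if the `sambanova` SDK is unavailable in your environment,
# openai.OpenAI(api_key=..., base_url=...) with the same arguments should
# work as a drop-in replacement (an assumption to verify against the current
# SambaNova docs).
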
def ask_model(prompt: str, client: SambaNova) -> str:
    """Send the prompt as a single user message and return the model's reply."""
    response = client.chat.completions.create(
        model="DeepSeek-V3.1",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=1056,
        temperature=0.2,
    )
    return response.choices[0].message.content.strip()

def format_answer(raw: str, chunks: list[DocumentChunk]) -> str:
    """Post-process the model's answer. Currently a pass-through; the
    retrieved chunks are accepted so citations can be added later."""
    return raw

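# A possible extension (not part of the original behavior): append the
# deduplicated chunk sources so answers cite where the context came from, e.g.
#     sources = sorted({chunk.source for chunk in chunks})
#     return f"{raw}\n\nSources: {', '.join(sources)}"
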
def main() -> int:
    """Load config, build the corpus, then run an interactive Q&A loop."""
    config_path = Path(__file__).parent / "config.yaml"
    if not config_path.exists():
        print(f"Missing config file: {config_path}")
        return 1
    config = load_config(config_path)
    llm_api_key = config.get("sambanova_api_key")
    website = config.get("website")
    system_prompt = config.get("system_prompt", "You are a helpful assistant.")
    if not llm_api_key or not website:
        print("Please set sambanova_api_key and website in config.yaml")
        return 1
    embed_model = HuggingFaceEmbeddings(model_name=config.get("embedding_model"))
    chunks = build_rag_corpus(config, embed_model, website)
    client = create_llm_client(config)
    print("RAG corpus ready. Ask a question or type 'exit'.")
    while True:
        try:
            question = input("Question> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D or Ctrl-C ends the session cleanly.
            break
        if not question:
            continue
        if question.lower() in {"exit", "quit"}:
            break
        selected = retrieve_relevant_chunks(chunks, question, embed_model)
        prompt = build_prompt(system_prompt, question, selected)
        raw_answer = ask_model(prompt, client)
        response = format_answer(raw_answer, selected)
        print(response)
        print()
    return 0

if __name__ == "__main__":
    sys.exit(main())