import streamlit as st
import os
import google.generativeai as genai
from dotenv import load_dotenv
import time
from typing import Any, List, Optional
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
# Load environment variables from a local .env file (e.g. GOOGLE_API_KEY).
load_dotenv()
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
# Configure Generative AI model. Without a key the app cannot make any API
# calls, so show an error and halt the Streamlit script immediately.
if GOOGLE_API_KEY:
    genai.configure(api_key=GOOGLE_API_KEY)
else:
    st.error(
        "Google AI Studio API key not found. Please add it to your .env file. "
        "You can obtain an API key from https://makersuite.google.com/."
    )
    st.stop()  # stops execution of the rest of the script
# Page title and subtitle.
st.title("Embeddings and Vector Search Demo")
st.subheader("Explore Embeddings and Vector Databases")
# Sidebar for explanations shown alongside every section of the app.
with st.sidebar:
    st.header("Embeddings and Vector Search")
    st.markdown(
        """
This app demonstrates how embeddings and vector databases can be used for various tasks.
"""
    )
    st.subheader("Key Concepts:")
    st.markdown(
        """
- **Embeddings**: Numerical representations of text, capturing semantic meaning.
- **Vector Databases**: Databases optimized for storing and querying vectors (simulated here).
- **Retrieval Augmented Generation (RAG)**: Combining retrieval with LLM generation.
- **Cosine Similarity**: A measure of similarity between two vectors.
- **Neural Networks**: Using embeddings as input for classification.
"""
    )
    st.subheader("Whitepaper Insights")
    st.markdown(
        """
- Efficient similarity search using vector indexes (e.g., ANN).
- Handling large datasets and scalability considerations.
- Applications of embeddings: search, recommendation, classification, etc.
"""
    )
# --- Helper Functions ---
def code_block(text: str, language: str = "text") -> None:
    """Render *text* as a syntax-highlighted code block in Streamlit.

    Args:
        text: The code or text to display.
        language: Language tag used for syntax highlighting.
    """
    # st.code is the idiomatic Streamlit API for code display. The previous
    # hand-rolled markdown fence used unsafe_allow_html=True, which would
    # render any HTML embedded in `text` — an injection hazard.
    st.code(text, language=language)
def display_response(response: Any) -> None:
    """Render the model's generated text, or an error if none was produced."""
    # Guard clause: anything falsy, or an object without a .text attribute,
    # counts as a failed generation.
    if not (response and hasattr(response, "text")):
        st.error("Failed to generate a response.")
        return
    st.subheader("Generated Response:")
    st.markdown(response.text)
def generate_embeddings(texts: List[str], model_name: str = "models/embedding-001") -> Optional[List[List[float]]]:
    """Embed each string in *texts* using the given embedding model.

    Args:
        texts: List of text strings.
        model_name: Name of the embedding model.

    Returns:
        One embedding vector (list of floats) per input text, or None if
        the API call fails.
    """
    try:
        # One embed_content call per text; the SDK returns a dict whose
        # 'embedding' key holds the vector.
        return [
            genai.embed_content(
                model=model_name,
                content=text,
                task_type="retrieval_document",  # or "retrieval_query" for queries
            )["embedding"]
            for text in texts
        ]
    except Exception as e:
        st.error(f"Error generating embeddings with model '{model_name}': {e}")
        return None
def generate_with_retry(prompt: str, model_name: str, generation_config: genai.types.GenerationConfig, max_retries: int = 3, delay: int = 5) -> Any:
    """Generate content, retrying transient failures up to *max_retries* times.

    Returns the response object on success, or None when the model is
    unavailable or every attempt fails.
    """
    attempt = 0
    while attempt < max_retries:
        try:
            return genai.GenerativeModel(model_name).generate_content(
                prompt, generation_config=generation_config
            )
        except Exception as e:
            error_message = str(e)
            st.warning(f"Error during generation (attempt {attempt + 1}/{max_retries}): {error_message}")
            # A 404 "not found" means the model id itself is invalid, so
            # retrying cannot help — bail out immediately.
            if "404" in error_message and "not found" in error_message:
                st.error(
                    f"Model '{model_name}' is not available or not supported. Please select a different model."
                )
                return None
            if attempt < max_retries - 1:
                st.info(f"Retrying in {delay} seconds...")
                time.sleep(delay)
            else:
                st.error(f"Failed to generate content after {max_retries} attempts. Please check your prompt and model.")
                return None
        attempt += 1
    return None
def calculate_similarity(embedding1: List[float], embedding2: List[float]) -> float:
    """Return the cosine similarity between two embedding vectors.

    Computed directly with numpy (dot product over the product of norms)
    instead of round-tripping through sklearn's pairwise machinery, which
    required two reshapes and array copies for a single scalar.

    Args:
        embedding1: First embedding vector.
        embedding2: Second embedding vector (same length).

    Returns:
        Cosine similarity in [-1.0, 1.0]; 0.0 if either vector is all
        zeros (matching sklearn's zero-norm handling).
    """
    v1 = np.asarray(embedding1, dtype=float)
    v2 = np.asarray(embedding2, dtype=float)
    denom = np.linalg.norm(v1) * np.linalg.norm(v2)
    if denom == 0.0:  # guard against division by zero on degenerate input
        return 0.0
    return float(np.dot(v1, v2) / denom)
def create_and_train_model(
    embeddings: List[List[float]],
    labels: List[int],
    num_classes: int,
    epochs: int,
    batch_size: int,
    learning_rate: float,
    optimizer_str: str
) -> tf.keras.Model:
    """Create and train a small dense classifier on precomputed embeddings.

    Args:
        embeddings: Training embedding vectors (all the same length).
        labels: Integer class label per embedding — assumed to be in the
            contiguous range 0..num_classes-1 (to_categorical requires this;
            TODO confirm callers guarantee it).
        num_classes: Number of output classes.
        epochs: Number of training epochs.
        batch_size: Mini-batch size.
        learning_rate: Optimizer learning rate.
        optimizer_str: One of 'adam', 'sgd', 'rmsprop'; anything else
            falls back to Adam.

    Returns:
        The trained Keras model.
    """
    # BUG FIX: the original had a misplaced parenthesis that nested all three
    # Dense layers inside the Input(...) call, making the function a syntax
    # error. Each layer is now its own element of the Sequential list.
    model = Sequential([
        Input(shape=(len(embeddings[0]),)),
        Dense(64, activation='relu'),
        Dense(32, activation='relu'),
        Dense(num_classes, activation='softmax'),
    ])
    # Dispatch table replaces the if/elif chain; unknown names get Adam,
    # matching the original fallback behavior.
    optimizers = {
        'adam': Adam,
        'sgd': tf.keras.optimizers.SGD,
        'rmsprop': tf.keras.optimizers.RMSprop,
    }
    optimizer_cls = optimizers.get(optimizer_str.lower(), Adam)
    optimizer = optimizer_cls(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    # One-hot encode integer labels for categorical cross-entropy.
    encoded_labels = to_categorical(labels, num_classes=num_classes)
    model.fit(np.array(embeddings), encoded_labels, epochs=epochs, batch_size=batch_size, verbose=0)
    return model
# --- RAG Question Answering ---
st.header("RAG Question Answering")
# Model selectors (each list has a single option for now).
rag_model_name = st.selectbox("Select model for RAG:", ["gemini-pro"], index=0)
rag_embedding_model = st.selectbox("Select embedding model for RAG:", ["models/embedding-001"], index=0)
# Context documents: one document per line of the text area.
rag_context = st.text_area(
    "Enter your context documents:",
    "Relevant information to answer the question. Separate documents with newlines.",
    height=150,
)
rag_question = st.text_area("Ask a question about the context:", "What is the main topic?", height=70)
# Cap (in characters) on how much of the retrieved document is fed to the LLM.
rag_max_context_length = st.number_input("Maximum Context Length", min_value=100, max_value=2000, value=500, step=100)
if st.button("Answer with RAG"):
    if not rag_context or not rag_question:
        st.warning("Please provide both context and a question.")
    else:
        with st.spinner("Generating answer..."):
            try:
                # 1. Generate embeddings for the context (one per line/document)
                context_embeddings = generate_embeddings(rag_context.split('\n'), rag_embedding_model)
                if not context_embeddings:
                    st.stop()
                # 2. Generate embedding for the question
                question_embedding = generate_embeddings([rag_question], rag_embedding_model)
                if not question_embedding:
                    st.stop()
                # 3. Calculate similarity scores between the question and every document
                similarities = cosine_similarity(np.array(question_embedding).reshape(1, -1), np.array(context_embeddings))[0]
                # 4. Find the most relevant document(s) — single best match only
                most_relevant_index = np.argmax(similarities)
                relevant_context = rag_context.split('\n')[most_relevant_index]
                # Truncate the retrieved document to the configured limit.
                if len(relevant_context) > rag_max_context_length:
                    relevant_context = relevant_context[:rag_max_context_length]
                # 5. Construct the prompt with the retrieved context inlined
                rag_prompt = f"Use the following context to answer the question: '{rag_question}'.\nContext: {relevant_context}"
                # 6. Generate the answer (with retry/backoff on transient errors)
                response = generate_with_retry(rag_prompt, rag_model_name, generation_config=genai.types.GenerationConfig())
                if response:
                    display_response(response)
            except Exception as e:
                st.error(f"An error occurred: {e}")
# --- Text Similarity ---
st.header("Text Similarity")
similarity_embedding_model = st.selectbox("Select embedding model for similarity:", ["models/embedding-001"], index=0)
text1 = st.text_area("Enter text 1:", "This is the first sentence.", height=70)
text2 = st.text_area("Enter text 2:", "This is a similar sentence.", height=70)
if st.button("Calculate Similarity"):
    if not text1 or not text2:
        st.warning("Please provide both texts.")
    else:
        with st.spinner("Calculating similarity..."):
            try:
                # Embed both texts with the same model so the vectors are comparable.
                embeddings = generate_embeddings([text1, text2], similarity_embedding_model)
                if not embeddings:
                    st.stop()
                # Cosine similarity: 1.0 = same direction, 0.0 = orthogonal.
                similarity = calculate_similarity(embeddings[0], embeddings[1])
                st.subheader("Cosine Similarity:")
                st.write(similarity)
            except Exception as e:
                st.error(f"An error occurred: {e}")
# --- Neural Classification ---
st.header("Neural Classification with Embeddings")
classification_embedding_model = st.selectbox("Select embedding model for classification:", ["models/embedding-001"], index=0)
# Training data: one "text,label" pair per line, with integer labels.
classification_data = st.text_area(
    "Enter your training data (text, label pairs), separated by newlines. Example: text1,0\\ntext2,1",
    "text1,0\ntext2,1\ntext3,0\ntext4,1",
    height=150,
)
classification_prompt = st.text_area("Enter text to classify:", "This is a test text.", height=70)
# Hyperparameters for the small dense classifier built in create_and_train_model.
num_epochs = st.number_input("Number of Epochs", min_value=1, max_value=200, value=10, step=1)
batch_size = st.number_input("Batch Size", min_value=1, max_value=128, value=32, step=1)
learning_rate = st.number_input("Learning Rate", min_value=0.0001, max_value=0.1, value=0.0001, step=0.0001, format="%.4f")
optimizer_str = st.selectbox("Optimizer", ['adam', 'sgd', 'rmsprop'], index=0)
def process_classification_data(data: str) -> Optional[tuple[List[str], List[int]]]:
    """Parse newline-separated "text,label" pairs into texts and integer labels.

    The label is taken from after the *last* comma on each line (rsplit), so
    the text itself may now contain commas — a backward-compatible
    generalization over the original plain split, which rejected such lines.

    Args:
        data: Raw multi-line string; lines without a comma are ignored.

    Returns:
        (texts, labels) on success, or None (after showing a st.error)
        on malformed input.
    """
    data_pairs = [line.rsplit(',', 1) for line in data.split('\n') if ',' in line]
    if not data_pairs:
        st.error("No valid data pairs found. Please ensure each line contains 'text,label'.")
        return None
    texts = []
    labels = []
    for i, pair in enumerate(data_pairs):
        # rsplit(',', 1) always yields 2 parts when ',' is present; kept as a
        # defensive check so malformed parses still fail loudly.
        if len(pair) != 2:
            st.error(f"Invalid data format in line {i + 1}: '{','.join(pair)}'. Expected 'text,label'.")
            return None
        text = pair[0].strip()
        label_str = pair[1].strip()
        try:
            label = int(label_str)
        except ValueError:
            st.error(f"Invalid label value in line {i + 1}: '{label_str}'. Label must be an integer.")
            return None
        texts.append(text)
        labels.append(label)
    return texts, labels
if st.button("Classify"):
    if not classification_data or not classification_prompt:
        st.warning("Please provide training data and text to classify.")
    else:
        with st.spinner("Classifying..."):
            try:
                # Parse "text,label" training pairs; abort on malformed input.
                processed_data = process_classification_data(classification_data)
                if not processed_data:
                    st.stop()
                train_texts, train_labels = processed_data
                # NOTE(review): assumes labels are contiguous 0..num_classes-1;
                # to_categorical fails otherwise — confirm with expected input.
                num_classes = len(set(train_labels))
                # Embed the training texts.
                train_embeddings = generate_embeddings(train_texts, classification_embedding_model)
                if not train_embeddings:
                    st.stop()
                model = create_and_train_model(
                    train_embeddings, train_labels, num_classes, num_epochs, batch_size, learning_rate, optimizer_str
                )
                predict_embedding = generate_embeddings([classification_prompt], classification_embedding_model)
                if not predict_embedding:
                    st.stop()
                # BUG FIX: generate_embeddings already returns a LIST of
                # embeddings, so predict_embedding is [[...]]. The original
                # wrapped it in another list, producing a rank-3 (1, 1, dim)
                # array where the model expects (batch, dim).
                prediction = model.predict(np.array(predict_embedding), verbose=0)
                predicted_class = np.argmax(prediction[0])
                st.subheader("Predicted Class:")
                st.write(predicted_class)
                st.subheader("Prediction Probabilities:")
                st.write(prediction)
            except Exception as e:
                st.error(f"An error occurred: {e}")