# Source: AgentBase-Platform / pages/interface.py
# Last commit: "fix: dataframe rendering error" (65b059f), file size 7.04 kB
"""
AgentBase Visualisation UI.
Author: Arastun Mammadli
Date: [Current Date]
"""
from typing import List, Tuple
from pathlib import Path
import streamlit as st
import pandas as pd
import numpy as np
from retrieval.models.bm25 import BM25Retriever
from retrieval.models.sentence_bert import DenseRetriever
from retrieval.utils import load_queries
@st.cache_resource()
def load_retrievers(agentbase_path: str, index_configs: List[str]) -> Tuple[dict, dict, dict]:
    """
    Build one retriever of each kind for every index configuration.

    The result is cached by Streamlit (``st.cache_resource``), so the
    retrievers are constructed once per distinct argument set rather than on
    every script rerun.

    Args:
        agentbase_path: location of the AgentBase data, forwarded unchanged to
            every retriever constructor.
        index_configs: names of the index configurations to build.

    Returns:
        Three dicts, each keyed by index configuration name, in this order:
        BM25 retrievers, BGE-large dense retrievers, ToolRet dense retrievers.
    """
    bm25s = {}
    bges = {}
    toolrets = {}
    for idx_config in index_configs:
        bm25s[idx_config] = BM25Retriever(agentbase_path, index_config=idx_config)
        bges[idx_config] = DenseRetriever(
            "BAAI/bge-large-en-v1.5", agentbase_path, index_config=idx_config
        )
        toolrets[idx_config] = DenseRetriever(
            "mangopy/ToolRet-trained-bge-large-en-v1.5", agentbase_path, index_config=idx_config
        )
    return bm25s, bges, toolrets
# st.cache_data (not st.cache_resource) is used on purpose: it hands every
# caller its own copy of the DataFrame, so a column added by one rerun or
# session cannot leak into the cached object seen by the others.
@st.cache_data()
def load_agentbase_data(agentbase_path: str) -> pd.DataFrame:
    """
    Read the AgentBase CSV into a DataFrame, cached by Streamlit.

    Args:
        agentbase_path: path of the CSV file, passed to ``pd.read_csv``.

    Returns:
        The parsed table.
    """
    return pd.read_csv(agentbase_path)
def keyword_filter(query, top_k, df, columns=("agent_name", "agent_description")) -> pd.DataFrame:
    """
    Simple keyword-based boolean filter across specified columns.

    A row is kept when ``query`` occurs, case-insensitively and as a literal
    substring, in at least one of ``columns``.

    Args:
        query: text to look for. It is matched literally, so characters such
            as ``+``, ``(`` or ``*`` are safe to search for.
        top_k: maximum number of matching rows to return.
        df: table to search; it is not modified.
        columns: names of the columns to search.

    Returns:
        A copy of the first ``top_k`` matching rows, in their original order,
        with an added ``scores`` column set to 1 for every row.
    """
    def _column_hits(col):
        # Missing cells must never match: casting NaN to str would otherwise
        # turn it into the text "nan" and make it searchable.
        return col.notna() & col.astype(str).str.contains(query, case=False, regex=False)

    # list(...) because indexing a DataFrame with a tuple means a single key.
    mask = df[list(columns)].apply(_column_hits).any(axis=1)
    filtered_df = df[mask].head(top_k).copy()
    filtered_df["scores"] = 1
    return filtered_df
class AgentBaseUI:
    """
    AgentBase Streamlit-based UI Components.

    The page is drawn by calling, in order, ``header_panel`` (title, query
    suggestions, search box, model and index selectors), ``retrieval_panel``
    (result table) and ``info_panel`` (raw agent and platform tables).
    """

    def __init__(self, agentbase_path, platforms_path):
        """
        Load the data tables and retrievers and set the default selections.

        Args:
            agentbase_path: path of the agents CSV; also handed to the
                retrievers.
            platforms_path: path of the platforms CSV.
        """
        # selection options and defaults
        self.retrieval_models = ["bge-large", "toolret", "bm25", "keyword"]
        self.selected_model = "bge-large"
        self.indexing_configs = ["v1", "naive"]
        self.indexing_config = "v1"

        self.agents_df = load_agentbase_data(agentbase_path)
        self.platforms_df = pd.read_csv(platforms_path)
        # Build retrievers for exactly the configurations offered in the UI.
        self.bm25s, self.bges, self.toolrets = load_retrievers(
            agentbase_path, index_configs=self.indexing_configs
        )

    def header_panel(self):
        """
        Draw the title, suggestion buttons, search box and selectors.

        Updates ``st.session_state.query``, ``self.selected_model`` and
        ``self.indexing_config`` from the widgets.
        """
        st.title("AgentBase Platform Demo")
        st.write("A Large-Scale Agent Collection for Automated Agent Recommendation.")
        st.subheader("🔍 Retrieval")

        if "query" not in st.session_state:
            st.session_state.query = ""

        # One button per sample query; clicking it fills the search box.
        # This must run before the text_input below is created, because the
        # state of a keyed widget cannot be assigned after instantiation.
        query_suggestions = list(load_queries("data/samples.json").values())
        if query_suggestions:  # st.columns does not accept a zero column count
            suggestion_cols = st.columns(len(query_suggestions))
            for column, suggestion in zip(suggestion_cols, query_suggestions):
                if column.button(suggestion):
                    st.session_state.query = suggestion

        col1, col2, col3 = st.columns([4, 1, 1])
        with col1:
            st.text_input("", placeholder="Type to search...", key="query")
        with col2:
            self.selected_model = st.selectbox("", self.retrieval_models, index=0)
        with col3:
            self.indexing_config = st.selectbox("", self.indexing_configs, index=0)

        _, col2 = st.columns([2, 1])
        with col2:
            with st.expander("See explanation"):
                st.write('''
                - **Retrieval Models**:
                    - **BGE-Large**: a dense retrieval model.
                    - **ToolRet**: a dense retrieval model fine-tuned for tool search.
                    - **BM25**: a sparse retrieval model.
                    - **Keyword**: simple boolean keyword matching.
                - **Indexing Configurations**:
                    - **v1**: using all columns with priority ordering (e.g., name, description come first).
                    - **naive**: using agent name and description only.
                ''')

    def retrieval_panel(self):
        """
        Draw the Top-K slider and the result table.

        With a non-empty query the table holds the retrieval results;
        otherwise it holds the unfiltered agent table with all scores at 0.
        The rows are stored on ``self.filtered_df``.
        """
        top_k = st.slider("Top K", 3, 100, 5)

        if st.session_state.query:
            self.filtered_df = self.retrieve_agents(st.session_state.query, top_k)
        else:
            self.filtered_df = self.agents_df.copy()
            self.filtered_df['scores'] = 0.0

        if len(self.filtered_df) > 0:
            # Report what is actually displayed: a search may return fewer
            # than top_k rows.
            shown = min(top_k, len(self.filtered_df))
            st.write(f"Showing {shown} of {len(self.agents_df)} agents")

            agent_config = {  # clean column display
                "agent_url": st.column_config.LinkColumn("agent_url", display_text="Visit →"),
                "agent_description": st.column_config.TextColumn("agent_description", width="large"),
                "agent_accessibility": st.column_config.TextColumn("agent_accessibility", width="small"),
                "agent_pricing": st.column_config.TextColumn("agent_pricing", width="medium"),
                "base_model": st.column_config.TextColumn("base_model", width="medium"),
            }
            key_columns = [
                'agent_name', 'platform_name', 'agent_description',
                'agent_pricing', 'base_model', 'agent_url', 'scores',
            ]
            # An all-zero score column carries no information, so hide it.
            if (self.filtered_df['scores'] == 0).all():
                key_columns.remove("scores")

            st.dataframe(
                self.filtered_df[key_columns].head(top_k),
                column_config=agent_config,
                use_container_width=True,
                hide_index=True
            )
        else:
            st.info("No agents match your search.")

    def retrieve_agents(self, query, top_k=100) -> pd.DataFrame:
        """
        Returns a filtered dataframe with updated scores.
        Default maximum top_k of 100

        The currently selected model and index configuration decide which
        retriever runs. ``self.agents_df`` itself is never modified.

        Args:
            query: search text.
            top_k: maximum number of results requested from the retriever.

        Returns:
            The matching rows of ``self.agents_df`` with a ``scores`` column,
            sorted by descending score. Empty (but still carrying ``scores``)
            when the retriever returns nothing.

        Raises:
            ValueError: if ``self.selected_model`` is not a known model.
        """
        if self.selected_model == 'keyword':
            return keyword_filter(query, top_k, self.agents_df)
        elif self.selected_model == 'bm25':
            res = self.bm25s[self.indexing_config].retrieve(query, top_k)
        elif self.selected_model == 'bge-large':
            res = self.bges[self.indexing_config].retrieve(query, top_k)
        elif self.selected_model == 'toolret':
            res = self.toolrets[self.indexing_config].retrieve(query, top_k)
        else:
            raise ValueError(f"Selected model must be one of {self.retrieval_models}")

        # The retriever result is consumed as (agent_id, score) pairs.
        score_by_id = dict(res)
        if not score_by_id:
            # Nothing retrieved: same columns, zero rows.
            return self.agents_df.iloc[0:0].assign(scores=0.0)

        # Work on a copy so the shared agent table never gains a scores column.
        matches = self.agents_df["agent_id"].isin(list(score_by_id))
        filtered_df = self.agents_df.loc[matches].copy()
        filtered_df["scores"] = filtered_df["agent_id"].map(score_by_id).astype(float)
        return filtered_df.sort_values(by="scores", ascending=False)

    def info_panel(self):
        """
        Draw an expander showing the full agent and platform tables.
        """
        with st.expander("View AgentBase-v1.1"):
            st.dataframe(
                self.agents_df,
                use_container_width=True,
                hide_index=True
            )
            st.dataframe(
                self.platforms_df,
                use_container_width=True,
                hide_index=True
            )
if __name__ == "__main__":
    # Locate the data files relative to this script rather than the current
    # working directory.
    BASE_DIR = Path(__file__).resolve().parent
    agentbase_path, platforms_path = (
        BASE_DIR / relative
        for relative in ("../data/agentbase-v1.1.csv", "../data/platforms.csv")
    )

    agentbaseui = AgentBaseUI(agentbase_path, platforms_path)

    # Panels are drawn top to bottom in this order.
    for panel_name in ("header_panel", "retrieval_panel", "info_panel"):
        getattr(agentbaseui, panel_name)()