CustomTopics / app.py
gmedin's picture
Create app.py
b7d7186 verified
import hmac
import json
import os

import clickhouse_connect
import streamlit as st
# -------------------------------
# Config
# -------------------------------
# Settings are read from the process environment. CH_DATABASE falls back to
# "topic_tables"; the two secrets have no fallback.
CH_PASS = os.getenv("CH_PASS")
CH_DATABASE = os.getenv("CH_DATABASE", "topic_tables")
APP_PASSWORD = os.getenv("APP_PASSWORD")

# Each required secret is paired with the message shown when it is missing.
# They are checked in order, and the script halts at the first one that is
# unset or empty.
_REQUIRED_SETTINGS = (
    (CH_PASS, "CH_PASS environment variable not found. Please set it before running the app."),
    (APP_PASSWORD, "APP_PASSWORD environment variable not found. Please set it before running the app."),
)
for _setting_value, _missing_message in _REQUIRED_SETTINGS:
    if not _setting_value:
        st.error(_missing_message)
        st.stop()
# -------------------------------
# ClickHouse connection
# -------------------------------
def get_clickhouse_client():
    """Build and return a brand-new ClickHouse client.

    Every call goes through ``clickhouse_connect.get_client`` again; nothing
    is cached here. The host and user are fixed in this function, while the
    password comes from the module-level ``CH_PASS`` setting.

    Returns:
        The client object produced by ``clickhouse_connect.get_client``.
    """
    connection_options = {
        "host": "td6vvza14q.us-east-2.aws.clickhouse.cloud",
        "user": "internal_tool_builder",
        "password": CH_PASS,
        "secure": True,
        "connect_timeout": 30,
        "send_receive_timeout": 60,
    }
    return clickhouse_connect.get_client(**connection_options)
# Brand and content type options
# Choices offered in the sidebar. The selected values are also used to build
# the ClickHouse table name, so they must match the table naming scheme.
BRANDS = [
    "drumeo",
    "pianote",
    "guitareo",
    "singeo",
]
CONTENT_TYPES = [
    "song",
    "lesson",
]
# -------------------------------
# Streamlit App
# -------------------------------
st.set_page_config(page_title="Custom Topics Manager", layout="wide")

# -------------------------------
# Password Protection
# -------------------------------
# The authentication flag lives in the session state so it survives the
# script re-runs that Streamlit performs on every interaction.
if "authenticated" not in st.session_state:
    st.session_state.authenticated = False

# Until the correct password is entered, only the login form is rendered and
# the rest of the script is skipped via st.stop().
if not st.session_state.authenticated:
    st.title("Custom Topics Manager")
    st.subheader("Please enter the password to continue")

    with st.form("login_form"):
        password_input = st.text_input("Password", type="password")
        submit_button = st.form_submit_button("Login")

        if submit_button:
            # Compare in constant time so the check does not leak, through
            # timing, how much of the password was correct. Both sides are
            # encoded to bytes because hmac.compare_digest rejects non-ASCII
            # str arguments.
            password_matches = hmac.compare_digest(
                password_input.encode("utf-8"),
                APP_PASSWORD.encode("utf-8"),
            )
            if password_matches:
                st.session_state.authenticated = True
                st.success("Authentication successful!")
                st.rerun()
            else:
                st.error("Incorrect password. Please try again.")

    st.stop()  # Stop execution here if not authenticated
# If we reach here, user is authenticated
# Everything below runs only for an authenticated session.
st.title("Custom Topics Manager")

# Sidebar: pick which brand / content type table to review.
st.sidebar.header("Settings")
brand = st.sidebar.selectbox("Select Brand", BRANDS, index=0)
content_type = st.sidebar.selectbox("Select Content Type", CONTENT_TYPES, index=0)

# Logout clears the authentication flag and re-runs the script, which brings
# the login form back.
st.sidebar.divider()
if st.sidebar.button("🔓 Logout"):
    st.session_state.authenticated = False
    st.rerun()

# Debug info: shows only whether a ClickHouse password is configured, never
# the password itself.
st.sidebar.divider()
st.sidebar.caption(f"ClickHouse password: {'✓' if CH_PASS else '✗'}")
st.sidebar.caption(f"Database: {CH_DATABASE}")
# -------------------------------
# Load data from ClickHouse
# -------------------------------
# Table naming pattern: TOPIC_GROUPS_{BRAND}_{CONTENT_TYPE}S (e.g., TOPIC_GROUPS_DRUMEO_SONGS)
def _parse_item_ids(raw_item_ids):
    """Return the item IDs held in *raw_item_ids* as a list, or ``None``.

    The value is expected to be a JSON-encoded list. A value that is already
    a list or tuple is accepted as-is. ``None`` is returned when the value
    cannot be decoded or does not decode to a list, so callers can tell
    "no items" (empty list) apart from "unreadable".
    """
    if isinstance(raw_item_ids, (list, tuple)):
        return list(raw_item_ids)
    try:
        decoded = json.loads(raw_item_ids)
    except (TypeError, ValueError):  # json.JSONDecodeError is a ValueError
        return None
    return decoded if isinstance(decoded, list) else None


# Table naming pattern: TOPIC_GROUPS_{BRAND}_{CONTENT_TYPE}S (e.g., TOPIC_GROUPS_DRUMEO_SONGS)
content_type_suffix = f"{content_type}s"
table_name = f"{CH_DATABASE}.TOPIC_GROUPS_{brand.upper()}_{content_type_suffix.upper()}"

try:
    # -------------------------------
    # Load topics
    # -------------------------------
    with st.spinner(f"Loading topics for {brand} ({content_type}s)..."):
        ch_client = get_clickhouse_client()

        # The table name cannot be bound as a query parameter. It is built
        # from CH_DATABASE and the two sidebar selections only.
        query = f"""
            SELECT group_id, brand, content_type, title, item_ids, created_at,
                   accepted, suggested_title, reviewer_comments
            FROM {table_name}
            ORDER BY created_at DESC
        """
        result = ch_client.query(query)

        # One dict per row. item_ids is decoded once here and reused by both
        # the title lookup and the display loop below.
        topics = [
            {
                "group_id": row[0],
                "brand": row[1],
                "content_type": row[2],
                "title": row[3],
                "item_ids": row[4],
                "parsed_item_ids": _parse_item_ids(row[4]),
                "created_at": str(row[5]),
                "accepted": row[6],
                "suggested_title": row[7],
                "reviewer_comments": row[8],
            }
            for row in result.result_rows
        ]
    st.success(f"Loaded {len(topics)} topics from {table_name}")

    # -------------------------------
    # Fetch content titles from DIM_CONTENT
    # -------------------------------
    with st.spinner("Loading content titles..."):
        # Unique content IDs across all topics, as strings, because titles
        # are looked up by str(item_id) further down.
        all_content_ids = {
            str(item_id)
            for topic in topics
            for item_id in (topic["parsed_item_ids"] or ())
        }

        content_titles = {}
        if all_content_ids:
            # The IDs are bound as a parameter instead of being concatenated
            # into the SQL text, so quotes inside an ID cannot break the query.
            # A tuple is rendered by clickhouse-connect as ('a', 'b', ...).
            dim_content_query = """
                SELECT CONTENT_ID, CONTENT_TITLE
                FROM snowflake_synced_tables.DIM_CONTENT
                WHERE CONTENT_ID IN %(content_ids)s
            """
            try:
                content_result = ch_client.query(
                    dim_content_query,
                    parameters={"content_ids": tuple(sorted(all_content_ids))},
                )
                for row in content_result.result_rows:
                    content_titles[str(row[0])] = row[1]
                st.success(f"Loaded titles for {len(content_titles)} content items")
            except Exception as e:
                # Titles are a nicety; the review UI still works without them.
                st.warning(f"Could not load content titles: {e}")
                st.info("Continuing without titles...")

    # -------------------------------
    # Display topics
    # -------------------------------
    st.header(f"Topics for {brand.capitalize()} - {content_type.capitalize()}s")

    # Case-insensitive substring filter on the topic title. A missing title
    # is treated as an empty string instead of crashing the filter.
    search_query = st.text_input("Search topics by title", "")
    search_needle = search_query.lower()
    filtered_topics = [
        topic
        for topic in topics
        if search_needle in (topic["title"] or "").lower()
    ]

    if not filtered_topics:
        st.warning("No topics match your search query.")
    else:
        st.write(f"Showing {len(filtered_topics)} of {len(topics)} topics")

        for topic in filtered_topics:
            group_id = topic["group_id"]
            with st.expander(f"**{topic['title']}** ({group_id})", expanded=False):
                col1, col2 = st.columns([1, 3])
                item_ids = topic["parsed_item_ids"]

                with col1:
                    st.write("**Metadata:**")
                    st.write(f"- Brand: `{topic['brand']}`")
                    st.write(f"- Content Type: `{topic['content_type']}`")
                    st.write(f"- Created: `{topic['created_at']}`")
                    if item_ids is None:
                        item_ids = []
                        st.error("Error parsing item IDs")
                    else:
                        st.write(f"- **Items:** {len(item_ids)}")

                with col2:
                    st.write("**Content Items:**")
                    if item_ids:
                        # Numbered list with the title followed by the raw ID.
                        for i, item_id in enumerate(item_ids, 1):
                            title = content_titles.get(str(item_id), "Title not found")
                            st.write(f"{i}. **{title}**")
                            st.write(f" `ID: {item_id}`")
                    else:
                        st.write("No items found")

                # Feedback section. Widget keys include the group_id so that
                # each topic gets its own independent set of widgets.
                st.divider()
                st.write("**Reviewer Feedback:**")
                feedback_col1, feedback_col2 = st.columns([1, 2])

                with feedback_col1:
                    accepted = st.checkbox(
                        "✓ Accept this topic",
                        value=bool(topic["accepted"]),
                        key=f"accept_{group_id}",
                    )

                with feedback_col2:
                    suggested_title = st.text_input(
                        "Suggested alternate title (optional)",
                        value=topic["suggested_title"] or "",
                        key=f"title_{group_id}",
                    )
                    reviewer_comments = st.text_area(
                        "Comments (optional)",
                        value=topic["reviewer_comments"] or "",
                        height=100,
                        key=f"comments_{group_id}",
                    )

                if st.button("💾 Save Feedback", key=f"save_{group_id}"):
                    # NOTE(review): ALTER TABLE ... UPDATE is a ClickHouse
                    # mutation; by default these are applied asynchronously,
                    # so the re-run below may briefly show the old values.
                    # Consider the mutations_sync setting - confirm with the
                    # ClickHouse documentation.
                    update_query = f"""
                        ALTER TABLE {table_name}
                        UPDATE
                            accepted = %(accepted)s,
                            suggested_title = %(suggested_title)s,
                            reviewer_comments = %(reviewer_comments)s
                        WHERE group_id = %(group_id)s
                    """
                    try:
                        # A fresh client is used for the update operation.
                        update_client = get_clickhouse_client()
                        update_client.command(
                            update_query,
                            parameters={
                                # Empty text fields are stored as NULL.
                                "accepted": accepted,
                                "suggested_title": suggested_title or None,
                                "reviewer_comments": reviewer_comments or None,
                                "group_id": group_id,
                            },
                        )
                    except Exception as e:
                        st.error(f"Error saving feedback: {e}")
                    else:
                        st.success("✓ Feedback saved!")
                        st.rerun()  # Refresh to show updated data

except Exception as e:
    st.error(f"Error loading data from ClickHouse: {e}")
    st.info("Troubleshooting steps:")
    st.markdown(f"""
1. **Verify the table exists**: Check that `{table_name}` exists in ClickHouse
2. **Check authentication**: Make sure your CH_PASS environment variable is set correctly
3. **Verify database**: Confirm the database `{CH_DATABASE}` exists
4. **Test connection**: Try connecting to ClickHouse directly to verify credentials
If the table doesn't exist or is empty, you can:
- Run `python transfer.py` to copy data from HuggingFace to ClickHouse
- Or run `python gemini_topics_clickhouse.py` to generate new topics directly to ClickHouse
(Make sure to set `brand = "{brand}"` and `content_type = "{content_type}"` in the script first)
""")