Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import os | |
| import io | |
| import requests | |
| from lxml import etree | |
| from converter import ModsConverter | |
| from urllib.parse import quote, unquote | |
# Browser-tab title/icon and overall page layout for the app.
# NOTE(review): the page_icon glyph looks like a mis-decoded emoji (only the
# lead byte survived) - confirm the intended icon against the original file.
_PAGE_SETTINGS = {
    "page_title": "PubTypeConverter | DORA Tools",
    "page_icon": "π",
    "layout": "wide",
    "initial_sidebar_state": "collapsed",
}
st.set_page_config(**_PAGE_SETTINGS)
# Custom CSS for a modern, premium look.
# The stylesheet and the title banner are kept as two named pieces and sent to
# Streamlit in a single st.markdown call so they render as one HTML fragment.
_THEME_CSS = """
<style>
@import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;600;700&display=swap');
html, body, [class*="css"] {
    font-family: 'Inter', sans-serif;
}
.stApp {
    background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
}
/* Premium Header */
.title-container {
    padding: 2rem 0;
    text-align: center;
    background: rgba(255, 255, 255, 0.4);
    backdrop-filter: blur(10px);
    border-radius: 20px;
    margin-bottom: 2rem;
    border: 1px solid rgba(255, 255, 255, 0.5);
    box-shadow: 0 8px 32px 0 rgba(31, 38, 135, 0.07);
}
.main-title {
    font-size: 3rem;
    font-weight: 700;
    background: linear-gradient(90deg, #1e3a8a, #3b82f6);
    -webkit-background-clip: text;
    -webkit-text-fill-color: transparent;
    margin-bottom: 0.5rem;
}
.sub-title {
    color: #64748b;
    font-size: 1.1rem;
}
/* Section Styling */
.stSelectbox, .stTextInput, .stButton {
    margin-bottom: 1rem;
}
/* Card-like containers for results */
.result-card {
    background: white;
    padding: 1.5rem;
    border-radius: 15px;
    box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);
    margin-bottom: 1.5rem;
    border-left: 5px solid #3b82f6;
}
/* Step indicators */
.step-header {
    font-weight: 600;
    color: #1e293b;
    margin-bottom: 1rem;
    display: flex;
    align-items: center;
    gap: 0.5rem;
}
.step-number {
    background: #3b82f6;
    color: white;
    width: 24px;
    height: 24px;
    border-radius: 50%;
    display: flex;
    align-items: center;
    justify-content: center;
    font-size: 0.8rem;
}
</style>
"""

_TITLE_BANNER = """<div class="title-container">
    <div class="main-title">DORA PubTypeConverter</div>
    <div class="sub-title">DORA Publication Type Transformation Helper</div>
</div>
"""

# unsafe_allow_html is required: the payload is raw <style>/<div> markup.
st.markdown(_THEME_CSS + _TITLE_BANNER, unsafe_allow_html=True)
# Setup paths
# Everything is resolved relative to this script's own directory, so the app
# does not depend on the current working directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Bundled resources: the converter configuration and the MODS templates both
# live under the same resource folder.
RESOURCE_DIR = os.path.join(BASE_DIR, "PubTypeConverter_resources")
CONFIG_FILE = os.path.join(RESOURCE_DIR, "PubTypeConverterConfig.xml")
TEMPLATE_DIR = os.path.join(RESOURCE_DIR, "PubTypeConverter_templates")
# Initialize Session State
# The converter is built once per session and stored in session state; the
# block is skipped on reruns where the key already exists.
if 'converter' not in st.session_state:
    converter = ModsConverter()
    if not os.path.exists(CONFIG_FILE):
        # Without the config file the converter is still stored, just unconfigured.
        st.warning(f"Configuration file not found at {CONFIG_FILE}. Content moving rules will not be applied.")
    else:
        converter.load_config(CONFIG_FILE)
    st.session_state['converter'] = converter

# Records pulled from DORA are kept here across reruns.
# Each entry is a dict: {'name': str, 'content': bytes}
if 'loaded_files' not in st.session_state:
    st.session_state['loaded_files'] = []
# Load templates
# Collect the names of all *.xml files (extension matched case-insensitively)
# in the template directory, sorted; report an error and fall back to an empty
# list when the directory does not exist.
if os.path.exists(TEMPLATE_DIR):
    templates = sorted(
        entry for entry in os.listdir(TEMPLATE_DIR)
        if entry.lower().endswith(".xml")
    )
else:
    st.error(f"Template directory not found: {TEMPLATE_DIR}")
    templates = []
# Helper function to fetch from DORA
def fetch_from_dora(pid_or_url, repo):
    """Fetch a MODS record from DORA, given either a PID or a full URL.

    The input is stripped and URL-decoded repeatedly until it stops changing,
    so double-encoded PIDs (``psi%253A84411`` -> ``psi%3A84411`` ->
    ``psi:84411``) are accepted.

    * Input starting with ``http://`` or ``https://`` is requested as given;
      no fallback endpoints are tried.
    * Anything else is treated as a PID. A ``namespace:id`` value uses its own
      lower-cased namespace as the repository (overriding ``repo``); a bare id
      is combined with ``repo``. Three endpoints are then tried in order: the
      admin host, the public www host, and the OAI-PMH ``GetRecord`` endpoint.

    Args:
        pid_or_url: PID such as ``"psi:84411"`` or ``"84411"``, or a full URL.
        repo: Repository key used when the PID carries no namespace.

    Returns:
        ``(content, source_url)`` on success. ``content`` is the raw response
        body, or for the OAI endpoint the serialized first ``mods`` element.
        ``(None, message)`` on failure; ``message`` lists one error per
        endpoint that was tried.
    """
    target = (pid_or_url or "").strip()
    # Unquote until the string stabilizes to undo multiple encoding layers.
    previous = None
    while target != previous:
        previous = target
        target = unquote(target)
    target = target.strip()
    if not target:
        # Nothing to look up: fail fast instead of requesting an empty PID.
        return None, "Failed to fetch. Details: No PID or URL provided"

    headers = {'User-Agent': 'curl/7.68.0'}
    timeout = 10

    def check_oai_response(content):
        """Return the first ``mods`` element serialized as UTF-8 bytes.

        Returns None when the payload is not parseable XML, contains an OAI
        ``error`` element, or contains no ``mods`` element.
        """
        try:
            root = etree.fromstring(content)
        except (etree.XMLSyntaxError, ValueError):
            return None
        # local-name() matches regardless of namespace/prefix.
        if root.xpath(".//*[local-name()='error']"):
            return None
        mods_nodes = root.xpath(".//*[local-name()='mods']")
        if not mods_nodes:
            return None
        return etree.tostring(mods_nodes[0], encoding='utf-8')

    # Each attempt is (label for error messages, URL, is OAI-PMH envelope).
    if target.lower().startswith(("http://", "https://")):
        attempts = [("URL", target, False)]
    else:
        if ":" in target:
            # partition keeps everything after the first colon intact.
            namespace, _, local_id = target.partition(":")
            repo = namespace.lower()
            pid_val = f"{repo}:{local_id}"
        else:
            pid_val = f"{repo}:{target}"
        # URL Encode the PID part - essential for Islandora
        quoted_pid = quote(pid_val)
        attempts = [
            # Primary: Admin (Intranet, preferred)
            ("Admin", f"https://admin.dora.lib4ri.ch/{repo}/islandora/object/{quoted_pid}/datastream/MODS/view", False),
            # Fallback 1: WWW (Public mirror)
            ("WWW", f"https://www.dora.lib4ri.ch/{repo}/islandora/object/{quoted_pid}/datastream/MODS/view", False),
            # Fallback 2: OAI-PMH (Robust Public Access)
            ("OAI", f"https://www.dora.lib4ri.ch/{repo}/oai2/request?verb=GetRecord&metadataPrefix=mods&identifier={quoted_pid}", True),
        ]

    status_errors = []
    for label, candidate, is_oai in attempts:
        try:
            response = requests.get(candidate, headers=headers, timeout=timeout)
            if not is_oai:
                response.raise_for_status()
        except requests.RequestException as exc:
            status_errors.append(f"{label}: {exc}")
            continue

        if not is_oai:
            return response.content, candidate

        # OAI answers with an XML envelope; the MODS record must be extracted.
        if response.status_code != 200:
            status_errors.append(f"OAI: HTTP {response.status_code}")
            continue
        mods_content = check_oai_response(response.content)
        if mods_content:
            return mods_content, candidate
        status_errors.append("OAI: Valid HTTP but no MODS found in response")

    return None, f"Failed to fetch. Details: {'; '.join(status_errors)}"
# UI Layout
# Left column (step 1): choose source records, either pulled from DORA or
# uploaded. Right column (step 2): choose the target template and start.
# NOTE(review): SOURCE lost its indentation; the nesting below is reconstructed
# from control flow - confirm that the "Loaded Documents" list belongs inside
# the left column.
# NOTE(review): the remaining "π" label prefixes look like mis-decoded emoji
# whose identity cannot be recovered from SOURCE - confirm the intended icons.
main_col1, main_col2 = st.columns([0.6, 0.4], gap="large")

with main_col1:
    st.markdown('<div class="step-header"><div class="step-number">1</div><span>Select Source Data</span></div>', unsafe_allow_html=True)
    input_tab1, input_tab2 = st.tabs(["π Pull from DORA", "π Upload Local XML"])

    with input_tab1:
        dora_col1, dora_col2 = st.columns([0.7, 0.3])
        dora_input = dora_col1.text_input("PID or URL", placeholder="e.g. psi:84411", label_visibility="collapsed")
        repo_select = dora_col2.selectbox("Repo", ["psi", "eawag", "empa", "wsl"], label_visibility="collapsed")
        if st.button("Fetch and Load Record", use_container_width=True):
            # Strip first so whitespace-only input counts as empty and a
            # pasted URL with leading spaces is still recognised as a URL.
            dora_query = (dora_input or "").strip()
            if dora_query:
                with st.spinner("Retrieving from DORA..."):
                    content, error_or_url = fetch_from_dora(dora_query, repo_select)
                if content:
                    if dora_query.startswith("http"):
                        # A URL makes a poor file name; use a generic one.
                        filename = "dora_record.xml"
                    else:
                        filename = dora_query.replace(":", "_").replace("/", "_") + ".xml"
                    st.session_state.loaded_files.append({"name": filename, "content": content, "source": error_or_url})
                    # The icon must be a single valid emoji.
                    st.toast(f"Loaded {filename}", icon="✅")
                else:
                    st.error(f"Fetch failed: {error_or_url}")
            else:
                st.warning("Please provide a identifier first.")

    with input_tab2:
        uploaded_files = st.file_uploader("Upload MODS XML files", type=['xml'], accept_multiple_files=True, label_visibility="collapsed")

    # Display loaded files in a modern list
    if st.session_state.loaded_files:
        st.markdown("### Loaded Documents")
        for i, file_data in enumerate(st.session_state.loaded_files):
            with st.container():
                f_col1, f_col2 = st.columns([0.85, 0.15])
                f_col1.markdown(f"π **{file_data['name']}**")
                if f_col2.button("🗑️", key=f"remove_{i}", help="Remove this file"):
                    # Rerun immediately so the loop never continues over the
                    # list it has just modified.
                    st.session_state.loaded_files.pop(i)
                    st.rerun()

with main_col2:
    st.markdown('<div class="step-header"><div class="step-number">2</div><span>Target Format</span></div>', unsafe_allow_html=True)
    selected_template = st.selectbox("Choose the destination publication type", templates if templates else ["No templates found"], label_visibility="collapsed")
    st.markdown("---")
    # The button is disabled until there is at least one source and at least
    # one template. It returns True only on the run triggered by the click, so
    # the flag is reset to False on every other rerun.
    st.session_state.start_convert = st.button(
        "π Start Conversion",
        disabled=not (uploaded_files or st.session_state.loaded_files) or not templates,
        use_container_width=True,
        type="primary",
    )
# Combine sources
# Local imports: this section is the only user of these stdlib modules.
import html
import traceback

# Uploaded files first, then records pulled from DORA; both end up as dicts
# with at least 'name' and 'content'.
all_files = [{"name": f.name, "content": f.getvalue()} for f in (uploaded_files or [])]
all_files.extend(st.session_state.loaded_files)

# NOTE(review): SOURCE lost its indentation; the nesting below is reconstructed
# from control flow - confirm whether the warnings belong inside the expander.
if st.session_state.get("start_convert"):
    st.markdown('<div class="step-header"><div class="step-number">3</div><span>Conversion Reports</span></div>', unsafe_allow_html=True)
    # The same template applies to every file, so resolve it once.
    template_path = os.path.join(TEMPLATE_DIR, selected_template)

    for file_index, file_data in enumerate(all_files):
        content = file_data['content']
        filename = file_data['name']
        try:
            # result_xml is the XML string, log is the structured data dict
            result_xml, log_data = st.session_state.converter.convert(content, template_path)

            # File names and genre values come from user-supplied data and are
            # rendered with unsafe_allow_html, so escape them first.
            safe_name = html.escape(str(filename))
            old_genre = html.escape(str(log_data['old_genre']))
            new_genre = html.escape(str(log_data['new_genre']))

            with st.container():
                st.markdown(f"""
                <div class="result-card">
                  <div style="display: flex; justify-content: space-between; align-items: center; margin-bottom: 1rem;">
                    <span style="font-weight: 700; font-size: 1.2rem; color: #1e3a8a;">{safe_name}</span>
                    <span style="background: #dbeafe; color: #1e40af; padding: 0.2rem 0.8rem; border-radius: 999px; font-size: 0.8rem; font-weight: 600;">TRANSFORMED</span>
                  </div>
                  <div style="display: flex; align-items: center; gap: 1rem; margin-bottom: 1.5rem;">
                    <span style="background: #f1f5f9; padding: 0.4rem 1rem; border-radius: 8px; font-size: 0.9rem; border: 1px solid #e2e8f0;">{old_genre}</span>
                    <span style="color: #94a3b8;">➡️</span>
                    <span style="background: #ecfdf5; color: #065f46; padding: 0.4rem 1rem; border-radius: 8px; font-size: 0.9rem; border: 1px solid #d1fae5; font-weight: 600;">{new_genre}</span>
                  </div>
                """, unsafe_allow_html=True)

                s_col1, s_col2, s_col3 = st.columns(3)
                s_col1.metric("Transfers", len(log_data["moves"]))
                s_col2.metric("Additions", len(log_data["additions"]))
                s_col3.metric("Removals", len(log_data["deletions"]))

                with st.expander("Audit Transformation Details", expanded=False):
                    if log_data["moves"]:
                        # NOTE(review): the "π" glyph looks like a mis-decoded
                        # emoji - confirm the intended icon.
                        st.write("**π Content Transfers**")
                        for m in log_data["moves"]:
                            st.caption(f"• {m['summary']}")
                    if log_data["additions"]:
                        st.write("**✨ Smart Additions**")
                        for a in log_data["additions"]:
                            st.caption(f"• {a['summary']}")
                    if log_data["deletions"]:
                        st.write("**🗑️ Legacy Cleanup**")
                        del_labels = [d['label'] for d in log_data["deletions"]]
                        # Show at most eight labels; an ellipsis marks the rest.
                        st.caption(f"Removed {len(del_labels)} unused fields: " + ", ".join(del_labels[:8]) + ("..." if len(del_labels) > 8 else ""))

                for w in log_data["warnings"] or []:
                    st.warning(w)

                if result_xml:
                    st.download_button(
                        label=f"⬇️ Download {filename}",
                        data=result_xml,
                        file_name=f"{os.path.splitext(filename)[0]}_converted.xml",
                        mime="application/xml",
                        # The index keeps keys unique when two records share a
                        # name (e.g. several URL loads named dora_record.xml).
                        key=f"dl_{file_index}_{filename}",
                        use_container_width=True
                    )
                st.markdown('</div>', unsafe_allow_html=True)
        except Exception as e:
            # UI boundary: report the failure for this file and carry on with
            # the remaining files.
            st.error(f"Error converting {filename}: {e}")
            st.code(traceback.format_exc())