"""Streamlit UI for the DORA PubTypeConverter.

Loads MODS XML records (fetched from DORA or uploaded locally), runs them
through ``ModsConverter`` against a selected template, and offers the
converted XML for download together with a change report.

NOTE(review): the statement nesting below was reconstructed from a flattened
copy of this script, and the markdown strings are reproduced exactly as they
appeared there -- confirm widget placement and markup against the deployed app.
"""

import html
import io
import os
import traceback
from urllib.parse import quote, unquote

import requests
import streamlit as st
from lxml import etree

from converter import ModsConverter

st.set_page_config(
    page_title="PubTypeConverter | DORA Tools",
    page_icon="🔄",
    layout="wide",
    initial_sidebar_state="collapsed"
)

# Custom CSS for a modern, premium look
st.markdown("""
DORA PubTypeConverter
DORA Publication Type Transformation Helper
""", unsafe_allow_html=True)

# Setup paths
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
RESOURCE_DIR = os.path.join(BASE_DIR, "PubTypeConverter_resources")
TEMPLATE_DIR = os.path.join(RESOURCE_DIR, "PubTypeConverter_templates")
CONFIG_FILE = os.path.join(RESOURCE_DIR, "PubTypeConverterConfig.xml")

# Settings shared by every HTTP request made to DORA
REQUEST_HEADERS = {'User-Agent': 'curl/7.68.0'}
REQUEST_TIMEOUT = 10  # seconds

# Initialize Session State
if 'converter' not in st.session_state:
    converter = ModsConverter()
    if os.path.exists(CONFIG_FILE):
        converter.load_config(CONFIG_FILE)
    else:
        st.warning(f"Configuration file not found at {CONFIG_FILE}. Content moving rules will not be applied.")
    st.session_state.converter = converter

if 'loaded_files' not in st.session_state:
    # List of dicts: {'name': str, 'content': bytes, 'source': str}
    st.session_state.loaded_files = []

# Load templates
if os.path.isdir(TEMPLATE_DIR):
    templates = sorted(
        name for name in os.listdir(TEMPLATE_DIR) if name.lower().endswith(".xml")
    )
else:
    templates = []
    st.error(f"Template directory not found: {TEMPLATE_DIR}")


def _fully_unquote(value):
    """Percent-decode *value* repeatedly until it no longer changes.

    Handles double-encoded PIDs (e.g. psi%253A84411 -> psi%3A84411 -> psi:84411).
    """
    previous = None
    while value != previous:
        previous = value
        value = unquote(value)
    return value


def _extract_mods_from_oai(content):
    """Return the first ``mods`` element of an OAI-PMH response as UTF-8 bytes.

    Returns ``None`` when the payload is not parseable XML, carries an OAI
    ``error`` element, or contains no ``mods`` element.
    """
    try:
        root = etree.fromstring(content)
        # An OAI error element means the record could not be served
        if root.xpath(".//*[local-name()='error']"):
            return None
        # Match on local-name so the lookup works for any namespace/prefix
        mods_nodes = root.xpath(".//*[local-name()='mods']")
        if mods_nodes:
            return etree.tostring(mods_nodes[0], encoding='utf-8')
    except (etree.LxmlError, ValueError):
        # Best effort: an unparseable payload is treated as "no MODS found"
        return None
    return None


def _candidate_sources(identifier, repo):
    """Build the ordered list of ``(label, url, is_oai)`` sources to try.

    A value starting with "http" is used as-is with no fallbacks. Anything
    else is treated as a PID; a "prefix:id" PID overrides *repo* with its
    lower-cased prefix.
    """
    if identifier.startswith("http"):
        return [("Admin", identifier, False)]

    if ":" in identifier:
        # Split on the first colon only so nothing after it is silently dropped
        prefix, _, local_id = identifier.partition(":")
        repo = prefix.lower()  # Force prefix to lowercase for repo mapping
        pid_val = f"{repo}:{local_id}"
    else:
        # Use selected repo
        pid_val = f"{repo}:{identifier}"

    # URL Encode the PID part - essential for Islandora
    quoted_pid = quote(pid_val)
    return [
        # Primary: Admin (Intranet, preferred)
        ("Admin", f"https://admin.dora.lib4ri.ch/{repo}/islandora/object/{quoted_pid}/datastream/MODS/view", False),
        # Fallback 1: WWW (Public mirror)
        ("WWW", f"https://www.dora.lib4ri.ch/{repo}/islandora/object/{quoted_pid}/datastream/MODS/view", False),
        # Fallback 2: OAI-PMH (Robust Public Access)
        ("OAI", f"https://www.dora.lib4ri.ch/{repo}/oai2/request?verb=GetRecord&metadataPrefix=mods&identifier={quoted_pid}", True),
    ]


# Helper function to fetch from DORA
def fetch_from_dora(pid_or_url, repo):
    """Fetch a MODS record from DORA by PID or by direct URL.

    Args:
        pid_or_url: A PID ("psi:12345" or just "12345", optionally
            percent-encoded) or a full URL starting with "http".
        repo: Repository used when the PID carries no prefix of its own.

    Returns:
        ``(content, url)`` on success, where *content* is the MODS XML as
        bytes and *url* is the source that delivered it; otherwise
        ``(None, message)`` with the per-source failure details.
    """
    identifier = _fully_unquote(pid_or_url.strip())
    status_errors = []

    for label, url, is_oai in _candidate_sources(identifier, repo):
        try:
            response = requests.get(url, headers=REQUEST_HEADERS, timeout=REQUEST_TIMEOUT)
            if is_oai:
                if response.status_code != 200:
                    status_errors.append(f"OAI: HTTP {response.status_code}")
                    continue
                mods_content = _extract_mods_from_oai(response.content)
                if mods_content:
                    return mods_content, url
                status_errors.append("OAI: Valid HTTP but no MODS found in response")
                continue

            response.raise_for_status()
            if response.content:
                return response.content, url
            # An empty body would look like a failure to the caller, which
            # would then display the URL as the error text; record it here.
            status_errors.append(f"{label}: empty response")
        except (requests.RequestException, ValueError) as exc:
            status_errors.append(f"{label}: {exc}")

    return None, f"Failed to fetch. Details: {'; '.join(status_errors)}"


# UI Layout
main_col1, main_col2 = st.columns([0.6, 0.4], gap="large")

with main_col1:
    st.markdown('\n1\nSelect Source Data\n', unsafe_allow_html=True)

    input_tab1, input_tab2 = st.tabs(["🌐 Pull from DORA", "📁 Upload Local XML"])

    with input_tab1:
        dora_col1, dora_col2 = st.columns([0.7, 0.3])
        dora_input = dora_col1.text_input("PID or URL", placeholder="e.g. psi:84411", label_visibility="collapsed")
        repo_select = dora_col2.selectbox("Repo", ["psi", "eawag", "empa", "wsl"], label_visibility="collapsed")

        if st.button("Fetch and Load Record", use_container_width=True):
            # Strip first so a whitespace-only entry counts as "nothing entered"
            identifier = dora_input.strip()
            if identifier:
                with st.spinner("Retrieving from DORA..."):
                    content, error_or_url = fetch_from_dora(identifier, repo_select)
                if content:
                    filename = identifier.replace(":", "_").replace("/", "_") + ".xml"
                    if filename.startswith("http"):
                        # A full URL makes a poor file name; use a generic one
                        filename = "dora_record.xml"
                    st.session_state.loaded_files.append({"name": filename, "content": content, "source": error_or_url})
                    st.toast(f"Loaded {filename}", icon="✅")
                else:
                    st.error(f"Fetch failed: {error_or_url}")
            else:
                st.warning("Please provide a identifier first.")

    with input_tab2:
        uploaded_files = st.file_uploader("Upload MODS XML files", type=['xml'], accept_multiple_files=True, label_visibility="collapsed")

    # Display loaded files in a modern list
    if st.session_state.loaded_files:
        st.markdown("### Loaded Documents")
        for i, file_data in enumerate(st.session_state.loaded_files):
            with st.container():
                f_col1, f_col2 = st.columns([0.85, 0.15])
                f_col1.markdown(f"📄 **{file_data['name']}**")
                if f_col2.button("🗑️", key=f"remove_{i}", help="Remove this file"):
                    st.session_state.loaded_files.pop(i)
                    # Rerun immediately so the list is redrawn without the entry
                    st.rerun()

with main_col2:
    st.markdown('\n2\nTarget Format\n', unsafe_allow_html=True)
    selected_template = st.selectbox("Choose the destination publication type", templates if templates else ["No templates found"], label_visibility="collapsed")

    st.markdown("---")

    # NOTE(review): the flag is overwritten on every script run, so it is True
    # only for the run triggered by the button click itself -- confirm that
    # hiding the reports on the next widget interaction is intended.
    st.session_state.start_convert = st.button(
        "🚀 Start Conversion",
        disabled=not (uploaded_files or st.session_state.loaded_files) or not templates,
        use_container_width=True,
        type="primary",
    )

# Combine sources
all_files = []
if uploaded_files:
    all_files.extend({"name": f.name, "content": f.getvalue()} for f in uploaded_files)
if st.session_state.loaded_files:
    all_files.extend(st.session_state.loaded_files)

if st.session_state.get("start_convert"):
    st.markdown('\n3\nConversion Reports\n', unsafe_allow_html=True)

    # The template is the same for every file, so resolve it once
    template_path = os.path.join(TEMPLATE_DIR, selected_template)

    for idx, file_data in enumerate(all_files):
        content = file_data['content']
        filename = file_data['name']

        try:
            # result_xml is the XML string, log is the structured data dict
            result_xml, log_data = st.session_state.converter.convert(content, template_path)

            # These values originate from user-supplied files and are rendered
            # with unsafe_allow_html=True, so escape them before interpolation.
            safe_filename = html.escape(str(filename))
            safe_old_genre = html.escape(str(log_data['old_genre']))
            safe_new_genre = html.escape(str(log_data['new_genre']))

            with st.container():
                st.markdown(f"""
{safe_filename} TRANSFORMED
{safe_old_genre} ➡️ {safe_new_genre}
""", unsafe_allow_html=True)

                s_col1, s_col2, s_col3 = st.columns(3)
                s_col1.metric("Transfers", len(log_data["moves"]))
                s_col2.metric("Additions", len(log_data["additions"]))
                s_col3.metric("Removals", len(log_data["deletions"]))

                with st.expander("Audit Transformation Details", expanded=False):
                    if log_data["moves"]:
                        st.write("**🔄 Content Transfers**")
                        for m in log_data["moves"]:
                            st.caption(f"• {m['summary']}")

                    if log_data["additions"]:
                        st.write("**✨ Smart Additions**")
                        for a in log_data["additions"]:
                            st.caption(f"• {a['summary']}")

                    if log_data["deletions"]:
                        st.write("**🗑️ Legacy Cleanup**")
                        del_labels = [d['label'] for d in log_data["deletions"]]
                        st.caption(f"Removed {len(del_labels)} unused fields: " + ", ".join(del_labels[:8]) + ("..." if len(del_labels) > 8 else ""))

                    if log_data["warnings"]:
                        for w in log_data["warnings"]:
                            st.warning(w)

                if result_xml:
                    st.download_button(
                        label=f"⬇️ Download {filename}",
                        data=result_xml,
                        file_name=f"{os.path.splitext(filename)[0]}_converted.xml",
                        mime="application/xml",
                        # Include the index: two entries may share a file name
                        # and widget keys must be unique within a run.
                        key=f"dl_{idx}_{filename}",
                        use_container_width=True
                    )

                st.markdown('\n', unsafe_allow_html=True)

        except Exception as e:
            # UI boundary: report the failure for this file and keep going
            st.error(f"Error converting {filename}: {e}")
            st.code(traceback.format_exc())