| """ |
| Core shared utilities for the Nymbo-Tools MCP server. |
| |
| Consolidates three key areas: |
| 1. Sandboxed filesystem operations (path resolution, reading, writing, safe_open) |
| 2. Sandboxed Python execution (code interpreter, agent terminal) |
| 3. Hugging Face inference utilities (token, providers, error handling) |
| """ |
|
|
| from __future__ import annotations |
|
|
| import ast |
| import json |
| import os |
| import re |
| import stat |
| import sys |
| from datetime import datetime |
| from io import StringIO |
| from typing import Any, Callable, Optional, TypeVar |
|
|
| import gradio as gr |
|
|
|
|
| |
| |
| |
|
|
|
|
def _fmt_size(num_bytes: int) -> str:
    """Render a byte count as a short human-readable string (one decimal place).

    The value is divided by 1024 until it drops below 1024 or the "GB" unit has
    been passed; anything larger is reported in TB.
    """
    suffixes = ("B", "KB", "MB", "GB")
    value = float(num_bytes)
    index = 0
    # "not value < 1024.0" (rather than ">=") keeps NaN flowing through every unit.
    while index < len(suffixes) and not value < 1024.0:
        value /= 1024.0
        index += 1
    label = suffixes[index] if index < len(suffixes) else "TB"
    return f"{value:.1f} {label}"
|
|
|
|
def build_tree(entries: list[tuple[str, dict]]) -> dict:
    """
    Turn a flat list of slash-separated paths into a nested dict tree.

    Args:
        entries: (path, metadata) pairs. A path with a trailing '/' denotes a
            directory; anything else is a file.

    Returns:
        A dict per directory level. Sub-directories are stored under their own
        name, files are collected as (name, metadata) tuples under "__files__",
        and non-empty directory metadata is stored under "__meta__".
    """
    tree: dict = {"__files__": []}

    for raw_path, meta in entries:
        *parents, leaf = raw_path.rstrip("/").split("/")

        # Descend through the parent directories, creating them on demand.
        cursor = tree
        for segment in parents:
            cursor = cursor.setdefault(segment, {"__files__": []})

        if not raw_path.endswith("/"):
            cursor["__files__"].append((leaf, meta))
            continue

        cursor.setdefault(leaf, {"__files__": []})
        if meta:
            cursor[leaf]["__meta__"] = meta

    return tree
|
|
|
|
def render_tree(
    node: dict,
    prefix: str = "",
    format_entry: Optional[Callable[[str, dict, bool], str]] = None,
) -> list[str]:
    """
    Render a tree with line connectors.

    Directories are listed first (sorted by name), followed by files (sorted by
    name). Each directory is followed by its own recursively rendered content.

    Args:
        node: Nested dict from build_tree()
        prefix: Current line prefix for indentation
        format_entry: Optional callback (name, metadata, is_dir) -> label used
            to format each entry. When omitted, directories render as "name/"
            and files as "name (size)" if a "size" is present in the metadata.

    Returns:
        List of formatted lines.
    """

    def default_format(name: str, meta: dict, is_dir: bool) -> str:
        if is_dir:
            return f"{name}/"
        # Tolerate entries whose metadata is None instead of a dict.
        size = (meta or {}).get("size")
        if size is not None:
            return f"{name} ({_fmt_size(size)})"
        return name

    fmt = format_entry or default_format

    subdirs = sorted(k for k in node if k not in ("__files__", "__meta__"))
    files_here = sorted(node.get("__files__", []), key=lambda item: item[0])

    # (name, subtree-or-None, metadata); a subtree marks the entry as a directory.
    entries: list[tuple[str, Optional[dict], dict]] = [
        (name, node[name], node[name].get("__meta__", {})) for name in subdirs
    ]
    entries.extend((name, None, meta) for name, meta in files_here)

    lines: list[str] = []
    last_index = len(entries) - 1
    for index, (name, subtree, meta) in enumerate(entries):
        is_last = index == last_index
        connector = "└── " if is_last else "├── "
        is_dir = subtree is not None
        lines.append(f"{prefix}{connector}{fmt(name, meta, is_dir)}")

        if is_dir:
            # The continuation prefix must be exactly as wide as the connector
            # (4 columns); otherwise children of last and non-last directories
            # end up in different columns.
            child_prefix = prefix + ("    " if is_last else "│   ")
            lines.extend(render_tree(subtree, child_prefix, format_entry))

    return lines
|
|
|
|
def walk_and_build_tree(
    abs_path: str,
    *,
    show_hidden: bool = False,
    recursive: bool = False,
    max_entries: int = 100,
) -> tuple[dict, int, bool]:
    """
    Walk a directory and build a tree structure.

    Args:
        abs_path: Directory to walk.
        show_hidden: Include names starting with '.' when True.
        recursive: Descend into sub-directories when True; otherwise only the
            top level is listed.
        max_entries: Maximum number of entries (directories plus files) to collect.

    Returns:
        (tree, total_entries, truncated) where ``tree`` comes from build_tree()
        and ``truncated`` is True only when at least one further entry existed
        beyond ``max_entries``.
    """
    entries: list[tuple[str, dict]] = []
    total = 0
    truncated = False
    stamp_format = "%Y-%m-%d %H:%M"

    for root, dirs, files in os.walk(abs_path):
        if not show_hidden:
            # In-place assignment so os.walk does not descend into hidden dirs.
            dirs[:] = [d for d in dirs if not d.startswith(".")]
            files = [f for f in files if not f.startswith(".")]

        dirs.sort()
        files.sort()

        try:
            rel_root = os.path.relpath(root, abs_path)
        except ValueError:
            rel_root = ""
        # Treat the fallback "" like "." so it cannot yield a bare "/" prefix,
        # which would create an empty-named node in the tree.
        prefix = "" if rel_root in (".", "") else rel_root.replace("\\", "/") + "/"

        pending = [(name, True) for name in dirs] + [(name, False) for name in files]
        for name, is_dir in pending:
            # Check the limit *before* adding: hitting exactly max_entries with
            # nothing left over is not a truncation.
            if total >= max_entries:
                truncated = True
                break

            full_path = os.path.join(root, name)
            if is_dir:
                try:
                    mtime = datetime.fromtimestamp(os.path.getmtime(full_path)).strftime(stamp_format)
                except (OSError, OverflowError, ValueError):
                    mtime = "?"
                entries.append((f"{prefix}{name}/", {"mtime": mtime}))
            else:
                try:
                    size = os.path.getsize(full_path)
                    mtime = datetime.fromtimestamp(os.path.getmtime(full_path)).strftime(stamp_format)
                except (OSError, OverflowError, ValueError):
                    # e.g. a broken symlink or a file removed mid-walk
                    size, mtime = 0, "?"
                entries.append((f"{prefix}{name}", {"size": size, "mtime": mtime}))
            total += 1

        if truncated or not recursive:
            break

    return build_tree(entries), total, truncated
|
|
|
|
def format_dir_listing(
    abs_path: str,
    display_path: str,
    *,
    show_hidden: bool = False,
    recursive: bool = False,
    max_entries: int = 100,
    fmt_size_fn: Optional[Callable[[int], str]] = None,
) -> str:
    """Produce a textual tree listing of a directory.

    The output starts with a header (listing path, root, entry count and an
    optional truncation notice), followed by a blank line, a root line and the
    rendered tree. File sizes are formatted with ``fmt_size_fn`` when given,
    otherwise with ``_fmt_size``.
    """
    size_formatter = fmt_size_fn if fmt_size_fn else _fmt_size

    tree, total, truncated = walk_and_build_tree(
        abs_path,
        show_hidden=show_hidden,
        recursive=recursive,
        max_entries=max_entries,
    )

    def format_entry(name: str, meta: dict, is_dir: bool) -> str:
        stamp = meta.get("mtime", "")
        if not is_dir:
            return f"{name} ({size_formatter(meta.get('size', 0))}, {stamp})"
        return f"{name}/ ({stamp})"

    header_parts = [f"Listing of {display_path}", "Root: /", f"Entries: {total}"]
    if truncated:
        header_parts.append(f"… Truncated at {max_entries} entries.")

    output = ["\n".join(header_parts), "", "└── /"]
    output += render_tree(tree, " ", format_entry)

    return "\n".join(output).strip()
|
|
|
|
| |
| |
| |
|
|
|
|
class SandboxedRoot:
    """
    A configurable sandboxed root directory with path resolution and safety checks.

    User-facing paths are expressed relative to the sandbox root, which is always
    reported as "/". The methods translate between those paths and absolute host
    paths, and return errors as JSON strings built by ``err()``.

    Args:
        root_dir: Path to the sandbox root (stored as an absolute path).
        allow_abs: If True, allow absolute paths outside the sandbox.
    """

    def __init__(self, root_dir: str, allow_abs: bool = False):
        self.root_dir = os.path.abspath(root_dir)
        self.allow_abs = allow_abs

        # Best effort only: a root that cannot be created surfaces later as
        # ordinary "not found" errors instead of failing at construction time.
        try:
            os.makedirs(self.root_dir, exist_ok=True)
        except OSError:
            pass

    def safe_err(self, exc: Exception | str) -> str:
        """Return an error string with any absolute root replaced by '/' and slashes normalized."""
        text = re.sub(r"/+", "/", str(exc).replace("\\", "/"))
        root_fwd = re.sub(r"/+", "/", self.root_dir.replace("\\", "/")).rstrip("/")
        if root_fwd:
            # Replace the root only where it ends at a path boundary, so a
            # sibling such as "/database" is left intact when the root is "/data".
            pattern = re.escape(root_fwd) + r"(?![\w\-]|\.\w)"
            text = re.sub(pattern, "/", text)
        return re.sub(r"/+", "/", text)

    def err(
        self,
        code: str,
        message: str,
        *,
        path: Optional[str] = None,
        hint: Optional[str] = None,
        data: Optional[dict] = None,
    ) -> str:
        """Return a structured error JSON string.

        The payload always has "status", "code", "message" and "root"; "path",
        "hint" and "data" are included only when non-empty.
        """
        payload = {
            "status": "error",
            "code": code,
            "message": message,
            "root": "/",
        }
        if path is not None and path != "":
            payload["path"] = path
        if hint:
            payload["hint"] = hint
        if data:
            payload["data"] = data
        return json.dumps(payload, ensure_ascii=False)

    def display_path(self, abs_path: str) -> str:
        """Return a user-friendly path relative to root using forward slashes.

        Paths outside the root (or that cannot be compared with it) are returned
        unchanged apart from backslashes being converted to forward slashes.
        """
        try:
            norm_root = os.path.normpath(self.root_dir)
            norm_abs = os.path.normpath(abs_path)
            common = os.path.commonpath([norm_root, norm_abs])
            if os.path.normcase(common) == os.path.normcase(norm_root):
                rel = os.path.relpath(norm_abs, norm_root)
                if rel == ".":
                    return "/"
                return "/" + rel.replace("\\", "/")
        except Exception:
            pass
        return abs_path.replace("\\", "/")

    def _is_within_root(self, abs_path: str) -> bool:
        """Return True when ``abs_path`` is the sandbox root or lies beneath it."""
        norm_root = os.path.normpath(self.root_dir)
        try:
            common = os.path.commonpath([norm_root, os.path.normpath(abs_path)])
        except Exception:
            # Fail closed: paths that cannot be compared (e.g. different drives)
            # are treated as outside the sandbox.
            return False
        return os.path.normcase(common) == os.path.normcase(norm_root)

    def resolve_path(self, path: str) -> tuple[str, str]:
        """
        Resolve a user-provided path to an absolute, normalized path constrained to root.
        Returns (abs_path, error_message). error_message is empty when ok.
        """
        try:
            user_input = (path or "/").strip() or "/"
            if user_input.startswith("/"):
                # A leading "/" means "relative to the sandbox root", not the host root.
                raw = os.path.expanduser(user_input.lstrip("/") or ".")
                treat_as_relative = True
            else:
                raw = os.path.expanduser(user_input)
                treat_as_relative = False

            if not treat_as_relative and os.path.isabs(raw):
                if not self.allow_abs:
                    return "", self.err(
                        "absolute_path_disabled",
                        "Absolute paths are disabled in safe mode.",
                        path=raw.replace("\\", "/"),
                        hint="Use a path relative to / (e.g., /notes/todo.txt).",
                    )
                abs_path = os.path.abspath(raw)
            else:
                abs_path = os.path.abspath(os.path.join(self.root_dir, raw))

            # NOTE(review): os.path.abspath does not resolve symlinks, so a
            # symlink located inside the root can still point outside of it.
            if not self.allow_abs and not self._is_within_root(abs_path):
                return "", self.err(
                    "path_outside_root",
                    "Path is outside the sandbox root.",
                    # Echo the caller's input rather than the resolved host path,
                    # which would expose the directory layout around the root.
                    path=user_input.replace("\\", "/"),
                )

            return abs_path, ""
        except Exception as exc:
            return "", self.err(
                "resolve_path_failed",
                "Failed to resolve path.",
                path=(path or ""),
                data={"error": self.safe_err(exc)},
            )

    def safe_open(self, file, *args, **kwargs):
        """A drop-in replacement for open() that enforces sandbox constraints.

        Integer file descriptors are passed straight to open(). Any other value
        is resolved through resolve_path().

        Raises:
            PermissionError: If the path is rejected by resolve_path().
        """
        if isinstance(file, int):
            return open(file, *args, **kwargs)

        path_str = os.fspath(file)
        abs_path, err = self.resolve_path(path_str)
        if err:
            try:
                msg = json.loads(err)["message"]
            except Exception:
                msg = err
            raise PermissionError(f"Sandboxed open() failed: {msg}")

        return open(abs_path, *args, **kwargs)

    def list_dir(
        self,
        abs_path: str,
        *,
        show_hidden: bool = False,
        recursive: bool = False,
        max_entries: int = 100,
    ) -> str:
        """List directory contents as a visual tree."""
        return format_dir_listing(
            abs_path,
            self.display_path(abs_path),
            show_hidden=show_hidden,
            recursive=recursive,
            max_entries=max_entries,
            fmt_size_fn=_fmt_size,
        )

    def search_text(
        self,
        abs_path: str,
        query: str,
        *,
        recursive: bool = False,
        show_hidden: bool = False,
        max_results: int = 20,
        case_sensitive: bool = False,
        start_index: int = 0,
    ) -> str:
        """Search for text within files.

        Scans a single file, or the files of a directory (optionally recursive),
        line by line for ``query`` as a plain substring.

        Args:
            abs_path: File or directory to search.
            query: Text to look for; must be non-empty.
            recursive: Descend into sub-directories when True.
            show_hidden: Include names starting with '.' when True.
            max_results: Maximum matches to return (at least 1).
            case_sensitive: Compare case-sensitively when True.
            start_index: Number of leading matches to skip (pagination cursor).

        Returns:
            A report with a header block, the numbered matches and up to five
            read warnings, or an error JSON string from ``err()``.
        """
        if not os.path.exists(abs_path):
            shown = self.display_path(abs_path)
            return self.err("path_not_found", f"Path not found: {shown}", path=shown)

        query = query or ""
        normalized_query = query if case_sensitive else query.lower()
        if normalized_query == "":
            return self.err(
                "missing_search_query",
                "Search query is required for the search action.",
                hint="Provide text in the Content field to search for.",
            )

        max_results = max(1, int(max_results) if max_results is not None else 20)
        start_index = max(0, int(start_index) if start_index is not None else 0)
        matches: list[tuple[str, int, str]] = []
        errors: list[str] = []
        files_scanned = 0
        truncated = False
        total_matches = 0

        def _handle_match(file_path: str, line_no: int, line_text: str) -> bool:
            """Record one match; return True once the result page is full."""
            nonlocal truncated, total_matches
            total_matches += 1
            if total_matches <= start_index:
                return False
            if len(matches) < max_results:
                snippet = line_text.strip()
                if len(snippet) > 200:
                    snippet = snippet[:197] + "…"
                matches.append((self.display_path(file_path), line_no, snippet))
                return False
            # One match beyond the page proves there is more to fetch.
            truncated = True
            return True

        def _search_file(file_path: str) -> bool:
            """Scan one file; return True when scanning should stop entirely."""
            nonlocal files_scanned
            files_scanned += 1
            try:
                with open(file_path, "r", encoding="utf-8", errors="replace") as handle:
                    for line_no, line in enumerate(handle, start=1):
                        haystack = line if case_sensitive else line.lower()
                        if normalized_query in haystack and _handle_match(file_path, line_no, line):
                            return True
            except Exception as exc:
                errors.append(f"{self.display_path(file_path)} ({self.safe_err(exc)})")
            return False

        if os.path.isfile(abs_path):
            _search_file(abs_path)
        else:
            for root, dirs, files in os.walk(abs_path):
                dirs[:] = [d for d in dirs if show_hidden or not d.startswith(".")]
                # Sorted traversal keeps the match order, and therefore the
                # start_index cursor, stable between calls.
                dirs.sort()
                for name in sorted(files):
                    if not show_hidden and name.startswith("."):
                        continue
                    if _search_file(os.path.join(root, name)):
                        break
                if truncated or not recursive:
                    break

        header_lines = [
            f"Search results for {query!r}",
            f"Scope: {self.display_path(abs_path)}",
            f"Recursive: {'yes' if recursive else 'no'}, Hidden: {'yes' if show_hidden else 'no'}, Case-sensitive: {'yes' if case_sensitive else 'no'}",
            f"Start offset: {start_index}",
            f"Matches returned: {len(matches)}" + (" (truncated)" if truncated else ""),
            f"Files scanned: {files_scanned}",
        ]

        if truncated:
            next_cursor = start_index + len(matches)
            header_lines.append(f"Matches encountered before truncation: {total_matches}")
            header_lines.append(f"Truncated: yes — re-run with offset={next_cursor} to continue.")
            header_lines.append(f"Next cursor: {next_cursor}")
        else:
            header_lines.append(f"Total matches found: {total_matches}")
            header_lines.append("Truncated: no — end of results.")
            header_lines.append("Next cursor: None")

        if matches:
            body_lines = [
                f"{idx}. {path}:{line_no}: {text}"
                for idx, (path, line_no, text) in enumerate(matches, start=1)
            ]
        elif total_matches > 0 and start_index >= total_matches:
            # Matches exist, but the requested offset skipped past all of them.
            body_lines = [
                f"No matches found at or after offset {start_index}. Total matches available: {total_matches}.",
                f"Try a smaller offset (≤ {total_matches - 1}).",
            ]
        else:
            body_lines = ["No matches found."]

        if errors:
            shown_errors = errors[:5]
            body_lines.extend(["", "Warnings:"])
            body_lines.extend(shown_errors)
            if len(errors) > len(shown_errors):
                body_lines.append(f"… {len(errors) - len(shown_errors)} additional files could not be read.")

        return "\n".join(header_lines) + "\n\n" + "\n".join(body_lines)

    def read_file(self, abs_path: str, *, offset: int = 0, max_chars: int = 4000) -> str:
        """Read file contents with optional offset and character limit.

        Args:
            abs_path: File to read (decoded as UTF-8 with replacement characters).
            offset: Character offset to start from; clamped to [0, len(file)].
            max_chars: Maximum characters to return; values <= 0 mean "to the end".

        Returns:
            A header (path, offset, counts and an optional next cursor), a
            separator and the requested chunk, or an error JSON string.
        """
        shown = self.display_path(abs_path)
        if not os.path.exists(abs_path):
            return self.err("file_not_found", f"File not found: {shown}", path=shown)
        if os.path.isdir(abs_path):
            return self.err(
                "is_directory",
                f"Path is a directory, not a file: {shown}",
                path=shown,
                hint="Provide a file path.",
            )
        try:
            with open(abs_path, "r", encoding="utf-8", errors="replace") as f:
                data = f.read()
        except Exception as exc:
            return self.err(
                "read_failed",
                "Failed to read file.",
                path=shown,
                data={"error": self.safe_err(exc)},
            )

        # Coerce like search_text() does: float or None values would otherwise
        # break the slice below.
        offset = int(offset) if offset is not None else 0
        max_chars = int(max_chars) if max_chars is not None else 4000

        total = len(data)
        start = max(0, min(offset, total))
        end = min(total, start + max_chars) if max_chars > 0 else total
        chunk = data[start:end]
        next_cursor = end if end < total else None
        header = (
            f"Reading {shown}\n"
            f"Offset {start}, returned {len(chunk)} of {total}."
            + (f"\nNext cursor: {next_cursor}" if next_cursor is not None else "")
        )
        sep = "\n\n---\n\n"
        return header + sep + chunk

    def info(self, abs_path: str) -> str:
        """Get file/directory metadata as JSON.

        Returns a JSON object with "path", "type", "size", "modified",
        "created", "mode" and "root", or an error JSON string if stat fails.
        """
        try:
            st = os.stat(abs_path)
        except Exception as exc:
            return self.err(
                "stat_failed",
                "Failed to stat path.",
                path=self.display_path(abs_path),
                data={"error": self.safe_err(exc)},
            )
        # Prefer the true creation time where the platform exposes it; st_ctime
        # is the metadata-change time on most Unix systems.
        created_ts = getattr(st, "st_birthtime", st.st_ctime)
        info_dict = {
            "path": self.display_path(abs_path),
            "type": "directory" if stat.S_ISDIR(st.st_mode) else "file",
            "size": st.st_size,
            "modified": datetime.fromtimestamp(st.st_mtime).isoformat(sep=" ", timespec="seconds"),
            "created": datetime.fromtimestamp(created_ts).isoformat(sep=" ", timespec="seconds"),
            "mode": oct(st.st_mode),
            "root": "/",
        }
        return json.dumps(info_dict, indent=2)
|
|
|
|
| |
| |
| |
|
|
def _default_root(env_var: str, dirname: str) -> str:
    """Resolve a root directory from ``env_var`` or fall back to a sibling folder.

    A non-blank environment value wins (with ``~`` expanded). Otherwise the
    result is ``<two levels above this file>/<dirname>``; if that cannot be
    determined, the current working directory is used.
    """
    configured = (os.getenv(env_var) or "").strip()
    if configured:
        return os.path.abspath(os.path.expanduser(configured))
    try:
        here = os.path.abspath(__file__)
        tools_dir = os.path.dirname(os.path.dirname(here))
        return os.path.abspath(os.path.join(tools_dir, dirname))
    except Exception:
        # e.g. __file__ is undefined when the code runs outside a regular module
        return os.path.abspath(os.getcwd())


def _get_filesystem_root() -> str:
    """Get the default filesystem root directory (override: NYMBO_TOOLS_ROOT)."""
    return _default_root("NYMBO_TOOLS_ROOT", "Filesystem")


def _get_obsidian_root() -> str:
    """Get the default Obsidian vault root directory (override: OBSIDIAN_VAULT_ROOT)."""
    return _default_root("OBSIDIAN_VAULT_ROOT", "Obsidian")
|
|
|
|
| |
def _env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean flag from the environment without raising on bad input.

    Integers keep their original meaning (0 is False, anything else True);
    "true"/"yes"/"on" and "false"/"no"/"off" are accepted case-insensitively.
    Unset, blank or unparseable values yield ``default``.
    """
    raw = (os.getenv(name) or "").strip().lower()
    if not raw:
        return default
    if raw in ("true", "yes", "on"):
        return True
    if raw in ("false", "no", "off"):
        return False
    try:
        return bool(int(raw))
    except ValueError:
        return default


# Opt-in switch that lets the sandboxes accept absolute paths outside their
# root. Parsed defensively so a malformed value cannot break the import and
# falls back to the safe setting (disabled).
ALLOW_ABS = _env_flag("UNSAFE_ALLOW_ABS_PATHS")
|
|
# Root directories, resolved once at import time.
FILESYSTEM_ROOT = _get_filesystem_root()
OBSIDIAN_ROOT = _get_obsidian_root()


# Default sandbox; the module-level helpers _resolve_path(), _display_path()
# and safe_open() delegate to it.
filesystem_sandbox = SandboxedRoot(FILESYSTEM_ROOT, allow_abs=ALLOW_ABS)


# Separate sandbox rooted at the Obsidian vault directory.
obsidian_sandbox = SandboxedRoot(OBSIDIAN_ROOT, allow_abs=ALLOW_ABS)


# Alias of FILESYSTEM_ROOT; sandboxed_exec() uses it as the working directory.
ROOT_DIR = FILESYSTEM_ROOT
|
|
def _resolve_path(path: str) -> tuple[str, str]:
    """Resolve ``path`` inside the default filesystem sandbox.

    Returns the (abs_path, error_message) pair produced by
    ``filesystem_sandbox.resolve_path``.
    """
    resolved = filesystem_sandbox.resolve_path(path)
    return resolved
|
|
def _display_path(abs_path: str) -> str:
    """Convert ``abs_path`` to its user-facing form via the default filesystem sandbox."""
    shown = filesystem_sandbox.display_path(abs_path)
    return shown
|
|
def safe_open(file, *args, **kwargs):
    """Sandboxed open(): forwards every argument to ``filesystem_sandbox.safe_open``."""
    handle = filesystem_sandbox.safe_open(file, *args, **kwargs)
    return handle
|
|
|
|
| |
| |
| |
|
|
|
|
def create_safe_builtins() -> dict:
    """Return a copy of the builtins namespace whose ``open`` is the sandboxed ``safe_open``."""
    # __builtins__ is either a dict or a module depending on how this module
    # was loaded; handle both forms.
    source = __builtins__ if isinstance(__builtins__, dict) else vars(__builtins__)
    sandboxed = source.copy()
    sandboxed["open"] = safe_open
    return sandboxed
|
|
|
|
def sandboxed_exec(
    code: str,
    *,
    extra_globals: dict[str, Any] | None = None,
    ast_mode: bool = False,
) -> str:
    """
    Execute Python code in a sandboxed environment.

    While the code runs, ``sys.stdout`` is redirected to a buffer and the
    working directory is changed to ROOT_DIR; both are restored afterwards.
    These are process-wide settings, so they also affect any other code that
    runs during the call.

    Args:
        code: Python source code to execute
        extra_globals: Additional globals to inject (e.g., tools)
        ast_mode: If True, parse and print results of all expression statements
            (like Agent_Terminal). If False, simple exec (like Code_Interpreter).

    Returns:
        Captured stdout output, or exception text on error (the exception's
        class name when its message is empty).
    """
    if not code:
        return "No code provided."

    # NOTE(security): this executes caller-supplied code with exec/eval. Only
    # open() is replaced; every other builtin (including __import__) stays
    # available, so this confines file access via open(), not the code itself.
    #
    # Build the environment before touching process-wide state, so a failure
    # here cannot leave sys.stdout redirected.
    env: dict[str, Any] = {
        "open": safe_open,
        "__builtins__": create_safe_builtins(),
        "print": print,
    }
    if extra_globals:
        env.update(extra_globals)

    old_stdout = sys.stdout
    old_cwd = os.getcwd()
    redirected_output = StringIO()
    sys.stdout = redirected_output

    try:
        os.chdir(ROOT_DIR)

        if ast_mode:
            # Run statement by statement so that bare expressions can be echoed.
            tree = ast.parse(code)
            for node in tree.body:
                if isinstance(node, ast.Expr):
                    expr = compile(ast.Expression(node.value), filename="<string>", mode="eval")
                    result_val = eval(expr, env)
                    if result_val is not None:
                        print(result_val)
                else:
                    mod = ast.Module(body=[node], type_ignores=[])
                    exec(compile(mod, filename="<string>", mode="exec"), env)
        else:
            exec(code, env)

        result = redirected_output.getvalue()
    except Exception as exc:
        # An empty message would be indistinguishable from "no output".
        result = str(exc) or type(exc).__name__
    finally:
        sys.stdout = old_stdout
        try:
            os.chdir(old_cwd)
        except OSError:
            # The previous directory may no longer exist; nothing to restore.
            pass

    return result
|
|
|
|
| |
| |
| |
|
|
|
|
def get_hf_token() -> str | None:
    """Look up the Hugging Face API token in the environment.

    HF_READ_TOKEN takes precedence when it is set to a non-empty value;
    otherwise the value of HF_TOKEN (possibly None) is returned.
    """
    read_token = os.getenv("HF_READ_TOKEN")
    if read_token:
        return read_token
    return os.getenv("HF_TOKEN")
|
|
|
|
| |
# Token looked up once at import time; later environment changes are not reflected.
HF_TOKEN = get_hf_token()


# Providers tried in order by invoke_with_fallback() when no list is passed.
DEFAULT_PROVIDERS = ["auto", "replicate", "fal-ai"]


# Provider order for text generation; not referenced in this module —
# presumably consumed by callers elsewhere.
TEXTGEN_PROVIDERS = ["cerebras", "auto"]


# Return type of the callable passed to invoke_with_fallback().
T = TypeVar("T")
|
|
|
|
def _mentions_status(text: str, code: str) -> bool:
    """Return True if ``code`` occurs in ``text`` as a number of its own.

    Digits directly before or after the code disqualify the match, so "404"
    is found in "HTTP404" or "404 Not Found" but not in "14041".
    """
    return re.search(rf"(?<!\d){re.escape(code)}(?!\d)", text) is not None


def handle_hf_error(msg: str, model_id: str, *, context: str = "generation") -> None:
    """
    Raise appropriate gr.Error for common HF API error codes.

    This function never returns normally: every path raises.

    Args:
        msg: Error message string to analyze
        model_id: The model ID being used (for error messages)
        context: Description of operation for error messages

    Raises:
        gr.Error: With user-friendly message based on error type
    """
    # Accept exception objects as well as plain strings.
    text = str(msg)
    lowered = text.lower()
    auth_message = (
        "Please duplicate the space and provide a `HF_READ_TOKEN` to enable Image and Video Generation."
    )

    if _mentions_status(text, "404"):
        raise gr.Error(f"Model not found or unavailable: {model_id}. Check the id and your HF token access.")

    if _mentions_status(text, "503"):
        raise gr.Error("The model is warming up. Please try again shortly.")

    if _mentions_status(text, "401") or _mentions_status(text, "403"):
        raise gr.Error(auth_message)

    # Authentication problems that are reported without a numeric status.
    if any(pattern in lowered for pattern in ("api_key", "hf auth login", "unauthorized", "forbidden")):
        raise gr.Error(auth_message)

    # Anything else: surface the raw message with the operation name.
    raise gr.Error(f"{context.capitalize()} failed: {msg}")
|
|
|
|
def invoke_with_fallback(
    fn: Callable[[str], T],
    providers: list[str] | None = None,
) -> T:
    """
    Try calling fn(provider) for each provider until one succeeds.

    Args:
        fn: Function that takes a provider string and returns a result.
            Should raise an exception on failure.
        providers: List of provider strings to try. Defaults to DEFAULT_PROVIDERS.

    Returns:
        The result from the first successful fn() call.

    Raises:
        The last exception if all providers fail.
        RuntimeError: If the provider list is empty.
    """
    if providers is None:
        providers = DEFAULT_PROVIDERS

    last_error: Exception | None = None

    for provider in providers:
        try:
            return fn(provider)
        except Exception as exc:
            # Remember the failure and move on to the next provider.
            last_error = exc

    # Compare against None explicitly: an exception type may define __bool__
    # or __len__ and evaluate as falsy, which must not hide the real error.
    if last_error is not None:
        raise last_error
    raise RuntimeError("No providers available")
|
|
|
|
| |
| |
| |
|
|
# Names exported by `from <module> import *`. Underscore-prefixed helpers are
# listed explicitly because a star-import would otherwise skip them.
__all__ = [
    # Tree / listing helpers
    "_fmt_size",
    "build_tree",
    "render_tree",
    "walk_and_build_tree",
    "format_dir_listing",
    # Sandboxed filesystem
    "SandboxedRoot",
    "filesystem_sandbox",
    "obsidian_sandbox",
    "ROOT_DIR",
    "FILESYSTEM_ROOT",
    "OBSIDIAN_ROOT",
    "ALLOW_ABS",
    "_resolve_path",
    "_display_path",
    "safe_open",
    # Sandboxed execution
    "sandboxed_exec",
    "create_safe_builtins",
    # Hugging Face inference
    "get_hf_token",
    "HF_TOKEN",
    "DEFAULT_PROVIDERS",
    "TEXTGEN_PROVIDERS",
    "handle_hf_error",
    "invoke_with_fallback",
]
|
|