| """Utility function for gradio/external.py, designed for internal use.""" |
|
|
| from __future__ import annotations |
|
|
| import base64 |
| import math |
| import re |
| import warnings |
|
|
| import httpx |
| import yaml |
| from huggingface_hub import InferenceClient |
|
|
| from gradio import components |
|
|
| |
| |
| |
|
|
|
|
| def get_tabular_examples(model_name: str) -> dict[str, list[float]]: |
| readme = httpx.get(f"https://huggingface.co/{model_name}/resolve/main/README.md") |
| if readme.status_code != 200: |
| warnings.warn(f"Cannot load examples from README for {model_name}", UserWarning) |
| example_data = {} |
| else: |
| yaml_regex = re.search( |
| "(?:^|[\r\n])---[\n\r]+([\\S\\s]*?)[\n\r]+---([\n\r]|$)", readme.text |
| ) |
| if yaml_regex is None: |
| example_data = {} |
| else: |
| example_yaml = next( |
| yaml.safe_load_all(readme.text[: yaml_regex.span()[-1]]) |
| ) |
| example_data = example_yaml.get("widget", {}).get("structuredData", {}) |
| if not example_data: |
| raise ValueError( |
| f"No example data found in README.md of {model_name} - Cannot build gradio demo. " |
| "See the README.md here: https://huggingface.co/scikit-learn/tabular-playground/blob/main/README.md " |
| "for a reference on how to provide example data to your model." |
| ) |
| |
| for data in example_data.values(): |
| for i, val in enumerate(data): |
| if isinstance(val, float) and math.isnan(val): |
| data[i] = "NaN" |
| return example_data |
|
|
|
|
| def cols_to_rows( |
| example_data: dict[str, list[float | str] | None], |
| ) -> tuple[list[str], list[list[float]]]: |
| headers = list(example_data.keys()) |
| n_rows = max(len(example_data[header] or []) for header in headers) |
| data = [] |
| for row_index in range(n_rows): |
| row_data = [] |
| for header in headers: |
| col = example_data[header] or [] |
| if row_index >= len(col): |
| row_data.append("NaN") |
| else: |
| row_data.append(col[row_index]) |
| data.append(row_data) |
| return headers, data |
|
|
|
|
| def rows_to_cols(incoming_data: dict) -> dict[str, dict[str, dict[str, list[str]]]]: |
| data_column_wise = {} |
| for i, header in enumerate(incoming_data["headers"]): |
| data_column_wise[header] = [str(row[i]) for row in incoming_data["data"]] |
| return {"inputs": {"data": data_column_wise}} |
|
|
|
|
| |
| |
| |
|
|
|
|
| def postprocess_label(scores: list[dict[str, str | float]]) -> dict: |
| return {c["label"]: c["score"] for c in scores} |
|
|
|
|
| def postprocess_mask_tokens(scores: list[dict[str, str | float]]) -> dict: |
| return {c["token_str"]: c["score"] for c in scores} |
|
|
|
|
| def postprocess_question_answering(answer: dict) -> tuple[str, dict]: |
| return answer["answer"], {answer["answer"]: answer["score"]} |
|
|
|
|
| def postprocess_visual_question_answering(scores: list[dict[str, str | float]]) -> dict: |
| return {c["answer"]: c["score"] for c in scores} |
|
|
|
|
| def zero_shot_classification_wrapper(client: InferenceClient): |
| def zero_shot_classification_inner(input: str, labels: str, multi_label: bool): |
| return client.zero_shot_classification( |
| input, labels.split(","), multi_label=multi_label |
| ) |
|
|
| return zero_shot_classification_inner |
|
|
|
|
| def sentence_similarity_wrapper(client: InferenceClient): |
| def sentence_similarity_inner(input: str, sentences: str): |
| return client.sentence_similarity(input, sentences.split("\n")) |
|
|
| return sentence_similarity_inner |
|
|
|
|
| def text_generation_wrapper(client: InferenceClient): |
| def text_generation_inner(input: str): |
| return input + client.text_generation(input) |
|
|
| return text_generation_inner |
|
|
|
|
| def encode_to_base64(r: httpx.Response) -> str: |
| |
| base64_repr = base64.b64encode(r.content).decode("utf-8") |
| data_prefix = ";base64," |
| |
| if data_prefix in base64_repr: |
| return base64_repr |
| else: |
| content_type = r.headers.get("content-type") |
| |
| if content_type == "application/json": |
| try: |
| data = r.json()[0] |
| content_type = data["content-type"] |
| base64_repr = data["blob"] |
| except KeyError as ke: |
| raise ValueError( |
| "Cannot determine content type returned by external API." |
| ) from ke |
| |
| else: |
| pass |
| new_base64 = f"data:{content_type};base64,{base64_repr}" |
| return new_base64 |
|
|
|
|
| def format_ner_list(input_string: str, ner_groups: list[dict[str, str | int]]): |
| if len(ner_groups) == 0: |
| return [(input_string, None)] |
|
|
| output = [] |
| end = 0 |
| prev_end = 0 |
|
|
| for group in ner_groups: |
| entity, start, end = group["entity_group"], group["start"], group["end"] |
| output.append((input_string[prev_end:start], None)) |
| output.append((input_string[start:end], entity)) |
| prev_end = end |
|
|
| output.append((input_string[end:], None)) |
| return output |
|
|
|
|
| def token_classification_wrapper(client: InferenceClient): |
| def token_classification_inner(input: str): |
| ner_list = client.token_classification(input) |
| return format_ner_list(input, ner_list) |
|
|
| return token_classification_inner |
|
|
|
|
| def object_detection_wrapper(client: InferenceClient): |
| def object_detection_inner(input: str): |
| annotations = client.object_detection(input) |
| formatted_annotations = [ |
| ( |
| ( |
| a["box"]["xmin"], |
| a["box"]["ymin"], |
| a["box"]["xmax"], |
| a["box"]["ymax"], |
| ), |
| a["label"], |
| ) |
| for a in annotations |
| ] |
| return (input, formatted_annotations) |
|
|
| return object_detection_inner |
|
|
|
|
| def chatbot_preprocess(text, state): |
| if not state: |
| return text, [], [] |
| return ( |
| text, |
| state["conversation"]["generated_responses"], |
| state["conversation"]["past_user_inputs"], |
| ) |
|
|
|
|
| def chatbot_postprocess(response): |
| chatbot_history = list( |
| zip( |
| response["conversation"]["past_user_inputs"], |
| response["conversation"]["generated_responses"], |
| ) |
| ) |
| return chatbot_history, response |
|
|
|
|
| def tabular_wrapper(client: InferenceClient, pipeline: str): |
| |
| |
| |
| def tabular_inner(data): |
| if pipeline not in ("tabular_classification", "tabular_regression"): |
| raise TypeError(f"pipeline type {pipeline!r} not supported") |
| assert client.model |
| if pipeline == "tabular_classification": |
| return client.tabular_classification(data, model=client.model) |
| else: |
| return client.tabular_regression(data, model=client.model) |
|
|
| return tabular_inner |
|
|
|
|
| |
| |
| |
|
|
|
|
| def streamline_spaces_interface(config: dict) -> dict: |
| """Streamlines the interface config dictionary to remove unnecessary keys.""" |
| config["inputs"] = [ |
| components.get_component_instance(component) |
| for component in config["input_components"] |
| ] |
| config["outputs"] = [ |
| components.get_component_instance(component) |
| for component in config["output_components"] |
| ] |
| parameters = { |
| "article", |
| "description", |
| "flagging_options", |
| "inputs", |
| "outputs", |
| "title", |
| } |
| config = {k: config[k] for k in parameters} |
| return config |
|
|