import json
import logging
import os
import sys
import time
from pathlib import Path

import datasets
import gradio as gr
import huggingface_hub
import pandas as pd
from transformers.pipelines import TextClassificationPipeline
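# Names of the environment variables (read at submission time) that control
# where the generated evaluation report is published.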
HF_REPO_ID = 'HF_REPO_ID'
HF_SPACE_ID = 'SPACE_ID'
HF_WRITE_TOKEN = 'HF_WRITE_TOKEN'

theme = gr.themes.Soft(
    primary_hue="green",
)
def check_model(model_id):
    """Check that the model exists on the Hub and that a pipeline can load it."""
    try:
        task = huggingface_hub.model_info(model_id).pipeline_tag
    except Exception:
        return None, None

    try:
        from transformers import pipeline
        ppl = pipeline(task=task, model=model_id)

        return model_id, ppl
    except Exception as e:
        return model_id, e
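# Illustrative use of check_model (the model id below is only an example):
#
#   m_id, ppl = check_model("cardiffnlp/twitter-roberta-base-sentiment-latest")
#   if m_id is None:                  # model id not accessible on the Hub
#       ...
#   elif isinstance(ppl, Exception):  # pipeline failed to load
#       ...
#   else:                             # ppl is a ready-to-use transformers pipeline
#       ...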
def check_dataset(dataset_id, dataset_config="default", dataset_split="test"):
    """Check dataset accessibility and the validity of the given config and
    split; the return value encodes what the caller still needs to choose."""
    try:
        configs = datasets.get_dataset_config_names(dataset_id)
    except Exception:
        # Dataset may not exist
        return None, dataset_config, dataset_split
    if dataset_config not in configs:
        # Need to choose a dataset subset (config)
        return dataset_id, configs, dataset_split
    ds = datasets.load_dataset(dataset_id, dataset_config)
    if isinstance(ds, datasets.DatasetDict):
        # Need to choose a dataset split
        if dataset_split not in ds.keys():
            return dataset_id, None, list(ds.keys())
    elif not isinstance(ds, datasets.Dataset):
        # Unknown type
        return dataset_id, None, None
    return dataset_id, dataset_config, dataset_split
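# A sketch of how a caller interprets the return value (the dataset, config,
# and split names below are examples only):
#
#   d_id, config, split = check_dataset("tweet_eval", "sentiment", "test")
#   - d_id is None:     the dataset is not accessible
#   - config is a list: the given config is invalid; pick one of the returned names
#   - split is a list:  the given split is invalid; pick one of the returned names
#   - otherwise:        the (dataset, config, split) triple is valid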
def text_classification_match_label_case_insensitive(id2label_mapping, label):
    """Find a model label that matches the dataset label, ignoring case."""
    for model_label in id2label_mapping.keys():
        if model_label.upper() == label.upper():
            return model_label, label
    return None, label
def text_classification_map_model_and_dataset_labels(id2label, dataset_features):
    id2label_mapping = {id2label[k]: None for k in id2label.keys()}
    dataset_labels = None
    for feature in dataset_features.values():
        if not isinstance(feature, datasets.ClassLabel):
            continue
        if len(feature.names) != len(id2label_mapping.keys()):
            continue
        dataset_labels = feature.names
        # Try to match labels
        for label in feature.names:
            if label in id2label_mapping.keys():
                model_label = label
            else:
                # Try to find a case-insensitive match
                model_label, label = text_classification_match_label_case_insensitive(id2label_mapping, label)
            if model_label is not None:
                id2label_mapping[model_label] = label

    return id2label_mapping, dataset_labels
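# A sketch of the mapping this produces, assuming a model with
# id2label == {0: "NEGATIVE", 1: "POSITIVE"} and a dataset ClassLabel feature
# with names ["negative", "positive"]; the case-insensitive match yields:
#
#   id2label_mapping == {"NEGATIVE": "negative", "POSITIVE": "positive"}
#   dataset_labels   == ["negative", "positive"]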
def text_classification_fix_column_mapping(column_mapping, ppl, d_id, config, split):
    # We assume the dataset is valid here
    ds = datasets.load_dataset(d_id, config)[split]
    try:
        dataset_features = ds.features
    except AttributeError:
        # The dataset has no features; the user needs to provide everything
        return None, None, None

    # Check whether we need to infer the text input column
    infer_text_input_column = True
    if "text" in column_mapping.keys():
        dataset_text_column = column_mapping["text"]
        if dataset_text_column in dataset_features.keys():
            infer_text_input_column = False
        else:
            logging.warning(f"Provided column {dataset_text_column} is not in the dataset columns")

    if infer_text_input_column:
        # Try to retrieve one
        candidates = [f for f in dataset_features if dataset_features[f].dtype == "string"]
        if len(candidates) > 0:
            logging.debug(f"Candidates are {candidates}")
            column_mapping["text"] = candidates[0]
        else:
            # No text feature found
            return column_mapping, None, None

    # Load the dataset as a DataFrame
    df = ds.to_pandas()

    # Retrieve all labels
    id2label_mapping = {}
    id2label = ppl.model.config.id2label
    label2id = {v: k for k, v in id2label.items()}

    prediction_result = None
    try:
        # Use the first item to test the prediction
        results = ppl({"text": df.head(1).at[0, column_mapping["text"]]}, top_k=None)
        prediction_result = {
            f'{result["label"]}({label2id[result["label"]]})': result["score"] for result in results
        }
    except Exception:
        # Pipeline prediction failed; the user needs to provide labels
        return column_mapping, None, None

    # Infer labels
    id2label_mapping, dataset_labels = text_classification_map_model_and_dataset_labels(id2label, dataset_features)
    if "label" in column_mapping.keys():
        if not isinstance(column_mapping["label"], dict) or dataset_labels is None or set(column_mapping["label"].values()) != set(dataset_labels):
            logging.warning(f'Provided {column_mapping["label"]} does not match the labels in the dataset')
            return column_mapping, prediction_result, None
        # The mapping came from JSON, so its keys are strings
        for model_label in id2label_mapping.keys():
            id2label_mapping[model_label] = column_mapping["label"][str(label2id[model_label])]
    elif None in id2label_mapping.values():
        column_mapping["label"] = {
            i: None for i in id2label.keys()
        }
        return column_mapping, prediction_result, None

    id2label_df = pd.DataFrame({
        "ID": [i for i in id2label.keys()],
        "Model labels": [id2label[label] for label in id2label.keys()],
        "Dataset labels": [id2label_mapping[id2label[label]] for label in id2label.keys()],
    })

    if "label" not in column_mapping.keys():
        column_mapping["label"] = {
            i: id2label_mapping[id2label[i]] for i in id2label.keys()
        }

    return column_mapping, prediction_result, id2label_df
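# A fully specified column mapping, as the user would enter it in the
# "Advanced" settings (the column and label names below are examples only;
# the "label" keys are model label ids as strings, the values dataset labels):
#
#   {
#       "text": "context",
#       "label": {"0": "Positive", "1": "Negative"}
#   }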
def try_validate(model_id, dataset_id, dataset_config, dataset_split, column_mapping):
    # Validate the model
    m_id, ppl = check_model(model_id=model_id)
    if m_id is None:
        gr.Warning(f'Model "{model_id}" is not accessible. Please set your HF_TOKEN if it is a private model.')
        return (
            dataset_config, dataset_split,
            gr.update(interactive=False),  # Submit button
            gr.update(visible=False),  # Model prediction preview
            gr.update(visible=False),  # Label mapping preview
            gr.update(visible=True),  # Column mapping
        )
    if isinstance(ppl, Exception):
        gr.Warning(f'Failed to load model "{model_id}": {ppl}')
        return (
            dataset_config, dataset_split,
            gr.update(interactive=False),  # Submit button
            gr.update(visible=False),  # Model prediction preview
            gr.update(visible=False),  # Label mapping preview
            gr.update(visible=True),  # Column mapping
        )

    # Validate the dataset
    d_id, config, split = check_dataset(dataset_id=dataset_id, dataset_config=dataset_config, dataset_split=dataset_split)

    dataset_ok = False
    if d_id is None:
        gr.Warning(f'Dataset "{dataset_id}" is not accessible. Please set your HF_TOKEN if it is a private dataset.')
    elif isinstance(config, list):
        gr.Warning(f'Dataset "{dataset_id}" does not have a "{dataset_config}" config. Please choose a valid config.')
        config = gr.update(choices=config, value=config[0])
    elif isinstance(split, list):
        gr.Warning(f'Dataset "{dataset_id}" does not have a "{dataset_split}" split. Please choose a valid split.')
        split = gr.update(choices=split, value=split[0])
    else:
        dataset_ok = True

    if not dataset_ok:
        return (
            config, split,
            gr.update(interactive=False),  # Submit button
            gr.update(visible=False),  # Model prediction preview
            gr.update(visible=False),  # Label mapping preview
            gr.update(visible=True),  # Column mapping
        )

    # TODO: Validate the column mapping by running the pipeline once
    prediction_result = None
    id2label_df = None
    if isinstance(ppl, TextClassificationPipeline):
        try:
            column_mapping = json.loads(column_mapping)
        except Exception:
            column_mapping = {}

        column_mapping, prediction_result, id2label_df = \
            text_classification_fix_column_mapping(column_mapping, ppl, d_id, config, split)

        column_mapping = json.dumps(column_mapping, indent=2)

    del ppl

    if prediction_result is None:
        gr.Warning('The model failed to predict with the first row of the dataset. Please provide the column mapping in "Advanced" settings.')
        return (
            config, split,
            gr.update(interactive=False),  # Submit button
            gr.update(visible=False),  # Model prediction preview
            gr.update(visible=False),  # Label mapping preview
            gr.update(value=column_mapping, visible=True, interactive=True),  # Column mapping
        )
    elif id2label_df is None:
        gr.Warning('The prediction result does not conform to the labels in the dataset. Please provide the label mapping in "Advanced" settings.')
        return (
            config, split,
            gr.update(interactive=False),  # Submit button
            gr.update(value=prediction_result, visible=True),  # Model prediction preview
            gr.update(visible=False),  # Label mapping preview
            gr.update(value=column_mapping, visible=True, interactive=True),  # Column mapping
        )

    gr.Info("Model and dataset validations passed. You can submit the evaluation task.")

    return (
        config, split,
        gr.update(interactive=True),  # Submit button
        gr.update(value=prediction_result, visible=True),  # Model prediction preview
        gr.update(value=id2label_df, visible=True),  # Label mapping preview
        gr.update(value=column_mapping, visible=True, interactive=True),  # Column mapping
    )
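# Note: the six values returned above map, in order, onto the `outputs` list of
# `validate_btn.click(...)` below: dataset config, dataset split, submit
# button, model prediction preview, label mapping preview, and column mapping.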
def try_submit(m_id, d_id, config, split, local):
    if local:
        if "cicd" not in sys.path:
            sys.path.append("cicd")
        from giskard_cicd.loaders import HuggingFaceLoader
        from giskard_cicd.pipeline.runner import PipelineRunner
        from automation import create_discussion_detailed

        supported_loaders = {
            "huggingface": HuggingFaceLoader(),
        }
        runner = PipelineRunner(loaders=supported_loaders)

        runner_kwargs = {
            "loader_id": "huggingface",
            "model": m_id,
            "dataset": d_id,
            "scan_config": None,
            "dataset_split": split,
            "dataset_config": config,
        }

        eval_str = f"[{m_id}]<{d_id}({config}, {split} set)>"
        start = time.time()
        print(f"Start local evaluation on {eval_str}")
        report = runner.run(**runner_kwargs)

        # TODO: Publish it with the given repo id/model id
        # Publishing requires a repo (or Space) id and a write token.
        if (os.environ.get(HF_REPO_ID) or os.environ.get(HF_SPACE_ID)) and os.environ.get(HF_WRITE_TOKEN):
            rendered_report = report.to_markdown(template="github")
            repo = os.environ.get(HF_REPO_ID) or os.environ.get(HF_SPACE_ID)
            create_discussion_detailed(repo, m_id, d_id, config, split, os.environ.get(HF_WRITE_TOKEN), rendered_report)

        # Cache locally
        rendered_report = report.to_html()
        output_dir = Path(f"output/{m_id}/{d_id}/{config}/{split}/")
        output_dir.mkdir(parents=True, exist_ok=True)
        with open(output_dir / "report.html", "w") as f:
            print(f'Writing to {output_dir / "report.html"}')
            f.write(rendered_report)

        print(f"Finished local evaluation on {eval_str}: {time.time() - start:.2f}s")
with gr.Blocks(theme=theme) as iface:
    with gr.Row():
        with gr.Column():
            model_id_input = gr.Textbox(
                label="Hugging Face model id",
                placeholder="cardiffnlp/twitter-roberta-base-sentiment-latest",
            )

            # TODO: Add supported model pairs: Text Classification - text-classification
            model_type = gr.Dropdown(
                label="Hugging Face model type",
                choices=[
                    ("Auto-detect", 0),
                    ("Text Classification", 1),
                ],
                value=0,
            )

            example_labels = gr.Label(label="Model pipeline test prediction result", visible=False)

        with gr.Column():
            dataset_id_input = gr.Textbox(
                label="Hugging Face dataset id",
                placeholder="tweet_eval",
            )

            dataset_config_input = gr.Dropdown(
                label="Hugging Face dataset subset",
                choices=[
                    "default",
                ],
                allow_custom_value=True,
                value="default",
            )

            dataset_split_input = gr.Dropdown(
                label="Hugging Face dataset split",
                choices=[
                    "test",
                ],
                allow_custom_value=True,
                value="test",
            )

            id2label_mapping_dataframe = gr.DataFrame(visible=False)

    with gr.Row():
        with gr.Accordion("Advanced", open=False):
            run_local = gr.Checkbox(value=True, label="Run in this Space")
            column_mapping_input = gr.Textbox(
                value="",
                lines=5,
                label="Column mapping",
                placeholder="Mapping from model columns to dataset columns, in JSON format, e.g.:\n"
                            '{\n'
                            '    "text": "context",\n'
                            '    "label": {"0": "Positive", "1": "Negative"}\n'
                            '}',
            )

    with gr.Row():
        validate_btn = gr.Button("Validate model and dataset", variant="primary")
        run_btn = gr.Button(
            "Submit evaluation task",
            variant="primary",
            interactive=False,
        )

    validate_btn.click(
        try_validate,
        inputs=[
            model_id_input,
            dataset_id_input,
            dataset_config_input,
            dataset_split_input,
            column_mapping_input,
        ],
        outputs=[
            dataset_config_input,
            dataset_split_input,
            run_btn,
            example_labels,
            id2label_mapping_dataframe,
            column_mapping_input,
        ],
    )

    run_btn.click(
        try_submit,
        inputs=[
            model_id_input,
            dataset_id_input,
            dataset_config_input,
            dataset_split_input,
            run_local,
        ],
    )

iface.queue(max_size=20)
iface.launch()