| from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer |
| from datasets import Dataset |
| from tqdm import tqdm |
| import torch |
| import numpy as np |
| import os |
| from langdetect import detect |
| from sklearn.metrics import accuracy_score, f1_score, log_loss, confusion_matrix, ConfusionMatrixDisplay |
| import matplotlib.pyplot as plt |
|
|
| class Classifier: |
| def __init__(self, model_path, label_map, verbose = False): |
| self.model_path = model_path |
| self.classifier = pipeline("text-classification", model=model_path, tokenizer=model_path, device=0 if torch.cuda.is_available() else -1) |
| self.label_map = label_map |
| if verbose: |
| self.print_device_information() |
| |
| def print_device_information(self): |
| |
| device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") |
| device_properties = torch.cuda.get_device_properties(0) if device.type == "cuda" else "CPU Device" |
|
|
| print(f"Using device: {device}") |
| if device.type == "cuda": |
| print(f"Device Name: {device_properties.name}") |
| |
| print(f"Total Memory: {device_properties.total_memory / 1e9:.2f} GB") |
|
|
| def tokenize_and_trim(self, text): |
| max_length = self.classifier.tokenizer.model_max_length |
| inputs = self.classifier.tokenizer(text, truncation=True, max_length=max_length, return_tensors="tf") |
| return self.classifier.tokenizer.decode(inputs['input_ids'][0], skip_special_tokens=True) |
|
|
|
|
| def classify_dataframe_column(self, df, target_column, feature_suffix): |
|
|
| tqdm.pandas() |
| df[f'trimmed_{target_column}'] = df[target_column].progress_apply(self.tokenize_and_trim) |
|
|
| results = [] |
| for text in tqdm(df[f'trimmed_{target_column}'].tolist(), desc="Classifying"): |
| result = self.classifier(text) |
| results.append(result[0]) |
|
|
| df[f'pred_label_{feature_suffix}'] = [self.label_map[int(result['label'].split('_')[-1])] for result in results] |
| df[f'prob_{feature_suffix}'] = [result['score'] for result in results] |
| df.drop(columns=[f'trimmed_{target_column}'], inplace=True) |
| return df |
| |
| def test_model_predictions(self, df, target_column): |
| """ |
| Tests model predictions on a given dataframe column and computes evaluation metrics. |
| |
| Args: |
| df (pd.DataFrame): Input dataframe containing the data. |
| target_column (str): The name of the column to classify. |
| |
| Requirements: |
| - The dataframe must include a 'label' column for comparison with predictions. |
| |
| Returns: |
| dict: A dictionary containing accuracy, F1 score, cross-entropy loss, |
| and the confusion matrix. |
| """ |
| |
| dataset = Dataset.from_pandas(df) |
|
|
| |
| def process_data(batch): |
| trimmed_text = self.tokenize_and_trim(batch[target_column]) |
| result = self.classifier(trimmed_text) |
| score = result[0]['score'] |
| label = result[0]['label'] |
| return { |
| 'trimmed_text': trimmed_text, |
| 'predicted_prob_0': score if label == 'LABEL_0' else 1 - score, |
| 'predicted_prob_1': 1 - score if label == 'LABEL_0' else score, |
| } |
|
|
| |
| processed_dataset = dataset.map(process_data, batched=False) |
|
|
| |
| processed_df = processed_dataset.to_pandas() |
|
|
| |
| predicted_probs = processed_df[['predicted_prob_0', 'predicted_prob_1']].values |
| true_labels = df['label'].values |
|
|
| |
| accuracy = accuracy_score(true_labels, np.argmax(predicted_probs, axis=1)) |
| f1 = f1_score(true_labels, np.argmax(predicted_probs, axis=1), average='weighted') |
| cross_entropy_loss = log_loss(true_labels, predicted_probs) |
|
|
| |
| print(f"Accuracy: {accuracy:.4f}") |
| print(f"F1 Score: {f1:.4f}") |
| print(f"Cross Entropy Loss: {cross_entropy_loss:.4f}") |
|
|
| |
| cm = confusion_matrix(true_labels, np.argmax(predicted_probs, axis=1)) |
| cmap = plt.cm.Blues |
| disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0, 1]) |
| disp.plot(cmap=cmap) |
| plt.show() |
|
|
| |
| return { |
| "accuracy": accuracy, |
| "f1_score": f1, |
| "cross_entropy_loss": cross_entropy_loss, |
| "confusion_matrix": cm, |
| "predicted_probs": predicted_probs |
| } |
| |
| |
| class LanguageDetector: |
| def __init__(self, dataframe): |
| """ |
| Initializes the LanguageDetector with the provided DataFrame. |
| """ |
| self.dataframe = dataframe |
|
|
| def detect_language_dataframe_column(self, target_column): |
| """ |
| Detects the language of text in the specified column using langdetect and adds |
| a 'detected_language' column to the DataFrame. |
| """ |
| def detect_language(text): |
| try: |
| return detect(text) |
| except Exception: |
| return None |
|
|
| tqdm.pandas() |
| self.dataframe['detected_language'] = self.dataframe[target_column].progress_apply(detect_language) |
|
|
| return self.dataframe |
| |
|
|
| |
| class TensorflowClassifier(Classifier): |
| def __init__(self, model_path, label_map, verbose=False): |
| super().__init__(model_path, label_map, verbose=False) |
| self.is_tensorflow = False |
| |
| if self._is_tensorflow_model(model_path): |
| self.model = tf.keras.models.load_model(model_path) |
| self.tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased") |
| self.is_tensorflow = True |
| if verbose: |
| print("Loaded TensorFlow model.") |
| else: |
| if verbose: |
| print("Fallback to HuggingFace pipeline.") |
|
|
| def _is_tensorflow_model(self, model_path): |
| return os.path.isdir(model_path) and os.path.exists(os.path.join(model_path, "saved_model.pb")) |
|
|
| def classify(self, text): |
| if self.is_tensorflow: |
| inputs = self.tokenizer(text, truncation=True, max_length=self.tokenizer.model_max_length, return_tensors="np") |
| logits = self.model.predict([inputs["input_ids"], inputs["attention_mask"]]) |
| probabilities = tf.nn.softmax(logits).numpy() |
| label_id = np.argmax(probabilities, axis=-1).item() |
| return { |
| "label": f"LABEL_{label_id}", |
| "score": probabilities.max() |
| } |
| else: |
| return self.classifier(text)[0] |
|
|
| def classify_dataframe_column(self, df, target_column, feature_suffix): |
| tqdm.pandas() |
| df[f'trimmed_{target_column}'] = df[target_column].progress_apply( |
| lambda text: self.tokenizer.decode( |
| self.tokenizer(text, truncation=True, max_length=self.tokenizer.model_max_length)["input_ids"], |
| skip_special_tokens=True |
| ) |
| ) |
|
|
| if self.is_tensorflow: |
| results = [self.classify(text) for text in df[f'trimmed_{target_column}']] |
| else: |
| results = [self.classifier(text)[0] for text in df[f'trimmed_{target_column}']] |
|
|
| df[f'pred_label_{feature_suffix}'] = [ |
| self.label_map[int(result['label'].split('_')[-1])] for result in results |
| ] |
| df[f'prob_{feature_suffix}'] = [result['score'] for result in results] |
| df.drop(columns=[f'trimmed_{target_column}'], inplace=True) |
| return df |
|
|
|
|
| class ZeroShotClassifier(Classifier): |
|
|
| def __init__(self, model_path, tokenizer_path, candidate_labels): |
| self.model_path = model_path |
| self.candidate_labels = candidate_labels |
| self.classifier = pipeline("zero-shot-classification", model=model_path, tokenizer=tokenizer_path, clean_up_tokenization_spaces=True, device=0 if torch.cuda.is_available() else -1) |
|
|
| def classify_text(self, text, top_n=None, multi_label=False): |
| """ |
| Classify a single text using zero-shot classification with truncated scores. |
| |
| :param text: The text to classify |
| :param multi_label: Whether to allow multi-label classification |
| :return: Classification result as a dictionary with scores truncated to 3 decimals |
| """ |
| classification_output = self.classifier(text, self.candidate_labels, multi_label=multi_label, clean_up_tokenization_spaces=True) |
| classification_output['scores'] = [round(score, 3) for score in classification_output['scores']] |
| if top_n is not None: |
| classification_output = { |
| 'sequence': classification_output['sequence'], |
| 'labels': classification_output['labels'][:top_n], |
| 'scores': classification_output['scores'][:top_n] |
| } |
| return classification_output |
|
|
| def classify_dataframe_column(self, df, target_column, feature_suffix, multi_label=False): |
| """ |
| Classify the contents of a dataframe column using zero-shot classification. |
| |
| :param df: The dataframe to process |
| :param target_column: The column containing text to classify |
| :param feature_suffix: Suffix for the output columns |
| :param multi_label: Whether to allow multi-label classification |
| :return: The dataframe with classification results |
| """ |
| tqdm.pandas() |
|
|
| |
| results = df[target_column].progress_apply( |
| lambda text: self.classify_text(text, multi_label=multi_label) |
| ) |
|
|
| |
| df[f'top_class_{feature_suffix}'] = results.apply(lambda res: res['labels'][0]) |
| df[f'top_score_{feature_suffix}'] = results.apply(lambda res: res['scores'][0]) |
| df[f'full_results_{feature_suffix}'] = results.apply(lambda res: list(zip(res['labels'], res['scores']))) |
|
|
| return df |
| |
| def test_zs_predictions(self, df, target_column='text', true_classes_column='category', plot_conf_matrix=True): |
| """ |
| Tests model predictions on a given dataset column using the zero-shot classification pipeline. |
| |
| Args: |
| df (pd.DataFrame): Input dataframe containing texts for zero-shot classification. |
| target_column (str): The name of the column containing text to classify. |
| true_classes_column (str): The column containing annotated classes. |
| |
| Returns: |
| dict: A dictionary containing accuracy, F1 score, and confusion matrix. |
| """ |
| |
| tqdm.pandas(desc=f"Zero-shot classification with {self.model_path}") |
| |
| |
| def classify_row(row): |
| classification_output = self.classifier( |
| row[target_column], |
| self.candidate_labels, |
| multi_label=False, |
| clean_up_tokenization_spaces=True, |
| ) |
| return classification_output["labels"][0] |
|
|
| |
| df = df.copy() |
| df.loc[:, 'predicted_class'] = df.progress_apply(classify_row, axis=1) |
| |
| |
| true_classes = df[true_classes_column] |
| predicted_classes = df['predicted_class'] |
|
|
| |
| accuracy = accuracy_score(true_classes, predicted_classes) |
| f1 = f1_score(true_classes, predicted_classes, average="macro") |
| cm = confusion_matrix(true_classes, predicted_classes, labels=self.candidate_labels) |
| if plot_conf_matrix: |
| disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=self.candidate_labels) |
| fig, ax = plt.subplots(figsize=(4, 4)) |
| disp.plot(cmap=plt.cm.Blues, ax=ax, colorbar=False) |
| ax.set_title(f"Zero-shot classification with {self.model_path}", fontsize=10) |
| ax.set_xlabel("Predicted label", fontsize=8) |
| ax.set_ylabel("True label", fontsize=8) |
|
|
| ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha="right", fontsize=8) |
| ax.set_yticklabels(ax.get_yticklabels(), fontsize=8) |
|
|
| fig.text( |
| 0.5, 0.01, |
| f"Accuracy: {accuracy:.4f} | F1 Score: {f1:.4f}", |
| ha="center", |
| fontsize=10 |
| ) |
| plt.tight_layout(rect=[0, 0.05, 1, 1]) |
| plt.show() |
|
|
| return { |
| "accuracy": accuracy, |
| "f1_score": f1, |
| "confusion_matrix": cm, |
| "detailed_results": df.to_dict(), |
| } |
| |
| def test_zs_predictions_with_dataset(self, df, target_column='text', true_classes_column='category', plot_conf_matrix=True): |
| dataset = Dataset.from_pandas(df) |
| def classify_text(batch): |
| classification_output = self.classifier( |
| batch[target_column], |
| self.candidate_labels, |
| multi_label=False, |
| clean_up_tokenization_spaces=True, |
| ) |
| return { |
| "predicted_class": classification_output["labels"][0], |
| "predicted_scores": classification_output["scores"], |
| } |
|
|
| |
| classified_dataset = dataset.map(classify_text, batched=False) |
| |
|
|
| |
| true_classes = classified_dataset[true_classes_column] |
| predicted_classes = classified_dataset["predicted_class"] |
|
|
| |
| accuracy = accuracy_score(true_classes, predicted_classes) |
| f1 = f1_score(true_classes, predicted_classes, average="macro") |
|
|
| |
| print(f"Accuracy: {accuracy:.4f}") |
| print(f"F1 Score: {f1:.4f}") |
|
|
| |
| cm = confusion_matrix(true_classes, predicted_classes, labels=self.candidate_labels) |
| if plot_conf_matrix: |
| disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=self.candidate_labels) |
| fig, ax = plt.subplots(figsize=(6, 6)) |
| disp.plot(cmap=plt.cm.Blues, ax=ax) |
| plt.xticks(rotation=45, ha="right") |
| plt.show() |
|
|
| |
| return { |
| "accuracy": accuracy, |
| "f1_score": f1, |
| "confusion_matrix": cm, |
| "detailed_results": classified_dataset.to_dict(), |
| } |
| |
| class MetricsComparison: |
| def __init__(self, base_classifier, fine_tuned_classifier, base_metrics, fine_tuned_metrics): |
| self.base_classifier = base_classifier |
| self.fine_tuned_classifier = fine_tuned_classifier |
| self.base_metrics = base_metrics |
| self.fine_tuned_metrics = fine_tuned_metrics |
|
|
| def compare_conf_matrices(self): |
| fig, axes = plt.subplots(1, 2, figsize=(12, 6)) |
| |
| disp1 = ConfusionMatrixDisplay(confusion_matrix=self.base_metrics["confusion_matrix"], |
| display_labels=self.base_classifier.candidate_labels) |
| disp1.plot(cmap=plt.cm.Blues, ax=axes[0], colorbar=False) |
| axes[0].set_title(f"Zero-shot classification with {self.base_classifier.model_path}", fontsize=10) |
| axes[0].set_xlabel("Predicted class", fontsize=8) |
| axes[0].set_ylabel("True class", fontsize=8) |
| axes[0].set_xticklabels(axes[0].get_xticklabels(), rotation=45, ha="right", fontsize=8) |
| axes[0].set_yticklabels(axes[0].get_yticklabels(), fontsize=8) |
|
|
| fig.text( |
| 0.25, 0.01, |
| f"Accuracy: {self.base_metrics['accuracy']:.4f} | F1 Score: {self.base_metrics['f1_score']:.4f}", |
| ha="center", |
| fontsize=10 |
| ) |
|
|
| |
| disp2 = ConfusionMatrixDisplay(confusion_matrix=self.fine_tuned_metrics["confusion_matrix"], |
| display_labels=self.fine_tuned_classifier.candidate_labels) |
| disp2.plot(cmap=plt.cm.Blues, ax=axes[1], colorbar=False) |
| axes[1].set_title(f"ZS classification with {self.fine_tuned_classifier.model_path}", fontsize=10) |
| axes[1].set_xlabel("Predicted class", fontsize=8) |
| axes[1].set_ylabel("True class", fontsize=8) |
| axes[1].set_xticklabels(axes[1].get_xticklabels(), rotation=45, ha="right", fontsize=8) |
| axes[1].set_yticklabels(axes[1].get_yticklabels(), fontsize=8) |
|
|
| fig.text( |
| 0.75, 0.01, |
| f"Accuracy: {self.fine_tuned_metrics['accuracy']:.4f} | F1 Score: {self.fine_tuned_metrics['f1_score']:.4f}", |
| ha="center", |
| fontsize=10 |
| ) |
|
|
| plt.tight_layout(rect=[0, 0.05, 1, 0.95]) |
| plt.show() |
| |
|
|