import os
from time import sleep

import numpy as np
import pandas as pd
import torch
from dotenv import load_dotenv
from loguru import logger
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from transformers import AutoTokenizer, BertModel

from config import DEVICE
from src.regression.PL import (
    get_bert_embedding,
    get_concat_embedding,
)
from src.utils import detect_language, get_sentiment
from src.utils.s3 import read_csv, save_csv
from src.utils.text_functions import clean_text

load_dotenv()


class RegressionDataset:
    def __init__(
        self,
        s3: bool = False,
        bucket: str = "lebesgue-data-science",
        folder: str | None = None,
        s3_folder: str = "transformers/data",
    ):
        self.s3 = s3
        self.bucket = bucket

        if self.s3:
            self.folder = s3_folder
        else:
            # Resolve the local folder lazily so a missing GLOBAL_PATH_TO_REPO
            # does not raise a TypeError at import time.
            self.folder = folder or f"{os.getenv('GLOBAL_PATH_TO_REPO', '.')}/data"

        self.original_path = f"{self.folder}/original.csv"
        self.untrimmed_path = f"{self.folder}/untrimmed.csv"
        self.normalized_path = f"{self.folder}/normalized.csv"
        self.trimmed_path = f"{self.folder}/trimmed.csv"

        self.train_path = f"{self.folder}/train.csv"
        self.val_path = f"{self.folder}/val.csv"
        self.test_path = f"{self.folder}/test.csv"

        self.text_types = ["primary", "title", "description"]

        # Per-text-type feature extractors; each text column is expected to
        # hold a list of strings (one entry per text variant).
        self.col_func_dict = {
            "number": len,
            "len": lambda texts: np.mean([len(text) for text in texts]),
        }
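
    # A minimal sketch of the expected column shape (the sample values here
    # are hypothetical, purely for illustration):
    #
    #     row["primary"] = ["Buy now!", "Shop the sale"]
    #     row["primary_number"] -> len(...) == 2
    #     row["primary_len"]    -> np.mean([8, 13]) == 10.5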

    @property
    def original(self) -> pd.DataFrame:
        return read_csv(path=self.original_path, s3=self.s3, s3_args={"bucket": self.bucket})

    @property
    def untrimmed(self) -> pd.DataFrame:
        return read_csv(path=self.untrimmed_path, s3=self.s3, s3_args={"bucket": self.bucket})

    @property
    def normalized(self) -> pd.DataFrame:
        return read_csv(path=self.normalized_path, s3=self.s3, s3_args={"bucket": self.bucket})

    @property
    def trimmed(self) -> pd.DataFrame:
        return read_csv(path=self.trimmed_path, s3=self.s3, s3_args={"bucket": self.bucket})

    @property
    def train(self) -> pd.DataFrame:
        return read_csv(path=self.train_path, s3=self.s3, s3_args={"bucket": self.bucket})

    @property
    def val(self) -> pd.DataFrame:
        return read_csv(path=self.val_path, s3=self.s3, s3_args={"bucket": self.bucket})

    @property
    def test(self) -> pd.DataFrame:
        return read_csv(path=self.test_path, s3=self.s3, s3_args={"bucket": self.bucket})

    def normalize_untrimmed(self, group_cols: list[str] | None = None) -> pd.DataFrame:
        """Aggregate the untrimmed data per ad group and attach a pooled CTR."""
        group_cols = group_cols or ["text", "target", "shop_id"]
        df = self.untrimmed
        grouped = df.groupby(group_cols)

        filters_df = grouped.agg({"impr": "sum", "spend": "sum"}).reset_index()
        # Pooled CTR: total clicks over total impressions, not a mean of
        # per-row CTRs, so rows with more impressions weigh more.
        ctr = grouped.apply(lambda group: group.link_clicks.sum() / group.impr.sum())
        ctr_df = pd.DataFrame(ctr, columns=["ctr"]).reset_index()
        normalized = filters_df.merge(ctr_df, on=group_cols)

        merged = df.merge(normalized, on=group_cols, validate="m:1", suffixes=["___", None])
        merged.drop([col for col in merged.columns if "___" in col], inplace=True, axis=1)
        final = merged.drop_duplicates(group_cols)
        save_csv(
            df=final,
            path=self.normalized_path,
            s3=self.s3,
            s3_args={"bucket": self.bucket},
        )
        return final
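
    # Worked example of the pooled CTR above (hypothetical numbers): two rows
    # of one group with (link_clicks, impr) = (1, 100) and (90, 900) give
    # (1 + 90) / (100 + 900) = 0.091, whereas averaging per-row CTRs would
    # give (0.01 + 0.10) / 2 = 0.055.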

    def expand_untrimmed(self, update_existing_columns: bool = False) -> pd.DataFrame:
        """Add per-text-type features, pick the main text and keep English rows."""
        df = self.untrimmed

        new_col_func_dict = self.col_func_dict

        if not update_existing_columns:
            # Skip extractors whose columns already exist in the frame.
            new_col_func_dict = {
                col: fun for col, fun in new_col_func_dict.items() if "primary_" + col not in df.columns
            }

        for col, func in new_col_func_dict.items():
            logger.debug(col)
            for text_type in self.text_types:
                df[f"{text_type}_{col}"] = df[text_type].apply(func)

        df["has_text"] = df.apply(
            lambda row: bool(row.primary_number + row.title_number + row.description_number),
            axis=1,
        )

        df = df.apply(_get_text, axis=1)
        df = df.apply(_get_concatenated_text, axis=1)

        df["language"] = df.text.apply(detect_language)
        df = df[df.language == "en"]
        df = df[df.ctr.notna()]

        save_csv(df=df, path=self.untrimmed_path, s3=self.s3, s3_args={"bucket": self.bucket})

        return df

    def trim(self, min_impr: int = 900, min_spend: float = 90) -> pd.DataFrame:
        """Keep acquisition ads with enough impressions, spend, AOV and some text."""
        df = self.normalized
        df = df[(df.impr >= min_impr) & (df.spend >= min_spend)]
        df = df[df.target == "acquisition"]
        df = df[df.aov.notna()]

        df = df[df.has_text]

        save_csv(df=df, path=self.trimmed_path, s3=self.s3, s3_args={"bucket": self.bucket})

        return df

    def expand_trimmed(
        self,
        bert: BertModel | None = None,
        tokenizer: AutoTokenizer | None = None,
        add_bert_embeddings_bool: bool = False,
    ) -> pd.DataFrame:
        """Clean the texts, score sentiment and optionally attach BERT embeddings."""
        df = self.trimmed

        for col in ["text", "concat_text"]:
            df[f"{col}_clean"] = df[col].apply(clean_text)

        df["text_clean_sentiment"] = df.text_clean.apply(get_sentiment)

        if add_bert_embeddings_bool:
            if tokenizer is None or bert is None:
                raise ValueError("add_bert_embeddings_bool is True but bert or tokenizer is None")
            layer_dict = {"bert": bert, "tokenizer": tokenizer}
            df = add_bert_embeddings(df=df, save_path=self.trimmed_path, layer_dict=layer_dict)

            # The concatenated embeddings depend on the BERT columns created above.
            df = df.apply(add_concat_embeddings, axis=1)

        save_csv(df=df, path=self.trimmed_path, s3=self.s3, s3_args={"bucket": self.bucket})
        return df

    def split_into_train_and_test(
        self,
    ) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        df = self.trimmed
        train, test = train_test_split(df, train_size=0.9, random_state=42)
        train, val = train_test_split(train, train_size=0.85, random_state=42)
        save_csv(df=train, path=self.train_path, s3=self.s3, s3_args={"bucket": self.bucket})
        save_csv(df=val, path=self.val_path, s3=self.s3, s3_args={"bucket": self.bucket})
        save_csv(df=test, path=self.test_path, s3=self.s3, s3_args={"bucket": self.bucket})
        return train, val, test
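
    # The chained splits above yield roughly 76.5% train (0.9 * 0.85),
    # 13.5% validation (0.9 * 0.15) and 10% test.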

    def expand_normalise_trim_split(
        self,
        update_existing_columns: bool = False,
        group_cols: list[str] | None = None,
    ) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """Run the full expand -> normalize -> trim -> split pipeline."""
        group_cols = group_cols or ["text", "target", "shop_id"]
        self.expand_untrimmed(update_existing_columns=update_existing_columns)
        self.normalize_untrimmed(group_cols=group_cols)
        self.trim()
        self.expand_trimmed()
        train, val, test = self.split_into_train_and_test()
        return train, val, test

def _get_text(ad: pd.Series) -> pd.Series:
    """Pick the ad's main text, preferring primary, then description, then title."""
    if ad.primary_number > 0:
        ad["text"] = ad.primary[0]
    elif ad.description_number > 0:
        ad["text"] = ad.description[0]
    elif ad.title_number > 0:
        ad["text"] = ad.title[0]
    else:
        ad["text"] = None

    return ad

def _get_concatenated_text(ad: pd.Series) -> pd.Series:
    """Join the first primary, description and title texts into one string."""
    concat_text = ""

    if ad.primary_number > 0:
        concat_text = concat_text + ad.primary[0]

    if ad.description_number > 0:
        concat_text = concat_text + ad.description[0]

    if ad.title_number > 0:
        concat_text = concat_text + ad.title[0]

    ad["concat_text"] = concat_text

    return ad
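
# Illustrative behaviour on a hypothetical row: with primary == ["A"],
# description == [] and title == ["B"], _get_text picks "A" while
# _get_concatenated_text yields "AB" (the pieces are joined without a
# separator).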

# Module-level instances for the local and S3-backed datasets.
regression_dataset = RegressionDataset()

regression_dataset_s3 = RegressionDataset(s3=True)
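
# A minimal usage sketch, assuming the CSVs already exist in the data folder
# (the import path is hypothetical):
#
#     from src.regression.dataset import regression_dataset
#
#     train, val, test = regression_dataset.expand_normalise_trim_split()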

def add_bert_embeddings(
    df: pd.DataFrame, save_path: str, layer_dict: dict | None = None, device=DEVICE
) -> pd.DataFrame:
    """Attach CLS and mean-pooled BERT embeddings, checkpointing CPU runs."""
    layer_dict = layer_dict or {}

    if device == torch.device("cuda"):
        # On GPU the embeddings are cheap enough to compute in one pass.
        df["my_bert_cls_embedding"] = df.text_clean.apply(
            lambda text: get_bert_embedding(text=text, cls=True, layer_dict=layer_dict)
        )
        df["my_bert_mean_embedding"] = df.text_clean.apply(
            lambda text: get_bert_embedding(text=text, cls=False, layer_dict=layer_dict)
        )
        return df

    if "my_bert_cls_embedding" not in df.columns:
        df["my_bert_cls_embedding"] = None

    if "my_bert_mean_embedding" not in df.columns:
        df["my_bert_mean_embedding"] = None

    counter = 0

    df["my_bert_cls_embedding"] = df["my_bert_cls_embedding"].astype(object)
    df["my_bert_mean_embedding"] = df["my_bert_mean_embedding"].astype(object)

    # On CPU, fill in only the missing embeddings so an interrupted run can
    # resume from the last checkpoint. Iterate over df.index directly, since
    # the frame may not have a clean RangeIndex after filtering.
    for i in tqdm(df.index):
        if df.at[i, "my_bert_cls_embedding"] is None:
            df.at[i, "my_bert_cls_embedding"] = get_bert_embedding(
                text=df.at[i, "text_clean"], cls=True, layer_dict=layer_dict
            )
            counter = counter + 1
            sleep(0.5)  # throttle successive CPU inference calls

        if df.at[i, "my_bert_mean_embedding"] is None:
            df.at[i, "my_bert_mean_embedding"] = get_bert_embedding(
                text=df.at[i, "text_clean"], cls=False, layer_dict=layer_dict
            )
            counter = counter + 1
            sleep(0.5)

        # The counter grows by up to two per row, so this checkpoints a local
        # CSV roughly every 25 processed rows.
        if counter % 50 in [0, 1]:
            df.to_csv(save_path, index=False)

    df.to_csv(save_path, index=False)

    return df
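
# `get_bert_embedding` is imported from src.regression.PL. The helper below is
# only a sketch of what such a function typically does (an assumption for
# illustration, not the project's actual implementation):
def _example_get_bert_embedding(text: str, cls: bool, layer_dict: dict) -> np.ndarray:
    tokenizer, bert = layer_dict["tokenizer"], layer_dict["bert"]
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
    with torch.no_grad():
        hidden = bert(**inputs).last_hidden_state  # (1, seq_len, hidden_size)
    if cls:
        return hidden[0, 0].numpy()  # the [CLS] token vector
    return hidden[0].mean(dim=0).numpy()  # mean pooling over all tokens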

def add_concat_embeddings(series: pd.Series) -> pd.Series:
    """Concatenate each BERT embedding with the AOV and sentiment features."""
    other_features = {"aov": series["aov"]} | series["text_clean_sentiment"]

    for emb_type in ["cls", "mean"]:
        bert_embedding = series[f"my_bert_{emb_type}_embedding"]
        series[f"my_full_{emb_type}_embedding"] = get_concat_embedding(
            bert_embedding=bert_embedding, other_features=other_features
        )

    return series
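
# `get_concat_embedding` also comes from src.regression.PL. A plausible sketch,
# assuming it simply appends the scalar features to the BERT vector (again an
# illustration, not the project's actual code):
def _example_get_concat_embedding(bert_embedding: np.ndarray, other_features: dict) -> np.ndarray:
    return np.concatenate([bert_embedding, np.array(list(other_features.values()), dtype=float)])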