| | import os |
| | import numpy as np |
| | import pandas as pd |
| | from sklearn.neighbors import NearestNeighbors |
| | from sklearn.preprocessing import QuantileTransformer |
| | from scipy.stats import gamma |
| | import json |
| |
|
class FrontDatasetHandler:
    """Prepares the asset master table, close prices and derived metrics for the front-end app."""

    def __init__(self, maestro: pd.DataFrame = None, precios_cierre: pd.DataFrame = None, app_dataset: pd.DataFrame = None,
                 json_path: str = None, pickle_path: str = None, ignore_columns: list = None, numeric_columns: list = None):
        """Initialize the handler.

        Parameters
        ----------
        maestro : pd.DataFrame, optional
            Master asset table; columns listed in ``ignore_columns`` are dropped in place.
        precios_cierre : pd.DataFrame, optional
            Close-price history (dates x tickers).
        app_dataset : pd.DataFrame, optional
            Pre-built app dataset; when given, processing is skipped later.
        json_path : str, optional
            Directory with the column/category config JSON files; only read when
            ``app_dataset`` is None.
        pickle_path : str, optional
            Directory with the pickled embeddings DataFrame.
        ignore_columns, numeric_columns : list, optional
            Used directly when no ``json_path`` is supplied.
        """
        self.maestro = maestro
        self.app_dataset = app_dataset
        self.pickle_path = pickle_path

        if self.app_dataset is None and json_path is not None:
            with open(os.path.join(json_path, "ignore_columns.json"), "r") as f:
                self.ignore_columns = json.load(f)['ignore_columns']
            print(f"ignore_columns: {self.ignore_columns}")
            with open(os.path.join(json_path, "numeric_columns.json"), "r") as f:
                self.numeric_columns = json.load(f)['numeric_columns']
            print(f"numeric_columns: {self.numeric_columns}")
            # FIX: open in text mode ("r"), consistent with the other JSON reads
            with open(os.path.join(json_path, "app_column_config.json"), "r") as f:
                self.app_dataset_cols = json.load(f)['app_dataset_cols']
            print(f"app_dataset_cols: {self.app_dataset_cols}")

            with open(os.path.join(json_path, "cat_to_num_maps.json"), "r") as f:
                num_maps = json.load(f)
            self.sector_num_map = num_maps['sector_num_map']
            self.industry_num_map = num_maps['industry_num_map']
        else:
            self.ignore_columns = ignore_columns
            self.numeric_columns = numeric_columns
            # FIX: these attributes were never set on this branch, causing an
            # AttributeError later (e.g. in procesa_app_dataset / var_categorica_a_numerica)
            self.app_dataset_cols = None
            self.sector_num_map = None
            self.industry_num_map = None
            print(f"ignore_columns: {self.ignore_columns}")
            print(f"numeric_columns: {self.numeric_columns}")

        self.norm_columns = None
        # FIX: guard against ignore_columns=None (drop(columns=None) raises ValueError).
        # NOTE: this intentionally mutates the caller's DataFrame in place (kept for compat).
        if maestro is not None and self.ignore_columns:
            maestro.drop(columns=self.ignore_columns, inplace=True, errors='ignore')
        self.precios_cierre = precios_cierre
        self.rend_diario_log = None
        self.precios_cierre_fh = None
        self.rendimientos_y_volatilidad = None
        self.mapeos_var_categoricas = None
        self.activos_descartados = []
        self.quantile_scaler = None
|
| | def filtra_y_homogeneiza(self, n_dias=366, n_dias_descartar=1, min_dias=100): |
| | if self.precios_cierre.index.name != 'date': |
| | self.precios_cierre.set_index('date', inplace=True) |
| | self.precios_cierre.columns.name = 'ticker' |
| | end_date = self.precios_cierre.index.max() |
| | start_date = end_date - pd.Timedelta(days=n_dias) |
| | |
| | if start_date not in self.precios_cierre.index: |
| | earlier_dates = self.precios_cierre.index[self.precios_cierre.index < start_date] |
| | if len(earlier_dates) > 0: |
| | start_date = earlier_dates.max() |
| | else: |
| | start_date = self.precios_cierre.index.min() |
| | |
| | |
| | precios_cierre_fh = self.precios_cierre.loc[start_date:end_date].copy() |
| | |
| | |
| | if n_dias_descartar > 0: |
| | dates_to_drop = precios_cierre_fh.index.sort_values(ascending=False)[:n_dias_descartar] |
| | precios_cierre_fh.drop(dates_to_drop, inplace=True) |
| | |
| | precios_cierre_fh.ffill(axis=0, inplace=True) |
| | self.activos_descartados = precios_cierre_fh.columns[precios_cierre_fh.notna().sum(axis=0) < min_dias].tolist() |
| | precios_cierre_fh.drop(columns=self.activos_descartados, inplace=True) |
| | self.precios_cierre = precios_cierre_fh |
| | return |
| | |
| | def calcula_rendimientos_y_volatilidad(self, n_dias=365, umbral_max=0.3, umbral_min=-0.3): |
| | end_date = self.precios_cierre.index.max() |
| | print(f"Última fecha: {end_date}") |
| | print("Primera fecha: ",self.precios_cierre.index.min()) |
| | start_date = end_date - pd.Timedelta(days=n_dias) |
| | |
| | if start_date not in self.precios_cierre.index: |
| | earlier_dates = self.precios_cierre.index[self.precios_cierre.index < start_date] |
| | if len(earlier_dates) > 0: |
| | start_date = earlier_dates.max() |
| | else: |
| | start_date = self.precios_cierre.index.min() |
| | |
| | _df_rend_y_vol = self.precios_cierre.loc[start_date:end_date].copy() |
| | |
| | _df_rend_y_vol.dropna(how='all', inplace=True) |
| | |
| | _df_rend_y_vol[_df_rend_y_vol <= 0] = np.nextafter(0, 1) |
| | if self.activos_descartados: |
| | _df_rend_y_vol = _df_rend_y_vol.drop(columns=[col for col in self.activos_descartados if col in _df_rend_y_vol.columns]) |
| | if len(_df_rend_y_vol) == 0: |
| | raise ValueError(f"No hay datos disponibles en el rango de {n_dias} días") |
| | |
| | |
| | _rend_diario_log = np.log(_df_rend_y_vol).diff() |
| | _rend_diario_log = _rend_diario_log.iloc[1:] |
| | |
| | print(f'Datos rentabilidad ({n_dias} días) con outliers: {_rend_diario_log.shape}') |
| | |
| | _activos_outliers = _rend_diario_log.columns[((_rend_diario_log > umbral_max) | (_rend_diario_log < umbral_min)).any()].tolist() |
| | self.activos_descartados.extend([asset for asset in _activos_outliers if asset not in self.activos_descartados]) |
| | |
| | _rend_diario_log = _rend_diario_log.loc[:, ~((_rend_diario_log > umbral_max) | (_rend_diario_log < umbral_min)).any()] |
| | print(f'Datos rentabilidad sin outliers: {_rend_diario_log.shape}') |
| | |
| | self.rend_diario_log = _rend_diario_log.copy() |
| | |
| | |
| | if self.rendimientos_y_volatilidad is None: |
| | self.rendimientos_y_volatilidad = pd.DataFrame(columns=_rend_diario_log.columns) |
| | |
| | else: |
| | |
| | self.rendimientos_y_volatilidad = self.rendimientos_y_volatilidad.loc[:, _rend_diario_log.columns] |
| | |
| | |
| | |
| | self.rendimientos_y_volatilidad.loc[f'ret_log_{n_dias}'] = np.sum(_rend_diario_log, axis=0) |
| | self.rendimientos_y_volatilidad.loc[f'ret_{n_dias}'] = (_df_rend_y_vol.ffill().bfill().iloc[-1] / _df_rend_y_vol.ffill().bfill().iloc[0]) - 1 |
| | self.rendimientos_y_volatilidad.loc[f'vol_{n_dias}'] = _rend_diario_log.var()**0.5 |
| | |
| | return |
| | |
| | def cruza_maestro(self): |
| | _rets_y_vol_maestro = self.rendimientos_y_volatilidad.T.reset_index().copy() |
| | _columns_to_merge = [col for col in _rets_y_vol_maestro.columns if col not in self.maestro.columns] |
| | if len(_columns_to_merge) > 0: |
| | _maestro_v2 = self.maestro.merge(_rets_y_vol_maestro, left_on='ticker', right_on='ticker') |
| | _maestro_v2 = _maestro_v2.replace([float('inf'), float('-inf')], np.nan) |
| | self.maestro = _maestro_v2 |
| | else: |
| | raise ValueError("No hay nuevas columnas para cruzar con el dataframe maestro") |
| | return |
| | |
| | def _cat_to_num_(self, df, cat, pre_map=None): |
| | """ |
| | Transforma una columna categórica en un DataFrame a valores numéricos asignando un número entero a cada categoría. |
| | Si no se proporciona un mapeo (`pre_map`), asigna 0 a la categoría más frecuente, 1 a la siguiente más frecuente, y así sucesivamente. |
| | Si se proporciona un mapeo (`pre_map`), utiliza ese mapeo para la conversión. |
| | Parámetros |
| | ---------- |
| | df : pandas.DataFrame |
| | DataFrame que contiene la columna categórica a transformar. |
| | cat : str |
| | Nombre de la columna categórica a transformar. |
| | pre_map : dict, opcional |
| | Diccionario que mapea cada categoría a un valor numérico. Si no se proporciona, el mapeo se genera automáticamente. |
| | Devuelve |
| | -------- |
| | pandas.DataFrame |
| | DataFrame con dos columnas: la columna categórica original y una columna con los valores numéricos asignados. |
| | """ |
| | if not pre_map: |
| | pivot = pd.pivot_table(df, index=[cat], aggfunc='size') |
| | df_sorted = pivot.sort_values(ascending=False).reset_index(name='count') |
| | df_sorted[cat + '_num'] = range(len(df_sorted)) |
| | else: |
| | df_sorted = pd.DataFrame({cat: list(pre_map.keys()), cat + '_num': list(pre_map.values())}) |
| | return df_sorted |
| | |
| | def var_categorica_a_numerica(self, cat_cols): |
| | for col in cat_cols: |
| | if col == 'sectorDisp': |
| | globals()[f"pt_{col}"] = self._cat_to_num_(self.maestro, col, self.sector_num_map) |
| | elif col == 'industryDisp': |
| | globals()[f"pt_{col}"] = self._cat_to_num_(self.maestro, col, self.industry_num_map) |
| | else: |
| | globals()[f"pt_{col}"] = self._cat_to_num_(self.maestro, col) |
| | self.mapeos_var_categoricas = [globals()[f"pt_{col}"] for col in cat_cols] |
| |
|
| | _maestro = self.maestro.copy() |
| | for col, pt in zip(cat_cols, self.mapeos_var_categoricas): |
| | _maestro[col] = _maestro[col].astype(str) |
| | pt[col] = pt[col].astype(str) |
| | |
| | mapping_dict = dict(zip(pt[col], pt[col + '_num'])) |
| | _maestro[col + '_num'] = _maestro[col].map(mapping_dict) |
| | _maestro[col + '_num'] = pd.to_numeric(_maestro[col + '_num'], errors='coerce') |
| |
|
| | self.maestro = _maestro |
| | return |
| | |
| | def normaliza_por_cuantiles(self): |
| | maestro_copy = self.maestro.copy() |
| | numeric_columns = maestro_copy.select_dtypes(include=np.number).columns |
| | self.quantile_scaler = QuantileTransformer(output_distribution='uniform') |
| | variables_numericas = [col for col in numeric_columns if not col.endswith('_norm')] |
| | all_na_cols = [col for col in variables_numericas if maestro_copy[col].isna().all()] |
| | variables_numericas = [col for col in variables_numericas if col not in all_na_cols] |
| | self.norm_columns = ['{}_norm'.format(var) for var in variables_numericas] |
| | maestro_copy[self.norm_columns] = self.quantile_scaler.fit_transform(maestro_copy[variables_numericas]) |
| | maestro_copy[self.norm_columns] = maestro_copy[self.norm_columns].clip(0, 1) |
| | self.maestro = maestro_copy |
| | return |
| | |
| | def var_estandar_z(self): |
| | maestro_copy = self.maestro.copy() |
| | numeric_columns = maestro_copy.select_dtypes(include=np.number).columns |
| | variables_numericas = [col for col in numeric_columns if not col.endswith('_std')] |
| | variables_num_std = ['{}_std'.format(var) for var in variables_numericas] |
| |
|
| | def estandarizar(x): |
| | |
| | mean_val = x.mean() |
| | std_val = x.std() |
| | if pd.isna(std_val) or std_val == 0: |
| | return pd.Series(0.0, index=x.index, name=x.name) |
| | else: |
| | normalized_series = (x - mean_val) / std_val |
| | return normalized_series.fillna(0.0) |
| |
|
| | normalized_data = maestro_copy[variables_numericas].apply(estandarizar, axis=0) |
| | maestro_copy[variables_num_std] = normalized_data |
| | self.maestro = maestro_copy |
| | return |
| | |
| | def configura_distr_prob(self, shape, loc, scale, max_dist, precision_cdf): |
| | x = np.linspace(0, max_dist, num=precision_cdf) |
| | y_pdf = gamma.pdf(x, shape, loc, scale ) |
| | y_cdf = gamma.cdf(x, shape, loc, scale ) |
| | return y_pdf, y_cdf |
| | |
| | def calculos_y_ajustes_dataset_activos(self): |
| | maestro_copy = self.maestro.copy() |
| | |
| | for column in self.numeric_columns: |
| | if column in maestro_copy.columns: |
| | maestro_copy[column] = pd.to_numeric(maestro_copy[column], errors='coerce') |
| | |
| | |
| | |
| | |
| | if self.precios_cierre is not None and not self.precios_cierre.index.empty: |
| | _most_recent_date = self.precios_cierre.index.max().date() |
| | else: |
| | _most_recent_date = pd.Timestamp.today().date() |
| | |
| | maestro_copy['firstTradeDateMilliseconds'] = pd.to_datetime(maestro_copy['firstTradeDateMilliseconds'], unit='ms', errors='coerce').dt.date |
| | maestro_copy['asset_age'] = maestro_copy['firstTradeDateMilliseconds'].apply( |
| | lambda x: ((_most_recent_date - x).days / 365) if pd.notnull(x) and hasattr(x, 'day') else 0 |
| | ).astype(int) |
| | outlier_thresholds = { |
| | 'beta': (-100, 100), |
| | 'dividendYield': (-1,100), |
| | 'fiveYearAvgDividendYield': (-1,100), |
| | 'trailingAnnualDividendYield': (-1,100), |
| | 'quickRatio': (-1, 500), |
| | 'currentRatio': (-1, 500), |
| | 'ebitda': (-1e12, 1e12), |
| | 'grossProfits': (-1e12, 1e12), |
| | } |
| | for column, (lower_bound, upper_bound) in outlier_thresholds.items(): |
| | maestro_copy.loc[(maestro_copy[column] < lower_bound) | (maestro_copy[column] > upper_bound), column] = pd.NA |
| | self.maestro = maestro_copy.copy() |
| | return |
| | |
| | def filtra_df_activos(self, df, isin_target, filtros, debug=False): |
| | ''' |
| | LEGACY |
| | Devuelve un dataframe filtrado, sin alterar el orden, eliminando características no deseadas, para usar en aplicación de búsqueda de activos sustitutivos. |
| | Las características y valores a filtrar son las de un fondo objetivo dado por su isin. |
| | Por ejemplo, si clean_share es False en filtros, el dataframe final no incluirá más activos con el mismo valor de clean_share que el ISIN objetivo |
| | Argumentos: |
| | df (pandas.core.frame.DataFrame): Dataframe maestro de activos |
| | isin_target (str): ISIN del fondo objetivo |
| | # caracteristicas (list): Lista de str con los nombres de las características |
| | filtros (dict): Diccionario donde las claves son las características y los valores son True si se quiere conservar |
| | debug (bool, optional): Muestra información de depuración. Por defecto False. |
| | Resultado: |
| | df_filt (pandas.core.frame.DataFrame): Dataframe filtrado |
| | ''' |
| | |
| | fondo_target = df[df['ticker'] == isin_target].iloc[0] |
| | if debug: print(f'Tamaño inicial: {df.shape}') |
| | |
| | car_numericas = ['ret_365', 'vol_365', 'marketCap', 'asset_age'] |
| | |
| | |
| | for feature in list(filtros.keys()): |
| | value = fondo_target[feature] |
| | if debug: print(f'{feature} = {value}') |
| | |
| | |
| | if feature in filtros and not filtros[feature]: |
| | if debug: print(f'FILTRO: {feature} != {value}') |
| | df = df[df[feature] != value] |
| | |
| | |
| | if feature in car_numericas: |
| | if feature == 'ret_365': |
| | if debug: print(f'FILTRO NUMÉRICO: {feature} > {value}') |
| | df = df[df[feature] > value] |
| | elif feature == 'vol_365': |
| | if debug: print(f'FILTRO NUMÉRICO: {feature} < {value}') |
| | df = df[df[feature] < value] |
| | elif feature == 'asset_age': |
| | if debug: print(f'FILTRO NUMÉRICO: {feature} > {value}') |
| | df = df[df[feature] > value] |
| | elif feature == 'marketCap': |
| | if debug: print(f'FILTRO NUMÉRICO: {feature} > {value}') |
| | df = df[df[feature] < value] |
| | |
| | df_filt = df |
| | if debug: print(f'Tamaño final: {df_filt.shape}') |
| | return df_filt |
| | |
| | def calcula_ind_sust (self, dist, y_cdf, precision_cdf, max_dist): |
| | try: |
| | idx = int((precision_cdf / max_dist) * dist) |
| | idx = min(idx, len(y_cdf) - 1) |
| | norm_dist = y_cdf[idx] |
| | ind_sust = max(0.0, 1.0 - norm_dist) |
| | except IndexError: |
| | ind_sust = 0 |
| | return ind_sust |
| | |
| |
|
| | def vecinos_cercanos(self, df, variables_busq, caracteristicas, target_ticker, y_cdf, precision_cdf, max_dist, n_neighbors, filtros): |
| | if target_ticker not in df['ticker'].values: |
| | return f"Error: '{target_ticker}' no encontrado en la base de datos" |
| | target_row = df[df['ticker'] == target_ticker] |
| | if ~target_row.index.isin(df.index): |
| | df = pd.concat([df, target_row], ignore_index=True) |
| | |
| | X = df[variables_busq] |
| | model = NearestNeighbors(n_neighbors=n_neighbors) |
| | model.fit(X) |
| | target_row = df[df['ticker'] == target_ticker][variables_busq] |
| | |
| | distances, indices = model.kneighbors(target_row) |
| | |
| | neighbors_df = df.iloc[indices[0]][caracteristicas] |
| | neighbors_df['distance'] = distances[0] |
| | ind_sust = np.array([self.calcula_ind_sust(dist, y_cdf, precision_cdf, max_dist) for dist in distances[0]]) |
| | |
| | neighbors_df['ind_sust'] = ind_sust |
| | neighbors_df = neighbors_df.sort_values(by='distance', ascending=True) |
| | target_row = neighbors_df[neighbors_df['ticker'] == target_ticker] |
| | |
| | |
| | |
| | neighbors_df = self.filtra_df_activos (df = neighbors_df, isin_target = target_ticker, filtros = filtros) |
| | |
| |
|
| | |
| | if ~target_row.index.isin(neighbors_df.index): |
| | neighbors_df = pd.concat([pd.DataFrame(target_row), neighbors_df], ignore_index=True) |
| | |
| | |
| | neighbors_df.set_index('ticker', inplace = True) |
| | return neighbors_df |
| | |
| | def format_large_number(self, n, decimals=2): |
| | if n >= 1e12: |
| | return f'{n / 1e12:.{decimals}f} T' |
| | elif n >= 1e9: |
| | return f'{n / 1e9:.{decimals}f} B' |
| | elif n >= 1e6: |
| | return f'{n / 1e6:.{decimals}f} M' |
| | else: |
| | return str(n) |
| | |
| | def trae_embeddings_desde_pkl(self, embeddings_df_file_name='df_with_embeddings.pkl', embeddings_col_name='embeddings'): |
| | embeddings_df = pd.read_pickle(os.path.join(self.pickle_path, embeddings_df_file_name)) |
| | self.maestro = self.maestro.merge( |
| | embeddings_df[['ticker', embeddings_col_name]], |
| | on='ticker', |
| | how='left' |
| | ) |
| | print(f"Agregados embeddings {self.maestro.shape}") |
| | return |
| | |
| | def procesa_app_dataset(self, periodo=366, n_dias_descartar=1, min_dias=250, umbrales_rend=(-0.3, +0.3), periodos_metricas=[60, 365], |
| | cat_cols = ['industryDisp', 'sectorDisp', 'country', 'city', 'exchange', 'financialCurrency', 'quoteType'], |
| | embeddings_df_file_name='df_with_embeddings.pkl', embeddings_col_name='embeddings'): |
| | if self.app_dataset is not None: |
| | print("app_dataset already exists, skipping processing") |
| | return |
| |
|
| | self.filtra_y_homogeneiza(n_dias=periodo, n_dias_descartar=n_dias_descartar, min_dias=min_dias) |
| | |
| | for periodo_metricas in periodos_metricas: |
| | self.calcula_rendimientos_y_volatilidad(n_dias=periodo_metricas, umbral_max=umbrales_rend[1], umbral_min=umbrales_rend[0]) |
| | self.cruza_maestro() |
| | self.var_categorica_a_numerica(cat_cols) |
| | |
| | self.calculos_y_ajustes_dataset_activos() |
| | self.normaliza_por_cuantiles() |
| | self.trae_embeddings_desde_pkl(embeddings_df_file_name=embeddings_df_file_name, embeddings_col_name=embeddings_col_name) |
| | app_dataset = self.maestro.copy() |
| | app_dataset = app_dataset.fillna({col: 0.5 for col in self.norm_columns}) |
| | |
| | self.app_dataset = app_dataset[self.app_dataset_cols].copy() |
| | print(f"app_dataset preparado: {self.app_dataset.shape}") |
| | return |