| | |
| | import numpy as np |
| | from scipy.optimize import curve_fit |
| | from sympy import symbols, sympify, lambdify |
| | import warnings |
| | from sklearn.metrics import mean_squared_error |
| |
|
class BioprocessModel:
    """Configure and fit symbolic kinetic models with SymPy and SciPy.

    A model is registered with ``set_model`` (equation text plus a
    comma-separated list of parameter names) and fitted to data with
    ``fit_model``. Results are stored per model type in ``params``, ``r2``
    and ``rmse``.
    """

    def __init__(self):
        # All four dictionaries are keyed by model type (e.g. 'biomass').
        self.params = {}  # fitted parameter values: name -> value
        self.models = {}  # model configurations created by set_model
        self.r2 = {}  # R^2 of the most recent fit
        self.rmse = {}  # RMSE of the most recent fit
        print("DEBUG (models.py): BioprocessModel instanciado.")

    def set_model(self, model_type, equation_str, param_str):
        """Parse an equation and register it under ``model_type``.

        Args:
            model_type: Key under which the model is stored.
            equation_str: Expression in ``t`` (and optionally ``X_val``). If
                it contains ``=``, only the text after the first ``=`` is
                used.
            param_str: Comma-separated names of the parameters to fit.

        Raises:
            ValueError: If the equation or parameter list is empty, a
                parameter name is empty, duplicated or not a valid
                identifier, or SymPy cannot parse the equation.

        The configuration is stored only after parsing succeeds. When parsing
        fails, any existing entry for ``model_type`` is removed so that
        ``fit_model`` reports the model as not configured instead of failing
        on a half-built entry.
        """
        print(f"\nDEBUG (models.py): set_model llamado para tipo='{model_type}'")
        print(f" Equation str (raw): '{equation_str}'")
        print(f" Param str (raw): '{param_str}'")

        equation_str_cleaned = str(equation_str).strip()
        if '=' in equation_str_cleaned:
            # Keep only the right-hand side of "lhs = rhs".
            equation_str_cleaned = equation_str_cleaned.split('=', 1)[1].strip()

        if not equation_str_cleaned:
            print(f"ERROR (models.py): Ecuación vacía para {model_type}.")
            raise ValueError(f"La cadena de la ecuación para '{model_type}' no puede estar vacía.")
        if not param_str:
            print(f"ERROR (models.py): Cadena de parámetros vacía para {model_type}.")
            raise ValueError(f"La cadena de parámetros para '{model_type}' no puede estar vacía.")

        params_list = [param.strip() for param in param_str.split(',')]
        if not all(params_list):
            print(f"ERROR (models.py): Algún nombre de parámetro está vacío en '{param_str}' para {model_type}.")
            raise ValueError(f"Los nombres de los parámetros no pueden ser vacíos para '{model_type}'.")
        if len(set(params_list)) != len(params_list):
            # Duplicates would otherwise only fail later, when the expression
            # is compiled with repeated argument names.
            print(f"ERROR (models.py): Nombres de parámetro duplicados en '{param_str}' para {model_type}.")
            raise ValueError(f"Los nombres de los parámetros no pueden repetirse para '{model_type}'.")

        print(f" Equation (cleaned): '{equation_str_cleaned}'")
        print(f" Params list: {params_list}")

        try:
            t_sym = symbols('t')

            current_param_syms = []
            for p_name in params_list:
                if not p_name.isidentifier():
                    raise ValueError(f"Nombre de parámetro '{p_name}' no es un identificador Python válido para sympy.")
                current_param_syms.append(symbols(p_name))

            X_val_sym = symbols('X_val')

            # Bind the declared names explicitly so that parameters such as
            # 'beta', 'gamma', 'E' or 'I' are parsed as plain symbols rather
            # than as the SymPy objects that share those names.
            local_names = {'t': t_sym, 'X_val': X_val_sym}
            local_names.update(zip(params_list, current_param_syms))

            # NOTE(security): sympify evaluates the string with eval(); do
            # not feed it untrusted input.
            sympy_expr = sympify(equation_str_cleaned, locals=local_names)
            print(f" Sympy expression: {sympy_expr}")
            print(f" Free symbols in expr: {sympy_expr.free_symbols}")
        except Exception as e:
            self.models.pop(model_type, None)
            print(f"ERROR (models.py): Fallo al procesar con sympy para '{model_type}': {e}")
            raise ValueError(f"Error en la ecuación o parámetros para '{model_type}': {e}") from e

        self.models[model_type] = {
            'equation_str': equation_str_cleaned,
            'params': params_list,
            'sympy_expr': sympy_expr,
            'param_symbols': tuple(current_param_syms),
            'time_symbol': t_sym,
            'X_val_symbol': X_val_sym,
        }
        print(f" Modelo '{model_type}' configurado exitosamente.")

    @staticmethod
    def _nan_like(values):
        """Return a float NaN array with the shape of ``values``."""
        # np.full_like would inherit an integer dtype, which cannot hold NaN.
        return np.full(np.shape(values), np.nan, dtype=float)

    @staticmethod
    def _resolve_bounds(bounds, n_params):
        """Return ``(lower, upper)`` float arrays of length ``n_params``."""
        raw_lower, raw_upper = ((), ()) if bounds is None else bounds

        def normalise(raw, fill):
            arr = np.asarray(raw, dtype=float)
            if arr.ndim == 0:
                # A scalar bound applies to every parameter.
                return np.full(n_params, float(arr))
            arr = arr.ravel()
            if arr.size != n_params:
                # Wrong length: fall back to an unbounded side.
                return np.full(n_params, fill)
            return arr

        return normalise(raw_lower, -np.inf), normalise(raw_upper, np.inf)

    @staticmethod
    def _initial_guess(lower, upper):
        """Return a start vector of ones, moved inside the bounds if needed."""
        guess = np.ones(lower.size)
        infeasible = (guess < lower) | (guess > upper)
        if np.any(infeasible):
            with np.errstate(invalid='ignore'):
                midpoint = 0.5 * (lower + upper)
            fallback = np.where(
                np.isfinite(midpoint),
                midpoint,
                np.where(np.isfinite(lower), lower + 1.0, upper - 1.0),
            )
            guess[infeasible] = fallback[infeasible]
        return guess

    def _record_failed_fit(self, model_type, param_names, data):
        """Store NaN results for ``model_type`` and return ``(NaN array, None)``."""
        self.params[model_type] = {p: np.nan for p in param_names}
        self.r2[model_type] = np.nan
        self.rmse[model_type] = np.nan
        return self._nan_like(data), None

    def _build_predictor(self, model_type, model_config, biomass_params_fitted):
        """Compile the model once and return ``f(t_values, *param_values)``."""
        equation_expr = model_config['sympy_expr']
        param_syms = list(model_config['param_symbols'])
        X_val_sym = model_config['X_val_symbol']
        arg_syms = [model_config['time_symbol']] + param_syms
        nan_like = self._nan_like

        needs_biomass = (
            model_type in ('substrate', 'product')
            and X_val_sym in equation_expr.free_symbols
        )

        if not needs_biomass:
            plain_func = lambdify(arg_syms, equation_expr, 'numpy')

            def predict_plain(t_values, *p_values):
                try:
                    return plain_func(t_values, *p_values)
                except Exception as e_bio_eval:
                    print(f"ERROR (models.py fit_model_wrapper): Evaluando biomasa: {e_bio_eval}")
                    return nan_like(t_values)

            return predict_plain

        biomass_config = self.models.get('biomass')
        if biomass_params_fitted is None or biomass_config is None or 'sympy_expr' not in biomass_config:

            def predict_missing(t_values, *p_values):
                print("ERROR (models.py fit_model_wrapper): Falta config/params de biomasa para modelo S/P dependiente.")
                return nan_like(t_values)

            return predict_missing

        biomass_param_names = biomass_config['params']
        biomass_arg_syms = [biomass_config['time_symbol']] + list(biomass_config['param_symbols'])

        # X_val is supplied from the biomass model unless the caller declared
        # it as a parameter to fit, in which case it is already an argument.
        pass_biomass = X_val_sym not in param_syms
        if pass_biomass:
            arg_syms.append(X_val_sym)
        model_func = lambdify(arg_syms, equation_expr, 'numpy')

        def predict_with_biomass(t_values, *p_values):
            try:
                # The compiled biomass function is cached on its config.
                if 'biomass_func_lambdified' not in biomass_config:
                    biomass_config['biomass_func_lambdified'] = lambdify(
                        biomass_arg_syms,
                        biomass_config['sympy_expr'],
                        'numpy',
                    )
                biomass_values = [biomass_params_fitted[name] for name in biomass_param_names]
                x_t_values = biomass_config['biomass_func_lambdified'](t_values, *biomass_values)
            except Exception as e_biomass_calc:
                print(f"ERROR (models.py fit_model_wrapper): Calculando X(t) para S/P: {e_biomass_calc}")
                return nan_like(t_values)

            try:
                call_args = [t_values, *p_values]
                if pass_biomass:
                    call_args.append(x_t_values)
                return model_func(*call_args)
            except Exception as e_sp_eval:
                print(f"ERROR (models.py fit_model_wrapper): Evaluando S/P con X_val: {e_sp_eval}")
                return nan_like(t_values)

        return predict_with_biomass

    def fit_model(self, model_type, time, data, bounds, biomass_params_fitted=None):
        """Fit the configured model to ``data`` over ``time`` with curve_fit.

        Args:
            model_type: Key of a model previously registered by ``set_model``.
            time: Sequence of time points.
            data: Sequence of observations, same length as ``time``.
            bounds: ``(lower, upper)`` pair. Each side may be a scalar or a
                sequence; a sequence of the wrong length is replaced by an
                unbounded side. ``None`` means unbounded.
            biomass_params_fitted: Mapping of fitted biomass parameters
                (name -> value). It is used when a 'substrate' or 'product'
                equation contains ``X_val``.

        Returns:
            ``(y_pred, popt)``. When the fit fails, ``y_pred`` is a float NaN
            array shaped like ``data``, ``popt`` is ``None`` and NaN is stored
            in ``params``, ``r2`` and ``rmse`` for this model type.

        Raises:
            ValueError: If ``model_type`` has not been configured.
        """
        print(f"\nDEBUG (models.py): fit_model llamado para tipo='{model_type}'")
        if model_type not in self.models:
            print(f"ERROR (models.py): Modelo para '{model_type}' no configurado.")
            raise ValueError(f"Modelo para '{model_type}' no configurado. Llama a set_model primero.")

        model_config = self.models[model_type]
        equation_expr = model_config['sympy_expr']
        current_param_names = model_config['params']

        print(f" Ajustando con ecuación: {equation_expr}")
        print(f" Parámetros a ajustar: {current_param_names}")
        print(f" Datos de tiempo (primeros 5): {time[:5]}")
        print(f" Datos experimentales (primeros 5): {data[:5]}")
        print(f" Límites: {bounds}")
        if biomass_params_fitted:
            print(f" Parámetros de biomasa ajustados (para S/P): {biomass_params_fitted}")

        lower_bounds, upper_bounds = self._resolve_bounds(bounds, len(current_param_names))
        # A start vector outside the bounds makes curve_fit fail immediately.
        p0 = self._initial_guess(lower_bounds, upper_bounds)

        print(f" Estimaciones iniciales p0: {p0}")
        print(f" Límites para curve_fit: L={lower_bounds}, U={upper_bounds}")

        # Compile once here instead of on every objective evaluation.
        try:
            predict = self._build_predictor(model_type, model_config, biomass_params_fitted)
        except Exception as e_compile:
            print(f"ERROR (models.py): curve_fit falló inesperadamente para {model_type}: {e_compile}")
            return self._record_failed_fit(model_type, current_param_names, data)

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", RuntimeWarning)
            warnings.simplefilter("ignore", UserWarning)
            try:
                popt, _ = curve_fit(predict, time, data, p0=p0, bounds=(lower_bounds, upper_bounds), maxfev=50000, method='trf')
                print(f" curve_fit completado. Parámetros optimizados (popt): {popt}")
            except RuntimeError as e_curvefit:
                print(f"ERROR (models.py): curve_fit falló para {model_type} con RuntimeError: {e_curvefit}")
                return self._record_failed_fit(model_type, current_param_names, data)
            except ValueError as e_val_curvefit:
                print(f"ERROR (models.py): curve_fit falló para {model_type} con ValueError: {e_val_curvefit}")
                return self._record_failed_fit(model_type, current_param_names, data)
            except Exception as e_gen_curvefit:
                print(f"ERROR (models.py): curve_fit falló inesperadamente para {model_type}: {e_gen_curvefit}")
                return self._record_failed_fit(model_type, current_param_names, data)

        self.params[model_type] = dict(zip(current_param_names, popt))

        try:
            # Work on float arrays so list inputs behave like array inputs.
            time_values = np.asarray(time, dtype=float)
            data_values = np.asarray(data, dtype=float)
            y_pred = np.asarray(predict(time_values, *popt))
            if y_pred.shape != data_values.shape:
                # A model without 't' evaluates to a scalar; expand it.
                y_pred = np.broadcast_to(y_pred, data_values.shape).copy()

            if np.any(np.isnan(y_pred)):
                print(f"ADVERTENCIA (models.py): y_pred contiene NaNs después del ajuste para {model_type}.")
                self.r2[model_type] = np.nan
                self.rmse[model_type] = np.nan
            else:
                ss_res = np.sum((data_values - y_pred) ** 2)
                ss_tot = np.sum((data_values - np.mean(data_values)) ** 2)
                if ss_tot == 0:
                    # Constant data: R^2 is undefined, so report 1 for an
                    # essentially exact fit and 0 otherwise.
                    self.r2[model_type] = 1.0 if ss_res < 1e-9 else 0.0
                else:
                    self.r2[model_type] = 1 - (ss_res / ss_tot)
                self.rmse[model_type] = np.sqrt(mean_squared_error(data_values, y_pred))
        except Exception as e_ypred:
            print(f"ERROR (models.py): Calculando y_pred final para {model_type}: {e_ypred}")
            y_pred = self._nan_like(data)
            self.r2[model_type] = np.nan
            self.rmse[model_type] = np.nan

        print(f" Ajuste para {model_type} completado. R2: {self.r2.get(model_type)}, RMSE: {self.rmse.get(model_type)}")
        return y_pred, popt