| import numpy as np |
| import pandas as pd |
| from .base import BaseController |
|
|
|
|
| class PIDController(BaseController): |
| """PID-регулятор для управления отступами на основе загрузки специалистов""" |
|
|
| def __init__(self, name="PID", |
| kp_load=0.1, ki_load=0.01, kd_load=0.05, |
| load_weight=1.0, target_load=0.8, |
| |
| init_threshold=0.5, |
| init_lr_low=0.3, init_lr_high=0.4, |
| init_second_low=0.35, init_second_high=0.45): |
|
|
| super().__init__(name) |
|
|
| |
| self.kp_load = kp_load |
| self.ki_load = ki_load |
| self.kd_load = kd_load |
|
|
| self.load_weight = load_weight |
| self.target_load = target_load |
|
|
| |
| self.prev_error_load = 0 |
| self.integral_load = 0 |
|
|
| |
| self.init_threshold = init_threshold |
| self.init_lr_low = init_lr_low |
| self.init_lr_high = init_lr_high |
| self.init_second_low = init_second_low |
| self.init_second_high = init_second_high |
|
|
| |
| self.threshold = init_threshold |
| self.lr_low = init_lr_low |
| self.lr_high = init_lr_high |
| self.second_low = init_second_low |
| self.second_high = init_second_high |
|
|
| |
| self.bounds = { |
| 'lr_low': (0.05, self.threshold - 0.05), |
| 'lr_high': (0.05, 1 - self.threshold - 0.05), |
| 'second_low': (0.05, self.threshold - 0.05), |
| 'second_high': (0.05, 1 - self.threshold - 0.05) |
| } |
|
|
| |
| self.integral_limit = 1.0 |
|
|
| def update(self, current_load): |
| |
| """ current_load: текущая загрузка специалистов (0-1)""" |
| |
| |
| error_load = self.target_load - current_load |
|
|
| |
| P_load = self.kp_load * error_load |
| self.integral_load += error_load |
| self.integral_load = np.clip(self.integral_load, -self.integral_limit, self.integral_limit) |
| I_load = self.ki_load * self.integral_load |
| D_load = self.kd_load * (error_load - self.prev_error_load) |
| self.prev_error_load = error_load |
|
|
| |
| output_load = P_load + I_load + D_load |
| output = self.load_weight * output_load |
|
|
| |
| self._update_parameters(output) |
|
|
| |
| self.history.append({ |
| 'time': len(self.history), |
| 'error_load': error_load, |
| 'output': output, |
| 'threshold': self.threshold, |
| 'lr_low': self.lr_low, |
| 'lr_high': self.lr_high, |
| 'second_low': self.second_low, |
| 'second_high': self.second_high, |
| 'load': current_load, |
| }) |
|
|
| return self.get_margins() |
|
|
| def _update_parameters(self, output): |
| """Обновляет отступы на основе выхода регулятора""" |
| delta = output * 0.1 |
| self.lr_low = np.clip( |
| self.lr_low + delta, |
| self.bounds['lr_low'][0], |
| self.bounds['lr_low'][1] |
| ) |
| self.lr_high = np.clip( |
| self.lr_high + delta, |
| self.bounds['lr_high'][0], |
| self.bounds['lr_high'][1] |
| ) |
| self.second_low = np.clip( |
| self.second_low + delta, |
| self.bounds['second_low'][0], |
| self.bounds['second_low'][1] |
| ) |
| self.second_high = np.clip( |
| self.second_high + delta, |
| self.bounds['second_high'][0], |
| self.bounds['second_high'][1] |
| ) |
|
|
| def get_margins(self, hour=None): |
| """Возвращает текущие отступы""" |
| return { |
| 'lr_low': self.lr_low, |
| 'lr_high': self.lr_high, |
| 'second_low': self.second_low, |
| 'second_high': self.second_high |
| } |
|
|
| def get_history(self): |
| """Возвращает историю для визуализации""" |
| return pd.DataFrame(self.history) |