Zirok05's picture
Update app/simulation/controllers/pid.py
7ca0449 verified
Raw
History Blame Contribute Delete
4.48 kB
import numpy as np
import pandas as pd
from .base import BaseController
class PIDController(BaseController):
"""PID-регулятор для управления отступами на основе загрузки специалистов"""
def __init__(self, name="PID",
kp_load=0.1, ki_load=0.01, kd_load=0.05,
load_weight=1.0, target_load=0.8,
# Начальные значения параметров
init_threshold=0.5,
init_lr_low=0.3, init_lr_high=0.4,
init_second_low=0.35, init_second_high=0.45):
super().__init__(name)
# Коэффициенты PID для загрузки
self.kp_load = kp_load
self.ki_load = ki_load
self.kd_load = kd_load
self.load_weight = load_weight
self.target_load = target_load
# Состояния PID
self.prev_error_load = 0
self.integral_load = 0
# Начальные параметры
self.init_threshold = init_threshold
self.init_lr_low = init_lr_low
self.init_lr_high = init_lr_high
self.init_second_low = init_second_low
self.init_second_high = init_second_high
# Текущие параметры (отступы)
self.threshold = init_threshold
self.lr_low = init_lr_low
self.lr_high = init_lr_high
self.second_low = init_second_low
self.second_high = init_second_high
# Границы отступов
self.bounds = {
'lr_low': (0.05, self.threshold - 0.05),
'lr_high': (0.05, 1 - self.threshold - 0.05),
'second_low': (0.05, self.threshold - 0.05),
'second_high': (0.05, 1 - self.threshold - 0.05)
}
# Ограничение интеграла
self.integral_limit = 1.0
def update(self, current_load):
""" current_load: текущая загрузка специалистов (0-1)"""
# Ошибка по загрузке
error_load = self.target_load - current_load
# PID для загрузки
P_load = self.kp_load * error_load
self.integral_load += error_load
self.integral_load = np.clip(self.integral_load, -self.integral_limit, self.integral_limit)
I_load = self.ki_load * self.integral_load
D_load = self.kd_load * (error_load - self.prev_error_load)
self.prev_error_load = error_load
# Выход регулятора
output_load = P_load + I_load + D_load
output = self.load_weight * output_load
# Адаптируем отступы
self._update_parameters(output)
# Сохраняем историю
self.history.append({
'time': len(self.history),
'error_load': error_load,
'output': output,
'threshold': self.threshold,
'lr_low': self.lr_low,
'lr_high': self.lr_high,
'second_low': self.second_low,
'second_high': self.second_high,
'load': current_load,
})
return self.get_margins()
def _update_parameters(self, output):
"""Обновляет отступы на основе выхода регулятора"""
delta = output * 0.1
self.lr_low = np.clip(
self.lr_low + delta,
self.bounds['lr_low'][0],
self.bounds['lr_low'][1]
)
self.lr_high = np.clip(
self.lr_high + delta,
self.bounds['lr_high'][0],
self.bounds['lr_high'][1]
)
self.second_low = np.clip(
self.second_low + delta,
self.bounds['second_low'][0],
self.bounds['second_low'][1]
)
self.second_high = np.clip(
self.second_high + delta,
self.bounds['second_high'][0],
self.bounds['second_high'][1]
)
def get_margins(self, hour=None):
"""Возвращает текущие отступы"""
return {
'lr_low': self.lr_low,
'lr_high': self.lr_high,
'second_low': self.second_low,
'second_high': self.second_high
}
def get_history(self):
"""Возвращает историю для визуализации"""
return pd.DataFrame(self.history)