| import pickle |
| import json |
| import numpy as np |
| import pandas as pd |
| from pathlib import Path |
| from typing import Dict, List, Optional, Union |
| import warnings |
|
|
# Silence every warning category process-wide for the lifetime of this module
# (same filter as before, spelled with an explicit keyword).
warnings.filterwarnings(action='ignore')
|
|
|
|
class EarlyWarningPredictor:
    """Early-warning closure-risk prediction model for small businesses.

    Loads pickled classifiers (``xgboost_model.pkl``, ``lightgbm_model.pkl``,
    optionally ``catboost_model.pkl``) plus JSON metadata from a model
    directory, averages their positive-class probabilities with configurable
    weights, and converts the result into a 0-100 risk score, a risk band,
    ranked risk factors and suggested action items.
    """

    # (attribute name, file name) pairs for the pickled artifacts.
    _PICKLED_ARTIFACTS = (
        ('xgb_model', 'xgboost_model.pkl'),
        ('lgb_model', 'lightgbm_model.pkl'),
        ('catboost_model', 'catboost_model.pkl'),
        ('label_encoders', 'label_encoders.pkl'),
    )

    # (attribute name, file name) pairs for the UTF-8 JSON artifacts.
    _JSON_ARTIFACTS = (
        ('feature_names', 'feature_names.json'),
        ('config', 'config.json'),
    )

    # Substring -> human-readable label; the first matching key wins, so the
    # declaration order is significant.
    _FEATURE_LABELS = {
        'sales_avg': '매출',
        'trend_slope': '매출 추세',
        'trend_consecutive_down': '연속 하락',
        'customer_reuse_rate': '재이용률',
        'volatility_cv': '매출 변동성',
        'operation_months': '영업 기간',
        'sales_recent_vs_previous': '최근 매출 변화'
    }

    def __init__(self, model_path: Optional[str] = None):
        """Create an unloaded predictor.

        Args:
            model_path: Directory holding the model artifacts. When omitted,
                ``<directory of this file>/../model`` is used.
        """
        self.model_path = Path(model_path) if model_path else Path(__file__).parent.parent / 'model'
        self.xgb_model = None
        self.lgb_model = None
        self.catboost_model = None
        self.label_encoders = {}
        self.feature_names = []
        self.config = {}
        self.is_loaded = False

    @classmethod
    def from_pretrained(cls, model_name_or_path: str) -> 'EarlyWarningPredictor':
        """Build a predictor for ``model_name_or_path`` and load it immediately.

        Raises:
            FileNotFoundError: If the directory does not exist.
        """
        predictor = cls(model_path=model_name_or_path)
        predictor.load_model()
        return predictor

    def load_model(self):
        """Load every artifact that is present in ``self.model_path``.

        Individual artifact files are optional: a missing file leaves the
        corresponding attribute at its initial value. ``is_loaded`` is set to
        ``True`` once the directory has been scanned, and a completion message
        is printed.

        Raises:
            FileNotFoundError: If the model directory itself does not exist.
        """
        if not self.model_path.exists():
            raise FileNotFoundError(f"Model directory not found: {self.model_path}")

        # SECURITY: pickle.load can execute arbitrary code embedded in the
        # file. Only point model_path at directories whose contents you trust.
        for attr, filename in self._PICKLED_ARTIFACTS:
            artifact = self.model_path / filename
            if artifact.exists():
                with open(artifact, 'rb') as f:
                    setattr(self, attr, pickle.load(f))

        for attr, filename in self._JSON_ARTIFACTS:
            artifact = self.model_path / filename
            if artifact.exists():
                with open(artifact, 'r', encoding='utf-8') as f:
                    setattr(self, attr, json.load(f))

        self.is_loaded = True
        print(f"모델 로드 완료: v{self.config.get('model_version', '2.0')}")

    def predict(self, store_data: Dict,
                monthly_usage: Optional[pd.DataFrame] = None,
                monthly_customers: Optional[pd.DataFrame] = None,
                threshold: Optional[float] = None) -> Dict:
        """Predict the closure risk of a single store.

        Args:
            store_data: Store attributes. The simple-feature path reads the
                keys ``avg_sales``, ``reuse_rate``, ``operating_months`` and
                ``sales_trend``.
            monthly_usage: Optional monthly usage frame. Full feature
                engineering is used only when this and ``monthly_customers``
                are both given.
            monthly_customers: Optional monthly customer frame.
            threshold: Probability above which the store is flagged as at
                risk. ``None`` falls back to ``config['threshold']`` (default
                0.5). An explicit ``0.0`` is honoured.

        Returns:
            Dict with ``risk_score``, ``risk_level``, ``risk_color``,
            ``closure_probability``, ``is_at_risk``, ``threshold``,
            ``confidence``, ``model_version``, ``action_items`` and, when the
            XGBoost model is loaded, ``risk_factors``. Numeric and boolean
            values are plain Python types.

        Raises:
            FileNotFoundError: If the model has not been loaded yet and the
                model directory does not exist.
        """
        if not self.is_loaded:
            self.load_model()

        features = self._prepare_features(store_data, monthly_usage, monthly_customers)

        # `threshold or default` would discard a legitimate 0.0.
        if threshold is None:
            threshold = self.config.get('threshold', 0.5)

        closure_probability = self._ensemble_probability(features)
        risk_score = closure_probability * 100
        risk_level, risk_color = self._risk_band(risk_score)

        result = {
            'risk_score': round(risk_score, 2),
            'risk_level': risk_level,
            'risk_color': risk_color,
            'closure_probability': round(closure_probability, 4),
            # bool() keeps the value JSON-serialisable even if the threshold
            # is a numpy scalar.
            'is_at_risk': bool(closure_probability > threshold),
            'threshold': threshold,
            'confidence': max(closure_probability, 1 - closure_probability),
            'model_version': self.config.get('model_version', '2.0')
        }

        if self.xgb_model is not None:
            result['risk_factors'] = self._analyze_risk_factors(features)

        result['action_items'] = self._generate_action_items(result, store_data)

        return result

    def predict_batch(self, stores_df: pd.DataFrame) -> pd.DataFrame:
        """Run :meth:`predict` on every row of ``stores_df``.

        Each row is converted to a dict and scored through the simple-feature
        path. The output has one row per store, with a ``store_id`` column
        taken from the row's ``store_id`` value or, if absent, its index label.
        """
        results = []

        for idx, row in stores_df.iterrows():
            result = self.predict(row.to_dict())
            result['store_id'] = row.get('store_id', idx)
            results.append(result)

        return pd.DataFrame(results)

    def explain(self, store_data: Dict, top_n: int = 10) -> Dict:
        """Predict and attach a textual interpretation plus top risk factors.

        Args:
            store_data: Store attributes, as for :meth:`predict`.
            top_n: Maximum number of risk factors to report in
                ``top_features``.

        Returns:
            Dict with ``prediction`` (the full :meth:`predict` result),
            ``top_features`` (label -> score, at most ``top_n`` entries; empty
            when no XGBoost model is loaded) and ``interpretation``.
        """
        result = self.predict(store_data)

        top_features = result.get('risk_factors', {})
        if self.xgb_model is not None:
            # predict() caps its own list at the default size, so rank again
            # here to honour the caller's top_n.
            features = self._prepare_features(store_data)
            top_features = self._analyze_risk_factors(features, top_n=top_n)

        return {
            'prediction': result,
            'top_features': top_features,
            'interpretation': self._interpret_prediction(result)
        }

    def _prepare_features(self, store_data: Dict,
                          monthly_usage: Optional[pd.DataFrame] = None,
                          monthly_customers: Optional[pd.DataFrame] = None) -> pd.DataFrame:
        """Build the model input frame, aligned to ``self.feature_names``."""
        if monthly_usage is None or monthly_customers is None:
            raw = self._create_simple_features(store_data)
        else:
            # Imported lazily so the simple path works without the project's
            # feature-engineering module being importable.
            from src.feature_engineering import FeatureEngineer
            # presumably returns a DataFrame (``.columns`` is read during
            # alignment) - verify against FeatureEngineer.
            raw = FeatureEngineer().create_features(store_data, monthly_usage, monthly_customers)

        return self._align_features(raw)

    def _ensemble_probability(self, features: pd.DataFrame) -> float:
        """Return the weighted positive-class probability as a Python float.

        Both the XGBoost and LightGBM models are required; otherwise the
        neutral value 0.5 is returned. CatBoost contributes only when it is
        loaded and ``config['ensemble_weights']`` has a third entry.
        """
        if self.xgb_model is None or self.lgb_model is None:
            return 0.5

        weights = self.config.get('ensemble_weights', [0.5, 0.5])
        xgb_prob = self.xgb_model.predict_proba(features)[0][1]
        lgb_prob = self.lgb_model.predict_proba(features)[0][1]
        probability = weights[0] * xgb_prob + weights[1] * lgb_prob

        if self.catboost_model is not None and len(weights) > 2:
            cat_prob = self.catboost_model.predict_proba(features)[0][1]
            probability = probability + weights[2] * cat_prob

        # Models may hand back numpy scalars (e.g. float32), which json.dumps
        # rejects; normalise once here.
        return float(probability)

    @staticmethod
    def _risk_band(risk_score: float):
        """Map a 0-100 risk score to a ``(risk_level, risk_color)`` pair."""
        # NOTE(review): these bands (30/60) differ from the action-item
        # cut-offs (40/70) in _generate_action_items - confirm it is intended.
        if risk_score < 30:
            return '낮음', 'green'
        if risk_score < 60:
            return '보통', 'yellow'
        return '높음', 'red'

    def _create_simple_features(self, store_data: Dict) -> pd.DataFrame:
        """Build a one-row feature frame from basic store attributes.

        The four known attributes fall back to fixed defaults when absent;
        every other name in ``self.feature_names`` is set to 0.
        """
        features = {
            'sales_avg_all': store_data.get('avg_sales', 50),
            'customer_reuse_rate': store_data.get('reuse_rate', 25),
            'operation_months': store_data.get('operating_months', 12),
            'trend_slope': store_data.get('sales_trend', 0),
        }

        for fname in self.feature_names:
            features.setdefault(fname, 0)

        return pd.DataFrame([features])

    def _align_features(self, features: pd.DataFrame) -> pd.DataFrame:
        """Reorder columns to ``self.feature_names`` and fill the gaps.

        Columns missing from ``features`` are created with 0, extra columns
        are dropped, and remaining NaNs are replaced by the column median
        (0 when the median itself is undefined).
        """
        # One reindex instead of column-by-column insertion into an empty
        # frame: keeps the row index intact even when the first feature is
        # missing and avoids frame fragmentation.
        aligned = features.reindex(columns=self.feature_names, fill_value=0)

        # numeric_only=True: pandas 2.x raises on non-numeric columns
        # otherwise.
        medians = aligned.median(numeric_only=True).fillna(0)
        return aligned.fillna(medians)

    def _analyze_risk_factors(self, features: pd.DataFrame, top_n: int = 6) -> Dict[str, float]:
        """Rank features by ``importance * |value| * 10`` (capped at 100).

        Only features whose XGBoost importance exceeds 0.01 are considered.
        Scores are computed from the first row of ``features``, whose columns
        are assumed to follow ``self.feature_names`` order.

        Args:
            features: Aligned feature frame.
            top_n: Maximum number of factors to return.

        Returns:
            Readable label -> score, highest first. Empty when the model
            exposes no ``feature_importances_``.
        """
        if not hasattr(self.xgb_model, 'feature_importances_'):
            return {}

        importances = self.xgb_model.feature_importances_
        feature_values = features.iloc[0].values

        contributions = {}
        # zip() stops at the shortest sequence, so a length mismatch cannot
        # raise IndexError in this secondary, explanatory path.
        for fname, weight, value in zip(self.feature_names, importances, feature_values):
            if weight > 0.01:
                score = float(min(round(weight * abs(value) * 10, 1), 100))
                label = self._translate_feature_name(fname)
                # Several raw features can share one label; keep the strongest
                # signal instead of whichever happened to come last.
                contributions[label] = max(score, contributions.get(label, score))

        ranked = sorted(contributions.items(), key=lambda item: item[1], reverse=True)
        return dict(ranked[:max(top_n, 0)])

    def _translate_feature_name(self, fname: str) -> str:
        """Return a readable label for ``fname``.

        The first label key that is a substring of ``fname`` wins; the name is
        returned unchanged when nothing matches.
        """
        for key, label in self._FEATURE_LABELS.items():
            if key in fname:
                return label

        return fname

    def _generate_action_items(self, result: Dict, store_data: Dict) -> List[str]:
        """Return three recommended actions chosen by ``result['risk_score']``.

        Scores above 70 get urgent actions, scores above 40 get improvement
        actions, everything else gets maintenance actions. ``store_data`` is
        accepted for interface compatibility and is not read.
        """
        risk_score = result['risk_score']

        if risk_score > 70:
            return [
                "즉시 조치 필요: 비용 절감 및 매출 증대 방안 마련",
                "현금흐름 개선: 외상 매출 회수 및 재고 최적화",
                "전문가 상담: 경영 컨설팅 및 구조조정 검토",
            ]
        if risk_score > 40:
            return [
                "매출 분석: 주력 상품/서비스 재점검",
                "마케팅 강화: 신규 고객 유치 캠페인",
                "차별화 전략: 경쟁력 있는 요소 발굴 및 강화",
            ]
        return [
            "현재 상태 유지: 정기적인 모니터링 지속",
            "성장 기회 탐색: 추가 매출원 발굴",
            "고객 충성도 강화: 멤버십 프로그램 등",
        ]

    def _interpret_prediction(self, result: Dict) -> str:
        """Return a one-sentence summary for the result's risk level and score."""
        risk_level = result['risk_level']
        risk_score = result['risk_score']

        if risk_level == '높음':
            return f"위험도가 매우 높습니다 ({risk_score:.1f}점). 폐업 위험이 높으므로 즉각적인 대응이 필요합니다."
        if risk_level == '보통':
            return f"주의가 필요합니다 ({risk_score:.1f}점). 개선 방안을 마련하여 위험을 줄이세요."
        return f"안정적입니다 ({risk_score:.1f}점). 현재의 운영 방식을 유지하면서 지속적으로 모니터링하세요."

    def get_model_info(self) -> Dict:
        """Return version, feature count, performance, weights and loaded models.

        Metadata comes from ``self.config``; the ``models`` entry reports
        which of the three classifiers are currently loaded.
        """
        return {
            'version': self.config.get('model_version', '2.0'),
            'n_features': self.config.get('n_features', 0),
            'performance': self.config.get('performance', {}),
            'ensemble_weights': self.config.get('ensemble_weights', []),
            'models': {
                'xgboost': self.xgb_model is not None,
                'lightgbm': self.lgb_model is not None,
                'catboost': self.catboost_model is not None
            }
        }
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: load the model from ../model (relative to the current
    # working directory), score one sample store and print the outcome.
    banner = "=" * 70
    print(banner)
    print("Early Warning Predictor v2.0 테스트")
    print(banner)

    demo_predictor = EarlyWarningPredictor(model_path='../model')

    try:
        demo_predictor.load_model()

        sample_store = dict(
            store_id='TEST_001',
            industry='카페',
            location='서울 강남구',
            avg_sales=45,
            reuse_rate=22.5,
            operating_months=18,
            sales_trend=-0.05,
        )

        outcome = demo_predictor.predict(sample_store)

        print("\n예측 결과:")
        print(f" 위험도 점수: {outcome['risk_score']}/100")
        print(f" 위험 등급: {outcome['risk_level']}")
        print(f" 폐업 확률: {outcome['closure_probability']:.1%}")

        # Risk factors are only present when the predictor produced them.
        if 'risk_factors' in outcome:
            print("\n주요 위험 요인:")
            for factor_name, factor_score in outcome['risk_factors'].items():
                print(f" - {factor_name}: {factor_score:.1f}점")

        print("\n액션 아이템:")
        for item in outcome['action_items']:
            print(f" {item}")

    except FileNotFoundError:
        # Raised by load_model() when the model directory does not exist.
        print("모델 파일이 없습니다. 먼저 모델을 학습해주세요.")
|
|