Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import pickle | |
| import random | |
| import re | |
| import threading | |
| import time | |
| from collections import Counter, defaultdict | |
| from dataclasses import dataclass | |
| from datetime import date, datetime, time as dt_time, timedelta | |
| from typing import Any, Dict, List, Optional, Set, Tuple | |
| import pandas as pd | |
| import requests | |
| import urllib3 | |
| from dotenv import load_dotenv | |
| from schedule_api_client import clean_movie_title, fetch_hall_info, fetch_schedule_data, get_valid_token | |
# Silence urllib3's InsecureRequestWarning for the whole process; at least one
# request in this module is sent with verify=False.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Load environment variables (the TMS_* settings are read via os.getenv) from a
# local .env file when one is present.
load_dotenv()
# On-disk locations of the optimizer's persisted files; they all live in the
# "cinema_cache" directory, which ensure_cache_dir() creates on demand.
CONFIG_FILE = os.path.join("cinema_cache", "nextday_schedule_optimizer_config.json")
JOB_STATE_FILE = os.path.join("cinema_cache", "nextday_optimizer_job_state.json")
JOB_PAYLOAD_FILE = os.path.join("cinema_cache", "nextday_optimizer_job_payload.pkl")
JOB_RESULT_FILE = os.path.join("cinema_cache", "nextday_optimizer_job_result.pkl")

# Cached handle of the background worker thread; refreshed by _find_live_worker().
_JOB_THREAD: Optional[threading.Thread] = None
# Baseline optimizer settings. load_config() overlays the saved JSON file on
# top of these values. The meaning of each numbered rule is defined by the
# scoring code elsewhere in the file; only the grouping is described here.
DEFAULT_CONFIG: Dict[str, Any] = {
    # --- operating day ("HH:MM"; the end time is past midnight) ---
    "business_start": "09:30",
    "business_end": "01:30",
    "turnaround_base": 10,
    # --- golden (prime-time) window ---
    "golden_start": "14:00",
    "golden_end": "21:00",
    # --- efficiency scoring ---
    "efficiency_enabled": True,
    "efficiency_penalty_coef": 1.0,
    "eff_daily_delta_cap": 5,
    # --- rule 1 ---
    "rule1_enabled": True,
    "rule1_gap": 30,
    # --- rule 2 ---
    "rule2_enabled": True,
    "rule2_threshold": 4,
    "rule2_window_minutes": 30,
    "rule2_penalty": 15.0,
    "rule2_exempt_ranges": ["14:00-15:00", "19:00-20:00"],
    # --- rule 3 ---
    "rule3_enabled": True,
    "rule3_gap_minutes": 30,
    "rule3_penalty": 12.0,
    # --- rule 4 ---
    "rule4_enabled": True,
    "rule4_earliest": "10:00",
    "rule4_latest": "22:30",
    # --- rule 9 ---
    "rule9_enabled": True,
    "rule9_hot_top_n": 3,
    "rule9_min_ratio": 0.30,
    "rule9_penalty": 20.0,
    # --- rule 11 ---
    "rule11_enabled": True,
    "rule11_after_time": "22:00",
    "rule11_penalty": 30.0,
    # --- rule 12 ---
    "rule12_enabled": True,
    "rule12_penalty_each": 25.0,
    # --- rule 13 ---
    "rule13_enabled": True,
    "rule13_forbidden_halls": ["2", "8", "9"],
    # --- TMS / maintenance ---
    "tms_allowance": 0,
    "maintenance_blocks": [],
    # --- search control ---
    "iterations": 300,
    "random_seed": 20260331,
}
@dataclass
class RuleContext:
    """Bundle of inputs shared by the scheduling rules for one target date.

    Declared as a dataclass so the annotated fields become constructor
    arguments; without the decorator the class body is annotations only and
    instances cannot be created with field values.
    """

    target_date: date
    # Operating-day and golden-window boundaries as concrete datetimes.
    business_start_dt: datetime
    business_end_dt: datetime
    golden_start_dt: datetime
    golden_end_dt: datetime
    # Rule parameters (presumably derived from the optimizer config).
    params: Dict[str, Any]
    # Hall key -> list of (start, end) intervals.
    blockouts_by_hall: Dict[str, List[Tuple[datetime, datetime]]]
    # Per-movie target and weight tables, keyed by movie key.
    movie_targets: Dict[str, Dict[str, Any]]
    movie_weights: Dict[str, float]
    # Hall key -> TMS entries for that hall.
    tms_by_hall: Dict[str, List[Dict[str, Any]]]
    manual_constraints: Dict[str, Dict[str, Optional[float]]]
    allowed_movies: Set[str]
    # Movie key -> list of (start, end) windows.
    preview_windows_by_identity: Dict[str, List[Tuple[datetime, datetime]]]
@dataclass
class CandidateResult:
    """One candidate schedule together with its score and diagnostics.

    Declared as a dataclass because deserialize_candidate() constructs it
    with keyword arguments, which a plain annotated class would reject.
    """

    # Scheduled sessions as plain dicts.
    schedule: List[Dict[str, Any]]
    score: float
    # Rows of (str, float, str), as rebuilt by deserialize_candidate().
    score_breakdown: List[Tuple[str, float, str]]
    hard_violations: List[str]
def serialize_candidate(cand: CandidateResult) -> Dict[str, Any]:
    """Flatten a candidate into a dict of built-in containers.

    Breakdown rows become lists and the score is coerced to ``float``; the
    schedule list is passed through by reference.
    """
    payload: Dict[str, Any] = {"schedule": cand.schedule}
    payload["score"] = float(cand.score)
    payload["score_breakdown"] = [list(row) for row in (cand.score_breakdown or [])]
    payload["hard_violations"] = list(cand.hard_violations or [])
    return payload
def deserialize_candidate(obj: Any) -> Optional[CandidateResult]:
    """Rebuild a ``CandidateResult`` from its serialized dict form.

    An existing ``CandidateResult`` is returned unchanged; anything that is
    not a dict yields ``None``. Breakdown rows that are not sequences of at
    least three items are dropped.
    """
    if isinstance(obj, CandidateResult):
        return obj
    if not isinstance(obj, dict):
        return None

    breakdown: List[Tuple[str, float, str]] = [
        (str(row[0]), float(row[1]), str(row[2]))
        for row in (obj.get("score_breakdown") or [])
        if isinstance(row, (list, tuple)) and len(row) >= 3
    ]
    return CandidateResult(
        schedule=list(obj.get("schedule") or []),
        score=float(obj.get("score") or 0.0),
        score_breakdown=breakdown,
        hard_violations=list(obj.get("hard_violations") or []),
    )
def ensure_cache_dir() -> None:
    """Create the directory that holds ``CONFIG_FILE`` if it does not exist."""
    cache_dir = os.path.dirname(CONFIG_FILE)
    os.makedirs(cache_dir, exist_ok=True)
def load_config() -> Dict[str, Any]:
    """Return the optimizer config: defaults overlaid with the saved file.

    The defaults are deep-copied so that callers mutating nested lists (for
    example ``rule2_exempt_ranges``) never alter ``DEFAULT_CONFIG`` itself.
    A missing, unreadable, or non-dict config file yields the plain defaults.
    """
    import copy  # local import: the module's import block does not include it

    ensure_cache_dir()
    cfg: Dict[str, Any] = copy.deepcopy(DEFAULT_CONFIG)
    if not os.path.exists(CONFIG_FILE):
        return cfg
    try:
        with open(CONFIG_FILE, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except Exception:
        # Best effort: any unreadable or corrupt file falls back to defaults.
        return cfg
    if isinstance(loaded, dict):
        cfg.update(loaded)
    return cfg
def save_config(cfg: Dict[str, Any]) -> None:
    """Persist ``cfg`` to ``CONFIG_FILE`` as UTF-8 JSON.

    Delegates to ``_atomic_write_json`` (temp file + ``os.replace``) so an
    interrupted write cannot leave a truncated config behind; the on-disk
    format (``ensure_ascii=False``, ``indent=2``) is unchanged.
    """
    _atomic_write_json(CONFIG_FILE, cfg)
def _atomic_write_json(path: str, payload: Dict[str, Any]) -> None:
    """Write ``payload`` as JSON to ``path`` via a sibling ``.tmp`` file.

    The data is first written to ``<path>.tmp`` and then moved into place
    with ``os.replace``, so ``path`` is swapped in one step.
    """
    ensure_cache_dir()
    staging_path = f"{path}.tmp"
    with open(staging_path, "w", encoding="utf-8") as handle:
        json.dump(payload, handle, ensure_ascii=False, indent=2)
    os.replace(staging_path, path)
def _read_json(path: str, default: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Load a JSON object from ``path`` and overlay it on ``default``.

    Returns a fresh copy of ``default`` (``{}`` when omitted) if the file is
    missing, unreadable, or does not contain a JSON object.
    """
    base = {} if default is None else default
    if not os.path.exists(path):
        return dict(base)
    try:
        with open(path, "r", encoding="utf-8") as handle:
            loaded = json.load(handle)
    except Exception:
        return dict(base)
    if not isinstance(loaded, dict):
        return dict(base)
    merged = dict(base)
    merged.update(loaded)
    return merged
def _now_text() -> str:
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return f"{datetime.now():%Y-%m-%d %H:%M:%S}"
def _default_job_state() -> Dict[str, Any]:
    """Return a fresh job-state dict describing an idle optimizer job."""
    state: Dict[str, Any] = dict(
        status="idle",
        control="run",  # run | pause | stop
        job_id="",
        started_at="",
        started_ts=0.0,
        ended_at="",
        updated_at="",
        target_date="",
        iterations=0,
        iter_done=0,
        progress=0.0,
        elapsed_seconds=0.0,
        feasible_count=0,
        hard_reject=0,
        build_reject=0,
        rule_reject=0,
        reject_reason_top={},
        reject_detail_top={},
        message="",
        result_count=0,
    )
    return state
def _atomic_write_pickle(path: str, payload: Any) -> None:
    """Pickle ``payload`` to ``<path>.tmp`` and move it over ``path``."""
    ensure_cache_dir()
    staging_path = f"{path}.tmp"
    with open(staging_path, "wb") as handle:
        pickle.dump(payload, handle)
    os.replace(staging_path, path)
def _read_pickle(path: str, default: Any = None) -> Any:
    """Unpickle and return the object stored at ``path``.

    Returns ``default`` when the file is missing or cannot be unpickled.

    NOTE(review): ``pickle.load`` can execute arbitrary code; this is only
    safe while the file is written exclusively by this application.
    """
    if os.path.exists(path):
        try:
            with open(path, "rb") as handle:
                return pickle.load(handle)
        except Exception:
            pass
    return default
def _find_live_worker() -> Optional[threading.Thread]:
    """Return the running optimizer worker thread, or ``None``.

    Prefers the cached ``_JOB_THREAD``; otherwise scans all live threads for
    one named ``nextday-opt-worker``. The cache is updated with the result.
    """
    global _JOB_THREAD
    cached = _JOB_THREAD
    if cached is not None and cached.is_alive():
        return cached
    _JOB_THREAD = next(
        (
            thread
            for thread in threading.enumerate()
            if thread.name == "nextday-opt-worker" and thread.is_alive()
        ),
        None,
    )
    return _JOB_THREAD
def read_job_state() -> Dict[str, Any]:
    """Load the persisted job state, filling missing keys with defaults."""
    defaults = _default_job_state()
    return _read_json(JOB_STATE_FILE, defaults)
def write_job_state(**kwargs: Any) -> Dict[str, Any]:
    """Merge ``kwargs`` into the persisted job state and save it.

    ``updated_at`` is always stamped with the current time, overriding any
    value passed in. Returns the state that was written.
    """
    merged = {**read_job_state(), **kwargs, "updated_at": _now_text()}
    _atomic_write_json(JOB_STATE_FILE, merged)
    return merged
def parse_hm(hm: str, fallback: str) -> dt_time:
    """Parse an ``HH:MM`` string, using ``fallback`` when it is blank or invalid.

    Raises whatever ``strptime`` raises if ``fallback`` itself is unparsable.
    """
    text = str(hm or "").strip() or fallback
    try:
        parsed = datetime.strptime(text, "%H:%M")
    except Exception:
        parsed = datetime.strptime(fallback, "%H:%M")
    return parsed.time()
def hm_str(t: dt_time) -> str:
    """Format a time as zero-padded ``HH:MM``."""
    return f"{t:%H:%M}"
def parse_operating_dt(d: date, t: dt_time) -> datetime:
    """Combine ``d`` and ``t`` on the operating-day timeline.

    Times before 06:00 are treated as belonging to the following calendar
    day, so a late-night show sorts after the evening shows.
    """
    rollover = timedelta(days=1) if t < dt_time(6, 0) else timedelta(0)
    return datetime.combine(d, t) + rollover
def ceil_datetime_to_step(dt: datetime, step_minutes: int = 5) -> datetime:
    """Round ``dt`` up to the next minute that is a multiple of ``step_minutes``.

    A value already on a step boundary with zero seconds and microseconds is
    returned unchanged (seconds and microseconds are otherwise dropped).
    """
    floored = dt.replace(second=0, microsecond=0)
    remainder = floored.minute % step_minutes
    if remainder:
        return floored + timedelta(minutes=(step_minutes - remainder) % step_minutes)
    if dt.second == 0 and dt.microsecond == 0:
        return floored
    # On a step minute but with leftover seconds: move to the next step.
    return floored + timedelta(minutes=step_minutes)
def normalize_hall_key(hall_id: Any, hall_name: Any) -> str:
    """Derive a string hall key, preferring the id over the name.

    With no id, the first run of digits in the name is used; a name without
    digits is returned as-is, and a missing name yields ``""``.
    """
    if hall_id not in (None, ""):
        return str(hall_id)
    if hall_name in (None, ""):
        return ""
    label = str(hall_name)
    first_number = re.search(r"\d+", label)
    return first_number.group(0) if first_number else label
def extract_hall_no(raw: Any) -> str:
    """Return the first run of digits in ``raw``, else its text form.

    Falsy input (``None``, ``""``, ``0``) is treated as the empty string.
    """
    text = str(raw or "")
    first_number = re.search(r"\d+", text)
    if first_number is None:
        return text
    return first_number.group(0)
def normalize_media_type(media: Any) -> str:
    """Reduce a media description to ``"3D"``, ``"2D"`` or ``""``.

    Matching is case-insensitive and ``3D`` wins when both appear.
    """
    upper = str(media or "").upper()
    for tag in ("3D", "2D"):
        if tag in upper:
            return tag
    return ""
def movie_policy_key(movie_name: Any, movie_media_type: Any = "") -> str:
    """
    Policy key for a title:
    - language versions of the same film are merged (relies on the
      clean_movie_title rules)
    - 2D and 3D are kept apart (a 3D suffix is appended when the cleaned
      title does not already show 3D)
    """
    cleaned = clean_movie_title(movie_name or "")
    # Fall back to the raw title when no explicit media type is supplied.
    is_3d = normalize_media_type(movie_media_type or movie_name) == "3D"
    already_marked = "3D" in str(cleaned).upper()
    if is_3d and not already_marked:
        return f"{cleaned}(数字3D)"
    return str(cleaned)
def tms_missing_pair_key(session: Dict[str, Any]) -> Tuple[str, str, str]:
    """Return ``(hall number, movie policy key, media type)`` for a session."""
    media_raw = session.get("movieMediaType", "")
    hall_source = session.get("hallName") or session.get("hallId")
    return (
        extract_hall_no(hall_source),
        movie_policy_key(session.get("movieName", ""), media_raw),
        normalize_media_type(media_raw),
    )
def extract_allowed_movies_from_tuning_df(df: pd.DataFrame) -> Set[str]:
    """Collect the policy keys of rows whose "选中" column is truthy.

    Rows with a missing/NaN flag or an empty resulting key are skipped; a
    ``None`` or empty frame yields an empty set.
    """
    if df is None or df.empty:
        return set()
    allowed: Set[str] = set()
    for _, record in df.iterrows():
        flag = record.get("选中", False)
        if not (pd.notna(flag) and bool(flag)):
            continue
        policy = movie_policy_key(record.get("影片", ""))
        if policy:
            allowed.add(policy)
    return allowed
def normalize_text_token(text: Any) -> str:
    """Normalize a title for loose comparison.

    Applies ``clean_movie_title``, removes all whitespace and a fixed set of
    bracket/punctuation characters, then upper-cases the result.
    """
    cleaned = clean_movie_title(str(text or ""))
    without_spaces = re.sub(r"\s+", "", cleaned)
    without_punct = re.sub(r"[\[\]【】()()·,.,::!!??'\"-]", "", without_spaces)
    return without_punct.upper()
def to_float(v: Any, default: float = 0.0) -> float:
    """Convert ``v`` to ``float``; blank, ``None``, ``"None"`` or invalid values give ``default``."""
    try:
        return default if v in (None, "", "None") else float(v)
    except Exception:
        return default
def extract_movie_serial_5_8(movie_num: Any) -> str:
    """Return characters 5-8 of the normalized movie number.

    The number is upper-cased and stripped of everything but ``A-Z0-9``;
    fewer than eight remaining characters yields ``""``.
    """
    normalized = re.sub(r"[^A-Z0-9]", "", str(movie_num or "").upper())
    return normalized[4:8] if len(normalized) >= 8 else ""
def movie_identity_key(movie_num: Any, movie_name: Any) -> str:
    """Identify a film by its serial when available, else by cleaned title.

    Returns ``serial:<4 chars>`` or ``name:<cleaned title>``.
    """
    serial = extract_movie_serial_5_8(movie_num)
    if not serial:
        return f"name:{clean_movie_title(movie_name or '')}"
    return f"serial:{serial}"
def is_3d_by_movie_num_or_media(movie_num: Any, media: Any) -> bool:
    """Report whether a film is 3D.

    True when the 4th character of the normalized movie number is ``"2"``,
    or when the media text contains ``3D`` (case-insensitive).
    """
    normalized = re.sub(r"[^A-Z0-9]", "", str(movie_num or "").upper())
    # Slicing yields "" for short numbers, so no explicit length check is needed.
    if normalized[3:4] == "2":
        return True
    return "3D" in str(media or "").upper()
def extract_box_office_value(item: Dict[str, Any]) -> float:
    """Return the first usable box-office figure found in ``item``.

    Known field names are tried in priority order; string values may contain
    thousands separators. Unparsable or negative values are skipped, and
    ``0.0`` is returned when nothing qualifies.
    """
    candidate_fields = (
        "ticketIncome",
        "splitTicketIncome",
        "todayTicketIncome",
        "todayBoxOffice",
        "boxOffice",
        "box",
        "income",
        "今日票房",
        "今日票房(不含费)",
    )
    for field in candidate_fields:
        if field in item:
            value = item.get(field)
            if isinstance(value, str):
                value = value.replace(",", "").strip()
            try:
                amount = float(value)
            except Exception:
                continue
            if amount >= 0:
                return amount
    return 0.0
def sort_movies_by_box_office(box_office_data: List[Dict[str, Any]]) -> List[Tuple[str, float]]:
    """Rank cleaned movie titles by their best box-office value.

    Duplicate titles keep their maximum value. If every value is zero the
    original feed order (first appearance) is used instead. Returns a list of
    ``(title, value)`` pairs.
    """
    best_value: Dict[str, float] = {}
    first_seen: Dict[str, int] = {}
    for position, row in enumerate(box_office_data):
        title = clean_movie_title(row.get("movieName") or row.get("影片名称") or "")
        if not title:
            continue
        amount = extract_box_office_value(row)
        first_seen.setdefault(title, position)
        best_value[title] = max(best_value.get(title, 0.0), amount)

    if not best_value:
        return []
    if max(best_value.values()) > 0:
        return sorted(best_value.items(), key=lambda pair: pair[1], reverse=True)
    return sorted(best_value.items(), key=lambda pair: first_seen.get(pair[0], 99999))
def resolve_hot_movies(
    df: pd.DataFrame,
    box_office_data: List[Dict[str, Any]],
    top_n: int,
) -> Tuple[List[str], str, List[Tuple[str, float]]]:
    """Pick the "hot" movies and report which data source decided it.

    Box-office data is preferred: every title within 95% of the leader is
    hot (or the first ``top_n`` titles when the leader's value is zero).
    Without box-office data, the same 95% rule is applied to the session
    counts in ``df["movieClean"]``.

    Returns ``(hot titles, source label, full ranking)``.

    NOTE(review): the final slice ``hot[: max(top_n, len(hot))]`` can never
    shorten ``hot``; confirm whether a cap at ``top_n`` was intended.
    """
    ranking = sort_movies_by_box_office(box_office_data)
    if ranking:
        leader_value = ranking[0][1]
        head = [title for title, _ in ranking[:top_n]]
        if leader_value > 0:
            hot = [title for title, value in ranking if value >= leader_value * 0.95]
        else:
            hot = head
        if not hot:
            hot = head
        return hot[: max(top_n, len(hot))], "全国大盘票房", ranking

    session_counts = df["movieClean"].value_counts()
    if session_counts.empty:
        return [], "无可用数据", []
    leader_count = int(session_counts.iloc[0])
    hot = session_counts[session_counts >= leader_count * 0.95].index.tolist()
    if not hot:
        hot = session_counts.head(top_n).index.tolist()
    count_ranking = [(title, float(count)) for title, count in session_counts.items()]
    return hot[: max(top_n, len(hot))], "场次数量", count_ranking
def rule9_core_windows(d: date) -> List[Tuple[dt_time, dt_time]]:
    """Return the two core time windows used by rule 9 for ``d``'s weekday.

    The table is indexed by ``date.weekday()`` (Monday = 0 ... Sunday = 6);
    each entry is an afternoon and an evening ``(start, end)`` pair.
    """

    def span(h1: int, m1: int, h2: int, m2: int) -> Tuple[dt_time, dt_time]:
        return dt_time(h1, m1), dt_time(h2, m2)

    by_weekday = {
        0: [span(14, 0, 16, 0), span(19, 0, 22, 0)],
        1: [span(14, 0, 15, 30), span(19, 0, 22, 20)],
        2: [span(14, 30, 16, 0), span(19, 0, 21, 40)],
        3: [span(14, 0, 16, 0), span(19, 0, 22, 0)],
        4: [span(14, 0, 15, 0), span(19, 0, 22, 0)],
        5: [span(14, 0, 16, 0), span(19, 0, 22, 0)],
        6: [span(14, 0, 17, 0), span(19, 0, 21, 30)],
    }
    return by_weekday[d.weekday()]
def time_in_ranges(t: dt_time, ranges: List[Tuple[dt_time, dt_time]]) -> bool:
    """Report whether ``t`` lies in any half-open ``[start, end)`` range.

    A range whose start is later than its end wraps around midnight.
    """

    def contains(start: dt_time, end: dt_time) -> bool:
        if start <= end:
            return start <= t < end
        return t >= start or t < end

    return any(contains(start, end) for start, end in ranges)
def gap_intersects_any_blockout(
    g_st: datetime,
    g_et: datetime,
    blockouts_by_hall: Dict[str, List[Tuple[datetime, datetime]]],
) -> bool:
    """Report whether ``[g_st, g_et)`` overlaps a blockout of any hall."""
    return any(
        interval_overlaps(g_st, g_et, block_start, block_end)
        for hall_ranges in blockouts_by_hall.values()
        for block_start, block_end in hall_ranges
    )
def parse_exempt_ranges(items: List[str]) -> List[Tuple[dt_time, dt_time]]:
    """Parse ``"HH:MM-HH:MM"`` strings into ``(start, end)`` time pairs.

    Blank entries, entries without a dash, and entries whose halves are not
    valid ``HH:MM`` times are silently skipped.
    """
    parsed: List[Tuple[dt_time, dt_time]] = []
    for entry in items:
        text = str(entry or "").strip()
        left, dash, right = text.partition("-")
        if not text or not dash:
            continue
        try:
            start = datetime.strptime(left.strip(), "%H:%M").time()
            end = datetime.strptime(right.strip(), "%H:%M").time()
        except Exception:
            continue
        parsed.append((start, end))
    return parsed
def in_any_exempt(ts: datetime, ranges: List[Tuple[dt_time, dt_time]]) -> bool:
    """Report whether ``ts``'s time of day falls in any exempt range.

    Both ends are inclusive; a range whose start is later than its end wraps
    around midnight.
    """
    moment = ts.time()

    def covers(start: dt_time, end: dt_time) -> bool:
        if start <= end:
            return start <= moment <= end
        return moment >= start or moment <= end

    return any(covers(start, end) for start, end in ranges)
def interval_overlaps(a_st: datetime, a_et: datetime, b_st: datetime, b_et: datetime) -> bool:
    """Report whether intervals A and B overlap; touching endpoints do not count."""
    return a_et > b_st and a_st < b_et
def gap_intersects_blockout(
    hall_key: str,
    g_st: datetime,
    g_et: datetime,
    blockouts_by_hall: Dict[str, List[Tuple[datetime, datetime]]],
) -> bool:
    """Report whether ``[g_st, g_et)`` overlaps a blockout of ``hall_key``.

    A hall without an entry in ``blockouts_by_hall`` has no blockouts.
    """
    hall_ranges = blockouts_by_hall.get(hall_key, [])
    return any(
        interval_overlaps(g_st, g_et, block_start, block_end)
        for block_start, block_end in hall_ranges
    )
def parse_blockouts_from_config(target_date: date, raw: Any) -> List[Dict[str, Any]]:
    """Turn the configured maintenance blocks into dated intervals.

    ``raw`` may be a list of dicts or a JSON string encoding such a list.
    Each usable item needs a hall token (``hall`` / ``hallId`` / ``hallName``)
    plus ``start`` and ``end`` in ``HH:MM``. Times are placed on the operating
    day via ``parse_operating_dt``; an end at or before the start is pushed to
    the next day. Malformed items are skipped.

    Returns dicts with keys ``hall_token``, ``start`` and ``end``.
    """
    if raw in (None, "", []):
        return []

    if isinstance(raw, list):
        entries = raw
    elif isinstance(raw, str):
        try:
            decoded = json.loads(raw)
        except Exception:
            decoded = None
        entries = decoded if isinstance(decoded, list) else []
    else:
        entries = []

    blockouts: List[Dict[str, Any]] = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        hall_token = str(entry.get("hall") or entry.get("hallId") or entry.get("hallName") or "").strip()
        start_text = str(entry.get("start") or "").strip()
        end_text = str(entry.get("end") or "").strip()
        if not (hall_token and start_text and end_text):
            continue
        try:
            start_clock = datetime.strptime(start_text, "%H:%M").time()
            end_clock = datetime.strptime(end_text, "%H:%M").time()
            start_dt = parse_operating_dt(target_date, start_clock)
            end_dt = parse_operating_dt(target_date, end_clock)
            if end_dt <= start_dt:
                end_dt += timedelta(days=1)
            blockouts.append({"hall_token": hall_token, "start": start_dt, "end": end_dt})
        except Exception:
            continue
    return blockouts
def build_hall_blockouts(
    blockouts: List[Dict[str, Any]],
    hall_name_map: Dict[Any, str],
) -> Dict[str, List[Tuple[datetime, datetime]]]:
    """Assign parsed blockouts to halls and sort each hall's list by start.

    A blockout applies to a hall when its token equals the hall id, the hall
    name, the hall number or ``"<number>号厅"``, or when the number extracted
    from the token equals the hall number. Every hall in ``hall_name_map``
    gets an entry, possibly empty.
    """
    by_hall: Dict[str, List[Tuple[datetime, datetime]]] = {
        str(hall_id): [] for hall_id in hall_name_map.keys()
    }
    for hall_id, hall_name in hall_name_map.items():
        hall_key = str(hall_id)
        hall_no = extract_hall_no(hall_name)
        aliases = (hall_key, str(hall_name), hall_no, f"{hall_no}号厅")
        for block in blockouts:
            token = str(block["hall_token"])
            token_no = extract_hall_no(token)
            if token in aliases or token_no == hall_no:
                by_hall.setdefault(hall_key, []).append((block["start"], block["end"]))
    for spans in by_hall.values():
        spans.sort(key=lambda span: span[0])
    return by_hall
def is_3d_movie(movie: Dict[str, Any]) -> bool:
    """Report whether the media type or title mentions ``3D`` (case-insensitive)."""
    haystack = f"{movie.get('movieMediaType', '')} {movie.get('movieName', '')}"
    return "3D" in haystack.upper()
def fetch_movie_info_for_date(show_date: str) -> List[Dict[str, Any]]:
    """Fetch the movie-info list for ``show_date`` from the scheduling API.

    Best effort: returns ``[]`` when no token is available, the request
    fails, or the API answers with an unexpected code. A response code of
    500 triggers one retry with a force-refreshed token.

    Both the first and the retried response are normalised with ``or []`` so
    a null ``data`` field never leaks ``None`` to callers.
    """
    token = get_valid_token(force_refresh=False)
    if not token:
        return []

    def _call(tok: str) -> Tuple[int, Dict[str, Any]]:
        """Issue one request and return ``(response code, payload)``."""
        url = "https://cawapi.yinghezhong.com/show/getMovieInfo"
        # "_" is a millisecond timestamp, presumably a cache-buster.
        params = {"showDate": show_date, "token": tok, "_": int(time.time() * 1000)}
        headers = {
            "Origin": "https://caw.yinghezhong.com",
            "Referer": "https://caw.yinghezhong.com/",
            "User-Agent": "Mozilla/5.0",
        }
        resp = requests.get(url, params=params, headers=headers, timeout=15)
        resp.raise_for_status()
        payload = resp.json()
        return int(payload.get("code", -1)), payload

    try:
        code, payload = _call(token)
        if code == 500:
            # Presumably an expired token: refresh once and retry.
            token = get_valid_token(force_refresh=True)
            if not token:
                return []
            code, payload = _call(token)
        if code == 1:
            return payload.get("data") or []
    except Exception:
        return []
    return []
def dedupe_movies_by_policy_key(movies: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    De-duplication rules:
    - language versions of the same film count as one entry
    - different formats (2D/3D) are kept separately

    The first movie seen for each policy key wins; movies with an empty key
    are dropped.
    """
    first_by_key: Dict[str, Dict[str, Any]] = {}
    for movie in movies:
        key = movie_policy_key(movie.get("movieName", ""), movie.get("movieMediaType", ""))
        if key and key not in first_by_key:
            first_by_key[key] = movie
    return list(first_by_key.values())
def build_preview_windows_for_movies(
    target_date: date,
    movies: List[Dict[str, Any]],
) -> Dict[str, List[Tuple[datetime, datetime]]]:
    """
    previewShowTime rules:
    - previewShowTime empty: no time restriction
    - previewShowTime set and covering target_date: sessions may only start
      inside the window(s) given for that date
    - previewShowTime set but not covering target_date: no restriction that day

    Returns a mapping from movie policy key to the allowed ``(start, end)``
    windows; unrestricted movies have no entry.
    """
    windows_by_key: Dict[str, List[Tuple[datetime, datetime]]] = {}
    field_names = ("startDate", "endDate", "startTime", "endTime")
    for movie in movies:
        key = movie_policy_key(movie.get("movieName", ""), movie.get("movieMediaType", ""))
        if not key:
            continue
        specs = movie.get("previewShowTime") or []
        if not isinstance(specs, list) or not specs:
            continue

        spans: List[Tuple[datetime, datetime]] = []
        for spec in specs:
            if not isinstance(spec, dict):
                continue
            texts = [str(spec.get(name) or "").strip() for name in field_names]
            if not all(texts):
                continue
            first_day_s, last_day_s, opens_s, closes_s = texts
            try:
                first_day = datetime.strptime(first_day_s, "%Y-%m-%d").date()
                last_day = datetime.strptime(last_day_s, "%Y-%m-%d").date()
                if not (first_day <= target_date <= last_day):
                    continue
                opens_clock = datetime.strptime(opens_s, "%H:%M").time()
                closes_clock = datetime.strptime(closes_s, "%H:%M").time()
                opens = parse_operating_dt(target_date, opens_clock)
                closes = parse_operating_dt(target_date, closes_clock)
                if closes <= opens:
                    closes += timedelta(days=1)
                spans.append((opens, closes))
            except Exception:
                continue
        # A non-empty list implies at least one window covered target_date.
        if spans:
            windows_by_key[key] = spans
    return windows_by_key
def fetch_realtime_box_office(date_str: str) -> List[Dict[str, Any]]:
    """Fetch the real-time daily box-office list for ``date_str``.

    Best effort: returns ``[]`` when no token is available, the request
    fails, or the response code is not ``"A00000"``.
    """
    token = get_valid_token(force_refresh=False)
    if not token:
        return []
    endpoint = "https://app.bi.piao51.cn/cinema-app/market/realtimeDailyBoxOffice.action"
    query = {"qTime": date_str, "token": token}
    request_headers = {"Host": "app.bi.piao51.cn", "User-Agent": "Mozilla/5.0"}
    try:
        response = requests.get(endpoint, params=query, headers=request_headers, timeout=10)
        response.raise_for_status()
        body = response.json()
        if body.get("code") != "A00000":
            return []
        return body.get("results", {}).get("movieDatalist", []) or []
    except Exception:
        return []
def fetch_tms_server_movies_raw() -> List[Dict[str, Any]]:
    """Download every DCP row stored on the TMS server for this theater.

    Credentials come from the ``TMS_APP_SECRET``, ``TMS_TICKET``,
    ``TMS_THEATER_ID`` and ``TMS_X_SESSION_ID`` environment variables.
    Best effort: returns ``[]`` when any of them is missing or invalid, or
    when any request fails (rows from earlier pages are discarded too).

    A non-numeric ``TMS_THEATER_ID`` is treated like a missing one instead of
    raising ``ValueError`` out of this otherwise non-raising function.
    """
    app_secret = os.getenv("TMS_APP_SECRET")
    ticket = os.getenv("TMS_TICKET")
    x_session_id = os.getenv("TMS_X_SESSION_ID")
    try:
        theater_id = int(os.getenv("TMS_THEATER_ID", "0"))
    except ValueError:
        return []
    if not all([app_secret, ticket, theater_id, x_session_id]):
        return []
    try:
        token_url = f"https://tms.hengdianfilm.com/cinema-api/admin/generateToken?token=hd&murl=ticket={ticket}"
        token_headers = {"Cookie": f"JSESSIONID={x_session_id}", "Content-Type": "application/json"}
        token_payload = {"appId": "hd", "appSecret": app_secret, "timeStamp": int(time.time() * 1000)}
        token_resp = requests.post(token_url, headers=token_headers, json=token_payload, timeout=10)
        token_resp.raise_for_status()
        auth_token = token_resp.json().get("param")
        if not auth_token:
            return []

        list_url = "https://tms.hengdianfilm.com/cinema-api/cinema/server/dcp/list"
        list_headers = {"Token": auth_token, "X-SESSIONID": x_session_id}
        all_rows: List[Dict[str, Any]] = []
        page_index = 1
        while True:
            payload = {
                "THEATER_ID": theater_id,
                "SOURCE": "SERVER",
                "ASSERT_TYPE": 2,
                "PAGE_CAPACITY": 200,
                "PAGE_INDEX": page_index,
            }
            # NOTE(review): verify=False disables TLS certificate checks for
            # this request; kept as in the original, confirm it is required.
            movie_resp = requests.post(
                list_url,
                params={"token": "hd", "murl": "ContentMovie"},
                headers=list_headers,
                json=payload,
                verify=False,
                timeout=20,
            )
            movie_resp.raise_for_status()
            body = movie_resp.json().get("BODY", {})
            rows = body.get("LIST", []) or []
            if not rows:
                break
            all_rows.extend(rows)
            # Stop once the reported total (or, lacking one, what we have) is reached.
            count = int(body.get("COUNT") or len(all_rows))
            if len(all_rows) >= count:
                break
            page_index += 1
            time.sleep(0.2)  # short pause between page requests
        return all_rows
    except Exception:
        return []
def fetch_schedule_and_halls(show_date: str) -> Tuple[List[Dict[str, Any]], Dict[Any, Any], Optional[str]]:
    """Fetch the schedule for ``show_date`` together with the hall info.

    Returns ``(schedule, halls, error)``; ``error`` is ``None`` on success and
    a message otherwise. A ``ValueError`` from the first attempt (presumably
    signalling a stale token) triggers one retry with a refreshed token.
    """

    def _pull(tok: str) -> Tuple[List[Dict[str, Any]], Dict[Any, Any], Optional[str]]:
        schedule = fetch_schedule_data(tok, show_date)
        halls = fetch_hall_info(tok)
        return schedule or [], halls or {}, None

    token = get_valid_token(force_refresh=False)
    if not token:
        return [], {}, "未获取到有效 token"
    try:
        return _pull(token)
    except ValueError:
        pass  # fall through to the refresh-and-retry path below
    except Exception as e:
        return [], {}, str(e)

    token = get_valid_token(force_refresh=True)
    if not token:
        return [], {}, "token 刷新失败"
    try:
        return _pull(token)
    except Exception as e:
        return [], {}, f"重试后仍失败: {e}"
def build_hall_name_map(next_day_schedule: List[Dict[str, Any]], hall_seat_map: Dict[Any, Any]) -> Dict[Any, str]:
    """Map hall ids to display names.

    Sources, in order of preference: hall ids/names found in the schedule,
    then the ids of ``hall_seat_map`` named ``"<id>号厅"``, then a fixed
    four-hall default.
    """
    from_schedule: Dict[Any, str] = {
        session.get("hallId"): str(session.get("hallName"))
        for session in next_day_schedule
        if session.get("hallId") not in (None, "") and session.get("hallName")
    }
    if from_schedule:
        return from_schedule

    from_seats: Dict[Any, str] = {hall_id: f"{hall_id}号厅" for hall_id in hall_seat_map.keys()}
    if from_seats:
        return from_seats
    return {1: "1号厅", 2: "2号厅", 3: "3号厅", 4: "4号厅"}
def session_display_label(session: Dict[str, Any]) -> str:
    """Build the ``"start | hall | movie"`` label that identifies a session.

    Each part uses the first truthy value among its candidate keys, stripped
    of surrounding whitespace; missing parts are empty.
    """

    def first_text(*keys: str) -> str:
        for key in keys:
            value = session.get(key)
            if value:
                return str(value).strip()
        return ""

    parts = (
        first_text("showStartTime", "startTime"),
        first_text("hallName", "hallId"),
        first_text("movieName"),
    )
    return f"{parts[0]} | {parts[1]} | {parts[2]}"
def apply_session_exclusions(
    schedule_list: List[Dict[str, Any]],
    excluded_labels: List[str],
) -> List[Dict[str, Any]]:
    """Return the sessions whose display label is not in ``excluded_labels``.

    Labels are compared after stripping; blank labels are ignored. A new list
    is always returned, even when nothing is excluded.
    """
    if not schedule_list or not excluded_labels:
        return list(schedule_list or [])
    blocked = {str(label).strip() for label in excluded_labels} - {""}
    kept: List[Dict[str, Any]] = []
    for session in schedule_list:
        if session_display_label(session) not in blocked:
            kept.append(session)
    return kept
def build_today_efficiency(
    today_schedule: List[Dict[str, Any]],
    hall_seat_map: Dict[Any, Any],
    golden_start: dt_time,
    golden_end: dt_time,
) -> pd.DataFrame:
    """Summarise today's sessions per movie with efficiency ratios.

    Efficiency is (a movie's share of revenue) / (its share of sessions),
    computed once over all sessions and once over sessions starting inside
    the inclusive ``[golden_start, golden_end]`` window.

    Columns of the result: 影片, 场次, 场次效率, 黄金场次, 黄金效率, 票房.
    ``hall_seat_map`` is accepted for interface compatibility but unused.

    Missing source columns (``movieName``, ``soldBoxOffice``,
    ``showStartTime``) are replaced by neutral defaults; previously the
    scalar fallback of ``DataFrame.get`` caused an ``AttributeError``.
    """
    columns = ["影片", "场次", "场次效率", "黄金场次", "黄金效率", "票房"]
    if not today_schedule:
        return pd.DataFrame(columns=columns)
    df = pd.DataFrame(today_schedule)
    if df.empty:
        return pd.DataFrame(columns=columns)

    def _column(name: str, default: Any) -> pd.Series:
        """Return column ``name``, or a constant Series when it is absent."""
        if name in df.columns:
            return df[name]
        return pd.Series(default, index=df.index)

    df["影片"] = _column("movieName", "").apply(clean_movie_title)
    df["总收入"] = pd.to_numeric(_column("soldBoxOffice", 0), errors="coerce").fillna(0)
    df["放映时间"] = pd.to_datetime(
        _column("showStartTime", "00:00"), format="%H:%M", errors="coerce"
    ).dt.time

    by_movie = (
        df.groupby("影片", dropna=False)
        .agg(场次=("影片", "size"), 票房=("总收入", "sum"))
        .reset_index()
    )
    total_revenue = float(by_movie["票房"].sum())
    total_sessions = int(by_movie["场次"].sum())
    by_movie["场次效率"] = 0.0
    if total_revenue > 0 and total_sessions > 0:
        by_movie["票房比"] = by_movie["票房"] / total_revenue
        by_movie["场次比"] = by_movie["场次"] / total_sessions
        by_movie["场次效率"] = (
            (by_movie["票房比"] / by_movie["场次比"])
            .replace([float("inf"), -float("inf")], 0)
            .fillna(0)
        )

    golden_df = df[df["放映时间"].between(golden_start, golden_end, inclusive="both")].copy()
    if golden_df.empty:
        by_movie["黄金场次"] = 0
        by_movie["黄金效率"] = 0.0
    else:
        g = (
            golden_df.groupby("影片", dropna=False)
            .agg(黄金场次=("影片", "size"), 黄金票房=("总收入", "sum"))
            .reset_index()
        )
        g_total_revenue = float(g["黄金票房"].sum())
        g_total_count = int(g["黄金场次"].sum())
        g["黄金效率"] = 0.0
        if g_total_revenue > 0 and g_total_count > 0:
            g["黄金票房比"] = g["黄金票房"] / g_total_revenue
            g["黄金场次比"] = g["黄金场次"] / g_total_count
            g["黄金效率"] = (
                (g["黄金票房比"] / g["黄金场次比"])
                .replace([float("inf"), -float("inf")], 0)
                .fillna(0)
            )
        # Movies without golden-window sessions get zeros after the left join.
        by_movie = by_movie.merge(g[["影片", "黄金场次", "黄金效率"]], on="影片", how="left")
        by_movie["黄金场次"] = by_movie["黄金场次"].fillna(0).astype(int)
        by_movie["黄金效率"] = by_movie["黄金效率"].fillna(0.0)
    return by_movie[columns]
def build_locked_sessions(raw_next_day_schedule: List[Dict[str, Any]], target_date: date) -> List[Dict[str, Any]]:
    """Extract the sessions that already have tickets sold.

    A session is kept when ``soldTicketNum`` (or ``buyTicketNum``) is
    positive and both its start and end parse as ``HH:MM``. Times are placed
    on the operating day via ``parse_operating_dt``; an end at or before the
    start rolls over to the next day. Each result is flagged
    ``is_presold=True``.
    """
    locked: List[Dict[str, Any]] = []
    for session in raw_next_day_schedule:
        tickets_sold = int(session.get("soldTicketNum") or session.get("buyTicketNum") or 0)
        if tickets_sold <= 0:
            continue
        try:
            start_clock = datetime.strptime(str(session.get("showStartTime", "00:00")), "%H:%M").time()
            end_clock = datetime.strptime(str(session.get("showEndTime", "00:00")), "%H:%M").time()
        except Exception:
            continue

        starts_at = parse_operating_dt(target_date, start_clock)
        ends_at = parse_operating_dt(target_date, end_clock)
        if ends_at <= starts_at:
            ends_at += timedelta(days=1)

        hall_id = session.get("hallId")
        duration = int(session.get("movieLength") or session.get("movieDuration") or 120)
        locked.append(
            {
                "hallId": hall_id,
                "hallName": session.get("hallName") or f"{session.get('hallId')}号厅",
                "movieId": session.get("movieId"),
                "movieNum": session.get("movieNum"),
                "movieName": session.get("movieName", "未知影片"),
                "movieDuration": duration,
                "movieMediaType": session.get("movieMediaType", ""),
                "startTime": starts_at,
                "endTime": ends_at,
                "is_presold": True,
                "sold": tickets_sold,
            }
        )
    return locked
def build_tms_index_by_hall(tms_rows: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """Index TMS rows by hall number.

    Each row with a non-empty ``HALL_INFO`` list contributes one entry
    (``assert_12``, ``name_norm``, ``media``) to every hall it lists; the
    same entry dict is shared between those halls.
    """
    index: Dict[str, List[Dict[str, Any]]] = {}
    for row in tms_rows:
        hall_infos = row.get("HALL_INFO") or []
        if not isinstance(hall_infos, list) or not hall_infos:
            continue
        title = str(row.get("ASSERT_NAME") or "") or str(row.get("CONTENT_NAME") or "")
        asset_id = re.sub(r"[^A-Za-z0-9]", "", str(row.get("ASSERT_ID") or ""))
        entry = {
            "assert_12": asset_id.upper()[:12],
            "name_norm": normalize_text_token(title),
            "media": normalize_media_type(str(row.get("SOURCE_FORMAT") or "")),
        }
        for hall_info in hall_infos:
            hall_no = extract_hall_no(hall_info.get("HALL_NAME") or hall_info.get("HALL_ID"))
            index.setdefault(hall_no, []).append(entry)
    return index
def session_in_tms(session: Dict[str, Any], hall_key: str, tms_by_hall: Dict[str, List[Dict[str, Any]]]) -> bool:
    """Report whether the session's film is present on its hall's TMS server.

    With no TMS data at all the check is skipped (returns ``True``); a hall
    without entries fails it. An entry matches when the media types are
    compatible (either side unknown, or equal) and either the first 12
    characters of the movie number equal the entry id, or one normalized
    title contains the other.

    Entries whose normalized name is empty never match by name: the empty
    string is a substring of every title, so such entries used to match
    every film.
    """
    if not tms_by_hall:
        return True
    entries = tms_by_hall.get(extract_hall_no(hall_key), [])
    if not entries:
        return False

    movie_name_norm = normalize_text_token(session.get("movieName"))
    movie_num_12 = re.sub(r"[^A-Za-z0-9]", "", str(session.get("movieNum") or "")).upper()[:12]
    media = normalize_media_type(session.get("movieMediaType"))

    for e in entries:
        id_ok = bool(movie_num_12) and movie_num_12 == e.get("assert_12")
        name_norm = e.get("name_norm") or ""
        # Both names must be non-empty before the substring test is meaningful.
        name_ok = bool(movie_name_norm) and bool(name_norm) and (
            movie_name_norm in name_norm or name_norm in movie_name_norm
        )
        media_ok = (not media) or (not e.get("media")) or media == e.get("media")
        if media_ok and (id_ok or name_ok):
            return True
    return False
def build_movie_targets(
    movies: List[Dict[str, Any]],
    today_eff: pd.DataFrame,
    locked_sessions: List[Dict[str, Any]],
    box_office_data: List[Dict[str, Any]],
    rule12_enabled: bool = True,
) -> Dict[str, Dict[str, Any]]:
    """Derive per-film session targets from today's efficiency table and the box-office ranking.

    For every film in ``movies`` the result holds ``min_total``, ``max_total``,
    ``min_golden``, today's counts (``today_total``, ``today_golden``), the two
    efficiency figures (``fe``, ``ge``) and a rank-based ``base_weight``.

    Args:
        movies: Candidate films; ``movieName`` and ``movieMediaType`` build the key.
        today_eff: Today's efficiency rows, looked up by the ``影片`` column.
        locked_sessions: Sessions that are already locked; their per-film count
            is a floor for ``min_total``.
        box_office_data: Input for ``sort_movies_by_box_office``.
        rule12_enabled: When True, the top-5 films are guaranteed at least one
            golden session and one total session.

    Returns:
        Mapping of film policy key to its target dict.
    """
    locked_total = Counter(
        movie_policy_key(s.get("movieName", ""), s.get("movieMediaType", ""))
        for s in locked_sessions
    )
    # NOTE: this counter is never filled in this function, so the locked golden
    # floor applied below is always 0.
    locked_golden = Counter()

    eff_map = {}
    if not today_eff.empty:
        for _, eff_row in today_eff.iterrows():
            eff_map[movie_policy_key(eff_row["影片"])] = eff_row

    bo_ranked = sort_movies_by_box_office(box_office_data)
    leading_ten = bo_ranked[:10]
    top10 = {name for name, _ in leading_ten}
    top5 = {name for name, _ in bo_ranked[:5]} if rule12_enabled else set()
    rank_boost: Dict[str, float] = {
        name: max(0.6, 1.6 - 0.1 * position)
        for position, (name, _) in enumerate(leading_ten, start=1)
    }

    targets: Dict[str, Dict[str, Any]] = {}
    for movie in movies:
        mv = movie_policy_key(movie.get("movieName", ""), movie.get("movieMediaType", ""))
        if not mv:
            continue

        eff = eff_map.get(mv)
        if eff is None:
            today_total, today_golden, fe, ge = 0, 0, 1.0, 1.0
        else:
            today_total = int(eff.get("场次", 0))
            today_golden = int(eff.get("黄金场次", 0))
            fe = float(eff.get("场次效率", 1.0))
            ge = float(eff.get("黄金效率", 1.0))

        if today_total <= 0:
            # Not shown today: only a top-10 film may receive a single session.
            min_total = 0
            max_total = 1 if mv in top10 else 0
            min_golden = 0
        else:
            one_less = max(0, today_total - 1)
            if fe > 1.5:
                min_total, max_total = today_total + 1, today_total + 4
            elif fe < 0.5:
                min_total, max_total = one_less, max(today_total, 1)
            else:
                min_total, max_total = one_less, today_total + 2

            if ge > 1.5:
                min_golden = today_golden + 1
            elif ge < 0.5:
                min_golden = max(0, today_golden - 1)
            else:
                min_golden = max(0, today_golden)

            if today_golden == 0 and fe > 1.5:
                min_golden = max(1, min_golden)

        # Rule 12 takes priority: a top-5 film gets at least one golden session,
        # and the totals are raised so that session can be accommodated.
        if mv in top5:
            min_golden = max(1, min_golden)
            min_total = max(min_total, 1)
            max_total = max(max_total, 1)

        # Locked sessions cannot be removed, so they act as lower bounds.
        min_total = max(min_total, int(locked_total.get(mv, 0)))
        min_golden = max(min_golden, int(locked_golden.get(mv, 0)))
        max_total = max(max_total, min_total)

        targets[mv] = {
            "min_total": int(min_total),
            "max_total": int(max_total),
            "min_golden": int(min_golden),
            "today_total": int(today_total),
            "today_golden": int(today_golden),
            "fe": float(fe),
            "ge": float(ge),
            "base_weight": float(rank_boost.get(mv, 1.0)),
        }
    return targets
def _target_efficiency(target: Dict[str, Any], key: str) -> float:
    """Read an efficiency figure from a target dict, using 1.0 only when it is missing or unparsable."""
    try:
        return float(target.get(key))
    except (TypeError, ValueError):
        return 1.0


def build_movie_weights(
    movies: List[Dict[str, Any]],
    movie_targets: Dict[str, Dict[str, Any]],
    box_office_data: List[Dict[str, Any]],
) -> Dict[str, float]:
    """Compute a sampling weight for every film in ``movies``.

    The weight starts at 1.0, is scaled by the film's position in the top 20 of
    ``sort_movies_by_box_office(box_office_data)``, and is then adjusted by the
    ``fe`` / ``ge`` efficiency figures stored in ``movie_targets``.

    An efficiency of exactly 0.0 is a real (worst-case) value and receives the
    low-efficiency down-weighting; only a missing value is treated as neutral.

    Args:
        movies: Candidate films; ``movieName`` and ``movieMediaType`` build the key.
        movie_targets: Per-film targets carrying ``fe`` and ``ge``.
        box_office_data: Input for ``sort_movies_by_box_office``.

    Returns:
        Mapping of film policy key to a weight that is never below 0.1.
    """
    bo_ranked = sort_movies_by_box_office(box_office_data)
    rank_map: Dict[str, int] = {
        name: position for position, (name, _) in enumerate(bo_ranked[:20], start=1)
    }

    weights: Dict[str, float] = {}
    for movie in movies:
        mv = movie_policy_key(movie.get("movieName", ""), movie.get("movieMediaType", ""))
        if not mv:
            continue

        weight = 1.0
        rank = rank_map.get(mv)
        if rank is not None:
            weight *= max(0.7, 1.8 - 0.08 * rank)

        target = movie_targets.get(mv, {})
        fe = _target_efficiency(target, "fe")
        ge = _target_efficiency(target, "ge")

        if fe > 1.5:
            weight *= 1.2
        elif fe < 0.5:
            weight *= 0.85
        if ge > 1.5:
            weight *= 1.1
        elif ge < 0.5:
            weight *= 0.92

        weights[mv] = max(0.1, weight)
    return weights
def can_place(
    session: Dict[str, Any],
    hall_sessions: List[Dict[str, Any]],
    all_sessions: List[Dict[str, Any]],
    turn_min: int,
    turn_max: int,
    hall_key: str,
    ctx: RuleContext,
) -> bool:
    """Decide whether ``session`` may be added to the hall without breaking placement rules.

    The checks, in order: positive duration, no overlap with the hall's
    blockouts, no overlap with sessions already in the hall, turnaround gap to
    the neighbouring sessions within ``[turn_min, turn_max]`` (a gap above
    ``turn_max`` is tolerated when it intersects a blockout), rule 1 (minimum
    start-time distance between sessions of the same film across all halls) and
    the film's preview windows.

    Args:
        session: Candidate session with ``startTime`` and ``endTime``.
        hall_sessions: Sessions already placed in the same hall.
        all_sessions: Every session placed so far, across halls.
        turn_min: Minimum turnaround gap in minutes.
        turn_max: Maximum turnaround gap in minutes.
        hall_key: Key used to look up the hall's blockouts.
        ctx: Rule context supplying ``params``, ``blockouts_by_hall`` and
            ``preview_windows_by_identity``.

    Returns:
        True when every check passes.
    """
    start, end = session["startTime"], session["endTime"]
    if end <= start:
        return False

    if any(
        interval_overlaps(start, end, block_start, block_end)
        for block_start, block_end in ctx.blockouts_by_hall.get(hall_key, [])
    ):
        return False

    ordered = sorted(hall_sessions, key=lambda x: x["startTime"])
    if any(interval_overlaps(start, end, s["startTime"], s["endTime"]) for s in ordered):
        return False

    # Locate the immediate neighbours of the candidate within the hall.
    before: Optional[Dict[str, Any]] = None
    after: Optional[Dict[str, Any]] = None
    for s in ordered:
        if s["endTime"] <= start:
            before = s
        elif s["startTime"] >= end:
            after = s
            break

    idle_spans = []
    if before is not None:
        idle_spans.append((before["endTime"], start))
    if after is not None:
        idle_spans.append((end, after["startTime"]))
    for idle_from, idle_to in idle_spans:
        gap = (idle_to - idle_from).total_seconds() / 60
        if gap < turn_min:
            return False
        if gap > turn_max and not gap_intersects_blockout(hall_key, idle_from, idle_to, ctx.blockouts_by_hall):
            return False

    if ctx.params["rule1_enabled"]:
        own_identity = movie_identity_key(session.get("movieNum"), session.get("movieName"))
        same_film = (
            s for s in all_sessions
            if movie_identity_key(s.get("movieNum"), s.get("movieName")) == own_identity
        )
        if any(
            abs((s["startTime"] - start).total_seconds()) / 60 < int(ctx.params["rule1_gap"])
            for s in same_film
        ):
            return False

    # Preview-screening restriction (previewShowTime): a film listed here may
    # only start inside one of its allowed windows.
    policy_key = movie_policy_key(session.get("movieName", ""), session.get("movieMediaType", ""))
    if policy_key in ctx.preview_windows_by_identity:
        windows = ctx.preview_windows_by_identity.get(policy_key, []) or ()
        return any(window_start <= start <= window_end for window_start, window_end in windows)
    return True
def construct_weight(
    movie: Dict[str, Any],
    start_dt: datetime,
    in_tms: bool,
    total_counter: Counter,
    golden_counter: Counter,
    ctx: RuleContext,
) -> float:
    """Compute the random-choice weight of placing ``movie`` at ``start_dt``.

    The base weight comes from ``ctx.movie_weights`` (falling back to the
    target's ``base_weight``) and is scaled by the film's remaining deficit
    against its targets, its manual constraints, whether the slot lies in the
    golden window, whether the film is available in TMS, and finally a random
    factor drawn from ``[0.90, 1.15]``.

    Args:
        movie: Film (or candidate session) dict with ``movieName`` / ``movieMediaType``.
        start_dt: Proposed start time.
        in_tms: Whether the film was found in TMS for the hall.
        total_counter: Sessions placed so far per film.
        golden_counter: Golden-window sessions placed so far per film.
        ctx: Rule context supplying targets, weights, manual constraints and
            the golden window.

    Returns:
        A weight that is never below 0.01.
    """
    key = movie_policy_key(movie.get("movieName", ""), movie.get("movieMediaType", ""))
    target = ctx.movie_targets.get(key, {"min_total": 1, "max_total": 6, "min_golden": 0})
    manual = ctx.manual_constraints.get(key, {})

    placed_total = int(total_counter.get(key, 0))
    placed_golden = int(golden_counter.get(key, 0))
    missing_total = max(0, int(target.get("min_total", 0)) - placed_total)
    missing_golden = max(0, int(target.get("min_golden", 0)) - placed_golden)
    surplus_total = max(0, placed_total - int(target.get("max_total", placed_total + 10)))
    slot_is_golden = ctx.golden_start_dt <= start_dt <= ctx.golden_end_dt

    weight = float(ctx.movie_weights.get(key, target.get("base_weight", 1.0)))
    weight *= 1.0 + missing_total * 0.7

    fixed = manual.get("fixed_sessions")
    lower = manual.get("min_sessions")
    upper = manual.get("max_sessions")
    golden_lower = manual.get("min_golden_sessions")
    golden_upper = manual.get("max_golden_sessions")

    if fixed is not None:
        # A fixed count overrides the min/max pair.
        weight *= 1.6 if placed_total < int(fixed) else 0.12
    else:
        if lower is not None and placed_total < int(lower):
            weight *= 1.3 + max(0, int(lower) - placed_total) * 0.2
        if upper is not None and placed_total >= int(upper):
            weight *= 0.1

    if slot_is_golden:
        weight *= 1.05 + missing_golden * 0.65
        if golden_lower is not None and placed_golden < int(golden_lower):
            weight *= 1.25
        if golden_upper is not None and placed_golden >= int(golden_upper):
            weight *= 0.2
    elif missing_golden > 0:
        # Outside the golden window while golden sessions are still owed.
        weight *= 0.85

    if surplus_total > 0:
        weight *= max(0.2, 0.8 - surplus_total * 0.15)

    weight *= 1.06 if in_tms else 0.75
    weight *= random.uniform(0.90, 1.15)
    return max(0.01, weight)
def simulate_one_candidate(
    movies: List[Dict[str, Any]],
    hall_name_map: Dict[Any, str],
    locked_sessions: List[Dict[str, Any]],
    ctx: RuleContext,
    fail_reason_out: Optional[List[str]] = None,
) -> Optional[List[Dict[str, Any]]]:
    """Randomly build one candidate schedule around the locked sessions.

    Halls are visited in shuffled order. In each hall a time cursor walks from
    the business start to the business end; at every free position all films
    are tried at several start offsets, the placeable ones are weighted with
    ``construct_weight`` (plus a start-density damping) and one is drawn with
    ``random.choices``.

    Args:
        movies: Films that may be scheduled.
        hall_name_map: Mapping of hall id to hall name.
        locked_sessions: Sessions that are copied into the schedule first.
        ctx: Rule context (parameters, business hours, blockouts, TMS data, ...).
        fail_reason_out: Optional list that receives a message when the locked
            sessions alone already exceed the TMS allowance.

    Returns:
        The list of sessions (locked ones included), or None when the locked
        sessions alone exceed ``tms_allowance`` distinct missing-TMS pairs.
    """
    # Turnaround tolerance: base - 3 .. base + 5 minutes, never below 1.
    turn_base = int(ctx.params["turnaround_base"])
    turn_min = max(1, turn_base - 3)
    turn_max = max(turn_min, turn_base + 5)
    # Shallow-copy the locked sessions so the caller's dicts are not shared.
    schedule = [dict(s) for s in locked_sessions]
    by_hall: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for s in schedule:
        by_hall[str(s["hallId"])].append(s)
    total_counter = Counter(movie_policy_key(s.get("movieName", ""), s.get("movieMediaType", "")) for s in schedule)
    golden_counter = Counter(
        movie_policy_key(s.get("movieName", ""), s.get("movieMediaType", ""))
        for s in schedule
        if ctx.golden_start_dt <= s["startTime"] <= ctx.golden_end_dt
    )
    forbidden_set = {extract_hall_no(h) for h in ctx.params["rule13_forbidden_halls"]}
    # Distinct missing-TMS keys (as produced by tms_missing_pair_key) seen so far.
    missing_tms_pairs: Set[Tuple[str, str, str]] = set()
    for s in schedule:
        hall_key = extract_hall_no(s.get("hallId") or s.get("hallName"))
        if not session_in_tms(s, hall_key, ctx.tms_by_hall):
            missing_tms_pairs.add(tms_missing_pair_key(s))
    if len(missing_tms_pairs) > int(ctx.params["tms_allowance"]):
        if fail_reason_out is not None:
            fail_reason_out.append(
                f"构造前失败:已售锁定场次导致TMS缺片去重 {len(missing_tms_pairs)} 超过允许值 {int(ctx.params['tms_allowance'])}"
            )
        return None
    # Shortest positive film duration; 90 when no film has a positive duration.
    min_duration = min(
        [int(m.get("movieDuration") or 9999) for m in movies if int(m.get("movieDuration") or 0) > 0] or [90]
    )
    hall_items = list(hall_name_map.items())
    random.shuffle(hall_items)
    density_window = int(ctx.params.get("rule2_window_minutes", 30))
    density_threshold = int(ctx.params.get("rule2_threshold", 4))
    # Per-hall stagger of the first start, clamped to 5..20 minutes.
    spread_step = max(5, min(20, int(density_window / max(2, density_threshold + 1))))
    for hall_idx, (hall_id, hall_name) in enumerate(hall_items):
        hall_key = str(hall_id)
        hall_no = extract_hall_no(hall_name or hall_id)
        hall_sessions = by_hall.get(hall_key, [])
        blockouts = ctx.blockouts_by_hall.get(hall_key, [])
        # Stagger each hall's first session by a step so that starts do not
        # pile up in the 10:00~10:20 range.
        base_offset = hall_idx * spread_step
        jitter = random.choice([0, 5, 10])
        cursor = ceil_datetime_to_step(ctx.business_start_dt + timedelta(minutes=base_offset + jitter), 5)
        # Iteration cap for the cursor walk of this hall.
        attempts = 0
        while cursor < ctx.business_end_dt and attempts < 1000:
            attempts += 1
            cursor = ceil_datetime_to_step(cursor, 5)
            # Sessions and blockouts of this hall, merged and ordered by start.
            occupied = sorted(
                hall_sessions + [{"startTime": b[0], "endTime": b[1], "is_block": True} for b in blockouts],
                key=lambda x: x["startTime"],
            )
            next_anchor = None
            moved = False
            for item in occupied:
                if item["endTime"] <= cursor:
                    continue
                if item["startTime"] <= cursor < item["endTime"]:
                    # Cursor is inside an occupied span: jump to its end and retry.
                    cursor = item["endTime"]
                    moved = True
                    break
                if item["startTime"] > cursor:
                    # First occupied span after the cursor bounds the free gap.
                    next_anchor = item
                    break
            if moved:
                continue
            if cursor >= ctx.business_end_dt:
                break
            gap_end = next_anchor["startTime"] if next_anchor else ctx.business_end_dt
            if (gap_end - cursor).total_seconds() / 60 < min_duration:
                # Not even the shortest film fits here; advance the cursor.
                cursor += timedelta(minutes=5)
                continue
            # Each candidate is (session dict, weight, in_tms flag).
            candidates: List[Tuple[Dict[str, Any], float, bool]] = []
            offsets = [0, 5, 10, 15, 20, 25, 30]
            random.shuffle(offsets)
            # Start times of all generated sessions are aligned to a 5-minute grid.
            for movie in movies:
                mv_policy = movie_policy_key(movie.get("movieName", ""), movie.get("movieMediaType", ""))
                if ctx.allowed_movies and mv_policy not in ctx.allowed_movies:
                    continue
                dur = int(movie.get("movieDuration") or 0)
                if dur <= 0:
                    continue
                media = movie.get("movieMediaType", "")
                # Rule 13: no 3D film in a forbidden hall.
                if ctx.params["rule13_enabled"] and hall_no in forbidden_set and is_3d_by_movie_num_or_media(movie.get("movieNum"), media):
                    continue
                for off in offsets:
                    st_dt = cursor + timedelta(minutes=off)
                    et_dt = st_dt + timedelta(minutes=dur)
                    if et_dt > gap_end or et_dt > ctx.business_end_dt:
                        continue
                    cand = {
                        "hallId": hall_id,
                        "hallName": hall_name,
                        "movieId": movie.get("movieId"),
                        "movieNum": movie.get("movieNum"),
                        "movieName": movie.get("movieName", "未知影片"),
                        "movieDuration": dur,
                        "movieMediaType": movie.get("movieMediaType", ""),
                        "startTime": st_dt,
                        "endTime": et_dt,
                        "is_presold": False,
                        "sold": 0,
                    }
                    if not can_place(
                        session=cand,
                        hall_sessions=hall_sessions,
                        all_sessions=schedule,
                        turn_min=turn_min,
                        turn_max=turn_max,
                        hall_key=hall_key,
                        ctx=ctx,
                    ):
                        continue
                    in_tms = session_in_tms(cand, hall_no or hall_key, ctx.tms_by_hall)
                    if not in_tms:
                        # A new missing-TMS pair is only allowed while the allowance is not used up.
                        cand_key = tms_missing_pair_key(cand)
                        if cand_key not in missing_tms_pairs and len(missing_tms_pairs) >= int(ctx.params["tms_allowance"]):
                            continue
                    w = construct_weight(cand, st_dt, in_tms, total_counter, golden_counter, ctx)
                    # Global start-density damping: count sessions that start within
                    # the window preceding (and including) this candidate's start.
                    existing_in_window = int(
                        sum(
                            1
                            for s in schedule
                            if 0 <= (st_dt - s["startTime"]).total_seconds() / 60 < density_window
                        )
                    )
                    if existing_in_window >= density_threshold and ctx.params.get("rule2_enabled", True):
                        continue
                    if existing_in_window > 0:
                        w *= max(0.30, 1.0 - 0.10 * existing_in_window)
                    if existing_in_window >= max(0, density_threshold - 1):
                        overflow = existing_in_window - density_threshold + 1
                        w *= max(0.05, 1.0 - 0.22 * overflow)
                    candidates.append((cand, w, in_tms))
            if not candidates:
                cursor += timedelta(minutes=5)
                continue
            # Weighted draw of one candidate; its in_tms flag is recovered by
            # identity in the loop below.
            chosen, _, in_tms = random.choices(
                population=[c[0] for c in candidates],
                weights=[c[1] for c in candidates],
                k=1,
            )[0], None, None
            for c in candidates:
                if c[0] is chosen:
                    in_tms = c[2]
                    break
            schedule.append(chosen)
            hall_sessions.append(chosen)
            # Store the list back: by_hall.get() above does not insert a missing key.
            by_hall[hall_key] = hall_sessions
            mv_clean = movie_policy_key(chosen.get("movieName", ""), chosen.get("movieMediaType", ""))
            total_counter[mv_clean] += 1
            if ctx.golden_start_dt <= chosen["startTime"] <= ctx.golden_end_dt:
                golden_counter[mv_clean] += 1
            if in_tms is False:
                missing_tms_pairs.add(tms_missing_pair_key(chosen))
            # Next attempt starts after the minimum turnaround, on the 5-minute grid.
            cursor = ceil_datetime_to_step(chosen["endTime"] + timedelta(minutes=turn_min), 5)
    return schedule
def validate_manual_movie_constraints(
    schedule: List[Dict[str, Any]],
    constraints: Dict[str, Dict[str, Optional[float]]],
    ctx: RuleContext,
    locked_sessions: Optional[List[Dict[str, Any]]] = None,
) -> List[str]:
    """Check a schedule against the per-film manual constraints.

    The checks cover ``fixed_sessions``, ``min_sessions``, ``min_share_pct`` /
    ``max_share_pct``, ``min_golden_sessions`` and ``min_golden_ratio_pct`` /
    ``max_golden_ratio_pct``. ``max_sessions`` and ``max_golden_sessions`` are
    not checked here; the previous code computed adjusted values for them but
    never used the result, so that dead computation was removed.

    Locked sessions take priority: when more sessions of a film are locked than
    its ``fixed_sessions`` value, the locked count becomes the required count.

    Args:
        schedule: Sessions of the candidate schedule.
        constraints: Mapping of film policy key to its constraint dict.
        ctx: Rule context; ``golden_start_dt`` / ``golden_end_dt`` bound the
            golden window (inclusive on both ends).
        locked_sessions: Sessions that are already locked, if any.

    Returns:
        Violation messages; empty when there are no constraints, the schedule
        is empty, or everything is satisfied.
    """
    if not constraints or not schedule:
        return []

    def _policy_key(session: Dict[str, Any]) -> str:
        return movie_policy_key(session.get("movieName", ""), session.get("movieMediaType", ""))

    totals: Counter = Counter()
    goldens: Counter = Counter()
    for session in schedule:
        key = _policy_key(session)
        totals[key] += 1
        if ctx.golden_start_dt <= session["startTime"] <= ctx.golden_end_dt:
            goldens[key] += 1
    total_sessions = len(schedule)
    locked_total = Counter(_policy_key(s) for s in (locked_sessions or []))

    violations: List[str] = []
    for mv, c in constraints.items():
        total = int(totals.get(mv, 0))
        golden = int(goldens.get(mv, 0))
        share_pct = total / total_sessions * 100.0
        golden_ratio_pct = (golden / total * 100.0) if total > 0 else 0.0

        fixed_sessions = c.get("fixed_sessions")
        min_sessions = c.get("min_sessions")
        min_share_pct = c.get("min_share_pct")
        max_share_pct = c.get("max_share_pct")
        min_golden_sessions = c.get("min_golden_sessions")
        min_golden_ratio_pct = c.get("min_golden_ratio_pct")
        max_golden_ratio_pct = c.get("max_golden_ratio_pct")

        locked_total_mv = int(locked_total.get(mv, 0))
        if fixed_sessions is not None and locked_total_mv > int(fixed_sessions):
            # Presale lock wins: the fixed count cannot be below what is already locked.
            fixed_sessions = float(locked_total_mv)
        if fixed_sessions is not None and total != int(fixed_sessions):
            violations.append(f"《{mv}》固定场次要求 {int(fixed_sessions)},当前 {total}")
            # A fixed-count miss is reported alone; the other checks are skipped.
            continue

        if min_sessions is not None and total < int(min_sessions):
            violations.append(f"《{mv}》次日场次 {total} 低于最少场次 {int(min_sessions)}")
        if min_share_pct is not None and share_pct < float(min_share_pct):
            violations.append(f"《{mv}》排片占比 {share_pct:.1f}% 低于 {float(min_share_pct):.1f}%")
        if max_share_pct is not None and share_pct > float(max_share_pct):
            violations.append(f"《{mv}》排片占比 {share_pct:.1f}% 高于 {float(max_share_pct):.1f}%")
        if min_golden_sessions is not None and golden < int(min_golden_sessions):
            violations.append(f"《{mv}》次日黄金场次 {golden} 低于 {int(min_golden_sessions)}")
        if min_golden_ratio_pct is not None and total > 0 and golden_ratio_pct < float(min_golden_ratio_pct):
            violations.append(f"《{mv}》黄金占比 {golden_ratio_pct:.1f}% 低于 {float(min_golden_ratio_pct):.1f}%")
        if max_golden_ratio_pct is not None and total > 0 and golden_ratio_pct > float(max_golden_ratio_pct):
            violations.append(f"《{mv}》黄金占比 {golden_ratio_pct:.1f}% 高于 {float(max_golden_ratio_pct):.1f}%")
    return violations
def validate_hard_rules(
    schedule: List[Dict[str, Any]],
    locked_sessions: List[Dict[str, Any]],
    ctx: RuleContext,
) -> List[str]:
    """Collect every hard-rule violation of a candidate schedule.

    Checked in order: session time sanity and business hours, per-hall overlap
    and turnaround gaps, rule 1 (same-film start spacing), rule 4 (earliest and
    latest start), rule 13 (no 3D in forbidden halls), the TMS missing-film
    allowance, integrity of the locked sessions, and the manual per-film
    constraints (at most 20 messages of the latter are kept).

    Rule 4 is evaluated on full datetimes rather than on time-of-day. The
    business day can run past midnight, and a session starting at e.g. 00:30
    must count as later than a 22:30 threshold, not earlier.

    Args:
        schedule: Sessions of the candidate schedule.
        locked_sessions: Sessions that must appear unchanged in the schedule.
        ctx: Rule context supplying parameters, business hours, blockouts, TMS
            data and manual constraints.

    Returns:
        Violation messages; empty when the schedule passes.
    """
    if not schedule:
        return ["方案为空"]
    p = ctx.params
    turn_base = int(p["turnaround_base"])
    turn_min = max(1, turn_base - 3)
    turn_max = max(turn_min, turn_base + 5)
    violations: List[str] = []

    # Time sanity / business hours: report the first offending session only.
    for s in schedule:
        st_dt = s["startTime"]
        et_dt = s["endTime"]
        if et_dt <= st_dt:
            violations.append("存在结束时间早于开始时间的场次")
            break
        if st_dt < ctx.business_start_dt or et_dt > ctx.business_end_dt:
            violations.append("存在场次超出营业时间")
            break

    # Per-hall overlap and turnaround checks: at most one message per hall.
    by_hall: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for s in schedule:
        by_hall[str(s.get("hallId"))].append(s)
    for hall_key, sessions in by_hall.items():
        ordered = sorted(sessions, key=lambda x: x["startTime"])
        for a, b in zip(ordered, ordered[1:]):
            if interval_overlaps(a["startTime"], a["endTime"], b["startTime"], b["endTime"]):
                violations.append(f"影厅{hall_key}存在场次重叠")
                break
            gap = (b["startTime"] - a["endTime"]).total_seconds() / 60
            if gap < turn_min:
                violations.append(f"影厅{hall_key}存在小于{turn_min}分钟的转换间隔")
                break
            if gap > turn_max and not gap_intersects_blockout(hall_key, a["endTime"], b["startTime"], ctx.blockouts_by_hall):
                violations.append(f"影厅{hall_key}存在大于{turn_max}分钟的转换间隔")
                break

    # Rule 1: sessions of the same film must start far enough apart.
    if p["rule1_enabled"]:
        movie_slots: Dict[str, List[datetime]] = defaultdict(list)
        for s in schedule:
            identity = movie_identity_key(s.get("movieNum"), s.get("movieName"))
            movie_slots[identity].append(s["startTime"])
        for identity, starts in movie_slots.items():
            starts = sorted(starts)
            for earlier, later in zip(starts, starts[1:]):
                gap = (later - earlier).total_seconds() / 60
                if gap < int(p["rule1_gap"]):
                    violations.append(f"同影片开场间隔小于{int(p['rule1_gap'])}分钟({identity})")
                    break

    # Rule 4: the first start must not be too late, the last start not too early.
    if p["rule4_enabled"]:
        first_start = min(s["startTime"] for s in schedule)
        last_start = max(s["startTime"] for s in schedule)
        # Assumes business_start_dt lies on the calendar day being scheduled.
        business_day = ctx.business_start_dt.date()
        earliest_limit = datetime.combine(business_day, parse_hm(p["rule4_earliest"], "10:00"))
        latest_time = parse_hm(p["rule4_latest"], "22:30")
        latest_limit = datetime.combine(business_day, latest_time)
        if latest_time < ctx.business_start_dt.time():
            # A threshold before opening time refers to the hours after midnight.
            latest_limit += timedelta(days=1)
        if first_start > earliest_limit:
            violations.append("最早一场晚于规则四阈值")
        if last_start < latest_limit:
            violations.append("最晚一场早于规则四阈值")

    # Rule 13: no 3D film in a forbidden hall.
    if p["rule13_enabled"]:
        forbidden_set = {extract_hall_no(h) for h in p["rule13_forbidden_halls"]}
        for s in schedule:
            hall_no = extract_hall_no(s.get("hallName") or s.get("hallId"))
            if hall_no in forbidden_set and is_3d_by_movie_num_or_media(s.get("movieNum"), s.get("movieMediaType", "")):
                violations.append(f"规则十三违规:{hall_no}号厅出现3D")
                break

    # TMS allowance, counted on distinct missing-pair keys.
    if ctx.tms_by_hall:
        missing_pairs: Set[Tuple[str, str, str]] = set()
        for s in schedule:
            hall_no = extract_hall_no(s.get("hallName") or s.get("hallId"))
            if not session_in_tms(s, hall_no, ctx.tms_by_hall):
                missing_pairs.add(tms_missing_pair_key(s))
        if len(missing_pairs) > int(p["tms_allowance"]):
            violations.append(f"TMS 缺片场次(同片同厅去重) {len(missing_pairs)},超过允许值 {int(p['tms_allowance'])}")

    # Every locked session must still be present, unchanged, and flagged as presold.
    def _session_key(s: Dict[str, Any]) -> Tuple[Any, ...]:
        return (
            str(s.get("hallId")),
            movie_policy_key(s.get("movieName", ""), s.get("movieMediaType", "")),
            s.get("startTime"),
            s.get("endTime"),
        )

    locked_keys = {_session_key(s) for s in locked_sessions}
    cand_keys = {_session_key(s) for s in schedule if s.get("is_presold")}
    if not locked_keys.issubset(cand_keys):
        violations.append("存在已售锁定场次被改动")

    manual_violations = validate_manual_movie_constraints(schedule, ctx.manual_constraints, ctx, locked_sessions)
    if manual_violations:
        violations.extend(manual_violations[:20])
    return violations
# Ordered (required keywords, category) pairs; the first rule whose keywords
# all occur in the message wins, so the order is significant.
_REJECT_REASON_RULES: Tuple[Tuple[Tuple[str, ...], str], ...] = (
    (("构造失败",), "构造阶段失败"),
    # Pre-construction failures are worded "构造前失败", which does not contain
    # the keyword above.
    (("构造前失败",), "构造阶段失败"),
    (("存在场次重叠",), "硬规则:影厅场次重叠"),
    (("转换间隔",), "硬规则:影厅场次转换间隔不符"),
    (("同影片开场间隔",), "硬规则:规则一同影片间隔不足"),
    (("最早一场晚于",), "硬规则:规则四最早场过晚"),
    (("最晚一场早于",), "硬规则:规则四最晚场过早"),
    (("规则十三违规",), "硬规则:规则十三禁3D违规"),
    (("TMS 缺片场次",), "硬规则:TMS缺片超限"),
    (("已售锁定场次被改动",), "硬规则:预售锁定场次被改动"),
    (("固定场次要求",), "微调约束:固定场次不满足"),
    (("低于最少场次",), "微调约束:低于最少场次"),
    (("高于最多场次",), "微调约束:高于最多场次"),
    (("排片占比", "低于"), "微调约束:低于最低场次占比"),
    (("排片占比", "高于"), "微调约束:高于最高场次占比"),
    (("黄金场次", "低于"), "微调约束:低于最少黄金场次"),
    (("黄金场次", "高于"), "微调约束:高于最多黄金场次"),
    (("黄金占比", "低于"), "微调约束:低于最低黄金占比"),
    (("黄金占比", "高于"), "微调约束:高于最高黄金占比"),
    (("超出营业时间",), "硬规则:场次超出营业时间"),
    (("结束时间早于开始时间",), "硬规则:结束时间早于开始时间"),
    (("方案为空",), "硬规则:空方案"),
)


def normalize_reject_reason(msg: str) -> str:
    """Map a raw rejection message to its reporting category.

    Args:
        msg: Raw message; ``None`` and other falsy values are treated as empty.

    Returns:
        The category of the first matching rule in ``_REJECT_REASON_RULES``,
        or "其他淘汰原因" when the message is empty or matches no rule.
    """
    text = str(msg or "")
    if not text:
        return "其他淘汰原因"
    for keywords, category in _REJECT_REASON_RULES:
        if all(keyword in text for keyword in keywords):
            return category
    return "其他淘汰原因"
def score_efficiency_rules(
    sched_df: pd.DataFrame,
    today_eff: pd.DataFrame,
    locked_sessions: List[Dict[str, Any]],
    ctx: RuleContext,
) -> Tuple[float, str]:
    """Score a schedule against today's per-film efficiency table.

    For every row of ``today_eff`` the film's session count and golden-window
    session count in the schedule are compared with today's counts. Depending
    on the session efficiency (``fe``) and golden efficiency (``ge``) bands
    (> 1.5, < 0.5, in between) a bonus or penalty is added. When locked
    sessions make a required reduction impossible, the penalty is skipped and a
    note is recorded instead.

    Args:
        sched_df: Schedule frame with ``startTime`` and ``movieClean`` columns.
        today_eff: Today's efficiency rows (columns read: 影片, 场次, 黄金场次,
            场次效率, 黄金效率).
        locked_sessions: Sessions that are already locked.
        ctx: Rule context; ``golden_start_dt`` / ``golden_end_dt`` bound the
            golden window.

    Returns:
        ``(bonus, notes)`` where ``notes`` joins at most the first 8 skip notes;
        ``(0.0, "无今日效率数据")`` when ``today_eff`` is empty.
    """
    if today_eff.empty:
        return 0.0, "无今日效率数据"
    bonus = 0.0
    reason_parts: List[str] = []
    # Golden window is inclusive on both ends.
    golden_mask = (sched_df["startTime"] >= ctx.golden_start_dt) & (sched_df["startTime"] <= ctx.golden_end_dt)
    sim_total = sched_df.groupby("movieClean").size().to_dict()
    sim_golden = sched_df[golden_mask].groupby("movieClean").size().to_dict()
    if locked_sessions:
        locked_df = pd.DataFrame(locked_sessions)
        locked_df["movieClean"] = locked_df.apply(lambda r: movie_policy_key(r.get("movieName", ""), r.get("movieMediaType", "")), axis=1)
        locked_total = locked_df.groupby("movieClean").size().to_dict()
    else:
        locked_total = {}
    for _, row in today_eff.iterrows():
        mv = movie_policy_key(row["影片"])
        today_total = int(row.get("场次", 0))
        today_golden = int(row.get("黄金场次", 0) or 0)
        fe = float(row.get("场次效率", 0) or 0)
        ge = float(row.get("黄金效率", 0) or 0)
        t_total = int(sim_total.get(mv, 0))
        t_golden = int(sim_golden.get(mv, 0))
        locked_cnt = int(locked_total.get(mv, 0))
        # The scheduled total is never treated as lower than the locked count.
        if t_total < locked_cnt:
            t_total = locked_cnt
        if today_total == 1:
            # Film had exactly one session today.
            if today_golden == 0:
                if fe > 1.5:
                    bonus += 30 if (t_total >= 2 and t_golden >= 1) else -35
                elif fe < 0.5:
                    bonus += 8 if t_total <= 1 else -8
                else:
                    bonus += 4
            else:
                if ge > 1.5:
                    bonus += 30 if (t_total >= 2 and t_golden >= 2) else -35
                elif ge < 0.5:
                    if locked_cnt >= today_total:
                        reason_parts.append(f"{mv}: 锁定场次不可减黄金场,跳过扣分")
                    else:
                        bonus += 12 if t_golden <= 0 else -16
                else:
                    bonus += 5
        else:
            # Every other session count today (including zero).
            if today_golden == 0:
                if fe > 1.5:
                    bonus += 22 if (t_total >= today_total + 1 and t_golden >= 1) else -24
                elif fe < 0.5:
                    if locked_cnt >= today_total:
                        reason_parts.append(f"{mv}: 锁定场次不可降总量,跳过扣分")
                    else:
                        bonus += 16 if t_total <= max(0, today_total - 1) else -18
                else:
                    bonus += 4
            else:
                # fe x ge band combinations; the mid/mid combination matches no
                # branch and therefore scores nothing.
                if fe > 1.5 and ge > 1.5:
                    bonus += 24 if (t_total >= today_total + 1 and t_golden >= today_golden + 1) else -25
                elif fe > 1.5 and 0.5 <= ge <= 1.5:
                    bonus += 18 if t_total >= today_total + 1 else -16
                elif fe > 1.5 and ge < 0.5:
                    # NOTE(review): compares the locked total against today's golden count — confirm intent.
                    if locked_cnt >= today_golden:
                        reason_parts.append(f"{mv}: 黄金低效但锁定场次不可减,跳过扣分")
                    else:
                        bonus += 12 if (t_total >= today_total + 1 and t_golden <= max(0, today_golden - 1)) else -20
                elif 0.5 <= fe <= 1.5 and ge > 1.5:
                    bonus += 14 if t_golden >= today_golden + 1 else -12
                elif 0.5 <= fe <= 1.5 and ge < 0.5:
                    if locked_cnt >= today_total:
                        reason_parts.append(f"{mv}: 锁定场次不可减,跳过扣分")
                    else:
                        bonus += 10 if (t_total <= today_total - 1 and t_golden <= max(0, today_golden - 1)) else -14
                elif fe < 0.5 and ge > 1.5:
                    if locked_cnt >= today_total:
                        reason_parts.append(f"{mv}: 锁定场次不可降总场,跳过扣分")
                    else:
                        bonus += 9 if (t_total <= max(1, today_total - 1) and t_golden >= today_golden + 1) else -12
                elif fe < 0.5 and 0.5 <= ge <= 1.5:
                    if locked_cnt >= today_total:
                        reason_parts.append(f"{mv}: 锁定场次不可降总场,跳过扣分")
                    else:
                        bonus += 8 if t_total <= max(1, today_total - 1) else -10
                elif fe < 0.5 and ge < 0.5:
                    if locked_cnt >= today_total:
                        reason_parts.append(f"{mv}: 锁定场次不可降总场,跳过扣分")
                    else:
                        bonus += 12 if (t_total <= max(1, today_total - 1) and t_golden <= max(0, today_golden - 1)) else -15
    return bonus, ";".join(reason_parts[:8])
def score_rule2_density(df: pd.DataFrame, ctx: RuleContext) -> Tuple[float, str]:
    """Penalise windows in which too many sessions start (rule 2).

    A window of ``rule2_window_minutes`` is opened at every session start. Each
    session beyond ``rule2_threshold`` starting inside ``[start, start + window)``
    costs ``rule2_penalty``, unless the window start falls in an exempt range.

    Args:
        df: Schedule frame with a ``startTime`` column.
        ctx: Rule context supplying ``params``.

    Returns:
        ``(score_delta, message)``; ``(0.0, "未启用")`` when the rule is off.
    """
    params = ctx.params
    if not params["rule2_enabled"]:
        return 0.0, "未启用"

    start_col = df["startTime"]
    window_openings = sorted(start_col.tolist())
    exempt_ranges = parse_exempt_ranges(params["rule2_exempt_ranges"])

    deduct = 0.0
    for window_open in window_openings:
        window_close = window_open + timedelta(minutes=int(params["rule2_window_minutes"]))
        inside = (start_col >= window_open) & (start_col < window_close)
        excess = int(inside.sum()) - int(params["rule2_threshold"])
        if excess <= 0:
            continue
        if in_any_exempt(window_open, exempt_ranges):
            continue
        deduct += excess * float(params["rule2_penalty"])
    return -deduct, f"过密窗口扣分 {deduct:.1f}"
def score_rule3_gap(df: pd.DataFrame, ctx: RuleContext) -> Tuple[float, str]:
    """Penalise long pauses between consecutive session starts (rule 3).

    Start times of all sessions are sorted globally. Every pause longer than
    ``rule3_gap_minutes`` that does not intersect a blockout is charged
    ``rule3_penalty`` per 10 minutes of excess, with the excess counted as at
    least one minute.

    Args:
        df: Schedule frame with a ``startTime`` column.
        ctx: Rule context supplying ``params`` and ``blockouts_by_hall``.

    Returns:
        ``(score_delta, message)``; ``(0.0, "未启用")`` when the rule is off and
        ``(0.0, "场次不足")`` when there are fewer than two sessions.
    """
    params = ctx.params
    if not params["rule3_enabled"]:
        return 0.0, "未启用"

    deduct = 0.0
    ordered_starts = sorted(df["startTime"].tolist())
    if len(ordered_starts) <= 1:
        return 0.0, "场次不足"

    for earlier, later in zip(ordered_starts, ordered_starts[1:]):
        pause = (later - earlier).total_seconds() / 60
        if not pause > int(params["rule3_gap_minutes"]):
            continue
        if gap_intersects_any_blockout(earlier, later, ctx.blockouts_by_hall):
            continue
        excess = max(1.0, pause - int(params["rule3_gap_minutes"]))
        deduct += (excess / 10.0) * float(params["rule3_penalty"])
    return -deduct, f"全局开场断档扣分 {deduct:.1f}"
def score_rule9_hot_density(df: pd.DataFrame, ctx: RuleContext, box_office_data: List[Dict[str, Any]]) -> Tuple[float, str]:
    """Penalise hot films that are under-represented in the core windows (rule 9).

    Sessions whose start time lies in ``rule9_core_windows(ctx.target_date)``
    form the reference set. Every hot film whose share of that set is below
    ``rule9_min_ratio`` costs ``rule9_penalty``.

    Args:
        df: Schedule frame with ``startTime`` (datetime) and ``movieClean`` columns.
        ctx: Rule context supplying ``params`` and ``target_date``.
        box_office_data: Passed through to ``resolve_hot_movies``.

    Returns:
        ``(score_delta, message)``. One full penalty is charged when no session
        lies in the core windows or when no hot film can be determined.
    """
    params = ctx.params
    if not params["rule9_enabled"]:
        return 0.0, "未启用"

    core_windows = rule9_core_windows(ctx.target_date)
    in_core = df["startTime"].dt.time.apply(lambda t: time_in_ranges(t, core_windows))
    core_df = df[in_core]
    if core_df.empty:
        return -float(params["rule9_penalty"]), "核心黄金窗口无场次"

    hot_movies, source, _ = resolve_hot_movies(df, box_office_data, int(params["rule9_hot_top_n"]))
    if not hot_movies:
        return -float(params["rule9_penalty"]), "无热门片可评估"

    core_count = len(core_df)
    miss = sum(
        1
        for mv in hot_movies
        if float((core_df["movieClean"] == mv).sum()) / core_count < float(params["rule9_min_ratio"])
    )
    deduct = miss * float(params["rule9_penalty"])
    return -deduct, f"热门片来源:{source},密度不足 {miss} 部"
def score_rule11_late_hot(df: pd.DataFrame, ctx: RuleContext, box_office_data: List[Dict[str, Any]]) -> Tuple[float, str]:
    """Require at least one hot film among the late-night sessions (rule 11).

    A session counts as late when it starts at or after ``rule11_after_time``,
    or before 06:00. The hot list is capped at three films; when the source is
    the national box office, the top three of that ranking are used instead.

    Args:
        df: Schedule frame with ``startTime`` (datetime) and ``movieClean`` columns.
        ctx: Rule context supplying ``params``.
        box_office_data: Passed through to ``resolve_hot_movies``.

    Returns:
        ``(score_delta, message)``; the penalty is ``rule11_penalty`` when there
        is no late session or none of them shows a hot film.
    """
    params = ctx.params
    if not params["rule11_enabled"]:
        return 0.0, "未启用"

    # The hot-film count is read from the rule 9 setting.
    hot_movies, source, bo_ranked = resolve_hot_movies(df, box_office_data, int(params["rule9_hot_top_n"]))
    shortlist = hot_movies[:3] if hot_movies else []
    if bo_ranked and source == "全国大盘票房":
        shortlist = [name for name, _ in bo_ranked[:3]]
    if not shortlist:
        return 0.0, "无热门片"

    after_t = parse_hm(params["rule11_after_time"], "22:00")

    def _is_late(t) -> bool:
        # Starts before 06:00 are treated as part of the previous night.
        return t >= after_t or t < dt_time(6, 0)

    late_df = df[df["startTime"].dt.time.apply(_is_late)]
    if late_df.empty:
        return -float(params["rule11_penalty"]), "22:00后无场次"

    late_movies = set(late_df["movieClean"])
    if not late_movies.isdisjoint(shortlist):
        return 0.0, f"热门片来源:{source},符合"
    return -float(params["rule11_penalty"]), f"热门片来源:{source},22:00后无热门片"
def score_rule12_top5_golden(df: pd.DataFrame, ctx: RuleContext, box_office_data: List[Dict[str, Any]]) -> Tuple[float, str]:
    """Penalise top-5 box-office films without a golden-hour session (rule 12).

    The golden hours used here are fixed at 14:00–21:00 (both ends inclusive)
    by time of day; they are not read from ``ctx``.

    Args:
        df: Schedule frame with ``startTime`` (datetime) and ``movieClean`` columns.
        ctx: Rule context supplying ``params``.
        box_office_data: Input for ``sort_movies_by_box_office``.

    Returns:
        ``(score_delta, message)``; ``rule12_penalty_each`` is charged for each
        top-5 film that has no session starting in the golden hours.
    """
    params = ctx.params
    if not params["rule12_enabled"]:
        return 0.0, "未启用"

    bo_ranked = sort_movies_by_box_office(box_office_data)
    if not bo_ranked:
        return 0.0, "未获取到次日票房数据"

    top5 = [name for name, _ in bo_ranked[:5]]
    golden_mask = df["startTime"].dt.time.apply(lambda t: dt_time(14, 0) <= t <= dt_time(21, 0))
    golden_movies = set(df[golden_mask]["movieClean"])

    miss = []
    for name in top5:
        # Empty names are ignored rather than counted as missing.
        if name and name not in golden_movies:
            miss.append(name)
    deduct = len(miss) * float(params["rule12_penalty_each"])
    return -deduct, f"缺黄金场影片 {len(miss)}"
def score_manual_upper_constraints(
    schedule: List[Dict[str, Any]],
    constraints: Dict[str, Dict[str, Optional[float]]],
    locked_sessions: List[Dict[str, Any]],
) -> Tuple[float, str]:
    """Score the "max sessions" / "max golden sessions" limits as soft penalties.

    These limits deduct points instead of rejecting the schedule. Presale locks
    take priority: when the locked sessions alone already exceed a limit, the
    locked count becomes the effective limit, so that part is not penalised.
    Each session above the limit costs 8 points (total) or 10 points (golden).
    Golden sessions are those starting between 14:00 and 21:00 inclusive.

    Args:
        schedule: Sessions of the candidate schedule.
        constraints: Mapping of film policy key to its constraint dict.
        locked_sessions: Sessions that are already locked.

    Returns:
        ``(score_delta, message)``; ``(0.0, "无")`` when there are no constraints
        or no sessions. The message lists at most 8 overruns.
    """
    if not constraints or not schedule:
        return 0.0, "无"
    df = pd.DataFrame(schedule).copy()
    if df.empty:
        return 0.0, "无"

    def _with_policy_key(frame: pd.DataFrame) -> pd.DataFrame:
        frame["movieClean"] = frame.apply(
            lambda r: movie_policy_key(r.get("movieName", ""), r.get("movieMediaType", "")), axis=1
        )
        return frame

    def _golden_counts(frame: pd.DataFrame) -> Counter:
        in_golden = frame["startTime"].dt.time.apply(lambda t: dt_time(14, 0) <= t <= dt_time(21, 0))
        return Counter(frame[in_golden]["movieClean"].tolist())

    df = _with_policy_key(df)
    totals = Counter(df["movieClean"].tolist())
    golden = _golden_counts(df)

    locked_total: Counter = Counter()
    locked_golden: Counter = Counter()
    if locked_sessions:
        ldf = pd.DataFrame(locked_sessions).copy()
        if not ldf.empty:
            ldf = _with_policy_key(ldf)
            locked_total = Counter(ldf["movieClean"].tolist())
            locked_golden = _golden_counts(ldf)

    # (constraint field, scheduled counts, locked counts, points per extra session, label)
    limit_specs = (
        ("max_sessions", totals, locked_total, 8.0, "超总场"),
        ("max_golden_sessions", golden, locked_golden, 10.0, "超黄金场"),
    )

    penalty = 0.0
    lines: List[str] = []
    for mv, c in constraints.items():
        for field, counts, locked_counts, unit_cost, label in limit_specs:
            limit = c.get(field)
            if limit is None:
                continue
            effective_limit = max(int(limit), int(locked_counts.get(mv, 0)))
            extra = max(0, int(counts.get(mv, 0)) - effective_limit)
            if extra > 0:
                d = extra * unit_cost
                penalty += d
                lines.append(f"{mv} {label} {extra} 场(-{d:.0f})")

    message = ";".join(lines[:8]) if lines else "满足上限约束"
    return (-penalty, message)
def score_candidate(
    schedule: List[Dict[str, Any]],
    ctx: RuleContext,
    today_eff: pd.DataFrame,
    locked_sessions: List[Dict[str, Any]],
    box_office_data: List[Dict[str, Any]],
) -> CandidateResult:
    """Score one candidate schedule against every soft rule.

    Scoring starts from 1000 points; each rule contributes a delta that is
    recorded in the breakdown as ``(label, delta, message)``.

    Args:
        schedule: Candidate sessions as dict records.
        ctx: Rule context providing ``params`` and ``manual_constraints``.
        today_eff: Today's efficiency table, forwarded to the efficiency rule.
        locked_sessions: Pre-sold sessions that must be respected.
        box_office_data: Box-office records used by the hot-film rules.

    Returns:
        A ``CandidateResult``. An empty schedule yields score 0 together with
        the single hard violation ``"空方案"``.
    """
    if not schedule:
        return CandidateResult(schedule=[], score=0.0, score_breakdown=[], hard_violations=["空方案"])

    df = pd.DataFrame(schedule).sort_values(["startTime", "hallId"]).copy()
    df["movieClean"] = df.apply(
        lambda r: movie_policy_key(r.get("movieName", ""), r.get("movieMediaType", "")), axis=1
    )

    total = 1000.0
    breakdown: List[Tuple[str, float, str]] = []

    if ctx.params["efficiency_enabled"]:
        eff_delta, eff_note = score_efficiency_rules(df, today_eff, locked_sessions, ctx)
        # NOTE: ``or 1.0`` makes every falsy setting (None, "", 0) fall back to 1.0.
        coef = float(ctx.params.get("efficiency_penalty_coef", 1.0) or 1.0)
        if eff_delta < 0:
            # Only deductions are scaled; a non-negative delta is left untouched.
            eff_delta *= max(0.0, coef)
        total += eff_delta
        breakdown.append(("效率分析表", eff_delta, eff_note or "按18种情况评估"))

    # The remaining rules share one shape; they are evaluated lazily, in this order.
    soft_rules = (
        ("规则二", lambda: score_rule2_density(df, ctx)),
        ("规则三", lambda: score_rule3_gap(df, ctx)),
        ("规则九", lambda: score_rule9_hot_density(df, ctx, box_office_data)),
        ("规则十一", lambda: score_rule11_late_hot(df, ctx, box_office_data)),
        ("规则十二", lambda: score_rule12_top5_golden(df, ctx, box_office_data)),
        ("微调上限扣分", lambda: score_manual_upper_constraints(schedule, ctx.manual_constraints, locked_sessions)),
    )
    for label, evaluate in soft_rules:
        delta, note = evaluate()
        total += delta
        breakdown.append((label, delta, note))

    return CandidateResult(schedule=schedule, score=total, score_breakdown=breakdown, hard_violations=[])
| def _append_rule_logs(parts: List[str], title: str, logs: List[str]) -> None: | |
| parts.append(title) | |
| if logs: | |
| for i, log in enumerate(logs, 1): | |
| parts.append(f"{i}. {log}") | |
| else: | |
| parts.append("(无)") | |
def _hall_display(raw: Any, with_ting: bool = True) -> str:
    """Format a raw hall identifier as a short display label.

    When ``extract_hall_no`` yields a hall number the label is that number
    followed by "号厅" (or just "号" if ``with_ting`` is false); otherwise the
    raw value is returned as text (empty string for falsy input).
    """
    number = extract_hall_no(raw)
    if number:
        return f"{number}号厅" if with_ting else f"{number}号"
    return str(raw or "")
def generate_schedule_check_logs_text(
    schedule: List[Dict[str, Any]],
    target_date: date,
    params: Dict[str, Any],
    today_eff: pd.DataFrame,
    box_office_data: List[Dict[str, Any]],
) -> str:
    """Run the thirteen schedule sanity rules and return a plain-text report.

    Each rule contributes a title line followed by numbered findings (or a
    placeholder when it has none); sections are joined with newlines.

    Rules, in order: (1) same film starting too close together, (2) too many
    starts inside a short window, (3) long gaps without any start, (4) first /
    last session limits, (5) hall idle for more than an hour, (6) turnaround
    too short, (7) entry/exit crowd peaks, (8) hall closing too early,
    (9) hot-film share in the core golden windows, (10) match with today's
    efficiency table, (11) hot films after the late-night threshold,
    (12) top-five films need a golden session, (13) 3D sessions in
    restricted halls.

    Args:
        schedule: Sessions as dict records; ``startTime`` / ``endTime`` must be
            parseable by ``pd.to_datetime`` (unparseable rows are dropped).
        target_date: Business date being checked (used by rules 5, 8 and 9).
        params: Rule parameters; every lookup falls back to a built-in default.
        today_eff: Today's efficiency table (rule 10); may be ``None`` or empty.
        box_office_data: Box-office records used to rank hot films.

    Returns:
        The report text, or a short notice when there is nothing to check.
    """
    if not schedule:
        return "无排片数据,无法进行合理性检查。"
    df = pd.DataFrame(schedule).copy()
    if df.empty:
        return "无排片数据,无法进行合理性检查。"
    df["startTime"] = pd.to_datetime(df["startTime"], errors="coerce")
    df["endTime"] = pd.to_datetime(df["endTime"], errors="coerce")
    # From here on df is sorted by start time with a fresh 0..n-1 index;
    # rule 2 relies on index label == position.
    df = df.dropna(subset=["startTime", "endTime"]).sort_values("startTime").reset_index(drop=True)
    if df.empty:
        return "无有效排片时间数据,无法进行合理性检查。"

    df["filmName"] = df["movieName"].astype(str)
    df["clean_filmName"] = df["filmName"].apply(clean_movie_title)
    df["simpleHallName"] = df["hallName"].apply(lambda x: _hall_display(x, with_ting=True))

    bo_ranked = sort_movies_by_box_office(box_office_data)
    bo_sorted_movies = [m for m, _ in bo_ranked]
    movie_box_office = {m: float(v) for m, v in bo_ranked}

    # Start times before this clock time are treated as belonging to the
    # previous business day (sessions that begin after midnight).
    overnight_cutoff = dt_time(6, 0)

    final_log_parts: List[str] = []

    # Rule 1: sessions of the same film (grouped by serial from movieNum) too close together.
    logs_r1: List[str] = []
    gap_limit = int(params.get("rule1_gap", 30))
    movie_num_series = df["movieNum"] if "movieNum" in df.columns else pd.Series([""] * len(df), index=df.index)
    df["movieSerial_5_8"] = movie_num_series.apply(extract_movie_serial_5_8)
    serial_values = [s for s in df["movieSerial_5_8"].dropna().unique() if str(s).strip()]
    for serial in serial_values:
        film_schedules = df[df["movieSerial_5_8"] == serial].sort_values("startTime").reset_index(drop=True)
        for i in range(len(film_schedules) - 1):
            s1, s2 = film_schedules.iloc[i], film_schedules.iloc[i + 1]
            interval = (s2["startTime"] - s1["startTime"]).total_seconds() / 60
            if interval < gap_limit:
                logs_r1.append(
                    f"《{s1['filmName']}》{s1['simpleHallName']}【{s1['startTime'].strftime('%H:%M')}】和 "
                    f"《{s2['filmName']}》{s2['simpleHallName']}【{s2['startTime'].strftime('%H:%M')}】"
                    f"开场时间距离 {int(interval)} 分钟(年度顺序号:{serial})"
                )
    _append_rule_logs(
        final_log_parts,
        f"规则一:同影片场次间隔过近(按 movieNum 第5~8位年度顺序号,少于 {gap_limit} 分钟)",
        logs_r1,
    )

    # Rule 2: more than `threshold` starts inside one window.
    logs_r2: List[str] = []
    window_minutes = int(params.get("rule2_window_minutes", 30))
    threshold = int(params.get("rule2_threshold", 4))
    i = 0
    processed_indices_r2 = set()
    while i < len(df):
        # Sessions already reported inside a window do not open a window of their own.
        if i in processed_indices_r2:
            i += 1
            continue
        window_start_time = df.iloc[i]["startTime"]
        window_end_time = window_start_time + timedelta(minutes=window_minutes)
        window_df = df[(df["startTime"] >= window_start_time) & (df["startTime"] < window_end_time)]
        if len(window_df) > threshold:
            start_t_str = window_df.iloc[0]["startTime"].strftime("%H:%M")
            end_t_str = window_df.iloc[-1]["startTime"].strftime("%H:%M")
            lines = [f"【{start_t_str} - {end_t_str}】开场时间比较集中:"]
            for _, row in window_df.iterrows():
                lines.append(f" {row['simpleHallName']}《{row['filmName']}》> {row['startTime'].strftime('%H:%M')}")
                processed_indices_r2.add(int(row.name))
            logs_r2.append("\n".join(lines))
        i += 1
    _append_rule_logs(final_log_parts, f"\n规则二:{window_minutes} 分钟内影片开场超过 {threshold} 场", logs_r2)

    # Rule 3: gap between consecutive starts (across all halls) too long.
    logs_r3: List[str] = []
    gap_minutes = int(params.get("rule3_gap_minutes", 30))
    if len(df) > 1:
        for i in range(len(df) - 1):
            s1_start, s2_start = df.iloc[i]["startTime"], df.iloc[i + 1]["startTime"]
            gap = (s2_start - s1_start).total_seconds() / 60
            if gap > gap_minutes:
                logs_r3.append(f"【{s1_start.strftime('%H:%M')} ~ {s2_start.strftime('%H:%M')}】缺少影片开场,间隔 {int(gap)} 分钟")
    _append_rule_logs(final_log_parts, f"\n规则三:场次开场间隔超过 {gap_minutes} 分钟", logs_r3)

    # Rule 4: first session too late / last session too early.
    logs_r4: List[str] = []
    earliest_limit = parse_hm(params.get("rule4_earliest", "10:00"), "10:00")
    latest_limit = parse_hm(params.get("rule4_latest", "22:30"), "22:30")
    first_sched = df.iloc[0]
    last_sched = df.iloc[-1]
    if first_sched["startTime"].time() > earliest_limit:
        logs_r4.append(
            f"最早一场 {first_sched['simpleHallName']}《{first_sched['filmName']}》{first_sched['startTime'].strftime('%H:%M')} 晚于 {earliest_limit.strftime('%H:%M')}"
        )
    # A last session that starts after midnight has a small clock time but is
    # later than any evening limit, so it must not be reported as "too early".
    if overnight_cutoff <= last_sched["startTime"].time() < latest_limit:
        logs_r4.append(
            f"最晚一场 {last_sched['simpleHallName']}《{last_sched['filmName']}》{last_sched['startTime'].strftime('%H:%M')} 早于 {latest_limit.strftime('%H:%M')}"
        )
    # The header reflects the configured limits (identical to the old text under defaults).
    _append_rule_logs(
        final_log_parts,
        f"\n规则四:最早一场晚于 {earliest_limit.strftime('%H:%M')},最晚一场早于 {latest_limit.strftime('%H:%M')}",
        logs_r4,
    )

    # Rule 5: hall idle for more than one hour between 10:00 and 23:00.
    logs_r5: List[str] = []
    w5_start = datetime.combine(target_date, dt_time(10, 0))
    w5_end = datetime.combine(target_date, dt_time(23, 0))
    for hall_name in df["simpleHallName"].unique():
        hall_df = df[df["simpleHallName"] == hall_name].sort_values("startTime")
        for i in range(len(hall_df) - 1):
            prev_end = hall_df.iloc[i]["endTime"]
            curr_start = hall_df.iloc[i + 1]["startTime"]
            if prev_end < w5_end and curr_start > w5_start:
                idle_mins = (curr_start - prev_end).total_seconds() / 60
                if idle_mins > 60:
                    logs_r5.append(f"{hall_name.replace('厅', '')} 【{prev_end.strftime('%H:%M')} ~ {curr_start.strftime('%H:%M')}】无影片在播,时长 {int(idle_mins)} 分钟")
    _append_rule_logs(final_log_parts, "\n规则五:影厅空闲时间超过 1 小时(10:00-23:00)", logs_r5)

    # Rule 6: turnaround between consecutive sessions in one hall too short.
    logs_r6: List[str] = []
    convert_limit = int(params.get("turnaround_base", 10))
    for hall_name in df["simpleHallName"].unique():
        hall_df = df[df["simpleHallName"] == hall_name].sort_values("startTime")
        for i in range(len(hall_df) - 1):
            prev_sched = hall_df.iloc[i]
            next_sched = hall_df.iloc[i + 1]
            conversion = (next_sched["startTime"] - prev_sched["endTime"]).total_seconds() / 60
            if conversion < convert_limit:
                logs_r6.append(
                    f"{hall_name.replace('厅', '')} {prev_sched['endTime'].strftime('%H:%M')} 《{prev_sched['filmName']}》结束后影厅空闲时间仅为 {int(conversion)} 分钟"
                )
    _append_rule_logs(final_log_parts, "\n规则六:影厅场次转换时间检查", logs_r6)

    # Rule 7: crowd peaks - a 10-minute window sliding in 5-minute steps,
    # plus exact simultaneous starts / ends.
    logs_r7: List[str] = []
    current_time = df.iloc[0]["startTime"].replace(second=0, microsecond=0)
    end_time = df.iloc[-1]["endTime"]
    reported_windows = set()
    while current_time < end_time:
        window_end = current_time + timedelta(minutes=10)
        starts_in_window = df[(df["startTime"] >= current_time) & (df["startTime"] < window_end)]
        ends_in_window = df[(df["endTime"] > current_time) & (df["endTime"] <= window_end)]
        if len(starts_in_window) + len(ends_in_window) > 5:
            window_tuple = (current_time.strftime("%H:%M"), window_end.strftime("%H:%M"))
            if window_tuple not in reported_windows:
                exit_halls = "、".join(sorted(set(ends_in_window["simpleHallName"].tolist())))
                entry_halls = "、".join(sorted(set(starts_in_window["simpleHallName"].tolist())))
                log_msg = f"【{current_time.strftime('%H:%M')} ~ {window_end.strftime('%H:%M')}】"
                if exit_halls:
                    log_msg += f",{exit_halls}集中散场"
                if entry_halls:
                    log_msg += ",同时" if exit_halls else ","
                    log_msg += f"{entry_halls}即将入场"
                log_msg += ",预计人流瞬时压力过大。"
                logs_r7.append(log_msg)
                reported_windows.add(window_tuple)
        current_time += timedelta(minutes=5)
    start_groups = df.groupby("startTime").filter(lambda x: len(x) > 3)
    for time_val, group in start_groups.groupby("startTime"):
        halls = "、".join(sorted(set(group["simpleHallName"].tolist())))
        logs_r7.append(f"{time_val.strftime('%H:%M')},{halls}电影同时开场,注意预计人流瞬时压力过大。")
    end_groups = df.groupby("endTime").filter(lambda x: len(x) > 3)
    for time_val, group in end_groups.groupby("endTime"):
        halls = "、".join(sorted(set(group["simpleHallName"].tolist())))
        logs_r7.append(f"{time_val.strftime('%H:%M')},{halls}电影同时散场,注意预计人流瞬时压力过大。")
    _append_rule_logs(final_log_parts, "\n规则七:动态散场和入场高峰预警", logs_r7)

    # Rule 8: hall whose last session ends on target_date before 22:30.
    logs_r8: List[str] = []
    for hall_name in df["simpleHallName"].unique():
        hall_df = df[df["simpleHallName"] == hall_name]
        last_sched = hall_df.nlargest(1, "endTime").iloc[0]
        if last_sched["endTime"].date() == target_date and last_sched["endTime"].time() < dt_time(22, 30):
            logs_r8.append(f"{hall_name.replace('厅', '')} 最后一场于【{last_sched['endTime'].strftime('%H:%M')}】结束,过早停运。")
    _append_rule_logs(final_log_parts, "\n规则八:影厅结束运营过早预警", logs_r8)

    # Rule 9: share of hot films inside the core golden windows.
    logs_r9: List[str] = []
    windows = rule9_core_windows(target_date)
    period_str = " 和 ".join([f"{s.strftime('%H:%M')}-{e.strftime('%H:%M')}" for s, e in windows])
    golden_df = df[df["startTime"].apply(lambda x: time_in_ranges(x.time(), windows))]
    if not golden_df.empty:
        if bo_sorted_movies:
            max_bo = float(movie_box_office.get(bo_sorted_movies[0], 0))
            if max_bo > 0:
                # "Hot" = within 5% of the best box office.
                hot_films = [m for m, v in movie_box_office.items() if v >= max_bo * 0.95]
            else:
                hot_films = bo_sorted_movies[: int(params.get("rule9_hot_top_n", 3))]
        else:
            # Without box-office data fall back to the most-scheduled films.
            counts = df["clean_filmName"].value_counts()
            max_count = int(counts.iloc[0]) if not counts.empty else 0
            hot_films = counts[counts >= max_count * 0.95].index.tolist() if max_count > 0 else []
        min_ratio = float(params.get("rule9_min_ratio", 0.3))
        for film in hot_films:
            ratio = float((golden_df["clean_filmName"] == film).sum()) / max(1, len(golden_df))
            if ratio < min_ratio:
                logs_r9.append(f"《{film}》在核心黄金时段 {period_str} 排片占比仅为{ratio:.1%},低于 {min_ratio:.0%}。")
    _append_rule_logs(final_log_parts, "\n规则九:黄金时段热门影片排片密度检查", logs_r9)

    # Rule 10: tomorrow's session counts versus today's efficiency figures.
    logs_r10: List[str] = []
    if today_eff is not None and not today_eff.empty:
        tomorrow_stats: Dict[str, Dict[str, int]] = {}
        for film in df["clean_filmName"].unique():
            fdf = df[df["clean_filmName"] == film]
            tom_total = len(fdf)
            tom_golden = len(fdf[fdf["startTime"].apply(lambda x: dt_time(14, 0) <= x.time() <= dt_time(21, 0))])
            tomorrow_stats[film] = {"total": int(tom_total), "golden": int(tom_golden)}
        for _, row in today_eff.iterrows():
            film = clean_movie_title(row.get("影片", ""))
            if film not in tomorrow_stats:
                continue
            today_total = int(row.get("场次", 0) or 0)
            today_golden = int(row.get("黄金场次", 0) or 0)
            fe = float(row.get("场次效率", 0) or 0)
            ge = float(row.get("黄金效率", 0) or 0)
            tom_total = int(tomorrow_stats[film]["total"])
            tom_golden = int(tomorrow_stats[film]["golden"])
            # Combinations not listed below leave the film valid.
            is_valid = True
            if today_total == 1:
                if today_golden == 0:
                    if fe > 1.5:
                        is_valid = tom_golden >= 1 and tom_total >= 2
                    elif fe < 0.5:
                        is_valid = tom_total in [0, 1]
                    else:
                        is_valid = tom_total in [0, 1, 2]
                else:
                    if ge > 1.5:
                        is_valid = tom_golden >= 2 and tom_total >= 2
                    elif ge < 0.5:
                        is_valid = tom_golden == 0 and tom_total in [0, 1]
                    else:
                        is_valid = (tom_total, tom_golden) in [(1, 1), (2, 1), (1, 0)]
            else:
                if today_golden == 0:
                    if fe > 1.5:
                        is_valid = tom_total > today_total and tom_golden >= 1
                    elif fe < 0.5:
                        is_valid = tom_total < today_total and tom_golden == 0
                else:
                    if fe > 1.5 and ge > 1.5:
                        is_valid = tom_total > today_total and tom_golden >= today_golden + 1
                    elif fe > 1.5 and 0.5 <= ge <= 1.5:
                        is_valid = tom_total > today_total
                    elif fe > 1.5 and ge < 0.5:
                        is_valid = tom_total > today_total and tom_golden <= max(0, today_golden - 1)
                    elif 0.5 <= fe <= 1.5 and ge > 1.5:
                        is_valid = tom_golden >= today_golden + 1
                    elif 0.5 <= fe <= 1.5 and ge < 0.5:
                        is_valid = tom_total < today_total and tom_golden <= max(0, today_golden - 1)
                    elif fe < 0.5 and ge > 1.5:
                        is_valid = tom_total <= max(1, today_total - 1) and tom_golden >= today_golden
                    elif fe < 0.5 and 0.5 <= ge <= 1.5:
                        is_valid = tom_total <= max(1, today_total - 1)
                    elif fe < 0.5 and ge < 0.5:
                        is_valid = tom_total <= max(1, today_total - 1) and tom_golden <= max(0, today_golden - 1)
            if not is_valid:
                film_rows = df[df["clean_filmName"] == film]
                locked_cnt = int(film_rows["is_presold"].fillna(False).sum()) if ("is_presold" in film_rows.columns) else 0
                # Pre-sale takes priority: a film that already has pre-sold
                # (locked) sessions is not reported even if it conflicts
                # with the efficiency advice.
                if locked_cnt > 0:
                    continue
                logs_r10.append(f"《{film}》全天场次效率:{fe:.2f} 黄金时段场次效率:{ge:.2f} 次日的排片不满足要求。")
    _append_rule_logs(final_log_parts, "\n规则十:次日排片效率匹配度检查", logs_r10)

    # Rule 11: at least one hot film should start after the late-night threshold.
    logs_r11: List[str] = []
    after_t = parse_hm(params.get("rule11_after_time", "22:00"), "22:00")
    after_label = after_t.strftime("%H:%M")
    if bo_sorted_movies:
        top_movies = bo_sorted_movies[:3]
        top_movies_type = "票房排行前三"
    else:
        top_movies = df["clean_filmName"].value_counts().head(3).index.tolist()
        top_movies_type = "排片量前三"
    if top_movies:
        late_sessions = df[df["startTime"].apply(lambda t: t.time() >= after_t or t.time() < overnight_cutoff)]
        late_movies = set(late_sessions["clean_filmName"].unique()) if not late_sessions.empty else set()
        if not any(m in late_movies for m in top_movies):
            top_movies_str = "、".join([f"《{m}》" for m in top_movies])
            # The message and header quote the configured threshold, not a fixed 22:00.
            logs_r11.append(f"{top_movies_type}的影片 {top_movies_str} 在 {after_label} 后均无场次,建议增加热门影片晚场。")
    _append_rule_logs(final_log_parts, f"\n规则十一:{after_label} 后热门影片排片检查", logs_r11)

    # Rule 12: each top-five box-office film needs a session starting 14:00-21:00.
    logs_r12: List[str] = []
    if bo_sorted_movies:
        for movie in bo_sorted_movies[:5]:
            movie_df = df[df["clean_filmName"] == movie]
            if movie_df.empty:
                logs_r12.append(f"《{movie}》为次日票房排行前五的影片,但目前未排片。")
                continue
            golden_sessions = movie_df[movie_df["startTime"].apply(lambda x: dt_time(14, 0) <= x.time() <= dt_time(21, 0))]
            if golden_sessions.empty:
                logs_r12.append(f"《{movie}》为次日票房排行前五的影片,但没有安排黄金场(14:00-21:00)。")
    else:
        logs_r12.append("未获取到次日票房数据,无法检查规则十二。")
    _append_rule_logs(final_log_parts, "\n规则十二:次日票房前五的影片必须有一场黄金场", logs_r12)

    # Rule 13: 3D sessions scheduled in restricted halls.
    logs_r13: List[str] = []
    restricted = {extract_hall_no(x) for x in params.get("rule13_forbidden_halls", ["2", "8", "9"])}
    for _, row in df.iterrows():
        hall_no = extract_hall_no(row.get("hallName"))
        if hall_no in restricted and is_3d_by_movie_num_or_media(row.get("movieNum"), row.get("movieMediaType", "")):
            logs_r13.append(f"{hall_no}号厅《{row.get('filmName', '未知影片')}》疑似3D排片(movieNum第4位为2)")
    _append_rule_logs(final_log_parts, "\n规则十三:2号、8号、9号厅禁止3D排片检查(movieNum第4位为2)", logs_r13)

    return "\n".join(final_log_parts)
def schedule_signature(schedule: List[Dict[str, Any]]) -> str:
    """Return a deterministic fingerprint string for a schedule.

    Sessions are ordered by (hall id as text, start time, policy key); each
    becomes ``hallId|policyKey|HH:MM|HH:MM`` and the tokens are joined with
    ``#``. An empty schedule gives an empty string.
    """

    def _policy(session: Dict[str, Any]) -> str:
        return movie_policy_key(session.get("movieName", ""), session.get("movieMediaType", ""))

    def _ordering(session: Dict[str, Any]) -> Tuple[str, Any, str]:
        return (str(session.get("hallId")), session.get("startTime"), _policy(session))

    return "#".join(
        f"{session.get('hallId')}|{_policy(session)}|"
        f"{session.get('startTime').strftime('%H:%M')}|{session.get('endTime').strftime('%H:%M')}"
        for session in sorted(schedule, key=_ordering)
    )
def render_gantt(schedule: List[Dict[str, Any]], date_str: str, tab_key: str) -> None:
    """Render the schedule as an HTML Gantt chart, one row per hall.

    The chart is emitted through ``st.markdown(..., unsafe_allow_html=True)``
    (``st`` is presumably the Streamlit module imported elsewhere in the file).
    When there is nothing to draw an info notice is shown instead.

    Args:
        schedule: Sessions with ``hallName``, ``movieName``, ``startTime`` and
            ``endTime``; rows with a missing or unparseable value are dropped.
        date_str: Business date as ``YYYY-MM-DD`` (shown in the header bar).
        tab_key: Not used by this function; kept for interface compatibility.

    Raises:
        ValueError: If ``date_str`` is not in ``YYYY-MM-DD`` format.
    """
    # Imported locally under an alias so the file-level imports stay untouched
    # and the name cannot clash with local variables.
    from html import escape as _escape_html

    if not schedule:
        st.info("无排片数据")
        return
    df = pd.DataFrame(schedule).copy()
    if df.empty:
        st.info("无排片数据")
        return
    df["startTime"] = pd.to_datetime(df["startTime"], errors="coerce")
    df["endTime"] = pd.to_datetime(df["endTime"], errors="coerce")
    df = df.dropna(subset=["hallName", "movieName", "startTime", "endTime"]).copy()
    if df.empty:
        st.info("无有效排片数据")
        return

    def _hall_sort_key(h: Any) -> Tuple[int, str]:
        # Order halls by their first number; halls without a number go last.
        nums = re.findall(r"\d+", str(h))
        return (int(nums[0]), str(h)) if nums else (9999, str(h))

    hall_order = sorted(df["hallName"].astype(str).unique().tolist(), key=_hall_sort_key)

    # The axis runs from the hour containing the first start to the hour
    # boundary one hour past the last end.
    t_min = df["startTime"].min().replace(minute=0, second=0, microsecond=0)
    t_max = (df["endTime"].max() + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
    total_minutes = max(60.0, (t_max - t_min).total_seconds() / 60.0)
    total_hours = max(1, int((t_max - t_min).total_seconds() / 3600))

    palette = [
        "#2A9D8F",
        "#E76F51",
        "#264653",
        "#F4A261",
        "#5B8FF9",
        "#6DC8EC",
        "#5D7092",
        "#9270CA",
        "#FF9D4D",
        "#269A99",
    ]
    movies = sorted(df["movieName"].astype(str).unique().tolist())
    color_map = {m: palette[i % len(palette)] for i, m in enumerate(movies)}

    # One equal-width label cell per hour so each cell's left border sits on
    # the hour grid line. (total_hours + 1 cells would be narrower than an
    # hour and drift away from the session blocks.)
    time_labels_html = "".join(
        f'<div class="time-label">{(t_min + timedelta(hours=i)).strftime("%H:%M")}</div>'
        for i in range(total_hours)
    )

    hall_rows: List[str] = []
    for hall in hall_order:
        row_df = df[df["hallName"].astype(str) == hall].sort_values("startTime")
        blocks: List[str] = []
        for _, r in row_df.iterrows():
            start = r["startTime"]
            end = r["endTime"]
            left = ((start - t_min).total_seconds() / 60.0 / total_minutes) * 100.0
            width = ((end - start).total_seconds() / 60.0 / total_minutes) * 100.0
            # Clip the block to the visible 0-100% range but keep it visible.
            if left < 0:
                width += left
                left = 0
            width = max(0.4, width)
            if left + width > 100:
                width = max(0.4, 100 - left)

            # Names come from schedule data, so escape them before they enter
            # markup or an attribute value.
            name_text = str(r["movieName"])
            name_html = _escape_html(name_text)
            span_text = f"{start.strftime('%H:%M')}-{end.strftime('%H:%M')}"
            # "&#10;" is a line feed inside the attribute value; a literal
            # backslash-n would be displayed verbatim by the browser.
            tooltip = "&#10;".join(
                _escape_html(part)
                for part in (
                    name_text,
                    f"{start.strftime('%H:%M')} - {end.strftime('%H:%M')}",
                    f"{int((end - start).total_seconds() / 60)}min",
                )
            )
            color = color_map.get(name_text, "#778899")
            blocks.append(
                f'<div class="schedule-block" style="left:{left:.3f}%;width:{width:.3f}%;'
                f'background-color:{color};" title="{tooltip}">'
                f'<div class="block-film-name">{name_html}</div>'
                f'<div class="block-time-duration">{span_text}</div>'
                "</div>"
            )
        hall_rows.append(
            f'<div class="hall-row"><div class="hall-name-cell">{_escape_html(hall)}</div>'
            f'<div class="timeline-cell">{"".join(blocks)}</div></div>'
        )
    halls_html = "".join(hall_rows)

    dt_obj = datetime.strptime(date_str, "%Y-%m-%d")
    weekdays = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日"]
    date_display = f"{dt_obj.strftime('%Y.%m.%d')} {weekdays[dt_obj.weekday()]} • 显示场次:{len(df)}"
    half_hour_grid_size = 100 / max(1, total_hours * 2)

    # The template lines are deliberately flush-left: Markdown treats lines
    # indented by four or more spaces as a code block.
    page_html = f"""
<style>
.scroll-wrapper {{width:100%;overflow-x:auto;border:1px solid #dee2e6;border-radius:8px;margin-bottom:1rem;}}
.schedule-container {{font-family:-apple-system,BlinkMacSystemFont,"Segoe UI",sans-serif;background:#fff;min-width:1500px;}}
.header-bar {{display:flex;align-items:center;padding:10px 16px;background:#f8f9fa;border-bottom:1px solid #dee2e6;font-size:15px;font-weight:600;}}
.chart-grid {{display:grid;grid-template-columns:140px 1fr;}}
.top-left-corner {{grid-column:1;grid-row:1;border-bottom:1px solid #dee2e6;border-right:1px solid #dee2e6;background:#f8f9fa;}}
.time-axis-wrapper {{grid-column:2;display:flex;background:#f8f9fa;border-bottom:1px solid #dee2e6;}}
.time-label {{flex:1;text-align:center;padding:8px 0;font-size:12px;color:#555;border-left:1px solid #d3d3d3;}}
.hall-row {{display:contents;}}
.hall-name-cell {{grid-column:1;padding:12px 8px;font-size:13px;font-weight:500;border-right:1px solid #dee2e6;border-top:1px solid #dee2e6;background:#fdfdfd;text-align:center;display:flex;align-items:center;justify-content:center;line-height:1.2;}}
.timeline-cell {{grid-column:2;position:relative;border-top:1px solid #dee2e6;background-image:linear-gradient(to right,#f0f0f0 1px,transparent 1px),linear-gradient(to right,#d3d3d3 1px,transparent 1px);background-size:{half_hour_grid_size}% 100%, {100 / max(1, total_hours)}% 100%;min-height:64px;}}
.schedule-block {{position:absolute;top:4px;bottom:4px;border-radius:5px;padding:6px 10px;color:#fff;overflow:hidden;display:flex;flex-direction:column;justify-content:center;align-items:flex-start;box-sizing:border-box;border:1px solid rgba(0,0,0,0.2);box-shadow:0 1px 3px rgba(0,0,0,0.1);cursor:pointer;}}
.block-film-name,.block-time-duration {{width:100%;white-space:nowrap;overflow:hidden;text-overflow:ellipsis;}}
.block-film-name {{font-weight:600;font-size:13px;line-height:1.25;}}
.block-time-duration {{font-size:11px;opacity:0.95;line-height:1.2;}}
</style>
<div class="scroll-wrapper">
<div class="schedule-container">
<div class="header-bar">{date_display}</div>
<div class="chart-grid">
<div class="top-left-corner"></div>
<div class="time-axis-wrapper">{time_labels_html}</div>
{halls_html}
</div>
</div>
</div>
"""
    st.markdown(page_html, unsafe_allow_html=True)
def derive_movies_from_schedule(next_day_schedule: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Collect the distinct films that appear in a schedule.

    Films are de-duplicated on ``(movieId, cleaned title)``; the first session
    seen for a film supplies its attributes. Sessions without a movie name are
    skipped. The duration is taken from ``movieLength``, then
    ``movieDuration``, and defaults to 120 when neither is set.

    Returns:
        One dict per film, in first-appearance order.
    """
    catalog: Dict[Tuple[Any, str], Dict[str, Any]] = {}
    for session in next_day_schedule:
        title = session.get("movieName")
        if not title:
            continue
        film_id = session.get("movieId")
        identity = (film_id, clean_movie_title(title))
        if identity in catalog:
            continue
        runtime = session.get("movieLength") or session.get("movieDuration") or 120
        catalog[identity] = {
            "movieId": film_id,
            "movieNum": session.get("movieNum"),
            "movieName": title,
            "movieDuration": int(runtime),
            "movieMediaType": session.get("movieMediaType", ""),
        }
    return list(catalog.values())
def df_schedule_for_display(schedule: List[Dict[str, Any]]) -> pd.DataFrame:
    """Turn a schedule into the table shown to the user.

    Rows are ordered by hall name and start time; start/end are rendered as
    ``HH:MM`` and the columns carry their display headers. An empty schedule
    gives an empty DataFrame.
    """
    if not schedule:
        return pd.DataFrame()

    ordered = pd.DataFrame(schedule).sort_values(["hallName", "startTime"])
    ordered["开始"] = pd.to_datetime(ordered["startTime"]).dt.strftime("%H:%M")
    ordered["结束"] = pd.to_datetime(ordered["endTime"]).dt.strftime("%H:%M")

    # (source column, display header) in output order.
    layout = (
        ("hallName", "影厅"),
        ("开始", "开始"),
        ("结束", "结束"),
        ("movieName", "影片"),
        ("movieDuration", "片长(分钟)"),
        ("movieMediaType", "制式"),
        ("is_presold", "已售锁定"),
        ("sold", "锁定已售票数"),
    )
    selected = ordered[[source for source, _ in layout]]
    return selected.rename(columns={source: header for source, header in layout if source != header})
def normalize_maintenance_blocks_input(raw_blocks: Any) -> List[Dict[str, Any]]:
    """Normalise user-supplied maintenance blocks into clean records.

    Accepts a DataFrame, a list of dicts, or a JSON string encoding such a
    list; any other input (or unparseable JSON) yields an empty list. A record
    is kept only when it names a hall (``hall``, then ``hallId``, then
    ``hallName``) and both ``start`` and ``end`` parse as ``HH:MM``.

    Malformed entries are skipped instead of raising: records that are not
    dicts, and values that are ``None`` / NaN / ``pd.NA`` (as produced by
    DataFrame cells left empty) count as missing.

    Returns:
        A list of ``{"hall", "start", "end"}`` dicts with stripped string values.
    """
    if raw_blocks is None:
        return []

    if isinstance(raw_blocks, pd.DataFrame):
        records = raw_blocks.to_dict("records")
    elif isinstance(raw_blocks, list):
        records = raw_blocks
    elif isinstance(raw_blocks, str):
        try:
            data = json.loads(raw_blocks)
        except (ValueError, RecursionError):
            # Invalid JSON is treated as "no blocks" (best-effort input).
            data = None
        records = data if isinstance(data, list) else []
    else:
        records = []

    def _is_missing(value: Any) -> bool:
        # None, float NaN and the pandas missing-value singletons.
        if value is None or value is pd.NA or value is pd.NaT:
            return True
        return isinstance(value, float) and value != value

    def _first_text(record: Dict[str, Any], keys: Tuple[str, ...]) -> str:
        # First present, truthy value among `keys`, as stripped text.
        for key in keys:
            value = record.get(key)
            if _is_missing(value) or not value:
                continue
            return str(value).strip()
        return ""

    out: List[Dict[str, Any]] = []
    for record in records:
        if not isinstance(record, dict):
            continue
        hall = _first_text(record, ("hall", "hallId", "hallName"))
        st_s = _first_text(record, ("start",))
        et_s = _first_text(record, ("end",))
        if not hall or not st_s or not et_s:
            continue
        try:
            datetime.strptime(st_s, "%H:%M")
            datetime.strptime(et_s, "%H:%M")
        except ValueError:
            continue
        out.append({"hall": hall, "start": st_s, "end": et_s})
    return out
def build_presale_rank_from_schedule(next_day_schedule: List[Dict[str, Any]]) -> List[Tuple[str, float, int]]:
    """Rank films by the pre-sale figures found in the next-day schedule.

    Per film (policy key) the sold box office and sold tickets are summed,
    with negative values counted as zero. Ticket counts come from
    ``soldTicketNum``, falling back to ``buyTicketNum``.

    Returns:
        ``(policy_key, box_office, tickets)`` tuples, highest first; ties on
        box office are broken by tickets, then by key (all descending).
    """
    revenue: Dict[str, float] = defaultdict(float)
    tickets: Dict[str, int] = defaultdict(int)

    for session in next_day_schedule or []:
        key = movie_policy_key(session.get("movieName", ""), session.get("movieMediaType", ""))
        if not key:
            continue
        ticket_count = int(session.get("soldTicketNum") or session.get("buyTicketNum") or 0)
        box_office = float(session.get("soldBoxOffice") or 0.0)
        revenue[key] += max(0.0, box_office)
        tickets[key] += max(0, ticket_count)

    def _rank(key: str) -> Tuple[float, int, str]:
        return (revenue[key], tickets[key], key)

    ordered = sorted(revenue, key=_rank, reverse=True)
    return [(key, float(revenue[key]), int(tickets[key])) for key in ordered]
| def _clamp_int(v: Optional[int], low: int = 0) -> Optional[int]: | |
| if v is None: | |
| return None | |
| return max(low, int(v)) | |
def derive_efficiency_defaults_18_cases(
    *,
    N: int,
    M: int,
    D: float,
    G: float,
    C: int,
) -> Tuple[Optional[int], Optional[int], Optional[int], Optional[int]]:
    """Derive default session ranges from the 18-case efficiency strategy.

    Each of ``D`` and ``G`` falls into a band (``>= 1.5`` high, ``>= 0.5`` mid,
    otherwise low); together with whether ``N`` equals 1 and whether ``M`` is
    zero this selects one of 18 cases.

    Args:
        N: Session count (presumably today's total sessions; confirm at call site).
        M: Golden session count (presumably today's golden sessions).
        D: Efficiency figure paired with ``N`` (presumably all-day efficiency).
        G: Efficiency figure paired with ``M`` (presumably golden-hour efficiency).
        C: Cap added to ``N`` / ``M`` when growing (presumably the daily delta cap).

    Negative inputs are treated as zero.

    Returns:
        ``(min_total, max_total, min_golden, max_golden)``, each non-negative
        and with ``min <= max`` and ``golden <= total`` enforced.
    """
    sessions = max(0, int(N))
    golden_sessions = max(0, int(M))
    cap = max(0, int(C))
    day_eff = max(0.0, float(D))
    golden_eff = max(0.0, float(G))

    theo_total = int(round(sessions * (day_eff ** 0.5)))
    theo_golden = int(round(golden_sessions * (golden_eff ** 0.5)))

    def _band(value: float) -> str:
        if value >= 1.5:
            return "high"
        return "mid" if value >= 0.5 else "low"

    day_band = _band(day_eff)
    golden_band = _band(golden_eff)

    if sessions == 1:
        if golden_sessions == 0:
            if day_band == "high":  # case1
                top = min(1 + cap, theo_total + 1)
                bounds = (2, top, 1, top)
            elif day_band == "mid":  # case2
                bounds = (0, 2, 0, 1)
            else:  # case3
                bounds = (0, 1, 0, 0)
        else:
            if golden_band == "high":  # case4
                top = min(1 + cap, int(round(1 * (golden_eff ** 0.5))) + 1)
                bounds = (2, top, 1, top)
            elif golden_band == "mid":  # case5
                bounds = (1, 2, 0, 1)
            else:  # case6
                bounds = (0, 1, 0, 0)
    else:
        # Upper total shared by every "day efficiency is high" case (7, 10, 11, 12).
        grown_total = max(sessions + 1, min(sessions + cap, theo_total))
        steady_floor = int(round(sessions * 0.8))
        if golden_sessions == 0:
            if day_band == "high":  # case7
                half_theo = int(round(sessions * (day_eff ** 0.5) * 0.5))
                bounds = (sessions + 1, grown_total, 1, min(grown_total, half_theo))
            elif day_band == "mid":  # case8
                bounds = (max(1, steady_floor), sessions + 1, 0, 1)
            else:  # case9
                bounds = (max(0, theo_total), sessions - 1, 0, 0)
        elif day_band == "high":
            if golden_band == "high":  # case10
                top_golden = max(golden_sessions + 1, min(golden_sessions + cap, theo_golden, grown_total))
                bounds = (sessions + 1, grown_total, golden_sessions + 1, top_golden)
            elif golden_band == "mid":  # case11
                bounds = (sessions + 1, grown_total, golden_sessions, golden_sessions + 1)
            else:  # case12
                bounds = (sessions + 1, grown_total, max(0, theo_golden), golden_sessions - 1)
        elif day_band == "mid":
            if golden_band == "high":  # case13
                top = min(sessions + cap, max(sessions + 1, int(round(sessions * (golden_eff ** 0.5)))))
                top_golden = max(golden_sessions + 1, min(golden_sessions + cap, theo_golden, top))
                bounds = (sessions, top, golden_sessions + 1, top_golden)
            elif golden_band == "mid":  # case14
                bounds = (
                    max(golden_sessions, steady_floor),
                    sessions + 1,
                    max(1, golden_sessions - 1),
                    golden_sessions + 1,
                )
            else:  # case15
                bounds = (max(1, steady_floor), sessions, max(0, theo_golden), golden_sessions - 1)
        else:
            if golden_band == "high":  # case16
                top = sessions - 1
                bounds = (golden_sessions, top, golden_sessions, min(top, golden_sessions + 1))
            elif golden_band == "mid":  # case17
                bounds = (
                    max(golden_sessions, theo_total),
                    sessions - 1,
                    max(1, golden_sessions - 1),
                    golden_sessions,
                )
            else:  # case18
                bounds = (0, max(0, sessions - 1), 0, max(0, golden_sessions - 1))

    # Every case fills all four bounds, so clamping is a plain floor at zero.
    min_total, max_total, min_golden, max_golden = (max(0, int(value)) for value in bounds)

    # Physical sanity: max total >= min total, and total >= golden.
    if max_total < min_total:
        max_total = min_total
    if min_golden > min_total:
        min_golden = min_total
    if max_golden > max_total:
        max_golden = max_total
    if max_golden < min_golden:
        max_golden = min_golden
    return min_total, max_total, min_golden, max_golden
def build_default_tuning_table(
    movies: List[Dict[str, Any]],
    movie_targets: Dict[str, Dict[str, Any]],
    today_eff: pd.DataFrame,
    next_day_schedule: List[Dict[str, Any]],
    box_office_data: List[Dict[str, Any]],
    efficiency_enabled: bool,
    rule12_enabled: bool,
    daily_delta_cap: int,
) -> pd.DataFrame:
    """Build the default per-movie tuning table for the editor.

    One row is emitted per distinct policy key found in ``movies`` (later
    duplicates are skipped). Each row carries today's session counts and
    efficiencies read from ``today_eff`` plus default min/max bounds for total
    and golden sessions.

    Args:
        movies: Movie dicts; ``movieName`` and ``movieMediaType`` are read.
        movie_targets: Kept for interface compatibility; not read by this
            function.
        today_eff: Today's efficiency table (columns read: 影片, 票房, 场次,
            黄金场次, 场次效率, 黄金效率). ``None`` or empty is allowed.
        next_day_schedule: Passed to ``build_presale_rank_from_schedule``.
        box_office_data: Passed to ``sort_movies_by_box_office``.
        efficiency_enabled: When false, every bound is left empty.
        rule12_enabled: When true, the first five box-office entries get at
            least one session and one golden session.
        daily_delta_cap: Forwarded as ``C`` to
            ``derive_efficiency_defaults_18_cases``.

    Returns:
        The table passed through ``coerce_tuning_editor_df``, sorted by today's
        box office (desc), today's sessions (desc) and movie name; an empty
        frame when no row was produced.
    """
    # Guard once: ``today_eff`` may be None, and both uses below need the check.
    has_today_eff = today_eff is not None and not today_eff.empty

    eff_map: Dict[str, Dict[str, Any]] = {}
    if has_today_eff:
        for _, row in today_eff.iterrows():
            eff_map[movie_policy_key(row.get("影片", ""))] = row.to_dict()

    bo_ranked = sort_movies_by_box_office(box_office_data)
    nextday_pre_top5 = {m for m, _ in bo_ranked[:5]} if rule12_enabled else set()

    if has_today_eff:
        today_rank = today_eff.sort_values(
            ["票房", "场次", "影片"], ascending=[False, False, True]
        ).reset_index(drop=True)
    else:
        today_rank = pd.DataFrame(columns=["影片", "票房", "场次"])
    today_top10 = {movie_policy_key(x) for x in today_rank["影片"].head(10).tolist()}

    presale_rank = build_presale_rank_from_schedule(next_day_schedule)
    presale_movies = {m for m, _, t in presale_rank if t > 0}
    presale_top5 = {m for m, _, _ in presale_rank[:5]}

    rows: List[Dict[str, Any]] = []
    seen_policy: Set[str] = set()
    for m in movies:
        display_name = str(m.get("movieName") or "").strip()
        policy_key = movie_policy_key(display_name, m.get("movieMediaType", ""))
        if not display_name or not policy_key or policy_key in seen_policy:
            continue
        seen_policy.add(policy_key)
        mv = policy_key

        eff = eff_map.get(mv, {})
        today_sessions = int(eff.get("场次", 0) or 0)
        today_golden_sessions = int(eff.get("黄金场次", 0) or 0)
        day_eff = round(float(eff.get("场次效率", 0) or 0), 3)
        golden_eff = round(float(eff.get("黄金效率", 0) or 0), 3)

        is_selected = (
            today_sessions > 0
            or mv in today_top10
            or mv in presale_top5
            or mv in presale_movies
            or mv in nextday_pre_top5
        )

        # Bounds stay empty unless a rule below fills them in.
        min_sessions: Optional[int] = None
        max_sessions: Optional[int] = None
        min_golden: Optional[int] = None
        max_golden: Optional[int] = None

        if is_selected and efficiency_enabled:
            if today_sessions > 0:
                min_sessions, max_sessions, min_golden, max_golden = derive_efficiency_defaults_18_cases(
                    N=today_sessions,
                    M=today_golden_sessions,
                    D=day_eff,
                    G=golden_eff,
                    C=int(daily_delta_cap),
                )
            elif mv in today_top10 or mv in presale_top5 or mv in nextday_pre_top5:
                # Not shown today: only top-ranked titles default to 0~1 sessions.
                min_sessions, max_sessions, min_golden, max_golden = 0, 1, 0, 1

            # Rule 12 takes priority: a box-office top-5 title needs >= 1 golden session.
            if mv in nextday_pre_top5:
                min_golden = max(1, int(min_golden or 0))
                min_sessions = max(1, int(min_sessions or 0))
                max_sessions = max(int(max_sessions or 0), min_sessions)
                # Keep the golden cap consistent with the forced minimum; an
                # empty cap stays empty (no cap).
                if max_golden is not None:
                    max_golden = max(int(max_golden), min_golden)

        rows.append(
            {
                "选中": bool(is_selected),
                "影片": display_name,
                "今日场次": today_sessions,
                "今日黄金场次": today_golden_sessions,
                "今日全天效率": day_eff,
                "今日黄金效率": golden_eff,
                "最少场次": min_sessions,
                "最多场次": max_sessions,
                "固定场次": None,
                "最少黄金场次": min_golden,
                "最多黄金场次": max_golden,
                "最低场次占比": None,
                "最高场次占比": None,
            }
        )

    df = pd.DataFrame(rows)
    if df.empty:
        return df

    # Sort: today's box office desc, then today's sessions desc, then movie name.
    df["_sort_box"] = df["影片"].apply(lambda x: float((eff_map.get(movie_policy_key(x), {}) or {}).get("票房", 0) or 0))
    df["_sort_cnt"] = df["影片"].apply(lambda x: int((eff_map.get(movie_policy_key(x), {}) or {}).get("场次", 0) or 0))
    df = (
        df.sort_values(["_sort_box", "_sort_cnt", "影片"], ascending=[False, False, True])
        .drop(columns=["_sort_box", "_sort_cnt"])
        .reset_index(drop=True)
    )
    return coerce_tuning_editor_df(df)
def parse_movie_tuning_constraints(df: pd.DataFrame) -> Dict[str, Dict[str, Optional[float]]]:
    """Turn the tuning editor table into per-movie constraint dicts.

    Rows whose ``选中`` cell is present and falsy are skipped, as are rows whose
    ``影片`` yields an empty policy key. Blank, non-numeric and non-finite cells
    are treated as "no constraint" (``None``).

    Normalisation applied per row:
      * share percentages are clamped to [0, 100];
      * a fixed session count overrides both min and max sessions;
      * a max below its min is raised to the min;
      * golden bounds are capped by the matching total bounds, and the golden
        minimum is additionally capped by the total maximum so the result can
        never demand more golden sessions than total sessions allow.

    Args:
        df: Editor table; ``None`` or an empty frame yields ``{}``.

    Returns:
        Mapping of policy key -> constraint dict. The two golden-ratio entries
        are always ``None`` (the table has no columns for them).
    """
    import math  # local import keeps this block self-contained

    if df is None or df.empty:
        return {}

    def _num(v: Any) -> Optional[float]:
        """Parse a cell into a finite float, or None when blank/invalid."""
        if pd.isna(v) or v in ("", None):
            return None
        try:
            parsed = float(v)
        except (TypeError, ValueError, OverflowError):
            return None
        # "nan"/"inf" strings parse fine but would crash the int() casts below.
        return parsed if math.isfinite(parsed) else None

    def _clamp_pct(p: Optional[float]) -> Optional[float]:
        """Clamp a percentage into [0, 100]; None passes through."""
        return None if p is None else max(0.0, min(100.0, p))

    def _as_int(x: Optional[float]) -> Optional[int]:
        return int(x) if x is not None else None

    constraints: Dict[str, Dict[str, Optional[float]]] = {}
    for _, row in df.iterrows():
        selected = row.get("选中", True)
        if pd.notna(selected) and not bool(selected):
            continue
        mv = movie_policy_key(row.get("影片", ""))
        if not mv:
            continue

        fixed_sessions = _num(row.get("固定场次"))
        min_sessions = _num(row.get("最少场次"))
        max_sessions = _num(row.get("最多场次"))
        min_share_pct = _clamp_pct(_num(row.get("最低场次占比")))
        max_share_pct = _clamp_pct(_num(row.get("最高场次占比")))
        min_golden_sessions = _num(row.get("最少黄金场次"))
        max_golden_sessions = _num(row.get("最多黄金场次"))

        if fixed_sessions is not None:
            min_sessions = fixed_sessions
            max_sessions = fixed_sessions
        if min_sessions is not None and max_sessions is not None and min_sessions > max_sessions:
            max_sessions = min_sessions
        if (
            min_golden_sessions is not None
            and max_golden_sessions is not None
            and min_golden_sessions > max_golden_sessions
        ):
            max_golden_sessions = min_golden_sessions
        if min_sessions is not None and min_golden_sessions is not None:
            min_golden_sessions = min(min_golden_sessions, min_sessions)
        if max_sessions is not None and max_golden_sessions is not None:
            max_golden_sessions = min(max_golden_sessions, max_sessions)
        # Without a min total the golden minimum was never capped; cap it by the
        # max total so it cannot exceed the (already capped) golden maximum.
        if max_sessions is not None and min_golden_sessions is not None:
            min_golden_sessions = min(min_golden_sessions, max_sessions)

        constraints[mv] = {
            "fixed_sessions": _as_int(fixed_sessions),
            "min_sessions": _as_int(min_sessions),
            "max_sessions": _as_int(max_sessions),
            "min_share_pct": min_share_pct,
            "max_share_pct": max_share_pct,
            "min_golden_sessions": _as_int(min_golden_sessions),
            "max_golden_sessions": _as_int(max_golden_sessions),
            "min_golden_ratio_pct": None,
            "max_golden_ratio_pct": None,
        }
    return constraints
def coerce_tuning_editor_df(df: pd.DataFrame) -> pd.DataFrame:
    """Normalise the column dtypes of the tuning editor table.

    Works on a copy. Count columns become nullable ``Int64``, efficiency/share
    columns become nullable ``Float64``, ``选中`` becomes ``bool`` (missing
    treated as ``False``) and ``影片`` becomes a stripped string. Cells that
    cannot be parsed as numbers become missing values. Columns that are absent
    are simply skipped.

    Args:
        df: Editor table; may be ``None`` or empty.

    Returns:
        The coerced copy, or a new empty ``DataFrame`` when ``df`` is ``None``
        or empty.
    """
    if df is None or df.empty:
        return pd.DataFrame()

    out = df.copy()
    int_cols = ["今日场次", "今日黄金场次", "最少场次", "最多场次", "固定场次", "最少黄金场次", "最多黄金场次"]
    float_cols = ["今日全天效率", "今日黄金效率", "最低场次占比", "最高场次占比"]

    for col in int_cols:
        if col in out.columns:
            numeric = pd.to_numeric(out[col], errors="coerce")
            # A fractional value (e.g. 2.6) cannot be cast to Int64 directly --
            # pandas raises TypeError -- so round to the nearest integer first.
            out[col] = numeric.round().astype("Int64")
    for col in float_cols:
        if col in out.columns:
            out[col] = pd.to_numeric(out[col], errors="coerce").astype("Float64")

    if "选中" in out.columns:
        out["选中"] = out["选中"].fillna(False).astype(bool)
    if "影片" in out.columns:
        out["影片"] = out["影片"].astype(str).str.strip()
    return out
def build_candidate_summary_table(
    schedule: List[Dict[str, Any]],
    today_eff: pd.DataFrame,
    golden_start_dt: datetime,
    golden_end_dt: datetime,
) -> pd.DataFrame:
    """Summarise a candidate schedule next to today's efficiency figures.

    Every row of ``today_eff`` produces one output row; movies that appear only
    in ``schedule`` are appended with zeroed "today" figures. Next-day counts
    are grouped by policy key, and a session counts as golden when its
    ``startTime`` lies within ``[golden_start_dt, golden_end_dt]`` (both ends
    inclusive).

    Returns:
        Rows sorted by next-day session count (desc) then movie key; a frame
        with only the column headers when ``schedule`` is empty.
    """
    if not schedule:
        header = ["影片", "场次", "今日黄金场次", "全天场次效率", "黄金时段场次效率", "次日场次", "次日黄金时段场次"]
        return pd.DataFrame(columns=header)

    next_df = pd.DataFrame(schedule)
    next_df["movieClean"] = next_df.apply(
        lambda rec: movie_policy_key(rec.get("movieName", ""), rec.get("movieMediaType", "")),
        axis=1,
    )
    total_by_movie = next_df.groupby("movieClean").size().to_dict()
    starts = next_df["startTime"]
    in_golden = (starts >= golden_start_dt) & (starts <= golden_end_dt)
    golden_by_movie = next_df[in_golden].groupby("movieClean").size().to_dict()

    def _summary_row(movie: Any, sessions: int, golden: int, day_eff: float, golden_eff: float) -> Dict[str, Any]:
        # Next-day figures are always looked up by policy key.
        return {
            "影片": movie,
            "场次": sessions,
            "今日黄金场次": golden,
            "全天场次效率": day_eff,
            "黄金时段场次效率": golden_eff,
            "次日场次": int(total_by_movie.get(movie, 0)),
            "次日黄金时段场次": int(golden_by_movie.get(movie, 0)),
        }

    summary: List[Dict[str, Any]] = []
    if today_eff is not None and not today_eff.empty:
        for _, eff_row in today_eff.iterrows():
            summary.append(
                _summary_row(
                    movie_policy_key(eff_row.get("影片", "")),
                    int(eff_row.get("场次", 0) or 0),
                    int(eff_row.get("黄金场次", 0) or 0),
                    float(eff_row.get("场次效率", 0) or 0),
                    float(eff_row.get("黄金效率", 0) or 0),
                )
            )

    # Movies scheduled tomorrow that have no row in today's table.
    already_listed = {entry["影片"] for entry in summary}
    summary.extend(
        _summary_row(movie, 0, 0, 0.0, 0.0)
        for movie in total_by_movie
        if movie not in already_listed
    )

    table = pd.DataFrame(summary)
    if table.empty:
        return table
    return table.sort_values(["次日场次", "影片"], ascending=[False, True]).reset_index(drop=True)
def build_runtime_config_from_widgets(cfg: Dict[str, Any], widgets: Dict[str, Any]) -> Dict[str, Any]:
    """Convert raw widget values into a complete runtime configuration.

    Each widget value is coerced to its stored type (``hm_str`` for the time
    fields, ``bool``/``int``/``float`` otherwise) and the result is layered on
    top of ``DEFAULT_CONFIG`` so every default key is present.

    Args:
        cfg: Previously loaded configuration; only used as the fallback for
            ``maintenance_blocks`` when the widget supplies no value.
        widgets: Mapping of config key -> raw widget value.

    Returns:
        A new dict: ``DEFAULT_CONFIG`` updated with the coerced widget values.

    Raises:
        KeyError: When a required widget key (any except
            ``maintenance_blocks``) is missing.
    """

    def _split_entries(text: Any) -> List[str]:
        # Accept newline, ASCII comma and the full-width comma (,) that a
        # Chinese input method produces; blank entries are dropped.
        return [part.strip() for part in re.split(r"[\n,,]", str(text or "")) if part.strip()]

    rule2_ranges = _split_entries(widgets["rule2_exempt_ranges"])
    forbidden_halls = [extract_hall_no(entry) for entry in _split_entries(widgets["rule13_forbidden_halls"])]

    # Decide on the fallback first so the normaliser is never handed None.
    raw_blocks = widgets.get("maintenance_blocks")
    if raw_blocks is None:
        maintenance_blocks = cfg.get("maintenance_blocks", [])
    else:
        maintenance_blocks = normalize_maintenance_blocks_input(raw_blocks)

    runtime_cfg = {
        "business_start": hm_str(widgets["business_start"]),
        "business_end": hm_str(widgets["business_end"]),
        "turnaround_base": int(widgets["turnaround_base"]),
        "golden_start": hm_str(widgets["golden_start"]),
        "golden_end": hm_str(widgets["golden_end"]),
        "efficiency_enabled": bool(widgets["efficiency_enabled"]),
        "efficiency_penalty_coef": float(widgets["efficiency_penalty_coef"]),
        "eff_daily_delta_cap": int(widgets["eff_daily_delta_cap"]),
        "rule1_enabled": bool(widgets["rule1_enabled"]),
        "rule1_gap": int(widgets["rule1_gap"]),
        "rule2_enabled": bool(widgets["rule2_enabled"]),
        "rule2_threshold": int(widgets["rule2_threshold"]),
        "rule2_window_minutes": int(widgets["rule2_window_minutes"]),
        "rule2_penalty": float(widgets["rule2_penalty"]),
        "rule2_exempt_ranges": rule2_ranges,
        "rule3_enabled": bool(widgets["rule3_enabled"]),
        "rule3_gap_minutes": int(widgets["rule3_gap_minutes"]),
        "rule3_penalty": float(widgets["rule3_penalty"]),
        "rule4_enabled": bool(widgets["rule4_enabled"]),
        "rule4_earliest": hm_str(widgets["rule4_earliest"]),
        "rule4_latest": hm_str(widgets["rule4_latest"]),
        "rule9_enabled": bool(widgets["rule9_enabled"]),
        "rule9_hot_top_n": int(widgets["rule9_hot_top_n"]),
        "rule9_min_ratio": float(widgets["rule9_min_ratio"]),
        "rule9_penalty": float(widgets["rule9_penalty"]),
        "rule11_enabled": bool(widgets["rule11_enabled"]),
        "rule11_after_time": hm_str(widgets["rule11_after_time"]),
        "rule11_penalty": float(widgets["rule11_penalty"]),
        "rule12_enabled": bool(widgets["rule12_enabled"]),
        "rule12_penalty_each": float(widgets["rule12_penalty_each"]),
        "rule13_enabled": bool(widgets["rule13_enabled"]),
        "rule13_forbidden_halls": forbidden_halls,
        "tms_allowance": int(widgets["tms_allowance"]),
        "maintenance_blocks": maintenance_blocks,
        "iterations": int(widgets["iterations"]),
        "random_seed": int(widgets["random_seed"]),
    }

    # Start from the defaults so keys without a widget keep their default value.
    merged = dict(DEFAULT_CONFIG)
    merged.update(runtime_cfg)
    return merged
def build_locked_movie_policy_set(locked_sessions: List[Dict[str, Any]]) -> Set[str]:
    """Collect the distinct, non-empty policy keys of the locked sessions.

    ``None`` or an empty list yields an empty set.
    """
    policy_keys = (
        movie_policy_key(session.get("movieName", ""), session.get("movieMediaType", ""))
        for session in (locked_sessions or [])
    )
    return {key for key in policy_keys if key}
def build_rule_context(
    bundle: Dict[str, Any],
    runtime_cfg: Dict[str, Any],
    manual_constraints: Dict[str, Dict[str, Optional[float]]],
    allowed_movies: Set[str],
) -> RuleContext:
    """Assemble the ``RuleContext`` for one optimisation run.

    Date/time windows, blockouts, targets, weights and TMS data are taken from
    ``bundle`` (a missing key raises ``KeyError``, except
    ``preview_windows_by_identity`` which defaults to ``{}``); the remaining
    arguments are passed through unchanged, with ``runtime_cfg`` stored as
    ``params``.
    """
    context_fields: Dict[str, Any] = {
        # Time windows of the target day.
        "target_date": bundle["target_date"],
        "business_start_dt": bundle["biz_start_dt"],
        "business_end_dt": bundle["biz_end_dt"],
        "golden_start_dt": bundle["golden_start_dt"],
        "golden_end_dt": bundle["golden_end_dt"],
        # Rule parameters and per-hall / per-movie data.
        "params": runtime_cfg,
        "blockouts_by_hall": bundle["blockouts_by_hall"],
        "movie_targets": bundle["movie_targets"],
        "movie_weights": bundle["movie_weights"],
        "tms_by_hall": bundle["tms_by_hall"],
        # Caller-supplied overrides.
        "manual_constraints": manual_constraints,
        "allowed_movies": allowed_movies,
        "preview_windows_by_identity": bundle.get("preview_windows_by_identity", {}),
    }
    return RuleContext(**context_fields)
def build_job_payload(
    *,
    bundle: Dict[str, Any],
    runtime_cfg: Dict[str, Any],
    manual_constraints: Dict[str, Dict[str, Optional[float]]],
    allowed_movies: Set[str],
) -> Dict[str, Any]:
    """Package everything the background worker needs into one dict.

    ``runtime_cfg`` and ``allowed_movies`` are shallow-copied;
    ``manual_constraints`` is stored as-is. Only a fixed list of keys is copied
    from ``bundle``; any other key is dropped. A missing bundle key raises
    ``KeyError``, except ``preview_windows_by_identity`` (defaults to ``{}``)
    and the top-level ``target_str`` (defaults to ``""``).
    """
    payload: Dict[str, Any] = {}
    payload["target_str"] = bundle.get("target_str", "")
    payload["runtime_cfg"] = dict(runtime_cfg)
    payload["manual_constraints"] = manual_constraints
    payload["allowed_movies"] = set(allowed_movies)

    # Keys carried over into the trimmed bundle, in output order.
    carried_keys = (
        "target_date",
        "target_str",
        "hall_name_map",
        "locked_sessions",
        "movies",
        "preview_windows_by_identity",
        "blockouts_by_hall",
        "biz_start_dt",
        "biz_end_dt",
        "golden_start_dt",
        "golden_end_dt",
        "today_eff",
        "movie_targets",
        "movie_weights",
        "box_office_data",
        "tms_by_hall",
    )
    trimmed: Dict[str, Any] = {}
    for key in carried_keys:
        if key == "preview_windows_by_identity":
            # The only optional entry.
            trimmed[key] = bundle.get(key, {})
        else:
            trimmed[key] = bundle[key]
    payload["bundle"] = trimmed
    return payload
def _top_reject_counts(counter: Counter, limit: Optional[int] = 10) -> Dict[str, int]:
    """Return the ``limit`` largest counts of ``counter`` as a dict, largest first.

    ``limit=None`` returns every entry; ties keep first-encountered order.
    """
    return dict(counter.most_common(limit))


def optimization_worker() -> None:
    """Run the candidate search described by the saved job payload.

    Each iteration builds one candidate schedule, discards it when construction
    fails or a hard rule is violated, and otherwise scores it and keeps the best
    candidate per schedule signature. The ``control`` field of the job state is
    polled at the start of every iteration: ``"pause"`` blocks (re-checking
    every 0.5 s) and ``"stop"`` ends the job without writing a result file.

    Progress is written with ``write_job_state``; when all iterations finish,
    the ten best candidates plus reject statistics are written to
    ``JOB_RESULT_FILE`` (the statistics are written even when no candidate was
    feasible, in which case the job is marked failed). Any exception is caught
    and reported as a failed job state; nothing is raised to the caller.
    """
    try:
        # NOTE(review): the payload presumably comes from unpickling
        # JOB_PAYLOAD_FILE; pickle must only be fed files this app wrote itself.
        payload = _read_pickle(JOB_PAYLOAD_FILE, {})
        if not isinstance(payload, dict) or not payload:
            write_job_state(status="failed", message="后台任务参数缺失", control="run")
            return

        bundle = payload.get("bundle") or {}
        runtime_cfg = payload.get("runtime_cfg") or dict(DEFAULT_CONFIG)
        manual_constraints = payload.get("manual_constraints") or {}
        allowed_movies = set(payload.get("allowed_movies") or [])
        target_str = str(payload.get("target_str") or "")

        iterations = int(runtime_cfg.get("iterations", 0) or 0)
        if iterations <= 0:
            write_job_state(status="failed", message="迭代次数必须大于0", control="run")
            return

        start_ts = float(read_job_state().get("started_ts") or 0.0)
        if start_ts <= 0:
            start_ts = time.time()

        # A missing/None seed counts as 0, i.e. the generator is left unseeded.
        base_seed = int(runtime_cfg.get("random_seed") or 0)

        ctx = build_rule_context(bundle, runtime_cfg, manual_constraints, allowed_movies)
        feasible_map: Dict[str, CandidateResult] = {}
        hard_reject = 0
        build_reject = 0
        rule_reject = 0
        reject_reason_counter: Counter = Counter()
        reject_detail_counter: Counter = Counter()
        reject_phase_counter: Counter = Counter()
        reject_examples: Dict[str, List[str]] = defaultdict(list)

        def _elapsed() -> float:
            """Seconds since the job started, rounded to 2 decimals."""
            return round(max(0.0, time.time() - start_ts), 2)

        def _read_control() -> str:
            """Current control flag from the job state ("run" when unset)."""
            return str(read_job_state().get("control") or "run")

        def _progress_fields(iter_done: int) -> Dict[str, Any]:
            """Counters shared by every progress/terminal state update."""
            return {
                "iter_done": iter_done,
                "progress": round(iter_done / iterations, 4),
                "feasible_count": len(feasible_map),
                "hard_reject": hard_reject,
                "build_reject": build_reject,
                "rule_reject": rule_reject,
                "reject_reason_top": _top_reject_counts(reject_reason_counter),
                "reject_detail_top": _top_reject_counts(reject_detail_counter),
            }

        def _record_reject(norm: str, detail: str) -> None:
            """Count one rejection and keep up to 5 example messages per reason."""
            reject_reason_counter[norm] += 1
            reject_detail_counter[detail] += 1
            if len(reject_examples[norm]) < 5:
                reject_examples[norm].append(detail)

        last_push_ts = 0.0
        for i in range(iterations):
            control = _read_control()
            while control == "pause":
                write_job_state(
                    status="paused",
                    message="任务已暂停",
                    elapsed_seconds=_elapsed(),
                    **_progress_fields(i),
                )
                time.sleep(0.5)
                control = _read_control()
            if control == "stop":
                # Reset control to "run" so the next job does not stop at once.
                write_job_state(
                    status="stopped",
                    message="任务已停止",
                    elapsed_seconds=_elapsed(),
                    ended_at=_now_text(),
                    result_count=0,
                    control="run",
                    **_progress_fields(i),
                )
                return

            if base_seed > 0:
                # Per-iteration seed keeps runs reproducible yet distinct.
                random.seed(base_seed + i)

            fail_reasons: List[str] = []
            sched = simulate_one_candidate(
                movies=bundle["movies"],
                hall_name_map=bundle["hall_name_map"],
                locked_sessions=bundle["locked_sessions"],
                ctx=ctx,
                fail_reason_out=fail_reasons,
            )
            if sched is None:
                hard_reject += 1
                build_reject += 1
                reject_phase_counter["构造阶段失败"] += 1
                reason = fail_reasons[0] if fail_reasons else "构造失败:随机构造阶段无可行候选"
                _record_reject(normalize_reject_reason(reason), reason)
            else:
                hard_violations = validate_hard_rules(sched, bundle["locked_sessions"], ctx)
                if hard_violations:
                    hard_reject += 1
                    rule_reject += 1
                    reject_phase_counter["硬性规则淘汰"] += 1
                    # Count each normalised reason once per candidate.
                    seen_reason: Set[str] = set()
                    for rv in hard_violations:
                        norm = normalize_reject_reason(rv)
                        if norm in seen_reason:
                            continue
                        seen_reason.add(norm)
                        _record_reject(norm, str(rv))
                else:
                    cand = score_candidate(
                        schedule=sched,
                        ctx=ctx,
                        today_eff=bundle["today_eff"],
                        locked_sessions=bundle["locked_sessions"],
                        box_office_data=bundle["box_office_data"],
                    )
                    sig = schedule_signature(sched)
                    prev = feasible_map.get(sig)
                    if prev is None or cand.score > prev.score:
                        feasible_map[sig] = cand

            # Push progress on the last round, every 5th round, or every 0.5 s.
            now_ts = time.time()
            if (i + 1 == iterations) or (i % 5 == 0) or (now_ts - last_push_ts >= 0.5):
                write_job_state(
                    status="running",
                    message=f"运行中:第 {i + 1}/{iterations} 轮,可行方案 {len(feasible_map)}",
                    elapsed_seconds=round(max(0.0, now_ts - start_ts), 2),
                    **_progress_fields(i + 1),
                )
                last_push_ts = now_ts

        elapsed_total = _elapsed()
        results = sorted(feasible_map.values(), key=lambda x: x.score, reverse=True)
        top_results = results[:10]

        result_payload: Dict[str, Any] = {
            "results": [serialize_candidate(x) for x in top_results],
            "all_results_count": len(results),
            "hard_reject": hard_reject,
            "build_reject": build_reject,
            "rule_reject": rule_reject,
            "reject_reason_stats": _top_reject_counts(reject_reason_counter, None),
            "reject_detail_stats": _top_reject_counts(reject_detail_counter, None),
            "reject_phase_stats": dict(reject_phase_counter),
            "reject_examples": dict(reject_examples),
            "elapsed_seconds": elapsed_total,
            "target_str": target_str,
        }
        if results:
            result_payload.update(
                {
                    "locked_count": len(bundle.get("locked_sessions", [])),
                    "movie_targets": bundle.get("movie_targets", {}),
                    "today_eff": bundle.get("today_eff", pd.DataFrame()),
                    "golden_start_dt": bundle.get("golden_start_dt"),
                    "golden_end_dt": bundle.get("golden_end_dt"),
                    "box_office_data": bundle.get("box_office_data", []),
                    "runtime_cfg": runtime_cfg,
                }
            )
            final_status = "completed"
            final_message = f"完成:可行方案 {len(results)},已保存 Top{min(10, len(results))}"
        else:
            # Reject statistics are still saved so the failure can be diagnosed.
            result_payload.update(
                {
                    "runtime_cfg": runtime_cfg,
                    "box_office_data": bundle.get("box_office_data", []),
                }
            )
            final_status = "failed"
            final_message = "未生成任何满足硬性约束的方案"

        _atomic_write_pickle(JOB_RESULT_FILE, result_payload)
        write_job_state(
            status=final_status,
            message=final_message,
            elapsed_seconds=elapsed_total,
            ended_at=_now_text(),
            result_count=len(top_results),
            control="run",
            **_progress_fields(iterations),
        )
    except Exception as e:
        # Top-level boundary of the worker thread: report instead of dying silently.
        state = read_job_state()
        start_ts = float(state.get("started_ts") or 0.0)
        elapsed = round(max(0.0, time.time() - start_ts), 2) if start_ts > 0 else 0.0
        write_job_state(
            status="failed",
            message=f"后台任务异常: {e}",
            elapsed_seconds=elapsed,
            ended_at=_now_text(),
            control="run",
        )
def start_background_job(payload: Dict[str, Any]) -> Tuple[bool, str]:
    """Persist ``payload`` and launch the optimisation worker thread.

    Refuses to start when a live worker exists while the job state is
    ``running`` or ``paused``, or when the payload's iteration count is not
    positive. Otherwise the payload is written to ``JOB_PAYLOAD_FILE``, the job
    state is reset to a fresh ``running`` record, and a daemon thread running
    ``optimization_worker`` is started and stored in ``_JOB_THREAD``.

    Returns:
        ``(started, message)`` where ``message`` is the user-facing text.
    """
    global _JOB_THREAD

    live_worker = _find_live_worker()
    current_state = read_job_state()
    if live_worker is not None:
        if current_state.get("status") in {"running", "paused"}:
            return False, "已有后台任务在运行,请先暂停/停止。"

    target_label = str(payload.get("target_str") or "")
    cfg_section = payload.get("runtime_cfg") or {}
    total_iterations = int(cfg_section.get("iterations", 0) or 0)
    if total_iterations <= 0:
        return False, "迭代次数必须大于0。"

    _atomic_write_pickle(JOB_PAYLOAD_FILE, payload)

    # Millisecond timestamp serves as the job identifier.
    new_job_id = str(int(time.time() * 1000))
    initial_state: Dict[str, Any] = {
        "status": "running",
        "control": "run",
        "job_id": new_job_id,
        "started_at": _now_text(),
        "started_ts": time.time(),
        "ended_at": "",
        "target_date": target_label,
        "iterations": total_iterations,
        "iter_done": 0,
        "progress": 0.0,
        "elapsed_seconds": 0.0,
        "feasible_count": 0,
        "hard_reject": 0,
        "build_reject": 0,
        "rule_reject": 0,
        "reject_reason_top": {},
        "reject_detail_top": {},
        "result_count": 0,
        "message": "后台任务已启动",
    }
    write_job_state(**initial_state)

    worker_thread = threading.Thread(target=optimization_worker, name="nextday-opt-worker", daemon=True)
    _JOB_THREAD = worker_thread
    worker_thread.start()
    return True, "后台任务已启动"
| def main() -> None: | |
| st.title("🎬 次日排片最优化(随机贪心构造 + 蒙特卡洛评估)") | |
| st.caption( | |
| "按规则自动生成次日排片。硬性规则不满足直接淘汰,软性规则按评分排序,输出 Top10 方案和甘特图。" | |
| ) | |
| cfg = load_config() | |
| c_date, c_target = st.columns([1, 2]) | |
| with c_date: | |
| base_date = st.date_input("基准日期(默认今天)", value=date.today(), key="opt_base_date") | |
| with c_target: | |
| target_date = base_date + timedelta(days=1) | |
| st.info(f"将基于 **{base_date.strftime('%Y-%m-%d')}** 的规则与数据,生成 **{target_date.strftime('%Y-%m-%d')}** 的排片。") | |
| st.markdown("### 参数配置(全部在页面内展示,JSON 持久化)") | |
| col1, col2, col3 = st.columns(3) | |
| with col1: | |
| st.markdown("**营业与黄金时段**") | |
| w_business_start = st.time_input("营业开始", value=parse_hm(cfg["business_start"], "09:30"), key="w_business_start") | |
| w_business_end = st.time_input("营业结束", value=parse_hm(cfg["business_end"], "01:30"), key="w_business_end") | |
| w_turnaround_base = st.number_input("场次转换基准(分钟)", min_value=5, max_value=40, value=int(cfg["turnaround_base"]), step=1, key="w_turnaround_base") | |
| st.caption(f"硬约束转换范围:{max(1, int(w_turnaround_base)-3)} - {int(w_turnaround_base)+5} 分钟") | |
| w_golden_start = st.time_input("黄金时段开始", value=parse_hm(cfg["golden_start"], "14:00"), key="w_golden_start") | |
| w_golden_end = st.time_input("黄金时段结束", value=parse_hm(cfg["golden_end"], "21:00"), key="w_golden_end") | |
| w_efficiency_enabled = st.checkbox("启用排片效率分析表(18规则)", value=bool(cfg["efficiency_enabled"]), key="w_efficiency_enabled") | |
| w_efficiency_penalty_coef = st.number_input( | |
| "效率规则扣分系数", | |
| min_value=0.1, | |
| max_value=10.0, | |
| value=float(cfg.get("efficiency_penalty_coef", 1.0)), | |
| step=0.1, | |
| key="w_efficiency_penalty_coef", | |
| disabled=not w_efficiency_enabled, | |
| ) | |
| w_eff_daily_delta_cap = st.number_input( | |
| "效率日增减上限 C", | |
| min_value=0, | |
| max_value=20, | |
| value=int(cfg.get("eff_daily_delta_cap", 5)), | |
| step=1, | |
| key="w_eff_daily_delta_cap", | |
| disabled=not w_efficiency_enabled, | |
| ) | |
| with col2: | |
| st.markdown("**硬性规则参数**") | |
| w_rule1_enabled = st.checkbox("规则一:同影片最小开场间隔", value=bool(cfg["rule1_enabled"]), key="w_rule1_enabled") | |
| w_rule1_gap = st.number_input("规则一间隔(分钟)", min_value=5, max_value=180, value=int(cfg["rule1_gap"]), step=1, key="w_rule1_gap", disabled=not w_rule1_enabled) | |
| w_rule4_enabled = st.checkbox("规则四:首末场强约束", value=bool(cfg["rule4_enabled"]), key="w_rule4_enabled") | |
| w_rule4_earliest = st.time_input("最早场不得晚于", value=parse_hm(cfg["rule4_earliest"], "10:00"), key="w_rule4_earliest", disabled=not w_rule4_enabled) | |
| w_rule4_latest = st.time_input("最晚场不得早于", value=parse_hm(cfg["rule4_latest"], "22:30"), key="w_rule4_latest", disabled=not w_rule4_enabled) | |
| w_rule13_enabled = st.checkbox("规则十三:2/8/9号厅禁3D", value=bool(cfg["rule13_enabled"]), key="w_rule13_enabled") | |
| w_rule13_forbidden_halls = st.text_input( | |
| "禁3D影厅号(逗号分隔)", | |
| value=",".join([str(x) for x in cfg.get("rule13_forbidden_halls", ["2", "8", "9"])]), | |
| key="w_rule13_forbidden_halls", | |
| disabled=not w_rule13_enabled, | |
| ) | |
| w_tms_allowance = st.number_input("TMS 缺片允许场次数", min_value=0, max_value=50, value=int(cfg["tms_allowance"]), step=1, key="w_tms_allowance") | |
| with col3: | |
| st.markdown("**软性规则参数**") | |
| w_rule2_enabled = st.checkbox("规则二:30分钟内开场超阈值", value=bool(cfg["rule2_enabled"]), key="w_rule2_enabled") | |
| w_rule2_threshold = st.number_input("规则二阈值(场)", min_value=1, max_value=20, value=int(cfg["rule2_threshold"]), step=1, key="w_rule2_threshold", disabled=not w_rule2_enabled) | |
| w_rule2_window_minutes = st.number_input("规则二窗口(分钟)", min_value=5, max_value=120, value=int(cfg["rule2_window_minutes"]), step=5, key="w_rule2_window_minutes", disabled=not w_rule2_enabled) | |
| w_rule2_penalty = st.number_input("规则二每超1场扣分", min_value=1.0, max_value=200.0, value=float(cfg["rule2_penalty"]), step=1.0, key="w_rule2_penalty", disabled=not w_rule2_enabled) | |
| w_rule3_enabled = st.checkbox("规则三:场次开场断档扣分", value=bool(cfg["rule3_enabled"]), key="w_rule3_enabled") | |
| w_rule3_gap_minutes = st.number_input("规则三断档阈值(分钟)", min_value=10, max_value=180, value=int(cfg["rule3_gap_minutes"]), step=5, key="w_rule3_gap_minutes", disabled=not w_rule3_enabled) | |
| w_rule3_penalty = st.number_input("规则三扣分系数", min_value=1.0, max_value=100.0, value=float(cfg["rule3_penalty"]), step=1.0, key="w_rule3_penalty", disabled=not w_rule3_enabled) | |
| w_rule9_enabled = st.checkbox("规则九:黄金热门密度", value=bool(cfg["rule9_enabled"]), key="w_rule9_enabled") | |
| w_rule9_hot_top_n = st.number_input("规则九热门TopN", min_value=1, max_value=10, value=int(cfg["rule9_hot_top_n"]), step=1, key="w_rule9_hot_top_n", disabled=not w_rule9_enabled) | |
| w_rule9_min_ratio = st.slider("规则九最小占比", min_value=0.05, max_value=0.90, value=float(cfg["rule9_min_ratio"]), step=0.05, key="w_rule9_min_ratio", disabled=not w_rule9_enabled) | |
| w_rule9_penalty = st.number_input("规则九扣分", min_value=1.0, max_value=200.0, value=float(cfg["rule9_penalty"]), step=1.0, key="w_rule9_penalty", disabled=not w_rule9_enabled) | |
| st.markdown("**规则二豁免时段(可多个,逗号或换行分隔)**") | |
| w_rule2_exempt_ranges = st.text_area( | |
| "例如:14:00-15:00, 19:00-20:00", | |
| value=", ".join(cfg.get("rule2_exempt_ranges", ["14:00-15:00", "19:00-20:00"])), | |
| height=70, | |
| key="w_rule2_exempt_ranges", | |
| ) | |
| c4, c5, c6 = st.columns(3) | |
| with c4: | |
| w_rule11_enabled = st.checkbox("规则十一:22:00后热门影片", value=bool(cfg["rule11_enabled"]), key="w_rule11_enabled") | |
| w_rule11_after_time = st.time_input("规则十一起算时间", value=parse_hm(cfg["rule11_after_time"], "22:00"), key="w_rule11_after_time", disabled=not w_rule11_enabled) | |
| w_rule11_penalty = st.number_input("规则十一扣分", min_value=1.0, max_value=200.0, value=float(cfg["rule11_penalty"]), step=1.0, key="w_rule11_penalty", disabled=not w_rule11_enabled) | |
| with c5: | |
| w_rule12_enabled = st.checkbox("规则十二:票房Top5需黄金场", value=bool(cfg["rule12_enabled"]), key="w_rule12_enabled") | |
| w_rule12_penalty_each = st.number_input("规则十二每片扣分", min_value=1.0, max_value=200.0, value=float(cfg["rule12_penalty_each"]), step=1.0, key="w_rule12_penalty_each", disabled=not w_rule12_enabled) | |
| w_iterations = st.number_input("Monte Carlo迭代次数", min_value=20, max_value=100000, value=int(cfg["iterations"]), step=10, key="w_iterations") | |
| with c6: | |
| w_random_seed = st.number_input("随机种子(可复现)", min_value=0, max_value=99999999, value=int(cfg["random_seed"]), step=1, key="w_random_seed") | |
| st.markdown("**影厅维护/包场时段(可直接增删行,更便捷)**") | |
| maintenance_df_default = pd.DataFrame( | |
| cfg.get("maintenance_blocks", []), | |
| columns=["hall", "start", "end"], | |
| ) | |
| w_maintenance_blocks = st.data_editor( | |
| maintenance_df_default, | |
| num_rows="dynamic", | |
| use_container_width=True, | |
| hide_index=True, | |
| key="w_maintenance_blocks_editor", | |
| column_config={ | |
| "hall": st.column_config.TextColumn("影厅", help="例如 2号厅 / 2"), | |
| "start": st.column_config.TextColumn("开始", help="HH:MM"), | |
| "end": st.column_config.TextColumn("结束", help="HH:MM"), | |
| }, | |
| ) | |
| runtime_cfg = build_runtime_config_from_widgets( | |
| cfg, | |
| { | |
| "business_start": w_business_start, | |
| "business_end": w_business_end, | |
| "turnaround_base": w_turnaround_base, | |
| "golden_start": w_golden_start, | |
| "golden_end": w_golden_end, | |
| "efficiency_enabled": w_efficiency_enabled, | |
| "efficiency_penalty_coef": w_efficiency_penalty_coef, | |
| "eff_daily_delta_cap": w_eff_daily_delta_cap, | |
| "rule1_enabled": w_rule1_enabled, | |
| "rule1_gap": w_rule1_gap, | |
| "rule2_enabled": w_rule2_enabled, | |
| "rule2_threshold": w_rule2_threshold, | |
| "rule2_window_minutes": w_rule2_window_minutes, | |
| "rule2_penalty": w_rule2_penalty, | |
| "rule2_exempt_ranges": w_rule2_exempt_ranges, | |
| "rule3_enabled": w_rule3_enabled, | |
| "rule3_gap_minutes": w_rule3_gap_minutes, | |
| "rule3_penalty": w_rule3_penalty, | |
| "rule4_enabled": w_rule4_enabled, | |
| "rule4_earliest": w_rule4_earliest, | |
| "rule4_latest": w_rule4_latest, | |
| "rule9_enabled": w_rule9_enabled, | |
| "rule9_hot_top_n": w_rule9_hot_top_n, | |
| "rule9_min_ratio": w_rule9_min_ratio, | |
| "rule9_penalty": w_rule9_penalty, | |
| "rule11_enabled": w_rule11_enabled, | |
| "rule11_after_time": w_rule11_after_time, | |
| "rule11_penalty": w_rule11_penalty, | |
| "rule12_enabled": w_rule12_enabled, | |
| "rule12_penalty_each": w_rule12_penalty_each, | |
| "rule13_enabled": w_rule13_enabled, | |
| "rule13_forbidden_halls": w_rule13_forbidden_halls, | |
| "tms_allowance": w_tms_allowance, | |
| "maintenance_blocks": w_maintenance_blocks, | |
| "iterations": w_iterations, | |
| "random_seed": w_random_seed, | |
| }, | |
| ) | |
| b1, b2 = st.columns([1, 5]) | |
| with b1: | |
| if st.button("↩ 恢复默认参数", use_container_width=True): | |
| save_config(dict(DEFAULT_CONFIG)) | |
| st.rerun() | |
| save_config(runtime_cfg) | |
| st.caption(f"参数已自动保存到 {CONFIG_FILE}(日期参数不写入配置)") | |
| st.divider() | |
| target_str = target_date.strftime("%Y-%m-%d") | |
| today_str = base_date.strftime("%Y-%m-%d") | |
| load_btn = st.button("📥 加载数据并生成可编辑微调约束", type="primary", use_container_width=True) | |
| if load_btn: | |
| with st.status("正在加载数据...", expanded=True) as status: | |
| status.write("1/7 拉取次日/当日排片、影厅座位") | |
| next_day_schedule, hall_seat_map, err_next = fetch_schedule_and_halls(target_str) | |
| today_schedule, _, err_today = fetch_schedule_and_halls(today_str) | |
| if err_next: | |
| status.update(label="失败:次日排片接口异常", state="error") | |
| st.error(f"次日排片拉取失败:{err_next}") | |
| st.stop() | |
| if err_today: | |
| status.update(label="失败:当日排片接口异常", state="error") | |
| st.error(f"当日排片拉取失败:{err_today}") | |
| st.stop() | |
| hall_name_map = build_hall_name_map(next_day_schedule, hall_seat_map) | |
| locked_sessions = build_locked_sessions(next_day_schedule, target_date) | |
| status.write("2/7 拉取次日可放映电影") | |
| movies = fetch_movie_info_for_date(target_str) | |
| if not movies: | |
| status.update(label="失败:未获取到可放映电影", state="error") | |
| st.error("getMovieInfo 接口未返回可放映电影,无法生成排片。") | |
| st.stop() | |
| movies = dedupe_movies_by_policy_key(movies) | |
| preview_windows_by_identity = build_preview_windows_for_movies(target_date, movies) | |
| status.write("3/7 构建包场/维护窗口、黄金时段、营业时间") | |
| blockouts = parse_blockouts_from_config(target_date, runtime_cfg.get("maintenance_blocks", [])) | |
| blockouts_by_hall = build_hall_blockouts(blockouts, hall_name_map) | |
| biz_start_t = parse_hm(runtime_cfg["business_start"], "09:30") | |
| biz_end_t = parse_hm(runtime_cfg["business_end"], "01:30") | |
| golden_start_t = parse_hm(runtime_cfg["golden_start"], "14:00") | |
| golden_end_t = parse_hm(runtime_cfg["golden_end"], "21:00") | |
| biz_start_dt = parse_operating_dt(target_date, biz_start_t) | |
| biz_end_dt = parse_operating_dt(target_date, biz_end_t) | |
| if biz_end_dt <= biz_start_dt: | |
| biz_end_dt += timedelta(days=1) | |
| golden_start_dt = parse_operating_dt(target_date, golden_start_t) | |
| golden_end_dt = parse_operating_dt(target_date, golden_end_t) | |
| if golden_end_dt < golden_start_dt: | |
| golden_end_dt += timedelta(days=1) | |
| status.write("4/7 拉取票房Top与TMS信息") | |
| box_office_data = fetch_realtime_box_office(target_str) | |
| if not box_office_data: | |
| box_office_data = fetch_realtime_box_office(today_str) | |
| tms_rows = fetch_tms_server_movies_raw() | |
| tms_by_hall = build_tms_index_by_hall(tms_rows) | |
| status.write("5/7 生成可放映电影微调约束(可编辑)") | |
| today_eff = build_today_efficiency(today_schedule, hall_seat_map, golden_start_t, golden_end_t) | |
| movie_targets = build_movie_targets( | |
| movies=movies, | |
| today_eff=today_eff, | |
| locked_sessions=locked_sessions, | |
| box_office_data=box_office_data, | |
| rule12_enabled=bool(runtime_cfg["rule12_enabled"]), | |
| ) | |
| movie_weights = build_movie_weights(movies, movie_targets, box_office_data) | |
| tuning_df = build_default_tuning_table( | |
| movies=movies, | |
| movie_targets=movie_targets, | |
| today_eff=today_eff, | |
| next_day_schedule=next_day_schedule, | |
| box_office_data=box_office_data, | |
| efficiency_enabled=bool(runtime_cfg["efficiency_enabled"]), | |
| rule12_enabled=bool(runtime_cfg["rule12_enabled"]), | |
| daily_delta_cap=int(runtime_cfg.get("eff_daily_delta_cap", 5)), | |
| ) | |
| st.session_state["nextday_loaded_bundle"] = { | |
| "target_date": target_date, | |
| "target_str": target_str, | |
| "today_str": today_str, | |
| "next_day_schedule": next_day_schedule, | |
| "today_schedule_raw": list(today_schedule), | |
| "today_schedule_excluded_labels": [], | |
| "today_schedule": today_schedule, | |
| "hall_seat_map": hall_seat_map, | |
| "hall_name_map": hall_name_map, | |
| "locked_sessions": locked_sessions, | |
| "movies": movies, | |
| "preview_windows_by_identity": preview_windows_by_identity, | |
| "blockouts_by_hall": blockouts_by_hall, | |
| "biz_start_dt": biz_start_dt, | |
| "biz_end_dt": biz_end_dt, | |
| "golden_start_dt": golden_start_dt, | |
| "golden_end_dt": golden_end_dt, | |
| "today_eff": today_eff, | |
| "movie_targets": movie_targets, | |
| "movie_weights": movie_weights, | |
| "box_office_data": box_office_data, | |
| "tms_by_hall": tms_by_hall, | |
| } | |
| st.session_state["nextday_tuning_df"] = coerce_tuning_editor_df(tuning_df) | |
| st.session_state.pop("nextday_tuning_editor", None) | |
| status.update(label="完成:已加载数据并生成微调约束", state="complete") | |
| bundle = st.session_state.get("nextday_loaded_bundle") | |
| job_state = read_job_state() | |
| if job_state.get("status") in {"running", "paused"} and _find_live_worker() is None: | |
| if int(job_state.get("iter_done") or 0) < int(job_state.get("iterations") or 0): | |
| job_state = write_job_state(status="failed", control="run", message="后台任务已中断,请重新启动。") | |
| job_result = _read_pickle(JOB_RESULT_FILE, {}) | |
| if bundle: | |
| if bundle.get("target_str") != target_str: | |
| st.warning("你已切换日期,请重新点击“加载数据并生成可编辑微调约束”。") | |
| else: | |
| raw_today_schedule = bundle.get("today_schedule_raw") or bundle.get("today_schedule") or [] | |
| excluded_default = bundle.get("today_schedule_excluded_labels", []) | |
| exclude_options = [session_display_label(s) for s in raw_today_schedule] | |
| exclude_options = sorted(list(dict.fromkeys(exclude_options))) | |
| with st.expander("🗑️ 剔除特殊场次 (包场/无效数据)", expanded=False): | |
| exclude_key = f"nextday_today_exclude_{target_str}" | |
| selected_labels = st.multiselect( | |
| "选择需要剔除的当日场次(仅影响效率分析与默认微调建议)", | |
| options=exclude_options, | |
| default=[x for x in excluded_default if x in exclude_options], | |
| key=exclude_key, | |
| ) | |
| if sorted(selected_labels) != sorted(excluded_default): | |
| filtered_today_schedule = apply_session_exclusions(raw_today_schedule, selected_labels) | |
| golden_start_t = parse_hm(runtime_cfg["golden_start"], "14:00") | |
| golden_end_t = parse_hm(runtime_cfg["golden_end"], "21:00") | |
| today_eff = build_today_efficiency( | |
| filtered_today_schedule, | |
| bundle["hall_seat_map"], | |
| golden_start_t, | |
| golden_end_t, | |
| ) | |
| movie_targets = build_movie_targets( | |
| movies=bundle["movies"], | |
| today_eff=today_eff, | |
| locked_sessions=bundle["locked_sessions"], | |
| box_office_data=bundle["box_office_data"], | |
| rule12_enabled=bool(runtime_cfg["rule12_enabled"]), | |
| ) | |
| movie_weights = build_movie_weights(bundle["movies"], movie_targets, bundle["box_office_data"]) | |
| tuning_df = build_default_tuning_table( | |
| movies=bundle["movies"], | |
| movie_targets=movie_targets, | |
| today_eff=today_eff, | |
| next_day_schedule=bundle["next_day_schedule"], | |
| box_office_data=bundle["box_office_data"], | |
| efficiency_enabled=bool(runtime_cfg["efficiency_enabled"]), | |
| rule12_enabled=bool(runtime_cfg["rule12_enabled"]), | |
| daily_delta_cap=int(runtime_cfg.get("eff_daily_delta_cap", 5)), | |
| ) | |
| bundle["today_schedule_excluded_labels"] = selected_labels | |
| bundle["today_schedule"] = filtered_today_schedule | |
| bundle["today_eff"] = today_eff | |
| bundle["movie_targets"] = movie_targets | |
| bundle["movie_weights"] = movie_weights | |
| st.session_state["nextday_loaded_bundle"] = bundle | |
| st.session_state["nextday_tuning_df"] = coerce_tuning_editor_df(tuning_df) | |
| st.session_state.pop("nextday_tuning_editor", None) | |
| st.rerun() | |
| st.markdown("### 微调约束(留空则不生效)") | |
| st.caption("可按影片设置:最多/最少/固定场次、场次占比范围、黄金场次范围。仅勾选行参与约束。") | |
| tune_df_base = st.session_state.get("nextday_tuning_df", pd.DataFrame()) | |
| editor_cache = st.session_state.get("nextday_tuning_editor") | |
| if isinstance(editor_cache, pd.DataFrame) and isinstance(tune_df_base, pd.DataFrame): | |
| same_shape = ( | |
| list(editor_cache.columns) == list(tune_df_base.columns) | |
| and len(editor_cache) == len(tune_df_base) | |
| ) | |
| tune_df = editor_cache.copy() if same_shape else tune_df_base.copy() | |
| else: | |
| tune_df = tune_df_base.copy() if isinstance(tune_df_base, pd.DataFrame) else pd.DataFrame() | |
| if tune_df.empty and isinstance(tune_df_base, pd.DataFrame): | |
| tune_df = tune_df_base.copy() | |
| try: | |
| film_col = st.column_config.TextColumn("影片", pinned="left") | |
| except TypeError: | |
| film_col = st.column_config.TextColumn("影片") | |
| edited_tune_df = st.data_editor( | |
| tune_df, | |
| num_rows="fixed", | |
| use_container_width=True, | |
| hide_index=True, | |
| disabled=["影片", "今日场次", "今日黄金场次", "今日全天效率", "今日黄金效率"], | |
| key="nextday_tuning_editor", | |
| column_order=[ | |
| "选中", | |
| "影片", | |
| "今日场次", | |
| "今日黄金场次", | |
| "今日全天效率", | |
| "今日黄金效率", | |
| "最少场次", | |
| "最多场次", | |
| "固定场次", | |
| "最少黄金场次", | |
| "最多黄金场次", | |
| "最低场次占比", | |
| "最高场次占比", | |
| ], | |
| column_config={ | |
| "选中": st.column_config.CheckboxColumn("选中", default=False), | |
| "影片": film_col, | |
| "今日场次": st.column_config.NumberColumn("今日场次"), | |
| "今日黄金场次": st.column_config.NumberColumn("今日黄金场次"), | |
| "今日全天效率": st.column_config.NumberColumn("今日全天效率", format="%.3f"), | |
| "今日黄金效率": st.column_config.NumberColumn("今日黄金效率", format="%.3f"), | |
| "固定场次": st.column_config.NumberColumn("固定场次", min_value=0, step=1), | |
| "最少场次": st.column_config.NumberColumn("最少场次", min_value=0, step=1), | |
| "最多场次": st.column_config.NumberColumn("最多场次", min_value=0, step=1), | |
| "最少黄金场次": st.column_config.NumberColumn("最少黄金场次", min_value=0, step=1), | |
| "最多黄金场次": st.column_config.NumberColumn("最多黄金场次", min_value=0, step=1), | |
| "最低场次占比": st.column_config.NumberColumn("最低场次占比", min_value=0.0, max_value=100.0, step=0.5), | |
| "最高场次占比": st.column_config.NumberColumn("最高场次占比", min_value=0.0, max_value=100.0, step=0.5), | |
| }, | |
| ) | |
| # 直接保存编辑器结果,避免每轮强制类型转换导致“需编辑两次”现象 | |
| st.session_state["nextday_tuning_df"] = edited_tune_df.copy() | |
| st.markdown("### 后台运算控制") | |
| st.caption("支持暂停、继续、停止;刷新页面后任务仍可持续运行并展示实时进度。") | |
| state_target = str(job_state.get("target_date") or "") | |
| state_status = str(job_state.get("status") or "idle") | |
| progress_ratio = float(job_state.get("progress") or 0.0) | |
| progress_ratio = max(0.0, min(1.0, progress_ratio)) | |
| iter_done = int(job_state.get("iter_done") or 0) | |
| iterations = int(job_state.get("iterations") or 0) | |
| feasible_count = int(job_state.get("feasible_count") or 0) | |
| hard_reject = int(job_state.get("hard_reject") or 0) | |
| build_reject = int(job_state.get("build_reject") or 0) | |
| rule_reject = int(job_state.get("rule_reject") or 0) | |
| elapsed_seconds = float(job_state.get("elapsed_seconds") or 0.0) | |
| reject_reason_top = job_state.get("reject_reason_top") or {} | |
| reject_detail_top = job_state.get("reject_detail_top") or {} | |
| status_msg = str(job_state.get("message") or "") | |
| s1, s2, s3, s4, s5 = st.columns(5) | |
| s1.metric("任务状态", state_status) | |
| s2.metric("当前进度", f"{progress_ratio * 100:.1f}%") | |
| s3.metric("可行方案", feasible_count) | |
| s4.metric("硬性淘汰", hard_reject) | |
| s5.metric("已运行时长", f"{elapsed_seconds:.1f}s") | |
| st.progress(progress_ratio) | |
| if iterations > 0: | |
| st.caption( | |
| f"迭代进度:{iter_done}/{iterations};构造失败 {build_reject},硬规则淘汰 {rule_reject};{status_msg}" | |
| ) | |
| elif status_msg: | |
| st.caption(status_msg) | |
| if reject_reason_top: | |
| reason_df = pd.DataFrame( | |
| [{"淘汰原因": k, "次数": int(v)} for k, v in reject_reason_top.items()] | |
| ) | |
| st.dataframe(reason_df, use_container_width=True, hide_index=True, height=220) | |
| if reject_detail_top: | |
| detail_df = pd.DataFrame( | |
| [{"详细原因": k, "次数": int(v)} for k, v in reject_detail_top.items()] | |
| ) | |
| st.dataframe(detail_df, use_container_width=True, hide_index=True, height=220) | |
| c_run, c_pause, c_resume, c_stop = st.columns(4) | |
| run_disabled = state_status in {"running", "paused"} | |
| do_run = c_run.button("🚀 后台生成 Top10", type="primary", use_container_width=True, disabled=run_disabled) | |
| do_pause = c_pause.button("⏸ 暂停", use_container_width=True, disabled=state_status != "running") | |
| do_resume = c_resume.button("▶️ 继续", use_container_width=True, disabled=state_status != "paused") | |
| do_stop = c_stop.button("⏹ 停止", use_container_width=True, disabled=state_status not in {"running", "paused"}) | |
| if do_pause: | |
| write_job_state(control="pause", message="收到暂停请求") | |
| st.rerun() | |
| if do_resume: | |
| write_job_state(control="run", message="任务继续", status="running") | |
| st.rerun() | |
| if do_stop: | |
| write_job_state(control="stop", message="收到停止请求") | |
| st.rerun() | |
| if do_run: | |
| manual_constraints = parse_movie_tuning_constraints(edited_tune_df) | |
| allowed_movies = extract_allowed_movies_from_tuning_df(edited_tune_df) | |
| # 预售锁定优先:即使未勾选,也必须允许保留已售锁定场次 | |
| allowed_movies |= build_locked_movie_policy_set(bundle.get("locked_sessions", [])) | |
| payload = build_job_payload( | |
| bundle=bundle, | |
| runtime_cfg=runtime_cfg, | |
| manual_constraints=manual_constraints, | |
| allowed_movies=allowed_movies, | |
| ) | |
| ok, msg = start_background_job(payload) | |
| if ok: | |
| st.session_state["nextday_results"] = [] | |
| st.success(msg) | |
| st.rerun() | |
| st.warning(msg) | |
| # 任务运行时自动刷新,展示实时进度 | |
| if state_target == target_str and state_status == "running": | |
| time.sleep(0.8) | |
| st.rerun() | |
| if not bundle and job_state.get("status") in {"running", "paused", "completed", "failed", "stopped"}: | |
| st.markdown("### 后台任务状态") | |
| progress_ratio = float(job_state.get("progress") or 0.0) | |
| progress_ratio = max(0.0, min(1.0, progress_ratio)) | |
| iter_done = int(job_state.get("iter_done") or 0) | |
| iterations = int(job_state.get("iterations") or 0) | |
| feasible_count = int(job_state.get("feasible_count") or 0) | |
| hard_reject = int(job_state.get("hard_reject") or 0) | |
| build_reject = int(job_state.get("build_reject") or 0) | |
| rule_reject = int(job_state.get("rule_reject") or 0) | |
| elapsed_seconds = float(job_state.get("elapsed_seconds") or 0.0) | |
| reject_reason_top = job_state.get("reject_reason_top") or {} | |
| reject_detail_top = job_state.get("reject_detail_top") or {} | |
| status_msg = str(job_state.get("message") or "") | |
| state_status = str(job_state.get("status") or "idle") | |
| state_target = str(job_state.get("target_date") or "") | |
| q1, q2, q3, q4, q5 = st.columns(5) | |
| q1.metric("任务状态", state_status) | |
| q2.metric("目标日期", state_target or "-") | |
| q3.metric("可行方案", feasible_count) | |
| q4.metric("硬性淘汰", hard_reject) | |
| q5.metric("已运行时长", f"{elapsed_seconds:.1f}s") | |
| st.progress(progress_ratio) | |
| if iterations > 0: | |
| st.caption( | |
| f"迭代进度:{iter_done}/{iterations};构造失败 {build_reject},硬规则淘汰 {rule_reject};{status_msg}" | |
| ) | |
| elif status_msg: | |
| st.caption(status_msg) | |
| if reject_reason_top: | |
| reason_df = pd.DataFrame( | |
| [{"淘汰原因": k, "次数": int(v)} for k, v in reject_reason_top.items()] | |
| ) | |
| st.dataframe(reason_df, use_container_width=True, hide_index=True, height=220) | |
| if reject_detail_top: | |
| detail_df = pd.DataFrame( | |
| [{"详细原因": k, "次数": int(v)} for k, v in reject_detail_top.items()] | |
| ) | |
| st.dataframe(detail_df, use_container_width=True, hide_index=True, height=220) | |
| f_pause, f_resume, f_stop = st.columns(3) | |
| if f_pause.button("⏸ 暂停任务", use_container_width=True, disabled=state_status != "running"): | |
| write_job_state(control="pause", message="收到暂停请求") | |
| st.rerun() | |
| if f_resume.button("▶️ 继续任务", use_container_width=True, disabled=state_status != "paused"): | |
| write_job_state(control="run", status="running", message="任务继续") | |
| st.rerun() | |
| if f_stop.button("⏹ 停止任务", use_container_width=True, disabled=state_status not in {"running", "paused"}): | |
| write_job_state(control="stop", message="收到停止请求") | |
| st.rerun() | |
| if state_status == "running": | |
| time.sleep(0.8) | |
| st.rerun() | |
| # 完成态:从后台结果文件恢复展示数据 | |
| if isinstance(job_result, dict) and job_result.get("target_str") == target_str and job_state.get("status") == "completed": | |
| raw_rs = list(job_result.get("results") or []) | |
| st.session_state["nextday_results"] = [x for x in (deserialize_candidate(r) for r in raw_rs) if x is not None] | |
| st.session_state["nextday_all_results"] = int(job_result.get("all_results_count", 0)) | |
| st.session_state["nextday_hard_reject"] = int(job_result.get("hard_reject", 0)) | |
| st.session_state["nextday_build_reject"] = int(job_result.get("build_reject", 0)) | |
| st.session_state["nextday_rule_reject"] = int(job_result.get("rule_reject", 0)) | |
| st.session_state["nextday_elapsed_seconds"] = float(job_result.get("elapsed_seconds", 0.0)) | |
| st.session_state["nextday_reject_reason_stats"] = job_result.get("reject_reason_stats", {}) | |
| st.session_state["nextday_reject_detail_stats"] = job_result.get("reject_detail_stats", {}) | |
| st.session_state["nextday_reject_phase_stats"] = job_result.get("reject_phase_stats", {}) | |
| st.session_state["nextday_reject_examples"] = job_result.get("reject_examples", {}) | |
| st.session_state["nextday_target_date"] = str(job_result.get("target_str", "")) | |
| st.session_state["nextday_locked_count"] = int(job_result.get("locked_count", 0)) | |
| st.session_state["nextday_movie_targets"] = job_result.get("movie_targets", {}) | |
| st.session_state["nextday_today_eff"] = job_result.get("today_eff", pd.DataFrame()) | |
| st.session_state["nextday_golden_start_dt"] = job_result.get("golden_start_dt") | |
| st.session_state["nextday_golden_end_dt"] = job_result.get("golden_end_dt") | |
| st.session_state["nextday_runtime_cfg"] = job_result.get("runtime_cfg", runtime_cfg) | |
| st.session_state["nextday_box_office_data"] = job_result.get("box_office_data", []) | |
| if ( | |
| isinstance(job_result, dict) | |
| and job_result.get("target_str") == target_str | |
| and job_state.get("status") in {"failed", "stopped"} | |
| and job_result.get("reject_reason_stats") | |
| ): | |
| st.markdown("### 最近一次任务淘汰统计") | |
| st.caption(f"运行耗时:{float(job_result.get('elapsed_seconds', 0.0)):.1f}s") | |
| p1, p2 = st.columns(2) | |
| p1.metric("构造阶段失败", int(job_result.get("build_reject", 0))) | |
| p2.metric("硬性规则淘汰", int(job_result.get("rule_reject", 0))) | |
| reason_df = pd.DataFrame( | |
| [{"淘汰原因": k, "次数": int(v)} for k, v in (job_result.get("reject_reason_stats") or {}).items()] | |
| ).sort_values("次数", ascending=False) | |
| st.dataframe(reason_df, use_container_width=True, hide_index=True, height=280) | |
| if job_result.get("reject_detail_stats"): | |
| detail_df = pd.DataFrame( | |
| [{"详细原因": k, "次数": int(v)} for k, v in (job_result.get("reject_detail_stats") or {}).items()] | |
| ).sort_values("次数", ascending=False) | |
| st.dataframe(detail_df, use_container_width=True, hide_index=True, height=280) | |
| raw_results = st.session_state.get("nextday_results", []) | |
| results: List[CandidateResult] = [x for x in (deserialize_candidate(r) for r in (raw_results or [])) if x is not None] | |
| st.session_state["nextday_results"] = results | |
| if results: | |
| st.divider() | |
| target_str = st.session_state.get("nextday_target_date", "") | |
| total_feasible = st.session_state.get("nextday_all_results", 0) | |
| hard_reject = st.session_state.get("nextday_hard_reject", 0) | |
| build_reject = st.session_state.get("nextday_build_reject", 0) | |
| rule_reject = st.session_state.get("nextday_rule_reject", 0) | |
| elapsed_seconds = st.session_state.get("nextday_elapsed_seconds", 0.0) | |
| locked_count = st.session_state.get("nextday_locked_count", 0) | |
| display_runtime_cfg = st.session_state.get("nextday_runtime_cfg", runtime_cfg) | |
| display_box_office_data = st.session_state.get("nextday_box_office_data", []) | |
| reject_reason_stats = st.session_state.get("nextday_reject_reason_stats", {}) | |
| reject_detail_stats = st.session_state.get("nextday_reject_detail_stats", {}) | |
| reject_phase_stats = st.session_state.get("nextday_reject_phase_stats", {}) | |
| reject_examples = st.session_state.get("nextday_reject_examples", {}) | |
| m1, m2, m3, m4, m5 = st.columns(5) | |
| m1.metric("目标排片日期", target_str) | |
| m2.metric("可行方案数", total_feasible) | |
| m3.metric("硬性规则淘汰", hard_reject) | |
| m4.metric("已售锁定场", locked_count) | |
| m5.metric("生成耗时", f"{float(elapsed_seconds):.1f}s") | |
| with st.expander("硬性淘汰统计", expanded=False): | |
| c_stat1, c_stat2 = st.columns(2) | |
| c_stat1.metric("构造阶段失败", int(build_reject)) | |
| c_stat2.metric("硬性规则淘汰", int(rule_reject)) | |
| if reject_phase_stats: | |
| phase_df = pd.DataFrame( | |
| [{"淘汰阶段": k, "次数": int(v)} for k, v in reject_phase_stats.items()] | |
| ).sort_values("次数", ascending=False) | |
| st.dataframe(phase_df, use_container_width=True, hide_index=True) | |
| if reject_reason_stats: | |
| reason_df = pd.DataFrame( | |
| [{"淘汰原因": k, "次数": int(v)} for k, v in reject_reason_stats.items()] | |
| ).sort_values("次数", ascending=False) | |
| st.dataframe(reason_df, use_container_width=True, hide_index=True, height=300) | |
| if reject_detail_stats: | |
| detail_df = pd.DataFrame( | |
| [{"详细原因": k, "次数": int(v)} for k, v in reject_detail_stats.items()] | |
| ).sort_values("次数", ascending=False) | |
| st.dataframe(detail_df, use_container_width=True, hide_index=True, height=300) | |
| if reject_examples: | |
| show_examples: List[str] = [] | |
| for reason, samples in reject_examples.items(): | |
| if not samples: | |
| continue | |
| show_examples.append(f"{reason}:") | |
| for x in samples[:3]: | |
| show_examples.append(f"- {x}") | |
| if show_examples: | |
| st.code("\n".join(show_examples), language="text") | |
| with st.expander("影片目标(由效率规则推导)", expanded=False): | |
| targets = st.session_state.get("nextday_movie_targets", {}) | |
| st.json(targets) | |
| with st.expander("当日效率基准", expanded=False): | |
| today_eff = st.session_state.get("nextday_today_eff") | |
| if isinstance(today_eff, pd.DataFrame) and not today_eff.empty: | |
| st.dataframe(today_eff, use_container_width=True, hide_index=True) | |
| else: | |
| st.info("无当日效率数据") | |
| tabs = st.tabs([f"方案{i+1}|分数 {results[i].score:.1f}" for i in range(len(results))]) | |
| for i, tab in enumerate(tabs): | |
| with tab: | |
| cand = results[i] | |
| c1, c2 = st.columns([2, 1]) | |
| with c1: | |
| render_gantt(cand.schedule, target_str, tab_key=f"{i}") | |
| with c2: | |
| st.caption("评分拆解") | |
| bd_df = pd.DataFrame(cand.score_breakdown, columns=["规则", "分值", "说明"]) | |
| st.dataframe(bd_df, use_container_width=True, hide_index=True, height=280) | |
| st.caption("结果汇总") | |
| today_eff = st.session_state.get("nextday_today_eff") | |
| g_st = st.session_state.get("nextday_golden_start_dt") | |
| g_et = st.session_state.get("nextday_golden_end_dt") | |
| summary_df = build_candidate_summary_table(cand.schedule, today_eff, g_st, g_et) | |
| st.dataframe( | |
| summary_df[["影片", "场次", "今日黄金场次", "全天场次效率", "黄金时段场次效率", "次日场次", "次日黄金时段场次"]], | |
| use_container_width=True, | |
| hide_index=True, | |
| height=340, | |
| ) | |
| st.markdown("#### 🔍 场次合理性检查日志") | |
| log_target_date = datetime.strptime(target_str, "%Y-%m-%d").date() | |
| log_text = generate_schedule_check_logs_text( | |
| schedule=cand.schedule, | |
| target_date=log_target_date, | |
| params=display_runtime_cfg, | |
| today_eff=today_eff if isinstance(today_eff, pd.DataFrame) else pd.DataFrame(), | |
| box_office_data=display_box_office_data, | |
| ) | |
| st.code(log_text, language="text") | |
# Script entry point: invoke the page's main() only when this module is
# executed directly, not when it is imported by another module.
if __name__ == "__main__":
    main()