import json import os import pickle import random import re import threading import time from collections import Counter, defaultdict from dataclasses import dataclass from datetime import date, datetime, time as dt_time, timedelta from typing import Any, Dict, List, Optional, Set, Tuple import pandas as pd import requests import urllib3 from dotenv import load_dotenv from schedule_api_client import clean_movie_title, fetch_hall_info, fetch_schedule_data, get_valid_token urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) load_dotenv() CONFIG_FILE = os.path.join("cinema_cache", "nextday_schedule_optimizer_config.json") JOB_STATE_FILE = os.path.join("cinema_cache", "nextday_optimizer_job_state.json") JOB_PAYLOAD_FILE = os.path.join("cinema_cache", "nextday_optimizer_job_payload.pkl") JOB_RESULT_FILE = os.path.join("cinema_cache", "nextday_optimizer_job_result.pkl") _JOB_THREAD: Optional[threading.Thread] = None DEFAULT_CONFIG: Dict[str, Any] = { "business_start": "09:30", "business_end": "01:30", "turnaround_base": 10, "golden_start": "14:00", "golden_end": "21:00", "efficiency_enabled": True, "efficiency_penalty_coef": 1.0, "eff_daily_delta_cap": 5, "rule1_enabled": True, "rule1_gap": 30, "rule2_enabled": True, "rule2_threshold": 4, "rule2_window_minutes": 30, "rule2_penalty": 15.0, "rule2_exempt_ranges": ["14:00-15:00", "19:00-20:00"], "rule3_enabled": True, "rule3_gap_minutes": 30, "rule3_penalty": 12.0, "rule4_enabled": True, "rule4_earliest": "10:00", "rule4_latest": "22:30", "rule9_enabled": True, "rule9_hot_top_n": 3, "rule9_min_ratio": 0.30, "rule9_penalty": 20.0, "rule11_enabled": True, "rule11_after_time": "22:00", "rule11_penalty": 30.0, "rule12_enabled": True, "rule12_penalty_each": 25.0, "rule13_enabled": True, "rule13_forbidden_halls": ["2", "8", "9"], "tms_allowance": 0, "maintenance_blocks": [], "iterations": 300, "random_seed": 20260331, } @dataclass class RuleContext: target_date: date business_start_dt: datetime business_end_dt: datetime golden_start_dt: datetime golden_end_dt: datetime params: Dict[str, Any] blockouts_by_hall: Dict[str, List[Tuple[datetime, datetime]]] movie_targets: Dict[str, Dict[str, Any]] movie_weights: Dict[str, float] tms_by_hall: Dict[str, List[Dict[str, Any]]] manual_constraints: Dict[str, Dict[str, Optional[float]]] allowed_movies: Set[str] preview_windows_by_identity: Dict[str, List[Tuple[datetime, datetime]]] @dataclass class CandidateResult: schedule: List[Dict[str, Any]] score: float score_breakdown: List[Tuple[str, float, str]] hard_violations: List[str] def serialize_candidate(cand: CandidateResult) -> Dict[str, Any]: return { "schedule": cand.schedule, "score": float(cand.score), "score_breakdown": [list(x) for x in (cand.score_breakdown or [])], "hard_violations": list(cand.hard_violations or []), } def deserialize_candidate(obj: Any) -> Optional[CandidateResult]: if isinstance(obj, CandidateResult): return obj if not isinstance(obj, dict): return None score_breakdown = obj.get("score_breakdown") or [] parsed_bd: List[Tuple[str, float, str]] = [] for x in score_breakdown: if isinstance(x, (list, tuple)) and len(x) >= 3: parsed_bd.append((str(x[0]), float(x[1]), str(x[2]))) return CandidateResult( schedule=list(obj.get("schedule") or []), score=float(obj.get("score") or 0.0), score_breakdown=parsed_bd, hard_violations=list(obj.get("hard_violations") or []), ) def ensure_cache_dir() -> None: os.makedirs(os.path.dirname(CONFIG_FILE), exist_ok=True) def load_config() -> Dict[str, Any]: ensure_cache_dir() if not os.path.exists(CONFIG_FILE): return dict(DEFAULT_CONFIG) try: with open(CONFIG_FILE, "r", encoding="utf-8") as f: loaded = json.load(f) cfg = dict(DEFAULT_CONFIG) cfg.update(loaded) return cfg except Exception: return dict(DEFAULT_CONFIG) def save_config(cfg: Dict[str, Any]) -> None: ensure_cache_dir() with open(CONFIG_FILE, "w", encoding="utf-8") as f: json.dump(cfg, f, ensure_ascii=False, indent=2) def _atomic_write_json(path: str, payload: Dict[str, Any]) -> None: ensure_cache_dir() tmp = f"{path}.tmp" with open(tmp, "w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False, indent=2) os.replace(tmp, path) def _read_json(path: str, default: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: if default is None: default = {} if not os.path.exists(path): return dict(default) try: with open(path, "r", encoding="utf-8") as f: data = json.load(f) if isinstance(data, dict): out = dict(default) out.update(data) return out return dict(default) except Exception: return dict(default) def _now_text() -> str: return datetime.now().strftime("%Y-%m-%d %H:%M:%S") def _default_job_state() -> Dict[str, Any]: return { "status": "idle", "control": "run", # run | pause | stop "job_id": "", "started_at": "", "started_ts": 0.0, "ended_at": "", "updated_at": "", "target_date": "", "iterations": 0, "iter_done": 0, "progress": 0.0, "elapsed_seconds": 0.0, "feasible_count": 0, "hard_reject": 0, "build_reject": 0, "rule_reject": 0, "reject_reason_top": {}, "reject_detail_top": {}, "message": "", "result_count": 0, } def _atomic_write_pickle(path: str, payload: Any) -> None: ensure_cache_dir() tmp = f"{path}.tmp" with open(tmp, "wb") as f: pickle.dump(payload, f) os.replace(tmp, path) def _read_pickle(path: str, default: Any = None) -> Any: if not os.path.exists(path): return default try: with open(path, "rb") as f: return pickle.load(f) except Exception: return default def _find_live_worker() -> Optional[threading.Thread]: global _JOB_THREAD if _JOB_THREAD is not None and _JOB_THREAD.is_alive(): return _JOB_THREAD for t in threading.enumerate(): if t.name == "nextday-opt-worker" and t.is_alive(): _JOB_THREAD = t return t _JOB_THREAD = None return None def read_job_state() -> Dict[str, Any]: return _read_json(JOB_STATE_FILE, _default_job_state()) def write_job_state(**kwargs: Any) -> Dict[str, Any]: state = read_job_state() state.update(kwargs) state["updated_at"] = _now_text() _atomic_write_json(JOB_STATE_FILE, state) return state def parse_hm(hm: str, fallback: str) -> dt_time: raw = str(hm or "").strip() if not raw: raw = fallback try: return datetime.strptime(raw, "%H:%M").time() except Exception: return datetime.strptime(fallback, "%H:%M").time() def hm_str(t: dt_time) -> str: return t.strftime("%H:%M") def parse_operating_dt(d: date, t: dt_time) -> datetime: dt = datetime.combine(d, t) if t < dt_time(6, 0): dt += timedelta(days=1) return dt def ceil_datetime_to_step(dt: datetime, step_minutes: int = 5) -> datetime: aligned = dt.replace(second=0, microsecond=0) if aligned.minute % step_minutes == 0 and dt.second == 0 and dt.microsecond == 0: return aligned add_minutes = (step_minutes - (aligned.minute % step_minutes)) % step_minutes if add_minutes == 0: add_minutes = step_minutes return aligned + timedelta(minutes=add_minutes) def normalize_hall_key(hall_id: Any, hall_name: Any) -> str: if hall_id not in (None, ""): return str(hall_id) if hall_name in (None, ""): return "" nums = re.findall(r"\d+", str(hall_name)) return nums[0] if nums else str(hall_name) def extract_hall_no(raw: Any) -> str: nums = re.findall(r"\d+", str(raw or "")) return nums[0] if nums else str(raw or "") def normalize_media_type(media: Any) -> str: text = str(media or "").upper() if "3D" in text: return "3D" if "2D" in text: return "2D" return "" def movie_policy_key(movie_name: Any, movie_media_type: Any = "") -> str: """ 片名策略键: - 同片不同语言归并(依赖 clean_movie_title 规则) - 2D/3D 分开(若 clean 后未体现 3D,则追加) """ base = clean_movie_title(movie_name or "") media = normalize_media_type(movie_media_type or movie_name) if media == "3D" and "3D" not in str(base).upper(): return f"{base}(数字3D)" return str(base) def tms_missing_pair_key(session: Dict[str, Any]) -> Tuple[str, str, str]: hall_no = extract_hall_no(session.get("hallName") or session.get("hallId")) policy = movie_policy_key(session.get("movieName", ""), session.get("movieMediaType", "")) media = normalize_media_type(session.get("movieMediaType", "")) return hall_no, policy, media def extract_allowed_movies_from_tuning_df(df: pd.DataFrame) -> Set[str]: if df is None or df.empty: return set() out: Set[str] = set() for _, row in df.iterrows(): selected = row.get("选中", False) if pd.notna(selected) and bool(selected): key = movie_policy_key(row.get("影片", "")) if key: out.add(key) return out def normalize_text_token(text: Any) -> str: s = str(text or "") s = clean_movie_title(s) s = re.sub(r"\s+", "", s) s = re.sub(r"[\[\]【】()()·,.,::!!??'\"-]", "", s) return s.upper() def to_float(v: Any, default: float = 0.0) -> float: try: if v in (None, "", "None"): return default return float(v) except Exception: return default def extract_movie_serial_5_8(movie_num: Any) -> str: movie_num_norm = re.sub(r"[^A-Z0-9]", "", str(movie_num or "").upper()) if len(movie_num_norm) >= 8: return movie_num_norm[4:8] return "" def movie_identity_key(movie_num: Any, movie_name: Any) -> str: serial = extract_movie_serial_5_8(movie_num) if serial: return f"serial:{serial}" return f"name:{clean_movie_title(movie_name or '')}" def is_3d_by_movie_num_or_media(movie_num: Any, media: Any) -> bool: movie_num_norm = re.sub(r"[^A-Z0-9]", "", str(movie_num or "").upper()) if len(movie_num_norm) >= 4 and movie_num_norm[3] == "2": return True return "3D" in str(media or "").upper() def extract_box_office_value(item: Dict[str, Any]) -> float: for key in ( "ticketIncome", "splitTicketIncome", "todayTicketIncome", "todayBoxOffice", "boxOffice", "box", "income", "今日票房", "今日票房(不含费)", ): if key not in item: continue raw = item.get(key) if isinstance(raw, str): raw = raw.replace(",", "").strip() try: val = float(raw) if val >= 0: return val except Exception: continue return 0.0 def sort_movies_by_box_office(box_office_data: List[Dict[str, Any]]) -> List[Tuple[str, float]]: score_map: Dict[str, float] = {} order_map: Dict[str, int] = {} for idx, item in enumerate(box_office_data): name = clean_movie_title(item.get("movieName") or item.get("影片名称") or "") if not name: continue val = extract_box_office_value(item) if name not in order_map: order_map[name] = idx score_map[name] = max(score_map.get(name, 0.0), val) if not score_map: return [] if max(score_map.values()) > 0: ranked = sorted(score_map.items(), key=lambda x: x[1], reverse=True) else: ranked = sorted(score_map.items(), key=lambda x: order_map.get(x[0], 99999)) return ranked def resolve_hot_movies( df: pd.DataFrame, box_office_data: List[Dict[str, Any]], top_n: int, ) -> Tuple[List[str], str, List[Tuple[str, float]]]: bo_ranked = sort_movies_by_box_office(box_office_data) if bo_ranked: top_val = bo_ranked[0][1] if top_val > 0: hot = [m for m, v in bo_ranked if v >= top_val * 0.95] else: hot = [m for m, _ in bo_ranked[:top_n]] if not hot: hot = [m for m, _ in bo_ranked[:top_n]] return hot[: max(top_n, len(hot))], "全国大盘票房", bo_ranked counts = df["movieClean"].value_counts() if counts.empty: return [], "无可用数据", [] max_count = int(counts.iloc[0]) hot = counts[counts >= max_count * 0.95].index.tolist() if not hot: hot = counts.head(top_n).index.tolist() fallback_ranked = [(m, float(c)) for m, c in counts.items()] return hot[: max(top_n, len(hot))], "场次数量", fallback_ranked def rule9_core_windows(d: date) -> List[Tuple[dt_time, dt_time]]: weekday = d.weekday() windows = [ [(dt_time(14, 0), dt_time(16, 0)), (dt_time(19, 0), dt_time(22, 0))], [(dt_time(14, 0), dt_time(15, 30)), (dt_time(19, 0), dt_time(22, 20))], [(dt_time(14, 30), dt_time(16, 0)), (dt_time(19, 0), dt_time(21, 40))], [(dt_time(14, 0), dt_time(16, 0)), (dt_time(19, 0), dt_time(22, 0))], [(dt_time(14, 0), dt_time(15, 0)), (dt_time(19, 0), dt_time(22, 0))], [(dt_time(14, 0), dt_time(16, 0)), (dt_time(19, 0), dt_time(22, 0))], [(dt_time(14, 0), dt_time(17, 0)), (dt_time(19, 0), dt_time(21, 30))], ] return windows[weekday] def time_in_ranges(t: dt_time, ranges: List[Tuple[dt_time, dt_time]]) -> bool: for st_t, et_t in ranges: if st_t <= et_t: if st_t <= t < et_t: return True else: if t >= st_t or t < et_t: return True return False def gap_intersects_any_blockout( g_st: datetime, g_et: datetime, blockouts_by_hall: Dict[str, List[Tuple[datetime, datetime]]], ) -> bool: for _, ranges in blockouts_by_hall.items(): for b_st, b_et in ranges: if interval_overlaps(g_st, g_et, b_st, b_et): return True return False def parse_exempt_ranges(items: List[str]) -> List[Tuple[dt_time, dt_time]]: out: List[Tuple[dt_time, dt_time]] = [] for item in items: s = str(item or "").strip() if not s: continue if "-" not in s: continue try: st_s, et_s = s.split("-", 1) out.append((datetime.strptime(st_s.strip(), "%H:%M").time(), datetime.strptime(et_s.strip(), "%H:%M").time())) except Exception: continue return out def in_any_exempt(ts: datetime, ranges: List[Tuple[dt_time, dt_time]]) -> bool: t = ts.time() for st_t, et_t in ranges: if st_t <= et_t: if st_t <= t <= et_t: return True else: if t >= st_t or t <= et_t: return True return False def interval_overlaps(a_st: datetime, a_et: datetime, b_st: datetime, b_et: datetime) -> bool: return not (a_et <= b_st or a_st >= b_et) def gap_intersects_blockout( hall_key: str, g_st: datetime, g_et: datetime, blockouts_by_hall: Dict[str, List[Tuple[datetime, datetime]]], ) -> bool: for b_st, b_et in blockouts_by_hall.get(hall_key, []): if interval_overlaps(g_st, g_et, b_st, b_et): return True return False def parse_blockouts_from_config(target_date: date, raw: Any) -> List[Dict[str, Any]]: if raw in (None, "", []): return [] parsed: List[Dict[str, Any]] if isinstance(raw, str): try: payload = json.loads(raw) parsed = payload if isinstance(payload, list) else [] except Exception: parsed = [] elif isinstance(raw, list): parsed = raw else: parsed = [] result: List[Dict[str, Any]] = [] for item in parsed: if not isinstance(item, dict): continue hall_token = str(item.get("hall") or item.get("hallId") or item.get("hallName") or "").strip() st_s = str(item.get("start") or "").strip() et_s = str(item.get("end") or "").strip() if not hall_token or not st_s or not et_s: continue try: st_t = datetime.strptime(st_s, "%H:%M").time() et_t = datetime.strptime(et_s, "%H:%M").time() st_dt = parse_operating_dt(target_date, st_t) et_dt = parse_operating_dt(target_date, et_t) if et_dt <= st_dt: et_dt += timedelta(days=1) result.append( { "hall_token": hall_token, "start": st_dt, "end": et_dt, } ) except Exception: continue return result def build_hall_blockouts( blockouts: List[Dict[str, Any]], hall_name_map: Dict[Any, str], ) -> Dict[str, List[Tuple[datetime, datetime]]]: out: Dict[str, List[Tuple[datetime, datetime]]] = {str(hid): [] for hid in hall_name_map.keys()} for hid, hname in hall_name_map.items(): hall_key = str(hid) hall_no = extract_hall_no(hname) for b in blockouts: token = str(b["hall_token"]) token_no = extract_hall_no(token) if token in (hall_key, str(hname), hall_no, f"{hall_no}号厅") or token_no == hall_no: out.setdefault(hall_key, []).append((b["start"], b["end"])) for hall_key in out: out[hall_key].sort(key=lambda x: x[0]) return out def is_3d_movie(movie: Dict[str, Any]) -> bool: text = f"{movie.get('movieMediaType', '')} {movie.get('movieName', '')}".upper() return "3D" in text def fetch_movie_info_for_date(show_date: str) -> List[Dict[str, Any]]: token = get_valid_token(force_refresh=False) if not token: return [] def _call(tok: str) -> Tuple[int, Dict[str, Any]]: url = "https://cawapi.yinghezhong.com/show/getMovieInfo" params = {"showDate": show_date, "token": tok, "_": int(time.time() * 1000)} headers = { "Origin": "https://caw.yinghezhong.com", "Referer": "https://caw.yinghezhong.com/", "User-Agent": "Mozilla/5.0", } resp = requests.get(url, params=params, headers=headers, timeout=15) resp.raise_for_status() payload = resp.json() return int(payload.get("code", -1)), payload try: code, payload = _call(token) if code == 1: return payload.get("data", []) or [] if code == 500: token = get_valid_token(force_refresh=True) if not token: return [] code2, payload2 = _call(token) return payload2.get("data", []) if code2 == 1 else [] except Exception: return [] return [] def dedupe_movies_by_policy_key(movies: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ 去重规则: - 同片不同语言按同一条处理 - 不同制式(2D/3D)保留 """ out: List[Dict[str, Any]] = [] seen: Set[str] = set() for m in movies: key = movie_policy_key(m.get("movieName", ""), m.get("movieMediaType", "")) if not key or key in seen: continue seen.add(key) out.append(m) return out def build_preview_windows_for_movies( target_date: date, movies: List[Dict[str, Any]], ) -> Dict[str, List[Tuple[datetime, datetime]]]: """ previewShowTime 规则: - previewShowTime 为空:不限时段 - previewShowTime 有值且命中 target_date:仅允许落在该日对应时段内开场 - previewShowTime 有值但未命中 target_date:视为该日不限时段 """ out: Dict[str, List[Tuple[datetime, datetime]]] = {} for m in movies: identity = movie_policy_key(m.get("movieName", ""), m.get("movieMediaType", "")) if not identity: continue raw_windows = m.get("previewShowTime") or [] if not isinstance(raw_windows, list) or not raw_windows: continue matched_target_date = False allowed: List[Tuple[datetime, datetime]] = [] for w in raw_windows: if not isinstance(w, dict): continue sd_s = str(w.get("startDate") or "").strip() ed_s = str(w.get("endDate") or "").strip() st_s = str(w.get("startTime") or "").strip() et_s = str(w.get("endTime") or "").strip() if not sd_s or not ed_s or not st_s or not et_s: continue try: sd = datetime.strptime(sd_s, "%Y-%m-%d").date() ed = datetime.strptime(ed_s, "%Y-%m-%d").date() if not (sd <= target_date <= ed): continue matched_target_date = True st_t = datetime.strptime(st_s, "%H:%M").time() et_t = datetime.strptime(et_s, "%H:%M").time() st_dt = parse_operating_dt(target_date, st_t) et_dt = parse_operating_dt(target_date, et_t) if et_dt <= st_dt: et_dt += timedelta(days=1) allowed.append((st_dt, et_dt)) except Exception: continue if matched_target_date and allowed: out[identity] = allowed return out def fetch_realtime_box_office(date_str: str) -> List[Dict[str, Any]]: token = get_valid_token(force_refresh=False) if not token: return [] url = "https://app.bi.piao51.cn/cinema-app/market/realtimeDailyBoxOffice.action" params = {"qTime": date_str, "token": token} headers = {"Host": "app.bi.piao51.cn", "User-Agent": "Mozilla/5.0"} try: resp = requests.get(url, params=params, headers=headers, timeout=10) resp.raise_for_status() data = resp.json() if data.get("code") == "A00000": return data.get("results", {}).get("movieDatalist", []) or [] except Exception: return [] return [] def fetch_tms_server_movies_raw() -> List[Dict[str, Any]]: app_secret = os.getenv("TMS_APP_SECRET") ticket = os.getenv("TMS_TICKET") theater_id = int(os.getenv("TMS_THEATER_ID", "0")) x_session_id = os.getenv("TMS_X_SESSION_ID") if not all([app_secret, ticket, theater_id, x_session_id]): return [] try: token_url = f"https://tms.hengdianfilm.com/cinema-api/admin/generateToken?token=hd&murl=ticket={ticket}" token_headers = {"Cookie": f"JSESSIONID={x_session_id}", "Content-Type": "application/json"} token_payload = {"appId": "hd", "appSecret": app_secret, "timeStamp": int(time.time() * 1000)} token_resp = requests.post(token_url, headers=token_headers, json=token_payload, timeout=10) token_resp.raise_for_status() token_data = token_resp.json() auth_token = token_data.get("param") if not auth_token: return [] list_url = "https://tms.hengdianfilm.com/cinema-api/cinema/server/dcp/list" list_headers = {"Token": auth_token, "X-SESSIONID": x_session_id} all_rows: List[Dict[str, Any]] = [] page_index = 1 while True: payload = { "THEATER_ID": theater_id, "SOURCE": "SERVER", "ASSERT_TYPE": 2, "PAGE_CAPACITY": 200, "PAGE_INDEX": page_index, } movie_resp = requests.post( list_url, params={"token": "hd", "murl": "ContentMovie"}, headers=list_headers, json=payload, verify=False, timeout=20, ) movie_resp.raise_for_status() body = movie_resp.json().get("BODY", {}) rows = body.get("LIST", []) or [] if not rows: break all_rows.extend(rows) count = int(body.get("COUNT") or len(all_rows)) if len(all_rows) >= count: break page_index += 1 time.sleep(0.2) return all_rows except Exception: return [] def fetch_schedule_and_halls(show_date: str) -> Tuple[List[Dict[str, Any]], Dict[Any, Any], Optional[str]]: token = get_valid_token(force_refresh=False) if not token: return [], {}, "未获取到有效 token" try: schedule = fetch_schedule_data(token, show_date) halls = fetch_hall_info(token) return schedule or [], halls or {}, None except ValueError: token = get_valid_token(force_refresh=True) if not token: return [], {}, "token 刷新失败" try: schedule = fetch_schedule_data(token, show_date) halls = fetch_hall_info(token) return schedule or [], halls or {}, None except Exception as e: return [], {}, f"重试后仍失败: {e}" except Exception as e: return [], {}, str(e) def build_hall_name_map(next_day_schedule: List[Dict[str, Any]], hall_seat_map: Dict[Any, Any]) -> Dict[Any, str]: hall_name_map: Dict[Any, str] = {} for s in next_day_schedule: hid = s.get("hallId") hname = s.get("hallName") if hid not in (None, "") and hname: hall_name_map[hid] = str(hname) if hall_name_map: return hall_name_map for hid in hall_seat_map.keys(): hall_name_map[hid] = f"{hid}号厅" if not hall_name_map: hall_name_map = {1: "1号厅", 2: "2号厅", 3: "3号厅", 4: "4号厅"} return hall_name_map def session_display_label(session: Dict[str, Any]) -> str: start = str(session.get("showStartTime") or session.get("startTime") or "").strip() hall = str(session.get("hallName") or session.get("hallId") or "").strip() movie = str(session.get("movieName") or "").strip() return f"{start} | {hall} | {movie}" def apply_session_exclusions( schedule_list: List[Dict[str, Any]], excluded_labels: List[str], ) -> List[Dict[str, Any]]: if not schedule_list or not excluded_labels: return list(schedule_list or []) exclude_set = set(str(x).strip() for x in excluded_labels if str(x).strip()) return [s for s in schedule_list if session_display_label(s) not in exclude_set] def build_today_efficiency( today_schedule: List[Dict[str, Any]], hall_seat_map: Dict[Any, Any], golden_start: dt_time, golden_end: dt_time, ) -> pd.DataFrame: if not today_schedule: return pd.DataFrame(columns=["影片", "场次", "场次效率", "黄金场次", "黄金效率", "票房"]) df = pd.DataFrame(today_schedule) if df.empty: return pd.DataFrame(columns=["影片", "场次", "场次效率", "黄金场次", "黄金效率", "票房"]) df["影片"] = df.get("movieName", "").apply(clean_movie_title) df["总收入"] = pd.to_numeric(df.get("soldBoxOffice", 0), errors="coerce").fillna(0) df["放映时间"] = pd.to_datetime(df.get("showStartTime", "00:00"), format="%H:%M", errors="coerce").dt.time by_movie = ( df.groupby("影片", dropna=False) .agg(场次=("影片", "size"), 票房=("总收入", "sum")) .reset_index() ) total_revenue = float(by_movie["票房"].sum()) total_sessions = int(by_movie["场次"].sum()) by_movie["场次效率"] = 0.0 if total_revenue > 0 and total_sessions > 0: by_movie["票房比"] = by_movie["票房"] / total_revenue by_movie["场次比"] = by_movie["场次"] / total_sessions by_movie["场次效率"] = ( (by_movie["票房比"] / by_movie["场次比"]) .replace([float("inf"), -float("inf")], 0) .fillna(0) ) golden_df = df[df["放映时间"].between(golden_start, golden_end, inclusive="both")].copy() if golden_df.empty: by_movie["黄金场次"] = 0 by_movie["黄金效率"] = 0.0 else: g = ( golden_df.groupby("影片", dropna=False) .agg(黄金场次=("影片", "size"), 黄金票房=("总收入", "sum")) .reset_index() ) g_total_revenue = float(g["黄金票房"].sum()) g_total_count = int(g["黄金场次"].sum()) g["黄金效率"] = 0.0 if g_total_revenue > 0 and g_total_count > 0: g["黄金票房比"] = g["黄金票房"] / g_total_revenue g["黄金场次比"] = g["黄金场次"] / g_total_count g["黄金效率"] = ( (g["黄金票房比"] / g["黄金场次比"]) .replace([float("inf"), -float("inf")], 0) .fillna(0) ) by_movie = by_movie.merge(g[["影片", "黄金场次", "黄金效率"]], on="影片", how="left") by_movie["黄金场次"] = by_movie["黄金场次"].fillna(0).astype(int) by_movie["黄金效率"] = by_movie["黄金效率"].fillna(0.0) return by_movie[["影片", "场次", "场次效率", "黄金场次", "黄金效率", "票房"]] def build_locked_sessions(raw_next_day_schedule: List[Dict[str, Any]], target_date: date) -> List[Dict[str, Any]]: locked: List[Dict[str, Any]] = [] for s in raw_next_day_schedule: sold = int(s.get("soldTicketNum") or s.get("buyTicketNum") or 0) if sold <= 0: continue try: st_t = datetime.strptime(str(s.get("showStartTime", "00:00")), "%H:%M").time() et_t = datetime.strptime(str(s.get("showEndTime", "00:00")), "%H:%M").time() except Exception: continue st_dt = parse_operating_dt(target_date, st_t) et_dt = parse_operating_dt(target_date, et_t) if et_dt <= st_dt: et_dt += timedelta(days=1) locked.append( { "hallId": s.get("hallId"), "hallName": s.get("hallName") or f"{s.get('hallId')}号厅", "movieId": s.get("movieId"), "movieNum": s.get("movieNum"), "movieName": s.get("movieName", "未知影片"), "movieDuration": int(s.get("movieLength") or s.get("movieDuration") or 120), "movieMediaType": s.get("movieMediaType", ""), "startTime": st_dt, "endTime": et_dt, "is_presold": True, "sold": sold, } ) return locked def build_tms_index_by_hall(tms_rows: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]: by_hall: Dict[str, List[Dict[str, Any]]] = defaultdict(list) for row in tms_rows: halls = row.get("HALL_INFO") or [] if not isinstance(halls, list) or not halls: continue content_name = str(row.get("CONTENT_NAME") or "") assert_name = str(row.get("ASSERT_NAME") or "") assert_id = str(row.get("ASSERT_ID") or "") source_format = str(row.get("SOURCE_FORMAT") or "") entry = { "assert_12": re.sub(r"[^A-Za-z0-9]", "", assert_id).upper()[:12], "name_norm": normalize_text_token(assert_name or content_name), "media": normalize_media_type(source_format), } for hall in halls: hall_key = extract_hall_no(hall.get("HALL_NAME") or hall.get("HALL_ID")) by_hall[hall_key].append(entry) return dict(by_hall) def session_in_tms(session: Dict[str, Any], hall_key: str, tms_by_hall: Dict[str, List[Dict[str, Any]]]) -> bool: if not tms_by_hall: return True entries = tms_by_hall.get(extract_hall_no(hall_key), []) if not entries: return False movie_name_norm = normalize_text_token(session.get("movieName")) movie_num_12 = re.sub(r"[^A-Za-z0-9]", "", str(session.get("movieNum") or "")).upper()[:12] media = normalize_media_type(session.get("movieMediaType")) for e in entries: id_ok = bool(movie_num_12) and movie_num_12 == e.get("assert_12") name_norm = e.get("name_norm") or "" name_ok = movie_name_norm and ( movie_name_norm == name_norm or (movie_name_norm in name_norm) or (name_norm in movie_name_norm) ) media_ok = (not media) or (not e.get("media")) or media == e.get("media") if media_ok and (id_ok or name_ok): return True return False def build_movie_targets( movies: List[Dict[str, Any]], today_eff: pd.DataFrame, locked_sessions: List[Dict[str, Any]], box_office_data: List[Dict[str, Any]], rule12_enabled: bool = True, ) -> Dict[str, Dict[str, Any]]: locked_total = Counter(movie_policy_key(s.get("movieName", ""), s.get("movieMediaType", "")) for s in locked_sessions) locked_golden = Counter() targets: Dict[str, Dict[str, Any]] = {} eff_map = { movie_policy_key(r["影片"]): r for _, r in (today_eff.iterrows() if not today_eff.empty else []) } rank_boost: Dict[str, float] = {} bo_ranked = sort_movies_by_box_office(box_office_data) top10 = {mv for mv, _ in bo_ranked[:10]} top5 = {mv for mv, _ in bo_ranked[:5]} if rule12_enabled else set() for i, (mv, _) in enumerate(bo_ranked[:10], start=1): rank_boost[mv] = max(0.6, 1.6 - 0.1 * i) for m in movies: mv = movie_policy_key(m.get("movieName", ""), m.get("movieMediaType", "")) if not mv: continue eff = eff_map.get(mv) today_total = int(eff.get("场次", 0)) if eff is not None else 0 today_golden = int(eff.get("黄金场次", 0)) if eff is not None else 0 fe = float(eff.get("场次效率", 1.0)) if eff is not None else 1.0 ge = float(eff.get("黄金效率", 1.0)) if eff is not None else 1.0 if today_total <= 0: if mv in top10: min_total, max_total = 0, 1 else: min_total, max_total = 0, 0 min_golden = 0 else: if fe > 1.5: min_total, max_total = today_total + 1, today_total + 4 elif fe < 0.5: min_total, max_total = max(0, today_total - 1), max(today_total, 1) else: min_total, max_total = max(0, today_total - 1), today_total + 2 if ge > 1.5: min_golden = today_golden + 1 elif ge < 0.5: min_golden = max(0, today_golden - 1) else: min_golden = max(0, today_golden) if today_golden == 0 and fe > 1.5: min_golden = max(1, min_golden) # 规则十二优先:票房Top5至少给1个黄金场,并保证总场次可容纳 if mv in top5: min_golden = max(1, min_golden) min_total = max(min_total, 1) max_total = max(max_total, 1) lt = int(locked_total.get(mv, 0)) lg = int(locked_golden.get(mv, 0)) min_total = max(min_total, lt) min_golden = max(min_golden, lg) max_total = max(max_total, min_total) targets[mv] = { "min_total": int(min_total), "max_total": int(max_total), "min_golden": int(min_golden), "today_total": int(today_total), "today_golden": int(today_golden), "fe": float(fe), "ge": float(ge), "base_weight": float(rank_boost.get(mv, 1.0)), } return targets def build_movie_weights( movies: List[Dict[str, Any]], movie_targets: Dict[str, Dict[str, Any]], box_office_data: List[Dict[str, Any]], ) -> Dict[str, float]: weights: Dict[str, float] = {} rank_map: Dict[str, int] = {} bo_ranked = sort_movies_by_box_office(box_office_data) for i, (mv, _) in enumerate(bo_ranked[:20], start=1): rank_map[mv] = i for m in movies: mv = movie_policy_key(m.get("movieName", ""), m.get("movieMediaType", "")) if not mv: continue w = 1.0 rank = rank_map.get(mv) if rank is not None: w *= max(0.7, 1.8 - 0.08 * rank) target = movie_targets.get(mv, {}) fe = float(target.get("fe", 1.0) or 1.0) ge = float(target.get("ge", 1.0) or 1.0) if fe > 1.5: w *= 1.2 elif fe < 0.5: w *= 0.85 if ge > 1.5: w *= 1.1 elif ge < 0.5: w *= 0.92 weights[mv] = max(0.1, w) return weights def can_place( session: Dict[str, Any], hall_sessions: List[Dict[str, Any]], all_sessions: List[Dict[str, Any]], turn_min: int, turn_max: int, hall_key: str, ctx: RuleContext, ) -> bool: st_dt = session["startTime"] et_dt = session["endTime"] if et_dt <= st_dt: return False blockouts = ctx.blockouts_by_hall.get(hall_key, []) for b_st, b_et in blockouts: if interval_overlaps(st_dt, et_dt, b_st, b_et): return False same_hall = sorted(hall_sessions, key=lambda x: x["startTime"]) for s in same_hall: if interval_overlaps(st_dt, et_dt, s["startTime"], s["endTime"]): return False prev_session: Optional[Dict[str, Any]] = None next_session: Optional[Dict[str, Any]] = None for s in same_hall: if s["endTime"] <= st_dt: prev_session = s elif s["startTime"] >= et_dt: next_session = s break if prev_session is not None: gap = (st_dt - prev_session["endTime"]).total_seconds() / 60 if gap < turn_min: return False if gap > turn_max and not gap_intersects_blockout(hall_key, prev_session["endTime"], st_dt, ctx.blockouts_by_hall): return False if next_session is not None: gap = (next_session["startTime"] - et_dt).total_seconds() / 60 if gap < turn_min: return False if gap > turn_max and not gap_intersects_blockout(hall_key, et_dt, next_session["startTime"], ctx.blockouts_by_hall): return False if ctx.params["rule1_enabled"]: identity = movie_identity_key(session.get("movieNum"), session.get("movieName")) for s in all_sessions: if movie_identity_key(s.get("movieNum"), s.get("movieName")) != identity: continue gap = abs((s["startTime"] - st_dt).total_seconds()) / 60 if gap < int(ctx.params["rule1_gap"]): return False # 点映时段限制(previewShowTime) identity = movie_policy_key(session.get("movieName", ""), session.get("movieMediaType", "")) if identity in ctx.preview_windows_by_identity: allowed_windows = ctx.preview_windows_by_identity.get(identity, []) if not allowed_windows: return False if not any(w_st <= st_dt <= w_et for w_st, w_et in allowed_windows): return False return True def construct_weight( movie: Dict[str, Any], start_dt: datetime, in_tms: bool, total_counter: Counter, golden_counter: Counter, ctx: RuleContext, ) -> float: mv = movie_policy_key(movie.get("movieName", ""), movie.get("movieMediaType", "")) target = ctx.movie_targets.get(mv, {"min_total": 1, "max_total": 6, "min_golden": 0}) mc = ctx.manual_constraints.get(mv, {}) cur_total = int(total_counter.get(mv, 0)) cur_golden = int(golden_counter.get(mv, 0)) deficit_total = max(0, int(target.get("min_total", 0)) - cur_total) deficit_golden = max(0, int(target.get("min_golden", 0)) - cur_golden) over_total = max(0, cur_total - int(target.get("max_total", cur_total + 10))) is_golden = ctx.golden_start_dt <= start_dt <= ctx.golden_end_dt w = float(ctx.movie_weights.get(mv, target.get("base_weight", 1.0))) w *= 1.0 + deficit_total * 0.7 fixed_sessions = mc.get("fixed_sessions") min_sessions = mc.get("min_sessions") max_sessions = mc.get("max_sessions") min_golden_sessions = mc.get("min_golden_sessions") max_golden_sessions = mc.get("max_golden_sessions") if fixed_sessions is not None: if cur_total < int(fixed_sessions): w *= 1.6 else: w *= 0.12 else: if min_sessions is not None and cur_total < int(min_sessions): w *= 1.3 + max(0, int(min_sessions) - cur_total) * 0.2 if max_sessions is not None and cur_total >= int(max_sessions): w *= 0.1 if is_golden: w *= 1.05 + deficit_golden * 0.65 if min_golden_sessions is not None and cur_golden < int(min_golden_sessions): w *= 1.25 if max_golden_sessions is not None and cur_golden >= int(max_golden_sessions): w *= 0.2 elif deficit_golden > 0: w *= 0.85 if over_total > 0: w *= max(0.2, 0.8 - over_total * 0.15) if in_tms: w *= 1.06 else: w *= 0.75 w *= random.uniform(0.90, 1.15) return max(0.01, w) def simulate_one_candidate( movies: List[Dict[str, Any]], hall_name_map: Dict[Any, str], locked_sessions: List[Dict[str, Any]], ctx: RuleContext, fail_reason_out: Optional[List[str]] = None, ) -> Optional[List[Dict[str, Any]]]: turn_base = int(ctx.params["turnaround_base"]) turn_min = max(1, turn_base - 3) turn_max = max(turn_min, turn_base + 5) schedule = [dict(s) for s in locked_sessions] by_hall: Dict[str, List[Dict[str, Any]]] = defaultdict(list) for s in schedule: by_hall[str(s["hallId"])].append(s) total_counter = Counter(movie_policy_key(s.get("movieName", ""), s.get("movieMediaType", "")) for s in schedule) golden_counter = Counter( movie_policy_key(s.get("movieName", ""), s.get("movieMediaType", "")) for s in schedule if ctx.golden_start_dt <= s["startTime"] <= ctx.golden_end_dt ) forbidden_set = {extract_hall_no(h) for h in ctx.params["rule13_forbidden_halls"]} missing_tms_pairs: Set[Tuple[str, str, str]] = set() for s in schedule: hall_key = extract_hall_no(s.get("hallId") or s.get("hallName")) if not session_in_tms(s, hall_key, ctx.tms_by_hall): missing_tms_pairs.add(tms_missing_pair_key(s)) if len(missing_tms_pairs) > int(ctx.params["tms_allowance"]): if fail_reason_out is not None: fail_reason_out.append( f"构造前失败:已售锁定场次导致TMS缺片去重 {len(missing_tms_pairs)} 超过允许值 {int(ctx.params['tms_allowance'])}" ) return None min_duration = min( [int(m.get("movieDuration") or 9999) for m in movies if int(m.get("movieDuration") or 0) > 0] or [90] ) hall_items = list(hall_name_map.items()) random.shuffle(hall_items) density_window = int(ctx.params.get("rule2_window_minutes", 30)) density_threshold = int(ctx.params.get("rule2_threshold", 4)) spread_step = max(5, min(20, int(density_window / max(2, density_threshold + 1)))) for hall_idx, (hall_id, hall_name) in enumerate(hall_items): hall_key = str(hall_id) hall_no = extract_hall_no(hall_name or hall_id) hall_sessions = by_hall.get(hall_key, []) blockouts = ctx.blockouts_by_hall.get(hall_key, []) # 各厅首场按步长错峰启动,避免 10:00~10:20 集中扎堆 base_offset = hall_idx * spread_step jitter = random.choice([0, 5, 10]) cursor = ceil_datetime_to_step(ctx.business_start_dt + timedelta(minutes=base_offset + jitter), 5) attempts = 0 while cursor < ctx.business_end_dt and attempts < 1000: attempts += 1 cursor = ceil_datetime_to_step(cursor, 5) occupied = sorted( hall_sessions + [{"startTime": b[0], "endTime": b[1], "is_block": True} for b in blockouts], key=lambda x: x["startTime"], ) next_anchor = None moved = False for item in occupied: if item["endTime"] <= cursor: continue if item["startTime"] <= cursor < item["endTime"]: cursor = item["endTime"] moved = True break if item["startTime"] > cursor: next_anchor = item break if moved: continue if cursor >= ctx.business_end_dt: break gap_end = next_anchor["startTime"] if next_anchor else ctx.business_end_dt if (gap_end - cursor).total_seconds() / 60 < min_duration: cursor += timedelta(minutes=5) continue candidates: List[Tuple[Dict[str, Any], float, bool]] = [] offsets = [0, 5, 10, 15, 20, 25, 30] random.shuffle(offsets) # 所有算法生成场次的开场时间统一按 5 分钟粒度对齐 for movie in movies: mv_policy = movie_policy_key(movie.get("movieName", ""), movie.get("movieMediaType", "")) if ctx.allowed_movies and mv_policy not in ctx.allowed_movies: continue dur = int(movie.get("movieDuration") or 0) if dur <= 0: continue media = movie.get("movieMediaType", "") if ctx.params["rule13_enabled"] and hall_no in forbidden_set and is_3d_by_movie_num_or_media(movie.get("movieNum"), media): continue for off in offsets: st_dt = cursor + timedelta(minutes=off) et_dt = st_dt + timedelta(minutes=dur) if et_dt > gap_end or et_dt > ctx.business_end_dt: continue cand = { "hallId": hall_id, "hallName": hall_name, "movieId": movie.get("movieId"), "movieNum": movie.get("movieNum"), "movieName": movie.get("movieName", "未知影片"), "movieDuration": dur, "movieMediaType": movie.get("movieMediaType", ""), "startTime": st_dt, "endTime": et_dt, "is_presold": False, "sold": 0, } if not can_place( session=cand, hall_sessions=hall_sessions, all_sessions=schedule, turn_min=turn_min, turn_max=turn_max, hall_key=hall_key, ctx=ctx, ): continue in_tms = session_in_tms(cand, hall_no or hall_key, ctx.tms_by_hall) if not in_tms: cand_key = tms_missing_pair_key(cand) if cand_key not in missing_tms_pairs and len(missing_tms_pairs) >= int(ctx.params["tms_allowance"]): continue w = construct_weight(cand, st_dt, in_tms, total_counter, golden_counter, ctx) # 全局开场密度抑制:优先抑制“前30分钟内已过密”的候选 existing_in_window = int( sum( 1 for s in schedule if 0 <= (st_dt - s["startTime"]).total_seconds() / 60 < density_window ) ) if existing_in_window >= density_threshold and ctx.params.get("rule2_enabled", True): continue if existing_in_window > 0: w *= max(0.30, 1.0 - 0.10 * existing_in_window) if existing_in_window >= max(0, density_threshold - 1): overflow = existing_in_window - density_threshold + 1 w *= max(0.05, 1.0 - 0.22 * overflow) candidates.append((cand, w, in_tms)) if not candidates: cursor += timedelta(minutes=5) continue chosen, _, in_tms = random.choices( population=[c[0] for c in candidates], weights=[c[1] for c in candidates], k=1, )[0], None, None for c in candidates: if c[0] is chosen: in_tms = c[2] break schedule.append(chosen) hall_sessions.append(chosen) by_hall[hall_key] = hall_sessions mv_clean = movie_policy_key(chosen.get("movieName", ""), chosen.get("movieMediaType", "")) total_counter[mv_clean] += 1 if ctx.golden_start_dt <= chosen["startTime"] <= ctx.golden_end_dt: golden_counter[mv_clean] += 1 if in_tms is False: missing_tms_pairs.add(tms_missing_pair_key(chosen)) cursor = ceil_datetime_to_step(chosen["endTime"] + timedelta(minutes=turn_min), 5) return schedule def validate_manual_movie_constraints( schedule: List[Dict[str, Any]], constraints: Dict[str, Dict[str, Optional[float]]], ctx: RuleContext, locked_sessions: Optional[List[Dict[str, Any]]] = None, ) -> List[str]: if not constraints: return [] df = pd.DataFrame(schedule).copy() if df.empty: return [] df["movieClean"] = df.apply(lambda r: movie_policy_key(r.get("movieName", ""), r.get("movieMediaType", "")), axis=1) total_sessions = len(df) violations: List[str] = [] locked_total: Counter = Counter() locked_golden: Counter = Counter() if locked_sessions: locked_df = pd.DataFrame(locked_sessions).copy() if not locked_df.empty: locked_df["movieClean"] = locked_df.apply( lambda r: movie_policy_key(r.get("movieName", ""), r.get("movieMediaType", "")), axis=1 ) locked_total = Counter(locked_df["movieClean"].tolist()) locked_golden = Counter( locked_df[ (locked_df["startTime"] >= ctx.golden_start_dt) & (locked_df["startTime"] <= ctx.golden_end_dt) ]["movieClean"].tolist() ) for mv, c in constraints.items(): sub = df[df["movieClean"] == mv] total = int(len(sub)) golden = int( ((sub["startTime"] >= ctx.golden_start_dt) & (sub["startTime"] <= ctx.golden_end_dt)).sum() ) share_pct = (total / total_sessions * 100.0) if total_sessions > 0 else 0.0 golden_ratio_pct = (golden / total * 100.0) if total > 0 else 0.0 fixed_sessions = c.get("fixed_sessions") min_sessions = c.get("min_sessions") max_sessions = c.get("max_sessions") min_share_pct = c.get("min_share_pct") max_share_pct = c.get("max_share_pct") min_golden_sessions = c.get("min_golden_sessions") max_golden_sessions = c.get("max_golden_sessions") min_golden_ratio_pct = c.get("min_golden_ratio_pct") max_golden_ratio_pct = c.get("max_golden_ratio_pct") locked_total_mv = int(locked_total.get(mv, 0)) locked_golden_mv = int(locked_golden.get(mv, 0)) if max_sessions is not None: max_sessions = max(float(max_sessions), float(locked_total_mv)) if max_golden_sessions is not None: max_golden_sessions = max(float(max_golden_sessions), float(locked_golden_mv)) if fixed_sessions is not None and locked_total_mv > int(fixed_sessions): # 预售锁定优先:固定场次不可低于已售锁定 fixed_sessions = float(locked_total_mv) if fixed_sessions is not None and total != int(fixed_sessions): violations.append(f"《{mv}》固定场次要求 {int(fixed_sessions)},当前 {total}") continue if min_sessions is not None and total < int(min_sessions): violations.append(f"《{mv}》次日场次 {total} 低于最少场次 {int(min_sessions)}") if min_share_pct is not None and share_pct < float(min_share_pct): violations.append(f"《{mv}》排片占比 {share_pct:.1f}% 低于 {float(min_share_pct):.1f}%") if max_share_pct is not None and share_pct > float(max_share_pct): violations.append(f"《{mv}》排片占比 {share_pct:.1f}% 高于 {float(max_share_pct):.1f}%") if min_golden_sessions is not None and golden < int(min_golden_sessions): violations.append(f"《{mv}》次日黄金场次 {golden} 低于 {int(min_golden_sessions)}") if min_golden_ratio_pct is not None and total > 0 and golden_ratio_pct < float(min_golden_ratio_pct): violations.append(f"《{mv}》黄金占比 {golden_ratio_pct:.1f}% 低于 {float(min_golden_ratio_pct):.1f}%") if max_golden_ratio_pct is not None and total > 0 and golden_ratio_pct > float(max_golden_ratio_pct): violations.append(f"《{mv}》黄金占比 {golden_ratio_pct:.1f}% 高于 {float(max_golden_ratio_pct):.1f}%") return violations def validate_hard_rules( schedule: List[Dict[str, Any]], locked_sessions: List[Dict[str, Any]], ctx: RuleContext, ) -> List[str]: if not schedule: return ["方案为空"] p = ctx.params turn_base = int(p["turnaround_base"]) turn_min = max(1, turn_base - 3) turn_max = max(turn_min, turn_base + 5) violations: List[str] = [] for s in schedule: st_dt = s["startTime"] et_dt = s["endTime"] if et_dt <= st_dt: violations.append("存在结束时间早于开始时间的场次") break if st_dt < ctx.business_start_dt or et_dt > ctx.business_end_dt: violations.append("存在场次超出营业时间") break by_hall: Dict[str, List[Dict[str, Any]]] = defaultdict(list) for s in schedule: by_hall[str(s.get("hallId"))].append(s) for hall_key, sessions in by_hall.items(): sessions = sorted(sessions, key=lambda x: x["startTime"]) for i in range(1, len(sessions)): a = sessions[i - 1] b = sessions[i] if interval_overlaps(a["startTime"], a["endTime"], b["startTime"], b["endTime"]): violations.append(f"影厅{hall_key}存在场次重叠") break gap = (b["startTime"] - a["endTime"]).total_seconds() / 60 if gap < turn_min: violations.append(f"影厅{hall_key}存在小于{turn_min}分钟的转换间隔") break if gap > turn_max and not gap_intersects_blockout(hall_key, a["endTime"], b["startTime"], ctx.blockouts_by_hall): violations.append(f"影厅{hall_key}存在大于{turn_max}分钟的转换间隔") break if p["rule1_enabled"]: movie_slots: Dict[str, List[datetime]] = defaultdict(list) for s in schedule: identity = movie_identity_key(s.get("movieNum"), s.get("movieName")) movie_slots[identity].append(s["startTime"]) for identity, starts in movie_slots.items(): starts = sorted(starts) for i in range(1, len(starts)): gap = (starts[i] - starts[i - 1]).total_seconds() / 60 if gap < int(p["rule1_gap"]): violations.append(f"同影片开场间隔小于{int(p['rule1_gap'])}分钟({identity})") break if p["rule4_enabled"]: earliest = min(s["startTime"] for s in schedule).time() latest = max(s["startTime"] for s in schedule).time() if earliest > parse_hm(p["rule4_earliest"], "10:00"): violations.append("最早一场晚于规则四阈值") if latest < parse_hm(p["rule4_latest"], "22:30"): violations.append("最晚一场早于规则四阈值") if p["rule13_enabled"]: forbidden_set = {extract_hall_no(h) for h in p["rule13_forbidden_halls"]} for s in schedule: hall_no = extract_hall_no(s.get("hallName") or s.get("hallId")) if hall_no in forbidden_set and is_3d_by_movie_num_or_media(s.get("movieNum"), s.get("movieMediaType", "")): violations.append(f"规则十三违规:{hall_no}号厅出现3D") break if ctx.tms_by_hall: missing_pairs: Set[Tuple[str, str, str]] = set() for s in schedule: hall_no = extract_hall_no(s.get("hallName") or s.get("hallId")) if not session_in_tms(s, hall_no, ctx.tms_by_hall): missing_pairs.add(tms_missing_pair_key(s)) if len(missing_pairs) > int(p["tms_allowance"]): violations.append(f"TMS 缺片场次(同片同厅去重) {len(missing_pairs)},超过允许值 {int(p['tms_allowance'])}") locked_keys = { ( str(s.get("hallId")), movie_policy_key(s.get("movieName", ""), s.get("movieMediaType", "")), s.get("startTime"), s.get("endTime"), ) for s in locked_sessions } cand_keys = { ( str(s.get("hallId")), movie_policy_key(s.get("movieName", ""), s.get("movieMediaType", "")), s.get("startTime"), s.get("endTime"), ) for s in schedule if s.get("is_presold") } if not locked_keys.issubset(cand_keys): violations.append("存在已售锁定场次被改动") manual_violations = validate_manual_movie_constraints(schedule, ctx.manual_constraints, ctx, locked_sessions) if manual_violations: violations.extend(manual_violations[:20]) return violations def normalize_reject_reason(msg: str) -> str: text = str(msg or "") if not text: return "其他淘汰原因" if "构造失败" in text: return "构造阶段失败" if "存在场次重叠" in text: return "硬规则:影厅场次重叠" if "转换间隔" in text: return "硬规则:影厅场次转换间隔不符" if "同影片开场间隔" in text: return "硬规则:规则一同影片间隔不足" if "最早一场晚于" in text: return "硬规则:规则四最早场过晚" if "最晚一场早于" in text: return "硬规则:规则四最晚场过早" if "规则十三违规" in text: return "硬规则:规则十三禁3D违规" if "TMS 缺片场次" in text: return "硬规则:TMS缺片超限" if "已售锁定场次被改动" in text: return "硬规则:预售锁定场次被改动" if "固定场次要求" in text: return "微调约束:固定场次不满足" if "低于最少场次" in text: return "微调约束:低于最少场次" if "高于最多场次" in text: return "微调约束:高于最多场次" if "排片占比" in text and "低于" in text: return "微调约束:低于最低场次占比" if "排片占比" in text and "高于" in text: return "微调约束:高于最高场次占比" if "黄金场次" in text and "低于" in text: return "微调约束:低于最少黄金场次" if "黄金场次" in text and "高于" in text: return "微调约束:高于最多黄金场次" if "黄金占比" in text and "低于" in text: return "微调约束:低于最低黄金占比" if "黄金占比" in text and "高于" in text: return "微调约束:高于最高黄金占比" if "超出营业时间" in text: return "硬规则:场次超出营业时间" if "结束时间早于开始时间" in text: return "硬规则:结束时间早于开始时间" if "方案为空" in text: return "硬规则:空方案" return "其他淘汰原因" def score_efficiency_rules( sched_df: pd.DataFrame, today_eff: pd.DataFrame, locked_sessions: List[Dict[str, Any]], ctx: RuleContext, ) -> Tuple[float, str]: if today_eff.empty: return 0.0, "无今日效率数据" bonus = 0.0 reason_parts: List[str] = [] golden_mask = (sched_df["startTime"] >= ctx.golden_start_dt) & (sched_df["startTime"] <= ctx.golden_end_dt) sim_total = sched_df.groupby("movieClean").size().to_dict() sim_golden = sched_df[golden_mask].groupby("movieClean").size().to_dict() if locked_sessions: locked_df = pd.DataFrame(locked_sessions) locked_df["movieClean"] = locked_df.apply(lambda r: movie_policy_key(r.get("movieName", ""), r.get("movieMediaType", "")), axis=1) locked_total = locked_df.groupby("movieClean").size().to_dict() else: locked_total = {} for _, row in today_eff.iterrows(): mv = movie_policy_key(row["影片"]) today_total = int(row.get("场次", 0)) today_golden = int(row.get("黄金场次", 0) or 0) fe = float(row.get("场次效率", 0) or 0) ge = float(row.get("黄金效率", 0) or 0) t_total = int(sim_total.get(mv, 0)) t_golden = int(sim_golden.get(mv, 0)) locked_cnt = int(locked_total.get(mv, 0)) if t_total < locked_cnt: t_total = locked_cnt if today_total == 1: if today_golden == 0: if fe > 1.5: bonus += 30 if (t_total >= 2 and t_golden >= 1) else -35 elif fe < 0.5: bonus += 8 if t_total <= 1 else -8 else: bonus += 4 else: if ge > 1.5: bonus += 30 if (t_total >= 2 and t_golden >= 2) else -35 elif ge < 0.5: if locked_cnt >= today_total: reason_parts.append(f"{mv}: 锁定场次不可减黄金场,跳过扣分") else: bonus += 12 if t_golden <= 0 else -16 else: bonus += 5 else: if today_golden == 0: if fe > 1.5: bonus += 22 if (t_total >= today_total + 1 and t_golden >= 1) else -24 elif fe < 0.5: if locked_cnt >= today_total: reason_parts.append(f"{mv}: 锁定场次不可降总量,跳过扣分") else: bonus += 16 if t_total <= max(0, today_total - 1) else -18 else: bonus += 4 else: if fe > 1.5 and ge > 1.5: bonus += 24 if (t_total >= today_total + 1 and t_golden >= today_golden + 1) else -25 elif fe > 1.5 and 0.5 <= ge <= 1.5: bonus += 18 if t_total >= today_total + 1 else -16 elif fe > 1.5 and ge < 0.5: if locked_cnt >= today_golden: reason_parts.append(f"{mv}: 黄金低效但锁定场次不可减,跳过扣分") else: bonus += 12 if (t_total >= today_total + 1 and t_golden <= max(0, today_golden - 1)) else -20 elif 0.5 <= fe <= 1.5 and ge > 1.5: bonus += 14 if t_golden >= today_golden + 1 else -12 elif 0.5 <= fe <= 1.5 and ge < 0.5: if locked_cnt >= today_total: reason_parts.append(f"{mv}: 锁定场次不可减,跳过扣分") else: bonus += 10 if (t_total <= today_total - 1 and t_golden <= max(0, today_golden - 1)) else -14 elif fe < 0.5 and ge > 1.5: if locked_cnt >= today_total: reason_parts.append(f"{mv}: 锁定场次不可降总场,跳过扣分") else: bonus += 9 if (t_total <= max(1, today_total - 1) and t_golden >= today_golden + 1) else -12 elif fe < 0.5 and 0.5 <= ge <= 1.5: if locked_cnt >= today_total: reason_parts.append(f"{mv}: 锁定场次不可降总场,跳过扣分") else: bonus += 8 if t_total <= max(1, today_total - 1) else -10 elif fe < 0.5 and ge < 0.5: if locked_cnt >= today_total: reason_parts.append(f"{mv}: 锁定场次不可降总场,跳过扣分") else: bonus += 12 if (t_total <= max(1, today_total - 1) and t_golden <= max(0, today_golden - 1)) else -15 return bonus, ";".join(reason_parts[:8]) def score_rule2_density(df: pd.DataFrame, ctx: RuleContext) -> Tuple[float, str]: p = ctx.params if not p["rule2_enabled"]: return 0.0, "未启用" deduct = 0.0 starts = sorted(df["startTime"].tolist()) exempt_ranges = parse_exempt_ranges(p["rule2_exempt_ranges"]) for st_dt in starts: we = st_dt + timedelta(minutes=int(p["rule2_window_minutes"])) cnt = int(((df["startTime"] >= st_dt) & (df["startTime"] < we)).sum()) overflow = cnt - int(p["rule2_threshold"]) if overflow > 0 and not in_any_exempt(st_dt, exempt_ranges): deduct += overflow * float(p["rule2_penalty"]) return -deduct, f"过密窗口扣分 {deduct:.1f}" def score_rule3_gap(df: pd.DataFrame, ctx: RuleContext) -> Tuple[float, str]: p = ctx.params if not p["rule3_enabled"]: return 0.0, "未启用" deduct = 0.0 starts = sorted(df["startTime"].tolist()) if len(starts) <= 1: return 0.0, "场次不足" for i in range(len(starts) - 1): gap = (starts[i + 1] - starts[i]).total_seconds() / 60 if gap > int(p["rule3_gap_minutes"]): if gap_intersects_any_blockout(starts[i], starts[i + 1], ctx.blockouts_by_hall): continue overflow = max(1.0, gap - int(p["rule3_gap_minutes"])) deduct += (overflow / 10.0) * float(p["rule3_penalty"]) return -deduct, f"全局开场断档扣分 {deduct:.1f}" def score_rule9_hot_density(df: pd.DataFrame, ctx: RuleContext, box_office_data: List[Dict[str, Any]]) -> Tuple[float, str]: p = ctx.params if not p["rule9_enabled"]: return 0.0, "未启用" windows = rule9_core_windows(ctx.target_date) golden_df = df[df["startTime"].dt.time.apply(lambda t: time_in_ranges(t, windows))] if golden_df.empty: return -float(p["rule9_penalty"]), "核心黄金窗口无场次" hot_movies, source, _ = resolve_hot_movies(df, box_office_data, int(p["rule9_hot_top_n"])) if not hot_movies: return -float(p["rule9_penalty"]), "无热门片可评估" total = len(golden_df) miss = 0 for mv in hot_movies: ratio = float((golden_df["movieClean"] == mv).sum()) / total if ratio < float(p["rule9_min_ratio"]): miss += 1 deduct = miss * float(p["rule9_penalty"]) return -deduct, f"热门片来源:{source},密度不足 {miss} 部" def score_rule11_late_hot(df: pd.DataFrame, ctx: RuleContext, box_office_data: List[Dict[str, Any]]) -> Tuple[float, str]: p = ctx.params if not p["rule11_enabled"]: return 0.0, "未启用" hot_movies, source, bo_ranked = resolve_hot_movies(df, box_office_data, int(p["rule9_hot_top_n"])) top_movies = hot_movies[:3] if hot_movies else [] if bo_ranked and source == "全国大盘票房": top_movies = [m for m, _ in bo_ranked[:3]] hot_movies = top_movies if not hot_movies: return 0.0, "无热门片" after_t = parse_hm(p["rule11_after_time"], "22:00") late_df = df[df["startTime"].dt.time.apply(lambda t: t >= after_t or t < dt_time(6, 0))] if late_df.empty: return -float(p["rule11_penalty"]), "22:00后无场次" late_movies = set(late_df["movieClean"]) if any(m in late_movies for m in hot_movies): return 0.0, f"热门片来源:{source},符合" return -float(p["rule11_penalty"]), f"热门片来源:{source},22:00后无热门片" def score_rule12_top5_golden(df: pd.DataFrame, ctx: RuleContext, box_office_data: List[Dict[str, Any]]) -> Tuple[float, str]: p = ctx.params if not p["rule12_enabled"]: return 0.0, "未启用" bo_ranked = sort_movies_by_box_office(box_office_data) if not bo_ranked: return 0.0, "未获取到次日票房数据" top5 = [m for m, _ in bo_ranked[:5]] golden_movies = set(df[df["startTime"].dt.time.apply(lambda t: dt_time(14, 0) <= t <= dt_time(21, 0))]["movieClean"]) miss = [m for m in top5 if m and m not in golden_movies] deduct = len(miss) * float(p["rule12_penalty_each"]) return -deduct, f"缺黄金场影片 {len(miss)}" def score_manual_upper_constraints( schedule: List[Dict[str, Any]], constraints: Dict[str, Dict[str, Optional[float]]], locked_sessions: List[Dict[str, Any]], ) -> Tuple[float, str]: """ 将“最多场次 / 最多黄金场次”作为软扣分项,不再作为硬淘汰项。 预售锁定优先:若已售锁定本身超过上限,则不对该部分扣分。 """ if not constraints or not schedule: return 0.0, "无" df = pd.DataFrame(schedule).copy() if df.empty: return 0.0, "无" df["movieClean"] = df.apply(lambda r: movie_policy_key(r.get("movieName", ""), r.get("movieMediaType", "")), axis=1) totals = Counter(df["movieClean"].tolist()) golden = Counter( df[df["startTime"].dt.time.apply(lambda t: dt_time(14, 0) <= t <= dt_time(21, 0))]["movieClean"].tolist() ) locked_total: Counter = Counter() locked_golden: Counter = Counter() if locked_sessions: ldf = pd.DataFrame(locked_sessions).copy() if not ldf.empty: ldf["movieClean"] = ldf.apply(lambda r: movie_policy_key(r.get("movieName", ""), r.get("movieMediaType", "")), axis=1) locked_total = Counter(ldf["movieClean"].tolist()) locked_golden = Counter( ldf[ldf["startTime"].dt.time.apply(lambda t: dt_time(14, 0) <= t <= dt_time(21, 0))]["movieClean"].tolist() ) # 扣分力度:每超1场分别扣 8 / 10 分 penalty = 0.0 lines: List[str] = [] for mv, c in constraints.items(): cur_total = int(totals.get(mv, 0)) cur_golden = int(golden.get(mv, 0)) max_total = c.get("max_sessions") max_golden = c.get("max_golden_sessions") if max_total is not None: eff_max_total = max(int(max_total), int(locked_total.get(mv, 0))) overflow = max(0, cur_total - eff_max_total) if overflow > 0: d = overflow * 8.0 penalty += d lines.append(f"{mv} 超总场 {overflow} 场(-{d:.0f})") if max_golden is not None: eff_max_golden = max(int(max_golden), int(locked_golden.get(mv, 0))) overflow_g = max(0, cur_golden - eff_max_golden) if overflow_g > 0: d = overflow_g * 10.0 penalty += d lines.append(f"{mv} 超黄金场 {overflow_g} 场(-{d:.0f})") return (-penalty, ";".join(lines[:8]) if lines else "满足上限约束") def score_candidate( schedule: List[Dict[str, Any]], ctx: RuleContext, today_eff: pd.DataFrame, locked_sessions: List[Dict[str, Any]], box_office_data: List[Dict[str, Any]], ) -> CandidateResult: if not schedule: return CandidateResult(schedule=[], score=0.0, score_breakdown=[], hard_violations=["空方案"]) df = pd.DataFrame(schedule).sort_values(["startTime", "hallId"]).copy() df["movieClean"] = df.apply(lambda r: movie_policy_key(r.get("movieName", ""), r.get("movieMediaType", "")), axis=1) score = 1000.0 breakdown: List[Tuple[str, float, str]] = [] if ctx.params["efficiency_enabled"]: delta, msg = score_efficiency_rules(df, today_eff, locked_sessions, ctx) penalty_coef = float(ctx.params.get("efficiency_penalty_coef", 1.0) or 1.0) if delta < 0: delta *= max(0.0, penalty_coef) score += delta breakdown.append(("效率分析表", delta, msg or "按18种情况评估")) d2, m2 = score_rule2_density(df, ctx) score += d2 breakdown.append(("规则二", d2, m2)) d3, m3 = score_rule3_gap(df, ctx) score += d3 breakdown.append(("规则三", d3, m3)) d9, m9 = score_rule9_hot_density(df, ctx, box_office_data) score += d9 breakdown.append(("规则九", d9, m9)) d11, m11 = score_rule11_late_hot(df, ctx, box_office_data) score += d11 breakdown.append(("规则十一", d11, m11)) d12, m12 = score_rule12_top5_golden(df, ctx, box_office_data) score += d12 breakdown.append(("规则十二", d12, m12)) d_manual_max, m_manual_max = score_manual_upper_constraints(schedule, ctx.manual_constraints, locked_sessions) score += d_manual_max breakdown.append(("微调上限扣分", d_manual_max, m_manual_max)) return CandidateResult(schedule=schedule, score=score, score_breakdown=breakdown, hard_violations=[]) def _append_rule_logs(parts: List[str], title: str, logs: List[str]) -> None: parts.append(title) if logs: for i, log in enumerate(logs, 1): parts.append(f"{i}. {log}") else: parts.append("(无)") def _hall_display(raw: Any, with_ting: bool = True) -> str: hall_no = extract_hall_no(raw) if not hall_no: return str(raw or "") return f"{hall_no}号厅" if with_ting else f"{hall_no}号" def generate_schedule_check_logs_text( schedule: List[Dict[str, Any]], target_date: date, params: Dict[str, Any], today_eff: pd.DataFrame, box_office_data: List[Dict[str, Any]], ) -> str: if not schedule: return "无排片数据,无法进行合理性检查。" df = pd.DataFrame(schedule).copy() if df.empty: return "无排片数据,无法进行合理性检查。" df["startTime"] = pd.to_datetime(df["startTime"], errors="coerce") df["endTime"] = pd.to_datetime(df["endTime"], errors="coerce") df = df.dropna(subset=["startTime", "endTime"]).sort_values("startTime").reset_index(drop=True) if df.empty: return "无有效排片时间数据,无法进行合理性检查。" df["filmName"] = df["movieName"].astype(str) df["clean_filmName"] = df["filmName"].apply(clean_movie_title) df["simpleHallName"] = df["hallName"].apply(lambda x: _hall_display(x, with_ting=True)) bo_ranked = sort_movies_by_box_office(box_office_data) bo_sorted_movies = [m for m, _ in bo_ranked] movie_box_office = {m: float(v) for m, v in bo_ranked} final_log_parts: List[str] = [] # Rule 1 logs_r1: List[str] = [] gap_limit = int(params.get("rule1_gap", 30)) movie_num_series = df["movieNum"] if "movieNum" in df.columns else pd.Series([""] * len(df), index=df.index) df["movieSerial_5_8"] = movie_num_series.apply(extract_movie_serial_5_8) serial_values = [s for s in df["movieSerial_5_8"].dropna().unique() if str(s).strip()] for serial in serial_values: film_schedules = df[df["movieSerial_5_8"] == serial].sort_values("startTime").reset_index(drop=True) for i in range(len(film_schedules) - 1): s1, s2 = film_schedules.iloc[i], film_schedules.iloc[i + 1] interval = (s2["startTime"] - s1["startTime"]).total_seconds() / 60 if interval < gap_limit: logs_r1.append( f"《{s1['filmName']}》{s1['simpleHallName']}【{s1['startTime'].strftime('%H:%M')}】和 " f"《{s2['filmName']}》{s2['simpleHallName']}【{s2['startTime'].strftime('%H:%M')}】" f"开场时间距离 {int(interval)} 分钟(年度顺序号:{serial})" ) _append_rule_logs( final_log_parts, f"规则一:同影片场次间隔过近(按 movieNum 第5~8位年度顺序号,少于 {gap_limit} 分钟)", logs_r1, ) # Rule 2 logs_r2: List[str] = [] window_minutes = int(params.get("rule2_window_minutes", 30)) threshold = int(params.get("rule2_threshold", 4)) i = 0 processed_indices_r2 = set() while i < len(df): if i in processed_indices_r2: i += 1 continue window_start_time = df.iloc[i]["startTime"] window_end_time = window_start_time + timedelta(minutes=window_minutes) window_df = df[(df["startTime"] >= window_start_time) & (df["startTime"] < window_end_time)] if len(window_df) > threshold: start_t_str = window_df.iloc[0]["startTime"].strftime("%H:%M") end_t_str = window_df.iloc[-1]["startTime"].strftime("%H:%M") lines = [f"【{start_t_str} - {end_t_str}】开场时间比较集中:"] for _, row in window_df.iterrows(): lines.append(f" {row['simpleHallName']}《{row['filmName']}》> {row['startTime'].strftime('%H:%M')}") processed_indices_r2.add(int(row.name)) logs_r2.append("\n".join(lines)) i += 1 _append_rule_logs(final_log_parts, f"\n规则二:{window_minutes} 分钟内影片开场超过 {threshold} 场", logs_r2) # Rule 3 logs_r3: List[str] = [] gap_minutes = int(params.get("rule3_gap_minutes", 30)) if len(df) > 1: for i in range(len(df) - 1): s1_start, s2_start = df.iloc[i]["startTime"], df.iloc[i + 1]["startTime"] gap = (s2_start - s1_start).total_seconds() / 60 if gap > gap_minutes: logs_r3.append(f"【{s1_start.strftime('%H:%M')} ~ {s2_start.strftime('%H:%M')}】缺少影片开场,间隔 {int(gap)} 分钟") _append_rule_logs(final_log_parts, f"\n规则三:场次开场间隔超过 {gap_minutes} 分钟", logs_r3) # Rule 4 logs_r4: List[str] = [] if not df.empty: first_sched = df.iloc[0] last_sched = df.iloc[-1] earliest_limit = parse_hm(params.get("rule4_earliest", "10:00"), "10:00") latest_limit = parse_hm(params.get("rule4_latest", "22:30"), "22:30") if first_sched["startTime"].time() > earliest_limit: logs_r4.append( f"最早一场 {first_sched['simpleHallName']}《{first_sched['filmName']}》{first_sched['startTime'].strftime('%H:%M')} 晚于 {earliest_limit.strftime('%H:%M')}" ) if last_sched["startTime"].time() < latest_limit: logs_r4.append( f"最晚一场 {last_sched['simpleHallName']}《{last_sched['filmName']}》{last_sched['startTime'].strftime('%H:%M')} 早于 {latest_limit.strftime('%H:%M')}" ) _append_rule_logs(final_log_parts, "\n规则四:最早一场晚于 10:00,最晚一场早于 22:30", logs_r4) # Rule 5 logs_r5: List[str] = [] w5_start = datetime.combine(target_date, dt_time(10, 0)) w5_end = datetime.combine(target_date, dt_time(23, 0)) for hall_name in df["simpleHallName"].unique(): hall_df = df[df["simpleHallName"] == hall_name].sort_values("startTime") for i in range(len(hall_df) - 1): prev_end = hall_df.iloc[i]["endTime"] curr_start = hall_df.iloc[i + 1]["startTime"] if prev_end < w5_end and curr_start > w5_start: idle_mins = (curr_start - prev_end).total_seconds() / 60 if idle_mins > 60: logs_r5.append(f"{hall_name.replace('厅', '')} 【{prev_end.strftime('%H:%M')} ~ {curr_start.strftime('%H:%M')}】无影片在播,时长 {int(idle_mins)} 分钟") _append_rule_logs(final_log_parts, "\n规则五:影厅空闲时间超过 1 小时(10:00-23:00)", logs_r5) # Rule 6 logs_r6: List[str] = [] convert_limit = int(params.get("turnaround_base", 10)) for hall_name in df["simpleHallName"].unique(): hall_df = df[df["simpleHallName"] == hall_name].sort_values("startTime") for i in range(len(hall_df) - 1): prev_sched = hall_df.iloc[i] next_sched = hall_df.iloc[i + 1] conversion = (next_sched["startTime"] - prev_sched["endTime"]).total_seconds() / 60 if conversion < convert_limit: logs_r6.append( f"{hall_name.replace('厅', '')} {prev_sched['endTime'].strftime('%H:%M')} 《{prev_sched['filmName']}》结束后影厅空闲时间仅为 {int(conversion)} 分钟" ) _append_rule_logs(final_log_parts, "\n规则六:影厅场次转换时间检查", logs_r6) # Rule 7 logs_r7: List[str] = [] if not df.empty: current_time = df.iloc[0]["startTime"].replace(second=0, microsecond=0) end_time = df.iloc[-1]["endTime"] reported_windows = set() while current_time < end_time: window_end = current_time + timedelta(minutes=10) starts_in_window = df[(df["startTime"] >= current_time) & (df["startTime"] < window_end)] ends_in_window = df[(df["endTime"] > current_time) & (df["endTime"] <= window_end)] if len(starts_in_window) + len(ends_in_window) > 5: window_tuple = (current_time.strftime("%H:%M"), window_end.strftime("%H:%M")) if window_tuple not in reported_windows: exit_halls = "、".join(sorted(set(ends_in_window["simpleHallName"].tolist()))) entry_halls = "、".join(sorted(set(starts_in_window["simpleHallName"].tolist()))) log_msg = f"【{current_time.strftime('%H:%M')} ~ {window_end.strftime('%H:%M')}】" if exit_halls: log_msg += f",{exit_halls}集中散场" if entry_halls: log_msg += ",同时" if exit_halls else "," log_msg += f"{entry_halls}即将入场" log_msg += ",预计人流瞬时压力过大。" logs_r7.append(log_msg) reported_windows.add(window_tuple) current_time += timedelta(minutes=5) start_groups = df.groupby("startTime").filter(lambda x: len(x) > 3) for time_val, group in start_groups.groupby("startTime"): halls = "、".join(sorted(set(group["simpleHallName"].tolist()))) logs_r7.append(f"{time_val.strftime('%H:%M')},{halls}电影同时开场,注意预计人流瞬时压力过大。") end_groups = df.groupby("endTime").filter(lambda x: len(x) > 3) for time_val, group in end_groups.groupby("endTime"): halls = "、".join(sorted(set(group["simpleHallName"].tolist()))) logs_r7.append(f"{time_val.strftime('%H:%M')},{halls}电影同时散场,注意预计人流瞬时压力过大。") _append_rule_logs(final_log_parts, "\n规则七:动态散场和入场高峰预警", logs_r7) # Rule 8 logs_r8: List[str] = [] for hall_name in df["simpleHallName"].unique(): hall_df = df[df["simpleHallName"] == hall_name] last_sched = hall_df.nlargest(1, "endTime").iloc[0] if last_sched["endTime"].date() == target_date and last_sched["endTime"].time() < dt_time(22, 30): logs_r8.append(f"{hall_name.replace('厅', '')} 最后一场于【{last_sched['endTime'].strftime('%H:%M')}】结束,过早停运。") _append_rule_logs(final_log_parts, "\n规则八:影厅结束运营过早预警", logs_r8) # Rule 9 logs_r9: List[str] = [] windows = rule9_core_windows(target_date) period_str = " 和 ".join([f"{s.strftime('%H:%M')}-{e.strftime('%H:%M')}" for s, e in windows]) golden_df = df[df["startTime"].apply(lambda x: time_in_ranges(x.time(), windows))] if not golden_df.empty: if bo_sorted_movies: max_bo = float(movie_box_office.get(bo_sorted_movies[0], 0)) if max_bo > 0: hot_films = [m for m, v in movie_box_office.items() if v >= max_bo * 0.95] else: hot_films = bo_sorted_movies[: int(params.get("rule9_hot_top_n", 3))] else: counts = df["clean_filmName"].value_counts() max_count = int(counts.iloc[0]) if not counts.empty else 0 hot_films = counts[counts >= max_count * 0.95].index.tolist() if max_count > 0 else [] min_ratio = float(params.get("rule9_min_ratio", 0.3)) for film in hot_films: ratio = float((golden_df["clean_filmName"] == film).sum()) / max(1, len(golden_df)) if ratio < min_ratio: logs_r9.append(f"《{film}》在核心黄金时段 {period_str} 排片占比仅为{ratio:.1%},低于 {min_ratio:.0%}。") _append_rule_logs(final_log_parts, "\n规则九:黄金时段热门影片排片密度检查", logs_r9) # Rule 10 logs_r10: List[str] = [] if today_eff is not None and not today_eff.empty: tomorrow_stats: Dict[str, Dict[str, int]] = {} for film in df["clean_filmName"].unique(): fdf = df[df["clean_filmName"] == film] tom_total = len(fdf) tom_golden = len(fdf[fdf["startTime"].apply(lambda x: dt_time(14, 0) <= x.time() <= dt_time(21, 0))]) tomorrow_stats[film] = {"total": int(tom_total), "golden": int(tom_golden)} for _, row in today_eff.iterrows(): film = clean_movie_title(row.get("影片", "")) if film not in tomorrow_stats: continue today_total = int(row.get("场次", 0) or 0) today_golden = int(row.get("黄金场次", 0) or 0) fe = float(row.get("场次效率", 0) or 0) ge = float(row.get("黄金效率", 0) or 0) tom_total = int(tomorrow_stats[film]["total"]) tom_golden = int(tomorrow_stats[film]["golden"]) is_valid = True if today_total == 1: if today_golden == 0: if fe > 1.5: is_valid = tom_golden >= 1 and tom_total >= 2 elif fe < 0.5: is_valid = tom_total in [0, 1] else: is_valid = tom_total in [0, 1, 2] else: if ge > 1.5: is_valid = tom_golden >= 2 and tom_total >= 2 elif ge < 0.5: is_valid = tom_golden == 0 and tom_total in [0, 1] else: is_valid = (tom_total, tom_golden) in [(1, 1), (2, 1), (1, 0)] else: if today_golden == 0: if fe > 1.5: is_valid = tom_total > today_total and tom_golden >= 1 elif fe < 0.5: is_valid = tom_total < today_total and tom_golden == 0 else: if fe > 1.5 and ge > 1.5: is_valid = tom_total > today_total and tom_golden >= today_golden + 1 elif fe > 1.5 and 0.5 <= ge <= 1.5: is_valid = tom_total > today_total elif fe > 1.5 and ge < 0.5: is_valid = tom_total > today_total and tom_golden <= max(0, today_golden - 1) elif 0.5 <= fe <= 1.5 and ge > 1.5: is_valid = tom_golden >= today_golden + 1 elif 0.5 <= fe <= 1.5 and ge < 0.5: is_valid = tom_total < today_total and tom_golden <= max(0, today_golden - 1) elif fe < 0.5 and ge > 1.5: is_valid = tom_total <= max(1, today_total - 1) and tom_golden >= today_golden elif fe < 0.5 and 0.5 <= ge <= 1.5: is_valid = tom_total <= max(1, today_total - 1) elif fe < 0.5 and ge < 0.5: is_valid = tom_total <= max(1, today_total - 1) and tom_golden <= max(0, today_golden - 1) if not is_valid: film_rows = df[df["clean_filmName"] == film] locked_cnt = int(film_rows["is_presold"].fillna(False).sum()) if ("is_presold" in film_rows.columns) else 0 # 预售优先:若次日已有预售锁定场次,与效率建议冲突时可忽略 if locked_cnt > 0: continue logs_r10.append(f"《{film}》全天场次效率:{fe:.2f} 黄金时段场次效率:{ge:.2f} 次日的排片不满足要求。") _append_rule_logs(final_log_parts, "\n规则十:次日排片效率匹配度检查", logs_r10) # Rule 11 logs_r11: List[str] = [] if bo_sorted_movies: top_movies = bo_sorted_movies[:3] top_movies_type = "票房排行前三" else: top_movies = df["clean_filmName"].value_counts().head(3).index.tolist() top_movies_type = "排片量前三" if top_movies: after_t = parse_hm(params.get("rule11_after_time", "22:00"), "22:00") late_sessions = df[df["startTime"].apply(lambda t: t.time() >= after_t or t.time() < dt_time(6, 0))] late_movies = set(late_sessions["clean_filmName"].unique()) if not late_sessions.empty else set() if not any(m in late_movies for m in top_movies): top_movies_str = "、".join([f"《{m}》" for m in top_movies]) logs_r11.append(f"{top_movies_type}的影片 {top_movies_str} 在 22:00 后均无场次,建议增加热门影片晚场。") _append_rule_logs(final_log_parts, "\n规则十一:22:00 后热门影片排片检查", logs_r11) # Rule 12 logs_r12: List[str] = [] if bo_sorted_movies: for movie in bo_sorted_movies[:5]: movie_df = df[df["clean_filmName"] == movie] if movie_df.empty: logs_r12.append(f"《{movie}》为次日票房排行前五的影片,但目前未排片。") continue golden_sessions = movie_df[movie_df["startTime"].apply(lambda x: dt_time(14, 0) <= x.time() <= dt_time(21, 0))] if golden_sessions.empty: logs_r12.append(f"《{movie}》为次日票房排行前五的影片,但没有安排黄金场(14:00-21:00)。") else: logs_r12.append("未获取到次日票房数据,无法检查规则十二。") _append_rule_logs(final_log_parts, "\n规则十二:次日票房前五的影片必须有一场黄金场", logs_r12) # Rule 13 logs_r13: List[str] = [] restricted = {extract_hall_no(x) for x in params.get("rule13_forbidden_halls", ["2", "8", "9"])} for _, row in df.iterrows(): hall_no = extract_hall_no(row.get("hallName")) if hall_no in restricted and is_3d_by_movie_num_or_media(row.get("movieNum"), row.get("movieMediaType", "")): logs_r13.append(f"{hall_no}号厅《{row.get('filmName', '未知影片')}》疑似3D排片(movieNum第4位为2)") _append_rule_logs(final_log_parts, "\n规则十三:2号、8号、9号厅禁止3D排片检查(movieNum第4位为2)", logs_r13) return "\n".join(final_log_parts) def schedule_signature(schedule: List[Dict[str, Any]]) -> str: tokens: List[str] = [] for s in sorted( schedule, key=lambda x: ( str(x.get("hallId")), x.get("startTime"), movie_policy_key(x.get("movieName", ""), x.get("movieMediaType", "")), ), ): tokens.append( f"{s.get('hallId')}|{movie_policy_key(s.get('movieName',''), s.get('movieMediaType',''))}|" f"{s.get('startTime').strftime('%H:%M')}|{s.get('endTime').strftime('%H:%M')}" ) return "#".join(tokens) def render_gantt(schedule: List[Dict[str, Any]], date_str: str, tab_key: str) -> None: if not schedule: st.info("无排片数据") return df = pd.DataFrame(schedule).copy() if df.empty: st.info("无排片数据") return df["startTime"] = pd.to_datetime(df["startTime"], errors="coerce") df["endTime"] = pd.to_datetime(df["endTime"], errors="coerce") df = df.dropna(subset=["hallName", "movieName", "startTime", "endTime"]).copy() if df.empty: st.info("无有效排片数据") return def _hall_sort_key(h: Any) -> Tuple[int, str]: nums = re.findall(r"\d+", str(h)) return (int(nums[0]), str(h)) if nums else (9999, str(h)) hall_order = sorted(df["hallName"].astype(str).unique().tolist(), key=_hall_sort_key) t_min = df["startTime"].min().replace(minute=0, second=0, microsecond=0) t_max = (df["endTime"].max() + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) total_minutes = max(60.0, (t_max - t_min).total_seconds() / 60.0) total_hours = max(1, int((t_max - t_min).total_seconds() / 3600)) palette = [ "#2A9D8F", "#E76F51", "#264653", "#F4A261", "#5B8FF9", "#6DC8EC", "#5D7092", "#9270CA", "#FF9D4D", "#269A99", ] movies = sorted(df["movieName"].astype(str).unique().tolist()) color_map = {m: palette[i % len(palette)] for i, m in enumerate(movies)} labels: List[str] = [] for i in range(total_hours + 1): labels.append(f'