"""Command-line interface (``xhs``) around ``Data_Spider`` and the PC QR-code login.

Sub-commands: ``note``, ``user``, ``search`` and ``login pc-qrcode``.
Global options (``--cookies``, ``--proxy``, ...) are declared on the top-level
parser, so they must be given before the sub-command name.
"""
import argparse
import asyncio
import os
import sys

import qrcode

from xhs_utils.common_util import init
from xhs_utils.session_manager import SessionManager
from xhs_utils.state_store import StateStore
from xhs_utils.spider import Data_Spider

# Accepted values for --save-choice, shared by every crawling sub-command.
_SAVE_CHOICES = ("all", "excel", "media", "media-video", "media-image")


def _ensure_excel_name(args, default_name):
    """Set ``args.excel_name`` to *default_name* when an Excel export needs one.

    Only applies when ``args.save_choice`` is ``"all"`` or ``"excel"`` and no
    name was supplied; *args* is modified in place.
    """
    if args.save_choice in ("all", "excel") and not args.excel_name:
        args.excel_name = default_name


def _persist_cookies(args, cookies_str):
    """Save *cookies_str* to the cookies file and/or env file as requested.

    ``--save-cookies`` and ``--write-env`` are honoured independently of each
    other. Nothing is written when *cookies_str* is empty, so an existing
    session is never overwritten with a blank value.
    """
    sm = SessionManager(cookies_file=args.cookies_file)
    if not cookies_str:
        return
    if args.save_cookies:
        sm.save_to_file(cookies_str)
    if args.write_env:
        sm.save_to_env_file(cookies_str, env_file=args.env_file)


def _prepare_run(args):
    """Build the inputs shared by the ``note``, ``user`` and ``search`` commands.

    Returns:
        A tuple ``(cookies_str, base_path, state, proxies)`` where
        ``cookies_str`` is ``args.cookies`` if given, else the value from
        ``init()``; ``state`` is a ``StateStore`` only when ``--resume`` was
        passed; and ``proxies`` maps both ``http`` and ``https`` to
        ``args.proxy`` (or is ``None`` without ``--proxy``).
    """
    cookies_str, base_path = init()
    if args.cookies:
        cookies_str = args.cookies
    _persist_cookies(args, cookies_str)
    state = StateStore(args.state_file) if args.resume else None
    proxies = {"http": args.proxy, "https": args.proxy} if args.proxy else None
    return cookies_str, base_path, state, proxies


def _default_name_from_user_url(url):
    """Return the last path segment of *url*, ignoring query, fragment and trailing ``/``."""
    # Strip the query/fragment first so a "/" inside them cannot be mistaken
    # for a path separator, then drop trailing slashes so ".../<id>/" still
    # yields "<id>" rather than an empty name.
    path = url.split("#", 1)[0].split("?", 1)[0]
    return path.rstrip("/").rsplit("/", 1)[-1]


def cmd_note(args):
    """Handle ``xhs note``: crawl the single note at ``args.url``."""
    cookies_str, base_path, state, proxies = _prepare_run(args)
    spider = Data_Spider()
    _ensure_excel_name(args, "note")
    summary = spider.spider_some_note(
        [args.url],
        cookies_str,
        base_path,
        args.save_choice,
        args.excel_name,
        proxies=proxies,
        state_store=state,
    )
    # Unlike the other commands, the result is printed only when it is a dict.
    if isinstance(summary, dict):
        print(summary)


def cmd_user(args):
    """Handle ``xhs user``: crawl all notes of the user at ``args.url``."""
    cookies_str, base_path, state, proxies = _prepare_run(args)
    spider = Data_Spider()
    _ensure_excel_name(args, _default_name_from_user_url(args.url))
    summary = spider.spider_user_all_note(
        args.url,
        cookies_str,
        base_path,
        args.save_choice,
        args.excel_name,
        proxies=proxies,
        state_store=state,
    )
    print(summary)


def cmd_search(args):
    """Handle ``xhs search``: crawl up to ``args.num`` notes matching ``args.query``."""
    cookies_str, base_path, state, proxies = _prepare_run(args)
    spider = Data_Spider()
    _ensure_excel_name(args, args.query)
    summary = spider.spider_some_search_note(
        args.query,
        args.num,
        cookies_str,
        base_path,
        args.save_choice,
        args.sort,
        args.note_type,
        args.note_time,
        args.note_range,
        args.pos_distance,
        geo=None,
        excel_name=args.excel_name,
        proxies=proxies,
        state_store=state,
    )
    print(summary)


async def _pc_login_qrcode(
    save_path: str | None,
    headless: bool,
    poll_interval_s: float,
    timeout_s: float | None = None,
):
    """Run the PC QR-code login flow and return the resulting cookie string.

    Args:
        save_path: Where to write the QR image. When falsy, the verification
            URL is printed to stdout instead.
        headless: Forwarded to ``XHSLoginApi.xhsGenerateInitCookies``.
        poll_interval_s: Seconds to sleep between login-status checks.
        timeout_s: Give up after this many seconds of polling. ``None``
            (the default) polls until the login succeeds.

    Raises:
        RuntimeError: If the login API cannot be imported, QR generation
            fails, the verification URL is empty, or *timeout_s* elapses.
    """
    try:
        from apis.xhs_pc_login_apis import XHSLoginApi
    except Exception as e:
        # Chain the cause so the original import failure stays in the traceback.
        raise RuntimeError(f"missing_login_dependency: {e}") from e

    login_api = XHSLoginApi()
    cookies = await login_api.xhsGenerateInitCookies(headless=headless)
    success, msg, qrcode_dict = await login_api.xhsGenerateQRcode(cookies)
    if not success:
        raise RuntimeError(msg)

    # .get() so a missing key reports "verify_url_empty" instead of a bare KeyError.
    verify_url = (qrcode_dict or {}).get("verify_url")
    if not verify_url:
        raise RuntimeError("verify_url_empty")

    if save_path:
        os.makedirs(os.path.dirname(os.path.abspath(save_path)), exist_ok=True)
        qrcode.make(verify_url).save(save_path)
    else:
        print(verify_url)

    loop = asyncio.get_running_loop()
    deadline = None if timeout_s is None else loop.time() + timeout_s
    while True:
        success, msg, res = await login_api.xhsCheckQRCodeLogin(
            qrcode_dict["qr_id"], qrcode_dict["code"], qrcode_dict["cookies"]
        )
        # Guard against a "successful" reply that carries no payload.
        cookies_str = res.get("cookies_str") if (success and res) else None
        if cookies_str:
            return cookies_str
        if deadline is not None and loop.time() >= deadline:
            raise RuntimeError("login_timeout")
        await asyncio.sleep(poll_interval_s)


def cmd_login_pc_qrcode(args):
    """Handle ``xhs login pc-qrcode``: log in, optionally persist, then print the cookies."""
    cookies_str = asyncio.run(
        _pc_login_qrcode(
            args.qr_path,
            args.headless,
            args.poll_interval,
            # getattr keeps namespaces built without --timeout working.
            timeout_s=getattr(args, "timeout", None),
        )
    )
    _persist_cookies(args, cookies_str)
    print(cookies_str)


def _add_output_options(sub_parser):
    """Add the ``--save-choice``, ``--excel-name`` and ``--resume`` options to *sub_parser*."""
    sub_parser.add_argument("--save-choice", default="all", choices=list(_SAVE_CHOICES))
    sub_parser.add_argument("--excel-name", default="")
    sub_parser.add_argument("--resume", action="store_true")


def build_parser():
    """Build and return the ``xhs`` argument parser with all sub-commands attached.

    Each leaf sub-command stores its handler in ``func`` via ``set_defaults``.
    """
    parser = argparse.ArgumentParser(prog="xhs", add_help=True)
    parser.add_argument("--cookies", default=None)
    parser.add_argument("--cookies-file", default=None)
    parser.add_argument("--save-cookies", action="store_true")
    parser.add_argument("--write-env", action="store_true")
    parser.add_argument("--env-file", default=".env")
    parser.add_argument("--proxy", default=None)
    parser.add_argument("--state-file", default=os.path.join("datas", "state.json"))

    sub = parser.add_subparsers(dest="cmd", required=True)

    p_note = sub.add_parser("note")
    p_note.add_argument("--url", required=True)
    _add_output_options(p_note)
    p_note.set_defaults(func=cmd_note)

    p_user = sub.add_parser("user")
    p_user.add_argument("--url", required=True)
    _add_output_options(p_user)
    p_user.set_defaults(func=cmd_user)

    p_search = sub.add_parser("search")
    p_search.add_argument("--query", required=True)
    p_search.add_argument("--num", type=int, default=10)
    p_search.add_argument("--sort", type=int, default=0)
    p_search.add_argument("--note-type", type=int, default=0)
    p_search.add_argument("--note-time", type=int, default=0)
    p_search.add_argument("--note-range", type=int, default=0)
    p_search.add_argument("--pos-distance", type=int, default=0)
    _add_output_options(p_search)
    p_search.set_defaults(func=cmd_search)

    p_login = sub.add_parser("login")
    login_sub = p_login.add_subparsers(dest="login_cmd", required=True)
    p_login_pc = login_sub.add_parser("pc-qrcode")
    p_login_pc.add_argument("--qr-path", default=os.path.join("datas", "qrcode.png"))
    p_login_pc.add_argument("--headless", action="store_true")
    p_login_pc.add_argument("--poll-interval", type=float, default=1.0)
    # Optional upper bound on polling; omitted means wait indefinitely.
    p_login_pc.add_argument("--timeout", type=float, default=None)
    p_login_pc.set_defaults(func=cmd_login_pc_qrcode)

    return parser


def main(argv=None):
    """Parse *argv* (defaults to ``sys.argv[1:]``) and run the selected command.

    Any ``Exception`` from the command is printed to stderr and the process
    exits with status 2. ``KeyboardInterrupt`` is not an ``Exception``
    subclass, so it propagates to the caller untouched.
    """
    args = build_parser().parse_args(argv)
    try:
        args.func(args)
    except Exception as e:
        print(str(e), file=sys.stderr)
        sys.exit(2)


if __name__ == "__main__":
    main()