| import argparse |
| import asyncio |
| import os |
| import sys |
|
|
| import qrcode |
|
|
| from xhs_utils.common_util import init |
| from xhs_utils.session_manager import SessionManager |
| from xhs_utils.state_store import StateStore |
| from xhs_utils.spider import Data_Spider |
|
|
|
|
def _ensure_excel_name(args, default_name):
    """Give ``args.excel_name`` a fallback value when an Excel export is requested.

    The fallback is applied only when ``args.save_choice`` is ``"all"`` or
    ``"excel"`` and ``args.excel_name`` is empty; otherwise *args* is left
    untouched. Mutates *args* in place and returns ``None``.
    """
    wants_excel = args.save_choice in ("all", "excel")
    if not wants_excel or args.excel_name:
        return
    args.excel_name = default_name
|
|
|
|
def cmd_note(args):
    """Handle the ``note`` subcommand: crawl the single note at ``args.url``.

    Cookies start from what ``init()`` returns and are replaced by
    ``--cookies`` when that option is given. Depending on the flags they are
    then handed to ``SessionManager.save_to_file`` / ``save_to_env_file``
    before the spider runs. The spider's summary is printed only when it is a
    dict.
    """
    default_cookies, base_path = init()
    session = SessionManager(cookies_file=args.cookies_file)

    # An explicit --cookies value wins over whatever init() returned.
    cookies_str = args.cookies if args.cookies else default_cookies
    if args.save_cookies and cookies_str:
        session.save_to_file(cookies_str)
    if args.write_env:
        session.save_to_env_file(cookies_str, env_file=args.env_file)

    state = None
    if args.resume:
        state = StateStore(args.state_file)
    spider = Data_Spider()
    _ensure_excel_name(args, "note")
    proxies = {"http": args.proxy, "https": args.proxy} if args.proxy else None

    summary = spider.spider_some_note(
        [args.url],
        cookies_str,
        base_path,
        args.save_choice,
        args.excel_name,
        proxies=proxies,
        state_store=state,
    )
    if not isinstance(summary, dict):
        return
    print(summary)
|
|
|
|
def _user_id_from_url(url):
    """Return the last non-empty path segment of *url*.

    The query string and fragment are removed first so that a ``/`` inside
    them cannot be mistaken for a path separator, and trailing slashes are
    dropped so ``.../profile/abc/`` still yields ``"abc"``.
    """
    path = url.split("#", 1)[0].split("?", 1)[0]
    return path.rstrip("/").rsplit("/", 1)[-1]


def cmd_user(args):
    """Handle the ``user`` subcommand: crawl every note of the user at ``args.url``.

    Cookies start from what ``init()`` returns and are replaced by
    ``--cookies`` when that option is given. Depending on the flags they are
    then handed to ``SessionManager.save_to_file`` / ``save_to_env_file``.
    When an Excel export is requested without ``--excel-name``, the last path
    segment of the URL is used as the file name. The spider's summary is
    printed unconditionally.
    """
    cookies_str, base_path = init()
    sm = SessionManager(cookies_file=args.cookies_file)
    if args.cookies:
        cookies_str = args.cookies
    if args.save_cookies and cookies_str:
        sm.save_to_file(cookies_str)
    if args.write_env:
        sm.save_to_env_file(cookies_str, env_file=args.env_file)

    state = StateStore(args.state_file) if args.resume else None
    spider = Data_Spider()
    # Derive the default name from the URL path only; a trailing slash or a
    # "/" inside the query string must not produce an empty or wrong name.
    _ensure_excel_name(args, _user_id_from_url(args.url))
    proxies = None
    if args.proxy:
        proxies = {"http": args.proxy, "https": args.proxy}
    summary = spider.spider_user_all_note(
        args.url,
        cookies_str,
        base_path,
        args.save_choice,
        args.excel_name,
        proxies=proxies,
        state_store=state,
    )
    print(summary)
|
|
|
|
def cmd_search(args):
    """Handle the ``search`` subcommand: crawl notes matching ``args.query``.

    Cookies start from what ``init()`` returns and are replaced by
    ``--cookies`` when that option is given. Depending on the flags they are
    then handed to ``SessionManager.save_to_file`` / ``save_to_env_file``.
    When an Excel export is requested without ``--excel-name``, the query text
    is used as the file name. The spider's summary is printed unconditionally.
    """
    loaded_cookies, base_path = init()
    session = SessionManager(cookies_file=args.cookies_file)

    # An explicit --cookies value wins over whatever init() returned.
    cookies_str = args.cookies or loaded_cookies
    if cookies_str and args.save_cookies:
        session.save_to_file(cookies_str)
    if args.write_env:
        session.save_to_env_file(cookies_str, env_file=args.env_file)

    state = StateStore(args.state_file) if args.resume else None
    spider = Data_Spider()
    _ensure_excel_name(args, args.query)
    proxies = dict.fromkeys(("http", "https"), args.proxy) if args.proxy else None

    # Passed positionally, in the order the spider method receives them.
    search_filters = (
        args.sort,
        args.note_type,
        args.note_time,
        args.note_range,
        args.pos_distance,
    )
    summary = spider.spider_some_search_note(
        args.query,
        args.num,
        cookies_str,
        base_path,
        args.save_choice,
        *search_filters,
        geo=None,
        excel_name=args.excel_name,
        proxies=proxies,
        state_store=state,
    )
    print(summary)
|
|
|
|
| async def _pc_login_qrcode(save_path: str | None, headless: bool, poll_interval_s: float): |
| try: |
| from apis.xhs_pc_login_apis import XHSLoginApi |
| except Exception as e: |
| raise RuntimeError(f"missing_login_dependency: {e}") |
|
|
| login_api = XHSLoginApi() |
| cookies = await login_api.xhsGenerateInitCookies(headless=headless) |
| success, msg, qrcode_dict = await login_api.xhsGenerateQRcode(cookies) |
| if not success: |
| raise RuntimeError(msg) |
|
|
| verify_url = qrcode_dict["verify_url"] |
| if not verify_url: |
| raise RuntimeError("verify_url_empty") |
|
|
| if save_path: |
| os.makedirs(os.path.dirname(os.path.abspath(save_path)), exist_ok=True) |
| img = qrcode.make(verify_url) |
| img.save(save_path) |
| else: |
| print(verify_url) |
|
|
| while True: |
| success, msg, res = await login_api.xhsCheckQRCodeLogin(qrcode_dict["qr_id"], qrcode_dict["code"], qrcode_dict["cookies"]) |
| if success and res.get("cookies_str"): |
| return res["cookies_str"] |
| await asyncio.sleep(poll_interval_s) |
|
|
|
|
def cmd_login_pc_qrcode(args):
    """Handle ``login pc-qrcode``: run the QR-code login and print the cookies.

    The cookie string returned by ``_pc_login_qrcode`` is handed to
    ``SessionManager.save_to_file`` when ``--save-cookies`` is set and to
    ``save_to_env_file`` when ``--write-env`` is set, then printed to stdout.
    """
    login_flow = _pc_login_qrcode(args.qr_path, args.headless, args.poll_interval)
    cookies_str = asyncio.run(login_flow)

    session = SessionManager(cookies_file=args.cookies_file)
    if args.save_cookies:
        session.save_to_file(cookies_str)
    if args.write_env:
        session.save_to_env_file(cookies_str, env_file=args.env_file)

    print(cookies_str)
|
|
|
|
def build_parser():
    """Build the ``xhs`` command-line parser.

    Global options (cookies, proxy, state file) live on the root parser. The
    ``note``, ``user``, ``search`` and ``login pc-qrcode`` subcommands each
    register their handler under the ``func`` default.
    """
    save_choices = ["all", "excel", "media", "media-video", "media-image"]

    def add_output_options(subparser):
        # Options shared by every crawling subcommand, added last in each.
        subparser.add_argument("--save-choice", default="all", choices=save_choices)
        subparser.add_argument("--excel-name", default="")
        subparser.add_argument("--resume", action="store_true")

    root = argparse.ArgumentParser(prog="xhs", add_help=True)
    root.add_argument("--cookies", default=None)
    root.add_argument("--cookies-file", default=None)
    root.add_argument("--save-cookies", action="store_true")
    root.add_argument("--write-env", action="store_true")
    root.add_argument("--env-file", default=os.path.join(".env"))
    root.add_argument("--proxy", default=None)
    root.add_argument("--state-file", default=os.path.join("datas", "state.json"))

    commands = root.add_subparsers(dest="cmd", required=True)

    note_parser = commands.add_parser("note")
    note_parser.add_argument("--url", required=True)
    add_output_options(note_parser)
    note_parser.set_defaults(func=cmd_note)

    user_parser = commands.add_parser("user")
    user_parser.add_argument("--url", required=True)
    add_output_options(user_parser)
    user_parser.set_defaults(func=cmd_user)

    search_parser = commands.add_parser("search")
    search_parser.add_argument("--query", required=True)
    search_parser.add_argument("--num", type=int, default=10)
    # Integer search filters that all default to 0.
    for flag in ("--sort", "--note-type", "--note-time", "--note-range", "--pos-distance"):
        search_parser.add_argument(flag, type=int, default=0)
    add_output_options(search_parser)
    search_parser.set_defaults(func=cmd_search)

    login_parser = commands.add_parser("login")
    login_commands = login_parser.add_subparsers(dest="login_cmd", required=True)
    pc_qrcode_parser = login_commands.add_parser("pc-qrcode")
    pc_qrcode_parser.add_argument("--qr-path", default=os.path.join("datas", "qrcode.png"))
    pc_qrcode_parser.add_argument("--headless", action="store_true")
    pc_qrcode_parser.add_argument("--poll-interval", type=float, default=1.0)
    pc_qrcode_parser.set_defaults(func=cmd_login_pc_qrcode)

    return root
|
|
|
|
def main(argv=None):
    """Parse *argv* and dispatch to the selected subcommand handler.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv[1:]``.

    Any ``Exception`` raised by the handler is reported on stderr and the
    process exits with status 2. ``KeyboardInterrupt`` is not an ``Exception``
    subclass, so Ctrl-C propagates to the caller untouched.
    """
    args = build_parser().parse_args(argv)
    try:
        args.func(args)
    except Exception as e:
        # Some exceptions carry no message (e.g. ``TimeoutError()``); fall
        # back to the class name so the user never gets a blank diagnostic.
        print(str(e) or type(e).__name__, file=sys.stderr)
        sys.exit(2)
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|