# XHS / cli.py
# Trae Bot
# Upload Spider_XHS project
# c481f8a
import argparse
import asyncio
import os
import sys
import qrcode
from xhs_utils.common_util import init
from xhs_utils.session_manager import SessionManager
from xhs_utils.state_store import StateStore
from xhs_utils.spider import Data_Spider
def _ensure_excel_name(args, default_name):
    """Fill in ``args.excel_name`` when an Excel export was requested without one.

    Only the ``"all"`` and ``"excel"`` save choices are considered; for any
    other choice ``args`` is left untouched. An already non-empty
    ``args.excel_name`` is never overwritten. Mutates ``args`` in place and
    returns ``None``.
    """
    wants_excel = args.save_choice in ("all", "excel")
    if not wants_excel:
        return
    if not args.excel_name:
        args.excel_name = default_name
def cmd_note(args):
    """Handle the ``note`` subcommand for the single URL in ``args.url``.

    Cookies default to whatever ``init()`` returns and are replaced by
    ``--cookies`` when that option is non-empty. With ``--save-cookies`` and a
    non-empty cookie string they are passed to ``SessionManager.save_to_file``;
    ``--write-env`` (only honoured together with ``--save-cookies``)
    additionally calls ``save_to_env_file``. The value returned by
    ``Data_Spider.spider_some_note`` is printed only when it is a ``dict``.
    """
    default_cookies, base_path = init()
    session = SessionManager(cookies_file=args.cookies_file)

    # A non-empty --cookies value takes precedence over the init() cookies.
    cookies_str = args.cookies or default_cookies
    if cookies_str and args.save_cookies:
        session.save_to_file(cookies_str)
        if args.write_env:
            session.save_to_env_file(cookies_str, env_file=args.env_file)

    # A state store is only created when --resume was given.
    state = None
    if args.resume:
        state = StateStore(args.state_file)

    spider = Data_Spider()
    _ensure_excel_name(args, "note")

    # The same proxy URL is used for both schemes.
    proxies = {"http": args.proxy, "https": args.proxy} if args.proxy else None

    summary = spider.spider_some_note(
        [args.url],
        cookies_str,
        base_path,
        args.save_choice,
        args.excel_name,
        proxies=proxies,
        state_store=state,
    )
    if isinstance(summary, dict):
        print(summary)
def cmd_user(args):
    """Handle the ``user`` subcommand for the profile URL in ``args.url``.

    Cookies default to whatever ``init()`` returns and are replaced by
    ``--cookies`` when that option is non-empty. With ``--save-cookies`` and a
    non-empty cookie string they are passed to ``SessionManager.save_to_file``;
    ``--write-env`` (only honoured together with ``--save-cookies``)
    additionally calls ``save_to_env_file``.

    When an Excel export is requested without ``--excel-name``, the default
    name is the last non-empty path segment of the URL (query string and
    fragment removed), falling back to ``"user"`` if the URL has no such
    segment. The value returned by ``Data_Spider.spider_user_all_note`` is
    printed unconditionally.
    """
    cookies_str, base_path = init()
    sm = SessionManager(cookies_file=args.cookies_file)
    if args.cookies:
        cookies_str = args.cookies
    if args.save_cookies and cookies_str:
        sm.save_to_file(cookies_str)
        if args.write_env:
            sm.save_to_env_file(cookies_str, env_file=args.env_file)
    state = StateStore(args.state_file) if args.resume else None
    spider = Data_Spider()

    # Strip the query/fragment *before* splitting on "/" so a slash inside the
    # query string cannot be mistaken for a path separator, and drop trailing
    # slashes so ".../profile/<id>/" still yields "<id>" rather than "".
    url_path = args.url.split("?", 1)[0].split("#", 1)[0].rstrip("/")
    default_name = url_path.rsplit("/", 1)[-1]
    _ensure_excel_name(args, default_name or "user")

    proxies = None
    if args.proxy:
        # The same proxy URL is used for both schemes.
        proxies = {"http": args.proxy, "https": args.proxy}
    summary = spider.spider_user_all_note(
        args.url,
        cookies_str,
        base_path,
        args.save_choice,
        args.excel_name,
        proxies=proxies,
        state_store=state,
    )
    print(summary)
def cmd_search(args):
    """Handle the ``search`` subcommand for the keyword in ``args.query``.

    Cookies default to whatever ``init()`` returns and are replaced by
    ``--cookies`` when that option is non-empty. With ``--save-cookies`` and a
    non-empty cookie string they are passed to ``SessionManager.save_to_file``;
    ``--write-env`` (only honoured together with ``--save-cookies``)
    additionally calls ``save_to_env_file``.

    The query string itself is the default Excel name. ``geo`` is always
    passed as ``None``. The value returned by
    ``Data_Spider.spider_some_search_note`` is printed unconditionally.
    """
    default_cookies, base_path = init()
    session = SessionManager(cookies_file=args.cookies_file)

    # A non-empty --cookies value takes precedence over the init() cookies.
    cookies_str = args.cookies or default_cookies
    if cookies_str and args.save_cookies:
        session.save_to_file(cookies_str)
        if args.write_env:
            session.save_to_env_file(cookies_str, env_file=args.env_file)

    # A state store is only created when --resume was given.
    state = None
    if args.resume:
        state = StateStore(args.state_file)

    spider = Data_Spider()
    _ensure_excel_name(args, args.query)

    # The same proxy URL is used for both schemes.
    proxies = {"http": args.proxy, "https": args.proxy} if args.proxy else None

    # Positional filter arguments, in the order the spider call expects them.
    search_filters = (
        args.sort,
        args.note_type,
        args.note_time,
        args.note_range,
        args.pos_distance,
    )
    summary = spider.spider_some_search_note(
        args.query,
        args.num,
        cookies_str,
        base_path,
        args.save_choice,
        *search_filters,
        geo=None,
        excel_name=args.excel_name,
        proxies=proxies,
        state_store=state,
    )
    print(summary)
async def _pc_login_qrcode(save_path: str | None, headless: bool, poll_interval_s: float):
try:
from apis.xhs_pc_login_apis import XHSLoginApi
except Exception as e:
raise RuntimeError(f"missing_login_dependency: {e}")
login_api = XHSLoginApi()
cookies = await login_api.xhsGenerateInitCookies(headless=headless)
success, msg, qrcode_dict = await login_api.xhsGenerateQRcode(cookies)
if not success:
raise RuntimeError(msg)
verify_url = qrcode_dict["verify_url"]
if not verify_url:
raise RuntimeError("verify_url_empty")
if save_path:
os.makedirs(os.path.dirname(os.path.abspath(save_path)), exist_ok=True)
img = qrcode.make(verify_url)
img.save(save_path)
else:
print(verify_url)
while True:
success, msg, res = await login_api.xhsCheckQRCodeLogin(qrcode_dict["qr_id"], qrcode_dict["code"], qrcode_dict["cookies"])
if success and res.get("cookies_str"):
return res["cookies_str"]
await asyncio.sleep(poll_interval_s)
def cmd_login_pc_qrcode(args):
    """Handle ``login pc-qrcode``: run the QR login flow and print the cookies.

    The coroutine ``_pc_login_qrcode`` is driven to completion with
    ``asyncio.run``. With ``--save-cookies`` the resulting cookie string is
    passed to ``SessionManager.save_to_file``; ``--write-env`` (only honoured
    together with ``--save-cookies``) additionally calls ``save_to_env_file``.
    The cookie string is always printed to stdout.
    """
    login_flow = _pc_login_qrcode(args.qr_path, args.headless, args.poll_interval)
    cookies_str = asyncio.run(login_flow)

    session = SessionManager(cookies_file=args.cookies_file)
    if args.save_cookies:
        session.save_to_file(cookies_str)
        if args.write_env:
            session.save_to_env_file(cookies_str, env_file=args.env_file)

    print(cookies_str)
def build_parser():
    """Build and return the ``xhs`` command-line argument parser.

    Global options (cookies, env file, proxy, state file) are defined on the
    top-level parser. The required subcommands are ``note``, ``user``,
    ``search`` and ``login`` (which itself requires the ``pc-qrcode``
    sub-subcommand). Each leaf parser stores its handler in ``func``.
    """
    save_choices = ["all", "excel", "media", "media-video", "media-image"]

    def add_output_options(sub_parser, handler):
        # Options shared by every crawl subcommand, added after the
        # command-specific ones so the option order stays the same.
        sub_parser.add_argument("--save-choice", default="all", choices=save_choices)
        sub_parser.add_argument("--excel-name", default="")
        sub_parser.add_argument("--resume", action="store_true")
        sub_parser.set_defaults(func=handler)

    parser = argparse.ArgumentParser(prog="xhs", add_help=True)
    global_options = (
        ("--cookies", {"default": None}),
        ("--cookies-file", {"default": None}),
        ("--save-cookies", {"action": "store_true"}),
        ("--write-env", {"action": "store_true"}),
        ("--env-file", {"default": ".env"}),
        ("--proxy", {"default": None}),
        ("--state-file", {"default": os.path.join("datas", "state.json")}),
    )
    for flag, options in global_options:
        parser.add_argument(flag, **options)

    commands = parser.add_subparsers(dest="cmd", required=True)

    note_parser = commands.add_parser("note")
    note_parser.add_argument("--url", required=True)
    add_output_options(note_parser, cmd_note)

    user_parser = commands.add_parser("user")
    user_parser.add_argument("--url", required=True)
    add_output_options(user_parser, cmd_user)

    search_parser = commands.add_parser("search")
    search_parser.add_argument("--query", required=True)
    search_parser.add_argument("--num", type=int, default=10)
    # Integer-coded search filters, all defaulting to 0.
    for filter_flag in ("--sort", "--note-type", "--note-time", "--note-range", "--pos-distance"):
        search_parser.add_argument(filter_flag, type=int, default=0)
    add_output_options(search_parser, cmd_search)

    login_parser = commands.add_parser("login")
    login_commands = login_parser.add_subparsers(dest="login_cmd", required=True)
    pc_qrcode_parser = login_commands.add_parser("pc-qrcode")
    pc_qrcode_parser.add_argument("--qr-path", default=os.path.join("datas", "qrcode.png"))
    pc_qrcode_parser.add_argument("--headless", action="store_true")
    pc_qrcode_parser.add_argument("--poll-interval", type=float, default=1.0)
    pc_qrcode_parser.set_defaults(func=cmd_login_pc_qrcode)

    return parser
def main(argv=None):
    """Parse ``argv`` and dispatch to the selected subcommand handler.

    ``argv`` is handed to ``ArgumentParser.parse_args`` (``None`` means the
    process arguments). Any ``Exception`` raised by the handler is reported as
    a single line on stderr and the process exits with status 2.
    ``KeyboardInterrupt`` is not an ``Exception`` subclass, so it propagates
    to the caller untouched.
    """
    args = build_parser().parse_args(argv)
    try:
        args.func(args)
    except Exception as exc:
        print(str(exc), file=sys.stderr)
        sys.exit(2)
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()