import argparse
import asyncio
import os
import sys
import qrcode
from xhs_utils.common_util import init
from xhs_utils.session_manager import SessionManager
from xhs_utils.state_store import StateStore
from xhs_utils.spider import Data_Spider
def _ensure_excel_name(args, default_name):
    """Fill in ``args.excel_name`` when an Excel export lacks a file name.

    The fallback only applies when ``args.save_choice`` is ``"all"`` or
    ``"excel"`` and ``args.excel_name`` is empty. ``args`` is mutated in
    place; nothing is returned.
    """
    wants_excel = args.save_choice in ("all", "excel")
    if not wants_excel:
        return
    if args.excel_name:
        # An explicit name always wins over the default.
        return
    args.excel_name = default_name
def cmd_note(args):
    """Handle the ``note`` sub-command: crawl the single note at ``args.url``.

    ``init()`` provides the initial cookie string and ``base_path``; a
    ``--cookies`` value replaces the cookie string. Cookies can optionally be
    persisted through ``SessionManager``. The summary returned by the spider
    is printed only when it is a ``dict``.
    """
    initial_cookies, base_path = init()
    session = SessionManager(cookies_file=args.cookies_file)
    cookies_str = args.cookies or initial_cookies

    # NOTE(review): indentation was lost in the reviewed copy; --write-env is
    # reconstructed as nested under --save-cookies because it has no cookie
    # guard of its own -- confirm against the original file.
    if args.save_cookies and cookies_str:
        session.save_to_file(cookies_str)
        if args.write_env:
            session.save_to_env_file(cookies_str, env_file=args.env_file)

    state = None
    if args.resume:
        state = StateStore(args.state_file)

    spider = Data_Spider()
    _ensure_excel_name(args, "note")

    # The same proxy URL is used for both schemes.
    proxies = {"http": args.proxy, "https": args.proxy} if args.proxy else None

    summary = spider.spider_some_note(
        [args.url],
        cookies_str,
        base_path,
        args.save_choice,
        args.excel_name,
        proxies=proxies,
        state_store=state,
    )
    if isinstance(summary, dict):
        print(summary)
def _excel_name_from_user_url(url):
    """Return the last non-empty path segment of *url*, or ``"user"``.

    The query string and fragment are removed *before* splitting on ``/`` so
    that a slash inside the query cannot be mistaken for a path separator,
    and a trailing slash does not produce an empty name.
    """
    path = url.split("#", 1)[0].split("?", 1)[0]
    last_segment = path.rstrip("/").rsplit("/", 1)[-1]
    return last_segment or "user"


def cmd_user(args):
    """Handle the ``user`` sub-command: crawl all notes of the user at ``args.url``.

    ``init()`` provides the initial cookie string and ``base_path``; a
    ``--cookies`` value replaces the cookie string. Cookies can optionally be
    persisted through ``SessionManager``. When an Excel export is requested
    without ``--excel-name``, the name defaults to the last path segment of
    the URL. The spider's summary is printed.
    """
    cookies_str, base_path = init()
    sm = SessionManager(cookies_file=args.cookies_file)
    if args.cookies:
        cookies_str = args.cookies

    # NOTE(review): indentation was lost in the reviewed copy; --write-env is
    # reconstructed as nested under --save-cookies because it has no cookie
    # guard of its own -- confirm against the original file.
    if args.save_cookies and cookies_str:
        sm.save_to_file(cookies_str)
        if args.write_env:
            sm.save_to_env_file(cookies_str, env_file=args.env_file)

    state = StateStore(args.state_file) if args.resume else None
    spider = Data_Spider()
    _ensure_excel_name(args, _excel_name_from_user_url(args.url))

    proxies = None
    if args.proxy:
        proxies = {"http": args.proxy, "https": args.proxy}

    summary = spider.spider_user_all_note(
        args.url,
        cookies_str,
        base_path,
        args.save_choice,
        args.excel_name,
        proxies=proxies,
        state_store=state,
    )
    print(summary)
def cmd_search(args):
    """Handle the ``search`` sub-command: crawl notes matching ``args.query``.

    ``init()`` provides the initial cookie string and ``base_path``; a
    ``--cookies`` value replaces the cookie string. Cookies can optionally be
    persisted through ``SessionManager``. The search filters are forwarded to
    the spider unchanged, ``geo`` is always ``None``, and the returned summary
    is printed.
    """
    initial_cookies, base_path = init()
    session = SessionManager(cookies_file=args.cookies_file)
    cookies_str = args.cookies or initial_cookies

    # NOTE(review): indentation was lost in the reviewed copy; --write-env is
    # reconstructed as nested under --save-cookies because it has no cookie
    # guard of its own -- confirm against the original file.
    if args.save_cookies and cookies_str:
        session.save_to_file(cookies_str)
        if args.write_env:
            session.save_to_env_file(cookies_str, env_file=args.env_file)

    state = None
    if args.resume:
        state = StateStore(args.state_file)

    spider = Data_Spider()
    # The query doubles as the default Excel file name.
    _ensure_excel_name(args, args.query)

    proxies = {"http": args.proxy, "https": args.proxy} if args.proxy else None

    summary = spider.spider_some_search_note(
        args.query,
        args.num,
        cookies_str,
        base_path,
        args.save_choice,
        args.sort,
        args.note_type,
        args.note_time,
        args.note_range,
        args.pos_distance,
        geo=None,
        excel_name=args.excel_name,
        proxies=proxies,
        state_store=state,
    )
    print(summary)
async def _pc_login_qrcode(save_path: str | None, headless: bool, poll_interval_s: float):
try:
from apis.xhs_pc_login_apis import XHSLoginApi
except Exception as e:
raise RuntimeError(f"missing_login_dependency: {e}")
login_api = XHSLoginApi()
cookies = await login_api.xhsGenerateInitCookies(headless=headless)
success, msg, qrcode_dict = await login_api.xhsGenerateQRcode(cookies)
if not success:
raise RuntimeError(msg)
verify_url = qrcode_dict["verify_url"]
if not verify_url:
raise RuntimeError("verify_url_empty")
if save_path:
os.makedirs(os.path.dirname(os.path.abspath(save_path)), exist_ok=True)
img = qrcode.make(verify_url)
img.save(save_path)
else:
print(verify_url)
while True:
success, msg, res = await login_api.xhsCheckQRCodeLogin(qrcode_dict["qr_id"], qrcode_dict["code"], qrcode_dict["cookies"])
if success and res.get("cookies_str"):
return res["cookies_str"]
await asyncio.sleep(poll_interval_s)
def cmd_login_pc_qrcode(args):
    """Handle ``login pc-qrcode``: log in via QR code and print the cookies.

    The cookie string returned by the login coroutine is optionally persisted
    through ``SessionManager`` and is always printed to stdout.
    """
    login_coro = _pc_login_qrcode(args.qr_path, args.headless, args.poll_interval)
    cookies_str = asyncio.run(login_coro)

    session = SessionManager(cookies_file=args.cookies_file)
    # NOTE(review): indentation was lost in the reviewed copy; --write-env is
    # reconstructed as nested under --save-cookies, mirroring the crawl
    # commands -- confirm against the original file.
    if args.save_cookies:
        session.save_to_file(cookies_str)
        if args.write_env:
            session.save_to_env_file(cookies_str, env_file=args.env_file)

    print(cookies_str)
# Accepted values for --save-choice, shared by every crawl sub-command.
_SAVE_CHOICES = ("all", "excel", "media", "media-video", "media-image")


def _positive_float(text):
    """argparse ``type`` callable: parse *text* as a finite float > 0."""
    try:
        value = float(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid float value: {text!r}") from None
    # The chained comparison also rejects NaN and infinity.
    if not 0 < value < float("inf"):
        raise argparse.ArgumentTypeError(f"must be a finite number greater than 0: {text!r}")
    return value


def _add_output_options(sub_parser):
    """Add the --save-choice / --excel-name / --resume options to a crawl sub-parser."""
    sub_parser.add_argument("--save-choice", default="all", choices=list(_SAVE_CHOICES))
    sub_parser.add_argument("--excel-name", default="")
    sub_parser.add_argument("--resume", action="store_true")


def build_parser():
    """Build the ``xhs`` argument parser.

    Global options (cookies, proxy, state file) live on the top-level parser
    and therefore must precede the sub-command on the command line. Each
    sub-command stores its handler in ``args.func``.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(prog="xhs", add_help=True)
    parser.add_argument("--cookies", default=None)
    parser.add_argument("--cookies-file", default=None)
    parser.add_argument("--save-cookies", action="store_true")
    parser.add_argument("--write-env", action="store_true")
    parser.add_argument("--env-file", default=".env")
    parser.add_argument("--proxy", default=None)
    parser.add_argument("--state-file", default=os.path.join("datas", "state.json"))

    sub = parser.add_subparsers(dest="cmd", required=True)

    p_note = sub.add_parser("note")
    p_note.add_argument("--url", required=True)
    _add_output_options(p_note)
    p_note.set_defaults(func=cmd_note)

    p_user = sub.add_parser("user")
    p_user.add_argument("--url", required=True)
    _add_output_options(p_user)
    p_user.set_defaults(func=cmd_user)

    p_search = sub.add_parser("search")
    p_search.add_argument("--query", required=True)
    p_search.add_argument("--num", type=int, default=10)
    p_search.add_argument("--sort", type=int, default=0)
    p_search.add_argument("--note-type", type=int, default=0)
    p_search.add_argument("--note-time", type=int, default=0)
    p_search.add_argument("--note-range", type=int, default=0)
    p_search.add_argument("--pos-distance", type=int, default=0)
    _add_output_options(p_search)
    p_search.set_defaults(func=cmd_search)

    p_login = sub.add_parser("login")
    login_sub = p_login.add_subparsers(dest="login_cmd", required=True)
    p_login_pc = login_sub.add_parser("pc-qrcode")
    p_login_pc.add_argument("--qr-path", default=os.path.join("datas", "qrcode.png"))
    p_login_pc.add_argument("--headless", action="store_true")
    # A non-positive interval would turn the login poll into a busy loop.
    p_login_pc.add_argument("--poll-interval", type=_positive_float, default=1.0)
    p_login_pc.set_defaults(func=cmd_login_pc_qrcode)

    return parser
def main(argv=None):
    """Parse *argv* (``sys.argv[1:]`` when ``None``) and run the chosen sub-command.

    Any ``Exception`` raised by the handler is reported on stderr and the
    process exits with status 2.
    """
    args = build_parser().parse_args(argv)
    try:
        args.func(args)
    except Exception as exc:
        # KeyboardInterrupt derives from BaseException, not Exception, so
        # Ctrl-C still propagates without a dedicated re-raise clause.
        # Fall back to the exception class name so an exception with an
        # empty message does not print a blank line.
        print(str(exc) or type(exc).__name__, file=sys.stderr)
        sys.exit(2)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()