File size: 7,129 Bytes
c481f8a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import argparse
import asyncio
import os
import sys

import qrcode

from xhs_utils.common_util import init
from xhs_utils.session_manager import SessionManager
from xhs_utils.state_store import StateStore
from xhs_utils.spider import Data_Spider


def _ensure_excel_name(args, default_name):
    """Fill in ``args.excel_name`` when an Excel export was requested without one.

    ``args`` is mutated in place. Nothing happens unless ``args.save_choice``
    is ``"all"`` or ``"excel"`` and ``args.excel_name`` is falsy.
    """
    if args.save_choice not in ("all", "excel"):
        # No Excel output requested for this save mode; leave the name alone.
        return
    if args.excel_name:
        # An explicit name always wins over the default.
        return
    args.excel_name = default_name


def cmd_note(args):
    """Handle the ``note`` subcommand: crawl the single note at ``args.url``.

    Cookies come from ``init()`` unless ``--cookies`` overrides them; they are
    optionally persisted through ``SessionManager``. The value returned by
    ``Data_Spider.spider_some_note`` is printed only when it is a dict.
    """
    default_cookies, base_path = init()
    session = SessionManager(cookies_file=args.cookies_file)

    # A non-empty --cookies value takes precedence over the one from init().
    cookies_str = args.cookies or default_cookies
    if args.save_cookies and cookies_str:
        session.save_to_file(cookies_str)
        if args.write_env:
            session.save_to_env_file(cookies_str, env_file=args.env_file)

    # A state store is only created when --resume was requested.
    state = None
    if args.resume:
        state = StateStore(args.state_file)

    spider = Data_Spider()
    _ensure_excel_name(args, "note")

    # The same proxy URL is used for both schemes.
    proxies = {"http": args.proxy, "https": args.proxy} if args.proxy else None

    summary = spider.spider_some_note(
        [args.url],
        cookies_str,
        base_path,
        args.save_choice,
        args.excel_name,
        proxies=proxies,
        state_store=state,
    )
    if isinstance(summary, dict):
        print(summary)


def cmd_user(args):
    """Handle the ``user`` subcommand: crawl the notes of the profile at ``args.url``.

    Cookies come from ``init()`` unless ``--cookies`` overrides them; they are
    optionally persisted through ``SessionManager``. The value returned by
    ``Data_Spider.spider_user_all_note`` is printed unconditionally.

    When no ``--excel-name`` is given and Excel output is requested, the
    default name is the last non-empty path segment of the URL, or ``"user"``
    if the URL has no such segment.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlsplit

    cookies_str, base_path = init()
    sm = SessionManager(cookies_file=args.cookies_file)
    if args.cookies:
        cookies_str = args.cookies
    if args.save_cookies and cookies_str:
        sm.save_to_file(cookies_str)
        if args.write_env:
            sm.save_to_env_file(cookies_str, env_file=args.env_file)

    state = StateStore(args.state_file) if args.resume else None
    spider = Data_Spider()

    # Only the URL path is inspected: a "/" inside the query string or the
    # fragment must not be mistaken for a path separator, and a trailing "/"
    # must not yield an empty name.
    url_path = urlsplit(args.url).path.rstrip("/")
    default_name = url_path.rsplit("/", 1)[-1] or "user"
    _ensure_excel_name(args, default_name)

    proxies = None
    if args.proxy:
        proxies = {"http": args.proxy, "https": args.proxy}
    summary = spider.spider_user_all_note(
        args.url,
        cookies_str,
        base_path,
        args.save_choice,
        args.excel_name,
        proxies=proxies,
        state_store=state,
    )
    print(summary)


def cmd_search(args):
    """Handle the ``search`` subcommand: crawl notes matching ``args.query``.

    Cookies come from ``init()`` unless ``--cookies`` overrides them; they are
    optionally persisted through ``SessionManager``. The search filters from
    the command line are forwarded positionally to
    ``Data_Spider.spider_some_search_note`` (``geo`` is always ``None``), and
    the returned value is printed unconditionally.
    """
    default_cookies, base_path = init()
    session = SessionManager(cookies_file=args.cookies_file)

    # A non-empty --cookies value takes precedence over the one from init().
    cookies_str = args.cookies or default_cookies
    if args.save_cookies and cookies_str:
        session.save_to_file(cookies_str)
        if args.write_env:
            session.save_to_env_file(cookies_str, env_file=args.env_file)

    # A state store is only created when --resume was requested.
    state = None
    if args.resume:
        state = StateStore(args.state_file)

    spider = Data_Spider()
    # The query doubles as the default Excel file name.
    _ensure_excel_name(args, args.query)

    proxies = {"http": args.proxy, "https": args.proxy} if args.proxy else None

    # Positional arguments, in the order the spider method receives them.
    positional = (
        args.query,
        args.num,
        cookies_str,
        base_path,
        args.save_choice,
        args.sort,
        args.note_type,
        args.note_time,
        args.note_range,
        args.pos_distance,
    )
    summary = spider.spider_some_search_note(
        *positional,
        geo=None,
        excel_name=args.excel_name,
        proxies=proxies,
        state_store=state,
    )
    print(summary)


async def _pc_login_qrcode(save_path: str | None, headless: bool, poll_interval_s: float):
    try:
        from apis.xhs_pc_login_apis import XHSLoginApi
    except Exception as e:
        raise RuntimeError(f"missing_login_dependency: {e}")

    login_api = XHSLoginApi()
    cookies = await login_api.xhsGenerateInitCookies(headless=headless)
    success, msg, qrcode_dict = await login_api.xhsGenerateQRcode(cookies)
    if not success:
        raise RuntimeError(msg)

    verify_url = qrcode_dict["verify_url"]
    if not verify_url:
        raise RuntimeError("verify_url_empty")

    if save_path:
        os.makedirs(os.path.dirname(os.path.abspath(save_path)), exist_ok=True)
        img = qrcode.make(verify_url)
        img.save(save_path)
    else:
        print(verify_url)

    while True:
        success, msg, res = await login_api.xhsCheckQRCodeLogin(qrcode_dict["qr_id"], qrcode_dict["code"], qrcode_dict["cookies"])
        if success and res.get("cookies_str"):
            return res["cookies_str"]
        await asyncio.sleep(poll_interval_s)


def cmd_login_pc_qrcode(args):
    """Handle ``login pc-qrcode``: run the QR login and print the cookie string.

    With ``--save-cookies`` the cookies are written through ``SessionManager``,
    and additionally to the env file when ``--write-env`` is set.
    """
    login_flow = _pc_login_qrcode(args.qr_path, args.headless, args.poll_interval)
    cookies_str = asyncio.run(login_flow)

    session = SessionManager(cookies_file=args.cookies_file)
    if args.save_cookies:
        session.save_to_file(cookies_str)
        if args.write_env:
            session.save_to_env_file(cookies_str, env_file=args.env_file)

    print(cookies_str)


def build_parser():
    """Build the ``xhs`` argument parser.

    The root parser carries the cookie, proxy and state-file options. The
    subcommands ``note``, ``user``, ``search`` and ``login pc-qrcode`` each
    bind their handler to ``func`` via ``set_defaults``.
    """
    root = argparse.ArgumentParser(prog="xhs", add_help=True)

    # Options declared on the root parser, in declaration order.
    root_options = (
        ("--cookies", {"default": None}),
        ("--cookies-file", {"default": None}),
        ("--save-cookies", {"action": "store_true"}),
        ("--write-env", {"action": "store_true"}),
        ("--env-file", {"default": ".env"}),
        ("--proxy", {"default": None}),
        ("--state-file", {"default": os.path.join("datas", "state.json")}),
    )
    for flag, options in root_options:
        root.add_argument(flag, **options)

    commands = root.add_subparsers(dest="cmd", required=True)

    def add_output_options(command_parser):
        # The three crawl subcommands end with the same output options.
        command_parser.add_argument(
            "--save-choice",
            default="all",
            choices=["all", "excel", "media", "media-video", "media-image"],
        )
        command_parser.add_argument("--excel-name", default="")
        command_parser.add_argument("--resume", action="store_true")

    note_parser = commands.add_parser("note")
    note_parser.add_argument("--url", required=True)
    add_output_options(note_parser)
    note_parser.set_defaults(func=cmd_note)

    user_parser = commands.add_parser("user")
    user_parser.add_argument("--url", required=True)
    add_output_options(user_parser)
    user_parser.set_defaults(func=cmd_user)

    search_parser = commands.add_parser("search")
    search_parser.add_argument("--query", required=True)
    # Integer-valued search options and their defaults.
    int_options = (
        ("--num", 10),
        ("--sort", 0),
        ("--note-type", 0),
        ("--note-time", 0),
        ("--note-range", 0),
        ("--pos-distance", 0),
    )
    for flag, default in int_options:
        search_parser.add_argument(flag, type=int, default=default)
    add_output_options(search_parser)
    search_parser.set_defaults(func=cmd_search)

    # "login" is a command group with its own required sub-subcommand.
    login_parser = commands.add_parser("login")
    login_commands = login_parser.add_subparsers(dest="login_cmd", required=True)
    pc_qrcode_parser = login_commands.add_parser("pc-qrcode")
    pc_qrcode_parser.add_argument("--qr-path", default=os.path.join("datas", "qrcode.png"))
    pc_qrcode_parser.add_argument("--headless", action="store_true")
    pc_qrcode_parser.add_argument("--poll-interval", type=float, default=1.0)
    pc_qrcode_parser.set_defaults(func=cmd_login_pc_qrcode)

    return root


def main(argv=None):
    """Parse ``argv`` and dispatch to the selected subcommand handler.

    Any ``Exception`` raised by the handler is written to stderr and the
    process exits with status 2. ``KeyboardInterrupt`` is not an ``Exception``
    subclass, so it propagates to the caller untouched.
    """
    args = build_parser().parse_args(argv)
    handler = args.func
    try:
        handler(args)
    except Exception as err:
        print(str(err), file=sys.stderr)
        sys.exit(2)


# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()