Adarshu07 commited on
Commit
925e697
Β·
verified Β·
1 Parent(s): a99e3b0

Create gallery_scraper.py

Browse files
Files changed (1) hide show
  1. gallery_scraper.py +234 -0
gallery_scraper.py ADDED
@@ -0,0 +1,234 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import time
3
+ from html import unescape
4
+ from pathlib import Path
5
+ from concurrent.futures import ThreadPoolExecutor, as_completed
6
+ from typing import Optional, Union
7
+
8
+ import cloudscraper
9
+ from bs4 import BeautifulSoup
10
+
11
+
12
# Public gallery feed endpoint; pagination is expressed through a `skip`
# offset that advances in steps of `_PER_PAGE` items.
_GALLERY_URL = "https://image-generation.perchance.org/gallery"
_PER_PAGE = 200

# Browser-like request headers. The site sits behind Cloudflare (hence
# cloudscraper), so a realistic User-Agent/Referer/Origin trio is sent
# with every request.
_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/145.0.0.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Referer": "https://image-generation.perchance.org/",
    "Origin": "https://image-generation.perchance.org",
}

# Accepted values for the matching GalleryScraper constructor arguments;
# anything else raises ValueError in __init__.
_VALID_SORT = ("recent", "trending", "top")
_VALID_TIME = ("all-time", "1-month")
_VALID_FILTER = ("none", "pg13")
29
+
30
+
31
class GalleryScraper:
    """
    Scraper for Perchance AI Gallery.

    All work happens eagerly in ``__init__``: the requested number of
    gallery pages is downloaded (optionally with a thread pool), each
    page is parsed for image cards, and the combined items are stored on
    ``self.data`` as a list of dicts. Instances are also sized, indexable
    and iterable over those items.

    Example:
        scraper = GalleryScraper(pages=3, sort="top")
        print(scraper.data)
    """

    def __init__(
        self,
        pages: int = 1,
        sort: str = "top",
        time_range: str = "all-time",
        content_filter: str = "none",
        concurrency: int = 1,
        timeout: int = 30,
        save: Union[bool, str] = False,
    ):
        """
        Fetch and parse the gallery immediately.

        Args:
            pages: Number of ``_PER_PAGE``-item pages to fetch (>= 1).
            sort: Sort order; one of ``_VALID_SORT``.
            time_range: Time window; one of ``_VALID_TIME``.
            content_filter: Content filter; one of ``_VALID_FILTER``.
            concurrency: Fetch worker threads; values < 1 are clamped to 1.
            timeout: Per-request timeout in seconds.
            save: ``True`` saves results to ``gallery_data.json``; a string
                is used as the output path; falsy skips saving.

        Raises:
            ValueError: If sort/time_range/content_filter/pages is invalid.
        """
        if sort not in _VALID_SORT:
            raise ValueError(f"sort must be one of {_VALID_SORT}, got '{sort}'")
        if time_range not in _VALID_TIME:
            raise ValueError(f"time_range must be one of {_VALID_TIME}, got '{time_range}'")
        if content_filter not in _VALID_FILTER:
            raise ValueError(f"content_filter must be one of {_VALID_FILTER}, got '{content_filter}'")
        if pages < 1:
            raise ValueError("pages must be >= 1")

        self.pages = pages
        self.sort = sort
        self.time_range = time_range
        self.content_filter = content_filter
        self.concurrency = max(1, concurrency)
        self.timeout = timeout

        self.data: list[dict] = []   # parsed gallery items, in page order
        self.total: int = 0          # len(self.data), kept for __len__
        self.elapsed: float = 0.0    # wall-clock seconds for fetch + parse

        self._log(
            f"pages={pages} concurrency={self.concurrency} "
            f"sort={sort} time={time_range} filter={content_filter}"
        )
        self._log("=" * 60)

        start = time.time()
        # NOTE(review): one session is shared across worker threads below;
        # requests.Session is not formally thread-safe — confirm acceptable
        # for the concurrency levels used in practice.
        scraper = cloudscraper.create_scraper()

        raw_pages = self._fetch_all(scraper)
        self.data = self._parse_all(raw_pages)
        self.total = len(self.data)

        self.elapsed = time.time() - start
        self._log("=" * 60)
        self._log(f"Done | {self.total} items | {self.elapsed:.2f}s")

        if save:
            self._save(save)

    def _build_params(self, page: int) -> dict:
        """Return the query parameters for the given 0-based page index."""
        skip = page * _PER_PAGE
        params = {
            "sort": self.sort,
            "timeRange": self.time_range,
            "hideIfScoreIsBelow": "-1",
            "contentFilter": self.content_filter,
            "subChannel": "public",
            "channel": "ai-text-to-image-generator",
        }
        # The first page omits `skip` entirely, mirroring the site's own URLs.
        if skip > 0:
            params["skip"] = skip
        return params

    def _fetch_one(self, scraper: "cloudscraper.CloudScraper", page: int) -> tuple[int, str]:
        """Fetch one gallery page.

        Returns:
            ``(page, html)`` — ``html`` is ``""`` on any network error or
            non-200 response, so failures degrade to an empty page instead
            of aborting the whole run.
        """
        skip = page * _PER_PAGE
        self._log(f" [→] Fetching page {page + 1} (skip={skip}) ...")
        t = time.time()

        try:
            resp = scraper.get(
                _GALLERY_URL,
                params=self._build_params(page),
                headers=_HEADERS,
                timeout=self.timeout,
            )
        except Exception as exc:
            # cloudscraper can raise Cloudflare-challenge errors on top of
            # the usual requests exceptions; treat them all as a failed page.
            self._log(f" [✗] Page {page + 1} error: {exc} ({time.time() - t:.2f}s)")
            return (page, "")

        dt = time.time() - t
        if resp.status_code != 200:
            self._log(f" [✗] Page {page + 1} HTTP {resp.status_code} ({dt:.2f}s)")
            return (page, "")

        self._log(f" [✓] Page {page + 1} OK — {len(resp.text):,} chars ({dt:.2f}s)")
        return (page, resp.text)

    def _fetch_all(self, scraper: "cloudscraper.CloudScraper") -> dict[int, str]:
        """Fetch all configured pages; map 0-based page index -> raw HTML."""
        results: dict[int, str] = {}

        # Sequential fast path: no thread-pool overhead for the default case.
        if self.concurrency == 1:
            for pg in range(self.pages):
                page, html = self._fetch_one(scraper, pg)
                results[page] = html
            return results

        with ThreadPoolExecutor(max_workers=self.concurrency) as pool:
            futures = {
                pool.submit(self._fetch_one, scraper, pg): pg
                for pg in range(self.pages)
            }
            # Completion order is arbitrary; the page index travels with the
            # result so _parse_all can restore page order later.
            for future in as_completed(futures):
                pg, html = future.result()
                results[pg] = html

        return results

    @staticmethod
    def _clean(value: Optional[str]) -> str:
        """Unescape HTML entities and normalize line endings, then strip.

        CRLF pairs are collapsed to a single LF *before* lone carriage
        returns are converted; the previous single replace mapped CR to LF
        unconditionally, turning each CRLF into a doubled blank line.
        """
        if value is None:
            return ""
        text = unescape(str(value))
        return text.replace("\r\n", "\n").replace("\r", "\n").strip()

    def _parse_page(self, html: str) -> list[dict]:
        """Parse one page of gallery HTML into a list of item dicts."""
        if not html:
            return []

        soup = BeautifulSoup(html, "html.parser")
        items: list[dict] = []

        for card in soup.select(".imageCtn"):
            # The interesting metadata lives in data-* attributes on the card.
            prompt = self._clean(card.get("data-prompt"))
            negative_prompt = self._clean(card.get("data-negative-prompt"))
            guidance_scale = self._clean(card.get("data-guidance-scale"))
            seed = self._clean(card.get("data-seed"))
            is_nsfw = self._clean(card.get("data-is-nsfw")).lower() == "true"
            title_attr = self._clean(card.get("data-title"))

            img_tag = card.select_one(".imageWrapperInner img.image")
            image_url = img_tag.get("src", "") if img_tag else ""

            title_el = card.select_one(".image-title")
            visible_title = self._clean(title_el.get_text(" ", strip=True)) if title_el else ""

            item = {
                "no": "",  # replaced with a global 1-based index in _parse_all
                "image_url": image_url,
                "title": title_attr or visible_title,
                "prompt": prompt,
                "guidance_scale": guidance_scale,
                "seed": seed,
                "nsfw": is_nsfw,
            }

            # Keep items compact: the key is present only when non-empty.
            if negative_prompt:
                item["negative_prompt"] = negative_prompt

            items.append(item)

        return items

    def _parse_all(self, raw_pages: dict[int, str]) -> list[dict]:
        """Parse every fetched page in page order and number the items."""
        all_items: list[dict] = []

        for pg in sorted(raw_pages.keys()):
            parsed = self._parse_page(raw_pages[pg])
            self._log(f" [parse] Page {pg + 1} → {len(parsed)} items")
            all_items.extend(parsed)

        # Assign a single 1-based running index across all pages.
        for idx, item in enumerate(all_items, start=1):
            item["no"] = idx

        return all_items

    def _save(self, save: Union[bool, str]) -> None:
        """Write ``self.data`` as JSON to *save* (or the default filename)."""
        out = save if isinstance(save, str) else "gallery_data.json"
        Path(out).parent.mkdir(parents=True, exist_ok=True)

        with open(out, "w", encoding="utf-8") as fp:
            json.dump(self.data, fp, ensure_ascii=False, indent=2)

        self._log(f"Saved → {out}")

    @staticmethod
    def _log(msg: str) -> None:
        """Single choke point for progress output (currently just print)."""
        print(msg)

    def __len__(self) -> int:
        return self.total

    def __getitem__(self, index):
        return self.data[index]

    def __iter__(self):
        return iter(self.data)

    def __repr__(self) -> str:
        return (
            f"GalleryScraper("
            f"total={self.total}, "
            f"pages={self.pages}, "
            f"sort='{self.sort}', "
            f"elapsed={self.elapsed:.2f}s)"
        )