# _dash_dev / gallery_scraper.py
# Last change: "Update gallery_scraper.py" (commit 70b2a49, verified) by Adarshu07
import json
import time
from html import unescape
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional, Union
import cloudscraper
from bs4 import BeautifulSoup
# Server-rendered HTML endpoint for the public gallery feed.
_GALLERY_URL = "https://image-generation.perchance.org/gallery"
# Items returned per gallery page; the `skip` offset is a multiple of this.
_PER_PAGE = 200
# Browser-like request headers sent with every fetch.
# NOTE(review): presumably these help pass Cloudflare's checks alongside
# cloudscraper — confirm before trimming them.
_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/145.0.0.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Referer": "https://image-generation.perchance.org/",
    "Origin": "https://image-generation.perchance.org",
}
# Accepted values for the matching GalleryScraper constructor arguments.
_VALID_SORT = ("recent", "trending", "top")
_VALID_TIME = ("all-time", "1-month")
_VALID_FILTER = ("none", "pg13")
class GalleryScraper:
    """
    Perchance AI Gallery scraper.

    Fetches one or more pages of the public image gallery, parses each
    image card, and stores the results in ``self.data``. All fetching and
    parsing happens inside ``__init__``; the finished object behaves like
    a read-only sequence of item dicts.

    Example:
        result = GalleryScraper(start_page=1, pages=3, sort="top")
        print(result.data)
    """

    def __init__(
        self,
        start_page: int = 1,
        pages: int = 1,
        sort: str = "top",
        time_range: str = "all-time",
        content_filter: str = "none",
        concurrency: int = 1,
        timeout: int = 30,
        save: Union[bool, str] = False,
    ):
        """
        Validate options, then fetch and parse the requested pages.

        Args:
            start_page: 1-based first gallery page to fetch.
            pages: number of consecutive pages to fetch.
            sort: one of ``_VALID_SORT``.
            time_range: one of ``_VALID_TIME``.
            content_filter: one of ``_VALID_FILTER``.
            concurrency: worker threads for fetching (clamped to >= 1).
            timeout: per-request timeout in seconds.
            save: falsy = don't save; ``True`` = save to the default JSON
                path; a string = save to that path.

        Raises:
            ValueError: if any option is outside its accepted values.
        """
        if start_page < 1:
            raise ValueError("start_page must be >= 1")
        if pages < 1:
            raise ValueError("pages must be >= 1")
        if sort not in _VALID_SORT:
            raise ValueError(f"sort must be one of {_VALID_SORT}, got '{sort}'")
        if time_range not in _VALID_TIME:
            raise ValueError(f"time_range must be one of {_VALID_TIME}, got '{time_range}'")
        if content_filter not in _VALID_FILTER:
            raise ValueError(f"content_filter must be one of {_VALID_FILTER}, got '{content_filter}'")
        if concurrency < 1:
            concurrency = 1  # silently clamp rather than raise

        self.start_page = start_page
        self.pages = pages
        self.sort = sort
        self.time_range = time_range
        self.content_filter = content_filter
        self.concurrency = concurrency
        self.timeout = timeout
        self.data: list[dict] = []  # parsed gallery items, in page order
        self.total: int = 0         # len(self.data)
        self.elapsed: float = 0.0   # wall-clock seconds for fetch + parse

        self._log(
            f"start_page={start_page} pages={pages} concurrency={concurrency} "
            f"sort={sort} time={time_range} filter={content_filter}"
        )
        self._log("=" * 60)
        started = time.time()
        scraper = cloudscraper.create_scraper()
        raw_pages = self._fetch_all(scraper)
        self.data = self._parse_all(raw_pages)
        self.total = len(self.data)
        self.elapsed = time.time() - started
        self._log("=" * 60)
        self._log(f"Done | {self.total} items | {self.elapsed:.2f}s")
        if save:
            self._save(save)

    def _build_params(self, page_index: int) -> dict:
        """Build the query-string params for 0-based ``page_index``.

        ``skip`` is only included when non-zero (the server's default is 0).
        """
        skip = page_index * _PER_PAGE
        params = {
            "sort": self.sort,
            "timeRange": self.time_range,
            "hideIfScoreIsBelow": "-1",
            "contentFilter": self.content_filter,
            "subChannel": "public",
            "channel": "ai-text-to-image-generator",
        }
        if skip > 0:
            params["skip"] = skip
        return params

    def _fetch_one(
        self,
        scraper: "cloudscraper.CloudScraper",
        page_index: int,
    ) -> tuple[int, str]:
        """Fetch one gallery page.

        Returns:
            ``(page_index, html)`` — ``html`` is ``""`` on any failure so
            callers can treat errors as an empty page (best-effort).
        """
        actual_page = self.start_page + page_index
        params = self._build_params(actual_page - 1)
        # Derive the logged skip from the request params themselves so the
        # log can never disagree with what is actually sent.
        skip = params.get("skip", 0)
        self._log(f" [→] Fetching page {actual_page} (skip={skip}) ...")
        t = time.time()
        try:
            resp = scraper.get(
                _GALLERY_URL,
                params=params,
                headers=_HEADERS,
                timeout=self.timeout,
            )
        except Exception as exc:  # network/Cloudflare errors: log and skip the page
            self._log(f" [✗] Page {actual_page} error: {exc} ({time.time() - t:.2f}s)")
            return (page_index, "")
        dt = time.time() - t
        if resp.status_code != 200:
            self._log(f" [✗] Page {actual_page} HTTP {resp.status_code} ({dt:.2f}s)")
            return (page_index, "")
        self._log(f" [✓] Page {actual_page} OK — {len(resp.text):,} chars ({dt:.2f}s)")
        return (page_index, resp.text)

    def _fetch_all(self, scraper: "cloudscraper.CloudScraper") -> dict[int, str]:
        """Fetch every requested page; returns {page_index: html}.

        Sequential when ``concurrency == 1``, otherwise a thread pool.
        """
        results: dict[int, str] = {}
        if self.concurrency == 1:
            for page_index in range(self.pages):
                idx, html = self._fetch_one(scraper, page_index)
                results[idx] = html
            return results
        # NOTE(review): one cloudscraper session is shared across worker
        # threads; requests.Session is not documented as thread-safe —
        # confirm before raising concurrency in production.
        with ThreadPoolExecutor(max_workers=self.concurrency) as pool:
            futures = {
                pool.submit(self._fetch_one, scraper, page_index): page_index
                for page_index in range(self.pages)
            }
            for future in as_completed(futures):
                idx, html = future.result()
                results[idx] = html
        return results

    @staticmethod
    def _clean(value: Optional[str]) -> str:
        """Unescape HTML entities, normalize CR to LF, and strip whitespace."""
        if value is None:
            return ""
        return unescape(str(value)).replace("\r", "\n").strip()

    def _parse_page(self, html: str) -> list[dict]:
        """Parse one page of gallery HTML into a list of item dicts."""
        if not html:
            return []
        soup = BeautifulSoup(html, "html.parser")
        items: list[dict] = []
        for card in soup.select(".imageCtn"):
            prompt = self._clean(card.get("data-prompt"))
            negative_prompt = self._clean(card.get("data-negative-prompt"))
            guidance_scale = self._clean(card.get("data-guidance-scale"))
            seed = self._clean(card.get("data-seed"))
            is_nsfw = self._clean(card.get("data-is-nsfw")).lower() == "true"
            title_attr = self._clean(card.get("data-title"))
            img_tag = card.select_one(".imageWrapperInner img.image")
            image_url = img_tag.get("src", "") if img_tag else ""
            title_el = card.select_one(".image-title")
            visible_title = self._clean(title_el.get_text(" ", strip=True)) if title_el else ""
            item = {
                # Placeholder; _parse_all numbers items globally. Use an int
                # so the field's type is consistent before and after.
                "no": 0,
                "image_url": image_url,
                "title": title_attr or visible_title,  # attribute wins over visible text
                "prompt": prompt,
                "guidance_scale": guidance_scale,
                "seed": seed,
                "nsfw": is_nsfw,
            }
            if negative_prompt:  # omit the key entirely when empty
                item["negative_prompt"] = negative_prompt
            items.append(item)
        return items

    def _parse_all(self, raw_pages: dict[int, str]) -> list[dict]:
        """Parse all fetched pages in page order and number items 1..N."""
        all_items: list[dict] = []
        for page_index in sorted(raw_pages.keys()):
            parsed = self._parse_page(raw_pages[page_index])
            actual_page = self.start_page + page_index
            self._log(f" [parse] Page {actual_page} → {len(parsed)} items")
            all_items.extend(parsed)
        for idx, item in enumerate(all_items, start=1):
            item["no"] = idx
        return all_items

    def _save(self, save: Union[bool, str]) -> None:
        """Write ``self.data`` as pretty-printed UTF-8 JSON.

        ``save`` may be a path string; any other truthy value uses the
        default filename. Parent directories are created as needed.
        """
        out = save if isinstance(save, str) else "gallery_data.json"
        Path(out).parent.mkdir(parents=True, exist_ok=True)
        with open(out, "w", encoding="utf-8") as fp:
            json.dump(self.data, fp, ensure_ascii=False, indent=2)
        self._log(f"Saved → {out}")

    @staticmethod
    def _log(msg: str) -> None:
        """Print a progress message (single logging choke point)."""
        print(msg)

    def __len__(self) -> int:
        return self.total

    def __getitem__(self, index):
        return self.data[index]

    def __iter__(self):
        return iter(self.data)

    def __repr__(self) -> str:
        return (
            f"GalleryScraper("
            f"total={self.total}, "
            f"pages={self.pages}, "
            f"start_page={self.start_page}, "
            f"sort='{self.sort}', "
            f"elapsed={self.elapsed:.2f}s)"
        )