import json
import time
from html import unescape
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional, Union

import cloudscraper
from bs4 import BeautifulSoup

_GALLERY_URL = "https://image-generation.perchance.org/gallery"
_PER_PAGE = 200
_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/145.0.0.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Referer": "https://image-generation.perchance.org/",
    "Origin": "https://image-generation.perchance.org",
}
_VALID_SORT = ("recent", "trending", "top")
_VALID_TIME = ("all-time", "1-month")
_VALID_FILTER = ("none", "pg13")


class GalleryScraper:
    """
    Perchance AI Gallery scraper.

    Example:
        result = GalleryScraper(start_page=1, pages=3, sort="top")
        print(result.data)
    """

    def __init__(
        self,
        start_page: int = 1,
        pages: int = 1,
        sort: str = "top",
        time_range: str = "all-time",
        content_filter: str = "none",
        concurrency: int = 1,
        timeout: int = 30,
        save: Union[bool, str] = False,
    ):
        if start_page < 1:
            raise ValueError("start_page must be >= 1")
        if pages < 1:
            raise ValueError("pages must be >= 1")
        if sort not in _VALID_SORT:
            raise ValueError(f"sort must be one of {_VALID_SORT}, got '{sort}'")
        if time_range not in _VALID_TIME:
            raise ValueError(f"time_range must be one of {_VALID_TIME}, got '{time_range}'")
        if content_filter not in _VALID_FILTER:
            raise ValueError(f"content_filter must be one of {_VALID_FILTER}, got '{content_filter}'")
        if concurrency < 1:
            concurrency = 1

        self.start_page = start_page
        self.pages = pages
        self.sort = sort
        self.time_range = time_range
        self.content_filter = content_filter
        self.concurrency = concurrency
        self.timeout = timeout

        self.data: list[dict] = []
        self.total: int = 0
        self.elapsed: float = 0.0

        self._log(
            f"start_page={start_page} pages={pages} concurrency={concurrency} "
            f"sort={sort} time={time_range} filter={content_filter}"
        )
        self._log("=" * 60)

        started = time.time()
        scraper = cloudscraper.create_scraper()
        raw_pages = self._fetch_all(scraper)
        self.data = self._parse_all(raw_pages)
        self.total = len(self.data)
        self.elapsed = time.time() - started

        self._log("=" * 60)
        self._log(f"Done | {self.total} items | {self.elapsed:.2f}s")

        if save:
            self._save(save)

    def _build_params(self, page_index: int) -> dict:
        skip = page_index * _PER_PAGE
        params = {
            "sort": self.sort,
            "timeRange": self.time_range,
            "hideIfScoreIsBelow": "-1",
            "contentFilter": self.content_filter,
            "subChannel": "public",
            "channel": "ai-text-to-image-generator",
        }
        if skip > 0:
            params["skip"] = skip
        return params

    def _fetch_one(
        self,
        scraper: cloudscraper.CloudScraper,
        page_index: int,
    ) -> tuple[int, str]:
        actual_page = self.start_page + page_index
        skip = (actual_page - 1) * _PER_PAGE
        self._log(f" [→] Fetching page {actual_page} (skip={skip}) ...")
        t = time.time()
        try:
            resp = scraper.get(
                _GALLERY_URL,
                params=self._build_params(actual_page - 1),
                headers=_HEADERS,
                timeout=self.timeout,
            )
        except Exception as exc:
            self._log(f" [✗] Page {actual_page} error: {exc} ({time.time() - t:.2f}s)")
            return (page_index, "")
        dt = time.time() - t
        if resp.status_code != 200:
            self._log(f" [✗] Page {actual_page} HTTP {resp.status_code} ({dt:.2f}s)")
            return (page_index, "")
        self._log(f" [✓] Page {actual_page} OK — {len(resp.text):,} chars ({dt:.2f}s)")
        return (page_index, resp.text)

    def _fetch_all(self, scraper: cloudscraper.CloudScraper) -> dict[int, str]:
        results: dict[int, str] = {}
        if self.concurrency == 1:
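            # Sequential path: fetch pages one at a time on the shared
            # session; this keeps request order deterministic and avoids
            # sharing the cloudscraper session across threads (the threaded
            # path below does share it, which is a pragmatic trade-off).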
            for page_index in range(self.pages):
                idx, html = self._fetch_one(scraper, page_index)
                results[idx] = html
            return results

        with ThreadPoolExecutor(max_workers=self.concurrency) as pool:
            futures = {
                pool.submit(self._fetch_one, scraper, page_index): page_index
                for page_index in range(self.pages)
            }
            for future in as_completed(futures):
                idx, html = future.result()
                results[idx] = html
        return results

    @staticmethod
    def _clean(value: Optional[str]) -> str:
        if value is None:
            return ""
        return unescape(str(value)).replace("\r", "\n").strip()

    def _parse_page(self, html: str) -> list[dict]:
        if not html:
            return []
        soup = BeautifulSoup(html, "html.parser")
        items: list[dict] = []
        for card in soup.select(".imageCtn"):
            prompt = self._clean(card.get("data-prompt"))
            negative_prompt = self._clean(card.get("data-negative-prompt"))
            guidance_scale = self._clean(card.get("data-guidance-scale"))
            seed = self._clean(card.get("data-seed"))
            is_nsfw = self._clean(card.get("data-is-nsfw")).lower() == "true"
            title_attr = self._clean(card.get("data-title"))

            img_tag = card.select_one(".imageWrapperInner img.image")
            image_url = img_tag.get("src", "") if img_tag else ""

            title_el = card.select_one(".image-title")
            visible_title = self._clean(title_el.get_text(" ", strip=True)) if title_el else ""

            item = {
                "no": "",
                "image_url": image_url,
                "title": title_attr or visible_title,
                "prompt": prompt,
                "guidance_scale": guidance_scale,
                "seed": seed,
                "nsfw": is_nsfw,
            }
            if negative_prompt:
                item["negative_prompt"] = negative_prompt
            items.append(item)
        return items

    def _parse_all(self, raw_pages: dict[int, str]) -> list[dict]:
        all_items: list[dict] = []
        for page_index in sorted(raw_pages.keys()):
            parsed = self._parse_page(raw_pages[page_index])
            actual_page = self.start_page + page_index
            self._log(f" [parse] Page {actual_page} → {len(parsed)} items")
            all_items.extend(parsed)
        for idx, item in enumerate(all_items, start=1):
            item["no"] = idx
        return all_items

    def _save(self, save: Union[bool, str]) -> None:
        out = save if isinstance(save, str) else "gallery_data.json"
        Path(out).parent.mkdir(parents=True, exist_ok=True)
        with open(out, "w", encoding="utf-8") as fp:
            json.dump(self.data, fp, ensure_ascii=False, indent=2)
        self._log(f"Saved → {out}")

    @staticmethod
    def _log(msg: str) -> None:
        print(msg)

    def __len__(self) -> int:
        return self.total

    def __getitem__(self, index):
        return self.data[index]

    def __iter__(self):
        return iter(self.data)

    def __repr__(self) -> str:
        return (
            f"GalleryScraper("
            f"total={self.total}, "
            f"pages={self.pages}, "
            f"start_page={self.start_page}, "
            f"sort='{self.sort}', "
            f"elapsed={self.elapsed:.2f}s)"
        )
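

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the class itself): constructing
# GalleryScraper runs the whole fetch/parse pipeline eagerly, so the demo
# below performs network I/O. The output path "out/gallery_top.json" is a
# hypothetical example; field names match the item dicts built in
# _parse_page above.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    result = GalleryScraper(
        start_page=1,
        pages=2,
        sort="top",
        time_range="all-time",
        content_filter="pg13",
        concurrency=2,
        save="out/gallery_top.json",  # hypothetical output path
    )
    # The scraper is iterable and indexable; each element is one parsed card.
    for item in result:
        print(f'{item["no"]:>4}  {item["title"] or "(untitled)"}')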