| import os |
| from typing import Literal, Optional |
|
|
| from fastapi import FastAPI, Query, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import JSONResponse |
|
|
| from gallery_scraper import GalleryScraper |
|
|
# FastAPI application instance; this metadata is surfaced in the generated
# OpenAPI schema and the /docs UI.
app = FastAPI(
    title="Perchance Gallery API",
    version="1.0.0",
    description="FastAPI server for Perchance gallery scraping",
)


# Allow cross-origin requests from any origin so browser frontends can call
# the API directly.
# NOTE(review): browsers reject `allow_origins=["*"]` combined with
# `allow_credentials=True` for credentialed requests — confirm whether
# credentials are actually needed here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
@app.get("/")
def root():
    """Service index: liveness flag, service name, and a map of endpoints."""
    endpoints = {
        "/api/gallery": "Fetch gallery data",
        "/health": "Health check",
    }
    return {
        "ok": True,
        "service": "Perchance Gallery API",
        "endpoints": endpoints,
    }
|
|
|
|
@app.get("/health")
def health():
    """Minimal liveness probe for monitors and load balancers."""
    status = "ok"
    return {"status": status}
|
|
|
|
@app.get("/api/gallery")
def api_gallery(
    page: int = Query(1, ge=1, description="Starting page, 1-based"),
    pages: int = Query(1, ge=1, le=50, description="How many pages to fetch"),
    sort: Literal["recent", "trending", "top"] = Query("top"),
    timeRange: Literal["all-time", "1-month"] = Query("all-time"),
    contentFilter: Literal["none", "pg13"] = Query("none"),
    concurrency: int = Query(1, ge=1, le=16),
    timeout: int = Query(30, ge=5, le=120),
    save: Optional[str] = Query(None, description="Optional local file path to save JSON"),
):
    """
    Fetch one or more pages of gallery data and return them as JSON.

    Example:
        /api/gallery?page=1&pages=3&sort=top&timeRange=all-time&contentFilter=none

    Raises:
        HTTPException: 400 for invalid parameters (ValueError),
            500 for any other failure.
    """
    import json

    try:
        # Public API is 1-based; the fetch helper is 0-based.
        start_page = page - 1

        # FIX: the original constructed a GalleryScraper instance here that
        # was never used (dead code), which also meant the `save` parameter
        # was silently ignored. The scraper instantiation is removed and
        # `save` is honored below.
        data = _fetch_from_start_page(
            start_page=start_page,
            pages=pages,
            sort=sort,
            time_range=timeRange,
            content_filter=contentFilter,
            concurrency=concurrency,
            timeout=timeout,
        )

        # Persist the raw item list locally when a path was provided.
        if save:
            with open(save, "w", encoding="utf-8") as fh:
                json.dump(data, fh, ensure_ascii=False, indent=2)

        return JSONResponse(
            {
                "ok": True,
                "page": page,
                "pages": pages,
                "sort": sort,
                "timeRange": timeRange,
                "contentFilter": contentFilter,
                "count": len(data),
                "data": data,
            }
        )
    except ValueError as e:
        # Bad parameter combinations surface as client errors.
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Server error: {e}") from e
|
|
|
|
def _fetch_from_start_page(
    start_page: int,
    pages: int,
    sort: str,
    time_range: str,
    content_filter: str,
    concurrency: int,
    timeout: int,
):
    """
    Fetch and parse `pages` consecutive gallery pages starting at the
    0-based `start_page`.

    Each page is requested via cloudscraper and parsed with BeautifulSoup.
    Pages are fetched serially when `concurrency <= 1`, otherwise via a
    thread pool; results are merged in page order regardless of completion
    order. A page that fails (non-200 status or any exception) contributes
    an empty list instead of raising — the fetch is best-effort.

    Args:
        start_page: 0-based index of the first gallery page.
        pages: number of consecutive pages to fetch.
        sort: gallery sort key ("recent" | "trending" | "top").
        time_range: gallery time window ("all-time" | "1-month").
        content_filter: gallery content filter ("none" | "pg13").
        concurrency: max parallel requests (<= 1 means serial).
        timeout: per-request timeout in seconds.

    Returns:
        A list of per-image dicts, numbered 1..N via the "no" key.
    """
    # Local imports keep third-party deps out of module import time.
    # FIX: the original also imported `time` here, which was never used.
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from html import unescape

    import cloudscraper
    from bs4 import BeautifulSoup

    GALLERY_URL = "https://image-generation.perchance.org/gallery"
    PER_PAGE = 200  # gallery page size; converts a page index into a skip offset

    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/145.0.0.0 Safari/537.36"
        ),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Referer": "https://image-generation.perchance.org/",
        "Origin": "https://image-generation.perchance.org",
    }

    def clean(value):
        # Normalize an HTML attribute: unescape entities, map CRs to LFs, trim.
        if value is None:
            return ""
        return unescape(str(value)).replace("\r", "\n").strip()

    def build_params(page_index: int):
        # Translate a 0-based page index into the gallery's query parameters.
        skip = page_index * PER_PAGE
        params = {
            "sort": sort,
            "timeRange": time_range,
            "hideIfScoreIsBelow": "-1",
            "contentFilter": content_filter,
            "subChannel": "public",
            "channel": "ai-text-to-image-generator",
        }
        if skip > 0:  # the first page omits `skip` entirely
            params["skip"] = skip
        return params

    def parse_page(html: str):
        # Extract one dict per `.imageCtn` card; empty/missing HTML -> [].
        if not html:
            return []
        soup = BeautifulSoup(html, "html.parser")
        items = []

        for card in soup.select(".imageCtn"):
            prompt = clean(card.get("data-prompt"))
            negative_prompt = clean(card.get("data-negative-prompt"))
            guidance_scale = clean(card.get("data-guidance-scale"))
            seed = clean(card.get("data-seed"))
            nsfw = clean(card.get("data-is-nsfw")).lower() == "true"
            title_attr = clean(card.get("data-title"))

            img_tag = card.select_one(".imageWrapperInner img.image")
            image_url = img_tag.get("src", "") if img_tag else ""

            title_el = card.select_one(".image-title")
            visible_title = clean(title_el.get_text(" ", strip=True)) if title_el else ""

            item = {
                "image_url": image_url,
                # The data-title attribute wins over the visible title text.
                "title": title_attr or visible_title,
                "prompt": prompt,
                "guidance_scale": guidance_scale,
                "seed": seed,
                "nsfw": nsfw,
            }
            if negative_prompt:  # omit the key entirely when empty
                item["negative_prompt"] = negative_prompt
            items.append(item)

        return items

    scraper = cloudscraper.create_scraper()
    results = {}  # page offset -> parsed items, so merging preserves page order

    def fetch_one(i: int):
        # Fetch and parse page (start_page + i). Never raises; failures -> [].
        page_index = start_page + i
        try:
            resp = scraper.get(
                GALLERY_URL,
                params=build_params(page_index),
                headers=headers,
                timeout=timeout,
            )
            if resp.status_code != 200:
                return i, []
            return i, parse_page(resp.text)
        except Exception:
            return i, []

    if concurrency <= 1:
        # Serial path: no thread-pool overhead for a single worker.
        for i in range(pages):
            _, items = fetch_one(i)
            results[i] = items
    else:
        with ThreadPoolExecutor(max_workers=concurrency) as pool:
            futures = [pool.submit(fetch_one, i) for i in range(pages)]
            for future in as_completed(futures):
                i, items = future.result()
                results[i] = items

    # Merge in page order; a page that produced nothing contributes [].
    merged = []
    for i in range(pages):
        merged.extend(results.get(i, []))

    # Number items globally across all merged pages, 1-based.
    for idx, item in enumerate(merged, start=1):
        item["no"] = idx

    return merged