| import os |
| import uuid |
| import shutil |
| import re |
| from datetime import datetime, timedelta, date |
| from io import BytesIO |
| from typing import Dict, List, Optional,Any |
| import numpy as np |
| from fastapi import ( |
| FastAPI, |
| UploadFile, |
| File, |
| HTTPException, |
| Depends, |
| Header, |
| Request, |
| Form, |
| ) |
| from fastapi.responses import FileResponse, JSONResponse |
| from pydantic import BaseModel |
| from PIL import Image, UnidentifiedImageError |
| import cv2 |
| import logging |
| from gridfs import GridFS |
| from gridfs.errors import NoFile |
|
|
| from bson import ObjectId |
| from pymongo import MongoClient |
| import time |
|
|
| |
# Best-effort: load variables from a local .env file when python-dotenv is
# installed. Any failure (missing package, load error) is deliberately
# ignored so the process can still run from the real environment.
try:
    from dotenv import load_dotenv


    load_dotenv()
except Exception:
    pass


# Root logging at INFO; the rest of this module logs through the "api" logger.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("api")
|
|
| from src.core import process_inpaint |
|
|
| |
# Storage root: $DATA_DIR (default "/data") when that directory exists,
# otherwise fall back to "/tmp".
BASE_DIR = os.environ.get("DATA_DIR", "/data")
BASE_DIR = BASE_DIR if os.path.isdir(BASE_DIR) else "/tmp"

# Sub-directories for uploaded inputs and generated outputs.
UPLOAD_DIR, OUTPUT_DIR = (
    os.path.join(BASE_DIR, leaf) for leaf in ("uploads", "outputs")
)

# Make sure both directories exist before any endpoint writes to them.
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
|
| |
# Optional bearer token. When it is unset/empty, bearer_auth accepts every
# request without checking the Authorization header.
ENV_TOKEN = os.environ.get("API_TOKEN")


app = FastAPI(title="Photo Object Removal API", version="1.0.0")



# In-memory bookkeeping (plain dict/list held by this process only).
file_store: Dict[str, Dict[str, str]] = {}
logs: List[Dict[str, str]] = []


# Primary MongoDB connection string; MONGO_URI takes precedence over
# MONGODB_URI. The handles below stay None until the init block that follows
# fills them in.
MONGO_URI = os.environ.get("MONGO_URI") or os.environ.get("MONGODB_URI")
mongo_client = None
mongo_db = None
mongo_logs = None
grid_fs = None
|
|
# Connect the primary MongoDB (request logs + GridFS file storage).
# Without a URI, or on any failure, the handles stay None and the
# GridFS-backed helpers answer with HTTP 503.
if not MONGO_URI:
    log.warning("MONGO_URI not set. GridFS operations will be disabled. Upload endpoints will not work.")
else:
    try:
        mongo_client = MongoClient(MONGO_URI)

        # Prefer the database named inside the connection string.
        try:
            mongo_db = mongo_client.get_default_database()
            log.info("Using database from connection string: %s", mongo_db.name)
        except Exception as db_err:
            mongo_db = None
            log.warning("Could not extract database from connection string: %s", db_err)

        # No database in the URI: fall back to a fixed name.
        if mongo_db is None:
            mongo_db = mongo_client["object_remover"]
            log.info("Using default database: object_remover")

        mongo_logs = mongo_db["api_logs"]
        grid_fs = GridFS(mongo_db)
        log.info("MongoDB connection initialized successfully - Database: %s, Collection: %s", mongo_db.name, mongo_logs.name)
    except Exception as err:
        log.error("Failed to initialize MongoDB connection: %s", err, exc_info=True)
        log.warning("GridFS operations will be disabled. Set MONGO_URI or MONGODB_URI environment variable.")
|
|
# Separate MongoDB used only for per-request API log documents
# (database "logs", collection "objectRemover").
API_LOGS_MONGO_URI = os.environ.get("API_LOGS_MONGODB_URL")

api_logs_client = None
api_logs_db = None
api_logs_collection = None

if not API_LOGS_MONGO_URI:
    log.warning("API_LOGS_MONGODB_URL not set. API logging disabled.")
else:
    try:
        api_logs_client = MongoClient(API_LOGS_MONGO_URI)
        api_logs_db = api_logs_client["logs"]
        api_logs_collection = api_logs_db["objectRemover"]
        log.info("API Logs Mongo initialized → logs/objectRemover")
    except Exception as e:
        log.error("Failed to initialize API Logs MongoDB: %s", e)
        # Leave the collection unset so callers skip log inserts.
        api_logs_collection = None
| |
# Admin MongoDB connection string. It is only read here; the visible part of
# this file does not use it further (_init_admin_mongo is a no-op).
ADMIN_MONGO_URI = os.environ.get("MONGODB_ADMIN")
# Fallback category id used by _coerce_category_id when none is supplied.
DEFAULT_CATEGORY_ID = "69368f722e46bd68ae188984"



# Collage-maker MongoDB settings. The handles start as None and are filled
# by get_collage_maker_client / get_collage_maker_database /
# _init_collage_maker_mongo.
COLLAGE_MAKER_MONGO_URI = os.environ.get("MONGODB_COLLAGE_MAKER")
COLLAGE_MAKER_DB_NAME = os.environ.get("MONGODB_COLLAGE_MAKER_DB_NAME", "collage-maker")
COLLAGE_MAKER_ADMIN_DB_NAME = os.environ.get("MONGODB_COLLAGE_MAKER_ADMIN_DB_NAME", "adminPanel")
collage_maker_client = None
collage_maker_db = None
collage_maker_admin_db = None
collage_maker_categories = None



# AI-Enhancer MongoDB settings. The handles start as None and are filled by
# get_ai_enhancer_client / get_ai_enhancer_database / _init_ai_enhancer_mongo.
# NOTE(review): the admin database name defaults to "test" — confirm intended.
AI_ENHANCER_MONGO_URI = os.environ.get("MONGODB_AI_ENHANCER")
AI_ENHANCER_DB_NAME = os.environ.get("MONGODB_AI_ENHANCER_DB_NAME", "ai-enhancer")
AI_ENHANCER_ADMIN_DB_NAME = os.environ.get("MONGODB_AI_ENHANCER_ADMIN_DB_NAME", "test")
ai_enhancer_client = None
ai_enhancer_db = None
ai_enhancer_admin_db = None
|
|
|
|
def get_collage_maker_client() -> Optional[MongoClient]:
    """Return the cached collage-maker MongoClient, creating it on first use.

    Returns:
        The module-level client, or ``None`` when
        ``COLLAGE_MAKER_MONGO_URI`` is not set or constructing the client
        raised (the error is logged).
    """
    global collage_maker_client
    # Already cached, or nothing to connect to: hand back the current value.
    if collage_maker_client is not None or not COLLAGE_MAKER_MONGO_URI:
        return collage_maker_client
    try:
        collage_maker_client = MongoClient(COLLAGE_MAKER_MONGO_URI)
        log.info("Collage-maker MongoDB client initialized")
    except Exception as err:
        log.error("Failed to initialize collage-maker MongoDB client: %s", err)
        collage_maker_client = None
    return collage_maker_client
|
|
|
|
def get_collage_maker_database() -> Optional[Any]:
    """Return the cached collage-maker database handle, creating it on first use.

    Returns:
        The database named by ``COLLAGE_MAKER_DB_NAME``, or ``None`` when no
        client is available or the lookup raised (the error is logged).
    """
    global collage_maker_db
    mongo = get_collage_maker_client()
    if mongo is None:
        return None
    if collage_maker_db is not None:
        return collage_maker_db
    try:
        collage_maker_db = mongo[COLLAGE_MAKER_DB_NAME]
        log.info("Collage-maker database initialized: %s", COLLAGE_MAKER_DB_NAME)
    except Exception as err:
        log.error("Failed to get collage-maker database: %s", err)
        collage_maker_db = None
    return collage_maker_db
|
|
|
|
def _init_collage_maker_mongo() -> None:
    """Bind the collage-maker admin database and its ``categories`` collection.

    Does nothing except log when no client is available. On any error both
    module-level handles are reset to ``None``.
    """
    global collage_maker_admin_db, collage_maker_categories
    mongo = get_collage_maker_client()
    if mongo is None:
        log.info("Collage-maker Mongo URI not provided; collage-maker features disabled")
        return
    try:
        admin_db = mongo[COLLAGE_MAKER_ADMIN_DB_NAME]
        collage_maker_admin_db = admin_db
        collage_maker_categories = admin_db["categories"]
        log.info(
            "Collage-maker admin initialized: db=%s, categories=%s",
            COLLAGE_MAKER_ADMIN_DB_NAME,
            collage_maker_categories.name,
        )
    except Exception as err:
        log.error("Failed to init collage-maker admin Mongo: %s", err)
        collage_maker_admin_db = None
        collage_maker_categories = None


# Run once at import so the handles are ready before the first request.
_init_collage_maker_mongo()
|
|
|
|
def get_ai_enhancer_client() -> Optional[MongoClient]:
    """Return the cached AI-Enhancer MongoClient, creating it on first use.

    Returns:
        The module-level client, or ``None`` when ``AI_ENHANCER_MONGO_URI``
        is not set or constructing the client raised (the error is logged).
    """
    global ai_enhancer_client
    # Already cached, or nothing to connect to: hand back the current value.
    if ai_enhancer_client is not None or not AI_ENHANCER_MONGO_URI:
        return ai_enhancer_client
    try:
        ai_enhancer_client = MongoClient(AI_ENHANCER_MONGO_URI)
        log.info("AI-Enhancer MongoDB client initialized")
    except Exception as err:
        log.error("Failed to initialize AI-Enhancer MongoDB client: %s", err)
        ai_enhancer_client = None
    return ai_enhancer_client
|
|
|
|
def get_ai_enhancer_database() -> Optional[Any]:
    """Return the cached AI-Enhancer database handle, creating it on first use.

    Returns:
        The database named by ``AI_ENHANCER_DB_NAME``, or ``None`` when no
        client is available or the lookup raised (the error is logged).
    """
    global ai_enhancer_db
    mongo = get_ai_enhancer_client()
    if mongo is None:
        return None
    if ai_enhancer_db is not None:
        return ai_enhancer_db
    try:
        ai_enhancer_db = mongo[AI_ENHANCER_DB_NAME]
        log.info("AI-Enhancer database initialized: %s", AI_ENHANCER_DB_NAME)
    except Exception as err:
        log.error("Failed to get AI-Enhancer database: %s", err)
        ai_enhancer_db = None
    return ai_enhancer_db
|
|
|
|
def _init_ai_enhancer_mongo() -> None:
    """Bind the AI-Enhancer admin database handle.

    Does nothing except log when no client is available. On any error the
    module-level handle is reset to ``None``.
    """
    global ai_enhancer_admin_db
    mongo = get_ai_enhancer_client()
    if mongo is None:
        log.info("AI-Enhancer Mongo URI not provided; AI-Enhancer features disabled")
        return
    try:
        ai_enhancer_admin_db = mongo[AI_ENHANCER_ADMIN_DB_NAME]
        log.info(
            "AI-Enhancer admin initialized: db=%s",
            AI_ENHANCER_ADMIN_DB_NAME,
        )
    except Exception as err:
        log.error("Failed to init AI-Enhancer admin Mongo: %s", err)
        ai_enhancer_admin_db = None


# Run once at import so the handle is ready before the first request.
_init_ai_enhancer_mongo()
|
|
|
|
def get_category_id_from_collage_maker() -> Optional[str]:
    """Return the ``_id`` (as a string) of one collage-maker category document.

    The document is whatever ``find_one()`` yields without a filter.

    Returns:
        The id string, or ``None`` when the collection is not initialized,
        holds no documents, or the query raised (the error is logged).
    """
    if collage_maker_categories is None:
        log.warning("Collage-maker categories collection not initialized")
        return None
    try:
        first_category = collage_maker_categories.find_one()
        if not first_category:
            log.warning("No categories found in collage-maker collection")
            return None
        found_id = str(first_category.get("_id", ""))
        log.info("Found category ID from collage-maker: %s", found_id)
        return found_id
    except Exception as err:
        log.error("Failed to query collage-maker categories: %s", err)
        return None
|
|
|
|
| def _init_admin_mongo() -> None: |
| |
| pass |
|
|
|
|
| _init_admin_mongo() |
|
|
|
|
| def _admin_logging_status() -> Dict[str, object]: |
| return { |
| "enabled": False, |
| "db": None, |
| "collection": None, |
| } |
|
|
|
|
def _save_upload_to_gridfs(upload: UploadFile, file_type: str) -> str:
    """Write an uploaded file to GridFS and return the new id as a string.

    Args:
        upload: The incoming file; its whole content is read into memory.
        file_type: Label stored under ``metadata["type"]`` and used for the
            fallback filename ``"<file_type>.bin"``.

    Raises:
        HTTPException: 503 when GridFS is not configured, 400 when the
            upload has no content.
    """
    if grid_fs is None:
        raise HTTPException(
            status_code=503,
            detail="MongoDB/GridFS not configured. Set MONGO_URI or MONGODB_URI environment variable."
        )

    payload = upload.file.read()
    if not payload:
        raise HTTPException(status_code=400, detail=f"{file_type} file is empty")

    stored_name = upload.filename or f"{file_type}.bin"
    new_id = grid_fs.put(
        payload,
        filename=stored_name,
        contentType=upload.content_type,
        metadata={"type": file_type},
    )
    return str(new_id)
|
|
|
|
def _read_gridfs_bytes(file_id: str, expected_type: str) -> bytes:
    """Return the raw content of a GridFS file after checking its type label.

    Args:
        file_id: String form of the GridFS ObjectId.
        expected_type: Value that ``metadata["type"]`` must have when the
            stored file carries one; also used in the error messages.

    Raises:
        HTTPException: 503 when GridFS is not configured; 404 when the id is
            malformed, unknown, or stored under a different type.
    """
    if grid_fs is None:
        raise HTTPException(
            status_code=503,
            detail="MongoDB/GridFS not configured. Set MONGO_URI or MONGODB_URI environment variable."
        )

    try:
        object_id = ObjectId(file_id)
    except Exception:
        raise HTTPException(status_code=404, detail=f"{expected_type}_id invalid")

    try:
        stored_file = grid_fs.get(object_id)
    except NoFile:
        raise HTTPException(status_code=404, detail=f"{expected_type}_id not found")

    # Files without a recorded type are accepted; a mismatching type is
    # reported exactly like a missing file.
    recorded_type = (stored_file.metadata or {}).get("type")
    if recorded_type and recorded_type != expected_type:
        raise HTTPException(status_code=404, detail=f"{expected_type}_id not found")

    return stored_file.read()
|
|
|
|
def _load_rgba_image_from_gridfs(file_id: str, expected_type: str) -> Image.Image:
    """Load an image from GridFS and convert it to RGBA.

    Args:
        file_id: String form of the GridFS ObjectId.
        expected_type: Type label the stored file must carry (passed on to
            ``_read_gridfs_bytes``) and used in the error message.

    Returns:
        A fully decoded RGBA image.

    Raises:
        HTTPException: whatever ``_read_gridfs_bytes`` raises, plus 422 when
            the bytes cannot be decoded as an image.
    """
    data = _read_gridfs_bytes(file_id, expected_type)
    try:
        # Image.open only reads the header; the pixel data is decoded inside
        # convert(). Both steps stay in the try so a truncated or corrupt
        # file is reported as 422 instead of escaping as an unhandled error.
        with Image.open(BytesIO(data)) as img:
            return img.convert("RGBA")
    except (UnidentifiedImageError, OSError) as err:
        raise HTTPException(
            status_code=422, detail=f"{expected_type} is not a valid image"
        ) from err
|
|
|
|
| def _build_ai_edit_daily_count( |
| existing: Optional[List[Dict[str, object]]], |
| today: date, |
| ) -> List[Dict[str, object]]: |
| """ |
| Build / extend the ai_edit_daily_count array with the following rules: |
| |
| - Case A (no existing data): return [{date: today, count: 1}] |
| - Case B (today already recorded): return list unchanged |
| - Case C (gap in days): fill missing days with count=0 and append today with count=1 |
| |
| Additionally, the returned list is capped to the most recent 32 entries. |
| |
| The stored "date" value is a midnight UTC (naive UTC) datetime for the given day. |
| """ |
|
|
| def _to_date_only(value: object) -> date: |
| if isinstance(value, datetime): |
| return value.date() |
| if isinstance(value, date): |
| return value |
| |
| try: |
| text = str(value) |
| if len(text) == 10: |
| return datetime.strptime(text, "%Y-%m-%d").date() |
| return datetime.fromisoformat(text).date() |
| except Exception: |
| |
| return today |
|
|
| |
| if not existing: |
| return [ |
| { |
| "date": datetime(today.year, today.month, today.day), |
| "count": 1, |
| } |
| ] |
|
|
| |
| result: List[Dict[str, object]] = list(existing) |
|
|
| last_entry = result[-1] if result else None |
| if not last_entry or "date" not in last_entry: |
| |
| return [ |
| { |
| "date": datetime(today.year, today.month, today.day), |
| "count": 1, |
| } |
| ] |
|
|
| last_date = _to_date_only(last_entry["date"]) |
|
|
| |
| if last_date > today: |
| return result |
|
|
| |
| if last_date == today: |
| return result |
|
|
| |
| cursor = last_date + timedelta(days=1) |
| while cursor < today: |
| result.append( |
| { |
| "date": datetime(cursor.year, cursor.month, cursor.day), |
| "count": 0, |
| } |
| ) |
| cursor += timedelta(days=1) |
|
|
| |
| result.append( |
| { |
| "date": datetime(today.year, today.month, today.day), |
| "count": 1, |
| } |
| ) |
|
|
|
|
| |
| try: |
| result.sort(key=lambda entry: _to_date_only(entry.get("date"))) |
| except Exception: |
| |
| pass |
|
|
| |
| if len(result) > 32: |
| result = result[-32:] |
|
|
| return result |
|
|
def bearer_auth(authorization: Optional[str] = Header(default=None)) -> None:
    """FastAPI dependency enforcing the optional static bearer token.

    When ``ENV_TOKEN`` is unset or empty, authentication is disabled and
    every request passes.

    Args:
        authorization: Raw ``Authorization`` header value, if any.

    Raises:
        HTTPException: 401 when the header is missing or is not a
            ``Bearer`` credential; 403 when the token does not match.
    """
    import hmac  # local import keeps this change self-contained

    if not ENV_TOKEN:
        return
    if authorization is None or not authorization.lower().startswith("bearer "):
        raise HTTPException(status_code=401, detail="Unauthorized")
    token = authorization.split(" ", 1)[1]
    # Constant-time comparison so response timing does not leak how much of
    # the token matched. Compared as bytes because compare_digest rejects
    # non-ASCII str input.
    if not hmac.compare_digest(token.encode("utf-8"), ENV_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Forbidden")
|
|
|
|
class InpaintRequest(BaseModel):
    # JSON body accepted by the /inpaint and /inpaint-url endpoints.
    image_id: str  # GridFS id of the source image (loaded with type "image")
    mask_id: str  # GridFS id of the mask (loaded with type "mask")
    invert_mask: bool = True  # passed through to process_inpaint
    passthrough: bool = False  # when true, skip inpainting and return the image as RGB
    prompt: Optional[str] = None  # passed through to process_inpaint
    user_id: Optional[str] = None  # not read by the endpoints visible in this part of the file
    category_id: Optional[str] = None  # when empty and appname is "collage-maker", one is looked up
    appname: Optional[str] = None  # recorded in the request log document
|
|
|
|
class SimpleRemoveRequest(BaseModel):
    # Request body carrying only an image id.
    # NOTE(review): no user of this model is visible in this part of the file.
    image_id: str
|
|
|
|
def _coerce_object_id(value: Optional[str]) -> ObjectId:
    """Turn an arbitrary identifier into an ``ObjectId``, never raising.

    - ``None``: a newly generated ObjectId.
    - 24 hex characters: that exact ObjectId.
    - A string of digits: the number rendered as hex, keeping the lowest 24
      hex digits and left-padding with zeros.
    - Anything else: a newly generated ObjectId.

    Args:
        value: Identifier to convert; surrounding whitespace is ignored.
    """
    if value is None:
        return ObjectId()
    value_str = str(value).strip()
    if re.fullmatch(r"[0-9a-fA-F]{24}", value_str):
        return ObjectId(value_str)
    if value_str.isdigit():
        try:
            number = int(value_str)
        except ValueError:
            # str.isdigit() also accepts characters such as "²" that int()
            # rejects, and very long digit strings exceed the interpreter's
            # int-conversion limit. Treat both like any other unusable id.
            return ObjectId()
        hex_str = format(number, "x")[-24:].rjust(24, "0")
        return ObjectId(hex_str)
    return ObjectId()
|
|
|
|
def _coerce_category_id(category_id: Optional[str]) -> ObjectId:
    """Convert a category id to an ``ObjectId``.

    A missing or empty value falls back to ``DEFAULT_CATEGORY_ID``. Values
    that are not 24 hex characters are handed to ``_coerce_object_id``.
    """
    candidate = str(category_id or DEFAULT_CATEGORY_ID).strip()
    if re.fullmatch(r"[0-9a-fA-F]{24}", candidate) is None:
        return _coerce_object_id(candidate)
    return ObjectId(candidate)
|
|
|
|
def log_media_click(user_id: Optional[str], category_id: Optional[str], appname: Optional[str] = None) -> None:
    """Do nothing: media-click logging is switched off.

    All arguments are accepted and ignored.
    """
    return None
|
|
|
|
@app.get("/")
def root() -> Dict[str, Any]:
    # Static service banner: name, version and publisher details.
    details = {
        "version": "1.0.0",
        "product_name": "Beauty Camera - GlowCam AI Studio",
        "released_by": "LogicGo Infotech",
    }
    return {"success": True, "message": "Object Remover API", "data": details}
|
|
|
|
|
|
@app.get("/health")
def health() -> Dict[str, str]:
    # Liveness probe: always answers with a fixed payload.
    return dict(status="healthy")
|
|
|
|
@app.get("/logging-status")
def logging_status(_: None = Depends(bearer_auth)) -> Dict[str, object]:
    """Helper endpoint to verify admin media logging wiring (no secrets exposed)."""
    # Token-protected view of the admin logging status.
    current_status = _admin_logging_status()
    return current_status
|
|
|
|
@app.get("/mongo-status")
def mongo_status(_: None = Depends(bearer_auth)) -> Dict[str, object]:
    """Check MongoDB connection status and verify data storage."""
    # Returns connection flags plus, when the log collection is available,
    # the document count, the five newest log entries and the newest entry.
    # Query failures are reported in "api_logs_error" instead of raising.

    def _format_timestamp(value: object) -> str:
        return value.isoformat() if isinstance(value, datetime) else str(value)

    def _summarize(doc: Dict[str, Any]) -> Dict[str, object]:
        return {
            "_id": str(doc.get("_id")),
            "output_id": doc.get("output_id"),
            "status": doc.get("status"),
            "timestamp": _format_timestamp(doc.get("timestamp")),
        }

    status: Dict[str, object] = {
        "mongo_configured": bool(MONGO_URI),
        "mongo_connected": mongo_client is not None,
        # PyMongo Database/Collection objects raise NotImplementedError when
        # truth-tested, so they must be compared with None explicitly.
        "database": mongo_db.name if mongo_db is not None else None,
        "collection": mongo_logs.name if mongo_logs is not None else None,
        "admin_logging": _admin_logging_status(),
    }

    if mongo_logs is None:
        return status

    try:
        status["api_logs_count"] = mongo_logs.count_documents({})

        latest_docs = list(mongo_logs.find().sort("timestamp", -1).limit(5))
        recent_logs: List[Dict[str, object]] = []
        status["recent_logs"] = recent_logs
        for doc in latest_docs:
            summary = _summarize(doc)
            # Optional fields are copied only when the document has them.
            for key in ("input_image_id", "input_mask_id", "error"):
                if key in doc:
                    summary[key] = doc.get(key)
            recent_logs.append(summary)

        if latest_docs:
            status["latest_log"] = _summarize(latest_docs[0])
    except Exception as err:
        status["api_logs_error"] = str(err)
        log.error("Error querying MongoDB: %s", err, exc_info=True)

    return status
|
|
|
|
@app.post("/upload-image")
def upload_image(image: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]:
    # Store the image in GridFS, note it in the in-memory log, return its id.
    stored_id = _save_upload_to_gridfs(image, "image")
    record = {
        "id": stored_id,
        "filename": image.filename,
        "type": "image",
        "timestamp": datetime.utcnow().isoformat(),
    }
    logs.append(record)
    return {"id": stored_id, "filename": image.filename}
|
|
|
|
@app.post("/upload-mask")
def upload_mask(mask: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]:
    # Store the mask in GridFS, note it in the in-memory log, return its id.
    stored_id = _save_upload_to_gridfs(mask, "mask")
    record = {
        "id": stored_id,
        "filename": mask.filename,
        "type": "mask",
        "timestamp": datetime.utcnow().isoformat(),
    }
    logs.append(record)
    return {"id": stored_id, "filename": mask.filename}
|
|
|
|
def _compress_image(image_path: str, output_path: str, quality: int = 85) -> None:
    """
    Write a JPEG copy of an image to reduce file size.

    RGBA input is flattened onto a white background using its alpha channel;
    any other non-RGB mode is converted to RGB.

    Args:
        image_path: Path of the image to read.
        output_path: Path the JPEG is written to.
        quality: JPEG quality passed to the encoder.
    """
    # The context manager releases the source file handle even when
    # decoding or saving fails.
    with Image.open(image_path) as img:
        if img.mode == "RGBA":
            # JPEG has no alpha: composite onto white using alpha as the mask.
            flattened = Image.new("RGB", img.size, (255, 255, 255))
            flattened.paste(img, mask=img.split()[3])
            to_save = flattened
        elif img.mode != "RGB":
            to_save = img.convert("RGB")
        else:
            to_save = img

        to_save.save(output_path, "JPEG", quality=quality, optimize=True)
|
|
|
|
def _load_rgba_mask_from_image(img: Image.Image) -> np.ndarray:
    """
    Turn a mask image into an opaque black/white RGBA array.

    In the result, white (255) in the RGB channels marks pixels to remove,
    black (0) marks pixels to keep, and alpha is 255 everywhere.

    Three cases:
    - non-RGBA image: grayscale value above 128 becomes white;
    - RGBA with mean alpha above 200: grayscale above 128, or exact magenta
      (255, 0, 255), becomes white;
    - other RGBA: alpha below 128 becomes white.
    """

    def _fill(canvas: np.ndarray, mask_bw: np.ndarray) -> np.ndarray:
        # Same mask in R, G and B; fully opaque alpha.
        canvas[:, :, :3] = mask_bw[:, :, np.newaxis]
        canvas[:, :, 3] = 255
        return canvas

    if img.mode != "RGBA":
        luminance = np.array(img.convert("L"))
        mask_bw = np.where(luminance > 128, 255, 0).astype(np.uint8)
        rgba = _fill(np.zeros((img.height, img.width, 4), dtype=np.uint8), mask_bw)
        log.info(f"Loaded {img.mode} mask: {int((mask_bw > 0).sum())} white pixels (to remove)")
        return rgba

    arr = np.array(img)
    alpha = arr[:, :, 3]
    rgb = arr[:, :, :3]

    if alpha.mean() > 200:
        # Mostly opaque: the mask lives in the colour channels.
        luminance = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)
        bright = np.where(luminance > 128, 255, 0).astype(np.uint8)
        # Pure magenta counts as "remove" as well.
        magenta = np.all(rgb == [255, 0, 255], axis=2).astype(np.uint8) * 255
        mask_bw = np.maximum(bright, magenta)
        rgba = _fill(arr.copy(), mask_bw)
        log.info(f"Loaded RGBA mask (RGB-based): {int((mask_bw > 0).sum())} white pixels (to remove)")
        return rgba

    # Otherwise the mask lives in the alpha channel: transparent means remove.
    mask_bw = np.where(alpha < 128, 255, 0).astype(np.uint8)
    rgba = _fill(arr.copy(), mask_bw)
    log.info(f"Loaded RGBA mask (alpha-based): {int((mask_bw > 0).sum())} white pixels (to remove)")
    return rgba
|
|
@app.post("/inpaint")
def inpaint(req: InpaintRequest, request: Request, _: None = Depends(bearer_auth)) -> Dict[str, str]:
    # Inpaint an image/mask pair previously stored in GridFS, save the result
    # as PNG in OUTPUT_DIR and, best-effort, a compressed JPEG next to it.
    # A log document is written to the API-logs collection on every call,
    # successful or not.
    start_time = time.time()
    status = "success"
    error_msg = None
    output_name = None
    compressed_url = None

    try:
        # collage-maker requests without a category get one looked up.
        category_id = req.category_id
        if not category_id and req.appname == "collage-maker":
            category_id = get_category_id_from_collage_maker()
            if category_id:
                log.info("Using category_id from collage-maker: %s", category_id)

        source_rgba = _load_rgba_image_from_gridfs(req.image_id, "image")
        mask_image = _load_rgba_image_from_gridfs(req.mask_id, "mask")
        mask_rgba = _load_rgba_mask_from_image(mask_image)

        if not req.passthrough:
            result = process_inpaint(
                np.array(source_rgba),
                mask_rgba,
                invert_mask=req.invert_mask,
                prompt=req.prompt,
            )
        else:
            result = np.array(source_rgba.convert("RGB"))

        stem = f"output_{uuid.uuid4().hex}"
        output_name = stem + ".png"
        output_path = os.path.join(OUTPUT_DIR, output_name)
        Image.fromarray(result).save(
            output_path, "PNG", optimize=False, compress_level=1
        )

        # The JPEG copy is optional: failure only drops its URL.
        compressed_name = f"compressed_{stem}.jpg"
        compressed_path = os.path.join(OUTPUT_DIR, compressed_name)
        try:
            _compress_image(output_path, compressed_path, quality=85)
            compressed_url = str(request.url_for("download_file", filename=compressed_name))
        except Exception as compress_err:
            log.warning("Failed to create compressed image: %s", compress_err)
            compressed_url = None

        response = {"result": output_name}
        if compressed_url:
            response["Compressed_Image_URL"] = compressed_url
        return response

    except Exception as exc:
        status = "fail"
        error_msg = str(exc)
        raise

    finally:
        elapsed_ms = (time.time() - start_time) * 1000

        log_doc = {
            "endpoint": "/inpaint",
            "status": status,
            "response_time_ms": float(elapsed_ms),
            "timestamp": datetime.utcnow(),
            "appname": req.appname or "None",
            "error": error_msg,
        }

        if api_logs_collection is not None:
            try:
                api_logs_collection.insert_one(log_doc)
                log.info("API log inserted into logs/objectRemover")
            except Exception as insert_err:
                log.error("Failed to insert API log: %s", insert_err)
|
|
|
|
@app.post("/inpaint-url")
def inpaint_url(req: InpaintRequest, request: Request, _: None = Depends(bearer_auth)) -> Dict[str, str]:
    """Same as /inpaint but returns a JSON with a public download URL instead of image bytes."""
    # Flow: load image and mask from GridFS, inpaint (or pass through), save
    # the PNG in OUTPUT_DIR, return its name and download URL. A log
    # document is written to the primary Mongo log collection on every call.
    start_time = time.time()
    status = "success"
    error_msg = None
    result_name = None

    try:
        # collage-maker requests without a category get one looked up.
        category_id = req.category_id
        if req.appname == "collage-maker" and not category_id:
            category_id = get_category_id_from_collage_maker()
            if category_id:
                log.info("Using category_id from collage-maker: %s", category_id)

        img_rgba = _load_rgba_image_from_gridfs(req.image_id, "image")
        mask_img = _load_rgba_image_from_gridfs(req.mask_id, "mask")
        mask_rgba = _load_rgba_mask_from_image(mask_img)

        if req.passthrough:
            result = np.array(img_rgba.convert("RGB"))
        else:
            result = process_inpaint(
                np.array(img_rgba),
                mask_rgba,
                invert_mask=req.invert_mask,
                prompt=req.prompt,
            )
        result_name = f"output_{uuid.uuid4().hex}.png"
        result_path = os.path.join(OUTPUT_DIR, result_name)
        Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)

        url = str(request.url_for("download_file", filename=result_name))
        logs.append({"result": result_name, "url": url, "timestamp": datetime.utcnow().isoformat()})
        return {"result": result_name, "url": url}
    except Exception as e:
        status = "fail"
        error_msg = str(e)
        raise
    finally:
        end_time = time.time()
        response_time_ms = (end_time - start_time) * 1000
        log_doc = {
            "input_image_id": req.image_id,
            "input_mask_id": req.mask_id,
            "output_id": result_name,
            "status": status,
            "timestamp": datetime.utcnow(),
            "ts": int(time.time()),
            "response_time_ms": response_time_ms,
        }

        if req.appname:
            log_doc["appname"] = req.appname
        if error_msg:
            log_doc["error"] = error_msg
        if mongo_logs is not None:
            try:
                log.info("Inserting log to MongoDB - Database: %s, Collection: %s", mongo_logs.database.name, mongo_logs.name)
                # Separate name so the inpainting result is not rebound here.
                insert_result = mongo_logs.insert_one(log_doc)
                # Log result_name: this function has no `output_name`, and
                # referencing it raised NameError after a successful insert,
                # which was then misreported as an insert failure.
                log.info("Mongo log inserted successfully: _id=%s, output_id=%s, status=%s, db=%s, collection=%s",
                         insert_result.inserted_id, result_name, status, mongo_logs.database.name, mongo_logs.name)

                # Read the document back to confirm the insert is visible.
                try:
                    verify_doc = mongo_logs.find_one({"_id": insert_result.inserted_id})
                    if verify_doc:
                        log.info("Verified: Document exists in MongoDB after insert")
                    else:
                        log.error("WARNING: Document not found after insert! _id=%s", insert_result.inserted_id)
                except Exception as verify_err:
                    log.warning("Could not verify insert: %s", verify_err)
            except Exception as mongo_err:
                log.error("Mongo log insert failed: %s, log_doc=%s", mongo_err, log_doc, exc_info=True)
        else:
            log.warning("MongoDB not configured, skipping log insert")
|
|
|
|
@app.post("/inpaint-multipart")
def inpaint_multipart(
    image: UploadFile = File(...),
    mask: UploadFile = File(...),
    request: Request = None,
    invert_mask: bool = True,
    mask_is_painted: bool = False,
    passthrough: bool = False,
    prompt: Optional[str] = Form(None),
    user_id: Optional[str] = Form(None),
    category_id: Optional[str] = Form(None),
    appname: Optional[str] = Form(None),
    _: None = Depends(bearer_auth),
) -> Dict[str, str]:
    # Inpaint directly from uploaded files (no GridFS round trip).
    #
    # - passthrough: the image is saved unchanged (as RGB) and returned.
    # - mask_is_painted: `mask` is a copy of the image with pink/magenta
    #   paint over the areas to remove; the paint is detected by colour range
    #   and, when both uploads have the same size, by difference against
    #   `image`. Fewer than 50 detected pixels returns the original image
    #   together with an "error" field.
    # - otherwise `mask` is treated as a regular black/white or alpha mask.
    #
    # The result is a PNG in OUTPUT_DIR; the response carries its name and,
    # when it can be built, a download URL. A log document is written to the
    # primary Mongo log collection on every call.
    start_time = time.time()
    status = "success"
    error_msg = None
    result_name = None

    def _save_png(pixels: np.ndarray, name: str) -> None:
        # Fast, lightly compressed PNG in the output directory.
        Image.fromarray(pixels).save(
            os.path.join(OUTPUT_DIR, name), "PNG", optimize=False, compress_level=1
        )

    def _finish(name: str) -> Dict[str, str]:
        # Build the response (URL is best-effort) and record it in `logs`.
        url: Optional[str] = None
        try:
            if request is not None:
                url = str(request.url_for("download_file", filename=name))
        except Exception:
            url = None

        entry: Dict[str, str] = {"result": name, "timestamp": datetime.utcnow().isoformat()}
        resp: Dict[str, str] = {"result": name}
        if url:
            entry["url"] = url
            resp["url"] = url
        logs.append(entry)
        return resp

    try:
        # collage-maker requests without a category get one looked up.
        final_category_id = category_id
        if appname == "collage-maker" and not final_category_id:
            final_category_id = get_category_id_from_collage_maker()
            if final_category_id:
                log.info("Using category_id from collage-maker: %s", final_category_id)

        img = Image.open(image.file).convert("RGBA")
        m = Image.open(mask.file).convert("RGBA")

        if passthrough:
            result = np.array(img.convert("RGB"))
            result_name = f"output_{uuid.uuid4().hex}.png"
            _save_png(result, result_name)
            return _finish(result_name)

        if mask_is_painted:
            log.info("Auto-detecting pink/magenta paint from uploaded image...")

            m_rgb = cv2.cvtColor(np.array(m), cv2.COLOR_RGBA2RGB)

            # Per-channel RGB bounds that count as pink/magenta paint.
            lower = np.array([150, 0, 100], dtype=np.uint8)
            upper = np.array([255, 120, 255], dtype=np.uint8)
            magenta_detected = np.all(
                (m_rgb >= lower) & (m_rgb <= upper), axis=2
            ).astype(np.uint8) * 255

            # When sizes match, anything that differs noticeably from the
            # original image also counts as paint.
            img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB)
            if img_rgb.shape == m_rgb.shape:
                diff = cv2.absdiff(img_rgb, m_rgb)
                gray_diff = cv2.cvtColor(diff, cv2.COLOR_RGB2GRAY)
                diff_mask = (gray_diff > 50).astype(np.uint8) * 255
                binmask = cv2.bitwise_or(magenta_detected, diff_mask)
            else:
                binmask = magenta_detected

            # Close small holes, then drop isolated specks.
            kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
            binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2)
            binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1)

            nonzero = int((binmask > 0).sum())
            log.info("Pink/magenta paint detected: %d pixels marked for removal (white)", nonzero)

            if nonzero < 50:
                log.error("CRITICAL: Could not detect pink/magenta paint! Returning original image.")
                result = np.array(img.convert("RGB"))
                result_name = f"output_{uuid.uuid4().hex}.png"
                _save_png(result, result_name)
                return {"result": result_name, "error": "pink/magenta paint detection failed - very few pixels detected"}

            # White RGB where paint was found; alpha is the inverse of the mask.
            mask_rgba = np.zeros((binmask.shape[0], binmask.shape[1], 4), dtype=np.uint8)
            mask_rgba[:, :, 0] = binmask
            mask_rgba[:, :, 1] = binmask
            mask_rgba[:, :, 2] = binmask
            mask_rgba[:, :, 3] = 255 - binmask

            log.info("Successfully created binary mask: %d pink pixels → white (255), %d pixels → black (0)",
                     nonzero, binmask.shape[0] * binmask.shape[1] - nonzero)
        else:
            mask_rgba = _load_rgba_mask_from_image(m)

        actual_invert = invert_mask
        log.info("Using invert_mask=%s (mask_is_painted=%s)", actual_invert, mask_is_painted)

        result = process_inpaint(
            np.array(img),
            mask_rgba,
            invert_mask=actual_invert,
            prompt=prompt,
        )
        result_name = f"output_{uuid.uuid4().hex}.png"
        _save_png(result, result_name)
        return _finish(result_name)
    except Exception as e:
        status = "fail"
        error_msg = str(e)
        raise
    finally:
        end_time = time.time()
        response_time_ms = (end_time - start_time) * 1000
        log_doc = {
            "endpoint": "inpaint-multipart",
            "output_id": result_name,
            "status": status,
            "timestamp": datetime.utcnow(),
            "ts": int(time.time()),
            "response_time_ms": response_time_ms,
        }

        if appname:
            log_doc["appname"] = appname
        if error_msg:
            log_doc["error"] = error_msg
        if mongo_logs is not None:
            try:
                log.info("Inserting log to MongoDB - Database: %s, Collection: %s", mongo_logs.database.name, mongo_logs.name)
                # Separate name so the inpainting result is not rebound here.
                insert_result = mongo_logs.insert_one(log_doc)
                # Log result_name: this function has no `output_name`, and
                # referencing it raised NameError after a successful insert,
                # which was then misreported as an insert failure.
                log.info("Mongo log inserted successfully: _id=%s, output_id=%s, status=%s, db=%s, collection=%s",
                         insert_result.inserted_id, result_name, status, mongo_logs.database.name, mongo_logs.name)

                # Read the document back to confirm the insert is visible.
                try:
                    verify_doc = mongo_logs.find_one({"_id": insert_result.inserted_id})
                    if verify_doc:
                        log.info("Verified: Document exists in MongoDB after insert")
                    else:
                        log.error("WARNING: Document not found after insert! _id=%s", insert_result.inserted_id)
                except Exception as verify_err:
                    log.warning("Could not verify insert: %s", verify_err)
            except Exception as mongo_err:
                log.error("Mongo log insert failed: %s, log_doc=%s", mongo_err, log_doc, exc_info=True)
        else:
            log.warning("MongoDB not configured, skipping log insert")
|
|
|
|
@app.post("/remove-pink")
def remove_pink_segments(
    image: UploadFile = File(...),
    request: Request = None,
    user_id: Optional[str] = Form(None),
    category_id: Optional[str] = Form(None),
    appname: Optional[str] = Form(None),
    _: None = Depends(bearer_auth),
) -> Dict[str, str]:
    """
    Simple endpoint: upload an image with pink/magenta segments to remove.
    - Pink/Magenta segments → automatically removed (white in mask)
    - Everything else → automatically kept (black in mask)
    Just paint pink/magenta on areas you want to remove, upload the image, and it works!

    Args:
        image: Uploaded image; decoded with Pillow and converted to RGBA.
        request: Current request, used only to build a download URL for the result.
        user_id: Optional form field; accepted but not used by this endpoint.
        category_id: Optional form field. When it is empty and ``appname`` is
            ``"collage-maker"``, a category id is looked up via
            ``get_category_id_from_collage_maker()``; the value is only logged here.
        appname: Optional form field; stored in the Mongo log document when set.

    Returns:
        A dict with ``result`` (output file name inside ``OUTPUT_DIR``) plus either
        ``pink_segments_detected`` (and ``url`` when it can be built), or ``error``
        when fewer than 50 pink pixels were found and the original image is
        returned unchanged.

    Raises:
        HTTPException: 400 when the upload cannot be identified as an image.
        Exception: any other processing error is re-raised after being recorded
            in the request log.
    """
    start_time = time.time()
    status = "success"
    error_msg = None
    result_name = None

    try:
        # Resolve a category for collage-maker requests that did not send one.
        # NOTE(review): the resolved id is only logged below; confirm whether it
        # should also be written to the Mongo log document.
        final_category_id = category_id
        if appname == "collage-maker" and not final_category_id:
            final_category_id = get_category_id_from_collage_maker()
            if final_category_id:
                log.info("Using category_id from collage-maker: %s", final_category_id)

        log.info("Simple remove-pink: processing image %s", image.filename)

        # A payload that is not an image is a client error, not a server fault.
        try:
            img = Image.open(image.file).convert("RGBA")
        except UnidentifiedImageError as exc:
            raise HTTPException(
                status_code=400, detail="uploaded file is not a valid image"
            ) from exc
        img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB)

        # A pixel counts as pink/magenta when every RGB channel lies inside the
        # inclusive [lower, upper] box: high red, low green, mid-to-high blue.
        lower = np.array([150, 0, 100], dtype=np.uint8)
        upper = np.array([255, 120, 255], dtype=np.uint8)
        in_range = np.all((img_rgb >= lower) & (img_rgb <= upper), axis=-1)
        binmask = in_range.astype(np.uint8) * 255

        # Close small holes inside painted strokes, then drop isolated specks.
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2)
        binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1)

        nonzero = int((binmask > 0).sum())
        total_pixels = binmask.shape[0] * binmask.shape[1]
        log.info(
            "Detected %d pink pixels (%.2f%% of image) to remove",
            nonzero,
            100 * nonzero / total_pixels,
        )
        log.info("Pink detection bounds used: lower=[150,0,100], upper=[255,120,255]")

        if nonzero < 50:
            # Too few marked pixels to be a deliberate stroke: save the original
            # image untouched and tell the caller how to mark regions.
            log.error("No pink segments detected! Returning original image.")
            result_name = f"output_{uuid.uuid4().hex}.png"
            result_path = os.path.join(OUTPUT_DIR, result_name)
            img.convert("RGB").save(result_path, "PNG", optimize=False, compress_level=1)
            return {
                "result": result_name,
                "error": "No pink/magenta segments detected. Please paint areas to remove with magenta/pink color (RGB 255,0,255)."
            }

        # Build the RGBA mask handed to process_inpaint: the colour channels
        # carry the detection mask and alpha carries its inverse (0 where pink).
        # NOTE(review): process_inpaint lives in src.core; with invert_mask=True it
        # presumably turns alpha=0 pixels into the "remove" region — confirm there.
        mask_rgba = np.dstack([binmask, binmask, binmask, 255 - binmask])

        alpha_zero_count = int((mask_rgba[:, :, 3] == 0).sum())
        alpha_255_count = int((mask_rgba[:, :, 3] == 255).sum())
        log.info(f"Mask encoding: {alpha_zero_count} pixels with alpha=0 (pink), {alpha_255_count} pixels with alpha=255 (keep)")
        log.info(f"After 255-alpha conversion: {alpha_zero_count} will become white (255/remove), {alpha_255_count} will become black (0/keep)")

        # img is already RGBA, so a plain array copy is what the inpainter gets.
        img_clean = np.array(img)

        log.info("Starting inpainting process...")
        result = process_inpaint(img_clean, mask_rgba, invert_mask=True)
        log.info(f"Inpainting complete, result shape: {result.shape}")
        result_name = f"output_{uuid.uuid4().hex}.png"
        result_path = os.path.join(OUTPUT_DIR, result_name)
        Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)

        # Building the URL is best-effort; the file name alone is enough for clients.
        url: Optional[str] = None
        try:
            if request is not None:
                url = str(request.url_for("download_file", filename=result_name))
        except Exception:
            url = None

        logs.append({
            "result": result_name,
            "filename": image.filename,
            "pink_pixels": nonzero,
            "timestamp": datetime.utcnow().isoformat()
        })

        resp: Dict[str, str] = {"result": result_name, "pink_segments_detected": str(nonzero)}
        if url:
            resp["url"] = url
        return resp
    except Exception as e:
        status = "fail"
        error_msg = str(e)
        raise
    finally:
        # Always record the request outcome, whether it succeeded or failed.
        end_time = time.time()
        response_time_ms = (end_time - start_time) * 1000
        log_doc = {
            "endpoint": "remove-pink",
            "output_id": result_name,
            "status": status,
            "timestamp": datetime.utcnow(),
            "ts": int(time.time()),
            "response_time_ms": response_time_ms,
        }

        if appname:
            log_doc["appname"] = appname
        if error_msg:
            log_doc["error"] = error_msg
        if mongo_logs is not None:
            try:
                log.info("Inserting log to MongoDB - Database: %s, Collection: %s", mongo_logs.database.name, mongo_logs.name)
                insert_result = mongo_logs.insert_one(log_doc)
                # Log result_name: this function has no `output_name`, and referencing
                # it raised NameError, which was then misreported as an insert failure.
                log.info("Mongo log inserted successfully: _id=%s, output_id=%s, status=%s, db=%s, collection=%s",
                         insert_result.inserted_id, result_name, status, mongo_logs.database.name, mongo_logs.name)

                # Read the document back to confirm the write is visible.
                try:
                    verify_doc = mongo_logs.find_one({"_id": insert_result.inserted_id})
                    if verify_doc:
                        log.info("Verified: Document exists in MongoDB after insert")
                    else:
                        log.error("WARNING: Document not found after insert! _id=%s", insert_result.inserted_id)
                except Exception as verify_err:
                    log.warning("Could not verify insert: %s", verify_err)
            except Exception as mongo_err:
                log.error("Mongo log insert failed: %s, log_doc=%s", mongo_err, log_doc, exc_info=True)
        else:
            log.warning("MongoDB not configured, skipping log insert")
|
|
|
|
@app.get("/download/{filename}")
def download_file(filename: str):
    """Serve a generated output file from OUTPUT_DIR.

    Args:
        filename: Bare file name of the output, taken from the URL path.

    Returns:
        A FileResponse for the requested file.

    Raises:
        HTTPException: 404 when the name carries a directory component or no
            such file exists in OUTPUT_DIR.
    """
    # The name comes straight from the URL. Accept only a bare file name so the
    # join below can never resolve to a path outside OUTPUT_DIR (an absolute
    # name would otherwise replace OUTPUT_DIR entirely in os.path.join).
    if os.path.basename(filename) != filename:
        raise HTTPException(status_code=404, detail="file not found")
    path = os.path.join(OUTPUT_DIR, filename)
    if not os.path.isfile(path):
        raise HTTPException(status_code=404, detail="file not found")
    return FileResponse(path)
|
|
|
|
@app.get("/result/{filename}")
def view_result(filename: str):
    """View result image directly in browser (same as download but with proper content-type for viewing)

    Args:
        filename: Bare file name of the output, taken from the URL path.

    Returns:
        A FileResponse for the file, sent with media type ``image/png``.

    Raises:
        HTTPException: 404 when the name carries a directory component or no
            such file exists in OUTPUT_DIR.
    """
    # The name comes straight from the URL. Accept only a bare file name so the
    # join below can never resolve to a path outside OUTPUT_DIR (an absolute
    # name would otherwise replace OUTPUT_DIR entirely in os.path.join).
    if os.path.basename(filename) != filename:
        raise HTTPException(status_code=404, detail="file not found")
    path = os.path.join(OUTPUT_DIR, filename)
    if not os.path.isfile(path):
        raise HTTPException(status_code=404, detail="file not found")
    return FileResponse(path, media_type="image/png")
|
|
|
|
@app.get("/logs")
def get_logs(_: None = Depends(bearer_auth)) -> JSONResponse:
    """Return the module-level in-memory ``logs`` list as a JSON response.

    The route runs only after the ``bearer_auth`` dependency has been resolved.
    """
    response = JSONResponse(content=logs)
    return response
|
|