| | from fastapi import FastAPI, File, Form, UploadFile |
| | from fastapi.responses import JSONResponse |
| | from google.cloud import storage |
| | import os |
| | import io |
| | import numpy as np |
| | import cv2 |
| | from PIL import Image |
| | import uuid |
| | import base64 |
| |
|
# Point the Google client libraries at the service-account key file.
# NOTE(review): this assignment replaces any value already present in the
# environment, and the relative path is resolved against the process's
# working directory - confirm that is intended for deployments.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "./da-kalbe-63ee33c9cdbb.json"
# Name of the bucket that receives the uploaded PNG files.
bucket_name = "da-kalbe-ml-result-png"
# The client and bucket handle are created once, at import time, and are
# shared by the upload helpers defined below.
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)

# FastAPI application object; routes are registered on it via decorators.
app = FastAPI()
| |
|
| | |
def upload_to_gcs(image: Image.Image, filename: str):
    """Upload a PIL image to the module-level GCS bucket as a PNG.

    Args:
        image: PIL image to upload. It is always re-encoded as PNG,
            whatever format it was loaded from.
        filename: Object name inside the bucket (may contain '/').

    Returns:
        None on success. On failure the exception is logged and a dict
        ``{'error': <message>}`` is returned instead of raising, so a
        caller that needs to react to a failed upload must inspect the
        return value.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    try:
        blob = bucket.blob(filename)
        image_buffer = io.BytesIO()
        image.save(image_buffer, format='PNG')
        # Rewind: upload_from_file reads from the current stream position.
        image_buffer.seek(0)
        blob.upload_from_file(image_buffer, content_type='image/png')
    except Exception as e:
        # Best-effort upload: keep the original "return an error dict"
        # contract, but leave a trace so the failure is not invisible.
        logging.getLogger(__name__).exception(
            "Upload of %r to GCS failed", filename
        )
        return {'error': f"An unexpected error occurred: {e}"}
    return None
| |
|
def upload_folder_images(image_path, enhanced_image_path, enhancement_type="enhanced_image"):
    """Upload an original/enhanced image pair under a common GCS prefix.

    The prefix ("folder") is the base name of ``image_path`` without its
    extension. Two objects are written below it: ``original_image.png``
    and ``<enhancement_type>.png``.

    Args:
        image_path: Filesystem path of the original image.
        enhanced_image_path: Filesystem path of the enhanced image.
        enhancement_type: Label used as the enhanced object's file name.
            Previously this name was read without ever being defined in
            this function, which raised NameError on every call; it is
            now an explicit parameter with a neutral default.
    """
    folder_name = os.path.splitext(os.path.basename(image_path))[0]

    # Zero-byte placeholder object so the prefix exists as a "folder" entry.
    bucket.blob(folder_name + '/').upload_from_string('', content_type='application/x-www-form-urlencoded')

    # Context managers close the underlying file handles once the uploads
    # are done; both files are still opened before anything is uploaded.
    with Image.open(image_path) as original_image, \
            Image.open(enhanced_image_path) as enhanced_image:
        upload_to_gcs(original_image, folder_name + '/' + 'original_image.png')
        upload_to_gcs(enhanced_image, folder_name + '/' + enhancement_type + '.png')
| |
|
def calculate_mse(original_image, enhanced_image):
    """Return the mean squared error between two images.

    Args:
        original_image: Reference image (array-like).
        enhanced_image: Image to compare; must broadcast against the
            reference.

    Returns:
        The mean of the squared element-wise differences, as a float.
    """
    # Promote to float64 first: with uint8 inputs the subtraction and the
    # squaring are performed modulo 256 and silently produce wrong values
    # (e.g. a difference of 20 squares to 144 instead of 400).
    reference = np.asarray(original_image, dtype=np.float64)
    candidate = np.asarray(enhanced_image, dtype=np.float64)
    mse = np.mean((reference - candidate) ** 2)
    return mse
| |
|
def calculate_psnr(original_image, enhanced_image):
    """Return the peak signal-to-noise ratio, in dB, between two images.

    The peak value is fixed at 255, i.e. 8-bit pixel data is assumed.

    Args:
        original_image: Reference image (array-like).
        enhanced_image: Image to compare against the reference.

    Returns:
        ``20 * log10(255 / sqrt(mse))``, or ``float('inf')`` when the
        images are identical (mse == 0).
    """
    # Hand float64 arrays to calculate_mse so that integer pixel data
    # cannot wrap around during the subtraction/squaring.
    mse = calculate_mse(
        np.asarray(original_image, dtype=np.float64),
        np.asarray(enhanced_image, dtype=np.float64),
    )
    if mse == 0:
        return float('inf')
    max_pixel_value = 255.0
    psnr = 20 * np.log10(max_pixel_value / np.sqrt(mse))
    return psnr
| |
|
def calculate_maxerr(original_image, enhanced_image):
    """Return the largest squared element-wise difference of two images.

    Args:
        original_image: Reference image (array-like).
        enhanced_image: Image to compare against the reference.

    Returns:
        ``max((original - enhanced) ** 2)`` as a float.
    """
    # Work in float64: uint8 arithmetic wraps modulo 256, which corrupts
    # every squared difference larger than 255.
    reference = np.asarray(original_image, dtype=np.float64)
    candidate = np.asarray(enhanced_image, dtype=np.float64)
    maxerr = np.max((reference - candidate) ** 2)
    return maxerr
| |
|
def calculate_l2rat(original_image, enhanced_image):
    """Return the ratio of the reference energy to the error energy.

    Computes ``sum(original ** 2) / sum((original - enhanced) ** 2)``.

    Args:
        original_image: Reference image (array-like).
        enhanced_image: Image to compare against the reference.

    Returns:
        The ratio as a float, or ``float('inf')`` when the two images are
        identical (zero error energy), mirroring how calculate_psnr
        reports a perfect match.
    """
    # float64 avoids the modulo-256 wrap-around of uint8 squares.
    reference = np.asarray(original_image, dtype=np.float64)
    candidate = np.asarray(enhanced_image, dtype=np.float64)
    error_energy = np.sum((reference - candidate) ** 2)
    if error_energy == 0:
        # Avoid a divide-by-zero warning and a NaN/inf surprise.
        return float('inf')
    l2norm_ratio = np.sum(reference ** 2) / error_energy
    return l2norm_ratio
| |
|
def process_image(original_image, enhancement_type, fix_monochrome=True):
    """Enhance an image and measure how far the result is from the input.

    Args:
        original_image: NumPy image array. A 3-channel array is treated as
            BGR when it is reduced to grayscale.
        enhancement_type: Name of the enhancement, passed to enhance_image.
        fix_monochrome: When true, a 3-channel input is converted to a
            single grayscale channel before processing.

    Returns:
        A tuple ``(enhanced_image, mse, psnr, maxerr, l2rat)``. The
        metrics compare the (possibly gray-converted, not yet normalised)
        input with the enhanced output.

    Raises:
        ValueError: Propagated from enhance_image for an unknown
            enhancement type.
    """
    # Only a 3-D array has a channel axis; without the ndim check a 2-D
    # grayscale image that happens to be 3 pixels wide would be sent to
    # cvtColor as if it were a colour image.
    if fix_monochrome and original_image.ndim == 3 and original_image.shape[-1] == 3:
        original_image = cv2.cvtColor(original_image, cv2.COLOR_BGR2GRAY)

    # Min-max stretch to the full 0..255 range. The divisor must be the
    # maximum of the *shifted* data (max - min); dividing by the maximum of
    # the unshifted image under-stretches, and gives NaN for an all-zero
    # image.
    image = original_image.astype(np.float64)
    image -= image.min()
    value_range = image.max()
    if value_range > 0:
        image /= value_range
    image = (image * 255).astype(np.uint8)

    enhanced_image = enhance_image(image, enhancement_type)

    mse = calculate_mse(original_image, enhanced_image)
    psnr = calculate_psnr(original_image, enhanced_image)
    maxerr = calculate_maxerr(original_image, enhanced_image)
    l2rat = calculate_l2rat(original_image, enhanced_image)

    return enhanced_image, mse, psnr, maxerr, l2rat
| |
|
def apply_clahe(image, clip_limit=40.0, tile_grid_size=(8, 8)):
    """Apply contrast-limited adaptive histogram equalisation (CLAHE).

    Args:
        image: Image array handed to OpenCV's CLAHE; presumably a
            single-channel image, as produced by process_image.
        clip_limit: Contrast-limiting threshold passed to
            ``cv2.createCLAHE``. Defaults to the value that used to be
            hard-coded here.
        tile_grid_size: ``(rows, cols)`` grid of tiles that are equalised
            independently. Defaults to the former hard-coded ``(8, 8)``.

    Returns:
        The equalised image returned by ``CLAHE.apply``.
    """
    clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=tile_grid_size)
    return clahe.apply(image)
| |
|
def invert(image):
    """Return the bitwise complement of ``image`` (a photographic negative)."""
    negative = cv2.bitwise_not(image)
    return negative
| |
|
def hp_filter(image, kernel=None):
    """Convolve ``image`` with a 3x3 kernel via ``cv2.filter2D``.

    Args:
        image: Image array to filter.
        kernel: Convolution kernel. When omitted, a 3x3 kernel with 9 in
            the centre and -1 everywhere else is used.

    Returns:
        The filtered image.
    """
    if kernel is not None:
        active_kernel = kernel
    else:
        active_kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
    # The second argument (ddepth) of -1 asks OpenCV to keep the source depth.
    filtered = cv2.filter2D(image, -1, active_kernel)
    return filtered
| |
|
def unsharp_mask(image, radius=5, amount=2):
    """Sharpen ``image`` by unsharp masking.

    The result is ``(1 + amount) * image - amount * blurred``, where
    ``blurred`` is a Gaussian blur of the input.

    Args:
        image: Image array to sharpen.
        radius: Gaussian sigma used for the blur.
        amount: Strength of the sharpening.

    Returns:
        The sharpened image.
    """
    # A (0, 0) kernel size lets OpenCV derive the kernel from the sigma.
    blurred = cv2.GaussianBlur(image, (0, 0), radius)
    image_weight = 1.0 + amount
    blur_weight = -amount
    return cv2.addWeighted(image, image_weight, blurred, blur_weight, 0)
| |
|
def hist_eq(image):
    """Return ``image`` after global histogram equalisation (cv2.equalizeHist)."""
    equalised = cv2.equalizeHist(image)
    return equalised
| |
|
def enhance_image(image, enhancement_type):
    """Apply the enhancement named by ``enhancement_type`` to ``image``.

    Recognised names: "Invert", "High Pass Filter", "Unsharp Masking",
    "Histogram Equalization" and "CLAHE".

    Returns:
        The enhanced image produced by the selected helper.

    Raises:
        ValueError: If ``enhancement_type`` is not one of the names above.
    """
    # Guard-clause dispatch: each recognised name returns immediately.
    if enhancement_type == "Invert":
        return invert(image)
    if enhancement_type == "High Pass Filter":
        return hp_filter(image)
    if enhancement_type == "Unsharp Masking":
        return unsharp_mask(image)
    if enhancement_type == "Histogram Equalization":
        return hist_eq(image)
    if enhancement_type == "CLAHE":
        return apply_clahe(image)
    # Nothing matched.
    raise ValueError(f"Unknown enhancement type: {enhancement_type}")
| |
|
def _json_safe_float(value):
    """Return ``value`` as a float, or None when it is inf/NaN (not valid JSON)."""
    number = float(value)
    return number if np.isfinite(number) else None


@app.post("/process_image")
async def process_image_api(image: UploadFile = File(...), enhancement_type: str = Form(...)):
    """Enhance an uploaded image and return it together with quality metrics.

    Args:
        image: Uploaded file; its name must end in .png, .jpg or .jpeg.
        enhancement_type: Name of the enhancement to apply (form field).

    Returns:
        200 with ``message``, ``processed_image`` (base64-encoded PNG) and
        the metrics ``mse``, ``psnr``, ``maxerr`` and ``l2rat``; a metric
        that is infinite or NaN is reported as JSON ``null``.
        400 when no file is given or the extension is not allowed.
        500 with an ``error`` message when processing fails.
    """
    if not image:
        return JSONResponse(status_code=400, content={'error': 'No image file provided'})

    # The filename is client-supplied and may be missing altogether.
    filename = image.filename or ''
    allowed_extensions = {'png', 'jpg', 'jpeg'}
    if '.' not in filename or filename.rsplit('.', 1)[-1].lower() not in allowed_extensions:
        return JSONResponse(status_code=400, content={'error': 'Invalid image file'})

    try:
        image_pil = Image.open(image.file).convert('RGB')

        # PIL yields RGB, but process_image reduces colour input with
        # COLOR_BGR2GRAY; reorder the channels so the gray weights are
        # applied to the right colours.
        image_np = cv2.cvtColor(np.array(image_pil), cv2.COLOR_RGB2BGR)

        enhanced_image, mse, psnr, maxerr, l2rat = process_image(image_np, enhancement_type)

        enhanced_image_pil = Image.fromarray(enhanced_image)

        # Encode the result as a base64 PNG for the JSON payload.
        image_buffer = io.BytesIO()
        enhanced_image_pil.save(image_buffer, format='PNG')
        image_base64 = base64.b64encode(image_buffer.getvalue()).decode('utf-8')

        # JSONResponse refuses inf/NaN, and psnr is inf for identical
        # images, so non-finite metrics are mapped to null.
        response = {
            'message': 'Image processed successfully!',
            'processed_image': image_base64,
            'mse': _json_safe_float(mse),
            'psnr': _json_safe_float(psnr),
            'maxerr': _json_safe_float(maxerr),
            'l2rat': _json_safe_float(l2rat)
        }

        # Store both images under a prefix derived from the upload's base
        # name. The in-memory images are uploaded directly: the previous
        # call passed the UploadFile and a PIL image to a helper that
        # expects filesystem paths, which failed on every request.
        # upload_to_gcs reports failures through its return value, so a
        # failed upload does not fail the request.
        folder_name = os.path.splitext(os.path.basename(filename))[0] or uuid.uuid4().hex
        upload_to_gcs(image_pil, folder_name + '/' + 'original_image.png')
        upload_to_gcs(enhanced_image_pil, folder_name + '/' + enhancement_type + '.png')

        return JSONResponse(status_code=200, content=response)

    except Exception as e:
        return JSONResponse(status_code=500, content={'error': f'Error processing image: {str(e)}'})
| |
|