| | from flask import Flask, request, jsonify |
| | import os |
| | import time |
| | import tempfile |
| | import cv2 |
| | import insightface |
| | import onnxruntime |
| | import gfpgan |
| | import io |
| | import concurrent.futures |
| | import numpy as np |
| | from PIL import Image |
| |
|
# WSGI application object; the route decorator further down registers on it.
app = Flask(__name__)
| |
|
class Predictor:
    """Face-swap pipeline: detect faces, swap them, then enhance the result.

    Constructing an instance calls :meth:`setup`, which downloads any missing
    model weights into ``models/`` and loads the swapper, enhancer and
    analyser models onto the instance.
    """

    # Directory (relative to the current working directory) holding weights.
    MODEL_DIR = 'models'
    # Weight file names expected in MODEL_DIR, mapped to their download URLs.
    MODEL_URLS = {
        'GFPGANv1.4.pth':
            'https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth',
        'inswapper_128.onnx':
            'https://huggingface.co/ashleykleynhans/inswapper/resolve/main/inswapper_128.onnx',
    }

    def __init__(self):
        self.setup()

    @staticmethod
    def _download(url, destination):
        """Fetch *url* into *destination* using ``wget``.

        The payload is written to ``<destination>.part`` first and only
        renamed once wget succeeds, so an interrupted download is never
        mistaken for a complete weight file on the next start-up.

        Raises:
            subprocess.CalledProcessError: if wget exits with a non-zero code.
            FileNotFoundError: if the ``wget`` executable is not installed.
        """
        import subprocess

        partial = destination + '.part'
        try:
            # Argument list (no shell) and an explicit output path: no need to
            # chdir the whole process just to control where the file lands.
            subprocess.run(['wget', '-O', partial, url], check=True)
            os.replace(partial, destination)
        finally:
            # Only still present when the download or the rename failed.
            if os.path.exists(partial):
                os.remove(partial)

    @staticmethod
    def _bbox_area(face):
        """Return the area of *face*'s bounding box (x1, y1, x2, y2)."""
        x1, y1, x2, y2 = face.bbox[:4]
        return (x2 - x1) * (y2 - y1)

    @staticmethod
    def _decode_image(file_obj):
        """Read *file_obj* fully and decode it into a colour image array.

        Returns ``None`` when the stream is empty or is not a decodable image.
        """
        raw = file_obj.read()
        if not raw:
            return None
        buffer = np.frombuffer(raw, dtype=np.uint8)
        # cv2.imdecode signals undecodable data by returning None.
        return cv2.imdecode(buffer, cv2.IMREAD_COLOR)

    def setup(self):
        """Load the model into memory to make running multiple predictions efficient.

        Downloads every weight file listed in ``MODEL_URLS`` that is not yet
        present in ``MODEL_DIR``, then creates ``self.face_swapper``,
        ``self.face_enhancer`` and ``self.face_analyser``.

        Raises:
            subprocess.CalledProcessError: if a required download fails.
        """
        os.makedirs(self.MODEL_DIR, exist_ok=True)
        for filename, url in self.MODEL_URLS.items():
            path = os.path.join(self.MODEL_DIR, filename)
            if not os.path.exists(path):
                self._download(url, path)

        swapper_path = os.path.join(self.MODEL_DIR, 'inswapper_128.onnx')
        enhancer_path = os.path.join(self.MODEL_DIR, 'GFPGANv1.4.pth')

        self.face_swapper = insightface.model_zoo.get_model(
            swapper_path,
            providers=onnxruntime.get_available_providers(),
        )
        self.face_enhancer = gfpgan.GFPGANer(model_path=enhancer_path, upscale=1)
        self.face_analyser = insightface.app.FaceAnalysis(name='buffalo_l')
        self.face_analyser.prepare(ctx_id=0, det_size=(640, 640))

    def get_face(self, img_data):
        """Return the detected face with the largest bounding box, or ``None``.

        Args:
            img_data: decoded image passed to ``self.face_analyser.get``;
                ``None`` (e.g. a failed decode) is treated as "no face".

        Returns:
            The face object whose ``bbox`` covers the largest area, or
            ``None`` when there is no image or no face was detected.
        """
        if img_data is None:
            print("No face found")
            return None

        analysed = self.face_analyser.get(img_data)
        # Explicit emptiness check instead of a bare ``except`` around max(),
        # so genuine errors are no longer reported as "no face".
        if not analysed:
            print("No face found")
            return None
        return max(analysed, key=self._bbox_area)

    def process_images(self, input_image, swap_image):
        """Process a pair of images: target image and swap image.

        Args:
            input_image: readable binary file-like object holding the target
                image (the one whose face gets replaced).
            swap_image: readable binary file-like object holding the image
                that supplies the replacement face.

        Returns:
            JPEG-encoded bytes of the swapped and enhanced image, or ``None``
            if either image cannot be decoded, either image has no detectable
            face, encoding fails, or any processing step raises.
        """
        # Broad catch is deliberate: this is the per-pair worker boundary and
        # callers rely on ``None`` (not an exception) to flag a failed pair.
        try:
            frame = self._decode_image(input_image)
            swap_frame = self._decode_image(swap_image)
            if frame is None or swap_frame is None:
                print("Error in processing images: could not decode image data")
                return None

            face = self.get_face(frame)
            source_face = self.get_face(swap_frame)
            if face is None or source_face is None:
                return None

            swapped = self.face_swapper.get(frame, face, source_face, paste_back=True)
            # enhance() returns a 3-tuple; only the last element is used here.
            _, _, enhanced = self.face_enhancer.enhance(swapped, paste_back=True)

            encoded_ok, jpeg = cv2.imencode('.jpg', enhanced)
            if not encoded_ok:
                print("Error in processing images: JPEG encoding failed")
                return None
            return jpeg.tobytes()
        except Exception as e:
            print(f"Error in processing images: {e}")
            return None
| |
|
| | |
# Single shared pipeline instance, built at import time: constructing it
# runs Predictor.setup(), so the models are loaded once for all requests.
predictor = Predictor()
| |
|
@app.route('/predict', methods=['POST'])
def predict():
    """Swap faces for each uploaded (target, swap) image pair.

    Expects a multipart POST with the same number of files under the
    ``target_images`` and ``swap_images`` fields; files are paired by
    position.

    Returns:
        A JSON body ``{"results": [...]}`` with one entry per pair, in input
        order. A successful entry is ``{"index": i, "result_image": <str>}``
        where the string is the base64-encoded JPEG; a failed entry is
        ``{"index": i, "error": "Face swap failed"}``. Responds with HTTP 400
        when a field is missing or the two file counts differ.
    """
    import base64

    if 'target_images' not in request.files or 'swap_images' not in request.files:
        return jsonify({'error': 'No image files provided'}), 400

    target_images = request.files.getlist('target_images')
    swap_images = request.files.getlist('swap_images')

    if len(target_images) != len(swap_images):
        return jsonify({'error': 'Number of target images must match number of swap images'}), 400

    # map() yields outputs in submission order, so each result lines up with
    # its input pair without tracking futures by hand.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        outputs = list(executor.map(predictor.process_images, target_images, swap_images))

    results = []
    for idx, output in enumerate(outputs):
        if output is None:
            results.append({'index': idx, 'error': 'Face swap failed'})
        else:
            # Raw bytes are not JSON-serialisable; ship the JPEG as base64 text.
            results.append({
                'index': idx,
                'result_image': base64.b64encode(output).decode('ascii'),
            })

    return jsonify({'results': results})
| |
|
if __name__ == "__main__":
    # Debug mode is opt-in via FLASK_DEBUG rather than hard-coded on: it
    # exposes the interactive debugger to anyone who can reach the server,
    # and its reloader re-imports this module, loading the models twice.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in {'1', 'true', 'yes', 'on'}
    app.run(debug=debug_enabled, threaded=True)
| |
|