| import os |
| import uuid |
| import threading |
| import time |
|
|
| import cv2 |
| import numpy as np |
| import base64 |
| from flask import Flask, render_template_string, request, redirect, flash, url_for, jsonify |
| import roboflow |
| import torch |
| from collections import Counter |
|
|
# Flask application object shared by every route in this module.
app = Flask(__name__)
# Session-signing key (required by flash()). Prefer the FLASK_SECRET_KEY
# environment variable; the literal is only a development fallback and,
# being public, must not be relied on in production.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'your_secret_key')


# In-memory job registry: job_id -> {"progress": int, "result": dict | None}.
# Worker threads running process_image write to it; the /progress and /result
# routes read from it.
jobs = {}
|
|
| |
| |
| |
|
|
| |
# Roboflow credentials and model coordinates. Each value can be overridden
# through the environment so secrets do not have to live in source control;
# the literals are kept only as backward-compatible defaults.
# NOTE(review): the default API key is committed in plain text - rotate it and
# supply ROBOFLOW_API_KEY from the environment instead.
API_KEY = os.environ.get("ROBOFLOW_API_KEY", "wLjPoPYaLmrqCIOFA0RH")
PROJECT_ID = os.environ.get("ROBOFLOW_PROJECT_ID", "base-model-box-r4suo-8lkk1-6dbqh")
VERSION_NUMBER = os.environ.get("ROBOFLOW_VERSION", "2")


# Load the Roboflow box detector once at import time. On any failure the app
# still starts: box_model stays None and process_image reports
# "Roboflow model not available." for box jobs.
try:
    rf = roboflow.Roboflow(api_key=API_KEY)
    workspace = rf.workspace()
    project = workspace.project(PROJECT_ID)
    version = project.version(VERSION_NUMBER)
    box_model = version.model
    print("Roboflow model loaded successfully.")
except Exception as e:
    print("Error initializing Roboflow model:", e)
    box_model = None
|
|
| |
# Load the pretrained YOLOv5s model through torch.hub once at import time.
# If loading fails the app still starts with yolov5_model set to None, and
# process_image answers person/car jobs with "YOLOv5 model not available."
try:
    yolov5_model = torch.hub.load(
        'ultralytics/yolov5',
        'yolov5s',
        pretrained=True,
    )
    print("YOLOv5 model loaded successfully.")
except Exception as load_error:
    yolov5_model = None
    print("Error loading YOLOv5 model:", load_error)
|
|
| |
| |
| |
|
|
def compute_iou(boxA, boxB):
    """Return the intersection-over-union of two boxes.

    Each box is indexed as (x1, y1, x2, y2). The result is 0 when the union
    area is zero; otherwise it is the float ratio intersection / union.
    """
    # Corners of the overlapping rectangle (may be inverted if disjoint).
    left = max(boxA[0], boxB[0])
    top = max(boxA[1], boxB[1])
    right = min(boxA[2], boxB[2])
    bottom = min(boxA[3], boxB[3])

    # Negative extents mean no overlap, so they are clipped to zero.
    overlap = max(0, right - left) * max(0, bottom - top)

    area_a = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
    area_b = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])
    union = area_a + area_b - overlap

    if union == 0:
        return 0
    return overlap / float(union)
|
|
| |
def custom_nms(preds, iou_threshold=0.3):
    """Greedy non-maximum suppression over prediction dicts.

    Each prediction must provide "confidence" and "box" keys. Predictions are
    visited from highest to lowest confidence; one is kept only if its IoU
    with every already-kept prediction is at most ``iou_threshold``. Class
    labels are not consulted.

    Returns the kept predictions, ordered by descending confidence.
    """
    survivors = []
    for candidate in sorted(preds, key=lambda p: p["confidence"], reverse=True):
        suppressed = any(
            compute_iou(candidate["box"], winner["box"]) > iou_threshold
            for winner in survivors
        )
        if not suppressed:
            survivors.append(candidate)
    return survivors
|
|
| |
| |
| |
| |
def _finish_job(job_id, result):
    """Store a job's final payload and then mark the job as complete."""
    # The result is written BEFORE progress reaches 100: the browser requests
    # /result as soon as it sees 100, and /result answers 404 while the
    # payload is still None.
    jobs[job_id]['result'] = result
    jobs[job_id]['progress'] = 100


def _draw_label(image, text, x, y, color):
    """Draw black ``text`` on a filled ``color`` strip just above (x, y)."""
    (text_width, text_height), baseline = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
    # Clamp at the top edge so labels of boxes touching the top stay visible.
    top = max(0, y - text_height - baseline - 5)
    bottom = top + text_height + baseline
    cv2.rectangle(image, (x, top), (x + text_width, bottom), color, -1)
    cv2.putText(image, text, (x, bottom - baseline), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)


def _parse_box_predictions(predictions, scale_factor, img_width, img_height):
    """Turn centre-format predictions into corner boxes clamped to the image."""
    parsed = []
    for prediction in predictions:
        try:
            # Predictions were made on the (possibly upscaled) image; map
            # them back to the coordinates of the original image.
            x = prediction["x"] / scale_factor
            y = prediction["y"] / scale_factor
            width = prediction["width"] / scale_factor
            height = prediction["height"] / scale_factor

            x1 = max(0, min(int(round(x - width / 2)), img_width - 1))
            y1 = max(0, min(int(round(y - height / 2)), img_height - 1))
            x2 = max(0, min(int(round(x + width / 2)), img_width - 1))
            y2 = max(0, min(int(round(y + height / 2)), img_height - 1))
            parsed.append({
                "box": (x1, y1, x2, y2),
                "class": prediction["class"],
                "confidence": prediction["confidence"],
            })
        except Exception as e:
            # Best effort: one malformed prediction must not fail the job.
            print("Skipping malformed prediction:", e)
            continue
    return parsed


def _estimate_cm_per_pixel(image, marker_real_width_cm):
    """Return a cm-per-pixel factor from the first ArUco marker, or None.

    Detected markers are drawn onto ``image`` in place. The factor averages
    the horizontal and vertical scale of the marker's bounding extent. None
    is returned when no marker is found, the extent is degenerate, or the
    detection raises.
    """
    try:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_6X6_250)
        if hasattr(cv2.aruco, 'DetectorParameters_create'):
            aruco_params = cv2.aruco.DetectorParameters_create()
        else:
            aruco_params = cv2.aruco.DetectorParameters()
        if hasattr(cv2.aruco, 'detectMarkers'):
            corners, ids, _ = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=aruco_params)
        else:
            # Builds without the legacy free function expose ArucoDetector.
            detector = cv2.aruco.ArucoDetector(aruco_dict, aruco_params)
            corners, ids, _ = detector.detectMarkers(gray)
        if ids is None or len(corners) == 0:
            return None

        marker_corners = corners[0].reshape((4, 2))
        cv2.aruco.drawDetectedMarkers(image, corners, ids)
        width_pixels = np.max(marker_corners[:, 0]) - np.min(marker_corners[:, 0])
        height_pixels = np.max(marker_corners[:, 1]) - np.min(marker_corners[:, 1])
        if width_pixels > 0 and height_pixels > 0:
            return (marker_real_width_cm / width_pixels + marker_real_width_cm / height_pixels) / 2
        return None
    except Exception as e:
        print("ArUco marker detection failed:", e)
        return None


def process_image(job_id, image_path, object_type, multiplier):
    """Run detection for one job and publish the outcome in ``jobs[job_id]``.

    Args:
        job_id: Key of an existing entry in the module-level ``jobs`` dict.
        image_path: Path of the image to analyse (read with ``cv2.imread``).
        object_type: "box" uses the Roboflow model and, when an ArUco marker
            is found, reports sizes in cm; "person" or "car" uses YOLOv5 and
            keeps only detections of that class.
        multiplier: Integer factor applied to the line thickness.

    The job's "progress" is advanced along the way. Its "result" ends up as
    either ``{"image_data": <base64 JPEG>, "detection_info": [...]}`` or
    ``{"error": <message>}``. Nothing is returned and no exception from the
    processing itself is propagated.
    """
    temp_path = None
    try:
        jobs[job_id]['progress'] = 10

        image = cv2.imread(image_path)
        if image is None:
            _finish_job(job_id, {"error": "Could not read the image."})
            return

        jobs[job_id]['progress'] = 20
        img_height, img_width = image.shape[:2]

        # A zero thickness is rejected by OpenCV and a negative one means
        # "filled", so never let the multiplier push it below 1.
        thickness = max(1, max(2, int(min(img_width, img_height) / 300)) * multiplier)
        detection_info = []

        if object_type == "box":
            if box_model is None:
                _finish_job(job_id, {"error": "Roboflow model not available."})
                return

            # Images with a side under 1000 px are upscaled 2x before inference.
            scale_factor = 2 if (img_width < 1000 or img_height < 1000) else 1
            if scale_factor > 1:
                upscaled_image = cv2.resize(image, None, fx=scale_factor, fy=scale_factor, interpolation=cv2.INTER_LINEAR)
                # Per-job file name: concurrent jobs must not share one file.
                temp_path = f"upscaled_{job_id}.jpg"
                cv2.imwrite(temp_path, upscaled_image)
                results = box_model.predict(temp_path, confidence=50, overlap=10).json()
            else:
                results = box_model.predict(image_path, confidence=50, overlap=10).json()

            processed_preds = _parse_box_predictions(
                results.get("predictions", []), scale_factor, img_width, img_height
            )
            box_detections = custom_nms(processed_preds, iou_threshold=0.3)
            jobs[job_id]['progress'] = 60

            # Physical edge length of the reference marker used for scaling.
            marker_real_width_cm = 5.0
            conversion_factor = _estimate_cm_per_pixel(image, marker_real_width_cm)

            for pred in box_detections:
                x1, y1, x2, y2 = pred["box"]
                label = pred["class"]
                confidence = pred["confidence"]
                cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), thickness)
                if conversion_factor is not None:
                    width_cm = f"{(x2 - x1) * conversion_factor:.1f}"
                    height_cm = f"{(y2 - y1) * conversion_factor:.1f}"
                else:
                    width_cm = "N/A"
                    height_cm = "N/A"
                detection_info.append({
                    "class": label,
                    "confidence": f"{confidence:.2f}",
                    "width_cm": width_cm,
                    "height_cm": height_cm,
                })
                _draw_label(image, f"{label} ({confidence:.2f})", x1, y1, (0, 255, 0))

        elif object_type in {"person", "car"}:
            if yolov5_model is None:
                _finish_job(job_id, {"error": "YOLOv5 model not available."})
                return
            try:
                img_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                yolo_results = yolov5_model(img_rgb)
                df = yolo_results.pandas().xyxy[0]
                for _, row in df.iterrows():
                    if row['name'] != object_type:
                        continue
                    xmin = int(row['xmin'])
                    ymin = int(row['ymin'])
                    xmax = int(row['xmax'])
                    ymax = int(row['ymax'])
                    conf = row['confidence']
                    label = row['name']
                    cv2.rectangle(image, (xmin, ymin), (xmax, ymax), (255, 0, 0), thickness)
                    _draw_label(image, f"{label} ({conf:.2f})", xmin, ymin, (255, 0, 0))
                    detection_info.append({
                        "class": label,
                        "confidence": f"{conf:.2f}",
                        "width_cm": "N/A",
                        "height_cm": "N/A",
                    })
            except Exception as e:
                print("Error during YOLOv5 inference:", e)
                _finish_job(job_id, {"error": "Error during YOLOv5 inference."})
                return

        # Per-class count banner in the top-left corner of the image.
        detection_counts = Counter(det["class"] for det in detection_info)
        if detection_counts:
            top_text = ", ".join(f"{cls}: {count}" for cls, count in detection_counts.items())
            (info_width, info_height), info_baseline = cv2.getTextSize(top_text, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)
            cv2.rectangle(image, (5, 5), (5 + info_width, 5 + info_height + info_baseline), (0, 255, 0), -1)
            cv2.putText(image, top_text, (5, 5 + info_height), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)

        encoded_ok, buffer = cv2.imencode('.jpg', image)
        if not encoded_ok:
            _finish_job(job_id, {"error": "Unexpected error during processing."})
            return
        image_data = base64.b64encode(buffer).decode('utf-8')
        _finish_job(job_id, {"image_data": image_data, "detection_info": detection_info})
    except Exception as e:
        print("Unexpected error during processing:", e)
        _finish_job(job_id, {"error": "Unexpected error during processing."})
    finally:
        # Remove the upscaled temp file, if one was written for this job.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                pass
|
|
| |
| |
| |
|
|
# Landing page: lets the user choose what to count and links to the upload
# page with the matching object_type query parameter. It also renders flashed
# messages, so the "Please select a valid object type." notice that upload()
# flashes before redirecting here is actually shown.
landing_template = '''
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>MathLens</title>
  <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css">
  <style>
    @import url('https://fonts.googleapis.com/css2?family=Share+Tech+Mono&display=swap');
    body { background-color: #fff; color: #000; font-family: "Share Tech Mono", monospace;
           text-align: center; display: flex; flex-direction: column; justify-content: center;
           align-items: center; min-height: 100vh; padding: 20px; }
    h1 { font-size: 2.5rem; margin-bottom: 20px; }
    p { font-size: 1.5rem; margin-bottom: 40px; }
    .btn { display: inline-block; margin: 10px; padding: 15px 30px;
           font-size: 1.2rem; text-decoration: none; border: 2px solid #000;
           color: #000; transition: background-color 0.3s, color 0.3s; }
    .btn:hover { background-color: #000; color: #fff; }
    .flash { font-size: 1rem; margin-bottom: 20px; padding: 10px 20px; border: 2px dashed #000; }
  </style>
</head>
<body>
  <h1>MathLens</h1>
  {% with messages = get_flashed_messages() %}
    {% for message in messages %}
      <div class="flash" role="alert">{{ message }}</div>
    {% endfor %}
  {% endwith %}
  <p>What do you want to count?</p>
  <div>
    <a href="{{ url_for('upload') }}?object_type=person" class="btn">People</a>
    <a href="{{ url_for('upload') }}?object_type=car" class="btn">Cars</a>
    <a href="{{ url_for('upload') }}?object_type=box" class="btn">Boxes</a>
  </div>
</body>
</html>
'''
|
|
# Upload page: posts the chosen image to /analyze, polls /progress every
# 500 ms and finally renders the payload from /result. The script shows
# server-side errors instead of a broken image, fetches the result only once,
# and HTML-escapes detection fields before inserting them into the page.
upload_template = '''
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>MathLens - AI Detection &amp; Measurement</title>
  <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css">
  <style>
    @import url('https://fonts.googleapis.com/css2?family=Share+Tech+Mono&display=swap');
    body { background-color: #fff; color: #000; font-family: "Share Tech Mono", monospace;
           text-align: center; display: flex; flex-direction: column; justify-content: center;
           align-items: center; min-height: 100vh; padding: 20px; }
    .typing-effect { font-size: 2rem; font-weight: bold; margin-bottom: 20px;
                     height: 50px; white-space: nowrap; }
    form { margin-bottom: 20px; }
    input[type="file"], button { display: block; margin: 10px auto; padding: 10px;
                                 background: none; border: 2px solid #000; color: #000;
                                 font-size: 1rem; font-family: "Share Tech Mono", monospace;
                                 cursor: pointer; }
    input[type="file"]::file-selector-button { background: none; border: none; color: #000; }
    .home-btn { display: inline-block; margin: 10px auto 20px; padding: 10px 20px;
                border: 2px solid #000; color: #000; text-decoration: none;
                font-family: "Share Tech Mono", monospace; transition: background-color 0.3s, color 0.3s; }
    .home-btn:hover { background-color: #000; color: #fff; }
    /* Progress overlay styles */
    #progressOverlay { position: fixed; top: 0; left: 0; width: 100%; height: 100%;
                       background: rgba(255,255,255,0.9); display: none;
                       align-items: center; justify-content: center; flex-direction: column;
                       z-index: 9999; }
    #progressContainer { width: 80%; max-width: 400px; }
    #progressBar { height: 20px; width: 0; background-color: #000; border-radius: 10px;
                   transition: width 0.2s linear; }
    #progressText { margin-top: 10px; font-size: 1.2rem; }
    .content-wrapper { display: flex; flex-direction: row; align-items: center;
                       justify-content: space-evenly; width: 100%; max-width: 1200px;
                       flex-wrap: wrap; gap: 20px; }
    .result-img { max-width: 100%; border: 2px solid #000; }
    .error-msg { font-size: 1.2rem; padding: 10px 20px; border: 2px dashed #000; }
    table { width: 100%; max-width: 600px; border-collapse: collapse; }
    th, td { border: 1px solid #000; padding: 5px; text-align: center; }
    .footer { margin-top: 20px; font-size: 0.9em; color: #000; }
    @media (max-width: 768px) {
      .typing-effect { font-size: 1.5rem; }
      .content-wrapper { flex-direction: column; align-items: center; }
      .result-img { max-width: 90%; }
      table { max-width: 100%; }
    }
  </style>
</head>
<body>
  <!-- Home button -->
  <a href="{{ url_for('landing') }}" class="home-btn">Home</a>
  <!-- Progress overlay -->
  <div id="progressOverlay">
    <div id="progressContainer">
      <div id="progressBar"></div>
      <div id="progressText">Starting up... 🛠️</div>
    </div>
  </div>
  <div class="typing-effect" id="typing"></div>
  <!-- The file upload form (submission handled via AJAX) -->
  <form id="uploadForm">
    <input type="file" name="file" accept="image/*" required>
    <!-- Hidden fields to pass the selected object type and device multiplier -->
    <input type="hidden" name="object_type" value="{{ object_type }}">
    <input type="hidden" name="multiplier" id="multiplier" value="1">
    <button type="submit">Analyze Image</button>
  </form>
  <div id="resultContainer">
    {% if image_data or detection_info %}
    <div class="content-wrapper">
      <img src="data:image/jpeg;base64,{{ image_data }}" alt="Processed Image" class="result-img">
      <table>
        <thead>
          <tr>
            <th>#</th>
            <th>Class</th>
            <th>Confidence</th>
            <th>Width (cm)</th>
            <th>Height (cm)</th>
          </tr>
        </thead>
        <tbody>
          {% for det in detection_info %}
          <tr>
            <td>{{ loop.index }}</td>
            <td>{{ det.class }}</td>
            <td>{{ det.confidence }}</td>
            <td>{{ det.width_cm }}</td>
            <td>{{ det.height_cm }}</td>
          </tr>
          {% endfor %}
        </tbody>
      </table>
    </div>
    {% endif %}
  </div>
  <div class="footer">&copy; 2024 MathLens AI Detection App. All rights reserved.</div>
  <script>
    // Handle typing effect
    const textArray = ["MathLens", "Smart Counting with Maths"];
    let textIndex = 0, charIndex = 0, isDeleting = false;
    const typingElement = document.getElementById("typing");
    function typeEffect() {
      let currentText = textArray[textIndex];
      if (isDeleting) {
        typingElement.textContent = currentText.substring(0, charIndex--);
      } else {
        typingElement.textContent = currentText.substring(0, charIndex++);
      }
      if (!isDeleting && charIndex === currentText.length) {
        setTimeout(() => { isDeleting = true; typeEffect(); }, 3000);
      } else if (isDeleting && charIndex === 0) {
        isDeleting = false;
        textIndex = (textIndex + 1) % textArray.length;
        setTimeout(typeEffect, 500);
      } else {
        setTimeout(typeEffect, isDeleting ? 50 : 100);
      }
    }
    document.addEventListener("DOMContentLoaded", () => {
      setTimeout(typeEffect, 500);
      // Detect if the device is mobile and update the thickness multiplier accordingly.
      var isMobile = /Mobi|Android/i.test(navigator.userAgent);
      document.getElementById("multiplier").value = isMobile ? 2 : 1;
    });

    const progressOverlay = document.getElementById("progressOverlay");
    const progressBar = document.getElementById("progressBar");
    const progressText = document.getElementById("progressText");
    const resultContainer = document.getElementById("resultContainer");

    // Escape text that is interpolated into innerHTML.
    const HTML_ESCAPES = {
      "&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;", "'": "&#39;"
    };
    function escapeHtml(value) {
      return String(value).replace(/[&<>"']/g, ch => HTML_ESCAPES[ch]);
    }

    // Hide the overlay and show an error message in place of a result.
    function showError(message) {
      progressOverlay.style.display = "none";
      const box = document.createElement("p");
      box.className = "error-msg";
      box.textContent = message || "Something went wrong. Please try again.";
      resultContainer.innerHTML = "";
      resultContainer.appendChild(box);
    }

    function progressLabel(progress) {
      if (progress < 10) return "Starting up... 🛠️";
      if (progress < 30) return "Writing scripts... 🤖";
      if (progress < 50) return "Calculating formulas... 🧮";
      if (progress < 70) return "Crunching numbers... 🔢";
      if (progress < 90) return "Almost there... 🚀";
      return "Finalizing... 🎉";
    }

    // AJAX-based submission and progress polling
    const uploadForm = document.getElementById("uploadForm");
    uploadForm.addEventListener("submit", function(e) {
      e.preventDefault();
      const formData = new FormData(uploadForm);
      // Reset and show the progress overlay
      progressBar.style.width = "0%";
      progressText.textContent = progressLabel(0);
      progressOverlay.style.display = "flex";
      // Start the analysis job
      fetch("{{ url_for('analyze') }}", {
        method: "POST",
        body: formData
      })
      .then(response => response.json())
      .then(data => {
        if (data.error || !data.job_id) {
          showError(data.error);
          return;
        }
        const jobId = encodeURIComponent(data.job_id);
        // Guards against polls that were already in flight when the job
        // finished: the result must be fetched exactly once because the
        // server deletes the job after the first /result request.
        let finished = false;
        // Start polling progress every 500ms
        const progressInterval = setInterval(() => {
          fetch("{{ url_for('progress') }}?job_id=" + jobId)
          .then(response => response.json())
          .then(progData => {
            if (finished) return;
            const progress = progData.progress;
            progressBar.style.width = progress + "%";
            progressText.textContent = progressLabel(progress);
            if (progress >= 100) {
              finished = true;
              clearInterval(progressInterval);
              fetch("{{ url_for('result') }}?job_id=" + jobId)
              .then(response => response.json())
              .then(resultData => {
                if (resultData.error) {
                  showError(resultData.error);
                  return;
                }
                progressOverlay.style.display = "none";
                resultContainer.innerHTML = `
                  <div class="content-wrapper">
                    <img src="data:image/jpeg;base64,${resultData.image_data}" alt="Processed Image" class="result-img">
                    ${buildTableHTML(resultData.detection_info)}
                  </div>`;
              })
              .catch(() => showError("Could not load the result."));
            }
          })
          .catch(() => { /* transient polling failure: try again on the next tick */ });
        }, 500);
      })
      .catch(() => showError("Could not start the analysis."));
    });

    function buildTableHTML(detectionInfo) {
      if (!detectionInfo || detectionInfo.length === 0) return "";
      let tableHTML = `<table>
        <thead>
          <tr>
            <th>#</th>
            <th>Class</th>
            <th>Confidence</th>
            <th>Width (cm)</th>
            <th>Height (cm)</th>
          </tr>
        </thead>
        <tbody>`;
      detectionInfo.forEach((det, index) => {
        tableHTML += `<tr>
          <td>${index+1}</td>
          <td>${escapeHtml(det.class)}</td>
          <td>${escapeHtml(det.confidence)}</td>
          <td>${escapeHtml(det.width_cm)}</td>
          <td>${escapeHtml(det.height_cm)}</td>
        </tr>`;
      });
      tableHTML += `</tbody></table>`;
      return tableHTML;
    }
  </script>
</body>
</html>
'''
|
|
| |
| |
| |
|
|
@app.route('/')
def landing():
    """Serve the landing page rendered from ``landing_template``."""
    page = render_template_string(landing_template)
    return page
|
|
@app.route('/upload', methods=['GET'])
def upload():
    """Serve the upload page for the object type given in the query string.

    The ``object_type`` query parameter is lower-cased and must be one of
    "person", "car" or "box". Any other value flashes a message and
    redirects to the landing page.
    """
    requested_type = request.args.get('object_type', '').lower()
    if requested_type in {"person", "car", "box"}:
        return render_template_string(upload_template, object_type=requested_type)

    flash("Please select a valid object type.")
    return redirect(url_for('landing'))
|
|
# Upper bound for the client-supplied line-thickness multiplier.
_MAX_MULTIPLIER = 10


def _run_job_and_cleanup(job_id, image_path, object_type, multiplier):
    """Thread target: run process_image, then delete the job's upload file."""
    try:
        process_image(job_id, image_path, object_type, multiplier)
    finally:
        try:
            os.remove(image_path)
        except OSError:
            pass


@app.route('/analyze', methods=['POST'])
def analyze():
    """Accept an uploaded image and start a background detection job.

    Expects multipart form data with a ``file`` part, an ``object_type``
    field ("person", "car" or "box") and an optional integer ``multiplier``.

    Returns JSON ``{"job_id": ...}`` on success. Invalid input yields
    ``{"error": ...}`` with status 400; a failure to save the upload yields
    status 500.
    """
    if 'file' not in request.files:
        return jsonify({"error": "No file provided."}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "No selected file."}), 400

    object_type = request.form.get('object_type', '').lower()
    if object_type not in {"person", "car", "box"}:
        return jsonify({"error": "Invalid object type."}), 400

    try:
        multiplier = int(request.form.get('multiplier', 1))
    except ValueError:
        multiplier = 1
    # The value comes from the client: zero or negative values would produce
    # an invalid or "filled" drawing thickness, and huge ones would cover the
    # image, so keep it within a sane range.
    multiplier = min(max(multiplier, 1), _MAX_MULTIPLIER)

    # One file per job: a shared file name would let concurrent uploads
    # overwrite each other while a worker thread is still reading.
    job_id = str(uuid.uuid4())
    upload_path = f"uploaded_{job_id}.jpg"
    try:
        file.save(upload_path)
    except Exception as e:
        print("Error saving uploaded file:", e)
        return jsonify({"error": "Error saving file."}), 500

    jobs[job_id] = {"progress": 0, "result": None}
    thread = threading.Thread(
        target=_run_job_and_cleanup,
        args=(job_id, upload_path, object_type, multiplier),
    )
    thread.start()
    return jsonify({"job_id": job_id})
|
|
@app.route('/progress', methods=['GET'])
def progress():
    """Report the progress of the job named by the ``job_id`` query parameter.

    Returns JSON ``{"progress": <value>}``; unknown job ids report 0.
    """
    job_id = request.args.get('job_id', '')
    # Single lookup instead of "in" followed by indexing: /result may delete
    # the job between the two steps, which used to raise KeyError here.
    job = jobs.get(job_id)
    if job is None:
        return jsonify({"progress": 0})
    return jsonify({"progress": job.get("progress", 0)})
|
|
@app.route('/result', methods=['GET'])
def result():
    """Return a finished job's result once and forget the job.

    Responds with the stored result dict as JSON. If the job id is unknown or
    the job has no result yet, responds with
    ``{"error": "Result not available."}`` and status 404.
    """
    job_id = request.args.get('job_id', '')
    job = jobs.get(job_id)
    if job is None or job.get("result") is None:
        return jsonify({"error": "Result not available."}), 404
    # pop() with a default tolerates a concurrent request that has already
    # removed the same job, where "del" would raise KeyError.
    jobs.pop(job_id, None)
    return jsonify(job["result"])
|
|
| |
| |
| |
|
|
if __name__ == '__main__':
    # Development entry point: listen on all interfaces, port 7860, with one
    # thread per request so polling is served while other requests run.
    run_options = {"host": "0.0.0.0", "port": 7860, "threaded": True}
    app.run(**run_options)
|
|