import os

import cv2
import joblib
import numpy as np


class CameraMotionVisualizer:
    @staticmethod
    def draw_motion_grid(frame, transform_matrix, grid_size=32):
        """Draw arrows showing where each grid point moves under the 2x3 affine transform."""
        if transform_matrix is None:
            return frame
        h, w = frame.shape[:2]
        for y in range(0, h, grid_size):
            for x in range(0, w, grid_size):
                # Apply the affine transform to the homogeneous point [x, y, 1].
                start = np.array([x, y, 1])
                end = np.dot(transform_matrix, start)
                # Only draw arrows where the motion exceeds one pixel.
                if abs(end[0] - x) > 1 or abs(end[1] - y) > 1:
                    cv2.arrowedLine(
                        frame,
                        (int(x), int(y)),
                        (int(end[0]), int(end[1])),
                        (0, 255, 0),
                        1,
                        tipLength=0.2
                    )
        return frame
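

# Minimal usage sketch for the visualizer (illustrative only: the black frame
# and the pure-translation matrix below are made-up values, not part of the
# tracker pipeline):
#
#   frame = np.zeros((480, 640, 3), dtype=np.uint8)
#   M = np.array([[1, 0, 5], [0, 1, 3]], dtype=np.float32)  # shift by (+5, +3)
#   vis = CameraMotionVisualizer.draw_motion_grid(frame, M)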


class SlidingWindowRefiner:
    """Refines a bounding box by scoring SIFT matches over a sliding-window search."""

    def __init__(self):
        self.sift = cv2.SIFT_create(nfeatures=2000)
        # FLANN KD-tree matcher, suitable for float SIFT descriptors.
        FLANN_INDEX_KDTREE = 1
        index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
        search_params = dict(checks=50)
        self.flann = cv2.FlannBasedMatcher(index_params, search_params)
        # Search-window geometry.
        self.scale_levels = 3
        self.scale_step = 1.2
        self.scale_factor = 2.0
        self.overlap = 0.3
        # Template appearance captured on the first frame.
        self.template = None
        self.template_kp = None
        self.template_desc = None

    def initialize_template(self, gray, bbox):
        """Crop the initial bounding box and cache its SIFT keypoints and descriptors."""
        x, y, w, h = map(int, bbox)
        self.template = gray[y:y+h, x:x+w].copy()
        self.template_kp, self.template_desc = self.sift.detectAndCompute(
            self.template, None
        )

    def generate_windows(self, img_shape, prev_bbox, transform_matrix=None):
        """Generate candidate windows around the previous box, compensated for camera motion."""
        x, y, w, h = map(int, prev_bbox)
        if transform_matrix is not None:
            # Move the box center along with the estimated camera motion.
            center = np.array([[x + w / 2, y + h / 2, 1]]).T
            transformed = np.dot(transform_matrix, center)
            x = int(transformed[0, 0] - w / 2)
            y = int(transformed[1, 0] - h / 2)
        windows = []
        for scale in np.linspace(1 / self.scale_step, self.scale_step, self.scale_levels):
            ww = int(w * self.scale_factor * scale)
            hh = int(h * self.scale_factor * scale)
            step_x = int(ww * (1 - self.overlap))
            step_y = int(hh * (1 - self.overlap))
            cx, cy = x + w // 2, y + h // 2
            for dy in range(-step_y, step_y + 1, max(1, step_y // 2)):
                for dx in range(-step_x, step_x + 1, max(1, step_x // 2)):
                    # Clamp each window so it stays inside the image.
                    wx = max(0, min(cx - ww // 2 + dx, img_shape[1] - ww))
                    wy = max(0, min(cy - hh // 2 + dy, img_shape[0] - hh))
                    windows.append((wx, wy, ww, hh))
        return windows

    def score_window(self, gray, window):
        """Score a window by SIFT matches against the template (higher is better)."""
        if self.template_desc is None:
            return 0
        x, y, w, h = map(int, window)
        roi = gray[y:y+h, x:x+w]
        if roi.shape[0] < 20 or roi.shape[1] < 20:
            return 0
        # cv2.resize expects (width, height), hence the reversed shape.
        roi = cv2.resize(roi, self.template.shape[::-1])
        kp, desc = self.sift.detectAndCompute(roi, None)
        if desc is None or len(desc) < 2:
            return 0
        if len(self.template_desc) < 2:
            return 0
        matches = self.flann.knnMatch(self.template_desc, desc, k=2)
        # Lowe's ratio test to keep only distinctive matches.
        good = []
        for match_pair in matches:
            if len(match_pair) == 2:
                m, n = match_pair
                if m.distance < 0.7 * n.distance:
                    good.append(m)
        if not good:
            return 0
        # Weight the match count by average descriptor distance (512 ~ max SIFT L2 distance).
        avg_dist = np.mean([m.distance for m in good])
        return len(good) * (1 - avg_dist / 512)
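

# Minimal sketch of driving the refiner on its own (illustrative only: gray0,
# gray1, and the starting box are assumptions, not values from this file):
#
#   refiner = SlidingWindowRefiner()
#   refiner.initialize_template(gray0, (100, 80, 50, 60))
#   candidates = refiner.generate_windows(gray1.shape, (100, 80, 50, 60))
#   best = max(candidates, key=lambda win: refiner.score_window(gray1, win))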


# ================================
# 🔹 ORIGINAL INFERENCE CLASS
# ================================
class ObjectTrackerInference:
    def __init__(self, model_dir='models'):
        self.model_dir = model_dir
        print("Loading pre-trained models...")
        # Regression models and feature scalers trained offline and saved with joblib.
        self.position_model = joblib.load(os.path.join(model_dir, 'position_model.joblib'))
        self.size_model = joblib.load(os.path.join(model_dir, 'size_model.joblib'))
        self.position_scaler = joblib.load(os.path.join(model_dir, 'position_scaler.joblib'))
        self.size_scaler = joblib.load(os.path.join(model_dir, 'size_scaler.joblib'))
        print("Models loaded successfully!")
        # SIFT for appearance features; ORB + Hamming BFMatcher for camera-motion estimation.
        self.sift = cv2.SIFT_create(nfeatures=2000)
        self.orb = cv2.ORB_create(nfeatures=1000)
        self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
        self.prev_frame = None
        self.prev_kp = None
        self.prev_desc = None
        # 🔹 ADDITIVE: sliding-window refinement stage.
        self.window_refiner = SlidingWindowRefiner()
        self.template_initialized = False

    def estimate_camera_motion(self, frame):
        """Estimate global camera motion as a 2x3 partial affine between consecutive frames."""
        if frame is None:
            return np.eye(2, 3, dtype=np.float32)
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        kp, desc = self.orb.detectAndCompute(gray, None)
        if self.prev_frame is None:
            # First frame: nothing to match against yet.
            self.prev_frame = gray
            self.prev_kp = kp
            self.prev_desc = desc
            return np.eye(2, 3, dtype=np.float32)
        if desc is None or self.prev_desc is None or len(desc) < 4 or len(self.prev_desc) < 4:
            return np.eye(2, 3, dtype=np.float32)
        matches = self.matcher.match(self.prev_desc, desc)
        if len(matches) < 4:
            return np.eye(2, 3, dtype=np.float32)
        # Keep the 50 strongest matches for a robust fit.
        matches = sorted(matches, key=lambda x: x.distance)
        good_matches = matches[:min(len(matches), 50)]
        src_pts = np.float32([self.prev_kp[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
        dst_pts = np.float32([kp[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
        transform_matrix, _ = cv2.estimateAffinePartial2D(src_pts, dst_pts)
        if transform_matrix is None:
            transform_matrix = np.eye(2, 3, dtype=np.float32)
        self.prev_frame = gray
        self.prev_kp = kp
        self.prev_desc = desc
        return transform_matrix

    def local_binary_pattern(self, image, n_points=8, radius=1):
        """Plain-Python LBP with bilinear sampling; O(rows * cols * n_points), so slow on large images."""
        rows, cols = image.shape
        output = np.zeros((rows, cols))
        for i in range(radius, rows - radius):
            for j in range(radius, cols - radius):
                center = image[i, j]
                pattern = 0
                for k in range(n_points):
                    # Sample point on the circle of given radius around (i, j).
                    angle = 2 * np.pi * k / n_points
                    x = j + radius * np.cos(angle)
                    y = i - radius * np.sin(angle)
                    # Bilinear interpolation between the four surrounding pixels.
                    x1, x2 = int(np.floor(x)), int(np.ceil(x))
                    y1, y2 = int(np.floor(y)), int(np.ceil(y))
                    f11 = image[y1, x1]
                    f12 = image[y1, x2]
                    f21 = image[y2, x1]
                    f22 = image[y2, x2]
                    x_weight = x - x1
                    y_weight = y - y1
                    pixel_value = (f11 * (1 - x_weight) * (1 - y_weight) +
                                   f21 * (1 - x_weight) * y_weight +
                                   f12 * x_weight * (1 - y_weight) +
                                   f22 * x_weight * y_weight)
                    # Set bit k if the sampled neighbor is brighter than the center.
                    pattern |= int(pixel_value > center) << k
                output[i, j] = pattern
        return output
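
    # A vectorized alternative exists in scikit-image (not used by this file;
    # shown only as a sketch, assuming scikit-image is installed):
    #
    #   from skimage.feature import local_binary_pattern
    #   lbp = local_binary_pattern(image, P=8, R=1, method='default')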

    def extract_features(self, frame, bbox, transform_matrix=None):
        """Build the 77-dim feature vector: 64 HOG + 5 LBP stats + 4 motion + 4 bbox values."""
        if frame is None:
            return None
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        x, y, w, h = map(int, bbox)
        # Clamp the box to the image bounds.
        x = max(0, min(x, gray.shape[1] - w))
        y = max(0, min(y, gray.shape[0] - h))
        w = min(w, gray.shape[1] - x)
        h = min(h, gray.shape[0] - y)
        roi = gray[y:y+h, x:x+w]
        if roi.size == 0:
            roi = gray
        roi = cv2.resize(roi, (64, 64))
        features = []
        # HOG over the 64x64 patch, truncated to the first 64 values.
        hog = cv2.HOGDescriptor((64, 64), (16, 16), (8, 8), (8, 8), 9)
        hog_features = hog.compute(roi)
        features.extend(hog_features.flatten()[:64])
        # LBP texture statistics: mean, std, and quartiles.
        lbp = self.local_binary_pattern(roi, n_points=8, radius=1)
        features.extend([
            np.mean(lbp),
            np.std(lbp),
            *np.percentile(lbp, [25, 50, 75])
        ])
        # Camera-motion terms: diagonal (scale) and translation, or identity defaults.
        if transform_matrix is not None:
            features.extend([
                transform_matrix[0, 0],
                transform_matrix[1, 1],
                transform_matrix[0, 2],
                transform_matrix[1, 2]
            ])
        else:
            features.extend([1, 1, 0, 0])
        features.extend([x, y, w, h])
        return np.array(features).reshape(1, -1)

    def predict_bbox(self, features):
        """Predict (x, y) with the position model and (w, h) with the size model."""
        features_position = self.position_scaler.transform(features)
        features_size = self.size_scaler.transform(features)
        position_pred = self.position_model.predict(features_position)
        size_pred = self.size_model.predict(features_size)
        bbox = np.hstack([position_pred, size_pred])[0]
        return bbox
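
    # The joblib artifacts are assumed to be scikit-learn-style estimators: the
    # scalers need only expose .transform() and the models .predict(). A quick
    # offline check of what was actually saved (file name matches __init__):
    #
    #   import joblib
    #   print(type(joblib.load('models/position_model.joblib')))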

    def track_video(self, video_path, initial_bbox, output_path='output_tracked.mp4', fps=30):
        print(f"Processing video: {video_path}")
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video: {video_path}")
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"Video: {frame_width}x{frame_height}, {total_frames} frames")
        # Note: output fps is the caller's value; the source rate could instead be
        # read with cap.get(cv2.CAP_PROP_FPS).
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
        # Reset per-video state so the tracker can be reused across calls.
        self.prev_frame = None
        self.prev_kp = None
        self.prev_desc = None
        self.template_initialized = False
        current_bbox = initial_bbox
        frame_idx = 0
        print("Tracking object...")
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            transform_matrix = self.estimate_camera_motion(frame)
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            # Capture the appearance template from the first frame.
            if not self.template_initialized:
                self.window_refiner.initialize_template(gray, current_bbox)
                self.template_initialized = True
            # Stage 1: sliding-window refinement around the previous box.
            windows = self.window_refiner.generate_windows(
                frame.shape, current_bbox, transform_matrix
            )
            best_score = -1
            best_window = None
            for win in windows:
                score = self.window_refiner.score_window(gray, win)
                # Draw every candidate window (yellow) for debugging.
                xw, yw, ww, hh = map(int, win)
                cv2.rectangle(frame, (xw, yw), (xw + ww, yw + hh), (0, 255, 255), 1)
                if score > best_score:
                    best_score = score
                    best_window = win
            if best_window is not None:
                current_bbox = best_window
            # Stage 2: regression models produce the final box.
            features = self.extract_features(frame, current_bbox, transform_matrix)
            if features is not None:
                current_bbox = self.predict_bbox(features)
            frame = CameraMotionVisualizer.draw_motion_grid(frame, transform_matrix)
            x, y, w, h = map(int, current_bbox)
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
            cv2.putText(frame, f'Frame: {frame_idx}', (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
            out.write(frame)
            frame_idx += 1
            if frame_idx % 30 == 0:
                print(f"Processed {frame_idx}/{total_frames} frames")
        cap.release()
        out.release()
        print(f"Tracking complete! Video saved to: {output_path}")
        return output_path
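

# Minimal entry-point sketch (illustrative only: 'input.mp4', the initial box,
# and the 'models' directory are assumptions, not values shipped with this file):
if __name__ == "__main__":
    tracker = ObjectTrackerInference(model_dir='models')
    tracker.track_video('input.mp4', initial_bbox=(100, 100, 80, 80))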