| | |
| | |
| |
|
| | import os |
| | import cv2 |
| | import torch |
| | import numpy as np |
| | from . import util |
| | from .wholebody import Wholebody, HWC3, resize_image |
| | from PIL import Image |
| | import onnxruntime as ort |
| | from concurrent.futures import ThreadPoolExecutor |
| | import threading |
| |
|
# Set at import time, before any pose model is loaded. KMP_DUPLICATE_LIB_OK is
# the Intel OpenMP switch that lets a process continue when more than one copy
# of the OpenMP runtime gets loaded instead of aborting.
# NOTE(review): presumably needed because several of the imported native
# libraries bundle their own OpenMP runtime - confirm it is still required.
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
| |
|
def convert_to_numpy(image):
    """Convert a supported image object to a NumPy array.

    Args:
        image: A ``np.ndarray``, a ``torch.Tensor`` or a Pillow ``Image``.

    Returns:
        np.ndarray: A copy of the input for ``np.ndarray``, ``np.array(image)``
        for a Pillow image, or the detached CPU contents of a tensor.

    Raises:
        TypeError: If *image* is none of the supported types.
    """
    # The supported types are mutually exclusive, so the order of the checks
    # does not change which branch a given input takes.
    if isinstance(image, np.ndarray):
        return image.copy()
    if isinstance(image, torch.Tensor):
        return image.detach().cpu().numpy()
    if isinstance(image, Image.Image):
        return np.array(image)
    # Raising a bare string is itself an error ("exceptions must derive from
    # BaseException") that hides the intended message, so raise a real
    # TypeError carrying it instead.
    raise TypeError(
        f'Unsupported datatype {type(image)}, only support np.ndarray, '
        'torch.Tensor, Pillow Image.'
    )
| |
|
def draw_pose(pose, H, W, use_hand=False, use_body=False, use_face=False):
    """Draw the requested parts of *pose* on a fresh black canvas.

    Args:
        pose: Mapping with the keys ``'bodies'`` (itself holding
            ``'candidate'`` and ``'subset'``), ``'faces'`` and ``'hands'``.
            All of them are read even when the matching flag is off.
        H: Canvas height in pixels.
        W: Canvas width in pixels.
        use_hand: Draw the hands with ``util.draw_handpose``.
        use_body: Draw the body with ``util.draw_bodypose``.
        use_face: Draw the faces with ``util.draw_facepose``.

    Returns:
        np.ndarray: ``(H, W, 3)`` ``uint8`` image; all zeros when no flag is
        set.
    """
    body_part = pose['bodies']
    face_points = pose['faces']
    hand_points = pose['hands']
    body_candidate, body_subset = body_part['candidate'], body_part['subset']

    image = np.zeros((H, W, 3), dtype=np.uint8)

    # Layers are painted in a fixed order: body, then hands, then faces.
    if use_body:
        image = util.draw_bodypose(image, body_candidate, body_subset)
    if use_hand:
        image = util.draw_handpose(image, hand_points)
    if use_face:
        image = util.draw_facepose(image, face_points)
    return image
| |
|
| |
|
class OptimizedWholebody:
    """Optimized version of Wholebody for faster serial processing.

    Owns one ONNX Runtime session for detection and one for pose estimation.
    Calling the instance runs both on an image and returns the keypoints
    re-ordered by ``_to_openpose_layout``.
    """

    def __init__(self, onnx_det, onnx_pose, device='cuda:0'):
        """Create the detection and pose sessions.

        Args:
            onnx_det: Detection model, forwarded to ``ort.InferenceSession``
                as ``path_or_bytes``.
            onnx_pose: Pose model, forwarded the same way.
            device: ``'cpu'``, a CUDA device string, or a ``torch.device``.
                Anything that is not a CPU device selects the CUDA provider.
        """
        providers = self._select_providers(device)
        # The providers are fixed here at construction. Calling
        # ``set_providers`` afterwards with the very same list would only
        # rebuild both sessions, so that step is not repeated.
        self.session_det = ort.InferenceSession(path_or_bytes=onnx_det, providers=providers)
        self.session_pose = ort.InferenceSession(path_or_bytes=onnx_pose, providers=providers)
        self.device = device

        # Cached model I/O names; nothing in this class reads them, they are
        # kept because they are public attributes.
        self.det_input_name = self.session_det.get_inputs()[0].name
        self.pose_input_name = self.session_pose.get_inputs()[0].name
        self.pose_output_names = [out.name for out in self.session_pose.get_outputs()]

    @staticmethod
    def _select_providers(device):
        """Return the ONNX Runtime execution-provider list for *device*."""
        # ``torch.device('cpu') == 'cpu'`` evaluates to False, so a direct
        # comparison sends torch.device objects down the CUDA branch even on
        # CPU-only hosts. Comparing the string form handles str and
        # torch.device alike.
        if str(device).startswith('cpu'):
            return ['CPUExecutionProvider']
        return ['CUDAExecutionProvider']

    @staticmethod
    def _to_openpose_layout(keypoints, scores):
        """Insert a synthetic neck keypoint and re-order the leading joints.

        Args:
            keypoints: Array indexed ``[person, keypoint, coordinate]``.
            scores: Array indexed ``[person, keypoint]``.

        Returns:
            tuple: ``(keypoints, scores)`` with one extra keypoint per person.
        """
        keypoints_info = np.concatenate((keypoints, scores[..., None]), axis=-1)

        # The neck is the mean of keypoints 5 and 6 (presumably the two
        # shoulders - confirm against the pose model's keypoint order).
        neck = np.mean(keypoints_info[:, [5, 6]], axis=1)
        # Its score is binary: 1 only when both source keypoints exceed 0.3.
        neck[:, 2:4] = np.logical_and(
            keypoints_info[:, 5, 2:4] > 0.3,
            keypoints_info[:, 6, 2:4] > 0.3).astype(int)
        new_keypoints_info = np.insert(keypoints_info, 17, neck, axis=1)

        # Copy each ``mmpose_idx`` slot into the matching ``openpose_idx``
        # slot. The right-hand side is a fancy-indexed copy, so overlapping
        # source and destination slots cannot corrupt each other.
        mmpose_idx = [17, 6, 8, 10, 7, 9, 12, 14, 16, 13, 15, 2, 1, 4, 3]
        openpose_idx = [1, 2, 3, 4, 6, 7, 8, 9, 10, 12, 13, 14, 15, 16, 17]
        new_keypoints_info[:, openpose_idx] = new_keypoints_info[:, mmpose_idx]

        return new_keypoints_info[..., :2], new_keypoints_info[..., 2]

    def __call__(self, ori_img):
        """Run detection and pose estimation on *ori_img*.

        Args:
            ori_img: Image passed unchanged to ``inference_detector`` and
                ``inference_pose``.

        Returns:
            tuple: ``(keypoints, scores, det_result)``. ``det_result`` is the
            untouched output of ``inference_detector``.
        """
        from .onnxdet import inference_detector
        from .onnxpose import inference_pose

        det_result = inference_detector(self.session_det, ori_img)
        keypoints, scores = inference_pose(self.session_pose, det_result, ori_img)
        keypoints, scores = self._to_openpose_layout(keypoints, scores)
        return keypoints, scores, det_result
| |
|
| |
|
class PoseAnnotator:
    """Estimate whole-body pose on one image and render pose maps.

    Keys read from *cfg*: ``DETECTION_MODEL`` and ``POSE_MODEL`` (required),
    ``RESIZE_SIZE`` (default 1024), ``USE_BODY`` / ``USE_FACE`` / ``USE_HAND``
    (default True).
    """

    def __init__(self, cfg, device=None):
        """Build the pose estimator.

        Args:
            cfg: Mapping holding the keys listed in the class docstring.
            device: Device handed to ``Wholebody``. When None, CUDA is used if
                ``torch.cuda.is_available()`` and CPU otherwise.
        """
        onnx_det = cfg['DETECTION_MODEL']
        onnx_pose = cfg['POSE_MODEL']
        if device is None:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.device = device
        self.pose_estimation = Wholebody(onnx_det, onnx_pose, device=self.device)
        self.resize_size = cfg.get("RESIZE_SIZE", 1024)
        self.use_body = cfg.get('USE_BODY', True)
        self.use_face = cfg.get('USE_FACE', True)
        self.use_hand = cfg.get('USE_HAND', True)

    # ``inference_mode`` is called with parentheses: the bare
    # ``@torch.inference_mode`` form is only accepted by newer torch releases.
    @torch.no_grad()
    @torch.inference_mode()
    def forward(self, image):
        """Annotate a single image.

        Args:
            image: Anything accepted by ``convert_to_numpy``.

        Returns:
            tuple: ``(ret_data, det_result)`` as produced by ``process``.
        """
        image = convert_to_numpy(image)
        # Reverse the last (channel) axis before normalising / resizing.
        input_image = HWC3(image[..., ::-1])
        return self.process(resize_image(input_image, self.resize_size), image.shape[:2])

    def _requested_maps(self):
        """Return ``(output key, draw_pose flags)`` for every enabled map."""
        requested = []
        if self.use_body:
            requested.append(("detected_map_body", dict(use_body=True)))
        if self.use_face:
            requested.append(("detected_map_face", dict(use_face=True)))
        if self.use_body and self.use_face:
            requested.append(
                ("detected_map_bodyface", dict(use_body=True, use_face=True)))
        if self.use_hand and self.use_body and self.use_face:
            requested.append(
                ("detected_map_handbodyface",
                 dict(use_hand=True, use_body=True, use_face=True)))
        return requested

    @staticmethod
    def _render_map(pose, canvas_hw, out_hw, **parts):
        """Draw *pose* at the working size and resize it to the output size."""
        H, W = canvas_hw
        ori_h, ori_w = out_hw
        drawn = draw_pose(pose, H, W, **parts)
        # Lanczos when enlarging, area averaging when shrinking or equal.
        interpolation = cv2.INTER_LANCZOS4 if ori_h * ori_w > H * W else cv2.INTER_AREA
        return cv2.resize(drawn[..., ::-1], (ori_w, ori_h), interpolation=interpolation)

    def process(self, ori_img, ori_shape):
        """Run pose estimation on a resized image and render the maps.

        Args:
            ori_img: ``(H, W, C)`` working image fed to the estimator.
            ori_shape: ``(height, width)`` of the output maps.

        Returns:
            tuple: ``(ret_data, det_result)``. ``ret_data`` maps each enabled
            ``detected_map_*`` key to an ``(ori_h, ori_w, 3)`` image, all
            zeros when nobody was detected. ``det_result`` is the detector
            output scaled to *ori_shape* and cast to ``int32``, or an empty
            array when nobody was detected.
        """
        ori_h, ori_w = ori_shape
        ori_img = ori_img.copy()
        H, W, _ = ori_img.shape
        with torch.no_grad():
            candidate, subset, det_result = self.pose_estimation(ori_img)

        requested = self._requested_maps()

        if len(candidate) == 0:
            empty_ret_data = {
                key: np.zeros((ori_h, ori_w, 3), dtype=np.uint8)
                for key, _ in requested
            }
            return empty_ret_data, np.array([])

        nums, _, locs = candidate.shape
        # Normalise coordinates by the working-image size.
        candidate[..., 0] /= float(W)
        candidate[..., 1] /= float(H)
        # Body points are copied out before the visibility masking below.
        body = candidate[:, :18].copy().reshape(nums * 18, locs)

        # Replace each body score by the row index of that joint in ``body``
        # (18 * person + joint) when it is confident enough, else -1.
        # ``score`` is a view, so this deliberately rewrites ``subset`` in
        # place exactly like the element-wise loop it replaces.
        score = subset[:, :18]
        flat_index = 18 * np.arange(nums)[:, None] + np.arange(score.shape[1])
        score[...] = np.where(score > 0.3, flat_index, -1)

        un_visible = subset < 0.3
        candidate[un_visible] = -1

        faces = candidate[:, 24:92]
        hands = np.vstack([candidate[:, 92:113], candidate[:, 113:]])

        bodies = dict(candidate=body, subset=score)
        pose = dict(bodies=bodies, hands=hands, faces=faces)

        ret_data = {
            key: self._render_map(pose, (H, W), (ori_h, ori_w), **parts)
            for key, parts in requested
        }

        if det_result.shape[0] > 0:
            w_ratio, h_ratio = ori_w / W, ori_h / H
            # Even columns are scaled by the width ratio and odd columns by
            # the height ratio (the two were previously swapped).
            # NOTE(review): assumes boxes are laid out x1, y1, x2, y2 -
            # confirm against inference_detector.
            det_result[..., ::2] *= w_ratio
            det_result[..., 1::2] *= h_ratio
            det_result = det_result.astype(np.int32)
        return ret_data, det_result
| |
|
| |
|
class OptimizedPoseAnnotator(PoseAnnotator):
    """PoseAnnotator whose estimator is an ``OptimizedWholebody``.

    The parent initializer is not invoked; every attribute is assigned here,
    with ``OptimizedWholebody`` built in place of the parent's estimator.
    """

    def __init__(self, cfg, device=None):
        """Read the model paths and options from *cfg* and build the estimator.

        Args:
            cfg: Mapping with ``DETECTION_MODEL`` and ``POSE_MODEL`` and the
                optional ``RESIZE_SIZE``, ``USE_BODY``, ``USE_FACE`` and
                ``USE_HAND`` keys.
            device: Device for the estimator. When None, CUDA is chosen if
                available and CPU otherwise.
        """
        det_model, pose_model = cfg['DETECTION_MODEL'], cfg['POSE_MODEL']

        if device is None:
            backend = "cuda" if torch.cuda.is_available() else "cpu"
            device = torch.device(backend)
        self.device = device

        self.pose_estimation = OptimizedWholebody(det_model, pose_model, device=self.device)
        self.resize_size = cfg.get("RESIZE_SIZE", 1024)

        # Each drawing switch defaults to enabled when cfg does not mention it.
        for attr, key in (('use_body', 'USE_BODY'),
                          ('use_face', 'USE_FACE'),
                          ('use_hand', 'USE_HAND')):
            setattr(self, attr, cfg.get(key, True))
| |
|
| |
|
class PoseBodyFaceAnnotator(PoseAnnotator):
    """Single-image annotator that returns only the combined body+face map."""

    def __init__(self, cfg):
        """Initialise the parent, then force body and face on and hands off.

        Args:
            cfg: Configuration mapping forwarded to ``PoseAnnotator``; any
                ``USE_*`` values it carries are overridden.
        """
        super().__init__(cfg)
        self.use_body, self.use_face, self.use_hand = True, True, False

    # ``inference_mode`` is called with parentheses: the bare
    # ``@torch.inference_mode`` form is only accepted by newer torch releases.
    @torch.no_grad()
    @torch.inference_mode()
    def forward(self, image):
        """Return the ``detected_map_bodyface`` image for *image*.

        Args:
            image: Anything accepted by ``PoseAnnotator.forward``.

        Returns:
            The body+face pose map; the detection result is discarded.
        """
        ret_data, _ = super().forward(image)
        return ret_data['detected_map_bodyface']
| |
|
| |
|
class OptimizedPoseBodyFaceVideoAnnotator:
    """Video annotator that renders a body+face pose map for every frame.

    Short inputs (at most four frames) are processed inline on the first
    annotator; longer inputs are fanned out to a thread pool in chunks of
    ``chunk_size * num_workers`` frames.

    Hands are disabled and the per-frame result is ``detected_map_bodyface``,
    matching the class name (this class used to enable hands and return the
    hand+body+face map, which is what the separate hand variant is for).
    """

    # Key of the pose map returned for each frame.
    _OUTPUT_KEY = 'detected_map_bodyface'

    def __init__(self, cfg, num_workers=2, chunk_size=8):
        """Create ``num_workers`` annotators.

        Args:
            cfg: Configuration mapping forwarded to ``OptimizedPoseAnnotator``.
            num_workers: Number of annotators and of pool threads.
            chunk_size: Frames per worker submitted to the pool at a time.
        """
        self.cfg = cfg
        self.num_workers = num_workers
        self.chunk_size = chunk_size
        self.use_body, self.use_face, self.use_hand = True, True, False

        self.annotators = []
        for _ in range(num_workers):
            annotator = OptimizedPoseAnnotator(cfg)
            annotator.use_body, annotator.use_face, annotator.use_hand = True, True, False
            self.annotators.append(annotator)

        self._current_worker = 0
        self._worker_lock = threading.Lock()

    def _get_annotator(self):
        """Return the next annotator in rotation.

        The rotation is strict: it does not check whether the annotator it
        hands out is still busy with another frame.
        """
        with self._worker_lock:
            annotator = self.annotators[self._current_worker]
            self._current_worker = (self._current_worker + 1) % len(self.annotators)
            return annotator

    def _render(self, annotator, frame):
        """Annotate one frame that is already a NumPy array.

        Returns the ``_OUTPUT_KEY`` map, or a black frame of the input size
        when ``process`` did not produce that key.
        """
        input_image = HWC3(frame[..., ::-1])
        resized_image = resize_image(input_image, annotator.resize_size)
        ret_data, _ = annotator.process(resized_image, frame.shape[:2])
        if self._OUTPUT_KEY in ret_data:
            return ret_data[self._OUTPUT_KEY]
        h, w = frame.shape[:2]
        return np.zeros((h, w, 3), dtype=np.uint8)

    def _process_single_frame(self, frame_data):
        """Annotate one ``(frame, index)`` pair on a pool thread.

        Best effort: any failure is printed and replaced by a black frame so a
        single bad frame does not abort the whole video.

        Returns:
            tuple: ``(frame_idx, image)``.
        """
        frame, frame_idx = frame_data
        try:
            annotator = self._get_annotator()
            frame = convert_to_numpy(frame)
            return frame_idx, self._render(annotator, frame)
        except Exception as e:
            print(f"Error processing frame {frame_idx}: {e}")
            # Fall back to 480x640 when the frame exposes no shape.
            h, w = frame.shape[:2] if hasattr(frame, 'shape') else (480, 640)
            return frame_idx, np.zeros((h, w, 3), dtype=np.uint8)

    def forward(self, frames):
        """Annotate every frame and return the maps in input order.

        Args:
            frames: Sequence of frames accepted by ``convert_to_numpy``.

        Returns:
            list: One pose map per input frame; ``[]`` for empty input.
        """
        # ``len`` rather than truthiness so an ndarray of frames also works.
        if len(frames) == 0:
            return []

        # Small batches run inline. Errors propagate on this path; only the
        # pooled path below substitutes black frames.
        if len(frames) <= 4:
            annotator = self.annotators[0]
            return [self._render(annotator, convert_to_numpy(frame)) for frame in frames]

        frame_data = [(frame, idx) for idx, frame in enumerate(frames)]
        results = [None] * len(frames)
        batch = self.chunk_size * self.num_workers

        # One pool for the whole video instead of a new pool per chunk.
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            for chunk_start in range(0, len(frame_data), batch):
                chunk_data = frame_data[chunk_start:chunk_start + batch]
                for frame_idx, result in executor.map(self._process_single_frame, chunk_data):
                    results[frame_idx] = result

        return results
| |
|
| |
|
class OptimizedPoseBodyFaceHandVideoAnnotator:
    """Optimized video annotator that includes hands, body, and face.

    Short inputs (at most four frames) are processed inline on the first
    annotator; longer inputs are fanned out to a thread pool in chunks of
    ``chunk_size * num_workers`` frames.
    """

    # Key of the pose map returned for each frame.
    _OUTPUT_KEY = 'detected_map_handbodyface'

    def __init__(self, cfg, num_workers=2, chunk_size=8):
        """Create ``num_workers`` annotators.

        Args:
            cfg: Configuration mapping forwarded to ``OptimizedPoseAnnotator``.
            num_workers: Number of annotators and of pool threads.
            chunk_size: Frames per worker submitted to the pool at a time.
        """
        self.cfg = cfg
        self.num_workers = num_workers
        self.chunk_size = chunk_size
        self.use_body, self.use_face, self.use_hand = True, True, True

        self.annotators = []
        for _ in range(num_workers):
            annotator = OptimizedPoseAnnotator(cfg)
            annotator.use_body, annotator.use_face, annotator.use_hand = True, True, True
            self.annotators.append(annotator)

        self._current_worker = 0
        self._worker_lock = threading.Lock()

    def _get_annotator(self):
        """Return the next annotator in rotation.

        The rotation is strict: it does not check whether the annotator it
        hands out is still busy with another frame.
        """
        with self._worker_lock:
            annotator = self.annotators[self._current_worker]
            self._current_worker = (self._current_worker + 1) % len(self.annotators)
            return annotator

    def _render(self, annotator, frame):
        """Annotate one frame that is already a NumPy array.

        Returns the ``_OUTPUT_KEY`` map, or a black frame of the input size
        when ``process`` did not produce that key.
        """
        input_image = HWC3(frame[..., ::-1])
        resized_image = resize_image(input_image, annotator.resize_size)
        ret_data, _ = annotator.process(resized_image, frame.shape[:2])
        if self._OUTPUT_KEY in ret_data:
            return ret_data[self._OUTPUT_KEY]
        h, w = frame.shape[:2]
        return np.zeros((h, w, 3), dtype=np.uint8)

    def _process_single_frame(self, frame_data):
        """Annotate one ``(frame, index)`` pair on a pool thread.

        Best effort: any failure is printed and replaced by a black frame so a
        single bad frame does not abort the whole video.

        Returns:
            tuple: ``(frame_idx, image)``.
        """
        frame, frame_idx = frame_data
        try:
            annotator = self._get_annotator()
            frame = convert_to_numpy(frame)
            return frame_idx, self._render(annotator, frame)
        except Exception as e:
            print(f"Error processing frame {frame_idx}: {e}")
            # Fall back to 480x640 when the frame exposes no shape.
            h, w = frame.shape[:2] if hasattr(frame, 'shape') else (480, 640)
            return frame_idx, np.zeros((h, w, 3), dtype=np.uint8)

    def forward(self, frames):
        """Annotate every frame and return the maps in input order.

        Args:
            frames: Sequence of frames accepted by ``convert_to_numpy``.

        Returns:
            list: One pose map per input frame; ``[]`` for empty input.
        """
        # ``len`` rather than truthiness so an ndarray of frames also works.
        if len(frames) == 0:
            return []

        # Small batches run inline. Errors propagate on this path; only the
        # pooled path below substitutes black frames.
        if len(frames) <= 4:
            annotator = self.annotators[0]
            return [self._render(annotator, convert_to_numpy(frame)) for frame in frames]

        frame_data = [(frame, idx) for idx, frame in enumerate(frames)]
        results = [None] * len(frames)
        batch = self.chunk_size * self.num_workers

        # One pool for the whole video instead of a new pool per chunk.
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            for chunk_start in range(0, len(frame_data), batch):
                chunk_data = frame_data[chunk_start:chunk_start + batch]
                for frame_idx, result in executor.map(self._process_single_frame, chunk_data):
                    results[frame_idx] = result

        return results
| |
|
| |
|
| | |
| |
|
| | |
class PoseBodyFaceVideoAnnotator(OptimizedPoseBodyFaceVideoAnnotator):
    """Backward-compatible name for ``OptimizedPoseBodyFaceVideoAnnotator``.

    Adds nothing of its own: the constructor arguments, the parts that get
    drawn and ``forward`` all come unchanged from the parent class.
    """
| | |
class PoseBodyFaceHandVideoAnnotator(OptimizedPoseBodyFaceHandVideoAnnotator):
    """Hand, body and face video annotator with fixed pool settings."""

    def __init__(self, cfg):
        """Initialise the parent with two workers and a chunk size of four.

        Args:
            cfg: Configuration mapping forwarded to the parent initializer.
        """
        pool_settings = dict(num_workers=2, chunk_size=4)
        super().__init__(cfg, **pool_settings)
| |
|
| |
|
| | |
| | import imageio |
| |
|
def save_one_video(file_path, videos, fps=8, quality=8, macro_block_size=None):
    """Write a sequence of frames to *file_path* with the libx264 codec.

    Args:
        file_path: Destination passed to ``imageio.get_writer``.
        videos: Iterable of frames, each handed to ``append_data``.
        fps: Frame rate of the output.
        quality: Quality setting forwarded to the writer.
        macro_block_size: Forwarded to the writer unchanged.

    Returns:
        bool: True when every frame was written and the writer closed, False
        when any step raised (the error is printed, not re-raised).
    """
    try:
        video_writer = imageio.get_writer(
            file_path, fps=fps, codec='libx264', quality=quality,
            macro_block_size=macro_block_size)
        # Close the writer even when a frame fails to encode; previously an
        # error in append_data left it open.
        try:
            for frame in videos:
                video_writer.append_data(frame)
        finally:
            video_writer.close()
        return True
    except Exception as e:
        print(f"Video save error: {e}")
        return False
| | |
def get_frames(video_path):
    """Read every frame of a video with OpenCV.

    Args:
        video_path: Source passed to ``cv2.VideoCapture``.

    Returns:
        tuple: ``(frames, fps)`` where ``frames`` is the list of frames in
        read order (empty when nothing could be read) and ``fps`` is the
        value reported for ``cv2.CAP_PROP_FPS``.
    """
    frames = []
    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        print("video fps: " + str(fps))
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
    finally:
        # Release the capture even if reading raised.
        cap.release()
    # No cv2.destroyAllWindows() here: this function never opens a window,
    # and the call raises on OpenCV builds without GUI support.
    return frames, fps