| import json |
| import math |
| from pathlib import Path |
| import numpy as np |
| import cv2 |
| from pycocotools import mask as mask_utils |
| from PIL import Image, ExifTags |
|
|
# EXIF tag id for image Orientation (ExifTags.Base.Orientation).
_EXIF_ORIENTATION = 274


def get_image_dimensions(image_path):
    """Return the (width, height) of an image, honoring EXIF orientation.

    Tries PIL first; if PIL cannot open the file, falls back to OpenCV
    (which ignores EXIF). Returns ``(0, 0)`` when both readers fail so the
    caller can detect the error without an exception.

    Args:
        image_path: ``pathlib.Path`` to the image file.

    Returns:
        tuple[int, int]: ``(width, height)`` after accounting for EXIF
        rotation (orientations 5-8 swap the axes), or ``(0, 0)`` on failure.
    """
    try:
        with Image.open(image_path) as img:
            width, height = img.size
            orientation = 1
            try:
                # Public API (works on all formats); the private _getexif()
                # only exists on JPEG images and returns None elsewhere.
                exif = img.getexif() or {}
                orientation = exif.get(_EXIF_ORIENTATION, 1)
            except Exception as e:
                print(f"EXIF读取警告 [{image_path.name}]: {str(e)}")

            # Orientations 5-8 encode a 90°/270° rotation: the stored pixel
            # grid is transposed relative to the displayed image.
            if orientation in [5, 6, 7, 8]:
                return height, width
            else:
                return width, height

    except Exception as pil_error:
        print(f"PIL读取失败 [{image_path.name}], 尝试OpenCV: {str(pil_error)}")
        try:
            img = cv2.imread(str(image_path))
            if img is not None:
                h, w = img.shape[:2]
                return w, h
            raise ValueError("OpenCV返回空图像")
        except Exception as cv_error:
            print(f"严重错误: 无法获取尺寸 [{image_path.name}]: {str(cv_error)}")
            return (0, 0)
|
|
def points_to_rle(points, img_dimensions):
    """Rasterize a polygon into a COCO run-length encoding.

    Every vertex is clamped into the image bounds before rasterization so
    out-of-range annotation coordinates cannot crash the fill.

    Args:
        points: iterable of ``(x, y)`` float pairs describing the polygon.
        img_dimensions: ``(width, height)`` of the target image.

    Returns:
        dict with ``size`` ([height, width]) and ``counts`` (UTF-8 decoded
        RLE string), matching the COCO segmentation format.

    Raises:
        ValueError: if fewer than three vertices remain.
    """
    width, height = img_dimensions

    def _clamp(value, upper):
        # Round to the nearest pixel, then pin into [0, upper].
        return min(max(0, int(round(value))), upper)

    polygon = [(_clamp(px, width - 1), _clamp(py, height - 1)) for px, py in points]

    if len(polygon) < 3:
        raise ValueError(f"无效多边形,点数不足3个")

    canvas = np.zeros((height, width), dtype=np.uint8)
    cv2.fillPoly(canvas, [np.array(polygon, dtype=np.int32)], color=1)
    encoded = mask_utils.encode(np.asfortranarray(canvas))

    return {
        "size": [height, width],
        "counts": encoded['counts'].decode('utf-8')
    }
|
|
def convert_medical_json(input_file, config=None):
    """Convert one LabelMe-style annotation JSON into the target dataset schema.

    The sibling ``.jpg`` of *input_file* must exist; its dimensions are read
    and every ``shapes`` entry labelled ``target`` is converted to an RLE
    segmentation via :func:`points_to_rle`.

    Args:
        input_file: path to the annotation JSON (str or ``Path``).
        config: optional dict overriding ``task_type`` / ``source`` /
            ``domain`` defaults.

    Returns:
        A one-element list holding the converted record, or ``None`` when
        the conversion fails (missing image, unreadable dimensions, ...).
    """
    cfg = {
        "task_type": "Image-Segmentation",
        "source": "Lisa",
        "domain": "General",
        **(config or {})
    }

    # Bind input_path BEFORE the try-block: the except handler below formats
    # input_path.name, which would raise NameError if the assignment itself
    # lived inside the try and an earlier statement failed.
    input_path = Path(input_file)

    try:
        image_path = input_path.with_suffix('.jpg')

        if not image_path.exists():
            raise FileNotFoundError(f"关联图片不存在: {image_path.name}")

        # Relative media path of the form "./data/<source>/<image>".
        media_paths = (Path(".") / "data" / cfg['source'] / image_path.name).as_posix()
        media_paths = f"./{media_paths}"

        width, height = get_image_dimensions(image_path)
        if width == 0 or height == 0:
            raise ValueError("获取图片尺寸失败")

        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        annotations = []
        for shape in raw_data.get('shapes', []):
            # Only shapes explicitly labelled "target" are converted.
            if shape.get('label') != 'target':
                continue

            points = shape.get('points', [])
            try:
                rle = points_to_rle(points, (width, height))
                annotations.append({
                    "bbox": [],
                    "segmentation": rle,
                    "category_name": ""
                })
            except ValueError as e:
                # Degenerate polygons are skipped, not fatal.
                print(f"标注跳过 [{input_path.name}]: {str(e)}")

        # "index" is a placeholder; batch_convert renumbers records globally.
        return [{
            "index": 0,
            "media_type": "image",
            "media_paths": media_paths,
            "description": "",
            "task_type": cfg['task_type'],
            "question": raw_data.get('text', []),
            "question_type": "detection-form",
            "options": [],
            "annotations": [annotations],
            "answer": [],
            "source": cfg['source'],
            "domain": cfg['domain']
        }]

    except Exception as e:
        print(f"转换失败 [{input_path.name}]: {str(e)}")
        return None
|
|
def batch_convert(input_dir, output_file):
    """Convert every ``*.json`` under *input_dir* and aggregate the results.

    Successful records are renumbered with a global ``index`` and written to
    *output_file* as one JSON array; failures are reported by filename.

    Args:
        input_dir: directory containing the annotation JSON files.
        output_file: destination path for the aggregated JSON array.
    """
    source_dir = Path(input_dir)
    converted = []
    failed_files = []
    next_index = 0

    for json_file in source_dir.glob('*.json'):
        result = convert_medical_json(json_file)
        if not result:
            failed_files.append(json_file.name)
            continue
        # Assign a globally unique, monotonically increasing index.
        for record in result:
            record["index"] = next_index
            next_index += 1
        converted.extend(result)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(converted, f, indent=2, ensure_ascii=False)

    # len(converted) equals the number of successfully converted records,
    # since only successful results are ever appended.
    success_count = len(converted)
    print(f"转换完成: 成功 {success_count} 个文件,失败 {len(failed_files)} 个")
    if failed_files:
        print("失败文件列表:\n" + "\n".join(failed_files))
|
|
if __name__ == "__main__":
    # Script entry point: batch-convert the LISA validation split into a
    # single aggregated dataset file. Paths are hard-coded for this run.
    batch_convert(
        input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/general/lisa/image/val",
        output_file="/mnt/data/users/zys/proj/vlm_reasoning/utils/json/converted_dataset3.json"
    )