Spaces:
Runtime error
Runtime error
| import logging | |
| import os | |
| import re | |
| import cv2 | |
| from PIL import Image | |
# Module-level logger named after this module. No handlers or levels are
# configured here; that is left to whatever configures logging for the app.
logger = logging.getLogger(__name__)
def preprocess_image(image_path, target_size=(224, 224)):
    """
    Preprocess X-ray image for model input.

    The image is loaded from disk, converted to RGB when it is stored in any
    other mode, and resized to ``target_size`` with Lanczos resampling.

    Args:
        image_path (str): Path to the X-ray image
        target_size (tuple): Target (width, height) for resizing

    Returns:
        PIL.Image: Preprocessed RGB image of size ``target_size``

    Raises:
        FileNotFoundError: If ``image_path`` does not exist
        Exception: Any other error raised while opening, converting or
            resizing the image is logged and re-raised unchanged
    """
    try:
        # Fail early with an explicit message that names the missing path.
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image file not found: {image_path}")

        # Image.open() is lazy and keeps the underlying file open. The context
        # manager releases the handle even when convert()/resize() fails.
        with Image.open(image_path) as source:
            # Grayscale (and any other non-RGB) input is expanded to RGB.
            rgb_image = source if source.mode == "RGB" else source.convert("RGB")
            # resize() returns a new image object, so the result remains
            # usable after the source file has been closed.
            return rgb_image.resize(target_size, Image.LANCZOS)
    except Exception as e:
        logger.error("Error preprocessing image: %s", e)
        raise
def enhance_xray_image(image_path, output_path=None, clahe_clip=2.0, clahe_grid=(8, 8)):
    """
    Enhance X-ray image contrast using CLAHE (Contrast Limited Adaptive Histogram Equalization).

    The image is read as single-channel grayscale, equalized, and either
    written to ``output_path`` or returned as an array.

    Args:
        image_path (str): Path to the X-ray image
        output_path (str, optional): Path to save enhanced image
        clahe_clip (float): Clip limit for CLAHE
        clahe_grid (tuple): Grid size for CLAHE

    Returns:
        str or np.ndarray: ``output_path`` when it was given (the enhanced
        image has been written there), otherwise the enhanced image array

    Raises:
        ValueError: If the image cannot be read from ``image_path``
        OSError: If the enhanced image cannot be written to ``output_path``
        Exception: Any other error from OpenCV is logged and re-raised
    """
    try:
        # cv2.imread signals failure by returning None rather than raising.
        img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            raise ValueError(f"Failed to read image: {image_path}")

        clahe = cv2.createCLAHE(clipLimit=clahe_clip, tileGridSize=clahe_grid)
        enhanced = clahe.apply(img)

        # Without a destination the caller gets the array itself.
        if not output_path:
            return enhanced

        # cv2.imwrite reports failure through its boolean return value.
        # Ignoring it would hand back a path to a file that was never written.
        if not cv2.imwrite(output_path, enhanced):
            raise OSError(f"Failed to write enhanced image: {output_path}")
        return output_path
    except Exception as e:
        logger.error("Error enhancing X-ray image: %s", e)
        raise
# Any run of whitespace (spaces, tabs, newlines).
_WHITESPACE_RUN = re.compile(r"\s+")

# Section-header spellings (case-insensitive, optional spaces before the
# colon) mapped to their canonical upper-case form.
_SECTION_HEADER_RULES = tuple(
    (re.compile(pattern + r"\s*:", re.IGNORECASE), replacement)
    for pattern, replacement in (
        (r"clinical\s*(?:history|indication)", "CLINICAL HISTORY:"),
        (r"technique", "TECHNIQUE:"),
        (r"comparison", "COMPARISON:"),
        (r"findings", "FINDINGS:"),
        (r"impression", "IMPRESSION:"),
        (r"recommendation", "RECOMMENDATION:"),
        (r"comment", "COMMENT:"),
    )
)

# Abbreviation expansions, applied in this order.
_ABBREVIATION_RULES = tuple(
    (re.compile(pattern, re.IGNORECASE), replacement)
    for pattern, replacement in (
        # "w/o" has to be expanded before "w/", otherwise the "w/" rule eats
        # its first two characters and leaves "witho".
        (r"\bw/o\b", "without"),
        # "w/contrast": the slash doubled as the separator, so add a space.
        (r"\bw/(?=\w)", "with "),
        # "w/ contrast" or a trailing "w/": the separator is already there.
        (r"\bw/(?!\w)", "with"),
        (r"\bs/p\b", "status post"),
        (r"\bc/w\b", "consistent with"),
        (r"\br/o\b", "rule out"),
        (r"\bhx\b", "history"),
        (r"\bdx\b", "diagnosis"),
        (r"\btx\b", "treatment"),
    )
)


def normalize_report_text(text):
    """
    Normalize medical report text for consistent processing.

    Three steps are applied in order: every run of whitespace (including
    newlines) is collapsed to a single space, known section headers are
    rewritten to a canonical upper-case ``"HEADER:"`` form, and common
    clinical abbreviations (w/, w/o, s/p, c/w, r/o, hx, dx, tx) are expanded.

    Args:
        text (str): Medical report text

    Returns:
        str: Normalized text with surrounding whitespace stripped. If ``text``
        is not a string the error is logged and ``text`` is returned
        unchanged.
    """
    try:
        normalized = _WHITESPACE_RUN.sub(" ", text)
    except TypeError as e:
        # Best effort: hand non-string input back untouched instead of raising.
        logger.error("Error normalizing report text: %s", e)
        return text

    for pattern, replacement in _SECTION_HEADER_RULES:
        normalized = pattern.sub(replacement, normalized)

    for pattern, replacement in _ABBREVIATION_RULES:
        normalized = pattern.sub(replacement, normalized)

    return normalized.strip()
# Canonical headers (as produced by normalize_report_text) that start a new
# section in the extracted result.
# NOTE(review): normalize_report_text also emits "COMMENT:", which is not
# listed here, so comment text stays in the preceding section - confirm
# whether that is intended.
_EXTRACTABLE_HEADERS = (
    "CLINICAL HISTORY:",
    "TECHNIQUE:",
    "COMPARISON:",
    "FINDINGS:",
    "IMPRESSION:",
    "RECOMMENDATION:",
)
_EXTRACTABLE_HEADER_RE = re.compile(
    "|".join(re.escape(header) for header in _EXTRACTABLE_HEADERS)
)


def extract_sections(text):
    """
    Extract sections from a medical report.

    The text is normalized first and then cut at every canonical section
    header. Splitting is done on header positions rather than on line breaks
    because normalization collapses all whitespace, including newlines, into
    single spaces.

    Args:
        text (str): Medical report text

    Returns:
        dict: Section name (header without the trailing colon) mapped to the
        section text, in order of first appearance. Text before the first
        header is stored under ``"PREAMBLE"``. Empty sections are omitted,
        and a header that occurs more than once has its parts joined with a
        space. If extraction fails, ``{"FULL_TEXT": text}`` is returned.
    """
    try:
        normalized_text = normalize_report_text(text)

        collected = {}

        def _store(name, raw):
            # Collapse internal whitespace and skip chunks that are empty.
            content = " ".join(raw.split())
            if content:
                collected.setdefault(name, []).append(content)

        current_section = "PREAMBLE"  # For text before first section header
        cursor = 0
        for header in _EXTRACTABLE_HEADER_RE.finditer(normalized_text):
            # Everything since the previous header belongs to that section.
            _store(current_section, normalized_text[cursor:header.start()])
            current_section = header.group().rstrip(":")
            cursor = header.end()
        # Remainder after the last header (or the whole text if there is none).
        _store(current_section, normalized_text[cursor:])

        return {name: " ".join(parts) for name, parts in collected.items()}
    except Exception as e:
        # Deliberate best effort: callers always get a dict back.
        logger.error("Error extracting sections: %s", e)
        return {"FULL_TEXT": text}  # Return full text if extraction fails
# <context> <size> <unit>, e.g. "nodule measuring 5 mm" or "mass 8 x 10 mm".
#  - context: letters/spaces containing one of the anatomical keywords; the
#    leading part may be empty so a keyword at the very start still matches.
#  - size: one to three numbers separated by "x" or the multiplication sign.
#  - unit: the squared/cubed forms are tried together with the base unit so
#    "mm2" is not cut down to "mm"; an optional plural "s" is tolerated, and
#    the unit must not run into further letters (so "5 mmHg" is rejected).
_MEASUREMENT_WITH_CONTEXT_RE = re.compile(
    r"([A-Za-z\s]*(?:mass|nodule|effusion|opacity|lesion|tumor|cyst|structure|area|region)[A-Za-z\s]*)"
    r"(\d+(?:\.\d+)?(?:\s*[x×]\s*\d+(?:\.\d+)?){0,2})"
    r"\s*(mm[23]?|cm[23]?|ml|cc)s?(?![A-Za-z])",
    re.IGNORECASE,
)


def extract_measurements(text):
    """
    Extract measurements from medical text (sizes, volumes, etc.).

    Only measurements that directly follow a descriptive phrase containing
    one of the recognised keywords (mass, nodule, effusion, opacity, lesion,
    tumor, cyst, structure, area, region) are reported, for example
    "nodule measuring 5 mm" or "mass of size 8x10mm".

    Args:
        text (str): Medical text

    Returns:
        list: List of tuples containing (context, size, unit), where context
        is the stripped phrase preceding the number, size is the numeric part
        as written (e.g. "8 x 10") and unit is one of mm, cm, mm2, cm2, mm3,
        cm3, ml or cc as it appears in the text. An empty list is returned
        when nothing matches or when ``text`` is not a string.
    """
    try:
        return [
            (context.strip(), size, unit)
            for context, size, unit in (
                match.groups()
                for match in _MEASUREMENT_WITH_CONTEXT_RE.finditer(text)
            )
        ]
    except TypeError as e:
        # Best effort: non-string input yields no measurements.
        logger.error("Error extracting measurements: %s", e)
        return []
def prepare_sample_batch(image_paths, reports=None, target_size=(224, 224)):
    """
    Prepare a batch of samples for model processing.

    Each image path is run through ``preprocess_image``. When reports are
    supplied, the report at the same index (if there is one) is run through
    ``normalize_report_text``.

    Args:
        image_paths (list): List of paths to images
        reports (list, optional): List of corresponding reports
        target_size (tuple): Target image size

    Returns:
        tuple: ``(images, reports)`` - the preprocessed images and the
        normalized reports, or ``None`` in place of the reports when
        ``reports`` is empty or not given

    Raises:
        Exception: Any failure is logged and re-raised unchanged
    """
    try:
        images = []
        texts = []
        for position, path in enumerate(image_paths):
            images.append(preprocess_image(path, target_size))

            # Nothing to pair with this image: no reports at all, or fewer
            # reports than images.
            if not reports or position >= len(reports):
                continue
            texts.append(normalize_report_text(reports[position]))

        if reports:
            return images, texts
        return images, None
    except Exception as e:
        logger.error(f"Error preparing sample batch: {e}")
        raise