Image Segmentation
Transformers
English
clipseg
segmentation
construction
drywall
quality-assurance
text-conditioned
binary-mask
Instructions to use youngPhilosopher/drywall-qa-clipseg with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use youngPhilosopher/drywall-qa-clipseg with Transformers:
```python
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("image-segmentation", model="youngPhilosopher/drywall-qa-clipseg")

# Load model directly
from transformers import AutoModel

model = AutoModel.from_pretrained("youngPhilosopher/drywall-qa-clipseg", dtype="auto")
```
- Notebooks
- Google Colab
- Kaggle
| """Inspect annotations, generate masks, create train/val/test splits.""" | |
| import json | |
| import random | |
| from pathlib import Path | |
| import numpy as np | |
| from PIL import Image | |
| from pycocotools.coco import COCO | |
| from pycocotools import mask as mask_utils | |
# Data directories, all located under <three levels above this file>/data.
# raw: downloaded datasets; processed: rendered masks; splits: split JSON files.
RAW_DIR, PROCESSED_DIR, SPLITS_DIR = (
    Path(__file__).resolve().parents[2] / "data" / subdir
    for subdir in ("raw", "processed", "splits")
)
def _has_usable_segmentation(seg) -> bool:
    """Return True when *seg* is an RLE dict or a polygon with at least 3 points."""
    if not seg:
        return False
    if isinstance(seg, dict):  # RLE format
        return True
    if not isinstance(seg, list):
        return False
    first = seg[0]
    if isinstance(first, (int, float)):
        # Flat polygon [x1, y1, x2, y2, ...]: the coordinates live in seg itself.
        return len(seg) >= 6
    try:
        # Nested polygons [[x1, y1, ...], ...]: judge by the first polygon.
        return len(first) >= 6
    except TypeError:
        return False


def inspect_dataset(coco_json_path: str) -> dict:
    """Check what annotation types exist in a COCO JSON file.

    An annotation counts as "segmentation" when its ``segmentation`` field is a
    non-empty RLE dict or a polygon with at least 6 coordinates (3 points),
    given either as nested lists or as one flat coordinate list. Everything
    else is counted as bbox-only.

    Args:
        coco_json_path: Path to the COCO annotation JSON file.

    Returns:
        Dict with keys ``total_annotations``, ``total_images``,
        ``has_segmentation``, ``has_bbox_only``, ``annotation_type``
        ("segmentation" only when segmentations strictly outnumber bbox-only
        annotations, otherwise "bbox_only") and ``categories`` (category names).
    """
    with open(coco_json_path, encoding="utf-8") as f:
        data = json.load(f)

    annotations = data.get("annotations", [])
    has_seg = sum(
        1 for ann in annotations if _has_usable_segmentation(ann.get("segmentation"))
    )
    has_bbox_only = len(annotations) - has_seg

    return {
        "total_annotations": len(annotations),
        "total_images": len(data.get("images", [])),
        "has_segmentation": has_seg,
        "has_bbox_only": has_bbox_only,
        "annotation_type": "segmentation" if has_seg > has_bbox_only else "bbox_only",
        "categories": [c["name"] for c in data.get("categories", [])],
    }
def render_masks_from_coco(coco_json_path: str, images_dir: str, output_dir: str) -> list[dict]:
    """Render binary masks from COCO polygon/RLE annotations.

    All usable annotations of an image are merged into a single mask that is
    written to ``output_dir`` as ``<image stem>_mask.png`` (0 = background,
    255 = foreground). Images without any annotation are skipped. An image
    whose annotations are all unusable still gets an all-zero mask.

    Args:
        coco_json_path: Path to the COCO annotation JSON file.
        images_dir: Directory containing the images named in the JSON.
        output_dir: Directory to write mask PNGs into (created if missing).

    Returns:
        List of {image_path, mask_path, image_id, width, height}.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    coco = COCO(coco_json_path)
    records = []

    for img_id in sorted(coco.getImgIds()):
        img_info = coco.loadImgs(img_id)[0]
        h, w = img_info["height"], img_info["width"]
        anns = coco.loadAnns(coco.getAnnIds(imgIds=img_id))
        if not anns:
            continue

        # Merge all annotations into one binary mask
        combined = np.zeros((h, w), dtype=np.uint8)
        for ann in anns:
            seg = ann.get("segmentation")
            # Skip annotations with empty or invalid segmentation
            if not seg:
                continue
            if isinstance(seg, list):
                first = seg[0]
                if isinstance(first, list):
                    if len(first) < 6:
                        continue
                elif len(seg) < 6:
                    continue
                elif isinstance(first, (int, float)):
                    # Flat polygon [x1, y1, ...]: wrap it into the nested
                    # list-of-polygons form on a copy, so the COCO index entry
                    # is left untouched.
                    ann = {**ann, "segmentation": [seg]}

            try:
                rle = coco.annToRLE(ann)
                m = mask_utils.decode(rle)
                combined = np.maximum(combined, m)
            except (IndexError, TypeError, ValueError):
                # Fall back to bbox if segmentation decode fails. TypeError is
                # included because malformed segmentations may surface that way.
                if "bbox" in ann:
                    x, y, bw, bh = [int(v) for v in ann["bbox"]]
                    # Clamp to >= 0: a negative slice bound would be read as an
                    # offset from the far edge and misplace or drop the box.
                    x0, y0 = max(x, 0), max(y, 0)
                    x1, y1 = max(x + bw, 0), max(y + bh, 0)
                    combined[y0:y1, x0:x1] = 1

        mask_img = Image.fromarray(combined * 255, mode="L")
        mask_name = Path(img_info["file_name"]).stem + "_mask.png"
        mask_path = output_dir / mask_name
        mask_img.save(mask_path)

        image_path = Path(images_dir) / img_info["file_name"]
        records.append({
            "image_path": str(image_path),
            "mask_path": str(mask_path),
            "image_id": img_id,
            "width": w,
            "height": h,
        })

    return records
def render_masks_from_bboxes(coco_json_path: str, images_dir: str, output_dir: str) -> list[dict]:
    """Create filled-rectangle masks from bounding boxes (fallback when no segmentation).

    Every image with at least one annotation gets one mask PNG named
    ``<image stem>_mask.png`` in ``output_dir`` (0 = background,
    255 = inside any bbox). Annotations without a ``bbox`` are ignored.

    Args:
        coco_json_path: Path to the COCO annotation JSON file.
        images_dir: Directory containing the images named in the JSON.
        output_dir: Directory to write mask PNGs into (created if missing).

    Returns:
        List of {image_path, mask_path, image_id, width, height}.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    with open(coco_json_path, encoding="utf-8") as f:
        data = json.load(f)

    img_lookup = {img["id"]: img for img in data["images"]}
    anns_by_img: dict[int, list] = {}
    for ann in data["annotations"]:
        anns_by_img.setdefault(ann["image_id"], []).append(ann)

    records = []
    for img_id, img_info in sorted(img_lookup.items()):
        anns = anns_by_img.get(img_id, [])
        if not anns:
            continue

        h, w = img_info["height"], img_info["width"]
        combined = np.zeros((h, w), dtype=np.uint8)
        for ann in anns:
            bbox = ann.get("bbox")
            if not bbox:
                continue
            x, y, bw, bh = [int(v) for v in bbox]
            # Clamp to >= 0: a negative slice bound would be read as an offset
            # from the far edge and misplace or drop the box. Bounds past the
            # image edge are clipped by the slice itself.
            x0, y0 = max(x, 0), max(y, 0)
            x1, y1 = max(x + bw, 0), max(y + bh, 0)
            combined[y0:y1, x0:x1] = 1

        mask_img = Image.fromarray(combined * 255, mode="L")
        mask_name = Path(img_info["file_name"]).stem + "_mask.png"
        mask_path = output_dir / mask_name
        mask_img.save(mask_path)

        image_path = Path(images_dir) / img_info["file_name"]
        records.append({
            "image_path": str(image_path),
            "mask_path": str(mask_path),
            "image_id": img_id,
            "width": w,
            "height": h,
        })

    return records
| def find_coco_json(dataset_dir: Path) -> tuple[str, str] | None: | |
| """Find the COCO JSON and images directory in a Roboflow download.""" | |
| for split in ["train", "valid", "test"]: | |
| json_path = dataset_dir / split / "_annotations.coco.json" | |
| if json_path.exists(): | |
| return str(json_path), str(dataset_dir / split) | |
| # Single-folder layout | |
| for json_path in dataset_dir.rglob("_annotations.coco.json"): | |
| return str(json_path), str(json_path.parent) | |
| return None | |
def process_dataset(name: str, dataset_dir: Path, prompt_synonyms: list[str]) -> list[dict]:
    """Render masks for every split folder of one dataset and tag the records.

    Each sub-directory of ``dataset_dir`` that contains
    ``_annotations.coco.json`` is inspected. Masks are rendered from the
    segmentations, or from bounding boxes when the split is bbox-only, into
    ``PROCESSED_DIR / name / "masks" / <split folder>``.

    Returns:
        All rendered records, each extended with ``dataset`` (= ``name``) and
        ``prompts`` (= ``prompt_synonyms``).
    """
    mask_root = PROCESSED_DIR / name / "masks"
    collected = []

    # Walk the split folders (train/valid/test from Roboflow) in sorted order
    for folder in sorted(dataset_dir.iterdir()):
        annotations_file = folder / "_annotations.coco.json"
        if not (folder.is_dir() and annotations_file.exists()):
            continue

        print(f"\n Processing {name}/{folder.name}...")
        summary = inspect_dataset(str(annotations_file))
        print(f" Images: {summary['total_images']}, Annotations: {summary['total_annotations']}")
        print(f" Type: {summary['annotation_type']}, Categories: {summary['categories']}")

        if summary["annotation_type"] == "segmentation":
            renderer = render_masks_from_coco
        else:
            print(" WARNING: bbox-only annotations, using filled rectangles")
            renderer = render_masks_from_bboxes

        rendered = renderer(
            str(annotations_file), str(folder), str(mask_root / folder.name)
        )
        for record in rendered:
            record.update(dataset=name, prompts=prompt_synonyms)
        collected += rendered

    return collected
def create_splits(records: list[dict], ratios: tuple = (0.70, 0.15, 0.15), seed: int = 42):
    """Partition records into train/val/test per dataset and write them to disk.

    Records are grouped by their ``dataset`` key. Each group is shuffled and
    cut at ``ratios[0]`` and ``ratios[1]`` (counts rounded down), and whatever
    remains goes to test. The merged splits are shuffled again and written to
    ``SPLITS_DIR`` as ``train.json``, ``val.json`` and ``test.json``.

    The module-level ``random`` generator is seeded with ``seed``.

    Returns:
        Dict mapping "train", "val" and "test" to their record lists.
    """
    random.seed(seed)

    grouped: dict[str, list] = {}
    for record in records:
        grouped.setdefault(record["dataset"], []).append(record)

    buckets: dict[str, list] = {"train": [], "val": [], "test": []}
    for group in grouped.values():
        random.shuffle(group)
        size = len(group)
        train_end = int(size * ratios[0])
        val_end = train_end + int(size * ratios[1])
        buckets["train"] += group[:train_end]
        buckets["val"] += group[train_end:val_end]
        buckets["test"] += group[val_end:]

    # Mix the datasets within each split (order: train, val, test)
    for bucket in buckets.values():
        random.shuffle(bucket)

    SPLITS_DIR.mkdir(parents=True, exist_ok=True)
    for split_name, split_data in buckets.items():
        target = SPLITS_DIR / f"{split_name}.json"
        with open(target, "w") as out:
            json.dump(split_data, out, indent=2)
        print(f" {split_name}: {len(split_data)} samples -> {target}")

    return buckets
def run(config: dict):
    """Run the full preprocessing pipeline and return the created splits.

    Reads ``config["data"]["prompt_synonyms"]``,
    ``config["data"]["split_ratios"]`` and ``config["seed"]``. The "taping"
    and "cracks" datasets under ``RAW_DIR`` are processed in that order, and a
    dataset whose directory is missing is skipped with a warning.
    """
    data_cfg = config["data"]
    synonyms = data_cfg["prompt_synonyms"]
    ratios = tuple(data_cfg["split_ratios"])
    banner = "=" * 60

    all_records = []
    for name in ("taping", "cracks"):
        dataset_dir = RAW_DIR / name
        if not dataset_dir.exists():
            print(f"WARNING: {dataset_dir} not found, skipping {name}")
            continue

        print(f"\n{banner}")
        print(f"Processing dataset: {name}")
        print(banner)

        dataset_records = process_dataset(name, dataset_dir, synonyms[name])
        all_records += dataset_records
        print(f" Total records for {name}: {len(dataset_records)}")

    print(f"\n{banner}")
    print(f"Creating splits (total: {len(all_records)} records)")
    print(banner)

    return create_splits(all_records, ratios=ratios, seed=config["seed"])
if __name__ == "__main__":
    import yaml

    # Script entry point: load the training config that sits three levels
    # above this file and run the preprocessing pipeline with it.
    repo_root = Path(__file__).resolve().parents[2]
    with (repo_root / "configs" / "train_config.yaml").open() as config_file:
        config = yaml.safe_load(config_file)
    run(config)