File size: 15,026 Bytes
75a0e92
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7dc3aac
 
 
75a0e92
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7dc3aac
 
 
 
 
 
75a0e92
7dc3aac
75a0e92
7dc3aac
75a0e92
7dc3aac
75a0e92
 
 
 
 
 
 
 
 
 
 
 
 
7dc3aac
 
 
 
 
75a0e92
7dc3aac
75a0e92
7dc3aac
75a0e92
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
"""
VLM-based floor plan parser — extract structured wall/door/window data from floor plan images.

Uses a vision-language model (via OpenAI-compatible API) to:
1. Parse the floor plan image into the schema format
2. Optionally compare rendered overlay with original for corrections
"""

from __future__ import annotations

import base64
import io
import json
import re
from pathlib import Path
from typing import Optional

from PIL import Image

from .schema import (
    FloorPlan,
    Wall,
    Opening,
    OpeningType,
    Point2D,
    CorrectionResult,
    Correction,
    CorrectionAction,
)
from .geometry import build_rooms, compute_floor_plan_geometry


def _image_to_base64(image: Image.Image | str | Path) -> str:
    """Encode an image as a PNG base64 data URI.

    Accepts an already-loaded PIL image or a filesystem path to one.
    """
    if isinstance(image, (str, Path)):
        image = Image.open(image)
    # Normalize to RGB so PNG encoding succeeds regardless of source mode.
    rgb = image.convert("RGB")
    buffer = io.BytesIO()
    rgb.save(buffer, format="PNG")
    encoded = base64.b64encode(buffer.getvalue()).decode("utf-8")
    return f"data:image/png;base64,{encoded}"


def _extract_json_from_response(text: str) -> dict:
    """Extract JSON from a VLM response that may contain markdown code blocks or thinking tokens."""
    # Strip Qwen3-style thinking blocks
    text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
    text = text.strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    patterns = [
        r"```json\s*\n?(.*?)\n?\s*```",
        r"```\s*\n?(.*?)\n?\s*```",
        r"\{.*\}",
    ]
    for pattern in patterns:
        match = re.search(pattern, text, re.DOTALL)
        if match:
            try:
                candidate = match.group(1) if match.lastindex else match.group(0)
                return json.loads(candidate)
            except (json.JSONDecodeError, IndexError):
                continue
    raise ValueError(f"Could not extract JSON from response:\n{text[:500]}")


# System prompt for the initial parse pass: instructs the VLM to emit the
# wall/door/window JSON schema in image pixel coordinates. Consumed by
# FloorPlanParser.parse_image (which may prepend "/no_think" for Qwen3 models).
PARSE_SYSTEM_PROMPT = """You are an expert architectural floor plan parser. Your job is to analyze floor plan images and extract structured data about walls, doors, and windows.

IMPORTANT RULES:
1. All coordinates are in a consistent unit system (use the image pixel coordinates, we'll convert to meters later)
2. Walls are defined by their centerline (a polyline) and thickness
3. Doors and windows are defined by their position ALONG a wall's centerline
4. Each wall must connect to adjacent walls (shared endpoints or very close endpoints)
5. The wall network must form closed loops (rooms)
6. Exterior walls are typically thicker (15-30 pixels) than interior walls (8-15 pixels)
7. Curved walls should be approximated as polylines with many points

OUTPUT FORMAT:
You must output valid JSON matching this exact schema:
{
  "walls": [
    {
      "id": "w1",
      "centerline": [{"x": 100, "y": 50}, {"x": 500, "y": 50}],
      "thickness": 20,
      "openings": [
        {
          "id": "d1",
          "type": "door",
          "start": 50,
          "length": 80
        }
      ]
    }
  ]
}

WALL DETECTION GUIDELINES:
- Trace the CENTER of each wall, not its edges
- Each wall segment should be a straight run between junctions/corners
- At T-junctions and L-corners, start a new wall segment
- Estimate thickness by measuring the visible wall width in pixels
- Be precise with coordinates — they will be overlaid on the original image for verification

OPENING DETECTION GUIDELINES:
- "start" = distance from the first centerline point along the wall to where the opening begins
- "length" = length of the opening along the wall
- Doors typically show an arc swing or a gap in the wall
- Windows typically show parallel lines or a different fill pattern within the wall
- Double-check that start + length doesn't exceed the wall's total centerline length"""


# System prompt for the critique pass: the VLM compares the original image
# against the rendered overlay and returns a score, convergence flag, and a
# list of add/modify/delete corrections. Consumed by
# FloorPlanParser.critique_overlay (which may prepend "/no_think" for Qwen3).
CORRECTION_SYSTEM_PROMPT = """You are an expert architectural floor plan verifier. You are shown two images:

1. The ORIGINAL floor plan image
2. An OVERLAY showing the parsed schema rendered on top of the original

Your job is to compare them and identify discrepancies. Look for:
- Walls that are misaligned (rendered wall doesn't match original wall position)
- Missing walls (visible in original but not in the overlay)
- Extra walls (in overlay but not in original)
- Wrong wall thickness (too thick or too thin compared to original)
- Misplaced doors/windows (wrong position along the wall)
- Missing doors/windows (visible in original but not detected)
- Wrong opening sizes (too wide or too narrow)

OUTPUT FORMAT:
{
  "score": 0.85,
  "converged": false,
  "corrections": [
    {
      "action": "modify",
      "target": "w3",
      "field": "centerline",
      "value": [{"x": 150, "y": 200}, {"x": 150, "y": 500}],
      "reason": "Wall is about 20 pixels too far to the right"
    },
    {
      "action": "add",
      "target": null,
      "field": null,
      "value": {"id": "w11", "centerline": [{"x": 100, "y": 300}, {"x": 400, "y": 300}], "thickness": 12, "openings": []},
      "reason": "Interior partition wall visible in original but missing from parse"
    },
    {
      "action": "delete",
      "target": "w5",
      "field": null,
      "value": null,
      "reason": "This is not a wall - it's a staircase outline"
    }
  ]
}

SCORING GUIDE:
- 1.0: Perfect alignment, no corrections needed
- 0.9+: Minor tweaks only (small position adjustments)
- 0.7-0.9: Some walls need repositioning, maybe 1-2 missing
- 0.5-0.7: Major misalignments, several missing/extra walls
- <0.5: Fundamental errors, most walls wrong

Set "converged": true if score >= 0.92 or if remaining corrections are cosmetic only."""


class FloorPlanParser:
    """VLM-based floor plan parser with iterative correction.

    Wraps an OpenAI-compatible chat-completions client and exposes the three
    pipeline steps: ``parse_image`` (image -> schema), ``critique_overlay``
    (original + overlay images -> corrections), and ``apply_corrections``
    (schema + corrections -> updated schema).
    """

    def __init__(self, api_key: Optional[str] = None, base_url: Optional[str] = None, model: str = "gpt-4o"):
        """Initialize the underlying OpenAI-compatible client.

        Args:
            api_key: API key; when omitted the client falls back to its own
                environment-based resolution.
            base_url: Alternate OpenAI-compatible endpoint (e.g. a local
                server); when omitted the default endpoint is used.
            model: Model name sent with every request.

        Raises:
            ImportError: if the ``openai`` package is not installed.
        """
        try:
            from openai import OpenAI
        except ImportError:
            raise ImportError("openai package required: pip install openai")
        # Only forward explicitly-provided settings so the client can fall
        # back to its own environment defaults for the rest.
        kwargs = {}
        if api_key:
            kwargs["api_key"] = api_key
        if base_url:
            kwargs["base_url"] = base_url
        self.client = OpenAI(**kwargs)
        self.model = model

    def parse_image(self, image: Image.Image | str | Path, detail: str = "high", temperature: float = 0.1):
        """Parse a floor plan image into the structured schema.

        Args:
            image: PIL image, or a path to one.
            detail: Vision ``detail`` hint sent with the image.
            temperature: Sampling temperature for the request.

        Returns:
            ``(FloorPlan, room_polygons)`` tuple from ``_data_to_floorplan``.

        Raises:
            ValueError: if no JSON can be extracted from the model reply.
        """
        image_b64 = _image_to_base64(image)

        # For Qwen3 models, prepend /no_think to disable extended reasoning
        system_content = PARSE_SYSTEM_PROMPT
        if "Qwen3" in self.model:
            system_content = "/no_think\n" + system_content

        response = self.client.chat.completions.create(
            model=self.model, temperature=temperature, max_tokens=8192,
            messages=[
                {"role": "system", "content": system_content},
                {"role": "user", "content": [
                    {"type": "text", "text": "Parse this floor plan image. Extract ALL walls with their centerlines, thicknesses, and any doors/windows on them. Use pixel coordinates. Be thorough — every wall segment matters for room detection. Output ONLY the JSON, no explanation."},
                    {"type": "image_url", "image_url": {"url": image_b64, "detail": detail}},
                ]},
            ],
        )
        raw = response.choices[0].message.content
        data = _extract_json_from_response(raw)
        return self._data_to_floorplan(data)

    def critique_overlay(self, original_image, overlay_image, current_schema: FloorPlan, iteration: int = 1, temperature: float = 0.2):
        """Compare overlay with original and generate corrections.

        Args:
            original_image: The source floor plan (PIL image or path).
            overlay_image: The schema rendered on top of the original.
            current_schema: FloorPlan the overlay was rendered from; its
                JSON dump is included in the prompt for context.
            iteration: 1-based round number, echoed in the prompt.
            temperature: Sampling temperature for the request.

        Returns:
            CorrectionResult built from the model reply; ``score`` defaults
            to 0.5 and ``converged`` to False when absent from the JSON.
        """
        orig_b64 = _image_to_base64(original_image)
        overlay_b64 = _image_to_base64(overlay_image)
        schema_json = current_schema.model_dump_json(indent=2)

        # Same Qwen3 reasoning opt-out as in parse_image.
        system_content = CORRECTION_SYSTEM_PROMPT
        if "Qwen3" in self.model:
            system_content = "/no_think\n" + system_content

        response = self.client.chat.completions.create(
            model=self.model, temperature=temperature, max_tokens=4096,
            messages=[
                {"role": "system", "content": system_content},
                {"role": "user", "content": [
                    {"type": "text", "text": f"Iteration {iteration}. Compare these two images and identify corrections needed.\n\nCurrent schema ({len(current_schema.walls)} walls, {sum(len(w.openings) for w in current_schema.walls)} openings):\n```json\n{schema_json}\n```\n\nImage 1 is the ORIGINAL floor plan. Image 2 is the OVERLAY (parsed schema rendered on the original)."},
                    {"type": "image_url", "image_url": {"url": orig_b64, "detail": "high"}},
                    {"type": "image_url", "image_url": {"url": overlay_b64, "detail": "high"}},
                ]},
            ],
        )
        raw = response.choices[0].message.content
        data = _extract_json_from_response(raw)
        return CorrectionResult(
            iteration=iteration, score=data.get("score", 0.5), converged=data.get("converged", False),
            corrections=[
                Correction(action=CorrectionAction(c["action"]), target=c.get("target"),
                           field=c.get("field"), value=c.get("value"), reason=c["reason"])
                for c in data.get("corrections", [])
            ],
        )

    def apply_corrections(self, floorplan: FloorPlan, corrections: CorrectionResult):
        """Apply corrections to a floor plan schema. Returns (new_floorplan, room_polygons).

        MODIFY targets address a wall ("w3") or one of its openings
        ("w3.openings.d1"); ADD expects a full wall dict in ``value``;
        DELETE removes the targeted wall. Unknown targets are skipped, and
        walls that fail re-parsing are dropped with a warning.
        """
        # Work on plain dicts keyed by wall id so corrections can add,
        # delete, and mutate walls without touching the input models.
        walls_dict = {w.id: w.model_dump() for w in floorplan.walls}
        for correction in corrections.corrections:
            if correction.action == CorrectionAction.DELETE:
                if correction.target and correction.target in walls_dict:
                    del walls_dict[correction.target]
            elif correction.action == CorrectionAction.ADD:
                if correction.value and isinstance(correction.value, dict):
                    wall_data = correction.value
                    # Fall back to a synthetic id when the model omitted one.
                    wall_id = wall_data.get("id", f"w_new_{len(walls_dict)}")
                    walls_dict[wall_id] = wall_data
            elif correction.action == CorrectionAction.MODIFY:
                if not correction.target:
                    continue
                # Target is "wallId" or "wallId.openings.openingId".
                parts = correction.target.split(".")
                wall_id = parts[0]
                if wall_id not in walls_dict:
                    continue
                if len(parts) == 1 and correction.field:
                    walls_dict[wall_id][correction.field] = correction.value
                elif len(parts) == 3 and parts[1] == "openings":
                    opening_id = parts[2]
                    for op in walls_dict[wall_id].get("openings", []):
                        if op.get("id") == opening_id and correction.field:
                            op[correction.field] = correction.value
                            break
        new_walls = []
        for wall_data in walls_dict.values():
            try:
                new_walls.append(self._parse_wall(wall_data))
            except Exception as e:
                print(f"Warning: skipping wall due to error: {e}")
                continue
        # NOTE(review): min_area=1.0 here vs min_area=100.0 in
        # _data_to_floorplan — confirm the asymmetry is intentional.
        rooms, room_polygons = build_rooms(new_walls, min_area=1.0)
        return FloorPlan(scale=floorplan.scale, origin=floorplan.origin, walls=new_walls, rooms=rooms), room_polygons

    def _data_to_floorplan(self, data: dict):
        """Convert raw parsed JSON to FloorPlan with rooms detected.

        Walls that fail validation are skipped with a warning rather than
        aborting the whole parse.

        Returns:
            ``(FloorPlan, room_polygons)`` tuple.
        """
        walls = []
        for w in data.get("walls", []):
            try:
                walls.append(self._parse_wall(w))
            except Exception as e:
                print(f"Warning: skipping wall {w.get('id', '?')}: {e}")
                continue
        rooms, room_polygons = build_rooms(walls, min_area=100.0)
        fp = FloorPlan(walls=walls, rooms=rooms)
        return fp, room_polygons

    @staticmethod
    def _parse_wall(w: dict) -> Wall:
        """Parse a wall from dict, handling various coordinate formats.

        Centerline points may be ``{"x": .., "y": ..}`` dicts or ``[x, y]``
        sequences; points of any other shape are silently ignored.
        """
        centerline = []
        for pt in w["centerline"]:
            if isinstance(pt, dict):
                centerline.append(Point2D(x=pt["x"], y=pt["y"]))
            elif isinstance(pt, (list, tuple)):
                centerline.append(Point2D(x=pt[0], y=pt[1]))
        openings = []
        for o in w.get("openings", []):
            openings.append(Opening(id=o["id"], type=OpeningType(o["type"]),
                                     start=float(o["start"]), length=float(o["length"])))
        return Wall(id=w["id"], centerline=centerline, thickness=float(w["thickness"]), openings=openings)


def parse_floorplan(image, api_key=None, base_url=None, model="gpt-4o",
                    max_iterations=4, convergence_threshold=0.92, verbose=True):
    """Full parse-render-correct pipeline.

    Parses the image once, then iteratively renders the parsed schema over
    the original, asks the VLM to critique the overlay, and applies the
    suggested corrections. Stops when the critique reports convergence, the
    score reaches ``convergence_threshold``, no corrections are offered, or
    ``max_iterations`` rounds complete.

    Args:
        image: Floor plan image (PIL image or path).
        api_key: Optional API key forwarded to FloorPlanParser.
        base_url: Optional OpenAI-compatible endpoint.
        model: Chat model name.
        max_iterations: Maximum number of critique/correction rounds.
        convergence_threshold: Stop once the critique score reaches this.
        verbose: Print progress to stdout.

    Returns:
        (final_floorplan, room_polygons, overlay_images_per_iteration);
        the overlay list always ends with a render of the final schema.
    """
    # Fix: drop the unused `render_to_image` import; only the overlay
    # renderer is needed here. Imported lazily so parsing alone doesn't
    # require the rendering stack.
    from .renderer import overlay_on_image
    parser = FloorPlanParser(api_key=api_key, base_url=base_url, model=model)
    if verbose:
        print("🔍 Parsing floor plan image...")
    floorplan, room_polygons = parser.parse_image(image)
    if verbose:
        print(f"   Found {len(floorplan.walls)} walls, "
              f"{sum(len(w.openings) for w in floorplan.walls)} openings, "
              f"{len(floorplan.rooms)} rooms")
    overlays = []
    for iteration in range(1, max_iterations + 1):
        if verbose:
            print(f"\n📐 Iteration {iteration}: rendering overlay...")
        overlay = overlay_on_image(floorplan, image, room_polygons=room_polygons,
                                    schema_opacity=0.6, original_opacity=0.7)
        overlays.append(overlay)
        if verbose:
            print("   Comparing overlay with original...")
        critique = parser.critique_overlay(original_image=image, overlay_image=overlay,
                                            current_schema=floorplan, iteration=iteration)
        if verbose:
            print(f"   Score: {critique.score:.2f}, Corrections: {len(critique.corrections)}, Converged: {critique.converged}")
            for c in critique.corrections:
                print(f"     - [{c.action.value}] {c.target or 'new'}: {c.reason}")
        if critique.converged or critique.score >= convergence_threshold:
            if verbose:
                print(f"\n✅ Converged at iteration {iteration} (score={critique.score:.2f})")
            break
        if not critique.corrections:
            if verbose:
                print("\n⚠️ No corrections suggested, stopping.")
            break
        floorplan, room_polygons = parser.apply_corrections(floorplan, critique)
        if verbose:
            print(f"   Applied corrections → {len(floorplan.walls)} walls, {len(floorplan.rooms)} rooms")
    # Render the final schema once more: when corrections were applied on the
    # last loop pass, the in-loop overlay predates them.
    final_overlay = overlay_on_image(floorplan, image, room_polygons=room_polygons,
                                      schema_opacity=0.6, original_opacity=0.7)
    overlays.append(final_overlay)
    return floorplan, room_polygons, overlays