| |
|
| | import matplotlib.pyplot as plt |
| | |
| | from evaluation.viz import plot_example_single |
| | from dataset.torch import unbatch_to_device |
| | import matplotlib.pyplot as plt |
| | from typing import Optional, Tuple |
| | import cv2 |
| | import torch |
| | import numpy as np |
| | import time |
| | from logger import logger |
| | from evaluation.run import resolve_checkpoint_path, pretrained_models |
| | from models.maplocnet import MapLocNet |
| | from models.voting import fuse_gps, argmax_xyr |
| | |
| | from osm.raster import Canvas |
| | from utils.wrappers import Camera |
| | from utils.io import read_image |
| | from utils.geo import BoundaryBox, Projection |
| | from utils.exif import EXIF |
| | import requests |
| | from pathlib import Path |
| | from utils.exif import EXIF |
| | from dataset.image import resize_image, pad_image, rectify_image |
| | |
| | from dataset import UavMapDatasetModule |
| | import torchvision.transforms as tvf |
| | import matplotlib.pyplot as plt |
| | import numpy as np |
| | from sklearn.decomposition import PCA |
| | from PIL import Image |
| | |
| | |
| | from osm.tiling import TileManager |
| | from utils.viz_localization import ( |
| | likelihood_overlay, |
| | plot_dense_rotations, |
| | add_circle_inset, |
| | ) |
| | |
| | from osm.viz import Colormap, plot_nodes |
| | from utils.viz_2d import plot_images |
| |
|
| | from utils.viz_2d import features_to_RGB |
| | import random |
| | from geopy.distance import geodesic |
| |
|
| |
|
| | def vis_image_feature(F): |
| | def normalize(x): |
| | return x / np.linalg.norm(x, axis=-1, keepdims=True) |
| |
|
| | |
| | F = F[:, 0:180, 0:180] |
| | flatten = [] |
| | c, h, w = F.shape |
| | print(F.shape) |
| | F = np.rollaxis(F, 0, 3) |
| | F_flat = F.reshape(-1, c) |
| | flatten.append(F_flat) |
| | flatten = normalize(flatten)[0] |
| |
|
| | flatten = np.nan_to_num(flatten, nan=0) |
| | pca = PCA(n_components=3) |
| |
|
| | print(flatten.shape) |
| | flatten = pca.fit_transform(flatten) |
| | flatten = (normalize(flatten) + 1) / 2 |
| |
|
| | |
| | F_rgb, flatten = np.split(flatten, [h * w], axis=0) |
| | F_rgb = F_rgb.reshape((h, w, 3)) |
| | return F_rgb |
| | def distance(lat1, lon1, lat2, lon2): |
| | point1 = (lat1, lon1) |
| | point2 = (lat2, lon2) |
| | distance_km = geodesic(point1, point2).meters |
| | return distance_km |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | def show_result(map_vis_image, pre_uv, pre_yaw): |
| | |
| | gray_mask = np.zeros_like(map_vis_image) |
| | gray_mask.fill(128) |
| |
|
| | |
| | image = cv2.addWeighted(map_vis_image, 1, gray_mask, 0, 0) |
| | |
| |
|
| | |
| | u, v = pre_uv |
| | x1, y1 = int(u), int(v) |
| | angle = pre_yaw - 90 |
| | |
| | length = 20 |
| | x2 = int(x1 + length * np.cos(np.radians(angle))) |
| | y2 = int(y1 + length * np.sin(np.radians(angle))) |
| | |
| | cv2.arrowedLine(image, (x1, y1), (x2, y2), (0, 0, 0), 2, 5, 0, 0.3) |
| | |
| | return image |
| |
|
| |
|
| | def xyz_to_latlon(x, y, z): |
| | |
| | wgs84 = pyproj.CRS('EPSG:4326') |
| |
|
| | |
| | xyz = pyproj.CRS(f'+proj=geocent +datum=WGS84 +units=m +no_defs') |
| |
|
| | |
| | transformer = pyproj.Transformer.from_crs(xyz, wgs84) |
| |
|
| | |
| | lon, lat, _ = transformer.transform(x, y, z) |
| |
|
| | return lat, lon |
| |
|
| |
|
| | class Demo: |
| | def __init__( |
| | self, |
| | experiment_or_path: Optional[str] = "OrienterNet_MGL", |
| | device=None, |
| | **kwargs |
| | ): |
| | if experiment_or_path in pretrained_models: |
| | experiment_or_path, _ = pretrained_models[experiment_or_path] |
| | path = resolve_checkpoint_path(experiment_or_path) |
| | ckpt = torch.load(path, map_location=(lambda storage, loc: storage)) |
| | config = ckpt["hyper_parameters"] |
| | config.model.update(kwargs) |
| | config.model.image_encoder.backbone.pretrained = False |
| |
|
| | model = MapLocNet(config.model).eval() |
| | state = {k[len("model."):]: v for k, v in ckpt["state_dict"].items()} |
| | model.load_state_dict(state, strict=True) |
| | if device is None: |
| | device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
| | model = model.to(device) |
| |
|
| | self.model = model |
| | self.config = config |
| | self.device = device |
| |
|
| | def prepare_data( |
| | self, |
| | image: np.ndarray, |
| | camera: Camera, |
| | canvas: Canvas, |
| | roll_pitch: Optional[Tuple[float]] = None, |
| | ): |
| |
|
| | image = torch.from_numpy(image).permute(2, 0, 1).float().div_(255) |
| |
|
| | return { |
| | 'map': torch.from_numpy(canvas.raster).long(), |
| | 'image': image, |
| | |
| | |
| | |
| | } |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | def localize(self, image: np.ndarray, camera: Camera, canvas: Canvas, **kwargs): |
| |
|
| | data = self.prepare_data(image, camera, canvas, **kwargs) |
| | data_ = {k: v.to(self.device)[None] for k, v in data.items()} |
| | |
| | |
| | |
| | start = time.time() |
| | with torch.no_grad(): |
| | pred = self.model(data_) |
| |
|
| | end = time.time() |
| | xy_gps = canvas.bbox.center |
| | uv_gps = torch.from_numpy(canvas.to_uv(xy_gps)) |
| |
|
| | lp_xyr = pred["log_probs"].squeeze(0) |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | xyr = argmax_xyr(lp_xyr).cpu() |
| |
|
| | prob = lp_xyr.exp().cpu() |
| | neural_map = pred["map"]["map_features"][0].squeeze(0).cpu() |
| | print('total time:', start - end) |
| | return xyr[:2], xyr[2], prob, neural_map, data["image"], data_, pred |
| |
|
| |
|
| | def load_test_data( |
| | root: Path, |
| | city: str, |
| | index: int, |
| | ): |
| | uav_image_path = root / city / 'uav' |
| | map_path = root / city / 'map' |
| | map_vis = root / city / 'map_vis' |
| | info_path = root / city / 'info.csv' |
| | osm_path = root / city / '{}.osm'.format(city) |
| |
|
| | info = np.loadtxt(str(info_path), dtype=str, delimiter=",", skiprows=1) |
| |
|
| | id, uav_name, map_name, \ |
| | uav_long, uav_lat, \ |
| | map_long, map_lat, \ |
| | tile_size_meters, pixel_per_meter, \ |
| | u, v, yaw, dis = info[index] |
| | print(info[index]) |
| | uav_image_rgb = cv2.imread(str(uav_image_path / uav_name)) |
| | uav_image_rgb = cv2.cvtColor(uav_image_rgb, cv2.COLOR_BGR2RGB) |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | map_vis_image = cv2.imread(str(map_vis / uav_name)) |
| | map_vis_image = cv2.cvtColor(map_vis_image, cv2.COLOR_BGR2RGB) |
| |
|
| | map = np.load(str(map_path / map_name)) |
| |
|
| | tfs = [] |
| | tfs.append(tvf.ToTensor()) |
| | tfs.append(tvf.Resize(256)) |
| | val_tfs = tvf.Compose(tfs) |
| |
|
| | uav_image = val_tfs(uav_image_rgb) |
| | |
| | |
| | |
| | |
| | |
| | uav_path = str(uav_image_path / uav_name) |
| | return { |
| | 'map': torch.from_numpy(np.ascontiguousarray(map)).long().unsqueeze(0), |
| | 'image': torch.tensor(uav_image).unsqueeze(0), |
| | 'roll_pitch_yaw': torch.tensor((0, 0, float(yaw))).float().unsqueeze(0), |
| | 'pixels_per_meter': torch.tensor(float(pixel_per_meter)).float().unsqueeze(0), |
| | "uv": torch.tensor([float(u), float(v)]).float().unsqueeze(0), |
| | }, uav_image_rgb, map_vis_image, uav_path, [float(map_lat), float(map_long)] |
| |
|
| |
|
| | def crop_image(image, width, height): |
| | |
| | x = int((image.shape[1] - width) / 2) |
| | y = int((image.shape[0] - height) / 2) |
| |
|
| | |
| | cropped_image = image[y:y + height, x:x + width] |
| | return cropped_image |
| |
|
| |
|
| | def crop_square(image): |
| | |
| | height, width = image.shape[:2] |
| |
|
| | |
| | min_length = min(height, width) |
| |
|
| | |
| | top = (height - min_length) // 2 |
| | bottom = top + min_length |
| | left = (width - min_length) // 2 |
| | right = left + min_length |
| |
|
| | |
| | cropped_image = image[top:bottom, left:right] |
| |
|
| | return cropped_image |
| | def read_input_image_test( |
| | image, |
| | prior_latlon, |
| | tile_size_meters, |
| | ): |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | image = cv2.resize(image,(256,256)) |
| | roll_pitch = None |
| |
|
| |
|
| | latlon = None |
| | if prior_latlon is not None: |
| | latlon = prior_latlon |
| | logger.info("Using prior latlon %s.", prior_latlon) |
| |
|
| | if latlon is None: |
| | with open(image_path, "rb") as fid: |
| | exif = EXIF(fid, lambda: image.shape[:2]) |
| | geo = exif.extract_geo() |
| | if geo: |
| | alt = geo.get("altitude", 0) |
| | latlon = (geo["latitude"], geo["longitude"], alt) |
| | logger.info("Using prior location from EXIF.") |
| | |
| | else: |
| | logger.info("Could not find any prior location in the image EXIF metadata.") |
| |
|
| | latlon = np.array(latlon) |
| |
|
| | proj = Projection(*latlon) |
| | center = proj.project(latlon) |
| | bbox = BoundaryBox(center, center) + float(tile_size_meters) |
| | camera=None |
| | image=cv2.resize(image,(256,256)) |
| | return image, camera, roll_pitch, proj, bbox, latlon |
| | if __name__ == '__main__': |
| | experiment_or_path = "weight/last-step-checkpointing.ckpt" |
| | |
| | image_path='images/00000.jpg' |
| | prior_latlon=(37.75704325989902,-122.435941445631) |
| | tile_size_meters=128 |
| | demo = Demo(experiment_or_path=experiment_or_path, num_rotations=128, device='cpu') |
| | image, camera, gravity, proj, bbox, true_prior_latlon = read_input_image_test( |
| | image_path, |
| | prior_latlon=prior_latlon, |
| | tile_size_meters=tile_size_meters, |
| | ) |
| | tiler = TileManager.from_bbox(projection=proj, bbox=bbox + 10,ppm=1, tile_size=tile_size_meters) |
| | |
| | canvas = tiler.query(bbox) |
| | uv, yaw, prob, neural_map, image_rectified, data_, pred = demo.localize( |
| | image, camera, canvas) |
| | prior_latlon_pred = proj.unproject(canvas.to_xy(uv)) |
| | pass |