Source code for deepforest.utilities

"""Utilities model"""
import json
import os
import urllib
import warnings
import functools

import geopandas as gpd
import numpy as np
import pandas as pd
import rasterio
import shapely
import xmltodict
import yaml
from tqdm import tqdm

from deepforest import _ROOT


[docs]def read_config(config_path): """Read config yaml file""" try: with open(config_path, 'r') as f: config = yaml.load(f, Loader=yaml.FullLoader) except Exception as e: raise FileNotFoundError("There is no config at {}, yields {}".format( config_path, e)) return config
[docs]class DownloadProgressBar(tqdm): """Download progress bar class."""
[docs] def update_to(self, b=1, bsize=1, tsize=None): """ Update class attributes Args: b: bsize: tsize: Returns: """ if tsize is not None: self.total = tsize self.update(b * bsize - self.n)
[docs]def use_bird_release( save_dir=os.path.join(_ROOT, "data/"), prebuilt_model="bird", check_release=True): """ Check the existence of, or download the latest model release from github Args: save_dir: Directory to save filepath, default to "data" in deepforest repo prebuilt_model: Currently only accepts "NEON", but could be expanded to include other prebuilt models. The local model will be called prebuilt_model.h5 on disk. check_release (logical): whether to check github for a model recent release. In cases where you are hitting the github API rate limit, set to False and any local model will be downloaded. If no model has been downloaded an error will raise. Returns: release_tag, output_path (str): path to downloaded model """ # Naming based on pre-built model output_path = os.path.join(save_dir, prebuilt_model + ".pt") if check_release: # Find latest github tag release from the DeepLidar repo _json = json.loads( urllib.request.urlopen( urllib.request.Request( 'https://api.github.com/repos/Weecology/BirdDetector/releases/latest', headers={'Accept': 'application/vnd.github.v3+json'}, )).read()) asset = _json['assets'][0] url = asset['browser_download_url'] # Check the release tagged locally try: release_txt = pd.read_csv(save_dir + "current_bird_release.csv") except BaseException: release_txt = pd.DataFrame({"current_bird_release": [None]}) # Download the current release it doesn't exist if not release_txt.current_bird_release[0] == _json["html_url"]: print("Downloading model from BirdDetector release {}, see {} for details". format(_json["tag_name"], _json["html_url"])) with DownloadProgressBar(unit='B', unit_scale=True, miniters=1, desc=url.split('/')[-1]) as t: urllib.request.urlretrieve(url, filename=output_path, reporthook=t.update_to) print("Model was downloaded and saved to {}".format(output_path)) # record the release tag locally release_txt = pd.DataFrame({"current_bird_release": [_json["html_url"]]}) release_txt.to_csv(save_dir + "current_bird_release.csv") else: print("Model from BirdDetector Repo release {} was already downloaded. " "Loading model from file.".format(_json["html_url"])) return _json["html_url"], output_path else: try: release_txt = pd.read_csv(save_dir + "current_release.csv") except BaseException: raise ValueError("Check release argument is {}, but no release has been " "previously downloaded".format(check_release)) return release_txt.current_release[0], output_path
[docs]def use_release( save_dir=os.path.join(_ROOT, "data/"), prebuilt_model="NEON", check_release=True): """ Check the existence of, or download the latest model release from github Args: save_dir: Directory to save filepath, default to "data" in deepforest repo prebuilt_model: Currently only accepts "NEON", but could be expanded to include other prebuilt models. The local model will be called prebuilt_model.h5 on disk. check_release (logical): whether to check github for a model recent release. In cases where you are hitting the github API rate limit, set to False and any local model will be downloaded. If no model has been downloaded an error will raise. Returns: release_tag, output_path (str): path to downloaded model """ # Naming based on pre-built model output_path = os.path.join(save_dir, prebuilt_model + ".pt") if check_release: # Find latest github tag release from the DeepLidar repo _json = json.loads( urllib.request.urlopen( urllib.request.Request( 'https://api.github.com/repos/Weecology/DeepForest/releases/latest', headers={'Accept': 'application/vnd.github.v3+json'}, )).read()) asset = _json['assets'][0] url = asset['browser_download_url'] # Check the release tagged locally try: release_txt = pd.read_csv(save_dir + "current_release.csv") except BaseException: release_txt = pd.DataFrame({"current_release": [None]}) # Download the current release it doesn't exist if not release_txt.current_release[0] == _json["html_url"]: print("Downloading model from DeepForest release {}, see {} " "for details".format(_json["tag_name"], _json["html_url"])) with DownloadProgressBar(unit='B', unit_scale=True, miniters=1, desc=url.split('/')[-1]) as t: urllib.request.urlretrieve(url, filename=output_path, reporthook=t.update_to) print("Model was downloaded and saved to {}".format(output_path)) # record the release tag locally release_txt = pd.DataFrame({"current_release": [_json["html_url"]]}) release_txt.to_csv(save_dir + "current_release.csv") else: print("Model from DeepForest release {} was already downloaded. " "Loading model from file.".format(_json["html_url"])) return _json["html_url"], output_path else: try: release_txt = pd.read_csv(save_dir + "current_release.csv") except BaseException: raise ValueError("Check release argument is {}, but no release " "has been previously downloaded".format(check_release)) return release_txt.current_release[0], output_path
[docs]def xml_to_annotations(xml_path): """ Load annotations from xml format (e.g. RectLabel editor) and convert them into retinanet annotations format. Args: xml_path (str): Path to the annotations xml, formatted by RectLabel Returns: Annotations (pandas dataframe): in the format -> path-to-image.png,x1,y1,x2,y2,class_name """ # parse with open(xml_path) as fd: doc = xmltodict.parse(fd.read()) # grab xml objects try: tile_xml = doc["annotation"]["object"] except Exception as e: raise Exception("error {} for path {} with doc annotation{}".format( e, xml_path, doc["annotation"])) xmin = [] xmax = [] ymin = [] ymax = [] label = [] if isinstance(tile_xml, list): # Construct frame if multiple trees for tree in tile_xml: xmin.append(tree["bndbox"]["xmin"]) xmax.append(tree["bndbox"]["xmax"]) ymin.append(tree["bndbox"]["ymin"]) ymax.append(tree["bndbox"]["ymax"]) label.append(tree['name']) else: xmin.append(tile_xml["bndbox"]["xmin"]) xmax.append(tile_xml["bndbox"]["xmax"]) ymin.append(tile_xml["bndbox"]["ymin"]) ymax.append(tile_xml["bndbox"]["ymax"]) label.append(tile_xml['name']) rgb_name = os.path.basename(doc["annotation"]["filename"]) # set dtypes, check for floats and round xmin = [round_with_floats(x) for x in xmin] xmax = [round_with_floats(x) for x in xmax] ymin = [round_with_floats(x) for x in ymin] ymax = [round_with_floats(x) for x in ymax] annotations = pd.DataFrame({ "image_path": rgb_name, "xmin": xmin, "ymin": ymin, "xmax": xmax, "ymax": ymax, "label": label }) return (annotations)
[docs]def shapefile_to_annotations(shapefile, rgb, buffer_size=0.5, geometry_type="bbox", savedir="."): """ Convert a shapefile of annotations into annotations csv file for DeepForest training and evaluation Geometry Handling: The geometry_type is the form of the objects in the given shapefile. It can be "bbox" or "point". If geometry_type is set to "bbox" (default) then the bounding boxes in the shapefile will be used as is and transferred over to the annotations file. If the geometry_type is "point" then a bounding box will be created for each point that is centered on the point location and has an apothem equal to buffer_size, resulting in a bounding box with dimensions of 2 times the value of buffer_size. Args: shapefile: Path to a shapefile on disk. If a label column is present, it will be used, else all labels are assumed to be "Tree" rgb: Path to the RGB image on disk savedir: Directory to save csv files buffer_size: size of point to box expansion in map units of the target object, meters for projected data, pixels for unprojected data. The buffer_size is added to each side of the x,y point to create the box. geometry_type: Specifies the spatial representation used in the shapefile; can be "bbox" or "point" Returns: results: a pandas dataframe """ # Verify the geometry_type is valid if geometry_type not in ["bbox", "point"]: raise ValueError( "Invalid argument for 'geometry_type'. Expected 'point' or 'bbox'.") # Read shapefile gdf = gpd.read_file(shapefile) # define in image coordinates and buffer to create a box if geometry_type == "point": gdf["geometry"] = [ shapely.geometry.Point(x, y) for x, y in zip(gdf.geometry.x.astype(float), gdf.geometry.y.astype(float)) ] gdf["geometry"] = [ shapely.geometry.box(left, bottom, right, top) for left, bottom, right, top in gdf.geometry.buffer(buffer_size).bounds.values ] # get coordinates df = gdf.geometry.bounds # raster bounds with rasterio.open(rgb) as src: left, bottom, right, top = src.bounds resolution = src.res[0] raster_crs = src.crs if gdf.crs: # Check matching the crs if not gdf.crs.to_string() == raster_crs.to_string(): raise ValueError( "The shapefile crs {} does not match the image crs {}".format( gdf.crs, src.crs)) # Transform project coordinates to image coordinates df["tile_xmin"] = (df.minx - left) / resolution df["tile_xmin"] = df["tile_xmin"].astype(int) df["tile_xmax"] = (df.maxx - left) / resolution df["tile_xmax"] = df["tile_xmax"].astype(int) # UTM is given from the top, but origin of an image is top left df["tile_ymax"] = (top - df.miny) / resolution df["tile_ymax"] = df["tile_ymax"].astype(int) df["tile_ymin"] = (top - df.maxy) / resolution df["tile_ymin"] = df["tile_ymin"].astype(int) # Add labels is they exist if "label" in gdf.columns: df["label"] = gdf["label"] else: df["label"] = "Tree" # add filename df["image_path"] = os.path.basename(rgb) # select columns result = df[[ "image_path", "tile_xmin", "tile_ymin", "tile_xmax", "tile_ymax", "label" ]] result = result.rename(columns={ "tile_xmin": "xmin", "tile_ymin": "ymin", "tile_xmax": "xmax", "tile_ymax": "ymax" }) # ensure no zero area polygons due to rounding to pixel size result = result[~(result.xmin == result.xmax)] result = result[~(result.ymin == result.ymax)] return result
[docs]def round_with_floats(x): """Check if string x is float or int, return int, rounded if needed.""" try: result = int(x) except BaseException: warnings.warn( "Annotations file contained non-integer coordinates. " "These coordinates were rounded to nearest int. " "All coordinates must correspond to pixels in the image coordinate system. " "If you are attempting to use projected data, " "first convert it into image coordinates see FAQ for suggestions.") result = int(np.round(float(x))) return result
[docs]def check_file(df): """Check a file format for correct column names and structure""" if not all(x in df.columns for x in ["image_path", "xmin", "xmax", "ymin", "ymax", "label"]): raise IOError("Input file has incorrect column names, " "the following columns must exist " "'image_path','xmin','ymin','xmax','ymax','label'.") return df
[docs]def check_image(image): """Check an image is three channel, channel last format Args: image: numpy array Returns: None, throws error on assert """ if not image.shape[2] == 3: raise ValueError("image is expected have three channels, channel last format, " "found image with shape {}".format(image.shape))
[docs]def boxes_to_shapefile(df, root_dir, projected=True, flip_y_axis=False): """ Convert from image coordinates to geographic coordinates Note that this assumes df is just a single plot being passed to this function Args: df: a pandas type dataframe with columns: name, xmin, ymin, xmax, ymax. Name is the relative path to the root_dir arg. root_dir: directory of images to lookup image_path column projected: If True, convert from image to geographic coordinates, if False, keep in image coordinate system flip_y_axis: If True, reflect predictions over y axis to align with raster data in QGIS, which uses a negative y origin compared to numpy. See https://gis.stackexchange.com/questions/306684/why-does-qgis-use-negative-y-spacing-in-the-default-raster-geotransform Returns: df: a geospatial dataframe with the boxes optionally transformed to the target crs """ # Raise a warning and confirm if a user sets projected to True when flip_y_axis is True. if flip_y_axis and projected: warnings.warn( "flip_y_axis is {}, and projected is {}. In most cases, projected should be False when inverting y axis. Setting projected=False" .format(flip_y_axis, projected), UserWarning) projected = False plot_names = df.image_path.unique() if len(plot_names) > 1: raise ValueError("This function projects a single plots worth of data. " "Multiple plot names found {}".format(plot_names)) else: plot_name = plot_names[0] rgb_path = "{}/{}".format(root_dir, plot_name) with rasterio.open(rgb_path) as dataset: bounds = dataset.bounds pixelSizeX, pixelSizeY = dataset.res crs = dataset.crs transform = dataset.transform if projected: # Convert image pixel locations to geographic coordinates xmin_coords, ymin_coords = rasterio.transform.xy(transform=transform, rows=df.ymin, cols=df.xmin, offset='center') xmax_coords, ymax_coords = rasterio.transform.xy(transform=transform, rows=df.ymax, cols=df.xmax, offset='center') # One box polygon for each tree bounding box # Careful of single row edge case where # xmin_coords comes out not as a list, but as a float if type(xmin_coords) == float: xmin_coords = [xmin_coords] ymin_coords = [ymin_coords] xmax_coords = [xmax_coords] ymax_coords = [ymax_coords] box_coords = zip(xmin_coords, ymin_coords, xmax_coords, ymax_coords) box_geoms = [ shapely.geometry.box(xmin, ymin, xmax, ymax) for xmin, ymin, xmax, ymax in box_coords ] geodf = gpd.GeoDataFrame(df, geometry=box_geoms) geodf.crs = crs return geodf else: if flip_y_axis: # See https://gis.stackexchange.com/questions/306684/why-does-qgis-use-negative-y-spacing-in-the-default-raster-geotransform # Numpy uses top left 0,0 origin, flip along y axis. df['geometry'] = df.apply( lambda x: shapely.geometry.box(x.xmin, -x.ymin, x.xmax, -x.ymax), axis=1) else: df['geometry'] = df.apply( lambda x: shapely.geometry.box(x.xmin, x.ymin, x.xmax, x.ymax), axis=1) df = gpd.GeoDataFrame(df, geometry="geometry") return df
[docs]def collate_fn(batch): batch = list(filter(lambda x: x is not None, batch)) return tuple(zip(*batch))
[docs]def annotations_to_shapefile(df, transform, crs): """ Convert output from predict_image and predict_tile to a geopandas data.frame Args: df: prediction data.frame with columns ['xmin','ymin','xmax','ymax','label','score'] transform: A rasterio affine transform object crs: A rasterio crs object Returns: results: a geopandas dataframe where every entry is the bounding box for a detected tree. """ warnings.warn( "This method is deprecated and will be " "removed in version DeepForest 2.0.0, " "please use boxes_to_shapefile which unifies project_boxes and " "annotations_to_shapefile functionalities", DeprecationWarning) # Convert image pixel locations to geographic coordinates xmin_coords, ymin_coords = rasterio.transform.xy(transform=transform, rows=df.ymin, cols=df.xmin, offset='center') xmax_coords, ymax_coords = rasterio.transform.xy(transform=transform, rows=df.ymax, cols=df.xmax, offset='center') # If there is only one tree, the above comes out as a float, not a list if type(xmin_coords) == float: xmin_coords = [xmin_coords] ymin_coords = [ymin_coords] xmax_coords = [xmax_coords] ymax_coords = [ymax_coords] # One box polygon for each tree bounding box box_coords = zip(xmin_coords, ymin_coords, xmax_coords, ymax_coords) box_geoms = [ shapely.geometry.box(xmin, ymin, xmax, ymax) for xmin, ymin, xmax, ymax in box_coords ] geodf = gpd.GeoDataFrame(df, geometry=box_geoms) geodf.crs = crs return geodf
[docs]def project_boxes(df, root_dir, transform=True): """ Convert from image coordinates to geographic coordinates Note that this assumes df is just a single plot being passed to this function df: a pandas type dataframe with columns: name, xmin, ymin, xmax, ymax. Name is the relative path to the root_dir arg. root_dir: directory of images to lookup image_path column transform: If true, convert from image to geographic coordinates """ warnings.warn( "This method is deprecated and will be removed in version " "DeepForest 2.0.0, please use boxes_to_shapefile which " "unifies project_boxes and annotations_to_shapefile functionalities", DeprecationWarning) plot_names = df.image_path.unique() if len(plot_names) > 1: raise ValueError("This function projects a single plots worth of data. " "Multiple plot names found {}".format(plot_names)) else: plot_name = plot_names[0] rgb_path = "{}/{}".format(root_dir, plot_name) with rasterio.open(rgb_path) as dataset: bounds = dataset.bounds pixelSizeX, pixelSizeY = dataset.res crs = dataset.crs if transform: # subtract origin. Recall that numpy origin is top left! Not bottom # left. df["xmin"] = (df["xmin"].astype(float) * pixelSizeX) + bounds.left df["xmax"] = (df["xmax"].astype(float) * pixelSizeX) + bounds.left df["ymin"] = bounds.top - (df["ymin"].astype(float) * pixelSizeY) df["ymax"] = bounds.top - (df["ymax"].astype(float) * pixelSizeY) # combine column to a shapely Box() object, save shapefile df['geometry'] = df.apply( lambda x: shapely.geometry.box(x.xmin, x.ymin, x.xmax, x.ymax), axis=1) df = gpd.GeoDataFrame(df, geometry='geometry') df.crs = crs return df