import os
import warnings
import geopandas as gpd
import numpy as np
import pandas as pd
import rasterio
import shapely
import xmltodict
import yaml
from tqdm import tqdm
from PIL import Image
from deepforest import _ROOT
import warnings
import geopandas as gpd
import rasterio
import shapely
from tqdm import tqdm
from deepforest import _ROOT
import json
import urllib.request
from huggingface_hub import hf_hub_download
from huggingface_hub.errors import RevisionNotFoundError, HfHubHTTPError
[docs]
def read_config(config_path):
"""Read config yaml file."""
try:
with open(config_path, 'r') as f:
config = yaml.load(f, Loader=yaml.FullLoader)
except Exception as e:
raise FileNotFoundError("There is no config at {}, yields {}".format(
config_path, e))
return config
[docs]
class DownloadProgressBar(tqdm):
"""Download progress bar class."""
[docs]
def update_to(self, b=1, bsize=1, tsize=None):
"""
Update class attributes
Args:
b:
bsize:
tsize:
Returns:
"""
if tsize is not None:
self.total = tsize
self.update(b * bsize - self.n)
[docs]
def read_pascal_voc(xml_path):
"""Load annotations from xml format (e.g. RectLabel editor) and convert
them into retinanet annotations format.
Args:
xml_path (str): Path to the annotations xml, formatted by RectLabel
Returns:
Annotations (pandas dataframe): in the
format -> path-to-image.png,x1,y1,x2,y2,class_name
"""
# parse
with open(xml_path) as fd:
doc = xmltodict.parse(fd.read())
# grab xml objects
try:
tile_xml = doc["annotation"]["object"]
except Exception as e:
raise Exception("error {} for path {} with doc annotation{}".format(
e, xml_path, doc["annotation"]))
xmin = []
xmax = []
ymin = []
ymax = []
label = []
if isinstance(tile_xml, list):
# Construct frame if multiple trees
for tree in tile_xml:
xmin.append(tree["bndbox"]["xmin"])
xmax.append(tree["bndbox"]["xmax"])
ymin.append(tree["bndbox"]["ymin"])
ymax.append(tree["bndbox"]["ymax"])
label.append(tree['name'])
else:
xmin.append(tile_xml["bndbox"]["xmin"])
xmax.append(tile_xml["bndbox"]["xmax"])
ymin.append(tile_xml["bndbox"]["ymin"])
ymax.append(tile_xml["bndbox"]["ymax"])
label.append(tile_xml['name'])
rgb_name = os.path.basename(doc["annotation"]["filename"])
# set dtypes, check for floats and round
xmin = [round_with_floats(x) for x in xmin]
xmax = [round_with_floats(x) for x in xmax]
ymin = [round_with_floats(x) for x in ymin]
ymax = [round_with_floats(x) for x in ymax]
annotations = pd.DataFrame({
"image_path": rgb_name,
"xmin": xmin,
"ymin": ymin,
"xmax": xmax,
"ymax": ymax,
"label": label
})
return annotations
[docs]
def convert_point_to_bbox(gdf, buffer_size):
"""Convert an input point type annotation to a bounding box by buffering
the point with a fixed size.
Args:
gdf (GeoDataFrame): The input point type annotation.
buffer_size (float): The size of the buffer to be applied to the point.
Returns:
gdf (GeoDataFrame): The output bounding box type annotation.
"""
# define in image coordinates and buffer to create a box
gdf["geometry"] = [
shapely.geometry.Point(x, y)
for x, y in zip(gdf.geometry.x.astype(float), gdf.geometry.y.astype(float))
]
gdf["geometry"] = [
shapely.geometry.box(left, bottom, right, top)
for left, bottom, right, top in gdf.geometry.buffer(buffer_size).bounds.values
]
return gdf
[docs]
def xml_to_annotations(xml_path):
warnings.warn(
"xml_to_annotations will be deprecated in 2.0. Please use read_pascal_voc instead.",
DeprecationWarning)
return read_pascal_voc(xml_path)
# TO DO -> Should this whole function hae a deprecation warning? Shouldn't users just use the read_file function?
[docs]
def shapefile_to_annotations(shapefile,
rgb=None,
root_dir=None,
buffer_size=None,
convert_point=False,
geometry_type=None,
save_dir=None):
"""Convert a shapefile of annotations into annotations csv file for
DeepForest training and evaluation.
Args:
shapefile: Path to a shapefile on disk. If a label column is present, it will be used, else all labels are assumed to be "Tree"
rgb: Path to the RGB image on disk
root_dir: Optional directory to prepend to the image_path column
Returns:
results: a pandas dataframe
"""
# Deprecation of previous arguments
if geometry_type:
warnings.warn(
"geometry_type argument is deprecated and will be removed in DeepForest 2.0. The function will infer geometry from the shapefile directly.",
DeprecationWarning)
if save_dir:
warnings.warn(
"save_dir argument is deprecated and will be removed in DeepForest 2.0. The function will return a pandas dataframe instead of saving to disk.",
DeprecationWarning)
# Read shapefile
if isinstance(shapefile, str):
gdf = gpd.read_file(shapefile)
else:
gdf = shapefile.copy(deep=True)
if rgb is None:
if "image_path" not in gdf.columns:
raise ValueError(
"No image_path column found in shapefile, please specify rgb path")
else:
rgb = gdf.image_path.unique()[0]
print("Found image_path column in shapefile, using {}".format(rgb))
# Determine geometry type and report to user
if gdf.geometry.type.unique().shape[0] > 1:
raise ValueError(
"Multiple geometry types found in shapefile. Please ensure all geometries are of the same type."
)
else:
geometry_type = gdf.geometry.type.unique()[0]
print("Geometry type of shapefile is {}".format(geometry_type))
# Convert point to bounding box if desired
if convert_point:
if geometry_type == "Point":
if buffer_size is None:
raise ValueError(
"buffer_size must be specified to convert point to bounding box")
gdf = convert_point_to_bbox(gdf, buffer_size)
else:
raise ValueError("convert_point is True, but geometry type is not Point")
# raster bounds
if root_dir:
rgb = os.path.join(root_dir, rgb)
with rasterio.open(rgb) as src:
raster_crs = src.crs
if gdf.crs:
# If epsg is 4326, then the buffer size is in degrees, not meters, see https://github.com/weecology/DeepForest/issues/694
if gdf.crs.to_string() == "EPSG:4326":
raise ValueError(
"The shapefile crs is in degrees. This function works for UTM and meter based crs only. see https://github.com/weecology/DeepForest/issues/694"
)
# Check matching the crs
if not gdf.crs.to_string() == raster_crs.to_string():
warnings.warn(
"The shapefile crs {} does not match the image crs {}".format(
gdf.crs.to_string(), src.crs.to_string()), UserWarning)
if src.crs is not None:
print("CRS of shapefile is {}".format(src.crs))
gdf = geo_to_image_coordinates(gdf, src.bounds, src.res[0])
# check for label column
if "label" not in gdf.columns:
raise ValueError(
"No label column found in shapefile. Please add a column named 'label' to your shapefile."
)
else:
gdf["label"] = gdf["label"]
# add filename
gdf["image_path"] = os.path.basename(rgb)
return gdf
[docs]
def determine_geometry_type(df):
"""Determine the geometry type of a prediction or annotation
Args:
df: a pandas dataframe
Returns:
geometry_type: a string of the geometry type
"""
if type(df) in [pd.DataFrame, gpd.GeoDataFrame]:
columns = df.columns
if "geometry" in columns:
df = gpd.GeoDataFrame(geometry=df['geometry'])
geometry_type = df.geometry.type.unique()[0]
if geometry_type == "Polygon":
if (df.geometry.area == df.envelope.area).all():
return 'box'
else:
return 'polygon'
else:
return 'point'
elif "xmin" in columns and "ymin" in columns and "xmax" in columns and "ymax" in columns:
geometry_type = "box"
elif "polygon" in columns:
geometry_type = "polygon"
elif "x" in columns and "y" in columns:
geometry_type = 'point'
else:
raise ValueError(
"Could not determine geometry type from columns {}".format(columns))
elif type(df) == dict:
if 'boxes' in df.keys():
geometry_type = "box"
elif 'polygon' in df.keys():
geometry_type = "polygon"
elif 'points' in df.keys():
geometry_type = "point"
return geometry_type
[docs]
def read_file(input, root_dir=None):
"""Read a file and return a geopandas dataframe.
This is the main entry point for reading annotations into deepforest.
Args:
input: a path to a file or a pandas dataframe
root_dir (str): location of the image files, if not in the same directory as the annotations file
Returns:
df: a geopandas dataframe with the properly formatted geometry column
df.root_dir: the root directory of the image files
"""
# read file
if isinstance(input, str):
if input.endswith(".csv"):
df = pd.read_csv(input)
elif input.endswith((".shp", ".gpkg")):
df = shapefile_to_annotations(input, root_dir=root_dir)
elif input.endswith(".xml"):
df = read_pascal_voc(input)
else:
raise ValueError(
"File type {} not supported. DeepForest currently supports .csv, .shp or .xml files. See https://deepforest.readthedocs.io/en/latest/annotation.html "
.format(df))
else:
if type(input) == pd.DataFrame:
df = input.copy(deep=True)
elif type(input) == gpd.GeoDataFrame:
return shapefile_to_annotations(input, root_dir=root_dir)
else:
raise ValueError(
"Input must be a path to a file, geopandas or a pandas dataframe")
if type(df) == pd.DataFrame:
if df.empty:
raise ValueError("No annotations in dataframe")
# If the geometry column is present, convert to geodataframe directly
if "geometry" in df.columns:
df['geometry'] = gpd.GeoSeries.from_wkt(df['geometry'])
df.crs = None
else:
# Detect geometry type
geom_type = determine_geometry_type(df)
# Check for uppercase names and set to lowercase
df.columns = [x.lower() for x in df.columns]
# convert to geodataframe
if geom_type == "box":
df['geometry'] = df.apply(
lambda x: shapely.geometry.box(x.xmin, x.ymin, x.xmax, x.ymax),
axis=1)
elif geom_type == "polygon":
df['geometry'] = gpd.GeoSeries.from_wkt(df["polygon"])
elif geom_type == "point":
df["geometry"] = [
shapely.geometry.Point(x, y)
for x, y in zip(df.x.astype(float), df.y.astype(float))
]
else:
raise ValueError("Geometry type {} not supported".format(geom_type))
# convert to geodataframe
df = gpd.GeoDataFrame(df, geometry='geometry')
# If root_dir is specified, add as attribute
if root_dir is not None:
df.root_dir = root_dir
else:
try:
df.root_dir = os.path.dirname(input)
except TypeError:
warnings.warn(
"root_dir argument for the location of images should be specified if input is not a path, returning without results.root_dir attribute",
UserWarning)
return df
[docs]
def crop_raster(bounds, rgb_path=None, savedir=None, filename=None, driver="GTiff"):
"""
Crop a raster to a bounding box, save as projected or unprojected crop
Args:
bounds: a tuple of (left, bottom, right, top) bounds
rgb_path: path to the rgb image
savedir: directory to save the crop
filename: filename to save the crop "{}.tif".format(filename)"
driver: rasterio driver to use, default to GTiff, can be 'GTiff' for projected data or 'PNG' unprojected data
Returns:
filename: path to the saved crop, if savedir specified
img: a numpy array of the crop, if savedir not specified
"""
left, bottom, right, top = bounds
src = rasterio.open(rgb_path)
if src.crs is None:
# Read unprojected data using PIL and crop numpy array
img = np.array(Image.open(rgb_path))
img = img[bottom:top, left:right, :]
img = np.rollaxis(img, 2, 0)
cropped_transform = None
if driver == "GTiff":
warnings.warn(
"Driver {} not supported for unprojected data, setting to 'PNG',".format(
driver), UserWarning)
driver = "PNG"
else:
# Read projected data using rasterio and crop
img = src.read(window=rasterio.windows.from_bounds(
left, bottom, right, top, transform=src.transform))
cropped_transform = rasterio.windows.transform(
rasterio.windows.from_bounds(left,
bottom,
right,
top,
transform=src.transform), src.transform)
if img.size == 0:
raise ValueError("Bounds {} does not create a valid crop for source {}".format(
bounds, src.transform))
if savedir:
res = src.res[0]
height = (top - bottom) / res
width = (right - left) / res
# Write the cropped image to disk with transform
if not os.path.exists(savedir):
os.makedirs(savedir)
if driver == "GTiff":
filename = "{}/{}.tif".format(savedir, filename)
with rasterio.open(filename,
"w",
driver="GTiff",
height=height,
width=width,
count=img.shape[0],
dtype=img.dtype,
transform=cropped_transform) as dst:
dst.write(img)
elif driver == "PNG":
# PNG driver does not support transform
filename = "{}/{}.png".format(savedir, filename)
with rasterio.open(filename,
"w",
driver="PNG",
height=height,
width=width,
count=img.shape[0],
dtype=img.dtype) as dst:
dst.write(img)
else:
raise ValueError("Driver {} not supported".format(driver))
if savedir:
return filename
else:
return img
[docs]
def crop_annotations_to_bounds(gdf, bounds):
"""
Crop a geodataframe of annotations to a bounding box
Args:
gdf: a geodataframe of annotations
bounds: a tuple of (left, bottom, right, top) bounds
Returns:
gdf: a geodataframe of annotations cropped to the bounds
"""
# unpack image bounds
left, bottom, right, top = bounds
# Crop the annotations
gdf.geometry = gdf.geometry.translate(xoff=-left, yoff=-bottom)
return gdf
[docs]
def geo_to_image_coordinates(gdf, image_bounds, image_resolution):
"""
Convert from projected coordinates to image coordinates
Args:
gdf: a pandas type dataframe with columns: name, xmin, ymin, xmax, ymax. Name is the relative path to the root_dir arg.
image_bounds: bounds of the image
image_resolution: resolution of the image
Returns:
gdf: a geopandas dataframe with the transformed to image origin. CRS is removed
"""
if len(image_bounds) != 4:
raise ValueError("image_bounds must be a tuple of (left, bottom, right, top)")
transformed_gdf = gdf.copy(deep=True)
# unpack image bounds
left, bottom, right, top = image_bounds
transformed_gdf.geometry = transformed_gdf.geometry.translate(xoff=-left, yoff=-top)
transformed_gdf.geometry = transformed_gdf.geometry.scale(xfact=1 / image_resolution,
yfact=-1 / image_resolution,
origin=(0, 0))
transformed_gdf.crs = None
return transformed_gdf
[docs]
def round_with_floats(x):
"""Check if string x is float or int, return int, rounded if needed."""
try:
result = int(x)
except BaseException:
warnings.warn(
"Annotations file contained non-integer coordinates. "
"These coordinates were rounded to nearest int. "
"All coordinates must correspond to pixels in the image coordinate system. "
"If you are attempting to use projected data, "
"first convert it into image coordinates see FAQ for suggestions.")
result = int(np.round(float(x)))
return result
[docs]
def check_image(image):
"""Check an image is three channel, channel last format
Args:
image: numpy array
Returns: None, throws error on assert
"""
if not image.shape[2] == 3:
raise ValueError("image is expected have three channels, channel last format, "
"found image with shape {}".format(image.shape))
[docs]
def image_to_geo_coordinates(gdf, root_dir, flip_y_axis=False):
"""Convert from image coordinates to geographic coordinates.
Args:
gdf: A geodataframe.
root_dir: Directory of images to lookup image_path column.
flip_y_axis: If True, reflect predictions over y axis to align with raster data in QGIS, which uses a negative y origin compared to numpy.
Returns:
transformed_gdf: A geospatial dataframe with the boxes optionally transformed to the target crs.
"""
transformed_gdf = gdf.copy(deep=True)
plot_names = transformed_gdf.image_path.unique()
if len(plot_names) > 1:
raise ValueError(
"This function projects a single plot's worth of data. Multiple plot names found: {}"
.format(plot_names))
else:
plot_name = plot_names[0]
rgb_path = "{}/{}".format(root_dir, plot_name)
with rasterio.open(rgb_path) as dataset:
bounds = dataset.bounds
left, bottom, right, top = bounds
pixelSizeX, pixelSizeY = dataset.res
crs = dataset.crs
transform = dataset.transform
geom_type = determine_geometry_type(transformed_gdf)
projected_geometry = []
if geom_type == "box":
# Convert image pixel locations to geographic coordinates
coordinates = transformed_gdf.geometry.bounds
xmin_coords, ymin_coords = rasterio.transform.xy(transform=transform,
rows=coordinates.miny,
cols=coordinates.minx,
offset='center')
xmax_coords, ymax_coords = rasterio.transform.xy(transform=transform,
rows=coordinates.maxy,
cols=coordinates.maxx,
offset='center')
for left, bottom, right, top in zip(xmin_coords, ymin_coords, xmax_coords,
ymax_coords):
geom = shapely.geometry.box(left, bottom, right, top)
projected_geometry.append(geom)
elif geom_type == "polygon":
for geom in transformed_gdf.geometry:
polygon_vertices = []
for x, y in geom.exterior.coords:
projected_vertices = rasterio.transform.xy(transform=transform,
rows=y,
cols=x,
offset='center')
polygon_vertices.append(projected_vertices)
geom = shapely.geometry.Polygon(polygon_vertices)
projected_geometry.append(geom)
elif geom_type == "point":
x_coords, y_coords = rasterio.transform.xy(transform=transform,
rows=transformed_gdf.geometry.y,
cols=transformed_gdf.geometry.x,
offset='center')
for x, y in zip(x_coords, y_coords):
geom = shapely.geometry.Point(x, y)
projected_geometry.append(geom)
transformed_gdf.geometry = projected_geometry
if flip_y_axis:
# Numpy uses top left 0,0 origin, flip along y axis.
# See https://gis.stackexchange.com/questions/306684/why-does-qgis-use-negative-y-spacing-in-the-default-raster-geotransform
transformed_gdf.geometry = transformed_gdf.geometry.scale(xfact=1,
yfact=-1,
origin=(0, 0))
# Assign crs
transformed_gdf.crs = crs
return transformed_gdf
[docs]
def collate_fn(batch):
batch = list(filter(lambda x: x is not None, batch))
return tuple(zip(*batch))
[docs]
def boxes_to_shapefile(df, root_dir, projected=True, flip_y_axis=False):
"""
Convert from image coordinates to geographic coordinates
Note that this assumes df is just a single plot being passed to this function
Args:
df: a pandas type dataframe with columns: name, xmin, ymin, xmax, ymax. Name is the relative path to the root_dir arg.
root_dir: directory of images to lookup image_path column
projected: If True, convert from image to geographic coordinates, if False, keep in image coordinate system
flip_y_axis: If True, reflect predictions over y axis to align with raster data in QGIS, which uses a negative y origin compared to numpy. See https://gis.stackexchange.com/questions/306684/why-does-qgis-use-negative-y-spacing-in-the-default-raster-geotransform
Returns:
df: a geospatial dataframe with the boxes optionally transformed to the target crs
"""
warnings.warn(
"This function will be deprecated in DeepForest 2.0, as it only can process boxes and the API now includes point and polygon annotations. Please use image_to_geo_coordinates instead.",
DeprecationWarning)
# Raise a warning and confirm if a user sets projected to True when flip_y_axis is True.
if flip_y_axis and projected:
warnings.warn(
"flip_y_axis is {}, and projected is {}. In most cases, projected should be False when inverting y axis. Setting projected=False"
.format(flip_y_axis, projected), UserWarning)
projected = False
plot_names = df.image_path.unique()
if len(plot_names) > 1:
raise ValueError("This function projects a single plots worth of data. "
"Multiple plot names found {}".format(plot_names))
else:
plot_name = plot_names[0]
rgb_path = "{}/{}".format(root_dir, plot_name)
with rasterio.open(rgb_path) as dataset:
bounds = dataset.bounds
pixelSizeX, pixelSizeY = dataset.res
crs = dataset.crs
transform = dataset.transform
if projected:
# Convert image pixel locations to geographic coordinates
xmin_coords, ymin_coords = rasterio.transform.xy(transform=transform,
rows=df.ymin,
cols=df.xmin,
offset='center')
xmax_coords, ymax_coords = rasterio.transform.xy(transform=transform,
rows=df.ymax,
cols=df.xmax,
offset='center')
# One box polygon for each tree bounding box
# Careful of single row edge case where
# xmin_coords comes out not as a list, but as a float
if type(xmin_coords) == float:
xmin_coords = [xmin_coords]
ymin_coords = [ymin_coords]
xmax_coords = [xmax_coords]
ymax_coords = [ymax_coords]
box_coords = zip(xmin_coords, ymin_coords, xmax_coords, ymax_coords)
box_geoms = [
shapely.geometry.box(xmin, ymin, xmax, ymax)
for xmin, ymin, xmax, ymax in box_coords
]
geodf = gpd.GeoDataFrame(df, geometry=box_geoms)
geodf.crs = crs
return geodf
else:
if flip_y_axis:
# See https://gis.stackexchange.com/questions/306684/why-does-qgis-use-negative-y-spacing-in-the-default-raster-geotransform
# Numpy uses top left 0,0 origin, flip along y axis.
df['geometry'] = df.apply(
lambda x: shapely.geometry.box(x.xmin, -x.ymin, x.xmax, -x.ymax), axis=1)
else:
df['geometry'] = df.apply(
lambda x: shapely.geometry.box(x.xmin, x.ymin, x.xmax, x.ymax), axis=1)
df = gpd.GeoDataFrame(df, geometry="geometry")
return df
[docs]
def annotations_to_shapefile(df, transform, crs):
"""Convert output from predict_image and predict_tile to a geopandas
data.frame.
Args:
df: prediction data.frame with columns ['xmin','ymin','xmax','ymax','label','score']
transform: A rasterio affine transform object
crs: A rasterio crs object
Returns:
results: a geopandas dataframe where every entry is the bounding box for a detected tree.
"""
raise NotImplementedError(
"This function is deprecated. Please use image_to_geo_coordinates instead.")
[docs]
def project_boxes(df, root_dir, transform=True):
"""
Convert from image coordinates to geographic coordinates
Note that this assumes df is just a single plot being passed to this function
df: a pandas type dataframe with columns: name, xmin, ymin, xmax, ymax.
Name is the relative path to the root_dir arg.
root_dir: directory of images to lookup image_path column
transform: If true, convert from image to geographic coordinates
"""
raise NotImplementedError(
"This function is deprecated. Please use image_to_geo_coordinates instead.")
return df