# Deepforest Preprocessing model
"""The preprocessing module is used to reshape data into format suitable for
training or prediction.
For example cutting large tiles into smaller images.
"""
import os
import numpy as np
import pandas as pd
import slidingwindow
from PIL import Image
import torch
import warnings
import rasterio
import geopandas as gpd
from deepforest.utilities import read_file, determine_geometry_type
from shapely import geometry
[docs]
def preprocess_image(image):
"""Preprocess a single RGB numpy array as a prediction from channels last,
to channels first."""
image = torch.tensor(image).permute(2, 0, 1)
image = image / 255
return image
[docs]
def image_name_from_path(image_path):
"""Convert path to image name for use in indexing."""
image_name = os.path.basename(image_path)
image_name = os.path.splitext(image_name)[0]
return image_name
[docs]
def compute_windows(numpy_image, patch_size, patch_overlap):
"""Create a sliding window object from a raster tile.
Args:
numpy_image (array): Raster object as numpy array to cut into crops
Returns:
windows (list): a sliding windows object
"""
if patch_overlap > 1:
raise ValueError("Patch overlap {} must be between 0 - 1".format(patch_overlap))
# Generate overlapping sliding windows
windows = slidingwindow.generate(numpy_image,
slidingwindow.DimOrder.HeightWidthChannel,
patch_size, patch_overlap)
return (windows)
[docs]
def select_annotations(annotations, window):
"""Select annotations that overlap with selected image crop.
Args:
annotations: a geopandas dataframe of annotations with a geometry column
windows: A sliding window object (see compute_windows)
Returns:
selected_annotations: a pandas dataframe of annotations
"""
# Get window coordinates
window_xmin, window_ymin, w, h = window.getRect()
# Create a shapely box from the window
window_box = geometry.box(window_xmin, window_ymin, window_xmin + w, window_ymin + h)
selected_annotations = annotations[annotations.intersects(window_box)]
# cut off any annotations over the border
original_area = selected_annotations.geometry.area
clipped_annotations = gpd.clip(selected_annotations, window_box)
if clipped_annotations.empty:
return clipped_annotations
# For points, keep all annotations.
if selected_annotations.iloc[0].geometry.geom_type == "Point":
return selected_annotations
else:
# Only keep clipped boxes if they are more than 50% of the original size.
clipped_area = clipped_annotations.geometry.area
clipped_annotations = clipped_annotations[(clipped_area / original_area) > 0.5]
clipped_annotations.geometry = clipped_annotations.geometry.translate(
xoff=-window_xmin, yoff=-window_ymin)
# Update xmin, ymin, xmax, ymax based on the clipped annotations' geometry
if not clipped_annotations.empty and determine_geometry_type(
clipped_annotations) == "box":
if clipped_annotations.shape[0] > 0:
clipped_annotations['xmin'] = clipped_annotations.geometry.bounds.minx
clipped_annotations['ymin'] = clipped_annotations.geometry.bounds.miny
clipped_annotations['xmax'] = clipped_annotations.geometry.bounds.maxx
clipped_annotations['ymax'] = clipped_annotations.geometry.bounds.maxy
return clipped_annotations
[docs]
def save_crop(base_dir, image_name, index, crop):
"""Save window crop as an image file to be read by PIL.
Args:
base_dir (str): The base directory to save the image file.
image_name (str): The name of the original image.
index (int): The index of the window crop.
crop (numpy.ndarray): The window crop as a NumPy array.
Returns:
str: The filename of the saved image.
"""
# Create directory if needed
if not os.path.exists(base_dir):
os.makedirs(base_dir)
# Convert NumPy array to PIL image
im = Image.fromarray(crop)
# Extract the basename of the image
image_basename = os.path.splitext(image_name)[0]
# Generate the filename for the saved image
filename = "{}/{}_{}.png".format(base_dir, image_basename, index)
# Save the image
im.save(filename)
return filename
[docs]
def split_raster(annotations_file=None,
path_to_raster=None,
numpy_image=None,
root_dir=None,
base_dir=None,
patch_size=400,
patch_overlap=0.05,
allow_empty=False,
image_name=None,
save_dir="."):
"""Divide a large tile into smaller arrays. Each crop will be saved to
file.
Args:
numpy_image: a numpy object to be used as a raster, usually opened from rasterio.open.read(), in order (height, width, channels)
root_dir: (str): Root directory of annotations file, if not supplied, will be inferred from annotations_file
path_to_raster: (str): Path to a tile that can be read by rasterio on disk
annotations_file (str or pd.DataFrame): A pandas dataframe or path to annotations csv file to transform to cropped images. In the format -> image_path, xmin, ymin, xmax, ymax, label. If None, allow_empty is ignored and the function will only return the cropped images.
save_dir (str): Directory to save images
base_dir (str): Directory to save images
patch_size (int): Maximum dimensions of square window
patch_overlap (float): Percent of overlap among windows 0->1
allow_empty: If True, include images with no annotations
to be included in the dataset. If annotations_file is None, this is ignored.
image_name (str): If numpy_image arg is used, what name to give the raster?
Note:
When allow_empty is True, the function will return 0's for coordinates, following torchvision style, the label will be ignored, so for continuity, the first label in the annotations_file will be used.
Returns:
If annotations_file is provided, a pandas dataframe with annotations file for training. A copy of this file is written to save_dir as a side effect.
If not, a list of filenames of the cropped images.
"""
# Set deprecation warning for base_dir and set to save_dir
if base_dir:
warnings.warn(
"base_dir argument will be deprecated in 2.0. The naming is confusing, the rest of the API uses 'save_dir' to refer to location of images. Please use 'save_dir' argument.",
DeprecationWarning)
save_dir = base_dir
# Load raster as image
if numpy_image is None and path_to_raster is None:
raise IOError("Supply a raster either as a path_to_raster or if ready "
"from existing in-memory numpy object, as numpy_image=")
if path_to_raster:
numpy_image = rasterio.open(path_to_raster).read()
numpy_image = np.moveaxis(numpy_image, 0, 2)
else:
if image_name is None:
raise IOError("If passing a numpy_image, please also specify an image_name"
" to match the column in the annotation.csv file")
# Confirm that raster is H x W x C, if not, convert, assuming image is wider/taller than channels
if numpy_image.shape[0] < numpy_image.shape[-1]:
warnings.warn(
"Input rasterio had shape {}, assuming channels first. Converting to channels last"
.format(numpy_image.shape), UserWarning)
numpy_image = np.moveaxis(numpy_image, 0, 2)
# Check that it's 3 bands
bands = numpy_image.shape[2]
if not bands == 3:
warnings.warn(
"Input rasterio had non-3 band shape of {}, ignoring "
"alpha channel".format(numpy_image.shape), UserWarning)
try:
numpy_image = numpy_image[:, :, :3].astype("uint8")
except:
raise IOError("Input file {} has {} bands. "
"DeepForest only accepts 3 band RGB rasters in the order "
"(height, width, channels). "
"Selecting the first three bands failed, "
"please reshape manually. If the image was cropped and "
"saved as a .jpg, please ensure that no alpha channel "
"was used.".format(path_to_raster, bands))
# Check that patch size is greater than image size
height, width = numpy_image.shape[0], numpy_image.shape[1]
if any(np.array([height, width]) < patch_size):
raise ValueError("Patch size of {} is larger than the image dimensions {}".format(
patch_size, [height, width]))
# Compute sliding window index
windows = compute_windows(numpy_image, patch_size, patch_overlap)
# Get image name for indexing
if image_name is None:
image_name = os.path.basename(path_to_raster)
# Load annotations file and coerce dtype
if annotations_file is None:
allow_empty = True
elif type(annotations_file) == str:
annotations = read_file(annotations_file, root_dir=root_dir)
elif type(annotations_file) == pd.DataFrame:
if root_dir is None:
raise ValueError(
"If passing a pandas DataFrame with relative pathnames in image_path, please also specify a root_dir"
)
annotations = read_file(annotations_file, root_dir=root_dir)
elif type(annotations_file) == gpd.GeoDataFrame:
annotations = annotations_file
else:
raise TypeError(
"Annotations file must either be None, a path, Pandas Dataframe, or Geopandas GeoDataFrame, found {}"
.format(type(annotations_file)))
# Select matching annotations
if annotations_file is not None:
image_annotations = annotations[annotations.image_path == image_name]
image_basename = os.path.splitext(image_name)[0]
image_basename = os.path.splitext(image_name)[0]
# Sanity checks
if not allow_empty:
if image_annotations.empty:
raise ValueError(
"No image names match between the file:{} and the image_path: {}. "
"Reminder that image paths should be the relative "
"path (e.g. 'image_name.tif'), not the full path "
"(e.g. path/to/dir/image_name.tif)".format(annotations_file, image_name))
annotations_files = []
crop_filenames = []
for index, window in enumerate(windows):
# Crop image
crop = numpy_image[windows[index].indices()]
# Skip if empty crop
if crop.size == 0:
continue
# Find annotations, image_name is the basename of the path
if annotations_file is not None:
crop_annotations = select_annotations(image_annotations,
window=windows[index])
crop_annotations["image_path"] = "{}_{}.png".format(image_basename, index)
if crop_annotations.empty:
if allow_empty:
geom_type = determine_geometry_type(image_annotations)
# The safest thing is to use the first label and it will be ignored
crop_annotations.loc[0, "label"] = image_annotations.label.unique()[0]
crop_annotations.loc[0, "image_path"] = "{}_{}.png".format(
image_basename, index)
if geom_type == "box":
crop_annotations.loc[0, "xmin"] = 0
crop_annotations.loc[0, "ymin"] = 0
crop_annotations.loc[0, "xmax"] = 0
crop_annotations.loc[0, "ymax"] = 0
elif geom_type == "point":
crop_annotations.loc[0, "geometry"] = geometry.Point(0, 0)
crop_annotations.loc[0, "x"] = 0
crop_annotations.loc[0, "y"] = 0
elif geom_type == "polygon":
crop_annotations.loc[0, "geometry"] = geometry.Polygon([(0, 0),
(0, 0),
(0, 0)])
crop_annotations.loc[0, "polygon"] = 0
else:
continue
annotations_files.append(crop_annotations)
# Save image crop
if allow_empty or crop_annotations is not None:
crop_filename = save_crop(save_dir, image_name, index, crop)
crop_filenames.append(crop_filename)
if annotations_file is None:
return crop_filenames
elif len(annotations_files) == 0:
raise ValueError(
"Input file has no overlapping annotations and allow_empty is {}".format(
allow_empty))
else:
annotations_files = pd.concat(annotations_files)
# Checkpoint csv files, useful for parallelization
# use the filename of the raster path to save the annotations
image_basename = os.path.splitext(image_name)[0]
file_path = os.path.join(save_dir, f"{image_basename}.csv")
annotations_files.to_csv(file_path, index=False, header=True)
return annotations_files