Source code for deepforest.preprocess

# Deepforest Preprocessing model
"""The preprocessing module is used to reshape data into format suitable for
training or prediction.

For example cutting large tiles into smaller images.
"""

import os
import warnings

import geopandas as gpd
import numpy as np
import pandas as pd
import slidingwindow
import torch
from PIL import Image
from shapely import geometry

from deepforest.utilities import determine_geometry_type, read_file


[docs]def preprocess_image(image): """Preprocess a single RGB numpy array as a prediction from channels last, to channels first.""" image = torch.tensor(image).permute(2, 0, 1) image = image / 255 return image
[docs]def image_name_from_path(image_path): """Convert path to image name for use in indexing.""" image_name = os.path.basename(image_path) image_name = os.path.splitext(image_name)[0] return image_name
[docs]def compute_windows(numpy_image, patch_size, patch_overlap): """Create a sliding window object from a raster tile. Args: numpy_image (array): Raster object as numpy array to cut into crops, channels first Returns: windows (list): a sliding windows object """ if patch_overlap > 1: raise ValueError(f"Patch overlap {patch_overlap} must be between 0 - 1") # Check that image is channels first if numpy_image.shape[0] != 3: raise ValueError(f"Image is not channels first, shape is {numpy_image.shape}") # Generate overlapping sliding windows windows = slidingwindow.generate( numpy_image, slidingwindow.DimOrder.ChannelHeightWidth, patch_size, patch_overlap ) return windows
[docs]def select_annotations(annotations, window): """Select annotations that overlap with selected image crop. Args: annotations: a geopandas dataframe of annotations with a geometry column windows: A sliding window object (see compute_windows) Returns: selected_annotations: a pandas dataframe of annotations """ # Get window coordinates window_xmin, window_ymin, w, h = window.getRect() # Create a shapely box from the window window_box = geometry.box(window_xmin, window_ymin, window_xmin + w, window_ymin + h) selected_annotations = annotations[annotations.intersects(window_box)] # cut off any annotations over the border original_area = selected_annotations.geometry.area clipped_annotations = gpd.clip(selected_annotations, window_box) if clipped_annotations.empty: return clipped_annotations # For points, keep all annotations but translate to window origin if selected_annotations.iloc[0].geometry.geom_type == "Point": if clipped_annotations.empty: return clipped_annotations # Translate point coordinates to be relative to the top-left of the window clipped_annotations.geometry = clipped_annotations.geometry.translate( xoff=-window_xmin, yoff=-window_ymin ) # Update convenience x/y columns if they exist or create them clipped_annotations["x"] = clipped_annotations.geometry.x clipped_annotations["y"] = clipped_annotations.geometry.y return clipped_annotations else: # Keep clipped boxes if they're more than 50% of original size clipped_area = clipped_annotations.geometry.area clipped_annotations = clipped_annotations[(clipped_area / original_area) > 0.5] clipped_annotations.geometry = clipped_annotations.geometry.translate( xoff=-window_xmin, yoff=-window_ymin ) # Update xmin, ymin, xmax, ymax from clipped geometry if ( not clipped_annotations.empty and determine_geometry_type(clipped_annotations) == "box" ): if clipped_annotations.shape[0] > 0: clipped_annotations["xmin"] = clipped_annotations.geometry.bounds.minx clipped_annotations["ymin"] = clipped_annotations.geometry.bounds.miny clipped_annotations["xmax"] = clipped_annotations.geometry.bounds.maxx clipped_annotations["ymax"] = clipped_annotations.geometry.bounds.maxy return clipped_annotations
[docs]def save_crop(base_dir, image_name, index, crop): """Save window crop as an image file to be read by PIL. Args: base_dir (str): The base directory to save the image file. image_name (str): The name of the original image. index (int): The index of the window crop. crop (numpy.ndarray): The window crop as a NumPy array. Returns: str: The filename of the saved image. """ # Create directory if needed if not os.path.exists(base_dir): os.makedirs(base_dir) # Convert NumPy array to PIL image, PIL expects channels last crop = np.moveaxis(crop, 0, 2) im = Image.fromarray(crop) # Extract the basename of the image image_basename = os.path.splitext(image_name)[0] # Generate the filename for the saved image filename = f"{base_dir}/{image_basename}_{index}.png" # Save the image im.save(filename) return filename
[docs]def split_raster( annotations_file=None, path_to_raster=None, numpy_image=None, root_dir=None, patch_size=400, patch_overlap=0.05, allow_empty=False, image_name=None, save_dir=".", ): """Split a large raster into smaller patches for processing. Args: annotations_file: Path to annotations CSV or DataFrame with columns: image_path, xmin, ymin, xmax, ymax, label path_to_raster: Path to raster file on disk numpy_image: Numpy array in (channels, height, width) order root_dir: Root directory for annotations file patch_size: Size of square patches patch_overlap: Overlap between patches (0-1) allow_empty: Include patches with no annotations image_name: Name for the raster image save_dir: Directory to save patches Returns: DataFrame with annotations for training, or list of patch filenames """ # Load raster as image if numpy_image is None and path_to_raster is None: raise OSError( "Supply a raster either as a path_to_raster or if ready " "from existing in-memory numpy object, as numpy_image=" ) if path_to_raster: numpy_image = Image.open(path_to_raster) numpy_image = np.array(numpy_image) else: if image_name is None: raise OSError( "If passing a numpy_image, please also specify an image_name" " to match the column in the annotation.csv file" ) # Convert from channels-last (H x W x C) to channels-first (C x H x W) if numpy_image.shape[2] in [3, 4]: print( f"Image shape is {numpy_image.shape[2]}, assuming this is channels last, " "converting to channels first" ) numpy_image = numpy_image.transpose(2, 0, 1) # Check that it's 3 bands bands = numpy_image.shape[2] if not bands == 3: warnings.warn( f"Input image had non-3 band shape of {numpy_image.shape}, selecting first 3 bands", UserWarning, stacklevel=2, ) try: numpy_image = numpy_image[:3, :, :].astype("uint8") except Exception: raise OSError( f"Input file {path_to_raster} has {bands} bands. " "DeepForest only accepts 3 band RGB rasters in the order " "(channels, height, width). " "Selecting the first three bands failed, " "please reshape manually. If the image was cropped and " "saved, please ensure that no alpha channel " "was used." ) from None # Check that patch size is greater than image size height, width = numpy_image.shape[1], numpy_image.shape[2] if any(np.array([height, width]) < patch_size): raise ValueError( f"Patch size of {patch_size} is larger than the image dimensions {[height, width]}" ) # Compute sliding window index windows = compute_windows(numpy_image, patch_size, patch_overlap) # Get image name for indexing if image_name is None: image_name = os.path.basename(path_to_raster) # Early return for non-annotation case if annotations_file is None: return process_without_annotations(numpy_image, windows, image_name, save_dir) # Process annotations annotations = load_annotations(annotations_file, root_dir) validate_annotations(annotations, numpy_image, path_to_raster) image_annotations = annotations[annotations.image_path == image_name] if not allow_empty and image_annotations.empty: raise ValueError( f"No image names match between the file:{annotations_file} and the image_path: {image_name}. " "Reminder that image paths should be the relative " "path (e.g. 'image_name.tif'), not the full path " "(e.g. path/to/dir/image_name.tif)" ) return process_with_annotations( numpy_image=numpy_image, windows=windows, image_annotations=image_annotations, image_name=image_name, save_dir=save_dir, allow_empty=allow_empty, )
[docs]def load_annotations(annotations_file, root_dir): """Load and validate annotations file. Args: annotations_file: Path to file, DataFrame, or GeoDataFrame root_dir: Root directory for relative paths Returns: GeoDataFrame with annotations """ if isinstance(annotations_file, str): return read_file(annotations_file, root_dir=root_dir) elif isinstance(annotations_file, gpd.GeoDataFrame): return annotations_file elif isinstance(annotations_file, pd.DataFrame): if root_dir is None: raise ValueError( "If passing a pandas DataFrame with relative pathnames in " "image_path, please also specify a root_dir" ) return read_file(annotations_file, root_dir=root_dir) else: raise TypeError( f"Annotations file must either be a path, Pandas Dataframe, or Geopandas GeoDataFrame, found {type(annotations_file)}" )
[docs]def validate_annotations(annotations, numpy_image, path_to_raster): """Validate annotation coordinate systems and bounds.""" if hasattr(annotations, "crs"): if annotations.crs is not None and annotations.crs.is_geographic: raise ValueError( "Annotations appear to be in geographic coordinates (latitude/longitude). " "Please convert your annotations to the same projected coordinate system as your raster." ) if hasattr(annotations, "total_bounds"): raster_height, raster_width = numpy_image.shape[1], numpy_image.shape[2] ann_bounds = annotations.total_bounds if ( ann_bounds[0] < -raster_width * 0.1 # xmin or ann_bounds[2] > raster_width * 1.1 # xmax or ann_bounds[1] < -raster_height * 0.1 # ymin or ann_bounds[3] > raster_height * 1.1 ): # ymax raise ValueError( f"Annotation bounds {ann_bounds} appear to be outside reasonable range for " f"raster dimensions ({raster_width}, {raster_height}). " "This might indicate your annotations are in a different coordinate system." )
[docs]def process_without_annotations(numpy_image, windows, image_name, save_dir): """Process raster without annotations.""" crop_filenames = [] for index, window in enumerate(windows): crop = numpy_image[window.indices()] if crop.size == 0: continue crop_filename = save_crop(save_dir, image_name, index, crop) crop_filenames.append(crop_filename) return crop_filenames
[docs]def process_with_annotations( numpy_image, windows, image_annotations, image_name, save_dir, allow_empty ): """Process raster with annotations.""" annotations_files = [] crop_filenames = [] image_basename = os.path.splitext(image_name)[0] for index, window in enumerate(windows): crop = numpy_image[window.indices()] if crop.size == 0: continue crop_annotations = select_annotations(image_annotations, window=window) crop_annotations["image_path"] = f"{image_basename}_{index}.png" if crop_annotations.empty: if allow_empty: crop_annotations = create_empty_annotation( image_annotations, image_basename, index ) else: continue annotations_files.append(crop_annotations) crop_filename = save_crop(save_dir, image_name, index, crop) crop_filenames.append(crop_filename) if len(annotations_files) == 0: raise ValueError( f"Input file has no overlapping annotations and allow_empty is {allow_empty}" ) annotations_files = pd.concat(annotations_files) file_path = os.path.join(save_dir, f"{image_basename}.csv") annotations_files.to_csv(file_path, index=False, header=True) return annotations_files
[docs]def create_empty_annotation(image_annotations, image_basename, index): """Create empty annotation record when allow_empty=True.""" geom_type = determine_geometry_type(image_annotations) crop_annotations = pd.DataFrame(columns=image_annotations.columns) crop_annotations.loc[0, "label"] = image_annotations.label.unique()[0] crop_annotations.loc[0, "image_path"] = f"{image_basename}_{index}.png" if geom_type == "box": crop_annotations.loc[0, "xmin"] = 0 crop_annotations.loc[0, "ymin"] = 0 crop_annotations.loc[0, "xmax"] = 0 crop_annotations.loc[0, "ymax"] = 0 elif geom_type == "point": crop_annotations.loc[0, "geometry"] = geometry.Point(0, 0) crop_annotations.loc[0, "x"] = 0 crop_annotations.loc[0, "y"] = 0 elif geom_type == "polygon": crop_annotations.loc[0, "geometry"] = geometry.Polygon([(0, 0), (0, 0), (0, 0)]) crop_annotations.loc[0, "polygon"] = 0 return crop_annotations