# Prediction utilities
import os
import numpy as np
import pandas as pd
import shapely
import torch
from scipy.spatial import cKDTree
from shapely import affinity
from torchvision.ops import nms
from deepforest import distributed, utilities
from deepforest.datasets import cropmodel
from deepforest.utilities import read_file
def _predict_image_(
model,
image: np.ndarray | None = None,
path: str | None = None,
iou_threshold: float = 0.15,
nms_distance_thresh: float = 5.0,
):
"""Predict a single image with a deepforest model.
Args:
model: a deepforest.main.model object
image: a tensor of shape (channels, height, width)
path: optional path to read image from disk instead of passing image arg
iou_threshold: IoU threshold for box non-max suppression
nms_distance_thresh: Distance threshold in pixels for point NMS, see config.point.nms_distance_thresh
Returns:
df: A pandas dataframe of predictions (Default)
img: The input with predictions overlaid (Optional)
"""
image = torch.tensor(image).permute(2, 0, 1)
image = image / 255
with torch.no_grad():
prediction = model(image.unsqueeze(0))
prediction = prediction[0]
geom_type = utilities.determine_geometry_type(prediction)
df = utilities.format_geometry(prediction, geom_type=geom_type)
# return None for no predictions
if df is None:
return None
if geom_type == "box" and df.label.nunique() > 1:
df = across_class_nms(df, iou_threshold=iou_threshold)
elif geom_type == "point":
df = reduce_points(df, nms_thresh=nms_distance_thresh)
# Add image path if provided
if path is not None:
df["image_path"] = os.path.basename(path)
return df
def translate_predictions(predictions: pd.DataFrame) -> pd.DataFrame:
    """Move window-relative predictions into full-image coordinates.

    Args:
        predictions: DataFrame holding a ``geometry`` column plus the
            ``window_xmin`` / ``window_ymin`` offsets of each row's window.

    Returns:
        A new DataFrame whose geometries are shifted by the window origin.
        Box frames get ``xmin``/``ymin``/``xmax``/``ymax`` rewritten from the
        shifted bounds (cast to int); other frames get ``x``/``y`` rewritten
        from the shifted coordinates. The window offset columns are dropped
        and the index is reset.
    """
    box_columns = ["xmin", "ymin", "xmax", "ymax"]
    shifted = predictions.copy()
    has_box_columns = set(box_columns).issubset(shifted.columns)

    # Translate every geometry by the origin of the window it came from
    offsets = zip(
        shifted.geometry,
        shifted.window_xmin,
        shifted.window_ymin,
        strict=True,
    )
    shifted["geometry"] = [
        affinity.translate(shape, xoff=x_shift, yoff=y_shift)
        for shape, x_shift, y_shift in offsets
    ]

    # Keep the flat coordinate columns in sync with the shifted geometry
    moved_geometries = np.array(shifted["geometry"])
    if has_box_columns:
        shifted[box_columns] = shapely.bounds(moved_geometries).astype(int)
    else:
        xy = shapely.get_coordinates(moved_geometries)
        shifted["x"] = xy[:, 0]
        shifted["y"] = xy[:, 1]

    without_offsets = shifted.drop(columns=["window_xmin", "window_ymin"])
    return without_offsets.reset_index(drop=True)
def reduce_boxes(predictions: pd.DataFrame, iou_threshold: float) -> pd.DataFrame:
    """Suppress overlapping boxes with torchvision NMS.

    Args:
        predictions: DataFrame of image-space box predictions.
        iou_threshold: IoU threshold handed to NMS.

    Returns:
        A fresh DataFrame limited to the public box schema
        (xmin, ymin, xmax, ymax, label, score) with a reset index. Frames with
        at most one row skip NMS entirely.
    """
    schema = ["xmin", "ymin", "xmax", "ymax", "label", "score"]
    survivors = predictions
    candidate_count = predictions.shape[0]

    if candidate_count > 1:
        print(
            f"{candidate_count} predictions in overlapping windows, applying non-max suppression"
        )
        box_tensor = torch.tensor(
            predictions[["xmin", "ymin", "xmax", "ymax"]].values, dtype=torch.float32
        )
        score_tensor = torch.tensor(predictions.score.values, dtype=torch.float32)
        kept_rows = nms(
            boxes=box_tensor, scores=score_tensor, iou_threshold=iou_threshold
        ).numpy()
        survivors = predictions.iloc[kept_rows].reset_index(drop=True)
        print(f"{survivors.shape[0]} predictions kept after non-max suppression")

    return survivors[schema].reset_index(drop=True).copy()
def reduce_points(predictions: pd.DataFrame, nms_thresh: float) -> pd.DataFrame:
    """Greedy distance-based suppression of nearby point predictions.

    Points are visited from highest to lowest score; each point that is still
    kept removes every other point within ``nms_thresh`` of it.

    Args:
        predictions: DataFrame of image-space point predictions with ``x``,
            ``y`` and ``score`` columns.
        nms_thresh: Distance threshold in pixels used to suppress duplicates.
            Values <= 0 disable suppression.

    Returns:
        The surviving rows (all columns preserved) with a reset index.
    """
    points = predictions.reset_index(drop=True)
    if nms_thresh <= 0 or len(points) <= 1:
        return points

    xy = points[["x", "y"]].values
    confidence = points["score"].values
    index = cKDTree(xy)

    survivors = np.ones(len(xy), dtype=bool)
    for candidate in np.argsort(confidence)[::-1]:
        if survivors[candidate]:
            # The ball query also returns the candidate itself, so knock out
            # the whole neighbourhood and then reinstate the candidate.
            nearby = index.query_ball_point(xy[candidate], r=nms_thresh)
            survivors[nearby] = False
            survivors[candidate] = True

    return points.loc[survivors].reset_index(drop=True)
def mosaic(
    predictions: pd.DataFrame,
    iou_threshold: float = 0.1,
    nms_distance_thresh: float = 5.0,
) -> pd.DataFrame:
    """Mosaic predictions from overlapping windows.

    Predictions are shifted from window to image coordinates and duplicates
    created by the window overlap are then suppressed.

    Args:
        predictions: A pandas dataframe containing predictions from overlapping windows from a single image.
        iou_threshold: The IoU threshold for non-max suppression (box predictions).
        nms_distance_thresh: Distance in pixels below which two points are duplicates (point predictions).

    Returns:
        A pandas dataframe of predictions. An empty input yields an empty copy.

    Raises:
        ValueError: if the frame has neither box (xmin, ymin, xmax, ymax) nor
            point (x, y) coordinate columns.
    """
    if predictions.empty:
        return predictions.copy()

    is_box_predictions = {"xmin", "ymin", "xmax", "ymax"}.issubset(predictions.columns)
    is_point_predictions = {"x", "y"}.issubset(predictions.columns)

    # Validate the schema before translating, so an unsupported frame fails
    # fast with the intended error rather than after (or inside) the
    # per-geometry translation.
    if not (is_box_predictions or is_point_predictions):
        raise ValueError("Predictions must include either box or point coordinates.")

    translated_predictions = translate_predictions(predictions)

    # Box columns take precedence when both schemas are present
    if is_box_predictions:
        return reduce_boxes(translated_predictions, iou_threshold=iou_threshold)
    return reduce_points(translated_predictions, nms_thresh=nms_distance_thresh)
def across_class_nms(predicted_boxes, iou_threshold=0.15):
    """Run class-agnostic non-max suppression on a dataframe of box results.

    All boxes compete with each other regardless of label; boxes that overlap
    a higher-scoring box by more than ``iou_threshold`` IoU are removed.

    Args:
        predicted_boxes: DataFrame with xmin, ymin, xmax, ymax, label and
            score columns.
        iou_threshold: IoU threshold passed to torchvision NMS.

    Returns:
        The input frame unchanged when it holds one row or fewer. Otherwise a
        new DataFrame with only the columns xmin, ymin, xmax, ymax (cast to
        int), label and score, in the order returned by NMS; any other input
        columns are dropped.
    """
    # Skip NMS if there is one prediction or fewer
    if predicted_boxes.shape[0] <= 1:
        return predicted_boxes

    # Move predictions to tensors
    boxes = torch.tensor(
        predicted_boxes[["xmin", "ymin", "xmax", "ymax"]].values, dtype=torch.float32
    )
    scores = torch.tensor(predicted_boxes.score.values, dtype=torch.float32)
    labels = predicted_boxes.label.values

    kept_idx = nms(boxes=boxes, scores=scores, iou_threshold=iou_threshold).numpy()
    kept_boxes = boxes[kept_idx].type(torch.int).numpy()

    # Build the frame column by column. Stacking boxes, labels and scores
    # into one array would promote every column to a single common dtype
    # (object for string labels), losing the numeric dtypes.
    return pd.DataFrame(
        {
            "xmin": kept_boxes[:, 0],
            "ymin": kept_boxes[:, 1],
            "xmax": kept_boxes[:, 2],
            "ymax": kept_boxes[:, 3],
            "label": labels[kept_idx],
            "score": scores[kept_idx].numpy(),
        }
    )
def _flatten_prediction_batches_(batched_results):
"""Flatten prediction batches returned by Lightning predict()."""
flattened = []
for batch in batched_results:
if isinstance(batch, pd.DataFrame):
if not batch.empty:
flattened.append(batch)
continue
for item in batch:
if isinstance(item, pd.DataFrame) and not item.empty:
flattened.append(item)
if not flattened:
return pd.DataFrame()
return pd.concat(flattened, ignore_index=True)
def _dataloader_wrapper_(model, trainer, dataloader, root_dir, crop_model):
    """Predict every image served by a dataloader and post-process per image.

    Args:
        model: deepforest.main object; ``model.config.nms_thresh`` is used as
            the IoU threshold for across-class NMS.
        trainer: a pytorch lightning trainer object
        dataloader: pytorch dataloader object
        root_dir: directory of images, forwarded to ``read_file``.
        crop_model: Optional. A list of crop models to be used for prediction.

    Returns:
        results: dataframe of predictions for all images, passed through
            ``read_file``. An empty DataFrame is returned when there are no
            predictions.
    """
    batched_results = trainer.predict(model, dataloader)
    results = distributed.gather_dataframe(_flatten_prediction_batches_(batched_results))

    if results.empty:
        return pd.DataFrame()

    # Apply across class NMS for each image
    processed_results = []
    for image_path in results.image_path.unique():
        image_results = results[results.image_path == image_path].copy()

        if image_results.label.nunique() > 1:
            image_results = across_class_nms(
                image_results, iou_threshold=model.config.nms_thresh
            )
            # across_class_nms returns a frame holding only the box, label and
            # score columns, so the image association has to be restored or
            # these rows end up without an image_path after concatenation.
            image_results["image_path"] = image_path

        if crop_model:
            # Flag to check if only one model is passed
            is_single_model = len(crop_model) == 1
            # NOTE(review): each crop model appends its own copy of this
            # image's rows, so N models produce N stacked copies with
            # different cropmodel_* columns - confirm this is intended.
            for i, crop_model_item in enumerate(crop_model):
                crop_model_results = _predict_crop_model_(
                    crop_model=crop_model_item,
                    results=image_results,
                    path=image_path,
                    trainer=trainer,
                    model_index=i,
                    is_single_model=is_single_model,
                )
                processed_results.append(crop_model_results)
        else:
            processed_results.append(image_results)

    if processed_results:
        results = pd.concat(processed_results, ignore_index=True)

    results = read_file(results, root_dir)
    return results
def _predict_crop_model_(
crop_model,
trainer,
results,
path,
transform=None,
augmentations=None,
model_index=0,
is_single_model=False,
):
"""Predicts crop model on a raster file.
Args:
crop_model: The crop model to be used for prediction.
trainer: The PyTorch Lightning trainer object for prediction.
results: The results dataframe to store the predicted labels and scores.
path: The path to the raster file.
is_single_model: Boolean flag to determine column naming.
Returns:
The updated results dataframe with predicted labels and scores.
"""
if results.empty:
print("No predictions to run crop model on, returning empty dataframe")
return results
# Remove invalid boxes
results = results[results.xmin != results.xmax]
results = results[results.ymin != results.ymax]
# Get config from crop_model if not using custom transform
resize = None
resize_interpolation = "bilinear"
normalize = None
expand = 0
if transform is None and hasattr(crop_model, "config"):
cropmodel_cfg = crop_model.config.get("cropmodel", {})
resize = cropmodel_cfg.get("resize", [224, 224])
resize_interpolation = cropmodel_cfg.get("resize_interpolation", "bilinear")
norm_transform = crop_model.normalize()
if norm_transform is None:
normalize = False
else:
normalize = norm_transform
expand = cropmodel_cfg.get("expand", 0)
# Create dataset
bounding_box_dataset = cropmodel.BoundingBoxDataset(
results,
root_dir=os.path.dirname(path),
transform=transform,
augmentations=augmentations,
resize=resize,
resize_interpolation=resize_interpolation,
normalize=normalize,
expand=expand,
)
# Create dataloader
crop_dataloader = crop_model.predict_dataloader(bounding_box_dataset)
# Run prediction
crop_results = trainer.predict(crop_model, crop_dataloader)
# Process results
label, score = crop_model.postprocess_predictions(crop_results)
# Determine column names
if is_single_model:
label_column = "cropmodel_label"
score_column = "cropmodel_score"
else:
label_column = f"cropmodel_label_{model_index}"
score_column = f"cropmodel_score_{model_index}"
if crop_model.numeric_to_label_dict is None:
raise ValueError(
f"The numeric_to_label_dict is not set, and the label_dict is "
f"{crop_model.label_dict}, set either when loading CropModel(label_dict=), "
f"which creates the numeric_to_label_dict, or load annotations from CropModel."
f"load_from_disk(), which creates the dictionaries based on file contents."
)
results[label_column] = [crop_model.numeric_to_label_dict[x] for x in label]
results[score_column] = score
return results
def _crop_models_wrapper_(
crop_models, trainer, results, transform=None, augmentations=None
):
if crop_models is not None and not isinstance(crop_models, list):
crop_models = [crop_models]
# Run predictions
crop_results = []
if crop_models:
is_single_model = (
len(crop_models) == 1
) # Flag to check if only one model is passed
for i, crop_model in enumerate(crop_models):
for path in results.image_path.unique():
path = os.path.join(results.root_dir, path)
crop_result = _predict_crop_model_(
crop_model=crop_model,
results=results,
path=path,
trainer=trainer,
model_index=i,
transform=transform,
augmentations=augmentations,
is_single_model=is_single_model,
)
crop_results.append(crop_result)
# Concatenate results
crop_results = pd.concat(crop_results)
return crop_results