"""
geoutils.georaster provides a toolset for working with raster data.
"""
from __future__ import annotations
import copy
import os
import warnings
from collections import abc
from numbers import Number
from typing import IO, Any, Callable, TypeVar, overload
import geopandas as gpd
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pyproj
import rasterio as rio
import rasterio.mask
import rasterio.transform
import rasterio.warp
import rasterio.windows
from affine import Affine
from matplotlib import cm, colors
from rasterio.crs import CRS
from rasterio.features import shapes
from rasterio.io import MemoryFile
from rasterio.plot import show as rshow
from rasterio.warp import Resampling
from scipy.ndimage import map_coordinates
from shapely.geometry.polygon import Polygon
import geoutils.geovector as gv
from geoutils.geovector import Vector
# If python38 or above, Literal is builtin. Otherwise, use typing_extensions
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal # type: ignore
try:
import rioxarray
except ImportError:
_has_rioxarray = False
else:
_has_rioxarray = True
RasterType = TypeVar("RasterType", bound="Raster")
def _resampling_from_str(resampling: str) -> Resampling:
"""
Match a rio.warp.Resampling enum from a string representation.
:param resampling: A case-sensitive string matching the resampling enum (e.g. 'cubic_spline')
:raises ValueError: If no matching Resampling enum was found.
:returns: A rio.warp.Resampling enum that matches the given string.
"""
# Try to match the string version of the resampling method with a rio Resampling enum name
for method in rio.warp.Resampling:
if str(method).replace("Resampling.", "") == resampling:
resampling_method = method
break
# If no match was found, raise an error.
else:
raise ValueError(
f"'{resampling}' is not a valid rasterio.warp.Resampling method. "
f"Valid methods: {[str(method).replace('Resampling.', '') for method in rio.warp.Resampling]}"
)
return resampling_method
# Function to set the default nodata values for any given dtype
# Similar to GDAL for int types, but without absurdly long nodata values for floats.
# For unsigned types, the maximum value is chosen (with a max of 99999).
# For signed types, the minimum value is chosen (with a min of -99999).
def _default_ndv(dtype: str | np.dtype | type) -> int:
"""
Set the default nodata value for any given dtype, when this is not provided.
"""
default_ndv_lookup = {
"uint8": 255,
"int8": -128,
"uint16": 65535,
"int16": -32768,
"uint32": 99999,
"int32": -99999,
"float32": -99999,
"float64": -99999,
"float128": -99999,
}
# Check argument dtype is as expected
if not isinstance(dtype, (str, np.dtype, type)):
raise ValueError(f"dtype {dtype} not understood")
# Convert numpy types to string
if isinstance(dtype, type):
dtype = np.dtype(dtype).name
# Convert np.dtype to string
if isinstance(dtype, np.dtype):
dtype = dtype.name
if dtype in default_ndv_lookup.keys():
return default_ndv_lookup[dtype]
else:
raise NotImplementedError(f"No default nodata value set for dtype {dtype}")
[docs]class Raster:
"""
Create a Raster object from a rasterio-supported raster dataset.
If not otherwise specified below, attribute types and contents correspond
to the attributes defined by rasterio.
Attributes:
filename : str
The path/filename of the loaded, file, only set if a disk-based file is read in.
data : np.array
Loaded image. Dimensions correspond to (bands, height, width).
nbands : int
Number of bands loaded into .data
bands : tuple
The indexes of the opened dataset which correspond to the bands loaded into data.
is_loaded : bool
True if the image data have been loaded into this Raster.
ds : rio.io.DatasetReader
Link to underlying DatasetReader object.
bounds
count
crs
dataset_mask
driver
dtypes
height
indexes
name
nodata
res
shape
transform
width
"""
# This only gets set if a disk-based file is read in.
# If the Raster is created with from_array, from_mem etc, this stays as None.
filename = None
_is_modified: bool | None = None
_disk_hash: int | None = None
# Rasterio-inherited names and types are defined here to get proper type hints.
# Maybe these don't have to be hard-coded in the future?
_data: np.ndarray | np.ma.masked_array
transform: Affine
crs: CRS
nodata: int | float | None
res: tuple[float | int, float | int]
bounds: rio.coords.BoundingBox
height: int
width: int
shape: tuple[int, int]
indexes: list[int]
count: int
dataset_mask: np.ndarray | None
driver: str
dtypes: list[str]
name: str
[docs] def __init__(
self,
filename_or_dataset: str | RasterType | rio.io.DatasetReader | rio.io.MemoryFile,
bands: None | int | list[int] = None,
load_data: bool = True,
downsample: int | float = 1,
masked: bool = True,
nodata: abc.Sequence[int | float] | int | float | None = None,
attrs: list[str] | None = None,
as_memfile: bool = False,
) -> None:
"""
Load a rasterio-supported dataset, given a filename.
:param filename_or_dataset: The filename of the dataset.
:param bands: The band(s) to load into the object. Default is to load all bands.
:param load_data: Load the raster data into the object. Default is True.
:param downsample: Reduce the size of the image loaded by this factor. Default is 1
:param masked: the data is loaded as a masked array, with no data values masked. Default is True.
:param nodata: nodata to be used (overwrites the metadata). Default is None, i.e. reads from metadata.
:param attrs: Additional attributes from rasterio's DataReader class to add to the Raster object.
Default list is ['bounds', 'count', 'crs', 'dataset_mask', 'driver', 'dtypes', 'height', 'indexes',
'name', 'nodata', 'res', 'shape', 'transform', 'width'] - if no attrs are specified, these will be added.
:param as_memfile: open the dataset via a rio.MemoryFile.
:return: A Raster object
"""
# If Raster is passed, simply point back to Raster
if isinstance(filename_or_dataset, Raster):
for key in filename_or_dataset.__dict__:
setattr(self, key, filename_or_dataset.__dict__[key])
return
# Image is a file on disk.
elif isinstance(filename_or_dataset, str):
# Save the absolute on-disk filename
self.filename = os.path.abspath(filename_or_dataset)
if as_memfile:
# open the file in memory
memfile = MemoryFile(open(filename_or_dataset, "rb"))
# Read the file as a rasterio dataset
self.ds = memfile.open()
else:
self.ds = rio.open(filename_or_dataset, "r")
# If rio.Dataset is passed
elif isinstance(filename_or_dataset, rio.io.DatasetReader):
self.filename = filename_or_dataset.files[0]
self.ds = filename_or_dataset
# Or, image is already a Memory File.
elif isinstance(filename_or_dataset, rio.io.MemoryFile):
self.ds = filename_or_dataset.open()
# Provide a catch in case trying to load from data array
elif isinstance(filename_or_dataset, np.ndarray):
raise ValueError("np.array provided as filename. Did you mean to call Raster.from_array(...) instead? ")
# Don't recognise the input, so stop here.
else:
raise ValueError("filename argument not recognised.")
self._read_attrs(attrs)
# Save _masked attribute to be used by self.load()
self._masked = masked
# Check number of bands to be loaded
if bands is None:
nbands = self.count
elif isinstance(bands, int):
nbands = 1
elif isinstance(bands, abc.Iterable):
nbands = len(bands)
# Downsampled image size
if not isinstance(downsample, (int, float)):
raise ValueError("downsample must be of type int or float")
if downsample == 1:
out_shape = (nbands, self.height, self.width)
else:
down_width = int(np.ceil(self.width / downsample))
down_height = int(np.ceil(self.height / downsample))
out_shape = (nbands, down_height, down_width)
if load_data:
self.load(bands=bands, out_shape=out_shape)
self.nbands = self._data.shape[0]
self.is_loaded = True
if isinstance(filename_or_dataset, str):
self._is_modified = False
self._disk_hash = hash((self._data.tobytes(), self.transform, self.crs, self.nodata))
else:
self._data = None
self.nbands = None
self.is_loaded = False
# update attributes when downsample is not 1
if downsample != 1:
# Original attributes
meta = self.ds.meta
# width and height must be same as data
meta.update({"width": down_width, "height": down_height})
# Resolution is set, transform must be updated accordingly
res = tuple(np.asarray(self.res) * downsample)
transform = rio.transform.from_origin(self.bounds.left, self.bounds.top, res[0], res[1])
meta.update({"transform": transform})
# Update metadata
self._update(self.data, metadata=meta)
# Set nodata
if nodata is not None:
self.set_ndv(nodata)
[docs] @classmethod
def from_array(
cls: type[RasterType],
data: np.ndarray | np.ma.masked_array,
transform: tuple[float, ...] | Affine,
crs: CRS | int,
nodata: int | float | None = None,
) -> RasterType:
"""Create a Raster from a numpy array and some geo-referencing information.
:param data: data array
:param transform: the 2-D affine transform for the image mapping.
Either a tuple(x_res, 0.0, top_left_x, 0.0, y_res, top_left_y) or
an affine.Affine object.
:param crs: Coordinate Reference System for image. Either a rasterio CRS,
or the EPSG integer.
:param nodata: nodata value
:returns: A Raster object containing the provided data.
Example:
You have a data array in EPSG:32645. It has a spatial resolution of
30 m in x and y, and its top left corner is X=478000, Y=3108140.
>>> transform = (30.0, 0.0, 478000.0, 0.0, -30.0, 3108140.0)
>>> myim = Raster.from_array(data, transform, 32645)
"""
if not isinstance(transform, Affine):
if isinstance(transform, tuple):
transform = Affine(*transform)
else:
raise ValueError("transform argument needs to be Affine or tuple.")
# Enable shortcut to create CRS from an EPSG ID.
if isinstance(crs, int):
crs = CRS.from_epsg(crs)
# If a 2-D ('single-band') array is passed in, give it a band dimension.
if len(data.shape) < 3:
data = np.expand_dims(data, 0)
# Preserves input mask
if isinstance(data, np.ma.masked_array):
if nodata is None:
if np.sum(data.mask) > 0:
raise ValueError("For masked arrays, a nodata value must be set")
else:
data.data[data.mask] = nodata
# Open handle to new memory file
mfh = MemoryFile()
# Create the memory file
with rio.open(
mfh,
"w",
height=data.shape[1],
width=data.shape[2],
count=data.shape[0],
dtype=data.dtype,
crs=crs,
transform=transform,
nodata=nodata,
driver="GTiff",
) as ds:
ds.write(data)
# Initialise a Raster object created with MemoryFile.
# (i.e., __init__ will now be run.)
return cls(mfh)
def __repr__(self) -> str:
"""Convert object to formal string representation."""
L = [getattr(self, item) for item in self._saved_attrs]
s: str = "{}.{}({})".format(type(self).__module__, type(self).__qualname__, ", ".join(map(str, L)))
return s
def __str__(self) -> str:
"""Provide string of information about Raster."""
return self.info()
def __eq__(self, other: object) -> bool:
"""Check if a Raster's data and georeferencing is equal to another."""
if not isinstance(other, type(self)): # TODO: Possibly add equals to SatelliteImage?
return NotImplemented
return all(
[
np.array_equal(self.data, other.data, equal_nan=True),
self.transform == other.transform,
self.crs == other.crs,
self.nodata == other.nodata,
]
)
def __ne__(self, other: object) -> bool:
return not self.__eq__(other)
def __add__(self: RasterType, other: RasterType | np.ndarray | Number) -> RasterType:
"""
Sum up the data of two rasters or a raster and a numpy array, or a raster and single number.
If other is a Raster, it must have the same data.shape, transform and crs as self.
If other is a np.ndarray, it must have the same shape.
Otherwise, other must be a single number.
"""
# Check that other is of correct type
if not isinstance(other, (Raster, np.ndarray, Number)):
raise ValueError("Addition possible only with a Raster, np.ndarray or single number.")
# Case 1 - other is a Raster
if isinstance(other, Raster):
# Check that both data are loaded
if not (self.is_loaded & other.is_loaded):
raise ValueError("Raster's data must be loaded with self.load().")
# Check that both rasters have the same shape and georeferences
if (self.data.shape == other.data.shape) & (self.transform == other.transform) & (self.crs == other.crs):
pass
else:
raise ValueError("Both rasters must have the same shape, transform and CRS.")
other_data = other.data
# Case 2 - other is a numpy array
elif isinstance(other, np.ndarray):
# Check that both array have the same shape
if self.data.shape == other.shape:
pass
else:
raise ValueError("Both rasters must have the same shape.")
other_data = other
# Case 3 - other is a single number
else:
other_data = other
# Calculate the sum of arrays
data = self.data + other_data
# Save as a new Raster
out_rst = self.from_array(data, self.transform, self.crs, nodata=self.nodata)
return out_rst
def __neg__(self: RasterType) -> RasterType:
"""Return self with self.data set to -self.data"""
out_rst = self.copy()
out_rst.data = -out_rst.data
return out_rst
def __sub__(self, other: Raster | np.ndarray | Number) -> Raster:
"""
Subtract two rasters. Both rasters must have the same data.shape, transform and crs.
"""
if isinstance(other, Raster):
# Need to convert both rasters to a common type before doing the negation
ctype: np.dtype = np.find_common_type([*self.dtypes, *other.dtypes], [])
other = other.astype(ctype) # type: ignore
return self + -other # type: ignore
@overload
def astype(self, dtype: np.dtype | type | str, inplace: Literal[False]) -> Raster:
...
@overload
def astype(self, dtype: np.dtype | type | str, inplace: Literal[True]) -> None:
...
[docs] def astype(self, dtype: np.dtype | type | str, inplace: bool = False) -> Raster | None:
"""
Converts the data type of a Raster object.
:param dtype: Any numpy dtype or string accepted by numpy.astype
:param inplace: Set to True to modify the raster in place.
:returns: the output Raster with dtype changed.
"""
# Check that dtype is supported by rasterio
if not rio.dtypes.check_dtype(dtype):
raise TypeError(f"{dtype} is not supported by rasterio")
# Check that data type change will not result in a loss of information
if not rio.dtypes.can_cast_dtype(self.data, dtype):
warnings.warn(
"dtype conversion will result in a loss of information. "
f"{rio.dtypes.get_minimum_dtype(self.data)} is the minimum type to represent the data."
)
out_data = self.data.astype(dtype)
if inplace:
meta = self.ds.meta
meta.update({"dtype": dtype})
self._update(imgdata=out_data, metadata=meta)
return None
else:
return self.from_array(out_data, self.transform, self.crs, nodata=self.nodata)
def _get_rio_attrs(self) -> list[str]:
"""Get the attributes that have the same name in rio.DatasetReader and Raster."""
rio_attrs: list[str] = []
for attr in Raster.__annotations__.keys():
if "__" in attr or attr not in dir(self.ds):
continue
rio_attrs.append(attr)
return rio_attrs
def _read_attrs(self, attrs: list[str] | str | None = None) -> None:
# Copy most used attributes/methods
rio_attrs = self._get_rio_attrs()
for attr in self.__annotations__.keys():
if "__" in attr or attr not in dir(self.ds):
continue
rio_attrs.append(attr)
if attrs is None:
self._saved_attrs = rio_attrs
attrs = rio_attrs
else:
if isinstance(attrs, str):
attrs = [attrs]
for attr in rio_attrs:
if attr not in attrs:
attrs.append(attr)
self._saved_attrs = attrs
for attr in attrs:
setattr(self, attr, getattr(self.ds, attr))
@property
def is_modified(self) -> bool:
"""Check whether file has been modified since it was created/opened.
:returns: True if Raster has been modified.
"""
if not self._is_modified:
new_hash = hash((self._data.tobytes(), self.transform, self.crs, self.nodata))
self._is_modified = not (self._disk_hash == new_hash)
return self._is_modified
@property
def data(self) -> np.ndarray | np.ma.masked_array:
"""
Get data.
:returns: data array.
"""
return self._data
@data.setter
def data(self, new_data: np.ndarray | np.ma.masked_array) -> None:
"""
Set the contents of .data.
new_data must have the same shape as existing data! (bands dimension included)
:param new_data: New data to assign to this instance of Raster
"""
# Check that new_data is a Numpy array
if not isinstance(new_data, np.ndarray):
raise ValueError("New data must be a numpy array.")
# Check that new_data has correct shape
if self.is_loaded:
orig_shape = self._data.shape
else:
orig_shape = (self.count, self.height, self.width)
if new_data.shape != orig_shape:
raise ValueError(f"New data must be of the same shape as existing data: {orig_shape}.")
# Check that new_data has the right type
if new_data.dtype != self._data.dtype:
raise ValueError(
"New data must be of the same type as existing\
data: {}".format(
self.data.dtype
)
)
self._data = new_data
def _update(
self,
imgdata: np.ndarray | None = None,
metadata: dict[str, Any] | None = None,
vrt_to_driver: str = "GTiff",
) -> None:
"""
Update the object with a new image or metadata.
:param imgdata: image data to update with.
:param metadata: metadata to update with.
:param vrt_to_driver: name of driver to coerce a VRT to. This is required
because rasterio does not support writing to to a VRTSourcedRasterBand.
"""
memfile = MemoryFile()
if imgdata is None:
imgdata = self.data
if metadata is None:
metadata = self.ds.meta
if metadata["driver"] == "VRT":
metadata["driver"] = vrt_to_driver
with memfile.open(**metadata) as ds:
ds.write(imgdata)
self.ds = memfile.open()
self._read_attrs()
if self.is_loaded:
self.load()
self._is_modified = True
[docs] def info(self, stats: bool = False) -> str:
"""
Returns string of information about the raster (filename, coordinate system, number of columns/rows, etc.).
:param stats: Add statistics for each band of the dataset (max, min, median, mean, std. dev.). Default is to
not calculate statistics.
:returns: text information about Raster attributes.
"""
as_str = [
f"Driver: {self.driver} \n",
f"Opened from file: {self.filename} \n",
f"Filename: {self.name} \n",
f"Raster modified since disk load? {self._is_modified} \n",
f"Size: {self.width}, {self.height}\n",
f"Number of bands: {self.count:d}\n",
f"Data types: {self.dtypes}\n",
f"Coordinate System: EPSG:{self.crs.to_epsg()}\n",
f"NoData Value: {self.nodata}\n",
"Pixel Size: {}, {}\n".format(*self.res),
"Upper Left Corner: {}, {}\n".format(*self.bounds[:2]),
"Lower Right Corner: {}, {}\n".format(*self.bounds[2:]),
]
if stats:
if self.data is not None:
if self.nbands == 1:
as_str.append(f"[MAXIMUM]: {np.nanmax(self.data):.2f}\n")
as_str.append(f"[MINIMUM]: {np.nanmin(self.data):.2f}\n")
as_str.append(f"[MEDIAN]: {np.ma.median(self.data):.2f}\n")
as_str.append(f"[MEAN]: {np.nanmean(self.data):.2f}\n")
as_str.append(f"[STD DEV]: {np.nanstd(self.data):.2f}\n")
else:
for b in range(self.nbands):
# try to keep with rasterio convention.
as_str.append(f"Band {b + 1}:")
as_str.append(f"[MAXIMUM]: {np.nanmax(self.data[b, :, :]):.2f}\n")
as_str.append(f"[MINIMUM]: {np.nanmin(self.data[b, :, :]):.2f}\n")
as_str.append(f"[MEDIAN]: {np.ma.median(self.data[b, :, :]):.2f}\n")
as_str.append(f"[MEAN]: {np.nanmean(self.data[b, :, :]):.2f}\n")
as_str.append(f"[STD DEV]: {np.nanstd(self.data[b, :, :]):.2f}\n")
return "".join(as_str)
[docs] def copy(self: RasterType, new_array: np.ndarray | None = None) -> RasterType:
"""
Copy the Raster object in memory
:param new_array: New array to use for the copied Raster
:return:
"""
if new_array is not None:
data = new_array
else:
data = self.data
cp = self.from_array(data=data, transform=self.transform, crs=self.crs, nodata=self.nodata)
return cp
@property
def __array_interface__(self) -> dict[str, Any]:
if self._data is None:
self.load()
return self._data.__array_interface__ # type: ignore
[docs] def load(self, bands: int | list[int] | None = None, **kwargs: Any) -> None:
r"""
Load specific bands of the dataset, using rasterio.read().
Ensure that self.data.ndim = 3 for ease of use (needed e.g. in show)
:param bands: The band(s) to load. Note that rasterio begins counting at 1, not 0.
\*\*kwargs: any additional arguments to rasterio.io.DatasetReader.read.
Useful ones are:
.. hlist::
* out_shape : to load a subsampled version
* window : to load a cropped version
* resampling : to set the resampling algorithm
"""
if bands is None:
self._data = self.ds.read(masked=self._masked, **kwargs)
bands = self.ds.indexes
else:
self._data = self.ds.read(bands, masked=self._masked, **kwargs)
if type(bands) is int:
bands = bands
# If ndim is 2, expand to 3
if self._data.ndim == 2:
self._data = np.expand_dims(self._data, 0)
self.nbands = self._data.shape[0]
self.is_loaded = True
self.bands = bands
[docs] def crop(
self: RasterType,
cropGeom: Raster | Vector | list[float] | tuple[float, ...],
mode: str = "match_pixel",
inplace: bool = True,
) -> RasterType | None:
"""
Crop the Raster to a given extent.
:param cropGeom: Geometry to crop raster to, as either a Raster object, a Vector object, or a list of
coordinates. If cropGeom is a Raster, crop() will crop to the boundary of the raster as returned by
Raster.ds.bounds. If cropGeom is a Vector, crop() will crop to the bounding geometry. If cropGeom is a
list of coordinates, the order is assumed to be [xmin, ymin, xmax, ymax].
:param mode: one of 'match_pixel' (default) or 'match_extent'. 'match_pixel' will preserve the original pixel
resolution, cropping to the extent that most closely aligns with the current coordinates. 'match_extent'
will match the extent exactly, adjusting the pixel resolution to fit the extent.
:param inplace: Update the raster inplace or return copy.
:returns: None if inplace=True and a new Raster if inplace=False
"""
assert mode in [
"match_extent",
"match_pixel",
], "mode must be one of 'match_pixel', 'match_extent'"
if isinstance(cropGeom, (Raster, Vector)):
xmin, ymin, xmax, ymax = cropGeom.bounds
elif isinstance(cropGeom, (list, tuple)):
xmin, ymin, xmax, ymax = cropGeom
else:
raise ValueError("cropGeom must be a Raster, Vector, or list of coordinates.")
meta = copy.copy(self.ds.meta)
if mode == "match_pixel":
crop_bbox = Polygon([(xmin, ymin), (xmax, ymin), (xmax, ymax), (xmin, ymax)])
crop_img, tfm = rio.mask.mask(self.ds, [crop_bbox], crop=True, all_touched=True)
meta.update(
{
"height": crop_img.shape[1],
"width": crop_img.shape[2],
"transform": tfm,
}
)
else:
window = rio.windows.from_bounds(xmin, ymin, xmax, ymax, transform=self.transform)
new_height = int(window.height)
new_width = int(window.width)
new_tfm = rio.transform.from_bounds(xmin, ymin, xmax, ymax, width=new_width, height=new_height)
if self.is_loaded:
new_img = np.zeros((self.nbands, new_height, new_width), dtype=self.data.dtype)
else:
new_img = np.zeros((self.count, new_height, new_width), dtype=self.data.dtype)
crop_img, tfm = rio.warp.reproject(
self.data,
new_img,
src_transform=self.transform,
dst_transform=new_tfm,
src_crs=self.crs,
dst_crs=self.crs,
)
meta.update({"height": new_height, "width": new_width, "transform": tfm})
if inplace:
self._update(crop_img, meta)
return None
else:
return self.from_array(crop_img, meta["transform"], meta["crs"], meta["nodata"])
[docs] def reproject(
self: RasterType,
dst_ref: RasterType | rio.io.Dataset | str | None = None,
dst_crs: CRS | str | None = None,
dst_size: tuple[int, int] | None = None,
dst_bounds: dict[str, float] | rio.coords.BoundingBox | None = None,
dst_res: float | abc.Iterable[float] | None = None,
dst_nodata: int | float | None = None,
src_nodata: int | float | None = None,
dtype: np.dtype | None = None,
resampling: Resampling | str = Resampling.nearest,
silent: bool = False,
n_threads: int = 0,
memory_limit: int = 64,
) -> RasterType:
"""
Reproject raster to a specified grid.
The output grid can either be given by a reference Raster (using `dst_ref`),
or by manually providing the output CRS (`dst_crs`), dimensions (`dst_size`),
resolution (with `dst_size`) and/or bounds (`dst_bounds`).
Any resampling algorithm implemented in rasterio can be used.
Currently: requires image data to have been loaded into memory.
NOT SUITABLE for large datasets yet! This requires work...
To reproject a Raster with different source bounds, first run Raster.crop.
:param dst_ref: a reference raster. If set will use the attributes of this
raster for the output grid. Can be provided as Raster/rasterio data set or as path to the file.
:param crs: Specify the Coordinate Reference System to reproject to. If dst_ref not set, defaults to self.crs.
:param dst_size: Raster size to write to (x, y). Do not use with dst_res.
:param dst_bounds: a BoundingBox object or a dictionary containing\
left, bottom, right, top bounds in the source CRS.
:param dst_res: Pixel size in units of target CRS. Either 1 value or (xres, yres). Do not use with dst_size.
:param dst_nodata: nodata value of the destination. If set to None, will use the same as source, \
and if source is None, will use GDAL's default.
:param src_nodata: nodata value of the source. If set to None, will read from the metadata.
:param resampling: A rasterio Resampling method
:param silent: If True, will not print warning statements
:param n_threads: The number of worker threads. Defaults to (os.cpu_count() - 1).
:param memory_limit: The warp operation memory limit in MB. Larger values may perform better.
:returns: Raster
"""
# Check that either dst_ref or dst_crs is provided
if dst_ref is not None:
if dst_crs is not None:
raise ValueError("Either of `dst_ref` or `dst_crs` must be set. Not both.")
else:
# In case dst_res or dst_size is set, use original CRS
if dst_crs is None:
dst_crs = self.crs
# Case a raster is provided as reference
if dst_ref is not None:
# Check that dst_ref type is either str, Raster or rasterio data set
# Preferably use Raster instance to avoid rasterio data set to remain open. See PR #45
if isinstance(dst_ref, Raster):
ds_ref = dst_ref
elif isinstance(dst_ref, rio.io.MemoryFile) or isinstance(dst_ref, rasterio.io.DatasetReader):
ds_ref = dst_ref
elif isinstance(dst_ref, str):
assert os.path.exists(dst_ref), "Reference raster does not exist"
ds_ref = Raster(dst_ref, load_data=False)
else:
raise ValueError(
"Type of dst_ref not understood, must be path to file (str), Raster or rasterio data set"
)
# Read reprojecting params from ref raster
dst_crs = ds_ref.crs
dst_size = (ds_ref.width, ds_ref.height)
dst_res = None
dst_bounds = ds_ref.bounds
else:
# Determine target CRS
dst_crs = CRS.from_user_input(dst_crs)
# Set output dtype
if dtype is None:
# Warning: this will not work for multiple bands with different dtypes
dtype = self.dtypes[0]
# Set source nodata if provided
if src_nodata is None:
src_nodata = self.nodata
# Set destination nodata if provided. This is needed in areas not covered by the input data.
# If None, will use GeoUtils' default, as rasterio's default is unknown, hence cannot be handled properly.
if dst_nodata is None:
dst_nodata = self.nodata
if dst_nodata is None:
dst_nodata = _default_ndv(dtype)
# Basic reprojection options, needed in all cases.
reproj_kwargs = {
"src_transform": self.transform,
"src_crs": self.crs,
"dst_crs": dst_crs,
"resampling": resampling if isinstance(resampling, Resampling) else _resampling_from_str(resampling),
"src_nodata": src_nodata,
"dst_nodata": dst_nodata,
}
# If dst_ref is None, check other input arguments
if dst_size is not None and dst_res is not None:
raise ValueError("dst_size and dst_res both specified. Specify only one.")
# Create a BoundingBox if required
if dst_bounds is not None:
if not isinstance(dst_bounds, rio.coords.BoundingBox):
dst_bounds = rio.coords.BoundingBox(
dst_bounds["left"],
dst_bounds["bottom"],
dst_bounds["right"],
dst_bounds["top"],
)
# Determine target raster size/resolution
dst_transform = None
if dst_res is not None:
if dst_bounds is None:
# Let rasterio determine the maximum bounds of the new raster.
reproj_kwargs.update({"dst_resolution": dst_res})
else:
# Bounds specified. First check if xres and yres are different.
if isinstance(dst_res, tuple):
xres = dst_res[0]
yres = dst_res[1]
else:
xres = dst_res
yres = dst_res
# Calculate new raster size which ensures that pixels have
# precisely the resolution specified.
dst_width = np.ceil((dst_bounds.right - dst_bounds.left) / xres)
dst_height = np.ceil(np.abs(dst_bounds.bottom - dst_bounds.top) / yres)
dst_size = (int(dst_width), int(dst_height))
# As a result of precise pixel size, the destination bounds may
# have to be adjusted.
x1 = dst_bounds.left + (xres * dst_width)
y1 = dst_bounds.top - (yres * dst_height)
dst_bounds = rio.coords.BoundingBox(top=dst_bounds.top, left=dst_bounds.left, bottom=y1, right=x1)
# Set output shape (Note: dst_size is (ncol, nrow))
if dst_size is not None:
dst_shape = (self.count, dst_size[1], dst_size[0])
dst_data = np.ones(dst_shape, dtype=dtype)
reproj_kwargs.update({"destination": dst_data})
else:
dst_shape = (self.count, self.height, self.width)
# If dst_bounds is set, will enforce dst_bounds
if dst_bounds is not None:
if dst_size is None:
# Calculate new raster size which ensures that pixels resolution is as close as possible to original
# Raster size is increased by up to one pixel if needed
yres, xres = self.res
dst_width = int(np.ceil((dst_bounds.right - dst_bounds.left) / xres))
dst_height = int(np.ceil(np.abs(dst_bounds.bottom - dst_bounds.top) / yres))
dst_size = (dst_width, dst_height)
# Calculate associated transform
dst_transform = rio.transform.from_bounds(*dst_bounds, width=dst_size[0], height=dst_size[1])
# Specify the output bounds and shape, let rasterio handle the rest
reproj_kwargs.update({"dst_transform": dst_transform})
dst_data = np.ones((dst_size[1], dst_size[0]), dtype=dtype)
reproj_kwargs.update({"destination": dst_data})
# Check that reprojection is actually needed
# Caution, dst_size is (width, height) while shape is (height, width)
if all(
[
(dst_transform == self.transform) or (dst_transform is None),
(dst_crs == self.crs) or (dst_crs is None),
(dst_size == self.shape[::-1]) or (dst_size is None),
(dst_res == self.res) or (dst_res == self.res[0] == self.res[1]) or (dst_res is None),
]
):
if (dst_nodata == self.nodata) or (dst_nodata is None):
if not silent:
warnings.warn("Output projection, bounds and size are identical -> return self (not a copy!)")
return self
elif dst_nodata is not None:
if not silent:
warnings.warn(
"Only nodata is different, consider using the 'set_ndv()' method instead'\
' -> return self (not a copy!)"
)
return self
# Set the performance keywords
if n_threads == 0:
# Default to cpu count minus one. If the cpu count is undefined, num_threads will be 1
cpu_count = os.cpu_count() or 2
num_threads = cpu_count - 1
else:
num_threads = n_threads
reproj_kwargs.update({"num_threads": num_threads, "warp_mem_limit": memory_limit})
# Currently reprojects all in-memory bands at once.
# This may need to be improved to allow reprojecting from-disk.
# See rio.warp.reproject docstring for more info.
dst_data, dst_transformed = rio.warp.reproject(self.data, **reproj_kwargs)
# Enforce output type
dst_data = dst_data.astype(dtype)
# Check for funny business.
if dst_transform is not None:
assert dst_transform == dst_transformed
# Write results to a new Raster.
dst_r = self.from_array(dst_data, dst_transformed, dst_crs, dst_nodata)
return dst_r
[docs] def shift(self, xoff: float, yoff: float) -> None:
"""
Translate the Raster by a given x,y offset.
:param xoff: Translation x offset.
:param yoff: Translation y offset.
"""
# Check that data is loaded, as it is necessary for this method
assert self.is_loaded, "Data must be loaded, use self.load"
meta = self.ds.meta
dx, b, xmin, d, dy, ymax = list(self.transform)[:6]
meta.update({"transform": rio.transform.Affine(dx, b, xmin + xoff, d, dy, ymax + yoff)})
self._update(metadata=meta)
[docs] def set_ndv(self, ndv: abc.Sequence[int | float] | int | float, update_array: bool = False) -> None:
"""
Set new nodata values for bands (and possibly update arrays).
:param ndv: nodata values
:param update_array: change the existing nodata in array
"""
if not isinstance(ndv, (abc.Sequence, int, float, np.integer, np.floating)):
raise ValueError("Type of ndv not understood, must be list or float or int")
elif (isinstance(ndv, (int, float, np.integer, np.floating))) and self.count > 1:
print("Several raster band: using nodata value for all bands")
ndv = [ndv] * self.count
elif isinstance(ndv, abc.Sequence) and self.count == 1:
print("Only one raster band: using first nodata value provided")
ndv = list(ndv)[0]
# Check that ndv has same length as number of bands in self
if isinstance(ndv, abc.Sequence):
if len(ndv) != self.count:
raise ValueError(f"Length of ndv ({len(ndv)}) incompatible with number of bands ({self.count})")
# Check that ndv value is compatible with dtype
for k in range(len(ndv)):
if not rio.dtypes.can_cast_dtype(ndv[k], self.dtypes[k]):
raise ValueError(f"ndv value {ndv[k]} incompatible with self.dtype {self.dtypes[k]}")
else:
if not rio.dtypes.can_cast_dtype(ndv, self.dtypes[0]):
raise ValueError(f"ndv value {ndv} incompatible with self.dtype {self.dtypes[0]}")
meta = self.ds.meta
imgdata = self.data
pre_ndv = self.nodata
meta.update({"nodata": ndv})
if update_array and pre_ndv is not None:
# nodata values are specific to each band
# let's do a loop then
if self.count == 1:
if np.ma.isMaskedArray(imgdata):
imgdata.data[imgdata.mask] = ndv # type: ignore
else:
ind = imgdata[:] == pre_ndv
imgdata[ind] = ndv
else:
# At this point, ndv is definitely iterable, but mypy doesn't understand that.
for i in range(self.count):
if np.ma.isMaskedArray(imgdata):
imgdata.data[i, imgdata.mask[i, :]] = ndv[i] # type: ignore
else:
ind = imgdata[i, :] == pre_ndv[i] # type: ignore
imgdata[i, ind] = ndv[i] # type: ignore
else:
imgdata = None
self._update(metadata=meta, imgdata=imgdata)
[docs] def save(
self,
filename: str | IO[bytes],
driver: str = "GTiff",
dtype: np.dtype | None = None,
compress: str = "deflate",
tiled: bool = False,
blank_value: None | int | float = None,
co_opts: dict[str, str] | None = None,
metadata: dict[str, Any] | None = None,
gcps: list[tuple[float, ...]] | None = None,
gcps_crs: CRS | None = None,
) -> None:
"""Write the Raster to a geo-referenced file.
Given a filename to save the Raster to, create a geo-referenced file
on disk which contains the contents of self.data.
If blank_value is set to an integer or float, then instead of writing
the contents of self.data to disk, write this provided value to every
pixel instead.
:param filename: Filename to write the file to.
:param driver: the 'GDAL' driver to use to write the file as.
:param dtype: Data Type to write the image as (defaults to dtype of image data)
:param compress: Compression type. Defaults to 'deflate' (equal to GDALs: COMPRESS=DEFLATE)
:param tiled: Whether to write blocks in tiles instead of strips. Improves read performance on large files,
but increases file size.
:param blank_value: Use to write an image out with every pixel's value
corresponding to this value, instead of writing the image data to disk.
:param co_opts: GDAL creation options provided as a dictionary,
e.g. {'TILED':'YES', 'COMPRESS':'LZW'}
:param metadata: pairs of metadata key, value
:param gcps: list of gcps, each gcp being [row, col, x, y, (z)]
:param gcps_crs: the CRS of the GCPS (Default is None)
:returns: None.
"""
dtype = self.data.dtype if dtype is None else dtype
if co_opts is None:
co_opts = {}
if metadata is None:
metadata = {}
if gcps is None:
gcps = []
if (self.data is None) & (blank_value is None):
raise AttributeError("No data loaded, and alternative blank_value not set.")
elif blank_value is not None:
if isinstance(blank_value, int) | isinstance(blank_value, float):
save_data = np.zeros((self.ds.count, self.ds.height, self.ds.width))
save_data[:, :, :] = blank_value
else:
raise ValueError("blank_values must be one of int, float (or None).")
else:
save_data = self.data
with rio.open(
filename,
"w",
driver=driver,
height=self.ds.height,
width=self.ds.width,
count=self.ds.count,
dtype=save_data.dtype,
crs=self.ds.crs,
transform=self.ds.transform,
nodata=self.ds.nodata,
compress=compress,
tiled=tiled,
**co_opts,
) as dst:
dst.write(save_data)
# Add metadata (tags in rio)
dst.update_tags(**metadata)
# Save GCPs
if not isinstance(gcps, list):
raise ValueError("gcps must be a list")
if len(gcps) > 0:
rio_gcps = []
for gcp in gcps:
rio_gcps.append(rio.control.GroundControlPoint(*gcp))
# Warning: this will overwrite the transform
if dst.transform != rio.transform.Affine(1, 0, 0, 0, 1, 0):
warnings.warn(
"A geotransform previously set is going \
to be cleared due to the setting of GCPs."
)
dst.gcps = (rio_gcps, gcps_crs)
[docs] def to_xarray(self, name: str | None = None) -> rioxarray.DataArray:
"""Convert this Raster into an xarray DataArray using rioxarray.
This method uses rioxarray to generate a DataArray with associated
geo-referencing information.
See the documentation of rioxarray and xarray for more information on
the methods and attributes of the resulting DataArray.
:param name: Set the name of the DataArray.
:returns: xarray DataArray
"""
if not _has_rioxarray:
raise ImportError("rioxarray is required for this functionality.")
xr = rioxarray.open_rasterio(self.ds)
if name is not None:
xr.name = name
return xr
[docs] def get_bounds_projected(self, out_crs: CRS, densify_pts_max: int = 5000) -> rio.coords.BoundingBox:
"""
Return self's bounds in the given CRS.
:param out_crs: Output CRS
:param densify_pts_max: Maximum points to be added between image corners to account for non linear edges.
Reduce if time computation is really critical (ms) or increase if extent is \
not accurate enough.
"""
# Max points to be added between image corners to account for non linear edges
# rasterio's default is a bit low for very large images
# instead, use image dimensions, with a maximum of 50000
densify_pts = min(max(self.width, self.height), densify_pts_max)
# Calculate new bounds
left, bottom, right, top = self.bounds
new_bounds = rio.warp.transform_bounds(self.crs, out_crs, left, bottom, right, top, densify_pts)
return new_bounds
[docs] def intersection(self, rst: str | Raster) -> tuple[float, float, float, float]:
"""
Returns the bounding box of intersection between this image and another.
If the rasters have different projections, the intersection extent is given in self's projection system.
:param rst : path to the second image (or another Raster instance)
:returns: extent of the intersection between the 2 images \
(xmin, ymin, xmax, ymax) in self's coordinate system.
"""
from geoutils import projtools
# If input rst is string, open as Raster
if isinstance(rst, str):
rst = Raster(rst, load_data=False)
# Check if both files have the same projection
# To be implemented
same_proj = True
# Find envelope of rasters' intersections
poly1 = projtools.bounds2poly(self.bounds)
# poly1.AssignSpatialReference(self.crs)
# Create a polygon of the envelope of the second image
poly2 = projtools.bounds2poly(rst.bounds)
# poly2.AssignSpatialReference(rst.srs)
# If coordinate system is different, reproject poly2 into poly1
if not same_proj:
raise NotImplementedError()
# Compute intersection envelope
intersect = poly1.intersection(poly2)
extent: tuple[float, float, float, float] = intersect.envelope.bounds
# check that intersection is not void
if intersect.area == 0:
warnings.warn("Warning: Intersection is void")
return (0.0, 0.0, 0.0, 0.0)
return extent
[docs] def show(
self,
band: int | None = None,
cmap: matplotlib.colors.Colormap | str | None = None,
vmin: float | int | None = None,
vmax: float | int | None = None,
cb_title: str | None = None,
add_cb: bool = True,
ax: matplotlib.axes.Axes | None = None,
**kwargs: Any,
) -> None | tuple[matplotlib.axes.Axes, matplotlib.colors.Colormap]:
r"""Show/display the image, with axes in projection of image.
This method is a wrapper to rasterio.plot.show. Any \*\*kwargs which
you give this method will be passed to rasterio.plot.show.
:param band: which band to plot, from 0 to self.count-1 (default is all)
:param cmap: The figure's colormap. Default is plt.rcParams['image.cmap']
:param vmin: Colorbar minimum value. Default is data min.
:param vmax: Colorbar maximum value. Default is data min.
:param cb_title: Colorbar label. Default is None.
:param add_cb: Set to True to display a colorbar. Default is True.
:param ax: A figure ax to be used for plotting. If None, will create default figure and axes,\
and plot figure directly.
:returns: if ax is not None, returns (ax, cbar) where cbar is the colorbar (None if add_cb is False)
You can also pass in \*\*kwargs to be used by the underlying imshow or
contour methods of matplotlib. The example below shows provision of
a kwarg for rasterio.plot.show, and a kwarg for matplotlib as well::
import matplotlib.pyplot as plt
ax1 = plt.subplot(111)
mpl_kws = {'cmap':'seismic'}
myimage.show(ax=ax1, mpl_kws)
"""
# If data is not loaded, need to load it
if not self.is_loaded:
self.load()
# Check if specific band selected, or take all
# rshow takes care of image dimensions
# if self.count=3 (4) => plotted as RGB(A)
if band is None:
band = np.arange(self.count)
elif isinstance(band, int):
if band >= self.count:
raise ValueError(f"band must be in range 0-{self.count - 1:d}")
pass
else:
raise ValueError("band must be int or None")
# If multiple bands (RGB), cbar does not make sense
if isinstance(band, abc.Sequence):
if len(band) > 1:
add_cb = False
# Create colorbar
# Use rcParam default
if cmap is None:
cmap = plt.get_cmap(plt.rcParams["image.cmap"])
elif isinstance(cmap, str):
cmap = plt.get_cmap(cmap)
elif isinstance(cmap, matplotlib.colors.Colormap):
pass
# Set colorbar min/max values (needed for ScalarMappable)
if vmin is None:
vmin = np.nanmin(self.data[band, :, :])
if vmax is None:
vmax = np.nanmax(self.data[band, :, :])
# Make sure they are numbers, to avoid mpl error
try:
vmin = float(vmin)
vmax = float(vmax)
except ValueError:
raise ValueError("vmin or vmax cannot be converted to float")
# Create axes
if ax is None:
fig, ax0 = plt.subplots()
elif isinstance(ax, matplotlib.axes.Axes):
ax0 = ax
fig = ax.figure
else:
raise ValueError("ax must be a matplotlib.axes.Axes instance or None")
# Use data array directly, as rshow on self.ds will re-load data
rshow(
self.data[band, :, :],
transform=self.transform,
ax=ax0,
cmap=cmap,
vmin=vmin,
vmax=vmax,
**kwargs,
)
# Add colorbar
if add_cb:
cbar = fig.colorbar(
cm.ScalarMappable(norm=colors.Normalize(vmin=vmin, vmax=vmax), cmap=cmap),
ax=ax0,
)
if cb_title is not None:
cbar.set_label(cb_title)
else:
cbar = None
# If ax not set, figure should be plotted directly
if ax is None:
plt.show()
return None
return ax0, cbar
[docs] def value_at_coords(
self,
x: float | list[float],
y: float | list[float],
latlon: bool = False,
band: int | None = None,
masked: bool = False,
window: int | None = None,
return_window: bool = False,
boundless: bool = True,
reducer_function: Callable[[np.ndarray], float] = np.ma.mean,
) -> Any:
""" Extract the pixel value(s) at the nearest pixel(s) from the specified coordinates.
Extract pixel value of each band in dataset at the specified
coordinates. Alternatively, if band is specified, return only that
band's pixel value.
Optionally, return mean of pixels within a square window.
:param x: x (or longitude) coordinate.
:param y: y (or latitude) coordinate.
:param latlon: Set to True if coordinates provided as longitude/latitude.
:param band: the band number to extract from.
:param masked: If `masked` is `True` the return value will be a masked
array. Otherwise (the default) the return value will be a
regular array.
:param window: expand area around coordinate to dimensions \
window * window. window must be odd.
:param return_window: If True when window=int, returns (mean,array) \
where array is the dataset extracted via the specified window size.
:param boundless: If `True`, windows that extend beyond the dataset's extent
are permitted and partially or completely filled arrays (with self.nodata) will
be returned as appropriate.
:param reducer_function: a function to apply to the values in window.
:returns: When called on a Raster or with a specific band \
set, return value of pixel.
:returns: If multiple band Raster and the band is not specified, a \
dictionary containing the value of the pixel in each band.
:returns: In addition, if return_window=True, return tuple of \
(values, arrays)
:examples:
>>> self.value_at_coords(-48.125,67.8901,window=3)
Returns mean of a 3*3 window:
v v v \
v c v | = float(mean)
v v v /
(c = provided coordinate, v= value of surrounding coordinate)
"""
value: float | dict[int, float] | tuple[float | dict[int, float] | tuple[list[float], np.ndarray] | Any]
if window is not None:
if window % 2 != 1:
raise ValueError("Window must be an odd number.")
def format_value(value: Any) -> Any:
"""Check if valid value has been extracted"""
if type(value) in [np.ndarray, np.ma.core.MaskedArray]:
if window is not None:
value = reducer_function(value.flatten())
else:
value = value[0, 0]
else:
value = None
return value
# Need to implement latlon option later
if latlon:
from geoutils import projtools
x, y = projtools.reproject_from_latlon((y, x), self.crs)
# Convert coordinates to pixel space
row, col = self.ds.index(x, y, op=round)
# Decide what pixel coordinates to read:
if window is not None:
half_win = (window - 1) / 2
# Subtract start coordinates back to top left of window
col = col - half_win
row = row - half_win
# Offset to read to == window
width = window
height = window
else:
# Start reading at col,row and read 1px each way
width = 1
height = 1
# Make sure coordinates are int
col = int(col)
row = int(row)
# Create rasterio's window for reading
window = rio.windows.Window(col, row, width, height)
# Get values for all bands
if band is None:
# Deal with single band case
if self.nbands == 1:
data = self.ds.read(
window=window,
fill_value=self.nodata,
boundless=boundless,
masked=masked,
)
value = format_value(data)
win = data
# Deal with multiband case
else:
value = {}
win = {}
for b in self.indexes:
data = self.ds.read(
window=window,
fill_value=self.nodata,
boundless=boundless,
indexes=b,
masked=masked,
)
val = format_value(data)
# Store according to GDAL band numbers
value[b] = val
win[b] = data
# Or just for specified band in multiband case
elif isinstance(band, int):
data = self.ds.read(
window=window,
fill_value=self.nodata,
boundless=boundless,
indexes=band,
masked=masked,
)
value = format_value(data)
else:
raise ValueError("Value provided for band was not int or None.")
if return_window:
return (value, win)
return value
[docs] def coords(self, offset: str = "corner", grid: bool = True) -> tuple[np.ndarray, np.ndarray]:
"""
Get x,y coordinates of all pixels in the raster.
:param offset: coordinate type. If 'corner', returns corner coordinates of pixels.
If 'center', returns center coordinates. Default is corner.
:param grid: Return grid
:returns x,y: numpy arrays corresponding to the x,y coordinates of each pixel.
"""
assert offset in [
"corner",
"center",
], f"ctype is not one of 'corner', 'center': {offset}"
dx = self.res[0]
dy = self.res[1]
xx = np.linspace(self.bounds.left, self.bounds.right, self.width + 1)[:: int(np.sign(dx))]
yy = np.linspace(self.bounds.bottom, self.bounds.top, self.height + 1)[:: int(np.sign(dy))]
if offset == "center":
xx += dx / 2 # shift by half a pixel
yy += dy / 2
if grid:
meshgrid: tuple[np.ndarray, np.ndarray] = np.meshgrid(xx[:-1], yy[:-1]) # drop the last element
return meshgrid
else:
return xx[:-1], yy[:-1]
[docs] def xy2ij(
self,
x: np.ndarray,
y: np.ndarray,
op: type = np.float32,
area_or_point: str | None = None,
precision: float | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""
Return row, column indices for a given x,y coordinate pair.
:param x: x coordinates
:param y: y coordinates
:param op: operator to calculate index
:param precision: precision for rio.Dataset.index
:param area_or_point: shift index according to GDAL AREA_OR_POINT attribute (None) or \
force position ('Point' or 'Area') of the interpretation of where the raster value \
corresponds to in the pixel ('Area' = lower left or 'Point' = center)
:returns i, j: indices of x,y in the image.
"""
if op not in [np.float32, np.float64, float]:
raise UserWarning(
"Operator is not of type float: rio.Dataset.index might "
"return unreliable indexes due to rounding issues."
)
if area_or_point not in [None, "Area", "Point"]:
raise ValueError(
'Argument "area_or_point" must be either None (falls back to GDAL metadata), "Point" or "Area".'
)
i, j = self.ds.index(x, y, op=op, precision=precision)
# # necessary because rio.Dataset.index does not return abc.Iterable for a single point
if not isinstance(i, abc.Iterable):
i, j = (
np.asarray(
[
i,
]
),
np.asarray(
[
j,
]
),
)
else:
i, j = (np.asarray(i), np.asarray(j))
# AREA_OR_POINT GDAL attribute, i.e. does the value refer to the upper left corner (AREA) or
# the center of pixel (POINT)
# This has no influence on georeferencing, it's only about the interpretation of the raster values,
# and thus only affects sub-pixel interpolation
# if input is None, default to GDAL METADATA
if area_or_point is None:
area_or_point = self.ds.tags()["AREA_OR_POINT"]
if area_or_point == "Point":
if not isinstance(i.flat[0], np.floating):
raise ValueError(
"Operator must return np.floating values to perform AREA_OR_POINT subpixel index shifting"
)
# if point, shift index by half a pixel
i += 0.5
j += 0.5
# otherwise, leave as is
return i, j
[docs] def ij2xy(self, i: np.ndarray, j: np.ndarray, offset: str = "center") -> tuple[np.ndarray, np.ndarray]:
"""
Return x,y coordinates for a given row, column index pair.
:param i: row (i) index of pixel.
:param j: column (j) index of pixel.
:param offset: return coordinates as "corner" or "center" of pixel
:returns x, y: x,y coordinates of i,j in reference system.
"""
x, y = self.ds.xy(i, j, offset=offset)
return x, y
[docs] def outside_image(self, xi: np.ndarray, yj: np.ndarray, index: bool = True) -> bool:
"""
Check whether a given point falls outside of the raster.
:param xi: Indices (or coordinates) of x direction to check.
:param yj: Indices (or coordinates) of y direction to check.
:param index: Interpret ij as raster indices (default is True). If False, assumes ij is coordinates.
:returns is_outside: True if ij is outside of the image.
"""
if not index:
xi, xj = self.xy2ij(xi, yj)
if np.any(np.array((xi, yj)) < 0):
return True
elif xi > self.width or yj > self.height:
return True
else:
return False
[docs] def interp_points(
self,
pts: np.ndarray,
input_latlon: bool = False,
mode: str = "linear",
band: int = 1,
area_or_point: str | None = None,
**kwargs: Any,
) -> np.ndarray:
"""
Interpolate raster values at a given point, or sets of points.
:param pts: Point(s) at which to interpolate raster value. If points fall outside of image,
value returned is nan. Shape should be (N,2)'
:param input_latlon: Whether the input is in latlon, unregarding of Raster CRS
:param mode: One of 'linear', 'cubic', or 'quintic'. Determines what type of spline is
used to interpolate the raster value at each point. For more information, see
scipy.interpolate.interp2d. Default is linear.
:param band: Raster band to use
:param area_or_point: shift index according to GDAL AREA_OR_POINT attribute (None) or force position\
('Point' or 'Area') of the interpretation of where the raster value corresponds to in the pixel\
('Area' = lower left or 'Point' = center)
:returns rpts: Array of raster value(s) for the given points.
"""
assert mode in [
"mean",
"linear",
"cubic",
"quintic",
"nearest",
], "mode must be mean, linear, cubic, quintic or nearest."
# get coordinates
x, y = list(zip(*pts))
# if those are in latlon, convert to Raster crs
if input_latlon:
init_crs = pyproj.CRS(4326)
dest_crs = pyproj.CRS(self.crs)
transformer = pyproj.Transformer.from_crs(init_crs, dest_crs)
x, y = transformer.transform(x, y)
i, j = self.xy2ij(x, y, op=np.float32, area_or_point=area_or_point)
ind_invalid = np.vectorize(lambda k1, k2: self.outside_image(k1, k2, index=True))(j, i)
rpts = map_coordinates(self.data[band - 1, :, :].astype(np.float32), [i, j], **kwargs)
rpts = np.array(rpts, dtype=np.float32)
rpts[np.array(ind_invalid)] = np.nan
return rpts
# #TODO: right now it's a loop... could add multiprocessing parallel loop outside,
# # but such a method probably exists already within scipy/other interpolation packages?
# for pt in pts:
# i,j = self.xy2ij(pt[0],pt[1])
# if self.outside_image(i,j, index=True):
# rpts.append(np.nan)
# continue
# else:
# x = xx[j - nsize:j + nsize + 1]
# y = yy[i - nsize:i + nsize + 1]
#
# #TODO: read only that window?
# z = self.data[band-1, i - nsize:i + nsize + 1, j - nsize:j + nsize + 1]
# if mode in ['linear', 'cubic', 'quintic', 'nearest']:
# X, Y = np.meshgrid(x, y)
# try:
# zint = griddata((X.flatten(), Y.flatten()), z.flatten(), list(pt), method=mode)[0]
# except:
# #TODO: currently fails when dealing with the edges
# print('Interpolation failed for:')
# print(pt)
# print(i,j)
# print(x)
# print(y)
# print(z)
# zint = np.nan
# else:
# zint = np.nanmean(z.flatten())
# rpts.append(zint)
# rpts = np.array(rpts)
[docs] def split_bands(self: RasterType, copy: bool = False, subset: list[int] | int | None = None) -> list[Raster]:
"""
Split the bands into separate copied rasters.
:param copy: Copy the bands or return slices of the original data.
:param subset: Optional. A subset of band indices to extract. Defaults to all.
:returns: A list of Rasters for each band.
"""
bands: list[Raster] = []
if subset is None:
indices = list(range(self.nbands))
elif isinstance(subset, int):
indices = [subset]
elif isinstance(subset, list):
indices = subset
else:
raise ValueError(f"'subset' got invalid type: {type(subset)}. Expected list[int], int or None")
if copy:
for band_n in indices:
# Generate a new Raster from a copy of the band's data
bands.append(
self.from_array(
self.data[band_n, :, :],
transform=self.transform,
crs=self.crs,
nodata=self.nodata,
)
)
else:
for band_n in indices:
# Generate a new instance with the same underlying values.
raster = Raster(self)
# Set the data to a slice of the original array
raster._data = self.data[band_n, :, :].reshape((1,) + self.data.shape[1:])
# Set the nbands
raster.nbands = 1
bands.append(raster)
return bands
@overload
def to_points(
self, subset: float | int, as_frame: Literal[True], pixel_offset: Literal["center", "corner"]
) -> gpd.GeoDataFrame:
...
@overload
def to_points(
self, subset: float | int, as_frame: Literal[False], pixel_offset: Literal["center", "corner"]
) -> np.ndarray:
...
[docs] def to_points(
self, subset: float | int = 1, as_frame: bool = False, pixel_offset: Literal["center", "corner"] = "center"
) -> np.ndarray:
"""
Subset a point cloud of the raster.
If 'subset' is either 1 or is equal to the pixel count, all points are returned in order.
If 'subset' is smaller than 1 (for fractions) or the pixel count, a random sample is returned.
If the raster is not loaded, sampling will be done from disk without loading the entire Raster.
Formats:
* `as_frame` == None | False: A numpy ndarray of shape (N, 2 + nbands) with the columns [x, y, b1, b2..].
* `as_frame` == True: A GeoPandas GeoDataFrame with the columns ["b1", "b2", ..., "geometry"]
:param subset: The point count or fraction. If 'subset' > 1, it's parsed as a count.
:param as_frame: Return a GeoDataFrame with a geometry column and crs instead of an ndarray.
:param pixel_offset: The point at which to associate the pixel coordinate with ('corner' == upper left).
:raises ValueError: If the subset count or fraction is poorly formatted.
:returns: An ndarray/GeoDataFrame of the shape (N, 2 + nbands) where N is the subset count.
"""
data_size = self.width * self.height
# Validate the subset argument.
if subset <= 0.0:
raise ValueError(f"Subset cannot be zero or negative (given value: {subset})")
# If the subset is equal to or less than 1, it is assumed to be a fraction.
if subset <= 1.0:
subset = int(data_size * subset)
else:
subset = int(subset)
if subset > data_size:
raise ValueError(f"Subset cannot exceed the size of the dataset ({subset} vs {data_size})")
# If the subset is smaller than the max size, take a random subset of indices, otherwise take the whole.
choice = np.random.randint(0, data_size - 1, subset) if subset != data_size else np.arange(data_size)
cols = choice % self.width
rows = (choice / self.width).astype(int)
# Extract the coordinates of the pixels and filter by the chosen pixels.
x_coords, y_coords = (np.array(a) for a in self.ij2xy(rows, cols, offset=pixel_offset))
# If the Raster is loaded, pick from the data, otherwise use the disk-sample method from rasterio.
if self.is_loaded:
pixel_data = self.data[:, rows, cols]
else:
pixel_data = np.array(list(self.ds.sample(zip(x_coords, y_coords)))).T
if isinstance(pixel_data, np.ma.masked_array):
pixel_data = np.where(pixel_data.mask, np.nan, pixel_data.data)
# Merge the coordinates and pixel data into a point cloud.
points = np.vstack((x_coords.reshape(1, -1), y_coords.reshape(1, -1), pixel_data)).T
if as_frame:
points = gpd.GeoDataFrame(
points[:, 2:],
columns=[f"b{i}" for i in range(1, pixel_data.shape[0] + 1)],
geometry=gpd.points_from_xy(points[:, 0], points[:, 1]),
crs=self.crs,
)
return points
[docs] def polygonize(
self, in_value: Number | tuple[Number, Number] | list[Number] | np.ndarray | Literal["all"] = 1
) -> Vector:
"""
Return a GeoDataFrame polygonized from a raster.
:param in_value: Value or range of values of the raster from which to
create geometries (Default is 1). If 'all', all unique pixel values of the raster are used.
:returns: Vector containing the polygonized geometries.
"""
# mask a unique value set by a number
if isinstance(in_value, Number):
if np.sum(self.data == in_value) == 0:
raise ValueError(f"no pixel with in_value {in_value}")
bool_msk = np.array(self.data == in_value).astype(np.uint8)
# mask values within boundaries set by a tuple
elif isinstance(in_value, tuple):
if np.sum((self.data > in_value[0]) & (self.data < in_value[1])) == 0:
raise ValueError(f"no pixel with in_value between {in_value[0]} and {in_value[1]}")
bool_msk = ((self.data > in_value[0]) & (self.data < in_value[1])).astype(np.uint8)
# mask specific values set by a sequence
elif isinstance(in_value, list) or isinstance(in_value, np.ndarray):
if np.sum(np.isin(self.data, in_value)) == 0:
raise ValueError("no pixel with in_value " + ", ".join(map("{}".format, in_value)))
bool_msk = np.isin(self.data, in_value).astype("uint8")
# mask all valid values
elif in_value == "all":
vals_for_msk = list(set(self.data.flatten()))
bool_msk = np.isin(self.data, vals_for_msk).astype("uint8")
else:
raise ValueError("in_value must be a number, a tuple or a sequence")
results = (
{"properties": {"raster_value": v}, "geometry": s}
for i, (s, v) in enumerate(shapes(self.data, mask=bool_msk, transform=self.transform))
)
gdf = gpd.GeoDataFrame.from_features(list(results))
gdf.insert(0, "New_ID", range(0, 0 + len(gdf)))
gdf.set_geometry(col="geometry", inplace=True)
gdf.set_crs(self.crs, inplace=True)
return gv.Vector(gdf)