import importlib
import itertools
import json
from functools import reduce, singledispatch, update_wrapper
import numpy as np
import pandas as pd
import networkx as nx
from osgeo import ogr
from sklearn.cluster import DBSCAN
from sklearn.neighbors import NearestNeighbors
from sklearn.neighbors import KDTree
from scipy.spatial import Delaunay
from shapely import geometry
from shapely.geometry import MultiPoint
from shapely.ops import cascaded_union, polygonize
def tile(array_size, tilesize=1000, overlap=500):
stepsize = tilesize - overlap
if stepsize < 0:
raise ValueError('Overlap can not be greater than tilesize.')
# Compute the tiles
if tilesize >= array_size[1]:
ytiles = [(0, array_size[1])]
else:
ystarts = range(0, array_size[1], stepsize)
ystops = range(tilesize, array_size[1], stepsize)
ytiles = list(zip(ystarts, ystops))
ytiles.append((ytiles[-1][0] + stepsize, array_size[1]))
if tilesize >= array_size[0]:
xtiles = [(0, array_size[0])]
else:
xstarts = range(0, array_size[0], stepsize)
xstops = range(tilesize, array_size[0], stepsize)
xtiles = list(zip(xstarts, xstops))
xtiles.append((xtiles[-1][0] + stepsize, array_size[0]))
tiles = itertools.product(xtiles, ytiles)
slices = []
for tile in tiles:
# xstart, ystart, xcount, ycount
xstart = tile[0][0]
ystart = tile[1][0]
xstop = tile[0][1]
ystop = tile[1][1]
pixels = [xstart, ystart,
xstop - xstart,
ystop - ystart]
slices.append(pixels)
return slices
[docs]def compare_dicts(d, o):
"""
Given two dictionaries, compare them with support for np.ndarray and
pd.DataFrame objects
Parameters
----------
d : dict
first dict to compare
o : dict
second dict to compare
Examples
--------
>>> d = {'a':0}
>>> o = {'a':0}
>>> compare_dicts(d, o)
True
>>> d['a'] = 1
>>> compare_dicts(d,o)
False
>>> d['a'] = np.arange(3)
>>> o['a'] = np.arange(3)
>>> compare_dicts(d,o)
True
"""
for k in o.keys():
if k not in d.keys():
return False
for k, v in d.items():
if v is None and o[k] is not None:
return False
if isinstance(v, pd.DataFrame):
if not v.equals(o[k]):
return False
elif isinstance(v, np.ndarray):
if not np.allclose(v, o[k]):
return False
else:
if k == '_geodata':
continue
if not v == o[k]:
return False
return True
[docs]def normalize_vector(line):
"""
Normalize a standard form line
Parameters
----------
line : ndarray
Standard form of a line (Ax + By + C = 0)
Returns
-------
line : ndarray
The normalized line
Examples
--------
>>> x = np.array([3, 1, 2])
>>> nv = normalize_vector(x)
>>> print(np.round(nv, 6)) # For doc test float percision
[0.801784 0.267261 0.534522]
"""
if isinstance(line, pd.DataFrame):
line = line.values
n = np.sqrt((line[0]**2 + line[1]**2 + line[2]**2))
return line / abs(n)
[docs]def getnearest(iterable, value):
"""
Given an iterable, get the index nearest to the input value
Parameters
----------
iterable : iterable
An iterable to search
value : int, float
The value to search for
Returns
-------
: int
The index into the list
"""
return min(enumerate(iterable), key=lambda i: abs(i[1] - value))
[docs]def checkbandnumbers(bands, checkbands):
"""
Given a list of input bands, check that the passed
tuple contains those bands.
In case of THEMIS, we check for band 9 as band 9 is the temperature
band required to derive thermal temperature. We also check for band 10
which is required for TES atmosphere calculations.
Parameters
----------
bands : tuple
of bands in the input image
checkbands : list
of bands to check against
Returns
-------
: bool
True if the bands are present, else False
"""
for c in checkbands:
if c not in bands:
return False
return True
[docs]def checkdeplaid(incidence):
"""
Given an incidence angle, select the appropriate deplaid method.
Parameters
----------
incidence : float
incidence angle extracted from the campt results.
"""
if incidence >= 95 and incidence <= 180:
return 'night'
elif incidence >=90 and incidence < 95:
return 'night'
elif incidence >= 85 and incidence < 90:
return 'day'
elif incidence >= 0 and incidence < 85:
return 'day'
else:
return False
[docs]def checkmonotonic(iterable, piecewise=False):
"""
Check if a given iterable is monotonically increasing.
Parameters
----------
iterable : iterable
Any Python iterable object
piecewise : boolean
If false, return a boolean for the entire iterable,
else return a list with elementwise monotinicy checks
Returns
-------
monotonic : bool/list
A boolean list of all True if monotonic, or including
an inflection point
"""
monotonic = [True] + [x < y for x, y in zip(iterable, iterable[1:])]
if piecewise is True:
return monotonic
else:
return all(monotonic)
[docs]def find_in_dict(obj, key):
"""
Recursively find an entry in a dictionary
Parameters
----------
obj : dict
The dictionary to search
key : str
The key to find in the dictionary
Returns
-------
item : obj
The value from the dictionary
"""
if key in obj:
return obj[key]
for k, v in obj.items():
if isinstance(v,dict):
item = find_in_dict(v, key)
if item is not None:
return item
[docs]def find_nested_in_dict(data, key_list):
"""
Traverse a list of keys into a dict.
Parameters
----------
data : dict
The dictionary to be traversed
key_list: list
The list of keys to be travered. Keys are
traversed in the order they are entered in
the list
Returns
-------
value : object
The value in the dict
"""
return reduce(lambda d, k: d[k], key_list, data)
[docs]def make_homogeneous(points):
"""
Convert a set of points (n x dim array) to
homogeneous coordinates.
Parameters
----------
points : arraylike
n x m array of points, where n is the number
of points.
Returns
-------
: arraylike
n x m + 1 array of homogeneous points
"""
homogeneous = np.hstack((points, np.ones((points.shape[0], 1))))
if isinstance(points, pd.DataFrame):
columns = points.columns.values.tolist() + ['z']
homogeneous = pd.DataFrame(homogeneous, index=points.index,
columns=columns)
return homogeneous
[docs]def remove_field_name(a, name):
"""
Given a numpy structured array, remove a column and return
a copy of the remainder of the array
Parameters
----------
a : ndarray
Numpy structured array
name : str
of the index (column) to be removed
Returns
-------
b : ndarray
Numpy structured array with the 'name' column removed
"""
names = list(a.dtype.names)
if name in names:
names.remove(name)
b = a[names]
return b
[docs]def calculate_slope(x1, x2):
"""
Calculates the 2-dimensional slope between the points in two dataframes each containing two columns ['x', 'y']
The slope is calculated from x1 to x2.
Parameters
----------
x1 : dataframe
Each row is a point with columns ['x', 'y']
x2 : dataframe
Each row is a point with columns ['x', 'y']
Returns
-------
: dataframe
A dataframe with the slope between the points in x1 and x2 for each row.
"""
sl = False
if isinstance(x1, pd.DataFrame):
index = x1.index
sl = True
x1 = x1.values
if isinstance(x2, pd.DataFrame):
x2 = x2.values
slopes = (x2[:,1] - x1[:,1])/(x2[:,0] - x1[:,0])
if sl:
slopes = pd.Series(slopes, index=index)
return slopes
[docs]def cartesian(arrays, out=None):
"""
Generate a cartesian product of input arrays.
Parameters
----------
arrays : list of array-like
1-D arrays to form the cartesian product of.
out : ndarray
Array to place the cartesian product in.
Returns
-------
out : ndarray
2-D array of shape (M, len(arrays)) containing cartesian products
formed of input arrays.
from scikit-learn
https://github.com/scikit-learn/scikit-learn/blob/master/sklearn/utils/extmath.py
"""
arrays = [np.asarray(x) for x in arrays]
shape = (len(x) for x in arrays)
dtype = arrays[0].dtype
ix = np.indices(shape)
ix = ix.reshape(len(arrays), -1).T
if out is None:
out = np.empty_like(ix, dtype=dtype)
for n, arr in enumerate(arrays):
out[:, n] = arrays[n][ix[:, n]]
return out
[docs]def array_to_poly(array):
"""
Generate a geojson geom
Parameters
----------
array : array-like
2-D array of size (n, 2) of x, y coordinates
Returns
-------
geom : GeoJson
geojson containing the necessary data to construct
a poly gon
"""
array = np.asarray(array)
size = np.shape(array)
if size[1] != 2:
raise ValueError('Array is not the proper size.')
return
geom_array = np.append(array, [array[0]], axis = 0).tolist()
geom = {"type": "Polygon", "coordinates": [geom_array]}
poly = ogr.CreateGeometryFromJson(json.dumps(geom))
return poly
[docs]def methodispatch(func):
"""
New dispatch decorator that looks at the second argument to
avoid self
Parameters
----------
func : Object
Function object to be dispatched
Returns
wrapper : Object
Wrapped function call chosen by the dispatcher
----------
"""
dispatcher = singledispatch(func)
def wrapper(*args, **kw):
return dispatcher.dispatch(args[1].__class__)(*args, **kw)
wrapper.register = dispatcher.register
update_wrapper(wrapper, dispatcher)
return wrapper
[docs]def decorate_class(cls, decorator, exclude=[], *args, **kwargs): # pragma: no cover
"""
Decorates a class with a give docorator. Returns a subclass with
dectorations applied
Parameters
----------
cls : Class
A class to be decorated
decorator : callable
callable to wrap cls's methods with
exclude : list
list of method names to exclude from being decorated
args, kwargs : list, dict
Parameters to pass into decorator
"""
if not callable(decorator):
raise Exception('Decorator must be callable.')
def decorate(cls):
attributes = cls.__dict__.keys()
for attr in attributes: # there's propably a better way to do this
if callable(getattr(cls, attr)):
name = getattr(cls, attr).__name__
if name[0] == '_' or name in exclude:
continue
setattr(cls, attr, decorator(getattr(cls, attr)))
return cls
# return decorated copy (i.e. a subclass with decorations)
return decorate(type('cls_copy', cls.__bases__, dict(cls.__dict__)))
[docs]def create_decorator(dec, **namespace):
"""
Create a decorator function using arbirary params. The objects passed in
can be used in the body. Originally designed with the idea of automatically
updating one object after the decorated object was modified.
"""
def decorator(func, *args, **kwargs):
def wrapper(*args, **kwarg):
for key in namespace.keys():
locals()[key] = namespace[key]
ret = func(*args, **kwargs)
exec(dec.__code__, locals(), globals())
if ret:
return ret
return wrapper
return decorator
[docs]def bytescale(data, cmin=None, cmax=None, high=255, low=0):
"""
This is pulled directly from scipy.misc as they are deprecating bytescale.
Byte scales an array (image).
Byte scaling means converting the input image to uint8 dtype and scaling
the range to ``(low, high)`` (default 0-255).
If the input image already has dtype uint8, no scaling is done.
This function is only available if Python Imaging Library (PIL) is installed.
Parameters
----------
data : ndarray
PIL image data array.
cmin : scalar, optional
Bias scaling of small values. Default is ``data.min()``.
cmax : scalar, optional
Bias scaling of large values. Default is ``data.max()``.
high : scalar, optional
Scale max value to `high`. Default is 255.
low : scalar, optional
Scale min value to `low`. Default is 0.
Returns
-------
img_array : uint8 ndarray
The byte-scaled array.
Examples
--------
>>> from scipy.misc import bytescale
>>> img = np.array([[ 91.06794177, 3.39058326, 84.4221549 ],
... [ 73.88003259, 80.91433048, 4.88878881],
... [ 51.53875334, 34.45808177, 27.5873488 ]])
>>> bytescale(img)
array([[255, 0, 236],
[205, 225, 4],
[140, 90, 70]], dtype=uint8)
>>> bytescale(img, high=200, low=100)
array([[200, 100, 192],
[180, 188, 102],
[155, 135, 128]], dtype=uint8)
>>> bytescale(img, cmin=0, cmax=255)
array([[91, 3, 84],
[74, 81, 5],
[52, 34, 28]], dtype=uint8)
"""
if data.dtype == np.uint8:
return data
if high > 255:
raise ValueError("`high` should be less than or equal to 255.")
if low < 0:
raise ValueError("`low` should be greater than or equal to 0.")
if high < low:
raise ValueError("`high` should be greater than or equal to `low`.")
if cmin is None:
cmin = data.min()
if cmax is None:
cmax = data.max()
cscale = cmax - cmin
if cscale < 0:
raise ValueError("`cmax` should be larger than `cmin`.")
elif cscale == 0:
cscale = 1
scale = float(high - low) / cscale
bytedata = (data - cmin) * scale + low
return (bytedata.clip(low, high) + 0.5).astype(np.uint8)
[docs]def import_func(func):
"""
Imports a function from the autocnet package.
Parameters
----------
func : str
import path. For example, to import the place_points_in_overlap function,
this func can be called with: 'spatial.overlap.place_points_in_overlap'
Returns
-------
func : obj
The function object for use.
"""
if not func[0] == '.':
# Since this intentionally forces the package to be autocnet
# need the import path relative to the package name. Convenience
# for the caller to add the '.' so they don't get a cryptic
# ModuleImportError.
func = f'.{func}'
module, func = func.rsplit('.', 1)
module = importlib.import_module(module, package='autocnet')
func = getattr(module, func)
return func
[docs]def compute_depression(input_dem, scale_factor=1, curvature_percentile=75):
"""
Compute depressions and return a new image with larges depressions filled in.
Parameters
----------
input_dem : np.array, rd.rdarray
2d array of elevation DNs, a DEM
scale_factor : float
Value to scale the erotion of planform curvatures by
curvature_percentile : float
what percentile of the curvature to keep, lower values
results in bigger blobs
Returns
-------
dem : rd.rdarray
Dem with filled depressions
mask : np.array
Change mask, true on pixels that have been changed
"""
if isinstance(input_dem, np.ndarray):
dem = rd.rdarray(input_dem.copy(), no_data=0)
elif isinstance(input_dem, rd.rdarray):
# take ownership of the reference
dem = input_dem.copy()
# create filled DEM
demfilled = rd.FillDepressions(dem, epsilon=True, in_place=False, topology="D8")
# Mask out filled areas
mask = np.abs(dem-demfilled)
thresh = np.percentile(mask, 95)
mask[mask <= thresh] = False
mask[mask > thresh] = True
curvatures = rd.TerrainAttribute(dem, attrib='planform_curvature')
curvatures = (curvatures - np.min(curvatures))/np.ptp(curvatures)
curvatures[curvatures < np.percentile(curvatures, curvature_percentile)] = 0
curvatures[mask.astype(bool)] = 0
demfilled -= curvatures * scale_factor
mask = (curvatures+mask).astype(bool)
# Get 3rd nn distance
coords = np.argwhere(mask)
nbrs = NearestNeighbors(n_neighbors=3, algorithm='kd_tree').fit(coords)
dists, _ = nbrs.kneighbors(coords)
eps = np.percentile(dists, 95)
# Cluster
db = DBSCAN(eps=eps, min_samples=3).fit(coords)
labels = db.labels_
unique, counts = np.unique(labels, return_counts=True)
# First count are outliers, ignore
counts = counts[1:]
unique = unique[1:]
index = np.argwhere(counts == counts.max())
group = unique[index][0][0]
cluster = coords[labels == group]
# mask out depression
dmask = np.full(dem.shape, False)
dmask[[*cluster.T]] = True
dem[dmask] = 0
demfilled[~dmask] = 0
dem = dem+demfilled
return dem, dmask
[docs]def rasterize_polygon(shape, vertices, dtype=bool):
"""
Simple tool to convert poly into a boolean numpy array.
source: https://stackoverflow.com/questions/37117878/generating-a-filled-polygon-inside-a-numpy-array
Parameters
----------
shape : tuple
size of the array in (y,x) format
vertices : np.array, list
array of vertices in [[x0, y0], [x1, y1]...] format
dtype : type
datatype of output mask
Returns
-------
mask : np.array
mask with filled polygon set to true
"""
def check(p1, p2, base_array):
idxs = np.indices(base_array.shape) # Create 3D array of indices
p1 = p1.astype(float)
p2 = p2.astype(float)
# Calculate max column idx for each row idx based on interpolated line between two points
if p1[0] == p2[0]:
max_col_idx = (idxs[0] - p1[0]) * idxs.shape[1]
sign = np.sign(p2[1] - p1[1])
else:
max_col_idx = (idxs[0] - p1[0]) / (p2[0] - p1[0]) * (p2[1] - p1[1]) + p1[1]
sign = np.sign(p2[0] - p1[0])
return idxs[1] * sign <= max_col_idx * sign
base_array = np.zeros(shape, dtype=dtype) # Initialize your array of zeros
fill = np.ones(base_array.shape) * True # Initialize boolean array defining shape fill
# Create check array for each edge segment, combine into fill array
for k in range(vertices.shape[0]):
fill = np.all([fill, check(vertices[k-1], vertices[k], base_array)], axis=0)
print(fill.any())
# Set all values inside polygon to one
base_array[fill] = 1
return base_array
[docs]def generate_dem(alpha=1.0, size=800, scales=[160,80,32,16,8,4,2,1], scale_factor=5):
"""
Produces a random DEM
Parameters
----------
alpha : float
Controls height variation. Lower number makes a shallower and noisier DEM,
higher values create smoother DEM with large peaks and valleys.
Reccommended range = (0, 1.5]
size : int
size of DEM, output DEM is in the shape of (size, size)
scale_factor : float
Scalar to multiply the slope degradation by, higher values = more erosion.
Recommended to increase proportionately with alpha
(higher alphas mean you might want higher scale_factor)
Returns
-------
dem : np.array
DEM array in the shape (size, size)
"""
topo=np.zeros((2,2))+random.rand(2,2)*(200/(2.**alpha))
for k in range(len(scales)):
nn = size/scales[k]
topo = scipy.misc.imresize(topo, (int(nn), int(nn)), "cubic", mode="F")
topo = topo + random.rand(int(nn), int(nn))*(200/(nn**alpha))
topo = rd.rdarray(topo, no_data=0)
curvatures = rd.TerrainAttribute(topo, attrib='slope_riserun')
curvatures = (curvatures - np.min(curvatures))/np.ptp(curvatures) * scale_factor
return topo - curvatures
[docs]def hillshade(img, azi=255, min_slope=20, max_slope=100, min_bright=0, grayscale=False):
"""
hillshade a DEM, based on IDL code by Colin Dundas
Parameters
----------
img : np.array
DEM to hillshade
azi : float
Sun azimuth
min_slope : float
minimum slope value
max_slope : float
maximum slope value
min_bright : float
minimum brightness
grayscale : bool
whether or not to produce grayscale image
Returns
-------
dem : np.array
hillshaded DEM
"""
dem = np.array(np.flip(bytescale(img), axis = 0), dtype=int)
emax = np.max(dem)
emin = np.min(dem)
indices = np.linspace(0, 255, 256) / 25.5
red_array = [0,25,50,101,153,204,255,255,255,255,255,255]
red_index = np.arange(len(red_array))
red_vec = np.interp(indices, red_index, red_array)
green_array = [42,101,153,204,237,255,255,238,204,153,102,42]
green_index = np.arange(len(green_array))
green_vec = np.interp(indices, green_index, green_array)
blue_array = [255,255,255,255,255,255,204,153,101,50,25,0]
blue_index = np.arange(len(blue_array))
blue_vec = np.interp(indices, blue_index, blue_array)
zz = (255.0/(emax-emin))*(dem-emin)
zz = zz.astype(int)
nx = (np.roll(dem, 1, axis = 1) - dem)
ny = (np.roll(dem, 1, axis = 0) - dem)
sz = np.shape(nx)
nz = np.ones(sz)
nl = np.sqrt(np.power(nx, 2.0) + np.power(ny, 2.0) + np.power(nz, 2.0))
nx = nx/nl
ny = ny/nl
nz = nz/nl
math.cos(math.radians(1))
azi_rad = math.radians(azi)
alt_rad = math.radians(alt)
lx = math.sin(azi_rad)*math.cos(alt_rad)
ly = math.cos(azi_rad)*math.cos(alt_rad)
lz = math.sin(alt_rad)
dprod = nx*lx + ny*ly + nz*lz
if min_slope is not None:
min_dprod = math.cos(math.radians(max_slope + 90.0 - alt))
else:
min_dprod = np.min(dprod)
if max_slope is not None:
max_dprod = math.cos(math.radians(90.0 - alt - max_slope))
else:
max_dprod = np.max(dprod)
bright = ((dprod - min_dprod) + min_bright)/((max_dprod - min_dprod) + min_bright)
if grayscale:
qq=(255*bright)
else:
qq = red_vec[zz]*bright
if grayscale:
rr = (255*bright)
else:
rr = green_vec[zz]*bright
if grayscale:
ss=(255*bright)
else:
ss = blue_vec[zz]*bright
arrforout = np.dstack((qq, rr ,ss))
arrforout = np.flip(arrforout.astype(int), axis = 0)
arrfotout = bytescale(arrforout)
arrforout.shape
return arrforout