from collections import defaultdict, OrderedDict
from contextlib import contextmanager
import itertools
import json
import math
import os
from shutil import copyfile
import threading
from time import gmtime, strftime, time
import warnings
from itertools import combinations
import networkx as nx
import geopandas as gpd
import pandas as pd
import numpy as np
from redis import StrictRedis
import shapely
import scipy.special
import geoalchemy2
from sqlalchemy.orm.decl_api import DeclarativeMeta
from sqlalchemy.sql import func
import shapely.affinity
import shapely.geometry
import shapely.wkt as swkt
import shapely.ops
from plio.io.io_controlnetwork import to_isis, from_isis
from plio.io import io_hdf, io_json
from plio.utils import utils as io_utils
from plio.io.io_gdal import GeoDataset
from plio.io.isis_serial_number import generate_serial_number
from plio.io import io_controlnetwork as cnet
from plurmy import Slurm
import autocnet
from autocnet.config_parser import parse_config
from autocnet.cg import cg
from autocnet.graph.asynchronous_funcs import watch_insert_queue, watch_update_queue
from autocnet.graph import markov_cluster
from autocnet.graph.edge import Edge, NetworkEdge
from autocnet.graph.node import Node, NetworkNode
from autocnet.io import network as io_network
from autocnet.io.db import controlnetwork as io_controlnetwork
from autocnet.io.db.model import (Images, Keypoints, Matches, Cameras, Points,
Base, Overlay, Edges, Costs, Measures, CandidateGroundPoints,
JsonEncoder, try_db_creation)
from autocnet.io.db.connection import new_connection, Parent
from autocnet.matcher import subpixel
from autocnet.matcher import cross_instrument_matcher as cim
from autocnet.vis.graph_view import plot_graph, cluster_plot
from autocnet.control import control
from autocnet.spatial.overlap import compute_overlaps_sql
from autocnet.spatial.isis import point_info
from autocnet.spatial.surface import GdalDem, EllipsoidDem
from autocnet.transformation.spatial import reproject, og2oc
#np.warnings.filterwarnings('ignore')
# The total number of pixels squared that can fit into the keys number of GB of RAM for SIFT.
MAXSIZE = {0: None,
2: 6250,
4: 8840,
8: 12500,
12: 15310}
[docs]class CandidateGraph(nx.Graph):
"""
A NetworkX derived directed graph to store candidate overlap images.
Attributes
----------
node_counter : int
The number of nodes in the graph.
node_name_map : dict
The mapping of image labels (i.e. file base names) to their
corresponding node indices
clusters : dict
of clusters with key as the cluster id and value as a
list of node indices
cn : object
A control network object instantiated by calling generate_cnet.
----------
"""
node_factory = Node
edge_factory = Edge
measures_keys = ['point_id', 'image_index', 'keypoint_index',
'edge', 'match_idx', 'x', 'y', 'x_off', 'y_off', 'corr']
# dtypes are usful for allowing merges, otherwise they default to object
cnet_dtypes = {
'match_idx' : int,
'point_id' : int,
'image_index' : int,
'keypoint_index' : int
}
def __init__(self, *args, basepath=None, node_id_map=None, overlaps=False, **kwargs):
super(CandidateGraph, self).__init__(*args, **kwargs)
self.graph['creationdate'] = strftime("%Y-%m-%d %H:%M:%S", gmtime())
self.graph['modifieddate'] = strftime("%Y-%m-%d %H:%M:%S", gmtime())
self.graph['node_name_map'] = {}
self.graph['node_counter'] = 1
self._point_id = 0
self._measure_id = 0
self.measure_to_point = {}
self.controlnetwork = pd.DataFrame(columns=self.measures_keys).astype(self.cnet_dtypes)
self.masks = pd.DataFrame()
for i, n in self.nodes(data=True):
if basepath:
image_path = os.path.join(basepath, i)
else:
image_path = i
if node_id_map:
node_id = node_id_map[image_path]
else:
node_id = self.graph['node_counter']
self.graph['node_counter'] += 1
n['data'] = self.node_factory(
image_name=i, image_path=image_path, node_id=node_id)
self.graph['node_name_map'][i] = node_id
# Relabel the nodes in place to use integer node ids
nx.relabel_nodes(self, self.graph['node_name_map'], copy=False)
for s, d, e in self.edges(data=True):
if s > d:
s, d = d, s
edge = self.edge_factory(
self.nodes[s]['data'], self.nodes[d]['data'])
# Unidrected graph - both representation point at the same data
self.edges[s, d]['data'] = edge
self.edges[d, s]['data'] = edge
if overlaps:
self.compute_overlaps()
def __key(self):
# TODO: This needs to be a real self identifying key
return 'abcde'
def __hash__(self):
return hash(self.__key())
def __eq__(self, other):
# Check the nodes
if sorted(self.nodes()) != sorted(other.nodes()):
return False
for node in self.nodes:
if not self.nodes[node] == other.nodes[node]:
return False
if sorted(self.edges()) != sorted(other.edges()):
return False
for s, d, e in self.edges.data('data'):
if s > d:
s, d = d, s
if not e == other.edges[(s, d)]['data']:
return False
return True
def _order_adjacency(self): # pragma: no cover
self.adj = OrderedDict(sorted(self.adj.items()))
@property
def maxsize(self):
if not hasattr(self, '_maxsize'):
self._maxsize = MAXSIZE[0]
return self._maxsize
@maxsize.setter
def maxsize(self, value):
if not value in MAXSIZE.keys():
raise KeyError('Value must be in {}'.format(
','.join(map(str, MAXSIZE.keys()))))
else:
self._maxsize = MAXSIZE[value]
@property
def unmatched_edges(self):
"""
Returns a list of edges (source, destination) that do not have
entries in the matches dataframe.
"""
unmatched = []
for s, d, e in self.edges(data='data'):
if len(e.matches) == 0:
unmatched.append((s,d))
return unmatched
[docs] @classmethod
def from_filelist(cls, filelist, basepath=None):
"""
Instantiate the class using a filelist as a python list.
An adjacency structure is calculated using the lat/lon information in the
input images. Currently only images with this information are supported.
Parameters
----------
filelist : list
A list containing the files (with full paths) to construct an adjacency graph from
Returns
-------
: object
A Network graph object
"""
if isinstance(filelist, str):
filelist = io_utils.file_to_list(filelist)
# TODO: Reject unsupported file formats + work with more file formats
if basepath:
datasets = [GeoDataset(os.path.join(basepath, f))
for f in filelist]
else:
datasets = [GeoDataset(f) for f in filelist]
# This is brute force for now, could swap to an RTree at some point.
adjacency_dict = {}
valid_datasets = []
for i in datasets:
adjacency_dict[i.file_name] = []
fp = i.footprint
if fp and fp.IsValid():
valid_datasets.append(i)
else:
warnings.warn(
'Missing or invalid geospatial data for {}'.format(i.base_name))
# Grab the footprints and test for intersection
for i, j in itertools.permutations(valid_datasets, 2):
i_fp = i.footprint
j_fp = j.footprint
try:
if i_fp.Intersects(j_fp):
adjacency_dict[i.file_name].append(j.file_name)
adjacency_dict[j.file_name].append(i.file_name)
except:
warnings.warn(
'Failed to calculate intersection between {} and {}'.format(i, j))
return cls.from_adjacency(adjacency_dict)
[docs] @classmethod
def from_adjacency(cls, input_adjacency, node_id_map=None, basepath=None, **kwargs):
"""
Instantiate the class using an adjacency dict or file. The input must contain relative or
absolute paths to image files.
Parameters
----------
input_adjacency : dict or str
An adjacency dictionary or the name of a file containing an adjacency dictionary.
Returns
-------
: object
A Network graph object
Examples
--------
>>> from autocnet.examples import get_path
>>> inputfile = get_path('adjacency.json')
>>> candidate_graph = CandidateGraph.from_adjacency(inputfile)
>>> sorted(candidate_graph.nodes())
[0, 1, 2, 3, 4, 5]
"""
if not isinstance(input_adjacency, dict):
input_adjacency = io_json.read_json(input_adjacency)
return cls(input_adjacency, basepath=basepath, node_id_map=node_id_map, **kwargs)
@classmethod
def from_save(cls, input_file):
return io_network.load(input_file)
def _update_date(self):
"""
Update the last modified date attribute.
"""
self.graph['modifieddate'] = strftime("%Y-%m-%d %H:%M:%S", gmtime())
[docs] def get_name(self, node_index):
"""
Get the image name for the given node.
Parameters
----------
node_index : int
The index of the node.
Returns
-------
: str
The name of the image attached to the given node.
"""
return self.nodes[node_index]['data']['image_name']
def get_matches(self, clean_keys=[]):
matches = []
for s, d, e in self.edges_iter(data=True):
match, _ = e.clean(clean_keys=clean_keys)
match = match[['source_image', 'source_idx',
'destination_image', 'destination_idx']]
skps = e.get_keypoints('source', index=match.source_idx)
skps.columns = ['source_x', 'source_y']
dkps = e.get_keypoints('destination', index=match.destination_idx)
dkps.columns = ['destination_x', 'destination_y']
match = match.join(skps, on='source_idx')
match = match.join(dkps, on='destination_idx')
# TODO: This is a bandaid fix, join is creating an insane amount of duplicate points
match = match.drop_duplicates()
matches.append(match)
return matches
[docs] def add_node(self, n=None, **attr):
"""
Adds an image node to the graph.
Parameters
----------
image_name : str
The file name of the node
adjacency : str list
List of files names of adjacent images that correspond
to names in CandidateGraph.graph["node_name_map"]
basepath : str
The base path to the node image file
"""
image_name = attr.pop("image_name", None)
adj = attr.pop("adjacency", None)
new_node = None
# If image name is provided, build the node from the image before
# calling nx.add_node()
if image_name is not None:
if "basepath" in attr.keys():
image_path = os.path.join(attr.pop("basepath"), image_name)
else:
image_path = image_name
if not os.path.exists(image_path):
warnings.warn("Cannot find {}".format(image_path))
return
n = self.graph["node_counter"]
self.graph["node_counter"] += 1
new_node = Node(image_name=image_name,
image_path=image_path,
node_id=n)
self.graph["node_name_map"][new_node["image_name"]
] = new_node["node_id"]
attr["data"] = new_node
# Add the new node to the graph using networkx
super(CandidateGraph, self).add_node(n, **attr)
# Populate adjacency, if provided
if new_node is not None and adj is not None:
for adj_img in adj:
if adj_img not in self.graph["node_name_map"].keys():
warnings.warn("{} not found in the graph".format(adj_img))
continue
new_idx = new_node["node_id"]
adj_idx = self.graph["node_name_map"][adj_img]
self.add_edge(adj_img, new_node["image_name"])
[docs] def add_edge(self, u, v, **attr):
"""
Adds an edge with the given src and dst nodes to the graph
Parameters
----------
u : str
The filename of the source image for the edge
v : Node
The filename of the destination image for the edge
"""
if ("node_name_map" in self.graph.keys() and
u in self.graph["node_name_map"].keys() and
v in self.graph["node_name_map"].keys()):
# Grab node ids & create edge obj
s_id = self.graph["node_name_map"][u]
d_id = self.graph["node_name_map"][v]
new_edge = Edge(self.nodes[s_id]["data"], self.nodes[d_id]["data"])
# Prepare data for networkx
u = s_id
v = d_id
attr["data"] = new_edge
# Add the new edge to the graph using networkx
super(CandidateGraph, self).add_edge(u, v, **attr)
[docs] def extract_features(self, band=1, *args, **kwargs): # pragma: no cover
"""
Extracts features from each image in the graph and uses the result to assign the
node attributes for 'handle', 'image', 'keypoints', and 'descriptors'.
"""
for i, node in self.nodes.data('data'):
array = node.geodata.read_array(band=band)
node.extract_features(array, *args, **kwargs),
[docs] def extract_features_with_downsampling(self, downsample_amount=None, *args, **kwargs): # pragma: no cover
"""
Extract interest points from a downsampled array. The array is downsampled
by the downsample_amount keyword using the Lanconz downsample amount. If the
downsample keyword is not supplied, compute a downsampling constant as the
total array size divided by the network maxsize attribute.
Parameters
----------
downsample_amount : int
The amount of downsampling to apply to the image
"""
for node in self.nodes:
if downsample_amount == None:
total_size = node.geodata.raster_size[0] * \
node.geodata.raster_size[1]
downsample_amount = math.ceil(total_size / self.maxsize**2)
node.extract_features_with_downsampling(
downsample_amount, *args, **kwargs)
[docs] def extract_features_with_tiling(self, *args, **kwargs): #pragma: no cover
"""
"""
self.apply(Node.extract_features_with_tiling, args=args, **kwargs)
[docs] def save_features(self, out_path):
"""
Save the features (keypoints and descriptors) for the
specified nodes.
Parameters
----------
out_path : str
Location of the output file. If the file exists,
features are appended. Otherwise, the file is created.
"""
self.apply(Node.save_features, args=(out_path,), on='node')
[docs] def load_features(self, in_path, nodes=[], nfeatures=None, **kwargs):
"""
Load features (keypoints and descriptors) for the
specified nodes.
Parameters
----------
in_path : str
Location of the input file.
nodes : list
of nodes to load features for. If empty, load features
for all nodes
"""
self.apply(Node.load_features, args=(in_path, nfeatures), on='node', **kwargs)
for n in self.nodes:
if n['node_id'] not in nodes:
continue
else:
n.load_features(in_path, **kwargs)
[docs] def match(self, *args, **kwargs):
"""
For all connected edges in the graph, apply feature matching
See Also
----------
autocnet.graph.edge.Edge.match
"""
self.apply_func_to_edges('match', *args, **kwargs)
[docs] def decompose_and_match(self, *args, **kwargs):
"""
For all edges in the graph, apply coupled decomposition followed by
feature matching.
See Also
--------
autocnet.graph.edge.Edge.decompose_and_match
"""
self.apply_func_to_edges('decompose_and_match', *args, **kwargs)
[docs] def estimate_mbrs(self, *args, **kwargs):
"""
For each edge, estimate the overlap and compute a minimum bounding
rectangle (mbr) in pixel space.
See Also
--------
autocnet.graoh.edge.Edge.compute_mbr
"""
self.apply_func_to_edges('estimate_mbr', *args, **kwargs)
[docs] def compute_clusters(self, func=markov_cluster.mcl, *args, **kwargs):
"""
Apply some graph clustering algorithm to compute a subset of the global
graph.
Parameters
----------
func : object
The clustering function to be applied. Defaults to
Markov Clustering Algorithm
args : list
of arguments to be passed through to the func
kwargs : dict
of keyword arguments to be passed through to the func
"""
_, self.clusters = func(self, *args, **kwargs)
[docs] def compute_triangular_cycles(self):
"""
Find all cycles of length 3. This is similar
to cycle_basis (networkX), but returns all cycles.
As opposed to all basis cycles.
Returns
-------
cycles : list
A list of cycles in the form [(a,b,c), (c,d,e)],
where letters indicate node identifiers
Examples
--------
>>> g = CandidateGraph()
>>> g.add_edges_from([(0,1), (0,2), (1,2), (0,3), (1,3), (2,3)])
>>> sorted(g.compute_triangular_cycles())
[(0, 1, 2), (0, 1, 3), (0, 2, 3), (1, 2, 3)]
"""
cycles = []
for s, d in self.edges:
for n in self.nodes:
if(s, n) in self.edges and (d, n) in self.edges:
cycles.append(tuple(sorted([s, d, n])))
return set(cycles)
[docs] def minimum_spanning_tree(self):
"""
Calculates the minimum spanning tree of the graph
Returns
-------
: DataFrame
boolean mask for edges in the minimum spanning tree
"""
mst = nx.minimum_spanning_tree(self)
return self.create_edge_subgraph(mst.edges())
[docs] def apply_func_to_edges(self, function, nodes=[], *args, **kwargs):
"""
Iterates over edges using an optional mask and and applies the given function.
If func is not an attribute of Edge, raises AttributeError
Parameters
----------
function : obj
function to be called on every edge
graph_mask_keys : list
of keys in graph_masks
"""
return_lis = []
if callable(function):
function = function.__name__
for s, d, edge in self.edges.data('data'):
try:
func = getattr(edge, function)
except:
raise AttributeError(function, ' is not an attribute of Edge')
else:
ret = func(*args, **kwargs)
return_lis.append(ret)
if any(return_lis):
return return_lis
[docs] def apply(self, function, on='edge', out=None, args=(), **kwargs):
"""
Applys a function to every node or edge, returns collected return
values. If applying a functions to nodes, then all ignored nodes
will be skipped.
TODO: Merge with apply_func_to_edges?
Parameters
----------
function : callable
Function to apply to graph. Should accept (id, data).
on : string
Whether to use nodes or edges. default is 'edge'.
out : var
Optionally put the output in a variable rather than returning it
args : iterable
Some iterable of positional arguments for function.
kwargs : dict
keyword args to pass into function.
"""
options = {
'edge': self.edges_iter,
'edges': self.edges_iter,
'e': self.edges_iter,
0: self.edges_iter,
'node': self.nodes_iter,
'nodes': self.nodes_iter,
'n': self.nodes_iter,
1: self.nodes_iter
}
if not callable(function):
raise TypeError('{} is not callable.'.format(function))
res = []
obj = 1
# We just want to the object, not the indices, so slice appropriately
if options[on] == self.edges_iter:
obj = 2
for elem in options[on](data=True):
if getattr(elem[obj], 'ignore', False):
continue
res.append(function(elem[obj], *args, **kwargs))
if out:
out = res
else:
return res
[docs] def symmetry_checks(self):
'''
Apply a symmetry check to all edges in the graph
'''
self.apply_func_to_edges('symmetry_check')
[docs] def ratio_checks(self, *args, **kwargs):
'''
Apply a ratio check to all edges in the graph
See Also
--------
autocnet.matcher.cpu_outlier_detector.DistanceRatio.compute
'''
self.apply_func_to_edges('ratio_check', *args, **kwargs)
[docs] def compute_overlaps(self, *args, **kwargs):
'''
Computes overlap MBRs for all edges
'''
self.apply_func_to_edges('compute_overlap', *args, **kwargs)
[docs] def overlap_checks(self, *args, **kwargs):
'''
Apply overlap check to all edges in the graph
'''
self.apply_func_to_edges('overlap_check', *args, **kwargs)
[docs] def compute_homographies(self, *args, **kwargs):
'''
Compute homographies for all edges using identical parameters
See Also
--------
autocnet.graph.edge.Edge.compute_homography
autocnet.matcher.cpu_outlier_detector.compute_homography
'''
self.apply_func_to_edges('compute_homography', *args, **kwargs)
[docs] def compute_fundamental_matrices(self, *args, **kwargs):
'''
Compute fundmental matrices for all edges using identical parameters
See Also
--------
autocnet.matcher.cpu_outlier_detector.compute_fundamental_matrix
'''
self.apply_func_to_edges('compute_fundamental_matrix', *args, **kwargs)
[docs] def subpixel_register(self, *args, **kwargs):
'''
Compute subpixel offsets for all edges using identical parameters
See Also
--------
autocnet.graph.edge.Edge.subpixel_register
'''
self.apply_func_to_edges('subpixel_register', *args, **kwargs)
[docs] def suppress(self, *args, **kwargs):
'''
Apply a metric of point suppression to the graph
See Also
--------
autocnet.matcher.cpu_outlier_detector.SpatialSuppression
'''
self.apply_func_to_edges('suppress', *args, **kwargs)
[docs] def overlap(self):
'''
Compute the percentage and area coverage of two images
See Also
--------
autocnet.cg.cg.two_image_overlap
'''
self.apply_func_to_edges('overlap')
[docs] def to_filelist(self):
"""
Generate a file list for the entire graph.
Returns
-------
filelist : list
A list where each entry is a string containing the full path to an image in the graph.
"""
filelist = []
for i, node in self.nodes.data('data'):
filelist.append(node['image_path'])
return filelist
[docs] def island_nodes(self):
"""
Finds single nodes that are completely disconnected from the rest of the graph
Returns
-------
: list
A list of disconnected nodes, nodes of degree zero, island nodes, etc.
"""
return nx.isolates(self)
[docs] def connected_subgraphs(self):
"""
Finds and returns a list of each connected subgraph of nodes. Each subgraph is a set.
Returns
-------
: list
A list of connected sub-graphs of nodes, with the largest sub-graph first. Each subgraph is a set.
"""
return sorted(nx.connected_components(self), key=len, reverse=True)
[docs] def serials(self):
"""
Create a dictionary of ISIS3 compliant serial numbers for each
node in the graph.
Returns
-------
serials : dict
with key equal to the node id and value equal to
an ISIS3 compliant serial number or None
"""
serials = {}
for n, node in self.nodes.data('data'):
serials[n] = generate_serial_number(node['image_path'])
return serials
@property
def files(self):
"""
Return a list of all full file PATHs in the CandidateGraph
"""
return [node['image_path'] for _, node in self.nodes(data='data')]
[docs] def save(self, filename):
"""
Save the graph object to disk.
Parameters
----------
filename : str
The relative or absolute PATH where the network is saved
"""
io_network.save(self, filename)
[docs] def plot(self, ax=None, **kwargs): # pragma: no cover
"""
Plot the graph object
Parameters
----------
ax : object
A MatPlotLib axes object.
Returns
-------
: object
A MatPlotLib axes object
"""
return plot_graph(self, ax=ax, **kwargs)
[docs] def plot_cluster(self, ax=None, **kwargs): # pragma: no cover
"""
Plot the graph based on the clusters generated by
the markov clustering algorithm
Parameters
----------
ax : object
A MatPlotLib axes object.
Returns
-------
ax : object
A MatPlotLib axes object.
"""
return cluster_plot(self, ax, **kwargs)
[docs] def create_node_subgraph(self, nodes):
"""
Given a list of nodes, create a sub-graph and
copy both the node and edge attributes to the subgraph.
Changes to node/edge attributes are propagated back to the
parent graph, while changes to the graph structure, i.e.,
the topology, are not.
Parameters
----------
nodes : iterable
An iterable (list, set, ndarray) of nodes to subset
the graph
Returns
-------
H : object
A networkX graph object
"""
return self.subgraph(nodes)
[docs] def create_edge_subgraph(self, edges):
"""
Create a subgraph using a list of edges.
This is pulled directly from the networkx dev branch.
Parameters
----------
edges : list
A list of edges in the form [(a,b), (c,d)] to retain
in the subgraph
Returns
-------
H : object
A networkx subgraph object
"""
return self.edge_subgraph(edges)
[docs] def size(self, weight=None):
"""
This replaces the built-in size method to properly
support Python 3 rounding.
Parameters
----------
weight : string or None, optional (default=None)
The edge attribute that holds the numerical value used
as a weight. If None, then each edge has weight 1.
Returns
-------
nedges : int
The number of edges or sum of edge weights in the graph.
"""
if weight:
return sum(e[weight] for s, d, e in self.edges.data('data'))
else:
return len(self.edges())
[docs] def subgraph_from_matches(self):
"""
Returns a sub-graph where all edges have matches.
(i.e. images with no matches are removed)
Returns
-------
: Object
A networkX graph object
"""
# get all edges that have matches
matches = [(u, v) for u, v, edge in self.edges.data('data')
if not edge.matches.empty]
return self.create_edge_subgraph(matches)
[docs] def filter_nodes(self, func, *args, **kwargs):
"""
Filters graph and returns a sub-graph from matches. Mimics
python's filter() function
Parameters
----------
func : function which returns bool used to filter out nodes
Returns
-------
: Object
A networkX graph object
"""
nodes = [node for i, node in self.nodes.data(
'data') if func(node, *args, **kwargs)]
return self.create_node_subgraph(nodes)
[docs] def filter_edges(self, func, *args, **kwargs):
"""
Filters graph and returns a sub-graph from matches. Mimics
python's filter() function
Parameters
----------
func : function which returns bool used to filter out edges
Returns
-------
: Object
A networkX graph object
"""
edges_to_remove = [(u, v) for u, v, edge in self.edges.data(
'data') if func(edge, *args, **kwargs)]
subgraph = nx.create_empty_copy(self)
subgraph.add_edges_from(edges_to_remove)
return subgraph
[docs] def compute_cliques(self, node_id=None): # pragma: no cover
"""
Computes all maximum complete subgraphs for the given graph.
If a node_id is given, method will return only the complete subgraphs that
contain that node
Parameters
----------
node_id : int
Integer value for a given node
Returns
-------
: list
A list of lists of node ids that make up maximum complete subgraphs of the given graph
"""
if node_id is not None:
return list(nx.cliques_containing_node(self, nodes=node_id))
else:
return list(nx.find_cliques(self))
[docs] def compute_weight(self, clean_keys, **kwargs): # pragma: no cover
"""
Computes a voronoi weight for each edge in a given graph.
Can function as is, but is slightly optimized for complete subgraphs.
----------
kwargs : dict
keyword arguments that get passed to compute_voronoi
clean_keys : list
Strings used to apply masks to omit correspondences
"""
if not self.is_connected():
warnings.warn(
'The given graph is not complete and may yield garbage.')
for s, d, edge in self.edges.data('edge'):
source_node = edge.source
overlap, _ = self.compute_intersection(
source_node, clean_keys=clean_keys)
matches, _ = edge.clean(clean_keys)
kps = edge.get_keypoints(edge.source, index=matches['source_idx'])[
['x', 'y']]
reproj_geom = source_node.reproject_geom(
overlap.geometry.values[0].__geo_interface__['coordinates'][0])
initial_mask = cg.geom_mask(kps, reproj_geom)
if (len(kps[initial_mask]) <= 0):
continue
kps['geometry'] = kps.apply(
lambda x: shapely.geometry.Point(x['x'], x['y']), axis=1)
kps_mask = kps['geometry'][initial_mask].apply(
lambda x: reproj_geom.contains(x))
voronoi_df = cg.compute_voronoi(
kps[initial_mask][kps_mask], reproj_geom, **kwargs)
edge['weights']['voronoi'] = voronoi_df
[docs] def compute_unique_fully_connected_components(self, size=2):
"""
Compute a list of all cliques with size greater than size.
Parameters
----------
size : int
Only cliques larger than size are returned. Default 2.
Returns
-------
: list
of lists of node ids
Examples
--------
>>> G = CandidateGraph()
>>> G.add_edges_from([('A', 'B'), ('A', 'C'), ('B', 'C'), ('B', 'D'), ('A', 'E'), ('A', 'F'), ('E', 'F') ])
>>> res = G.compute_unique_fully_connected_components()
>>> sorted(map(sorted,res))
[['A', 'B', 'C'], ['A', 'E', 'F']]
"""
return [i for i in nx.enumerate_all_cliques(self) if len(i) > size]
[docs] def compute_fully_connected_components(self):
"""
For a given graph, compute all of the fully connected subgraphs with
3+ components.
Returns
-------
fc : list
of lists of node identifiers
Examples
--------
>>> G = CandidateGraph()
>>> G.add_edges_from([('A', 'B'), ('A', 'C'), ('B', 'C'), ('B', 'D'), ('A', 'E'), ('A', 'F'), ('E', 'F') ])
>>> fc = G.compute_fully_connected_components()
>>> len(fc) #A, B, C, E, A - D is omitted because it is a singular terminal node
5
>>> sorted(map(sorted,fc['A'])) # Sort inner and outer lists
[['A', 'B', 'C'], ['A', 'E', 'F']]
"""
fully_connected = self.compute_unique_fully_connected_components()
fc = defaultdict(list)
for i in fully_connected:
for j in i:
fc[j].append(tuple(i))
return fc
[docs] def compute_intersection(self, source, clean_keys=[]):
"""
Computes the intercetion of all images in a graph
based around a given source node
Parameters
----------
source: object or int
Either a networkx Node object or an integer
clean_keys : list
Strings used to apply masks to omit correspondences
Returns
-------
intersect_gdf : dataframe
A geopandas dataframe of intersections for all images
that overlap with the source node. Also includes the common
overlap for all images in the source node.
"""
if type(source) is int:
source = self.node[source]['data']
# May want to use a try except block here, but what error to raise?
source_poly = swkt.loads(
source.geodata.footprint.GetGeometryRef(0).ExportToWkt())
source_gdf = gpd.GeoDataFrame(
{'geometry': [source_poly], 'source_node': [source['node_id']]})
proj_gdf = gpd.GeoDataFrame(columns=['geometry', 'proj_node'])
proj_poly_list = []
proj_node_list = []
# Begin iterating through the edges in the graph that include the source node
for s, d, edge in self.edges.data('data'):
if s == source['node_id']:
proj_poly = swkt.loads(
edge.destination.geodata.footprint.GetGeometryRef(0).ExportToWkt())
proj_poly_list.append(proj_poly)
proj_node_list.append(d)
elif d == source['node_id']:
proj_poly = swkt.loads(
edge.source.geodata.footprint.GetGeometryRef(0).ExportToWkt())
proj_poly_list.append(proj_poly)
proj_node_list.append(s)
proj_gdf = gpd.GeoDataFrame(
{"geometry": proj_poly_list, "proj_node": proj_node_list})
# Overlay all geometry and find the one geometry element that overlaps all of the images
intersect_gdf = gpd.overlay(source_gdf, proj_gdf, how='intersection')
if len(intersect_gdf) == 0:
raise ValueError(
'Node ' + str(source['node_id']) + ' does not overlap with any other images in the candidate graph.')
overlaps_mask = intersect_gdf.geometry.apply(
lambda x: proj_gdf.geometry.contains(shapely.affinity.scale(x, .9, .9)).all())
overlaps_all = intersect_gdf[overlaps_mask]
# If there is no intersection polygon that overlaps all of the images, union all of the intersection
# polygons into one large polygon that does overlap all of the images
if len(overlaps_all) <= 0:
new_poly = shapely.ops.unary_union(intersect_gdf.geometry)
overlaps_all = gpd.GeoDataFrame({'source_node': source['node_id'], 'proj_node': source['node_id'],
'geometry': [new_poly]})
return overlaps_all, intersect_gdf
[docs] def is_complete(self):
"""
Checks if the graph is a complete graph
"""
nneighbors = len(self) - 1
for n in self.nodes:
if self.degree(n) != nneighbors:
return False
return True
def footprints(self):
geoms = []
names = []
for i, node in self.nodes.data('data'):
geoms.append(node.footprint)
names.append(node['image_name'])
return gpd.GeoDataFrame(names, geometry=geoms)
def identify_potential_overlaps(self, **kwargs):
cc = control.identify_potential_overlaps(
self, self.controlnetwork, **kwargs)
return cc
def nodes_iter(self, data=False):
for i, n in self.nodes.data('data'):
if data:
yield i, n
else:
yield i
def edges_iter(self, data=False):
for s, d, e in self.edges.data('data'):
if data:
yield s, d, e
else:
yield s, d
[docs] def generate_control_network(self, clean_keys=[], mask=None):
"""
Generates a fresh control network from edge matches.
parameters
----------
clean_keys : list
A list of clean keys, same that would be used to filter edges
mask
"""
def add_measure(lis, key, edge, match_idx, fields, point_id=None):
"""
Create a new measure that is coincident to a given point. This method does not
create the point if is missing. When a measure is added to the graph, an associated
row is added to the measures dataframe.
Parameters
----------
key : hashable
Some hashable id. In the case of an autocnet graph object the
id should be in the form (image_id, match_id)
point_id : hashable
The point to link the node to. This is most likely an integer, but
any hashable should work.
"""
if key in self.measure_to_point.keys():
return
if point_id == None:
point_id = self._point_id
self.measure_to_point[key] = point_id
# The node_id is a composite key (image_id, correspondence_id), so just grab the image
image_id = int(key[0])
match_id = int(key[1])
lis.append([point_id, image_id, match_id, edge, int(match_idx), *fields, 0, 0, np.inf])
self._measure_id += 1
# TODO: get rid of these wack variables
self.measure_to_point = {}
self._measure_id = 0
self.point_id = 0
matches = self.get_matches(clean_keys)
cnet_lis = []
for match in matches:
for row in match.to_records():
edge = (row.source_image, row.destination_image)
source_key = (row.source_image, row.destination_image, row.source_idx)
source_fields = [row.source_x, row.source_y]
destin_key = (row.destination_image, row.source_image, row.destination_idx)
destin_fields = [row.destination_x, row.destination_y]
if self.measure_to_point.get(source_key, None) is not None:
tempid = self.measure_to_point[source_key]
add_measure(cnet_lis, destin_key, edge, row[0],
destin_fields, point_id=tempid)
elif self.measure_to_point.get(destin_key, None) is not None:
tempid = self.measure_to_point[destin_key]
add_measure(cnet_lis, source_key, edge, row[0],
source_fields, point_id=tempid)
else:
add_measure(cnet_lis, source_key, edge, row[0], source_fields)
add_measure(cnet_lis, destin_key, edge, row[0], destin_fields)
self._point_id += 1
self.controlnetwork = pd.DataFrame(cnet_lis, columns=self.measures_keys)
self.controlnetwork.index.name = 'measure_id'
def remove_measure(self, idx):
self.controlnetwork = self.controlnetwork.drop(
self.controlnetwork.index[idx])
for r in idx:
self.measure_to_point.pop(r, None)
[docs] def validate_points(self):
"""
Ensure that all control points currently in the nework are valid.
Criteria for validity:
* Singularity: A control point can have one and only one measure from any image
Returns
-------
: pd.Series
"""
def func(g):
# One and only one measure constraint
if g.image_index.duplicated().any():
return True
else: return False
return self.controlnetwork.groupby('point_id').apply(func)
[docs] def clean_singles(self):
"""
Take the `controlnetwork` dataframe and return only those points with
at least two measures. This is automatically called before writing
as functions such as subpixel matching can result in orphaned measures.
"""
return self.controlnetwork.groupby('point_id').apply(lambda g: g if len(g) > 1 else None)
[docs] def to_isis(self, outname, flistpath=None, target="Mars"): # pragma: no cover
"""
Write the control network out to the ISIS3 control network format.
"""
df = self.controlnetwork
serials = [generate_serial_number(self.nodes[id_]["data"]["image_path"]) for id_ in df["image_index"]]
#create columns in the dataframe; zeros ensure plio (/protobuf) will
#ignore unless populated with alternate values
df['aprioriX'] = 0
df['aprioriY'] = 0
df['aprioriZ'] = 0
df['adjustedX'] = 0
df['adjustedY'] = 0
df['adjustedZ'] = 0
df['type'] = 3
df['measureType'] = 2
df["serialnumber"] = serials
#only populate the new columns for ground points. Otherwise, isis will
#recalculate the control point lat/lon from control measures which where
#"massaged" by the phase and template matcher.
for i, group in df.groupby('point_id'):
zero_group = group.iloc[0]
apriori_geom = np.array(point_info(self.nodes[zero_group.image_index]['data'].geodata.file_name, zero_group.x, zero_group.y, 'image')['BodyFixedCoordinate'].value) * 1000
for j, row in group.iterrows():
row['aprioriX'] = apriori_geom[0]
row['aprioriY'] = apriori_geom[1]
row['aprioriZ'] = apriori_geom[2]
df.iloc[row.name] = row
if flistpath is None:
flistpath = os.path.splitext(outname)[0] + '.lis'
df = df.rename(columns={'image_index':'image_id','point_id':'id', 'type' : 'pointType',
'x':'sample', 'y':'line'})
cnet.to_isis(df, outname, targetname=target)
cnet.write_filelist(self.files, path=flistpath)
[docs]class NetworkCandidateGraph(CandidateGraph):
node_factory = NetworkNode
edge_factory = NetworkEdge
def __init__(self, *args, **kwargs):
super(NetworkCandidateGraph, self).__init__(*args, **kwargs)
# Job metadata
self.job_status = defaultdict(dict)
# Set the parents of the nodes/edges and populate the database
# if unpopulated.
for i, d in self.nodes(data='data'):
d.parent = self
for s, d, e in self.edges(data='data'):
e.parent = self
self.apply_iterable_options = {
'edge' : self.edges,
'edges' : self.edges,
'e' : self.edges,
0 : self.edges,
'node' : self.nodes,
'nodes' : self.nodes,
'n' : self.nodes,
1 : self.nodes,
'measures' : Measures,
'measure' : Measures,
'm' : Measures,
2 : Measures,
'points' : Points,
'point' : Points,
'p' : Points,
3 : Points,
'overlaps': Overlay,
'overlap' : Overlay,
'o' :Overlay,
4: Overlay,
'image': Images,
'images': Images,
'i': Images,
5: Images,
'candidategroundpoints' : CandidateGroundPoints,
'candidategroundpoint' : CandidateGroundPoints,
6: CandidateGroundPoints
}
[docs] def config_from_file(self, filepath, async_watchers=False):
"""
A NetworkCandidateGraph uses a database. This method parses a config
file to set up the connection. Additionally, this loads planetary
information and settings for other operations the candidate graph
can perform.
Parameters
----------
filepath : str
The path to the config file
async_watchers : bool
If True the ncg will also spawn redis queue watching threads
that manage asynchronous database inserts. This is primarily
used for increased write performance.
"""
# The YAML library will raise any parse errors
self.config_from_dict(parse_config(filepath), async_watchers=async_watchers)
[docs] def config_from_dict(self, config_dict, async_watchers=False):
"""
A NetworkCandidateGraph uses a database. This method loads a config
dict to set up the connection. Additionally, this loads planetary
information and settings for other operations the candidate graph
can perform.
Parameters
----------
filepath : str
The path to the config file
async_watchers : bool
If True the ncg will also spawn redis queue watching threads
that manage asynchronous database inserts. This is primarily
used for increased write performance.
"""
self.config = config_dict
self.async_watchers = async_watchers
# Setup REDIS
self._setup_queues()
# Setup the database
self._setup_database()
# Setup threaded queue watchers
if self.async_watchers == True:
self._setup_asynchronous_workers()
# Setup the DEM
# I dislike having the DEM on the NCG, but in the short term it
# is the best solution I think. I don't want to pass the DEM around
# for the sensor calls.
self._setup_dem()
[docs] @contextmanager
def session_scope(self):
"""
Provide a transactional scope around a series of operations.
"""
session = self.Session()
try:
yield session
session.commit()
except:
session.rollback()
raise
finally:
session.close()
def _setup_dem(self):
spatial = self.config['spatial']
semi_major = spatial.get('semimajor_rad')
semi_minor = spatial.get('semiminor_rad')
dem_type = spatial.get('dem_type')
dem = spatial.get('dem', False)
if dem:
self.dem = GdalDem(dem, semi_major, semi_minor, dem_type)
else:
self.dem = EllipsoidDem(semi_major, semi_minor)
@property
def Session(self):
return self._Session
@Session.setter
def Session(self, Session):
self._Session = Session
def _setup_database(self):
db = self.config['database']
self.Session, self.engine = new_connection(self.config['database'])
# Attempt to create the database (if it does not exist)
try_db_creation(self.engine, self.config)
def _setup_edges(self):
with self.session_scope() as session:
res = session.query(Edges).all()
edges = []
for e in res:
s = e.source
d = e.destination
if s > d:
s,d = d,s
edges.append((s,d))
to_add = []
for e in self.edges:
s = e[0]
d = e[1]
if s > d:
s,d = d,s
edgeid = (s,d)
if edgeid not in edges:
to_add.append(Edges(source=edgeid[0],
destination=edgeid[1],
weights=json.dumps({})))
session.add_all(to_add)
session.commit()
def _setup_queues(self):
"""
Setup a 2 queue redis connection for pushing and pulling work/results
"""
conf = self.config['redis']
self.redis_queue = StrictRedis(host=conf['host'],
port=conf['port'],
db=0)
self.processing_queue = conf['basename'] + ':processing'
self.completed_queue = conf['basename'] + ':completed'
self.working_queue = conf['basename'] + ':working'
self.point_insert_queue = conf['basename'] + ':point_insert_queue'
self.point_insert_counter = conf['basename'] + ':point_insert_counter'
self.measure_update_queue = conf['basename'] + ':measure_update_queue'
self.measure_update_counter = conf['basename'] + ':measure_update_counter'
self.queue_names = [self.processing_queue, self.completed_queue, self.working_queue,
self.point_insert_queue, self.point_insert_counter,
self.measure_update_queue, self.measure_update_counter]
def _setup_asynchronous_workers(self):
# Default the counters to zero, unless they are already set from a run
# where the NCG did not exit cleanly
if self.redis_queue.get(self.point_insert_counter) is None:
self.redis_queue.set(self.point_insert_counter, 0)
if self.redis_queue.get(self.measure_update_counter) is None:
self.redis_queue.set(self.measure_update_counter, 0)
# Start the insert watching thread
self.point_inserter_stop_event = threading.Event()
self.point_inserter = threading.Thread(target=watch_insert_queue,
args=(self.redis_queue,
self.point_insert_queue,
self.point_insert_counter,
self.engine,
self.point_inserter_stop_event))
self.point_inserter.setDaemon(True)
self.point_inserter.start()
# Start the update watching thread
self.measure_updater_stop_event = threading.Event()
self.measure_updater = threading.Thread(target=watch_update_queue,
args=(self.redis_queue,
self.measure_update_queue,
self.measure_update_counter,
self.engine,
self.measure_updater_stop_event))
self.measure_updater.setDaemon(True)
self.measure_updater.start()
[docs] def clear_queues(self):
"""
Delete all messages from the redis queue. This a convenience method.
The `redis_queue` object is a redis-py StrictRedis object with API
documented at: https://redis-py.readthedocs.io/en/latest/#redis.StrictRedis
This also needs to restart any threaded watchers of the queues.
"""
if self.async_watchers:
self.point_inserter_stop_event.set()
self.measure_updater_stop_event.set()
for q in self.queue_names:
self.redis_queue.delete(q)
self._setup_queues()
if self.async_watchers:
self._setup_asynchronous_workers()
def _execute_sql(self, sql):
"""
Execute a raw SQL string in the database currently specified
by the AutoCNet config file.
Use this method with caution as you can easily do things like
truncate a table.
Parameters
----------
sql : str
The SQL string to be passed to the DB engine and executed.
"""
conn = self.engine.connect()
conn.execute(sql)
conn.close()
def _push_obj_messages(self, onobj, function, walltime, args, kwargs):
"""
Push messages to the redis queue for objects e.g., Nodes and Edges
"""
for job_counter, elem in enumerate(onobj.data('data')):
if getattr(elem[-1], 'ignore', False):
continue
# Determine if we are working with an edge or a node
if len(elem) > 2:
id = (elem[2].source['node_id'],
elem[2].destination['node_id'])
image_path = (elem[2].source['image_path'],
elem[2].destination['image_path'])
along = 'edge'
else:
id = (elem[0])
image_path = elem[1]['image_path']
along = 'node'
msg = {'id':id,
'along':along,
'func':function,
'args':args,
'kwargs':kwargs,
'walltime':walltime,
'image_path':image_path,
'param_step':1,
'config':self.config}
self.redis_queue.rpush(self.processing_queue, json.dumps(msg, cls=JsonEncoder))
return job_counter + 1
def _push_row_messages(self, query_obj, on, function, walltime, filters, query_string, args, kwargs):
"""
Push messages to the redis queue for DB objects e.g., Points, Measures
"""
if filters and query_string:
warnings.warn('Use of filters and query_string are mutually exclusive.')
with self.session_scope() as session:
# Support either an SQL query string, or a simple dict based query
if query_string:
res = session.execute(query_string).fetchall()
else:
query = session.query(query_obj)
# Now apply any filters that might be passed in.
for attr, value in filters.items():
query = query.filter(getattr(query_obj, attr)==value)
# Execute the query to get the rows to be processed
res = query.order_by(query_obj.id).all()
if len(res) == 0:
raise ValueError('Query returned zero results.')
for row in res:
msg = {'along':on,
'id':row.id,
'func':function,
'args':args,
'kwargs':kwargs,
'walltime':walltime}
msg['config'] = self.config # Hacky for now, just passs the whole config dict
self.redis_queue.rpush(self.processing_queue,
json.dumps(msg, cls=JsonEncoder))
assert len(res) == self.queue_length
return len(res)
def _push_iterable_message(self, iterable, function, walltime, args, kwargs):
for job_counter, item in enumerate(iterable):
msg = {'along':item,
'func':function,
'args':args,
'kwargs':kwargs,
'walltime':walltime}
msg['config'] = self.config
self.redis_queue.rpush(self.processing_queue,
json.dumps(msg, cls=JsonEncoder))
return job_counter + 1
[docs] def apply(self,
function,
on='edge',
args=(),
walltime='01:00:00',
chunksize=1000,
arraychunk=25,
filters={},
query_string='',
reapply=False,
log_dir=None,
queue=None,
redis_queue='processing_queue',
exclude=None,
**kwargs):
"""
A mirror of the apply function from the standard CandidateGraph object. This implementation
dispatches the job to the cluster as an independent operation instead of applying an arbitrary function
locally.
This methods returns the number of jobs submitted. The job status is then asynchronously
updated as the jobs complete.
Parameters
----------
function : string / obj
The function to apply. This can be either the full, importable path from
this library or an arbitrary function that will be serialized. If the arbitrary
function requires imports external to this library, those imports must be made
within the function scope.
on : str
{'edge', 'edges', 'e', 0} for an edge
{'node', 'nodes', 'n' 1} for a node
{'measures', 'measure', 'm', '2'} for measures
{'points', 'point', 'p', '3'} for points
args : tuple
Of additional arguments to pass to the apply function
walltime : str
in the format Hour:Minute:Second, 00:00:00
chunksize : int
The maximum number of jobs to submit per job array. Defaults to 1000.
This number may be have an actualy higher or lower limited based on
how the cluster has been configured.
arraychunk : int
The number of concurrent jobs to run per job array. e.g. chunksize=100 and
arraychunk=25 gives the job array 1-100%25
filters : dict
Of simple filters to apply on database rows where the key is the attribute and
the value used to check equivalency (e.g., attribute == value).
This is usable only when applying to measures, points, or overlays.
Filters can not be used with a query_string. Filters are included as a convenience
and are really only usable for simple equivalency checks.
query_string : str
A SQL query to be applied to the iterable.
This is usable only when applying to measures, points, or overlays.
The query_string can not be used with a filter and is appropriate for
any queries.
reapply : bool
Flag indicating whether you want to resubmit jobs that are still on the queue
after an initial apply due to an slurm launching errors.
log_dir: str
absolute path of directory used to store the jobs logs, defaults to location
indicated in the configuration file.
kwargs : dict
Of keyword arguments passed to the function being applied
queue : str
The cluster processing queue to submit jobs to. If None (default),
use the cluster processing queue from the config file.
redis_queue : str
The redis queue to push messages to that are then pulled by the
cluster job this call launches. Options are: 'processing_queue' (default)
or 'working_queue'
Returns
-------
job_str : str
The string job that is submitted to the job scheduler
Examples
--------
Apply a function to the overlay table omitting those overlay rows that already have
points within them and have an area less than a given threshold.
>>> query_string = 'SELECT overlay.id FROM overlay LEFT JOIN\
points ON ST_INTERSECTS(overlay.geom, points.geom) WHERE\
points.id IS NULL AND ST_AREA(overlay.geom) >= 0.0001;'
>>> njobs = ncg.apply('spatial.overlap.place_points_in_overlap', on='overlaps', query_string=query_string)
Apply a function to the overlay table and pass keyword arguments (kwargs) to the function.
>>> def ns(x):
from math import ceil
return ceil(round(x,1)*8)
>>> def ew(x):
from math import ceil
return ceil(round(x,1)*2)
>>> distribute_points_kwargs = {'nspts_func':ns, 'ewpts_func':ew, 'method':'classic'}
>>> njobs = ncg.apply('spatial.overlap.place_points_in_overlap',\
on='overlaps', distribute_points_kwargs=distribute_points_kwargs)
"""
if log_dir is None:
log_dir=self.config['cluster']['cluster_log_dir']
job_counter = self.queue_length
# TODO: reapply uses the queue name and reapplies on that queue.
if not reapply:
# Determine which obj will be called
if isinstance(on, str):
onobj = self.apply_iterable_options[on]
elif isinstance(on, (list, np.ndarray)):
onobj = on
# This method support arbitrary functions. The name needs to be a string for the log name.
if not isinstance(function, (str, bytes)):
function_name = function.__name__
else:
function_name = function
# Dispatch to either the database object message generator or the autocnet object message generator
if isinstance(onobj, DeclarativeMeta):
job_counter = self._push_row_messages(onobj, on, function, walltime, filters, query_string, args, kwargs)
elif isinstance(onobj, (list, np.ndarray)):
job_counter = self._push_iterable_message(onobj, function, walltime, args, kwargs)
elif isinstance(onobj, (nx.classes.reportviews.EdgeView, nx.classes.reportviews.NodeView)):
job_counter = self._push_obj_messages(onobj, function, walltime, args, kwargs)
else:
raise TypeError('The type of the `on` argument is not understood. Must be a database model, iterable, Node or Edge.')
# Submit the jobs
rconf = self.config['redis']
rhost = rconf['host']
rport = rconf['port']
try:
processing_queue = getattr(self, redis_queue)
except AttributeError:
print(f'Unable to find attribute {redis_queue} on this object. Valid queue names are: "processing_queue" and "working_queue".')
env = self.config['env']
condaenv = env['conda']
isisroot = env['ISISROOT']
isisdata = env['ISISDATA']
isissetup = f'export ISISROOT={isisroot} && export ISISDATA={isisdata}'
condasetup = f'conda activate {condaenv}'
job = f'acn_submit -r={rhost} -p={rport} {processing_queue} {self.working_queue}'
command = f'{condasetup} && {isissetup} && {job}'
if queue == None:
queue = self.config['cluster']['queue']
submitter = Slurm(command,
job_name='AutoCNet',
mem_per_cpu=self.config['cluster']['processing_memory'],
time=walltime,
partition=queue,
output=log_dir+f'/autocnet.{function}-%j')
job_str = submitter.submit(array='1-{}%{}'.format(job_counter,arraychunk),
chunksize=chunksize,
exclude=exclude)
return job_str
[docs] def generic_callback(self, msg):
"""
This method manages the responses from the jobs and updates
the status on this object. The msg is in a standard, parseable
format.
"""
id = msg['id']
if isinstance(id, (int, float, str)):
# Working with a node
obj = self.nodes[id]['data']
else:
obj = self.edges[id]['data']
# Working with an edge
func = msg['func']
obj.job_status[func]['success'] = msg['success']
# If the job was successful, no need to resubmit
if msg['success'] == True:
return
[docs] def to_isis(self,
path,
flistpath=None,
latsigma=10,
lonsigma=10,
radsigma=15,
**db_kwargs):
"""
Write a NetworkCandidateGraph to an ISIS control network
Parameters
----------
path : str
Outpath to write the control network
flishpath : str
Outpath to write the associated file list. If None (default),
the file list is written alongside the control network
latsigma : int/float
The estimated sigma (error) in the latitude direction
lonsigma : int/float
The estimated sigma (error) in the longitude direction
radsigma : int/float
The estimated sigma (error) in the radius direction
radius : int/float
The body semimajor radius
db_kwargs : dict
Kwargs that are passed to the io.db.controlnetwork.db_to_df function
Returns
-------
df : pd.DataFrame
The pandas dataframe that is passed to plio to generate the control network.
"""
# Read the cnet from the db
df = io_controlnetwork.db_to_df(self.engine, **db_kwargs)
# Add the covariance matrices to ground measures
df = control.compute_covariance(df,
latsigma,
lonsigma,
radsigma,
self.config['spatial']['semimajor_rad'])
print("df shape: ", df.shape)
if flistpath is None:
flistpath = os.path.splitext(path)[0] + '.lis'
target = self.config['spatial'].get('target', None)
ids = df['imageid'].unique()
fpaths = [self.nodes[i]['data']['image_path'] for i in ids]
for f in self.files:
if f not in fpaths:
warnings.warn(f'{f} in candidate graph but not in output network.')
# Remap the df columns back to ISIS
df.rename(columns={'pointtype':'pointType',
'measuretype':'measureType'},
inplace=True)
cnet.to_isis(df, path, targetname=target)
cnet.write_filelist(fpaths, path=flistpath)
# Even though this method writes, having a non-None return
# let's a user work with the data that is passed to plio
return df
[docs] def update_from_jigsaw(self, path, pointid_func=lambda x: int(x.split('_')[-1])):
"""
Updates the measures table in the database with data from
a jigsaw bundle adjust
Parameters
----------
path : str
Full path to a bundle adjusted isis control network
pointid_func : callable
A function that is used to convert from the id in the ISIS network
back into the pointid that autocnet uses as the primary key. The
default takes a string, splits it on underscores and takes the final element(s).
For example, autocnet_14 becomes 14.
"""
isis_network = cnet.from_isis(path)
io_controlnetwork.update_from_jigsaw(isis_network,
ncg.measures,
ncg.connection,
pointid_func=pointid_func)
[docs] @classmethod
def from_filelist(cls, filelist, config, clear_db=False):
"""
Parse a filelist to add nodes to the database. Using the
information in the database, then instantiate a complete,
NCG.
Parameters
----------
filelist : list, str
If a list, this is a list of paths. If a str, this is
a path to a file containing a list of image paths
that is newline ("\\n") delimited.
config : dict, str
configuration information; either a path to a yaml
file or a dictionary.
clear_db : boolean
truncates all tables in the active database.
Returns
-------
ncg : object
A network candidate graph object
See Also:
--------
config_from_dict: config documentation
"""
obj = cls()
if isinstance(config, str):
config = parse_config(config)
obj.config_from_dict(config)
if clear_db:
obj.clear_db()
obj.add_from_filelist(filelist, clear_db=clear_db)
return obj
[docs] def add_from_filelist(self, filelist, clear_db=False):
"""
Parse a filelist to add nodes to the database.
Parameters
----------
filelist : list, str
If a list, this is a list of paths. If a str, this is
a path to a file containing a list of image paths
that is newline ("\\n") delimited.
clear_db : boolean
truncates all tables in the active database.
"""
if isinstance(filelist, list):
pass
elif os.path.exists(filelist):
filelist = io_utils.file_to_list(filelist)
else:
warnings.warn('Unable to parse the passed filelist')
if clear_db:
self.clear_db()
total=len(filelist)
for cnt, f in enumerate(filelist):
# Create the nodes in the graph. Really, this is creating the
# images in the DB
print('loading {} of {}'.format(cnt+1, total))
self.add_image(f)
self.from_database()
# Execute the computation to compute overlapping geometries
self._execute_sql(compute_overlaps_sql)
[docs] def add_image(self, img_path):
"""
Upload a single image to NetworkCandidateGraph associated DB.
Parameters
----------
img_path : str
absolute path to image
Returns
-------
node.id : int
The id of the newly added node.
"""
image_name = os.path.basename(img_path)
node = NetworkNode(image_path=img_path, image_name=image_name)
node.parent = self
node.populate_db()
return node['node_id']
[docs] def copy_images(self, newdir):
"""
Copy images from a given directory into a new directory and
update the 'path' column in the Images table.
Parameters
----------
newdir : str
The full output PATH where the images are to be copied to.
"""
if not os.path.exists(newdir):
os.makedirs(newdir)
with self.session_scope() as session:
images = session.query(Images).all()
for obj in images:
oldpath = obj.path
filename = os.path.basename(oldpath)
obj.path = os.path.join(newdir, filename)
if oldpath != obj.path:
# Copy the files
copyfile(oldpath, obj.path)
session.commit()
else:
continue
[docs] def add_from_remote_database(self, source_db_config, path, query_string='SELECT * FROM public.images LIMIT 10'):
"""
This is a constructor that takes an existing database containing images and sensors,
copies the selected rows into the project specified in the autocnet_config variable,
and instantiates a new NetworkCandidateGraph object. This method is
similar to the `from_database` method. The main difference is that this
method assumes that the image and sensor rows are prepopulated in an external db
and simply copies those entires into the currently speficied project.
Currently, this method does NOT check for duplicate serial numbers during the
bulk add. Therefore multiple runs of this method on the same database will fail.
Parameters
----------
source_db_config : dict
In the form: {'username':'somename',
'password':'somepassword',
'host':'somehost',
'pgbouncer_port':6543,
'name':'somename'}
path : str
The PATH to which images in the database specified in the config
will be copied to. This method duplicates the data and copies it
to a user defined PATH to avoid issues with updating image ephemeris
across projects.
query_string : str
An optional string to select a subset of the images in the
database specified in the config.
Example
-------
>>> ncg = NetworkCandidateGraph()
>>> ncg.config_from_dict(new_config)
>>> source_db_config = {'username':'jay',
'password':'abcde',
'host':'autocnet.wr.usgs.gov',
'pgbouncer_port':5432,
'name':'mars'}
>>> geom = 'LINESTRING(145 10, 145 10.25, 145.25 10.25, 145.25 10, 145 10)'
>>> srid = 949900
>>> outpath = '/scratch/jlaura/fromdb'
>>> query = f"SELECT * FROM ctx WHERE ST_INTERSECTS(geom, ST_Polygon(ST_GeomFromText('{geom}'), {srid})) = TRUE"
>>> ncg.add_from_remote_database(source_db_config, outpath, query_string=query)
"""
sourceSession, _ = new_connection(source_db_config)
sourcesession = sourceSession()
sourceimages = sourcesession.execute(query_string).fetchall()
# Change for SQLAlchemy >= 1.4, results are now row objects
sourceimages = [sourceimage._asdict() for sourceimage in sourceimages]
with self.session_scope() as destinationsession:
destinationsession.execute(Images.__table__.insert(), sourceimages)
# Get the camera objects to manually join. Keeps the caller from
# having to remember to bring cameras as well.
#ids = [i[0] for i in sourceimages]
#cameras = sourcesession.query(Cameras).filter(Cameras.image_id.in_(ids)).all()
#for c in cameras:
# destinationsession.merge(c)
sourcesession.close()
# Create the graph, copy the images, and compute the overlaps
self.copy_images(path)
self.from_database()
self._execute_sql(compute_overlaps_sql)
[docs] def from_database(self, query_string='SELECT * FROM public.images'):
"""
This is a constructor that takes the results from an arbitrary query string,
uses those as a subquery into a standard polygon overlap query and
returns a NetworkCandidateGraph object. By default, an images
in the Image table will be used in the outer query.
Parameters
----------
query_string : str
A valid SQL select statement that targets the Images table
Usage
-----
Here, we provide usage examples for a few, potentially common use cases.
## Spatial Query
This example selects those images that intersect a given bounding polygon. The polygon is
specified as a Well Known Text LINESTRING with the first and last points being the same.
The query says, select the geom (the bounding polygons in the database) that
intersect the user provided polygon (the LINESTRING) in the given spatial reference system
(SRID), 949900.
SELECT * FROM Images WHERE ST_INTERSECTS(geom, ST_Polygon(ST_GeomFromText('LINESTRING(159 10, 159 11, 160 11, 160 10, 159 10)'),949900)) = TRUE
## Select from a specific orbit
This example selects those images that are from a particular orbit. In this case,
the regex string pulls all P##_* orbits and creates a graph from them. This method
does not guarantee that the graph is fully connected.
SELECT * FROM Images WHERE (split_part(path, '/', 6) ~ 'P[0-9]+_.+') = True
"""
composite_query = '''WITH i as ({}) SELECT i1.id
as i1_id,i1.path as i1_path, i2.id as i2_id, i2.path as i2_path
FROM i as i1, i as i2
WHERE ST_INTERSECTS(i1.geom, i2.geom) = TRUE
AND i1.id < i2.id'''.format(query_string)
with self.session_scope() as session:
res = session.execute(composite_query)
adjacency = defaultdict(list)
adjacency_lookup = {}
for r in res:
sid, spath, did, dpath = r
adjacency_lookup[spath] = sid
adjacency_lookup[dpath] = did
if spath != dpath:
adjacency[spath].append(dpath)
# Add nodes that do not overlap any images
self.__init__(adjacency, node_id_map=adjacency_lookup)
# Setup the edges
self._setup_edges()
[docs] def clear_db(self, tables=None):
"""
Truncate all of the database tables and reset any
autoincrement columns to start with 1.
Parameters
----------
table : str or list of str, optional
the table name of a list of table names to truncate
"""
with self.session_scope() as session:
if tables:
if isinstance(tables, str):
tables = [tables]
else:
tables = self.engine.table_names()
for t in tables:
if t != 'spatial_ref_sys':
try:
session.execute(f'TRUNCATE TABLE {t} CASCADE')
except Exception as e:
raise RuntimeError(f'Failed to truncate table {t}, {t} not modified').with_traceback(e.__traceback__)
try:
session.execute(f'ALTER SEQUENCE {t}_id_seq RESTART WITH 1')
except Exception as e:
warnings.warn(f'Failed to reset primary id sequence for table {t}')
def place_points_from_cnet(self, cnet):
semi_major, semi_minor = self.config["spatial"]["semimajor_rad"], self.config["spatial"]["semiminor_rad"]
if isinstance(cnet, str):
cnet = from_isis(cnet)
cnetpoints = cnet.groupby('id')
session = self.Session()
for id, cnetpoint in cnetpoints:
def get_measures(row):
res = session.query(Images).filter(Images.serial == row.serialnumber).one()
return Measures(pointid=id,
imageid=int(res.id), # Need to grab this
measuretype=int(row.measureType),
serial=row.serialnumber,
sample=float(row['sample']),
line=float(row['line']),
sampler=float(row.sampleResidual),
liner=float(row.lineResidual),
ignore=row.measureIgnore,
jigreject=row.measureJigsawRejected,
aprioriline=float(row.aprioriline),
apriorisample=float(row.apriorisample),
linesigma=float(row.linesigma),
samplesigma=float(row.samplesigma))
measures = cnetpoint.apply(get_measures, axis=1)
row = cnetpoint.iloc[0]
x,y,z= row.adjustedX, row.adjustedY, row.adjustedZ
lon_og, lat_og, alt = reproject([x, y, z], semi_major, semi_minor, 'geocent', 'latlon')
lon, lat = og2oc(lon_og, lat_og, semi_major, semi_minor)
point = Points(identifier=id,
ignore=row.pointIgnore,
apriori= shapely.geometry.Point(float(row.aprioriX), float(row.aprioriY), float(row.aprioriZ)),
adjusted= shapely.geometry.Point(float(row.adjustedX),float(row.adjustedY),float(row.adjustedZ)),
pointtype=float(row.pointType))
point.measures = list(measures)
session.add(point)
session.commit()
session.close()
[docs] @classmethod
def from_cnet(cls, cnet, filelist, config):
"""
Instantiates and populates a NetworkCandidateGraph from an
ISIS control network and corresponding cube list.
Parameters
----------
cnet: str
path to control network file from which you want to populate
the NetworkCandidateGraph.
filelist: str
path to file containing list of cubes associated with
the control network file.
config : dict, str
configuration information; either a path to a yaml
file or a dictionary.
Returns
-------
obj: NetworkCandidateGraph
The NetworkCandidateGraph populated with the points and measures
from the control network and the images from the filelist.
See Also:
--------
config_from_dict: config documentation
"""
obj = cls.from_filelist(filelist, config)
obj.place_points_from_cnet(cnet)
return obj
@property
def measures(self):
df = pd.read_sql_table('measures', con=self.engine)
return df
@property
def queue_length(self):
"""
Returns the length of the processing queue.
Jobs are left on the queue if a cluster job is cancelled. Those cancelled
jobs are then called on next cluster job launch, causing failures. This
method provides a check for left over jobs.
"""
llen = self.redis_queue.llen(self.processing_queue)
return llen
@property
def union(self):
"""
The boundary formed by unioning (or merging) all of the input footprints. The result
will likely be a multipolygon, likely with holes where data were not collected.
Returns
"""
if not hasattr(self, '_union'):
with self.session_scope() as session:
self._union = Images.union(session)
return self._union
[docs] def overlays(self, size_threshold=0):
"""
Return the overlays in a database
Parameters
----------
size_threshold: float
Minimum area requirment for returned overlaps. Units are
determined by spatial reference system.
Returns
-------
overlays: list of Overlay objects
Model information associated with overlaps that contain one or more valid points
See Also
--------
autocnet.io.db.model.Overlay: for description of information associated with Overlay class
"""
with self.session_scope() as session:
q = session.query(Overlay).filter(func.ST_Area(Overlay.geom)>=size_threshold)
overlays = q.all()
session.expunge_all()
return overlays
[docs] def empty_overlays(self, filters={'ignore': False}, size_threshold=0):
"""
Find overlaps that do not contain valid points. By default, valid points
include not ignored points, but additional point properties can be used to
further define a valid point. For example, to look at not ignored, free
(not ground) points; filters = {'ignored': False, 'pointtype': 2}.
Parameters
----------
filters: dict
Points object properties for point filtering.
size_threshold: float
Minimum area requirment for returned overlaps. Units are
determined by spatial reference system.
Returns
-------
overlays: list of Overlay objects
Model information associated with overlaps that contain no valid points
See Also
--------
autocnet.io.db.model.Overlay: for description of information associated with Overlay class
autocnet.io.db.model.Points: for description of information associated with Points class
"""
with self.session_scope() as session:
# Find overlap ids that contain one or more valid points
sq = session.query(Overlay.id).join(Points, func.ST_Contains(Overlay.geom, Points.geom))
for attr, value in filters.items():
sq = sq.filter(getattr(Points, attr)==value)
sq = sq.group_by(Overlay.id)
# find overlap information not satisfying previous query
q = session.query(Overlay).filter(Overlay.id.notin_(sq)).filter(func.ST_Area(Overlay.geom)>=size_threshold)
overlays = q.all()
session.expunge_all()
return overlays
[docs] def overlay_connection(self, oid):
"""
Evaluate the connection status of an overlay. An overlap can be empty (no points),
fully connected (all images are connected by points), or partially connected. The
first two status return empty lists while partially connected overlaps will return
a list of image pairs that are missing point connections.
Parameters
----------
overlay: int
Database id of overlay of interest.
Returns
-------
missing_edges: list of tuples
tuples correspond to image ids that comprise an overlap
but are not connected by a point.
"""
graph = nx.Graph()
with self.session_scope() as session:
# create graph nodes
overlap = session.query(Overlay).filter(Overlay.id==oid).first()
ointersections = overlap.intersections
for ii in ointersections:
graph.add_node(ii)
# find measures in relevant overlap and images
geom = geoalchemy2.shape.from_shape(overlap.geom, srid=self.config['spatial']['latitudinal_srid'])
q = session.query(Measures).join(Points, Measures.pointid==Points.id).\
filter(func.ST_Contains(geom, Points.geom)).\
filter(Measures.imageid.in_(ointersections))
df = pd.read_sql(q.statement, session.bind)
# TO DO: RETURN ALL EDGES
if len(df) == 0:
print(f'Overlap {oid} is empty')
return []
# create graph edges
for pid, g in df.groupby('pointid'):
edge_pool = np.sort([row['imageid'] for i, row in g.iterrows()])
graph.add_edges_from(list(combinations(edge_pool, 2)))
# evaluate connectivity of overlap
fully_connected_number_of_edges = scipy.special.comb(graph.number_of_nodes(),2)
all_edges = list(combinations(graph.nodes, 2))
if graph.number_of_edges() == fully_connected_number_of_edges:
print(f'Overlap {oid} is fully connected')
return []
# return missing image id pairs
return [e for e in all_edges if e not in graph.edges]
def cluster_propagate_control_network(self,
base_cnet,
walltime='00:20:00',
chunksize=1000,
exclude=None):
warnings.warn('This function is not well tested. No tests currently exists \
in the test suite for this version of the function.')
# Setup the redis queue
rqueue = StrictRedis(host=config['redis']['host'],
port=config['redis']['port'],
db=0)
# Push the job messages onto the queue
queuename = config['redis']['processing_queue']
groups = base_cnet.groupby('pointid').groups
for cpoint, indices in groups.items():
measures = base_cnet.loc[indices]
measure = measures.iloc[0]
p = measure.point
# get image in the destination that overlap
lon, lat = measures["point"].iloc[0].xy
msg = {'lon' : lon[0],
'lat' : lat[0],
'pointid' : cpoint,
'paths' : measures['path'].tolist(),
'lines' : measures['line'].tolist(),
'samples' : measures['sample'].tolist(),
'walltime' : walltime}
rqueue.rpush(queuename, json.dumps(msg, cls=JsonEncoder))
# Submit the jobs
submitter = Slurm('acn_propagate',
job_name='cross_instrument_matcher',
mem_per_cpu=config['cluster']['processing_memory'],
time=walltime,
partition=config['cluster']['queue'],
output=config['cluster']['cluster_log_dir']+'/autocnet.cim-%j')
job_counter = len(groups.items())
submitter.submit(array='1-{}'.format(job_counter))
return job_counter
[docs] def distribute_ground_density(self, threshold=4, distribute_points_kwargs={}):
"""
Distribute candidate ground points into overlaps with a number of images greater than or equal
to the threshold. This function returns a list of 2d nd-arrays where the first element is the
longitude and the second element is the latitude.
Parameters
----------
distirbute_points_kwargs : dict
Of arguments that are passed on the the
distribute_points_in_geom argument in autocnet.cg.cg
threshold : int
Overlaps intersecting threshold images or greater have points placed.
Default 4.
Returns
-------
valid : np.ndarray
n, 2 array in the form lon, lat
Examples
--------
Usage for `distribute_ground_density` is identical to usage for `distribute_ground_uniform`.
See Also
--------
autocnet.graph.network.NetworkCandidateGraph.distribute_ground_uniform
"""
valid = []
with self.session_scope() as session:
res = session.query(Overlay).filter(func.array_length(Overlay.intersections, 1) >= threshold).all()
for r in res:
coords = cg.distribute_points_in_geom(r.geom, **distribute_points_kwargs)
if len(coords) > 0:
valid.append(coords)
valid = np.vstack(valid)
return valid
def subpixel_register_points(self, **kwargs):
subpixel.subpixel_register_points(self.Session, **kwargs)
def subpixel_register_point(self, pointid, **kwargs):
subpixel.subpixel_register_point(self.Session, pointid, **kwargs)
def subpixel_regiter_mearure(self, measureid, **kwargs):
subpixel.subpixel_register_measure(self.Session, measureid, **kwargs)
def propagate_control_network(self, control_net, **kwargs):
cim.propagate_control_network(self.Session,
self.config,
self.dem,
control_net)
def generate_ground_points(self, ground_mosaic, **kwargs):
cim.generate_ground_points(self.Session, ground_mosaic, **kwargs)
def place_points_in_overlaps(self, nodes, **kwargs):
overlap.place_points_in_overlaps(self.Session,
self.config,
self.dem,
nodes,
**kwargs)