# Source code for autocnet.matcher.cpu_matcher
import warnings
import numpy as np
import pandas as pd
import cv2
# Value passed as the ``algorithm`` entry of the FLANN index parameters;
# the name indicates that it selects the KD-tree index.
FLANN_INDEX_KDTREE = 1

# Index parameters used by FlannMatcher when the caller supplies none.
DEFAULT_FLANN_PARAMETERS = {
    'algorithm': FLANN_INDEX_KDTREE,
    'trees': 3,
}
def match(edge, k=2, **kwargs):
    """
    Given two sets of descriptors, utilize a FLANN (Approximate Nearest
    Neighbor KDTree) matcher to find the k nearest matches.  Nearness is
    the euclidean distance between descriptors.

    Matching is run in both directions (source -> destination and
    destination -> source).  The combined matches are stored on
    ``edge.matches``, joined with the x/y keypoint coordinates of both
    nodes and ordered by ascending descriptor distance (the row labels
    produced by the joins are retained).

    Parameters
    ----------
    edge : object
           An edge exposing ``source`` and ``destination`` nodes and a
           ``matches`` dataframe attribute.  Each node must provide
           ``descriptors``, ``keypoints`` (with ``x`` and ``y`` columns)
           and support ``node['node_id']``.

    k : int
        The number of neighbors to find

    kwargs : dict
             ``aidx`` and ``bidx`` are optional iterables used to subset
             the source and destination descriptors respectively.  Any
             other keyword is ignored.
    """

    def _add_matches(matches):
        """
        Given a dataframe of matches, either append to an existing
        matches edge attribute or initially populate said attribute.

        Parameters
        ----------
        matches : dataframe
                  A dataframe of matches
        """
        if edge.matches.empty:
            edge.matches = matches
        else:
            # DataFrame.append was removed in pandas 2.0; pd.concat is the
            # equivalent call and accepts the same two options.
            edge.matches = pd.concat([edge.matches, matches],
                                     ignore_index=True,
                                     verify_integrity=True)

    def mono_matches(a, b, aidx=None, bidx=None):
        """
        Apply the FLANN match_features in a single direction: ``a`` is
        used to build the search tree and ``b`` is queried against it.

        Parameters
        ----------
        a : object
            A node object

        b : object
            A node object

        aidx : iterable
                An index for the descriptors to subset

        bidx : iterable
                An index for the descriptors to subset
        """
        # Subset if requested
        ad = a.descriptors if aidx is None else a.descriptors[aidx]
        bd = b.descriptors if bidx is None else b.descriptors[bidx]

        # Load, train, and match
        fl.add(ad, a['node_id'], index=aidx)
        fl.train()
        matches = fl.query(bd, b['node_id'], k, index=bidx)
        _add_matches(matches)
        # Reset the matcher so the next direction starts from an empty tree
        fl.clear()

    fl = FlannMatcher()

    # Get the correct descriptors
    aidx = kwargs.pop('aidx', None)
    bidx = kwargs.pop('bidx', None)

    mono_matches(edge.source, edge.destination, aidx=aidx, bidx=bidx)
    # Swap the indices since mono_matches is generic and source/destin are
    # swapped
    mono_matches(edge.destination, edge.source, aidx=bidx, bidx=aidx)

    # Attach the keypoint coordinates.  rename() returns a new frame, which
    # avoids mutating a slice of the node's keypoints in place.
    source_keypoints = edge.source.keypoints[['x', 'y']].rename(
        columns={'x': 'source_x', 'y': 'source_y'})
    edge.matches = edge.matches.join(source_keypoints, on='source_idx')

    destination_keypoints = edge.destination.keypoints[['x', 'y']].rename(
        columns={'x': 'destination_x', 'y': 'destination_y'})
    edge.matches = edge.matches.join(destination_keypoints,
                                     on='destination_idx')

    # sort_values returns a new frame; the result has to be assigned back
    # for the ordering to take effect.
    edge.matches = edge.matches.sort_values(by=['distance'])
class FlannMatcher(object):
    """
    A wrapper to the OpenCV Flann based matcher class that adds
    metadata tracking attributes and methods.  This takes arbitrary
    descriptors and so should be available for use with any
    descriptor data stored as an ndarray.

    Attributes
    ----------
    nid_lookup : dict
                 with key equal to the order in which a descriptor set was
                 added (compared against the ``imgIdx`` of each match)
                 and value equal to the node id passed to ``add``

    search_idx : dict
                 with key equal to the row position of a descriptor in the
                 most recently added descriptor set (compared against the
                 ``trainIdx`` of each match) and value equal to the
                 corresponding caller supplied observation index

    node_counter : int
                   The number of descriptor sets added since construction
                   or the last call to ``clear``
    """

    def __init__(self, flann_parameters=None):
        """
        Parameters
        ----------
        flann_parameters : dict
                           Index parameters handed to
                           ``cv2.FlannBasedMatcher``.  When omitted a copy
                           of ``DEFAULT_FLANN_PARAMETERS`` is used.
        """
        if flann_parameters is None:
            # Copy so the module level defaults are never shared between
            # matcher instances.
            flann_parameters = dict(DEFAULT_FLANN_PARAMETERS)
        self._flann_matcher = cv2.FlannBasedMatcher(flann_parameters, {})
        self.nid_lookup = {}
        self.search_idx = {}
        self.node_counter = 0

    def add(self, descriptor, nid, index=None):
        """
        Add a set of descriptors to the matcher and record the node id
        they belong to in the ``nid_lookup`` attribute.

        Note that ``search_idx`` is replaced on every call, so it only
        describes the descriptor set that was added last.

        Parameters
        ----------
        descriptor : ndarray
                     The descriptor to be added

        nid : int
              The node ids

        index : iterable
                Optional observation indices, one per descriptor row.
                When omitted, row positions are used as the indices.
        """
        self._flann_matcher.add([descriptor])
        self.nid_lookup[self.node_counter] = nid
        self.node_counter += 1
        if index is not None:
            self.search_idx = dict(enumerate(index))
        else:
            self.search_idx = {i: i for i in range(len(descriptor))}

    def clear(self):
        """
        Remove all nodes from the tree and resets
        all counters
        """
        self._flann_matcher.clear()
        self.nid_lookup = {}
        self.node_counter = 0
        self.search_idx = {}

    def train(self):
        """
        Using the descriptors, generate the KDTree
        """
        self._flann_matcher.train()

    def query(self, descriptor, query_image, k=3, index=None):
        """
        Find the k nearest stored descriptors for every query descriptor.

        Each match is written so that the smaller node id is always the
        source and the larger one the destination.  A match whose stored
        node id equals ``query_image`` is not recorded; a warning is
        issued instead.

        Parameters
        ----------
        descriptor : ndarray
                     The query descriptor to search for

        query_image : hashable
                      Key of the query image.  It is compared with ``<``
                      and ``>`` against the stored node ids and is cast to
                      float32 in the result, so it has to be numeric.

        k : int
            The number of nearest neighbors to search for

        index : iterable
                An iterable of observation indices to utilize for
                the input descriptors

        Returns
        -------
        matched : dataframe
                  containing matched points with columns containing:
                  source image, source index, destination image,
                  destination index, and descriptor distance.  Every
                  column is cast to float32.
        """
        matches = self._flann_matcher.knnMatch(descriptor, k=k)
        matched = []
        for i, neighbors in enumerate(matches):
            for hit in neighbors:
                # Translate the query row into the caller's index if given
                qid = hit.queryIdx if index is None else index[i]
                source = query_image
                destination = self.nid_lookup[hit.imgIdx]
                if source < destination:
                    matched.append((query_image,
                                    qid,
                                    destination,
                                    self.search_idx[hit.trainIdx],
                                    hit.distance))
                elif source > destination:
                    # Flip the pair so the smaller id is the source
                    matched.append((destination,
                                    self.search_idx[hit.trainIdx],
                                    query_image,
                                    qid,
                                    hit.distance))
                else:
                    warnings.warn('Likely self neighbor in query!')
        columns = ['source_image', 'source_idx',
                   'destination_image', 'destination_idx',
                   'distance']
        return pd.DataFrame(matched, columns=columns).astype(np.float32)