Source code for autocnet.io.network

from io import BytesIO
import json
import os
import warnings
from zipfile import ZipFile

from networkx.readwrite import json_graph
import numpy as np
import pandas as pd

import autocnet


class NumpyEncoder(json.JSONEncoder):
    """
    JSON encoder that understands numpy arrays and common numpy scalars.
    """

    def default(self, obj):
        """
        Convert numpy objects into JSON serializable builtins.

        An ndarray is converted into a dict holding the data as a (nested)
        list under ``__ndarray__``, the dtype name under ``dtype`` and the
        shape under ``shape``.  The data are NOT base64 encoded; they are
        stored as plain JSON lists produced by ``ndarray.tolist()``.

        numpy integer, boolean and floating point scalars are converted
        to the equivalent Python ``int``, ``bool`` and ``float``.

        Parameters
        ----------
        obj : object
              The object the base encoder could not serialize

        Returns
        -------
         : object
           A JSON serializable representation of ``obj``

        Raises
        ------
        TypeError
            Raised by the base class if ``obj`` is not a supported type
        """
        if isinstance(obj, np.ndarray):
            return dict(__ndarray__=obj.tolist(),
                        dtype=str(obj.dtype),
                        shape=obj.shape)
        if isinstance(obj, (np.integer, np.bool_)):
            return obj.item()
        if isinstance(obj, np.floating):
            # float() rather than item(): guarantees a builtin float for
            # every floating point width.
            return float(obj)
        # Let the base class default method raise the TypeError
        return super().default(obj)
def _collect_arrays(obj):
    """
    Gather the ndarray and DataFrame attributes found in ``obj.__dict__``.

    DataFrames additionally contribute ``<name>_idx`` and
    ``<name>_columns`` entries so the frame can be rebuilt on load.
    """
    arrays = {}
    for k, v in obj.__dict__.items():
        if isinstance(v, np.ndarray):
            arrays[k] = v
        elif isinstance(v, pd.DataFrame):
            arrays[k] = v
            arrays[k + '_idx'] = v.index
            arrays[k + '_columns'] = v.columns
    return arrays


def _write_npz(pzip, name, arrays):
    """
    Serialize ``arrays`` with ``np.savez`` in memory and store the result
    in the open archive ``pzip`` under the member name ``name``.
    """
    buf = BytesIO()
    np.savez(buf, **arrays)
    pzip.writestr(name, buf.getvalue())


def save(network, projectname):
    """
    Save an AutoCNet candidate graph to disk as a single zip archive.

    The graph adjacency structure is stored as human readable JSON
    (``graph.json``) and all potentially large numpy arrays are stored
    as binary ``.npz`` members: ``<node_id>.npz`` for each node and
    ``<source>_<destination>.npz`` (with source <= destination) for each
    edge.  The project archive is a standard .zip file that can have any
    ending, e.g., <projectname>.project, <projectname>.zip,
    <projectname>.myname.

    The ``.npz`` members are built in memory, so no intermediary files
    are written to the current working directory.

    Parameters
    ----------
    network : object
              The AutoCNet Candidate Graph object

    projectname : str
                  The PATH to the output file.
    """
    # Convert the graph into json format
    js = json_graph.node_link_data(network)

    with ZipFile(projectname, 'w') as pzip:
        js_str = json.dumps(js, cls=NumpyEncoder, sort_keys=True, indent=4)
        pzip.writestr('graph.json', js_str)

        # Write the array node attributes
        for _, data in network.nodes.data('data'):
            _write_npz(pzip,
                       '{}.npz'.format(data['node_id']),
                       _collect_arrays(data))

        # Write the array edge attributes
        missing = object()
        for s, d, data in network.edges.data('data'):
            # Edge files are always named low_high regardless of direction
            if s > d:
                s, d = d, s
            ndarrays_to_write = _collect_arrays(data)

            # Handle DataFrames that are properties
            for k in ['_matches', '_masks', '_costs']:
                frame = getattr(data, k, missing)
                if frame is missing:
                    # Attribute absent: store empty placeholders so the
                    # three keys always exist in the archive member.
                    empty = pd.DataFrame()
                    ndarrays_to_write[k] = np.array([])
                    ndarrays_to_write['{}_idx'.format(k)] = empty.index
                    ndarrays_to_write['{}_columns'.format(k)] = empty.columns
                else:
                    ndarrays_to_write[k] = frame
                    ndarrays_to_write['{}_idx'.format(k)] = frame.index
                    ndarrays_to_write['{}_columns'.format(k)] = frame.columns

            _write_npz(pzip, '{}_{}.npz'.format(s, d), ndarrays_to_write)
def json_numpy_obj_hook(dct):
    """
    Decode a numpy ndarray previously encoded as a dict with the keys
    ``__ndarray__`` (nested list of values), ``dtype`` and ``shape``.

    The array is rebuilt from the list with the stored dtype.  The list
    must not be reinterpreted as a raw byte buffer: doing so only works
    when the dtype numpy infers for the list happens to match the stored
    dtype, and corrupts or rejects every other dtype (e.g. float32).

    Parameters
    ----------
    dct : dict
          A JSON object decoded by the json module

    Returns
    -------
     : ndarray or object
       The decoded ndarray if ``dct`` was an encoded ndarray, otherwise
       ``dct`` unchanged
    """
    if isinstance(dct, dict) and '__ndarray__' in dct:
        data = np.asarray(dct['__ndarray__'], dtype=dct['dtype'])
        return data.reshape(dct['shape'])
    return dct
def load(projectname):
    """
    Loads an autocnet project.

    The archive is expected to contain ``graph.json`` plus optional
    ``<id>.npz`` members for nodes and ``<source>_<destination>.npz``
    members for edges.  Missing members or arrays are skipped silently;
    any other failure while restoring arrays is reported as a warning and
    loading continues.

    Parameters
    ----------
    projectname : str
                  PATH to the file.

    Returns
    -------
    cg : object
         The reconstructed CandidateGraph
    """
    with ZipFile(projectname, 'r') as pzip:
        # Read the graph object
        with pzip.open('graph.json', 'r') as g:
            data = json.loads(g.read().decode(),
                              object_hook=json_numpy_obj_hook)

        cg = autocnet.graph.network.CandidateGraph()
        Edge = autocnet.graph.edge.Edge
        Node = autocnet.graph.node.Node

        # Reload the graph attributes
        cg.graph = data['graph']

        # Handle nodes
        for d in data['nodes']:
            # Backwards compatible with nx 1.x proj files (64_apollo in examples)
            if 'data' in d.keys():
                d = d['data']
            n = Node()
            for k, v in d.items():
                if k == 'id':
                    continue
                n[k] = v

            try:
                # Load the byte stream for the nested npz file into memory
                # once and then unpack it for both consumers.
                raw = pzip.read('{}.npz'.format(d['id']))
                n.load_features(BytesIO(raw))
                # NOTE(review): np.load is called with its default
                # allow_pickle setting, so object-dtype arrays in the
                # archive are not restored; that case now surfaces as a
                # warning below instead of being silently dropped.
                with np.load(BytesIO(raw)) as nzf:
                    n.masks = pd.DataFrame(nzf['masks'],
                                           index=nzf['masks_idx'],
                                           columns=nzf['masks_columns'])
            except KeyError:
                pass  # The node does not have features to load.
            except Exception as err:
                warnings.warn('Unable to load array data for node {}: {}'.format(
                    d.get('id'), err))

            cg.add_node(d['node_id'], data=n)

        for e in data['links']:
            s = e['source']
            d = e['target']
            # Edge files are always named low_high regardless of direction
            if s > d:
                s, d = d, s
            source = cg.nodes[s]['data']
            destination = cg.nodes[d]['data']
            edge = Edge(source, destination)

            # Backwards compatible with nx 1.x proj files (64_apollo in examples)
            if 'data' in e.keys():
                di = e['data']
            else:
                di = e

            # Read the data and populate edge attrs
            for k, v in di.items():
                if k == 'target' or k == 'source':
                    continue
                edge[k] = v

            try:
                with np.load(BytesIO(pzip.read('{}_{}.npz'.format(s, d)))) as nzf:
                    for j in ['_matches', '_masks', '_costs']:
                        setattr(edge, j, pd.DataFrame(nzf[j],
                                                      index=nzf['{}_idx'.format(j)],
                                                      columns=nzf['{}_columns'.format(j)]))
            except KeyError:
                pass  # The edge does not have stored arrays to load.
            except Exception as err:
                warnings.warn('Unable to load array data for edge ({}, {}): {}'.format(
                    s, d, err))

            # Add a mock edge
            cg.add_edge(s, d, data=edge)

        # NOTE(review): this is a bare attribute access, not a call. If
        # _order_adjacency is a method it is never invoked here - confirm
        # against CandidateGraph before changing.
        cg._order_adjacency

    return cg