Source code for generation.read_block

# -*- coding: utf-8 -*- 
# @Time : 2022/8/10 13:48 
# @Author : lepold
# @File : read_block.py

import re
import os
import numpy as np
import torch


[docs]def connect_for_block(block_dir, dense=True, bases=None, return_src=False): """ Read npz file , and convert it into node attribute and adjacency matrix id dense is True, else convert to generator. Parameters ---------- block_dir: str the directory of these block npz files. dense: bool whether convert to dense case. bases: optional, None the bases of neuronal number. return_src: bool `old version, may be eliminated` Returns ------- """ block_name = re.compile('block_[0-9]*.npz') block_length = len([name for name in os.listdir(block_dir) if block_name.fullmatch(name)]) if bases is None: bases = [0] for i in range(block_length): pkl_path = os.path.join(block_dir, "block_{}.npz".format(i)) file = np.load(pkl_path) bases.append(bases[-1] + file["property"].shape[0]) bases = np.array(bases, dtype=np.int64) if dense: weights = [] indices = [] sizes = [] properties = [] src = [] for i in range(block_length): pkl_path = os.path.join(block_dir, "block_{}.npz".format(i)) file = np.load(pkl_path) property = file["property"] properties.append(property) if 'src_timestamp' in file or 'src_neuron_idx' in file: assert 'src_timestamp' in file and 'src_neuron_idx' in file weight = np.array([], dtype=np.float32) idx = np.array([[], [], []], dtype=np.uint32) size = [4, file["property"].shape[0], 0] _src_timestamp = file["src_timestamp"] _src_neuron_idx = file['src_neuron_idx'] _src = np.zeros([property.shape[0], np.max(_src_timestamp) + 1], dtype=np.uint8) _src[_src_neuron_idx, _src_timestamp] = 1 src.append(_src) else: output_neuron_idx, input_block_idx, input_neuron_idx, input_channel_offset, weight = \ tuple(file[name] for name in ["output_neuron_idx", "input_block_idx", "input_neuron_idx", "input_channel_offset", "weight"]) idx = np.stack([input_channel_offset.astype(np.uint32), output_neuron_idx.astype(np.uint32), (bases[(input_block_idx + i)] + input_neuron_idx).astype(np.uint32)]) weight = weight.reshape([-1]) idx = np.stack([idx, idx], axis=-1) idx[0, :, 1] = idx[0, :, 0] + 1 idx = idx.reshape([3, -1]) if idx.shape[1] > 0: size = [4, np.max(idx[1]) + 1, np.max(idx[2]) + 1] else: size = None if size is not None: indices.append(idx) weights.append(weight) sizes.append(size) # print(sizes) size = tuple(np.max(np.array(sizes), axis=0)[1:].tolist()) property = torch.cat([torch.from_numpy(property) for property in properties]) weight = torch.cat([torch.sparse.FloatTensor( torch.from_numpy(indices[i].astype(np.int64)), torch.from_numpy(weights[i]), torch.Size([4, bases[i + 1] - bases[i], bases[-1]])) for i in range(block_length)], dim=1) assert property.shape[0] == weight.shape[1] assert weight.shape[2] == weight.shape[1] print(property.shape, weight.shape) if return_src: return property, weight, torch.from_numpy(np.concatenate(src).astype(np.bool)) if len(src) > 0 else None else: return property, weight else: def conn(i, s, e): pkl_path = os.path.join(block_dir, "block_{}.npz".format(i)) file = np.load(pkl_path) if 'src' not in file: output_neuron_idx, input_block_idx, input_neuron_idx, input_channel_offset, weight = \ tuple(file[name] for name in ["output_neuron_idx", "input_block_idx", "input_neuron_idx", "input_channel_offset", "weight"]) selection_idx = np.logical_and(output_neuron_idx >= s, output_neuron_idx < e).nonzero()[0] output_neuron_idx = np.take(output_neuron_idx, selection_idx, axis=0) input_block_idx = np.take(input_block_idx, selection_idx, axis=0) input_channel_offset = np.take(input_channel_offset, selection_idx, axis=0) input_neuron_idx = np.take(input_neuron_idx, selection_idx, axis=0) weight = np.take(weight, selection_idx, axis=0) output_neuron_idx = output_neuron_idx.astype(np.uint32) input_neuron_idx = input_neuron_idx.astype(np.uint32) output_neuron_idx += bases[i].astype(output_neuron_idx.dtype) input_neuron_idx += bases[i + input_block_idx].astype(input_neuron_idx.dtype) yield output_neuron_idx, input_neuron_idx, input_channel_offset, weight def prop(i, s, e): pkl_path = os.path.join(block_dir, "block_{}.npz".format(i)) file = np.load(pkl_path) property = file["property"] assert 0 <= s <= e <= property.shape[0] return property[s:e].copy() return prop, conn, bases