Source code for generation.make_block

# -*- coding: utf-8 -*- 
# @Time : 2022/8/9 16:23 
# @Author : lepold
# @File : make_block.py

"""
The WBNN model presents the computational basis of the Digital twin brain(WBM) and
is composed of two components: the basic computing unites and the network structure.

Generating logic can be basically broken down into the following two steps:

1) provide constructing information of population (minimum specific unit)

    a. Weighted directed graph of connections between groups

    b. The average degree of neurons in each population

    c. The size scale of each population

    d. Parameters of neuron model

2) Construct connections for each neuron based on the above information
"""

import os
import time
from multiprocessing.pool import Pool as pool
from multiprocessing.pool import ThreadPool as Thpool

import numpy as np
import torch
from numba import jit
from generation.read_block import connect_for_block
import pickle
import sparse


def record_runtime(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"Function {func.__name__} took {end_time - start_time:.5f} seconds to execute.")
        return result

    return wrapper


# @record_runtime
@jit(nogil=True, nopython=True)
def get_k_idx(max_k, num, except_idx):  # , destination, orign
    """
    Fast implementation of random sampling with Numba.

    Parameters
    ----------
    max_k: int
        allowed range to sample.
    num: int
        the number of samples required.
    except_idx: int
        whether consider self idx.

    Returns
    -------
    sample idx: ndarray

    """
    if except_idx < 0:
        assert num <= max_k  # return np.random.choice(max_k, num, replace=True)
        # if num > max_k:
        #     print("destination: %s ; source: %s, num: %s, max_K: %s"%(destination, orign, num, max_k))
        #     raise ValueError
        if num == max_k:
            return np.arange(0, max_k)
    elif except_idx is not None:
        assert num < max_k
        if num == max_k - 1:
            return np.concatenate((np.arange(0, except_idx), np.arange(except_idx + 1, max_k)))

    j = 2
    while True:
        k_idx = np.unique(np.random.randint(0, max_k, num * j))
        k_idx = k_idx[np.random.permutation(k_idx.shape[0])]
        if except_idx is not None:
            k_idx = k_idx[k_idx != except_idx]
        k_idx = k_idx[:num]
        if k_idx.shape[0] == num:
            break
        j += 1
    return k_idx


@record_runtime
# @jit(nogil=True, nopython=True)
def random_sample(max_k, num, except_idx):
    if except_idx < 0:
        assert num <= max_k
        if num == max_k:
            return np.arange(0, max_k)
    else:
        assert num < max_k
        if num == max_k - 1:
            return np.delete(np.arange(max_k), except_idx)

    sample = np.arange(num, dtype=np.int64)
    sample[sample == except_idx] = max_k - 1
    if except_idx < 0:
        step_range = np.arange(num, max_k - 1)
    elif except_idx >= num:
        step_range = np.delete(np.arange(num, max_k - 1), except_idx - num)
    for i in step_range:
        j = np.random.randint(0, i + 1)
        if j < num:
            sample[j] = i
    return sample


# @jit(nogil=True, nopython=True)
def examine(input_block_idx, extern_input_size):
    unique_input_block_idx = np.unique(input_block_idx)
    index_all = np.ones_like(input_block_idx, dtype=np.bool_)
    for i in unique_input_block_idx:
        index = np.where(input_block_idx == i)[0]
        if np.shape(index)[0] >= extern_input_size[i]:
            index = index[extern_input_size[i] - 1:]  # assert num<max_k in except idx is not -1.
            index_all[index] = np.False_
    out = input_block_idx[index_all]
    return out


[docs]def connect_for_multi_sparse_block(population_connect_prob, population_node_init_kwards=None, degree=int(1e3), prefix=None, multi_conn2single_conn=False, dtype="single"): """ Main api to generate connection table and save in npz file, if given information contain connection portability of populations, the average degree of neurons in each population, and The size scale of each population. Parameters ---------- population_connect_prob: Tensor or sparse.COO the connection probability of populations. population_node_init_kwards: dict or list the information includes size information of each population. if dict, it's information of one population and will broadcast to each population. if list, it mush be [dict, dict, ..] and contains each population information. degree: ndarray or int specified degree of each population. prefix: str or None specifies the way of generation, 1)writing to prefix 2) return a closure. Returns ------- """ if isinstance(population_connect_prob, torch.Tensor): population_connect_prob = population_connect_prob.numpy() assert len(population_connect_prob.shape) == 2 and \ population_connect_prob.shape[0] == population_connect_prob.shape[1] # population_connect_prob should be a [N, N] tensor N = population_connect_prob.shape[0] population_node_init_kwards = {} if population_node_init_kwards is None else population_node_init_kwards if isinstance(population_node_init_kwards, dict): extern_input_k_sizes = [population_node_init_kwards["size"]] * N elif isinstance(population_node_init_kwards, list): extern_input_k_sizes = [b["size"] for b in population_node_init_kwards] else: raise ValueError # print('total {} populations'.format(N)) if prefix is None: def _out(): if isinstance(population_node_init_kwards, dict): number = [population_node_init_kwards['size']] * N sub_block_idx = None elif isinstance(population_node_init_kwards, list): number = [b['size'] for b in population_node_init_kwards] if 'sub_block_idx' in population_node_init_kwards[0]: sub_block_idx = np.array([population_node_init_kwards[i]["sub_block_idx"] for i in range(len(population_node_init_kwards))], dtype=np.int64) else: sub_block_idx = None else: raise ValueError bases = np.add.accumulate(np.array(number, dtype=np.int64)) bases = np.concatenate([np.array([0], dtype=np.int64), bases]) def prop(i, s, e): block_node_init_kward = population_node_init_kwards[i] if isinstance(population_node_init_kwards, list) else population_node_init_kwards if 'sub_block_idx' not in block_node_init_kward: return generate_block_node_property(sub_block_idx=i, s=s, e=e, **block_node_init_kward) else: return generate_block_node_property(s=s, e=e, **block_node_init_kward) def conn(i, s, e): prob = population_connect_prob[i, :] step = int(1e6) for _s in range(s, e, step): _e = min(_s + step, e) output_neuron_idx, input_block_idx, input_neuron_idx, input_neuron_offset, weight = \ connect_for_single_sparse_block(i, bases[i + 1] - bases[i], prob, s=_s, e=_e, extern_input_k_sizes=extern_input_k_sizes, degree=degree if not isinstance(degree, np.ndarray) else degree[ i], multi_conn2single_conn=multi_conn2single_conn, dtype=dtype, sub_block_idx=sub_block_idx) output_neuron_idx = output_neuron_idx.astype(np.int64) input_neuron_idx = input_neuron_idx.astype(np.int64) output_neuron_idx += bases[i].astype(output_neuron_idx.dtype) input_neuron_idx += bases[i + input_block_idx].astype(input_neuron_idx.dtype) # assert len(input_neuron_idx) == len(input_block_idx) == len(output_neuron_idx) yield output_neuron_idx, input_neuron_idx, input_neuron_offset, weight return prop, conn, bases return _out else: os.makedirs(prefix, exist_ok=True) mpi = False if mpi: from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.Get_rank() size = comm.Get_size() chunk = int(N // 100) # brain_region version: 1 for i in range(rank, 100, size): s, e = chunk * i, chunk * (i + 1) if i < 99 else N with pool(initializer=_init, initargs=(population_connect_prob, population_node_init_kwards, extern_input_k_sizes, degree, prefix, multi_conn2single_conn, dtype )) as p: p.map(_process_dti, range(s, e, 1), chunksize=1) else: with pool(initializer=_init, initargs=(population_connect_prob, population_node_init_kwards, extern_input_k_sizes, degree, prefix, multi_conn2single_conn, dtype)) as p: p.map(_process_dti, range(0, N, 1), chunksize=1) print('total done!') return
[docs]def connect_for_single_sparse_block(population_idx, k, extern_input_rate, extern_input_k_sizes, degree=int(1e3), s=0, e=-1, multi_conn2single_conn=False, dtype="single", sub_block_idx=None): """ For each population, we implement the detailed construction of connection table. Parameters ---------- population_idx: int the idx of processing population. k: int the number of neurons in this population. extern_input_rate: ndarray or sprse.COO the connection probability from others to itself. extern_input_k_sizes: ndarray for those populations that need to connect here, the total number of neurons that can be sampled, that is their maximum total number of neurons. degree: int number of in-degree for this population. s: int start idx of neurons in this population. e: int end idx of neurons in this population. Returns ------- Notes ------- In the processing, we need to ensure that each source population can meet the sampling requirements of the target populaiton. If it is not met, we pop up the error and interrupt the processing. However, although the above requirements are met in terms of probability, a few samples may fail. In this case, we use some tricks to slightly adjust the degree requirements of this population. ------- """ if e == -1: e = k assert 0 <= s <= e <= k _extern_input_k_sizes = np.array(extern_input_k_sizes, dtype=np.int64) if s < e: if isinstance(extern_input_rate, np.ndarray): extern_input_rate = np.add.accumulate(extern_input_rate) extern_input_idx = None else: extern_input_idx = extern_input_rate.coords[0, :] # ensure the degree requirement of this target population is reasonable if multi_conn2single_conn: degree_max = (_extern_input_k_sizes[extern_input_idx] / extern_input_rate.data).astype( np.int64) degree_max = np.min(degree_max) if degree_max < degree: print(f"{degree_max} ?< {degree}, {extern_input_idx}") extern_input_rate = np.add.accumulate(extern_input_rate.data) assert np.abs(1 - extern_input_rate[-1]) < 1e-4, f"{s}-{e}, {np.abs(1 - extern_input_rate[-1])}" extern_input_rate = extern_input_rate[:-1] # @jit(nogil=True, nopython=True) def _run(i): r = np.random.rand(degree) input_block_idx = np.searchsorted(extern_input_rate, r, 'right').astype(np.int16) if extern_input_idx is not None: input_block_idx = extern_input_idx[input_block_idx] if multi_conn2single_conn: input_block_idx = examine(input_block_idx, _extern_input_k_sizes) input_channel_offset = np.zeros_like(input_block_idx, dtype=np.uint8) output_neuron_idx = np.ones_like(input_block_idx, dtype=np.uint32) * i if sub_block_idx is None: input_channel_offset[input_block_idx % 2 == 0] = 0 input_channel_offset[input_block_idx % 2 == 1] = 2 else: conn_EI = sub_block_idx[input_block_idx] input_channel_offset[conn_EI % 2 == 0] = 0 input_channel_offset[conn_EI % 2 == 1] = 2 if dtype == "single": weight = np.random.rand(input_block_idx.shape[0], 2).astype(np.float32) elif dtype == "uint8": weight = (np.random.randint(0, 256, dtype=np.uint8, size=(input_block_idx.shape[0], 2))) # trick process # if population_idx < 195700: # index = np.where(np.abs(population_idx - input_block_idx) > 10)[0] # weight[index, :] = weight[index, :] * 1.5 else: raise NotImplementedError input_neuron_idx = np.zeros_like(input_block_idx, dtype=np.uint32) for _idx in np.unique(input_block_idx): extern_incomming_idx = (input_block_idx == _idx).nonzero()[0] if _idx != population_idx: extern_outcomming_idx = get_k_idx(_extern_input_k_sizes[_idx], extern_incomming_idx.shape[0], -1) # population_idx, _idx else: extern_outcomming_idx = get_k_idx(_extern_input_k_sizes[_idx], extern_incomming_idx.shape[0], i) # population_idx, _idx input_neuron_idx[extern_incomming_idx] = extern_outcomming_idx input_block_idx -= population_idx return input_block_idx, input_neuron_idx, input_channel_offset, output_neuron_idx, weight # time1 = time.time() with Thpool() as p: input_block_idx, input_neuron_idx, input_channel_offset, output_neuron_idx, weight = tuple( zip(*p.map(_run, range(s, e)))) # time2 = time.time() # print("done", e - s, time2 - time1) input_block_idx = np.concatenate(input_block_idx) input_neuron_idx = np.concatenate(input_neuron_idx) input_channel_offset = np.concatenate(input_channel_offset) output_neuron_idx = np.concatenate(output_neuron_idx) weight = np.concatenate(weight, axis=0) else: input_block_idx = np.zeros([0], dtype=np.int16) input_neuron_idx = np.zeros([0], dtype=np.uint32) input_channel_offset = np.zeros([0], dtype=np.uint8) output_neuron_idx = np.zeros([0], dtype=np.uint32) weight = np.random.rand(e - s, degree, 2).astype(np.float32) return output_neuron_idx, input_block_idx, input_neuron_idx, input_channel_offset, weight
[docs]def generate_block_node_property(size=1000, noise_rate=0.01, I_extern_Input=0, sub_block_idx=0, C=1, T_ref=5, g_Li=0.03, V_L=-75, V_th=-50, V_reset=-65, g_ui=(5 / 275, 5 / 4000, 3 / 30, 3 / 730), V_ui=(0, 0, -70, -100), tao_ui=(2, 40, 10, 50), s=0, e=-1): """ Generate neuronal property for each population. Parameters ---------- noise_rate: float different neuron have a background noise, its output spike is calculated as spike | noise. I_extern_Input: float external current to each neuron sub_block_idx: float population idx C: float capacitance T_ref: float refractory time g_Li: float conductance of leaky channel V_L: float leaky potential V_th: float threshold potential V_reset: float reset potential g_ui: tuple[float, ] conductance of 4 synaptic channels V_ui: tuple[float, ] reverse potential of 4 synaptic channels tao_ui: tuple[float, ] timescale of exponential synaptic filter. s: int start index e: int end indext Returns ------- property: ndarray property of LIF neurons, shape=(e-s, 23) Notes ------- each node contain such property:: noise_rate, blocked_in_stat, I_extern_Input, sub_block_idx, C, T_ref, g_Li, V_L, V_th, V_reset, g_ui, V_ui, tao_ui size: 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 4 4, 4 dtype: f, b, f, i, f, f, f, f, f, f, f, f, f b means bool(although storage as float), f means float. this function support broadcast, e.g, C can be a scalar for a total block or a [E_number, I_number] tensor for total nodes. """ if e == -1: e = size assert 0 <= s <= e <= size property = np.zeros([e - s, 22], dtype=np.float32) property[:, 0] = noise_rate property[:, 1] = 0 property[:, 2] = I_extern_Input property[:, 3] = sub_block_idx property[:, 4] = C property[:, 5] = T_ref property[:, 6] = g_Li property[:, 7] = V_L property[:, 8] = V_th property[:, 9] = V_reset g_ui = g_ui if isinstance(g_ui, np.ndarray) else np.array(g_ui) property[:, 10:14] = g_ui V_ui = V_ui if isinstance(V_ui, np.ndarray) else np.array(V_ui) property[:, 14:18] = V_ui tao_ui = tao_ui if isinstance(tao_ui, np.ndarray) else np.array(tao_ui) property[:, 18:22] = tao_ui return property
def _process_dti(i): global population_connect_prob global population_node_init_kwards global extern_input_k_sizes global degree global prefix global multi_conn2single_conn global dtype def check_if_exist(): os.makedirs(prefix, exist_ok=True) if not os.path.exists(os.path.join(prefix, "block_{}.npz".format(i))): return False return True if check_if_exist(): print("skip", i) return print("processing", i) prob = population_connect_prob[i] if isinstance(population_node_init_kwards, list): population_node_init_kward = population_node_init_kwards[i] else: population_node_init_kward = population_node_init_kwards assert isinstance(population_node_init_kward, dict) property = generate_block_node_property(sub_block_idx=i, **population_node_init_kward) output_neuron_idx, input_block_idx, input_neuron_idx, input_channel_offset, weight \ = connect_for_single_sparse_block(i, population_node_init_kward['size'], prob, extern_input_k_sizes=extern_input_k_sizes, degree=degree if not isinstance(degree, np.ndarray) else degree[i], multi_conn2single_conn=multi_conn2single_conn, dtype=dtype, sub_block_idx=None) os.makedirs(prefix, exist_ok=True) np.savez(os.path.join(prefix, "block_{}".format(i)), property=np.ascontiguousarray(property), output_neuron_idx=np.ascontiguousarray(output_neuron_idx), input_block_idx=np.ascontiguousarray(input_block_idx), input_neuron_idx=np.ascontiguousarray(input_neuron_idx), input_channel_offset=np.ascontiguousarray(input_channel_offset), weight=weight) print("done in population! ", i)
[docs]def merge_dti_distributation_block(orig_path, new_path, dtype="single", avg_degree=(1000,), number=1, block_partition=None, debug_block_dir=None, output_direction=False, MPI_rank=None, only_load=False): """ merge to block that is corresponding to gpu card and then save in a npz file. Parameters ---------- orig_path: closure or str Call this closure and return three generators. or directory of block npz files. new_path: str directory to save dtype: str "single" indicate it's a single ensemble. in the old version, it represents the precision of storage data. number: int number of npz files, corresponding to cpu cards. block_partition: optional, None customized block partition in each gpu cards. debug_block_dir: str or None directory to save debug block. None indicate no debug block. output_direction: bool whether the output neurons as priorities. MPI_rank: int mpi rank , assert mpi rank < total gpus. only_load: bool used in debug block. Returns ------- """ if callable(orig_path): prop, conn, dti_block_thresh = orig_path() else: prop, conn, dti_block_thresh = connect_for_block(orig_path, dense=False) if debug_block_dir is not None: debug_selection_idx, prop, conn, dti_block_thresh = add_debug(debug_block_dir, prop, conn, dti_block_thresh, new_path, only_load) else: debug_selection_idx = None if block_partition is None: block_threshold = get_block_threshold(number, dti_block_thresh) else: if number == 1: # old partition way assert isinstance(block_partition, list) assert sum(block_partition) == dti_block_thresh.shape[0] - 1 idx_threshold = np.add.accumulate(np.array(block_partition, dtype=np.int64)) idx_threshold = np.concatenate([np.array([0], dtype=np.int64), idx_threshold]) block_threshold = np.ascontiguousarray(dti_block_thresh[idx_threshold]) else: # new block partition block_threshold = np.add.accumulate(block_partition) block_threshold = np.insert(block_threshold, 0, 0) assert block_threshold[-1] == dti_block_thresh[-1] if debug_block_dir is not None: assert debug_selection_idx is not None if not only_load: np.save(os.path.join(new_path, "debug_selection_idx"), np.ascontiguousarray( np.stack(list(turn_to_block_idx_in_debug(debug_selection_idx, block_threshold)), axis=1))) def _process(block_i): _new_path = os.path.join(new_path, dtype) os.makedirs(_new_path, exist_ok=True) storage_path = os.path.join(_new_path, "block_{}".format(block_i)) if os.path.exists(storage_path + '.npz'): print("passing processing", block_i) return # print("in processing", block_i) block_start = block_threshold[block_i] block_end = block_threshold[block_i + 1] _avg_degree = int(avg_degree[block_i]) dti_block_selection = [] for j, (s, e) in enumerate(zip(dti_block_thresh[:-1], dti_block_thresh[1:])): if s >= block_start and e <= block_end: s1 = 0 e1 = e - s elif s <= block_start and e >= block_end: s1 = block_start - s e1 = block_end - s elif s >= block_start and s < block_end: s1 = 0 e1 = block_end - s elif e > block_start and e <= block_end: s1 = block_start - s e1 = e - s else: continue assert s1 >= 0 and e1 >= s1 and e1 <= e - s dti_block_selection.append((j, s1, e1)) # print("property finished", j) _property = [] for dti_i, s, e in dti_block_selection: _property.append(prop(dti_i, s, e)) _property = np.concatenate(_property) assert _property.shape[0] == block_end - block_start, f"{_property.shape[0]} vs {block_end - block_start}" connect_sum = (block_end - block_start) * _avg_degree # print("connect_sum", connect_sum) _output_neuron_idx = np.empty(connect_sum, dtype=np.int64) global _input_neuron_idx _input_neuron_idx = np.empty(connect_sum, dtype=np.int64) _input_channel_offset = np.empty(connect_sum, dtype=np.uint8) _weight = np.empty((connect_sum, 2), dtype=dtype) base_cnt = 0 for dti_i, s, e in dti_block_selection: for output_neuron_idx, input_neuron_idx, input_channel_offset, weight in conn(dti_i, s, e): # assert len(output_neuron_idx) == len(input_neuron_idx) == len(input_channel_offset), f"{dti_i}, {s}-->{e}" temp = np.size(output_neuron_idx) _input_neuron_idx[base_cnt: base_cnt + temp] = input_neuron_idx _output_neuron_idx[base_cnt: base_cnt + temp] = output_neuron_idx _input_channel_offset[base_cnt: base_cnt + temp] = input_channel_offset _weight[base_cnt:base_cnt + temp, :] = weight base_cnt = base_cnt + temp if base_cnt != connect_sum: _output_neuron_idx = _output_neuron_idx[:base_cnt] _input_channel_offset = _input_channel_offset[:base_cnt] _input_neuron_idx = _input_neuron_idx[:base_cnt] _weight = _weight[:base_cnt] # assert (np.unique(_output_neuron_idx) == np.arange(block_start, block_end, # dtype=_output_neuron_idx.dtype)).all() _output_neuron_idx = (_output_neuron_idx - block_start).astype(np.uint32) global block_card block_card = block_i _input_block_idx, _input_neuron_idx, outer_sum = turn_to_block_idx(block_threshold, turn_format=True) if not output_direction: new_weight_idx = np.lexsort( (_input_block_idx, _output_neuron_idx)) else: new_weight_idx = np.lexsort( (_input_neuron_idx, _input_block_idx)) _output_neuron_idx = np.take(_output_neuron_idx, new_weight_idx, axis=0) _input_block_idx = np.take(_input_block_idx, new_weight_idx, axis=0) _input_block_idx -= block_i _input_neuron_idx = np.take(_input_neuron_idx, new_weight_idx, axis=0) _input_channel_offset = np.take(_input_channel_offset, new_weight_idx, axis=0) _weight = np.take(_weight, new_weight_idx, axis=0) print("done in ", block_i, "card and size is ", block_end - block_start, "out conn", outer_sum) _new_path = os.path.join(new_path, dtype) storage_path = os.path.join(_new_path, "block_{}".format(block_i)) np.savez(storage_path, property=_property, output_neuron_idx=_output_neuron_idx, input_block_idx=_input_block_idx, input_neuron_idx=_input_neuron_idx, input_channel_offset=_input_channel_offset, weight=_weight) block_numbers = block_threshold[1:] - block_threshold[:-1] assert (block_numbers > 0).all() if MPI_rank is None: # because the global _input_neuron_idx is a nonlocal variable, multiprocess will use a common variable, inducing error. # or just delete global in function turn_to_block_idx # with Thpool() as p: # p.map(_process, range(0, block_numbers.shape[0])) for i in range(block_numbers.shape[0]): _process(i) else: assert 0 <= MPI_rank and MPI_rank < block_numbers.shape[0] _process(MPI_rank) return block_threshold
def _init(_population_connect_prob, _population_node_init_kwards, _extern_input_k_sizes, _degree, _prefix, _multi_conn2single_conn, _dtype): global population_connect_prob global population_node_init_kwards global extern_input_k_sizes global degree global prefix global multi_conn2single_conn global dtype population_connect_prob = _population_connect_prob population_node_init_kwards = _population_node_init_kwards extern_input_k_sizes = _extern_input_k_sizes degree = _degree prefix = _prefix multi_conn2single_conn = _multi_conn2single_conn dtype = _dtype np.random.seed() def get_block_threshold(number, dti_block_thresh): if isinstance(number, int): _block_number = (dti_block_thresh[-1] - 1) // number + 1 block_threshold = np.concatenate([np.arange(0, dti_block_thresh[-1], _block_number, dtype=np.int64), np.array([dti_block_thresh[-1]], dtype=np.int64)]) elif isinstance(number, list): weight = np.array(number) _block_number = dti_block_thresh[-1] / np.sum(weight) * weight block_threshold = np.add.accumulate(_block_number).astype(np.int64) block_threshold = np.concatenate([np.array([0], dtype=np.int64), block_threshold]) block_threshold[-1] = dti_block_thresh[-1] else: raise ValueError return block_threshold def turn_to_block_idx(block_threshold, turn_format=False): global _input_neuron_idx block_idx = np.searchsorted(block_threshold, _input_neuron_idx, side='right') - 1 outer_conn = len(block_idx) - (block_idx == block_card).sum() if turn_format: block_idx = block_idx.astype(np.int16) neuron_idx = _input_neuron_idx - block_threshold[block_idx] if turn_format: neuron_idx = neuron_idx.astype(np.uint32) return block_idx, neuron_idx, outer_conn def turn_to_block_idx_in_debug(idx, block_threshold, turn_format=False): block_idx = np.searchsorted(block_threshold, idx, side='right') - 1 if turn_format: block_idx = block_idx.astype(np.int16) neuron_idx = idx - block_threshold[block_idx] if turn_format: neuron_idx = neuron_idx.astype(np.uint32) return block_idx, neuron_idx
[docs]def add_debug(debug_block_dir, prop, conn, _dti_block_thresh, debug_idx_path, only_load): """ Add debug block to verify the accuracy of simulation with cuda program. Parameters. Parameters ---------- debug_block_dir: str the directory of small block which is used to verify accuracy of simulation. _dti_block_thresh: ndarray number of neurons in each population. debug_idx_path: ndarray debug idx in debug block. only_load: bool Returns ------- """ debug_prop, debug_conn, debug_block_thresh = connect_for_block(debug_block_dir, dense=False) main_size = _dti_block_thresh[-1] def generate_debug_selection_idx(): while True: out = np.random.choice(main_size + debug_block_thresh[-1], debug_block_thresh[-1] * 2, replace=True).astype( np.int64) out = np.random.permutation(np.unique(out)) if out.shape[0] >= debug_block_thresh[-1]: temp = out[:debug_block_thresh[-1]] if (temp > 0).all(): return temp debug_selection_idx = load_if_exist(only_load, generate_debug_selection_idx, debug_idx_path, 'debug_selection_idx_original') _sorted_debug_selection_idx, _index = np.unique(debug_selection_idx, return_index=True) def debug_permutation_idx(start, end): # debug_idx -> idx out = np.arange(start, end, dtype=np.int64) _idx = np.searchsorted(_sorted_debug_selection_idx, out, side='right') - 1 _idx2 = (_sorted_debug_selection_idx[_idx] == out) out -= _idx + 1 out[_idx2] = _index[_idx[_idx2]] + main_size return out def debug_recover_idx(idx): # idx-> debug_idx out = np.zeros_like(idx) _idx = idx >= main_size out[_idx] = debug_selection_idx[idx[_idx] - main_size] _idx = (idx < main_size).nonzero()[0] _idx_2 = idx[_idx] while True: new_idx = _idx_2 + np.searchsorted(_sorted_debug_selection_idx, out[_idx], side='right') if (new_idx == out[_idx]).all(): break out[_idx] = new_idx return out dti_block_thresh = _dti_block_thresh.copy() dti_block_thresh[:-1] = debug_recover_idx(_dti_block_thresh[:-1]) dti_block_thresh[-1] = main_size + debug_block_thresh[-1] def add_debug_prop(_prop): debug_property = np.concatenate([debug_prop(i, 0, e - s) for i, (s, e) in enumerate(zip(debug_block_thresh[:-1], debug_block_thresh[ 1:]))]) debug_property[:, 1] = 1 def prop(i, s, e): select_start = dti_block_thresh[i] + s select_end = dti_block_thresh[i] + e assert select_start < select_end and select_end <= dti_block_thresh[i + 1] part_debug_selection_idx = debug_permutation_idx(select_start, select_end) start = np.min(part_debug_selection_idx[part_debug_selection_idx < main_size]) end = np.max(part_debug_selection_idx[part_debug_selection_idx < main_size]) + np.int64(1) part_debug_selection_idx[part_debug_selection_idx < main_size] -= start part_debug_selection_idx[part_debug_selection_idx >= main_size] -= main_size - end + start property = np.concatenate( [_prop(i, start - _dti_block_thresh[i], end - _dti_block_thresh[i]), debug_property]) first_sub_blk = property[0, 3] property = property[part_debug_selection_idx] part_debug_sample_recover_idx = np.argsort(part_debug_selection_idx)[end - start:] sub_block_trace_idx = part_debug_sample_recover_idx.astype(np.int64).copy() while True: turn_idx = np.logical_and(np.isin(sub_block_trace_idx, part_debug_sample_recover_idx), sub_block_trace_idx >= 0).nonzero()[0] if turn_idx.shape[0] == 0: break sub_block_trace_idx[turn_idx] -= 1 property[part_debug_sample_recover_idx, 3] = first_sub_blk property[part_debug_sample_recover_idx[sub_block_trace_idx >= 0], 3] = \ property[sub_block_trace_idx[sub_block_trace_idx >= 0], 3] return property return prop def add_debug_conn(_conn): def conn(i, s, e): select_start = dti_block_thresh[i] + s select_end = dti_block_thresh[i] + e assert select_start < select_end and select_end <= dti_block_thresh[i + 1] part_selection_idx = debug_permutation_idx(select_start, select_end) for di, (ds, de) in enumerate(zip(debug_block_thresh[:-1], debug_block_thresh[1:])): for output_neuron_idx, input_neuron_idx, input_channel_offset, value in debug_conn(di, 0, de - ds): output_neuron_idx = debug_recover_idx(output_neuron_idx + main_size) input_neuron_idx = debug_recover_idx(input_neuron_idx + main_size) _idx = np.logical_and(select_start <= output_neuron_idx, output_neuron_idx < select_end).nonzero()[ 0] output_neuron_idx = np.take(output_neuron_idx, _idx, axis=0) input_neuron_idx = np.take(input_neuron_idx, _idx, axis=0) input_channel_offset = np.take(input_channel_offset, _idx, axis=0) value = np.take(value, _idx, axis=0) yield output_neuron_idx, input_neuron_idx, input_channel_offset, value start = np.min(part_selection_idx[part_selection_idx < main_size]) end = np.max(part_selection_idx[part_selection_idx < main_size]) + np.int64(1) for output_neuron_idx, input_neuron_idx, input_channel_offset, value in \ _conn(i, start - _dti_block_thresh[i], end - _dti_block_thresh[i]): output_neuron_idx = debug_recover_idx(output_neuron_idx) input_neuron_idx = debug_recover_idx(input_neuron_idx) yield output_neuron_idx, input_neuron_idx, input_channel_offset, value return conn prop = add_debug_prop(prop) conn = add_debug_conn(conn) return debug_selection_idx, prop, conn, dti_block_thresh
def load_if_exist(only_load, func, *args): path = os.path.join(*args) if only_load: while True: if os.path.exists(path + ".npy"): try: return np.load(path + ".npy") except: continue else: os.makedirs(args[0], exist_ok=True) if os.path.exists(path + ".npy"): out = np.load(path + ".npy") else: print('running generation') out = func() print('done!') np.save(path, out) return out def apply_map(population_connect_prob, size_scale, degree_scale, map=None): # for example, map='/public/home/tytest04/project/lyh_route/tables/map_table/map_20000_split_2_cortical_v2.pkl' with open(map, 'rb') as f: _merge_map = pickle.load(f) assert isinstance(_merge_map, dict) merge_map = dict() for k, v in _merge_map.items(): if isinstance(v, dict): merge_map[k] = v else: merge_map[k] = {v_p: 1 for v_p in v} assert isinstance(size_scale, np.ndarray) assert isinstance(degree_scale, np.ndarray) assert size_scale.shape[0] == degree_scale.shape[0] # print('estimate total neurons: {}'.format(sum([int(b) for b in block_size]))) order = np.concatenate( [np.array(list(merge_map[str(i)].keys()), dtype=np.int64) for i in range(len(merge_map))]) part = np.concatenate( [np.array(list(merge_map[str(i)].values()), dtype=np.float64) for i in range(len(merge_map))]) size_scale = np.ascontiguousarray(size_scale[order]) * part degree_scale = np.ascontiguousarray(degree_scale[order]) if isinstance(population_connect_prob, np.ndarray): conn_prob = np.ascontiguousarray(population_connect_prob[order, :][:, order]) * part elif isinstance(population_connect_prob, dict): assert (part == 1).all() length = 194342 assert len(order) == length indices = np.argsort(order) coords1 = np.array(list(population_connect_prob.keys()), dtype=np.int64) // length coords2 = np.array(list(population_connect_prob.keys()), dtype=np.int64) % length values = np.array(list(population_connect_prob.values()), dtype=np.float32) coords1 = indices[coords1] coords2 = indices[coords2] conn_prob = sparse.COO(coords=np.stack([coords1, coords2]), data=values, shape=[len(order), len(order)]) else: nonzeros = \ np.logical_and(np.isfinite(population_connect_prob.data), population_connect_prob.data > 0).nonzero()[0] mapping = {o: (c, (order == o).nonzero()[0]) for o, c in zip(*np.unique(order, return_counts=True))} part_error = 0 for o, (count, idx) in enumerate(mapping.values()): if np.abs(np.sum(part[idx]) - 1) > 1e-2: print("part_error:", o, count, np.sum(part[idx])) part_error += 1 if part_error > 0: raise ValueError nnz = len(population_connect_prob.data[nonzeros]) coords1 = np.zeros(nnz, dtype=np.int64) coords2 = np.zeros(nnz, dtype=np.int64) values = np.zeros(nnz, dtype=np.float32) coords_cnt = 0 for c1, c2, data in zip(population_connect_prob.coords[0][nonzeros], population_connect_prob.coords[1][nonzeros], population_connect_prob.data[nonzeros]): count1, idx1 = mapping[c1] count2, idx2 = mapping[c2] out_data = (data * np.ones([count1, count2]) * part[idx2]).reshape(-1) out_idx_1, out_idx_2 = np.meshgrid(idx1, idx2, indexing='ij') out_idx_1 = out_idx_1.reshape(-1) out_idx_2 = out_idx_2.reshape(-1) count = len(out_idx_1) coords1[coords_cnt:coords_cnt + count] = out_idx_1 coords2[coords_cnt:coords_cnt + count] = out_idx_2 values[coords_cnt:coords_cnt + count] = out_data coords_cnt += count coords1 = coords1[:coords_cnt] coords2 = coords2[:coords_cnt] values = values[:coords_cnt] conn_prob = sparse.COO(coords=np.stack([coords1, coords2]), data=values, shape=[len(order), len(order)]) partition = [len(merge_map[str(i)]) for i in range(len(merge_map))] return conn_prob, size_scale, degree_scale, order, partition