-
Maximilian Schmidt authored
Remove dependency from h5py_wrapper completely: Store data in binary numpy data files, add test_analysis to test analysis class, fix many bugs in analysis and analysis_helpers
e22a394a
theory.py 15.92 KiB
"""
theory
============
Theory package to predict the stable fixed points of the multi-area
model of macaque visual cortex (Schmidt et al. 2015), perform further
analysis on them and to apply the stabilization procedure (Schuecker,
Schmidt et al., 2017) to the network connectivity.
Classes
--------
Theory : provides functionality to predict the stable fixed point of
the model, perform further analysis and execute the stabilization
provedure.
"""
import json
import pprint
import nest
import numpy as np
from copy import copy
from .default_params import nested_update, theory_params
from .default_params import check_custom_params
from dicthash import dicthash
from .multiarea_helpers import create_mask, create_vector_mask, dict_to_vector
from .theory_helpers import d_nu_d_mu_fb_numeric, d_nu_d_sigma_fb_numeric
class Theory():
def __init__(self, network, theory_spec):
self.params = copy(theory_params)
check_custom_params(theory_spec, self.params)
self.custom_params = theory_spec
nested_update(self.params, self.custom_params)
self.network = network
E_L = self.params['neuron_params']['single_neuron_dict']['E_L']
self.NP = {'theta': self.params['neuron_params']['single_neuron_dict']['V_th'] - E_L,
'V_reset': self.params['neuron_params']['single_neuron_dict']['V_reset'] - E_L,
'tau_m': self.params['neuron_params']['single_neuron_dict']['tau_m'],
# assumes that tau_syn_ex = tau_syn_in in LIF neuron
'tau_syn': self.params['neuron_params']['single_neuron_dict']['tau_syn_ex'],
't_ref': self.params['neuron_params']['single_neuron_dict']['t_ref'],
'tau': 1.}
self.label = dicthash.generate_hash_from_dict({'params': self.params,
'network_label': self.network.label})
def __eq__(self, other):
return self.label == other.label
def __str__(self):
s = "Analytical theory {} of network {}".format(self.label, self.network.label)
s += "with parameters:"
s += pprint.pformat(self.params, width=1)
return s
def __hash__(self):
return hash(self.label)
def integrate_siegert(self, full_output=True):
"""
Integrate siegert formula to obtain stationary rates. See Eq. (3)
and following in Schuecker, Schmidt et al. (2017).
"""
dt = self.params['dt']
T = self.params['T']
rate_ext = self.params['input_params']['rate_ext']
K = copy(self.network.K_matrix)
J = copy(self.network.J_matrix)
tau = self.NP['tau_m'] * 1e-3
dim = np.shape(K)[0]
nest.ResetKernel()
nest.set_verbosity('M_ERROR')
nest.SetKernelStatus({'resolution': dt,
'use_wfr': False,
'print_time': False,
'overwrite_files': True})
# create neurons for external drive
drive = nest.Create(
'siegert_neuron', 1, params={'rate': rate_ext, 'mean': rate_ext})
# create neurons representing populations
neurons = nest.Create(
'siegert_neuron', dim, params=self.NP)
# external drive
syn_dict = {'drift_factor': tau * np.array([K[:, -1] * J[:, -1]]).transpose(),
'diffusion_factor': tau * np.array([K[:, -1] * J[:, -1]**2]).transpose(),
'model': 'diffusion_connection',
'receptor_type': 0}
nest.Connect(drive, neurons, 'all_to_all', syn_dict)
# external DC drive (expressed in mV)
DC_drive = nest.Create(
'siegert_neuron', 1, params={'rate': 1., 'mean': 1.})
C_m = self.network.params['neuron_params']['single_neuron_dict']['C_m']
syn_dict = {'drift_factor': 1e3 * tau / C_m * np.array(
self.network.add_DC_drive).reshape(dim, 1),
'diffusion_factor': 0.,
'model': 'diffusion_connection',
'receptor_type': 0}
nest.Connect(DC_drive, neurons, 'all_to_all', syn_dict)
# handle switches for cortico-cortical connectivity
if (self.network.params['connection_params']['replace_cc'] in
['hom_poisson_stat', 'het_poisson_stat']):
mu_CC, sigma2_CC = self.replace_cc_input()
mask = create_mask(self.network.structure, cortico_cortical=True, external=False)
K[mask] = 0.
# Additional external drive
# The actual rate is included in the connection
add_drive = nest.Create('siegert_neuron', 1, params={'rate': 1., 'mean': 1.})
syn_dict = {'drift_factor': np.array([mu_CC]).transpose(),
'diffusion_factor': np.array([sigma2_CC]).transpose(),
'model': 'diffusion_connection',
'receptor_type': 0}
nest.Connect(add_drive, neurons, 'all_to_all', syn_dict)
elif self.network.params['connection_params']['replace_cc'] == 'het_current_nonstat':
raise NotImplementedError('Replacing the cortico-cortical input by'
' non-stationary current input is not supported'
' in the Theory class.')
# network connections
syn_dict = {'drift_factor': tau * K[:, :-1] * J[:, :-1],
'diffusion_factor': tau * K[:, :-1] * J[:, :-1]**2,
'model': 'diffusion_connection',
'receptor_type': 0}
nest.Connect(neurons, neurons, 'all_to_all', syn_dict)
# Set initial rates of neurons:
if self.params['initial_rates'] is not None:
# iterate over different initial conditions drawn from a random distribution
if self.params['initial_rates'] == 'random_uniform':
gen = self.initial_rates(self.params['initial_rates_iter'],
dim,
mode=self.params['initial_rates'],
rate_max=1000.)
num_iter = self.params['initial_rates_iter']
# initial rates are explicitly defined in self.params
elif isinstance(self.params['initial_rates'], np.ndarray):
num_iter = 1
gen = (self.params['initial_rates'] for ii in range(num_iter))
# if initial rates are not defined, set them 0
else:
num_iter = 1
gen = (np.zeros(dim) for ii in range(num_iter))
rates = []
# Loop over all iterations of different initial conditions
for nsim in range(num_iter):
print("Iteration {}".format(nsim))
initial_rates = next(gen)
for ii in range(dim):
nest.SetStatus([neurons[ii]], {'rate': initial_rates[ii]})
# create recording device
multimeter = nest.Create('multimeter', params={'record_from':
['rate'], 'interval': 1.,
'to_screen': False,
'to_file': False,
'to_memory': True})
# multimeter
nest.Connect(multimeter, neurons)
nest.Connect(multimeter, drive)
# simulate
nest.Simulate(T)
data = nest.GetStatus(multimeter)[0]['events']
res = np.array([np.insert(data['rate'][np.where(data['senders'] == n)],
0,
initial_rates[ii])
for ii, n in enumerate(neurons)])
if full_output:
rates.append(res)
else:
# Keep only initial and final rates
rates.append(res[:, [0, -1]])
if num_iter == 1:
return self.network.structure_vec, rates[0]
else:
return self.network.structure_vec, rates
def replace_cc_input(self):
"""
Helper function to replace cortico-cortical input by different variants.
"""
mu_CC = np.array([])
sigma2_CC = np.array([])
if self.network.params['connection_params']['replace_cc'] == 'het_poisson_stat':
with open(self.network.params['connection_params'][
'replace_cc_input_source'], 'r') as f:
rates = json.load(f)
self.cc_input_rates = dict_to_vector(rates,
self.network.area_list,
self.network.structure)
elif self.network.params['connection_params']['replace_cc'] == 'hom_poisson_stat':
self.cc_input_rates = (np.ones(self.network.K_matrix.shape[0]) *
self.network.params['input_params']['rate_ext'])
for area in self.network.area_list:
area_dim = len(self.network.structure[area])
mask = create_mask(self.network.structure,
cortico_cortical=True, target_areas=[area],
external=False)
input_areas = list(set(self.network.structure.keys()).difference({area}))
rate_mask = create_vector_mask(self.network.structure,
areas=input_areas)
rate_vector = self.cc_input_rates[rate_mask]
N_input_pops = self.network.K_matrix.shape[1] - 1 - area_dim
K_CC = self.network.K_matrix[mask].reshape((area_dim,
N_input_pops))
J_CC = self.network.J_matrix[mask].reshape((area_dim,
N_input_pops))
mu_CC = np.append(mu_CC, np.dot(K_CC * J_CC, rate_vector))
sigma2_CC = np.append(sigma2_CC, np.dot(K_CC * J_CC**2, rate_vector))
tau = self.NP['tau_m'] * 1e-3
mu_CC *= tau
sigma2_CC *= tau
return mu_CC, sigma2_CC
def initial_rates(self, num_iter, dim, mode='random_uniform', rate_max=100., rng_seed=123):
"""
Helper function to create generator for initial rates
"""
np.random.seed(rng_seed)
n = 0
while n < num_iter:
yield rate_max * np.random.rand(dim)
n += 1
def mu_sigma(self, rates, external=True, matrix_filter=None,
vector_filter=None, replace_cc=None):
"""
Calculates mean and variance according to the
theory.
"""
if matrix_filter is not None:
K = copy(self.network.K_matrix)
J = copy(self.network.J_matrix)
K[np.logical_not(matrix_filter)] = 0.
J[np.logical_not(matrix_filter)] = 0.
else:
K = self.network.K_matrix
J = self.network.J_matrix
if replace_cc is not None:
mu_CC, sigma2_CC = self.replace_cc_input(replace_cc)
mask = create_mask(self.network.structure, cortico_cortical=True, external=False)
K[mask] = 0.
else:
mu_CC = np.zeros_like(rates)
sigma2_CC = np.zeros_like(rates)
KJ = K * J
J2 = J * J
if external:
rates = np.hstack((rates, self.params['input_params']['rate_ext']))
else:
rates = np.hstack((rates, np.zeros(self.dim_ext)))
# if dist:
# # due to distributed weights with std = 0.1
# J2[:, :7] += 0.01 * J[:, :7] * J[:, :7]
KJ2 = K * J2
C_m = self.network.params['neuron_params']['single_neuron_dict']['C_m']
mu = self.NP['tau_m'] * 1e-3 * np.dot(KJ, rates) + mu_CC + self.NP[
'tau_m'] / C_m * self.network.add_DC_drive
sigma2 = self.NP['tau_m'] * 1e-3 * np.dot(KJ2, rates) + sigma2_CC
sigma = np.sqrt(sigma2)
return mu, sigma
def stability_matrix(self, rates, matrix_filter=None,
vector_filter=None, full_output=False, replace_cc=None):
"""
Computes stability matrix on the population level.
"""
if np.any(matrix_filter is not None):
assert(np.any(vector_filter is not None))
assert(self.network.N_vec[vector_filter].size *
(self.network.N_vec[vector_filter].size + 1) ==
self.network.K_matrix[matrix_filter].size)
N = self.network.N_vec[vector_filter]
K = (self.network.K_matrix[matrix_filter].reshape((N.size, N.size + 1)))[:, :-1]
J = (self.network.J_matrix[matrix_filter].reshape((N.size, N.size + 1)))[:, :-1]
else:
N = self.network.N_vec
K = self.network.K_matrix[:, :-1]
J = self.network.J_matrix[:, :-1]
N_pre = np.zeros_like(K)
N_post = np.zeros_like(K)
for ii in range(N.size):
N_pre[ii] = N
N_post[:, ii] = N
# Connection probabilities between populations
C = 1. - (1.-1./(N_pre * N_post))**(K*N_post)
mu, sigma = self.mu_sigma(rates, replace_cc=replace_cc)
if np.any(vector_filter is not None):
mu = mu[vector_filter]
sigma = sigma[vector_filter]
slope = np.array([d_nu_d_mu_fb_numeric(1.e-3*self.NP['tau_m'],
1.e-3*self.NP['tau_syn'],
1.e-3*self.NP['t_ref'],
self.NP['V_th'],
self.NP['V_reset'],
mu[ii], sigma[ii]) for ii in range(N.size)])
slope_sigma = np.array([d_nu_d_sigma_fb_numeric(1.e-3*self.NP['tau_m'],
1.e-3*self.NP['tau_syn'],
1.e-3*self.NP['t_ref'],
self.NP['V_th'],
self.NP['V_reset'],
mu[ii], sigma[ii])*1/(
2. * sigma[ii]) for ii in range(N.size)]) # 1/(mV)**2
slope_matrix = np.zeros_like(J)
slope_sigma_matrix = np.zeros_like(J)
for ii in range(N.size):
slope_matrix[:, ii] = slope
slope_sigma_matrix[:, ii] = slope_sigma
V = C*(1-C)
G = (self.NP['tau_m'] * 1e-3)**2 * (slope_matrix*J +
slope_sigma_matrix*J**2)**2
G_N = N_pre * G
M = G_N * V
if full_output:
return M, slope, slope_sigma, M, C, V, G_N, J, N_pre
else:
return M
def lambda_max(self, rates, matrix_filter=None,
vector_filter=None, full_output=False, replace_cc=None):
"""
Computes radius of eigenvalue spectrum of the stability matrix.
"""
if full_output:
M, slope, slope_sigma, M, EV, C, V, G_N = self.stability_matrix(rates,
matrix_filter=matrix_filter,
vector_filter=vector_filter,
full_output=full_output,
replace_cc=replace_cc)
else:
M = self.stability_matrix(rates, matrix_filter=matrix_filter,
vector_filter=vector_filter,
full_output=full_output, replace_cc=replace_cc)
EV = np.linalg.eig(M)
lambda_max = np.sqrt(np.max(np.real(EV[0])))
if full_output:
return lambda_max, slope, slope_sigma, M, EV, C, V, G
else:
return lambda_max