#!/usr/bin/python
# -*- coding: utf-8 -*-
'''
The NeuroDP Framework lets you run parameter sweeps on a simulation using a customized brain map.
Author: Aarón Pérez Martín
Contact: a.perez.martin@fz-juelich.de
Organization: Forschungszentrum Jülich
'''
import io
import json
import os
import pickle
import sys
import zipfile
from enum import IntEnum
from os import walk
from zipfile import ZipFile

import matplotlib.pyplot as plt
import numpy as np
import requests
import scipy.stats
import seaborn as sns
from sklearn.cross_decomposition import CCA

import tvb.simulator.lab as lab
from openid_http_client.auth_client.access_token_client import AccessTokenClient
from openid_http_client.http_client import HttpClient
# Provides Authentication, Environment_UNICORE, PyUnicoreManager and set_logger
from PyUnicoreManager import *
INPUT_NAME = "input_data.zip"
OUTPUT_NAME = "tavg_data"
PARAM_NAME = "params_data"
NO_PREFIXES = ('.', '~', '#')  # skip hidden and temporary entries
NVIDIA_SW = ["module load Python SciPy-Stack numba PyCUDA"]
NVIDIA_test = ["module load Python SciPy-Stack numba PyCUDA",
"export CUDA_VISIBLE_DEVICES=0 ",
"nvidia-smi"]
JOB_PARAMS = {"Project": "slns", 'Resources': {"Queue": "develgpus", "Nodes": "1", "Runtime": "10m"}}
logss_cbs = set_logger("ndp.core")
class Type(IntEnum):
    # Different types of simulator backend
CUDA = 0
PYTHON = 1
def load_matrix_dataset(path_dataset, total_length, max_round, split_char=" "):
    """Read a text file of separated numbers into a (rows, total_length) float matrix,
    rounding each value to max_round decimals."""
    rows = []
    with open(path_dataset, 'r') as f:
        for line in f:
            raw = line.split(split_char)
            nums = filter(None, raw)  # drop empty items
            rows.append(np.round(np.fromiter(map(float, nums), dtype=np.float64), max_round))
    return np.vstack(rows) if rows else np.empty((0, total_length), float)
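# Usage sketch for load_matrix_dataset (illustrative; "weights.txt" is a
# placeholder path for a whitespace-separated 68x68 text matrix):
def _example_load_matrix():
    matrix = load_matrix_dataset(path_dataset="weights.txt", total_length=68, max_round=6)
    print(matrix.shape)  # expected: (68, 68)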
def plot_timeseries(data, time=None):
if time is not None:
plt.plot(time, data, alpha=0.1)
else:
plt.plot(data, alpha=0.1)
plt.show()
def get_sessions(user_folder):
    # NOTE: the order of the session folders matters
    _, sub_folders, _ = next(walk(user_folder))
    sub_folders = sorted(sub_folders)
    return [x for x in sub_folders if not x.startswith(NO_PREFIXES)]
def get_files(user_folder):
    # NOTE: the order of the files matters
_, _, filenames = next(walk(user_folder))
return sorted(filenames)
def LoadModel(filename, concept="model"):
    try:
        with open(filename, 'rb') as file:
            data = pickle.load(file)
        # data shape: (simsteps, states, nodes, paramscombi)
        logss_cbs.info("Loaded " + concept + " " + filename + str(data.shape))
        return data
    except Exception as e:
        logss_cbs.error(e)
def LoadModel_TVB(region):
try:
return lab.connectivity.Connectivity.from_file('connectivity_' + str(region) + '.zip')
except Exception as e:
logss_cbs.error(e)
# Calculates the functional connectivity (FC).
# The time series comes from tavg_data.
# NOTE: assumes that no interaction implies no connection. TODO: consider other assumptions.
# TODO: dynamic FC over a moving window.
def compFC(timeSeries):
    return np.corrcoef(timeSeries.T)
# Calculates the degree of correlation between the Functional and the Structural Connectivity
def corrSCFC(SC, FCloc, decimals=None):
    r = np.corrcoef(FCloc.ravel(), SC.ravel())[0, 1]
    return np.round(r, decimals) if decimals else r
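# Usage sketch relating compFC and corrSCFC (illustrative, random data): a
# (timesteps, regions) time series yields a (regions, regions) FC matrix, and
# its correlation with the structural matrix SC is a single Pearson coefficient.
def _example_sc_fc_correlation():
    timeseries = np.random.rand(400, 68)  # placeholder simulated activity
    SC = Fake.fake_matrix(regions=68)     # placeholder structural connectivity
    FC = compFC(timeseries)               # (68, 68) functional connectivity
    print(corrSCFC(SC, FC, decimals=6))   # scalar SC-FC correlation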
# Generates a plot of the Functional and Structural connectivity. Returns their correlation coefficient.
def plot_FCSC(SC, FCloc, user_id):
    fig, ax = plt.subplots(ncols=2, figsize=(12, 3))
    sns.heatmap(FCloc, xticklabels='', yticklabels='', ax=ax[0], cmap='YlGnBu')
    sns.heatmap(SC / SC.max(), xticklabels='', yticklabels='', ax=ax[1], cmap='YlGnBu')
    r = corrSCFC(SC, FCloc)
    ax[0].set_title('Simulated FC\nSC-FC r = ' + str(r))
    ax[1].set_title('SC: ' + user_id)
    return r
class DataAnalysis(object):
    @staticmethod
    def model_CCA(X, Y, components=1, decimals=15):
try:
cca = CCA(n_components=components)
X_, Y_ = cca.fit_transform(X, Y)
corrcoef, p_value = scipy.stats.pearsonr(np.ravel(X_), np.ravel(Y_))
corrcoef = np.round(corrcoef, decimals)
p_value = np.round(p_value, decimals)
logss_cbs.info("CCA: X_" + str(X_.shape) + ", Y_" + str(Y_.shape) +
" Coef.Pear: corrcoef " + str(corrcoef) + ", p_value " + str(p_value))
return corrcoef, p_value
except Exception as err:
logss_cbs.error(err)
return None, None
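# Usage sketch for DataAnalysis.model_CCA (illustrative, random data): both
# inputs must have the same number of rows; the method returns the Pearson
# correlation and p-value of the first pair of canonical variates.
def _example_cca():
    X = np.random.rand(68, 68)  # e.g. structural connectivity
    Y = np.random.rand(68, 68)  # e.g. functional connectivity
    corrcoef, p_value = DataAnalysis.model_CCA(X, Y, components=1, decimals=6)
    print(corrcoef, p_value)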
class Simulation():
def __init__(self, name, location, input_params=None):
self.name = name
self.data = None
self.location = location
self.input_params = input_params # dict: coupling(min,max), speed(min, max), etc
self.output_params = None # parameters used
class Simulator():
def __init__(self, authentication: Authentication, language: Type):
self.language = language
self.auth = authentication
self.sim = None
    def pickle_dict(self, file_location, obj_to_store):
try:
with open(file_location, 'wb') as pickleFile:
pickle.dump(obj_to_store, pickleFile)
except Exception as err:
logss_cbs.error(err)
class Connectivity():
    def __init__(self, weights, lengths, regions):
        self.weights = weights
        self.tract_lengths = lengths
self.regions = regions
class Snapshot():
def __init__(self, id, connectivity):
self.id = id
self.connectivity = connectivity
self.simulation = list() # class Simulation
self.best_dict = None # param, pearson, etc
def check_format(string: str, format_str: str, char: str = os.sep) -> bool:
    # True when both strings contain the same number of separator characters
    format_arr = [idx for idx, c in enumerate(format_str) if c == char]
    str_arr = [idx for idx, c in enumerate(string) if c == char]
    return len(format_arr) == len(str_arr)
class Patient():
def __init__(self, id=0):
self.id = id
self.snapshots = list()
self.info = dict()
    def loadPatient_from_KG(self, path_info, weights_name, lengths_name, regions=0, decimals=6):
        try:
            if regions == 0:
                raise Exception("Please, specify the number of regions")
logss_cbs.info("Loading data from: " + str(path_info))
weights = load_matrix_dataset(path_dataset=path_info + weights_name,
total_length=regions,
max_round=decimals)
            tract_lengths = load_matrix_dataset(path_dataset=path_info + lengths_name,
total_length=regions,
max_round=decimals)
regions = Fake.fake_matrix(regions=regions)
self.snapshots.append(Snapshot(id=self.id, connectivity=Connectivity(weights, tract_lengths, regions)))
except Exception as err:
logss_cbs.error(err)
return None, None
def load_one_session_with_format(self, content_str, format_str, char=os.sep, regions=68, decimals=6):
try:
dict_content, dict_main = {}, {}
# check if content follows the format
a = [idx for idx, c in enumerate(format_str) if c == char]
b = [idx for idx, c in enumerate(content_str) if c == char]
if len(a) != len(b):
return None
array_cnt = content_str.split(char)
array_ids = format_str.split(char)
            # content and format can change, they are dynamically filled
            dict_main = dict.fromkeys(array_ids)
for idx, val in enumerate(array_cnt):
pos_found = content_str.rfind(val)
dict_content[val] = content_str[0:pos_found + len(val)]
dict_main[array_ids[idx]] = dict_content[val]
self.id = dict_main["user"].split(char)[-1]
        except Exception as err:
            logss_cbs.error("Format must be [folder/User_id/session_id/SC/weights.txt]: " + str(err))
def load_sessions(self, location, subfolder="", regions=68, decimals=6, split_char=" ", fill_gaps=True):
try:
            # location / user_id / session_id / data
root = os.path.join(location, self.id)
sessions = get_sessions(root)
self.snapshots = list()
for session in sessions:
if len(subfolder) > 0:
relative_path = os.path.join(root, session, subfolder)
else:
relative_path = os.path.join(root, session)
files = get_files(relative_path)
w, t, r = None, None, None
for file in files:
folder = os.path.join(relative_path, file)
if "weights" in file:
w = load_matrix_dataset(path_dataset=folder, total_length=regions, max_round=decimals,
split_char=split_char)
elif "tract_lengths" in file:
t = load_matrix_dataset(path_dataset=folder, total_length=regions, max_round=decimals,
split_char=split_char)
elif "regions" in file:
r = load_matrix_dataset(path_dataset=folder, total_length=regions, max_round=decimals,
split_char=split_char)
                if not isinstance(w, np.ndarray) or w.shape == (0, 0):
                    if fill_gaps:
                        w = Fake.fake_matrix(regions=regions)
                        logss_cbs.info(str(session) + " - Filled weights")
                    else:
                        logss_cbs.info(str(session) + " - No weights")
                if not isinstance(t, np.ndarray) or len(t) == 0:
                    if fill_gaps:
                        t = Fake.fake_array(regions=regions)
                        logss_cbs.info(str(session) + " - Filled tract_lengths")
                    else:
                        logss_cbs.info(str(session) + " - No tract_lengths")
                if not isinstance(r, np.ndarray) or len(r) == 0:
                    if fill_gaps:
                        r = Fake.fake_array(regions=regions)
                        logss_cbs.info(str(session) + " - Filled regions")
                    else:
                        logss_cbs.info(str(session) + " - No regions")
self.snapshots.append(
Snapshot(id=session, connectivity=Connectivity(w, t, r)))
logss_cbs.info("Total sessions: " + str(len(sessions)))
        except Exception as err:
            logss_cbs.error("Format must be [folder/User_id/session_id/SC/weights.txt]: " + str(err))
def load_sessions_old(self, location, weight_location="SC/weights.txt", regions=68, decimals=6):
try:
user_folder = os.path.join(location, self.id)
sessions = get_sessions(user_folder)
self.snapshots = list()
for session in sessions:
folder = os.path.join(user_folder, session, weight_location)
w = load_matrix_dataset(path_dataset=folder,
total_length=regions,
max_round=decimals)
                # TODO: specify when this is available
                # tract_lengths = load_matrix_dataset(...)
                t = Fake.fake_array(regions=regions)
                r = Fake.fake_array(regions=regions)
                self.snapshots.append(
                    Snapshot(id=session, connectivity=Connectivity(w, t, r)))
                logss_cbs.info(session + str(w.shape))
            logss_cbs.info("Total sessions: " + str(len(sessions)))
        except Exception as err:
            logss_cbs.error("Format must be [folder/User_id/session_id/SC/weights.txt]: " + str(err))
def load_simulations(self, local_subdir, model="rwongwang"):
simulations_folder = get_files(local_subdir)
        if len(self.snapshots) == 0:
            logss_cbs.warning("No snapshots list")
            return
count_sims = 0
for snapshot in self.snapshots:
sim_params_name = PARAM_NAME + "_" + model + "_" + str(snapshot.id)
sim_params = os.path.join(local_subdir, sim_params_name)
for sim_name in simulations_folder:
if sim_name == OUTPUT_NAME + "_" + model + "_" + str(snapshot.id):
sim_path = os.path.join(local_subdir, sim_name)
obj_sim = Simulation(name=sim_name, location=local_subdir)
obj_sim.data = LoadModel(sim_path)
obj_sim.output_params = LoadModel(sim_params, concept="parameters")
snapshot.simulation = obj_sim
count_sims += 1
logss_cbs.info("Loaded simulations " + str(count_sims))
    def find_best_parameters_per_simulation(self):
        for snapshot in self.snapshots:
            _, _, _, param_list = snapshot.simulation.data.shape
            param_values_coefs = list()
            SC = snapshot.connectivity.weights
            for param_set in range(param_list):
                # data layout: [time_series, states, regions, params]
                FC = compFC(snapshot.simulation.data[:, 0, :, param_set])
                # Alternative: corrcoef, _ = DataAnalysis.model_CCA(SC, FC, components=68, decimals=6)
                corrcoef = corrSCFC(SC, FC)
                param_values_coefs.append(corrcoef)
            maxvalue = max(param_values_coefs)
            idx = param_values_coefs.index(maxvalue)
            params = snapshot.simulation.output_params[idx]
            logss_cbs.info(
                "Session:" + str(snapshot.id) + ", max corr.coef.:" + str(round(maxvalue, 6)) + ", params:" + str(
                    params))
            self.info[str(snapshot.id)] = {"max_corr_coef": maxvalue,
                                           "idx_param_set": idx,
                                           "param_set": params}
def compare_sims_timeseries(self, width=12.5, height=10.5):
fig, axs = plt.subplots(len(self.snapshots), 1, figsize=(width, height), squeeze=False)
for idx, snap in enumerate(self.snapshots):
axs[idx,0].plot(snap.simulation.data[:, 0, :, 0])
def compare_sims_scfc(self):
for idx, snap in enumerate(self.snapshots):
w = snap.connectivity.weights
d = snap.simulation.data
user_id_snap = str(self.id) + "_" + str(snap.id)
plot_FCSC(w, compFC(d[:, 0, :, 0]), user_id_snap)
class Fake():
    # Generates random placeholder data for missing connectivity files.
    @staticmethod
    def fake_matrix(min_val=0, max_val=250, regions=68):
        return np.random.randint(min_val, max_val, (regions, regions))
    @staticmethod
    def fake_array(min_val=0, max_val=250, regions=68):
        return np.random.randint(min_val, max_val, regions)
class TVB_pySimulation():
def __init__(self, **args):
self.model = args.get('model', lab.models.ReducedWongWang())
self.connectivity = args.get('connectivity', LoadModel_TVB("68"))
self.coupling = args.get('coupling', lab.coupling.Linear(a=np.array([1. / 68.])))
self.integrator = args.get('integrator', lab.integrators.EulerDeterministic(dt=0.1))
self.monitor = args.get('monitor', [lab.monitors.TemporalAverage(period=10.0)])
self.simulation_length = args.get('simulation_length', 4000)
self.conduction_speed = args.get('conduction_speed', None)
self.time, self.data = None, None
def run(self):
self.sim = lab.simulator.Simulator(
model=self.model,
connectivity=self.connectivity,
coupling=self.coupling,
integrator=self.integrator,
monitors=self.monitor,
simulation_length=self.simulation_length,
conduction_speed=self.conduction_speed)
self.sim.configure()
self.sim.history.buffer[:] = 0.0
self.sim.current_state[:] = 0.0
(self.time, self.data), = self.sim.run()
    def plot(self):
        plot_timeseries(self.data, self.time)
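# Usage sketch for TVB_pySimulation (assumes TVB's bundled 68-region demo
# connectivity is available to LoadModel_TVB; all keyword arguments are optional):
def _example_local_simulation():
    sim = TVB_pySimulation(simulation_length=1000)  # shorter run for a quick check
    sim.run()
    # sim.data is shaped (timesteps, states, nodes, modes) by the TemporalAverage monitor
    sim.plot()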
class HPC_cudaSimulation():
    def __init__(self, auth, clean_job_storages=True, jobArgs=JOB_PARAMS, **args):
        auth = Authentication(token=auth.token, access=auth.access, server=auth.server)
        self.jobArgs = jobArgs
        self.env = Environment_UNICORE(auth=auth)
        self.unicore = PyUnicoreManager(environment=self.env, clean_job_storages=clean_job_storages)
self.result = None
def compile_model(self, tvb_loc, model="rwongwang", input_params=""):
steps = ["export PYTHONPATH=$PYTHONPATH:" + tvb_loc + "/scientific_library",
"cd " + tvb_loc + "/scientific_library/tvb/rateML",
"python XML2model.py -m " + model + " -l cuda"]
logss_cbs.info("Compiling model: " + str(model))
self.result = self.unicore.one_run(steps=NVIDIA_SW + steps, parameters=self.jobArgs)
def run(self, tvb_loc, model="rwongwang", input_params="", id=""):
steps = ["export PYTHONPATH=$PYTHONPATH:" + tvb_loc + "/scientific_library",
"cd " + tvb_loc + "/scientific_library/tvb/rateML/run",
"python3 model_driver.py -m " + model + " -w " + input_params]
logss_cbs.info("Doing a simulation: " + str(model) + ", simulation: "+str(id))
self.result = self.unicore.one_run(steps=NVIDIA_SW + steps, parameters=self.jobArgs)
def make_simulations(self, patient, tvb_location, local_subdir, remote_subdir, sim_params="", model="rwongwang"):
count_sims = 0
# For each snapshot a simulation/parameter sweep is done.
if len(patient.snapshots) == 0:
            logss_cbs.warning("There are no snapshots")
return
#self.sim = HPC_cudaSimulation(self.auth, clean_job_storages=True) # To avoid UNICORE issues of availability
self.compile_model(model=model, tvb_loc=tvb_location, input_params=sim_params)
if not self.result:
return None
for snapshot in patient.snapshots:
# Creating a dictionary file as input for the simulation
dict_files = {}
dict_files["weights.txt"] = snapshot.connectivity.weights
dict_files["tract_lengths.txt"] = snapshot.connectivity.tract_lengths
dict_files["regions.txt"] = np.full(snapshot.connectivity.weights.shape, 1.)
# TODO: extend the files
# dict["best_params"] = []
# dict["input_params"] = input_params # snapshot.simulation.input_params # dict: coupling:[min,max], speed:[min, max], etc
# Prepare Zip of input data
StorageManager.store_files_fromDict(where=local_subdir, id=snapshot.id, dic=dict_files, toZip=True)
# Input data
sim_input_path = os.path.join(local_subdir, str(snapshot.id), INPUT_NAME)
filesToUpload = [[sim_input_path, os.path.join(remote_subdir, INPUT_NAME)]]
if not self.upload_files(filesToUpload):
return None
            self.run(model=model, tvb_loc=tvb_location, input_params=sim_params, id=snapshot.id)
if not self.result:
                logss_cbs.error("Please review the simulation parameters")
return None
# output data
sim_name = OUTPUT_NAME + "_" + model + "_" + str(snapshot.id)
sim_path = os.path.join(local_subdir, sim_name)
            # Output parameter combination
sim_params_name = PARAM_NAME + "_" + model + "_" + str(snapshot.id)
sim_params_path = os.path.join(local_subdir, sim_params_name)
# Downloading timeSeries + parameterSweep values
filesToDownload = [[sim_path, os.path.join(remote_subdir, OUTPUT_NAME)],
[sim_params_path, os.path.join(remote_subdir, PARAM_NAME)]]
if not self.download_result(filesToDownload):
return None
obj_sim = Simulation(name=sim_name, location=local_subdir)
obj_sim.data = LoadModel(sim_path)
obj_sim.output_params = LoadModel(sim_params_path, concept="parameters")
snapshot.simulation = obj_sim
count_sims += 1
logss_cbs.info("Loaded simulations:" + str(count_sims))
return patient.snapshots
def test_GPUs_access(self):
self.result = self.unicore.one_run(steps=NVIDIA_test, parameters=self.jobArgs)
def download_result(self, files):
return self.unicore.downloadFiles(files)
def upload_files(self, files):
return self.unicore.uploadFiles(files)
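# End-to-end usage sketch for the HPC backend (hypothetical paths and IDs; it
# assumes a valid PyUnicoreManager Authentication object and a TVB checkout
# with rateML on the target system):
def _example_hpc_sweep(auth):
    patient = Patient(id="patient_0")  # hypothetical patient folder name
    patient.load_sessions(location="/data/users", subfolder="SC")
    hpc = HPC_cudaSimulation(auth, clean_job_storages=True)
    hpc.make_simulations(patient,
                         tvb_location="/p/home/user/tvb-root",  # hypothetical checkout
                         local_subdir="/tmp/ndp",
                         remote_subdir="ndp_runs",
                         model="rwongwang")
    patient.find_best_parameters_per_simulation()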
class KG_Manager():
def __init__(self, token, **args):
self.token = token
        self.req = self.check_credentials()
self.has_access = True
if not self.req.status_code == 200:
self.has_access = False
print("Error code: ", self.req.status_code)
self.query = None
def check_credentials(self, doi=""):
session = requests.Session()
session.headers['Authorization'] = self.token
session.headers['accept'] = 'application/json'
# DOI is optional
return session.get(
"https://kg.humanbrainproject.eu/query/minds/core/dataset/v1.0.0/TVBtest/instances?databaseScope=RELEASED&DOI=" + str(
doi))
def kg_query(self, doi):
if not self.has_access:
return
auth_client = AccessTokenClient(self.token)
http_client = HttpClient("https://kg.humanbrainproject.eu/query", "", auth_client=auth_client)
url = "{}/{}/instances?databaseScope=RELEASED&{}".format("minds/core/dataset/v1.0.0",
"TVBtest", "DOI=" + doi)
self.query = http_client.get(url)
if self.query:
for key, value in self.query["results"][0].items():
key_word = key.split("/")[-1]
if key_word == "name":
print("Title:", value)
print("Database:", self.query["databaseScope"])
def download_file(self, prefix, where):
remote_header = "https://kg.ebrains.eu/proxy/export?container=" + self.query["results"][0][
'https://schema.hbp.eu/myQuery/container_url'][
:-4] # removing "v1.0"
remote_url = remote_header + prefix
local_filename = remote_url.split('/')[-1]
path_local_filename = os.path.join(where, local_filename)
with requests.get(remote_url, stream=True) as r:
r.raise_for_status()
with open(path_local_filename, 'wb') as file:
for chunk in r.iter_content(chunk_size=8192):
                    # skip keep-alive chunks
                    if chunk:
                        file.write(chunk)
return path_local_filename, remote_url
def download_range(self, num_from, num_to, prefix, where):
for n in np.arange(num_from, num_to + 1, 1):
web_prefix = prefix + "/" + str(n)
filename, _ = self.download_file(prefix=web_prefix, where=where)
self.unzip_file(local_folder=where, local_filename=filename)
    def unzip_file(self, local_folder, local_filename):
        if zipfile.is_zipfile(local_filename):
            with zipfile.ZipFile(local_filename, 'r') as zip_ref:
                zip_ref.extractall(local_folder)
                print("Extracted ", local_filename, ":", len(zip_ref.namelist()), "files")
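# Usage sketch for KG_Manager (hypothetical token, DOI and paths; kg_query must
# run before download_file/download_range so the container URL is known):
def _example_kg_download(token):
    kg = KG_Manager(token)
    kg.kg_query(doi="10.0000/example-doi")  # placeholder DOI
    kg.download_range(num_from=1, num_to=3, prefix="/sub", where="/tmp/kg_data")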
class StorageManager():
@staticmethod
    def store_files_fromDict(where: str, id: str, dic: dict, toZip=True):
        if len(where) > 0:
            folder = os.path.join(where, id)
            if not os.path.exists(folder):
                os.makedirs(folder)
            for key, value in dic.items():
                file_path = os.path.join(folder, key)
                np.savetxt(file_path, value, delimiter='\t')  # separator compatible with the TVB reader
if toZip:
StorageManager.zip_files_fromDict(folder, dic)
@staticmethod
    def zip_files_fromDict(where: str, dic: dict):
        with ZipFile(os.path.join(where, INPUT_NAME), 'w') as zipObj:
            for file in dic.keys():
                file_path = os.path.join(where, file)
                zipObj.write(file_path, arcname=file)
@staticmethod
    def read_files_fromZip_to_nparray(filename: str, array_shape):
        arrays = {}
        with ZipFile(filename, "r") as zip_ref:
            for subfile in zip_ref.namelist():
                if not os.path.isdir(subfile):
                    with io.TextIOWrapper(zip_ref.open(subfile), encoding='utf-8') as file:
                        arrays[str(subfile)] = np.fromstring(file.read(), dtype=float, sep='\t').reshape(array_shape)
                        print(arrays[str(subfile)].shape)
        return arrays
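# Round-trip usage sketch for StorageManager (illustrative): write one
# snapshot's files, zip them, then read them back from the archive.
def _example_storage_roundtrip():
    data = {"weights.txt": Fake.fake_matrix(regions=68)}
    StorageManager.store_files_fromDict(where="/tmp/ndp", id="session_1", dic=data, toZip=True)
    arrays = StorageManager.read_files_fromZip_to_nparray(
        filename=os.path.join("/tmp/ndp", "session_1", INPUT_NAME), array_shape=(68, 68))
    print(arrays["weights.txt"].shape)  # expected: (68, 68)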
"""
def load_default_parameters(regions, noise=True):
m = lab.models.ReducedWongWang()
c = LoadModel_TVB(regions)
co = lab.coupling.Linear(a=np.array([1. / 68.]))
#if noise:
# no = lab.integrators.EulerStochastic(dt=1.0, noise=lab.noise.Additive(nsig=np.array([1e-5])))
#else:
# no = lab.integrators.EulerStochastic(dt=1.0)
no = lab.integrators.EulerDeterministic(dt=1.0)
mo = [lab.monitors.TemporalAverage(period=10.0)]
steps = 5000 # 5e3
return m, c, co, no, mo, steps
"""