Skip to content
Snippets Groups Projects
Commit dcde0b08 authored by Kenny Sharma's avatar Kenny Sharma
Browse files

[NRRPLT-4722] Support faux-dynamic MUSIC ports for transfer functions.

This patch fixes a critical issue where the output of distributed simulations
did not match that of single process simulations.

The issue was due to static synapse creation for proxy connections that did
not consider the specific spike devices that were connected - each of which
have different default and user configurable values - and would produce
similar but incorrect output. Further, fixed port width meant that multiple
transfer functions would be connected to the same parrot neurons, resulting
in cumulative output to the actual brain neurons rather than properly
separated input from each device.

To resolve this issue, the following was implemented:
- faux large port widths to the brain allowing 5 x population connections
- dynamic configuration of these ports via MPI communication
- use of Nest freeze/thaw paradigm to try and reduce performance impact

Thus, the CLE notifies the remote brain processes of each TF's specific
configuration and it is replicated using the appropriate free proxy
neurons on each side through direct Nest manipulation. Once the CLE
notifies the processes that the transfer functions are loaded, they no
longer accept configuration messages and run the simulation.

This creates proper transfer function connectivity to workaround the lack
of dynamic port creation in MUSIC and produces the same results as the
single process version of the Husky experiment.

This patch should absolutely be removed and fully refactored when
MUSIC supports dynamic port creation, it is a workaround to achieve
functionality.

Change-Id: Ifee5e668c9644e679861dc7598dff9e9f5c6fa52
parent 4782ac39
No related branches found
No related tags found
No related merge requests found
Showing
with 283 additions and 31 deletions
......@@ -24,6 +24,9 @@ class MUSICConfiguration(object):
MUSIC script that can be used to launch the distributed simulation components.
"""
# FIXME: [NRRPLT-4722] fudge factor for faux dynamic ports
FAUX_DYNAMIC_FUDGE_FACTOR = 5
def __init__(self, bibi):
"""
Convert the given BIBI configuration into a MUSIC XML configuration.
......@@ -52,7 +55,11 @@ class MUSICConfiguration(object):
width = get_neuron_count(bibi_population)
music_population = self.__bibi_to_music_population(bibi_population)
self.__add_port('%s_to_brain' % name, width, 'CLE', 'BRAIN', name, music_population)
# FIXME: [NRRPLT-4722] Workaround for lack of dynamic MUSIC ports, allow the user
# <fudge> the size of the population for devices (the neurons will be frozen
# by default, which should not impact performance (too much))
faux_w = width * MUSICConfiguration.FAUX_DYNAMIC_FUDGE_FACTOR
self.__add_port('%s_to_brain' % name, faux_w, 'CLE', 'BRAIN', name, music_population)
self.__add_port('%s_to_cle' % name, width, 'BRAIN', 'CLE', name, music_population)
# empty dict of applications, CLE and BRAIN definitions are required
......
......@@ -7,6 +7,11 @@ from hbp_nrp_cle.brainsim.pynn_nest.PyNNNestCommunicationAdapter import PyNNNest
from hbp_nrp_cle.brainsim.pynn import simulator as sim
import hbp_nrp_cle.tf_framework.config as tf_config
from hbp_nrp_music_interface.bibi.bibi_music_config import MUSICConfiguration
from hbp_nrp_music_interface.launch.MUSICBrainProcess import MUSICBrainProcess
from mpi4py import MPI
import logging
logger = logging.getLogger(__name__)
......@@ -27,7 +32,13 @@ class MUSICPyNNCommunicationAdapter(PyNNNestCommunicationAdapter):
def register_spike_source(self, populations, spike_generator_type, **params):
"""
Intercepts default PyNNNestCommunicationAdapter request and replaces specified
population with requivalent spike source proxy.
population with a new set of proxy neurons for each individual TF. This is required
to ensure weighting between the TF and target node is maintained properly as multiple
TFs can target a single neuron and we must weight their inputs separately.
This function is over-complicated due to backwards comaptibility for the single process
CLE, it can be simplified in the future if we merge these repositories and especially
when MUSIC supports dynamic ports.
:param populations: A reference to the populations to which the spike generator
should be connected
......@@ -36,9 +47,20 @@ class MUSICPyNNCommunicationAdapter(PyNNNestCommunicationAdapter):
:param params: A dictionary of configuration parameters
:return: A communication object or a group of objects
"""
populations = self.__get_population_proxy(populations, 'source')
return super(MUSICPyNNCommunicationAdapter, self). \
register_spike_source(populations, spike_generator_type, **params)
# get the next set of free parrot neurons with real connectivity to proxy_out neuron
proxies = self.__get_population_proxy(populations, 'source')
# connect the device generator to the new proxy neurons, this will create synapses with
# given parameters that we will need to duplicate from proxy->real neuron (parrot neurons
# ignore their input synapses, but output synapses are respected)
device = super(MUSICPyNNCommunicationAdapter, self). \
register_spike_source(proxies, spike_generator_type, **params)
# notify the remote brain processes that they need to setup the other side of this proxy
self.__notify_brain_processes(populations, proxies)
# return the new generator device
return device
def register_spike_sink(self, populations, spike_detector_type, **params):
"""
......@@ -56,9 +78,82 @@ class MUSICPyNNCommunicationAdapter(PyNNNestCommunicationAdapter):
return super(MUSICPyNNCommunicationAdapter, self). \
register_spike_sink(populations, spike_detector_type, **params)
@staticmethod
def __notify_brain_processes(populations, proxies):
"""
Notify remote MPI Brain Processes that they must complete this transfer function
connection by duplicating the parameters for this device connection with the output
proxy neurons. This is the only way to ensure a TF is actually connected with the
right parameters.
:param populations The target population to connect to on the other side.
:param proxies The proxy/parrot neurons that the device is actually connected to.
"""
# nest is the only supported simulator in the hbp_nrp_music packages, but assert here
# in case this changes, import here to avoid starting nest by accident earlier
assert sim.simulator.name == "NEST", "NEST is currently required to reconfigure MUSIC ports"
import nest
# synapse details to pass to other MPI clients, the synapse parameters should all be
# the same, but in the future a device may have variable/randomly parameterized
# synapses so just support them now
synapse_params = []
# for each proxy/reference pair, extract input connectivity from the device, the real
# reference output targets, and finally duplicate the input connectivity to the targets
for p in map(int, proxies.all_cells):
# the reference connection from the parrot neuron to proxy out, this contains music
# channel information
ref_c = nest.GetStatus(nest.GetConnections(source=[p]))[0]
channel = ref_c['receptor']
# the connection from device to parrot neuron, our desired synapse
dev_c = nest.GetStatus(nest.GetConnections(target=[p]))[0]
model = str(dev_c['synapse_model'])
# remove specific connection information, only leave synapse params since
# otherwise nest will complain about unused values
for k in ['receptor', 'source', 'target', 'synapse_label', 'synapse_model', 'sizeof']:
if k in dev_c:
dev_c.pop(k)
# override the original dummy weighted parrot->proxy synapse, we can't create a
# new synapse because MUSIC will complain about duplicate use of music_channels
nest.SetStatus(nest.GetConnections(source=[p]), dev_c)
# store the connection parameters for remote clients
dev_c['model'] = model
dev_c['music_channel'] = channel
synapse_params.append(dev_c)
# population information to send to the remote MPI nodes, we can't pickle Populations
# directly and those references wouldn't be valid on the remote nodes anyway
label = populations.parent.label if populations.parent else populations.label
# [NRRPLT-4722] workaround, if there is no population mask, then we need to use the size
# of the proxies to target the real size of the neuron population instead of the inflated
# proxy size
mask = populations.mask if populations.mask else slice(0, len(proxies), 1)
# propagate the synapse creation parameters to all remote notes, they will create the
# other side of the connections for this type
for rank in xrange(MPI.COMM_WORLD.Get_size()):
if rank == MPI.COMM_WORLD.Get_rank():
continue
MPI.COMM_WORLD.send({'label': label, 'mask': mask, 'synapses': synapse_params},
dest=rank, tag=MUSICBrainProcess.MPI_MSG_TAG)
# wait for the remote brain processes before continuing, use the global MPI comm
# handle that include all of the MUSIC processes, not just the local group ones
MPI.COMM_WORLD.Barrier()
def __get_population_proxy(self, population, proxy_type):
"""
Retrieves the specified population proxy type for a given population.
Retrieves the first <size> free proxy neurons of the specified population proxy type
for a given population.
:param population The population to find/create a proxy for.
:param proxy_type string The type of proxy device to retrieve.
......@@ -74,13 +169,53 @@ class MUSICPyNNCommunicationAdapter(PyNNNestCommunicationAdapter):
assert isinstance(population, sim.PopulationView)
try:
# for sinks we must use the same few neurons that are allocated at MUSIC launch
# otherwise the frontend will not properly fill out the neuron monitor and things
if proxy_type == 'sink':
# top level population view with no subslicing (e.g. sensors, actors, etc.)
if isinstance(population.parent, sim.Population):
return tf_config.music_proxies[population.label][proxy_type]
# otherwise, this is a view of a top level named population view
parent = tf_config.music_proxies[population.parent.label][proxy_type]
return sim.PopulationView(parent, population.mask, population.label)
# [NRRPLT-4722] workaround, simply get the first <size> proxy neurons that are
# frozen for a population, unfreeze and return them for connection, for full
# population requests only get the real population size
import nest
# top level population view with no subslicing (e.g. sensors, actors, etc.)
if isinstance(population.parent, sim.Population):
return tf_config.music_proxies[population.label][proxy_type]
proxies = tf_config.music_proxies[population.label][proxy_type]
size = proxies.size / MUSICConfiguration.FAUX_DYNAMIC_FUDGE_FACTOR
# otherwise, this is a view of a top level named population view
parent = tf_config.music_proxies[population.parent.label][proxy_type]
return sim.PopulationView(parent, population.mask, population.label)
else:
proxies = tf_config.music_proxies[population.parent.label][proxy_type]
size = population.size
# find all of the free/frozen proxy neurons
mask = []
for i, p in enumerate(map(int, proxies.all_cells)):
# if the neuron is frozen, unfreeze it and add to our list
if nest.GetStatus([p], 'frozen')[0]:
nest.SetStatus([p], 'frozen', False)
mask.append(i)
# stop looping when we have enough free neurons
if len(mask) == size:
break
# make sure we have enough free proxy neurons
if len(mask) != size:
raise Exception("Not enough free proxy neurons to connect transfer functions!\n"
"Please contact neurorobotics@humanbrainproject.eu with details of "
"this experiment and associated BIBI file if you require support.")
# return a view of the free proxy neurons for connection
return sim.PopulationView(proxies, mask, population.label)
except KeyError:
raise Exception("Unable to locate distriuted MUSIC neuron population proxies for {}.\n"
......
......@@ -16,20 +16,25 @@ import argparse
import os
import music
from mpi4py import MPI
import traceback
class MUSICBrainProcess(object):
"""
A distributed brain process that can be launched standalone on remote hosts.
"""
def __init__(self, bibi_file, exp_path):
# tag to listen for MPI configuration/command messages
MPI_MSG_TAG = 100
def __init__(self, bibi_file):
"""
Load the distributed brain and construct MUSIC proxies for communication
to/from the CLE. Nest will automatically allocate the brain in a round-robin
fashion under the hood, we do not need to do anything explicitly.
:param bibi_file The absolute path to the BIBI file for this experiment.
:param exp_path The base dir of the experiment being launched.
"""
self._music_setup = music.Setup()
......@@ -38,7 +43,13 @@ class MUSICBrainProcess(object):
with open(bibi_file) as f:
self._bibi = bibi_api_gen.CreateFromDocument(f.read())
pop_dict = get_all_neurons_as_dict(self._bibi.brainModel.populations)
brain_file = os.path.join(exp_path, self._bibi.brainModel.file)
# load the models path on this node and set absolute bibi path
models_path = os.environ.get('NRP_MODELS_DIRECTORY')
if models_path is None:
raise Exception("Unable to determine bibi path, NRP_MODELS_DIRECTORY is not "
"defined on target node!")
brain_file = os.path.join(models_path, self._bibi.brainModel.file)
# spawn CLE components that will handle loading the brain file and interfaces
self._brain_controller = PyNNControlAdapter()
......@@ -64,6 +75,85 @@ class MUSICBrainProcess(object):
raise Exception("Unable to load MUSIC proxy population definitions from MUSIC "
"configuration script. MUSIC_XML value was not properly set.")
def run(self):
"""
Blocking run loop for this brain process. First accept any transfer function configuration
via MPI messages and then block running the brain until terminated externally by the CLE
shutting down.
"""
# listen for transfer function creation messages until the CLE tells us to start
# NOTE: this is done on the same thread because MVAPICH2 defaults to single threaded
# when compiled for performance reasons, attempting to read/write to the MPI
# communicator on other threads causes freezing/segfault so don't do it unless
# we change how MVAPICH2 is compiled!
ready = False
while not ready:
# block and read messages from the CLE
data = MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE, tag=MUSICBrainProcess.MPI_MSG_TAG)
# new transfer function, dictionary of parameters
if isinstance(data, dict):
# create the tf based on the proxy direction from the CLE
self._connect_tf(data)
# command and control string from the CLE
elif isinstance(data, str) and data == 'ready':
# TF loading is complete, we can start the simulation
ready = True
# unknown message, this is a critical failure since this should never happen
# fully abort and log the condition
else:
raise Exception('Remote brain process received unknown message: %s' % str(data))
# ensure the CLE and other brain processes have all processed the message
MPI.COMM_WORLD.Barrier()
# run until forcefully terminated
while True:
sim.run(sim.get_time_step())
def _connect_tf(self, params):
"""
Reflect a transfer function connection made on the CLE side by connecting proxy neurons
to real brain neurons using the same parameters and connectivity as the CLE. This is the
only way to guarantee both sides share the same connectivity using static port allocation.
:param params The connectivity/synapse parameters passed by the CLE.
"""
# get the population of neurons from our dictionary, we can guarantee this is a valid
# population that has been declared in the BIBI at this point as the CLE will validate
# before sending us the create message
brain_pop = sim.PopulationView(tf_config.brain_root.__dict__[params['label']],
selector=params['mask'])
# get the whole population of proxy neurons for this port
port_name = '%s_to_brain' % params['label']
proxies = self._proxies[port_name]
# this is Nest specific code, but so is the rest of the pipeline
assert sim.simulator.name == "NEST", "Currently only NEST is supported"
import nest
# iterate through synapses and connect the specific proxy neuron (via music channel) to real
# brain neuron with given parameters
for synapse, brain_neuron in zip(params['synapses'], map(int, brain_pop.all_cells)):
# get the proxy neuron at the channel index
proxy = proxies[synapse['music_channel']]
synapse.pop('music_channel')
# thaw the proxy neuron so we actually get data
nest.SetStatus([proxy], 'frozen', False)
# for a source, we only need to connect the input proxy to the real neuron
nest.Connect([proxy], [brain_neuron], syn_spec=synapse)
if __name__ == '__main__':
......@@ -77,20 +167,17 @@ if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--bibi-file', dest='bibi_file',
help='the absolute bibi file path to load', required=True)
parser.add_argument('--experiment-path', dest='exp_path',
help='the root path of this experiment', required=True)
args = parser.parse_args()
# construct brain and proxies
brain = MUSICBrainProcess(args.bibi_file, args.exp_path)
brain = MUSICBrainProcess(args.bibi_file)
# sim.run() is blocking and will only advance when all other brain processes have called
# it as well, this mimics the operation done by the CLE as well, run until this process
# is temrinated by the CLE ending the simulation or CLE/brain process aborting on failure
while True:
sim.run(sim.get_time_step())
# run the brain until terminated, this is a blocking call
brain.run()
except Exception: # pylint: disable=W0703
# print the traceback which should go back to the remote logger
traceback.print_exc()
except Exception as e: # pylint: disable=W0703
# for any failures, terminate all other brain processes and the CLE
from mpi4py import MPI
MPI.COMM_WORLD.Abort()
......@@ -79,8 +79,7 @@ class MUSICLauncher(object):
'--music'],
1)
self._config.add_application('BRAIN', brain_launcher,
['--bibi-file={}'.format(self._bibi_file),
'--experiment-path={}'.format(self._exp_path)],
['--bibi-file={}'.format(self._bibi_file)],
brain_processes)
self._config.save(self._tmpdir)
......
......@@ -10,9 +10,9 @@ np = 2
binary = brain_binary
args = "--brain --args"
CLE.range_to_brain -> BRAIN.range_to_brain [2]
CLE.range_to_brain -> BRAIN.range_to_brain [10]
BRAIN.range_to_cle -> CLE.range_to_cle [2]
CLE.list_to_brain -> BRAIN.list_to_brain [3]
CLE.list_to_brain -> BRAIN.list_to_brain [15]
BRAIN.list_to_cle -> CLE.list_to_cle [3]
CLE.population_to_brain -> BRAIN.population_to_brain [10]
CLE.population_to_brain -> BRAIN.population_to_brain [50]
BRAIN.population_to_cle -> CLE.population_to_cle [10]
......@@ -3,7 +3,7 @@
<port>
<type>Event</type>
<name>range_to_brain</name>
<width>2</width>
<width>10</width>
<sender>
<name>CLE</name>
</sender>
......@@ -43,7 +43,7 @@
<port>
<type>Event</type>
<name>list_to_brain</name>
<width>3</width>
<width>15</width>
<sender>
<name>CLE</name>
</sender>
......@@ -83,7 +83,7 @@
<port>
<type>Event</type>
<name>population_to_brain</name>
<width>10</width>
<width>50</width>
<sender>
<name>CLE</name>
</sender>
......
......@@ -20,6 +20,8 @@ class MockPopulationView(object):
self.parent = parent
self.label = label
self.mask = mask
self.size = 0
self.all_cells = []
@patch('hbp_nrp_cle.brainsim.pynn_nest.PyNNNestCommunicationAdapter.PyNNNestCommunicationAdapter.initialize')
......@@ -48,10 +50,12 @@ class TestCommunicationAdapter(unittest.TestCase):
tf_config.music_proxies = {'parent':{'sink':pop}}
self._adapter.register_spike_sink(pop, 'foo')
'''
def test_population_list(self, mock_initialize, mock_source, mock_sink):
pop = MockPopulationView(MockPopulation('parent'), 'population', 'mask')
tf_config.music_proxies = {'population':{'source':pop}}
self._adapter.register_spike_source([pop, pop, pop], 'foo')
'''
def test_invalid_population(self, mock_initialize, mock_source, mock_sink):
pop = MockPopulationView(MockPopulation('parent'), 'population', 'mask')
......
......@@ -37,6 +37,13 @@ class PyNNConnectorFactory(object):
:param synapse: Uses this PyNN synapse instead of 'StaticSynapse', default None
"""
# only create default synapses to the cle ports since we will dynamically configure
# device connectivity <-> proxies from the cle (e.g. spike generators or recorders)
# but we need connectivity in the remote brain processes from parrot -> proxy so that
# spikes are propagated
if not port_name.endswith('to_cle'):
return
try:
connector = self.connector_types[connection_rule]
except KeyError, e:
......
......@@ -147,6 +147,19 @@ class PyNNProxyFactory(object):
logger.debug("Created {port_width} parrot neurons and connected it to outgoing "
"port {port_name}"
.format(port_width=port_width, port_name=port_name))
# [NRRPLT-4722] workaround for lack of dynamic MUSIC ports, since we have many
# more parrot neurons than required, freeze them. this means they will not be
# updated until we explicitly thaw them during connection.
if port_name.endswith('to_brain'):
nest.SetStatus(map(int, parrots.all_cells), 'frozen', True)
return parrots
else:
# [NRRPLT-4722] workaround for lack of dynamic MUSIC ports, since we have many
# more parrot neurons than required, freeze them. this means they will not be
# updated until we explicitly thaw them during connection.
if port_name.endswith('to_brain'):
nest.SetStatus(map(int, proxy.all_cells), 'frozen', True)
return proxy
......@@ -43,7 +43,7 @@ class TestPyNNConnectorFactory(unittest.TestCase):
proxy = sim.Population(10, sim.IF_cond_alpha())
target_population = sim.Population(10, sim.IF_cond_alpha())
connector_factory.create_and_connect_synapse(proxy, "test",
connector_factory.create_and_connect_synapse(proxy, "test_to_cle",
connection_rule,
population_slice,
target_population,
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment