Commit 1c933e83 authored by Eloy Retamino, committed by Manos Angelidis

Merged in NRPJP-91-move-from-mpi-comm_world-comm (pull request #9)


NRPJP-91 move from mpi comm world comm

* [NRPJP-91] MPI communications were moved from COMM_WORLD to COMM_NRP, a duplicate of COMM_WORLD (i.e. same group, different context ID) defined in the hbp_nrp_cle module; a sketch of the likely definition follows below.

* [NRPJP-91] Fixed one remaining call to COMM_WORLD.
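
The duplicated communicator itself lives in hbp_nrp_cle and is not part of this diff; a minimal sketch of the likely definition, assuming it is created with mpi4py's Dup():

    from mpi4py import MPI

    # Dup() yields a communicator with the same process group and rank order as
    # COMM_WORLD but a fresh context ID, so point-to-point matching on COMM_NRP
    # is fully isolated from any traffic on COMM_WORLD.
    COMM_NRP = MPI.COMM_WORLD.Dup()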

Approved-by: Ugo Albanese <ugo.albanese@santannapisa.it>
Approved-by: Kepa Cantero <cantero@fortiss.org>
parent 6a083481
@@ -31,7 +31,7 @@ from hbp_nrp_cle.brainsim.pynn_nest.devices.__NestDeviceGroup import PyNNNestDev
 from hbp_nrp_distributed_nest.launch.NestBrainProcess import NestBrainProcess
 import hbp_nrp_cle.tf_framework.config as tf_config
-from mpi4py import MPI
+from hbp_nrp_cle.brainsim import COMM_NRP
 import pyNN.nest as sim
@@ -174,8 +174,8 @@ class DistributedPyNNCommunicationAdapter(PyNNNestCommunicationAdapter):
         # propagate the synapse creation parameters to all remote nodes, they will run the same
         # connection/creation commands after receiving these messages, guaranteed to be
         # run from CLE MPI process 0 only
-        for rank in xrange(1, MPI.COMM_WORLD.Get_size()):
-            MPI.COMM_WORLD.send({'command': 'ConnectTF', 'type': kind, 'assemblies': assemblies,
+        for rank in xrange(1, COMM_NRP.Get_size()):
+            COMM_NRP.send({'command': 'ConnectTF', 'type': kind, 'assemblies': assemblies,
                                  'device': device, 'timestep': timestep,
                                  'params': params},
                                 dest=rank, tag=NestBrainProcess.MPI_MSG_TAG)
@@ -194,6 +194,6 @@ class DistributedPyNNCommunicationAdapter(PyNNNestCommunicationAdapter):
         # propagate the deletion configuration to all other processes, guaranteed to be
         # run from CLE MPI process 0 only
-        for rank in xrange(1, MPI.COMM_WORLD.Get_size()):
-            MPI.COMM_WORLD.send({'command': 'DeleteTF', 'timestep': timestep},
+        for rank in xrange(1, COMM_NRP.Get_size()):
+            COMM_NRP.send({'command': 'DeleteTF', 'timestep': timestep},
                                 dest=rank, tag=NestBrainProcess.MPI_MSG_TAG)
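
Why the duplicate matters here: MPI matches point-to-point messages per communicator, so the MPI_MSG_TAG sends above can no longer collide with anything other code (NEST included) posts on COMM_WORLD. A minimal standalone demo of that isolation (hypothetical script, run with mpirun -np 2):

    from mpi4py import MPI

    world = MPI.COMM_WORLD
    nrp = world.Dup()

    if world.Get_rank() == 0:
        # Same destination and same tag, on two different communicators.
        world.send('internal', dest=1, tag=100)
        nrp.send('nrp-command', dest=1, tag=100)
    elif world.Get_rank() == 1:
        # Each recv matches only its own communicator's traffic, despite the
        # identical tag, so neither message can steal the other's slot.
        print nrp.recv(source=0, tag=100)    # -> 'nrp-command'
        print world.recv(source=0, tag=100)  # -> 'internal'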
@@ -26,7 +26,7 @@ when they should step the simulation.
 from hbp_nrp_cle.brainsim.pynn.PyNNControlAdapter import PyNNControlAdapter
 from hbp_nrp_distributed_nest.launch.NestBrainProcess import NestBrainProcess
-from mpi4py import MPI
+from hbp_nrp_cle.brainsim import COMM_NRP


 class DistributedPyNNControlAdapter(PyNNControlAdapter):
@@ -50,10 +50,10 @@ class DistributedPyNNControlAdapter(PyNNControlAdapter):
         """
         # notify all other processes, blocking send calls for them to receive
-        for rank in xrange(MPI.COMM_WORLD.Get_size()):
-            if rank == MPI.COMM_WORLD.Get_rank():
+        for rank in xrange(COMM_NRP.Get_size()):
+            if rank == COMM_NRP.Get_rank():
                 continue
-            MPI.COMM_WORLD.send({'command': 'LoadBrain', 'file': network_file,
+            COMM_NRP.send({'command': 'LoadBrain', 'file': network_file,
                            'populations': populations},
                           dest=rank, tag=NestBrainProcess.MPI_MSG_TAG)
@@ -68,10 +68,10 @@ class DistributedPyNNControlAdapter(PyNNControlAdapter):
         """
         # notify all other processes, blocking send calls for them to receive
-        for rank in xrange(MPI.COMM_WORLD.Get_size()):
-            if rank == MPI.COMM_WORLD.Get_rank():
+        for rank in xrange(COMM_NRP.Get_size()):
+            if rank == COMM_NRP.Get_rank():
                 continue
-            MPI.COMM_WORLD.send('step', dest=rank, tag=NestBrainProcess.MPI_MSG_TAG)
+            COMM_NRP.send('step', dest=rank, tag=NestBrainProcess.MPI_MSG_TAG)

         # run the actual simulation step
         super(DistributedPyNNControlAdapter, self).run_step(dt)
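
This notify-everyone-but-self loop recurs in load_brain, run_step, and the launcher below; a hypothetical helper (not part of the codebase) that captures the pattern:

    def send_to_other_ranks(comm, message, tag):
        # Blocking point-to-point send of `message` to every rank in `comm`
        # except the calling one, mirroring the loops in this diff.
        for rank in xrange(comm.Get_size()):
            if rank == comm.Get_rank():
                continue
            comm.send(message, dest=rank, tag=tag)

    # e.g. send_to_other_ranks(COMM_NRP, 'step', NestBrainProcess.MPI_MSG_TAG)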
@@ -44,7 +44,7 @@ def launch_cle(argv): # pragma: no cover
     """
     # import MPI here, must be done after Nest in subclass adapters
-    from mpi4py import MPI
+    from hbp_nrp_cle.brainsim import COMM_NRP

     # exit code, 0 for success and -1 for any failures
     try:
@@ -116,10 +116,10 @@ def launch_cle(argv): # pragma: no cover
         # the tag is a magic number to avoid circular build/release dependency for now but
         # this will be removed when the referenced bug is fixed
         # notify MPI processes that configuration is complete
-        for rank in xrange(MPI.COMM_WORLD.Get_size()):
-            if rank != MPI.COMM_WORLD.Get_rank():
-                MPI.COMM_WORLD.send('ready', dest=rank, tag=100)
-        MPI.COMM_WORLD.Barrier()
+        for rank in xrange(COMM_NRP.Get_size()):
+            if rank != COMM_NRP.Get_rank():
+                COMM_NRP.send('ready', dest=rank, tag=100)
+        COMM_NRP.Barrier()

         logger.info('Starting CLE.')
         simulation.run() # This is a blocking call, not to be confused with
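
The receiving half of this handshake sits in the brain processes (see the NestBrainProcess hunks below); a condensed sketch of that side, assuming MPI_MSG_TAG is the same magic number 100:

    # Block until the CLE reports that configuration is complete, then join the
    # same barrier so all ranks start stepping from a common point.
    data = COMM_NRP.recv(source=MPI.ANY_SOURCE, tag=100)
    if data == 'ready':
        COMM_NRP.Barrier()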
@@ -132,12 +132,12 @@ def launch_cle(argv): # pragma: no cover
         print '[ MPI ] CLE aborted with message {}, terminating.'.format(e.message)
         logger.exception(e)

-        for rank in xrange(MPI.COMM_WORLD.Get_size()):
-            if rank != MPI.COMM_WORLD.Get_rank():
-                MPI.COMM_WORLD.send('abort', dest=rank, tag=100)
+        for rank in xrange(COMM_NRP.Get_size()):
+            if rank != COMM_NRP.Get_rank():
+                COMM_NRP.send('abort', dest=rank, tag=100)

-        print '[ MPI ] ABORTing distributed CLE process: {}'.format(str(MPI.COMM_WORLD.Get_rank()))
-        MPI.COMM_WORLD.Abort(-1)
+        print '[ MPI ] ABORTing distributed CLE process: {}'.format(str(COMM_NRP.Get_rank()))
+        COMM_NRP.Abort(-1)
     finally:
@@ -150,6 +150,6 @@ def launch_cle(argv): # pragma: no cover
     # terminate the spawned brain processes
     # send a shutdown message in case the brain processes are in a recv loop at startup since they
     # seem to block and ignore the Abort command until receiving a message
-    for rank in xrange(MPI.COMM_WORLD.Get_size()):
-        if rank != MPI.COMM_WORLD.Get_rank():
-            MPI.COMM_WORLD.send('shutdown', dest=rank, tag=100)
+    for rank in xrange(COMM_NRP.Get_size()):
+        if rank != COMM_NRP.Get_rank():
+            COMM_NRP.send('shutdown', dest=rank, tag=100)
@@ -45,7 +45,7 @@ def launch_brain(argv): # pragma: no cover
     """
    # import MPI here, must be done after Nest in subclass adapters
-    from mpi4py import MPI
+    from hbp_nrp_cle.brainsim import COMM_NRP

     try:
         parser = argparse.ArgumentParser()
@@ -73,5 +73,5 @@ def launch_brain(argv): # pragma: no cover
         traceback.print_exc()
         print str(ex)
         # for any failures, terminate all other brain processes and the CLE
-        print '[ MPI ] ABORTing distributed NEST process: {}'.format(str(MPI.COMM_WORLD.Get_rank()))
-        MPI.COMM_WORLD.Abort(-1)
+        print '[ MPI ] ABORTing distributed NEST process: {}'.format(str(COMM_NRP.Get_rank()))
+        COMM_NRP.Abort(-1)
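
Since Dup() preserves the process group, COMM_NRP.Abort(-1) reaches exactly the processes that MPI.COMM_WORLD.Abort(-1) did before, so failure semantics are unchanged. A sanity check one could run (assuming COMM_NRP is a plain Dup() of COMM_WORLD):

    from mpi4py import MPI
    from hbp_nrp_cle.brainsim import COMM_NRP

    # A duplicated communicator compares as CONGRUENT to its parent: identical
    # group and rank order, different context, hence the same Abort() reach.
    assert MPI.Comm.Compare(COMM_NRP, MPI.COMM_WORLD) == MPI.CONGRUENT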
@@ -40,6 +40,7 @@ from hbp_nrp_cle.tf_framework._CleanableTransferFunctionParameter import (
     ICleanableTransferFunctionParameter)
 from mpi4py import MPI
+from hbp_nrp_cle.brainsim import COMM_NRP

 import traceback
@@ -99,7 +100,7 @@ class NestBrainProcess(object):
         while self._running:
             # block and read messages from the CLE
-            data = MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE, tag=NestBrainProcess.MPI_MSG_TAG)
+            data = COMM_NRP.recv(source=MPI.ANY_SOURCE, tag=NestBrainProcess.MPI_MSG_TAG)

             # new transfer function, dictionary of parameters
             if isinstance(data, dict):
@@ -131,7 +132,7 @@
                 self._ready = True

                 # notify all processes start at the same time (all Nest commands barrier themselves)
-                MPI.COMM_WORLD.Barrier()
+                COMM_NRP.Barrier()

             # step the simulation, commanded by the CLE
             elif data == 'step':
@@ -157,7 +158,7 @@
                 # self._brain_controller.shutdown()
                 # self._brain_communicator.shutdown()
                 raise Exception('Raising exception to ABORT distributed NEST process: {}'.format(
-                    str(MPI.COMM_WORLD.Get_rank())
+                    str(COMM_NRP.Get_rank())
                 ))

             # unknown message, this is a critical failure since this should never happen