From 1c933e8369f980d67ab8cca337a82ef8af7b8a4f Mon Sep 17 00:00:00 2001
From: Eloy Retamino <retamino@ugr.es>
Date: Thu, 19 Sep 2019 12:41:54 +0000
Subject: [PATCH] Merged in NRPJP-91-move-from-mpi-comm_world-comm (pull
 request #9)

NRPJP-91 move from mpi comm world comm

* [NRPJP-91] MPI communications were moved from COMM_WORLD to COMM_NRP, a duplicate of COMM_WORLD (i.e. same group, different context ID) defined in the hbp_nrp_cle module (see the sketch below).

* [NRPJP-91] Fixed one remaining call to COMM_WORLD.
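
A minimal sketch of how COMM_NRP could be defined in hbp_nrp_cle.brainsim (an illustrative assumption; the actual definition is not part of this patch):

    # hbp_nrp_cle/brainsim/__init__.py -- hypothetical location
    from mpi4py import MPI

    # Duplicate COMM_WORLD: same process group, but a separate communication
    # context, so that CLE/brain-process messages cannot collide with traffic
    # that other libraries send directly on COMM_WORLD.
    COMM_NRP = MPI.COMM_WORLD.Dup()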

Approved-by: Ugo Albanese <ugo.albanese@santannapisa.it>
Approved-by: Kepa Cantero <cantero@fortiss.org>
---
 .../DistributedPyNNCommunicationAdapter.py    | 10 +++----
 .../cle/DistributedPyNNControlAdapter.py      | 14 +++++-----
 .../launch/DistributedCLEProcess.py           | 26 +++++++++----------
 .../launch/DistributedNestProcess.py          |  6 ++---
 .../launch/NestBrainProcess.py                |  7 ++---
 5 files changed, 32 insertions(+), 31 deletions(-)

diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/DistributedPyNNCommunicationAdapter.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/DistributedPyNNCommunicationAdapter.py
index c502e1d..417af2e 100644
--- a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/DistributedPyNNCommunicationAdapter.py
+++ b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/DistributedPyNNCommunicationAdapter.py
@@ -31,7 +31,7 @@ from hbp_nrp_cle.brainsim.pynn_nest.devices.__NestDeviceGroup import PyNNNestDev
 from hbp_nrp_distributed_nest.launch.NestBrainProcess import NestBrainProcess
 import hbp_nrp_cle.tf_framework.config as tf_config
 
-from mpi4py import MPI
+from hbp_nrp_cle.brainsim import COMM_NRP
 
 import pyNN.nest as sim
 
@@ -174,8 +174,8 @@ class DistributedPyNNCommunicationAdapter(PyNNNestCommunicationAdapter):
         # propagate the synapse creation parameters to all remote notes, they will run the same
         # connection/creation commands after receiving these messages, guaranteed to be
         # run from CLE MPI process 0 only
-        for rank in xrange(1, MPI.COMM_WORLD.Get_size()):
-            MPI.COMM_WORLD.send({'command': 'ConnectTF', 'type': kind, 'assemblies': assemblies,
+        for rank in xrange(1, COMM_NRP.Get_size()):
+            COMM_NRP.send({'command': 'ConnectTF', 'type': kind, 'assemblies': assemblies,
                                  'device': device, 'timestep': timestep,
                                  'params': params},
                                 dest=rank, tag=NestBrainProcess.MPI_MSG_TAG)
@@ -194,6 +194,6 @@ class DistributedPyNNCommunicationAdapter(PyNNNestCommunicationAdapter):
 
         # propagate the deletion configuration to all other processes, guaranteed to be
         # run from CLE MPI process 0 only
-        for rank in xrange(1, MPI.COMM_WORLD.Get_size()):
-            MPI.COMM_WORLD.send({'command': 'DeleteTF', 'timestep': timestep},
+        for rank in xrange(1, COMM_NRP.Get_size()):
+            COMM_NRP.send({'command': 'DeleteTF', 'timestep': timestep},
                                 dest=rank, tag=NestBrainProcess.MPI_MSG_TAG)
diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/DistributedPyNNControlAdapter.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/DistributedPyNNControlAdapter.py
index cc775a0..9114b55 100644
--- a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/DistributedPyNNControlAdapter.py
+++ b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/cle/DistributedPyNNControlAdapter.py
@@ -26,7 +26,7 @@ when they should step the simulation.
 from hbp_nrp_cle.brainsim.pynn.PyNNControlAdapter import PyNNControlAdapter
 from hbp_nrp_distributed_nest.launch.NestBrainProcess import NestBrainProcess
 
-from mpi4py import MPI
+from hbp_nrp_cle.brainsim import COMM_NRP
 
 
 class DistributedPyNNControlAdapter(PyNNControlAdapter):
@@ -50,10 +50,10 @@ class DistributedPyNNControlAdapter(PyNNControlAdapter):
         """
 
         # notify all other processes, blocking send calls for them to receive
-        for rank in xrange(MPI.COMM_WORLD.Get_size()):
-            if rank == MPI.COMM_WORLD.Get_rank():
+        for rank in xrange(COMM_NRP.Get_size()):
+            if rank == COMM_NRP.Get_rank():
                 continue
-            MPI.COMM_WORLD.send({'command': 'LoadBrain', 'file': network_file,
+            COMM_NRP.send({'command': 'LoadBrain', 'file': network_file,
                                  'populations': populations},
                                 dest=rank, tag=NestBrainProcess.MPI_MSG_TAG)
 
@@ -68,10 +68,10 @@ class DistributedPyNNControlAdapter(PyNNControlAdapter):
         """
 
         # notify all other processes, blocking send calls for them to receive
-        for rank in xrange(MPI.COMM_WORLD.Get_size()):
-            if rank == MPI.COMM_WORLD.Get_rank():
+        for rank in xrange(COMM_NRP.Get_size()):
+            if rank == COMM_NRP.Get_rank():
                 continue
-            MPI.COMM_WORLD.send('step', dest=rank, tag=NestBrainProcess.MPI_MSG_TAG)
+            COMM_NRP.send('step', dest=rank, tag=NestBrainProcess.MPI_MSG_TAG)
 
         # run the actual simulation step
         super(DistributedPyNNControlAdapter, self).run_step(dt)
diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/DistributedCLEProcess.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/DistributedCLEProcess.py
index 699ba6b..808f221 100644
--- a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/DistributedCLEProcess.py
+++ b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/DistributedCLEProcess.py
@@ -44,7 +44,7 @@ def launch_cle(argv): # pragma: no cover
     """
 
     # import MPI here, must be done after Nest in subclass adapters
-    from mpi4py import MPI
+    from hbp_nrp_cle.brainsim import COMM_NRP
 
     # exit code, 0 for success and -1 for any failures
     try:
@@ -116,10 +116,10 @@ def launch_cle(argv): # pragma: no cover
         # the tag is a magic number to avoid circular build/release dependency for now but
         # this will be removed when the referenced bug is fixed
         # notify MPI processes that configuration is complete
-        for rank in xrange(MPI.COMM_WORLD.Get_size()):
-            if rank != MPI.COMM_WORLD.Get_rank():
-                MPI.COMM_WORLD.send('ready', dest=rank, tag=100)
-        MPI.COMM_WORLD.Barrier()
+        for rank in xrange(COMM_NRP.Get_size()):
+            if rank != COMM_NRP.Get_rank():
+                COMM_NRP.send('ready', dest=rank, tag=100)
+        COMM_NRP.Barrier()
 
         logger.info('Starting CLE.')
         simulation.run()  # This is a blocking call, not to be confused with
@@ -132,12 +132,12 @@ def launch_cle(argv): # pragma: no cover
         print '[ MPI ] CLE aborted with message {}, terminating.'.format(e.message)
         logger.exception(e)
 
-        for rank in xrange(MPI.COMM_WORLD.Get_size()):
-            if rank != MPI.COMM_WORLD.Get_rank():
-                MPI.COMM_WORLD.send('abort', dest=rank, tag=100)
+        for rank in xrange(COMM_NRP.Get_size()):
+            if rank != COMM_NRP.Get_rank():
+                COMM_NRP.send('abort', dest=rank, tag=100)
 
-        print '[ MPI ] ABORTing distributed CLE process: {}'.format(str(MPI.COMM_WORLD.Get_rank()))
-        MPI.COMM_WORLD.Abort(-1)
+        print '[ MPI ] ABORTing distributed CLE process: {}'.format(str(COMM_NRP.Get_rank()))
+        COMM_NRP.Abort(-1)
 
     finally:
 
@@ -150,6 +150,6 @@ def launch_cle(argv): # pragma: no cover
     # terminate the spawned brain processes
     # send a shutdown message in case the brain processes are in a recv loop at startup since they
     # seem to block and ignore the Abort command until receiving a message
-    for rank in xrange(MPI.COMM_WORLD.Get_size()):
-        if rank != MPI.COMM_WORLD.Get_rank():
-            MPI.COMM_WORLD.send('shutdown', dest=rank, tag=100)
+    for rank in xrange(COMM_NRP.Get_size()):
+        if rank != COMM_NRP.Get_rank():
+            COMM_NRP.send('shutdown', dest=rank, tag=100)
diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/DistributedNestProcess.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/DistributedNestProcess.py
index 4218c36..12ff3c4 100644
--- a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/DistributedNestProcess.py
+++ b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/DistributedNestProcess.py
@@ -45,7 +45,7 @@ def launch_brain(argv): # pragma: no cover
     """
 
     # import MPI here, must be done after Nest in subclass adapters
-    from mpi4py import MPI
+    from hbp_nrp_cle.brainsim import COMM_NRP
 
     try:
         parser = argparse.ArgumentParser()
@@ -73,5 +73,5 @@ def launch_brain(argv): # pragma: no cover
         traceback.print_exc()
         print str(ex)
         # for any failures, terminate all other brain processes and the CLE
-        print '[ MPI ] ABORTing distributed NEST process: {}'.format(str(MPI.COMM_WORLD.Get_rank()))
-        MPI.COMM_WORLD.Abort(-1)
+        print '[ MPI ] ABORTing distributed NEST process: {}'.format(str(COMM_NRP.Get_rank()))
+        COMM_NRP.Abort(-1)
diff --git a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/NestBrainProcess.py b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/NestBrainProcess.py
index a887d5b..3aea30c 100644
--- a/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/NestBrainProcess.py
+++ b/hbp_nrp_distributed_nest/hbp_nrp_distributed_nest/launch/NestBrainProcess.py
@@ -40,6 +40,7 @@ from hbp_nrp_cle.tf_framework._CleanableTransferFunctionParameter import (
     ICleanableTransferFunctionParameter)
 
 from mpi4py import MPI
+from hbp_nrp_cle.brainsim import COMM_NRP
 import traceback
 
 
@@ -99,7 +100,7 @@ class NestBrainProcess(object):
         while self._running:
 
             # block and read messages from the CLE
-            data = MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE, tag=NestBrainProcess.MPI_MSG_TAG)
+            data = COMM_NRP.recv(source=MPI.ANY_SOURCE, tag=NestBrainProcess.MPI_MSG_TAG)
 
             # new transfer function, dictionary of parameters
             if isinstance(data, dict):
@@ -131,7 +132,7 @@ class NestBrainProcess(object):
                 self._ready = True
 
                 # notify all processes start at the same time (all Nest commands barrier themselves)
-                MPI.COMM_WORLD.Barrier()
+                COMM_NRP.Barrier()
 
             # step the simulation, commanded by the CLE
             elif data == 'step':
@@ -157,7 +158,7 @@ class NestBrainProcess(object):
                 # self._brain_controller.shutdown()
                 # self._brain_communicator.shutdown()
                 raise Exception('Raising exception to ABORT distributed NEST process: {}'.format(
-                    str(MPI.COMM_WORLD.Get_rank())
+                    str(COMM_NRP.Get_rank())
                 ))
 
             # unknown message, this is a critical failure since this should never happen
-- 
GitLab