diff --git a/CMakeLists.txt b/CMakeLists.txt
index 39c99a77d6369c85c310c4fb920c83f451f1975e..ba3f0cc3366589d54b048085aa0221fb0a040a57 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -175,10 +175,9 @@ endif()
 #----------------------------------------------------------
 # MPI support
 #----------------------------------------------------------
-set(ARB_DISTRIBUTED_MODEL "serial" CACHE STRING "set the global communication model, one of serial/mpi/dryrun")
-set_property(CACHE ARB_DISTRIBUTED_MODEL PROPERTY STRINGS serial mpi dryrun)
+option(ARB_WITH_MPI "build with support for MPI" OFF)
 
-if(ARB_DISTRIBUTED_MODEL MATCHES "mpi")
+if(ARB_WITH_MPI)
     # BGQ specific flags
     if(${ARB_SYSTEM_TYPE} MATCHES "BGQ" )
         # On BGQ, set CXX to the mpi wrapper, and pass it a static
@@ -195,18 +194,6 @@ if(ARB_DISTRIBUTED_MODEL MATCHES "mpi")
     # unfortunate workaround for C++ detection in system mpi.h
     add_definitions(-DMPICH_SKIP_MPICXX=1 -DOMPI_SKIP_MPICXX=1)
     set_property(DIRECTORY APPEND_STRING PROPERTY COMPILE_OPTIONS "${MPI_C_COMPILE_FLAGS}")
-
-    set(ARB_WITH_MPI TRUE)
-
-elseif(ARB_DISTRIBUTED_MODEL MATCHES "dryrun")
-    add_definitions(-DARB_HAVE_DRYRUN)
-    set(ARB_WITH_DRYRUN TRUE)
-
-elseif(ARB_DISTRIBUTED_MODEL MATCHES "serial")
-    # no additional set up needed
-
-else()
-    message( FATAL_ERROR "-- Distributed communication model '${ARB_DISTRIBUTED_MODEL}' not supported, use one of serial/mpi/dryrun")
 endif()
 
 #----------------------------------------------------------
diff --git a/doc/cpp_distributed_context.rst b/doc/cpp_distributed_context.rst
new file mode 100644
index 0000000000000000000000000000000000000000..3eeeaee08964cdeb105763b25901bff6073648f0
--- /dev/null
+++ b/doc/cpp_distributed_context.rst
@@ -0,0 +1,183 @@
+.. _cppdistcontext:
+
+Distributed Context
+===================
+
+To support running on systems from laptops and workstations to large distributed
+HPC clusters, Arbor uses *distributed contexts* to:
+
+  * Describe the distributed computer system that a simulation is to be
+    distributed over and run on.
+  * Perform collective operations over the distributed system, such as gather
+    and synchronization.
+  * Query information about the distributed system, such as the number of
+    distributed processes and the index/rank of the calling process.
+
+The global context used to run a simulation is determined at run time, not at compile time.
+This means that if Arbor is compiled with support for MPI enabled, then at run time the
+user can choose between using a non-distributed (local) context, or a distributed MPI
+context.
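+
+For example, a program that should run both with and without MPI can select the
+context at run time. (A sketch: ``ARB_HAVE_MPI`` is the macro defined when Arbor
+is built with MPI support, and ``arb::mpi::scoped_guard`` is the helper used by
+the example miniapps to initialize and finalize MPI.)
+
+.. container:: example-code
+
+    .. code-block:: cpp
+
+        // Default-constructed context: local, non-distributed.
+        arb::distributed_context context;
+
+        #ifdef ARB_HAVE_MPI
+        // Use an MPI context when MPI support is compiled in.
+        // MPI must be initialized first, e.g. with arb::mpi::scoped_guard.
+        context = arb::mpi_context(MPI_COMM_WORLD);
+        #endif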
+
+A global context is created by a user before building and running a simulation.
+The context is then used to perform domain decomposition and initialize the simulation
+(see :ref:`cppsimulation` for more about the simulation building workflow).
+In the example below, a context that uses MPI is used to run a distributed simulation:
+
+.. container:: example-code
+
+    .. code-block:: cpp
+
+        arb::hw::node_info node;
+        my_recipe recipe;
+
+        // Get an MPI communication context
+        arb::distributed_context context = arb::mpi_context();
+
+        // Partition model over the distributed system
+        arb::domain_decomposition decomp = arb::partition_load_balance(recipe, node, &context);
+
+        // Instantiate the simulation over the distributed system
+        arb::simulation sim(recipe, decomp, &context);
+
+        // Run the simulation for 100 ms over the distributed system
+        sim.run(100, 0.01);
+
+By default :cpp:class:`arb::distributed_context` uses an :cpp:class:`arb::local_context`, which
+runs on the local computer or node, that is, it is not distributed.
+
+To run on a distributed system, use :cpp:class:`arb::mpi_context`, which uses
+MPI for distributed communication.
+By default the context will use the global MPI communicator ``MPI_COMM_WORLD``,
+though it can be initialized with a user-supplied communicator.
+
+.. container:: example-code
+
+    .. code-block:: cpp
+
+        arb::distributed_context context;
+
+        // This is equivalent to the default-constructed context above
+        arb::distributed_context context = arb::local_context();
+
+        // Create an MPI context that uses MPI_COMM_WORLD
+        arb::distributed_context context = arb::mpi_context();
+
+        // Create an MPI context with a user-supplied MPI_Comm
+        arb::distributed_context context = arb::mpi_context(communicator);
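+
+The communicator does not have to be ``MPI_COMM_WORLD``. For example, a job that
+dedicates only a subset of its ranks to the simulation can pass a sub-communicator.
+(A sketch: the communicator obtained from ``MPI_Comm_split`` below stands in for
+whatever communicator the application has constructed.)
+
+.. container:: example-code
+
+    .. code-block:: cpp
+
+        // Split MPI_COMM_WORLD into two halves by rank parity.
+        int rank;
+        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+        MPI_Comm split;
+        MPI_Comm_split(MPI_COMM_WORLD, rank%2, rank, &split);
+
+        // Run this simulation over one of the halves only.
+        arb::distributed_context context = arb::mpi_context(split);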
+
+
+Class Documentation
+-------------------
+
+.. cpp:namespace:: arb
+
+.. cpp:class:: distributed_context
+
+    Defines the interface used by Arbor to query and perform collective
+    operations on distributed systems.
+
+    Uses value-semantic type erasure. The main benefit of this approach is that
+    classes that implement the interface can use duck typing instead of
+    deriving from :cpp:class:`distributed_context`.
+
+    **Constructor:**
+
+    .. cpp:function:: distributed_context()
+
+        Default constructor initializes the context as a :cpp:class:`local_context`.
+
+    .. cpp:function:: distributed_context(distributed_context&& other)
+
+        Move constructor.
+
+    .. cpp:function:: distributed_context& operator=(distributed_context&& other)
+
+        Move assignment from an rvalue.
+
+    .. cpp:function:: template <typename Impl> distributed_context(Impl&& impl)
+
+        Initialize with an implementation that satisfies the interface.
+
+    **Interface:**
+
+    .. cpp:function:: int id() const
+
+        Each distributed process has a unique integer identifier, where the identifiers
+        are numbered contiguously in the half-open range [0, size)
+        (for example, the MPI rank from ``MPI_Comm_rank``).
+
+    .. cpp:function:: int size() const
+
+        The number of distributed processes (for example, from ``MPI_Comm_size``).
+
+    .. cpp:function:: void barrier() const
+
+        A synchronization barrier where all distributed processes wait until every
+        process has reached the barrier (for example ``MPI_Barrier``).
+
+    .. cpp:function:: std::string name() const
+
+        The name of the context implementation. For example, if using MPI returns ``"MPI"``.
+
+    .. cpp:function:: std::vector<std::string> gather(std::string value, int root) const
+
+        Special overload for gathering a string provided by each domain into a vector
+        of strings on domain :cpp:var:`root`.
+
+    .. cpp:function:: T min(T value) const
+
+        Reduction operation over all processes.
+
+        The type ``T`` is one of ``float``, ``double``, ``int``,
+        ``std::uint32_t``, ``std::uint64_t``.
+
+    .. cpp:function:: T max(T value) const
+
+        Reduction operation over all processes.
+
+        The type ``T`` is one of ``float``, ``double``, ``int``,
+        ``std::uint32_t``, ``std::uint64_t``.
+
+    .. cpp:function:: T sum(T value) const
+
+        Reduction operation over all processes.
+
+        The type ``T`` is one of ``float``, ``double``, ``int``,
+        ``std::uint32_t``, ``std::uint64_t``.
+
+    .. cpp:function:: std::vector<T> gather(T value, int root) const
+
+        Gather operation. Returns a vector with one entry for each process.
+
+        The type ``T`` is one of ``float``, ``double``, ``int``,
+        ``std::uint32_t``, ``std::uint64_t``, ``std::string``.
+
+.. cpp:class:: local_context
+
+    Implements the :cpp:class:`arb::distributed_context` interface for
+    non-distributed computation.
+
+    This is the default :cpp:class:`arb::distributed_context`, and should be used
+    when running on laptop or workstation systems with one NUMA domain.
+
+    .. Note::
+        :cpp:class:`arb::local_context` provides the simplest possible distributed context,
+        with only one process, and where all reduction operations are the identity operator.
+
+    **Constructor:**
+
+    .. cpp:function:: local_context()
+
+        Default constructor.
+
+.. cpp:class:: mpi_context
+
+    Implements the :cpp:class:`arb::distributed_context` interface for
+    distributed computation using the MPI message passing library.
+
+    **Constructor:**
+
+    .. cpp:function:: mpi_context(MPI_Comm comm=MPI_COMM_WORLD)
+
+        Create a context that will use the MPI communicator :cpp:var:`comm`.
+        By default uses the global communicator ``MPI_COMM_WORLD``.
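+
+Because the interface is duck typed, a user-defined type that provides the same
+member functions can also be wrapped in a :cpp:class:`arb::distributed_context`.
+The sketch below mirrors :cpp:class:`arb::local_context`; the type name
+``my_context`` is illustrative only.
+
+.. container:: example-code
+
+    .. code-block:: cpp
+
+        struct my_context {
+            arb::gathered_vector<arb::spike>
+            gather_spikes(const std::vector<arb::spike>& local_spikes) const {
+                using count_type = arb::gathered_vector<arb::spike>::count_type;
+                return arb::gathered_vector<arb::spike>(
+                    std::vector<arb::spike>(local_spikes),
+                    {0u, static_cast<count_type>(local_spikes.size())});
+            }
+
+            int id() const { return 0; }
+            int size() const { return 1; }
+            void barrier() const {}
+            std::string name() const { return "my_context"; }
+
+            template <typename T>
+            T min(T value) const { return value; }
+            template <typename T>
+            T max(T value) const { return value; }
+            template <typename T>
+            T sum(T value) const { return value; }
+            template <typename T>
+            std::vector<T> gather(T value, int) const { return {std::move(value)}; }
+        };
+
+        // The implementation is copied/moved into the context (type erasure).
+        arb::distributed_context context = my_context{};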
diff --git a/doc/cpp_domdec.rst b/doc/cpp_domdec.rst
index 1cc0fe8103ff13764e15f04c97c75c178fa5c206..f4cd406436e37ff8567937e8b8aea6c04624f0f3 100644
--- a/doc/cpp_domdec.rst
+++ b/doc/cpp_domdec.rst
@@ -1,3 +1,5 @@
+.. _cppdomdec:
+
 Domain Decomposition
 ====================
 
@@ -89,11 +91,12 @@ describes the cell groups on the local MPI rank.
 
 .. cpp:namespace:: arb
 
-.. cpp:function:: domain_decomposition partition_load_balance(const recipe& rec, hw::node_info nd)
+.. cpp:function:: domain_decomposition partition_load_balance(const recipe& rec, hw::node_info nd, const distributed_context* ctx)
 
     Construct a :cpp:class:`domain_decomposition` that distributes the cells
-    in the model described by :cpp:var:`rec` over the hardware resources described
-    by `hw::node_info`.
+    in the model described by :cpp:any:`rec` over the set of distributed
+    compute nodes that communicate using :cpp:any:`ctx`, with hardware resources
+    on the calling node described by :cpp:any:`nd`.
 
     The algorithm counts the number of each cell type in the global model, then
     partitions the cells of each type equally over the available nodes.
diff --git a/doc/cpp_recipe.rst b/doc/cpp_recipe.rst
index 1da6fe2870e19cbf7b8d9a4f85bbef6e5e58e96b..5e178e62fd7b9e6168a06ef30ba2c133b371b968 100644
--- a/doc/cpp_recipe.rst
+++ b/doc/cpp_recipe.rst
@@ -148,7 +148,7 @@ Class Documentation
 
         Returns a list of all the **incoming** connections for `gid` .
         Each connection ``con`` should have post-synaptic target ``con.dest.gid`` that matches
-        the argument :cpp:var:`gid`, and a valid synapse id ``con.dest.index`` on `gid`.
+        the argument :cpp:any:`gid`, and a valid synapse id ``con.dest.index`` on `gid`.
         See :cpp:type:`cell_connection`.
 
         By default returns an empty list.
diff --git a/doc/cpp_simulation.rst b/doc/cpp_simulation.rst
index 67e39d180a2c77777721f504a1fe0e7eab4ce6ea..a1b5d5f8f442b6c7e47791922c588d8fd0a5c0d0 100644
--- a/doc/cpp_simulation.rst
+++ b/doc/cpp_simulation.rst
@@ -1,3 +1,5 @@
+.. _cppsimulation:
+
 Simulations
 ===========
 
@@ -8,17 +10,23 @@ To build a simulation the following are needed:
 
     * An :cpp:class:`arb::recipe` that describes the cells and connections
       in the model.
-    * An :cpp:class:`arb::hw::node_info` type that describes the hardware
+    * An :cpp:class:`arb::hw::node_info` that describes the CPU and GPU hardware
       resources on which the model will be run.
+    * An :cpp:class:`arb::distributed_context` that describes the distributed system
+      on which the model will run.
 
 The workflow to build a simulation is to first generate a
 :cpp:class:`arb::domain_decomposition` that describes the distribution of the model
-over the hardware, then build the simulation.
+over the local and distributed hardware resources (see :ref:`cppdomdec` and :ref:`cppdistcontext`),
+then build the simulation.
 
 .. container:: example-code
 
     .. code-block:: cpp
 
+        // Get a communication context
+        arb::distributed_context context;
+
         // Make description of the hardware that the simulation will run on.
         arb::hw::node_info node;
         node.num_cpu_cores = arb::threading::num_threads();
@@ -29,10 +37,10 @@ over the hardware, then build the simulation.
 
         // Get a description of the partition the model over the cores
         // (and gpu if available) on node.
-        arb::domain_decomposition decomp = arb::partition_load_balance(recipe, node);
+        arb::domain_decomposition decomp = arb::partition_load_balance(recipe, node, &context);
 
         // Instatitate the simulation.
-        arb::simulation sim(recipe, decomp);
+        arb::simulation sim(recipe, decomp, &context);
 
 
 Class Documentation
@@ -47,9 +55,12 @@ Class Documentation
 
     Simulations take the following inputs:
 
-    * The **constructor** takes an :cpp:class:`arb::recipe` that describes
-      the model, and an :cpp:class:`arb::domain_decomposition` that
-      describes how the cells in the model are assigned to hardware resources.
+    * The **constructor** takes:
+        * an :cpp:class:`arb::recipe` that describes the model;
+        * an :cpp:class:`arb::domain_decomposition` that describes how the
+          cells in the model are assigned to hardware resources;
+        * an :cpp:class:`arb::distributed_context` which performs communication
+          on distributed memory systems.
     * **Experimental inputs** that can change between model runs, such
       as external spike trains.
 
@@ -62,10 +73,6 @@ Class Documentation
 
     **Types:**
 
-    .. cpp:type:: communicator_type = communication::communicator<communication::global_policy>
-
-        Type used for distributed communication of spikes and global synchronization.
-
    .. cpp:type:: spike_export_function = std::function<void(const std::vector<spike>&)>
 
        User-supplied callack function used as a sink for spikes generated
@@ -74,7 +81,7 @@ Class Documentation
 
     **Constructor:**
 
-    .. cpp:function:: simulation(const recipe& rec, const domain_decomposition& decomp)
+    .. cpp:function:: simulation(const recipe& rec, const domain_decomposition& decomp, const distributed_context* ctx)
 
     **Experimental inputs:**
 
@@ -92,8 +99,8 @@ Class Documentation
 
     .. cpp:function:: time_type run(time_type tfinal, time_type dt)
 
-        Run the simulation from current simulation time to :cpp:var:`tfinal`,
-        with maximum time step size :cpp:var:`dt`.
+        Run the simulation from the current simulation time to :cpp:any:`tfinal`,
+        with maximum time step size :cpp:any:`dt`.
 
     .. cpp:function:: void set_binning_policy(binning_kind policy, time_type bin_interval)
 
diff --git a/doc/index.rst b/doc/index.rst
index 5d1e9825c5e16ba249b33a865f5753a5677fc66c..f6de1c0fe9ce2e0292a61f3c07d5a168ba419ddf 100644
--- a/doc/index.rst
+++ b/doc/index.rst
@@ -55,6 +55,7 @@ Some key features include:
    cpp_recipe
    cpp_domdec
    cpp_simulation
+   cpp_distributed_context
 
 .. 
toctree:: :caption: Developers: diff --git a/example/brunel/brunel_miniapp.cpp b/example/brunel/brunel_miniapp.cpp index 545c29c12ff2ad37b5847ac7198c8e69355d9b8c..e324c8f64bb753239036658ac9fbf2810f244214 100644 --- a/example/brunel/brunel_miniapp.cpp +++ b/example/brunel/brunel_miniapp.cpp @@ -8,7 +8,7 @@ #include <common_types.hpp> #include <communication/communicator.hpp> -#include <communication/global_policy.hpp> +#include <communication/distributed_context.hpp> #include <event_generator.hpp> #include <hardware/gpu.hpp> #include <hardware/node_info.hpp> @@ -30,10 +30,7 @@ using namespace arb; -using global_policy = communication::global_policy; -using file_export_type = io::exporter_spike_file<global_policy>; -void banner(hw::node_info); -using communicator_type = communication::communicator<communication::global_policy>; +void banner(hw::node_info, const distributed_context*); // Samples m unique values in interval [start, end) - gid. // We exclude gid because we don't want self-loops. @@ -189,21 +186,24 @@ private: using util::any_cast; using util::make_span; -using global_policy = communication::global_policy; -using file_export_type = io::exporter_spike_file<global_policy>; int main(int argc, char** argv) { - arb::communication::global_policy_guard global_guard(argc, argv); + distributed_context context; + try { - arb::util::meter_manager meters; +#ifdef ARB_HAVE_MPI + mpi::scoped_guard guard(&argc, &argv); + context = mpi_context(MPI_COMM_WORLD); +#endif + arb::util::meter_manager meters(&context); meters.start(); - std::cout << util::mask_stream(global_policy::id()==0); + std::cout << util::mask_stream(context.id()==0); // read parameters - io::cl_options options = io::read_options(argc, argv, global_policy::id()==0); + io::cl_options options = io::read_options(argc, argv, context.id()==0); hw::node_info nd; nd.num_cpu_cores = threading::num_threads(); nd.num_gpus = hw::num_gpus()>0? 
1: 0;
-    banner(nd);
+    banner(nd, &context);
 
     meters.checkpoint("setup");
 
@@ -239,16 +239,16 @@ int main(int argc, char** argv) {
         brunel_recipe recipe(nexc, ninh, next, in_degree_prop, w, d, rel_inh_strength, poiss_lambda, seed);
 
-        auto register_exporter = [] (const io::cl_options& options) {
-            return util::make_unique<file_export_type>
-                (options.file_name, options.output_path,
-                 options.file_extension, options.over_write);
+        auto register_exporter = [&context] (const io::cl_options& options) {
+            return util::make_unique<io::exporter_spike_file>
+                (options.file_name, options.output_path,
+                 options.file_extension, context.id(), options.over_write);
         };
 
-        auto decomp = decompose(recipe, group_size);
-        simulation sim(recipe, decomp);
+        auto decomp = decompose(recipe, group_size, &context);
+        simulation sim(recipe, decomp, &context);
 
         // Initialize the spike exporting interface
-        std::unique_ptr<file_export_type> file_exporter;
+        std::unique_ptr<io::exporter_spike_file> file_exporter;
         if (options.spike_file_output) {
             if (options.single_file_per_rank) {
                 file_exporter = register_exporter(options);
@@ -259,7 +259,7 @@
                     }
                 );
             }
-            else if(communication::global_policy::id()==0) {
+            else if(context.id()==0) {
                 file_exporter = register_exporter(options);
 
                 sim.set_global_spike_callback(
@@ -282,7 +282,7 @@
         auto report = util::make_meter_report(meters);
         std::cout << report;
-        if (global_policy::id()==0) {
+        if (context.id()==0) {
             std::ofstream fid;
             fid.exceptions(std::ios_base::badbit | std::ios_base::failbit);
             fid.open("meters.json");
@@ -291,7 +291,7 @@
     }
     catch (io::usage_error& e) {
         // only print usage/startup errors on master
-        std::cerr << util::mask_stream(global_policy::id()==0);
+        std::cerr << util::mask_stream(context.id()==0);
         std::cerr << e.what() << "\n";
         return 1;
     }
@@ -302,11 +302,11 @@
     return 0;
 }
 
-void banner(hw::node_info nd) {
+void banner(hw::node_info nd, const distributed_context* ctx) {
     std::cout << "==========================================\n";
     std::cout << " Arbor miniapp\n";
-    std::cout << " - distributed : " << global_policy::size()
-              << " (" << std::to_string(global_policy::kind()) << ")\n";
+    std::cout << " - distributed : " << ctx->size()
+              << " (" << ctx->name() << ")\n";
     std::cout << " - threads : " << nd.num_cpu_cores
               << " (" << threading::description() << ")\n";
     std::cout << " - gpus : " << nd.num_gpus << "\n";
diff --git a/example/brunel/partitioner.hpp b/example/brunel/partitioner.hpp
index 8f56c0915b443a06284695669c5fcddf7fb94904..f73e1dee58b677fe7acccf441db1663af80d0218 100644
--- a/example/brunel/partitioner.hpp
+++ b/example/brunel/partitioner.hpp
@@ -1,10 +1,11 @@
-#include <communication/global_policy.hpp>
+#include <communication/distributed_context.hpp>
 #include <domain_decomposition.hpp>
 #include <hardware/node_info.hpp>
 #include <recipe.hpp>
 
 namespace arb {
-    domain_decomposition decompose(const recipe& rec, const unsigned group_size) {
+    static
+    domain_decomposition decompose(const recipe& rec, const unsigned group_size, const distributed_context* ctx) {
         struct partition_gid_domain {
             partition_gid_domain(std::vector<cell_gid_type> divs):
                 gid_divisions(std::move(divs))
@@ -19,8 +20,8 @@ namespace arb {
         };
 
         cell_size_type num_global_cells = rec.num_cells();
-        unsigned num_domains = communication::global_policy::size();
-        int domain_id = communication::global_policy::id();
+        unsigned num_domains = ctx->size();
+        int domain_id = ctx->id();
 
         auto dom_size = [&](unsigned dom) -> cell_gid_type {
             const cell_gid_type B = num_global_cells/num_domains;
diff --git a/example/generators/event_gen.cpp 
b/example/generators/event_gen.cpp
index 38f3094ea00dc44706b6b62abf34f639d3bdf0dd..6f6aeb4420cb526a5c6bc785a31f2ff35abdb178 100644
--- a/example/generators/event_gen.cpp
+++ b/example/generators/event_gen.cpp
@@ -14,6 +14,7 @@
 
 #include <cell.hpp>
 #include <common_types.hpp>
+#include <communication/distributed_context.hpp>
 #include <event_generator.hpp>
 #include <hardware/node_info.hpp>
 #include <load_balance.hpp>
@@ -124,15 +125,20 @@ public:
 };
 
 int main() {
+    // A distributed_context is required for distributed computation (e.g. MPI).
+    // For this simple one-cell example, a non-distributed context is suitable,
+    // which is what we get with a default-constructed distributed_context.
+    arb::distributed_context context;
+
     // Create an instance of our recipe.
     generator_recipe recipe;
 
     // Make the domain decomposition for the model
     auto node = arb::hw::get_node_info();
-    auto decomp = arb::partition_load_balance(recipe, node);
+    auto decomp = arb::partition_load_balance(recipe, node, &context);
 
     // Construct the model.
-    arb::simulation sim(recipe, decomp);
+    arb::simulation sim(recipe, decomp, &context);
 
     // Set up the probe that will measure voltage in the cell.
 
diff --git a/example/miniapp/miniapp.cpp b/example/miniapp/miniapp.cpp
index 1aecbf493e762345d693966d62802374e4429c38..fc03fa2f0d951bde8a94bd78a41131dd3634b852 100644
--- a/example/miniapp/miniapp.cpp
+++ b/example/miniapp/miniapp.cpp
@@ -9,7 +9,7 @@
 
 #include <common_types.hpp>
 #include <communication/communicator.hpp>
-#include <communication/global_policy.hpp>
+#include <communication/distributed_context.hpp>
 #include <cell.hpp>
 #include <hardware/gpu.hpp>
 #include <hardware/node_info.hpp>
@@ -36,49 +36,37 @@ using namespace arb;
 using util::any_cast;
 using util::make_span;
 
-using global_policy = communication::global_policy;
-using file_export_type = io::exporter_spike_file<global_policy>;
-using communicator_type = communication::communicator<communication::global_policy>;
-
-void banner(hw::node_info);
+void banner(hw::node_info, const distributed_context*);
 std::unique_ptr<recipe> make_recipe(const io::cl_options&, const probe_distribution&);
 sample_trace make_trace(const probe_info& probe);
 void report_compartment_stats(const recipe&);
 
 int main(int argc, char** argv) {
-    communication::global_policy_guard global_guard(argc, argv);
+    // default serial context
+    distributed_context context;
 
     try {
-        util::meter_manager meters;
+        #ifdef ARB_HAVE_MPI
+        mpi::scoped_guard guard(&argc, &argv);
+        context = mpi_context(MPI_COMM_WORLD);
+        #endif
+
+        util::meter_manager meters(&context);
         meters.start();
 
-        std::cout << util::mask_stream(global_policy::id()==0);
+        std::cout << util::mask_stream(context.id()==0);
         // read parameters
-        io::cl_options options = io::read_options(argc, argv, global_policy::id()==0);
-
-        // If compiled in dry run mode we have to set up the dry run
-        // communicator to simulate the number of ranks that may have been set
-        // as a command line parameter (if not, it is 1 rank by default)
-        if (global_policy::kind() == communication::global_policy_kind::dryrun) {
-            // Dry run mode requires that each rank has the same number of cells.
-            // Here we increase the total number of cells if required to ensure
-            // that this condition is satisfied.
-            auto cells_per_rank = options.cells/options.dry_run_ranks;
-            if (options.cells % options.dry_run_ranks) {
-                ++cells_per_rank;
-                options.cells = cells_per_rank*options.dry_run_ranks;
-            }
+        io::cl_options options = io::read_options(argc, argv, context.id()==0);
-            global_policy::set_sizes(options.dry_run_ranks, cells_per_rank);
-        }
+        // TODO: add dry run mode
 
         // Use a node description that uses the number of threads used by the
         // threading back end, and 1 gpu if available.
         hw::node_info nd;
         nd.num_cpu_cores = threading::num_threads();
         nd.num_gpus = hw::num_gpus()>0? 1: 0;
-        banner(nd);
+        banner(nd, &context);
 
         meters.checkpoint("setup");
 
@@ -94,13 +82,13 @@ int main(int argc, char** argv) {
 
-        auto register_exporter = [] (const io::cl_options& options) {
-            return
-                util::make_unique<file_export_type>(
-                    options.file_name, options.output_path,
-                    options.file_extension, options.over_write);
+        auto register_exporter = [&context] (const io::cl_options& options) {
+            return
+                util::make_unique<io::exporter_spike_file>(
+                    options.file_name, options.output_path,
+                    options.file_extension, context.id(), options.over_write);
         };
 
-        auto decomp = partition_load_balance(*recipe, nd);
-        simulation sim(*recipe, decomp);
+        auto decomp = partition_load_balance(*recipe, nd, &context);
+        simulation sim(*recipe, decomp, &context);
 
         // Set up samplers for probes on local cable cells, as requested
         // by command line options.
@@ -133,7 +121,7 @@
 
         sim.set_binning_policy(binning_policy, options.bin_dt);
 
         // Initialize the spike exporting interface
-        std::unique_ptr<file_export_type> file_exporter;
+        std::unique_ptr<io::exporter_spike_file> file_exporter;
         if (options.spike_file_output) {
             if (options.single_file_per_rank) {
                 file_exporter = register_exporter(options);
@@ -142,7 +130,7 @@
                     file_exporter->output(spikes);
                 });
             }
-            else if(communication::global_policy::id()==0) {
+            else if(context.id()==0) {
                 file_exporter = register_exporter(options);
                 sim.set_global_spike_callback(
                     [&](const std::vector<spike>& spikes) {
@@ -171,7 +159,7 @@
 
         auto report = util::make_meter_report(meters);
         std::cout << report;
-        if (global_policy::id()==0) {
+        if (context.id()==0) {
             std::ofstream fid;
             fid.exceptions(std::ios_base::badbit | std::ios_base::failbit);
             fid.open("meters.json");
@@ -180,7 +168,7 @@
     }
     catch (io::usage_error& e) {
         // only print usage/startup errors on master
-        std::cerr << util::mask_stream(global_policy::id()==0);
+        std::cerr << util::mask_stream(context.id()==0);
         std::cerr << e.what() << "\n";
         return 1;
     }
@@ -191,11 +179,11 @@
     return 0;
 }
 
-void banner(hw::node_info nd) {
+void banner(hw::node_info nd, const distributed_context* ctx) {
     std::cout << "==========================================\n";
     std::cout << " Arbor miniapp\n";
-    std::cout << " - distributed : " << global_policy::size()
-              << " (" << std::to_string(global_policy::kind()) << ")\n";
+    std::cout << " - distributed : " << ctx->size()
+              << " (" << ctx->name() << ")\n";
     std::cout << " - threads : " << nd.num_cpu_cores
               << " (" << threading::description() << ")\n";
     std::cout << " - gpus : " << nd.num_gpus << "\n";
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index b50e258609810105be8ae5417cbf4d7938c007b7..7e0f31855624a8f7e4e8c8bba131b79c39e5cf42 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -47,9 +47,6 @@ endif()
 if(ARB_WITH_MPI)
     list(APPEND arbor_cxx_sources
         communication/mpi.cpp)
-elseif(ARB_WITH_DRYRUN)
-    list(APPEND arbor_cxx_sources
-        communication/dryrun_global_policy.cpp)
 endif()
 
 if(ARB_WITH_CTHREAD)
diff --git a/src/communication/communicator.hpp 
b/src/communication/communicator.hpp index 2b562b0c3ed61255b7040384421e3da09c2d2f5c..a372937cebc2839187b322081e61ee6c96549779 100644 --- a/src/communication/communicator.hpp +++ b/src/communication/communicator.hpp @@ -10,6 +10,7 @@ #include <algorithms.hpp> #include <common_types.hpp> #include <communication/gathered_vector.hpp> +#include <communication/distributed_context.hpp> #include <connection.hpp> #include <domain_decomposition.hpp> #include <event_queue.hpp> @@ -22,7 +23,6 @@ #include <util/rangeutil.hpp> namespace arb { -namespace communication { // When the communicator is constructed the number of target groups and targets // is specified, along with a mapping between local cell id and local @@ -35,16 +35,17 @@ namespace communication { // to build the data structures required for efficient spike communication and // event generation. -template <typename CommunicationPolicy> class communicator { public: - using communication_policy_type = CommunicationPolicy; - communicator() {} - explicit communicator(const recipe& rec, const domain_decomposition& dom_dec) { + explicit communicator(const recipe& rec, + const domain_decomposition& dom_dec, + const distributed_context* ctx) + { using util::make_span; - num_domains_ = comms_.size(); + context_ = ctx; + num_domains_ = context_->size(); num_local_groups_ = dom_dec.groups.size(); num_local_cells_ = dom_dec.num_local_cells; @@ -141,7 +142,7 @@ public: local_min = std::min(local_min, con.delay()); } - return comms_.min(local_min); + return context_->min(local_min); } /// Perform exchange of spikes. @@ -156,7 +157,7 @@ public: PE(communication_exchange_gather); // global all-to-all to gather a local copy of the global spike list on each node. - auto global_spikes = comms_.gather_spikes(local_spikes); + auto global_spikes = context_->gather_spikes(local_spikes); num_spikes_ += global_spikes.size(); PL(); @@ -256,9 +257,8 @@ private: std::vector<cell_size_type> index_divisions_; util::partition_view_type<std::vector<cell_size_type>> index_part_; - communication_policy_type comms_; + const distributed_context* context_; std::uint64_t num_spikes_ = 0u; }; -} // namespace communication } // namespace arb diff --git a/src/communication/distributed_context.hpp b/src/communication/distributed_context.hpp new file mode 100644 index 0000000000000000000000000000000000000000..ac599391070c6eac309553d73c048d0e6094fc8d --- /dev/null +++ b/src/communication/distributed_context.hpp @@ -0,0 +1,139 @@ +#pragma once + +#include <string> + +#include <spike.hpp> +#include <communication/gathered_vector.hpp> +#include <util/pp_util.hpp> + +#if defined(ARB_HAVE_MPI) +# include "mpi_context.hpp" +#endif +#include "local_context.hpp" + + +namespace arb { + +#define ARB_PUBLIC_COLLECTIVES_(T) \ + T min(T value) const { return impl_->min(value); }\ + T max(T value) const { return impl_->max(value); }\ + T sum(T value) const { return impl_->sum(value); }\ + std::vector<T> gather(T value, int root) const { return impl_->gather(value, root); } + +#define ARB_INTERFACE_COLLECTIVES_(T) \ + virtual T min(T value) const = 0;\ + virtual T max(T value) const = 0;\ + virtual T sum(T value) const = 0;\ + virtual std::vector<T> gather(T value, int root) const = 0; + +#define ARB_WRAP_COLLECTIVES_(T) \ + T min(T value) const override { return wrapped.min(value); }\ + T max(T value) const override { return wrapped.max(value); }\ + T sum(T value) const override { return wrapped.sum(value); }\ + std::vector<T> gather(T value, int root) const override { return 
wrapped.gather(value, root); } + +#define ARB_COLLECTIVE_TYPES_ float, double, int, std::uint32_t, std::uint64_t + +// distributed_context +// +// Defines the concept/interface for a distributed communication context. +// +// Uses value-semantic type erasure to define the interface, so that +// types that implement the interface can use duck-typing, without having +// to inherit from distributed_context. +// +// For the simplest example of a distributed_context implementation, +// see local_context, which is the default context. + +class distributed_context { +public: + using spike_vector = std::vector<arb::spike>; + + // default constructor uses a local context + distributed_context(): distributed_context(local_context()) {} + + template <typename Impl> + distributed_context(Impl&& impl): + impl_(new wrap<Impl>(std::forward<Impl>(impl))) + {} + + distributed_context(distributed_context&& other) = default; + distributed_context& operator=(distributed_context&& other) = default; + + gathered_vector<arb::spike> gather_spikes(const spike_vector& local_spikes) const { + return impl_->gather_spikes(local_spikes); + } + + int id() const { + return impl_->id(); + } + + int size() const { + return impl_->size(); + } + + void barrier() const { + impl_->barrier(); + } + + std::string name() const { + return impl_->name(); + } + + ARB_PP_FOREACH(ARB_PUBLIC_COLLECTIVES_, ARB_COLLECTIVE_TYPES_); + + std::vector<std::string> gather(std::string value, int root) const { + return impl_->gather(value, root); + } + +private: + struct interface { + virtual gathered_vector<arb::spike> + gather_spikes(const spike_vector& local_spikes) const = 0; + virtual int id() const = 0; + virtual int size() const = 0; + virtual void barrier() const = 0; + virtual std::string name() const = 0; + + ARB_PP_FOREACH(ARB_INTERFACE_COLLECTIVES_, ARB_COLLECTIVE_TYPES_); + virtual std::vector<std::string> gather(std::string value, int root) const = 0; + + virtual ~interface() {} + }; + + template <typename Impl> + struct wrap: interface { + explicit wrap(const Impl& impl): wrapped(impl) {} + explicit wrap(Impl&& impl): wrapped(std::move(impl)) {} + + gathered_vector<arb::spike> + gather_spikes(const spike_vector& local_spikes) const { + return wrapped.gather_spikes(local_spikes); + } + int id() const { + return wrapped.id(); + } + int size() const { + return wrapped.size(); + } + void barrier() const { + wrapped.barrier(); + } + std::string name() const { + return wrapped.name(); + } + + ARB_PP_FOREACH(ARB_WRAP_COLLECTIVES_, ARB_COLLECTIVE_TYPES_) + + std::vector<std::string> gather(std::string value, int root) const override { + return wrapped.gather(value, root); + } + + Impl wrapped; + }; + + std::unique_ptr<interface> impl_; +}; + +} // namespace arb + diff --git a/src/communication/dryrun_global_policy.cpp b/src/communication/dryrun_global_policy.cpp deleted file mode 100644 index 5e640c65427c101d4adb7744d923bb475f452d1d..0000000000000000000000000000000000000000 --- a/src/communication/dryrun_global_policy.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include "global_policy.hpp" - -namespace arb { -namespace communication { - -int dryrun_communicator_size=0; -int dryrun_num_local_cells=0; - -} // namespace communication -} // namespace arb diff --git a/src/communication/dryrun_global_policy.hpp b/src/communication/dryrun_global_policy.hpp deleted file mode 100644 index 67ca6db04db85d9096529c79a343bea95b310481..0000000000000000000000000000000000000000 --- a/src/communication/dryrun_global_policy.hpp +++ /dev/null @@ -1,92 +0,0 @@ 
-#pragma once - -#include <cstdint> -#include <type_traits> -#include <vector> - -#include <communication/gathered_vector.hpp> -#include <util/span.hpp> -#include <spike.hpp> - -namespace arb { -namespace communication { - -extern int dryrun_num_local_cells; -extern int dryrun_communicator_size; - -struct dryrun_global_policy { - template <typename Spike> - static gathered_vector<Spike> - gather_spikes(const std::vector<Spike>& local_spikes) { - using util::make_span; - using count_type = typename gathered_vector<Spike>::count_type; - - // Build the global spike list by replicating the local spikes for each - // "dummy" domain. - const auto num_spikes_local = local_spikes.size(); - const auto num_spikes_global = size()*num_spikes_local; - std::vector<Spike> global_spikes(num_spikes_global); - std::vector<count_type> partition(size()+1); - - for (auto rank: make_span(0u, size())) { - const auto first_cell = rank*dryrun_num_local_cells; - const auto first_spike = rank*num_spikes_local; - for (auto i: make_span(0, num_spikes_local)) { - // the new global spike is the same as the local spike, with - // its source index shifted to the dummy domain - auto s = local_spikes[i]; - s.source.gid += first_cell; - global_spikes[first_spike+i] = s; - } - partition[rank+1] = partition[rank]+num_spikes_local; - } - - EXPECTS(partition.back()==num_spikes_global); - return {std::move(global_spikes), std::move(partition)}; - } - - static int id() { - return 0; - } - - static int size() { - return dryrun_communicator_size; - } - - static void set_sizes(int comm_size, int num_local_cells) { - dryrun_communicator_size = comm_size; - dryrun_num_local_cells = num_local_cells; - } - - template <typename T> - static T min(T value) { - return value; - } - - template <typename T> - static T max(T value) { - return value; - } - - template <typename T> - static T sum(T value) { - return size()*value; - } - - template <typename T> - static std::vector<T> gather(T value, int) { - return std::vector<T>(size(), value); - } - - static void barrier() {} - - static void setup(int& argc, char**& argv) {} - static void teardown() {} - - static global_policy_kind kind() { return global_policy_kind::dryrun; }; -}; - -using global_policy = dryrun_global_policy; - -} // namespace communication -} // namespace arb diff --git a/src/communication/global_policy.hpp b/src/communication/global_policy.hpp deleted file mode 100644 index e26406beee393a796a8de485e371922aee1b0d8f..0000000000000000000000000000000000000000 --- a/src/communication/global_policy.hpp +++ /dev/null @@ -1,56 +0,0 @@ -#pragma once - -#include <string> - -namespace arb { namespace communication { - enum class global_policy_kind {serial, mpi, dryrun}; -}} - -namespace std { - inline - std::string to_string(arb::communication::global_policy_kind k) { - using namespace arb::communication; - if (k == global_policy_kind::mpi) { - return "MPI"; - } - if (k == global_policy_kind::dryrun) { - return "dryrun"; - } - return "serial"; - } -} - -#if defined(ARB_HAVE_MPI) - #include "mpi_global_policy.hpp" -#elif defined(ARB_HAVE_DRYRUN) - #include "dryrun_global_policy.hpp" -#else - #include "serial_global_policy.hpp" -#endif - -namespace arb { -namespace communication { - -template <typename Policy> -struct policy_guard { - using policy_type = Policy; - - policy_guard(int argc, char**& argv) { - policy_type::setup(argc, argv); - } - - policy_guard() = delete; - policy_guard(policy_guard&&) = delete; - policy_guard(const policy_guard&) = delete; - policy_guard& 
operator=(policy_guard&&) = delete; - policy_guard& operator=(const policy_guard&) = delete; - - ~policy_guard() { - Policy::teardown(); - } -}; - -using global_policy_guard = policy_guard<global_policy>; - -} // namespace communication -} // namespace arb diff --git a/src/communication/local_context.hpp b/src/communication/local_context.hpp new file mode 100644 index 0000000000000000000000000000000000000000..8bae0840b2ecb6df71795e6274a4d699d191243a --- /dev/null +++ b/src/communication/local_context.hpp @@ -0,0 +1,55 @@ +#pragma once + +#include <vector> + +#include <communication/gathered_vector.hpp> +#include <spike.hpp> + +namespace arb { + +struct local_context { + gathered_vector<arb::spike> + gather_spikes(const std::vector<arb::spike>& local_spikes) const { + using count_type = typename gathered_vector<arb::spike>::count_type; + return gathered_vector<arb::spike>( + std::vector<arb::spike>(local_spikes), + {0u, static_cast<count_type>(local_spikes.size())} + ); + } + + int id() const { + return 0; + } + + int size() const { + return 1; + } + + template <typename T> + T min(T value) const { + return value; + } + + template <typename T> + T max(T value) const { + return value; + } + + template <typename T> + T sum(T value) const { + return value; + } + + template <typename T> + std::vector<T> gather(T value, int) const { + return {std::move(value)}; + } + + void barrier() const {} + + std::string name() const { + return "serial"; + } +}; + +} // namespace arb diff --git a/src/communication/mpi.cpp b/src/communication/mpi.cpp index 79288404597c788f981cd4544596249f7738854d..18bc425c337e3e63c3029804ba1da5a4bb367281 100644 --- a/src/communication/mpi.cpp +++ b/src/communication/mpi.cpp @@ -5,56 +5,73 @@ namespace arb { namespace mpi { -// global state -namespace state { - // TODO: in case MPI_Init is never called, this will mimic one rank with rank 0. - // This is not ideal: calls to MPI-dependent features such as reductions will - // still fail, however this allows us to run all the unit tests until the - // run-time executors are implemented. - int size = 1; - int rank = 0; -} // namespace state - -void init(int *argc, char ***argv) { - int provided; +// global guard for initializing mpi. - // initialize with thread serialized level of thread safety - MPI_Init_thread(argc, argv, MPI_THREAD_SERIALIZED, &provided); - assert(provided>=MPI_THREAD_SERIALIZED); +scoped_guard::scoped_guard(int *argc, char ***argv) { + init(argc, argv); +} - MPI_Comm_rank(MPI_COMM_WORLD, &state::rank); - MPI_Comm_size(MPI_COMM_WORLD, &state::size); +scoped_guard::~scoped_guard() { + finalize(); } -void finalize() { - MPI_Finalize(); +// MPI exception class. 
+
+mpi_error::mpi_error(const char* msg, int code):
+    error_code_(code)
+{
+    thread_local char buffer[MPI_MAX_ERROR_STRING];
+    int n;
+    MPI_Error_string(error_code_, buffer, &n);
+    message_ = "MPI error (";
+    message_ += buffer;
+    message_ += "): ";
+    message_ += msg;
 }
 
-bool is_root() {
-    return state::rank == 0;
+void handle_mpi_error(const char* msg, int code) {
+    if (code!=MPI_SUCCESS) {
+        throw mpi_error(msg, code);
+    }
 }
 
-int rank() {
-    return state::rank;
+const char* mpi_error::what() const throw() {
+    return message_.c_str();
 }
 
-int size() {
-    return state::size;
+int mpi_error::error_code() const {
+    return error_code_;
 }
 
-void barrier() {
-    MPI_Barrier(MPI_COMM_WORLD);
+void init(int* argc, char*** argv) {
+    int provided;
+
+    // initialize with thread serialized level of thread safety
+    MPI_Init_thread(argc, argv, MPI_THREAD_SERIALIZED, &provided);
+
+    if(provided<MPI_THREAD_SERIALIZED) {
+        throw mpi_error("Unable to initialize MPI with MPI_THREAD_SERIALIZED", MPI_ERR_OTHER);
+    }
 }
 
-bool ballot(bool vote) {
-    using traits = mpi_traits<char>;
+void finalize() {
+    MPI_Finalize();
+}
 
-    char result;
-    char value = vote ? 1 : 0;
+int rank(MPI_Comm comm) {
+    int r;
+    handle_mpi_error("MPI_Comm_rank", MPI_Comm_rank(comm, &r));
+    return r;
+}
 
-    MPI_Allreduce(&value, &result, 1, traits::mpi_type(), MPI_LAND, MPI_COMM_WORLD);
+int size(MPI_Comm comm) {
+    int s;
+    handle_mpi_error("MPI_Comm_size", MPI_Comm_size(comm, &s));
+    return s;
+}
 
-    return result;
+void barrier(MPI_Comm comm) {
+    handle_mpi_error("MPI_Barrier", MPI_Barrier(comm));
 }
 
 } // namespace mpi
diff --git a/src/communication/mpi.hpp b/src/communication/mpi.hpp
index 3547f144335be2da294699cfacb1e7bcd2a1f323..5f593066217dd3420b78262643ada1c28eddee48 100644
--- a/src/communication/mpi.hpp
+++ b/src/communication/mpi.hpp
@@ -18,229 +18,247 @@
 namespace arb {
 namespace mpi {
-    // prototypes
-    void init(int *argc, char ***argv);
-    void finalize();
-    bool is_root();
-    int rank();
-    int size();
-    void barrier();
-    bool ballot(bool vote);
-
-    // type traits for automatically setting MPI_Datatype information
-    // for C++ types
-    template <typename T>
-    struct mpi_traits {
-        constexpr static size_t count() {
-            return sizeof(T);
-        }
-        constexpr static MPI_Datatype mpi_type() {
-            return MPI_CHAR;
-        }
-        constexpr static bool is_mpi_native_type() {
-            return false;
-        }
-    };
-
-    #define MAKE_TRAITS(T,M) \
-    template <> \
-    struct mpi_traits<T> { \
-        constexpr static size_t count() { return 1; } \
-        /* constexpr */ static MPI_Datatype mpi_type() { return M; } \
-        constexpr static bool is_mpi_native_type() { return true; } \
-    };
-
-    MAKE_TRAITS(double, MPI_DOUBLE)
-    MAKE_TRAITS(float, MPI_FLOAT)
-    MAKE_TRAITS(int, MPI_INT)
-    MAKE_TRAITS(long int, MPI_LONG)
-    MAKE_TRAITS(char, MPI_CHAR)
-    MAKE_TRAITS(size_t, MPI_UNSIGNED_LONG)
-    static_assert(sizeof(size_t)==sizeof(unsigned long),
-        "size_t and unsigned long are not equivalent");
-
-    // Gather individual values of type T from each rank into a std::vector on
-    // the root rank.
-    // T must be trivially copyable
-    template<typename T>
-    std::vector<T> gather(T value, int root) {
-        using traits = mpi_traits<T>;
-        auto buffer_size = (rank()==root) ? 
size() : 0; - std::vector<T> buffer(buffer_size); - - MPI_Gather( &value, traits::count(), traits::mpi_type(), // send buffer - buffer.data(), traits::count(), traits::mpi_type(), // receive buffer - root, MPI_COMM_WORLD); - - return buffer; +// prototypes +void init(int *argc, char ***argv); +void finalize(); +int rank(MPI_Comm); +int size(MPI_Comm); +void barrier(MPI_Comm); + +void handle_mpi_error(const char* msg, int code); + +// Exception class to be thrown when MPI API calls return a error code other +// than MPI_SUCCESS. +class mpi_error: public std::exception { +public: + mpi_error(const char* msg, int code); + const char* what() const throw() override; + int error_code() const; + +private: + std::string message_; + int error_code_; +}; + +struct scoped_guard { + scoped_guard(int *argc, char ***argv); + ~scoped_guard(); +}; + +// Type traits for automatically setting MPI_Datatype information for C++ types. +template <typename T> +struct mpi_traits { + constexpr static size_t count() { + return sizeof(T); } - - // Gather individual values of type T from each rank into a std::vector on - // the every rank. - // T must be trivially copyable - template <typename T> - std::vector<T> gather_all(T value) { - using traits = mpi_traits<T>; - std::vector<T> buffer(size()); - - MPI_Allgather( &value, traits::count(), traits::mpi_type(), // send buffer - buffer.data(), traits::count(), traits::mpi_type(), // receive buffer - MPI_COMM_WORLD); - - return buffer; + constexpr static MPI_Datatype mpi_type() { + return MPI_CHAR; } - - // Specialize gather for std::string. - inline std::vector<std::string> gather(std::string str, int root) { - using traits = mpi_traits<char>; - - auto counts = gather_all(int(str.size())); - auto displs = algorithms::make_index(counts); - - std::vector<char> buffer(displs.back()); - + constexpr static bool is_mpi_native_type() { + return false; + } +}; + +#define MAKE_TRAITS(T,M) \ +template <> \ +struct mpi_traits<T> { \ + constexpr static size_t count() { return 1; } \ + /* constexpr */ static MPI_Datatype mpi_type() { return M; } \ + constexpr static bool is_mpi_native_type() { return true; } \ +}; + +MAKE_TRAITS(double, MPI_DOUBLE) +MAKE_TRAITS(float, MPI_FLOAT) +MAKE_TRAITS(int, MPI_INT) +MAKE_TRAITS(long int, MPI_LONG) +MAKE_TRAITS(char, MPI_CHAR) +MAKE_TRAITS(unsigned int, MPI_UNSIGNED) +MAKE_TRAITS(size_t, MPI_UNSIGNED_LONG) +static_assert(sizeof(size_t)==sizeof(unsigned long), + "size_t and unsigned long are not equivalent"); + +// Gather individual values of type T from each rank into a std::vector on +// the root rank. +// T must be trivially copyable. +template<typename T> +std::vector<T> gather(T value, int root, MPI_Comm comm) { + using traits = mpi_traits<T>; + auto buffer_size = (rank(comm)==root) ? size(comm) : 0; + std::vector<T> buffer(buffer_size); + + handle_mpi_error("MPI_Gather", + MPI_Gather( &value, traits::count(), traits::mpi_type(), // send buffer + buffer.data(), traits::count(), traits::mpi_type(), // receive buffer + root, comm)); + + return buffer; +} + +// Gather individual values of type T from each rank into a std::vector on +// the every rank. 
+// T must be trivially copyable +template <typename T> +std::vector<T> gather_all(T value, MPI_Comm comm) { + using traits = mpi_traits<T>; + std::vector<T> buffer(size(comm)); + + handle_mpi_error("MPI_Allgather", + MPI_Allgather( + &value, traits::count(), traits::mpi_type(), // send buffer + buffer.data(), traits::count(), traits::mpi_type(), // receive buffer + comm)); + + return buffer; +} + +// Specialize gather for std::string. +inline std::vector<std::string> gather(std::string str, int root, MPI_Comm comm) { + using traits = mpi_traits<char>; + + auto counts = gather_all(int(str.size()), comm); + auto displs = algorithms::make_index(counts); + + std::vector<char> buffer(displs.back()); + + // const_cast required for MPI implementations that don't use const* in + // their interfaces. + std::string::value_type* ptr = const_cast<std::string::value_type*>(str.data()); + handle_mpi_error("MPI_Gatherv", MPI_Gatherv( - // const_cast required for MPI implementations that don't use const* in their interfaces - const_cast<std::string::value_type*>(str.data()), counts[rank()], traits::mpi_type(), // send - buffer.data(), counts.data(), displs.data(), traits::mpi_type(), // receive - root, MPI_COMM_WORLD); - - // Unpack the raw string data into a vector of strings. - std::vector<std::string> result; - result.reserve(size()); - for (auto i=0; i<size(); ++i) { - result.push_back(std::string(buffer.data()+displs[i], counts[i])); - } - return result; + ptr, counts[rank(comm)], traits::mpi_type(), // send + buffer.data(), counts.data(), displs.data(), traits::mpi_type(), // receive + root, comm)); + + // Unpack the raw string data into a vector of strings. + std::vector<std::string> result; + auto nranks = size(comm); + result.reserve(nranks); + for (auto i=0; i<nranks; ++i) { + result.push_back(std::string(buffer.data()+displs[i], counts[i])); } + return result; +} +template <typename T> +std::vector<T> gather_all(const std::vector<T>& values, MPI_Comm comm) { - template <typename T> - std::vector<T> gather_all(const std::vector<T>& values) { - - using traits = mpi_traits<T>; - auto counts = gather_all(int(values.size())); - for (auto& c : counts) { - c *= traits::count(); - } - auto displs = algorithms::make_index(counts); - - std::vector<T> buffer(displs.back()/traits::count()); + using traits = mpi_traits<T>; + auto counts = gather_all(int(values.size()), comm); + for (auto& c : counts) { + c *= traits::count(); + } + auto displs = algorithms::make_index(counts); + std::vector<T> buffer(displs.back()/traits::count()); + handle_mpi_error("MPI_Allgatherv", MPI_Allgatherv( - // send buffer // const_cast required for MPI implementations that don't use const* in their interfaces - const_cast<T*>(values.data()), counts[rank()], traits::mpi_type(), - // receive buffer - buffer.data(), counts.data(), displs.data(), traits::mpi_type(), - MPI_COMM_WORLD - ); - - return buffer; + const_cast<T*>(values.data()), counts[rank(comm)], traits::mpi_type(), // send buffer + buffer.data(), counts.data(), displs.data(), traits::mpi_type(), // receive buffer + comm)); + + return buffer; +} + +/// Gather all of a distributed vector +/// Retains the meta data (i.e. 
vector partition)
+template <typename T>
+gathered_vector<T> gather_all_with_partition(const std::vector<T>& values, MPI_Comm comm) {
+    using gathered_type = gathered_vector<T>;
+    using count_type = typename gathered_vector<T>::count_type;
+    using traits = mpi_traits<T>;
+
+    // We have to use int for the count and displs vectors instead
+    // of count_type because these are used as arguments to MPI_Allgatherv
+    // which expects int arguments.
+    auto counts = gather_all(int(values.size()), comm);
+    for (auto& c : counts) {
+        c *= traits::count();
+    }
+    auto displs = algorithms::make_index(counts);
-    /// Gather all of a distributed vector
-    /// Retains the meta data (i.e. vector partition)
-    template <typename T>
-    gathered_vector<T> gather_all_with_partition(const std::vector<T>& values) {
-        using gathered_type = gathered_vector<T>;
-        using count_type = typename gathered_vector<T>::count_type;
-        using traits = mpi_traits<T>;
-
-        // We have to use int for the count and displs vectors instead
-        // of count_type because these are used as arguments to MPI_Allgatherv
-        // which expects int arguments.
-        auto counts = gather_all(int(values.size()));
-        for (auto& c : counts) {
-            c *= traits::count();
-        }
-        auto displs = algorithms::make_index(counts);
-
-        std::vector<T> buffer(displs.back()/traits::count());
+    std::vector<T> buffer(displs.back()/traits::count());
+    handle_mpi_error("MPI_Allgatherv",
+        MPI_Allgatherv(
-            // send buffer
-            // const_cast required for MPI implementations that don't use const* in their interfaces
-            const_cast<T*>(values.data()), counts[rank()], traits::mpi_type(),
-            // receive buffer
-            buffer.data(), counts.data(), displs.data(), traits::mpi_type(),
-            MPI_COMM_WORLD
-        );
-
-        for (auto& d : displs) {
-            d /= traits::count();
-        }
-
-        return gathered_type(
-            std::move(buffer),
-            std::vector<count_type>(displs.begin(), displs.end())
-        );
+            const_cast<T*>(values.data()), counts[rank(comm)], traits::mpi_type(), // send buffer
+            buffer.data(), counts.data(), displs.data(), traits::mpi_type(),      // receive buffer
+            comm));
+
+    for (auto& d : displs) {
+        d /= traits::count();
    }
-    template <typename T>
-    T reduce(T value, MPI_Op op, int root) {
-        using traits = mpi_traits<T>;
-        static_assert(
-            traits::is_mpi_native_type(),
-            "can only perform reductions on MPI native types");
+    return gathered_type(
+        std::move(buffer),
+        std::vector<count_type>(displs.begin(), displs.end())
+    );
-        T result;
+template <typename T>
+T reduce(T value, MPI_Op op, int root, MPI_Comm comm) {
+    using traits = mpi_traits<T>;
+    static_assert(traits::is_mpi_native_type(),
+        "can only perform reductions on MPI native types");
-        MPI_Reduce(&value, &result, 1, traits::mpi_type(), op, root, MPI_COMM_WORLD);
+    T result;
-        return result;
-    }
+    handle_mpi_error("MPI_Reduce",
+        MPI_Reduce(&value, &result, 1, traits::mpi_type(), op, root, comm));
-    template <typename T>
-    T reduce(T value, MPI_Op op) {
-        using traits = mpi_traits<T>;
-        static_assert(
-            traits::is_mpi_native_type(),
-            "can only perform reductions on MPI native types");
+    return result;
+}
-        T result;
+template <typename T>
+T reduce(T value, MPI_Op op, MPI_Comm comm) {
+    using traits = mpi_traits<T>;
+    static_assert(traits::is_mpi_native_type(),
+        "can only perform reductions on MPI native types");
-        MPI_Allreduce(&value, &result, 1, traits::mpi_type(), op, MPI_COMM_WORLD);
+    T result;
-        return result;
-    }
+    MPI_Allreduce(&value, &result, 1, traits::mpi_type(), op, comm);
-    template <typename T>
-    std::pair<T,T> minmax(T value) {
-        return {reduce<T>(value, MPI_MIN), reduce<T>(value, MPI_MAX)};
-    }
+    return result;
+}
-    template <typename T>
-    std::pair<T,T> minmax(T value, int root) {
-        return {reduce<T>(value, MPI_MIN, root), reduce<T>(value, MPI_MAX, root)};
-    }
+template <typename T>
+std::pair<T,T> minmax(T value, MPI_Comm comm) {
+    return {reduce<T>(value, MPI_MIN, comm), reduce<T>(value, MPI_MAX, comm)};
+}
-    template <typename T>
-    T broadcast(T value, int root) {
-        static_assert(
-            true,//std::is_trivially_copyable<T>::value,
-            "broadcast can only be performed on trivally copyable types");
+template <typename T>
+std::pair<T,T> minmax(T value, int root, MPI_Comm comm) {
+    return {reduce<T>(value, MPI_MIN, root, comm), reduce<T>(value, MPI_MAX, root, comm)};
+}
-        using traits = mpi_traits<T>;
+template <typename T>
+T broadcast(T value, int root, MPI_Comm comm) {
+    static_assert(std::is_trivially_copyable<T>::value,
+        "broadcast can only be performed on trivially copyable types");
-        MPI_Bcast(&value, traits::count(), traits::mpi_type(), root, MPI_COMM_WORLD);
+    using traits = mpi_traits<T>;
-        return value;
-    }
+    handle_mpi_error("MPI_Bcast",
+        MPI_Bcast(&value, traits::count(), traits::mpi_type(), root, comm));
-    template <typename T>
-    T broadcast(int root) {
-        static_assert(
-            true,//std::is_trivially_copyable<T>::value,
-            "broadcast can only be performed on trivally copyable types");
+    return value;
+}
-        using traits = mpi_traits<T>;
-        T value;
+template <typename T>
+T broadcast(int root, MPI_Comm comm) {
+    static_assert(std::is_trivially_copyable<T>::value,
+        "broadcast can only be performed on trivially copyable types");
-        MPI_Bcast(&value, traits::count(), traits::mpi_type(), root, MPI_COMM_WORLD);
+    using traits = mpi_traits<T>;
+    T value;
-        return value;
-    }
+    handle_mpi_error("MPI_Bcast",
+        MPI_Bcast(&value, traits::count(), traits::mpi_type(), root, comm));
+
+    return value;
+}
 
 } // namespace mpi
 } // namespace arb
diff --git a/src/communication/mpi_context.hpp b/src/communication/mpi_context.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..c745990e141a0fe3ee4abdf0444818bef5e733db
--- /dev/null
+++ b/src/communication/mpi_context.hpp
@@ -0,0 +1,65 @@
+#pragma once
+
+#include <vector>
+
+#include <communication/gathered_vector.hpp>
+#include <communication/mpi.hpp>
+#include <spike.hpp>
+
+namespace arb {
+
+struct mpi_context {
+    int size_;
+    int rank_;
+    MPI_Comm comm_;
+
+    // throws mpi_error if MPI calls fail
+    mpi_context(MPI_Comm comm=MPI_COMM_WORLD): comm_(comm) {
+        size_ = arb::mpi::size(comm_);
+        rank_ = arb::mpi::rank(comm_);
+    }
+
+    gathered_vector<arb::spike>
+    gather_spikes(const std::vector<arb::spike>& local_spikes) const {
+        return mpi::gather_all_with_partition(local_spikes, comm_);
+    }
+
+    int id() const {
+        return rank_;
+    }
+
+    int size() const {
+        return size_;
+    }
+
+    template <typename T>
+    T min(T value) const {
+        return arb::mpi::reduce(value, MPI_MIN, comm_);
+    }
+
+    template <typename T>
+    T max(T value) const {
+        return arb::mpi::reduce(value, MPI_MAX, comm_);
+    }
+
+    template <typename T>
+    T sum(T value) const {
+        return arb::mpi::reduce(value, MPI_SUM, comm_);
+    }
+
+    template <typename T>
+    std::vector<T> gather(T value, int root) const {
+        return mpi::gather(value, root, comm_);
+    }
+
+    void barrier() const {
+        mpi::barrier(comm_);
+    }
+
+    std::string name() const {
+        return "MPI";
+    }
+};
+
+} // namespace arb
+
diff --git a/src/communication/mpi_global_policy.hpp b/src/communication/mpi_global_policy.hpp
deleted file mode 100644
index 
49ff2cb611040956195debe3351c5df1ddf75462..0000000000000000000000000000000000000000 --- a/src/communication/mpi_global_policy.hpp +++ /dev/null @@ -1,77 +0,0 @@ -#pragma once - -#ifndef ARB_HAVE_MPI -#error "mpi_global_policy.hpp should only be compiled in a ARB_HAVE_MPI build" -#endif - -#include <cstdint> -#include <stdexcept> -#include <type_traits> -#include <vector> - -#include <algorithms.hpp> -#include <common_types.hpp> -#include <communication/gathered_vector.hpp> -#include <communication/mpi.hpp> -#include <spike.hpp> - -namespace arb { -namespace communication { - -struct mpi_global_policy { - template <typename Spike> - static gathered_vector<Spike> - gather_spikes(const std::vector<Spike>& local_spikes) { - return mpi::gather_all_with_partition(local_spikes); - } - - static int id() { return mpi::rank(); } - - static int size() { return mpi::size(); } - - static void set_sizes(int comm_size, int num_local_cells) { - throw std::runtime_error( - "Attempt to set comm size for MPI global communication " - "policy, this is only permitted for dry run mode"); - } - - template <typename T> - static T min(T value) { - return arb::mpi::reduce(value, MPI_MIN); - } - - template <typename T> - static T max(T value) { - return arb::mpi::reduce(value, MPI_MAX); - } - - template <typename T> - static T sum(T value) { - return arb::mpi::reduce(value, MPI_SUM); - } - - template <typename T> - static std::vector<T> gather(T value, int root) { - return mpi::gather(value, root); - } - - static void barrier() { - mpi::barrier(); - } - - static void setup(int& argc, char**& argv) { - arb::mpi::init(&argc, &argv); - } - - static void teardown() { - arb::mpi::finalize(); - } - - static global_policy_kind kind() { return global_policy_kind::mpi; }; -}; - -using global_policy = mpi_global_policy; - -} // namespace communication -} // namespace arb - diff --git a/src/communication/serial_global_policy.hpp b/src/communication/serial_global_policy.hpp deleted file mode 100644 index ec853175bce423b08eb54271ac55eac63b86a41b..0000000000000000000000000000000000000000 --- a/src/communication/serial_global_policy.hpp +++ /dev/null @@ -1,70 +0,0 @@ -#pragma once - -#include <cstdint> -#include <stdexcept> -#include <type_traits> -#include <vector> - -#include <communication/gathered_vector.hpp> -#include <spike.hpp> - -namespace arb { -namespace communication { - -struct serial_global_policy { - template <typename Spike> - static gathered_vector<Spike> - gather_spikes(const std::vector<Spike>& local_spikes) { - using count_type = typename gathered_vector<Spike>::count_type; - return gathered_vector<Spike>( - std::vector<Spike>(local_spikes), - {0u, static_cast<count_type>(local_spikes.size())} - ); - } - - static int id() { - return 0; - } - - static int size() { - return 1; - } - - static void set_sizes(int comm_size, int num_local_cells) { - throw std::runtime_error( - "Attempt to set comm size for serial global communication " - "policy, this is only permitted for dry run mode"); - } - - template <typename T> - static T min(T value) { - return value; - } - - template <typename T> - static T max(T value) { - return value; - } - - template <typename T> - static T sum(T value) { - return value; - } - - template <typename T> - static std::vector<T> gather(T value, int) { - return {std::move(value)}; - } - - static void barrier() {} - - static void setup(int& argc, char**& argv) {} - static void teardown() {} - - static global_policy_kind kind() { return global_policy_kind::serial; }; -}; - -using global_policy = 
serial_global_policy; - -} // namespace communication -} // namespace arb diff --git a/src/domain_decomposition.hpp b/src/domain_decomposition.hpp index 166822f20c6632f98de5b4438b2564727d32fe18..a4f209d2d415b69b9ead9808e313513b2215d868 100644 --- a/src/domain_decomposition.hpp +++ b/src/domain_decomposition.hpp @@ -7,7 +7,6 @@ #include <backends.hpp> #include <common_types.hpp> -#include <communication/global_policy.hpp> #include <hardware/node_info.hpp> #include <recipe.hpp> #include <util/optional.hpp> diff --git a/src/helpers.hpp b/src/helpers.hpp deleted file mode 100644 index f11c86d07e09d3fa6fc7f3fc45eaa3b0d06138d7..0000000000000000000000000000000000000000 --- a/src/helpers.hpp +++ /dev/null @@ -1,15 +0,0 @@ -#include <algorithm> - -namespace arbmc { -namespace range{ - - template <typename C> - typename C::value_type - sum(C const& c) - { - using value_type = typename C::value_type; - return std::accumulate(c.begin(), c.end(), value_type{0}); - } - -} -} diff --git a/src/io/exporter.hpp b/src/io/exporter.hpp index 9435cea60dac82f82ce765848cae26083f8e4a32..7804d9e4a3d360752a8e9685583d73ee5a7c8159 100644 --- a/src/io/exporter.hpp +++ b/src/io/exporter.hpp @@ -13,7 +13,6 @@ namespace io { // Exposes one virtual functions: // do_export(vector<type>) receiving a vector of parameters to export -template <typename CommunicationPolicy> class exporter { public: // Performs the export of the data diff --git a/src/io/exporter_spike_file.hpp b/src/io/exporter_spike_file.hpp index 06e709b7bd68f2bda1a0cb4545e022b0b0da381a..24fc74d6c5b83a7ca5ddec2da06dcdaaf75418a1 100644 --- a/src/io/exporter_spike_file.hpp +++ b/src/io/exporter_spike_file.hpp @@ -18,11 +18,8 @@ namespace arb { namespace io { -template <typename CommunicationPolicy> -class exporter_spike_file : public exporter<CommunicationPolicy> { +class exporter_spike_file: public exporter { public: - using communication_policy_type = CommunicationPolicy; - // Constructor // over_write if true will overwrite the specified output file (default = true) // output_path relative or absolute path @@ -32,11 +29,12 @@ public: const std::string& file_name, const std::string& path, const std::string& file_extension, + int index, bool over_write=true) { file_path_ = create_output_file_path( - file_name, path, file_extension, communication_policy_.id()); + file_name, path, file_extension, index); //test if the file exist and depending on over_write throw or delete if (!over_write && util::file_exists(file_path_)) { @@ -86,8 +84,6 @@ private: // Handle to opened file handle std::ofstream file_handle_; std::string file_path_; - - communication_policy_type communication_policy_; }; } //communication diff --git a/src/load_balance.hpp b/src/load_balance.hpp index 817a7b4e4d6c6a497e84c7ede08724d83910dee6..c786e59d85ae2e535a338115c961905413ea1446 100644 --- a/src/load_balance.hpp +++ b/src/load_balance.hpp @@ -1,10 +1,12 @@ -#include <communication/global_policy.hpp> +#include <communication/distributed_context.hpp> #include <domain_decomposition.hpp> #include <hardware/node_info.hpp> #include <recipe.hpp> namespace arb { -domain_decomposition partition_load_balance(const recipe& rec, hw::node_info nd); +domain_decomposition partition_load_balance(const recipe& rec, + hw::node_info nd, + const distributed_context* ctx); } // namespace arb diff --git a/src/partition_load_balance.cpp b/src/partition_load_balance.cpp index f3ec4f91dfe8e3958e739ecee76d9d4ae21eaf8b..9be17786619f01ad8f5881498fbdcf10d35fcf38 100644 --- a/src/partition_load_balance.cpp +++ 
b/src/partition_load_balance.cpp @@ -1,4 +1,4 @@ -#include <communication/global_policy.hpp> +#include <communication/distributed_context.hpp> #include <domain_decomposition.hpp> #include <hardware/node_info.hpp> #include <recipe.hpp> @@ -6,7 +6,10 @@ namespace arb { -domain_decomposition partition_load_balance(const recipe& rec, hw::node_info nd) { +domain_decomposition partition_load_balance(const recipe& rec, + hw::node_info nd, + const distributed_context* ctx) +{ struct partition_gid_domain { partition_gid_domain(std::vector<cell_gid_type> divs): gid_divisions(std::move(divs)) @@ -22,8 +25,8 @@ domain_decomposition partition_load_balance(const recipe& rec, hw::node_info nd) using util::make_span; - unsigned num_domains = communication::global_policy::size(); - unsigned domain_id = communication::global_policy::id(); + unsigned num_domains = ctx->size(); + unsigned domain_id = ctx->id(); auto num_global_cells = rec.num_cells(); auto dom_size = [&](unsigned dom) -> cell_gid_type { @@ -89,8 +92,6 @@ domain_decomposition partition_load_balance(const recipe& rec, hw::node_info nd) d.gid_domain = partition_gid_domain(std::move(gid_divisions)); return d; - - //return domain_decomposition(num_domains, domain_id, num_local_cells, num_global_cells, std::move(groups)); } } // namespace arb diff --git a/src/profiling/meter_manager.cpp b/src/profiling/meter_manager.cpp index d745db6b35d9832be7170a5671491dffc961d1a5..34d50b8a9ead2e4dce90bde73624e2a1ab463cbd 100644 --- a/src/profiling/meter_manager.cpp +++ b/src/profiling/meter_manager.cpp @@ -1,5 +1,5 @@ +#include <communication/distributed_context.hpp> #include <algorithms.hpp> -#include <communication/global_policy.hpp> #include <util/hostname.hpp> #include <util/strprintf.hpp> #include <util/rangeutil.hpp> @@ -12,26 +12,25 @@ namespace arb { namespace util { -measurement::measurement( - std::string n, std::string u, const std::vector<double>& readings): +measurement::measurement(std::string n, std::string u, + const std::vector<double>& readings, + const distributed_context* ctx): name(std::move(n)), units(std::move(u)) { - using gcom = communication::global_policy; - // Assert that the same number of readings were taken on every domain. const auto num_readings = readings.size(); - if (gcom::min(num_readings)!=gcom::max(num_readings)) { + if (ctx->min(num_readings)!=ctx->max(num_readings)) { throw std::out_of_range( "the number of checkpoints in the \""+name+"\" meter do not match across domains"); } // Gather across all of the domains onto the root domain. 
for (auto r: readings) { - measurements.push_back(gcom::gather(r, 0)); + measurements.push_back(ctx->gather(r, 0)); } } -meter_manager::meter_manager() { +meter_manager::meter_manager(const distributed_context* ctx): glob_ctx_(ctx) { if (auto m = make_memory_meter()) { meters_.push_back(std::move(m)); } @@ -54,7 +53,7 @@ void meter_manager::start() { } // Enforce a global barrier after taking the time stamp - communication::global_policy::barrier(); + glob_ctx_->barrier(); start_time_ = timer_type::tic(); }; @@ -74,7 +73,7 @@ void meter_manager::checkpoint(std::string name) { } // Synchronize all domains before setting start time for the next interval - communication::global_policy::barrier(); + glob_ctx_->barrier(); start_time_ = timer_type::tic(); } @@ -90,6 +89,10 @@ const std::vector<double>& meter_manager::times() const { return times_; } +const distributed_context* meter_manager::context() const { + return glob_ctx_; +} + nlohmann::json to_json(const measurement& mnt) { nlohmann::json measurements; for (const auto& m: mnt.measurements) { @@ -108,20 +111,20 @@ nlohmann::json to_json(const measurement& mnt) { meter_report make_meter_report(const meter_manager& manager) { meter_report report; - using gcom = communication::global_policy; + auto ctx = manager.context(); // Add the times to the meter outputs - report.meters.push_back(measurement("time", "s", manager.times())); + report.meters.push_back(measurement("time", "s", manager.times(), ctx)); // Gather the meter outputs into a json Array for (auto& m: manager.meters()) { report.meters.push_back( - measurement(m->name(), m->units(), m->measurements())); + measurement(m->name(), m->units(), m->measurements(), ctx)); } // Gather a vector with the names of the node that each rank is running on. auto host = hostname(); - auto hosts = gcom::gather(host? *host: "unknown", 0); + auto hosts = ctx->gather(host? *host: "unknown", 0); report.hosts = hosts; // Count the number of unique hosts. 
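Note on the reduction idiom in the measurement constructor above: every rank holds the same reading count exactly when the count's global minimum equals its global maximum, so a min/max pair replaces a full gather for the consistency check. A minimal sketch of the idiom, with a hypothetical helper name (not part of this patch):

    // True on every rank iff all ranks passed the same value of n.
    inline bool ranks_agree(const arb::distributed_context* ctx, std::size_t n) {
        return ctx->min(n)==ctx->max(n);
    }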
@@ -130,9 +133,8 @@ meter_report make_meter_report(const meter_manager& manager) { auto num_hosts = std::distance(hosts.begin(), std::unique(hosts.begin(), hosts.end())); report.checkpoints = manager.checkpoint_names(); - report.num_domains = gcom::size(); + report.num_domains = ctx->size(); report.num_hosts = num_hosts; - report.communication_policy = gcom::kind(); return report; } @@ -141,7 +143,6 @@ nlohmann::json to_json(const meter_report& report) { return { {"checkpoints", report.checkpoints}, {"num_domains", report.num_domains}, - {"global_model", std::to_string(report.communication_policy)}, {"meters", util::transform_view(report.meters, [](measurement const& m){return to_json(m);})}, {"hosts", report.hosts}, }; diff --git a/src/profiling/meter_manager.hpp b/src/profiling/meter_manager.hpp index 5eaae48f11e0315d1c3291b6be32c34a688819cf..f5d3eef2f1bb399cb99a98772a757f68fa0467f8 100644 --- a/src/profiling/meter_manager.hpp +++ b/src/profiling/meter_manager.hpp @@ -3,7 +3,7 @@ #include <memory> #include <vector> -#include <communication/global_policy.hpp> +#include <communication/distributed_context.hpp> #include <json/json.hpp> #include "meter.hpp" @@ -26,7 +26,7 @@ struct measurement { std::string name; std::string units; std::vector<std::vector<double>> measurements; - measurement(std::string, std::string, const std::vector<double>&); + measurement(std::string, std::string, const std::vector<double>&, const distributed_context*); }; class meter_manager { @@ -42,10 +42,13 @@ private: std::vector<std::unique_ptr<meter>> meters_; std::vector<std::string> checkpoint_names_; + const distributed_context* glob_ctx_; + public: - meter_manager(); + meter_manager(const distributed_context* ctx); void start(); void checkpoint(std::string name); + const distributed_context* context() const; const std::vector<std::unique_ptr<meter>>& meters() const; const std::vector<std::string>& checkpoint_names() const; @@ -57,7 +60,6 @@ struct meter_report { std::vector<std::string> checkpoints; unsigned num_domains; unsigned num_hosts; - arb::communication::global_policy_kind communication_policy; std::vector<measurement> meters; std::vector<std::string> hosts; }; diff --git a/src/simulation.cpp b/src/simulation.cpp index d7ab3be0d4593908e736d897bbb5ad3c07eba0f3..ca5beb589cd4009b1778c0514d5484dadaf13886 100644 --- a/src/simulation.cpp +++ b/src/simulation.cpp @@ -15,8 +15,11 @@ namespace arb { -simulation::simulation(const recipe& rec, const domain_decomposition& decomp): - communicator_(rec, decomp) +simulation::simulation(const recipe& rec, + const domain_decomposition& decomp, + const distributed_context* ctx): + context_(ctx), + communicator_(rec, decomp, ctx) { const auto num_local_cells = communicator_.num_local_cells(); diff --git a/src/simulation.hpp b/src/simulation.hpp index 441807335289afd5db700862be2d890335683b93..0acf7419ac05b95526397653f59ea6e52c5fc6c4 100644 --- a/src/simulation.hpp +++ b/src/simulation.hpp @@ -8,7 +8,7 @@ #include <cell_group.hpp> #include <common_types.hpp> #include <communication/communicator.hpp> -#include <communication/global_policy.hpp> +#include <communication/distributed_context.hpp> #include <domain_decomposition.hpp> #include <epoch.hpp> #include <recipe.hpp> @@ -22,10 +22,9 @@ namespace arb { class simulation { public: - using communicator_type = communication::communicator<communication::global_policy>; using spike_export_function = std::function<void(const std::vector<spike>&)>; - simulation(const recipe& rec, const domain_decomposition& decomp); + 
simulation(const recipe& rec, const domain_decomposition& decomp, const distributed_context* ctx); void reset(); @@ -68,6 +67,9 @@ private: std::size_t num_groups() const; + // communication context + const distributed_context* context_; + // keep track of information about the current integration interval epoch epoch_; @@ -89,7 +91,7 @@ private: util::optional<cell_size_type> local_cell_index(cell_gid_type); - communicator_type communicator_; + communicator communicator_; // Convenience functions that map the spike buffers onto the appropriate // integration interval. diff --git a/src/spike.hpp b/src/spike.hpp index 067c23bbcb3e6a0140292877e31f7b488e2d7dd5..08a807568bffb9d48fe4faed348db2ae1c166121 100644 --- a/src/spike.hpp +++ b/src/spike.hpp @@ -19,6 +19,10 @@ struct basic_spike { basic_spike(id_type s, time_type t): source(s), time(t) {} + + friend bool operator==(const basic_spike& l, const basic_spike& r) { + return l.source==r.source && l.time==r.time; + } }; /// Standard specialization: diff --git a/src/util/pp_util.hpp b/src/util/pp_util.hpp new file mode 100644 index 0000000000000000000000000000000000000000..1347fcb71f8b4fab822ce6fc0d74ff3f96453310 --- /dev/null +++ b/src/util/pp_util.hpp @@ -0,0 +1,52 @@ +#pragma once + +/* + * preprocessor macro utilities + */ + +/* + * ARB_PP_FOREACH(macro, args...) + * expands macro for each entry in args... + * + * example: + * + * #define PROTO(T) T foo(T); + * ARB_PP_FOREACH(PROTO, int, float, double) + * + * expands to + * + * int foo(int); float foo(float); double foo(double); + * + * example: + * + * #define ALLOCATE(name) int* name = new int; + * #define DELETE(name) delete name; + * #define NAMES a, b, c + * + * ARB_PP_FOREACH(ALLOCATE, NAMES) + * ARB_PP_FOREACH(DELETE, NAMES) + * + * expands to + * + * int* a = new int; int* b = new int; int* c = new int; + * delete a; delete b; delete c; +*/ + +// Implementation macros for ARB_PP_FOREACH: + +#define ARB_PP_FOREACH_1_(M, A, ...) M(A) +#define ARB_PP_FOREACH_2_(M, A, ...) M(A) ARB_PP_FOREACH_1_(M, __VA_ARGS__) +#define ARB_PP_FOREACH_3_(M, A, ...) M(A) ARB_PP_FOREACH_2_(M, __VA_ARGS__) +#define ARB_PP_FOREACH_4_(M, A, ...) M(A) ARB_PP_FOREACH_3_(M, __VA_ARGS__) +#define ARB_PP_FOREACH_5_(M, A, ...) M(A) ARB_PP_FOREACH_4_(M, __VA_ARGS__) +#define ARB_PP_FOREACH_6_(M, A, ...) M(A) ARB_PP_FOREACH_5_(M, __VA_ARGS__) +#define ARB_PP_FOREACH_7_(M, A, ...) M(A) ARB_PP_FOREACH_6_(M, __VA_ARGS__) +#define ARB_PP_FOREACH_8_(M, A, ...) M(A) ARB_PP_FOREACH_7_(M, __VA_ARGS__) +#define ARB_PP_FOREACH_9_(M, A, ...) M(A) ARB_PP_FOREACH_8_(M, __VA_ARGS__) +#define ARB_PP_FOREACH_10_(M, A, ...) M(A) ARB_PP_FOREACH_9_(M, __VA_ARGS__) +#define ARB_PP_GET_11TH_ARGUMENT_(a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, ...) a11 + +// Apply macro in first argument to each of the remaining arguments (up to 10). +// Note: if __VA_ARGS__ has size N, when it is expanded the 11th argument is the ARB_PP_FOREACH_N_ macro. +#define ARB_PP_FOREACH(M, ...)\ +ARB_PP_GET_11TH_ARGUMENT_(__VA_ARGS__, ARB_PP_FOREACH_10_, ARB_PP_FOREACH_9_, ARB_PP_FOREACH_8_, ARB_PP_FOREACH_7_, ARB_PP_FOREACH_6_, ARB_PP_FOREACH_5_, ARB_PP_FOREACH_4_, ARB_PP_FOREACH_3_, ARB_PP_FOREACH_2_, ARB_PP_FOREACH_1_)(M, __VA_ARGS__) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 722549068405e3c271cbe9e0ed826f902401b1f0..bd115d75f26319145740c17131a6e90fb241660d 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -10,9 +10,6 @@ add_subdirectory(validation) # Test for the internode communication (eg.
mpi) add_subdirectory(global_communication) -# Tests for performance: This could include stand alone tests. These do not necessarily be run automatically -add_subdirectory(performance) - # Microbenchmarks. # Attempt to update git submodule if required. check_git_submodule(google_bench "${CMAKE_CURRENT_SOURCE_DIR}/ubench/google-benchmark") diff --git a/tests/global_communication/mpi_listener.hpp b/tests/global_communication/mpi_listener.hpp index 4049d0965565d378cfe06be1d7681880f7a8f642..0226666f8de16f9a9a9782f82ac6f68a009bc39e 100644 --- a/tests/global_communication/mpi_listener.hpp +++ b/tests/global_communication/mpi_listener.hpp @@ -4,7 +4,7 @@ #include <fstream> #include <stdexcept> -#include <communication/global_policy.hpp> +#include <communication/distributed_context.hpp> #include "../gtest.h" @@ -35,6 +35,7 @@ private: int test_case_failures_; int test_case_tests_; int test_failures_; + const arb::distributed_context* context_; bool does_print() const { return rank_==0; @@ -64,9 +65,9 @@ private: } public: - mpi_listener(std::string f_base="") { - rank_ = arb::communication::global_policy::id(); - size_ = arb::communication::global_policy::size(); + mpi_listener(std::string f_base, const arb::distributed_context* ctx): context_(ctx) { + rank_ = context_->id(); + size_ = context_->size(); if (f_base.empty()) { return; @@ -148,8 +149,7 @@ public: test_case_tests_++; // count the number of ranks that had errors - int global_errors = - arb::communication::global_policy::sum(test_failures_>0 ? 1 : 0); + int global_errors = context_->sum(test_failures_>0 ? 1 : 0); if (global_errors>0) { test_case_failures_++; printf_helper(" GLOBAL_FAIL on %d ranks\n", global_errors); diff --git a/tests/global_communication/test.cpp b/tests/global_communication/test.cpp index d0b917e044fb4869a0851568f217c66f80683408..a119d347b6769427edc35b554a2a4d57155b44e8 100644 --- a/tests/global_communication/test.cpp +++ b/tests/global_communication/test.cpp @@ -9,12 +9,14 @@ #include <tinyopt.hpp> #include <communication/communicator.hpp> -#include <communication/global_policy.hpp> +#include <communication/distributed_context.hpp> #include <util/ioutil.hpp> using namespace arb; +distributed_context g_context; + const char* usage_str = "[OPTION]...\n" "\n" @@ -22,11 +24,13 @@ const char* usage_str = " -h, --help Display usage information and exit\n"; int main(int argc, char **argv) { - using policy = communication::global_policy; // We need to set the communicator policy at the top level // this allows us to build multiple communicators in the tests - communication::global_policy_guard global_guard(argc, argv); + #ifdef ARB_HAVE_MPI + mpi::scoped_guard guard(&argc, &argv); + g_context = mpi_context(MPI_COMM_WORLD); + #endif // initialize google test environment testing::InitGoogleTest(&argc, argv); @@ -36,7 +40,7 @@ int main(int argc, char **argv) { // first delete the original printer delete listeners.Release(listeners.default_result_printer()); // now add our custom printer - listeners.Append(new mpi_listener("results_global_communication")); + listeners.Append(new mpi_listener("results_global_communication", &g_context)); int return_value = 0; try { @@ -49,7 +53,8 @@ int main(int argc, char **argv) { // Note that this must be set again for each test that uses a different // number of cells per domain, e.g. 
// policy::set_sizes(policy::size(), new_cells_per_rank) - policy::set_sizes(*comm_size, 0); + // TODO: fix when dry run mode reimplemented + //policy::set_sizes(*comm_size, 0); } else if (auto o = to::parse_opt(arg, 'h', "help")) { to::usage(argv[0], usage_str); @@ -77,5 +82,5 @@ // perform global collective, to ensure that all ranks return // the same exit code - return policy::max(return_value); + return g_context.max(return_value); } diff --git a/tests/global_communication/test.hpp b/tests/global_communication/test.hpp new file mode 100644 index 0000000000000000000000000000000000000000..12b98c565e38bda2612d8a4f1c8e5e61b432c3c6 --- /dev/null +++ b/tests/global_communication/test.hpp @@ -0,0 +1,7 @@ +#pragma once + +#include <communication/distributed_context.hpp> + +// Global context is a global variable, set in the main() function of the +// test driver test.cpp. +extern arb::distributed_context g_context; diff --git a/tests/global_communication/test_communicator.cpp b/tests/global_communication/test_communicator.cpp index 90d199131c98997619172a2190c282b272d9167c..a41c06f1c80ee6413f8ba664c7597c0d7079d5cf 100644 --- a/tests/global_communication/test_communicator.cpp +++ b/tests/global_communication/test_communicator.cpp @@ -1,10 +1,11 @@ #include "../gtest.h" +#include "test.hpp" #include <stdexcept> #include <vector> #include <communication/communicator.hpp> -#include <communication/global_policy.hpp> +#include <communication/distributed_context.hpp> #include <hardware/node_info.hpp> #include <load_balance.hpp> #include <util/filter.hpp> @@ -13,78 +14,62 @@ using namespace arb; -using communicator_type = communication::communicator<communication::global_policy>; - static bool is_dry_run() { - return communication::global_policy::kind() == - communication::global_policy_kind::dryrun; + //return global_policy::kind() == global_policy_kind::dryrun; + return false; } TEST(communicator, policy_basics) { - using policy = communication::global_policy; - - const auto num_domains = policy::size(); - const auto rank = policy::id(); + const auto num_domains = g_context.size(); + const auto rank = g_context.id(); - EXPECT_EQ(policy::min(rank), 0); + EXPECT_EQ(g_context.min(rank), 0); if (!is_dry_run()) { - EXPECT_EQ(policy::max(rank), num_domains-1); + EXPECT_EQ(g_context.max(rank), num_domains-1); } } -// Spike gathering works with a generic spike type that -// * has a member called source that -// * the source must be of a type that has a gid member -// -// Here we defined proxy types for testing the gather_spikes functionality. -// These are a little bit simpler than the spike and source types used inside -// Arbor, to simplify the testing. - -// Proxy for a spike source, which represents gid as an integer. -struct source_proxy { - source_proxy() = default; - source_proxy(int g): gid(g) {} - - int gid = 0; -}; - -bool operator==(int other, source_proxy s) {return s.gid==other;}; -bool operator==(source_proxy s, int other) {return s.gid==other;}; - -// Proxy for a spike. -// The value member can be used to test if the spike and its contents were -// successfully gathered. -struct spike_proxy { - spike_proxy() = default; - spike_proxy(int s, int v): source(s), value(v) {} - source_proxy source = 0; - int value = 0; -}; +// Wrappers for creating and inspecting spikes, used +// to test that spikes are correctly exchanged.
+arb::spike gen_spike(int source, int value) { + arb::spike s; + s.source.gid = source; + s.source.index = value; + return s; +} + +int get_source(const arb::spike& s) { + return s.source.gid; +} + +int get_value(const arb::spike& s) { + return s.source.index; +} // Test low level spike_gather function when each domain produces the same // number of spikes in the pattern used by dry run mode. TEST(communicator, gather_spikes_equal) { - using policy = communication::global_policy; - - const auto num_domains = policy::size(); - const auto rank = policy::id(); + const auto num_domains = g_context.size(); + const auto rank = g_context.id(); const auto n_local_spikes = 10; - const auto n_local_cells = n_local_spikes; + /* + const auto n_local_cells = n_local_spikes; // Important: set up meta-data in dry run back end. if (is_dry_run()) { - policy::set_sizes(policy::size(), n_local_cells); + g_context.set_sizes(g_context.size(), n_local_cells); } + */ // Create local spikes for communication. - std::vector<spike_proxy> local_spikes; + std::vector<spike> local_spikes; for (auto i=0; i<n_local_spikes; ++i) { - local_spikes.push_back(spike_proxy{i+rank*n_local_spikes, rank}); + local_spikes.push_back(gen_spike(i+rank*n_local_spikes, rank)); } // Perform exchange - const auto global_spikes = policy::gather_spikes(local_spikes); + const auto global_spikes = g_context.gather_spikes(local_spikes); // Test that partition information is correct const auto& part = global_spikes.partition(); @@ -104,15 +89,15 @@ TEST(communicator, gather_spikes_equal) { // is a list of num_domains*n_local_spikes spikes that have // contiguous source gid const auto& spikes = global_spikes.values(); - EXPECT_EQ(n_local_spikes*policy::size(), int(spikes.size())); + EXPECT_EQ(n_local_spikes*g_context.size(), int(spikes.size())); for (auto i=0u; i<spikes.size(); ++i) { const auto s = spikes[i]; EXPECT_EQ(i, unsigned(s.source.gid)); if (is_dry_run()) { - EXPECT_EQ(0, s.value); + EXPECT_EQ(0, get_value(s)); } else { - EXPECT_EQ(int(i)/n_local_spikes, s.value); + EXPECT_EQ(int(i)/n_local_spikes, get_value(s)); } } } @@ -125,10 +110,8 @@ TEST(communicator, gather_spikes_variant) { // number of spikes. if (is_dry_run()) return; - using policy = communication::global_policy; - - const auto num_domains = policy::size(); - const auto rank = policy::id(); + const auto num_domains = g_context.size(); + const auto rank = g_context.id(); // Parameter used to scale the number of spikes generated on successive // ranks. @@ -145,14 +128,14 @@ TEST(communicator, gather_spikes_variant) { // generating the following number of spikes // [ 0, scale, 2*scale, 3*scale, ..., (num_domains-1)*scale ] // i.e. 0 spikes on the first rank, scale spikes on the second, and so on. 
- std::vector<spike_proxy> local_spikes; + std::vector<spike> local_spikes; const auto local_start_id = sumn(rank-1); for (auto i=0; i<n_local_spikes; ++i) { - local_spikes.push_back(spike_proxy{local_start_id+i, rank}); + local_spikes.push_back(gen_spike(local_start_id+i, rank)); } // Perform exchange - const auto global_spikes = policy::gather_spikes(local_spikes); + const auto global_spikes = g_context.gather_spikes(local_spikes); // Test that partition information is correct const auto& part =global_spikes.partition(); @@ -169,8 +152,8 @@ TEST(communicator, gather_spikes_variant) { const auto last_spike = global_spikes.values().begin() + sumn(domain); const auto spikes = util::make_range(first_spike, last_spike); for (auto s: spikes) { - EXPECT_EQ(s.value, domain); - EXPECT_EQ(s.source, source++); + EXPECT_EQ(get_value(s), domain); + EXPECT_EQ(get_source(s), source++); } } } @@ -182,7 +165,7 @@ namespace { public: ring_recipe(cell_size_type s): size_(s), - ranks_(communication::global_policy::size()) + ranks_(g_context.size()) {} cell_size_type num_cells() const override { @@ -246,7 +229,7 @@ namespace { public: all2all_recipe(cell_size_type s): size_(s), - ranks_(communication::global_policy::size()) + ranks_(g_context.size()) {} cell_size_type num_cells() const override { @@ -312,12 +295,9 @@ namespace { } } -using policy = communication::global_policy; -using comm_type = communication::communicator<policy>; - template <typename F> ::testing::AssertionResult -test_ring(const domain_decomposition& D, comm_type& C, F&& f) { +test_ring(const domain_decomposition& D, communicator& C, F&& f) { using util::transform_view; using util::assign_from; using util::filter; @@ -332,10 +312,10 @@ test_ring(const domain_decomposition& D, comm_type& C, F&& f) { // gather the global set of spikes auto global_spikes = C.exchange(local_spikes); - if (global_spikes.size()!=policy::sum(local_spikes.size())) { + if (global_spikes.size()!=g_context.sum(local_spikes.size())) { return ::testing::AssertionFailure() << "the number of gathered spikes " << global_spikes.size() << " doesn't match the expected " - << policy::sum(local_spikes.size()); + << g_context.sum(local_spikes.size()); } // generate the events @@ -381,7 +361,7 @@ TEST(communicator, ring) using util::make_span; // construct a homogeneous network of 10*n_domain identical cells in a ring - unsigned N = policy::size(); + unsigned N = g_context.size(); unsigned n_local = 10u; unsigned n_global = n_local*N; @@ -389,8 +369,8 @@ TEST(communicator, ring) auto R = ring_recipe(n_global); // use a node decomposition that reflects the resources available // on the node that the test is running on, including gpus. 
- const auto D = partition_load_balance(R, hw::node_info()); - auto C = communication::communicator<policy>(R, D); + const auto D = partition_load_balance(R, hw::node_info(), &g_context); + auto C = communicator(R, D, &g_context); // every cell fires EXPECT_TRUE(test_ring(D, C, [](cell_gid_type g){return true;})); @@ -404,7 +384,7 @@ TEST(communicator, ring) template <typename F> ::testing::AssertionResult -test_all2all(const domain_decomposition& D, comm_type& C, F&& f) { +test_all2all(const domain_decomposition& D, communicator& C, F&& f) { using util::transform_view; using util::assign_from; using util::filter; @@ -423,10 +403,10 @@ test_all2all(const domain_decomposition& D, comm_type& C, F&& f) { // gather the global set of spikes auto global_spikes = C.exchange(local_spikes); - if (global_spikes.size()!=policy::sum(local_spikes.size())) { + if (global_spikes.size()!=g_context.sum(local_spikes.size())) { return ::testing::AssertionFailure() << "the number of gathered spikes " << global_spikes.size() << " doesn't match the expected " - << policy::sum(local_spikes.size()); + << g_context.sum(local_spikes.size()); } // generate the events @@ -476,7 +456,7 @@ TEST(communicator, all2all) using util::make_span; // construct a homogeneous network of 10*n_domain identical cells in a ring - unsigned N = policy::size(); + unsigned N = g_context.size(); unsigned n_local = 10u; unsigned n_global = n_local*N; @@ -484,8 +464,8 @@ TEST(communicator, all2all) auto R = all2all_recipe(n_global); // use a node decomposition that reflects the resources available // on the node that the test is running on, including gpus. - const auto D = partition_load_balance(R, hw::node_info()); - auto C = communication::communicator<policy>(R, D); + const auto D = partition_load_balance(R, hw::node_info(), &g_context); + auto C = communicator(R, D, &g_context); // every cell fires EXPECT_TRUE(test_all2all(D, C, [](cell_gid_type g){return true;})); diff --git a/tests/global_communication/test_domain_decomposition.cpp b/tests/global_communication/test_domain_decomposition.cpp index 0c7205905f75b02dd74fe8982a5f9659716beafe..a586e0dacc305b92f1cb1ac1ae92800ef480a987 100644 --- a/tests/global_communication/test_domain_decomposition.cpp +++ b/tests/global_communication/test_domain_decomposition.cpp @@ -8,16 +8,15 @@ #include <vector> #include <communication/communicator.hpp> -#include <communication/global_policy.hpp> +#include <communication/distributed_context.hpp> #include <hardware/node_info.hpp> #include <load_balance.hpp> #include "../simple_recipes.hpp" +#include "test.hpp" using namespace arb; -using communicator_type = communication::communicator<communication::global_policy>; - namespace { // Dummy recipes types for testing. @@ -65,8 +64,8 @@ namespace { } TEST(domain_decomposition, homogeneous_population) { - const auto N = communication::global_policy::size(); - const auto I = communication::global_policy::id(); + const auto N = g_context.size(); + const auto I = g_context.id(); { // Test on a node with 1 cpu core and no gpus. // We assume that all cells will be put into cell groups of size 1. 
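The test changes above all follow one pattern: each static communication::global_policy query becomes a call on the g_context object that test.hpp declares extern and the test driver defines. A sketch of a new test written against that pattern (test name hypothetical, not part of this patch):

    #include "test.hpp"  // declares: extern arb::distributed_context g_context;

    TEST(example, id_in_range) {
        // Process ids are contiguous in the half-open range [0, size).
        EXPECT_GE(g_context.id(), 0);
        EXPECT_LT(g_context.id(), g_context.size());
    }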
@@ -77,7 +76,7 @@ TEST(domain_decomposition, homogeneous_population) { // 10 cells per domain unsigned n_local = 10; unsigned n_global = n_local*N; - const auto D = partition_load_balance(homo_recipe(n_global, dummy_cell{}), nd); + const auto D = partition_load_balance(homo_recipe(n_global, dummy_cell{}), nd, &g_context); EXPECT_EQ(D.num_global_cells, n_global); EXPECT_EQ(D.num_local_cells, n_local); @@ -108,7 +107,7 @@ TEST(domain_decomposition, homogeneous_population) { // 10 cells per domain unsigned n_local = 10; unsigned n_global = n_local*N; - const auto D = partition_load_balance(homo_recipe(n_global, dummy_cell{}), nd); + const auto D = partition_load_balance(homo_recipe(n_global, dummy_cell{}), nd, &g_context); EXPECT_EQ(D.num_global_cells, n_global); EXPECT_EQ(D.num_local_cells, n_local); @@ -134,8 +133,8 @@ } TEST(domain_decomposition, heterogeneous_population) { - const auto N = communication::global_policy::size(); - const auto I = communication::global_policy::id(); + const auto N = g_context.size(); + const auto I = g_context.id(); { // Test on a node with 1 cpu core and no gpus. // We assume that all cells will be put into cell groups of size 1. @@ -148,7 +147,7 @@ TEST(domain_decomposition, heterogeneous_population) { const unsigned n_global = n_local*N; const unsigned n_local_grps = n_local; // 1 cell per group auto R = hetero_recipe(n_global); - const auto D = partition_load_balance(R, nd); + const auto D = partition_load_balance(R, nd, &g_context); EXPECT_EQ(D.num_global_cells, n_global); EXPECT_EQ(D.num_local_cells, n_local); diff --git a/tests/global_communication/test_exporter_spike_file.cpp b/tests/global_communication/test_exporter_spike_file.cpp index 500a8985fb4c679558fe14ee54507b43fafb6ef8..8f421db35b4bae29c8f3b18e6d66bcfdc01ecf42 100644 --- a/tests/global_communication/test_exporter_spike_file.cpp +++ b/tests/global_communication/test_exporter_spike_file.cpp @@ -1,4 +1,5 @@ #include "../gtest.h" +#include "test.hpp" #include <cstdio> #include <fstream> @@ -7,16 +8,13 @@ #include <vector> #include <communication/communicator.hpp> -#include <communication/global_policy.hpp> +#include <communication/distributed_context.hpp> #include <io/exporter_spike_file.hpp> #include <spike.hpp> class exporter_spike_file_fixture : public ::testing::Test { protected: - using communicator_type = arb::communication::global_policy; - - using exporter_type = - arb::io::exporter_spike_file<communicator_type>; + using exporter_type = arb::io::exporter_spike_file; std::string file_name_; std::string path_; @@ -27,7 +25,7 @@ file_name_("spikes_exporter_spike_file_fixture"), path_("./"), extension_("gdf"), - index_(communicator_type::id()) + index_(g_context.id()) {} std::string get_standard_file_name() { @@ -48,15 +46,19 @@ }; TEST_F(exporter_spike_file_fixture, constructor) { - exporter_type exporter(file_name_, path_, extension_, true); + // Create an exporter, overwriting any existing output file if necessary. + exporter_type exporter(file_name_, path_, extension_, index_, true); - //test if the file exist and depending on over_write throw or delete - std::ifstream f(get_standard_file_name()); - EXPECT_TRUE(f.good()); + // Assert that the output file exists + { + std::ifstream f(get_standard_file_name()); + ASSERT_TRUE(f.good()); + } - // We now know the file exists, so create a new exporter with overwrite false + // Create a new exporter with overwrite false.
This should throw, because an + // output file with the same name is in use by exporter. try { - exporter_type exporter1(file_name_, path_, extension_, false); + exporter_type exporter1(file_name_, path_, extension_, index_, false); FAIL() << "expected a file already exists error"; } catch (const std::runtime_error& err) { @@ -84,7 +86,7 @@ TEST_F(exporter_spike_file_fixture, create_output_file_path) { TEST_F(exporter_spike_file_fixture, do_export) { { - exporter_type exporter(file_name_, path_, extension_); + exporter_type exporter(file_name_, path_, extension_, g_context.id()); // Create some spikes std::vector<arb::spike> spikes; diff --git a/tests/global_communication/test_mpi.cpp b/tests/global_communication/test_mpi.cpp index 94c1559de19df0e1c26b3b1ce4330def6c4a71aa..18f98c0a708ff47b8a57114a15d3444d469e3960 100644 --- a/tests/global_communication/test_mpi.cpp +++ b/tests/global_communication/test_mpi.cpp @@ -1,16 +1,15 @@ #ifdef ARB_HAVE_MPI #include "../gtest.h" +#include "test.hpp" #include <cstring> #include <vector> -#include <communication/global_policy.hpp> #include <communication/mpi.hpp> #include <util/rangeutil.hpp> using namespace arb; -using namespace arb::communication; struct big_thing { big_thing() {} @@ -30,9 +29,8 @@ private: }; TEST(mpi, gather_all) { - using policy = mpi_global_policy; - - int id = policy::id(); + int id = mpi::rank(MPI_COMM_WORLD); + int size = mpi::size(MPI_COMM_WORLD); std::vector<big_thing> data; // odd ranks: three items; even ranks: one item. @@ -44,7 +42,7 @@ } std::vector<big_thing> expected; - for (int i = 0; i<policy::size(); ++i) { + for (int i = 0; i<size; ++i) { if (i%2) { int rank_data[] = { i, i+7, i+8 }; util::append(expected, rank_data); @@ -55,15 +53,14 @@ } } - auto gathered = mpi::gather_all(data); + auto gathered = mpi::gather_all(data, MPI_COMM_WORLD); EXPECT_EQ(expected, gathered); } TEST(mpi, gather_all_with_partition) { - using policy = mpi_global_policy; - - int id = policy::id(); + int id = mpi::rank(MPI_COMM_WORLD); + int size = mpi::size(MPI_COMM_WORLD); std::vector<big_thing> data; // odd ranks: three items; even ranks: one item.
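Threading the communicator explicitly through mpi::gather, mpi::gather_all and the other wrappers, instead of baking MPI_COMM_WORLD into them, also lets these collectives run over sub-communicators. A sketch of what that enables, using standard MPI calls (illustration only, not part of this patch):

    // Gather rank ids separately over the even and odd halves of MPI_COMM_WORLD.
    int rank = arb::mpi::rank(MPI_COMM_WORLD);
    MPI_Comm sub;
    MPI_Comm_split(MPI_COMM_WORLD, rank%2, rank, &sub);
    auto gathered = arb::mpi::gather(rank, 0, sub); // root is rank 0 within each half
    MPI_Comm_free(&sub);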
@@ -78,7 +75,7 @@ TEST(mpi, gather_all_with_partition) { std::vector<unsigned> expected_divisions; expected_divisions.push_back(0); - for (int i = 0; i<policy::size(); ++i) { + for (int i = 0; i<size; ++i) { if (i%2) { int rank_data[] = { i, i+7, i+8 }; util::append(expected_values, rank_data); @@ -91,16 +88,15 @@ TEST(mpi, gather_all_with_partition) { } } - auto gathered = mpi::gather_all_with_partition(data); + auto gathered = mpi::gather_all_with_partition(data, MPI_COMM_WORLD); EXPECT_EQ(expected_values, gathered.values()); EXPECT_EQ(expected_divisions, gathered.partition()); } TEST(mpi, gather_string) { - using policy = mpi_global_policy; - - int id = policy::id(); + int id = mpi::rank(MPI_COMM_WORLD); + int size = mpi::size(MPI_COMM_WORLD); // Make a string of variable length, with the character // in the string distrubuted as follows @@ -117,10 +113,10 @@ TEST(mpi, gather_string) { auto s = make_string(id); - auto gathered = mpi::gather(s, 0); + auto gathered = mpi::gather(s, 0, MPI_COMM_WORLD); if (!id) { - ASSERT_TRUE(policy::size()==(int)gathered.size()); + ASSERT_TRUE(size==(int)gathered.size()); for (std::size_t i=0; i<gathered.size(); ++i) { EXPECT_EQ(make_string(i), gathered[i]); } @@ -128,14 +124,13 @@ TEST(mpi, gather_string) { } TEST(mpi, gather) { - using policy = mpi_global_policy; - - int id = policy::id(); + int id = mpi::rank(MPI_COMM_WORLD); + int size = mpi::size(MPI_COMM_WORLD); - auto gathered = mpi::gather(id, 0); + auto gathered = mpi::gather(id, 0, MPI_COMM_WORLD); if (!id) { - ASSERT_TRUE(policy::size()==(int)gathered.size()); + ASSERT_TRUE(size==(int)gathered.size()); for (std::size_t i=0; i<gathered.size(); ++i) { EXPECT_EQ(int(i), gathered[i]); } diff --git a/tests/performance/CMakeLists.txt b/tests/performance/CMakeLists.txt deleted file mode 100644 index 351be926371b61bcf130d43efc42e5fb0ff53eee..0000000000000000000000000000000000000000 --- a/tests/performance/CMakeLists.txt +++ /dev/null @@ -1,2 +0,0 @@ -# Unit tests -add_subdirectory(io) diff --git a/tests/performance/io/CMakeLists.txt b/tests/performance/io/CMakeLists.txt deleted file mode 100644 index d6043b491301507e82e76ce1a643aaec6c9c1bec..0000000000000000000000000000000000000000 --- a/tests/performance/io/CMakeLists.txt +++ /dev/null @@ -1,19 +0,0 @@ -set(HEADERS -) - -set(DISK_IO_SOURCES - disk_io.cpp -) - -add_executable(disk_io.exe ${DISK_IO_SOURCES} ${HEADERS}) - -target_link_libraries(disk_io.exe LINK_PUBLIC arbor) -target_link_libraries(disk_io.exe LINK_PUBLIC ${EXTERNAL_LIBRARIES}) - -if(ARB_WITH_MPI) - target_link_libraries(disk_io.exe LINK_PUBLIC ${MPI_C_LIBRARIES}) - set_property(TARGET disk_io.exe APPEND_STRING PROPERTY LINK_FLAGS "${MPI_C_LINK_FLAGS}") -endif() - -# Copy the python file that drives the performance tests and produces the output -file(COPY disk_io.py DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) diff --git a/tests/performance/io/disk_io.cpp b/tests/performance/io/disk_io.cpp deleted file mode 100644 index a3bec2b8caf80fa8a5359046fb6b690920b71c4d..0000000000000000000000000000000000000000 --- a/tests/performance/io/disk_io.cpp +++ /dev/null @@ -1,156 +0,0 @@ -#include <stdio.h> - -#include <fstream> -#include <iostream> -#include <numeric> - -#include <cell.hpp> -#include <cell_group.hpp> -#include <common_types.hpp> -#include <communication/communicator.hpp> -#include <communication/global_policy.hpp> -#include <io/exporter_spike_file.hpp> -#include <profiling/profiler.hpp> -#include <spike.hpp> - -using namespace arb; - -using global_policy = communication::global_policy; 
-using timer = threading::timer; - -int main(int argc, char** argv) { - - //Setup the possible mpi environment - communication::global_policy_guard global_guard(argc, argv); - - // very simple command line parsing - if (argc < 3) { - std::cout << "disk_io <int nrspikes> <int nr_repeats> [simple_output (false|true)]\n" - << " Simple performance test runner for the exporter manager\n" - << " It exports nrspikes nr_repeats using the export_manager and will produce\n" - << " the total, mean and std of the time needed to perform the output to disk\n\n" - - << " <file_per_rank> true will produce a single file per mpi rank\n" - << " <simple_output> true will produce a simplyfied comma seperated output for automatic parsing\n\n" - - << " The application can be started with mpi support and will produce output on a single rank\n" - << " if nrspikes is not a multiple of the nr of mpi rank, floor is take\n" ; - return 1; - } - auto nr_spikes = atoi(argv[1]); - - if (nr_spikes == 0) { - std::cout << "disk_io <nrspikes>\n"; - std::cout << " nrspikes should be a valid integer higher then zero\n"; - - return 1; - } - auto nr_repeats = atoi(argv[2]); - - if (nr_repeats == 0) { - std::cout << "disk_io <nrspikes>\n"; - std::cout << " nr_repeats should be a valid integer higher then zero\n"; - return 1; - } - - auto simple_stats = false; - if (argc == 4) { - std::string simple(argv[3]); - if (simple == std::string("true")) - { - simple_stats = true; - } - } - - // Create the sut - io::exporter_spike_file<global_policy> exporter( - "spikes", "./", "gdf", true); - - // We need the nr of ranks to calculate the nr of spikes to produce per - // rank - global_policy communication_policy; - - auto nr_ranks = unsigned( communication_policy.size() ); - auto spikes_per_rank = nr_spikes / nr_ranks; - - // Create a set of spikes - std::vector<spike> spikes; - - // ********************************************************************* - // To have a somewhat realworld data set we calculate from the nr of spikes - // (assuming 20 hz average) the number of nr of 'simulated' neurons, - // and create idxs using this value. 
The number of chars in the number - // influences the size of the output and thus the speed - // Also taken that we have only a single second of simulated time - // all spike times should be between 0.0 and 1.0: - auto simulated_neurons = spikes_per_rank / 20; - for (auto idx = unsigned{ 0 }; idx < spikes_per_rank; ++idx) { - - spikes.push_back({ - {idx % simulated_neurons, 0 }, // correct idx - 0.0f + 1 / (0.05f + idx % 20) - }); // semi random float - } - - double timings_arr[nr_repeats]; - double time_total = 0; - - // now output to disk nr_repeats times, while keeping track of the times - for (auto idx = 0; idx < nr_repeats; ++idx) { - auto time_start = timer::tic(); - exporter.output(spikes); - auto run_time = timer::toc(time_start); - - time_total += run_time; - timings_arr[idx] = run_time; - } - - // create the vector here to prevent changes on the heap influencing the - // timeing - std::vector<double> timings; - for (auto idx = 0; idx < nr_repeats; ++idx) { - timings.push_back(timings_arr[idx]); - } - - - // Calculate some statistics - auto sum = std::accumulate(timings.begin(), timings.end(), 0.0); - auto mean = sum / timings.size(); - - std::vector<double> diff(timings.size()); - std::transform( - timings.begin(), timings.end(), diff.begin(), - std::bind2nd(std::minus<double>(), mean) - ); - auto sq_sum = std::inner_product( - diff.begin(), diff.end(), diff.begin(), - 0.0 - ); - auto stdev = std::sqrt(sq_sum / timings.size()); - - auto min = *std::min_element(timings.begin(), timings.end()); - auto max = *std::max_element(timings.begin(), timings.end()); - - - if (communication_policy.id() != 0) { - return 0; - } - - // and output - if (simple_stats) { - std::cout << time_total<< "," - << mean << "," - << stdev << "," - << min << "," - << max << std::endl; - } - else { - std::cout << "total time (ms): " << time_total << std::endl; - std::cout << "mean time (ms): " << mean << std::endl; - std::cout << "stdev time (ms): " << std::endl; - std::cout << "min time (ms): " << min << std::endl; - std::cout << "max time (ms): " << max << std::endl; - } - - return 0; -} diff --git a/tests/performance/io/disk_io.py b/tests/performance/io/disk_io.py deleted file mode 100644 index ea42304b23e2da851058b8b6d7625f8185ec0db3..0000000000000000000000000000000000000000 --- a/tests/performance/io/disk_io.py +++ /dev/null @@ -1,49 +0,0 @@ -import matplotlib.pyplot as plt -import subprocess -import os - - - -current_script_dir = os.path.dirname(os.path.abspath(__file__)) - -spikes_to_save = 1000000 - -print ( "Simple performance runner for spike output to file. 
\n" + - str(spikes_to_save) + " spikes will be written to a file and the duration of this \n" + - "operation measured for different number of ranks\n" ) - - -range_nr_rank = [1, 2, 4, 8, 16, 24, 32, 48, 64] -mean = [] -std = [] -min = [] -max = [] -for n_rank in range_nr_rank: - # open the disk_io executable - p1 = subprocess.Popen(["mpirun", "-n",str(n_rank), - os.path.join(current_script_dir, "disk_io.exe"), - str(spikes_to_save), str(10), "true"], - stdout=subprocess.PIPE) - - #and grab the raw stats - stats = p1.communicate()[0] - - # convert into list - stats = stats.split(",") - - mean.append(float(stats[1])) - std.append(float(stats[2])) - min.append(float(stats[3])) - max.append(float(stats[4])) - - print ("performed test for n_rank= " + str(n_rank)) - -print (range_nr_rank) -print (mean) -print (std) - -plt.errorbar(range_nr_rank, mean, yerr=std, fmt='-o', label="mean (std)") -plt.errorbar(range_nr_rank, min, fmt='-', label="min") -plt.errorbar(range_nr_rank, max, fmt='-', label="max") -plt.legend() -plt.show() diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index adaae9002a2a04fcd68e24ba16d132140165107a..dca52faecdd1bf934dc032d09367f23acfce4425 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -76,6 +76,7 @@ set(test_sources test_segment.cpp test_schedule.cpp test_rss_cell.cpp + test_local_context.cpp test_simd.cpp test_span.cpp test_spikes.cpp diff --git a/tests/unit/test.cpp b/tests/unit/test.cpp index d971b2f97e5f3eedf3fede854e7936cd7d32f676..d0a2d43a8a08a211788768511e277b3e30d905cb 100644 --- a/tests/unit/test.cpp +++ b/tests/unit/test.cpp @@ -3,13 +3,9 @@ #include <numeric> #include <vector> -#include <communication/global_policy.hpp> - #include "../gtest.h" int main(int argc, char **argv) { - arb::communication::global_policy_guard g(argc, argv); - ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/tests/unit/test_domain_decomposition.cpp b/tests/unit/test_domain_decomposition.cpp index 4ef308b7f7c80d93a65e286e311dda158b180221..1c58a7fb5443cf92d6dc1b8f1442515058e82d02 100644 --- a/tests/unit/test_domain_decomposition.cpp +++ b/tests/unit/test_domain_decomposition.cpp @@ -3,6 +3,7 @@ #include <stdexcept> #include <backends.hpp> +#include <communication/distributed_context.hpp> #include <domain_decomposition.hpp> #include <hardware/node_info.hpp> #include <load_balance.hpp> @@ -43,8 +44,11 @@ namespace { }; } +// test assumes one domain TEST(domain_decomposition, homogenous_population) { + distributed_context context; + { // Test on a node with 1 cpu core and no gpus. // We assume that all cells will be put into cell groups of size 1. 
// This assumption will not hold in the future, requiring and update to @@ -52,7 +56,7 @@ TEST(domain_decomposition, homogenous_population) hw::node_info nd(1, 0); unsigned num_cells = 10; - const auto D = partition_load_balance(homo_recipe(num_cells, dummy_cell{}), nd); + const auto D = partition_load_balance(homo_recipe(num_cells, dummy_cell{}), nd, &context); EXPECT_EQ(D.num_global_cells, num_cells); EXPECT_EQ(D.num_local_cells, num_cells); @@ -78,7 +82,7 @@ TEST(domain_decomposition, homogenous_population) hw::node_info nd(1, 1); unsigned num_cells = 10; - const auto D = partition_load_balance(homo_recipe(num_cells, dummy_cell{}), nd); + const auto D = partition_load_balance(homo_recipe(num_cells, dummy_cell{}), nd, &context); EXPECT_EQ(D.num_global_cells, num_cells); EXPECT_EQ(D.num_local_cells, num_cells); @@ -103,6 +107,8 @@ TEST(domain_decomposition, homogenous_population) TEST(domain_decomposition, heterogenous_population) { + distributed_context context; + { // Test on a node with 1 cpu core and no gpus. // We assume that all cells will be put into cell groups of size 1. // This assumption will not hold in the future, requiring and update to @@ -111,7 +117,7 @@ TEST(domain_decomposition, heterogenous_population) unsigned num_cells = 10; auto R = hetero_recipe(num_cells); - const auto D = partition_load_balance(R, nd); + const auto D = partition_load_balance(R, nd, &context); EXPECT_EQ(D.num_global_cells, num_cells); EXPECT_EQ(D.num_local_cells, num_cells); @@ -149,7 +155,7 @@ TEST(domain_decomposition, heterogenous_population) unsigned num_cells = 10; auto R = hetero_recipe(num_cells); - const auto D = partition_load_balance(R, nd); + const auto D = partition_load_balance(R, nd, &context); EXPECT_EQ(D.num_global_cells, num_cells); EXPECT_EQ(D.num_local_cells, num_cells); diff --git a/tests/unit/test_fvm_lowered.cpp b/tests/unit/test_fvm_lowered.cpp index 72f305c0c8f68abb041043f7f017c635678eb564..35244ee76f3ac0b71c891b6891668e6afc919a3b 100644 --- a/tests/unit/test_fvm_lowered.cpp +++ b/tests/unit/test_fvm_lowered.cpp @@ -6,6 +6,7 @@ #include <backends/fvm_types.hpp> #include <backends/multicore/fvm.hpp> #include <backends/multicore/mechanism.hpp> +#include <communication/distributed_context.hpp> #include <cell.hpp> #include <common_types.hpp> #include <fvm_lowered_cell.hpp> @@ -329,8 +330,9 @@ TEST(fvm_lowered, derived_mechs) { float times[] = {10.f, 20.f}; - auto decomp = partition_load_balance(rec, hw::node_info{1u, 0u}); - simulation sim(rec, decomp); + distributed_context context; + auto decomp = partition_load_balance(rec, hw::node_info{1u, 0u}, &context); + simulation sim(rec, decomp, &context); sim.add_sampler(all_probes, explicit_schedule(times), sampler); sim.run(30.0, 1.f/1024); @@ -365,9 +367,9 @@ TEST(fvm_lowered, weighted_write_ion) { // // Geometry: // soma 0: radius 5 µm - // dend 1: 100 µm long, 1 µm diameter cynlinder - // dend 2: 200 µm long, 1 µm diameter cynlinder - // dend 3: 100 µm long, 1 µm diameter cynlinder + // dend 1: 100 µm long, 1 µm diameter cylinder + // dend 2: 200 µm long, 1 µm diameter cylinder + // dend 3: 100 µm long, 1 µm diameter cylinder // // The radius of the soma is chosen such that the surface area of soma is // the same as a 100µm dendrite, which makes it easier to describe the diff --git a/tests/unit/test_lif_cell_group.cpp b/tests/unit/test_lif_cell_group.cpp index 44033c473122f635a7ead309b6a56b301bdf4bce..b931d7034043a5cfb7addb38ffc77e52fdb7a9c4 100644 --- a/tests/unit/test_lif_cell_group.cpp +++ b/tests/unit/test_lif_cell_group.cpp 
@@ -1,5 +1,6 @@ #include "../gtest.h" #include <cell_group_factory.hpp> +#include <communication/distributed_context.hpp> #include <fstream> #include <lif_cell_description.hpp> #include <lif_cell_group.hpp> @@ -154,14 +155,16 @@ } TEST(lif_cell_group, spikes) { + distributed_context context; + // make two lif cells path_recipe recipe(2, 1000, 0.1); hw::node_info nd; nd.num_cpu_cores = threading::num_threads(); - auto decomp = partition_load_balance(recipe, nd); - simulation sim(recipe, decomp); + auto decomp = partition_load_balance(recipe, nd, &context); + simulation sim(recipe, decomp, &context); std::vector<postsynaptic_spike_event> events; @@ -199,11 +202,12 @@ // Total simulation time. time_type simulation_time = 100; + distributed_context context; auto recipe = ring_recipe(num_lif_cells, weight, delay); - auto decomp = partition_load_balance(recipe, nd); + auto decomp = partition_load_balance(recipe, nd, &context); // Creates a simulation with a ring recipe of lif neurons - simulation sim(recipe, decomp); + simulation sim(recipe, decomp, &context); std::vector<spike> spike_buffer; diff --git a/tests/unit/test_local_context.cpp b/tests/unit/test_local_context.cpp new file mode 100644 index 0000000000000000000000000000000000000000..1b02173d5375d34d58b0ba0cb0ccbc6cd46914b6 --- /dev/null +++ b/tests/unit/test_local_context.cpp @@ -0,0 +1,78 @@ +#include <vector> + +#include "../gtest.h" +#include <communication/distributed_context.hpp> +#include <spike.hpp> + +// Test that there are no errors constructing a distributed_context from a local_context +TEST(local_context, construct_distributed_context) +{ + arb::distributed_context ctx = arb::local_context(); +} + +TEST(local_context, size_rank) +{ + arb::local_context ctx; + + EXPECT_EQ(ctx.size(), 1); + EXPECT_EQ(ctx.id(), 0); +} + +TEST(local_context, minmax) +{ + arb::local_context ctx; + + EXPECT_EQ(1., ctx.min(1.)); + EXPECT_EQ(1., ctx.max(1.)); + + EXPECT_EQ(1.f, ctx.min(1.f)); + EXPECT_EQ(1.f, ctx.max(1.f)); + + int32_t one32 = 1; + EXPECT_EQ(one32, ctx.min(one32)); + EXPECT_EQ(one32, ctx.max(one32)); + + int64_t one64 = 1; + EXPECT_EQ(one64, ctx.min(one64)); + EXPECT_EQ(one64, ctx.max(one64)); +} + +TEST(local_context, sum) +{ + arb::local_context ctx; + + EXPECT_EQ(42., ctx.sum(42.)); + EXPECT_EQ(42.f, ctx.sum(42.f)); + EXPECT_EQ(42, ctx.sum(42)); + EXPECT_EQ(42u, ctx.sum(42u)); +} + +TEST(local_context, gather) +{ + arb::local_context ctx; + + EXPECT_EQ(std::vector<int>{42}, ctx.gather(42, 0)); + EXPECT_EQ(std::vector<double>{42}, ctx.gather(42., 0)); + EXPECT_EQ(std::vector<std::string>{"42"}, ctx.gather(std::string("42"), 0)); +} + +TEST(local_context, gather_spikes) +{ + arb::local_context ctx; + using svec = std::vector<arb::spike>; + + svec spikes = { + {{0u,3u}, 42.f}, + {{1u,2u}, 42.f}, + {{2u,1u}, 42.f}, + {{3u,0u}, 42.f}, + }; + + auto s = ctx.gather_spikes(spikes); + + auto& part = s.partition(); + EXPECT_EQ(s.values(), spikes); + EXPECT_EQ(part.size(), 2u); + EXPECT_EQ(part[0], 0u); + EXPECT_EQ(part[1], spikes.size()); +} diff --git a/tests/validation/validate.cpp b/tests/validation/validate.cpp index 2a8eec47366d71e6f7c785fb3d229f8d52666656..dec1edaf77cc17a1f8d6510da213bb39285dc80d 100644 --- a/tests/validation/validate.cpp +++ b/tests/validation/validate.cpp @@ -4,7 +4,6 @@ #include <string> #include <exception> -#include <communication/global_policy.hpp> #include <tinyopt.hpp> #include "../gtest.h" @@ -34,7 +33,6 @@ const char* usage_str = int main(int argc, char
**argv) { using to::parse_opt; - communication::global_policy_guard global_guard(argc, argv); ::testing::InitGoogleTest(&argc, argv); try { diff --git a/tests/validation/validate_ball_and_stick.cpp b/tests/validation/validate_ball_and_stick.cpp index 2d6d429011cc2e8a4c92c49cabfc400644dda848..d7365330ddb6c6d9c819102e9b526c95c8199937 100644 --- a/tests/validation/validate_ball_and_stick.cpp +++ b/tests/validation/validate_ball_and_stick.cpp @@ -75,9 +75,10 @@ void run_ncomp_convergence_test( rec.add_probe(0, 0, cell_probe_address{p.where, cell_probe_address::membrane_voltage}); } + distributed_context context; hw::node_info nd(1, backend==backend_kind::gpu? 1: 0); - auto decomp = partition_load_balance(rec, nd); - simulation sim(rec, decomp); + auto decomp = partition_load_balance(rec, nd, &context); + simulation sim(rec, decomp, &context); runner.run(sim, ncomp, sample_dt, t_end, dt, exclude); } diff --git a/tests/validation/validate_kinetic.cpp b/tests/validation/validate_kinetic.cpp index f501f4fbd59ede1cbe3ecea22ae99abe4c1763f8..0fe78a426252f27daed6ff8b0915cdc2bde54e13 100644 --- a/tests/validation/validate_kinetic.cpp +++ b/tests/validation/validate_kinetic.cpp @@ -41,9 +41,10 @@ void run_kinetic_dt( convergence_test_runner<float> runner("dt", plabels, meta); runner.load_reference_data(ref_file); + distributed_context context; hw::node_info nd(1, backend==backend_kind::gpu? 1: 0); - auto decomp = partition_load_balance(rec, nd); - simulation sim(rec, decomp); + auto decomp = partition_load_balance(rec, nd, &context); + simulation sim(rec, decomp, &context); auto exclude = stimulus_ends(c); diff --git a/tests/validation/validate_soma.cpp b/tests/validation/validate_soma.cpp index a5a6f6a2c052ac6445d31230240e0c8f61c48c2b..7ccfb7580359d7c1c2d6040d621cb11f43fee6ea 100644 --- a/tests/validation/validate_soma.cpp +++ b/tests/validation/validate_soma.cpp @@ -30,9 +30,10 @@ void validate_soma(backend_kind backend) { rec.add_probe(0, 0, cell_probe_address{{0, 0.5}, cell_probe_address::membrane_voltage}); probe_label plabels[1] = {"soma.mid", {0u, 0u}}; + distributed_context context; hw::node_info nd(1, backend==backend_kind::gpu? 1: 0); - auto decomp = partition_load_balance(rec, nd); - simulation sim(rec, decomp); + auto decomp = partition_load_balance(rec, nd, &context); + simulation sim(rec, decomp, &context); nlohmann::json meta = { {"name", "membrane voltage"}, diff --git a/tests/validation/validate_synapses.cpp b/tests/validation/validate_synapses.cpp index e69934ecc22e63c18f657cafa90a95a488e606ae..9a2eb22e16b442bfd9365301a0d864feb6e4d71e 100644 --- a/tests/validation/validate_synapses.cpp +++ b/tests/validation/validate_synapses.cpp @@ -60,6 +60,7 @@ void run_synapse_test( convergence_test_runner<int> runner("ncomp", plabels, meta); runner.load_reference_data(ref_data_path); + distributed_context context; hw::node_info nd(1, backend==backend_kind::gpu? 1: 0); for (int ncomp = 10; ncomp<max_ncomp; ncomp*=2) { c.cable(1)->set_compartments(ncomp); @@ -72,8 +73,8 @@ void run_synapse_test( // dend.end rec.add_probe(0, 0, cell_probe_address{{1, 1.0}, cell_probe_address::membrane_voltage}); - auto decomp = partition_load_balance(rec, nd); - simulation sim(rec, decomp); + auto decomp = partition_load_balance(rec, nd, &context); + simulation sim(rec, decomp, &context); sim.inject_events(synthetic_events);