From 7a6c10314f1244381b4d30370f3e9e87878610f1 Mon Sep 17 00:00:00 2001 From: noraabiakar <nora.abiakar@gmail.com> Date: Tue, 24 Jul 2018 15:27:31 +0200 Subject: [PATCH] task_system as part of an execution_context (#537) - The task system is no longer a single global instance, private to the implementation of the threading backend and used everywhere. A separate task_system (with a specified number of threads) can be used for every simulation. - arb::execution_context is the interface to the task_system and the previously defined distributed_context. - TBB and serial support have been removed. Cthread is the only threading backend available. --- .gitmodules | 3 - .travis.yml | 11 +- .ycm_extra_conf.py | 1 - CMakeLists.txt | 27 +--- arbor/CMakeLists.txt | 6 +- arbor/communication/communicator.hpp | 19 +-- arbor/partition_load_balance.cpp | 8 +- arbor/profile/meter_manager.cpp | 1 - arbor/simulation.cpp | 25 ++-- arbor/thread_private_spike_store.cpp | 11 +- arbor/thread_private_spike_store.hpp | 4 + arbor/threading/cthread.cpp | 14 +- arbor/threading/cthread_impl.hpp | 49 ++++--- arbor/threading/serial.hpp | 122 ------------------ arbor/threading/tbb.hpp | 61 --------- arbor/threading/threading.cpp | 4 - arbor/threading/threading.hpp | 13 -- arbor/util/double_buffer.hpp | 11 +- arbor/util/range.hpp | 40 ------ cmake/FindTBB.cmake | 60 --------- example/bench/bench.cpp | 12 +- example/brunel/brunel_miniapp.cpp | 24 ++-- example/generators/event_gen.cpp | 2 +- example/miniapp/miniapp.cpp | 26 ++-- ext/CMakeLists.txt | 34 ----- ext/tbb | 1 - include/CMakeLists.txt | 5 - include/arbor/execution_context.hpp | 29 +++++ include/arbor/load_balance.hpp | 4 +- include/arbor/simulation.hpp | 4 +- scripts/travis/build.sh | 2 +- test/ubench/task_system.cpp | 13 +- test/unit-distributed/test.cpp | 13 +- test/unit-distributed/test.hpp | 4 +- test/unit-distributed/test_communicator.cpp | 39 +++--- .../test_domain_decomposition.cpp | 8 +- test/unit/test_algorithms.cpp | 11 +- test/unit/test_domain_decomposition.cpp | 8 +- test/unit/test_fvm_lowered.cpp | 2 +- test/unit/test_lif_cell_group.cpp | 5 +- test/unit/test_range.cpp | 51 -------- test/unit/test_spike_store.cpp | 10 +- test/unit/test_thread.cpp | 50 +++---- test/validation/validate_ball_and_stick.cpp | 2 +- test/validation/validate_kinetic.cpp | 2 +- test/validation/validate_soma.cpp | 2 +- test/validation/validate_synapses.cpp | 2 +- 47 files changed, 242 insertions(+), 613 deletions(-) delete mode 100644 arbor/threading/serial.hpp delete mode 100644 arbor/threading/tbb.hpp delete mode 100644 cmake/FindTBB.cmake delete mode 160000 ext/tbb create mode 100644 include/arbor/execution_context.hpp diff --git a/.gitmodules b/.gitmodules index 3809b6b0..177f10db 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,6 +4,3 @@ [submodule "sphinx_rtd_theme"] path = ext/sphinx_rtd_theme url = https://github.com/rtfd/sphinx_rtd_theme.git -[submodule "tbb"] - path = ext/tbb - url = https://github.com/wjakob/tbb.git diff --git a/.travis.yml b/.travis.yml index dfb339db..baa6baaa 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,13 +14,10 @@ addons: - libopenmpi-dev env: - # test single node/rank with different threading back ends - - BUILD_NAME=serial WITH_THREAD=serial WITH_DISTRIBUTED=serial - - BUILD_NAME=cthread WITH_THREAD=cthread WITH_DISTRIBUTED=serial - - BUILD_NAME=tbb WITH_THREAD=tbb WITH_DISTRIBUTED=serial - # test mpi - - BUILD_NAME=mpi WITH_THREAD=cthread WITH_DISTRIBUTED=mpi - - BUILD_NAME=mpitbb WITH_THREAD=tbb WITH_DISTRIBUTED=mpi + # test single node/rank with threading 
backend + - BUILD_NAME=cthread WITH_DISTRIBUTED=serial + # test mpi with threading backend + - BUILD_NAME=mpi WITH_DISTRIBUTED=mpi before_install: - CC=gcc-6 diff --git a/.ycm_extra_conf.py b/.ycm_extra_conf.py index adb3c3ea..4c01f7ca 100644 --- a/.ycm_extra_conf.py +++ b/.ycm_extra_conf.py @@ -36,7 +36,6 @@ import ycm_core # CHANGE THIS LIST OF FLAGS. YES, THIS IS THE DROID YOU HAVE BEEN LOOKING FOR. flags = [ '-DNDEBUG', - '-DARB_HAVE_CTHREAD', '-std=c++11', '-x', 'c++', diff --git a/CMakeLists.txt b/CMakeLists.txt index 7307476e..6ff82e3a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,10 +21,6 @@ set(ARB_ARCH "" CACHE STRING "Target architecture for arbor libraries") option(ARB_VECTORIZE "use explicit SIMD code in generated mechanisms" OFF) -# Use in-tree TBB? - -option(ARB_PRIVATE_TBBLIB "build and link against in-tree TBB build" OFF) - # Use externally built modcc? set(ARB_MODCC "" CACHE STRING "path to external modcc NMODL compiler") @@ -42,9 +38,6 @@ set(ARB_VALIDATION_DATA_DIR "${PROJECT_SOURCE_DIR}/validation/data" CACHE PATH # Configure-time features for Arbor: #---------------------------------------------------------- -set(ARB_THREADING_MODEL "cthread" CACHE STRING "set the threading model, one of cthread/tbb/serial") -set_property(CACHE ARB_THREADING_MODEL PROPERTY STRINGS cthread tbb serial ) - option(ARB_WITH_MPI "build with MPI support" OFF) option(ARB_WITH_PROFILING "use built-in profiling" OFF) @@ -115,13 +108,13 @@ set(CMAKE_CXX_STANDARD 14) add_library(arbor-private-deps INTERFACE) # Interface library `arbor-public-deps` collects requirements for the -# users of the arbor library (e.g. tbb, mpi) that will become part +# users of the arbor library (e.g. mpi) that will become part # of arbor's PUBLIC interface. add_library(arbor-public-deps INTERFACE) -# External libraries in `ext` sub-directory: json, tclap and tbb. -# Creates interface libraries `ext-json`, `ext-tclap` and `ext-tbb`. +# External libraries in `ext` sub-directory: json and tclap. +# Creates interface libraries `ext-json` and `ext-tclap`. add_subdirectory(ext) @@ -150,17 +143,9 @@ endif() # Threading model #----------------- -if(ARB_THREADING_MODEL MATCHES "tbb") - set(ARB_WITH_TBB TRUE) - target_link_libraries(arbor-public-deps INTERFACE ext-tbb) - target_compile_definitions(arbor-private-deps INTERFACE ARB_HAVE_TBB) -elseif(ARB_THREADING_MODEL MATCHES "cthread") - set(ARB_WITH_CTHREAD TRUE) - find_package(Threads REQUIRED) - find_threads_cuda_fix() - target_compile_definitions(arbor-private-deps INTERFACE ARB_HAVE_CTHREAD) - target_link_libraries(arbor-private-deps INTERFACE Threads::Threads) -endif() +find_package(Threads REQUIRED) +find_threads_cuda_fix() +target_link_libraries(arbor-private-deps INTERFACE Threads::Threads) # MPI support #------------------- diff --git a/arbor/CMakeLists.txt b/arbor/CMakeLists.txt index aef98060..ec612bbc 100644 --- a/arbor/CMakeLists.txt +++ b/arbor/CMakeLists.txt @@ -40,6 +40,7 @@ set(arbor_sources spike_source_cell_group.cpp swcio.cpp threadinfo.cpp + threading/cthread.cpp threading/threading.cpp thread_private_spike_store.cpp util/hostname.cpp @@ -73,11 +74,6 @@ if(ARB_WITH_MPI) communication/mpi_context.cpp) endif() -if(ARB_WITH_CTHREAD) - list(APPEND arbor_sources - threading/cthread.cpp) -endif() - # Add special target for private include directory, for use by arbor target # and arbor unit tests. Private headers are also used for the other binaries # until the process of splitting our private and public headers is complete. 
diff --git a/arbor/communication/communicator.hpp b/arbor/communication/communicator.hpp index 78aa7e4c..70b79ad7 100644 --- a/arbor/communication/communicator.hpp +++ b/arbor/communication/communicator.hpp @@ -44,10 +44,12 @@ public: explicit communicator(const recipe& rec, const domain_decomposition& dom_dec, - const distributed_context* ctx) + const execution_context* ctx) { - context_ = ctx; - num_domains_ = context_->size(); + distributed_ = &ctx->distributed; + thread_pool_ = ctx->thread_pool; + + num_domains_ = distributed_->size(); num_local_groups_ = dom_dec.groups.size(); num_local_cells_ = dom_dec.num_local_cells; @@ -82,7 +84,7 @@ public: // Build the connection information for local cells in parallel. std::vector<gid_info> gid_infos; gid_infos.resize(num_local_cells_); - threading::parallel_for::apply(0, gids.size(), + threading::parallel_for::apply(0, gids.size(), thread_pool_.get(), [&](cell_size_type i) { auto gid = gids[i]; gid_infos[i] = gid_info(gid, i, rec.connections_on(gid)); @@ -125,7 +127,7 @@ public: // Sort the connections for each domain. // This is num_domains_ independent sorts, so it can be parallelized trivially. const auto& cp = connection_part_; - threading::parallel_for::apply(0, num_domains_, + threading::parallel_for::apply(0, num_domains_, thread_pool_.get(), [&](cell_size_type i) { util::sort(util::subrange_view(connections_, cp[i], cp[i+1])); }); @@ -144,7 +146,7 @@ public: local_min = std::min(local_min, con.delay()); } - return context_->min(local_min); + return distributed_->min(local_min); } /// Perform exchange of spikes. @@ -159,7 +161,7 @@ public: PE(communication_exchange_gather); // global all-to-all to gather a local copy of the global spike list on each node. - auto global_spikes = context_->gather_spikes(local_spikes); + auto global_spikes = distributed_->gather_spikes(local_spikes); num_spikes_ += global_spikes.size(); PL(); @@ -259,7 +261,8 @@ private: std::vector<cell_size_type> index_divisions_; util::partition_view_type<std::vector<cell_size_type>> index_part_; - const distributed_context* context_; + const distributed_context* distributed_; + task_system_handle thread_pool_; std::uint64_t num_spikes_ = 0u; }; diff --git a/arbor/partition_load_balance.cpp b/arbor/partition_load_balance.cpp index e6a80892..f9198140 100644 --- a/arbor/partition_load_balance.cpp +++ b/arbor/partition_load_balance.cpp @@ -1,7 +1,7 @@ -#include <arbor/distributed_context.hpp> #include <arbor/domain_decomposition.hpp> #include <arbor/load_balance.hpp> #include <arbor/recipe.hpp> +#include <arbor/execution_context.hpp> #include "cell_group_factory.hpp" #include "util/maputil.hpp" @@ -13,7 +13,7 @@ namespace arb { domain_decomposition partition_load_balance( const recipe& rec, proc_allocation nd, - const distributed_context* ctx, + const execution_context* ctx, partition_hint_map hint_map) { struct partition_gid_domain { @@ -31,8 +31,8 @@ domain_decomposition partition_load_balance( using util::make_span; - unsigned num_domains = ctx->size(); - unsigned domain_id = ctx->id(); + unsigned num_domains = ctx->distributed.size(); + unsigned domain_id = ctx->distributed.id(); auto num_global_cells = rec.num_cells(); auto dom_size = [&](unsigned dom) -> cell_gid_type { diff --git a/arbor/profile/meter_manager.cpp b/arbor/profile/meter_manager.cpp index 52542035..c5663700 100644 --- a/arbor/profile/meter_manager.cpp +++ b/arbor/profile/meter_manager.cpp @@ -1,6 +1,5 @@ #include <arbor/profile/timer.hpp> -#include <arbor/distributed_context.hpp> #include 
<arbor/profile/meter_manager.hpp> #include "memory_meter.hpp" diff --git a/arbor/simulation.cpp b/arbor/simulation.cpp index f0c90f77..dcef80c4 100644 --- a/arbor/simulation.cpp +++ b/arbor/simulation.cpp @@ -37,6 +37,9 @@ public: // current: spikes generated in the current interval // previous: spikes generated in the preceding interval + spike_double_buffer(thread_private_spike_store l, thread_private_spike_store r): + buffer_(std::move(l), std::move(r)) {} + thread_private_spike_store& current() { return buffer_.get(); } thread_private_spike_store& previous() { return buffer_.other(); } void exchange() { buffer_.exchange(); } @@ -44,7 +47,7 @@ public: class simulation_state { public: - simulation_state(const recipe& rec, const domain_decomposition& decomp, const distributed_context* ctx); + simulation_state(const recipe& rec, const domain_decomposition& decomp, const execution_context* ctx); void reset(); @@ -96,6 +99,8 @@ private: communicator communicator_; + task_system_handle task_system_; + // Pending events to be delivered. std::array<std::vector<pse_vector>, 2> event_lanes_; std::vector<pse_vector> pending_events_; @@ -106,7 +111,7 @@ private: // Apply a functional to each cell group in parallel. template <typename L> void foreach_group(L&& fn) { - threading::parallel_for::apply(0, cell_groups_.size(), + threading::parallel_for::apply(0, cell_groups_.size(), task_system_.get(), [&, fn = std::forward<L>(fn)](int i) { fn(cell_groups_[i]); }); } @@ -114,7 +119,7 @@ private: // the cell group pointer reference and index. template <typename L> void foreach_group_index(L&& fn) { - threading::parallel_for::apply(0, cell_groups_.size(), + threading::parallel_for::apply(0, cell_groups_.size(), task_system_.get(), [&, fn = std::forward<L>(fn)](int i) { fn(cell_groups_[i], i); }); } }; @@ -122,10 +127,12 @@ private: simulation_state::simulation_state( const recipe& rec, const domain_decomposition& decomp, - const distributed_context* ctx + const execution_context* ctx ): - local_spikes_(new spike_double_buffer{}), - communicator_(rec, decomp, ctx) + local_spikes_(new spike_double_buffer(thread_private_spike_store(ctx->thread_pool), + thread_private_spike_store(ctx->thread_pool))), + communicator_(rec, decomp, ctx), + task_system_(ctx->thread_pool) { const auto num_local_cells = communicator_.num_local_cells(); @@ -269,7 +276,7 @@ time_type simulation_state::run(time_type tfinal, time_type dt) { // run the tasks, overlapping if the threading model and number of // available threads permits it. 
- threading::task_group g; + threading::task_group g(task_system_.get()); g.run(exchange); g.run(update_cells); g.wait(); @@ -297,7 +304,7 @@ time_type simulation_state::run(time_type tfinal, time_type dt) { // pending_events : take all events void simulation_state::setup_events(time_type t_from, time_type t_to, std::size_t epoch) { const auto n = communicator_.num_local_cells(); - threading::parallel_for::apply(0, n, + threading::parallel_for::apply(0, n, task_system_.get(), [&](cell_size_type i) { merge_events( t_from, t_to, @@ -361,7 +368,7 @@ void simulation_state::inject_events(const pse_vector& events) { simulation::simulation( const recipe& rec, const domain_decomposition& decomp, - const distributed_context* ctx) + const execution_context* ctx) { impl_.reset(new simulation_state(rec, decomp, ctx)); } diff --git a/arbor/thread_private_spike_store.cpp b/arbor/thread_private_spike_store.cpp index 883d234c..e6a1f92e 100644 --- a/arbor/thread_private_spike_store.cpp +++ b/arbor/thread_private_spike_store.cpp @@ -9,11 +9,15 @@ namespace arb { struct local_spike_store_type { threading::enumerable_thread_specific<std::vector<spike>> buffers_; + + local_spike_store_type(const task_system_handle& ts): buffers_(ts) {}; }; -thread_private_spike_store::thread_private_spike_store(): - impl_(new local_spike_store_type) -{} +thread_private_spike_store::thread_private_spike_store(thread_private_spike_store&& t): impl_(std::move(t.impl_)) {}; + +thread_private_spike_store::thread_private_spike_store(const task_system_handle& ts): + impl_(new local_spike_store_type(ts)) { +} thread_private_spike_store::~thread_private_spike_store() {} @@ -41,5 +45,4 @@ void thread_private_spike_store::clear() { b.clear(); } } - } // namespace arb diff --git a/arbor/thread_private_spike_store.hpp b/arbor/thread_private_spike_store.hpp index 9e1829ef..eec50709 100644 --- a/arbor/thread_private_spike_store.hpp +++ b/arbor/thread_private_spike_store.hpp @@ -5,6 +5,7 @@ #include <arbor/common_types.hpp> #include <arbor/spike.hpp> +#include <arbor/execution_context.hpp> #include "threading/threading.hpp" @@ -23,6 +24,9 @@ public : thread_private_spike_store(); ~thread_private_spike_store(); + thread_private_spike_store(thread_private_spike_store&& t); + thread_private_spike_store(const task_system_handle& ts); + /// Collate all of the individual buffers into a single vector of spikes. /// Does not modify the buffer contents. 
std::vector<spike> gather() const; diff --git a/arbor/threading/cthread.cpp b/arbor/threading/cthread.cpp index ae7c9d56..e613beac 100644 --- a/arbor/threading/cthread.cpp +++ b/arbor/threading/cthread.cpp @@ -7,6 +7,7 @@ #include "cthread.hpp" #include "threading.hpp" +#include "arbor/execution_context.hpp" using namespace arb::threading::impl; using namespace arb::threading; @@ -119,15 +120,12 @@ int task_system::get_num_threads() { return threads_.size() + 1; } -std::size_t task_system::get_current_thread() { - std::thread::id tid = std::this_thread::get_id(); - return thread_ids_[tid]; -} +std::unordered_map<std::thread::id, std::size_t> task_system::get_thread_ids() { + return thread_ids_; +}; -task_system& task_system::get_global_task_system() { - auto num_threads = threading::num_threads(); - static task_system global_task_system(num_threads); - return global_task_system; +task_system_handle arb::make_thread_pool(int nthreads) { + return task_system_handle(new task_system(nthreads)); } diff --git a/arbor/threading/cthread_impl.hpp b/arbor/threading/cthread_impl.hpp index ba4fe009..6899691d 100644 --- a/arbor/threading/cthread_impl.hpp +++ b/arbor/threading/cthread_impl.hpp @@ -20,6 +20,7 @@ #include <type_traits> #include <cstdlib> +#include "arbor/execution_context.hpp" namespace arb { namespace threading { @@ -97,19 +98,17 @@ public: // Includes master thread. int get_num_threads(); - // Get a stable integer for the current thread that is [0, nthreads). - std::size_t get_current_thread(); - - // Singleton constructor - needed to order construction with other singletons. TODO - static task_system& get_global_task_system(); + // Returns the thread_id map + std::unordered_map<std::thread::id, std::size_t> get_thread_ids(); }; /////////////////////////////////////////////////////////////////////// // types /////////////////////////////////////////////////////////////////////// + template <typename T> class enumerable_thread_specific { - task_system& global_task_system; + std::unordered_map<std::thread::id, std::size_t> thread_ids_; using storage_class = std::vector<T>; storage_class data; @@ -118,21 +117,21 @@ public: using iterator = typename storage_class::iterator; using const_iterator = typename storage_class::const_iterator; - enumerable_thread_specific(): - global_task_system{task_system::get_global_task_system()}, - data{std::vector<T>(global_task_system.get_num_threads())} + enumerable_thread_specific(const task_system_handle& ts): + thread_ids_{ts.get()->get_thread_ids()}, + data{std::vector<T>(ts.get()->get_num_threads())} {} - enumerable_thread_specific(const T& init): - global_task_system{task_system::get_global_task_system()}, - data{std::vector<T>(global_task_system.get_num_threads(), init)} + enumerable_thread_specific(const T& init, const task_system_handle& ts): + thread_ids_{ts.get()->get_thread_ids()}, + data{std::vector<T>(ts.get()->get_num_threads(), init)} {} T& local() { - return data[global_task_system.get_current_thread()]; + return data[thread_ids_.at(std::this_thread::get_id())]; } const T& local() const { - return data[global_task_system.get_current_thread()]; + return data[thread_ids_.at(std::this_thread::get_id())]; } auto size() const { return data.size(); } @@ -156,11 +155,13 @@ constexpr bool multithreaded() { return true; } class task_group { private: std::atomic<std::size_t> in_flight_{0}; - task_system& task_system_; + /// We use a raw pointer here instead of a shared_ptr to avoid a race condition + /// on the destruction of a task_system that would 
lead to a thread trying to join itself + task_system* task_system_; public: - task_group(): - task_system_{task_system::get_global_task_system()} + task_group(task_system* ts): + task_system_{ts} {} task_group(const task_group&) = delete; @@ -209,14 +210,13 @@ public: template<typename F> void run(F&& f) { ++in_flight_; - - task_system_.async(make_wrapped_function(std::forward<F>(f), in_flight_)); + task_system_->async(make_wrapped_function(std::forward<F>(f), in_flight_)); } // wait till all tasks in this group are done void wait() { while (in_flight_) { - task_system_.try_run_task(); + task_system_->try_run_task(); } } @@ -231,18 +231,13 @@ public: /////////////////////////////////////////////////////////////////////// struct parallel_for { template <typename F> - static void apply(int left, int right, F f) { - task_group g; + static void apply(int left, int right, task_system* ts, F f) { + task_group g(ts); for (int i = left; i < right; ++i) { g.run([=] {f(i);}); } g.wait(); } }; - -inline std::size_t thread_id() { - return task_system::get_global_task_system().get_current_thread(); -} - } // namespace threading } // namespace arb diff --git a/arbor/threading/serial.hpp b/arbor/threading/serial.hpp deleted file mode 100644 index af3b0594..00000000 --- a/arbor/threading/serial.hpp +++ /dev/null @@ -1,122 +0,0 @@ -#pragma once - -#include <algorithm> -#include <array> -#include <chrono> -#include <string> -#include <vector> - -namespace arb { -namespace threading { -inline namespace serial { - -/////////////////////////////////////////////////////////////////////// -// types -/////////////////////////////////////////////////////////////////////// -template <typename T> -class enumerable_thread_specific { - std::array<T, 1> data; - -public : - using iterator = typename std::array<T, 1>::iterator; - using const_iterator = typename std::array<T, 1>::const_iterator; - - enumerable_thread_specific() = default; - - enumerable_thread_specific(const T& init) : - data{init} - {} - - enumerable_thread_specific(T&& init) : - data{std::move(init)} - {} - - T& local() { return data[0]; } - const T& local() const { return data[0]; } - - auto size() const { return data.size(); } - - iterator begin() { return data.begin(); } - iterator end() { return data.end(); } - - const_iterator begin() const { return data.begin(); } - const_iterator end() const { return data.end(); } - - const_iterator cbegin() const { return data.cbegin(); } - const_iterator cend() const { return data.cend(); } -}; - - -/////////////////////////////////////////////////////////////////////// -// algorithms -/////////////////////////////////////////////////////////////////////// -struct parallel_for { - template <typename F> - static void apply(int left, int right, F f) { - for(int i=left; i<right; ++i) { - f(i); - } - } -}; - -template <typename RandomIt> -void sort(RandomIt begin, RandomIt end) { - std::sort(begin, end); -} - -template <typename RandomIt, typename Compare> -void sort(RandomIt begin, RandomIt end, Compare comp) { - std::sort(begin, end, comp); -} - -template <typename Container> -void sort(Container& c) { - std::sort(c.begin(), c.end()); -} - -template <typename T> -using parallel_vector = std::vector<T>; - -inline std::string description() { - return "serial"; -} - -constexpr bool multithreaded() { return false; } - -inline std::size_t thread_id() { - return 0; -} - -/// Proxy for tbb task group. -/// The tbb version launches tasks asynchronously, returning control to the -/// caller. 
The serial version implemented here simply runs the task, before -/// returning control, effectively serializing all asynchronous calls. -class task_group { -public: - task_group() = default; - - template<typename Func> - void run(const Func& f) { - f(); - } - - template<typename Func> - void run_and_wait(const Func& f) { - f(); - } - - void wait() - {} - - bool is_canceling() { - return false; - } - - void cancel() - {} -}; - -} // namespace serial -} // namespace threading -} // namespace arb - diff --git a/arbor/threading/tbb.hpp b/arbor/threading/tbb.hpp deleted file mode 100644 index b4c2af94..00000000 --- a/arbor/threading/tbb.hpp +++ /dev/null @@ -1,61 +0,0 @@ -#pragma once - -#include <atomic> -#include <string> - -#include <tbb/tbb.h> -#include <tbb/tbb_stddef.h> -#include <tbb/compat/thread> -#include <tbb/enumerable_thread_specific.h> - -namespace arb { -namespace threading { -inline namespace tbb { - -template <typename T> -using enumerable_thread_specific = ::tbb::enumerable_thread_specific<T>; - -struct parallel_for { - template <typename F> - static void apply(int left, int right, F f) { - ::tbb::parallel_for(left, right, f); - } -}; - -inline std::string description() { - return "TBBv" + std::to_string(::tbb::TBB_runtime_interface_version()); -} - -constexpr bool multithreaded() { return true; } - -template <typename T> -using parallel_vector = ::tbb::concurrent_vector<T>; - -using task_group = ::tbb::task_group; - -inline -std::size_t thread_id() { - static std::atomic<std::size_t> num_threads(0); - thread_local std::size_t thread_id = num_threads++; - return thread_id; -} - -template <typename RandomIt> -void sort(RandomIt begin, RandomIt end) { - ::tbb::parallel_sort(begin, end); -} - -template <typename RandomIt, typename Compare> -void sort(RandomIt begin, RandomIt end, Compare comp) { - ::tbb::parallel_sort(begin, end, comp); -} - -template <typename Container> -void sort(Container& c) { - ::tbb::parallel_sort(c.begin(), c.end()); -} - -} // namespace tbb -} // namespace threading -} // namespace arb - diff --git a/arbor/threading/threading.cpp b/arbor/threading/threading.cpp index a05bf1bd..54857415 100644 --- a/arbor/threading/threading.cpp +++ b/arbor/threading/threading.cpp @@ -78,12 +78,8 @@ std::size_t num_threads_init() { // number of threads. size_t num_threads() { // TODO: this is a bit of a hack until we have user-configurable threading. -#if defined(ARB_HAVE_SERIAL) - return 1; -#else static size_t num_threads_cached = num_threads_init(); return num_threads_cached; -#endif } } // namespace threading diff --git a/arbor/threading/threading.hpp b/arbor/threading/threading.hpp index 8150ca6a..d6fe7b93 100644 --- a/arbor/threading/threading.hpp +++ b/arbor/threading/threading.hpp @@ -25,17 +25,4 @@ size_t num_threads(); } // namespace threading } // namespace arb -#if defined(ARB_HAVE_TBB) - -#include "tbb.hpp" - -#elif defined(ARB_HAVE_CTHREAD) - #include "cthread.hpp" - -#else - -#define ARB_HAVE_SERIAL -#include "serial.hpp" - -#endif diff --git a/arbor/util/double_buffer.hpp b/arbor/util/double_buffer.hpp index f4520098..67afecde 100644 --- a/arbor/util/double_buffer.hpp +++ b/arbor/util/double_buffer.hpp @@ -4,6 +4,7 @@ #include <atomic> #include <arbor/assert.hpp> +#include <arbor/execution_context.hpp> namespace arb { namespace util { @@ -13,7 +14,7 @@ template <typename T> class double_buffer { private: std::atomic<int> index_; - std::array<T, 2> buffers_; + std::vector<T> buffers_; int other_index() { return index_ ? 
0 : 1; @@ -23,9 +24,15 @@ public: using value_type = T; double_buffer() : - index_(0) + index_(0), buffers_(2) {} + double_buffer(T l, T r): index_(0) { + buffers_.reserve(2); + buffers_.push_back(std::move(l)); + buffers_.push_back(std::move(r)); + } + /// remove the copy and move constructors which won't work with std::atomic double_buffer(double_buffer&&) = delete; double_buffer(const double_buffer&) = delete; diff --git a/arbor/util/range.hpp b/arbor/util/range.hpp index fbec6108..3f692016 100644 --- a/arbor/util/range.hpp +++ b/arbor/util/range.hpp @@ -26,10 +26,6 @@ #include <type_traits> #include <utility> -#ifdef ARB_HAVE_TBB -#include <tbb/tbb_stddef.h> -#endif - #include <arbor/assert.hpp> #include <util/counter.hpp> @@ -133,42 +129,6 @@ struct range { data() const { return left; } - -#ifdef ARB_HAVE_TBB - template < - typename V = iterator, - typename = std::enable_if_t<is_forward_iterator<V>::value> - > - range(range& r, tbb::split): - left(r.left), right(r.right) - { - std::advance(left, r.size()/2u); - r.right = left; - } - - template < - typename V = iterator, - typename = std::enable_if_t<is_forward_iterator<V>::value> - > - range(range& r, tbb::proportional_split p): - left(r.left), right(r.right) - { - size_type i = (r.size()*p.left())/(p.left()+p.right()); - if (i<1) { - i = 1; - } - std::advance(left, i); - r.right = left; - } - - bool is_divisible() const { - return is_forward_iterator<U>::value && left != right && std::next(left) != right; - } - - static constexpr bool is_splittable_in_proportion() { - return is_forward_iterator<U>::value; - } -#endif }; template <typename U, typename V> diff --git a/cmake/FindTBB.cmake b/cmake/FindTBB.cmake deleted file mode 100644 index 1b4b85fa..00000000 --- a/cmake/FindTBB.cmake +++ /dev/null @@ -1,60 +0,0 @@ -# Find the Intel Thread Building Blocks library -# -# Sets the following variables: -# -# TBB_FOUND - True if libtbb and libtbb_malloc found. -# TBB_LIBRARIES - Paths to libtbb and libtbbmalloc. -# TBB_INCLUDE_DIR - Base directory for tbb/ includes. -# -# Generates the import library target TBB:tbb if found. -# -# The default search path can be overriden by setting the -# CMake variable TBB_ROOT_DIR or the environment variables -# TBBROOT or TBB_ROOT. 
- -if(NOT TBB_FOUND) - find_package(Threads REQUIRED) - - set(_tbb_search_path ${TBB_ROOT_DIR} $ENV{TBBROOT} $ENV{TBB_ROOT}) - set(_tbb_lib_suffixes lib/intel64/gcc4.7 lib/intel64/gcc4.4 lib/gcc4.7 lib/gcc4.4 lib/android lib/mic lib) - - macro(_tbb_findlib libname) - find_library(_lib${libname} ${libname} - PATHS ${_tbb_search_path} NO_DEFAULT_PATH - PATH_SUFFIXES ${_tbb_lib_suffixes}) - find_library(_lib${libname} ${libname} - PATH_SUFFIXES ${_tbb_lib_suffixes}) - endmacro() - - _tbb_findlib(tbb) - _tbb_findlib(tbbmalloc) - - find_path(TBB_INCLUDE_DIR tbb/tbb.h PATHS ${_tbb_search_path} NO_DEFAULT_PATH PATH_SUFFIXES include) - find_path(TBB_INCLUDE_DIR tbb/tbb.h) - - include(FindPackageHandleStandardArgs) - find_package_handle_standard_args(TBB DEFAULT_MSG TBB_INCLUDE_DIR _libtbb _libtbbmalloc) - - if(TBB_FOUND) - set(TBB_INCLUDE_DIRS ${TBB_INCLUDE_DIR}) - set(TBB_LIBRARIES ${_libtbb} ${_libtbbmalloc}) - if(NOT TARGET TBB::tbb) - if("${_libtbb}" MATCHES "\.a$") - add_library(TBB::tbb STATIC IMPORTED GLOBAL) - else() - add_library(TBB::tbb SHARED IMPORTED GLOBAL) - endif() - set_target_properties(TBB::tbb PROPERTIES - IMPORTED_LOCATION "${_libtbb}" - INTERFACE_LINK_LIBRARIES "${_libtbbmalloc}" Threads::Threads ${CMAKE_DL_LIBS} - INTERFACE_INCLUDE_DIRECTORIES "${TBB_INCLUDE_DIR}" - ) - endif() - endif() - mark_as_advanced(TBB_INCLUDE_DIR) - - unset(_tbb_search_path) - unset(_tbb_lib_suffixes) - unset(_libtbb) - unset(_libtbbmalloc) -endif() diff --git a/example/bench/bench.cpp b/example/bench/bench.cpp index 40414176..52e11c20 100644 --- a/example/bench/bench.cpp +++ b/example/bench/bench.cpp @@ -10,12 +10,14 @@ #include <arbor/profile/meter_manager.hpp> #include <arbor/common_types.hpp> -#include <arbor/distributed_context.hpp> +#include <arbor/execution_context.hpp> #include <arbor/domain_decomposition.hpp> #include <arbor/load_balance.hpp> #include <arbor/profile/profiler.hpp> #include <arbor/recipe.hpp> #include <arbor/simulation.hpp> +#include <arbor/threadinfo.hpp> + #include <aux/ioutil.hpp> #include <aux/json_meter.hpp> @@ -30,12 +32,12 @@ namespace profile = arb::profile; int main(int argc, char** argv) { try { - arb::distributed_context context; + arb::execution_context context; #ifdef ARB_HAVE_MPI aux::with_mpi guard(&argc, &argv); - context = mpi_context(MPI_COMM_WORLD); + context.distributed = mpi_context(MPI_COMM_WORLD); #endif - const bool is_root = context.id()==0; + const bool is_root = context.distributed.id()==0; std::cout << aux::mask_stream(is_root); @@ -43,7 +45,7 @@ int main(int argc, char** argv) { std::cout << params << "\n"; - profile::meter_manager meters(&context); + profile::meter_manager meters(&context.distributed); meters.start(); // Create an instance of our recipe. diff --git a/example/brunel/brunel_miniapp.cpp b/example/brunel/brunel_miniapp.cpp index fb1195f1..463be264 100644 --- a/example/brunel/brunel_miniapp.cpp +++ b/example/brunel/brunel_miniapp.cpp @@ -32,7 +32,7 @@ using namespace arb; -void banner(proc_allocation, const distributed_context*); +void banner(proc_allocation, const execution_context*); // Samples m unique values in interval [start, end) - gid. // We exclude gid because we don't want self-loops. 
@@ -187,18 +187,18 @@ private: }; int main(int argc, char** argv) { - distributed_context context; + execution_context context; try { #ifdef ARB_MPI_ENABLED with_mpi guard(argc, argv, false); - context = mpi_context(MPI_COMM_WORLD); + context.distributed = mpi_context(MPI_COMM_WORLD); #endif - arb::profile::meter_manager meters(&context); + arb::profile::meter_manager meters(&context.distributed); meters.start(); - std::cout << aux::mask_stream(context.id()==0); + std::cout << aux::mask_stream(context.distributed.id()==0); // read parameters - io::cl_options options = io::read_options(argc, argv, context.id()==0); + io::cl_options options = io::read_options(argc, argv, context.distributed.id()==0); proc_allocation nd = local_allocation(); banner(nd, &context); @@ -246,7 +246,7 @@ int main(int argc, char** argv) { if (options.spike_file_output) { using std::ios_base; - auto rank = context.id(); + auto rank = context.distributed.id(); aux::path p = options.output_path; p /= aux::strsub("%_%.%", options.file_name, rank, options.file_extension); @@ -273,7 +273,7 @@ int main(int argc, char** argv) { auto report = profile::make_meter_report(meters); std::cout << report; - if (context.id()==0) { + if (context.distributed.id()==0) { std::ofstream fid; fid.exceptions(std::ios_base::badbit | std::ios_base::failbit); fid.open("meters.json"); @@ -282,7 +282,7 @@ int main(int argc, char** argv) { } catch (io::usage_error& e) { // only print usage/startup errors on master - std::cerr << aux::mask_stream(context.id()==0); + std::cerr << aux::mask_stream(context.distributed.id()==0); std::cerr << e.what() << "\n"; return 1; } @@ -293,11 +293,11 @@ int main(int argc, char** argv) { return 0; } -void banner(proc_allocation nd, const distributed_context* ctx) { +void banner(proc_allocation nd, const execution_context* ctx) { std::cout << "==========================================\n"; std::cout << " Arbor miniapp\n"; - std::cout << " - distributed : " << ctx->size() - << " (" << ctx->name() << ")\n"; + std::cout << " - distributed : " << ctx->distributed.size() - << " (" << ctx->distributed.name() << ")\n"; std::cout << " - threads : " << nd.num_threads << " (" << arb::thread_implementation() << ")\n"; std::cout << " - gpus : " << nd.num_gpus << "\n"; diff --git a/example/generators/event_gen.cpp b/example/generators/event_gen.cpp index f3c577c3..fd7423d4 100644 --- a/example/generators/event_gen.cpp +++ b/example/generators/event_gen.cpp @@ -128,7 +128,7 @@ int main() { - // A distributed_context is required for distributed computation (e.g. MPI). - // For this simple one-cell example, non-distributed context is suitable, - // which is what we get with a default-constructed distributed_context. - arb::distributed_context context; + // An execution_context couples a task_system thread pool with a + // distributed_context (e.g. MPI). For this simple one-cell example, a + // default-constructed, non-distributed execution_context is suitable. + arb::execution_context context; // Create an instance of our recipe. 
generator_recipe recipe; diff --git a/example/miniapp/miniapp.cpp b/example/miniapp/miniapp.cpp index e384a015..edbc220f 100644 --- a/example/miniapp/miniapp.cpp +++ b/example/miniapp/miniapp.cpp @@ -6,7 +6,7 @@ #include <vector> #include <arbor/common_types.hpp> -#include <arbor/distributed_context.hpp> +#include <arbor/execution_context.hpp> #include <arbor/load_balance.hpp> #include <arbor/mc_cell.hpp> #include <arbor/profile/meter_manager.hpp> @@ -36,7 +36,7 @@ using namespace arb; using util::any_cast; -void banner(proc_allocation, const distributed_context*); +void banner(proc_allocation, const execution_context*); std::unique_ptr<recipe> make_recipe(const io::cl_options&, const probe_distribution&); sample_trace make_trace(const probe_info& probe); std::fstream& open_or_throw(std::fstream& file, const aux::path& p, bool exclusive = false); @@ -44,20 +44,20 @@ void report_compartment_stats(const recipe&); int main(int argc, char** argv) { // default serial context - distributed_context context; + execution_context context; try { #ifdef ARB_MPI_ENABLED with_mpi guard(argc, argv, false); - context = mpi_context(MPI_COMM_WORLD); + context.distributed = mpi_context(MPI_COMM_WORLD); #endif - profile::meter_manager meters(&context); + profile::meter_manager meters(&context.distributed); meters.start(); - std::cout << aux::mask_stream(context.id()==0); + std::cout << aux::mask_stream(context.distributed.id()==0); // read parameters - io::cl_options options = io::read_options(argc, argv, context.id()==0); + io::cl_options options = io::read_options(argc, argv, context.distributed.id()==0); // TODO: add dry run mode @@ -117,7 +117,7 @@ int main(int argc, char** argv) { if (options.spike_file_output) { using std::ios_base; - auto rank = context.id(); + auto rank = context.distributed.id(); aux::path p = options.output_path; p /= aux::strsub("%_%.%", options.file_name, rank, options.file_extension); @@ -151,7 +151,7 @@ int main(int argc, char** argv) { auto report = profile::make_meter_report(meters); std::cout << report; - if (context.id()==0) { + if (context.distributed.id()==0) { std::ofstream fid; fid.exceptions(std::ios_base::badbit | std::ios_base::failbit); fid.open("meters.json"); @@ -160,7 +160,7 @@ int main(int argc, char** argv) { } catch (io::usage_error& e) { // only print usage/startup errors on master - std::cerr << aux::mask_stream(context.id()==0); + std::cerr << aux::mask_stream(context.distributed.id()==0); std::cerr << e.what() << "\n"; return 1; } @@ -171,11 +171,11 @@ int main(int argc, char** argv) { return 0; } -void banner(proc_allocation nd, const distributed_context* ctx) { +void banner(proc_allocation nd, const execution_context* ctx) { std::cout << "==========================================\n"; std::cout << " Arbor miniapp\n"; - std::cout << " - distributed : " << ctx->size() - << " (" << ctx->name() << ")\n"; + std::cout << " - distributed : " << ctx->distributed.size() + << " (" << ctx->distributed.name() << ")\n"; std::cout << " - threads : " << nd.num_threads << " (" << arb::thread_implementation() << ")\n"; std::cout << " - gpus : " << nd.num_gpus << "\n"; diff --git a/ext/CMakeLists.txt b/ext/CMakeLists.txt index 05f93fa1..49aa903e 100644 --- a/ext/CMakeLists.txt +++ b/ext/CMakeLists.txt @@ -8,40 +8,6 @@ target_include_directories(ext-json INTERFACE json/single_include) add_library(ext-tclap INTERFACE) target_include_directories(ext-tclap INTERFACE tclap/include) -# Intel TBB: -# Alias system TBB or build locally and export that, according -# to 
ARB_PRIVATE_TBBLIB setting. - -find_package(TBB) -if(ARB_PRIVATE_TBBLIB OR NOT TBB_FOUND) - check_git_submodule(tbb tbb) - add_target_if(tbb_avail check-tbb-submodule "Checking TBB submodule" "TBB git submodule required") - - # Turn off proxy malloc library and test compilation. - option(TBB_BUILD_TBBMALLOC_PROXY "" OFF) - option(TBB_BUILD_TESTS "" OFF) - # Only make static libraries. - option(TBB_BUILD_SHARED "" OFF) - option(TBB_BUILD_STATIC "" ON) - - add_subdirectory(tbb EXCLUDE_FROM_ALL) - - add_library(ext-tbb INTERFACE) - add_dependencies(ext-tbb check-tbb-submodule) - target_link_libraries(ext-tbb INTERFACE tbb_static tbbmalloc_static) - target_include_directories(ext-tbb SYSTEM INTERFACE tbb/include) - - # Can't use install(TARGETS) because 1) tbb targets are defined in - # a subdirectory and 2) having been excluded-from-all, the behaviour - # might have been undefined anyway. Seriously. - - install(FILES "$<TARGET_FILE:tbb_static>" "$<TARGET_FILE:tbbmalloc_static>" DESTINATION ${CMAKE_INSTALL_LIBDIR} OPTIONAL) -else() - add_library(ext-tbb INTERFACE) - target_link_libraries(ext-tbb INTERFACE TBB::tbb) -endif() - - # Google benchmark for microbenchmarks: check_git_submodule(gbench google-benchmark) diff --git a/ext/tbb b/ext/tbb deleted file mode 160000 index a0dc9bf7..00000000 --- a/ext/tbb +++ /dev/null @@ -1 +0,0 @@ -Subproject commit a0dc9bf76d0120f917b641ed095360448cabc85b diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index 9476c74c..23e4a59b 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -38,11 +38,6 @@ endif() if(ARB_WITH_PROFILING) list(APPEND arb_features PROFILE) endif() -if(ARB_WITH_TBB) - list(APPEND arb_features TBB) -elseif(ARB_WITH_CTHREAD) - list(APPEND arb_features CTHREAD) -endif() add_custom_command( OUTPUT version.hpp-test diff --git a/include/arbor/execution_context.hpp b/include/arbor/execution_context.hpp new file mode 100644 index 00000000..fac08b82 --- /dev/null +++ b/include/arbor/execution_context.hpp @@ -0,0 +1,29 @@ +#pragma once + +#include <memory> +#include <string> + +#include <arbor/domain_decomposition.hpp> +#include <arbor/distributed_context.hpp> +#include <arbor/util/pp_util.hpp> +#include <arbor/threadinfo.hpp> + + +namespace arb { +namespace threading { + class task_system; +} +using task_system_handle = std::shared_ptr<threading::task_system>; + +task_system_handle make_thread_pool(int nthreads); + +struct execution_context { + // TODO: use a shared_ptr for distributed_context + distributed_context distributed; + task_system_handle thread_pool; + + execution_context(): thread_pool(arb::make_thread_pool(arb::num_threads())) {} + execution_context(proc_allocation nd): thread_pool(arb::make_thread_pool(nd.num_threads)) {} +}; + +} // namespace arb diff --git a/include/arbor/load_balance.hpp b/include/arbor/load_balance.hpp index 8235da03..5866f4a8 100644 --- a/include/arbor/load_balance.hpp +++ b/include/arbor/load_balance.hpp @@ -1,6 +1,6 @@ #pragma once -#include <arbor/distributed_context.hpp> +#include <arbor/execution_context.hpp> #include <arbor/domain_decomposition.hpp> #include <arbor/recipe.hpp> @@ -19,7 +19,7 @@ using partition_hint_map = std::unordered_map<cell_kind, partition_hint>; domain_decomposition partition_load_balance( const recipe& rec, proc_allocation nd, - const distributed_context* ctx, + const execution_context* ctx, partition_hint_map hint_map = {}); } // namespace arb diff --git a/include/arbor/simulation.hpp b/include/arbor/simulation.hpp index 844380c6..d652108e 100644 --- 
a/include/arbor/simulation.hpp +++ b/include/arbor/simulation.hpp @@ -6,7 +6,7 @@ #include <vector> #include <arbor/common_types.hpp> -#include <arbor/distributed_context.hpp> +#include <arbor/execution_context.hpp> #include <arbor/domain_decomposition.hpp> #include <arbor/recipe.hpp> #include <arbor/sampling.hpp> @@ -22,7 +22,7 @@ class simulation_state; class simulation { public: - simulation(const recipe& rec, const domain_decomposition& decomp, const distributed_context* ctx); + simulation(const recipe& rec, const domain_decomposition& decomp, const execution_context* ctx); void reset(); diff --git a/scripts/travis/build.sh b/scripts/travis/build.sh index 3a6e3dbc..583daec5 100755 --- a/scripts/travis/build.sh +++ b/scripts/travis/build.sh @@ -46,7 +46,7 @@ cd $build_path # progress "Configuring with cmake" -cmake_flags="-DARB_WITH_ASSERTIONS=on -DARB_THREADING_MODEL=${WITH_THREAD} -DARB_WITH_MPI=${WITH_MPI} ${CXX_FLAGS}" +cmake_flags="-DARB_WITH_ASSERTIONS=on -DARB_WITH_MPI=${WITH_MPI} ${CXX_FLAGS}" echo "cmake flags: ${cmake_flags}" cmake .. ${cmake_flags} || error "unable to configure cmake" diff --git a/test/ubench/task_system.cpp b/test/ubench/task_system.cpp index 17f5486b..e0a7303d 100644 --- a/test/ubench/task_system.cpp +++ b/test/ubench/task_system.cpp @@ -7,24 +7,19 @@ #include <thread> #include <arbor/threadinfo.hpp> - #include <arbor/version.hpp> -#if defined(ARB_TBB_ENABLED) - #include "threading/tbb.hpp" -#elif defined(ARB_CTHREAD_ENABLED) - #include "threading/cthread.hpp" -#else - #include "threading/serial.hpp" -#endif + +#include "threading/cthread.hpp" #include <benchmark/benchmark.h> using namespace arb; void run(unsigned long us_per_task, unsigned tasks) { + arb::threading::task_system ts(arb::num_threads()); auto duration = std::chrono::microseconds(us_per_task); arb::threading::parallel_for::apply( - 0, tasks, + 0, tasks, &ts, [&](unsigned i){std::this_thread::sleep_for(duration);}); } diff --git a/test/unit-distributed/test.cpp b/test/unit-distributed/test.cpp index f5be0bdd..4db82042 100644 --- a/test/unit-distributed/test.cpp +++ b/test/unit-distributed/test.cpp @@ -5,7 +5,8 @@ #include "../gtest.h" -#include <arbor/distributed_context.hpp> +#include <arbor/execution_context.hpp> +#include "arbor/threadinfo.hpp" #include <aux/ioutil.hpp> #include <aux/tinyopt.hpp> @@ -17,7 +18,7 @@ using namespace arb; -distributed_context g_context; +execution_context g_context; const char* usage_str = "[OPTION]...\n" @@ -28,9 +29,9 @@ const char* usage_str = int main(int argc, char **argv) { #ifdef TEST_MPI with_mpi guard(argc, argv, false); - g_context = mpi_context(MPI_COMM_WORLD); + g_context.distributed = mpi_context(MPI_COMM_WORLD); #elif defined(TEST_LOCAL) - g_context = local_context(); + g_context.distributed = local_context(); #else #error "define TEST_MPI or TEST_LOCAL for distributed test" #endif @@ -42,7 +43,7 @@ int main(int argc, char **argv) { auto& listeners = testing::UnitTest::GetInstance()->listeners(); // replace original printer with our custom printer delete listeners.Release(listeners.default_result_printer()); - listeners.Append(new distributed_listener("run_"+g_context.name(), &g_context)); + listeners.Append(new distributed_listener("run_"+g_context.distributed.name(), &g_context.distributed)); int return_value = 0; try { @@ -84,5 +85,5 @@ int main(int argc, char **argv) { // perform global collective, to ensure that all ranks return // the same exit code - return g_context.max(return_value); + return g_context.distributed.max(return_value); } diff 
--git a/test/unit-distributed/test.hpp b/test/unit-distributed/test.hpp index b7d4679a..630bd188 100644 --- a/test/unit-distributed/test.hpp +++ b/test/unit-distributed/test.hpp @@ -1,7 +1,7 @@ #pragma once -#include <arbor/distributed_context.hpp> +#include <arbor/execution_context.hpp> // Global context is a global variable, set in the main() funtion of the main // test driver test.cpp. -extern arb::distributed_context g_context; +extern arb::execution_context g_context; diff --git a/test/unit-distributed/test_communicator.cpp b/test/unit-distributed/test_communicator.cpp index 6e2907d2..4e7daef3 100644 --- a/test/unit-distributed/test_communicator.cpp +++ b/test/unit-distributed/test_communicator.cpp @@ -8,6 +8,7 @@ #include <arbor/domain_decomposition.hpp> #include <arbor/load_balance.hpp> #include <arbor/spike_event.hpp> +#include <threading/cthread.hpp> #include "communication/communicator.hpp" #include "util/filter.hpp" @@ -22,12 +23,12 @@ static bool is_dry_run() { } TEST(communicator, policy_basics) { - const auto num_domains = g_context.size(); - const auto rank = g_context.id(); + const auto num_domains = g_context.distributed.size(); + const auto rank = g_context.distributed.id(); - EXPECT_EQ(g_context.min(rank), 0); + EXPECT_EQ(g_context.distributed.min(rank), 0); if (!is_dry_run()) { - EXPECT_EQ(g_context.max(rank), num_domains-1); + EXPECT_EQ(g_context.distributed.max(rank), num_domains-1); } } @@ -51,8 +52,8 @@ int get_value(const arb::spike& s) { // Test low level spike_gather function when each domain produces the same // number of spikes in the pattern used by dry run mode. TEST(communicator, gather_spikes_equal) { - const auto num_domains = g_context.size(); - const auto rank = g_context.id(); + const auto num_domains = g_context.distributed.size(); + const auto rank = g_context.distributed.id(); const auto n_local_spikes = 10; @@ -71,7 +72,7 @@ TEST(communicator, gather_spikes_equal) { } // Perform exchange - const auto global_spikes = g_context.gather_spikes(local_spikes); + const auto global_spikes = g_context.distributed.gather_spikes(local_spikes); // Test that partition information is correct const auto& part = global_spikes.partition(); @@ -91,7 +92,7 @@ TEST(communicator, gather_spikes_equal) { // is a list of num_domains*n_local_spikes spikes that have // contiguous source gid const auto& spikes = global_spikes.values(); - EXPECT_EQ(n_local_spikes*g_context.size(), int(spikes.size())); + EXPECT_EQ(n_local_spikes*g_context.distributed.size(), int(spikes.size())); for (auto i=0u; i<spikes.size(); ++i) { const auto s = spikes[i]; EXPECT_EQ(i, unsigned(s.source.gid)); @@ -112,8 +113,8 @@ TEST(communicator, gather_spikes_variant) { // number of spikes. if (is_dry_run()) return; - const auto num_domains = g_context.size(); - const auto rank = g_context.id(); + const auto num_domains = g_context.distributed.size(); + const auto rank = g_context.distributed.id(); // Parameter used to scale the number of spikes generated on successive // ranks. 
@@ -137,7 +138,7 @@ TEST(communicator, gather_spikes_variant) { } // Perform exchange - const auto global_spikes = g_context.gather_spikes(local_spikes); + const auto global_spikes = g_context.distributed.gather_spikes(local_spikes); // Test that partition information is correct const auto& part =global_spikes.partition(); @@ -167,7 +168,7 @@ namespace { public: ring_recipe(cell_size_type s): size_(s), - ranks_(g_context.size()) + ranks_(g_context.distributed.size()) {} cell_size_type num_cells() const override { @@ -231,7 +232,7 @@ namespace { public: all2all_recipe(cell_size_type s): size_(s), - ranks_(g_context.size()) + ranks_(g_context.distributed.size()) {} cell_size_type num_cells() const override { @@ -314,10 +315,10 @@ test_ring(const domain_decomposition& D, communicator& C, F&& f) { // gather the global set of spikes auto global_spikes = C.exchange(local_spikes); - if (global_spikes.size()!=g_context.sum(local_spikes.size())) { + if (global_spikes.size()!=g_context.distributed.sum(local_spikes.size())) { return ::testing::AssertionFailure() << "the number of gathered spikes " << global_spikes.size() << " doesn't match the expected " - << g_context.sum(local_spikes.size()); + << g_context.distributed.sum(local_spikes.size()); } // generate the events @@ -363,7 +364,7 @@ TEST(communicator, ring) using util::make_span; // construct a homogeneous network of 10*n_domain identical cells in a ring - unsigned N = g_context.size(); + unsigned N = g_context.distributed.size(); unsigned n_local = 10u; unsigned n_global = n_local*N; @@ -405,10 +406,10 @@ test_all2all(const domain_decomposition& D, communicator& C, F&& f) { // gather the global set of spikes auto global_spikes = C.exchange(local_spikes); - if (global_spikes.size()!=g_context.sum(local_spikes.size())) { + if (global_spikes.size()!=g_context.distributed.sum(local_spikes.size())) { return ::testing::AssertionFailure() << "the number of gathered spikes " << global_spikes.size() << " doesn't match the expected " - << g_context.sum(local_spikes.size()); + << g_context.distributed.sum(local_spikes.size()); } // generate the events @@ -458,7 +459,7 @@ TEST(communicator, all2all) using util::make_span; // construct a homogeneous network of 10*n_domain identical cells in a ring - unsigned N = g_context.size(); + unsigned N = g_context.distributed.size(); unsigned n_local = 10u; unsigned n_global = n_local*N; diff --git a/test/unit-distributed/test_domain_decomposition.cpp b/test/unit-distributed/test_domain_decomposition.cpp index bade370f..db3c1f1b 100644 --- a/test/unit-distributed/test_domain_decomposition.cpp +++ b/test/unit-distributed/test_domain_decomposition.cpp @@ -65,8 +65,8 @@ namespace { } TEST(domain_decomposition, homogeneous_population) { - const auto N = g_context.size(); - const auto I = g_context.id(); + const auto N = g_context.distributed.size(); + const auto I = g_context.distributed.id(); { // Test on a node with 1 cpu core and no gpus. // We assume that all cells will be put into cell groups of size 1. @@ -134,8 +134,8 @@ TEST(domain_decomposition, homogeneous_population) { } TEST(domain_decomposition, heterogeneous_population) { - const auto N = g_context.size(); - const auto I = g_context.id(); + const auto N = g_context.distributed.size(); + const auto I = g_context.distributed.id(); { // Test on a node with 1 cpu core and no gpus. // We assume that all cells will be put into cell groups of size 1. 
diff --git a/test/unit/test_algorithms.cpp b/test/unit/test_algorithms.cpp index 8a2aef6d..a238565e 100644 --- a/test/unit/test_algorithms.cpp +++ b/test/unit/test_algorithms.cpp @@ -12,18 +12,11 @@ // (Pending abstraction of threading interface) #include <arbor/version.hpp> -#if defined(ARB_TBB_ENABLED) - #include "threading/tbb.hpp" -#elif defined(ARB_CTHREAD_ENABLED) - #include "threading/cthread.hpp" -#else - #include "threading/serial.hpp" -#endif - +#include "threading/cthread.hpp" #include "common.hpp" /// tests the sort implementation in threading -/// is only parallel if TBB is being used +/// Not parallel TEST(algorithms, parallel_sort) { auto n = 10000; diff --git a/test/unit/test_domain_decomposition.cpp b/test/unit/test_domain_decomposition.cpp index effb760b..1e630d97 100644 --- a/test/unit/test_domain_decomposition.cpp +++ b/test/unit/test_domain_decomposition.cpp @@ -2,7 +2,7 @@ #include <stdexcept> -#include <arbor/distributed_context.hpp> +#include <arbor/execution_context.hpp> #include <arbor/domain_decomposition.hpp> #include <arbor/load_balance.hpp> @@ -48,7 +48,7 @@ namespace { // test assumes one domain TEST(domain_decomposition, homogenous_population) { - distributed_context context; + execution_context context; { // Test on a node with 1 cpu core and no gpus. // We assume that all cells will be put into cell groups of size 1. @@ -108,7 +108,7 @@ TEST(domain_decomposition, homogenous_population) TEST(domain_decomposition, heterogenous_population) { - distributed_context context; + execution_context context; { // Test on a node with 1 cpu core and no gpus. // We assume that all cells will be put into cell groups of size 1. @@ -193,7 +193,7 @@ TEST(domain_decomposition, hints) { // Check that we can provide group size hint and gpu/cpu preference // by cell kind. - distributed_context context; + execution_context context; partition_hint_map hints; hints[cell_kind::cable1d_neuron].cpu_group_size = 3; diff --git a/test/unit/test_fvm_lowered.cpp b/test/unit/test_fvm_lowered.cpp index f81069f6..b93d8195 100644 --- a/test/unit/test_fvm_lowered.cpp +++ b/test/unit/test_fvm_lowered.cpp @@ -328,7 +328,7 @@ TEST(fvm_lowered, derived_mechs) { float times[] = {10.f, 20.f}; - distributed_context context; + execution_context context; auto decomp = partition_load_balance(rec, proc_allocation{1, 0}, &context); simulation sim(rec, decomp, &context); sim.add_sampler(all_probes, explicit_schedule(times), sampler); diff --git a/test/unit/test_lif_cell_group.cpp b/test/unit/test_lif_cell_group.cpp index 450226b0..80b1909b 100644 --- a/test/unit/test_lif_cell_group.cpp +++ b/test/unit/test_lif_cell_group.cpp @@ -155,7 +155,7 @@ TEST(lif_cell_group, spikes) { // make two lif cells path_recipe recipe(2, 1000, 0.1); - distributed_context context; + execution_context context; proc_allocation nd = local_allocation(); auto decomp = partition_load_balance(recipe, nd, &context); @@ -194,9 +194,8 @@ TEST(lif_cell_group, ring) // Total simulation time. 
time_type simulation_time = 100; - distributed_context context; + execution_context context; proc_allocation nd = local_allocation(); - auto recipe = ring_recipe(num_lif_cells, weight, delay); auto decomp = partition_load_balance(recipe, nd, &context); diff --git a/test/unit/test_range.cpp b/test/unit/test_range.cpp index 12049ba1..c8c0a82e 100644 --- a/test/unit/test_range.cpp +++ b/test/unit/test_range.cpp @@ -9,10 +9,6 @@ #include <type_traits> #include <unordered_map> -#ifdef ARB_HAVE_TBB -#include <tbb/tbb_stddef.h> -#endif - #include <util/counter.hpp> #include <util/meta.hpp> #include <util/range.hpp> @@ -661,50 +657,3 @@ TEST(range, reverse) { EXPECT_EQ("olleh"s, rev); } - - -#ifdef ARB_HAVE_TBB - -TEST(range, tbb_split) { - constexpr std::size_t N = 20; - int xs[N]; - - for (unsigned i = 0; i<N; ++i) { - xs[i] = i; - } - - auto s = util::make_range(&xs[0], &xs[0]+N); - - while (s.size()>1) { - auto ssize = s.size(); - auto r = decltype(s){s, tbb::split{}}; - EXPECT_GT(r.size(), 0u); - EXPECT_GT(s.size(), 0u); - EXPECT_EQ(ssize, r.size()+s.size()); - EXPECT_EQ(s.end(), r.begin()); - - EXPECT_TRUE(r.size()>1 || !r.is_divisible()); - EXPECT_TRUE(s.size()>1 || !s.is_divisible()); - } - - for (unsigned i = 1; i<N-1; ++i) { - s = util::make_range(&xs[0], &xs[0]+N); - // expect exact splitting by proportion in this instance - - auto r = decltype(s){s, tbb::proportional_split{i, N-i}}; - EXPECT_EQ(&xs[0], s.left); - EXPECT_EQ(&xs[0]+i, s.right); - EXPECT_EQ(&xs[0]+i, r.left); - EXPECT_EQ(&xs[0]+N, r.right); - } -} - -TEST(range, tbb_no_split) { - std::istringstream sin("10 9 8 7 6"); - auto s = util::make_range(std::istream_iterator<int>(sin), std::istream_iterator<int>()); - - EXPECT_FALSE(decltype(s)::is_splittable_in_proportion()); - EXPECT_FALSE(s.is_divisible()); -} - -#endif diff --git a/test/unit/test_spike_store.cpp b/test/unit/test_spike_store.cpp index 9a526cc2..11bd1123 100644 --- a/test/unit/test_spike_store.cpp +++ b/test/unit/test_spike_store.cpp @@ -1,6 +1,7 @@ #include "../gtest.h" #include <arbor/spike.hpp> +#include <arbor/execution_context.hpp> #include "thread_private_spike_store.hpp" @@ -10,7 +11,8 @@ TEST(spike_store, insert) { using store_type = arb::thread_private_spike_store; - store_type store; + arb::execution_context context; + store_type store(context.thread_pool); // insert 3 spike events and check that they were inserted correctly store.insert({ @@ -54,7 +56,8 @@ TEST(spike_store, clear) { using store_type = arb::thread_private_spike_store; - store_type store; + arb::execution_context context; + store_type store(context.thread_pool); // insert 3 spike events store.insert({ @@ -69,7 +72,8 @@ TEST(spike_store, gather) { using store_type = arb::thread_private_spike_store; - store_type store; + arb::execution_context context; + store_type store(context.thread_pool); std::vector<spike> spikes = { {{0,0}, 0.0f}, {{1,2}, 0.5f}, {{2,4}, 1.0f} }; diff --git a/test/unit/test_thread.cpp b/test/unit/test_thread.cpp index 2e19b1b8..298e4c51 100644 --- a/test/unit/test_thread.cpp +++ b/test/unit/test_thread.cpp @@ -1,13 +1,13 @@ #include "../gtest.h" #include "common.hpp" #include <arbor/threadinfo.hpp> +#include <arbor/execution_context.hpp> #include <iostream> #include <ostream> // (Pending abstraction of threading interface) #include <arbor/version.hpp> -#if defined(ARB_CTHREAD_ENABLED) #include "threading/cthread.hpp" using namespace arb::threading::impl; @@ -43,26 +43,28 @@ struct ftor_wait { ftor_wait() {} void operator()() const { - auto duration = 
std::chrono::microseconds(500); + auto duration = std::chrono::microseconds(100); std::this_thread::sleep_for(duration); } }; struct ftor_parallel_wait { - ftor_parallel_wait() {} + ftor_parallel_wait(task_system* ts): ts{ts} {} void operator()() const { auto nthreads = num_threads(); - auto duration = std::chrono::microseconds(500); - parallel_for::apply(0, nthreads, [=](int i){ std::this_thread::sleep_for(duration);}); + auto duration = std::chrono::microseconds(100); + parallel_for::apply(0, nthreads, ts, [=](int i){ std::this_thread::sleep_for(duration);}); } + + task_system* ts; }; } TEST(task_system, test_copy) { - task_system &ts = task_system::get_global_task_system(); + task_system ts(num_threads()); ftor f; ts.async(f); @@ -74,10 +76,10 @@ TEST(task_system, test_copy) { } TEST(task_system, test_move) { - task_system &s = task_system::get_global_task_system(); + task_system ts(num_threads()); ftor f; - s.async(std::move(f)); + ts.async(std::move(f)); // Move into new ftor and move ftor into a task (std::function<void()>) EXPECT_LE(nmove, 2); @@ -110,7 +112,8 @@ TEST(notification_queue, test_move) { } TEST(task_group, test_copy) { - task_group g; + task_system ts(num_threads()); + task_group g(&ts); ftor f; g.run(f); @@ -123,7 +126,8 @@ TEST(task_group, test_copy) { } TEST(task_group, test_move) { - task_group g; + task_system ts(num_threads()); + task_group g(&ts); ftor f; g.run(std::move(f)); @@ -137,7 +141,9 @@ TEST(task_group, test_move) { TEST(task_group, individual_tasks) { // Simple check for deadlock - task_group g; + task_system ts(num_threads()); + task_group g(&ts); + auto nthreads = num_threads(); ftor_wait f; @@ -149,10 +155,11 @@ TEST(task_group, individual_tasks) { TEST(task_group, parallel_for_sleep) { // Simple check for deadlock for nested parallelism - task_group g; auto nthreads = num_threads(); + task_system ts(nthreads); + task_group g(&ts); - ftor_parallel_wait f; + ftor_parallel_wait f(&ts); for (int i = 0; i < nthreads; i++) { g.run(f); } @@ -160,10 +167,10 @@ TEST(task_group, parallel_for_sleep) { } TEST(task_group, parallel_for) { - + task_system ts(num_threads()); for (int n = 0; n < 10000; n=!n?1:2*n) { std::vector<int> v(n, -1); - parallel_for::apply(0, n, [&](int i) {v[i] = i;}); + parallel_for::apply(0, n, &ts, [&](int i) {v[i] = i;}); for (int i = 0; i< n; i++) { EXPECT_EQ(i, v[i]); } @@ -171,13 +178,13 @@ TEST(task_group, parallel_for) { } TEST(task_group, nested_parallel_for) { - + task_system ts(num_threads()); for (int m = 1; m < 512; m*=2) { for (int n = 0; n < 1000; n=!n?1:2*n) { std::vector<std::vector<int>> v(n, std::vector<int>(m, -1)); - parallel_for::apply(0, n, [&](int i) { + parallel_for::apply(0, n, &ts, [&](int i) { auto &w = v[i]; - parallel_for::apply(0, m, [&](int j) { w[j] = i + j; }); + parallel_for::apply(0, m, &ts, [&](int j) { w[j] = i + j; }); }); for (int i = 0; i < n; i++) { for (int j = 0; j < m; j++) { @@ -189,8 +196,9 @@ TEST(task_group, nested_parallel_for) { } TEST(enumerable_thread_specific, test) { - enumerable_thread_specific<int> buffers(0); - task_group g; + task_system_handle ts = task_system_handle(new task_system(num_threads())); + enumerable_thread_specific<int> buffers(ts); + task_group g(ts.get()); for (int i = 0; i < 100000; i++) { g.run([&](){ @@ -207,5 +215,3 @@ TEST(enumerable_thread_specific, test) { EXPECT_EQ(100000, sum); } - -#endif diff --git a/test/validation/validate_ball_and_stick.cpp b/test/validation/validate_ball_and_stick.cpp index afc769fe..95257ab7 100644 --- 
a/test/validation/validate_ball_and_stick.cpp +++ b/test/validation/validate_ball_and_stick.cpp @@ -64,7 +64,7 @@ void run_ncomp_convergence_test( convergence_test_runner<int> runner("ncomp", plabels, meta); runner.load_reference_data(ref_data_path); - distributed_context context; + execution_context context; proc_allocation nd; nd.num_gpus = (backend==backend_kind::gpu); diff --git a/test/validation/validate_kinetic.cpp b/test/validation/validate_kinetic.cpp index 3bd9f796..c2994eb3 100644 --- a/test/validation/validate_kinetic.cpp +++ b/test/validation/validate_kinetic.cpp @@ -43,7 +43,7 @@ void run_kinetic_dt( convergence_test_runner<float> runner("dt", plabels, meta); runner.load_reference_data(ref_file); - distributed_context context; + execution_context context; proc_allocation nd; nd.num_gpus = (backend==backend_kind::gpu); diff --git a/test/validation/validate_soma.cpp b/test/validation/validate_soma.cpp index 6c1b17bd..a3e6460f 100644 --- a/test/validation/validate_soma.cpp +++ b/test/validation/validate_soma.cpp @@ -29,7 +29,7 @@ void validate_soma(backend_kind backend) { rec.add_probe(0, 0, cell_probe_address{{0, 0.5}, cell_probe_address::membrane_voltage}); probe_label plabels[1] = {{"soma.mid", {0u, 0u}}}; - distributed_context context; + execution_context context; proc_allocation nd; nd.num_gpus = (backend==backend_kind::gpu); diff --git a/test/validation/validate_synapses.cpp b/test/validation/validate_synapses.cpp index 354dab94..57c926c2 100644 --- a/test/validation/validate_synapses.cpp +++ b/test/validation/validate_synapses.cpp @@ -61,7 +61,7 @@ void run_synapse_test( convergence_test_runner<int> runner("ncomp", plabels, meta); runner.load_reference_data(ref_data_path); - distributed_context context; + execution_context context; proc_allocation nd; nd.num_gpus = (backend==backend_kind::gpu); -- GitLab
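
Usage note: after this patch, user code constructs an arb::execution_context and passes its address wherever a distributed_context pointer was passed before (partition_load_balance, simulation). A minimal sketch of the new calling convention; my_recipe is a hypothetical stand-in for any user-defined arb::recipe, and MPI setup and error handling are omitted (see the miniapps above for the full pattern):

#include <arbor/execution_context.hpp>
#include <arbor/load_balance.hpp>
#include <arbor/simulation.hpp>

int main() {
    // Default-constructed context: a non-distributed distributed_context plus
    // a task_system thread pool sized by arb::num_threads().
    arb::execution_context context;

    my_recipe recipe;                  // hypothetical arb::recipe implementation
    arb::proc_allocation nd{1, 0};     // one thread, no gpus (cf. test_fvm_lowered)
    auto decomp = arb::partition_load_balance(recipe, nd, &context);

    arb::simulation sim(recipe, decomp, &context);
    sim.run(100, 0.025);               // t_final and dt in ms
    return 0;
}

Because the thread pool is owned by the context rather than a global singleton, two simulations built against two different execution_context objects now run on independent task_systems.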