diff --git a/arbor/threading/cthread.cpp b/arbor/threading/cthread.cpp
index 759a8406402f79fde4afa19ccaecb1ccb3f695bf..ae7c9d560b856af275083cff78656acae1eb532d 100644
--- a/arbor/threading/cthread.cpp
+++ b/arbor/threading/cthread.cpp
@@ -9,134 +9,125 @@
 #include "threading.hpp"
 
 using namespace arb::threading::impl;
+using namespace arb::threading;
 using namespace arb;
 
-// RAII owner for a task in flight
-struct task_pool::run_task {
-    task_pool& pool;
-    lock& lck;
+task notification_queue::try_pop() {
     task tsk;
-
-    run_task(task_pool&, lock&);
-    ~run_task();
-};
-
-// Own a task in flight
-// lock should be passed locked,
-// and will be unlocked after call
-task_pool::run_task::run_task(task_pool& pool, lock& lck):
-    pool{pool},
-    lck{lck},
-    tsk{}
-{
-    std::swap(tsk, pool.tasks_.front());
-    pool.tasks_.pop_front();
-
-    lck.unlock();
-    pool.tasks_available_.notify_all();
+    lock q_lock{q_mutex_, std::try_to_lock};
+    if (q_lock && !q_tasks_.empty()) {
+        tsk = std::move(q_tasks_.front());
+        q_tasks_.pop_front();
+    }
+    return tsk;
 }
 
-// Release task
-// Call unlocked, returns unlocked
-task_pool::run_task::~run_task() {
-    lck.lock();
-    tsk.second->in_flight--;
-
-    lck.unlock();
-    pool.tasks_available_.notify_all();
+task notification_queue::pop() {
+    task tsk;
+    lock q_lock{q_mutex_};
+    while (q_tasks_.empty() && !quit_) {
+        q_tasks_available_.wait(q_lock);
+    }
+    if (!q_tasks_.empty()) {
+        tsk = std::move(q_tasks_.front());
+        q_tasks_.pop_front();
+    }
+    return tsk;
 }
 
-template<typename B>
-void task_pool::run_tasks_loop(B finished) {
-    lock lck{tasks_mutex_, std::defer_lock};
-    while (true) {
-        lck.lock();
+bool notification_queue::try_push(task& tsk) {
+    {
+        lock q_lock{q_mutex_, std::try_to_lock};
+        if (!q_lock) return false;
+        q_tasks_.push_back(std::move(tsk));
+        tsk = 0;
+    }
+    q_tasks_available_.notify_all();
+    return true;
+}
 
-        while (! quit_ && tasks_.empty() && ! finished()) {
-            tasks_available_.wait(lck);
-        }
-        if (quit_ || finished()) {
-            return;
-        }
+void notification_queue::push(task&& tsk) {
+    {
+        lock q_lock{q_mutex_};
+        q_tasks_.push_back(std::move(tsk));
+    }
+    q_tasks_available_.notify_all();
+}
 
-        run_task run{*this, lck};
-        run.tsk.first();
+void notification_queue::quit() {
+    {
+        lock q_lock{q_mutex_};
+        quit_ = true;
     }
+    q_tasks_available_.notify_all();
 }
 
-// runs forever until quit is true
-void task_pool::run_tasks_forever() {
-    run_tasks_loop([] {return false;});
+void task_system::run_tasks_loop(int i) {
+    while (true) {
+        task tsk;
+        for (unsigned n = 0; n != count_; n++) {
+            tsk = q_[(i + n) % count_].try_pop();
+            if (tsk) break;
+        }
+        if (!tsk) tsk = q_[i].pop();
+        if (!tsk) break;
+        tsk();
+    }
 }
 
-// run until out of tasks for a group
-void task_pool::run_tasks_while(task_group* g) {
-    run_tasks_loop([=] {return ! g->in_flight;});
+void task_system::try_run_task() {
+    auto nthreads = get_num_threads();
+    task tsk;
+    for (int n = 0; n != nthreads; n++) {
+        tsk = q_[n % nthreads].try_pop();
+        if (tsk) {
+            tsk();
+            break;
+        }
+    }
 }
 
-// Create pool and threads
-// new threads are nthreads-1
-task_pool::task_pool(std::size_t nthreads):
-    tasks_mutex_{},
-    tasks_available_{},
-    tasks_{},
-    threads_{}
-{
-    assert(nthreads > 0);
+task_system::task_system(int nthreads): count_(nthreads), q_(nthreads) {
+    assert(nthreads > 0);
 
     // now for the main thread
     auto tid = std::this_thread::get_id();
     thread_ids_[tid] = 0;
 
-    // and go from there
-    for (std::size_t i = 1; i < nthreads; i++) {
-        threads_.emplace_back([this]{run_tasks_forever();});
+    for (unsigned i = 1; i < count_; i++) {
+        threads_.emplace_back([this, i]{run_tasks_loop(i);});
         tid = threads_.back().get_id();
         thread_ids_[tid] = i;
     }
 }
 
-task_pool::~task_pool() {
-    {
-        lock lck{tasks_mutex_};
-        quit_ = true;
-    }
-    tasks_available_.notify_all();
-
-    for (auto& thread: threads_) {
-        thread.join();
-    }
+task_system::~task_system() {
+    for (auto& e: q_) e.quit();
+    for (auto& e: threads_) e.join();
 }
 
-// push a task into pool
-void task_pool::run(const task& tsk) {
-    {
-        lock lck{tasks_mutex_};
-        tasks_.push_back(tsk);
-        tsk.second->in_flight++;
+void task_system::async(task tsk) {
+    auto i = index_++;
+
+    for (unsigned n = 0; n != count_; n++) {
+        if (q_[(i + n) % count_].try_push(tsk)) return;
     }
-    tasks_available_.notify_all();
+    q_[i % count_].push(std::move(tsk));
 }
 
-void task_pool::run(task&& tsk) {
-    {
-        lock lck{tasks_mutex_};
-        tasks_.push_back(std::move(tsk));
-        tsk.second->in_flight++;
-    }
-    tasks_available_.notify_all();
+int task_system::get_num_threads() {
+    return threads_.size() + 1;
 }
 
-// call on main thread
-// uses this thread to run tasks
-// and waits until the entire task
-// queue is cleared
-void task_pool::wait(task_group* g) {
-    run_tasks_while(g);
+std::size_t task_system::get_current_thread() {
+    std::thread::id tid = std::this_thread::get_id();
+    return thread_ids_[tid];
}
 
-task_pool& task_pool::get_global_task_pool() {
+task_system& task_system::get_global_task_system() {
     auto num_threads = threading::num_threads();
-    static task_pool global_task_pool(num_threads);
-    return global_task_pool;
+    static task_system global_task_system(num_threads);
+    return global_task_system;
 }
+
+
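The scheduling idea in the file above: one notification_queue per thread replaces the single shared queue. async() starts at the round-robin counter index_ and attempts a non-blocking try_push on each queue in turn before falling back to a blocking push; each worker sweeps all queues with the non-blocking try_pop (its own queue first) and only blocks in pop on its own queue when the sweep finds nothing. The following is a free-standing sketch of the same pattern; all names here (queue, worker, async) are illustrative and not part of the patch.

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

using task = std::function<void()>;

struct queue {
    std::deque<task> q;
    std::mutex m;
    std::condition_variable cv;
    bool done = false;

    // Non-blocking pop: give up immediately if another thread holds the lock.
    bool try_pop(task& t) {
        std::unique_lock<std::mutex> l{m, std::try_to_lock};
        if (!l || q.empty()) return false;
        t = std::move(q.front());
        q.pop_front();
        return true;
    }
    // Blocking pop: false only when the queue is shutting down and drained.
    bool pop(task& t) {
        std::unique_lock<std::mutex> l{m};
        while (q.empty() && !done) cv.wait(l);
        if (q.empty()) return false;
        t = std::move(q.front());
        q.pop_front();
        return true;
    }
    // Non-blocking push: fail if the lock is contended.
    bool try_push(task& t) {
        {
            std::unique_lock<std::mutex> l{m, std::try_to_lock};
            if (!l) return false;
            q.push_back(std::move(t));
        }
        cv.notify_all();
        return true;
    }
    void push(task t) {
        {
            std::unique_lock<std::mutex> l{m};
            q.push_back(std::move(t));
        }
        cv.notify_all();
    }
    void quit() {
        {
            std::unique_lock<std::mutex> l{m};
            done = true;
        }
        cv.notify_all();
    }
};

int main() {
    const unsigned count = std::max(1u, std::thread::hardware_concurrency());
    std::vector<queue> qs(count);
    std::atomic<unsigned> index{0};
    std::atomic<int> ran{0};

    auto worker = [&](unsigned me) {
        for (;;) {
            task t;
            // Phase 1: sweep all queues without blocking, own queue first.
            for (unsigned n = 0; n != count && !t; ++n)
                qs[(me + n) % count].try_pop(t);
            // Phase 2: block on the own queue; failure means quit.
            if (!t && !qs[me].pop(t)) return;
            t();
        }
    };

    std::vector<std::thread> threads;
    for (unsigned i = 0; i != count; ++i) threads.emplace_back(worker, i);

    // async(): round-robin try_push with a blocking push as a fallback.
    auto async = [&](task t) {
        unsigned i = index++;
        for (unsigned n = 0; n != count; ++n)
            if (qs[(i + n) % count].try_push(t)) return;
        qs[i % count].push(std::move(t));
    };

    for (int i = 0; i < 1000; ++i) async([&]{ ++ran; });

    for (auto& q: qs) q.quit();
    for (auto& th: threads) th.join();
    std::cout << ran.load() << " tasks ran\n";
}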
diff --git a/arbor/threading/cthread_impl.hpp b/arbor/threading/cthread_impl.hpp
index 8d0fe2199fe60abbdb43f9bf5d50468af4476e5d..ba4fe0097d87836eaa3832338061a5ce04ab73ec 100644
--- a/arbor/threading/cthread_impl.hpp
+++ b/arbor/threading/cthread_impl.hpp
@@ -1,5 +1,7 @@
 #pragma once
 
+#include <iostream>
+#include <type_traits>
 #include <thread>
 #include <mutex>
@@ -14,130 +16,123 @@
 #include <utility>
 #include <unordered_map>
 #include <deque>
+#include <atomic>
 #include <cstdlib>
 
 namespace arb {
 namespace threading {
-inline namespace cthread {
 
 // Forward declare task_group at bottom of this header
 class task_group;
 
-namespace impl {
-
-using arb::threading::task_group;
 using std::mutex;
 using lock = std::unique_lock<mutex>;
 using std::condition_variable;
+using task = std::function<void()>;
+
+namespace impl {
+class notification_queue {
+private:
+    // FIFO of pending tasks.
+    std::deque<task> q_tasks_;
 
-using task = std::pair<std::function<void()>, task_group*>;
-using task_queue = std::deque<task>;
+    // Lock and signal on task availability; this is the crucial bit.
+    mutex q_mutex_;
+    condition_variable q_tasks_available_;
 
-using thread_list = std::vector<std::thread>;
-using thread_map = std::unordered_map<std::thread::id, std::size_t>;
+    // Flag to handle exit from all threads.
+    bool quit_ = false;
+
+public:
+    // Tries to pop a task; returns an empty task if the queue is empty or locked by another thread.
+    task try_pop();
+    // Blocks until a task is available or quit() has been called; returns an empty task only on quit.
+    task pop();
+
+    // Pushes a task onto the queue; try_push returns false if the lock is already taken.
+    void push(task&& tsk); // TODO: need to use value?
+    bool try_push(task& tsk);
+
+    // Wake all waiting threads so they can exit once the queue has drained.
+    void quit();
+};
+} // namespace impl
 
-class task_pool {
+class task_system {
 private:
-    // lock and signal on task availability change
-    // this is the crucial bit
-    mutex tasks_mutex_;
-    condition_variable tasks_available_;
+    unsigned count_; // number of queues; == worker threads + the main thread
 
-    // fifo of pending tasks
-    task_queue tasks_;
+    std::vector<std::thread> threads_;
+
+    // queue of tasks, one per thread
+    std::vector<impl::notification_queue> q_;
 
-    // thread resource
-    thread_list threads_;
     // threads -> index
-    thread_map thread_ids_;
-    // flag to handle exit from all threads
-    bool quit_ = false;
+    std::unordered_map<std::thread::id, std::size_t> thread_ids_;
 
-    // internals for taking tasks as a resource
-    // and running them (updating above)
-    // They get run by a thread in order to consume
-    // tasks
-    struct run_task;
-    // run tasks until a task_group tasks are done
-    // for wait
-    void run_tasks_while(task_group*);
-    // loop forever for secondary threads
-    // until quit is set
-    void run_tasks_forever();
-
-    // common code for the previous
-    // finished is a function/lambda
-    // that returns true when the infinite loop
-    // needs to be broken
-    template<typename B>
-    void run_tasks_loop(B finished );
+    // total number of tasks pushed in all queues; used to pick the next queue round-robin
+    std::atomic<unsigned> index_{0};
 
+public:
     // Create nthreads-1 new c std threads
-    // must be > 0
-    // singled only created in static get_global_task_pool()
-    task_pool(std::size_t nthreads);
+    task_system(int nthreads);
 
-    // task_pool is a singleton
-    task_pool(const task_pool&) = delete;
-    task_pool& operator=(const task_pool&) = delete;
+    // task_system is a singleton.
+    task_system(const task_system&) = delete;
+    task_system& operator=(const task_system&) = delete;
 
-    // set quit and wait for secondary threads to end
-    ~task_pool();
+    ~task_system();
 
-public:
-    // Like tbb calls: run queues a task,
-    // wait waits for all tasks in the group to be done
-    void run(const task&);
-    void run(task&&);
-    void wait(task_group*);
-
-    // includes master thread
-    int get_num_threads() {
-        return threads_.size() + 1;
-    }
+    // Pushes a task onto one of the notification queues.
+    void async(task tsk);
 
-    // get a stable integer for the current thread that
-    // is 0..nthreads
-    std::size_t get_current_thread() {
-        return thread_ids_[std::this_thread::get_id()];
-    }
+    // Worker loop for thread i: run tasks until all queues have quit.
+    void run_tasks_loop(int i);
+
+    // Request that the task_system attempts to find and run a _single_ task.
+    // Will return without executing a task if no tasks are available.
+    void try_run_task();
 
-    // singleton constructor - needed to order construction
-    // with other singletons (profiler)
-    static task_pool& get_global_task_pool();
+    // Includes the master thread.
+    int get_num_threads();
+
+    // Get a stable integer for the current thread that is in [0, nthreads).
+    std::size_t get_current_thread();
+
+    // Singleton constructor - needed to order construction with other singletons. TODO
+    static task_system& get_global_task_system();
 };
-} //impl
 
 ///////////////////////////////////////////////////////////////////////
 // types
 ///////////////////////////////////////////////////////////////////////
 
 template <typename T>
 class enumerable_thread_specific {
-    impl::task_pool& global_task_pool;
+    task_system& global_task_system;
 
     using storage_class = std::vector<T>;
     storage_class data;
 
-public :
+public:
     using iterator = typename storage_class::iterator;
     using const_iterator = typename storage_class::const_iterator;
 
     enumerable_thread_specific():
-        global_task_pool{impl::task_pool::get_global_task_pool()},
-        data{std::vector<T>(global_task_pool.get_num_threads())}
+        global_task_system{task_system::get_global_task_system()},
+        data{std::vector<T>(global_task_system.get_num_threads())}
     {}
 
     enumerable_thread_specific(const T& init):
-        global_task_pool{impl::task_pool::get_global_task_pool()},
-        data{std::vector<T>(global_task_pool.get_num_threads(), init)}
+        global_task_system{task_system::get_global_task_system()},
+        data{std::vector<T>(global_task_system.get_num_threads(), init)}
     {}
 
     T& local() {
-        return data[global_task_pool.get_current_thread()];
+        return data[global_task_system.get_current_thread()];
     }
     const T& local() const {
-        return data[global_task_pool.get_current_thread()];
+        return data[global_task_system.get_current_thread()];
     }
 
     auto size() const { return data.size(); }
@@ -152,45 +147,6 @@ public :
     const_iterator cend() const { return data.cend(); }
 };
 
-template <typename T>
-class parallel_vector {
-    using value_type = T;
-    std::vector<value_type> data_;
-
-private:
-    // lock the parallel_vector to update
-    impl::mutex mutex;
-
-    // call a function of type X f() in a lock
-    template<typename F>
-    decltype(auto) critical(F f) {
-        impl::lock lock{mutex};
-        return f();
-    }
-
-public:
-    parallel_vector() = default;
-    using iterator = typename std::vector<value_type>::iterator;
-    using const_iterator = typename std::vector<value_type>::const_iterator;
-
-    iterator begin() { return data_.begin(); }
-    iterator end() { return data_.end(); }
-
-    const_iterator begin() const { return data_.begin(); }
-    const_iterator end() const { return data_.end(); }
-
-    const_iterator cbegin() const { return data_.cbegin(); }
-    const_iterator cend() const { return data_.cend(); }
-
-    // only guarantees the state of the vector, but not the iterators
-    // unlike tbb push_back
-    void push_back (value_type&& val) {
-        critical([&] {
-            data_.push_back(std::move(val));
-        });
-    }
-};
-
 inline std::string description() {
     return "CThread Pool";
 }
@@ -199,46 +155,69 @@
 constexpr bool multithreaded() { return true; }
 
 class task_group {
 private:
-    std::size_t in_flight = 0;
-    impl::task_pool& global_task_pool;
-    // task pool manipulates in_flight
-    friend impl::task_pool;
+    std::atomic<std::size_t> in_flight_{0};
+    task_system& task_system_;
 
 public:
     task_group():
-        global_task_pool{impl::task_pool::get_global_task_pool()}
+        task_system_{task_system::get_global_task_system()}
     {}
 
     task_group(const task_group&) = delete;
     task_group& operator=(const task_group&) = delete;
 
-    // send function void f() to threads
-    template<typename F>
-    void run(const F& f) {
-        global_task_pool.run(impl::task{f, this});
-    }
+    template <typename F>
+    class wrap {
+        F f;
+        std::atomic<std::size_t>& counter;
+
+    public:
+        // Construct from a compatible function and atomic counter
+        template <typename F2>
+        explicit wrap(F2&& other, std::atomic<std::size_t>& c):
+            f(std::forward<F2>(other)),
+            counter(c)
+        {}
+
+        wrap(wrap&& other):
+            f(std::move(other.f)),
+            counter(other.counter)
+        {}
+
+        // std::function is not guaranteed not to copy its contents on move construction,
+        // but this is safe because operator() is never called more than once on the same wrapped task.
+        wrap(const wrap& other):
+            f(other.f),
+            counter(other.counter)
+        {}
+
+        void operator()() {
+            f();
+            --counter;
+        }
+    };
 
-    template<typename F>
-    void run(F&& f) {
-        global_task_pool.run(impl::task{std::move(f), this});
-    }
+    template <typename F>
+    using callable = typename std::decay<F>::type;
 
-    // run function void f() and then wait on all threads in group
-    template<typename F>
-    void run_and_wait(const F& f) {
-        f();
-        global_task_pool.wait(this);
+    template <typename F>
+    wrap<callable<F>> make_wrapped_function(F&& f, std::atomic<std::size_t>& c) {
+        return wrap<callable<F>>(std::forward<F>(f), c);
     }
 
     template<typename F>
-    void run_and_wait(F&& f) {
-        f();
-        global_task_pool.wait(this);
+    void run(F&& f) {
+        ++in_flight_;
+
+        task_system_.async(make_wrapped_function(std::forward<F>(f), in_flight_));
     }
 
     // wait till all tasks in this group are done
     void wait() {
-        global_task_pool.wait(this);
+        while (in_flight_) {
+            task_system_.try_run_task();
+        }
     }
 
     // Make sure that all tasks are done before clean up
@@ -254,7 +233,7 @@ struct parallel_for {
     template <typename F>
     static void apply(int left, int right, F f) {
         task_group g;
-        for(int i = left; i < right; ++i) {
+        for (int i = left; i < right; ++i) {
             g.run([=] {f(i);});
         }
         g.wait();
@@ -262,9 +241,8 @@
 };
 
 inline std::size_t thread_id() {
-    return impl::task_pool::get_global_task_pool().get_current_thread();
+    return task_system::get_global_task_system().get_current_thread();
 }
 
-} // namespace cthread
 } // namespace threading
 } // namespace arb
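For reference, the task_group flow introduced above: run() increments the atomic in_flight_ counter, wraps the callable so the counter is decremented after it executes, and hands the wrapper to the global task_system; wait() does not block, but keeps calling try_run_task() until the counter reaches zero, so a thread waiting on a nested group helps drain the queues instead of deadlocking. A minimal usage sketch, assuming the cthread backend and the include path used by the tests below (example() itself is hypothetical):

#include <atomic>
#include "threading/cthread.hpp"

void example() {
    arb::threading::task_group g;
    std::atomic<int> sum{0};

    for (int i = 0; i < 100; ++i) {
        g.run([&sum, i]{ sum += i; });  // ++in_flight_, then task_system::async()
    }
    g.wait();                           // runs tasks itself until in_flight_ == 0
    // sum == 4950 here
}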
diff --git a/test/ubench/CMakeLists.txt b/test/ubench/CMakeLists.txt
index 7757e386ae6a439fac065d82d6ec6a3aae95c888..a63aee730f4527a4275479037c436d35c81fa75d 100644
--- a/test/ubench/CMakeLists.txt
+++ b/test/ubench/CMakeLists.txt
@@ -8,6 +8,7 @@ set(bench_sources
     event_setup.cpp
     event_binning.cpp
     mech_vec.cpp
+    task_system.cpp
 )
 
 if(ARB_WITH_CUDA)
diff --git a/test/ubench/task_system.cpp b/test/ubench/task_system.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..17f5486b99a9b81d8545486cda6ee9f44c8a3b2a
--- /dev/null
+++ b/test/ubench/task_system.cpp
@@ -0,0 +1,49 @@
+// Test performance of the task system.
+//
+// Runs a fixed amount of total work, split into sleeping tasks of a given duration.
+
+#include <chrono>
+#include <iostream>
+#include <thread>
+
+#include <arbor/threadinfo.hpp>
+
+#include <arbor/version.hpp>
+#if defined(ARB_TBB_ENABLED)
+    #include "threading/tbb.hpp"
+#elif defined(ARB_CTHREAD_ENABLED)
+    #include "threading/cthread.hpp"
+#else
+    #include "threading/serial.hpp"
+#endif
+
+#include <benchmark/benchmark.h>
+
+using namespace arb;
+
+void run(unsigned long us_per_task, unsigned tasks) {
+    auto duration = std::chrono::microseconds(us_per_task);
+    arb::threading::parallel_for::apply(
+        0, tasks,
+        [&](unsigned i){std::this_thread::sleep_for(duration);});
+}
+
+void task_test(benchmark::State& state) {
+    const unsigned us_per_task = state.range(0);
+    const auto nthreads = arb::num_threads();
+    const unsigned us_per_s = 1000000;
+    const unsigned num_tasks = nthreads*us_per_s/us_per_task;
+
+    while (state.KeepRunning()) {
+        run(us_per_task, num_tasks);
+    }
+}
+
+void us_per_task(benchmark::internal::Benchmark* b) {
+    for (auto us: {100, 250, 500, 1000, 10000}) {
+        b->Args({us});
+    }
+}
+
+BENCHMARK(task_test)->Apply(us_per_task);
+BENCHMARK_MAIN();
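A note on the benchmark's sizing: each iteration enqueues nthreads*10^6/us_per_task tasks, i.e. one second of aggregate sleep per thread, so with perfect scaling every iteration takes about one second of wall-clock time regardless of task length, and anything above that is scheduling overhead. A quick sanity check of the arithmetic (the thread count here is an assumed value, not benchmark output):

#include <iostream>

int main() {
    const unsigned us_per_s = 1000000;
    const unsigned nthreads = 4; // illustrative
    // e.g. 100 us tasks -> 40000 tasks/iteration; 10000 us -> 400.
    for (unsigned us_per_task: {100u, 1000u, 10000u}) {
        std::cout << nthreads*us_per_s/us_per_task << " tasks at "
                  << us_per_task << " us each\n";
    }
}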
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 5c2d5e1859e970863d93bafd5716bf818098d281..b2e99d0df60112ee1c058f05f23503a66d8a75ab 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -72,6 +72,7 @@ set(unit_sources
     test_swcio.cpp
     test_synapses.cpp
     test_time_seq.cpp
+    test_thread.cpp
     test_tree.cpp
     test_transform.cpp
     test_uninitialized.cpp
diff --git a/test/unit/test_thread.cpp b/test/unit/test_thread.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..2e19b1b8c7293a4860bba271803183b1ef97c1c0
--- /dev/null
+++ b/test/unit/test_thread.cpp
@@ -0,0 +1,211 @@
+#include "../gtest.h"
+#include "common.hpp"
+#include <arbor/threadinfo.hpp>
+
+#include <atomic>
+#include <chrono>
+// (Pending abstraction of threading interface)
+#include <arbor/version.hpp>
+
+#if defined(ARB_CTHREAD_ENABLED)
+#include "threading/cthread.hpp"
+
+using namespace arb::threading::impl;
+using namespace arb::threading;
+using namespace arb;
+namespace {
+
+std::atomic<int> nmove{0};
+std::atomic<int> ncopy{0};
+
+void reset() {
+    nmove = 0;
+    ncopy = 0;
+}
+
+struct ftor {
+
+    ftor() {}
+
+    ftor(ftor&& other) {
+        ++nmove;
+    }
+
+    ftor(const ftor& other) {
+        ++ncopy;
+    }
+
+    void operator()() const {}
+};
+
+struct ftor_wait {
+
+    ftor_wait() {}
+
+    void operator()() const {
+        auto duration = std::chrono::microseconds(500);
+        std::this_thread::sleep_for(duration);
+    }
+};
+
+struct ftor_parallel_wait {
+
+    ftor_parallel_wait() {}
+
+    void operator()() const {
+        auto nthreads = num_threads();
+        auto duration = std::chrono::microseconds(500);
+        parallel_for::apply(0, nthreads, [=](int i){ std::this_thread::sleep_for(duration);});
+    }
+};
+
+}
+
+TEST(task_system, test_copy) {
+    task_system& ts = task_system::get_global_task_system();
+
+    ftor f;
+    ts.async(f);
+
+    // Copy into new ftor and move ftor into a task (std::function<void()>)
+    EXPECT_EQ(1, nmove);
+    EXPECT_EQ(1, ncopy);
+    reset();
+}
+
+TEST(task_system, test_move) {
+    task_system& ts = task_system::get_global_task_system();
+
+    ftor f;
+    ts.async(std::move(f));
+
+    // Move into new ftor and move ftor into a task (std::function<void()>)
+    EXPECT_LE(nmove, 2);
+    EXPECT_LE(ncopy, 1);
+    reset();
+}
+
+TEST(notification_queue, test_copy) {
+    notification_queue q;
+
+    ftor f;
+    q.push(f);
+
+    // Copy into new ftor and move ftor into a task (std::function<void()>)
+    EXPECT_EQ(1, nmove);
+    EXPECT_EQ(1, ncopy);
+    reset();
+}
+
+TEST(notification_queue, test_move) {
+    notification_queue q;
+
+    ftor f;
+
+    // Move into new ftor and move ftor into a task (std::function<void()>)
+    q.push(std::move(f));
+    EXPECT_LE(nmove, 2);
+    EXPECT_LE(ncopy, 1);
+    reset();
+}
+
+TEST(task_group, test_copy) {
+    task_group g;
+
+    ftor f;
+    g.run(f);
+    g.wait();
+
+    // Copy into "wrap" and move wrap into a task (std::function<void()>)
+    EXPECT_EQ(1, nmove);
+    EXPECT_EQ(1, ncopy);
+    reset();
+}
+
+TEST(task_group, test_move) {
+    task_group g;
+
+    ftor f;
+    g.run(std::move(f));
+    g.wait();
+
+    // Move into wrap and move wrap into a task (std::function<void()>)
+    EXPECT_LE(nmove, 2);
+    EXPECT_LE(ncopy, 1);
+    reset();
+}
+
+TEST(task_group, individual_tasks) {
+    // Simple check for deadlock
+    task_group g;
+    auto nthreads = num_threads();
+
+    ftor_wait f;
+    for (int i = 0; i < 32 * nthreads; i++) {
+        g.run(f);
+    }
+    g.wait();
+}
+
+TEST(task_group, parallel_for_sleep) {
+    // Simple check for deadlock for nested parallelism
+    task_group g;
+    auto nthreads = num_threads();
+
+    ftor_parallel_wait f;
+    for (int i = 0; i < nthreads; i++) {
+        g.run(f);
+    }
+    g.wait();
+}
+
+TEST(task_group, parallel_for) {
+
+    for (int n = 0; n < 10000; n = !n ? 1 : 2*n) {
+        std::vector<int> v(n, -1);
+        parallel_for::apply(0, n, [&](int i) {v[i] = i;});
+        for (int i = 0; i < n; i++) {
+            EXPECT_EQ(i, v[i]);
+        }
+    }
+}
+
+TEST(task_group, nested_parallel_for) {
+
+    for (int m = 1; m < 512; m *= 2) {
+        for (int n = 0; n < 1000; n = !n ? 1 : 2*n) {
+            std::vector<std::vector<int>> v(n, std::vector<int>(m, -1));
+            parallel_for::apply(0, n, [&](int i) {
+                auto& w = v[i];
+                parallel_for::apply(0, m, [&](int j) { w[j] = i + j; });
+            });
+            for (int i = 0; i < n; i++) {
+                for (int j = 0; j < m; j++) {
+                    EXPECT_EQ(i + j, v[i][j]);
+                }
+            }
+        }
+    }
+}
+
+TEST(enumerable_thread_specific, test) {
+    enumerable_thread_specific<int> buffers(0);
+    task_group g;
+
+    for (int i = 0; i < 100000; i++) {
+        g.run([&](){
+            auto& buf = buffers.local();
+            buf++;
+        });
+    }
+    g.wait();
+
+    int sum = 0;
+    for (auto b: buffers) {
+        sum += b;
+    }
+
+    EXPECT_EQ(100000, sum);
+}
+
+#endif
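The copy/move expectations in these tests all follow from how a callable travels into a task: an lvalue is copied once into the intermediate object (ftor or wrap) and that object is then moved into the std::function, while an rvalue is moved at both steps; the EXPECT_LE bounds allow for the extra move some std::function implementations perform. The same counting idiom, standalone (probe is an illustrative name, not from the patch):

#include <atomic>
#include <functional>
#include <iostream>
#include <utility>

std::atomic<int> nmove{0}, ncopy{0};

struct probe {
    probe() = default;
    probe(probe&&) { ++nmove; }
    probe(const probe&) { ++ncopy; }
    void operator()() const {}
};

int main() {
    probe p;
    std::function<void()> f1(p);            // lvalue: one copy (plus possible moves)
    std::function<void()> f2(std::move(p)); // rvalue: moves only
    // Typically prints copies: 1, moves: 1, though an implementation may add a move.
    std::cout << "copies: " << ncopy.load() << ", moves: " << nmove.load() << "\n";
}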