diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4d600486692198e89031610e014ebbef34e1bd2b..7e6e6f160ef3f1fcc552fc54c2643636327727f1 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -5,6 +5,7 @@ set(BASE_SOURCES
     parameter_list.cpp
     profiling/profiler.cpp
     swcio.cpp
+    threading/affinity.cpp
     util/debug.cpp
     util/path.cpp
     util/unwind.cpp
diff --git a/src/threading/affinity.cpp b/src/threading/affinity.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..16e5d5589c28d5428deaf72adc968baf9206ef9f
--- /dev/null
+++ b/src/threading/affinity.cpp
@@ -0,0 +1,73 @@
+// _GNU_SOURCE exposes sched_getaffinity and the CPU_* macros in <sched.h>.
+// Feature test macros only take effect if defined before the first system
+// header is included, so this block has to precede every #include.
+#if defined(__linux__) && !defined(_GNU_SOURCE)
+    #define _GNU_SOURCE
+#endif
+
+#include <cstddef>
+#include <cstdlib>
+#include <vector>
+
+#ifdef __linux__
+    extern "C" {
+    #include <sched.h>
+    }
+#endif
+
+#include "affinity.hpp"
+
+namespace nest {
+namespace mc {
+namespace threading {
+
+#ifdef __linux__
+
+// Returns the indices of the cores in the calling thread's affinity mask
+// in increasing order, or an empty vector if the mask can not be queried.
+std::vector<int> get_affinity() {
+    cpu_set_t cpu_set_mask;
+
+    // a pid of 0 selects the calling thread
+    if (sched_getaffinity(0, sizeof(cpu_set_t), &cpu_set_mask) == -1) {
+        return {};
+    }
+
+    // CPU_COUNT returns an int: convert it once so that the comparisons
+    // with cores.size() below do not mix signed and unsigned operands
+    const auto cpu_count = static_cast<std::size_t>(CPU_COUNT(&cpu_set_mask));
+
+    std::vector<int> cores;
+    cores.reserve(cpu_count);
+    for (int i=0; i<CPU_SETSIZE && cores.size()<cpu_count; ++i) {
+        if (CPU_ISSET(i, &cpu_set_mask)) {
+            cores.push_back(i);
+        }
+    }
+
+    // every core counted in the mask should have been found by the scan
+    if (cores.size() != cpu_count) {
+        return {};
+    }
+
+    return cores;
+}
+
+#else
+
+// No support for non-linux systems
+std::vector<int> get_affinity() {
+    return {};
+}
+
+#endif
+
+// Returns the number of cores in the calling thread's affinity mask,
+// or 0 if it could not be determined.
+unsigned count_available_cores() {
+    return static_cast<unsigned>(get_affinity().size());
+}
+
+} // namespace threading
+} // namespace mc
+} // namespace nest
diff --git a/src/threading/affinity.hpp b/src/threading/affinity.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..caf5770fc7f75fece0a0e9ccc5aa62e451da40e9
--- /dev/null
+++ b/src/threading/affinity.hpp
@@ -0,0 +1,29 @@
+#pragma once
+
+#include <vector>
+
+namespace nest {
+namespace mc {
+namespace threading {
+
+// The list of cores for which the calling thread has affinity.
+// Intended to be called from the main thread at application start up,
+// before any attempt to change thread affinity: the results may be
+// unreliable once affinity has been modified.
+//  - beware OpenMP thread pinning or custom job scheduler affinity
+//    flags that assign threads to specific cores.
+//
+// Returns an empty vector if unable to determine the number of
+// available cores, which is always the case on non-Linux systems.
+std::vector<int> get_affinity();
+
+// Attempts to find the number of cores available to the application.
+// This is likely to give inaccurate results if the caller has already
+// been playing with thread affinity.
+//
+// Returns 0 if unable to determine the number of cores.
+unsigned count_available_cores();
+
+} // namespace threading
+} // namespace mc
+} // namespace nest
diff --git a/src/threading/cthread.cpp b/src/threading/cthread.cpp
index 0da76d5a5ced02de78b29d3bee75d47aa7c251af..399dc635eb81a8f5b71e2d9e13c6de8b0d1c9787 100644
--- a/src/threading/cthread.cpp
+++ b/src/threading/cthread.cpp
@@ -1,9 +1,14 @@
 #include <cassert>
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
 #include <exception>
 #include <iostream>
+#include <regex>
+#include <string>
 
 #include "cthread.hpp"
-
+#include "affinity.hpp"
 
 using namespace nest::mc::threading::impl;
 
@@ -27,7 +32,7 @@ task_pool::run_task::run_task(task_pool& pool, lock& lck):
 {
     std::swap(tsk, pool.tasks_.front());
     pool.tasks_.pop_front();
-    
+
     lck.unlock();
     pool.tasks_available_.notify_all();
 }
@@ -37,7 +42,7 @@ task_pool::run_task::run_task(task_pool& pool, lock& lck):
 task_pool::run_task::~run_task() {
     lck.lock();
     tsk.second->in_flight--;
-    
+
     lck.unlock();
     pool.tasks_available_.notify_all();
 }
@@ -45,7 +50,7 @@ task_pool::run_task::~run_task() {
 template<typename B>
 void task_pool::run_tasks_loop(B finished) {
     lock lck{tasks_mutex_, std::defer_lock};
-    while (true) { 
+    while (true) {
         lck.lock();
 
         while (! quit_ && tasks_.empty() && ! finished()) {
@@ -57,7 +62,7 @@ void task_pool::run_tasks_loop(B finished) {
 
         run_task run{*this, lck};
         run.tsk.first();
-    } 
+    }
 }
 
 // runs forever until quit is true
@@ -79,11 +84,11 @@ task_pool::task_pool(std::size_t nthreads):
     threads_{}
 {
     assert(nthreads > 0);
-    
+
     // now for the main thread
     auto tid = std::this_thread::get_id();
     thread_ids_[tid] = 0;
-    
+
     // and go from there
     for (std::size_t i = 1; i < nthreads; i++) {
         threads_.emplace_back([this]{run_tasks_forever();});
@@ -98,7 +103,7 @@ task_pool::~task_pool() {
         quit_ = true;
     }
     tasks_available_.notify_all();
-    
+
     for (auto& thread: threads_) {
         thread.join();
     }
@@ -132,53 +137,61 @@ void task_pool::wait(task_group* g) {
 }
 
 [[noreturn]]
-static void terminate(const char *const msg) {
+static void terminate(const std::string& msg) {
     std::cerr << "NMC_NUM_THREADS_ERROR: " << msg << std::endl;
     std::terminate();
 }
 
-// should check string, throw exception on missing or badly formed
+// Determine the number of threads to use.
+//
+// The value is read from an environment variable: the one named by
+// NMC_NUM_THREADS_VAR if that is set, otherwise NMC_NUM_THREADS, otherwise
+// OMP_NUM_THREADS. If the selected variable is unset, the value returned by
+// count_available_cores() is used instead.
+//
+// Terminates the program if the selected variable does not describe a
+// positive integer, or if it is unset and no default can be determined.
 static size_t global_get_num_threads() {
-    const char* nthreads_str;
+    const char* str;
+
     // select variable to use:
     //   If NMC_NUM_THREADS_VAR is set, use $NMC_NUM_THREADS_VAR
-    //   else if NMC_NUM_THREAD set, use it
+    //   else if NMC_NUM_THREADS set, use it
     //   else if OMP_NUM_THREADS set, use it
     if (auto nthreads_var_name = std::getenv("NMC_NUM_THREADS_VAR")) {
-        nthreads_str = std::getenv(nthreads_var_name);
+        str = std::getenv(nthreads_var_name);
     }
-    else if (! (nthreads_str = std::getenv("NMC_NUM_THREADS"))) {
-        nthreads_str = std::getenv("OMP_NUM_THREADS");
+    else if (! (str = std::getenv("NMC_NUM_THREADS"))) {
+        str = std::getenv("OMP_NUM_THREADS");
     }
 
-    // If the selected var is unset,
-    //   or no var is set,
-    //   error
-    if (! nthreads_str) {
-        terminate("No environmental var defined");
-    }
-
-    // only composed of spaces*digits*space*
-    auto nthreads_str_end{nthreads_str};
-    while (std::isspace(*nthreads_str_end)) {
-        ++nthreads_str_end;
-    }
-    while (std::isdigit(*nthreads_str_end)) {
-        ++nthreads_str_end;
-    }
-    while (std::isspace(*nthreads_str_end)) {
-        ++nthreads_str_end;
-    }
-    if (*nthreads_str_end) {
-        terminate("Num threads is not a single integer");
+    // If the selected var is unset, fall back to the number of cores
+    // for which the calling thread has affinity
+    if (!str) {
+        unsigned nthreads = nest::mc::threading::count_available_cores();
+        if (nthreads==0u) {
+            terminate(
+                "The number of threads was not set by the user, and I am unable "
+                "to determine a sane default number of threads on this system. "
+                "Use the NMC_NUM_THREADS environment variable to explicitly "
+                "set the number of threads.");
+        }
+        return nthreads;
     }
 
-    // and it's got a single non-zero value
-    auto nthreads{std::atoi(nthreads_str)};
-    if (! nthreads) {
-        terminate("Num threads is not a non-zero number");
+    // strtoul reports overflow only by setting errno to ERANGE and never
+    // clears errno itself, so reset it first to avoid a stale error value
+    errno = 0;
+    auto nthreads = std::strtoul(str, nullptr, 10);
+
+    // check that the environment variable string describes a positive integer
+    if (nthreads==0 || errno==ERANGE ||
+        !std::regex_match(str, std::regex("\\s*\\d*[1-9]\\d*\\s*")))
+    {
+        terminate("The requested number of threads \""+std::string(str)
+            +"\" is not a reasonable positive integer");
     }
-    
+
     return nthreads;
 }
 