From 219b782f21f55bf885e8c6a40d1490b7a43586e0 Mon Sep 17 00:00:00 2001
From: Ben Cumming <louncharf@gmail.com>
Date: Tue, 21 Mar 2017 11:53:08 +0100
Subject: [PATCH] More robust NMC_NUM_THREADS parsing (#198)

fixes #197

* Add functions that can find thread affinity and the number of available cores on linux systems via `sched_getaffinity`.
* On other systems they default to "unknown affinity" and return 0 to indicate that the number of cores is unknown.
* Set the default number of threads according to the new functions above if no environment variable explicitly setting the number of threads is set.
* Validate environment variable value against regex and range check; terminate if improper.
* Terminate if no number of threads is provided and the library is unable to determine a sensible number automatically.
---
 src/CMakeLists.txt         |  1 +
 src/threading/affinity.cpp | 60 ++++++++++++++++++++++++++++++
 src/threading/affinity.hpp | 29 +++++++++++++++
 src/threading/cthread.cpp  | 75 +++++++++++++++++++-------------------
 4 files changed, 127 insertions(+), 38 deletions(-)
 create mode 100644 src/threading/affinity.cpp
 create mode 100644 src/threading/affinity.hpp

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4d600486..7e6e6f16 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -5,6 +5,7 @@ set(BASE_SOURCES
     parameter_list.cpp
     profiling/profiler.cpp
     swcio.cpp
+    threading/affinity.cpp
     util/debug.cpp
     util/path.cpp
     util/unwind.cpp
diff --git a/src/threading/affinity.cpp b/src/threading/affinity.cpp
new file mode 100644
index 00000000..16e5d558
--- /dev/null
+++ b/src/threading/affinity.cpp
@@ -0,0 +1,60 @@
+#include <vector>
+
+#include <cstdlib>
+
+#ifdef __linux__
+
+    #ifndef _GNU_SOURCE
+        #define _GNU_SOURCE
+    #endif
+
+    extern "C" {
+        #include <sched.h>
+    }
+
+#endif
+
+namespace nest {
+namespace mc {
+namespace threading {
+
+#ifdef __linux__
+std::vector<int> get_affinity() {
+    cpu_set_t cpu_set_mask;
+
+    auto status = sched_getaffinity(0, sizeof(cpu_set_t), &cpu_set_mask);
+
+    if(status==-1) {
+        return {};
+    }
+
+    auto cpu_count = CPU_COUNT(&cpu_set_mask);
+
+    std::vector<int> cores;
+    for(auto i=0; i<CPU_SETSIZE && cores.size()<cpu_count; ++i) {
+        if(CPU_ISSET(i, &cpu_set_mask)) {
+            cores.push_back(i);
+        }
+    }
+
+    if(cores.size() != cpu_count) {
+        return {};
+    }
+
+    return cores;
+}
+#else
+
+// No support for non-linux systems
+std::vector<int> get_affinity() {
+    return {};
+}
+#endif
+
+unsigned count_available_cores() {
+    return get_affinity().size();
+}
+
+} // namespace threading
+} // namespace mc
+} // namespace nest
diff --git a/src/threading/affinity.hpp b/src/threading/affinity.hpp
new file mode 100644
index 00000000..caf5770f
--- /dev/null
+++ b/src/threading/affinity.hpp
@@ -0,0 +1,29 @@
+#pragma once
+
+#include <vector>
+
+namespace nest {
+namespace mc {
+namespace threading {
+
+// The list of cores for which the calling thread has affinity.
+// If calling from the main thread at application start up, before
+// attempting to change thread affinity, may produce unreliable
+// results.
+//  - beware OpenMP thread pinning or custom job scheduler affinity
+//    flags that assign threads to specific cores.
+//
+// Returns an empty vector if unable to determine the number of
+// available cores.
+std::vector<int> get_affinity();
+
+// Attempts to find the number of cores available to the application
+// This is likely to give inaccurate results if the caller has already
+// been playing with thread affinity.
+//
+// Returns 0 if unable to determine the number of cores.
+unsigned count_available_cores();
+
+} // namespace threading
+} // namespace mc
+} // namespace nest
diff --git a/src/threading/cthread.cpp b/src/threading/cthread.cpp
index 0da76d5a..399dc635 100644
--- a/src/threading/cthread.cpp
+++ b/src/threading/cthread.cpp
@@ -1,9 +1,11 @@
 #include <cassert>
+#include <cstring>
 #include <exception>
 #include <iostream>
+#include <regex>
 
 #include "cthread.hpp"
-
+#include "affinity.hpp"
 
 using namespace nest::mc::threading::impl;
 
@@ -27,7 +29,7 @@ task_pool::run_task::run_task(task_pool& pool, lock& lck):
 {
     std::swap(tsk, pool.tasks_.front());
     pool.tasks_.pop_front();
-    
+
     lck.unlock();
     pool.tasks_available_.notify_all();
 }
@@ -37,7 +39,7 @@ task_pool::run_task::run_task(task_pool& pool, lock& lck):
 task_pool::run_task::~run_task() {
     lck.lock();
     tsk.second->in_flight--;
-    
+
     lck.unlock();
     pool.tasks_available_.notify_all();
 }
@@ -45,7 +47,7 @@ task_pool::run_task::~run_task() {
 template<typename B>
 void task_pool::run_tasks_loop(B finished) {
     lock lck{tasks_mutex_, std::defer_lock};
-    while (true) {  
+    while (true) {
         lck.lock();
 
         while (! quit_ && tasks_.empty() && ! finished()) {
@@ -57,7 +59,7 @@ void task_pool::run_tasks_loop(B finished) {
 
         run_task run{*this, lck};
         run.tsk.first();
-    }    
+    }
 }
 
 // runs forever until quit is true
@@ -79,11 +81,11 @@ task_pool::task_pool(std::size_t nthreads):
     threads_{}
 {
     assert(nthreads > 0);
-  
+
     // now for the main thread
     auto tid = std::this_thread::get_id();
     thread_ids_[tid] = 0;
-  
+
     // and go from there
     for (std::size_t i = 1; i < nthreads; i++) {
         threads_.emplace_back([this]{run_tasks_forever();});
@@ -98,7 +100,7 @@ task_pool::~task_pool() {
         quit_ = true;
     }
     tasks_available_.notify_all();
-    
+
     for (auto& thread: threads_) {
         thread.join();
     }
@@ -132,53 +134,50 @@ void task_pool::wait(task_group* g) {
 }
 
 [[noreturn]]
-static void terminate(const char *const msg) {
+static void terminate(std::string msg) {
     std::cerr << "NMC_NUM_THREADS_ERROR: " << msg << std::endl;
     std::terminate();
 }
 
 // should check string, throw exception on missing or badly formed
 static size_t global_get_num_threads() {
-    const char* nthreads_str;
+    const char* str;
+
     // select variable to use:
     //   If NMC_NUM_THREADS_VAR is set, use $NMC_NUM_THREADS_VAR
     //   else if NMC_NUM_THREAD set, use it
     //   else if OMP_NUM_THREADS set, use it
     if (auto nthreads_var_name = std::getenv("NMC_NUM_THREADS_VAR")) {
-        nthreads_str = std::getenv(nthreads_var_name);
+        str = std::getenv(nthreads_var_name);
     }
-    else if (! (nthreads_str = std::getenv("NMC_NUM_THREADS"))) {
-        nthreads_str = std::getenv("OMP_NUM_THREADS");
+    else if (! (str = std::getenv("NMC_NUM_THREADS"))) {
+        str = std::getenv("OMP_NUM_THREADS");
     }
 
-    // If the selected var is unset,
-    //   or no var is set,
-    //   error
-    if (! nthreads_str) {
-        terminate("No environmental var defined");
-     }
-
-    // only composed of spaces*digits*space*
-    auto nthreads_str_end{nthreads_str};
-    while (std::isspace(*nthreads_str_end)) {
-        ++nthreads_str_end;
-    }
-    while (std::isdigit(*nthreads_str_end)) {
-        ++nthreads_str_end;
-    }
-    while (std::isspace(*nthreads_str_end)) {
-        ++nthreads_str_end;
-    }
-    if (*nthreads_str_end) {
-        terminate("Num threads is not a single integer");
+    // If the selected var is unset set the number of threads to
+    // the hint given by the standard library
+    if (!str) {
+        unsigned nthreads = nest::mc::threading::count_available_cores();
+        if (nthreads==0u) {
+            terminate(
+                "The number of threads was not set by the user, and I am unable "
+                "to determine a sane default number of threads on this system. "
+                "Use the NMC_NUM_THREADS environment variable to explicitly "
+                "set the number of threads.");
+        }
+        return nthreads;
     }
 
-    // and it's got a single non-zero value
-    auto nthreads{std::atoi(nthreads_str)};
-    if (! nthreads) {
-        terminate("Num threads is not a non-zero number");
+    auto nthreads = std::strtoul(str, nullptr, 10);
+
+    // check that the environment variable string describes a non-negative integer
+    if (nthreads==0 || errno==ERANGE ||
+        !std::regex_match(str, std::regex("\\s*\\d*[1-9]\\d*\\s*")))
+    {
+        terminate("The requested number of threads \""+std::string(str)
+            +"\" is not a reasonable positive integer");
     }
-  
+
     return nthreads;
 }
 
-- 
GitLab