diff --git a/arbor/threading/threading.cpp b/arbor/threading/threading.cpp
index ae251d6e89e5d38c25f7176b1f7ed62a3efc6cc6..552fda90a9d3260b29e14dd93e2f69c556ee4d1a 100644
--- a/arbor/threading/threading.cpp
+++ b/arbor/threading/threading.cpp
@@ -1,49 +1,64 @@
 #include <atomic>
 
-#include "threading.hpp"
+#include <arbor/assert.hpp>
+#include <arbor/util/scope_exit.hpp>
+
+#include "threading/threading.hpp"
 
 using namespace arb::threading::impl;
 using namespace arb::threading;
 using namespace arb;
 
-task notification_queue::try_pop() {
-    task tsk;
+priority_task notification_queue::try_pop(int priority) {
+    arb_assert(priority < (int)q_tasks_.size());
     lock q_lock{q_mutex_, std::try_to_lock};
-    if (q_lock && !q_tasks_.empty()) {
-        tsk = std::move(q_tasks_.front());
-        q_tasks_.pop_front();
+
+    if (q_lock) {
+        auto& q = q_tasks_.at(priority);
+        if (!q.empty()) {
+            priority_task ptsk(std::move(q.front()), priority);
+            q.pop_front();
+            return ptsk;
+        }
     }
-    return tsk;
+
+    return {};
 }
 
-task notification_queue::pop() {
-    task tsk;
+priority_task notification_queue::pop() {
     lock q_lock{q_mutex_};
-    while (q_tasks_.empty() && !quit_) {
+
+    while (empty() && !quit_) {
         q_tasks_available_.wait(q_lock);
     }
-    if (!q_tasks_.empty()) {
-        tsk = std::move(q_tasks_.front());
-        q_tasks_.pop_front();
+    for (int pri = n_priority-1; pri>=0; --pri) {
+        auto& q = q_tasks_.at(pri);
+        if (!q.empty()) {
+            priority_task ptsk{std::move(q.front()), pri};
+            q.pop_front();
+            return ptsk;
+        }
     }
-    return tsk;
+    return {};
 }
 
-bool notification_queue::try_push(task& tsk) {
+bool notification_queue::try_push(priority_task& ptsk) {
+    arb_assert(ptsk.priority < (int)q_tasks_.size());
     {
         lock q_lock{q_mutex_, std::try_to_lock};
         if (!q_lock) return false;
-        q_tasks_.push_back(std::move(tsk));
-        tsk = 0;
+
+        q_tasks_.at(ptsk.priority).push_front(ptsk.release());
     }
     q_tasks_available_.notify_all();
     return true;
 }
 
-void notification_queue::push(task&& tsk) {
+void notification_queue::push(priority_task&& ptsk) {
+    arb_assert(ptsk.priority < (int)q_tasks_.size());
     {
         lock q_lock{q_mutex_};
-        q_tasks_.push_back(std::move(tsk));
+        q_tasks_.at(ptsk.priority).push_front(ptsk.release());
     }
     q_tasks_available_.notify_all();
 }
@@ -56,31 +71,63 @@ void notification_queue::quit() {
     q_tasks_available_.notify_all();
 }
 
-void task_system::run_tasks_loop(int i){
+bool notification_queue::empty() {
+    for(const auto& q: q_tasks_) {
+        if (!q.empty()) return false;
+    }
+    return true;
+}
+
+void task_system::run(priority_task ptsk) {
+    arb_assert(ptsk);
+    auto guard = util::on_scope_exit([pri = current_task_priority_] { current_task_priority_ = pri; });
+
+    current_task_priority_ = ptsk.priority;
+    ptsk.run();
+}
+
+void task_system::run_tasks_loop(int i) {
+    auto guard = util::on_scope_exit([] { current_task_queue_ = -1; });
+    current_task_queue_ = i;
+
     while (true) {
-        task tsk;
-        for (unsigned n = 0; n != count_; n++) {
-            tsk = q_[(i + n) % count_].try_pop();
-            if (tsk) break;
+        priority_task ptsk;
+        // Loop over the levels of priority starting from highest to lowest
+        for (int pri = n_priority-1; pri>=0; --pri) {
+            // Loop over the threads trying to pop a task of the requested priority.
+            for (unsigned n = 0; n<count_; ++n) {
+                ptsk = q_[(i + n) % count_].try_pop(pri);
+                if (ptsk) break;
+            }
+            if (ptsk) break;
         }
-        if (!tsk) tsk = q_[i].pop();
-        if (!tsk) break;
-        tsk();
+        // If no task can be acquired, fall back to a blocking pop from this thread's own queue.
+        if (!ptsk) ptsk = q_[i].pop();
+        if (!ptsk) break;
+
+        run(std::move(ptsk));
     }
 }
 
-void task_system::try_run_task() {
-    auto nthreads = get_num_threads();
-    task tsk;
-    for (int n = 0; n != nthreads; n++) {
-        tsk = q_[n % nthreads].try_pop();
-        if (tsk) {
-            tsk();
-            break;
+void task_system::try_run_task(int lowest_priority) {
+    unsigned i = (current_task_queue_+1==0)? 0: current_task_queue_;
+    arb_assert(i<count_);
+
+    // Loop over the levels of priority starting from highest to lowest_priority
+    for (int pri = n_priority-1; pri>=lowest_priority; --pri) {
+        // Loop over the threads trying to pop a task of the requested priority.
+        for (unsigned n = 0; n != count_; n++) {
+            if (auto ptsk = q_[(i + n) % count_].try_pop(pri)) {
+                run(std::move(ptsk));
+                return;
+            }
         }
     }
 }
 
+thread_local int task_system::current_task_priority_ = -1;
+thread_local unsigned task_system::current_task_queue_ = -1;
+
 // Default construct with one thread.
 task_system::task_system(): task_system(1) {}
 
@@ -88,9 +135,14 @@ task_system::task_system(int nthreads): count_(nthreads), q_(nthreads) {
     if (nthreads <= 0)
         throw std::runtime_error("Non-positive number of threads in thread pool");
 
+    // Reset the per-priority round-robin counters used to distribute tasks across queues.
+    for (unsigned p = 0; p<n_priority; ++p) {
+        index_[p] = 0;
+    }
+
     // Main thread
     auto tid = std::this_thread::get_id();
     thread_ids_[tid] = 0;
+    current_task_queue_ = 0;
 
     for (unsigned i = 1; i < count_; i++) {
         threads_.emplace_back([this, i]{run_tasks_loop(i);});
@@ -100,21 +152,25 @@ task_system::task_system(int nthreads): count_(nthreads), q_(nthreads) {
 }
 
 task_system::~task_system() {
+    current_task_priority_ = -1;
+    current_task_queue_ = -1;
     for (auto& e: q_) e.quit();
     for (auto& e: threads_) e.join();
 }
 
-void task_system::async(task tsk) {
-    auto i = index_++;
-
-    for (unsigned n = 0; n != count_; n++) {
-        if (q_[(i + n) % count_].try_push(tsk)) return;
+void task_system::async(priority_task ptsk) {
+    if (ptsk.priority>=n_priority) {
+        run(std::move(ptsk));
     }
-    q_[i % count_].push(std::move(tsk));
-}
+    else {
+        arb_assert(ptsk.priority < (int)index_.size());
+        auto i = index_[ptsk.priority]++;
 
-int task_system::get_num_threads() const {
-    return threads_.size() + 1;
+        for (unsigned n = 0; n != count_; n++) {
+            if (q_[(i + n) % count_].try_push(ptsk)) return;
+        }
+        q_[i % count_].push(std::move(ptsk));
+    }
 }
 
 std::unordered_map<std::thread::id, std::size_t> task_system::get_thread_ids() const {
diff --git a/arbor/threading/threading.hpp b/arbor/threading/threading.hpp
index 9a3d94c5b25a44fce9324bc79a677cf6cc1592bb..ad2b1d74a6d5e395319656e7e466d4e45ac1ab7f 100644
--- a/arbor/threading/threading.hpp
+++ b/arbor/threading/threading.hpp
@@ -17,79 +17,200 @@
 namespace arb {
 namespace threading {
 
-// Forward declare task_group at bottom of this header
-class task_group;
-
 using std::mutex;
 using lock = std::unique_lock<mutex>;
 using std::condition_variable;
 using task = std::function<void()>;
 
+// Tasks with priority higher than max_async_task_priority will be run synchronously.
+constexpr int max_async_task_priority = 1;
+
+// Wrap task and priority; provide move/release/reset operations and reset on run()
+// to help ensure no wrapped task is run twice.
+struct priority_task {
+    task t;
+    int priority = -1;
+
+    priority_task() = default;
+    priority_task(task&& t, int priority): t(std::move(t)), priority(priority) {}
+
+    priority_task(priority_task&& other) noexcept {
+        std::swap(t, other.t);
+        priority = other.priority;
+    }
+
+    priority_task& operator=(priority_task&& other) noexcept {
+        reset();
+        std::swap(t, other.t);
+        priority = other.priority;
+        return *this;
+    }
+
+    priority_task(const priority_task&) = delete;
+    priority_task& operator=(const priority_task&) = delete;
+
+    explicit operator bool() const noexcept { return static_cast<bool>(t); }
+
+    void run() {
+        release()();
+    }
+
+    task release() {
+        task u = std::move(t);
+        reset();
+        return u;
+    }
+
+    void reset() noexcept {
+        t = nullptr;
+    }
+};
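+
+// Illustrative sketch of intended use (hypothetical snippet; in practice priority_task
+// objects are constructed inside task_system rather than by user code):
+//
+//     priority_task p{[]{ /* work */ }, 0};
+//     if (p) p.run();   // runs the wrapped task and leaves p empty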
+
 namespace impl {
+
 class notification_queue {
+    // Number of priority levels in notification queues.
+    static constexpr int n_priority = max_async_task_priority+1;
+
+public:
+    // Tries to acquire the lock to get a task of the requested priority.
+    // If the lock cannot be acquired, returns an empty task. If the lock is
+    // acquired and the deque holding tasks of the requested priority is not
+    // empty, pops a task from that deque and returns it; otherwise returns
+    // an empty task.
+    priority_task try_pop(int priority);
+
+    // Acquires the lock and pops a task from the highest-priority deque
+    // that is not empty. If all deques are empty, waits for a task to be
+    // enqueued; if another thread pops the newly enqueued task first, it
+    // resumes waiting. Returns an empty task only once quit_ is set and
+    // all the deques are empty.
+    priority_task pop();
+
+    // Acquires the lock and pushes the task into the deque containing
+    // tasks of the same priority, then notifies the condition variable to
+    // awaken waiting threads.
+    void push(priority_task&&);
+
+    // Tries to acquire the lock: if successful, moves the task onto the
+    // deque matching its priority (leaving the argument empty), notifies
+    // the condition variable to wake waiting threads, and returns true.
+    // If the lock cannot be acquired, returns false.
+    bool try_push(priority_task&);
+
+    // Finish popping all waiting tasks on the queue, then stop trying to
+    // pop new tasks.
+    void quit();
+
+    // Check whether the deques are all empty.
+    bool empty();
+
 private:
-    // FIFO of pending tasks.
-    std::deque<task> q_tasks_;
+    // Deques of pending tasks. Each deque contains tasks of a single priority;
+    // q_tasks_[i+1] has higher priority than q_tasks_[i].
+    std::array<std::deque<task>, n_priority> q_tasks_;
 
-    // Lock and signal on task availability change this is the crucial bit.
+    // Lock and signal on task availability change. This is the crucial bit.
     mutex q_mutex_;
     condition_variable q_tasks_available_;
 
     // Flag to handle exit from all threads.
     bool quit_ = false;
-
-public:
-    // Pops a task from the task queue returns false when queue is empty.
-    task try_pop();
-    task pop();
-
-    // Pushes a task into the task queue and increases task group counter.
-    void push(task&& tsk); // TODO: need to use value?
-    bool try_push(task& tsk);
-
-    // Finish popping all waiting tasks on queue then stop trying to pop new tasks
-    void quit();
 };
+
 }// namespace impl
 
 class task_system {
 private:
+    // Number of notification queues.
     unsigned count_;
 
+    // Worker threads.
     std::vector<std::thread> threads_;
 
-    // queue of tasks
+    // Thread-local storage: encodes the priority of the task currently
+    // being executed by the thread. Initialized to -1 and reset to -1 in
+    // the destructor; a value of -1 means the thread is not running a
+    // task system task.
+    static thread_local int current_task_priority_;
+
+    // Queue index for the running thread, if any.
+    // A value of -1 indicates that the executing thread is not one of the
+    // threads in threads_.
+    static thread_local unsigned current_task_queue_;
+
+    // Number of priority levels in notification queues.
+    static constexpr int n_priority = max_async_task_priority+1;
+
+    // Notification queues containing n_priority deques representing
+    // different priority levels.
     std::vector<impl::notification_queue> q_;
 
-    // threads -> index
+    // Map from thread id to index in the vector of threads.
     std::unordered_map<std::thread::id, std::size_t> thread_ids_;
 
-    // total number of tasks pushed in all queues
-    std::atomic<unsigned> index_{0};
+    // Total number of tasks pushed at each priority level.
+    // Used to choose which notification queue a task is enqueued onto,
+    // balancing the workload among the queues.
+    std::array<std::atomic<unsigned>, n_priority> index_;
 
 public:
+    // Create zero new threads. The only worker thread is the main thread.
     task_system();
-    // Create nthreads-1 new c std threads
+
+    // Create nthreads-1 new std::threads running run_tasks_loop(tid).
     task_system(int nthreads);
 
-    // task_system is a singleton.
     task_system(const task_system&) = delete;
     task_system& operator=(const task_system&) = delete;
 
+    // Quits the notification queues, joins the threads, and resets
+    // current_task_priority_ and current_task_queue_.
+    // Won't wait for the existing tasks in the notification queues to be executed.
     ~task_system();
 
-    // Pushes tasks into notification queue.
-    void async(task tsk);
-
-    // Runs tasks until quit is true.
+    // Public interface: run the task asynchronously if its priority is at most
+    // max_async_task_priority; otherwise equivalent to task_system::run(priority_task) below.
+    //
+    // An asynchronous task is pushed onto a notification queue, on the deque matching its
+    // priority. The push is first attempted on all notification queues in round-robin order,
+    // starting from the queue selected by index_[priority]; if no try_push succeeds, the
+    // task is force-pushed onto that starting queue.
+    void async(priority_task ptsk);
+
+    // Public interface: run task synchronously with current task priority set.
+    void run(priority_task ptsk);
+
+    // Convenience interfaces with priority parameter:
+    void async(task t, int priority) { async({std::move(t), priority}); }
+    void run(task t, int priority) { run({std::move(t), priority}); }
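+
+    // Illustrative sketch of the public interface (hypothetical lambdas):
+    //
+    //     task_system ts(4);
+    //     ts.async([]{ /* background work */ }, 0);  // queued; runs on some pool thread
+    //     ts.run([]{ /* immediate work */ }, 0);     // runs synchronously on this thread
+    //     ts.async([]{ /* also immediate */ }, max_async_task_priority+1);  // priority too high to queue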
+
+    // The main function that all worker std::threads execute.
+    // It tries to acquire a task of the highest available priority from all of the
+    // notification queues. If unsuccessful, it force-pops a task from the thread's
+    // personal queue with a blocking pop, again scanning from highest to lowest priority.
+    // Note on stack overflow: the force pop may look problematic when the personal queue
+    // holds only low priority tasks that spawn higher priority tasks which land on other
+    // queues; the thread could then seem doomed to keep executing low priority tasks while
+    // it waits for the higher priority ones to finish, overflowing the stack. The key
+    // point is that while the thread waits for other tasks to finish, it is not executing
+    // run_tasks_loop but the task_group::wait() loop, which never uses pop, only try_pop.
+    // `i` is the thread index, used to select the thread's personal notification queue.
     void run_tasks_loop(int i);
 
-    // Request that the task_system attempts to find and run a _single_ task.
-    // Will return without executing a task if no tasks available.
-    void try_run_task();
+    // Public interface: try to dequeue and run a single task with at least the
+    // requested priority level. Will return without executing a task if no tasks
+    // are available or if the lock can't be acquired.
+    //
+    // Will start with the queue corresponding to the calling thread, if one exists.
+    void try_run_task(int lowest_priority);
+
+    // Number of threads in the pool, including the master thread.
+    // Equivalently, the number of notification queues.
+    int get_num_threads() const { return (int)count_; }
 
-    // Includes master thread.
-    int get_num_threads() const;
+    static int get_task_priority() { return current_task_priority_; }
 
     // Returns the thread_id map
     std::unordered_map<std::thread::id, std::size_t> get_thread_ids() const;
@@ -97,6 +218,18 @@ public:
 
 class task_group {
 private:
+    // For tracking exceptions raised inside the task_system.
+    // If multiple tasks raise exceptions, any one of them may be
+    // saved and returned. Once an exception has been raised, the
+    // remaining tasks don't need to be executed, but it is okay if
+    // a few run anyway.
+    // For the reset to work correctly using the relaxed memory order,
+    // it is necessary that both task_group::run and task_group::wait
+    // are synchronization points, which they are because they require
+    // mutex acquisition. The reason behind this requirement is that
+    // exception_state::reset is called at the end of task_group::wait,
+    // and we need it to actually reset exception_state::error_ before
+    // we start running any new tasks in the group.
     struct exception_state {
         std::atomic<bool> error_{false};
         std::exception_ptr exception_;
@@ -123,10 +256,10 @@ private:
         }
     };
 
+    // Number of tasks that are queued but not yet executed.
     std::atomic<std::size_t> in_flight_{0};
 
-    // Set by run(), cleared by wait(). Used to check task completion status
-    // in destructor.
+    // Set by run(), cleared by wait(). Used to check task completion status in destructor.
     bool running_ = false;
 
     // We use a raw pointer here instead of a shared_ptr to avoid a race condition
@@ -149,7 +282,7 @@ public:
         exception_state& exception_status_;
 
     public:
-        // Construct from a compatible function and atomic counter
+        // Construct from a compatible function, atomic counter, and exception_state.
         template <typename F2>
         explicit wrap(F2&& other, std::atomic<std::size_t>& c, exception_state& ex):
                 f_(std::forward<F2>(other)),
@@ -171,8 +304,10 @@ public:
                 exception_status_(other.exception_status_)
         {}
 
+        // This is where tasks of the task_group are actually executed.
         void operator()() {
             if (!exception_status_) {
+                // Execute the task. Save exceptions if they occur.
                 try {
                     f_();
                 }
@@ -180,6 +315,7 @@ public:
                     exception_status_.set(std::current_exception());
                 }
             }
+            // Decrement the atomic counter of in-flight tasks in the task_group.
             --counter_;
         }
     };
@@ -192,17 +328,38 @@ public:
         return wrap<callable<F>>(std::forward<F>(f), c, ex);
     }
 
+    // Adds a new task to be executed in the task_group.
+    // Uses a priority one higher than that of the task currently running
+    // on this thread, if any, so that tasks in nested task groups are
+    // completed before any peers of the parent task. Returns this
+    // priority.
+    template<typename F>
+    int run(F&& f) {
+        int priority = task_system::get_task_priority()+1;
+        run(std::forward<F>(f), priority);
+        return priority;
+    }
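+
+    // Illustrative sketch of the nesting effect (hypothetical work; the priorities shown
+    // assume the outer run() is issued from outside any task system task):
+    //
+    //     task_group outer(&ts);
+    //     outer.run([&]{                   // enqueued with priority 0
+    //         task_group inner(&ts);
+    //         inner.run([]{ /* ... */ });  // enqueued with priority 1
+    //         inner.wait();                // only steals tasks of priority >= 1
+    //     });
+    //     outer.wait();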
+
+    // Adds a new task with a given priority to be executed.
     template<typename F>
-    void run(F&& f) {
+    void run(F&& f, int priority) {
         running_ = true;
         ++in_flight_;
-        task_system_->async(make_wrapped_function(std::forward<F>(f), in_flight_, exception_status_));
+        task_system_->async(priority_task{make_wrapped_function(std::forward<F>(f), in_flight_, exception_status_), priority});
     }
 
     // Wait till all tasks in this group are done.
+    // While waiting the thread will participate in executing the tasks.
+    // It's necessary that the waiting thread participate in execution:
+    // otherwise, due to nested parallelism, all the threads could become
+    // stuck waiting forever, while no new tasks get executed.
+    // To shorten waiting time, and reduce the chances of stack overflow,
+    // the waiting thread can only execute tasks with a higher priority
+    // than the task it is currently running.
     void wait() {
+        int lowest_priority = task_system::get_task_priority()+1;
         while (in_flight_) {
-            task_system_->try_run_task();
+            task_system_->try_run_task(lowest_priority);
         }
         running_ = false;
 
@@ -220,14 +377,26 @@ public:
 // algorithms
 ///////////////////////////////////////////////////////////////////////
 struct parallel_for {
+    // Creates a task group, enqueues tasks, and waits for their completion.
+    // If a batch size is not specified, a default batch size of 1 is used
+    // (see the overload below).
     template <typename F>
-    static void apply(int left, int right, task_system* ts, F f) {
+    static void apply(int left, int right, int batch_size, task_system* ts, F f) {
         task_group g(ts);
-        for (int i = left; i < right; ++i) {
-          g.run([=] {f(i);});
+        for (int i = left; i < right; i += batch_size) {
+            g.run([=] {
+                int r = i + batch_size < right ? i + batch_size : right;
+                for (int j = i; j < r; ++j) {
+                    f(j);
+                }
+            });
         }
         g.wait();
     }
+
+    template <typename F>
+    static void apply(int left, int right, task_system* ts, F f) {
+        apply(left, right, 1, ts, std::move(f));
+    }
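+
+    // Illustrative sketch of batched use (hypothetical data): each enqueued task covers
+    // up to batch_size consecutive indices.
+    //
+    //     std::vector<int> v(n);
+    //     parallel_for::apply(0, n, 128, &ts, [&](int i) { v[i] = i; });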
 };
 } // namespace threading
 
diff --git a/test/ubench/task_system.cpp b/test/ubench/task_system.cpp
index 873b16ea96b1fb87972712ea5844e5850035d650..dac7a094fd02f9e1de16e18a9ea235d0748dcf17 100644
--- a/test/ubench/task_system.cpp
+++ b/test/ubench/task_system.cpp
@@ -21,12 +21,31 @@ void run(unsigned long us_per_task, unsigned tasks, threading::task_system* ts)
             [&](unsigned i){std::this_thread::sleep_for(duration);});
 }
 
+void run_nested(unsigned long us_per_task, unsigned tasks, threading::task_system* ts) {
+    auto duration = std::chrono::microseconds(us_per_task);
+    arb::threading::parallel_for::apply(0, tasks, ts, [&](unsigned i) {
+        arb::threading::parallel_for::apply(0, 1, ts, [&](unsigned j) {
+            std::this_thread::sleep_for(duration);});});
+}
+
+void task_test_nested(benchmark::State& state) {
+    const unsigned us_per_task = state.range(0);
+    arb::threading::task_system ts;
+    const auto nthreads = ts.get_num_threads();
+    const unsigned total_us = 1000000;
+    const unsigned num_tasks = nthreads*total_us/us_per_task;
+
+    while (state.KeepRunning()) {
+        run_nested(us_per_task, num_tasks, &ts);
+    }
+}
+
 void task_test(benchmark::State& state) {
     const unsigned us_per_task = state.range(0);
     arb::threading::task_system ts;
     const auto nthreads = ts.get_num_threads();
-    const unsigned us_per_s = 1000000;
-    const unsigned num_tasks = nthreads*us_per_s/us_per_task;
+    const unsigned total_us = 1000000;
+    const unsigned num_tasks = nthreads*total_us/us_per_task;
 
     while (state.KeepRunning()) {
         run(us_per_task, num_tasks, &ts);
@@ -34,10 +53,11 @@ void task_test(benchmark::State& state) {
 }
 
 void us_per_task(benchmark::internal::Benchmark *b) {
-    for (auto ncomps: {100, 250, 500, 1000, 10000}) {
-        b->Args({ncomps});
+    for (auto us_per_task: {10, 100, 250, 500, 1000, 10000}) {
+        b->Args({us_per_task});
     }
 }
 
 BENCHMARK(task_test)->Apply(us_per_task);
+BENCHMARK(task_test_nested)->Apply(us_per_task);
 BENCHMARK_MAIN();
diff --git a/test/unit/test_thread.cpp b/test/unit/test_thread.cpp
index e1f0b2ed1117244ba919b349976191acd8fc908b..49d46d0539c479cee3f8f8357e42141651fcb049 100644
--- a/test/unit/test_thread.cpp
+++ b/test/unit/test_thread.cpp
@@ -38,11 +38,13 @@ struct ftor {
 };
 
 struct ftor_wait {
+    int duration_us = 100;
 
     ftor_wait() {}
+    ftor_wait(int us): duration_us(us) {}
 
     void operator()() const {
-        auto duration = std::chrono::microseconds(100);
+        auto duration = std::chrono::microseconds(duration_us);
         std::this_thread::sleep_for(duration);
     }
 };
@@ -63,34 +65,38 @@ struct ftor_parallel_wait {
 }
 
 TEST(task_system, test_copy) {
-    task_system ts;
+    for (int nthreads = 1; nthreads < 20; nthreads*=2) {
+        task_system ts(nthreads);
 
-    ftor f;
-    ts.async(f);
+        ftor f;
+        ts.async(f, 0);
 
-    // Copy into new ftor and move ftor into a task (std::function<void()>)
-    EXPECT_EQ(1, nmove);
-    EXPECT_EQ(1, ncopy);
-    reset();
+        // Copy into new ftor and move ftor into a task (std::function<void()>)
+        EXPECT_EQ(1, nmove);
+        EXPECT_EQ(1, ncopy);
+        reset();
+    }
 }
 
 TEST(task_system, test_move) {
-    task_system ts;
+    for (int nthreads = 1; nthreads < 20; nthreads*=2) {
+        task_system ts(nthreads);
 
-    ftor f;
-    ts.async(std::move(f));
+        ftor f;
+        ts.async(std::move(f), 0);
 
-    // Move into new ftor and move ftor into a task (std::function<void()>)
-    EXPECT_LE(nmove, 2);
-    EXPECT_LE(ncopy, 1);
-    reset();
+        // Move into new ftor and move ftor into a task (std::function<void()>)
+        EXPECT_LE(nmove, 2);
+        EXPECT_LE(ncopy, 1);
+        reset();
+    }
 }
 
 TEST(notification_queue, test_copy) {
     notification_queue q;
 
     ftor f;
-    q.push(f);
+    q.push({task(f), 0});
 
     // Copy into new ftor and move ftor into a task (std::function<void()>)
     EXPECT_EQ(1, nmove);
@@ -104,113 +110,297 @@ TEST(notification_queue, test_move) {
     ftor f;
 
     // Move into new ftor and move ftor into a task (std::function<void()>)
-    q.push(std::move(f));
+    q.push({task(std::move(f)), 1});
     EXPECT_LE(nmove, 2);
     EXPECT_LE(ncopy, 1);
     reset();
 }
 
 TEST(task_group, test_copy) {
-    task_system ts;
-    task_group g(&ts);
+    for (int nthreads = 1; nthreads < 20; nthreads*=2) {
+        task_system ts(nthreads);
+        task_group g(&ts);
 
-    ftor f;
-    g.run(f);
-    g.wait();
+        ftor f;
+        g.run(f);
+        g.wait();
 
-    // Copy into "wrap" and move wrap into a task (std::function<void()>)
-    EXPECT_EQ(1, nmove);
-    EXPECT_EQ(1, ncopy);
-    reset();
+        // Copy into "wrap" and move wrap into a task (std::function<void()>)
+        EXPECT_EQ(1, nmove);
+        EXPECT_EQ(1, ncopy);
+        reset();
+    }
 }
 
 TEST(task_group, test_move) {
-    task_system ts;
-    task_group g(&ts);
-
-    ftor f;
-    g.run(std::move(f));
-    g.wait();
-
-    // Move into wrap and move wrap into a task (std::function<void()>)
-    EXPECT_LE(nmove, 2);
-    EXPECT_LE(ncopy, 1);
-    reset();
+    for (int nthreads = 1; nthreads < 20; nthreads*=2) {
+        task_system ts(nthreads);
+        task_group g(&ts);
+
+        ftor f;
+        g.run(std::move(f));
+        g.wait();
+
+        // Move into wrap and move wrap into a task (std::function<void()>)
+        EXPECT_LE(nmove, 2);
+        EXPECT_LE(ncopy, 1);
+        reset();
+    }
 }
 
 TEST(task_group, individual_tasks) {
     // Simple check for deadlock
-    task_system ts;
-    task_group g(&ts);
-
-    auto nthreads = ts.get_num_threads();
+    for (int nthreads = 1; nthreads < 20; nthreads*=2) {
+        task_system ts(nthreads);
+        task_group g(&ts);
 
-    ftor_wait f;
-    for (int i = 0; i < 32 * nthreads; i++) {
-        g.run(f);
+        ftor_wait f;
+        for (int i = 0; i < 32 * nthreads; i++) {
+            g.run(f);
+        }
+        g.wait();
     }
-    g.wait();
 }
 
 TEST(task_group, parallel_for_sleep) {
     // Simple check for deadlock for nested parallelism
-    task_system ts;
-    auto nthreads = ts.get_num_threads();
-    task_group g(&ts);
+    for (int nthreads = 1; nthreads < 20; nthreads*=2) {
+        task_system ts(nthreads);
+        task_group g(&ts);
 
-    ftor_parallel_wait f(&ts);
-    for (int i = 0; i < nthreads; i++) {
-        g.run(f);
+        ftor_parallel_wait f(&ts);
+        for (int i = 0; i < nthreads; i++) {
+            g.run(f);
+        }
+        g.wait();
     }
-    g.wait();
 }
 
 TEST(task_group, parallel_for) {
-    task_system ts;
-    for (int n = 0; n < 10000; n=!n?1:2*n) {
-        std::vector<int> v(n, -1);
-        parallel_for::apply(0, n, &ts, [&](int i) {v[i] = i;});
-        for (int i = 0; i< n; i++) {
-            EXPECT_EQ(i, v[i]);
+    for (int nthreads = 1; nthreads < 20; nthreads*=2) {
+        task_system ts(nthreads);
+        for (int n = 0; n < 10000; n = !n ? 1 : 2 * n) {
+            std::vector<int> v(n, -1);
+            parallel_for::apply(0, n, &ts, [&](int i) { v[i] = i; });
+            for (int i = 0; i < n; i++) {
+                EXPECT_EQ(i, v[i]);
+            }
+        }
+    }
+}
+
+
+TEST(task_group, manual_nested_parallel_for) {
+    // Check for deadlock or stack overflow
+    const int ntasks = 100000;
+    {
+        for (int nthreads = 1; nthreads < 20; nthreads *= 4) {
+            std::vector<int> v(ntasks);
+            task_system ts(nthreads);
+
+            auto nested = [&](int j) {
+              task_group g1(&ts);
+              g1.run([&](){v[j] = j;});
+              g1.wait();
+            };
+
+            task_group g0(&ts);
+            for (int i = 0; i < ntasks; i++) {
+                g0.run([=](){nested(i);});
+            }
+            g0.wait();
+            for (int i = 0; i < ntasks; i++) {
+                EXPECT_EQ(i, v[i]);
+            }
+        }
+    }
+    {
+        for (int nthreads = 1; nthreads < 20; nthreads *= 4) {
+            std::vector<int> v(ntasks);
+            task_system ts(nthreads);
+
+            auto double_nested = [&](int i) {
+              task_group g2(&ts);
+              g2.run([&](){v[i] = i;});
+              g2.wait();
+            };
+
+            auto nested = [&](int i) {
+              task_group g1(&ts);
+              g1.run([=](){double_nested(i);});
+              g1.wait();
+            };
+
+            task_group g0(&ts);
+            for (int i = 0; i < ntasks; i++) {
+                g0.run([=](){nested(i);});
+            }
+            g0.wait();
+            for (int i = 0; i < ntasks; i++) {
+                EXPECT_EQ(i, v[i]);
+            }
         }
     }
 }
 
 TEST(task_group, nested_parallel_for) {
-    task_system ts;
-    for (int m = 1; m < 512; m*=2) {
-        for (int n = 0; n < 1000; n=!n?1:2*n) {
-            std::vector<std::vector<int>> v(n, std::vector<int>(m, -1));
-            parallel_for::apply(0, n, &ts, [&](int i) {
-                auto &w = v[i];
-                parallel_for::apply(0, m, &ts, [&](int j) { w[j] = i + j; });
-            });
-            for (int i = 0; i < n; i++) {
-                for (int j = 0; j < m; j++) {
-                    EXPECT_EQ(i + j, v[i][j]);
+    for (int nthreads = 1; nthreads < 20; nthreads*=2) {
+        task_system ts(nthreads);
+        for (int m = 1; m < 512; m *= 2) {
+            for (int n = 0; n < 1000; n = !n ? 1 : 2 * n) {
+                std::vector<std::vector<int>> v(n, std::vector<int>(m, -1));
+                parallel_for::apply(0, n, &ts, [&](int i) {
+                    auto& w = v[i];
+                    parallel_for::apply(0, m, &ts, [&](int j) { w[j] = i + j; });
+                });
+                for (int i = 0; i < n; i++) {
+                    for (int j = 0; j < m; j++) {
+                        EXPECT_EQ(i + j, v[i][j]);
+                    }
                 }
             }
         }
     }
 }
 
-TEST(enumerable_thread_specific, test) {
-    task_system_handle ts = task_system_handle(new task_system);
-    enumerable_thread_specific<int> buffers(ts);
-    task_group g(ts.get());
-
-    for (int i = 0; i < 100000; i++) {
-        g.run([&](){
-            auto& buf = buffers.local();
-            buf++;
-        });
+TEST(task_group, multi_nested_parallel_for) {
+    for (int nthreads = 1; nthreads < 20; nthreads*=2) {
+        task_system ts(nthreads);
+        for (int m = 1; m < 512; m *= 2) {
+            for (int n = 0; n < 128; n = !n ? 1 : 2 * n) {
+                for (int p = 0; p < 16; p = !p ? 1 : 2 * p) {
+                    std::vector<std::vector<std::vector<int>>> v(n, std::vector<std::vector<int>>(m, std::vector<int>(p, -1)));
+                    parallel_for::apply(0, n, &ts, [&](int i) {
+                        auto& w = v[i];
+                        parallel_for::apply(0, m, &ts, [&](int j) {
+                            auto& u = w[j];
+                            parallel_for::apply(0, p, &ts, [&](int k) {
+                                u[k] = i + j + k;
+                            });
+                        });
+                    });
+                    for (int i = 0; i < n; i++) {
+                        for (int j = 0; j < m; j++) {
+                            for (int k = 0; k < p; k++) {
+                                EXPECT_EQ(i + j + k, v[i][j][k]);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+TEST(task_group, nested_parallel_for_unbalanced) {
+    // Top level parallel for has many more tasks than lower level
+    const int ntasks = 100000;
+    {
+        // Default batching
+        for (int nthreads = 1; nthreads < 20; nthreads *= 4) {
+            task_system ts(nthreads);
+            std::vector<int> v(ntasks);
+            parallel_for::apply(0, ntasks, &ts, [&](int i) {
+                parallel_for::apply(0, 1, &ts, [&](int j) { v[i] = i; });
+            });
+            for (int i = 0; i < ntasks; i++) {
+                EXPECT_EQ(i, v[i]);
+            }
+        }
+        // 128 tasks per batch
+        const int batch_size = 128;
+        for (int nthreads = 1; nthreads < 20; nthreads *= 4) {
+            task_system ts(nthreads);
+            std::vector<int> v(ntasks);
+            parallel_for::apply(0, ntasks, batch_size, &ts, [&](int i) {
+              parallel_for::apply(0, 1, batch_size, &ts, [&](int j) { v[i] = i; });
+            });
+            for (int i = 0; i < ntasks; i++) {
+                EXPECT_EQ(i, v[i]);
+            }
+        }
+    }
+    // lower level parallel for has many more tasks than top level
+    {
+        // Default batching
+        for (int nthreads = 1; nthreads < 20; nthreads *= 4) {
+            task_system ts(nthreads);
+            std::vector<int> v(ntasks);
+            parallel_for::apply(0, 1, &ts, [&](int i) {
+                parallel_for::apply(0, ntasks, &ts, [&](int j) { v[j] = j; });
+            });
+            for (int i = 0; i < ntasks; i++) {
+                EXPECT_EQ(i, v[i]);
+            }
+        }
+        // 128 tasks per batch
+        const int batch_size = 128;
+        for (int nthreads = 1; nthreads < 20; nthreads *= 4) {
+            task_system ts(nthreads);
+            std::vector<int> v(ntasks);
+            parallel_for::apply(0, 1, batch_size, &ts, [&](int i) {
+                parallel_for::apply(0, ntasks, batch_size, &ts, [&](int j) { v[j] = j; });
+            });
+            for (int i = 0; i < ntasks; i++) {
+                EXPECT_EQ(i, v[i]);
+            }
+        }
     }
-    g.wait();
+}
 
-    int sum = 0;
-    for (auto b: buffers) {
-        sum += b;
+TEST(task_group, multi_nested_parallel_for_unbalanced) {
+    // Top level parallel for has many more tasks than lower level
+    const int ntasks = 100000;
+    for (int nthreads = 1; nthreads < 20; nthreads*=4) {
+        task_system ts(nthreads);
+        std::vector<int> v(ntasks);
+        parallel_for::apply(0, ntasks, &ts, [&](int i) {
+            parallel_for::apply(0, 1, &ts, [&](int j) {
+                parallel_for::apply(0, 1, &ts, [&](int k) {
+                    v[i] = i;
+                });
+            });
+        });
+        for (int i = 0; i < ntasks; i++) {
+            EXPECT_EQ(i, v[i]);
+        }
+    }
+    // lower level parallel for has many more tasks than top level
+    for (int nthreads = 1; nthreads < 20; nthreads*=4) {
+        task_system ts(nthreads);
+        std::vector<int> v(ntasks);
+        parallel_for::apply(0, 1, &ts, [&](int i) {
+            parallel_for::apply(0, 1, &ts, [&](int j) {
+                parallel_for::apply(0, ntasks, &ts, [&](int k) {
+                    v[k] = k;
+                });
+            });
+        });
+        for (int i = 0; i < ntasks; i++) {
+            EXPECT_EQ(i, v[i]);
+        }
     }
+}
 
-    EXPECT_EQ(100000, sum);
+TEST(enumerable_thread_specific, test) {
+    for (int nthreads = 1; nthreads < 20; nthreads*=2) {
+        task_system_handle ts = task_system_handle(new task_system(nthreads));
+        enumerable_thread_specific<int> buffers(ts);
+        task_group g(ts.get());
+
+        for (int i = 0; i < 100000; i++) {
+            g.run([&]() {
+                auto& buf = buffers.local();
+                buf++;
+            });
+        }
+        g.wait();
+
+        int sum = 0;
+        for (auto b: buffers) {
+            sum += b;
+        }
+
+        EXPECT_EQ(100000, sum);
+    }
 }