Commit f01e2147 authored by Ben Cumming, committed by Sam Yates

Fix that ensures all spikes are saved to disk (#195)

* Add an additional exchange at the end of the time stepping loop to ensure that any spikes still in spike buffers are exchanged and saved to disk.
* Remove the parallel sort from the cthread back end to fix compilation errors with CUDA and cthreads.
* Fix `nvcc` warning concerning initialization of `std::size_t` variable with literal `-1`.
parent 9df68703
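The root cause of #195 is a double-buffering pitfall: cells append spikes to a "current" buffer while the exchange task drains the "previous" one, so the spikes of the final integration interval are still in flight when the loop exits. A minimal standalone sketch of the pattern and the fix follows; the `spike_double_buffer` type and `export_spikes` function are illustrative stand-ins, not the library's API:

```cpp
#include <iostream>
#include <utility>
#include <vector>

// Illustrative stand-in for the library's double-buffered spike store:
// cells append to `current` while the exchange task drains `previous`.
struct spike_double_buffer {
    std::vector<int> current, previous;
    void exchange() { std::swap(current, previous); }
};

void export_spikes(const std::vector<int>& spikes) {
    for (int s: spikes) std::cout << "spike " << s << "\n";
}

int main() {
    spike_double_buffer buf;
    for (int interval = 0; interval < 3; ++interval) {
        buf.exchange();                  // spikes of interval k-1 -> previous
        buf.current.clear();
        buf.current.push_back(interval); // cells generate new spikes
        export_spikes(buf.previous);     // exchange/output the previous interval
    }
    // Without this final exchange, the spikes of the last interval would
    // still be sitting in `current` and never reach the output file.
    buf.exchange();
    export_spikes(buf.previous);
}
```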
@@ -276,7 +276,7 @@ void write_trace_json(const sample_trace_type& trace, const std::string& prefix)
 void report_compartment_stats(const recipe& rec) {
     std::size_t ncell = rec.num_cells();
     std::size_t ncomp_total = 0;
-    std::size_t ncomp_min = -1;
+    std::size_t ncomp_min = std::numeric_limits<std::size_t>::max();
     std::size_t ncomp_max = 0;
 
     for (std::size_t i = 0; i<ncell; ++i) {
......
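This hunk is the `nvcc` fix from the commit message: initializing an unsigned `std::size_t` with the literal `-1` is well-defined (the value wraps to the maximum representable value) but nvcc warns about the conversion, so the commit spells the intent out with `std::numeric_limits`. A small sketch of the equivalence:

```cpp
#include <cstddef>
#include <limits>

int main() {
    // Well-defined C++, but nvcc warns: the signed literal is converted
    // to std::size_t modulo 2^N, i.e. it wraps to the maximum value.
    std::size_t a = -1;

    // Same value, stated explicitly -- the form the commit switches to.
    std::size_t b = std::numeric_limits<std::size_t>::max();

    return a == b ? 0 : 1; // always returns 0
}
```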
@@ -50,9 +50,9 @@ public:
         file_handle_.open(file_path_);
     }
 
-    // Performs the a export of the spikes to file
-    // one id and spike time with 4 decimals after the comma on a
-    // line space separated
+    // Performs export of the spikes to file.
+    // Each spike is written on its own line: id and spike time,
+    // space separated, with the time given to 4 decimal places.
     void output(const std::vector<spike_type>& spikes) override {
         for (auto spike : spikes) {
             char linebuf[45];
......
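The reworded comment describes the on-disk spike format: one spike per line, id then time, space separated, with the time to 4 decimal places. A sketch of such an exporter, where the `spike` record and the `output` signature are simplified stand-ins for the class in the diff:

```cpp
#include <cstdio>
#include <vector>

// Hypothetical stand-in for the exporter's spike record.
struct spike { unsigned id; float time; };

// One spike per line: "<id> <time>", time with 4 decimal places,
// matching the format described in the comment above.
void output(const std::vector<spike>& spikes, std::FILE* out) {
    for (auto s: spikes) {
        char linebuf[45];
        int n = std::snprintf(linebuf, sizeof(linebuf), "%u %.4f\n", s.id, s.time);
        if (n > 0) std::fwrite(linebuf, 1, static_cast<std::size_t>(n), out);
    }
}

int main() {
    output({{0, 0.1f}, {3, 12.3456f}}, stdout); // prints "0 0.1000" and "3 12.3456"
}
```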
@@ -124,8 +124,54 @@ public:
         // to overlap communication and computation.
         time_type t_interval = communicator_.min_delay()/2;
+        time_type tuntil;
+
+        // task that updates cell state in parallel.
+        auto update_cells = [&] () {
+            threading::parallel_for::apply(
+                0u, cell_groups_.size(),
+                [&](unsigned i) {
+                    auto &group = cell_groups_[i];
+
+                    PE("stepping","events");
+                    group.enqueue_events(current_events()[i]);
+                    PL();
+
+                    group.advance(tuntil, dt);
+
+                    PE("events");
+                    current_spikes().insert(group.spikes());
+                    group.clear_spikes();
+                    PL(2);
+                });
+        };
+
+        // task that performs spike exchange with the spikes generated in
+        // the previous integration period, generating the postsynaptic
+        // events that must be delivered at the start of the next
+        // integration period at the latest.
+        auto exchange = [&] () {
+            PE("stepping", "communication");
+
+            PE("exchange");
+            auto local_spikes = previous_spikes().gather();
+            auto global_spikes = communicator_.exchange(local_spikes);
+            PL();
+
+            PE("spike output");
+            local_export_callback_(local_spikes);
+            global_export_callback_(global_spikes.values());
+            PL();
+
+            PE("events");
+            future_events() = communicator_.make_event_queues(global_spikes);
+            PL();
+
+            PL(2);
+        };
+
         while (t_<tfinal) {
-            auto tuntil = std::min(t_+t_interval, tfinal);
+            tuntil = std::min(t_+t_interval, tfinal);
+
             event_queues_.exchange();
             local_spikes_.exchange();
@@ -134,50 +180,6 @@ public:
             // these buffers will store the new spikes generated in update_cells.
             current_spikes().clear();
 
-            // task that updates cell state in parallel.
-            auto update_cells = [&] () {
-                threading::parallel_for::apply(
-                    0u, cell_groups_.size(),
-                    [&](unsigned i) {
-                        auto &group = cell_groups_[i];
-
-                        PE("stepping","events");
-                        group.enqueue_events(current_events()[i]);
-                        PL();
-
-                        group.advance(tuntil, dt);
-
-                        PE("events");
-                        current_spikes().insert(group.spikes());
-                        group.clear_spikes();
-                        PL(2);
-                    });
-            };
-
-            // task that performs spike exchange with the spikes generated in
-            // the previous integration period, generating the postsynaptic
-            // events that must be delivered at the start of the next
-            // integration period at the latest.
-            auto exchange = [&] () {
-                PE("stepping", "communication");
-
-                PE("exchange");
-                auto local_spikes = previous_spikes().gather();
-                auto global_spikes = communicator_.exchange(local_spikes);
-                PL();
-
-                PE("spike output");
-                local_export_callback_(local_spikes);
-                global_export_callback_(global_spikes.values());
-                PL();
-
-                PE("events");
-                future_events() = communicator_.make_event_queues(global_spikes);
-                PL();
-
-                PL(2);
-            };
-
             // run the tasks, overlapping if the threading model and number of
             // available threads permits it.
             threading::task_group g;
@@ -188,6 +190,12 @@ public:
             t_ = tuntil;
         }
 
+        // Run the exchange one last time to ensure that all spikes are output
+        // to file.
+        event_queues_.exchange();
+        local_spikes_.exchange();
+        exchange();
+
         return t_;
     }
......
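For context on the loop being modified: the cell-update and spike-exchange tasks run concurrently in a `threading::task_group`, and the integration interval is `min_delay/2`, so a spike generated in interval k is exchanged during interval k+1 and delivered at the start of interval k+2, i.e. no later than `min_delay` after it was generated. A minimal sketch of that overlap, using `std::async` as a stand-in for the library's task group:

```cpp
#include <algorithm>
#include <future>
#include <iostream>

// Each pass advances the cells over one half-min-delay interval while,
// concurrently, the spikes of the previous interval are exchanged.
int main() {
    double t = 0, tfinal = 1.0, min_delay = 0.25;
    double t_interval = min_delay/2; // guarantees on-time event delivery

    while (t < tfinal) {
        double tuntil = std::min(t + t_interval, tfinal);

        // Run both tasks concurrently, as the task_group in the diff does.
        auto update = std::async(std::launch::async, [&] {
            std::cout << "advance cells to " << tuntil << "\n";
        });
        auto exchange = std::async(std::launch::async, [&] {
            std::cout << "exchange spikes of previous interval\n";
        });
        update.wait();
        exchange.wait();

        t = tuntil;
    }
}
```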
-/*
-    Copyright (C) 2014 Intel Corporation
-    All rights reserved.
-
-    Redistribution and use in source and binary forms, with or without
-    modification, are permitted provided that the following conditions
-    are met:
-
-    * Redistributions of source code must retain the above copyright
-      notice, this list of conditions and the following disclaimer.
-    * Redistributions in binary form must reproduce the above copyright
-      notice, this list of conditions and the following disclaimer in
-      the documentation and/or other materials provided with the
-      distribution.
-    * Neither the name of Intel Corporation nor the names of its
-      contributors may be used to endorse or promote products derived
-      from this software without specific prior written permission.
-
-    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-    A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-    HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
-    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
-    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
-    OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
-    AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
-    WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-    POSSIBILITY OF SUCH DAMAGE.
-
-    Modified for nestmc
-*/
-#include <algorithm>
-
-#include "pss_common.h"
-
-namespace pss {
-namespace internal {
-
-using task_group = nest::mc::threading::task_group;
-
-// Merge sequences [xs,xe) and [ys,ye) to output sequence [zs,zs+(xe-xs)+(ye-ys))
-// Destroy input sequence iff destroy==true
-template<typename RandomAccessIterator1,
-         typename RandomAccessIterator2,
-         typename RandomAccessIterator3,
-         typename Compare>
-void parallel_move_merge(RandomAccessIterator1 xs,
-                         RandomAccessIterator1 xe,
-                         RandomAccessIterator2 ys,
-                         RandomAccessIterator2 ye,
-                         RandomAccessIterator3 zs,
-                         bool destroy,
-                         Compare comp)
-{
-    task_group g;
-    const int MERGE_CUT_OFF = 2000;
-    while( (xe-xs) + (ye-ys) > MERGE_CUT_OFF ) {
-        RandomAccessIterator1 xm;
-        RandomAccessIterator2 ym;
-        if( xe-xs < ye-ys ) {
-            ym = ys+(ye-ys)/2;
-            xm = std::upper_bound(xs,xe,*ym,comp);
-        }
-        else {
-            xm = xs+(xe-xs)/2;
-            ym = std::lower_bound(ys,ye,*xm,comp);
-        }
-        g.run([=] {
-            parallel_move_merge(xs, xm, ys, ym, zs, destroy, comp);
-        });
-        zs += (xm-xs) + (ym-ys);
-        xs = xm;
-        ys = ym;
-    }
-    serial_move_merge(xs, xe, ys, ye, zs, comp);
-    if( destroy ) {
-        serial_destroy(xs, xe);
-        serial_destroy(ys, ye);
-    }
-    g.wait();
-}
-
-// Sorts [xs,xe), where zs[0:xe-xs) is temporary buffer supplied by caller.
-// Result is in [xs,xe) if inplace==true, otherwise in [zs,zs+(xe-xs))
-template<typename RandomAccessIterator1,
-         typename RandomAccessIterator2,
-         typename Compare>
-void parallel_stable_sort_aux(RandomAccessIterator1 xs,
-                              RandomAccessIterator1 xe,
-                              RandomAccessIterator2 zs,
-                              int inplace,
-                              Compare comp)
-{
-    //typedef typename std::iterator_traits<RandomAccessIterator2>::value_type T;
-    const int SORT_CUT_OFF = 500;
-    if( xe-xs<=SORT_CUT_OFF ) {
-        stable_sort_base_case(xs, xe, zs, inplace, comp);
-    }
-    else {
-        RandomAccessIterator1 xm = xs + (xe-xs)/2;
-        RandomAccessIterator2 zm = zs + (xm-xs);
-        RandomAccessIterator2 ze = zs + (xe-xs);
-        task_group g;
-        g.run([&] {
-            parallel_stable_sort_aux(xs, xm, zs, !inplace, comp);
-        });
-        parallel_stable_sort_aux(xm, xe, zm, !inplace, comp);
-        g.wait();
-        if( inplace )
-            parallel_move_merge(zs, zm, zm, ze, xs, inplace==2, comp);
-        else
-            parallel_move_merge(xs, xm, xm, xe, zs, false, comp);
-    }
-}
-
-} // namespace internal
-
-template<typename RandomAccessIterator, typename Compare>
-void parallel_stable_sort(RandomAccessIterator xs,
-                          RandomAccessIterator xe,
-                          Compare comp)
-{
-    using T = typename std::iterator_traits<RandomAccessIterator>::value_type;
-    if( internal::raw_buffer z = internal::raw_buffer(sizeof(T)*(xe-xs)) )
-        internal::parallel_stable_sort_aux(xs, xe, (T*)z.get(), 2, comp);
-    else
-        // Not enough memory available - fall back on serial sort
-        std::stable_sort(xs, xe, comp);
-}
-
-template<class RandomAccessIterator>
-void parallel_stable_sort(RandomAccessIterator xs,
-                          RandomAccessIterator xe)
-{
-    using T = typename std::iterator_traits<RandomAccessIterator>::value_type;
-    parallel_stable_sort(xs, xe, std::less<T>());
-}
-
-} // namespace pss
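The deleted header is Intel's "pss" parallel stable sort, adapted for nestmc. Its key trick is in `parallel_move_merge`: bisect the longer run and binary-search the matching split point in the other, so the two half-merges touch disjoint output ranges and can run as independent tasks. A standalone sketch of that split invariant:

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

int main() {
    std::vector<int> x = {1, 3, 5, 7, 9, 11}, y = {2, 3, 6, 8};

    // Bisect the longer run, then binary-search the matching split in the
    // other, as parallel_move_merge above does (upper_bound vs lower_bound
    // is what keeps the merge stable).
    auto xm = x.begin() + x.size()/2;                     // split x at 7
    auto ym = std::lower_bound(y.begin(), y.end(), *xm);  // first y >= 7

    // Everything left of the split points precedes everything right of
    // them, so merge([x0,xm),[y0,ym)) and merge([xm,xe),[ym,ye)) are
    // independent and can be spawned as separate tasks.
    assert(std::all_of(x.begin(), xm, [&](int v) { return v <= *xm; }));
    assert(std::all_of(y.begin(), ym, [&](int v) { return v < *xm; }));
}
```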
-// parallel stable sort uses threading
-#include "cthread_parallel_stable_sort.h"
-
 namespace nest {
 namespace mc {
 namespace threading {
 
 template <typename RandomIt>
 void sort(RandomIt begin, RandomIt end) {
-    pss::parallel_stable_sort(begin, end);
+    std::sort(begin, end);
 }
 
 template <typename RandomIt, typename Compare>
 void sort(RandomIt begin, RandomIt end, Compare comp) {
-    pss::parallel_stable_sort(begin, end ,comp);
+    std::sort(begin, end, comp);
 }
 
 template <typename Container>
 void sort(Container& c) {
-    pss::parallel_stable_sort(c.begin(), c.end());
+    std::sort(std::begin(c), std::end(c));
 }
......
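After this change the cthread `sort` wrappers simply delegate to `std::sort`, giving up sort parallelism in exchange for a build that compiles with both CUDA and cthreads. A standalone re-creation (not the actual header) showing the resulting behavior:

```cpp
#include <algorithm>
#include <iostream>
#include <vector>

// Re-creation of the wrappers after this commit: they now forward
// straight to std::sort instead of pss::parallel_stable_sort.
namespace threading {
    template <typename Container>
    void sort(Container& c) { std::sort(std::begin(c), std::end(c)); }
}

int main() {
    std::vector<int> v = {3, 1, 2};
    threading::sort(v);
    for (int x: v) std::cout << x << ' '; // prints "1 2 3"
}
```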