Commit 8ea74421 authored by Benjamin Cumming

make communicator thread safe for overlap of comms & comp

- add double buffering of spikes and events to communicator
- communicator::exchange() generates buffered events from buffered spikes
- this allows communication alongside cell update methods accessing spike
  and event lists
parent 16d96f75
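
The diff below relies on a small double-buffer utility (util/double_buffer.hpp) whose implementation is not part of this commit. As a rough sketch of the idea, assuming only the get()/other()/exchange() interface used by the communicator, such a buffer might look like the following; the member names and the plain index-flip are illustrative, not the project's actual implementation.

    // Illustrative two-slot buffer with the get()/other()/exchange()
    // interface used by the communicator below; not the actual
    // util::double_buffer from the repository.
    #include <array>
    #include <atomic>

    template <typename T>
    class double_buffer {
        std::array<T, 2> buffers_;
        std::atomic<int> index_{0};   // slot currently exposed via get()

    public:
        // buffer that cell-update code reads and writes during the current interval
        T& get()        { return buffers_[index_.load()]; }
        // buffer reserved for the communicator's internal use in exchange()
        T& other()      { return buffers_[index_.load() ^ 1]; }
        // flip the roles of the two slots at an interval boundary
        void exchange() { index_.fetch_xor(1); }
    };
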
@@ -7,6 +7,7 @@
#include <spike.hpp>
#include <threading/threading.hpp>
#include <util/double_buffer.hpp>
#include <algorithms.hpp>
#include <event_queue.hpp>
@@ -26,14 +27,76 @@ namespace communication {
// Once all connections have been specified, the construct() method can be used
// to build the data structures required for efficient spike communication and
// event generation.
//
// To overlap communication and computation, i.e. to perform spike
// exchange at the same time as cell state update, thread safe access to the
// spike and event lists must be provided. We use double buffering, whereby
// for each of the spikes and events one buffer is exposed publicly, while
// the other is used internally by the communicator
// - the spike lists are not directly exposed via the communicator
// interface. Instead they are updated when the add_spike() methods
// are called.
// - the event queues for each cell group are exposed via the queue()
// method.
// For each double buffer, the current buffer (accessed via buffer.get())
// is exposed to the user, and the other buffer is used inside exchange().
template <typename CommunicationPolicy>
class communicator {
public:
using id_type = cell_gid_type;
private:
using communication_policy_type = CommunicationPolicy;
using id_type = cell_gid_type;
using spike_type = spike<cell_member_type>;
/// thread private storage for accumulating spikes
using local_spike_store_type =
threading::enumerable_thread_specific<std::vector<spike_type>>;
/// per-cell group lists of events to be delivered
using event_queue =
std::vector<postsynaptic_spike_event>;
/// double buffered storage of the thread private spike lists
util::double_buffer<local_spike_store_type> thread_spikes_;
/// double buffered storage of the cell group event lists
util::double_buffer<std::vector<event_queue>> events_;
/// access to the spikes buffered from the previous communication
/// interval. Used internally by the communicator for exchange
local_spike_store_type& buffered_spikes() {
return thread_spikes_.other();
}
/// access to thread-private list of spikes, used for storing
/// spikes added via the add_spike() interface
std::vector<spike_type>& thread_spikes() {
return thread_spikes_.get().local();
}
void clear_buffered_spikes() {
for (auto& v : buffered_spikes()) {
v.clear();
}
}
std::vector<spike_type> gather_local_spikes() {
std::vector<spike_type> spikes;
for (auto& v : buffered_spikes()) {
spikes.insert(spikes.end(), v.begin(), v.end());
}
return spikes;
}
std::vector<connection> connections_;
communication_policy_type communication_policy_;
uint64_t num_spikes_ = 0u;
id_type cell_gid_from_;
id_type cell_gid_to_;
public:
communicator() = default;
// for now, still assuming one-to-one association cells <-> groups,
@@ -45,10 +108,10 @@ public:
auto num_groups_local_ = cell_gid_to_-cell_gid_from_;
// create an event queue for each target group
events_.resize(num_groups_local_);
events_.get().resize(num_groups_local_);
events_.other().resize(num_groups_local_);
}
void add_connection(connection con) {
EXPECTS(is_local_cell(con.destination().gid));
connections_.push_back(con);
@@ -83,26 +146,20 @@ public:
v.insert(v.end(), s.begin(), s.end());
}
std::vector<spike_type>& thread_spikes() {
return thread_spikes_.local();
}
void exchange() {
// global all-to-all to gather a local copy of the global spike list
// on each node
//profiler_.enter("global exchange");
auto global_spikes = communication_policy_.gather_spikes(local_spikes());
auto global_spikes = communication_policy_.gather_spikes(gather_local_spikes());
num_spikes_ += global_spikes.size();
clear_thread_spike_buffers();
//profiler_.leave();
clear_buffered_spikes();
for (auto& q : events_) {
// clear the event queue buffers, which will hold the events generated by the
// global_spikes in the exchange
auto& queues = events_.other();
for (auto& q : queues) {
q.clear();
}
//profiler_.enter("events");
//profiler_.enter("make events");
// check all global spikes to see if they will generate local events
for (auto spike : global_spikes) {
// search for targets
@@ -114,20 +171,15 @@ public:
// generate an event for each target
for (auto it=targets.first; it!=targets.second; ++it) {
auto gidx = it->destination().gid - cell_gid_from_;
events_[gidx].push_back(it->make_event(spike));
queues[gidx].push_back(it->make_event(spike));
}
}
//profiler_.leave(); // make events
//profiler_.leave(); // event generation
}
uint64_t num_spikes() const { return num_spikes_; }
const std::vector<postsynaptic_spike_event>& queue(int i) const {
return events_[i];
return events_.get()[i];
}
const std::vector<connection>& connections() const {
@@ -138,46 +190,11 @@ public:
return communication_policy_;
}
std::vector<spike_type> local_spikes() {
std::vector<spike_type> spikes;
for (auto& v : thread_spikes_) {
spikes.insert(spikes.end(), v.begin(), v.end());
}
return spikes;
}
void clear_thread_spike_buffers() {
for (auto& v : thread_spikes_) {
v.clear();
}
void swap_buffers() {
thread_spikes_.exchange();
events_.exchange();
}
private:
//
// both of these can be fixed with double buffering
//
// FIXME : race condition on the thread_spikes_ buffers when exchange() modifies/access them
// ... other threads will be pushing to them simultaneously
// FIXME : race condition on the group-specific event queues when exchange pushes to them
// ... other threads will be accessing them to update their event queues
// thread private storage for accumulating spikes
using local_spike_store_type =
nest::mc::threading::enumerable_thread_specific<std::vector<spike_type>>;
local_spike_store_type thread_spikes_;
std::vector<connection> connections_;
std::vector<std::vector<postsynaptic_spike_event>> events_;
// for keeping track of how time is spent where
//util::Profiler profiler_;
communication_policy_type communication_policy_;
uint64_t num_spikes_ = 0u;
id_type cell_gid_from_;
id_type cell_gid_to_;
};
} // namespace communication
......
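
The thread-private spike store above is an enumerable_thread_specific container (a TBB-style thread-local wrapper pulled in via threading/threading.hpp). As a self-contained illustration of the accumulate-then-gather pattern that thread_spikes() and gather_local_spikes() rely on, written directly against TBB rather than the project's threading wrapper, with a stand-in spike struct:

    #include <cstddef>
    #include <vector>
    #include <tbb/enumerable_thread_specific.h>
    #include <tbb/parallel_for.h>

    // Stand-in for the library's spike type; fields are illustrative.
    struct spike { std::size_t source; float time; };

    int main() {
        // Each worker thread appends to its own vector, so no locking is
        // needed on the hot path (this mirrors thread_spikes().push_back(...)).
        tbb::enumerable_thread_specific<std::vector<spike>> per_thread;

        tbb::parallel_for(std::size_t(0), std::size_t(1000), [&](std::size_t i) {
            per_thread.local().push_back({i, 0.025f * i});
        });

        // Later, a single thread concatenates the per-thread lists into one
        // flat vector, as gather_local_spikes() does with the back buffer.
        std::vector<spike> all_spikes;
        for (auto& v : per_thread) {
            all_spikes.insert(all_spikes.end(), v.begin(), v.end());
        }
        return 0;
    }
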
@@ -29,7 +29,9 @@ struct model {
};
model(const recipe &rec, cell_gid_type cell_from, cell_gid_type cell_to):
cell_from_(cell_from), cell_to_(cell_to)
cell_from_(cell_from),
cell_to_(cell_to),
communicator_(cell_from, cell_to)
{
cell_groups_ = std::vector<cell_group_type>{cell_to_-cell_from_};
@@ -51,7 +53,6 @@ struct model {
probes_.assign(probes.begin(), probes.end());
communicator_ = communicator_type(cell_from_, cell_to_);
for (cell_gid_type i=cell_from_; i<cell_to_; ++i) {
for (const auto& cc: rec.connections_on(i)) {
// currently cell_connection and connection are basically the same data;
@@ -74,6 +75,10 @@ struct model {
time_type min_delay = communicator_.min_delay();
while (t_<tfinal) {
auto tuntil = std::min(t_+min_delay, tfinal);
// ensure that spikes are available for exchange
communicator_.swap_buffers();
threading::parallel_for::apply(
0u, cell_groups_.size(),
[&](unsigned i) {
......
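
The visible hunk only shows the swap_buffers() call that makes the previous interval's spikes and events available; the overlap of exchange() with the cell-group update is what the double buffering enables. A hedged sketch of how that overlap could be expressed, using std::async in place of the project's task wrappers and hypothetical cell-group methods (enqueue_events, advance):

    #include <cstddef>
    #include <future>
    #include <vector>

    // Hypothetical types standing in for the project's communicator and cell
    // groups; only the swap_buffers()/exchange()/queue() calls mirror the diff.
    template <typename Communicator, typename CellGroup>
    void advance_one_interval(Communicator& comm,
                              std::vector<CellGroup>& groups,
                              double tuntil) {
        // Flip both double buffers: last interval's spikes become visible to
        // exchange(), last exchange's events become visible via queue(i).
        comm.swap_buffers();

        // Run the spike exchange on its own task; it only touches the back
        // ("other") spike and event buffers.
        auto exchange_done =
            std::async(std::launch::async, [&comm] { comm.exchange(); });

        // Meanwhile the cell groups integrate forward, reading events from the
        // front event buffer and appending new spikes to the front spike buffer.
        for (std::size_t i = 0; i < groups.size(); ++i) {
            groups[i].enqueue_events(comm.queue(i));   // hypothetical method
            groups[i].advance(tuntil);                 // hypothetical method
        }

        exchange_done.wait();   // buffers are quiescent before the next swap
    }
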