diff --git a/src/communication/communicator.hpp b/src/communication/communicator.hpp index 42dfa845a61b96e9ec5a7bd616bd124124531fd2..79ea52a2047bac4b4c53e98aa8b9bfee795cdc8a 100644 --- a/src/communication/communicator.hpp +++ b/src/communication/communicator.hpp @@ -7,6 +7,7 @@ #include <spike.hpp> #include <threading/threading.hpp> +#include <util/double_buffer.hpp> #include <algorithms.hpp> #include <event_queue.hpp> @@ -26,14 +27,76 @@ namespace communication { // Once all connections have been specified, the construct() method can be used // to build the data structures required for efficient spike communication and // event generation. +// +// To overlap communication and computation, i.e. to perform spike +// exchange at the same time as cell state update, thread-safe access to the +// spike and event lists must be provided. We use double buffering, whereby +// for each of the spikes and events one buffer is exposed publicly, while +// the other is used internally by the communicator +// - the spike lists are not directly exposed via the communicator +// interface. Instead they are updated when the add_spike() methods +// are called. +// - the event queues for each cell group are exposed via the queue() +// method. +// For each double buffer, the current buffer (accessed via buffer.get()) +// is exposed to the user, and the other buffer is used inside exchange(). 
+ template <typename CommunicationPolicy> class communicator { -public: - using id_type = cell_gid_type; +private: using communication_policy_type = CommunicationPolicy; - + using id_type = cell_gid_type; using spike_type = spike<cell_member_type>; + /// thread private storage for accumulating spikes + using local_spike_store_type = + threading::enumerable_thread_specific<std::vector<spike_type>>; + + /// per-cell group lists of events to be delivered + using event_queue = + std::vector<postsynaptic_spike_event>; + + /// double buffered storage of the thread private spike lists + util::double_buffer<local_spike_store_type> thread_spikes_; + + /// double buffered storage of the cell group event lists + util::double_buffer<std::vector<event_queue>> events_; + + /// access to the spikes buffered from the previous communication + /// interval. Used internally by the communicator for exchange + local_spike_store_type& buffered_spikes() { + return thread_spikes_.other(); + } + + /// access to thread-private list of spikes, used for storing + /// spikes added via the add_spike() interface + std::vector<spike_type>& thread_spikes() { + return thread_spikes_.get().local(); + } + + void clear_buffered_spikes() { + for (auto& v : buffered_spikes()) { + v.clear(); + } + } + + std::vector<spike_type> gather_local_spikes() { + std::vector<spike_type> spikes; + for (auto& v : buffered_spikes()) { + spikes.insert(spikes.end(), v.begin(), v.end()); + } + return spikes; + } + + std::vector<connection> connections_; + + communication_policy_type communication_policy_; + + uint64_t num_spikes_ = 0u; + id_type cell_gid_from_; + id_type cell_gid_to_; + +public: communicator() = default; // for now, still assuming one-to-one association cells <-> groups, @@ -45,10 +108,10 @@ public: auto num_groups_local_ = cell_gid_to_-cell_gid_from_; // create an event queue for each target group - events_.resize(num_groups_local_); + events_.get().resize(num_groups_local_); + 
events_.other().resize(num_groups_local_); } - void add_connection(connection con) { EXPECTS(is_local_cell(con.destination().gid)); connections_.push_back(con); @@ -83,26 +146,20 @@ public: v.insert(v.end(), s.begin(), s.end()); } - std::vector<spike_type>& thread_spikes() { - return thread_spikes_.local(); - } - void exchange() { // global all-to-all to gather a local copy of the global spike list // on each node - //profiler_.enter("global exchange"); - auto global_spikes = communication_policy_.gather_spikes(local_spikes()); + auto global_spikes = communication_policy_.gather_spikes(gather_local_spikes()); num_spikes_ += global_spikes.size(); - clear_thread_spike_buffers(); - //profiler_.leave(); + clear_buffered_spikes(); - for (auto& q : events_) { + // clear the event queue buffers, which will hold the events generated by the + // global_spikes in the exchange + auto& queues = events_.other(); + for (auto& q : queues) { q.clear(); } - //profiler_.enter("events"); - - //profiler_.enter("make events"); // check all global spikes to see if they will generate local events for (auto spike : global_spikes) { // search for targets @@ -114,20 +171,15 @@ public: // generate an event for each target for (auto it=targets.first; it!=targets.second; ++it) { auto gidx = it->destination().gid - cell_gid_from_; - events_[gidx].push_back(it->make_event(spike)); + queues[gidx].push_back(it->make_event(spike)); } } - - - //profiler_.leave(); // make events - - //profiler_.leave(); // event generation } uint64_t num_spikes() const { return num_spikes_; } const std::vector<postsynaptic_spike_event>& queue(int i) const { - return events_[i]; + return events_.get()[i]; } const std::vector<connection>& connections() const { @@ -138,46 +190,11 @@ public: return communication_policy_; } - std::vector<spike_type> local_spikes() { - std::vector<spike_type> spikes; - for (auto& v : thread_spikes_) { - spikes.insert(spikes.end(), v.begin(), v.end()); - } - return spikes; - } - - void 
clear_thread_spike_buffers() { - for (auto& v : thread_spikes_) { - v.clear(); - } + void swap_buffers() { + thread_spikes_.exchange(); + events_.exchange(); } -private: - - // - // both of these can be fixed with double buffering - // - // FIXME : race condition on the thread_spikes_ buffers when exchange() modifies/access them - // ... other threads will be pushing to them simultaneously - // FIXME : race condition on the group-specific event queues when exchange pushes to them - // ... other threads will be accessing them to update their event queues - - // thread private storage for accumulating spikes - using local_spike_store_type = - nest::mc::threading::enumerable_thread_specific<std::vector<spike_type>>; - local_spike_store_type thread_spikes_; - - std::vector<connection> connections_; - std::vector<std::vector<postsynaptic_spike_event>> events_; - - // for keeping track of how time is spent where - //util::Profiler profiler_; - - communication_policy_type communication_policy_; - - uint64_t num_spikes_ = 0u; - id_type cell_gid_from_; - id_type cell_gid_to_; }; } // namespace communication diff --git a/src/model.hpp b/src/model.hpp index 252691fdbab57913a75ccfde1dcf19321752f7a3..2e4a2c60e233410a27a7598de19709d2818259f9 100644 --- a/src/model.hpp +++ b/src/model.hpp @@ -29,7 +29,9 @@ struct model { }; model(const recipe &rec, cell_gid_type cell_from, cell_gid_type cell_to): - cell_from_(cell_from), cell_to_(cell_to) + cell_from_(cell_from), + cell_to_(cell_to), + communicator_(cell_from, cell_to) { cell_groups_ = std::vector<cell_group_type>{cell_to_-cell_from_}; @@ -51,7 +53,6 @@ struct model { probes_.assign(probes.begin(), probes.end()); - communicator_ = communicator_type(cell_from_, cell_to_); for (cell_gid_type i=cell_from_; i<cell_to_; ++i) { for (const auto& cc: rec.connections_on(i)) { // currently cell_connection and connection are basically the same data; @@ -74,6 +75,10 @@ struct model { time_type min_delay = communicator_.min_delay(); while 
(t_<tfinal) { auto tuntil = std::min(t_+min_delay, tfinal); + + // ensure that spikes are available for exchange + communicator_.swap_buffers(); + threading::parallel_for::apply( 0u, cell_groups_.size(), [&](unsigned i) {