diff --git a/src/communication/communicator.hpp b/src/communication/communicator.hpp index a5b13639865c6e44467d838c39b177e1a8c80176..4466af131a0215293e171e58ae5dcd71808e494b 100644 --- a/src/communication/communicator.hpp +++ b/src/communication/communicator.hpp @@ -150,17 +150,22 @@ public: /// Check each global spike in turn to see it generates local events. /// If so, make the events and insert them into the appropriate event list. - /// Return a vector that contains the event queues for each local cell group. /// - /// Returns a vector of event queues, with one queue for each local cell group. The - /// events in each queue are all events that must be delivered to targets in that cell - /// group as a result of the global spike exchange. - std::vector<pse_vector> make_event_queues(const gathered_vector<spike>& global_spikes) { + /// Takes reference to a vector of event lists as an argument, with one list + /// for each local cell group. On completion, the events in each list are + /// all events that must be delivered to targets in that cell group as a + /// result of the global spike exchange, plus any events that were already + /// in the list. + void make_event_queues( + const gathered_vector<spike>& global_spikes, + std::vector<pse_vector>& queues) + { + EXPECTS(queues.size()==num_local_cells_); + using util::subrange_view; using util::make_span; using util::make_range; - auto queues = std::vector<pse_vector>(num_local_cells_); const auto& sp = global_spikes.partition(); const auto& cp = connection_part_; for (auto dom: make_span(0, num_domains_)) { @@ -210,8 +215,6 @@ public: } } } - - return queues; } /// Returns the total number of global spikes over the duration of the simulation diff --git a/src/merge_events.hpp b/src/merge_events.hpp index bba08cbd54f651d26043080c3f27fb8a1e2d1c8e..9976d6cc655e2306610dcd47fc78049df7a96424 100644 --- a/src/merge_events.hpp +++ b/src/merge_events.hpp @@ -13,7 +13,7 @@ namespace arb { // delivered after the current epoch ends. It merges events from multiple // sources: // lc : the list of currently enqueued events -// events : an unsorted list of events from the communicator +// pending_events : an unsorted list of events from the communicator // generators : a set of event_generators // // The time intervales are illustrated below, along with the range of times @@ -26,11 +26,11 @@ namespace arb { // |------|------| // // [----------------------] lc -// [---------------] events +// [---------------] pending_events // [------) generators // // The output list, stored in lf, will contain all the following: -// * all events in events list +// * all events in pending_events // * events in lc with time >= t₀ // * events from each generator with time < t₠// All events in lc that are to be delivered before t₀ are discared, along with @@ -39,7 +39,7 @@ namespace arb { void merge_events(time_type t0, time_type t1, const pse_vector& lc, - pse_vector& events, + pse_vector& pending_events, std::vector<event_generator_ptr>& generators, pse_vector& lf); diff --git a/src/model.cpp b/src/model.cpp index 47519576dcbed4170f0201b9afc8f306bcedf47a..3afb9f4df9a40690d7d7db4ac6faea7494769ab5 100644 --- a/src/model.cpp +++ b/src/model.cpp @@ -18,13 +18,21 @@ namespace arb { model::model(const recipe& rec, const domain_decomposition& decomp): communicator_(rec, decomp) { - event_generators_.resize(communicator_.num_local_cells()); + const auto num_local_cells = communicator_.num_local_cells(); + + // Cache the minimum delay of the network + min_delay_ = communicator_.min_delay(); + + // Initialize empty buffers for pending events for each local cell + pending_events_.resize(num_local_cells); + + event_generators_.resize(num_local_cells); cell_local_size_type lidx = 0; const auto& grps = decomp.groups; for (auto i: util::make_span(0, grps.size())) { for (auto gid: grps[i].gids) { // Store mapping of gid to local cell index. - gid_to_local_[gid] = lidx++; + gid_to_local_[gid] = lidx; // Set up the event generators for cell gid. auto rec_gens = rec.event_generators(gid); @@ -39,6 +47,7 @@ model::model(const recipe& rec, const domain_decomposition& decomp): gens.push_back(std::move(g)); } } + ++lidx; } } @@ -51,35 +60,42 @@ model::model(const recipe& rec, const domain_decomposition& decomp): PL(2); }); - // Create event lane buffers. // There is one set for each epoch: current (0) and next (1). // For each epoch there is one lane for each cell in the cell group. - event_lanes_[0].resize(communicator_.num_local_cells()); - event_lanes_[1].resize(communicator_.num_local_cells()); + event_lanes_[0].resize(num_local_cells); + event_lanes_[1].resize(num_local_cells); } void model::reset() { t_ = 0.; + // Reset cell group state. for (auto& group: cell_groups_) { group->reset(); } + // Clear all pending events in the event lanes. for (auto& lanes: event_lanes_) { for (auto& lane: lanes) { lane.clear(); } } + // Reset all event generators, and advance to t_. for (auto& lane: event_generators_) { for (auto& gen: lane) { if (gen) { gen->reset(); + gen->advance(t_); } } } + for (auto& lane: pending_events_) { + lane.clear(); + } + communicator_.reset(); current_spikes().clear(); @@ -94,9 +110,7 @@ time_type model::run(time_type tfinal, time_type dt) { // If spike exchange and cell update are serialized, this is the // minimum delay of the network, however we use half this period // to overlap communication and computation. - time_type t_interval = communicator_.min_delay()/2; - - time_type tuntil; + const time_type t_interval = min_delay_/2; // task that updates cell state in parallel. auto update_cells = [&] () { @@ -135,28 +149,23 @@ time_type model::run(time_type tfinal, time_type dt) { PL(); PE("events","from-spikes"); - auto events = communicator_.make_event_queues(global_spikes); + communicator_.make_event_queues(global_spikes, pending_events_); PL(); PE("enqueue"); - threading::parallel_for::apply(0, communicator_.num_local_cells(), - [&](cell_size_type i) { - const auto epid = epoch_.id; - merge_events( - epoch_.tfinal, - epoch_.tfinal+std::min(t_+t_interval, tfinal), - event_lanes(epid)[i], - events[i], - event_generators_[i], - event_lanes(epid+1)[i]); - }); + const auto t0 = epoch_.tfinal; + const auto t1 = std::min(tfinal, t0+t_interval); + setup_events(t0, t1, epoch_.id); PL(2); PL(2); }; - tuntil = std::min(t_+t_interval, tfinal); + time_type tuntil = std::min(t_+t_interval, tfinal); epoch_ = epoch(0, tuntil); + PE("stepping", "communication", "events", "enqueue"); + setup_events(t_, tuntil, 1); + PL(4); while (t_<tfinal) { local_spikes_.exchange(); @@ -185,6 +194,28 @@ time_type model::run(time_type tfinal, time_type dt) { return t_; } +// Populate the event lanes for epoch+1 (i.e event_lanes_[epoch+1)] +// Update each lane in parallel, if supported by the threading backend. +// On completion event_lanes[epoch+1] will contain sorted lists of events with +// delivery times due in or after epoch+1. The events will be taken from the +// following sources: +// event_lanes[epoch]: take all events ≥ t_from +// event_generators : take all events < t_to +// pending_events : take all events +void model::setup_events(time_type t_from, time_type t_to, std::size_t epoch) { + const auto n = communicator_.num_local_cells(); + threading::parallel_for::apply(0, n, + [&](cell_size_type i) { + merge_events( + t_from, t_to, + event_lanes(epoch)[i], // in: the current event lane + pending_events_[i], // in: events from the communicator + event_generators_[i], // in: event generators for this lane + event_lanes(epoch+1)[i]); // out: the event lane for the next epoch + pending_events_[i].clear(); + }); +} + sampler_association_handle model::add_sampler(cell_member_predicate probe_ids, schedule sched, sampler_function f, sampling_policy policy) { sampler_association_handle h = sassoc_handles_.acquire(); @@ -248,28 +279,22 @@ util::optional<cell_size_type> model::local_cell_index(cell_gid_type gid) { } void model::inject_events(const pse_vector& events) { - auto& lanes = event_lanes(epoch_.id); - - // Append all events that are to be delivered to local cells to the - // appropriate lane. At the same time, keep track of which lanes have been - // modified, because the lanes will have to be sorted once all events have - // been added. - pse_vector local_events; - std::set<cell_size_type> modified_lanes; + // Push all events that are to be delivered to local cells into the + // pending event list for the event's target cell. for (auto& e: events) { if (e.time<t_) { - throw std::runtime_error("model::inject_events(): attempt to inject an event at time " + std::to_string(e.time) + ", when model state is at time " + std::to_string(t_)); + throw std::runtime_error( + "model::inject_events(): attempt to inject an event at time " + + std::to_string(e.time) + + ", when model state is at time " + + std::to_string(t_)); } + // local_cell_index returns an optional type that evaluates + // to true iff the gid is a local cell. if (auto lidx = local_cell_index(e.target.gid)) { - lanes[*lidx].push_back(e); - modified_lanes.insert(*lidx); + pending_events_[*lidx].push_back(e); } } - - // Sort events in the event lanes that were modified - for (auto l: modified_lanes) { - util::sort(lanes[l]); - } } } // namespace arb diff --git a/src/model.hpp b/src/model.hpp index 268e31e99c594adacdb77209a26ec7168ec08b6a..acac870322b451c2fa3bb1995f8c699d66330873 100644 --- a/src/model.hpp +++ b/src/model.hpp @@ -60,6 +60,10 @@ public: void inject_events(const pse_vector& events); private: + // Private helper function that sets up the event lanes for an epoch. + // See comments on implementation for more information. + void setup_events(time_type t_from, time_type time_to, std::size_t epoch_id); + std::vector<pse_vector>& event_lanes(std::size_t epoch_id); std::size_t num_groups() const; @@ -68,6 +72,7 @@ private: epoch epoch_; time_type t_ = 0.; + time_type min_delay_; std::vector<cell_group_ptr> cell_groups_; // one set of event_generators for each local cell @@ -103,6 +108,7 @@ private: // Pending events to be delivered. std::array<std::vector<pse_vector>, 2> event_lanes_; + std::vector<pse_vector> pending_events_; // Sampler associations handles are managed by a helper class. util::handle_set<sampler_association_handle> sassoc_handles_; diff --git a/tests/global_communication/test_communicator.cpp b/tests/global_communication/test_communicator.cpp index 2a5238a47bdba20ec5cb2e3c5c713790c2c765fb..a914befcb608befbc77dc192934ac7f568261fb5 100644 --- a/tests/global_communication/test_communicator.cpp +++ b/tests/global_communication/test_communicator.cpp @@ -339,11 +339,8 @@ test_ring(const domain_decomposition& D, comm_type& C, F&& f) { } // generate the events - auto queues = C.make_event_queues(global_spikes); - if (queues.size() != D.groups.size()) { // one queue for each cell group - return ::testing::AssertionFailure() - << "expect one event queue for each cell group"; - } + std::vector<arb::pse_vector> queues(C.num_local_cells()); + C.make_event_queues(global_spikes, queues); // Assert that all the correct events were generated. // Iterate over each local gid, and testing whether an event is expected for @@ -433,7 +430,8 @@ test_all2all(const domain_decomposition& D, comm_type& C, F&& f) { } // generate the events - auto queues = C.make_event_queues(global_spikes); + std::vector<arb::pse_vector> queues(C.num_local_cells()); + C.make_event_queues(global_spikes, queues); if (queues.size() != D.groups.size()) { // one queue for each cell group return ::testing::AssertionFailure() << "expect one event queue for each cell group";