From c27757f98fb5e2c575a7d5b98e0590a1dfc6dbd7 Mon Sep 17 00:00:00 2001 From: Ben Cumming <louncharf@gmail.com> Date: Mon, 15 Jan 2018 14:29:17 +0100 Subject: [PATCH] Fix event-generator bugs in model (#439) There were two latent bugs in the event generation part of `model`. 1. A segmentation fault when initializing the `event_generators` in the `model` constructor caused by using an index variable after it had been incremented. 2. Events generated during the first epoch were not delivered on time. The first issue was simple to fix, by ensuring that the coutning variable is incremented at the end of the loop. The second issue required refactoring the event wrangling inside `model`. Events can be introduced into a model via three sources: 1. Generated by spike exchange 2. By calling the `model::inject_events()` interface 3. `event_generator`s attached to cells. The refactoring was required to ensure that all three sources are handled correctly. There is further opportunities for refactoring the code to make it a bit cleaner, specifically putting the wrangling code in its own type that could be tested seperately, outside `model`, but that is beyond the scope of this fix. --- src/communication/communicator.hpp | 19 ++-- src/merge_events.hpp | 8 +- src/model.cpp | 99 ++++++++++++------- src/model.hpp | 6 ++ .../test_communicator.cpp | 10 +- 5 files changed, 87 insertions(+), 55 deletions(-) diff --git a/src/communication/communicator.hpp b/src/communication/communicator.hpp index a5b13639..4466af13 100644 --- a/src/communication/communicator.hpp +++ b/src/communication/communicator.hpp @@ -150,17 +150,22 @@ public: /// Check each global spike in turn to see it generates local events. /// If so, make the events and insert them into the appropriate event list. - /// Return a vector that contains the event queues for each local cell group. /// - /// Returns a vector of event queues, with one queue for each local cell group. The - /// events in each queue are all events that must be delivered to targets in that cell - /// group as a result of the global spike exchange. - std::vector<pse_vector> make_event_queues(const gathered_vector<spike>& global_spikes) { + /// Takes reference to a vector of event lists as an argument, with one list + /// for each local cell group. On completion, the events in each list are + /// all events that must be delivered to targets in that cell group as a + /// result of the global spike exchange, plus any events that were already + /// in the list. + void make_event_queues( + const gathered_vector<spike>& global_spikes, + std::vector<pse_vector>& queues) + { + EXPECTS(queues.size()==num_local_cells_); + using util::subrange_view; using util::make_span; using util::make_range; - auto queues = std::vector<pse_vector>(num_local_cells_); const auto& sp = global_spikes.partition(); const auto& cp = connection_part_; for (auto dom: make_span(0, num_domains_)) { @@ -210,8 +215,6 @@ public: } } } - - return queues; } /// Returns the total number of global spikes over the duration of the simulation diff --git a/src/merge_events.hpp b/src/merge_events.hpp index bba08cbd..9976d6cc 100644 --- a/src/merge_events.hpp +++ b/src/merge_events.hpp @@ -13,7 +13,7 @@ namespace arb { // delivered after the current epoch ends. It merges events from multiple // sources: // lc : the list of currently enqueued events -// events : an unsorted list of events from the communicator +// pending_events : an unsorted list of events from the communicator // generators : a set of event_generators // // The time intervales are illustrated below, along with the range of times @@ -26,11 +26,11 @@ namespace arb { // |------|------| // // [----------------------] lc -// [---------------] events +// [---------------] pending_events // [------) generators // // The output list, stored in lf, will contain all the following: -// * all events in events list +// * all events in pending_events // * events in lc with time >= t₀ // * events from each generator with time < t₠// All events in lc that are to be delivered before t₀ are discared, along with @@ -39,7 +39,7 @@ namespace arb { void merge_events(time_type t0, time_type t1, const pse_vector& lc, - pse_vector& events, + pse_vector& pending_events, std::vector<event_generator_ptr>& generators, pse_vector& lf); diff --git a/src/model.cpp b/src/model.cpp index 47519576..3afb9f4d 100644 --- a/src/model.cpp +++ b/src/model.cpp @@ -18,13 +18,21 @@ namespace arb { model::model(const recipe& rec, const domain_decomposition& decomp): communicator_(rec, decomp) { - event_generators_.resize(communicator_.num_local_cells()); + const auto num_local_cells = communicator_.num_local_cells(); + + // Cache the minimum delay of the network + min_delay_ = communicator_.min_delay(); + + // Initialize empty buffers for pending events for each local cell + pending_events_.resize(num_local_cells); + + event_generators_.resize(num_local_cells); cell_local_size_type lidx = 0; const auto& grps = decomp.groups; for (auto i: util::make_span(0, grps.size())) { for (auto gid: grps[i].gids) { // Store mapping of gid to local cell index. - gid_to_local_[gid] = lidx++; + gid_to_local_[gid] = lidx; // Set up the event generators for cell gid. auto rec_gens = rec.event_generators(gid); @@ -39,6 +47,7 @@ model::model(const recipe& rec, const domain_decomposition& decomp): gens.push_back(std::move(g)); } } + ++lidx; } } @@ -51,35 +60,42 @@ model::model(const recipe& rec, const domain_decomposition& decomp): PL(2); }); - // Create event lane buffers. // There is one set for each epoch: current (0) and next (1). // For each epoch there is one lane for each cell in the cell group. - event_lanes_[0].resize(communicator_.num_local_cells()); - event_lanes_[1].resize(communicator_.num_local_cells()); + event_lanes_[0].resize(num_local_cells); + event_lanes_[1].resize(num_local_cells); } void model::reset() { t_ = 0.; + // Reset cell group state. for (auto& group: cell_groups_) { group->reset(); } + // Clear all pending events in the event lanes. for (auto& lanes: event_lanes_) { for (auto& lane: lanes) { lane.clear(); } } + // Reset all event generators, and advance to t_. for (auto& lane: event_generators_) { for (auto& gen: lane) { if (gen) { gen->reset(); + gen->advance(t_); } } } + for (auto& lane: pending_events_) { + lane.clear(); + } + communicator_.reset(); current_spikes().clear(); @@ -94,9 +110,7 @@ time_type model::run(time_type tfinal, time_type dt) { // If spike exchange and cell update are serialized, this is the // minimum delay of the network, however we use half this period // to overlap communication and computation. - time_type t_interval = communicator_.min_delay()/2; - - time_type tuntil; + const time_type t_interval = min_delay_/2; // task that updates cell state in parallel. auto update_cells = [&] () { @@ -135,28 +149,23 @@ time_type model::run(time_type tfinal, time_type dt) { PL(); PE("events","from-spikes"); - auto events = communicator_.make_event_queues(global_spikes); + communicator_.make_event_queues(global_spikes, pending_events_); PL(); PE("enqueue"); - threading::parallel_for::apply(0, communicator_.num_local_cells(), - [&](cell_size_type i) { - const auto epid = epoch_.id; - merge_events( - epoch_.tfinal, - epoch_.tfinal+std::min(t_+t_interval, tfinal), - event_lanes(epid)[i], - events[i], - event_generators_[i], - event_lanes(epid+1)[i]); - }); + const auto t0 = epoch_.tfinal; + const auto t1 = std::min(tfinal, t0+t_interval); + setup_events(t0, t1, epoch_.id); PL(2); PL(2); }; - tuntil = std::min(t_+t_interval, tfinal); + time_type tuntil = std::min(t_+t_interval, tfinal); epoch_ = epoch(0, tuntil); + PE("stepping", "communication", "events", "enqueue"); + setup_events(t_, tuntil, 1); + PL(4); while (t_<tfinal) { local_spikes_.exchange(); @@ -185,6 +194,28 @@ time_type model::run(time_type tfinal, time_type dt) { return t_; } +// Populate the event lanes for epoch+1 (i.e event_lanes_[epoch+1)] +// Update each lane in parallel, if supported by the threading backend. +// On completion event_lanes[epoch+1] will contain sorted lists of events with +// delivery times due in or after epoch+1. The events will be taken from the +// following sources: +// event_lanes[epoch]: take all events ≥ t_from +// event_generators : take all events < t_to +// pending_events : take all events +void model::setup_events(time_type t_from, time_type t_to, std::size_t epoch) { + const auto n = communicator_.num_local_cells(); + threading::parallel_for::apply(0, n, + [&](cell_size_type i) { + merge_events( + t_from, t_to, + event_lanes(epoch)[i], // in: the current event lane + pending_events_[i], // in: events from the communicator + event_generators_[i], // in: event generators for this lane + event_lanes(epoch+1)[i]); // out: the event lane for the next epoch + pending_events_[i].clear(); + }); +} + sampler_association_handle model::add_sampler(cell_member_predicate probe_ids, schedule sched, sampler_function f, sampling_policy policy) { sampler_association_handle h = sassoc_handles_.acquire(); @@ -248,28 +279,22 @@ util::optional<cell_size_type> model::local_cell_index(cell_gid_type gid) { } void model::inject_events(const pse_vector& events) { - auto& lanes = event_lanes(epoch_.id); - - // Append all events that are to be delivered to local cells to the - // appropriate lane. At the same time, keep track of which lanes have been - // modified, because the lanes will have to be sorted once all events have - // been added. - pse_vector local_events; - std::set<cell_size_type> modified_lanes; + // Push all events that are to be delivered to local cells into the + // pending event list for the event's target cell. for (auto& e: events) { if (e.time<t_) { - throw std::runtime_error("model::inject_events(): attempt to inject an event at time " + std::to_string(e.time) + ", when model state is at time " + std::to_string(t_)); + throw std::runtime_error( + "model::inject_events(): attempt to inject an event at time " + + std::to_string(e.time) + + ", when model state is at time " + + std::to_string(t_)); } + // local_cell_index returns an optional type that evaluates + // to true iff the gid is a local cell. if (auto lidx = local_cell_index(e.target.gid)) { - lanes[*lidx].push_back(e); - modified_lanes.insert(*lidx); + pending_events_[*lidx].push_back(e); } } - - // Sort events in the event lanes that were modified - for (auto l: modified_lanes) { - util::sort(lanes[l]); - } } } // namespace arb diff --git a/src/model.hpp b/src/model.hpp index 268e31e9..acac8703 100644 --- a/src/model.hpp +++ b/src/model.hpp @@ -60,6 +60,10 @@ public: void inject_events(const pse_vector& events); private: + // Private helper function that sets up the event lanes for an epoch. + // See comments on implementation for more information. + void setup_events(time_type t_from, time_type time_to, std::size_t epoch_id); + std::vector<pse_vector>& event_lanes(std::size_t epoch_id); std::size_t num_groups() const; @@ -68,6 +72,7 @@ private: epoch epoch_; time_type t_ = 0.; + time_type min_delay_; std::vector<cell_group_ptr> cell_groups_; // one set of event_generators for each local cell @@ -103,6 +108,7 @@ private: // Pending events to be delivered. std::array<std::vector<pse_vector>, 2> event_lanes_; + std::vector<pse_vector> pending_events_; // Sampler associations handles are managed by a helper class. util::handle_set<sampler_association_handle> sassoc_handles_; diff --git a/tests/global_communication/test_communicator.cpp b/tests/global_communication/test_communicator.cpp index 2a5238a4..a914befc 100644 --- a/tests/global_communication/test_communicator.cpp +++ b/tests/global_communication/test_communicator.cpp @@ -339,11 +339,8 @@ test_ring(const domain_decomposition& D, comm_type& C, F&& f) { } // generate the events - auto queues = C.make_event_queues(global_spikes); - if (queues.size() != D.groups.size()) { // one queue for each cell group - return ::testing::AssertionFailure() - << "expect one event queue for each cell group"; - } + std::vector<arb::pse_vector> queues(C.num_local_cells()); + C.make_event_queues(global_spikes, queues); // Assert that all the correct events were generated. // Iterate over each local gid, and testing whether an event is expected for @@ -433,7 +430,8 @@ test_all2all(const domain_decomposition& D, comm_type& C, F&& f) { } // generate the events - auto queues = C.make_event_queues(global_spikes); + std::vector<arb::pse_vector> queues(C.num_local_cells()); + C.make_event_queues(global_spikes, queues); if (queues.size() != D.groups.size()) { // one queue for each cell group return ::testing::AssertionFailure() << "expect one event queue for each cell group"; -- GitLab