diff --git a/arbor/benchmark_cell_group.cpp b/arbor/benchmark_cell_group.cpp index 4d4f1630053c4bab6e1d0c708e069f188d09fe9a..edd6dbe20d4a79884e87f9ce33e448887d70fd7a 100644 --- a/arbor/benchmark_cell_group.cpp +++ b/arbor/benchmark_cell_group.cpp @@ -33,8 +33,6 @@ benchmark_cell_group::benchmark_cell_group(const std::vector<cell_gid_type>& gid } void benchmark_cell_group::reset() { - t_ = 0; - for (auto& c: cells_) { c.time_sequence.reset(); } @@ -55,7 +53,7 @@ void benchmark_cell_group::advance(epoch ep, PE(advance_bench_cell); // Micro-seconds to advance in this epoch. - auto us = 1e3*(ep.tfinal-t_); + auto us = 1e3*(ep.duration()); for (auto i: util::make_span(0, gids_.size())) { // Expected time to complete epoch in micro seconds. const double duration_us = cells_[i].realtime_ratio*us; @@ -64,7 +62,7 @@ void benchmark_cell_group::advance(epoch ep, // Start timer. auto start = high_resolution_clock::now(); - auto spike_times = util::make_range(cells_[i].time_sequence.events(t_, ep.tfinal)); + auto spike_times = util::make_range(cells_[i].time_sequence.events(ep.t0, ep.t1)); for (auto t: spike_times) { spikes_.push_back({{gid, 0u}, t}); } @@ -74,7 +72,6 @@ void benchmark_cell_group::advance(epoch ep, // has elapsed, to emulate a "real" cell. 
while (duration_type(high_resolution_clock::now()-start).count() < duration_us); } - t_ = ep.tfinal; PL(); }; diff --git a/arbor/benchmark_cell_group.hpp b/arbor/benchmark_cell_group.hpp index f915eed0752b1b5704bae6c50f2023294a6b15f6..62c27abf3b1baf27642f5e34e1481c594137e0fa 100644 --- a/arbor/benchmark_cell_group.hpp +++ b/arbor/benchmark_cell_group.hpp @@ -34,8 +34,6 @@ public: void remove_all_samplers() override {} private: - time_type t_; - std::vector<benchmark_cell> cells_; std::vector<spike> spikes_; std::vector<cell_gid_type> gids_; diff --git a/arbor/epoch.hpp b/arbor/epoch.hpp index 85d2ebcea5f4baa89aaea150c712db48f1768cfc..34fd050a00398f9e16e68881579d186525309d29 100644 --- a/arbor/epoch.hpp +++ b/arbor/epoch.hpp @@ -7,31 +7,46 @@ namespace arb { -// Information about a current time integration epoch. -// Each epoch has an integral id, that is incremented for successive epochs. -// Time is divided as follows, where tfinal_i is tfinal for epoch i: +// An epoch describes an integration interval within an ongoing simulation. +// Epochs within a simulation are sequentially numbered by id, and each +// describe a half open time interval [t0, t1), where t1 for one epoch +// is t0 for the next. // -// epoch_0 : t < tfinal_0 -// epoch_1 : tfinal_0 <= t < tfinal_1 -// epoch_2 : tfinal_1 <= t < tfinal_2 -// epoch_3 : tfinal_2 <= t < tfinal_3 -// -// At the end of epoch_i the solution is at tfinal_i, however events that are -// due for delivery at tfinal_i are not delivered until epoch_i+1. +// At the end of an epoch the simulation state corresponds to the time t1, +// save for any pending events, which will be delivered at the beginning of +// the next epoch. 
struct epoch { - std::size_t id=0; - time_type tfinal=0; + std::ptrdiff_t id = -1; + time_type t0 = 0, t1 = 0; epoch() = default; + epoch(std::ptrdiff_t id, time_type t0, time_type t1): + id(id), t0(t0), t1(t1) {} - epoch(std::size_t id, time_type tfinal): id(id), tfinal(tfinal) {} + operator bool() const { + arb_assert(id>=-1); + return id>=0; + } - void advance(time_type t) { - arb_assert(t>=tfinal); - tfinal = t; + bool empty() const { + return t1<=t0; + } + + time_type duration() const { + return t1-t0; + } + + void advance_to(time_type next_t1) { + t0 = t1; + t1 = next_t1; ++id; } + + void reset() { + *this = epoch(); + } + }; } // namespace arb diff --git a/arbor/include/arbor/schedule.hpp b/arbor/include/arbor/schedule.hpp index d0087fa09f070668a065fcf992efd85ac7c247a8..7442c70e6fcb4290c3d9f32309b7acfa7fdf5ba5 100644 --- a/arbor/include/arbor/schedule.hpp +++ b/arbor/include/arbor/schedule.hpp @@ -4,12 +4,13 @@ #include <iterator> #include <memory> #include <random> +#include <type_traits> #include <utility> #include <vector> #include <arbor/assert.hpp> #include <arbor/common_types.hpp> -#include <arbor/util/compat.hpp> +#include <arbor/util/extra_traits.hpp> // Time schedules for probe–sampler associations. 
@@ -30,11 +31,11 @@ class schedule { public: schedule(); - template <typename Impl> + template <typename Impl, typename = std::enable_if_t<!std::is_same_v<util::remove_cvref_t<Impl>, schedule>>> explicit schedule(const Impl& impl): impl_(new wrap<Impl>(impl)) {} - template <typename Impl> + template <typename Impl, typename = std::enable_if_t<!std::is_same_v<util::remove_cvref_t<Impl>, schedule>>> explicit schedule(Impl&& impl): impl_(new wrap<Impl>(std::move(impl))) {} diff --git a/arbor/include/arbor/util/expected.hpp b/arbor/include/arbor/util/expected.hpp index 9f5d1a4307b3841b437beff302e232b2ff00f5aa..947783bc5c970e78a8576e962614e3a55bb36824 100644 --- a/arbor/include/arbor/util/expected.hpp +++ b/arbor/include/arbor/util/expected.hpp @@ -14,6 +14,8 @@ #include <utility> #include <variant> +#include <arbor/util/extra_traits.hpp> + namespace arb { namespace util { @@ -32,10 +34,6 @@ struct conversion_hazard: std::integral_constant<bool, template <typename T, typename X> inline constexpr bool conversion_hazard_v = conversion_hazard<T, X>::value; - -// TODO: C++20 replace with std::remove_cvref_t -template <typename X> -using remove_cvref_t = std::remove_cv_t<std::remove_reference_t<X>>; } // namespace detail struct unexpect_t {}; @@ -89,8 +87,8 @@ struct unexpected { template <typename F, typename = std::enable_if_t<std::is_constructible_v<E, F&&>>, - typename = std::enable_if_t<!std::is_same_v<std::in_place_t, detail::remove_cvref_t<F>>>, - typename = std::enable_if_t<!std::is_same_v<unexpected, detail::remove_cvref_t<F>>> + typename = std::enable_if_t<!std::is_same_v<std::in_place_t, remove_cvref_t<F>>>, + typename = std::enable_if_t<!std::is_same_v<unexpected, remove_cvref_t<F>>> > explicit unexpected(F&& f): value_(std::forward<F>(f)) {} @@ -207,9 +205,9 @@ struct expected { template < typename S, - typename = std::enable_if_t<!std::is_same_v<std::in_place_t, detail::remove_cvref_t<S>>>, - typename = std::enable_if_t<!std::is_same_v<expected, 
detail::remove_cvref_t<S>>>, - typename = std::enable_if_t<!std::is_same_v<unexpected<E>, detail::remove_cvref_t<S>>>, + typename = std::enable_if_t<!std::is_same_v<std::in_place_t, remove_cvref_t<S>>>, + typename = std::enable_if_t<!std::is_same_v<expected, remove_cvref_t<S>>>, + typename = std::enable_if_t<!std::is_same_v<unexpected<E>, remove_cvref_t<S>>>, typename = std::enable_if_t<std::is_constructible_v<T, S&&>> > expected(S&& x): data_(std::in_place_index<0>, std::forward<S>(x)) {} @@ -234,7 +232,7 @@ struct expected { template < typename S, - typename = std::enable_if_t<!std::is_same_v<expected, detail::remove_cvref_t<S>>>, + typename = std::enable_if_t<!std::is_same_v<expected, remove_cvref_t<S>>>, typename = std::enable_if_t<std::is_constructible_v<T, S>>, typename = std::enable_if_t<std::is_assignable_v<T&, S>> > diff --git a/arbor/include/arbor/util/extra_traits.hpp b/arbor/include/arbor/util/extra_traits.hpp new file mode 100644 index 0000000000000000000000000000000000000000..62653158f549cc09e133d2e5fcc050a1e2a9b74b --- /dev/null +++ b/arbor/include/arbor/util/extra_traits.hpp @@ -0,0 +1,17 @@ +#pragma once + +namespace arb { +namespace util { + +// TODO: C++20 replace with std::remove_cvref, std::remove_cvref_t + +template <typename T> +struct remove_cvref { + typedef std::remove_cv_t<std::remove_reference_t<T>> type; +}; + +template <typename T> +using remove_cvref_t = typename remove_cvref<T>::type; + +} // namespace util +} // namespace arb diff --git a/arbor/include/arbor/util/unique_any.hpp b/arbor/include/arbor/util/unique_any.hpp index 0e5d7834e03e8220e0ee3a1a8600349580da5839..3b4ebffce308929d152ffcbb2df59bf387a6735e 100644 --- a/arbor/include/arbor/util/unique_any.hpp +++ b/arbor/include/arbor/util/unique_any.hpp @@ -6,6 +6,7 @@ #include <type_traits> #include <arbor/util/any_cast.hpp> +#include <arbor/util/extra_traits.hpp> // A non copyable variant of std::any. 
The two main use cases for such a // container are: @@ -43,13 +44,6 @@ namespace arb { namespace util { -namespace detail { - // TODO: C++20 replace with std::remove_cvref_t - template <typename T> - using remove_cvref_t = std::remove_cv_t<std::remove_reference_t<T>>; -} - - class unique_any { public: constexpr unique_any() = default; @@ -60,8 +54,8 @@ public: template < typename T, - typename = std::enable_if_t<!std::is_same_v<detail::remove_cvref_t<T>, unique_any>>, - typename = std::enable_if_t<!std::is_same_v<detail::remove_cvref_t<T>, std::any>> + typename = std::enable_if_t<!std::is_same_v<remove_cvref_t<T>, unique_any>>, + typename = std::enable_if_t<!std::is_same_v<remove_cvref_t<T>, std::any>> > unique_any(T&& other) { state_.reset(new model<contained_type<T>>(std::forward<T>(other))); @@ -74,8 +68,8 @@ public: template < typename T, - typename = std::enable_if_t<!std::is_same_v<detail::remove_cvref_t<T>, unique_any>>, - typename = std::enable_if_t<!std::is_same_v<detail::remove_cvref_t<T>, std::any>> + typename = std::enable_if_t<!std::is_same_v<remove_cvref_t<T>, unique_any>>, + typename = std::enable_if_t<!std::is_same_v<remove_cvref_t<T>, std::any>> > unique_any& operator=(T&& other) { state_.reset(new model<contained_type<T>>(std::forward<T>(other))); @@ -166,7 +160,7 @@ T* any_cast(unique_any* operand) { template<class T> T any_cast(const unique_any& operand) { - using U = detail::remove_cvref_t<T>; + using U = remove_cvref_t<T>; static_assert(std::is_constructible<T, const U&>::value, "any_cast type can't construct copy of contained object"); @@ -179,7 +173,7 @@ T any_cast(const unique_any& operand) { template<class T> T any_cast(unique_any& operand) { - using U = detail::remove_cvref_t<T>; + using U = remove_cvref_t<T>; static_assert(std::is_constructible<T, U&>::value, "any_cast type can't construct copy of contained object"); @@ -192,7 +186,7 @@ T any_cast(unique_any& operand) { template<class T> T any_cast(unique_any&& operand) { - using U = 
detail::remove_cvref_t<T>; + using U = remove_cvref_t<T>; static_assert(std::is_constructible<T, U&&>::value, "any_cast type can't construct copy of contained object"); diff --git a/arbor/lif_cell_group.cpp b/arbor/lif_cell_group.cpp index 1ec02ba98de55500c8770317577f0deecff162a4..d3f84f8ae7ccd2d1ddd2ffa2e8afec0b884ddaf2 100644 --- a/arbor/lif_cell_group.cpp +++ b/arbor/lif_cell_group.cpp @@ -3,6 +3,7 @@ #include <lif_cell_group.hpp> #include "profile/profiler_macro.hpp" +#include "util/rangeutil.hpp" #include "util/span.hpp" using namespace arb; @@ -36,7 +37,7 @@ void lif_cell_group::advance(epoch ep, time_type dt, const event_lane_subrange& if (event_lanes.size() > 0) { for (auto lid: util::make_span(gids_.size())) { // Advance each cell independently. - advance_cell(ep.tfinal, dt, lid, event_lanes[lid]); + advance_cell(ep.t1, dt, lid, event_lanes[lid]); } } PL(); @@ -62,7 +63,7 @@ void lif_cell_group::set_binning_policy(binning_kind policy, time_type bin_inter void lif_cell_group::reset() { spikes_.clear(); - last_time_updated_.clear(); + util::fill(last_time_updated_, 0.); } // Advances a single cell (lid) with the exact solution (jumps can be arbitrary). diff --git a/arbor/mc_cell_group.cpp b/arbor/mc_cell_group.cpp index 2a013afe696e8f1668900a84a8bce36631f365a5..395f426e4b918f375caaf42315298f6476cd8962 100644 --- a/arbor/mc_cell_group.cpp +++ b/arbor/mc_cell_group.cpp @@ -398,7 +398,7 @@ void mc_cell_group::advance(epoch ep, time_type dt, const event_lane_subrange& e auto curr_intdom = cell_to_intdom_[lid]; for (auto e: lane) { - if (e.time>=ep.tfinal) break; + if (e.time>=ep.t1) break; e.time = binners_[lid].bin(e.time, tstart); auto h = target_handles_[target_handle_divisions_[lid]+e.target.index]; auto ev = deliverable_event(e.time, h, e.weight); @@ -452,7 +452,7 @@ void mc_cell_group::advance(epoch ep, time_type dt, const event_lane_subrange& e // Ignore sampler_association_handle, just need the association itself. 
sampler_association& sa = sm_entry.second; - auto sample_times = util::make_range(sa.sched.events(tstart, ep.tfinal)); + auto sample_times = util::make_range(sa.sched.events(tstart, ep.t1)); if (sample_times.empty()) { continue; } @@ -511,7 +511,7 @@ void mc_cell_group::advance(epoch ep, time_type dt, const event_lane_subrange& e PL(); // Run integration and collect samples, spikes. - auto result = lowered_->integrate(ep.tfinal, dt, staged_events_, std::move(sample_events)); + auto result = lowered_->integrate(ep.t1, dt, staged_events_, std::move(sample_events)); // For each sampler callback registered in `call_info`, construct the // vector of sample entries from the lowered cell sample times and values diff --git a/arbor/mc_cell_group.hpp b/arbor/mc_cell_group.hpp index e342dbf13633d981677f5a06e9c07b5ad3d22145..f93e4ebb582d73a80a18681c9efc434ac0486405 100644 --- a/arbor/mc_cell_group.hpp +++ b/arbor/mc_cell_group.hpp @@ -18,12 +18,7 @@ #include "event_binner.hpp" #include "event_queue.hpp" #include "fvm_lowered_cell.hpp" -#include "profile/profiler_macro.hpp" #include "sampler_map.hpp" -#include "util/double_buffer.hpp" -#include "util/filter.hpp" -#include "util/partition.hpp" -#include "util/range.hpp" namespace arb { diff --git a/arbor/simulation.cpp b/arbor/simulation.cpp index 726e798ed516be5e8d9b6a5903fad0f6bb8031c6..9dae450c4f51cc610d647e752ead57222ad7f9ab 100644 --- a/arbor/simulation.cpp +++ b/arbor/simulation.cpp @@ -17,7 +17,6 @@ #include "merge_events.hpp" #include "thread_private_spike_store.hpp" #include "threading/threading.hpp" -#include "util/double_buffer.hpp" #include "util/filter.hpp" #include "util/maputil.hpp" #include "util/partition.hpp" @@ -26,29 +25,67 @@ namespace arb { -class spike_double_buffer { - util::double_buffer<thread_private_spike_store> buffer_; +template <typename Seq, typename Value, typename Less = std::less<>> +auto split_sorted_range(Seq&& seq, const Value& v, Less cmp = Less{}) { + auto canon = 
util::canonical_view(seq); + auto it = std::lower_bound(canon.begin(), canon.end(), v, cmp); + return std::make_pair( + util::make_range(seq.begin(), it), + util::make_range(it, seq.end())); +} + +// Create a new cell event_lane vector from sorted pending events, previous event_lane events, +// and events from event generators for the given interval. +void merge_cell_events( + time_type t_from, + time_type t_to, + event_span old_events, + event_span pending, + std::vector<event_generator>& generators, + pse_vector& new_events) +{ + PE(communication_enqueue_setup); + new_events.clear(); + old_events = split_sorted_range(old_events, t_from, event_time_less()).second; + PL(); + + if (!generators.empty()) { + PE(communication_enqueue_setup); + // Tree-merge events in [t_from, t_to) from old, pending and generator events. + + std::vector<event_span> spanbuf; + spanbuf.reserve(2+generators.size()); + + auto old_split = split_sorted_range(old_events, t_to, event_time_less()); + auto pending_split = split_sorted_range(pending, t_to, event_time_less()); + + spanbuf.push_back(old_split.first); + spanbuf.push_back(pending_split.first); + + for (auto& g: generators) { + event_span evs = g.events(t_from, t_to); + if (!evs.empty()) { + spanbuf.push_back(evs); + } + } + PL(); + + PE(communication_enqueue_tree); + tree_merge_events(spanbuf, new_events); + PL(); + + old_events = old_split.second; + pending = pending_split.second; + } + + // Merge (remaining) old and pending events. + PE(communication_enqueue_merge); + auto n = new_events.size(); + new_events.resize(n+pending.size()+old_events.size()); + std::merge(pending.begin(), pending.end(), old_events.begin(), old_events.end(), new_events.begin()+n); + PL(); +} -public: - // Convenience functions that map the spike buffers onto the appropriate - // integration interval. 
- // - // To overlap communication and computation, integration intervals of - // size Delta/2 are used, where Delta is the minimum delay in the global - // system. - // From the frame of reference of the current integration period we - // define three intervals: previous, current and future - // Then we define the following : - // current: spikes generated in the current interval - // previous: spikes generated in the preceding interval - - spike_double_buffer(thread_private_spike_store l, thread_private_spike_store r): - buffer_(std::move(l), std::move(r)) {} - - thread_private_spike_store& current() { return buffer_.get(); } - thread_private_spike_store& previous() { return buffer_.other(); } - void exchange() { buffer_.exchange(); } -}; class simulation_state { public: @@ -79,26 +116,17 @@ public: spike_export_function local_export_callback_; private: - // Private helper function that sets up the event lanes for an epoch. - // See comments on implementation for more information. - void setup_events(time_type t_from, time_type time_to, std::size_t epoch_id); - - std::vector<pse_vector>& event_lanes(std::size_t epoch_id) { - return event_lanes_[epoch_id%2]; - } - - // keep track of information about the current integration interval + // Record last computed epoch (integration interval). epoch epoch_; - time_type t_ = 0.; - time_type min_delay_; + // Maximum epoch duration. + time_type t_interval_ = 0; + std::vector<cell_group_ptr> cell_groups_; - // one set of event_generators for each local cell + // One set of event_generators for each local cell std::vector<std::vector<event_generator>> event_generators_; - std::unique_ptr<spike_double_buffer> local_spikes_; - // Hash table for looking up the the local index of a cell with a given gid struct gid_local_info { cell_size_type cell_index; @@ -111,8 +139,19 @@ private: task_system_handle task_system_; // Pending events to be delivered. 
- std::array<std::vector<pse_vector>, 2> event_lanes_; std::vector<pse_vector> pending_events_; + std::array<std::vector<pse_vector>, 2> event_lanes_; + + std::vector<pse_vector>& event_lanes(std::ptrdiff_t epoch_id) { + return event_lanes_[epoch_id&1]; + } + + // Spikes generated by local cell groups. + std::array<thread_private_spike_store, 2> local_spikes_; + + thread_private_spike_store& local_spikes(std::ptrdiff_t epoch_id) { + return local_spikes_[epoch_id&1]; + } // Sampler associations handles are managed by a helper class. util::handle_set<sampler_association_handle> sassoc_handles_; @@ -131,6 +170,12 @@ private: threading::parallel_for::apply(0, cell_groups_.size(), task_system_.get(), [&, fn = std::forward<L>(fn)](int i) { fn(cell_groups_[i], i); }); } + + // Apply a functional to each local cell in parallel. + template <typename L> + void foreach_cell(L&& fn) { + threading::parallel_for::apply(0, communicator_.num_local_cells(), task_system_.get(), fn); + } }; simulation_state::simulation_state( @@ -138,15 +183,14 @@ simulation_state::simulation_state( const domain_decomposition& decomp, execution_context ctx ): - local_spikes_(new spike_double_buffer(thread_private_spike_store(ctx.thread_pool), - thread_private_spike_store(ctx.thread_pool))), communicator_(rec, decomp, ctx), - task_system_(ctx.thread_pool) + task_system_(ctx.thread_pool), + local_spikes_({thread_private_spike_store(ctx.thread_pool), thread_private_spike_store(ctx.thread_pool)}) { const auto num_local_cells = communicator_.num_local_cells(); - // Cache the minimum delay of the network - min_delay_ = communicator_.min_delay(); + // Use half minimum delay of the network for max integration interval. + t_interval_ = communicator_.min_delay()/2; // Initialize empty buffers for pending events for each local cell pending_events_.resize(num_local_cells); @@ -176,18 +220,19 @@ simulation_state::simulation_state( }); // Create event lane buffers. 
- // There is one set for each epoch: current (0) and next (1). - // For each epoch there is one lane for each cell in the cell group. + // One buffer is consumed by cell group updates while the other is filled with events for + // the following epoch. In each buffer there is one lane for each local cell. event_lanes_[0].resize(num_local_cells); event_lanes_[1].resize(num_local_cells); + + epoch_.reset(); } void simulation_state::reset() { - t_ = 0.; + epoch_ = epoch(); // Reset cell group state. - foreach_group( - [](cell_group_ptr& group) { group->reset(); }); + foreach_group([](cell_group_ptr& group) { group->reset(); }); // Clear all pending events in the event lanes. for (auto& lanes: event_lanes_) { @@ -209,173 +254,162 @@ void simulation_state::reset() { communicator_.reset(); - local_spikes_->current().clear(); - local_spikes_->previous().clear(); + for (auto& spikes: local_spikes_) { + spikes.clear(); + } + + epoch_.reset(); } time_type simulation_state::run(time_type tfinal, time_type dt) { - // Calculate the size of the largest possible time integration interval - // before communication of spikes is required. - // If spike exchange and cell update are serialized, this is the - // minimum delay of the network, however we use half this period - // to overlap communication and computation. - const time_type t_interval = min_delay_/2; - - // task that updates cell state in parallel. - auto update_cells = [&] () { + // Progress simulation to time tfinal, through a series of integration epochs + // of length at most t_interval_. t_interval_ is chosen to be no more than + // than half the network minimum delay. + // + // There are three simulation tasks that can be run partially in parallel: + // + // 1. Update: + // Ask each cell group to update their state to the end of the integration epoch. + // Generated spikes are stored in local_spikes_ for this epoch. + // + // 2. 
Exchange: + // Consume local spikes held in local_spikes_ from a previous update, and collect + // such spikes from across all ranks. + // Translate spikes to local postsynaptic spike events, to be appended to pending_events_. + // + // 3. Enqueue events: + // Take events from pending_events_, together with any event-generator events for the + // next epoch and any left over events from the last epoch, and collate them into + // the per-cell event_lanes for the next epoch. + // + // Writing U(k) for Update on kth epoch; D(k) for Exchange of spikes generated in the kth epoch; + // and E(k) for Enqueue of the events required for the kth epoch, there are the following + // dependencies: + // + // * E(k) precedes U(k). + // * U(k) precedes D(k). + // * U(k) precedes U(k+1). + // * D(k) precedes E(k+2). + // * D(k) precedes D(k+1). + // + // In the schedule implemented below, U(k) and D(k-1) or U(k) and E(k+1) can be run + // in parallel, while D and E operations must be serialized (D writes to pending_events_, + // while E consumes and clears it). The local spike collection and the per-cell event + // lanes are double buffered. + // + // Required state on run() invocation with epoch_.id==k: + // * For k≥0, U(k) and D(k) have completed. + // + // Required state at end of run(), with epoch_.id==k: + // * U(k) and D(k) have completed. + + + if (tfinal<=epoch_.t1) return epoch_.t1; + + // Compute following epoch, with max time tfinal. + auto next_epoch = [tfinal](epoch e, time_type interval) -> epoch { + epoch next = e; + next.advance_to(std::min(next.t1+interval, tfinal)); + return next; + }; + + // Update task: advance cell groups to end of current epoch and store spikes in local_spikes_. 
+ auto update = [this, dt](epoch current) { + local_spikes(current.id).clear(); foreach_group_index( [&](cell_group_ptr& group, int i) { - auto queues = util::subrange_view(event_lanes(epoch_.id), communicator_.group_queue_range(i)); - group->advance(epoch_, dt, queues); + auto queues = util::subrange_view(event_lanes(current.id), communicator_.group_queue_range(i)); + group->advance(current, dt, queues); PE(advance_spikes); - local_spikes_->current().insert(group->spikes()); + local_spikes(current.id).insert(group->spikes()); group->clear_spikes(); PL(); }); }; - // task that performs spike exchange with the spikes generated in - // the previous integration period, generating the postsynaptic - // events that must be delivered at the start of the next - // integration period at the latest. - auto exchange = [&] () { + // Exchange task: gather previous locally generated spikes, distribute across all ranks, and deliver + // post-synaptic spike events to per-cell pending event vectors. + auto exchange = [this](epoch prev) { + // Collate locally generated spikes. PE(communication_exchange_gatherlocal); - auto local_spikes = local_spikes_->previous().gather(); + auto all_local_spikes = local_spikes(prev.id).gather(); PL(); - auto global_spikes = communicator_.exchange(local_spikes); + // Gather generated spikes across all ranks. + auto global_spikes = communicator_.exchange(all_local_spikes); + // Present spikes to user-supplied callbacks. PE(communication_spikeio); if (local_export_callback_) { - local_export_callback_(local_spikes); + local_export_callback_(all_local_spikes); } if (global_export_callback_) { global_export_callback_(global_spikes.values()); } PL(); + // Append events formed from global spikes to per-cell pending event queues. 
PE(communication_walkspikes); communicator_.make_event_queues(global_spikes, pending_events_); PL(); - - const auto t0 = epoch_.tfinal; - const auto t1 = std::min(tfinal, t0+t_interval); - setup_events(t0, t1, epoch_.id); }; - time_type tuntil = std::min(t_+t_interval, tfinal); - epoch_ = epoch(0, tuntil); - setup_events(t_, tuntil, 1); - while (t_<tfinal) { - local_spikes_->exchange(); - - // empty the spike buffers for the current integration period. - // these buffers will store the new spikes generated in update_cells. - local_spikes_->current().clear(); - - // run the tasks, overlapping if the threading model and number of - // available threads permits it. - threading::task_group g(task_system_.get()); - g.run(exchange); - g.run(update_cells); - g.wait(); - - t_ = tuntil; - - tuntil = std::min(t_+t_interval, tfinal); - epoch_.advance(tuntil); - } - - // Run the exchange one last time to ensure that all spikes are output to file. - local_spikes_->exchange(); - exchange(); - - return t_; -} + // Enqueue task: build event_lanes for next epoch from pending events, event-generator events for the + // next epoch, and with any unprocessed events the current event_lanes. + auto enqueue = [this](epoch next) { + foreach_cell( + [&](cell_size_type i) { + PE(communication_enqueue_sort); + util::sort(pending_events_[i]); + PL(); -template <typename Seq, typename Value, typename Less = std::less<>> -auto split_sorted_range(Seq&& seq, const Value& v, Less cmp = Less{}) { - auto canon = util::canonical_view(seq); - auto it = std::lower_bound(canon.begin(), canon.end(), v, cmp); - return std::make_pair( - util::make_range(seq.begin(), it), - util::make_range(it, seq.end())); -} + event_span pending = util::range_pointer_view(pending_events_[i]); + event_span old_events = util::range_pointer_view(event_lanes(next.id-1)[i]); -// Populate the event lanes for epoch+1 (i.e event_lanes_[epoch+1)] -// Update each lane in parallel, if supported by the threading backend. 
-// On completion event_lanes[epoch+1] will contain sorted lists of events with -// delivery times due in or after epoch+1. The events will be taken from the -// following sources: -// event_lanes[epoch]: take all events ≥ t_from -// event_generators : take all events < t_to -// pending_events : take all events + merge_cell_events(next.t0, next.t1, old_events, pending, event_generators_[i], event_lanes(next.id)[i]); + pending_events_[i].clear(); + }); + }; -// merge_cell_events() is a separate function for unit testing purposes. -void merge_cell_events( - time_type t_from, - time_type t_to, - event_span old_events, - event_span pending, - std::vector<event_generator>& generators, - pse_vector& new_events) -{ - PE(communication_enqueue_setup); - new_events.clear(); - old_events = split_sorted_range(old_events, t_from, event_time_less()).second; - PL(); + threading::task_group g(task_system_.get()); - if (!generators.empty()) { - PE(communication_enqueue_setup); - // Tree-merge events in [t_from, t_to) from old, pending and generator events. 
+ epoch prev = epoch_; + epoch current = next_epoch(prev, t_interval_); + epoch next = next_epoch(current, t_interval_); - std::vector<event_span> spanbuf; - spanbuf.reserve(2+generators.size()); + if (next.empty()) { + enqueue(current); + update(current); + exchange(current); + } + else { + enqueue(current); - auto old_split = split_sorted_range(old_events, t_to, event_time_less()); - auto pending_split = split_sorted_range(pending, t_to, event_time_less()); + g.run([&]() { enqueue(next); }); + g.run([&]() { update(current); }); + g.wait(); - spanbuf.push_back(old_split.first); - spanbuf.push_back(pending_split.first); + for (;;) { + prev = current; + current = next; + next = next_epoch(next, t_interval_); + if (next.empty()) break; - for (auto& g: generators) { - event_span evs = g.events(t_from, t_to); - if (!evs.empty()) { - spanbuf.push_back(evs); - } + g.run([&]() { exchange(prev); enqueue(next); }); + g.run([&]() { update(current); }); + g.wait(); } - PL(); - PE(communication_enqueue_tree); - tree_merge_events(spanbuf, new_events); - PL(); + g.run([&]() { exchange(prev); }); + g.run([&]() { update(current); }); + g.wait(); - old_events = old_split.second; - pending = pending_split.second; + exchange(current); } - // Merge (remaining) old and pending events. 
- PE(communication_enqueue_merge); - auto n = new_events.size(); - new_events.resize(n+pending.size()+old_events.size()); - std::merge(pending.begin(), pending.end(), old_events.begin(), old_events.end(), new_events.begin()+n); - PL(); -} - -void simulation_state::setup_events(time_type t_from, time_type t_to, std::size_t epoch) { - const auto n = communicator_.num_local_cells(); - threading::parallel_for::apply(0, n, task_system_.get(), - [&](cell_size_type i) { - PE(communication_enqueue_sort); - util::sort(pending_events_[i]); - PL(); - - event_span pending = util::range_pointer_view(pending_events_[i]); - event_span old_events = util::range_pointer_view(event_lanes(epoch)[i]); - - merge_cell_events(t_from, t_to, old_events, pending, event_generators_[i], event_lanes(epoch+1)[i]); - pending_events_[i].clear(); - }); + // Record current epoch for next run() invocation. + epoch_ = current; + return current.t1; } sampler_association_handle simulation_state::add_sampler( @@ -424,8 +458,8 @@ void simulation_state::inject_events(const pse_vector& events) { // Push all events that are to be delivered to local cells into the // pending event list for the event's target cell. for (auto& e: events) { - if (e.time<t_) { - throw bad_event_time(e.time, t_); + if (e.time<epoch_.t1) { + throw bad_event_time(e.time, epoch_.t1); } // gid_to_local_ maps gid to index in local cells and of corresponding cell group. 
if (auto lidx = util::value_by_key(gid_to_local_, e.target.gid)) { diff --git a/arbor/spike_source_cell_group.cpp b/arbor/spike_source_cell_group.cpp index 025d94e9a28677ed847ebba74bd14afc7c68977e..856eec81a7b7c3d6da1fe3d157a6e110c70be81d 100644 --- a/arbor/spike_source_cell_group.cpp +++ b/arbor/spike_source_cell_group.cpp @@ -43,11 +43,10 @@ void spike_source_cell_group::advance(epoch ep, time_type dt, const event_lane_s for (auto i: util::count_along(gids_)) { const auto gid = gids_[i]; - for (auto t: util::make_range(time_sequences_[i].events(t_, ep.tfinal))) { + for (auto t: util::make_range(time_sequences_[i].events(ep.t0, ep.t1))) { spikes_.push_back({{gid, 0u}, t}); } } - t_ = ep.tfinal; PL(); }; @@ -56,8 +55,6 @@ void spike_source_cell_group::reset() { for (auto& s: time_sequences_) { s.reset(); } - t_ = 0; - clear_spikes(); } diff --git a/arbor/spike_source_cell_group.hpp b/arbor/spike_source_cell_group.hpp index ff2d4b4986ede884332cdc57fc2e5d80325fb476..3d36a376092b576b3698c984b1d4b2b02e833ece 100644 --- a/arbor/spike_source_cell_group.hpp +++ b/arbor/spike_source_cell_group.hpp @@ -36,7 +36,6 @@ public: void remove_all_samplers() override {} private: - time_type t_ = 0; std::vector<spike> spikes_; std::vector<cell_gid_type> gids_; std::vector<schedule> time_sequences_; diff --git a/arbor/util/double_buffer.hpp b/arbor/util/double_buffer.hpp deleted file mode 100644 index 845a5ce9457722e4688403007923d444002e75c3..0000000000000000000000000000000000000000 --- a/arbor/util/double_buffer.hpp +++ /dev/null @@ -1,70 +0,0 @@ -#pragma once - -#include <atomic> -#include <vector> - -#include <arbor/assert.hpp> - -namespace arb { -namespace util { - -/// double buffer with thread safe exchange/flip operation. -template <typename T> -class double_buffer { -private: - std::atomic<int> index_; - std::vector<T> buffers_; - - int other_index() { - return index_ ? 
0 : 1; - } - -public: - using value_type = T; - - double_buffer() : - index_(0), buffers_(2) - {} - - double_buffer(T l, T r): index_(0) { - buffers_.reserve(2); - buffers_.push_back(std::move(l)); - buffers_.push_back(std::move(r)); - } - - /// remove the copy and move constructors which won't work with std::atomic - double_buffer(double_buffer&&) = delete; - double_buffer(const double_buffer&) = delete; - double_buffer& operator=(const double_buffer&) = delete; - double_buffer& operator=(double_buffer&&) = delete; - - /// flip the buffers in a thread safe manner - /// n calls to exchange will always result in n flips - void exchange() { - // use operator^= which is overloaded by std::atomic<> - index_ ^= 1; - } - - /// get the current/front buffer - value_type& get() { - return buffers_[index_]; - } - - /// get the current/front buffer - const value_type& get() const { - return buffers_[index_]; - } - - /// get the back buffer - value_type& other() { - return buffers_[other_index()]; - } - - /// get the back buffer - const value_type& other() const { - return buffers_[other_index()]; - } -}; - -} // namespace util -} // namespace arb diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 7e649daaa0687fa9ad3153e96909c3febf668bb5..43418b99dbd01b6370f47ea275ad4152d8097187 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -97,7 +97,6 @@ set(unit_sources test_cycle.cpp test_domain_decomposition.cpp test_dry_run_context.cpp - test_double_buffer.cpp test_event_binner.cpp test_event_delivery.cpp test_event_generators.cpp @@ -147,6 +146,7 @@ set(unit_sources test_scope_exit.cpp test_segment_tree.cpp test_simd.cpp + test_simulation.cpp test_span.cpp test_spike_source.cpp test_spikes.cpp diff --git a/test/unit/test_double_buffer.cpp b/test/unit/test_double_buffer.cpp deleted file mode 100644 index b8ec6ef75cfa4c71a284d85d636d0f53e2b82705..0000000000000000000000000000000000000000 --- a/test/unit/test_double_buffer.cpp +++ /dev/null @@ 
-1,58 +0,0 @@ -#include "../gtest.h" - -#include <util/double_buffer.hpp> - -// not much to test here: just test that values passed into the constructor -// are correctly stored in members -TEST(double_buffer, exchange_and_get) -{ - using namespace arb::util; - - double_buffer<int> buf; - - buf.get() = 2134; - buf.exchange(); - buf.get() = 8990; - buf.exchange(); - - EXPECT_EQ(buf.get(), 2134); - EXPECT_EQ(buf.other(), 8990); - buf.exchange(); - EXPECT_EQ(buf.get(), 8990); - EXPECT_EQ(buf.other(), 2134); - buf.exchange(); - EXPECT_EQ(buf.get(), 2134); - EXPECT_EQ(buf.other(), 8990); -} - -TEST(double_buffer, assign_get_other) -{ - using namespace arb::util; - - double_buffer<std::string> buf; - - buf.get() = "1"; - buf.other() = "2"; - - EXPECT_EQ(buf.get(), "1"); - EXPECT_EQ(buf.other(), "2"); -} - -TEST(double_buffer, non_pod) -{ - using namespace arb::util; - - double_buffer<std::string> buf; - - buf.get() = "1"; - buf.other() = "2"; - - EXPECT_EQ(buf.get(), "1"); - EXPECT_EQ(buf.other(), "2"); - buf.exchange(); - EXPECT_EQ(buf.get(), "2"); - EXPECT_EQ(buf.other(), "1"); - buf.exchange(); - EXPECT_EQ(buf.get(), "1"); - EXPECT_EQ(buf.other(), "2"); -} diff --git a/test/unit/test_mc_cell_group.cpp b/test/unit/test_mc_cell_group.cpp index 25f54f2bee9d13ec222463e67e32fe146ae75659..efc5fbff8ad6eb71f4a0143cffa297e55e6db6f7 100644 --- a/test/unit/test_mc_cell_group.cpp +++ b/test/unit/test_mc_cell_group.cpp @@ -54,7 +54,7 @@ TEST(mc_cell_group, test) { rec.nernst_ion("k"); mc_cell_group group{{0}, rec, lowered_cell()}; - group.advance(epoch(0, 50), 0.01, {}); + group.advance(epoch(0, 0., 50.), 0.01, {}); // Model is expected to generate 4 spikes as a result of the // fixed stimulus over 50 ms. 
diff --git a/test/unit/test_simulation.cpp b/test/unit/test_simulation.cpp new file mode 100644 index 0000000000000000000000000000000000000000..dda5be17163645f5815f5d05326836eddaa5c151 --- /dev/null +++ b/test/unit/test_simulation.cpp @@ -0,0 +1,174 @@ +#include "../gtest.h" + +#include <random> +#include <vector> + +#include <arbor/common_types.hpp> +#include <arbor/domain_decomposition.hpp> +#include <arbor/load_balance.hpp> +#include <arbor/lif_cell.hpp> +#include <arbor/recipe.hpp> +#include <arbor/simulation.hpp> +#include <arbor/spike_source_cell.hpp> + +#include "util/rangeutil.hpp" +#include "util/transform.hpp" + +#include "common.hpp" +using namespace arb; + +struct play_spikes: public recipe { + play_spikes(std::vector<schedule> spike_times): spike_times_(std::move(spike_times)) {} + + cell_size_type num_cells() const override { return spike_times_.size(); } + cell_kind get_cell_kind(cell_gid_type) const override { return cell_kind::spike_source; } + cell_size_type num_sources(cell_gid_type) const override { return 1; } + cell_size_type num_targets(cell_gid_type) const override { return 0; } + util::unique_any get_cell_description(cell_gid_type gid) const override { + return spike_source_cell{spike_times_.at(gid)}; + } + + std::vector<schedule> spike_times_; +}; + +static auto n_thread_context(unsigned n_thread) { + return make_context(proc_allocation(std::max((int)n_thread, 1), -1)); +} + +TEST(simulation, spike_global_callback) { + constexpr unsigned n = 5; + double t_max = 10.; + + std::vector<schedule> spike_times; + for (unsigned i = 0; i<n; ++i) { + spike_times.push_back(poisson_schedule(0., 20./t_max, std::minstd_rand(1000+i))); + } + + std::vector<spike> expected_spikes; + for (unsigned i = 0; i<n; ++i) { + util::append(expected_spikes, util::transform_view(util::make_range(spike_times.at(i).events(0, t_max)), + [i](double time) { return spike({i, 0}, time); })); + spike_times.at(i).reset(); + } + + play_spikes rec(spike_times); + auto ctx = 
n_thread_context(4); + auto decomp = partition_load_balance(rec, ctx); + simulation sim(rec, decomp, ctx); + + std::vector<spike> collected; + sim.set_global_spike_callback([&](const std::vector<spike>& spikes) { + collected.insert(collected.end(), spikes.begin(), spikes.end()); + }); + + double tfinal = 0.7*t_max; + constexpr double dt = 0.01; + sim.run(tfinal, dt); + + auto spike_lt = [](spike a, spike b) { return a.time<b.time || (a.time==b.time && a.source<b.source); }; + std::sort(expected_spikes.begin(), expected_spikes.end(), spike_lt); + std::sort(collected.begin(), collected.end(), spike_lt); + + auto cut_from = std::partition_point(expected_spikes.begin(), expected_spikes.end(), [tfinal](auto spike) { return spike.time<tfinal; }); + expected_spikes.erase(cut_from, expected_spikes.end()); + EXPECT_EQ(expected_spikes, collected); +} + +struct lif_chain: public recipe { + lif_chain(unsigned n, double delay, schedule triggers): + n_(n), delay_(delay), triggers_(std::move(triggers)) {} + + cell_size_type num_cells() const override { return n_; } + + cell_kind get_cell_kind(cell_gid_type) const override { return cell_kind::lif; } + cell_size_type num_sources(cell_gid_type) const override { return 1; } + cell_size_type num_targets(cell_gid_type) const override { return 1; } + util::unique_any get_cell_description(cell_gid_type) const override { + // A hair-trigger LIF cell with tiny time constant and no refractory period. 
+ lif_cell lif;
+ lif.tau_m = 0.01; // time constant (ms)
+ lif.t_ref = 0; // refractory period (ms)
+ lif.V_th = lif.E_L + 0.001; // threshold voltage 1 µV higher than resting
+ return lif;
+ }
+
+ std::vector<cell_connection> connections_on(cell_gid_type target) const {
+ if (target) {
+ return {cell_connection({target-1, 0}, {target, 0}, weight_, delay_)};
+ }
+ else {
+ return {};
+ }
+ }
+
+ std::vector<event_generator> event_generators(cell_gid_type target) const {
+ if (target) {
+ return {};
+ }
+ else {
+ return {schedule_generator({target, 0}, weight_, triggers_)};
+ }
+ }
+
+ static constexpr double weight_ = 2.0;
+ unsigned n_;
+ double delay_;
+ schedule triggers_;
+};
+
+TEST(simulation, restart) {
+ std::vector<double> trigger_times = {1., 2., 3.};
+ double delay = 10;
+ unsigned n = 5;
+ lif_chain rec(n, delay, explicit_schedule(trigger_times));
+
+ // Expect spike times to be almost exactly according to trigger times,
+ // plus delays along the chain of cells.
+
+ std::vector<spike> expected_spikes;
+ for (auto t: trigger_times) {
+ for (unsigned i = 0; i<n; ++i) {
+ expected_spikes.push_back(spike({i, 0}, i*delay+t));
+ }
+ }
+
+ auto spike_lt = [](spike a, spike b) { return a.time<b.time || (a.time==b.time && a.source<b.source); };
+ std::sort(expected_spikes.begin(), expected_spikes.end(), spike_lt);
+
+ auto ctx = n_thread_context(4);
+ auto decomp = partition_load_balance(rec, ctx);
+ simulation sim(rec, decomp, ctx);
+
+ std::vector<spike> collected;
+ sim.set_global_spike_callback([&](const std::vector<spike>& spikes) {
+ collected.insert(collected.end(), spikes.begin(), spikes.end());
+ });
+
+ double tfinal = trigger_times.back()+delay*(n/2+0.1);
+ constexpr double dt = 0.01;
+
+ auto cut_from = std::partition_point(expected_spikes.begin(), expected_spikes.end(), [tfinal](auto spike) { return spike.time<tfinal; });
+ expected_spikes.erase(cut_from, expected_spikes.end());
+
+ // Run simulation in various numbers of stages, ranging from a
single stage + // to running it in stages of duration less than delay/2. + + for (double run_time = 0.1*delay; run_time<=tfinal; run_time *= 1.5) { + SCOPED_TRACE(run_time); + collected.clear(); + + sim.reset(); + double t = 0; + do { + double run_to = std::min(tfinal, t + run_time); + t = sim.run(run_to, dt); + ASSERT_EQ(t, run_to); + } while (t<tfinal); + + ASSERT_EQ(expected_spikes.size(), collected.size()); + for (unsigned i = 0; i<expected_spikes.size(); ++i) { + EXPECT_EQ(expected_spikes[i].source, collected[i].source); + EXPECT_DOUBLE_EQ(expected_spikes[i].time, collected[i].time); + } + } +} diff --git a/test/unit/test_spike_source.cpp b/test/unit/test_spike_source.cpp index 8892b77406b8de4b81993d490c1157c30b0a6329..b0703a4b63fff7099b6ab2635051a7bb213cf660 100644 --- a/test/unit/test_spike_source.cpp +++ b/test/unit/test_spike_source.cpp @@ -43,14 +43,14 @@ TEST(spike_source, matches_time_seq) spike_source_cell_group group({0}, rec); // epoch ending at 10ms - epoch ep(0, 10); + epoch ep(0, 0., 10.); group.advance(ep, 1, {}); EXPECT_EQ(spike_times(group.spikes()), as_vector(seq.events(0, 10))); group.clear_spikes(); // advance to 20 ms and repeat - ep.advance(20); + ep.advance_to(20); group.advance(ep, 1, {}); EXPECT_EQ(spike_times(group.spikes()), as_vector(seq.events(10, 20))); }; @@ -70,7 +70,7 @@ TEST(spike_source, reset) spike_source_cell_group group({0}, rec); // Advance for 10 ms and store generated spikes in spikes1. - epoch ep(0, 10); + epoch ep(0, 0., 10.); group.advance(ep, 1, {}); auto spikes1 = group.spikes(); @@ -100,7 +100,7 @@ TEST(spike_source, exhaust) spike_source_cell_group group({0}, rec); // epoch ending at 10ms - epoch ep(0, 10); + epoch ep(0, 0., 10.); group.advance(ep, 1, {}); EXPECT_EQ(spike_times(group.spikes()), as_vector(seq.events(0, 10)));