diff --git a/src/connector.cc b/src/connector.cc index b7921b391d5314db86323c9db2e000edd1612795..ac6af7ea7ad543b95ccaaa203a8df044d2c90d98 100644 --- a/src/connector.cc +++ b/src/connector.cc @@ -762,6 +762,8 @@ error( "LOCAL Indices are not supported with MUSIC_ANYSOURCE"); } } #endif //MUSIC_ANYSOURCE + + /******************************************************************** * * Message Connectors @@ -780,8 +782,6 @@ error( "LOCAL Indices are not supported with MUSIC_ANYSOURCE"); { } - - Subconnector* MessageOutputConnector::makeSubconnector (int remoteRank) diff --git a/src/music/collector.hh b/src/music/collector.hh index 043dbac06ee33bb132cc351f2fe314cca8da5574..3f8a967b71075252897c82a956336a6f12a2e443 100644 --- a/src/music/collector.hh +++ b/src/music/collector.hh @@ -33,6 +33,11 @@ namespace MUSIC { + /** + * The Collector is responsible for collecting data from a set of + * input buffers in BufferMap and store them in the state variables + * represented by a DataMap. + */ class Collector { public: //for BG compiler class Interval : public MUSIC::Interval { diff --git a/src/music/connector.hh b/src/music/connector.hh index 271f5215bb5e20b2139af759314b7e5b3f362d0b..82a65cd59e4702bedadefaabe9371bd0bb5185dc 100644 --- a/src/music/connector.hh +++ b/src/music/connector.hh @@ -393,9 +393,10 @@ namespace MUSIC { public PostCommunicationConnector { private: // OutputSynchronizer synch; - FIBO buffer; + FIBO buffer; // MessageOutputConnectors have only one output buffer bool bufferAdded; - std::vector<FIBO*>& buffers_; + std::vector<FIBO*>& buffers_; // pointer to the port vector of + // connector buffers void send (); public: MessageOutputConnector (ConnectorInfo connInfo, diff --git a/src/music/distributor.hh b/src/music/distributor.hh index 39beb83c6fbc813fa89978cf8fdfefb1a8ffb3e0..c09ba9606310cf0b5d9b96876ab358f48f10bc93 100644 --- a/src/music/distributor.hh +++ b/src/music/distributor.hh @@ -33,6 +33,10 @@ namespace MUSIC { + /** + * The Distributor distributes data from state variables represented + * by a DataMap ro a set of output buffers in BufferMap. + */ class Distributor { public: //for BG compiler class Interval : public MUSIC::Interval { diff --git a/src/music/port.hh b/src/music/port.hh index 9f2d50fc14f139bc4ba535a65d6e1628e430f3b2..d7c639eb7c138b150f4d26f2473e556fea65807c 100644 --- a/src/music/port.hh +++ b/src/music/port.hh @@ -242,7 +242,7 @@ namespace MUSIC { class MessageOutputPort : public MessagePort, public OutputPort { - std::vector<FIBO*> buffers; + std::vector<FIBO*> buffers; // one buffer per MessageOutputConnector public: MessageOutputPort (Setup* s, std::string id); void map (); diff --git a/src/port.cc b/src/port.cc index 3a49da9264ca4d88a093404476b8b3841e88c0ac..b70ab83c544e8896c6c6c38e3780a34c6888eaa5 100644 --- a/src/port.cc +++ b/src/port.cc @@ -680,7 +680,7 @@ namespace MUSIC { { return new MessageOutputConnector (connInfo, indices_, - index_type_, + index_type_, setup_->communicator (), buffers); } diff --git a/src/subconnector.cc b/src/subconnector.cc index 19cf54c30972e1df0951bbc16ec22f1d85373b81..3b28c0b7b31d2a8f41674b359956130835fd086f 100644 --- a/src/subconnector.cc +++ b/src/subconnector.cc @@ -35,854 +35,885 @@ namespace MUSIC { -Subconnector::Subconnector (MPI::Datatype type, - MPI::Intercomm intercomm_, - int remoteLeader, - int remoteRank, - int receiverRank, - int receiverPortCode) -: type_ (type), - intercomm (intercomm_), - remoteRank_ (remoteRank), - remoteWorldRank_ (remoteLeader + remoteRank), - receiverRank_ (receiverRank), - receiverPortCode_ (receiverPortCode), - flushed(false) -{ -} - + Subconnector::Subconnector (MPI::Datatype type, + MPI::Intercomm intercomm_, + int remoteLeader, + int remoteRank, + int receiverRank, + int receiverPortCode) + : type_ (type), + intercomm (intercomm_), + remoteRank_ (remoteRank), + remoteWorldRank_ (remoteLeader + remoteRank), + receiverRank_ (receiverRank), + receiverPortCode_ (receiverPortCode), + flushed(false) + { + } -Subconnector::~Subconnector () -{ -} + + Subconnector::~Subconnector () + { + } + + BufferingOutputSubconnector::BufferingOutputSubconnector (int elementSize) + : buffer_ (elementSize) + { + } -BufferingOutputSubconnector::BufferingOutputSubconnector (int elementSize) -: buffer_ (elementSize) -{ -} + /******************************************************************** + * + * Cont Subconnectors + * + ********************************************************************/ + + ContOutputSubconnector::ContOutputSubconnector (//Synchronizer* synch_, + MPI::Intercomm intercomm_, + int remoteLeader, + int remoteRank, + int receiverPortCode_, + MPI::Datatype type) + : Subconnector (type, + intercomm_, + remoteLeader, + remoteRank, + remoteRank, + receiverPortCode_), + BufferingOutputSubconnector (0) + { + } + + void + ContOutputSubconnector::initialCommunication (double param) + { + send (); + } + + void + ContOutputSubconnector::maybeCommunicate () + { + if(!flushed) + send (); + } + + void + ContOutputSubconnector::send () + { -/******************************************************************** - * - * Cont Subconnectors - * - ********************************************************************/ - -ContOutputSubconnector::ContOutputSubconnector (//Synchronizer* synch_, - MPI::Intercomm intercomm_, - int remoteLeader, - int remoteRank, - int receiverPortCode_, - MPI::Datatype type) -: Subconnector (type, - intercomm_, - remoteLeader, - remoteRank, - remoteRank, - receiverPortCode_), - BufferingOutputSubconnector (0) -{ -} + void* data; + int size; + buffer_.nextBlock (data, size); + // NOTE: marshalling + char* buffer = static_cast <char*> (data); + while (size >= CONT_BUFFER_MAX) + { -void -ContOutputSubconnector::initialCommunication (double param) -{ - send (); -} + MUSIC_LOGR ("Sending to rank " << remoteRank_); + intercomm.Ssend (buffer, + CONT_BUFFER_MAX / type_.Get_size (), + type_, + remoteRank_, + CONT_MSG); + buffer += CONT_BUFFER_MAX; + size -= CONT_BUFFER_MAX; + } + MUSIC_LOGR ("Last send to rank " << remoteRank_); + intercomm.Ssend (buffer, + size / type_.Get_size (), + type_, + remoteRank_, + CONT_MSG); + } + + void + ContOutputSubconnector::flush (bool& dataStillFlowing) + { + if (!flushed) + { + if (!buffer_.isEmpty ()) + { + MUSIC_LOGR ("sending data remaining in buffers"); + send (); + dataStillFlowing = true; + } + else + { + char dummy; + intercomm.Ssend (&dummy, 0, type_, remoteRank_, FLUSH_MSG); + flushed = true; + } + } + } -void -ContOutputSubconnector::maybeCommunicate () -{ - if(!flushed) - send (); -} + + ContInputSubconnector::ContInputSubconnector (//Synchronizer* synch_, + MPI::Intercomm intercomm, + int remoteLeader, + int remoteRank, + int receiverRank, + int receiverPortCode, + MPI::Datatype type) + : Subconnector (type, + intercomm, + remoteLeader, + remoteRank, + receiverRank, + receiverPortCode), + InputSubconnector () + { + } + + void + ContInputSubconnector::initialCommunication (double initialBufferedTicks) + { + receive (); -void -ContOutputSubconnector::send () -{ + buffer_.fill (initialBufferedTicks); + } - void* data; - int size; - buffer_.nextBlock (data, size); - // NOTE: marshalling - char* buffer = static_cast <char*> (data); - while (size >= CONT_BUFFER_MAX) - { + void + ContInputSubconnector::maybeCommunicate () + { + if (!flushed) + receive (); + } - MUSIC_LOGR ("Sending to rank " << remoteRank_); - intercomm.Ssend (buffer, - CONT_BUFFER_MAX / type_.Get_size (), - type_, - remoteRank_, - CONT_MSG); - buffer += CONT_BUFFER_MAX; - size -= CONT_BUFFER_MAX; - } - MUSIC_LOGR ("Last send to rank " << remoteRank_); - intercomm.Ssend (buffer, - size / type_.Get_size (), + + void + ContInputSubconnector::receive () + { + char* data; + MPI::Status status; + int size, maxCount; + maxCount = CONT_BUFFER_MAX / type_.Get_size (); + do + { + data = static_cast<char*> (buffer_.insertBlock ()); + MUSIC_LOGR ("Receiving from rank " << remoteRank_); + + intercomm.Recv (data, + maxCount, type_, remoteRank_, - CONT_MSG); -} - - -void -ContOutputSubconnector::flush (bool& dataStillFlowing) -{ - if (!flushed) - { - if (!buffer_.isEmpty ()) - { - MUSIC_LOGR ("sending data remaining in buffers"); - send (); - dataStillFlowing = true; - } - else - { - char dummy; - intercomm.Ssend (&dummy, 0, type_, remoteRank_, FLUSH_MSG); - flushed = true; - } - } -} - - -ContInputSubconnector::ContInputSubconnector (//Synchronizer* synch_, - MPI::Intercomm intercomm, - int remoteLeader, - int remoteRank, - int receiverRank, - int receiverPortCode, - MPI::Datatype type) -: Subconnector (type, - intercomm, - remoteLeader, - remoteRank, - receiverRank, - receiverPortCode), - InputSubconnector () -{ -} - + MPI::ANY_TAG, + status); + if (status.Get_tag () == FLUSH_MSG) + { + flushed = true; + MUSIC_LOGR ("received flush message"); + return; + } + size = status.Get_count (type_); + buffer_.trimBlock (type_.Get_size () * size); + } + while (size == maxCount); + } -void -ContInputSubconnector::initialCommunication (double initialBufferedTicks) -{ + + void + ContInputSubconnector::flush (bool& dataStillFlowing) + { + if (!flushed) + { + //MUSIC_LOGR ("receiving and throwing away data"); receive (); - - buffer_.fill (initialBufferedTicks); -} - - -void -ContInputSubconnector::maybeCommunicate () -{ if (!flushed) - receive (); -} - - -void -ContInputSubconnector::receive () -{ - char* data; - MPI::Status status; - int size, maxCount; - maxCount = CONT_BUFFER_MAX / type_.Get_size (); - do - { - data = static_cast<char*> (buffer_.insertBlock ()); - MUSIC_LOGR ("Receiving from rank " << remoteRank_); - - intercomm.Recv (data, - maxCount, - type_, - remoteRank_, - MPI::ANY_TAG, - status); - if (status.Get_tag () == FLUSH_MSG) - { - flushed = true; - MUSIC_LOGR ("received flush message"); - return; - } - size = status.Get_count (type_); - buffer_.trimBlock (type_.Get_size () * size); - } - while (size == maxCount); -} + dataStillFlowing = true; + } + } -void -ContInputSubconnector::flush (bool& dataStillFlowing) -{ - if (!flushed) - { - //MUSIC_LOGR ("receiving and throwing away data"); - receive (); - if (!flushed) - dataStillFlowing = true; - } -} + + /******************************************************************** + * + * Event Subconnectors + * + ********************************************************************/ + + + EventOutputSubconnector::EventOutputSubconnector (//Synchronizer* synch_, + MPI::Intercomm intercomm, + int remoteLeader, + int remoteRank, + int receiverPortCode) + : Subconnector (MPI::BYTE, + intercomm, + remoteLeader, + remoteRank, + remoteRank, + receiverPortCode), + BufferingOutputSubconnector (sizeof (Event)) + { + } -/******************************************************************** - * - * Event Subconnectors - * - ********************************************************************/ - - -EventOutputSubconnector::EventOutputSubconnector (//Synchronizer* synch_, - MPI::Intercomm intercomm, - int remoteLeader, - int remoteRank, - int receiverPortCode) -: Subconnector (MPI::BYTE, - intercomm, - remoteLeader, - remoteRank, - remoteRank, - receiverPortCode), - BufferingOutputSubconnector (sizeof (Event)) -{ -} + + void + EventOutputSubconnector::maybeCommunicate () + { + send (); + } -void -EventOutputSubconnector::maybeCommunicate () -{ - send (); + + void + EventOutputSubconnector::maybeCommunicate (std::vector<MPI::Request> &requests) + { + send (requests); + } -} -void -EventOutputSubconnector::maybeCommunicate (std::vector<MPI::Request> &requests) -{ - send (requests); -} + + void + EventOutputSubconnector::send (std::vector<MPI::Request> &requests) + { + MUSIC_LOGRE ("ISend"); + void* data; + int size; + buffer_.nextBlock (data, size); + char* buffer = static_cast <char*> (data); + while (size >= SPIKE_BUFFER_MAX) + { + requests.push_back(intercomm.Isend (buffer, + SPIKE_BUFFER_MAX, + type_, + remoteRank_, + SPIKE_MSG)); + buffer += SPIKE_BUFFER_MAX; + size -= SPIKE_BUFFER_MAX; + } + requests.push_back( intercomm.Isend (buffer, size, type_, remoteRank_, SPIKE_MSG)); + } -void -EventOutputSubconnector::send (std::vector<MPI::Request> &requests) -{ - MUSIC_LOGRE ("ISend"); - void* data; - int size; - buffer_.nextBlock (data, size); - char* buffer = static_cast <char*> (data); - while (size >= SPIKE_BUFFER_MAX) - { - requests.push_back(intercomm.Isend (buffer, - SPIKE_BUFFER_MAX, - type_, - remoteRank_, - SPIKE_MSG)); - buffer += SPIKE_BUFFER_MAX; - size -= SPIKE_BUFFER_MAX; - } - requests.push_back( intercomm.Isend (buffer, size, type_, remoteRank_, SPIKE_MSG)); -} -void -EventOutputSubconnector::send () -{ - MUSIC_LOGRE ("Ssend"); - void* data; - int size; - buffer_.nextBlock (data, size); - char* buffer = static_cast <char*> (data); - while (size >= SPIKE_BUFFER_MAX) - { - intercomm.Ssend (buffer, - SPIKE_BUFFER_MAX, - type_, - remoteRank_, - SPIKE_MSG); - buffer += SPIKE_BUFFER_MAX; - size -= SPIKE_BUFFER_MAX; - } - intercomm.Ssend (buffer, size, type_, remoteRank_, SPIKE_MSG); -} + + void + EventOutputSubconnector::send () + { + MUSIC_LOGRE ("Ssend"); + void* data; + int size; + buffer_.nextBlock (data, size); + char* buffer = static_cast <char*> (data); + while (size >= SPIKE_BUFFER_MAX) + { + intercomm.Ssend (buffer, + SPIKE_BUFFER_MAX, + type_, + remoteRank_, + SPIKE_MSG); + buffer += SPIKE_BUFFER_MAX; + size -= SPIKE_BUFFER_MAX; + } + intercomm.Ssend (buffer, size, type_, remoteRank_, SPIKE_MSG); + } -void -EventOutputSubconnector::flush (bool& dataStillFlowing) -{ - if (!flushed) - { - if (!buffer_.isEmpty ()) - { - MUSIC_LOGR ("sending data remaining in buffers"); - send (); - dataStillFlowing = true; - } - /* remedius - * flushed flag was added since synchronous communication demands equal sends and receives - */ - else - { - Event* e = static_cast<Event*> (buffer_.insert ()); - e->id = FLUSH_MARK; - send (); - flushed = true; - } - } -} + + void + EventOutputSubconnector::flush (bool& dataStillFlowing) + { + if (!flushed) + { + if (!buffer_.isEmpty ()) + { + MUSIC_LOGR ("sending data remaining in buffers"); + send (); + dataStillFlowing = true; + } + /* remedius + * flushed flag was added since synchronous communication demands equal sends and receives + */ + else + { + Event* e = static_cast<Event*> (buffer_.insert ()); + e->id = FLUSH_MARK; + send (); + flushed = true; + } + } + } -EventInputSubconnector::EventInputSubconnector (//Synchronizer* synch_, - MPI::Intercomm intercomm, - int remoteLeader, - int remoteRank, - int receiverRank, - int receiverPortCode) -: Subconnector (MPI::BYTE, - intercomm, - remoteLeader, - remoteRank, - receiverRank, - receiverPortCode), - InputSubconnector () -{ -} + EventInputSubconnector::EventInputSubconnector (//Synchronizer* synch_, + MPI::Intercomm intercomm, + int remoteLeader, + int remoteRank, + int receiverRank, + int receiverPortCode) + : Subconnector (MPI::BYTE, + intercomm, + remoteLeader, + remoteRank, + receiverRank, + receiverPortCode), + InputSubconnector () + { + } -EventInputSubconnectorGlobal::EventInputSubconnectorGlobal -(//Synchronizer* synch_, - MPI::Intercomm intercomm, - int remoteLeader, - int remoteRank, - int receiverRank, - int receiverPortCode, - EventHandlerGlobalIndex* eh) -: Subconnector (MPI::BYTE, - intercomm, - remoteLeader, - remoteRank, - receiverRank, - receiverPortCode), - EventInputSubconnector (//synch_, - intercomm, - remoteLeader, - remoteRank, - receiverRank, - receiverPortCode), - handleEvent (eh) -{ + EventInputSubconnectorGlobal::EventInputSubconnectorGlobal + (//Synchronizer* synch_, + MPI::Intercomm intercomm, + int remoteLeader, + int remoteRank, + int receiverRank, + int receiverPortCode, + EventHandlerGlobalIndex* eh) + : Subconnector (MPI::BYTE, + intercomm, + remoteLeader, + remoteRank, + receiverRank, + receiverPortCode), + EventInputSubconnector (//synch_, + intercomm, + remoteLeader, + remoteRank, + receiverRank, + receiverPortCode), + handleEvent (eh) + { -} + } -//EventHandlerGlobalIndexDummy -//EventInputSubconnectorGlobal::dummyHandler; - - -EventInputSubconnectorLocal::EventInputSubconnectorLocal -(//Synchronizer* synch_, - MPI::Intercomm intercomm, - int remoteLeader, - int remoteRank, - int receiverRank, - int receiverPortCode, - EventHandlerLocalIndex* eh) -: Subconnector (MPI::BYTE, - intercomm, - remoteLeader, - remoteRank, - receiverRank, - receiverPortCode), - EventInputSubconnector ( - intercomm, - remoteLeader, - remoteRank, - receiverRank, - receiverPortCode), - handleEvent (eh) -{ -} + //EventHandlerGlobalIndexDummy + //EventInputSubconnectorGlobal::dummyHandler; + + + EventInputSubconnectorLocal::EventInputSubconnectorLocal + (//Synchronizer* synch_, + MPI::Intercomm intercomm, + int remoteLeader, + int remoteRank, + int receiverRank, + int receiverPortCode, + EventHandlerLocalIndex* eh) + : Subconnector (MPI::BYTE, + intercomm, + remoteLeader, + remoteRank, + receiverRank, + receiverPortCode), + EventInputSubconnector ( + intercomm, + remoteLeader, + remoteRank, + receiverRank, + receiverPortCode), + handleEvent (eh) + { + } -//EventHandlerLocalIndexDummy -//EventInputSubconnectorLocal::dummyHandler; + //EventHandlerLocalIndexDummy + //EventInputSubconnectorLocal::dummyHandler; -void -EventInputSubconnector::maybeCommunicate () -{ - if (!flushed ) - receive (); -} + void + EventInputSubconnector::maybeCommunicate () + { + if (!flushed ) + receive (); + } -bool -EventInputSubconnectorGlobal::receive (char *data, int size) -{ - Event* ev = (Event*) data; - /* remedius - * since the message can be of size 0 and contains garbage=FLUSH_MARK, - * the check for the size of the message was added. - */ - if (ev[0].id == FLUSH_MARK && size > 0) - { - flushed = true; - MUSIC_LOGR ("received flush message"); - } - else - { - int nEvents = size / sizeof (Event); - for (int i = 0; i < nEvents; ++i){ - (*handleEvent) (ev[i].t, ev[i].id); - } - } - return flushed; -} -// NOTE: isolate difference between global and local to avoid code repetition -void -EventInputSubconnectorGlobal::receive () -{ - char data[SPIKE_BUFFER_MAX]; - MPI::Status status; - int size; - //double starttime, endtime; - //starttime = MPI_Wtime(); - - do - { - intercomm.Recv (data, - SPIKE_BUFFER_MAX, - type_, - remoteRank_, - SPIKE_MSG, - status); - - size = status.Get_count (type_); - Event* ev = (Event*) data; - /* remedius - * since the message can be of size 0 and contains garbage=FLUSH_MARK, - * the check for the size of the message was added. - */ - if (ev[0].id == FLUSH_MARK && size > 0) - { - flushed = true; - MUSIC_LOGR ("received flush message"); - return; - } - - - int nEvents = size / sizeof (Event); - - for (int i = 0; i < nEvents; ++i){ - (*handleEvent) (ev[i].t, ev[i].id); - } + bool + EventInputSubconnectorGlobal::receive (char *data, int size) + { + Event* ev = (Event*) data; + /* remedius + * since the message can be of size 0 and contains garbage=FLUSH_MARK, + * the check for the size of the message was added. + */ + if (ev[0].id == FLUSH_MARK && size > 0) + { + flushed = true; + MUSIC_LOGR ("received flush message"); + } + else + { + int nEvents = size / sizeof (Event); + for (int i = 0; i < nEvents; ++i){ + (*handleEvent) (ev[i].t, ev[i].id); + } + } + return flushed; + } + + // NOTE: isolate difference between global and local to avoid code repetition + void + EventInputSubconnectorGlobal::receive () + { + char data[SPIKE_BUFFER_MAX]; + MPI::Status status; + int size; + //double starttime, endtime; + //starttime = MPI_Wtime(); + + do + { + intercomm.Recv (data, + SPIKE_BUFFER_MAX, + type_, + remoteRank_, + SPIKE_MSG, + status); + + size = status.Get_count (type_); + Event* ev = (Event*) data; + /* remedius + * since the message can be of size 0 and contains garbage=FLUSH_MARK, + * the check for the size of the message was added. + */ + if (ev[0].id == FLUSH_MARK && size > 0) + { + flushed = true; + MUSIC_LOGR ("received flush message"); + return; + } + + + int nEvents = size / sizeof (Event); + + for (int i = 0; i < nEvents; ++i){ + (*handleEvent) (ev[i].t, ev[i].id); } - while (size == SPIKE_BUFFER_MAX); - //endtime = MPI_Wtime(); - //endtime = endtime-starttime; - //if(tt < endtime) - //tt = endtime; -} + } + while (size == SPIKE_BUFFER_MAX); + //endtime = MPI_Wtime(); + //endtime = endtime-starttime; + //if(tt < endtime) + //tt = endtime; + } -void -EventInputSubconnectorLocal::receive () -{ - MUSIC_LOGRE ("receive"); - char data[SPIKE_BUFFER_MAX]; - MPI::Status status; - int size; - do - { - intercomm.Recv (data, - SPIKE_BUFFER_MAX, - type_, - remoteRank_, - SPIKE_MSG, - status); - size = status.Get_count (type_); - Event* ev = (Event*) data; - /* remedius - * since the message can be of size 0 and contains garbage=FLUSH_MARK, - * the check for the size of the message was added. - */ - if (ev[0].id == FLUSH_MARK && size > 0) - { - flushed = true; - return; - } - int nEvents = size / sizeof (Event); - for (int i = 0; i < nEvents; ++i) - (*handleEvent) (ev[i].t, ev[i].id); - } - while (size == SPIKE_BUFFER_MAX); -} + void + EventInputSubconnectorLocal::receive () + { + MUSIC_LOGRE ("receive"); + char data[SPIKE_BUFFER_MAX]; + MPI::Status status; + int size; + do + { + intercomm.Recv (data, + SPIKE_BUFFER_MAX, + type_, + remoteRank_, + SPIKE_MSG, + status); + size = status.Get_count (type_); + Event* ev = (Event*) data; + /* remedius + * since the message can be of size 0 and contains garbage=FLUSH_MARK, + * the check for the size of the message was added. + */ + if (ev[0].id == FLUSH_MARK && size > 0) + { + flushed = true; + return; + } + int nEvents = size / sizeof (Event); + for (int i = 0; i < nEvents; ++i) + (*handleEvent) (ev[i].t, ev[i].id); + } + while (size == SPIKE_BUFFER_MAX); + } -void -EventInputSubconnector::flush (bool& dataStillFlowing) -{ - if (!flushed) - { - //MUSIC_LOGRE ("receiving and throwing away data"); - receive (); - if (!flushed) - dataStillFlowing = true; - } -} + void + EventInputSubconnector::flush (bool& dataStillFlowing) + { + if (!flushed) + { + //MUSIC_LOGRE ("receiving and throwing away data"); + receive (); + if (!flushed) + dataStillFlowing = true; + } + } -/*void -EventInputSubconnectorGlobal::flush (bool& dataStillFlowing) -{ - //handleEvent = &dummyHandler; - EventInputSubconnector::flush (dataStillFlowing); -} + /*void + EventInputSubconnectorGlobal::flush (bool& dataStillFlowing) + { + //handleEvent = &dummyHandler; + EventInputSubconnector::flush (dataStillFlowing); + } -void -EventInputSubconnectorLocal::flush (bool& dataStillFlowing) -{ - //handleEvent = &dummyHandler; - EventInputSubconnector::flush (dataStillFlowing); -}*/ -/******************************************************************** - * - * Message Subconnectors - * - ********************************************************************/ - -MessageOutputSubconnector::MessageOutputSubconnector (//Synchronizer* synch_, - MPI::Intercomm intercomm, - int remoteLeader, - int remoteRank, - int receiverPortCode, - FIBO* buffer) -: Subconnector (MPI::BYTE, - intercomm, - remoteLeader, - remoteRank, - remoteRank, - receiverPortCode), - buffer_ (buffer) -{ -} + void + EventInputSubconnectorLocal::flush (bool& dataStillFlowing) + { + //handleEvent = &dummyHandler; + EventInputSubconnector::flush (dataStillFlowing); + }*/ + + + + /******************************************************************** + * + * Message Subconnectors + * + ********************************************************************/ + + MessageOutputSubconnector::MessageOutputSubconnector (//Synchronizer* synch_, + MPI::Intercomm intercomm, + int remoteLeader, + int remoteRank, + int receiverPortCode, + FIBO* buffer) + : Subconnector (MPI::BYTE, + intercomm, + remoteLeader, + remoteRank, + remoteRank, + receiverPortCode), + buffer_ (buffer) + { + } -void -MessageOutputSubconnector::maybeCommunicate () -{ - if (!flushed) - send (); -} + void + MessageOutputSubconnector::maybeCommunicate () + { + if (!flushed) + send (); + } -void -MessageOutputSubconnector::send () -{ - void* data; - int size; - buffer_->nextBlockNoClear (data, size); - // NOTE: marshalling - char* buffer = static_cast <char*> (data); - while (size >= MESSAGE_BUFFER_MAX) - { - intercomm.Ssend (buffer, - MESSAGE_BUFFER_MAX, - type_, - remoteRank_, - MESSAGE_MSG); - buffer += MESSAGE_BUFFER_MAX; - size -= MESSAGE_BUFFER_MAX; - } - intercomm.Ssend (buffer, size, type_, remoteRank_, MESSAGE_MSG); -} + void + MessageOutputSubconnector::send () + { + void* data; + int size; + buffer_->nextBlockNoClear (data, size); + // NOTE: marshalling + char* buffer = static_cast <char*> (data); + while (size >= MESSAGE_BUFFER_MAX) + { + intercomm.Ssend (buffer, + MESSAGE_BUFFER_MAX, + type_, + remoteRank_, + MESSAGE_MSG); + buffer += MESSAGE_BUFFER_MAX; + size -= MESSAGE_BUFFER_MAX; + } + intercomm.Ssend (buffer, size, type_, remoteRank_, MESSAGE_MSG); + } -void -MessageOutputSubconnector::flush (bool& dataStillFlowing) -{ - if (!flushed) - { - if (!buffer_->isEmpty ()) - { - MUSIC_LOGRE ("sending data remaining in buffers"); - send (); - dataStillFlowing = true; - } - else - { - char dummy; - intercomm.Ssend (&dummy, 0, type_, remoteRank_, FLUSH_MSG); - } - } -} + void + MessageOutputSubconnector::flush (bool& dataStillFlowing) + { + if (!flushed) + { + if (!buffer_->isEmpty ()) + { + MUSIC_LOGRE ("sending data remaining in buffers"); + send (); + dataStillFlowing = true; + } + else + { + char dummy; + intercomm.Ssend (&dummy, 0, type_, remoteRank_, FLUSH_MSG); + } + } + } -MessageInputSubconnector::MessageInputSubconnector (//Synchronizer* synch_, - MPI::Intercomm intercomm, - int remoteLeader, - int remoteRank, - int receiverRank, - int receiverPortCode, - MessageHandler* mh) -: Subconnector (MPI::BYTE, - intercomm, - remoteLeader, - remoteRank, - receiverRank, - receiverPortCode), - handleMessage (mh) -{ -} + MessageInputSubconnector::MessageInputSubconnector (//Synchronizer* synch_, + MPI::Intercomm intercomm, + int remoteLeader, + int remoteRank, + int receiverRank, + int receiverPortCode, + MessageHandler* mh) + : Subconnector (MPI::BYTE, + intercomm, + remoteLeader, + remoteRank, + receiverRank, + receiverPortCode), + handleMessage (mh) + { + } -//MessageHandlerDummy -//MessageInputSubconnector::dummyHandler; + //MessageHandlerDummy + //MessageInputSubconnector::dummyHandler; -void -MessageInputSubconnector::maybeCommunicate () -{ - if (!flushed)// && synch->communicate ()) - receive (); -} + void + MessageInputSubconnector::maybeCommunicate () + { + if (!flushed)// && synch->communicate ()) + receive (); + } -void -MessageInputSubconnector::receive () -{ - char data[MESSAGE_BUFFER_MAX]; - MPI::Status status; - int size; - do - { - intercomm.Recv (data, - MESSAGE_BUFFER_MAX, - type_, - remoteRank_, - MPI::ANY_TAG, - status); - if (status.Get_tag () == FLUSH_MSG) - { - flushed = true; - MUSIC_LOGRE ("received flush message"); - return; - } - size = status.Get_count (type_); - int current = 0; - while (current < size) - { - MessageHeader* header = static_cast<MessageHeader*> - (static_cast<void*> (&data[current])); - current += sizeof (MessageHeader); - (*handleMessage) (header->t (), &data[current], header->size ()); - current += header->size (); - } - } - while (size == MESSAGE_BUFFER_MAX); -} + void + MessageInputSubconnector::receive () + { + char data[MESSAGE_BUFFER_MAX]; + MPI::Status status; + int size; + do + { + intercomm.Recv (data, + MESSAGE_BUFFER_MAX, + type_, + remoteRank_, + MPI::ANY_TAG, + status); + if (status.Get_tag () == FLUSH_MSG) + { + flushed = true; + MUSIC_LOGRE ("received flush message"); + return; + } + size = status.Get_count (type_); + int current = 0; + while (current < size) + { + MessageHeader* header = static_cast<MessageHeader*> + (static_cast<void*> (&data[current])); + current += sizeof (MessageHeader); + (*handleMessage) (header->t (), &data[current], header->size ()); + current += header->size (); + } + } + while (size == MESSAGE_BUFFER_MAX); + } -void -MessageInputSubconnector::flush (bool& dataStillFlowing) -{ - //handleMessage = &dummyHandler; + void + MessageInputSubconnector::flush (bool& dataStillFlowing) + { + //handleMessage = &dummyHandler; + if (!flushed) + { + //MUSIC_LOGRE ("receiving and throwing away data"); + receive (); if (!flushed) - { - //MUSIC_LOGRE ("receiving and throwing away data"); - receive (); - if (!flushed) - dataStillFlowing = true; - } -} + dataStillFlowing = true; + } + } -/******************************************************************** - * - * Collective Subconnector - * - ********************************************************************/ -CollectiveSubconnector::CollectiveSubconnector (MPI::Intracomm intracomm) - : intracomm_ (intracomm) -{ -} + + /******************************************************************** + * + * Collective Subconnector + * + ********************************************************************/ + CollectiveSubconnector::CollectiveSubconnector (MPI::Intracomm intracomm) + : intracomm_ (intracomm) + { + } -CollectiveSubconnector::~CollectiveSubconnector () -{ -} -void -CollectiveSubconnector::allocAllgathervArrays () -{ - nProcesses = intracomm_.Get_size (); - ppBytes = new int[nProcesses]; - displ = new int[nProcesses]; -} + CollectiveSubconnector::~CollectiveSubconnector () + { + } -void -CollectiveSubconnector::freeAllgathervArrays () -{ - delete ppBytes; - delete displ; -} + + void + CollectiveSubconnector::allocAllgathervArrays () + { + nProcesses = intracomm_.Get_size (); + ppBytes = new int[nProcesses]; + displ = new int[nProcesses]; + } -void -CollectiveSubconnector::maybeCommunicate () -{ - if (!flushed) - communicate (); -} + + void + CollectiveSubconnector::freeAllgathervArrays () + { + delete ppBytes; + delete displ; + } -int -CollectiveSubconnector::calcCommDataSize (int local_data_size) -{ - int dsize; - //distributing the size of the buffer - intracomm_.Allgather (&local_data_size, 1, MPI_INT, ppBytes, 1, MPI_INT); - //could it be that dsize is more then unsigned int? - dsize = 0; - for(int i=0; i < nProcesses; ++i){ - displ[i] = dsize; - dsize += ppBytes[i]; - //ppBytes[i] /= type_.Get_size(); - /* if(ppBytes[i] > max_size) - max_size = ppBytes[i];*/ - } - return dsize; -} + + void + CollectiveSubconnector::maybeCommunicate () + { + if (!flushed) + communicate (); + } -/* remedius - * current collective communication is realized through two times calls of - * mpi allgather function: first time the size of the data is distributed, - * the second time the data by itself is distributed. - * Probably that could be not optimal for a huge # of ranks. - */ -std::vector<char> CollectiveSubconnector::getCommData(char *cur_buff, int size) -{ - unsigned int dsize; - char *recv_buff; - std::vector<char> commData; - recv_buff=NULL; - - dsize = calcCommDataSize(size); - - if(dsize > 0){ - //distributing the data - recv_buff = new char[dsize]; - intracomm_.Allgatherv(cur_buff, size, MPI::BYTE, recv_buff, ppBytes, displ, MPI::BYTE ); - std::copy(recv_buff,recv_buff+dsize,std::back_inserter(commData)); - delete[] recv_buff; - } - return commData; -} + + int + CollectiveSubconnector::calcCommDataSize (int local_data_size) + { + int dsize; + //distributing the size of the buffer + intracomm_.Allgather (&local_data_size, 1, MPI_INT, ppBytes, 1, MPI_INT); + //could it be that dsize is more then unsigned int? + dsize = 0; + for(int i=0; i < nProcesses; ++i){ + displ[i] = dsize; + dsize += ppBytes[i]; + //ppBytes[i] /= type_.Get_size(); + /* if(ppBytes[i] > max_size) + max_size = ppBytes[i];*/ + } + return dsize; + } + + /* remedius + * current collective communication is realized through two times calls of + * mpi allgather function: first time the size of the data is distributed, + * the second time the data by itself is distributed. + * Probably that could be not optimal for a huge # of ranks. + */ + std::vector<char> CollectiveSubconnector::getCommData(char *cur_buff, int size) + { + unsigned int dsize; + char *recv_buff; + std::vector<char> commData; + recv_buff=NULL; + + dsize = calcCommDataSize(size); + + if(dsize > 0){ + //distributing the data + recv_buff = new char[dsize]; + intracomm_.Allgatherv(cur_buff, size, MPI::BYTE, recv_buff, ppBytes, displ, MPI::BYTE ); + std::copy(recv_buff,recv_buff+dsize,std::back_inserter(commData)); + delete[] recv_buff; + } + return commData; + } -void -EventOutputCollectiveSubconnector::setOutputBuffer (void* buffer, - unsigned int size) -{ - router_->setOutputBuffer (buffer, size); -} + void + EventOutputCollectiveSubconnector::setOutputBuffer (void* buffer, + unsigned int size) + { + router_->setOutputBuffer (buffer, size); + } -unsigned int -EventOutputCollectiveSubconnector::dataSize () -{ - return router_->dataSize (); -} + unsigned int + EventOutputCollectiveSubconnector::dataSize () + { + return router_->dataSize (); + } -void -EventOutputCollectiveSubconnector::fillOutputBuffer () -{ - router_->fillOutputBuffer (); -} + void + EventOutputCollectiveSubconnector::fillOutputBuffer () + { + router_->fillOutputBuffer (); + } -void -EventInputCollectiveSubconnector::processData (void* data, unsigned int size) -{ - Event* e = static_cast<Event*> (data); - while (size > 0) - { - router_->processEvent (e->t, e->id); - ++e; - size -= sizeof (Event); - } -} + void + EventInputCollectiveSubconnector::processData (void* data, unsigned int size) + { + Event* e = static_cast<Event*> (data); + while (size > 0) + { + router_->processEvent (e->t, e->id); + ++e; + size -= sizeof (Event); + } + } -void -ContCollectiveSubconnector::maybeCommunicate(){ - CollectiveSubconnector::maybeCommunicate(); -} -void ContCollectiveSubconnector::communicate(){ - int size, bSize, nBuffered; - void *data; - char *cur_buff, *dest_buff; - std::vector<char> commData; - if(flushed) - return; - ContOutputSubconnector::buffer_.nextBlock (data, size); - commData = getCommData(static_cast <char*> (data), size); - const int* displ = getDisplArr(); - //std::copy(commData.begin(),commData.end(),cur_buff); - cur_buff = (char*)static_cast<void*> (&commData[0]); - flushed = commData.size() == 0 ? true : false; - nBuffered= commData.size() / width_; //the received data size is always multiple of port width - for(int i =0; i < nBuffered ; ++i){ - bSize = 0; - dest_buff = static_cast<char*> (ContInputSubconnector::buffer_.insertBlock ()); - for (std::multimap<int, Interval>::iterator it= - intervals_.begin() ; it != intervals_.end(); it++ ) //iterates all the intervals - { - memcpy (dest_buff + bSize, - cur_buff - +displ[(*it).first]//+displacement of data for a current rank - +i*blockSizePerRank_[(*it).first] //+displacement of the appropriate buffered chunk of data within current rank data block - +(*it).second.begin(), //+displacement of the requested data - (*it).second.end()); // length field is stored overlapping the end field - bSize += (*it).second.end(); - } - ContInputSubconnector::buffer_.trimBlock (bSize); - } -} -void ContCollectiveSubconnector::initialCommunication(double initialBufferedTicks){ - communicate(); - fillBlockSizes(); - ContInputSubconnector::buffer_.fill (initialBufferedTicks); -} -void ContCollectiveSubconnector::fillBlockSizes() -{ - std::map<int, Interval>::iterator it; - const int* ppBytes = getSizeArr(); - for ( it=intervals_.begin() ; it != intervals_.end(); it++ ) //iterates all the intervals - blockSizePerRank_[(*it).first]= ppBytes[(*it).first]; -} -void ContCollectiveSubconnector::flush(bool& dataStillFlowing){ - if (!flushed) + + void + ContCollectiveSubconnector::maybeCommunicate() + { + CollectiveSubconnector::maybeCommunicate(); + } + + + void + ContCollectiveSubconnector::communicate() + { + int size, bSize, nBuffered; + void *data; + char *cur_buff, *dest_buff; + std::vector<char> commData; + if(flushed) + return; + ContOutputSubconnector::buffer_.nextBlock (data, size); + commData = getCommData(static_cast <char*> (data), size); + const int* displ = getDisplArr(); + //std::copy(commData.begin(),commData.end(),cur_buff); + cur_buff = (char*)static_cast<void*> (&commData[0]); + flushed = commData.size() == 0 ? true : false; + nBuffered= commData.size() / width_; //the received data size is always multiple of port width + for(int i =0; i < nBuffered ; ++i){ + bSize = 0; + dest_buff = static_cast<char*> (ContInputSubconnector::buffer_.insertBlock ()); + for (std::multimap<int, Interval>::iterator it= + intervals_.begin() ; it != intervals_.end(); it++ ) //iterates all the intervals { - if (!ContOutputSubconnector::buffer_.isEmpty ()) - { - MUSIC_LOGR ("sending data remaining in buffers"); - maybeCommunicate (); - dataStillFlowing = true; - } - else - { - //a sign for the end of communication will be an empty buffer - communicate (); - if(!flushed) - dataStillFlowing = true; - } + memcpy (dest_buff + bSize, + cur_buff + +displ[(*it).first]//+displacement of data for a current rank + +i*blockSizePerRank_[(*it).first] //+displacement of the appropriate buffered chunk of data within current rank data block + +(*it).second.begin(), //+displacement of the requested data + (*it).second.end()); // length field is stored overlapping the end field + bSize += (*it).second.end(); } -} + ContInputSubconnector::buffer_.trimBlock (bSize); + } + } + + + void + ContCollectiveSubconnector::initialCommunication(double initialBufferedTicks) + { + communicate(); + fillBlockSizes(); + ContInputSubconnector::buffer_.fill (initialBufferedTicks); + } + + + void ContCollectiveSubconnector::fillBlockSizes() + { + std::map<int, Interval>::iterator it; + const int* ppBytes = getSizeArr(); + for ( it=intervals_.begin() ; it != intervals_.end(); it++ ) //iterates all the intervals + blockSizePerRank_[(*it).first]= ppBytes[(*it).first]; + } + + + void + ContCollectiveSubconnector::flush(bool& dataStillFlowing) + { + if (!flushed) + { + if (!ContOutputSubconnector::buffer_.isEmpty ()) + { + MUSIC_LOGR ("sending data remaining in buffers"); + maybeCommunicate (); + dataStillFlowing = true; + } + else + { + //a sign for the end of communication will be an empty buffer + communicate (); + if(!flushed) + dataStillFlowing = true; + } + } + } } #endif