From 82c0b360c69a82f85362701a7e0c865a7105b0ed Mon Sep 17 00:00:00 2001
From: Mark Olesen
Date: Thu, 2 Mar 2023 11:46:35 +0100
Subject: [PATCH] ENH: support run-time PstreamBuffers algorithm selection
 (advanced option)

- this helps isolate selection of PEX vs NBX, for potential migration
  to a hybrid PEX
---
 src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H  |  14 +-
 .../db/IOstreams/Pstreams/PstreamBuffers.C    | 210 ++++++++++++------
 .../db/IOstreams/Pstreams/PstreamBuffers.H    |  86 +++----
 .../db/IOstreams/Pstreams/PstreamExchange.C   |  85 ++++---
 src/OpenFOAM/global/argList/argList.C         |   2 +
 5 files changed, 240 insertions(+), 157 deletions(-)

diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H
index cb17c2fc81..1f49f7234c 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H
@@ -470,8 +470,7 @@ public:

     // Exchange

-        //- Helper: exchange sizes of sendBufs for specified
-        //- set of send/receive processes.
+        //- Helper: exchange sizes of sendBufs for specified send/recv ranks
         template<class Container>
         static void exchangeSizes
         (
@@ -483,6 +482,17 @@ public:
             const label comm = UPstream::worldComm
         );

+        //- Helper: exchange sizes of sendBufs for specified neighbour ranks
+        template<class Container>
+        static void exchangeSizes
+        (
+            const labelUList& neighProcs,
+            const Container& sendBufs,
+            labelList& sizes,
+            const label tag = UPstream::msgType(),
+            const label comm = UPstream::worldComm
+        );
+
         //- Helper: exchange sizes of sendBufs.
         //- The sendBufs is the data per processor (in the communicator).
         //  Returns sizes of sendBufs on the sending processor.
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C
index 90a17969a5..c8de9f243e 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C
@@ -28,6 +28,30 @@ License

 #include "PstreamBuffers.H"
 #include "bitSet.H"
+#include "debug.H"
+#include "registerSwitch.H"
+
+// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
+
+int Foam::PstreamBuffers::algorithm
+(
+    // Not really the most creative name...
+    Foam::debug::optimisationSwitch("pbufs.algorithm", -1)
+);
+registerOptSwitch
+(
+    "pbufs.algorithm",
+    int,
+    Foam::PstreamBuffers::algorithm
+);
+
+
+// Simple enumerations
+// -------------------
+static constexpr int algorithm_PEX_allToAll = -1;  // OpenFOAM 2212 and earlier
+//static constexpr int algorithm_PEX_hybrid = 0;   // New default?
+static constexpr int algorithm_full_NBX = 1;       // Experimental
+

 // * * * * * * * * * * * * * Private Member Functions  * * * * * * * * * * //

@@ -43,9 +67,58 @@ void Foam::PstreamBuffers::finalExchange

     if (commsType_ == UPstream::commsTypes::nonBlocking)
     {
-        // Dense storage uses all-to-all
-        Pstream::exchangeSizes(sendBuffers_, recvSizes, comm_);
+        if
+        (
+            wait
+         && (algorithm >= algorithm_full_NBX)
+         && (UPstream::maxCommsSize <= 0)
+        )
+        {
+            // NBX algorithm (nonblocking exchange)
+            // - when requested and waiting, no data chunking etc
+            PstreamDetail::exchangeConsensus<DynamicList<char>, char>
+            (
+                sendBuffers_,
+                recvBuffers_,
+                recvSizes,
+                (tag_ + 271828),  // some unique tag?
+                comm_,
+                wait
+            );
+
+            return;
+        }
+
+
+        // PEX algorithm with two different flavours of exchanging sizes
+
+        // Assemble the send sizes (cf.
+        // Pstream::exchangeSizes)
+        labelList sendSizes(nProcs_);
+        forAll(sendBuffers_, proci)
+        {
+            sendSizes[proci] = sendBuffers_[proci].size();
+        }
+        recvSizes.resize_nocopy(nProcs_);
+
+        if (algorithm == algorithm_PEX_allToAll)
+        {
+            // PEX stage 1: exchange sizes (all-to-all)
+            UPstream::allToAll(sendSizes, recvSizes, comm_);
+        }
+        else
+        {
+            // PEX stage 1: exchange sizes (non-blocking consensus)
+            UPstream::allToAllConsensus
+            (
+                sendSizes,
+                recvSizes,
+                (tag_ + 314159),  // some unique tag?
+                comm_
+            );
+        }
+
+        // PEX stage 2: point-to-point data exchange
         Pstream::exchange<DynamicList<char>, char>
         (
             sendBuffers_,
@@ -73,6 +146,30 @@ void Foam::PstreamBuffers::finalExchange

     if (commsType_ == UPstream::commsTypes::nonBlocking)
     {
+        // Preparation. Temporarily abuse recvSizes as logic to clear
+        // send buffers that are not in the neighbourhood connection
+        {
+            recvSizes.resize_nocopy(nProcs_);
+            recvSizes = 0;
+
+            // Preserve self-send, even if not described by neighbourhood
+            recvSizes[UPstream::myProcNo(comm_)] = 1;
+
+            for (const label proci : sendProcs)
+            {
+                recvSizes[proci] = 1;  // Connected
+            }
+
+            for (label proci=0; proci < nProcs_; ++proci)
+            {
+                if (!recvSizes[proci])  // Not connected
+                {
+                    sendBuffers_[proci].clear();
+                }
+            }
+        }
+
+        // PEX stage 1: exchange sizes (limited neighbourhood)
         Pstream::exchangeSizes
         (
             sendProcs,
@@ -83,6 +180,7 @@ void Foam::PstreamBuffers::finalExchange
             comm_
         );

+        // PEX stage 2: point-to-point data exchange
         Pstream::exchange<DynamicList<char>, char>
         (
             sendBuffers_,
@@ -96,11 +194,10 @@ void Foam::PstreamBuffers::finalExchange
 }


-void Foam::PstreamBuffers::finalExchangeGatherScatter
+void Foam::PstreamBuffers::finalGatherScatter
 (
     const bool isGather,
     const bool wait,
-    const bool needSizes,
     labelList& recvSizes
 )
 {
@@ -133,6 +230,12 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter

     if (commsType_ == UPstream::commsTypes::nonBlocking)
     {
+        // Use PEX algorithm
+        // - for a non-sparse gather/scatter, it is presumed that
+        //   MPI_Gather/MPI_Scatter will be the most efficient way to
+        //   communicate the sizes.
+
+        // PEX stage 1: exchange sizes (gather or scatter)
         if (isGather)
         {
             // gather mode (all-to-one): master [0] <- everyone
@@ -166,7 +269,7 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter
             recvSizes[0] = myRecv;
         }

-
+        // PEX stage 2: point-to-point data exchange
         Pstream::exchange<DynamicList<char>, char>
         (
             sendBuffers_,
@@ -378,7 +481,7 @@ Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const

 Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
 {
-    labelList counts(recvPositions_.size(), Zero);
+    labelList counts(nProcs_, Zero);

     if (finishedSendsCalled_)
     {
@@ -404,17 +507,20 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
 }


-Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount(const label proci) const
+Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount
+(
+    const label excludeProci
+) const
 {
     label maxLen = 0;

     if (finishedSendsCalled_)
     {
-        forAll(recvBuffers_, idx)
+        forAll(recvBuffers_, proci)
         {
-            const label len(recvBuffers_[idx].size() - recvPositions_[idx]);
-            if (idx != proci)
+            if (excludeProci != proci)
             {
+                label len(recvBuffers_[proci].size() - recvPositions_[proci]);
                 maxLen = max(maxLen, len);
             }
         }
@@ -494,6 +600,9 @@ void Foam::PstreamBuffers::finishedSends
 (
     labelList& recvSizes,
     const bool wait
 )
 {
+    // Resize for copying back
+    recvSizes.resize_nocopy(sendBuffers_.size());
+
     finalExchange(wait, recvSizes);

     if (commsType_ != UPstream::commsTypes::nonBlocking)
@@ -510,39 +619,25 @@ void Foam::PstreamBuffers::finishedSends
 }


-void Foam::PstreamBuffers::finishedSends
+void Foam::PstreamBuffers::finishedNeighbourSends
 (
-    const labelUList& sendProcs,
-    const labelUList& recvProcs,
-    const bool wait
-)
-{
-    labelList recvSizes;
-    finalExchange(sendProcs, recvProcs, wait, recvSizes);
-}
-
-
-void Foam::PstreamBuffers::finishedSends
-(
-    const labelUList& sendProcs,
-    const labelUList& recvProcs,
+    const labelUList& neighProcs,
     labelList& recvSizes,
     const bool wait
 )
 {
-    finalExchange(sendProcs, recvProcs, wait, recvSizes);
+    finalExchange(neighProcs, neighProcs, wait, recvSizes);
+}

-    if (commsType_ != UPstream::commsTypes::nonBlocking)
-    {
-        FatalErrorInFunction
-            << "Obtaining sizes not supported in "
-            << UPstream::commsTypeNames[commsType_] << endl
-            << "    since transfers already in progress. Use non-blocking instead."
-            << exit(FatalError);
-        // Note: maybe possible only if using different tag from write started
-        // by ~UOPstream. Needs some work.
-    }

+void Foam::PstreamBuffers::finishedNeighbourSends
+(
+    const labelUList& neighProcs,
+    const bool wait
+)
+{
+    labelList recvSizes;
+    finalExchange(neighProcs, neighProcs, wait, recvSizes);
 }
@@ -562,10 +657,8 @@ bool Foam::PstreamBuffers::finishedSends
     }

     // Update send connections
-    // - reasonable to assume there are no self-sends on UPstream::myProcNo
     forAll(sendBuffers_, proci)
     {
-        // ie, sendDataCount(proci) != 0
         if (sendConnections.set(proci, !sendBuffers_[proci].empty()))
         {
             // The state changed
@@ -577,22 +670,20 @@ bool Foam::PstreamBuffers::finishedSends

     if (changed)
     {
-        // Create send/recv topology
+        // Update send/recv topology
+        labelList recvSizes;
+        finishedSends(recvSizes, wait);  // eg, using all-to-all

         // The send ranks
         sendProcs.clear();
         forAll(sendBuffers_, proci)
         {
-            // ie, sendDataCount(proci) != 0
             if (!sendBuffers_[proci].empty())
             {
                 sendProcs.push_back(proci);
             }
         }

-        labelList recvSizes;
-        finishedSends(recvSizes, wait);  // All-to-all
-
         // The recv ranks
         recvProcs.clear();
         forAll(recvSizes, proci)
@@ -606,46 +697,25 @@ bool Foam::PstreamBuffers::finishedSends
     }
     else
     {
         // Use existing send/recv ranks
-
-        finishedSends(sendProcs, recvProcs, wait);
+        labelList recvSizes;
+        finalExchange(sendProcs, recvProcs, wait, recvSizes);
     }

     return changed;
 }


-void Foam::PstreamBuffers::finishedNeighbourSends
-(
-    const labelUList& neighProcs,
-    labelList& recvSizes,
-    const bool wait
-)
-{
-    finishedSends(neighProcs, neighProcs, recvSizes, wait);
-}
-
-
-void Foam::PstreamBuffers::finishedNeighbourSends
-(
-    const labelUList& neighProcs,
-    const bool wait
-)
-{
-    finishedSends(neighProcs, neighProcs, wait);
-}
-
-
 void Foam::PstreamBuffers::finishedGathers(const bool wait)
 {
     labelList recvSizes;
-    finalExchangeGatherScatter(true, wait, false, recvSizes);
+    finalGatherScatter(true, wait, recvSizes);
 }


 void Foam::PstreamBuffers::finishedScatters(const bool wait)
 {
     labelList recvSizes;
-    finalExchangeGatherScatter(false, wait, false, recvSizes);
+    finalGatherScatter(false, wait, recvSizes);
 }
@@ -655,7 +725,7 @@ void Foam::PstreamBuffers::finishedGathers
     labelList& recvSizes,
     const bool wait
 )
 {
-    finalExchangeGatherScatter(true, wait, true, recvSizes);
+    finalGatherScatter(true, wait, recvSizes);

     if (commsType_ != UPstream::commsTypes::nonBlocking)
@@ -677,7 +747,7 @@ void Foam::PstreamBuffers::finishedScatters
     labelList& recvSizes,
     const bool wait
 )
 {
-    finalExchangeGatherScatter(false, wait, true, recvSizes);
+    finalGatherScatter(false, wait, recvSizes);

     if (commsType_ != UPstream::commsTypes::nonBlocking)
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H
index 05efab023b..68a3c81d32 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H
@@ -164,7 +164,7 @@ class PstreamBuffers
         );

         //- Mark sends as done.
-        //  Only exchange sizes using the sendProcs/recvProcs subset
+        //  Only exchange sizes using the neighbour ranks
         //  (nonBlocking comms).
         void finalExchange
         (
             const labelUList& sendProcs,
             const labelUList& recvProcs,
             const bool wait,
             labelList& recvSizes
         );

         //- For all-to-one or one-to-all
-        void finalExchangeGatherScatter
+        void finalGatherScatter
         (
             const bool isGather,
             const bool wait,
-            const bool needSizes,   // If recvSizes needed or scratch
             labelList& recvSizes
         );
@@ -202,6 +201,12 @@

 public:

+    // Static Data
+
+        //- Preferred exchange algorithm (may change or be removed in future)
+        static int algorithm;
+
+
     // Constructors

         //- Construct given communication type (default: nonBlocking), message
@@ -340,7 +345,7 @@ public:

         //- Maximum receive size, excluding the specified processor rank
         //- Must call finishedSends() or other finished.. method first!
-        label maxNonLocalRecvCount(const label proci) const;
+        label maxNonLocalRecvCount(const label excludeProci) const;

         //- Number of unconsumed receive bytes for the specified processor.
         //- Must call finishedSends() or other finished.. method first!
@@ -397,60 +402,6 @@ public:

     // Functions with restricted neighbours

-        //- Mark sends as done using subset of send/recv ranks
-        //- to exchange data on.
-        //
-        //  Non-blocking mode: populates receive buffers.
-        //  \param sendProcs ranks used for sends
-        //  \param recvProcs ranks used for recvs
-        //  \param wait wait for requests to complete (in nonBlocking mode)
-        //
-        //  \warning currently only valid for nonBlocking comms.
-        void finishedSends
-        (
-            const labelUList& sendProcs,
-            const labelUList& recvProcs,
-            const bool wait = true
-        );
-
-        //- Mark sends as done using subset of send/recv ranks
-        //- to exchange data on. Recovers the sizes (bytes) received.
-        //
-        //  Non-blocking mode: populates receive buffers.
-        //
-        //  \param sendProcs ranks used for sends
-        //  \param recvProcs ranks used for recvs
-        //  \param[out] recvSizes the sizes (bytes) received
-        //  \param wait wait for requests to complete (in nonBlocking mode)
-        //
-        //  \warning currently only valid for nonBlocking comms.
-        void finishedSends
-        (
-            const labelUList& sendProcs,
-            const labelUList& recvProcs,
-            labelList& recvSizes,
-            const bool wait = true
-        );
-
-        //- A caching version that uses a limited send/recv connectivity.
-        //
-        //  Non-blocking mode: populates receive buffers.
-        //  \param sendConnections on/off for sending ranks
-        //  \param sendProcs ranks used for sends
-        //  \param recvProcs ranks used for recvs
-        //  \param wait wait for requests to complete (in nonBlocking mode)
-        //
-        //  \return True if the send/recv connectivity changed
-        //
-        //  \warning currently only valid for nonBlocking comms.
-        bool finishedSends
-        (
-            bitSet& sendConnections,
-            DynamicList
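
Usage note (not part of the diff): the new "pbufs.algorithm" value is registered via
registerOptSwitch, so it should be selectable at run time like other OpenFOAM
optimisation switches, without recompiling. A minimal sketch, assuming the usual
OptimisationSwitches mechanism in the global etc/controlDict (or a case's
system/controlDict); the numeric values come from the enumerations added above:

    OptimisationSwitches
    {
        // -1 : PEX with all-to-all size exchange (OpenFOAM-2212 and earlier, default)
        //  0 : reserved for a hybrid PEX (commented out, not implemented in this patch)
        //  1 : full NBX, nonblocking consensus exchange (experimental)
        pbufs.algorithm  -1;
    }

As the finalExchange() code above reads, the full NBX path is only taken when waiting
for completion and when UPstream::maxCommsSize does not force data chunking; any other
non-default value keeps the PEX data exchange but swaps the size exchange from
all-to-all to the nonblocking consensus variant.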