ENH: support run-time PstreamBuffers algorithm selection (advanced option)

- this helps isolate the selection of PEX vs NBX, for a potential
  migration to a hybrid PEX
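  For example (a usage sketch only, not part of this commit), the new
  switch can be set like any other optimisation switch, e.g. in the
  OptimisationSwitches section of etc/controlDict:

      OptimisationSwitches
      {
          // -1: PEX with all-to-all sizing (OpenFOAM-2212 and earlier)
          //  1: full NBX (experimental)
          pbufs.algorithm  1;
      }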
Mark Olesen 2023-03-02 11:46:35 +01:00
parent 99bf27cbee
commit 82c0b360c6
5 changed files with 240 additions and 157 deletions

View File

@@ -470,8 +470,7 @@ public:
// Exchange
//- Helper: exchange sizes of sendBufs for specified
//- set of send/receive processes.
//- Helper: exchange sizes of sendBufs for specified send/recv ranks
template<class Container>
static void exchangeSizes
(
@@ -483,6 +482,17 @@ public:
const label comm = UPstream::worldComm
);
//- Helper: exchange sizes of sendBufs for specified neighbour ranks
template<class Container>
static void exchangeSizes
(
const labelUList& neighProcs,
const Container& sendBufs,
labelList& sizes,
const label tag = UPstream::msgType(),
const label comm = UPstream::worldComm
);
//- Helper: exchange sizes of sendBufs.
//- The sendBufs is the data per processor (in the communicator).
// Returns sizes of sendBufs on the sending processor.
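A minimal calling sketch for the new neighbour-ranks overload declared
above (the neighbour list and buffer setup here are illustrative, not
from this commit):

    // Exchange buffer sizes on a restricted neighbourhood.
    // sendBufs must be sized to the number of ranks in the communicator.
    labelList neighProcs({1, 2});                  // connected ranks
    List<DynamicList<char>> sendBufs(UPstream::nProcs());
    // ... fill sendBufs[proci] for each proci in neighProcs ...

    labelList recvSizes;
    Pstream::exchangeSizes(neighProcs, sendBufs, recvSizes);

    // recvSizes is zeroed for ranks outside the neighbourhood and
    // the self-send slot is preserved (see implementation below)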

View File

@@ -28,6 +28,30 @@ License
#include "PstreamBuffers.H"
#include "bitSet.H"
#include "debug.H"
#include "registerSwitch.H"
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
int Foam::PstreamBuffers::algorithm
(
// Not really the most creative name...
Foam::debug::optimisationSwitch("pbufs.algorithm", -1)
);
registerOptSwitch
(
"pbufs.algorithm",
int,
Foam::PstreamBuffers::algorithm
);
// Simple enumerations
// -------------------
static constexpr int algorithm_PEX_allToAll = -1; // OpenFOAM 2212 and earlier
//static constexpr int algorithm_PEX_hybrid = 0; // New default?
static constexpr int algorithm_full_NBX = 1; // Experimental
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
@@ -43,9 +67,58 @@ void Foam::PstreamBuffers::finalExchange
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
// Dense storage uses all-to-all
Pstream::exchangeSizes(sendBuffers_, recvSizes, comm_);
if
(
wait
&& (algorithm >= algorithm_full_NBX)
&& (UPstream::maxCommsSize <= 0)
)
{
// NBX algorithm (nonblocking exchange)
// - when requested and waiting, no data chunking etc
PstreamDetail::exchangeConsensus<DynamicList<char>, char>
(
sendBuffers_,
recvBuffers_,
recvSizes,
(tag_ + 271828), // some unique tag?
comm_,
wait
);
return;
}
// PEX algorithm with two different flavours of exchanging sizes
// Assemble the send sizes (cf. Pstream::exchangeSizes)
labelList sendSizes(nProcs_);
forAll(sendBuffers_, proci)
{
sendSizes[proci] = sendBuffers_[proci].size();
}
recvSizes.resize_nocopy(nProcs_);
if (algorithm == algorithm_PEX_allToAll)
{
// PEX stage 1: exchange sizes (all-to-all)
UPstream::allToAll(sendSizes, recvSizes, comm_);
}
else
{
// PEX stage 1: exchange sizes (non-blocking consensus)
UPstream::allToAllConsensus
(
sendSizes,
recvSizes,
(tag_ + 314159), // some unique tag?
comm_
);
}
// PEX stage 2: point-to-point data exchange
Pstream::exchange<DynamicList<char>, char>
(
sendBuffers_,
@@ -73,6 +146,30 @@ void Foam::PstreamBuffers::finalExchange
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
// Preparation. Temporarily abuse recvSizes as logic to clear
// send buffers that are not in the neighbourhood connection
{
recvSizes.resize_nocopy(nProcs_);
recvSizes = 0;
// Preserve self-send, even if not described by neighbourhood
recvSizes[UPstream::myProcNo(comm_)] = 1;
for (const label proci : sendProcs)
{
recvSizes[proci] = 1; // Connected
}
for (label proci=0; proci < nProcs_; ++proci)
{
if (!recvSizes[proci]) // Not connected
{
sendBuffers_[proci].clear();
}
}
}
// PEX stage 1: exchange sizes (limited neighbourhood)
Pstream::exchangeSizes
(
sendProcs,
@@ -83,6 +180,7 @@ void Foam::PstreamBuffers::finalExchange
comm_
);
// PEX stage 2: point-to-point data exchange
Pstream::exchange<DynamicList<char>, char>
(
sendBuffers_,
@@ -96,11 +194,10 @@ void Foam::PstreamBuffers::finalExchange
}
void Foam::PstreamBuffers::finalExchangeGatherScatter
void Foam::PstreamBuffers::finalGatherScatter
(
const bool isGather,
const bool wait,
const bool needSizes,
labelList& recvSizes
)
{
@@ -133,6 +230,12 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
// Use PEX algorithm
// - for a non-sparse gather/scatter, it is presumed that
// MPI_Gather/MPI_Scatter will be the most efficient way to
// communicate the sizes.
// PEX stage 1: exchange sizes (gather or scatter)
if (isGather)
{
// gather mode (all-to-one): master [0] <- everyone
@@ -166,7 +269,7 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter
recvSizes[0] = myRecv;
}
// PEX stage 2: point-to-point data exchange
Pstream::exchange<DynamicList<char>, char>
(
sendBuffers_,
@@ -378,7 +481,7 @@ Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const
Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
{
labelList counts(recvPositions_.size(), Zero);
labelList counts(nProcs_, Zero);
if (finishedSendsCalled_)
{
@@ -404,17 +507,20 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
}
Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount(const label proci) const
Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount
(
const label excludeProci
) const
{
label maxLen = 0;
if (finishedSendsCalled_)
{
forAll(recvBuffers_, idx)
forAll(recvBuffers_, proci)
{
const label len(recvBuffers_[idx].size() - recvPositions_[idx]);
if (idx != proci)
if (excludeProci != proci)
{
label len(recvBuffers_[proci].size() - recvPositions_[proci]);
maxLen = max(maxLen, len);
}
}
@@ -494,6 +600,9 @@ void Foam::PstreamBuffers::finishedSends
const bool wait
)
{
// Resize for copying back
recvSizes.resize_nocopy(sendBuffers_.size());
finalExchange(wait, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
@@ -510,39 +619,25 @@ void Foam::PstreamBuffers::finishedSends
}
void Foam::PstreamBuffers::finishedSends
void Foam::PstreamBuffers::finishedNeighbourSends
(
const labelUList& sendProcs,
const labelUList& recvProcs,
const bool wait
)
{
labelList recvSizes;
finalExchange(sendProcs, recvProcs, wait, recvSizes);
}
void Foam::PstreamBuffers::finishedSends
(
const labelUList& sendProcs,
const labelUList& recvProcs,
const labelUList& neighProcs,
labelList& recvSizes,
const bool wait
)
{
finalExchange(sendProcs, recvProcs, wait, recvSizes);
finalExchange(neighProcs, neighProcs, wait, recvSizes);
}
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
FatalErrorInFunction
<< "Obtaining sizes not supported in "
<< UPstream::commsTypeNames[commsType_] << endl
<< " since transfers already in progress. Use non-blocking instead."
<< exit(FatalError);
// Note: maybe possible only if using different tag from write started
// by ~UOPstream. Needs some work.
}
void Foam::PstreamBuffers::finishedNeighbourSends
(
const labelUList& neighProcs,
const bool wait
)
{
labelList recvSizes;
finalExchange(neighProcs, neighProcs, wait, recvSizes);
}
@@ -562,10 +657,8 @@ bool Foam::PstreamBuffers::finishedSends
}
// Update send connections
// - reasonable to assume there are no self-sends on UPstream::myProcNo
forAll(sendBuffers_, proci)
{
// ie, sendDataCount(proci) != 0
if (sendConnections.set(proci, !sendBuffers_[proci].empty()))
{
// The state changed
@@ -577,22 +670,20 @@ bool Foam::PstreamBuffers::finishedSends
if (changed)
{
// Create send/recv topology
// Update send/recv topology
labelList recvSizes;
finishedSends(recvSizes, wait); // eg, using all-to-all
// The send ranks
sendProcs.clear();
forAll(sendBuffers_, proci)
{
// ie, sendDataCount(proci) != 0
if (!sendBuffers_[proci].empty())
{
sendProcs.push_back(proci);
}
}
labelList recvSizes;
finishedSends(recvSizes, wait); // All-to-all
// The recv ranks
recvProcs.clear();
forAll(recvSizes, proci)
@@ -606,46 +697,25 @@ bool Foam::PstreamBuffers::finishedSends
else
{
// Use existing send/recv ranks
finishedSends(sendProcs, recvProcs, wait);
labelList recvSizes;
finalExchange(sendProcs, recvProcs, wait, recvSizes);
}
return changed;
}
void Foam::PstreamBuffers::finishedNeighbourSends
(
const labelUList& neighProcs,
labelList& recvSizes,
const bool wait
)
{
finishedSends(neighProcs, neighProcs, recvSizes, wait);
}
void Foam::PstreamBuffers::finishedNeighbourSends
(
const labelUList& neighProcs,
const bool wait
)
{
finishedSends(neighProcs, neighProcs, wait);
}
void Foam::PstreamBuffers::finishedGathers(const bool wait)
{
labelList recvSizes;
finalExchangeGatherScatter(true, wait, false, recvSizes);
finalGatherScatter(true, wait, recvSizes);
}
void Foam::PstreamBuffers::finishedScatters(const bool wait)
{
labelList recvSizes;
finalExchangeGatherScatter(false, wait, false, recvSizes);
finalGatherScatter(false, wait, recvSizes);
}
@@ -655,7 +725,7 @@ void Foam::PstreamBuffers::finishedGathers
const bool wait
)
{
finalExchangeGatherScatter(true, wait, true, recvSizes);
finalGatherScatter(true, wait, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
@@ -677,7 +747,7 @@ void Foam::PstreamBuffers::finishedScatters
const bool wait
)
{
finalExchangeGatherScatter(false, wait, true, recvSizes);
finalGatherScatter(false, wait, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
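For reference, the NBX path earlier in this file delegates to
PstreamDetail::exchangeConsensus (and to allToAllConsensus for the PEX
sizing). A conceptual sketch of such a nonblocking consensus exchange,
written against raw MPI rather than the OpenFOAM wrappers and purely
illustrative, not the code added by this commit:

    #include <mpi.h>
    #include <cstddef>
    #include <vector>

    // Each rank posts synchronous sends to the ranks it talks to, then
    // drains incoming messages until a consensus barrier completes.
    void nbxExchangeSizes
    (
        const std::vector<int>& sendProcs,   // ranks we send to
        const std::vector<long>& sendSizes,  // one entry per sendProcs rank
        std::vector<long>& recvSizes,        // zeroed, filled on return
        int tag,
        MPI_Comm comm
    )
    {
        int nProcs = 0;
        MPI_Comm_size(comm, &nProcs);
        recvSizes.assign(nProcs, 0);

        // Synchronous sends: completion implies the matching receive began
        std::vector<MPI_Request> sendReqs(sendProcs.size());
        for (std::size_t i = 0; i < sendProcs.size(); ++i)
        {
            MPI_Issend
            (
                &sendSizes[i], 1, MPI_LONG,
                sendProcs[i], tag, comm, &sendReqs[i]
            );
        }

        MPI_Request barrier = MPI_REQUEST_NULL;
        bool barrierActive = false;

        for (bool done = false; !done; /*nil*/)
        {
            // Receive anything that has arrived
            int flag = 0;
            MPI_Status status;
            MPI_Iprobe(MPI_ANY_SOURCE, tag, comm, &flag, &status);
            if (flag)
            {
                MPI_Recv
                (
                    &recvSizes[status.MPI_SOURCE], 1, MPI_LONG,
                    status.MPI_SOURCE, tag, comm, MPI_STATUS_IGNORE
                );
            }

            if (barrierActive)
            {
                // Consensus reached once every rank has entered
                int barrierDone = 0;
                MPI_Test(&barrier, &barrierDone, MPI_STATUS_IGNORE);
                done = (barrierDone != 0);
            }
            else
            {
                // All local sends consumed? Then join the barrier
                int sendsDone = 0;
                MPI_Testall
                (
                    static_cast<int>(sendReqs.size()), sendReqs.data(),
                    &sendsDone, MPI_STATUSES_IGNORE
                );
                if (sendsDone)
                {
                    MPI_Ibarrier(comm, &barrier);
                    barrierActive = true;
                }
            }
        }
    }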

View File

@@ -164,7 +164,7 @@ class PstreamBuffers
);
//- Mark sends as done.
// Only exchange sizes using the sendProcs/recvProcs subset
// Only exchange sizes using the neighbour ranks
// (nonBlocking comms).
void finalExchange
(
@@ -175,11 +175,10 @@ class PstreamBuffers
);
//- For all-to-one or one-to-all
void finalExchangeGatherScatter
void finalGatherScatter
(
const bool isGather,
const bool wait,
const bool needSizes, // If recvSizes needed or scratch
labelList& recvSizes
);
@@ -202,6 +201,12 @@ class PstreamBuffers
public:
// Static Data
//- Preferred exchange algorithm (may change or be removed in future)
static int algorithm;
// Constructors
//- Construct given communication type (default: nonBlocking), message
@@ -340,7 +345,7 @@ public:
//- Maximum receive size, excluding the specified processor rank
//- Must call finishedSends() or other finished.. method first!
label maxNonLocalRecvCount(const label proci) const;
label maxNonLocalRecvCount(const label excludeProci) const;
//- Number of unconsumed receive bytes for the specified processor.
//- Must call finishedSends() or other finished.. method first!
@@ -397,60 +402,6 @@ public:
// Functions with restricted neighbours
//- Mark sends as done using subset of send/recv ranks
//- to exchange data on.
//
// Non-blocking mode: populates receive buffers.
// \param sendProcs ranks used for sends
// \param recvProcs ranks used for recvs
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
void finishedSends
(
const labelUList& sendProcs,
const labelUList& recvProcs,
const bool wait = true
);
//- Mark sends as done using subset of send/recv ranks
//- to exchange data on. Recovers the sizes (bytes) received.
//
// Non-blocking mode: populates receive buffers.
//
// \param sendProcs ranks used for sends
// \param recvProcs ranks used for recvs
// \param[out] recvSizes the sizes (bytes) received
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
void finishedSends
(
const labelUList& sendProcs,
const labelUList& recvProcs,
labelList& recvSizes,
const bool wait = true
);
//- A caching version that uses a limited send/recv connectivity.
//
// Non-blocking mode: populates receive buffers.
// \param sendConnections on/off for sending ranks
// \param sendProcs ranks used for sends
// \param recvProcs ranks used for recvs
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \return True if the send/recv connectivity changed
//
// \warning currently only valid for nonBlocking comms.
bool finishedSends
(
bitSet& sendConnections,
DynamicList<label>& sendProcs,
DynamicList<label>& recvProcs,
const bool wait = true
);
//- Mark sends as done using subset of send/recv ranks
//- and recover the sizes (bytes) received.
//
@@ -484,6 +435,25 @@ public:
const bool wait = true
);
//- A caching version that uses a limited send/recv connectivity.
//
// Non-blocking mode: populates receive buffers.
// \param sendConnections on/off for sending ranks
// \param sendProcs ranks used for sends
// \param recvProcs ranks used for recvs
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \return True if the send/recv connectivity changed
//
// \warning currently only valid for nonBlocking comms.
bool finishedSends
(
bitSet& sendConnections,
DynamicList<label>& sendProcs,
DynamicList<label>& recvProcs,
const bool wait = true
);
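A usage sketch of the restricted-neighbour round trip from the caller's
side (neighProcs, dataToSend and received are illustrative names, not
from this commit):

    // Non-blocking exchange over a known, limited connectivity
    PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);

    for (const label proci : neighProcs)
    {
        UOPstream os(proci, pBufs);
        os << dataToSend[proci];
    }

    // PEX stage 1 (sizes) runs on the neighbourhood only,
    // followed by stage 2 (point-to-point data)
    pBufs.finishedNeighbourSends(neighProcs);

    for (const label proci : neighProcs)
    {
        UIPstream is(proci, pBufs);
        is >> received[proci];
    }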
// Gather/scatter modes

View File

@@ -746,58 +746,89 @@ void Foam::Pstream::exchangeSizes
const label comm
)
{
//const label myProci = UPstream::myProcNo(comm);
const label myProci = UPstream::myProcNo(comm);
const label numProcs = UPstream::nProcs(comm);
if (sendBufs.size() != UPstream::nProcs(comm))
if (sendBufs.size() != numProcs)
{
FatalErrorInFunction
<< "Size of container " << sendBufs.size()
<< " does not equal the number of processors "
<< UPstream::nProcs(comm)
<< " does not equal the number of processors " << numProcs
<< Foam::abort(FatalError);
}
labelList sendSizes(sendProcs.size());
forAll(sendProcs, i)
labelList sendSizes(numProcs);
for (label proci = 0; proci < numProcs; ++proci)
{
sendSizes[i] = sendBufs[sendProcs[i]].size();
sendSizes[proci] = sendBufs[proci].size();
}
recvSizes.resize_nocopy(sendBufs.size());
recvSizes.resize_nocopy(numProcs);
recvSizes = 0; // Ensure non-received entries are properly zeroed
// Preserve self-send, even if not described by neighbourhood
recvSizes[myProci] = sendSizes[myProci];
const label startOfRequests = UPstream::nRequests();
for (const label proci : recvProcs)
{
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
proci,
reinterpret_cast<char*>(&recvSizes[proci]),
sizeof(label),
tag,
comm
);
if (proci != myProci)
{
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
proci,
reinterpret_cast<char*>(&recvSizes[proci]),
sizeof(label),
tag,
comm
);
}
}
forAll(sendProcs, i)
for (const label proci : sendProcs)
{
UOPstream::write
(
UPstream::commsTypes::nonBlocking,
sendProcs[i],
reinterpret_cast<char*>(&sendSizes[i]),
sizeof(label),
tag,
comm
);
if (proci != myProci)
{
UOPstream::write
(
UPstream::commsTypes::nonBlocking,
proci,
reinterpret_cast<char*>(&sendSizes[proci]),
sizeof(label),
tag,
comm
);
}
}
UPstream::waitRequests(startOfRequests);
}
template<class Container>
void Foam::Pstream::exchangeSizes
(
const labelUList& neighProcs,
const Container& sendBufs,
labelList& recvSizes,
const label tag,
const label comm
)
{
Pstream::exchangeSizes<Container>
(
neighProcs, // send
neighProcs, // recv
sendBufs,
recvSizes,
tag,
comm
);
}
// Sparse sending
template<class Container>
void Foam::Pstream::exchangeSizes

View File

@@ -1720,6 +1720,8 @@ void Foam::argList::parse
<< " nProcsSimpleSum : " << Pstream::nProcsSimpleSum << nl
<< " nonBlockingExchange: "
<< Pstream::nProcsNonblockingExchange << nl
<< " exchange algorithm : "
<< PstreamBuffers::algorithm << nl
<< " commsType : "
<< Pstream::commsTypeNames[Pstream::defaultCommsType] << nl
<< " polling iterations : " << Pstream::nPollProcInterfaces