ENH: improve handling of multi-pass send/recv (#3152)

- the maxCommsSize variable is used to 'chunk' large data transfers
  (eg, with PstreamBuffers) into a multi-pass send/recv sequence.

  The send/recv windows for chunk-wise transfers:

      iter    data window
      ----    -----------
      0       [0, 1*chunk]
      1       [1*chunk, 2*chunk]
      2       [2*chunk, 3*chunk]
      ...

  Since we mostly send/recv in bytes, the current internal limit
  for MPI counts (INT_MAX) can be hit rather quickly.

  The chunking limit should thus also be INT_MAX, but since it is
  rather tedious to specify such large numbers, can instead use

      maxCommsSize = -1

  to specify (INT_MAX-1) as the limit.
  The default value of maxCommsSize = 0 (ie, no chunking).

Note
~~~~
  In previous versions, the number of chunks was determined by the
  sender sizes. This required an additional MPI_Allreduce to establish
  an overall consistent number of chunks to walk. This additional
  overhead each time meant that maxCommsSize was rarely actually
  enabled.

  We can, however, instead rely on the send/recv buffers having been
  consistently sized and simply walk through the local send/recvs until
  no further chunks need to be exchanged. As an additional enhancement,
  the message tags are connected to chunking iteration, which allows
  the setup of all send/recvs without an intermediate Allwait.

ENH: extend UPstream::probeMessage to use int64 instead of int for sizes
This commit is contained in:
Mark Olesen 2024-04-29 09:09:50 +02:00
parent e15d696a24
commit d9c73ae489
11 changed files with 625 additions and 688 deletions

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -44,175 +44,62 @@ Description
using namespace Foam;
// Looks like Pstream::exchangeBuf
template<class T>
void do_exchangeBuf
//- Number of elements corresponding to max byte transfer.
// Normal upper limit is INT_MAX since MPI sizes are limited to <int>.
template<class Type>
inline std::size_t maxTransferCount
(
const label sendSize,
const char* sendData,
const label recvSize,
char* recvData,
const int tag,
const label comm,
const bool wait
)
const std::size_t max_bytes = std::size_t(0)
) noexcept
{
const label startOfRequests = UPstream::nRequests();
// Set up receives
// ~~~~~~~~~~~~~~~
// forAll(recvSizes, proci)
{
// if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0)
if (!Pstream::master(comm) && recvSize > 0)
{
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
UPstream::myProcNo(comm), // proci,
recvData,
recvSize*sizeof(T),
tag,
comm
);
}
}
// Set up sends
// ~~~~~~~~~~~~
// forAll(sendBufs, proci)
for (const int proci : Pstream::subProcs(comm))
{
if (sendSize > 0)
// if (proci != Pstream::myProcNo(comm) && sendSizes[proci] > 0)
{
if
(
!UOPstream::write
(
UPstream::commsTypes::nonBlocking,
proci,
sendData,
sendSize*sizeof(T),
tag,
comm
)
)
{
FatalErrorInFunction
<< "Cannot send outgoing message. "
<< "to:" << proci << " nBytes:"
<< label(sendSize*sizeof(T))
<< Foam::abort(FatalError);
}
}
}
// Wait for all to finish
// ~~~~~~~~~~~~~~~~~~~~~~
if (wait)
{
UPstream::waitRequests(startOfRequests);
}
return
(
(max_bytes == 0) // ie, unlimited
? (std::size_t(0)) //
: (max_bytes > std::size_t(INT_MAX)) // MPI limit is <int>
? (std::size_t(INT_MAX) / sizeof(Type)) //
: (max_bytes > sizeof(Type)) // require an integral number
? (max_bytes / sizeof(Type)) //
: (std::size_t(1)) // min of one element
);
}
// Looks like Pstream::exchangeContainer
template<class Container, class T>
void do_exchangeContainer
//- Upper limit on number of transfer bytes.
// Max bytes is normally INT_MAX since MPI sizes are limited to <int>.
// Negative values indicate a subtraction from INT_MAX.
inline std::size_t PstreamDetail_maxTransferBytes
(
const Container& sendData,
const label recvSize,
Container& recvData,
const int tag,
const label comm,
const bool wait
)
const int64_t max_bytes
) noexcept
{
const label startOfRequests = UPstream::nRequests();
// Set up receives
// ~~~~~~~~~~~~~~~
// for (const int proci : Pstream::allProcs(comm))
{
if (!Pstream::master(comm) && recvSize > 0)
// if (proci != Pstream::myProcNo(comm) && recvSize > 0)
{
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
UPstream::myProcNo(comm), // proci,
recvData.data_bytes(),
recvSize*sizeof(T),
tag,
comm
);
}
}
// Set up sends
// ~~~~~~~~~~~~
if (Pstream::master(comm) && sendData.size() > 0)
{
for (const int proci : Pstream::subProcs(comm))
{
if
(
!UOPstream::write
(
UPstream::commsTypes::nonBlocking,
proci,
sendData.cdata_bytes(),
sendData.size_bytes(),
tag,
comm
)
)
{
FatalErrorInFunction
<< "Cannot send outgoing message. "
<< "to:" << proci << " nBytes:"
<< label(sendData.size_bytes())
<< Foam::abort(FatalError);
}
}
}
// Wait for all to finish
// ~~~~~~~~~~~~~~~~~~~~~~
if (wait)
{
UPstream::waitRequests(startOfRequests);
}
return
(
(max_bytes < 0) // (numBytes fewer than INT_MAX)
? std::size_t(INT_MAX + max_bytes)
: std::size_t(max_bytes)
);
}
template<class Container, class T>
template<class Container, class Type>
void broadcast_chunks
(
Container& sendData,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool wait = true
const label comm = UPstream::worldComm
const int64_t maxComms_bytes = UPstream::maxCommsSize
)
{
// OR static_assert(is_contiguous<T>::value, "Contiguous data only!")
if (!is_contiguous<T>::value)
if (!is_contiguous<Type>::value)
{
FatalErrorInFunction
<< "Contiguous data only." << sizeof(T) << Foam::abort(FatalError);
<< "Contiguous data only." << sizeof(Type)
<< Foam::abort(FatalError);
}
if (UPstream::maxCommsSize <= 0)
if (maxComms_bytes == 0)
{
// Do in one go
Info<< "send " << sendData.size() << " elements in one go" << endl;
@ -227,93 +114,90 @@ void broadcast_chunks
sendData.resize_nocopy(recvSize); // A no-op on master
// Determine the number of chunks to send. Note that we
// only have to look at the sending data since we are
// guaranteed that some processor's sending size is some other
// processor's receive size. Also we can ignore any local comms.
// We need to send chunks so the number of iterations:
// maxChunkSize iterations
// ------------ ----------
// 0 0
// 1..maxChunkSize 1
// maxChunkSize+1..2*maxChunkSize 2
// ...
const label maxChunkSize
// The chunk size (number of elements) corresponding to max byte transfer
// Is zero for non-chunked exchanges.
const std::size_t chunkSize
(
max
PstreamDetail_maxTransferCount<Type>
(
static_cast<label>(1),
static_cast<label>(UPstream::maxCommsSize/sizeof(T))
PstreamDetail_maxTransferBytes(maxComms_bytes)
)
);
label nChunks(0);
{
// Get max send count (elements)
// forAll(sendBufs, proci)
// {
// if (proci != Pstream::myProcNo(comm))
// {
// nChunks = max(nChunks, sendBufs[proci].size());
// }
// }
nChunks = sendSize;
if (chunkSize)
{
// Convert from send count (elements) to number of chunks.
// Can normally calculate with (count-1), but add some safety
if (nChunks)
{
nChunks = 1 + (nChunks/maxChunkSize);
}
reduce(nChunks, maxOp<label>(), tag, comm);
label nChunks = 1 + (sendSize/label(chunkSize));
Info
<< "send " << sendSize << " elements ("
<< (sendSize*sizeof(T)) << " bytes) in " << nChunks
<< " chunks of " << maxChunkSize << " elements ("
<< (maxChunkSize*sizeof(T)) << " bytes) for maxCommsSize:"
<< Pstream::maxCommsSize
<< (sendSize*sizeof(Type)) << " bytes) in " << nChunks
<< " chunks of " << label(chunkSize) << " elements ("
<< label(chunkSize*sizeof(Type)) << " bytes) for maxCommsSize:"
<< label(maxComms_bytes)
<< endl;
}
// stress-test with shortened sendSize
// will produce useless loops, but no calls
// sendSize /= 2;
label nSend(0);
label startSend(0);
char* charPtrSend;
typedef stdFoam::span<Type> sendType;
for (label iter = 0; iter < nChunks; ++iter)
do
{
nSend = min
(
maxChunkSize,
sendSize-startSend
);
sendType payload(sendData.data(), sendData.size());
charPtrSend =
(
nSend > 0
? reinterpret_cast<char*>(&(sendData[startSend]))
: nullptr
);
Info<< "iter " << iter
<< ": beg=" << startSend << " len=" << nSend
<< " (" << (nSend*sizeof(T)) << " bytes)" << endl;
UPstream::broadcast(charPtrSend, nSend*sizeof(T), comm);
// forAll(nSend, proci)
if (!chunkSize)
{
startSend += nSend;
UPstream::broadcast
(
payload.data_bytes(),
payload.size_bytes(),
comm
);
break;
}
// Dispatch chunk-wise until there is nothing left
for (int iter = 0; /*true*/; ++iter)
{
// The begin/end for the data window
const std::size_t beg = (std::size_t(iter)*chunkSize);
const std::size_t end = (std::size_t(iter+1)*chunkSize);
if (payload.size() <= beg)
{
// No more data windows
break;
}
sendType window
(
(end < payload.size())
? payload.subspan(beg, end - beg)
: payload.subspan(beg)
);
Info<< "iter " << iter
<< ": beg=" << label(beg) << " len=" << label(window.size())
<< " (" << label(window.size_bytes()) << " bytes)" << endl;
UPstream::broadcast
(
window.data_bytes(),
window.size_bytes(),
comm
);
}
}
while (false);
Info<< "final: " << startSend << endl;
Info<< "final" << endl;
}
@ -333,7 +217,7 @@ int main(int argc, char *argv[])
}
labelList input1;
if (Pstream::master())
if (UPstream::master())
{
input1 = identity(500);
}
@ -348,7 +232,7 @@ int main(int argc, char *argv[])
// Mostly the same with PstreamBuffers
if (false)
{
PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
PstreamBuffers pBufs;
labelList sendData;
if (Pstream::master())

View File

@ -130,7 +130,7 @@ int main(int argc, char *argv[])
for (bool barrier_active = false, done = false; !done; /*nil*/)
{
std::pair<int, int> probed =
std::pair<int, int64_t> probed =
UPstream::probeMessage
(
UPstream::commsTypes::nonBlocking,
@ -143,8 +143,8 @@ int main(int argc, char *argv[])
{
// Message found and had size: receive it
const label proci = probed.first;
const label count = probed.second;
const label proci(probed.first);
const label count(probed.second);
recvBufs(proci).resize_nocopy(count);
recvFromProc(recvRequests.size()) = proci;

View File

@ -119,7 +119,7 @@ int main(int argc, char *argv[])
for (bool barrier_active = false, done = false; !done; /*nil*/)
{
std::pair<int, int> probed =
std::pair<int, int64_t> probed =
UPstream::probeMessage
(
UPstream::commsTypes::nonBlocking,
@ -132,8 +132,8 @@ int main(int argc, char *argv[])
{
// Message found and had size: receive it
const label proci = probed.first;
const label count = probed.second;
const label proci(probed.first);
const label count(probed.second);
if (optNonBlocking)
{

View File

@ -1,7 +1,7 @@
/*--------------------------------*- C++ -*----------------------------------*\
| ========= | |
| \\ / F ield | OpenFOAM: The Open Source CFD Toolbox |
| \\ / O peration | Version: v2312 |
| \\ / O peration | Version: v2406 |
| \\ / A nd | Website: www.openfoam.com |
| \\/ M anipulation | |
\*---------------------------------------------------------------------------*/
@ -146,13 +146,18 @@ OptimisationSwitches
// The default and minimum is (20000000).
mpiBufferSize 0;
// Optional max size (bytes) for unstructured data exchanges. In some
// phases of OpenFOAM it can send over very large data chunks
// Optional max size (bytes) for unstructured data exchanges.
// In some phases of OpenFOAM it can send over very large data chunks
// (e.g. in parallel load balancing) and some MPI implementations have
// problems with this. Setting this variable > 0 indicates that the
// data exchange needs to be done in multiple passes, each of maxCommsSize.
// This is not switched on by default since it requires an additional
// global reduction, even if multi-pass is not needed)
// problems with this.
//
// This tuning parameter specifies the max number of bytes before
// switching to a multi-pass send/recv
// (currently only affects PstreamBuffers exchanges).
//
// 0 : disabled
// >0 : limit exchanges to specified number of bytes
// <0 : limit exchanges to INT_MAX minus specified number of bytes
maxCommsSize 0;
// Optional (experimental) feature in lduMatrixUpdate

File diff suppressed because it is too large Load Diff

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2023 OpenCFD Ltd.
Copyright (C) 2023-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -82,7 +82,7 @@ void exchangeConsensus
{
buf.clear();
}
recvSizes = Zero;
recvSizes = Foam::zero{};
if (!UPstream::is_rank(comm))
{
@ -109,7 +109,7 @@ void exchangeConsensus
recvBufs[myProci] = sendBufs[myProci];
if (myProci < recvSizes.size())
{
recvSizes[myProci] = recvBufs.size();
recvSizes[myProci] = recvBufs[myProci].size();
}
}
@ -175,7 +175,7 @@ void exchangeConsensus
for (bool barrier_active = false, done = false; !done; /*nil*/)
{
std::pair<int, int> probed =
std::pair<int, int64_t> probed =
UPstream::probeMessage
(
UPstream::commsTypes::nonBlocking,
@ -189,8 +189,8 @@ void exchangeConsensus
// Message found and had size.
// - receive into dest buffer location
const label proci = probed.first;
const label count = (probed.second / sizeof(Type));
const label proci(probed.first);
const label count(probed.second / sizeof(Type));
auto& recvData = recvBufs[proci];
recvData.resize(count); // OK with resize() instead of _nocopy()
@ -254,10 +254,10 @@ void exchangeConsensus
{
static_assert(is_contiguous<Type>::value, "Contiguous data only!");
// TDB: const bool initialBarrier = (UPstream::tuning_NBX_ > 0);
const bool initialBarrier = (UPstream::tuning_NBX_ > 0);
const label myProci = UPstream::myProcNo(comm);
const label numProc = UPstream::myProcNo(comm);
const label numProc = UPstream::nProcs(comm);
// Initial: clear all receive locations
// Preferrable to clear out the map entries instead of the map itself
@ -300,7 +300,12 @@ void exchangeConsensus
// Setup sends
// ------------------------------------------------------------------------
// TDB: initialBarrier ...
// An initial barrier may help to avoid synchronisation problems
// caused elsewhere
if (initialBarrier)
{
UPstream::barrier(comm);
}
// Algorithm NBX: Nonblocking consensus with Map (HashTable) containers
@ -347,7 +352,7 @@ void exchangeConsensus
for (bool barrier_active = false, done = false; !done; /*nil*/)
{
std::pair<int, int> probed =
std::pair<int, int64_t> probed =
UPstream::probeMessage
(
UPstream::commsTypes::nonBlocking,
@ -361,8 +366,8 @@ void exchangeConsensus
// Message found and had size.
// - receive into dest buffer location
const label proci = probed.first;
const label count = (probed.second / sizeof(Type));
const label proci(probed.first);
const label count(probed.second / sizeof(Type));
auto& recvData = recvBufs(proci);
recvData.resize(count); // OK with resize() instead of _nocopy()

View File

@ -565,14 +565,14 @@ public:
//- Probe for an incoming message.
//
// \param commsType Blocking or not
// \param commsType Non-blocking or not
// \param fromProcNo The source rank (negative == ANY_SOURCE)
// \param tag The source message tag
// \param communicator The communicator index
//
// \returns source rank and message size (bytes)
// and (-1, 0) on failure
static std::pair<int,int> probeMessage
static std::pair<int,int64_t> probeMessage
(
const UPstream::commsTypes commsType,
const int fromProcNo,

View File

@ -2049,6 +2049,8 @@ void Foam::argList::parse
Info<< "Pstream initialized with:" << nl
<< " floatTransfer : "
<< Switch::name(UPstream::floatTransfer) << nl
<< " maxCommsSize : "
<< UPstream::maxCommsSize << nl
<< " nProcsSimpleSum : "
<< UPstream::nProcsSimpleSum << nl
<< " nonBlockingExchange: "

View File

@ -146,15 +146,17 @@ static void broadcastFile_single
const uint64_t maxChunkSize =
(
UPstream::maxCommsSize > 0
(UPstream::maxCommsSize > 0)
? uint64_t(UPstream::maxCommsSize)
: uint64_t(pTraits<int>::max)
: (UPstream::maxCommsSize < 0) // (numBytes fewer than INT_MAX)
? uint64_t(INT_MAX + UPstream::maxCommsSize)
: uint64_t(INT_MAX) // MPI limit is <int>
);
while (fileLength > 0)
{
const uint64_t sendSize = min(fileLength, maxChunkSize);
const uint64_t sendSize = std::min(fileLength, maxChunkSize);
fileLength -= sendSize;
// Read file contents into a character buffer

View File

@ -91,7 +91,7 @@ void Foam::UPstream::barrier(const label communicator, UPstream::Request* req)
{}
std::pair<int,int>
std::pair<int,int64_t>
Foam::UPstream::probeMessage
(
const UPstream::commsTypes commsType,
@ -100,7 +100,7 @@ Foam::UPstream::probeMessage
const label communicator
)
{
return std::pair<int,int>(-1, 0);
return std::pair<int,int64_t>(-1, 0);
}

View File

@ -766,7 +766,7 @@ void Foam::UPstream::barrier(const label communicator, UPstream::Request* req)
}
std::pair<int,int>
std::pair<int,int64_t>
Foam::UPstream::probeMessage
(
const UPstream::commsTypes commsType,
@ -775,7 +775,7 @@ Foam::UPstream::probeMessage
const label communicator
)
{
std::pair<int,int> result(-1, 0);
std::pair<int,int64_t> result(-1, 0);
// No-op for non-parallel or not on communicator
if (!UPstream::parRun() || !UPstream::is_rank(communicator))
@ -869,7 +869,7 @@ Foam::UPstream::probeMessage
result.first = status.MPI_SOURCE;
result.second = int(count);
result.second = int64_t(count);
}
return result;