From d9c73ae48933ef9b8253a3bc1af36231e000fafe Mon Sep 17 00:00:00 2001 From: Mark Olesen Date: Mon, 29 Apr 2024 09:09:50 +0200 Subject: [PATCH] ENH: improve handling of multi-pass send/recv (#3152) - the maxCommsSize variable is used to 'chunk' large data transfers (eg, with PstreamBuffers) into a multi-pass send/recv sequence. The send/recv windows for chunk-wise transfers: iter data window ---- ----------- 0 [0, 1*chunk] 1 [1*chunk, 2*chunk] 2 [2*chunk, 3*chunk] ... Since we mostly send/recv in bytes, the current internal limit for MPI counts (INT_MAX) can be hit rather quickly. The chunking limit should thus also be INT_MAX, but since it is rather tedious to specify such large numbers, can instead use maxCommsSize = -1 to specify (INT_MAX-1) as the limit. The default value of maxCommsSize = 0 (ie, no chunking). Note ~~~~ In previous versions, the number of chunks was determined by the sender sizes. This required an additional MPI_Allreduce to establish an overall consistent number of chunks to walk. This additional overhead each time meant that maxCommsSize was rarely actually enabled. We can, however, instead rely on the send/recv buffers having been consistently sized and simply walk through the local send/recvs until no further chunks need to be exchanged. As an additional enhancement, the message tags are connected to chunking iteration, which allows the setup of all send/recvs without an intermediate Allwait. 
ENH: extend UPstream::probeMessage to use int64 instead of int for sizes --- .../parallel-chunks/Test-parallel-chunks.C | 308 ++---- .../parallel-comm3b/Test-parallel-comm3b.C | 6 +- .../test/parallel-nbx2/Test-parallel-nbx2.C | 6 +- etc/controlDict | 19 +- .../db/IOstreams/Pstreams/PstreamExchange.C | 921 +++++++++--------- .../Pstreams/PstreamExchangeConsensus.C | 29 +- src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H | 4 +- src/OpenFOAM/global/argList/argList.C | 2 + .../fileOperation/fileOperationBroadcast.C | 8 +- src/Pstream/dummy/UPstream.C | 4 +- src/Pstream/mpi/UPstream.C | 6 +- 11 files changed, 625 insertions(+), 688 deletions(-) diff --git a/applications/test/parallel-chunks/Test-parallel-chunks.C b/applications/test/parallel-chunks/Test-parallel-chunks.C index b5181bccec..30e71a12c5 100644 --- a/applications/test/parallel-chunks/Test-parallel-chunks.C +++ b/applications/test/parallel-chunks/Test-parallel-chunks.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2022 OpenCFD Ltd. + Copyright (C) 2022-2024 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -44,175 +44,62 @@ Description using namespace Foam; -// Looks like Pstream::exchangeBuf -template<class T> -void do_exchangeBuf +//- Number of elements corresponding to max byte transfer. +// Normal upper limit is INT_MAX since MPI sizes are limited to <int>. 
+template<class Type>
+inline std::size_t PstreamDetail_maxTransferCount ( - const label sendSize, - const char* sendData, - const label recvSize, - char* recvData, - const int tag, - const label comm, - const bool wait -) + const std::size_t max_bytes = std::size_t(0) +) noexcept { - const label startOfRequests = UPstream::nRequests(); - - // Set up receives - // ~~~~~~~~~~~~~~~ - - // forAll(recvSizes, proci) - { - // if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0) - if (!Pstream::master(comm) && recvSize > 0) - { - UIPstream::read - ( - UPstream::commsTypes::nonBlocking, - UPstream::myProcNo(comm), // proci, - recvData, - recvSize*sizeof(T), - tag, - comm - ); - } - } - - - // Set up sends - // ~~~~~~~~~~~~ - - // forAll(sendBufs, proci) - for (const int proci : Pstream::subProcs(comm)) - { - if (sendSize > 0) - // if (proci != Pstream::myProcNo(comm) && sendSizes[proci] > 0) - { - if - ( - !UOPstream::write - ( - UPstream::commsTypes::nonBlocking, - proci, - sendData, - sendSize*sizeof(T), - tag, - comm - ) - ) - { - FatalErrorInFunction - << "Cannot send outgoing message. " - << "to:" << proci << " nBytes:" - << label(sendSize*sizeof(T)) - << Foam::abort(FatalError); - } - } - } - - - // Wait for all to finish - // ~~~~~~~~~~~~~~~~~~~~~~ - - if (wait) - { - UPstream::waitRequests(startOfRequests); - } + return + ( + (max_bytes == 0) // ie, unlimited + ? (std::size_t(0)) // + : (max_bytes > std::size_t(INT_MAX)) // MPI limit is <int> + ? (std::size_t(INT_MAX) / sizeof(Type)) // + : (max_bytes > sizeof(Type)) // require an integral number + ? (max_bytes / sizeof(Type)) // + : (std::size_t(1)) // min of one element + ); } -// Looks like Pstream::exchangeContainer -template<class Container, class T> -void do_exchangeContainer +//- Upper limit on number of transfer bytes. +// Max bytes is normally INT_MAX since MPI sizes are limited to <int>. +// Negative values indicate a subtraction from INT_MAX. 
+inline std::size_t PstreamDetail_maxTransferBytes ( - const Container& sendData, - const label recvSize, - Container& recvData, - const int tag, - const label comm, - const bool wait -) + const int64_t max_bytes +) noexcept { - const label startOfRequests = UPstream::nRequests(); - - // Set up receives - // ~~~~~~~~~~~~~~~ - - // for (const int proci : Pstream::allProcs(comm)) - { - if (!Pstream::master(comm) && recvSize > 0) - // if (proci != Pstream::myProcNo(comm) && recvSize > 0) - { - UIPstream::read - ( - UPstream::commsTypes::nonBlocking, - UPstream::myProcNo(comm), // proci, - recvData.data_bytes(), - recvSize*sizeof(T), - tag, - comm - ); - } - } - - - // Set up sends - // ~~~~~~~~~~~~ - - if (Pstream::master(comm) && sendData.size() > 0) - { - for (const int proci : Pstream::subProcs(comm)) - { - if - ( - !UOPstream::write - ( - UPstream::commsTypes::nonBlocking, - proci, - sendData.cdata_bytes(), - sendData.size_bytes(), - tag, - comm - ) - ) - { - FatalErrorInFunction - << "Cannot send outgoing message. " - << "to:" << proci << " nBytes:" - << label(sendData.size_bytes()) - << Foam::abort(FatalError); - } - } - } - - // Wait for all to finish - // ~~~~~~~~~~~~~~~~~~~~~~ - - if (wait) - { - UPstream::waitRequests(startOfRequests); - } + return + ( + (max_bytes < 0) // (numBytes fewer than INT_MAX) + ? std::size_t(INT_MAX + max_bytes) + : std::size_t(max_bytes) + ); } -template<class Container, class T> +template<class Container, class Type> void broadcast_chunks ( Container& sendData, const int tag = UPstream::msgType(), - const label comm = UPstream::worldComm, - const bool wait = true + const label comm = UPstream::worldComm, + const int64_t maxComms_bytes = UPstream::maxCommsSize ) { // OR static_assert(is_contiguous::value, "Contiguous data only!") - if (!is_contiguous<T>::value) + if (!is_contiguous<Type>::value) { FatalErrorInFunction - << "Contiguous data only." << sizeof(T) << Foam::abort(FatalError); + << "Contiguous data only." 
<< sizeof(Type) + << Foam::abort(FatalError); } - if (UPstream::maxCommsSize <= 0) + if (maxComms_bytes == 0) { // Do in one go Info<< "send " << sendData.size() << " elements in one go" << endl; @@ -227,93 +114,90 @@ void broadcast_chunks sendData.resize_nocopy(recvSize); // A no-op on master - // Determine the number of chunks to send. Note that we - // only have to look at the sending data since we are - // guaranteed that some processor's sending size is some other - // processor's receive size. Also we can ignore any local comms. - // We need to send chunks so the number of iterations: - // maxChunkSize iterations - // ------------ ---------- - // 0 0 - // 1..maxChunkSize 1 - // maxChunkSize+1..2*maxChunkSize 2 - // ... - - const label maxChunkSize + // The chunk size (number of elements) corresponding to max byte transfer + // Is zero for non-chunked exchanges. + const std::size_t chunkSize ( - max + PstreamDetail_maxTransferCount ( - static_cast