Merge branch 'update-Pstream-large-sends' into 'develop'

ENH: improve handling of multi-pass send/recv

See merge request Development/openfoam!680
This commit is contained in:
Andrew Heather 2024-05-07 14:29:14 +00:00
commit 4fb85d1f76
24 changed files with 730 additions and 793 deletions

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -44,175 +44,62 @@ Description
using namespace Foam;
// Looks like Pstream::exchangeBuf
template<class T>
void do_exchangeBuf
//- Number of elements corresponding to max byte transfer.
// Normal upper limit is INT_MAX since MPI sizes are limited to <int>.
template<class Type>
inline std::size_t maxTransferCount
(
const label sendSize,
const char* sendData,
const label recvSize,
char* recvData,
const int tag,
const label comm,
const bool wait
)
const std::size_t max_bytes = std::size_t(0)
) noexcept
{
const label startOfRequests = UPstream::nRequests();
// Set up receives
// ~~~~~~~~~~~~~~~
// forAll(recvSizes, proci)
{
// if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0)
if (!Pstream::master(comm) && recvSize > 0)
{
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
UPstream::myProcNo(comm), // proci,
recvData,
recvSize*sizeof(T),
tag,
comm
);
}
}
// Set up sends
// ~~~~~~~~~~~~
// forAll(sendBufs, proci)
for (const int proci : Pstream::subProcs(comm))
{
if (sendSize > 0)
// if (proci != Pstream::myProcNo(comm) && sendSizes[proci] > 0)
{
if
(
!UOPstream::write
(
UPstream::commsTypes::nonBlocking,
proci,
sendData,
sendSize*sizeof(T),
tag,
comm
)
)
{
FatalErrorInFunction
<< "Cannot send outgoing message. "
<< "to:" << proci << " nBytes:"
<< label(sendSize*sizeof(T))
<< Foam::abort(FatalError);
}
}
}
// Wait for all to finish
// ~~~~~~~~~~~~~~~~~~~~~~
if (wait)
{
UPstream::waitRequests(startOfRequests);
}
return
(
(max_bytes == 0) // ie, unlimited
? (std::size_t(0)) //
: (max_bytes > std::size_t(INT_MAX)) // MPI limit is <int>
? (std::size_t(INT_MAX) / sizeof(Type)) //
: (max_bytes > sizeof(Type)) // require an integral number
? (max_bytes / sizeof(Type)) //
: (std::size_t(1)) // min of one element
);
}
// Looks like Pstream::exchangeContainer
template<class Container, class T>
void do_exchangeContainer
//- Upper limit on number of transfer bytes.
// Max bytes is normally INT_MAX since MPI sizes are limited to <int>.
// Negative values indicate a subtraction from INT_MAX.
inline std::size_t PstreamDetail_maxTransferBytes
(
const Container& sendData,
const label recvSize,
Container& recvData,
const int tag,
const label comm,
const bool wait
)
const int64_t max_bytes
) noexcept
{
const label startOfRequests = UPstream::nRequests();
// Set up receives
// ~~~~~~~~~~~~~~~
// for (const int proci : Pstream::allProcs(comm))
{
if (!Pstream::master(comm) && recvSize > 0)
// if (proci != Pstream::myProcNo(comm) && recvSize > 0)
{
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
UPstream::myProcNo(comm), // proci,
recvData.data_bytes(),
recvSize*sizeof(T),
tag,
comm
);
}
}
// Set up sends
// ~~~~~~~~~~~~
if (Pstream::master(comm) && sendData.size() > 0)
{
for (const int proci : Pstream::subProcs(comm))
{
if
(
!UOPstream::write
(
UPstream::commsTypes::nonBlocking,
proci,
sendData.cdata_bytes(),
sendData.size_bytes(),
tag,
comm
)
)
{
FatalErrorInFunction
<< "Cannot send outgoing message. "
<< "to:" << proci << " nBytes:"
<< label(sendData.size_bytes())
<< Foam::abort(FatalError);
}
}
}
// Wait for all to finish
// ~~~~~~~~~~~~~~~~~~~~~~
if (wait)
{
UPstream::waitRequests(startOfRequests);
}
return
(
(max_bytes < 0) // (numBytes fewer than INT_MAX)
? std::size_t(INT_MAX + max_bytes)
: std::size_t(max_bytes)
);
}
template<class Container, class T>
template<class Container, class Type>
void broadcast_chunks
(
Container& sendData,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool wait = true
const label comm = UPstream::worldComm
const int64_t maxComms_bytes = UPstream::maxCommsSize
)
{
// OR static_assert(is_contiguous<T>::value, "Contiguous data only!")
if (!is_contiguous<T>::value)
if (!is_contiguous<Type>::value)
{
FatalErrorInFunction
<< "Contiguous data only." << sizeof(T) << Foam::abort(FatalError);
<< "Contiguous data only." << sizeof(Type)
<< Foam::abort(FatalError);
}
if (UPstream::maxCommsSize <= 0)
if (maxComms_bytes == 0)
{
// Do in one go
Info<< "send " << sendData.size() << " elements in one go" << endl;
@ -227,93 +114,90 @@ void broadcast_chunks
sendData.resize_nocopy(recvSize); // A no-op on master
// Determine the number of chunks to send. Note that we
// only have to look at the sending data since we are
// guaranteed that some processor's sending size is some other
// processor's receive size. Also we can ignore any local comms.
// We need to send chunks so the number of iterations:
// maxChunkSize iterations
// ------------ ----------
// 0 0
// 1..maxChunkSize 1
// maxChunkSize+1..2*maxChunkSize 2
// ...
const label maxChunkSize
// The chunk size (number of elements) corresponding to max byte transfer
// Is zero for non-chunked exchanges.
const std::size_t chunkSize
(
max
PstreamDetail_maxTransferCount<Type>
(
static_cast<label>(1),
static_cast<label>(UPstream::maxCommsSize/sizeof(T))
PstreamDetail_maxTransferBytes(maxComms_bytes)
)
);
label nChunks(0);
{
// Get max send count (elements)
// forAll(sendBufs, proci)
// {
// if (proci != Pstream::myProcNo(comm))
// {
// nChunks = max(nChunks, sendBufs[proci].size());
// }
// }
nChunks = sendSize;
if (chunkSize)
{
// Convert from send count (elements) to number of chunks.
// Can normally calculate with (count-1), but add some safety
if (nChunks)
{
nChunks = 1 + (nChunks/maxChunkSize);
}
reduce(nChunks, maxOp<label>(), tag, comm);
label nChunks = 1 + (sendSize/label(chunkSize));
Info
<< "send " << sendSize << " elements ("
<< (sendSize*sizeof(T)) << " bytes) in " << nChunks
<< " chunks of " << maxChunkSize << " elements ("
<< (maxChunkSize*sizeof(T)) << " bytes) for maxCommsSize:"
<< Pstream::maxCommsSize
<< (sendSize*sizeof(Type)) << " bytes) in " << nChunks
<< " chunks of " << label(chunkSize) << " elements ("
<< label(chunkSize*sizeof(Type)) << " bytes) for maxCommsSize:"
<< label(maxComms_bytes)
<< endl;
}
// stress-test with shortened sendSize
// will produce useless loops, but no calls
// sendSize /= 2;
label nSend(0);
label startSend(0);
char* charPtrSend;
typedef stdFoam::span<Type> sendType;
for (label iter = 0; iter < nChunks; ++iter)
do
{
nSend = min
(
maxChunkSize,
sendSize-startSend
);
sendType payload(sendData.data(), sendData.size());
charPtrSend =
(
nSend > 0
? reinterpret_cast<char*>(&(sendData[startSend]))
: nullptr
);
Info<< "iter " << iter
<< ": beg=" << startSend << " len=" << nSend
<< " (" << (nSend*sizeof(T)) << " bytes)" << endl;
UPstream::broadcast(charPtrSend, nSend*sizeof(T), comm);
// forAll(nSend, proci)
if (!chunkSize)
{
startSend += nSend;
UPstream::broadcast
(
payload.data_bytes(),
payload.size_bytes(),
comm
);
break;
}
// Dispatch chunk-wise until there is nothing left
for (int iter = 0; /*true*/; ++iter)
{
// The begin/end for the data window
const std::size_t beg = (std::size_t(iter)*chunkSize);
const std::size_t end = (std::size_t(iter+1)*chunkSize);
if (payload.size() <= beg)
{
// No more data windows
break;
}
sendType window
(
(end < payload.size())
? payload.subspan(beg, end - beg)
: payload.subspan(beg)
);
Info<< "iter " << iter
<< ": beg=" << label(beg) << " len=" << label(window.size())
<< " (" << label(window.size_bytes()) << " bytes)" << endl;
UPstream::broadcast
(
window.data_bytes(),
window.size_bytes(),
comm
);
}
}
while (false);
Info<< "final: " << startSend << endl;
Info<< "final" << endl;
}
@ -333,7 +217,7 @@ int main(int argc, char *argv[])
}
labelList input1;
if (Pstream::master())
if (UPstream::master())
{
input1 = identity(500);
}
@ -348,7 +232,7 @@ int main(int argc, char *argv[])
// Mostly the same with PstreamBuffers
if (false)
{
PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
PstreamBuffers pBufs;
labelList sendData;
if (Pstream::master())

View File

@ -130,7 +130,7 @@ int main(int argc, char *argv[])
for (bool barrier_active = false, done = false; !done; /*nil*/)
{
std::pair<int, int> probed =
std::pair<int, int64_t> probed =
UPstream::probeMessage
(
UPstream::commsTypes::nonBlocking,
@ -143,8 +143,8 @@ int main(int argc, char *argv[])
{
// Message found and had size: receive it
const label proci = probed.first;
const label count = probed.second;
const label proci(probed.first);
const label count(probed.second);
recvBufs(proci).resize_nocopy(count);
recvFromProc(recvRequests.size()) = proci;

View File

@ -119,7 +119,7 @@ int main(int argc, char *argv[])
for (bool barrier_active = false, done = false; !done; /*nil*/)
{
std::pair<int, int> probed =
std::pair<int, int64_t> probed =
UPstream::probeMessage
(
UPstream::commsTypes::nonBlocking,
@ -132,8 +132,8 @@ int main(int argc, char *argv[])
{
// Message found and had size: receive it
const label proci = probed.first;
const label count = probed.second;
const label proci(probed.first);
const label count(probed.second);
if (optNonBlocking)
{

View File

@ -1,7 +1,7 @@
/*--------------------------------*- C++ -*----------------------------------*\
| ========= | |
| \\ / F ield | OpenFOAM: The Open Source CFD Toolbox |
| \\ / O peration | Version: v2312 |
| \\ / O peration | Version: v2406 |
| \\ / A nd | Website: www.openfoam.com |
| \\/ M anipulation | |
\*---------------------------------------------------------------------------*/
@ -146,13 +146,18 @@ OptimisationSwitches
// The default and minimum is (20000000).
mpiBufferSize 0;
// Optional max size (bytes) for unstructured data exchanges. In some
// phases of OpenFOAM it can send over very large data chunks
// Optional max size (bytes) for unstructured data exchanges.
// In some phases of OpenFOAM it can send over very large data chunks
// (e.g. in parallel load balancing) and some MPI implementations have
// problems with this. Setting this variable > 0 indicates that the
// data exchange needs to be done in multiple passes, each of maxCommsSize.
// This is not switched on by default since it requires an additional
// global reduction, even if multi-pass is not needed)
// problems with this.
//
// This tuning parameter specifies the max number of bytes before
// switching to a multi-pass send/recv
// (currently only affects PstreamBuffers exchanges).
//
// 0 : disabled
// >0 : limit exchanges to specified number of bytes
// <0 : limit exchanges to INT_MAX minus specified number of bytes
maxCommsSize 0;
// Optional (experimental) feature in lduMatrixUpdate

View File

@ -77,7 +77,7 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
if (debug)
{
Pout<< "UIPstream::UIPstream PstreamBuffers :"
Perr<< "UIPstream::UIPstream PstreamBuffers :"
<< " fromProcNo:" << fromProcNo_
<< " tag:" << tag_ << " comm:" << comm_
<< " receive buffer size:" << messageSize_

View File

@ -78,7 +78,7 @@ void Foam::Pstream::combineGather
if (debug & 2)
{
Pout<< " received from "
Perr<< " received from "
<< belowID << " data:" << received << endl;
}
@ -98,7 +98,7 @@ void Foam::Pstream::combineGather
if (debug & 2)
{
Pout<< " received from "
Perr<< " received from "
<< belowID << " data:" << received << endl;
}
@ -111,7 +111,7 @@ void Foam::Pstream::combineGather
{
if (debug & 2)
{
Pout<< " sending to " << myComm.above()
Perr<< " sending to " << myComm.above()
<< " data:" << value << endl;
}
@ -190,7 +190,7 @@ void Foam::Pstream::listCombineGather
if (debug & 2)
{
Pout<< " received from "
Perr<< " received from "
<< belowID << " data:" << received << endl;
}
@ -213,7 +213,7 @@ void Foam::Pstream::listCombineGather
if (debug & 2)
{
Pout<< " received from "
Perr<< " received from "
<< belowID << " data:" << received << endl;
}
@ -229,7 +229,7 @@ void Foam::Pstream::listCombineGather
{
if (debug & 2)
{
Pout<< " sending to " << myComm.above()
Perr<< " sending to " << myComm.above()
<< " data:" << values << endl;
}
@ -306,7 +306,7 @@ void Foam::Pstream::mapCombineGather
if (debug & 2)
{
Pout<< " received from "
Perr<< " received from "
<< belowID << " data:" << received << endl;
}
@ -337,7 +337,7 @@ void Foam::Pstream::mapCombineGather
{
if (debug & 2)
{
Pout<< " sending to " << myComm.above()
Perr<< " sending to " << myComm.above()
<< " data:" << values << endl;
}

File diff suppressed because it is too large Load Diff

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2023 OpenCFD Ltd.
Copyright (C) 2023-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -82,7 +82,7 @@ void exchangeConsensus
{
buf.clear();
}
recvSizes = Zero;
recvSizes = Foam::zero{};
if (!UPstream::is_rank(comm))
{
@ -109,7 +109,7 @@ void exchangeConsensus
recvBufs[myProci] = sendBufs[myProci];
if (myProci < recvSizes.size())
{
recvSizes[myProci] = recvBufs.size();
recvSizes[myProci] = recvBufs[myProci].size();
}
}
@ -175,7 +175,7 @@ void exchangeConsensus
for (bool barrier_active = false, done = false; !done; /*nil*/)
{
std::pair<int, int> probed =
std::pair<int, int64_t> probed =
UPstream::probeMessage
(
UPstream::commsTypes::nonBlocking,
@ -189,8 +189,8 @@ void exchangeConsensus
// Message found and had size.
// - receive into dest buffer location
const label proci = probed.first;
const label count = (probed.second / sizeof(Type));
const label proci(probed.first);
const label count(probed.second / sizeof(Type));
auto& recvData = recvBufs[proci];
recvData.resize(count); // OK with resize() instead of _nocopy()
@ -254,10 +254,10 @@ void exchangeConsensus
{
static_assert(is_contiguous<Type>::value, "Contiguous data only!");
// TDB: const bool initialBarrier = (UPstream::tuning_NBX_ > 0);
const bool initialBarrier = (UPstream::tuning_NBX_ > 0);
const label myProci = UPstream::myProcNo(comm);
const label numProc = UPstream::myProcNo(comm);
const label numProc = UPstream::nProcs(comm);
// Initial: clear all receive locations
// Preferrable to clear out the map entries instead of the map itself
@ -300,7 +300,12 @@ void exchangeConsensus
// Setup sends
// ------------------------------------------------------------------------
// TDB: initialBarrier ...
// An initial barrier may help to avoid synchronisation problems
// caused elsewhere
if (initialBarrier)
{
UPstream::barrier(comm);
}
// Algorithm NBX: Nonblocking consensus with Map (HashTable) containers
@ -347,7 +352,7 @@ void exchangeConsensus
for (bool barrier_active = false, done = false; !done; /*nil*/)
{
std::pair<int, int> probed =
std::pair<int, int64_t> probed =
UPstream::probeMessage
(
UPstream::commsTypes::nonBlocking,
@ -361,8 +366,8 @@ void exchangeConsensus
// Message found and had size.
// - receive into dest buffer location
const label proci = probed.first;
const label count = (probed.second / sizeof(Type));
const label proci(probed.first);
const label count(probed.second / sizeof(Type));
auto& recvData = recvBufs(proci);
recvData.resize(count); // OK with resize() instead of _nocopy()

View File

@ -107,7 +107,7 @@ void Foam::Pstream::gatherList
if (debug & 2)
{
Pout<< " received through "
Perr<< " received through "
<< belowID << " data from:" << belowID
<< " data:" << values[belowID] << endl;
}
@ -119,7 +119,7 @@ void Foam::Pstream::gatherList
if (debug & 2)
{
Pout<< " received through "
Perr<< " received through "
<< belowID << " data from:" << leafID
<< " data:" << values[leafID] << endl;
}
@ -136,7 +136,7 @@ void Foam::Pstream::gatherList
if (debug & 2)
{
Pout<< " sending to " << myComm.above()
Perr<< " sending to " << myComm.above()
<< " data from me:" << myProci
<< " data:" << values[myProci] << endl;
}
@ -177,7 +177,7 @@ void Foam::Pstream::gatherList
{
if (debug & 2)
{
Pout<< " sending to "
Perr<< " sending to "
<< myComm.above() << " data from:" << leafID
<< " data:" << values[leafID] << endl;
}
@ -259,7 +259,7 @@ void Foam::Pstream::scatterList
if (debug & 2)
{
Pout<< " received through "
Perr<< " received through "
<< myComm.above() << " data for:" << leafID
<< " data:" << values[leafID] << endl;
}
@ -310,7 +310,7 @@ void Foam::Pstream::scatterList
if (debug & 2)
{
Pout<< " sent through "
Perr<< " sent through "
<< belowID << " data for:" << leafID
<< " data:" << values[leafID] << endl;
}

View File

@ -61,8 +61,8 @@ void reduce
{
if (UPstream::warnComm >= 0 && comm != UPstream::warnComm)
{
Pout<< "** reducing:" << value << " with comm:" << comm << endl;
error::printStack(Pout);
Perr<< "** reducing:" << value << " with comm:" << comm << endl;
error::printStack(Perr);
}
Pstream::gather(value, bop, tag, comm);
Pstream::broadcast(value, comm);

View File

@ -257,7 +257,7 @@ Foam::UIPstreamBase::~UIPstreamBase()
{
if (debug)
{
Pout<< "UIPstreamBase Destructor : tag:" << tag_
Perr<< "UIPstreamBase Destructor : tag:" << tag_
<< " fromProcNo:" << fromProcNo_
<< " clearing receive buffer of size "
<< recvBuf_.size()

View File

@ -221,7 +221,7 @@ void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
if (debug)
{
Pout<< "UPstream::setParRun :"
Perr<< "UPstream::setParRun :"
<< " nProcs:" << nProcs
<< " haveThreads:" << haveThreads
<< endl;
@ -274,7 +274,7 @@ Foam::label Foam::UPstream::allocateCommunicator
if (debug)
{
Pout<< "Allocating communicator " << index << nl
Perr<< "Allocating communicator " << index << nl
<< " parent : " << parentIndex << nl
<< " procs : " << subRanks << nl
<< endl;
@ -335,7 +335,7 @@ Foam::label Foam::UPstream::allocateCommunicator
if (debug)
{
Pout<< "Allocating communicator " << index << nl
Perr<< "Allocating communicator " << index << nl
<< " parent : " << parentIndex << nl
<< " procs : " << flatOutput(subRanks) << nl
<< endl;
@ -492,7 +492,7 @@ bool Foam::UPstream::allocateHostCommunicatorPairs()
if (debug)
{
Pout<< "Allocating host communicators "
Perr<< "Allocating host communicators "
<< interHostComm_ << ", " << intraHostComm_ << nl
<< " parent : " << parentCommunicator << nl
<< endl;
@ -588,7 +588,7 @@ void Foam::UPstream::freeCommunicator
if (debug)
{
Pout<< "Communicators : Freeing communicator " << communicator
Perr<< "Communicators : Freeing communicator " << communicator
<< " parent: " << parentComm_[communicator]
<< " myProcNo: " << myProcNo_[communicator]
<< endl;

View File

@ -565,14 +565,14 @@ public:
//- Probe for an incoming message.
//
// \param commsType Blocking or not
// \param commsType Non-blocking or not
// \param fromProcNo The source rank (negative == ANY_SOURCE)
// \param tag The source message tag
// \param communicator The communicator index
//
// \returns source rank and message size (bytes)
// and (-1, 0) on failure
static std::pair<int,int> probeMessage
static std::pair<int,int64_t> probeMessage
(
const UPstream::commsTypes commsType,
const int fromProcNo,

View File

@ -145,8 +145,8 @@ Foam::OSstream& Foam::messageStream::masterStream(const label communicator)
{
if (UPstream::warnComm >= 0 && communicator != UPstream::warnComm)
{
Pout<< "** messageStream with comm:" << communicator << endl;
error::printStack(Pout);
Perr<< "** messageStream with comm:" << communicator << endl;
error::printStack(Perr);
}
if (communicator == UPstream::worldComm || UPstream::master(communicator))

View File

@ -2049,6 +2049,8 @@ void Foam::argList::parse
Info<< "Pstream initialized with:" << nl
<< " floatTransfer : "
<< Switch::name(UPstream::floatTransfer) << nl
<< " maxCommsSize : "
<< UPstream::maxCommsSize << nl
<< " nProcsSimpleSum : "
<< UPstream::nProcsSimpleSum << nl
<< " nonBlockingExchange: "

View File

@ -146,15 +146,17 @@ static void broadcastFile_single
const uint64_t maxChunkSize =
(
UPstream::maxCommsSize > 0
(UPstream::maxCommsSize > 0)
? uint64_t(UPstream::maxCommsSize)
: uint64_t(pTraits<int>::max)
: (UPstream::maxCommsSize < 0) // (numBytes fewer than INT_MAX)
? uint64_t(INT_MAX + UPstream::maxCommsSize)
: uint64_t(INT_MAX) // MPI limit is <int>
);
while (fileLength > 0)
{
const uint64_t sendSize = min(fileLength, maxChunkSize);
const uint64_t sendSize = std::min(fileLength, maxChunkSize);
fileLength -= sendSize;
// Read file contents into a character buffer

View File

@ -91,7 +91,7 @@ void Foam::UPstream::barrier(const label communicator, UPstream::Request* req)
{}
std::pair<int,int>
std::pair<int,int64_t>
Foam::UPstream::probeMessage
(
const UPstream::commsTypes commsType,
@ -100,7 +100,7 @@ Foam::UPstream::probeMessage
const label communicator
)
{
return std::pair<int,int>(-1, 0);
return std::pair<int,int64_t>(-1, 0);
}

View File

@ -60,7 +60,7 @@ void Foam::UIPBstream::bufferIPCrecv()
if (UPstream::debug)
{
Pout<< "UOPBstream IPC read buffer :"
Perr<< "UOPBstream IPC read buffer :"
<< " root:" << fromProcNo_
<< " comm:" << comm_
<< " probed size:" << label(bufSize)

View File

@ -68,17 +68,17 @@ static std::streamsize UPstream_mpi_receive
if (UPstream::warnComm >= 0 && communicator != UPstream::warnComm)
{
Pout<< "UIPstream::read : starting read from:" << fromProcNo
Perr<< "UIPstream::read : starting read from:" << fromProcNo
<< " size:" << label(bufSize)
<< " tag:" << tag << " comm:" << communicator
<< " commsType:" << UPstream::commsTypeNames[commsType]
<< " warnComm:" << UPstream::warnComm
<< Foam::endl;
error::printStack(Pout);
error::printStack(Perr);
}
else if (UPstream::debug)
{
Pout<< "UIPstream::read : starting read from:" << fromProcNo
Perr<< "UIPstream::read : starting read from:" << fromProcNo
<< " size:" << label(bufSize)
<< " tag:" << tag << " comm:" << communicator
<< " commsType:" << UPstream::commsTypeNames[commsType]
@ -123,7 +123,7 @@ static std::streamsize UPstream_mpi_receive
}
else if (UPstream::debug)
{
Pout<< "UIPstream::read : finished recv from:"
Perr<< "UIPstream::read : finished recv from:"
<< fromProcNo
<< " size:" << label(bufSize) << " tag:" << tag
<< Foam::endl;
@ -198,7 +198,7 @@ static std::streamsize UPstream_mpi_receive
if (UPstream::debug)
{
Pout<< "UIPstream::read : started non-blocking recv from:"
Perr<< "UIPstream::read : started non-blocking recv from:"
<< fromProcNo
<< " size:" << label(bufSize) << " tag:" << tag
<< " request:" <<
@ -225,7 +225,7 @@ void Foam::UIPstream::bufferIPCrecv()
// Called by constructor
if (UPstream::debug)
{
Pout<< "UIPstream IPC read buffer :"
Perr<< "UIPstream IPC read buffer :"
<< " from:" << fromProcNo_
<< " tag:" << tag_ << " comm:" << comm_
<< " wanted size:" << recvBuf_.capacity()
@ -291,7 +291,7 @@ void Foam::UIPstream::bufferIPCrecv()
if (UPstream::debug)
{
Pout<< "UIPstream::UIPstream : probed size:"
Perr<< "UIPstream::UIPstream : probed size:"
<< label(count) << Foam::endl;
}

View File

@ -74,17 +74,17 @@ bool Foam::UOPstream::write
if (UPstream::warnComm >= 0 && communicator != UPstream::warnComm)
{
Pout<< "UOPstream::write : starting write to:" << toProcNo
Perr<< "UOPstream::write : starting write to:" << toProcNo
<< " size:" << label(bufSize)
<< " tag:" << tag << " comm:" << communicator
<< " commType:" << UPstream::commsTypeNames[commsType]
<< " warnComm:" << UPstream::warnComm
<< Foam::endl;
error::printStack(Pout);
error::printStack(Perr);
}
else if (UPstream::debug)
{
Pout<< "UOPstream::write : starting write to:" << toProcNo
Perr<< "UOPstream::write : starting write to:" << toProcNo
<< " size:" << label(bufSize)
<< " tag:" << tag << " comm:" << communicator
<< " commType:" << UPstream::commsTypeNames[commsType]
@ -114,7 +114,7 @@ bool Foam::UOPstream::write
if (UPstream::debug)
{
Pout<< "UOPstream::write : finished buffered send to:"
Perr<< "UOPstream::write : finished buffered send to:"
<< toProcNo
<< " size:" << label(bufSize) << " tag:" << tag
<< Foam::endl;
@ -152,7 +152,7 @@ bool Foam::UOPstream::write
if (UPstream::debug)
{
Pout<< "UOPstream::write : finished send to:"
Perr<< "UOPstream::write : finished send to:"
<< toProcNo
<< " size:" << label(bufSize) << " tag:" << tag
<< Foam::endl;
@ -191,7 +191,7 @@ bool Foam::UOPstream::write
if (UPstream::debug)
{
Pout<< "UOPstream::write : started non-blocking send to:"
Perr<< "UOPstream::write : started non-blocking send to:"
<< toProcNo
<< " size:" << label(bufSize) << " tag:" << tag
<< " request:" <<

View File

@ -90,13 +90,13 @@ static void attachOurBuffers()
if (Foam::UPstream::debug)
{
Foam::Pout<< "UPstream::init : buffer-size " << len << '\n';
Foam::Perr<< "UPstream::init : buffer-size " << len << '\n';
}
}
else
{
delete[] buf;
Foam::Pout<< "UPstream::init : could not attach buffer\n";
Foam::Perr<< "UPstream::init : could not attach buffer\n";
}
#endif
}
@ -171,7 +171,7 @@ bool Foam::UPstream::initNull()
{
if (UPstream::debug)
{
Pout<< "UPstream::initNull : was already initialized\n";
Perr<< "UPstream::initNull : was already initialized\n";
}
}
else
@ -229,7 +229,7 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
}
else if (UPstream::debug)
{
Pout<< "UPstream::init : was already initialized\n";
Perr<< "UPstream::init : was already initialized\n";
}
}
else
@ -283,7 +283,7 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
if (UPstream::debug)
{
Pout<< "UPstream::init :"
Perr<< "UPstream::init :"
<< " thread-support : requested:" << needsThread
<< " obtained:"
<< (
@ -390,7 +390,7 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
&subRank
);
Pout<< "UPstream::init : in world:" << world
Perr<< "UPstream::init : in world:" << world
<< " using local communicator:" << subComm
<< " rank " << subRank
<< " of " << subNumProcs
@ -436,7 +436,7 @@ void Foam::UPstream::shutdown(int errNo)
}
else if (UPstream::debug && errNo == 0)
{
Pout<< "UPstream::shutdown : was already finalized\n";
Perr<< "UPstream::shutdown : was already finalized\n";
}
ourMpi = false;
return;
@ -465,7 +465,7 @@ void Foam::UPstream::shutdown(int errNo)
if (UPstream::debug)
{
Pout<< "UPstream::shutdown\n";
Perr<< "UPstream::shutdown\n";
}
// Check for any outstanding requests
@ -691,7 +691,7 @@ void Foam::UPstream::freeCommunicatorComponents(const label index)
{
if (UPstream::debug)
{
Pout<< "freeCommunicatorComponents: " << index
Perr<< "freeCommunicatorComponents: " << index
<< " from " << PstreamGlobals::MPICommunicators_.size() << endl;
}
@ -766,7 +766,7 @@ void Foam::UPstream::barrier(const label communicator, UPstream::Request* req)
}
std::pair<int,int>
std::pair<int,int64_t>
Foam::UPstream::probeMessage
(
const UPstream::commsTypes commsType,
@ -775,7 +775,7 @@ Foam::UPstream::probeMessage
const label communicator
)
{
std::pair<int,int> result(-1, 0);
std::pair<int,int64_t> result(-1, 0);
// No-op for non-parallel or not on communicator
if (!UPstream::parRun() || !UPstream::is_rank(communicator))
@ -869,7 +869,7 @@ Foam::UPstream::probeMessage
result.first = status.MPI_SOURCE;
result.second = int(count);
result.second = int64_t(count);
}
return result;

View File

@ -49,16 +49,16 @@ bool Foam::UPstream::broadcast
if (UPstream::warnComm >= 0 && comm != UPstream::warnComm)
{
Pout<< "UPstream::broadcast : root:" << rootProcNo
Perr<< "UPstream::broadcast : root:" << rootProcNo
<< " comm:" << comm
<< " size:" << label(bufSize)
<< " warnComm:" << UPstream::warnComm
<< Foam::endl;
error::printStack(Pout);
error::printStack(Perr);
}
else if (UPstream::debug)
{
Pout<< "UPstream::broadcast : root:" << rootProcNo
Perr<< "UPstream::broadcast : root:" << rootProcNo
<< " comm:" << comm
<< " size:" << label(bufSize)
<< Foam::endl;

View File

@ -291,7 +291,7 @@ void Foam::UPstream::waitRequests(const label pos, label len)
if (UPstream::debug)
{
Pout<< "UPstream::waitRequests : starting wait for "
Perr<< "UPstream::waitRequests : starting wait for "
<< count << " requests starting at " << pos << endl;
}
@ -328,7 +328,7 @@ void Foam::UPstream::waitRequests(const label pos, label len)
if (UPstream::debug)
{
Pout<< "UPstream::waitRequests : finished wait." << endl;
Perr<< "UPstream::waitRequests : finished wait." << endl;
}
}
@ -409,7 +409,7 @@ bool Foam::UPstream::waitAnyRequest(const label pos, label len)
if (UPstream::debug)
{
Pout<< "UPstream::waitAnyRequest : starting wait for any of "
Perr<< "UPstream::waitAnyRequest : starting wait for any of "
<< count << " requests starting at " << pos << endl;
}
@ -470,7 +470,7 @@ bool Foam::UPstream::waitSomeRequests
if (UPstream::debug)
{
Pout<< "UPstream:waitSomeRequest : starting wait for some of "
Perr<< "UPstream:waitSomeRequest : starting wait for some of "
<< count << " requests starting at " << pos << endl;
}
@ -563,7 +563,7 @@ bool Foam::UPstream::waitSomeRequests
if (UPstream::debug)
{
Pout<< "UPstream:waitSomeRequest : starting wait for some of "
Perr<< "UPstream:waitSomeRequest : starting wait for some of "
<< requests.size() << " requests" << endl;
}
@ -753,7 +753,7 @@ void Foam::UPstream::waitRequest(const label i)
if (UPstream::debug)
{
Pout<< "UPstream::waitRequest : starting wait for request:"
Perr<< "UPstream::waitRequest : starting wait for request:"
<< i << endl;
}
@ -771,7 +771,7 @@ void Foam::UPstream::waitRequest(const label i)
if (UPstream::debug)
{
Pout<< "UPstream::waitRequest : finished wait for request:"
Perr<< "UPstream::waitRequest : finished wait for request:"
<< i << endl;
}
}
@ -823,7 +823,7 @@ bool Foam::UPstream::finishedRequest(const label i)
if (UPstream::debug)
{
Pout<< "UPstream::finishedRequest : check request:"
Perr<< "UPstream::finishedRequest : check request:"
<< i << endl;
}
@ -898,7 +898,7 @@ bool Foam::UPstream::finishedRequests(const label pos, label len)
if (UPstream::debug)
{
Pout<< "UPstream::finishedRequests : check " << count
Perr<< "UPstream::finishedRequests : check " << count
<< " requests starting at " << pos << endl;
}

View File

@ -80,18 +80,18 @@ void Foam::PstreamDetail::reduce0
if (UPstream::warnComm >= 0 && comm != UPstream::warnComm)
{
Pout<< "** MPI_Reduce (blocking):";
Perr<< "** MPI_Reduce (blocking):";
if (count == 1)
{
Pout<< (*values);
Perr<< (*values);
}
else
{
Pout<< UList<Type>(values, count);
Perr<< UList<Type>(values, count);
}
Pout<< " with comm:" << comm
Perr<< " with comm:" << comm
<< " warnComm:" << UPstream::warnComm << endl;
error::printStack(Pout);
error::printStack(Perr);
}
profilingPstream::beginTiming();
@ -138,23 +138,23 @@ void Foam::PstreamDetail::allReduce
{
if (immediate)
{
Pout<< "** MPI_Iallreduce (non-blocking):";
Perr<< "** MPI_Iallreduce (non-blocking):";
}
else
{
Pout<< "** MPI_Allreduce (blocking):";
Perr<< "** MPI_Allreduce (blocking):";
}
if (count == 1)
{
Pout<< (*values);
Perr<< (*values);
}
else
{
Pout<< UList<Type>(values, count);
Perr<< UList<Type>(values, count);
}
Pout<< " with comm:" << comm
Perr<< " with comm:" << comm
<< " warnComm:" << UPstream::warnComm << endl;
error::printStack(Pout);
error::printStack(Perr);
}
@ -245,18 +245,18 @@ void Foam::PstreamDetail::allToAll
{
if (immediate)
{
Pout<< "** MPI_Ialltoall (non-blocking):";
Perr<< "** MPI_Ialltoall (non-blocking):";
}
else
{
Pout<< "** MPI_Alltoall (blocking):";
Perr<< "** MPI_Alltoall (blocking):";
}
Pout<< " numProc:" << numProc
Perr<< " numProc:" << numProc
<< " sendData:" << sendData.size()
<< " with comm:" << comm
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
error::printStack(Perr);
}
if
@ -376,18 +376,18 @@ void Foam::PstreamDetail::allToAllv
{
if (immediate)
{
Pout<< "** MPI_Ialltoallv (non-blocking):";
Perr<< "** MPI_Ialltoallv (non-blocking):";
}
else
{
Pout<< "** MPI_Alltoallv (blocking):";
Perr<< "** MPI_Alltoallv (blocking):";
}
Pout<< " sendCounts:" << sendCounts
Perr<< " sendCounts:" << sendCounts
<< " sendOffsets:" << sendOffsets
<< " with comm:" << comm
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
error::printStack(Perr);
}
if
@ -515,13 +515,13 @@ void Foam::PstreamDetail::allToAllConsensus
if (UPstream::warnComm >= 0 && comm != UPstream::warnComm)
{
Pout<< "** non-blocking consensus Alltoall (list):";
Pout<< " numProc:" << numProc
Perr<< "** non-blocking consensus Alltoall (list):";
Perr<< " numProc:" << numProc
<< " sendData:" << sendData.size()
<< " with comm:" << comm
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
error::printStack(Perr);
}
if (sendData.size() != numProc || recvData.size() != numProc)
@ -717,13 +717,13 @@ void Foam::PstreamDetail::allToAllConsensus
if (UPstream::warnComm >= 0 && comm != UPstream::warnComm)
{
Pout<< "** non-blocking consensus Alltoall (map):";
Pout<< " numProc:" << numProc
Perr<< "** non-blocking consensus Alltoall (map):";
Perr<< " numProc:" << numProc
<< " sendData:" << sendBufs.size()
<< " with comm:" << comm
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
error::printStack(Perr);
}
// Initial: clear out everything
@ -917,18 +917,18 @@ void Foam::PstreamDetail::gather
{
if (immediate)
{
Pout<< "** MPI_Igather (non-blocking):";
Perr<< "** MPI_Igather (non-blocking):";
}
else
{
Pout<< "** MPI_Gather (blocking):";
Perr<< "** MPI_Gather (blocking):";
}
Pout<< " numProc:" << numProc
Perr<< " numProc:" << numProc
<< " count:" << count
<< " with comm:" << comm
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
error::printStack(Perr);
}
@ -1024,18 +1024,18 @@ void Foam::PstreamDetail::scatter
{
if (immediate)
{
Pout<< "** MPI_Iscatter (non-blocking):";
Perr<< "** MPI_Iscatter (non-blocking):";
}
else
{
Pout<< "** MPI_Scatter (blocking):";
Perr<< "** MPI_Scatter (blocking):";
}
Pout<< " numProc:" << numProc
Perr<< " numProc:" << numProc
<< " count:" << count
<< " with comm:" << comm
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
error::printStack(Perr);
}
@ -1132,19 +1132,19 @@ void Foam::PstreamDetail::gatherv
{
if (immediate)
{
Pout<< "** MPI_Igatherv (non-blocking):";
Perr<< "** MPI_Igatherv (non-blocking):";
}
else
{
Pout<< "** MPI_Gatherv (blocking):";
Perr<< "** MPI_Gatherv (blocking):";
}
Pout<< " np:" << np
Perr<< " np:" << np
<< " recvCounts:" << recvCounts
<< " recvOffsets:" << recvOffsets
<< " with comm:" << comm
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
error::printStack(Perr);
}
if
@ -1274,19 +1274,19 @@ void Foam::PstreamDetail::scatterv
{
if (immediate)
{
Pout<< "** MPI_Iscatterv (non-blocking):";
Perr<< "** MPI_Iscatterv (non-blocking):";
}
else
{
Pout<< "** MPI_Scatterv (blocking):";
Perr<< "** MPI_Scatterv (blocking):";
}
Pout<< " np:" << np
Perr<< " np:" << np
<< " sendCounts:" << sendCounts
<< " sendOffsets:" << sendOffsets
<< " with comm:" << comm
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
error::printStack(Perr);
}
if
@ -1400,17 +1400,17 @@ void Foam::PstreamDetail::allGather
{
if (immediate)
{
Pout<< "** MPI_Iallgather (non-blocking):";
Perr<< "** MPI_Iallgather (non-blocking):";
}
else
{
Pout<< "** MPI_Allgather (blocking):";
Perr<< "** MPI_Allgather (blocking):";
}
Pout<< " numProc:" << UPstream::nProcs(comm)
Perr<< " numProc:" << UPstream::nProcs(comm)
<< " with comm:" << comm
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
error::printStack(Perr);
}