WIP: mapped PstreamBuffers with NBX
ENH: sparse storage and data exchange for PstreamBuffers - change the underlying storage from a numProcs list of buffers to a Map of buffers. The reduced memory footprint on large systems is one aspect, but the primary motivation is to more easily support sparse data exchange patterns. The Map storage for PstreamBuffers corresponds to a non-blocking consensus exchange of sizes that automatically propagates through different parts of the code and avoids all-to-all. CONFIG: enable nonBlockingExchange as default (for testing) - this changes the Pstream::exchangeSizes to use NBX instead of all-to-all, even for List containers.
This commit is contained in:
parent
d3867a2d34
commit
00e2d3e4ef
@ -130,7 +130,7 @@ OptimisationSwitches
|
||||
// Number processors to change to tree communication
|
||||
nProcsSimpleSum 0;
|
||||
// Min numProc to use non-blocking exchange algorithm (Hoefler: NBX)
|
||||
nonBlockingExchange 0;
|
||||
nonBlockingExchange 1;
|
||||
|
||||
// MPI buffer size (bytes)
|
||||
// Can override with the MPI_BUFFER_SIZE env variable.
|
||||
|
@ -27,14 +27,52 @@ License
|
||||
\*---------------------------------------------------------------------------*/
|
||||
|
||||
#include "PstreamBuffers.H"
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
#include "bitSet.H"
|
||||
#endif
|
||||
|
||||
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
|
||||
|
||||
namespace Foam
|
||||
{
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
//- Retrieve size of specified buffer, first checking for existence
|
||||
static inline label getBufferSize
|
||||
(
|
||||
const Map<DynamicList<char>>& buffers,
|
||||
const label proci
|
||||
)
|
||||
{
|
||||
const auto iter = buffers.cfind(proci);
|
||||
|
||||
return (iter.good() ? iter.val().size() : 0);
|
||||
}
|
||||
#else
|
||||
//- Retrieve size of specified buffer, no access checking
|
||||
static inline label getBufferSize
|
||||
(
|
||||
const UList<DynamicList<char>>& buffers,
|
||||
const label proci
|
||||
)
|
||||
{
|
||||
return buffers[proci].size();
|
||||
}
|
||||
#endif
|
||||
|
||||
} // End namespace Foam
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
|
||||
|
||||
void Foam::PstreamBuffers::finalExchange
|
||||
(
|
||||
const bool wait,
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes
|
||||
#else
|
||||
labelList& recvSizes
|
||||
#endif
|
||||
)
|
||||
{
|
||||
// Could also check that it is not called twice
|
||||
@ -43,8 +81,35 @@ void Foam::PstreamBuffers::finalExchange
|
||||
|
||||
if (commsType_ == UPstream::commsTypes::nonBlocking)
|
||||
{
|
||||
// Dense storage uses all-to-all
|
||||
Pstream::exchangeSizes(sendBuffers_, recvSizes, comm_);
|
||||
// Use PEX algorithm
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
// PEX stage 1: exchange sizes (non-blocking consensus)
|
||||
Pstream::exchangeSizes
|
||||
(
|
||||
sendBuffers_,
|
||||
recvSizes,
|
||||
tag_,
|
||||
comm_
|
||||
);
|
||||
#else
|
||||
// Like Pstream::exchangeSizes
|
||||
labelList sendSizes(nProcs_);
|
||||
forAll(sendBuffers_, proci)
|
||||
{
|
||||
sendSizes[proci] = sendBuffers_[proci].size();
|
||||
}
|
||||
recvSizes.resize_nocopy(nProcs_);
|
||||
|
||||
// PEX stage 1: exchange sizes (non-blocking consensus)
|
||||
UPstream::allToAllConsensus
|
||||
(
|
||||
sendSizes,
|
||||
recvSizes,
|
||||
(tag_ + 314159), // some unique tag?
|
||||
comm_
|
||||
);
|
||||
#endif
|
||||
|
||||
Pstream::exchange<DynamicList<char>, char>
|
||||
(
|
||||
@ -59,6 +124,7 @@ void Foam::PstreamBuffers::finalExchange
|
||||
}
|
||||
|
||||
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
void Foam::PstreamBuffers::finalExchange
|
||||
(
|
||||
const labelUList& sendProcs,
|
||||
@ -94,14 +160,19 @@ void Foam::PstreamBuffers::finalExchange
|
||||
);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
void Foam::PstreamBuffers::finalExchangeGatherScatter
|
||||
void Foam::PstreamBuffers::finalGatherScatter
|
||||
(
|
||||
const bool isGather,
|
||||
const bool wait,
|
||||
const bool needSizes,
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes
|
||||
#else
|
||||
labelList& recvSizes
|
||||
#endif
|
||||
)
|
||||
{
|
||||
// Could also check that it is not called twice
|
||||
@ -114,10 +185,20 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter
|
||||
|
||||
// Only send to master [0]. Master is also allowed to 'send' to itself
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllIters(sendBuffers_, iter)
|
||||
{
|
||||
if (iter.key() != 0)
|
||||
{
|
||||
iter.val().clear();
|
||||
}
|
||||
}
|
||||
#else
|
||||
for (label proci=1; proci < sendBuffers_.size(); ++proci)
|
||||
{
|
||||
sendBuffers_[proci].clear();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -133,40 +214,100 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter
|
||||
|
||||
if (commsType_ == UPstream::commsTypes::nonBlocking)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
labelList recvCount;
|
||||
#else
|
||||
labelList& recvCount = recvSizes;
|
||||
#endif
|
||||
|
||||
if (isGather)
|
||||
{
|
||||
// gather mode (all-to-one): master [0] <- everyone
|
||||
|
||||
recvSizes =
|
||||
UPstream::listGatherValues(sendBuffers_[0].size(), comm_);
|
||||
const label nSend = getBufferSize(sendBuffers_, 0);
|
||||
|
||||
recvCount = UPstream::listGatherValues(nSend, comm_);
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
// Transcribe recv count from list to map
|
||||
recvSizes.clear();
|
||||
if (UPstream::master(comm_))
|
||||
{
|
||||
for (label proci=1; proci < recvCount.size(); ++proci)
|
||||
{
|
||||
if (recvCount[proci] > 0)
|
||||
{
|
||||
recvSizes.insert(proci, recvCount[proci]);
|
||||
}
|
||||
}
|
||||
}
|
||||
#else
|
||||
if (!UPstream::master(comm_))
|
||||
{
|
||||
recvSizes.resize_nocopy(nProcs_);
|
||||
recvSizes = Zero;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
else
|
||||
{
|
||||
// scatter mode (one-to-all): master [0] -> everyone
|
||||
|
||||
recvSizes.resize_nocopy(nProcs_);
|
||||
if (UPstream::master(comm_))
|
||||
{
|
||||
recvCount.resize(nProcs_, Zero);
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllConstIters(sendBuffers_, iter)
|
||||
{
|
||||
recvCount[iter.key()] = iter.val().size();
|
||||
}
|
||||
#else
|
||||
forAll(sendBuffers_, proci)
|
||||
{
|
||||
recvCount[proci] = sendBuffers_[proci].size();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
else
|
||||
{
|
||||
// Scattering, so non-master sends nothing
|
||||
recvCount = Zero;
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
recvSizes.clear();
|
||||
recvSizes.resize_nocopy(nProcs_);
|
||||
#else
|
||||
recvSizes = Zero;
|
||||
#endif
|
||||
}
|
||||
|
||||
const label nRecv(UPstream::listScatterValues(recvCount, comm_));
|
||||
|
||||
if (UPstream::master(comm_))
|
||||
{
|
||||
forAll(sendBuffers_, proci)
|
||||
{
|
||||
recvSizes[proci] = sendBuffers_[proci].size();
|
||||
}
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
recvSizes.clear();
|
||||
#else
|
||||
recvSizes = Zero;
|
||||
#endif
|
||||
}
|
||||
else
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
recvSizes.clear();
|
||||
|
||||
const label myRecv(UPstream::listScatterValues(recvSizes, comm_));
|
||||
|
||||
recvSizes = Zero;
|
||||
recvSizes[0] = myRecv;
|
||||
if (nRecv)
|
||||
{
|
||||
recvSizes.insert(0, nRecv);
|
||||
}
|
||||
#else
|
||||
recvSizes = Zero;
|
||||
recvSizes[0] = nRecv;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Pstream::exchange<DynamicList<char>, char>
|
||||
(
|
||||
sendBuffers_,
|
||||
@ -197,9 +338,18 @@ Foam::PstreamBuffers::PstreamBuffers
|
||||
tag_(tag),
|
||||
comm_(communicator),
|
||||
nProcs_(UPstream::nProcs(comm_)),
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
// Default sizing (128) is probably OK.
|
||||
// Meshes often have 16-20 neighbours (avg) and 100 neighbours (max)
|
||||
sendBuffers_(),
|
||||
recvBuffers_(),
|
||||
recvPositions_()
|
||||
#else
|
||||
sendBuffers_(nProcs_),
|
||||
recvBuffers_(nProcs_),
|
||||
recvPositions_(nProcs_, Zero)
|
||||
#endif
|
||||
{}
|
||||
|
||||
|
||||
@ -207,7 +357,23 @@ Foam::PstreamBuffers::PstreamBuffers
|
||||
|
||||
Foam::PstreamBuffers::~PstreamBuffers()
|
||||
{
|
||||
// Check that all data has been consumed.
|
||||
// Check that all data has been consumed
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllConstIters(recvBuffers_, iter)
|
||||
{
|
||||
const label proci = iter.key();
|
||||
const label len = iter.val().size();
|
||||
const label pos = recvPositions_.lookup(proci, len);
|
||||
|
||||
if (pos < len)
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "Message from processor " << proci
|
||||
<< " Only consumed " << pos << " of " << len << " bytes" << nl
|
||||
<< Foam::abort(FatalError);
|
||||
}
|
||||
}
|
||||
#else
|
||||
forAll(recvBuffers_, proci)
|
||||
{
|
||||
const label pos = recvPositions_[proci];
|
||||
@ -221,6 +387,7 @@ Foam::PstreamBuffers::~PstreamBuffers()
|
||||
<< Foam::abort(FatalError);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
@ -231,7 +398,11 @@ Foam::DynamicList<char>& Foam::PstreamBuffers::accessSendBuffer
|
||||
const label proci
|
||||
)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
return sendBuffers_(proci); // Created on demand if needed
|
||||
#else
|
||||
return sendBuffers_[proci];
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
@ -240,13 +411,21 @@ Foam::DynamicList<char>& Foam::PstreamBuffers::accessRecvBuffer
|
||||
const label proci
|
||||
)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
return recvBuffers_(proci); // Created on demand if needed
|
||||
#else
|
||||
return recvBuffers_[proci];
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
//- Writable access to the read position within the receive buffer
//- associated with rank \p proci
Foam::label& Foam::PstreamBuffers::accessRecvPosition(const label proci)
{
    #ifdef Foam_PstreamBuffers_map_storage
    // Sparse: entry is created on demand (initial position 0) if needed
    label& pos = recvPositions_(proci, 0);
    #else
    label& pos = recvPositions_[proci];
    #endif

    return pos;
}
|
||||
|
||||
|
||||
@ -254,20 +433,38 @@ Foam::label& Foam::PstreamBuffers::accessRecvPosition(const label proci)
|
||||
|
||||
void Foam::PstreamBuffers::clearSends()
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllIters(sendBuffers_, iter)
|
||||
{
|
||||
iter.val().clear();
|
||||
}
|
||||
#else
|
||||
for (DynamicList<char>& buf : sendBuffers_)
|
||||
{
|
||||
buf.clear();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
void Foam::PstreamBuffers::clearRecvs()
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllIters(recvBuffers_, iter)
|
||||
{
|
||||
iter.val().clear();
|
||||
}
|
||||
forAllIters(recvPositions_, iter)
|
||||
{
|
||||
iter.val() = 0;
|
||||
}
|
||||
#else
|
||||
for (DynamicList<char>& buf : recvBuffers_)
|
||||
{
|
||||
buf.clear();
|
||||
}
|
||||
recvPositions_ = Zero;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
@ -281,14 +478,41 @@ void Foam::PstreamBuffers::clear()
|
||||
|
||||
void Foam::PstreamBuffers::clearSend(const label proci)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
{
|
||||
auto iter = sendBuffers_.find(proci);
|
||||
if (iter.good())
|
||||
{
|
||||
iter.val().clear();
|
||||
}
|
||||
}
|
||||
#else
|
||||
sendBuffers_[proci].clear();
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
void Foam::PstreamBuffers::clearRecv(const label proci)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
{
|
||||
auto iter = recvBuffers_.find(proci);
|
||||
if (iter.good())
|
||||
{
|
||||
iter.val().clear();
|
||||
}
|
||||
}
|
||||
{
|
||||
auto iter = recvPositions_.find(proci);
|
||||
if (iter.good())
|
||||
{
|
||||
iter.val() = 0;
|
||||
}
|
||||
}
|
||||
#else
|
||||
recvBuffers_[proci].clear();
|
||||
recvPositions_[proci] = 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
@ -296,6 +520,21 @@ void Foam::PstreamBuffers::clearStorage()
|
||||
{
|
||||
// Could also clear out entire sendBuffers_, recvBuffers_ and reallocate.
|
||||
// Not sure if it makes much difference
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllIters(sendBuffers_, iter)
|
||||
{
|
||||
iter.val().clearStorage();
|
||||
}
|
||||
forAllIters(recvBuffers_, iter)
|
||||
{
|
||||
iter.val().clearStorage();
|
||||
}
|
||||
forAllIters(recvPositions_, iter)
|
||||
{
|
||||
iter.val() = 0;
|
||||
}
|
||||
#else
|
||||
for (DynamicList<char>& buf : sendBuffers_)
|
||||
{
|
||||
buf.clearStorage();
|
||||
@ -305,6 +544,7 @@ void Foam::PstreamBuffers::clearStorage()
|
||||
buf.clearStorage();
|
||||
}
|
||||
recvPositions_ = Zero;
|
||||
#endif
|
||||
|
||||
finishedSendsCalled_ = false;
|
||||
}
|
||||
@ -312,6 +552,15 @@ void Foam::PstreamBuffers::clearStorage()
|
||||
|
||||
bool Foam::PstreamBuffers::hasSendData() const
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllConstIters(sendBuffers_, iter)
|
||||
{
|
||||
if (!iter.val().empty())
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
#else
|
||||
for (const DynamicList<char>& buf : sendBuffers_)
|
||||
{
|
||||
if (!buf.empty())
|
||||
@ -319,6 +568,7 @@ bool Foam::PstreamBuffers::hasSendData() const
|
||||
return true;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -327,6 +577,18 @@ bool Foam::PstreamBuffers::hasRecvData() const
|
||||
{
|
||||
if (finishedSendsCalled_)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllConstIters(recvBuffers_, iter)
|
||||
{
|
||||
const label proci = iter.key();
|
||||
const label len = iter.val().size();
|
||||
|
||||
if (recvPositions_.lookup(proci, 0) < len)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
#else
|
||||
forAll(recvBuffers_, proci)
|
||||
{
|
||||
if (recvPositions_[proci] < recvBuffers_[proci].size())
|
||||
@ -334,6 +596,7 @@ bool Foam::PstreamBuffers::hasRecvData() const
|
||||
return true;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
#ifdef FULLDEBUG
|
||||
else
|
||||
@ -349,7 +612,7 @@ bool Foam::PstreamBuffers::hasRecvData() const
|
||||
|
||||
//- Number of bytes currently queued for sending to rank \p proci.
//  Goes through getBufferSize() so that both storage variants are handled
//  by the same call (the sparse variant checks for existence first).
Foam::label Foam::PstreamBuffers::sendDataCount(const label proci) const
{
    return getBufferSize(sendBuffers_, proci);
}
|
||||
|
||||
|
||||
@ -357,8 +620,16 @@ Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const
|
||||
{
|
||||
if (finishedSendsCalled_)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
#else
|
||||
const label len
|
||||
(
|
||||
getBufferSize(recvBuffers_, proci)
|
||||
- recvPositions_.lookup(proci, 0)
|
||||
);
|
||||
#else
|
||||
const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
|
||||
|
||||
#endif
|
||||
if (len > 0)
|
||||
{
|
||||
return len;
|
||||
@ -378,10 +649,25 @@ Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const
|
||||
|
||||
Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
|
||||
{
|
||||
labelList counts(recvPositions_.size(), Zero);
|
||||
labelList counts(nProcs_, Zero);
|
||||
|
||||
if (finishedSendsCalled_)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllConstIters(recvBuffers_, iter)
|
||||
{
|
||||
const label proci = iter.key();
|
||||
const label len
|
||||
(
|
||||
iter.val().size() - recvPositions_.lookup(proci, 0)
|
||||
);
|
||||
|
||||
if (len > 0)
|
||||
{
|
||||
counts[proci] = len;
|
||||
}
|
||||
}
|
||||
#else
|
||||
forAll(recvBuffers_, proci)
|
||||
{
|
||||
const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
|
||||
@ -391,6 +677,7 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
|
||||
counts[proci] = len;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
#ifdef FULLDEBUG
|
||||
else
|
||||
@ -404,20 +691,35 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
|
||||
}
|
||||
|
||||
|
||||
Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount(const label proci) const
|
||||
Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount
|
||||
(
|
||||
const label excludeProci
|
||||
) const
|
||||
{
|
||||
label maxLen = 0;
|
||||
|
||||
if (finishedSendsCalled_)
|
||||
{
|
||||
forAll(recvBuffers_, idx)
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllConstIters(recvBuffers_, iter)
|
||||
{
|
||||
const label len(recvBuffers_[idx].size() - recvPositions_[idx]);
|
||||
if (idx != proci)
|
||||
const label proci = iter.key();
|
||||
if (excludeProci != proci)
|
||||
{
|
||||
label len(iter.val().size() - recvPositions_.lookup(proci, 0));
|
||||
maxLen = max(maxLen, len);
|
||||
}
|
||||
}
|
||||
#else
|
||||
forAll(recvBuffers_, proci)
|
||||
{
|
||||
if (excludeProci != proci)
|
||||
{
|
||||
label len(recvBuffers_[proci].size() - recvPositions_[proci]);
|
||||
maxLen = max(maxLen, len);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
#ifdef FULLDEBUG
|
||||
else
|
||||
@ -449,6 +751,24 @@ Foam::PstreamBuffers::peekRecvData(const label proci) const
|
||||
{
|
||||
if (finishedSendsCalled_)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
const auto iter = recvBuffers_.cfind(proci);
|
||||
|
||||
if (iter.good())
|
||||
{
|
||||
const label pos = recvPositions_.lookup(proci, 0);
|
||||
const label len = iter.val().size();
|
||||
|
||||
if (pos < len)
|
||||
{
|
||||
return UList<char>
|
||||
(
|
||||
const_cast<char*>(iter.val().cbegin(pos)),
|
||||
(len - pos)
|
||||
);
|
||||
}
|
||||
}
|
||||
#else
|
||||
const label pos = recvPositions_[proci];
|
||||
const label len = recvBuffers_[proci].size();
|
||||
|
||||
@ -460,6 +780,7 @@ Foam::PstreamBuffers::peekRecvData(const label proci) const
|
||||
(len - pos)
|
||||
);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
#ifdef FULLDEBUG
|
||||
else
|
||||
@ -483,14 +804,22 @@ bool Foam::PstreamBuffers::allowClearRecv(bool on) noexcept
|
||||
|
||||
void Foam::PstreamBuffers::finishedSends(const bool wait)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label> recvSizes;
|
||||
#else
|
||||
labelList recvSizes;
|
||||
#endif
|
||||
finalExchange(wait, recvSizes);
|
||||
}
|
||||
|
||||
|
||||
void Foam::PstreamBuffers::finishedSends
|
||||
(
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes,
|
||||
#else
|
||||
labelList& recvSizes,
|
||||
#endif
|
||||
const bool wait
|
||||
)
|
||||
{
|
||||
@ -510,6 +839,7 @@ void Foam::PstreamBuffers::finishedSends
|
||||
}
|
||||
|
||||
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
void Foam::PstreamBuffers::finishedSends
|
||||
(
|
||||
const labelUList& sendProcs,
|
||||
@ -612,16 +942,58 @@ bool Foam::PstreamBuffers::finishedSends
|
||||
|
||||
return changed;
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
void Foam::PstreamBuffers::finishedNeighbourSends
|
||||
(
|
||||
const labelUList& neighProcs,
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes,
|
||||
#else
|
||||
labelList& recvSizes,
|
||||
#endif
|
||||
const bool wait
|
||||
)
|
||||
{
|
||||
finishedSends(neighProcs, neighProcs, recvSizes, wait);
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
recvSizes.clear();
|
||||
|
||||
for (const label proci : neighProcs)
|
||||
{
|
||||
recvSizes.insert(proci, 0);
|
||||
}
|
||||
|
||||
// Prune any send buffers that are not neighbours
|
||||
forAllIters(sendBuffers_, iter)
|
||||
{
|
||||
if (!recvSizes.contains(iter.key()))
|
||||
{
|
||||
iter.val().clear();
|
||||
}
|
||||
}
|
||||
|
||||
finalExchange(wait, recvSizes);
|
||||
#else
|
||||
// Resize for copying back
|
||||
recvSizes.resize_nocopy(sendBuffers_.size());
|
||||
|
||||
// Prune send buffers that are not neighbours
|
||||
{
|
||||
labelHashSet keepProcs(neighProcs);
|
||||
|
||||
// Prune send buffers that are not neighbours
|
||||
forAll(sendBuffers_, proci)
|
||||
{
|
||||
if (!keepProcs.contains(proci))
|
||||
{
|
||||
sendBuffers_[proci].clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
finalExchange(wait, recvSizes);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
@ -631,31 +1003,65 @@ void Foam::PstreamBuffers::finishedNeighbourSends
|
||||
const bool wait
|
||||
)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
finishedSends(neighProcs, neighProcs, wait);
|
||||
#else
|
||||
labelList recvSizes;
|
||||
|
||||
// Prune send buffers that are not neighbours
|
||||
{
|
||||
labelHashSet keepProcs(neighProcs);
|
||||
|
||||
// Prune send buffers that are not neighbours
|
||||
forAll(sendBuffers_, proci)
|
||||
{
|
||||
if (!keepProcs.contains(proci))
|
||||
{
|
||||
sendBuffers_[proci].clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
finalExchange(wait, recvSizes);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
void Foam::PstreamBuffers::finishedGathers(const bool wait)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label> recvSizes;
|
||||
finalGatherScatter(true, wait, false, recvSizes);
|
||||
#else
|
||||
labelList recvSizes;
|
||||
finalExchangeGatherScatter(true, wait, false, recvSizes);
|
||||
finalGatherScatter(true, wait, false, recvSizes);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
void Foam::PstreamBuffers::finishedScatters(const bool wait)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label> recvSizes;
|
||||
finalGatherScatter(false, wait, false, recvSizes);
|
||||
#else
|
||||
labelList recvSizes;
|
||||
finalExchangeGatherScatter(false, wait, false, recvSizes);
|
||||
finalGatherScatter(false, wait, false, recvSizes);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
void Foam::PstreamBuffers::finishedGathers
|
||||
(
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes,
|
||||
#else
|
||||
labelList& recvSizes,
|
||||
#endif
|
||||
const bool wait
|
||||
)
|
||||
{
|
||||
finalExchangeGatherScatter(true, wait, true, recvSizes);
|
||||
finalGatherScatter(true, wait, true, recvSizes);
|
||||
|
||||
if (commsType_ != UPstream::commsTypes::nonBlocking)
|
||||
{
|
||||
@ -673,11 +1079,15 @@ void Foam::PstreamBuffers::finishedGathers
|
||||
|
||||
void Foam::PstreamBuffers::finishedScatters
|
||||
(
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes,
|
||||
#else
|
||||
labelList& recvSizes,
|
||||
#endif
|
||||
const bool wait
|
||||
)
|
||||
{
|
||||
finalExchangeGatherScatter(false, wait, true, recvSizes);
|
||||
finalGatherScatter(false, wait, true, recvSizes);
|
||||
|
||||
if (commsType_ != UPstream::commsTypes::nonBlocking)
|
||||
{
|
||||
|
@ -100,16 +100,27 @@ SourceFiles
|
||||
#define Foam_PstreamBuffers_H
|
||||
|
||||
#include "DynamicList.H"
|
||||
#include "Map.H"
|
||||
#include "UPstream.H"
|
||||
#include "IOstream.H"
|
||||
|
||||
// Transitional
|
||||
#define Foam_PstreamBuffers_map_storage
|
||||
// #define Foam_PstreamBuffers_dense
|
||||
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
#undef Foam_PstreamBuffers_map_storage
|
||||
#endif
|
||||
|
||||
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||
|
||||
namespace Foam
|
||||
{
|
||||
|
||||
// Forward Declarations
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
class bitSet;
|
||||
#endif
|
||||
|
||||
/*---------------------------------------------------------------------------*\
|
||||
Class PstreamBuffers Declaration
|
||||
@ -143,6 +154,19 @@ class PstreamBuffers
|
||||
|
||||
// Buffer storage
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
|
||||
//- Send buffers (sparse)
|
||||
Map<DynamicList<char>> sendBuffers_;
|
||||
|
||||
//- Receive buffers (sparse)
|
||||
Map<DynamicList<char>> recvBuffers_;
|
||||
|
||||
//- Current read positions within recvBuffers_ (sparse)
|
||||
Map<label> recvPositions_;
|
||||
|
||||
#else
|
||||
|
||||
//- Send buffers. Size is nProcs()
|
||||
List<DynamicList<char>> sendBuffers_;
|
||||
|
||||
@ -152,6 +176,8 @@ class PstreamBuffers
|
||||
//- Current read positions within recvBuffers_. Size is nProcs()
|
||||
labelList recvPositions_;
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
// Private Member Functions
|
||||
|
||||
@ -160,12 +186,17 @@ class PstreamBuffers
|
||||
void finalExchange
|
||||
(
|
||||
const bool wait,
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes
|
||||
#else
|
||||
labelList& recvSizes
|
||||
#endif
|
||||
);
|
||||
|
||||
//- Mark sends as done.
|
||||
// Only exchange sizes using the sendProcs/recvProcs subset
|
||||
// (nonBlocking comms).
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
void finalExchange
|
||||
(
|
||||
const labelUList& sendProcs,
|
||||
@ -173,6 +204,7 @@ class PstreamBuffers
|
||||
const bool wait,
|
||||
labelList& recvSizes
|
||||
);
|
||||
#endif
|
||||
|
||||
//- For all-to-one or one-to-all
|
||||
void finalExchangeGatherScatter
|
||||
@ -180,7 +212,11 @@ class PstreamBuffers
|
||||
const bool isGather,
|
||||
const bool wait,
|
||||
const bool needSizes, // If recvSizes needed or scratch
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes
|
||||
#else
|
||||
labelList& recvSizes
|
||||
#endif
|
||||
);
|
||||
|
||||
|
||||
@ -340,7 +376,7 @@ public:
|
||||
|
||||
//- Maximum receive size, excluding the specified processor rank
|
||||
//- Must call finishedSends() or other finished.. method first!
|
||||
label maxNonLocalRecvCount(const label proci) const;
|
||||
label maxNonLocalRecvCount(const label excludeProci) const;
|
||||
|
||||
//- Number of unconsumed receive bytes for the specified processor.
|
||||
//- Must call finishedSends() or other finished.. method first!
|
||||
@ -392,7 +428,15 @@ public:
|
||||
// \param wait wait for requests to complete (in nonBlocking mode)
|
||||
//
|
||||
// \warning currently only valid for nonBlocking comms.
|
||||
void finishedSends(labelList& recvSizes, const bool wait = true);
|
||||
void finishedSends
|
||||
(
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes,
|
||||
#else
|
||||
labelList& recvSizes,
|
||||
#endif
|
||||
const bool wait = true
|
||||
);
|
||||
|
||||
|
||||
// Functions with restricted neighbours
|
||||
@ -406,12 +450,14 @@ public:
|
||||
// \param wait wait for requests to complete (in nonBlocking mode)
|
||||
//
|
||||
// \warning currently only valid for nonBlocking comms.
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
void finishedSends
|
||||
(
|
||||
const labelUList& sendProcs,
|
||||
const labelUList& recvProcs,
|
||||
const bool wait = true
|
||||
);
|
||||
#endif
|
||||
|
||||
//- Mark sends as done using subset of send/recv ranks
|
||||
//- to exchange data on. Recovers the sizes (bytes) received.
|
||||
@ -424,6 +470,7 @@ public:
|
||||
// \param wait wait for requests to complete (in nonBlocking mode)
|
||||
//
|
||||
// \warning currently only valid for nonBlocking comms.
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
void finishedSends
|
||||
(
|
||||
const labelUList& sendProcs,
|
||||
@ -431,6 +478,7 @@ public:
|
||||
labelList& recvSizes,
|
||||
const bool wait = true
|
||||
);
|
||||
#endif
|
||||
|
||||
//- A caching version that uses a limited send/recv connectivity.
|
||||
//
|
||||
@ -443,6 +491,7 @@ public:
|
||||
// \return True if the send/recv connectivity changed
|
||||
//
|
||||
// \warning currently only valid for nonBlocking comms.
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
bool finishedSends
|
||||
(
|
||||
bitSet& sendConnections,
|
||||
@ -450,6 +499,7 @@ public:
|
||||
DynamicList<label>& recvProcs,
|
||||
const bool wait = true
|
||||
);
|
||||
#endif
|
||||
|
||||
//- Mark sends as done using subset of send/recv ranks
|
||||
//- and recover the sizes (bytes) received.
|
||||
@ -480,7 +530,11 @@ public:
|
||||
void finishedNeighbourSends
|
||||
(
|
||||
const labelUList& neighProcs,
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes,
|
||||
#else
|
||||
labelList& recvSizes,
|
||||
#endif
|
||||
const bool wait = true
|
||||
);
|
||||
|
||||
@ -505,7 +559,15 @@ public:
|
||||
// \param wait wait for requests to complete (in nonBlocking mode)
|
||||
//
|
||||
// \warning currently only valid for nonBlocking comms.
|
||||
void finishedGathers(labelList& recvSizes, const bool wait = true);
|
||||
void finishedGathers
|
||||
(
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes,
|
||||
#else
|
||||
labelList& recvSizes,
|
||||
#endif
|
||||
const bool wait = true
|
||||
);
|
||||
|
||||
//- Mark all sends to sub-procs as done.
|
||||
//
|
||||
@ -525,7 +587,15 @@ public:
|
||||
// \param wait wait for requests to complete (in nonBlocking mode)
|
||||
//
|
||||
// \warning currently only valid for nonBlocking comms.
|
||||
void finishedScatters(labelList& recvSizes, const bool wait = true);
|
||||
void finishedScatters
|
||||
(
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes,
|
||||
#else
|
||||
labelList& recvSizes,
|
||||
#endif
|
||||
const bool wait = true
|
||||
);
|
||||
};
|
||||
|
||||
|
||||
|
@ -43,7 +43,12 @@ Foam::zoneDistribute::zoneDistribute(const fvMesh& mesh)
|
||||
MeshObject<fvMesh, Foam::TopologicalMeshObject, zoneDistribute>(mesh),
|
||||
stencil_(zoneCPCStencil::New(mesh)),
|
||||
globalNumbering_(stencil_.globalNumbering()),
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
send_(UPstream::nProcs()),
|
||||
#else
|
||||
// Default map sizing (as per PstreamBuffers)
|
||||
send_(),
|
||||
#endif
|
||||
pBufs_(UPstream::commsTypes::nonBlocking)
|
||||
{
|
||||
// Don't clear storage on persistent buffer
|
||||
@ -90,6 +95,7 @@ void Foam::zoneDistribute::setUpCommforZone
|
||||
|
||||
if (UPstream::parRun())
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
List<labelHashSet> needed(UPstream::nProcs());
|
||||
|
||||
// Bin according to originating (sending) processor
|
||||
@ -136,6 +142,61 @@ void Foam::zoneDistribute::setUpCommforZone
|
||||
fromProc >> send_[proci];
|
||||
}
|
||||
}
|
||||
#else
|
||||
|
||||
forAllIters(send_, iter)
|
||||
{
|
||||
iter.val().clear();
|
||||
}
|
||||
|
||||
// Bin according to originating (sending) processor
|
||||
for (const label celli : stencil.needsComm())
|
||||
{
|
||||
if (zone[celli])
|
||||
{
|
||||
for (const label gblIdx : stencil_[celli])
|
||||
{
|
||||
const label proci = globalNumbering_.whichProcID(gblIdx);
|
||||
|
||||
if (proci != Pstream::myProcNo())
|
||||
{
|
||||
send_(proci).insert(gblIdx);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stream the send data into PstreamBuffers,
|
||||
|
||||
pBufs_.clear();
|
||||
|
||||
forAllIters(send_, iter)
|
||||
{
|
||||
const label proci = iter.key();
|
||||
auto& indices = iter.val();
|
||||
|
||||
if (proci != UPstream::myProcNo() && !indices.empty())
|
||||
{
|
||||
// Serialize as List
|
||||
UOPstream toProc(proci, pBufs_);
|
||||
toProc << indices;
|
||||
}
|
||||
|
||||
// Clear out old contents
|
||||
indices.clear();
|
||||
}
|
||||
|
||||
pBufs_.finishedSends();
|
||||
|
||||
for (const int proci : pBufs_.allProcs())
|
||||
{
|
||||
if (proci != UPstream::myProcNo() && pBufs_.recvDataCount(proci))
|
||||
{
|
||||
UIPstream fromProc(proci, pBufs_);
|
||||
fromProc >> send_(proci);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -90,6 +90,8 @@ class zoneDistribute
|
||||
//- Global number into index of cells/faces
|
||||
const globalIndex& globalNumbering_;
|
||||
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
|
||||
//- Global cell/face index to send for processor-to-processor comms
|
||||
List<labelList> send_;
|
||||
|
||||
@ -102,6 +104,14 @@ class zoneDistribute
|
||||
//- Parallel [cache]: recv data from these ranks
|
||||
DynamicList<label> recvProcs_;
|
||||
|
||||
#else
|
||||
|
||||
//- Per proc the global cell/face index to send for
|
||||
//- processor-to-processor comms
|
||||
Map<labelHashSet> send_;
|
||||
|
||||
#endif
|
||||
|
||||
//- Persistent set of exchange buffers
|
||||
PstreamBuffers pBufs_;
|
||||
|
||||
|
@ -6,7 +6,7 @@
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2019-2020 DLR
|
||||
Copyright (C) 2020-2022 OpenCFD Ltd.
|
||||
Copyright (C) 2020-2023 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -161,6 +161,8 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
|
||||
|
||||
if (UPstream::parRun())
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
|
||||
if (sendConnections_.empty())
|
||||
{
|
||||
WarningInFunction
|
||||
@ -208,6 +210,48 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
|
||||
neiValues += tmpValues;
|
||||
}
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
pBufs_.clear();
|
||||
|
||||
forAllConstIters(send_, iter)
|
||||
{
|
||||
const label proci = iter.key();
|
||||
const auto& indices = iter.val();
|
||||
|
||||
if (proci != UPstream::myProcNo() && !indices.empty())
|
||||
{
|
||||
// Serialize as Map
|
||||
Map<Type> sendValues(2*indices.size());
|
||||
|
||||
for (const label sendIdx : indices)
|
||||
{
|
||||
sendValues.insert
|
||||
(
|
||||
sendIdx,
|
||||
getLocalValue(phi, globalNumbering_.toLocal(sendIdx))
|
||||
);
|
||||
}
|
||||
|
||||
UOPstream toProc(proci, pBufs_);
|
||||
toProc << sendValues;
|
||||
}
|
||||
}
|
||||
|
||||
pBufs_.finishedSends();
|
||||
|
||||
for (const int proci : pBufs_.allProcs())
|
||||
{
|
||||
if (proci != UPstream::myProcNo() && pBufs_.recvDataCount(proci))
|
||||
{
|
||||
UIPstream fromProc(proci, pBufs_);
|
||||
Map<Type> tmpValues(fromProc);
|
||||
|
||||
neiValues += tmpValues;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
return neiValues;
|
||||
|
Loading…
Reference in New Issue
Block a user