Compare commits
1 Commits
master
...
feature-ps
Author | SHA1 | Date | |
---|---|---|---|
|
00e2d3e4ef |
@ -130,7 +130,7 @@ OptimisationSwitches
|
||||
// Number processors to change to tree communication
|
||||
nProcsSimpleSum 0;
|
||||
// Min numProc to use non-blocking exchange algorithm (Hoeffler: NBX)
|
||||
nonBlockingExchange 0;
|
||||
nonBlockingExchange 1;
|
||||
|
||||
// MPI buffer size (bytes)
|
||||
// Can override with the MPI_BUFFER_SIZE env variable.
|
||||
|
@ -27,14 +27,52 @@ License
|
||||
\*---------------------------------------------------------------------------*/
|
||||
|
||||
#include "PstreamBuffers.H"
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
#include "bitSet.H"
|
||||
#endif
|
||||
|
||||
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
|
||||
|
||||
namespace Foam
|
||||
{
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
//- Retrieve size of specified buffer, first checking for existence
|
||||
static inline label getBufferSize
|
||||
(
|
||||
const Map<DynamicList<char>>& buffers,
|
||||
const label proci
|
||||
)
|
||||
{
|
||||
const auto iter = buffers.cfind(proci);
|
||||
|
||||
return (iter.good() ? iter.val().size() : 0);
|
||||
}
|
||||
#else
|
||||
//- Retrieve size of specified buffer, no access checking
|
||||
static inline label getBufferSize
|
||||
(
|
||||
const UList<DynamicList<char>>& buffers,
|
||||
const label proci
|
||||
)
|
||||
{
|
||||
return buffers[proci].size();
|
||||
}
|
||||
#endif
|
||||
|
||||
} // End namespace Foam
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
|
||||
|
||||
void Foam::PstreamBuffers::finalExchange
|
||||
(
|
||||
const bool wait,
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes
|
||||
#else
|
||||
labelList& recvSizes
|
||||
#endif
|
||||
)
|
||||
{
|
||||
// Could also check that it is not called twice
|
||||
@ -43,8 +81,35 @@ void Foam::PstreamBuffers::finalExchange
|
||||
|
||||
if (commsType_ == UPstream::commsTypes::nonBlocking)
|
||||
{
|
||||
// Dense storage uses all-to-all
|
||||
Pstream::exchangeSizes(sendBuffers_, recvSizes, comm_);
|
||||
// Use PEX algorithm
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
// PEX stage 1: exchange sizes (non-blocking consensus)
|
||||
Pstream::exchangeSizes
|
||||
(
|
||||
sendBuffers_,
|
||||
recvSizes,
|
||||
tag_,
|
||||
comm_
|
||||
);
|
||||
#else
|
||||
// Like Pstream::exchangeSizes
|
||||
labelList sendSizes(nProcs_);
|
||||
forAll(sendBuffers_, proci)
|
||||
{
|
||||
sendSizes[proci] = sendBuffers_[proci].size();
|
||||
}
|
||||
recvSizes.resize_nocopy(nProcs_);
|
||||
|
||||
// PEX stage 1: exchange sizes (non-blocking consensus)
|
||||
UPstream::allToAllConsensus
|
||||
(
|
||||
sendSizes,
|
||||
recvSizes,
|
||||
(tag_ + 314159), // some unique tag?
|
||||
comm_
|
||||
);
|
||||
#endif
|
||||
|
||||
Pstream::exchange<DynamicList<char>, char>
|
||||
(
|
||||
@ -59,6 +124,7 @@ void Foam::PstreamBuffers::finalExchange
|
||||
}
|
||||
|
||||
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
void Foam::PstreamBuffers::finalExchange
|
||||
(
|
||||
const labelUList& sendProcs,
|
||||
@ -94,14 +160,19 @@ void Foam::PstreamBuffers::finalExchange
|
||||
);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
void Foam::PstreamBuffers::finalExchangeGatherScatter
|
||||
void Foam::PstreamBuffers::finalGatherScatter
|
||||
(
|
||||
const bool isGather,
|
||||
const bool wait,
|
||||
const bool needSizes,
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes
|
||||
#else
|
||||
labelList& recvSizes
|
||||
#endif
|
||||
)
|
||||
{
|
||||
// Could also check that it is not called twice
|
||||
@ -114,10 +185,20 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter
|
||||
|
||||
// Only send to master [0]. Master is also allowed to 'send' to itself
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllIters(sendBuffers_, iter)
|
||||
{
|
||||
if (iter.key() != 0)
|
||||
{
|
||||
iter.val().clear();
|
||||
}
|
||||
}
|
||||
#else
|
||||
for (label proci=1; proci < sendBuffers_.size(); ++proci)
|
||||
{
|
||||
sendBuffers_[proci].clear();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -133,40 +214,100 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter
|
||||
|
||||
if (commsType_ == UPstream::commsTypes::nonBlocking)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
labelList recvCount;
|
||||
#else
|
||||
labelList& recvCount = recvSizes;
|
||||
#endif
|
||||
|
||||
if (isGather)
|
||||
{
|
||||
// gather mode (all-to-one): master [0] <- everyone
|
||||
|
||||
recvSizes =
|
||||
UPstream::listGatherValues(sendBuffers_[0].size(), comm_);
|
||||
const label nSend = getBufferSize(sendBuffers_, 0);
|
||||
|
||||
recvCount = UPstream::listGatherValues(nSend, comm_);
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
// Transcribe recv count from list to map
|
||||
recvSizes.clear();
|
||||
if (UPstream::master(comm_))
|
||||
{
|
||||
for (label proci=1; proci < recvCount.size(); ++proci)
|
||||
{
|
||||
if (recvCount[proci] > 0)
|
||||
{
|
||||
recvSizes.insert(proci, recvCount[proci]);
|
||||
}
|
||||
}
|
||||
}
|
||||
#else
|
||||
if (!UPstream::master(comm_))
|
||||
{
|
||||
recvSizes.resize_nocopy(nProcs_);
|
||||
recvSizes = Zero;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
else
|
||||
{
|
||||
// scatter mode (one-to-all): master [0] -> everyone
|
||||
|
||||
recvSizes.resize_nocopy(nProcs_);
|
||||
if (UPstream::master(comm_))
|
||||
{
|
||||
recvCount.resize(nProcs_, Zero);
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllConstIters(sendBuffers_, iter)
|
||||
{
|
||||
recvCount[iter.key()] = iter.val().size();
|
||||
}
|
||||
#else
|
||||
forAll(sendBuffers_, proci)
|
||||
{
|
||||
recvCount[proci] = sendBuffers_[proci].size();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
else
|
||||
{
|
||||
// Scattering, so non-master sends nothing
|
||||
recvCount = Zero;
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
recvSizes.clear();
|
||||
recvSizes.resize_nocopy(nProcs_);
|
||||
#else
|
||||
recvSizes = Zero;
|
||||
#endif
|
||||
}
|
||||
|
||||
const label nRecv(UPstream::listScatterValues(recvCount, comm_));
|
||||
|
||||
if (UPstream::master(comm_))
|
||||
{
|
||||
forAll(sendBuffers_, proci)
|
||||
{
|
||||
recvSizes[proci] = sendBuffers_[proci].size();
|
||||
}
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
recvSizes.clear();
|
||||
#else
|
||||
recvSizes = Zero;
|
||||
#endif
|
||||
}
|
||||
else
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
recvSizes.clear();
|
||||
|
||||
const label myRecv(UPstream::listScatterValues(recvSizes, comm_));
|
||||
|
||||
recvSizes = Zero;
|
||||
recvSizes[0] = myRecv;
|
||||
if (nRecv)
|
||||
{
|
||||
recvSizes.insert(0, nRecv);
|
||||
}
|
||||
#else
|
||||
recvSizes = Zero;
|
||||
recvSizes[0] = nRecv;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Pstream::exchange<DynamicList<char>, char>
|
||||
(
|
||||
sendBuffers_,
|
||||
@ -197,9 +338,18 @@ Foam::PstreamBuffers::PstreamBuffers
|
||||
tag_(tag),
|
||||
comm_(communicator),
|
||||
nProcs_(UPstream::nProcs(comm_)),
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
// Default sizing (128) is probably OK.
|
||||
// Meshes often have 16-20 neighbours (avg) and 100 neighbours (max)
|
||||
sendBuffers_(),
|
||||
recvBuffers_(),
|
||||
recvPositions_()
|
||||
#else
|
||||
sendBuffers_(nProcs_),
|
||||
recvBuffers_(nProcs_),
|
||||
recvPositions_(nProcs_, Zero)
|
||||
#endif
|
||||
{}
|
||||
|
||||
|
||||
@ -207,7 +357,23 @@ Foam::PstreamBuffers::PstreamBuffers
|
||||
|
||||
Foam::PstreamBuffers::~PstreamBuffers()
|
||||
{
|
||||
// Check that all data has been consumed.
|
||||
// Check that all data has been consumed
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllConstIters(recvBuffers_, iter)
|
||||
{
|
||||
const label proci = iter.key();
|
||||
const label len = iter.val().size();
|
||||
const label pos = recvPositions_.lookup(proci, len);
|
||||
|
||||
if (pos < len)
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "Message from processor " << proci
|
||||
<< " Only consumed " << pos << " of " << len << " bytes" << nl
|
||||
<< Foam::abort(FatalError);
|
||||
}
|
||||
}
|
||||
#else
|
||||
forAll(recvBuffers_, proci)
|
||||
{
|
||||
const label pos = recvPositions_[proci];
|
||||
@ -221,6 +387,7 @@ Foam::PstreamBuffers::~PstreamBuffers()
|
||||
<< Foam::abort(FatalError);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
@ -231,7 +398,11 @@ Foam::DynamicList<char>& Foam::PstreamBuffers::accessSendBuffer
|
||||
const label proci
|
||||
)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
return sendBuffers_(proci); // Created on demand if needed
|
||||
#else
|
||||
return sendBuffers_[proci];
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
@ -240,13 +411,21 @@ Foam::DynamicList<char>& Foam::PstreamBuffers::accessRecvBuffer
|
||||
const label proci
|
||||
)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
return recvBuffers_(proci); // Created on demand if needed
|
||||
#else
|
||||
return recvBuffers_[proci];
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
// Writable access to the current read position within the receive buffer
// associated with rank proci.
Foam::label& Foam::PstreamBuffers::accessRecvPosition(const label proci)
{
    #if defined(Foam_PstreamBuffers_map_storage)

    // Sparse storage: entry is created on demand if needed
    // (requested with an initial value of 0)
    label& position = recvPositions_(proci, 0);
    return position;

    #else

    // Dense storage: direct indexing by rank
    label& position = recvPositions_[proci];
    return position;

    #endif
}
|
||||
|
||||
|
||||
@ -254,20 +433,38 @@ Foam::label& Foam::PstreamBuffers::accessRecvPosition(const label proci)
|
||||
|
||||
void Foam::PstreamBuffers::clearSends()
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllIters(sendBuffers_, iter)
|
||||
{
|
||||
iter.val().clear();
|
||||
}
|
||||
#else
|
||||
for (DynamicList<char>& buf : sendBuffers_)
|
||||
{
|
||||
buf.clear();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
void Foam::PstreamBuffers::clearRecvs()
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllIters(recvBuffers_, iter)
|
||||
{
|
||||
iter.val().clear();
|
||||
}
|
||||
forAllIters(recvPositions_, iter)
|
||||
{
|
||||
iter.val() = 0;
|
||||
}
|
||||
#else
|
||||
for (DynamicList<char>& buf : recvBuffers_)
|
||||
{
|
||||
buf.clear();
|
||||
}
|
||||
recvPositions_ = Zero;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
@ -281,14 +478,41 @@ void Foam::PstreamBuffers::clear()
|
||||
|
||||
void Foam::PstreamBuffers::clearSend(const label proci)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
{
|
||||
auto iter = sendBuffers_.find(proci);
|
||||
if (iter.good())
|
||||
{
|
||||
iter.val().clear();
|
||||
}
|
||||
}
|
||||
#else
|
||||
sendBuffers_[proci].clear();
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
void Foam::PstreamBuffers::clearRecv(const label proci)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
{
|
||||
auto iter = recvBuffers_.find(proci);
|
||||
if (iter.good())
|
||||
{
|
||||
iter.val().clear();
|
||||
}
|
||||
}
|
||||
{
|
||||
auto iter = recvPositions_.find(proci);
|
||||
if (iter.good())
|
||||
{
|
||||
iter.val() = 0;
|
||||
}
|
||||
}
|
||||
#else
|
||||
recvBuffers_[proci].clear();
|
||||
recvPositions_[proci] = 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
@ -296,6 +520,21 @@ void Foam::PstreamBuffers::clearStorage()
|
||||
{
|
||||
// Could also clear out entire sendBuffers_, recvBuffers_ and reallocate.
|
||||
// Not sure if it makes much difference
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllIters(sendBuffers_, iter)
|
||||
{
|
||||
iter.val().clearStorage();
|
||||
}
|
||||
forAllIters(recvBuffers_, iter)
|
||||
{
|
||||
iter.val().clearStorage();
|
||||
}
|
||||
forAllIters(recvPositions_, iter)
|
||||
{
|
||||
iter.val() = 0;
|
||||
}
|
||||
#else
|
||||
for (DynamicList<char>& buf : sendBuffers_)
|
||||
{
|
||||
buf.clearStorage();
|
||||
@ -305,6 +544,7 @@ void Foam::PstreamBuffers::clearStorage()
|
||||
buf.clearStorage();
|
||||
}
|
||||
recvPositions_ = Zero;
|
||||
#endif
|
||||
|
||||
finishedSendsCalled_ = false;
|
||||
}
|
||||
@ -312,6 +552,15 @@ void Foam::PstreamBuffers::clearStorage()
|
||||
|
||||
bool Foam::PstreamBuffers::hasSendData() const
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllConstIters(sendBuffers_, iter)
|
||||
{
|
||||
if (!iter.val().empty())
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
#else
|
||||
for (const DynamicList<char>& buf : sendBuffers_)
|
||||
{
|
||||
if (!buf.empty())
|
||||
@ -319,6 +568,7 @@ bool Foam::PstreamBuffers::hasSendData() const
|
||||
return true;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -327,6 +577,18 @@ bool Foam::PstreamBuffers::hasRecvData() const
|
||||
{
|
||||
if (finishedSendsCalled_)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllConstIters(recvBuffers_, iter)
|
||||
{
|
||||
const label proci = iter.key();
|
||||
const label len = iter.val().size();
|
||||
|
||||
if (recvPositions_.lookup(proci, 0) < len)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
#else
|
||||
forAll(recvBuffers_, proci)
|
||||
{
|
||||
if (recvPositions_[proci] < recvBuffers_[proci].size())
|
||||
@ -334,6 +596,7 @@ bool Foam::PstreamBuffers::hasRecvData() const
|
||||
return true;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
#ifdef FULLDEBUG
|
||||
else
|
||||
@ -349,7 +612,7 @@ bool Foam::PstreamBuffers::hasRecvData() const
|
||||
|
||||
// Number of bytes currently held in the send buffer for rank proci.
//
// Delegates to the file-local getBufferSize() helper so that the same
// statement is valid for both storage layouts (the map variant first checks
// that an entry exists for the rank). The earlier direct
// 'sendBuffers_[proci].size()' return has been dropped: it preceded this
// statement and made the helper call unreachable.
Foam::label Foam::PstreamBuffers::sendDataCount(const label proci) const
{
    return getBufferSize(sendBuffers_, proci);
}
|
||||
|
||||
|
||||
@ -357,8 +620,16 @@ Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const
|
||||
{
|
||||
if (finishedSendsCalled_)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
#else
|
||||
const label len
|
||||
(
|
||||
getBufferSize(recvBuffers_, proci)
|
||||
- recvPositions_.lookup(proci, 0)
|
||||
);
|
||||
#else
|
||||
const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
|
||||
|
||||
#endif
|
||||
if (len > 0)
|
||||
{
|
||||
return len;
|
||||
@ -378,10 +649,25 @@ Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const
|
||||
|
||||
Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
|
||||
{
|
||||
labelList counts(recvPositions_.size(), Zero);
|
||||
labelList counts(nProcs_, Zero);
|
||||
|
||||
if (finishedSendsCalled_)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllConstIters(recvBuffers_, iter)
|
||||
{
|
||||
const label proci = iter.key();
|
||||
const label len
|
||||
(
|
||||
iter.val().size() - recvPositions_.lookup(proci, 0)
|
||||
);
|
||||
|
||||
if (len > 0)
|
||||
{
|
||||
counts[proci] = len;
|
||||
}
|
||||
}
|
||||
#else
|
||||
forAll(recvBuffers_, proci)
|
||||
{
|
||||
const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
|
||||
@ -391,6 +677,7 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
|
||||
counts[proci] = len;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
#ifdef FULLDEBUG
|
||||
else
|
||||
@ -404,20 +691,35 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
|
||||
}
|
||||
|
||||
|
||||
Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount(const label proci) const
|
||||
Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount
|
||||
(
|
||||
const label excludeProci
|
||||
) const
|
||||
{
|
||||
label maxLen = 0;
|
||||
|
||||
if (finishedSendsCalled_)
|
||||
{
|
||||
forAll(recvBuffers_, idx)
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
forAllConstIters(recvBuffers_, iter)
|
||||
{
|
||||
const label len(recvBuffers_[idx].size() - recvPositions_[idx]);
|
||||
if (idx != proci)
|
||||
const label proci = iter.key();
|
||||
if (excludeProci != proci)
|
||||
{
|
||||
label len(iter.val().size() - recvPositions_.lookup(proci, 0));
|
||||
maxLen = max(maxLen, len);
|
||||
}
|
||||
}
|
||||
#else
|
||||
forAll(recvBuffers_, proci)
|
||||
{
|
||||
if (excludeProci != proci)
|
||||
{
|
||||
label len(recvBuffers_[proci].size() - recvPositions_[proci]);
|
||||
maxLen = max(maxLen, len);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
#ifdef FULLDEBUG
|
||||
else
|
||||
@ -449,6 +751,24 @@ Foam::PstreamBuffers::peekRecvData(const label proci) const
|
||||
{
|
||||
if (finishedSendsCalled_)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
const auto iter = recvBuffers_.cfind(proci);
|
||||
|
||||
if (iter.good())
|
||||
{
|
||||
const label pos = recvPositions_.lookup(proci, 0);
|
||||
const label len = iter.val().size();
|
||||
|
||||
if (pos < len)
|
||||
{
|
||||
return UList<char>
|
||||
(
|
||||
const_cast<char*>(iter.val().cbegin(pos)),
|
||||
(len - pos)
|
||||
);
|
||||
}
|
||||
}
|
||||
#else
|
||||
const label pos = recvPositions_[proci];
|
||||
const label len = recvBuffers_[proci].size();
|
||||
|
||||
@ -460,6 +780,7 @@ Foam::PstreamBuffers::peekRecvData(const label proci) const
|
||||
(len - pos)
|
||||
);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
#ifdef FULLDEBUG
|
||||
else
|
||||
@ -483,14 +804,22 @@ bool Foam::PstreamBuffers::allowClearRecv(bool on) noexcept
|
||||
|
||||
void Foam::PstreamBuffers::finishedSends(const bool wait)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label> recvSizes;
|
||||
#else
|
||||
labelList recvSizes;
|
||||
#endif
|
||||
finalExchange(wait, recvSizes);
|
||||
}
|
||||
|
||||
|
||||
void Foam::PstreamBuffers::finishedSends
|
||||
(
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes,
|
||||
#else
|
||||
labelList& recvSizes,
|
||||
#endif
|
||||
const bool wait
|
||||
)
|
||||
{
|
||||
@ -510,6 +839,7 @@ void Foam::PstreamBuffers::finishedSends
|
||||
}
|
||||
|
||||
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
void Foam::PstreamBuffers::finishedSends
|
||||
(
|
||||
const labelUList& sendProcs,
|
||||
@ -612,16 +942,58 @@ bool Foam::PstreamBuffers::finishedSends
|
||||
|
||||
return changed;
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
void Foam::PstreamBuffers::finishedNeighbourSends
|
||||
(
|
||||
const labelUList& neighProcs,
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes,
|
||||
#else
|
||||
labelList& recvSizes,
|
||||
#endif
|
||||
const bool wait
|
||||
)
|
||||
{
|
||||
finishedSends(neighProcs, neighProcs, recvSizes, wait);
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
recvSizes.clear();
|
||||
|
||||
for (const label proci : neighProcs)
|
||||
{
|
||||
recvSizes.insert(proci, 0);
|
||||
}
|
||||
|
||||
// Prune any send buffers that are not neighbours
|
||||
forAllIters(sendBuffers_, iter)
|
||||
{
|
||||
if (!recvSizes.contains(iter.key()))
|
||||
{
|
||||
iter.val().clear();
|
||||
}
|
||||
}
|
||||
|
||||
finalExchange(wait, recvSizes);
|
||||
#else
|
||||
// Resize for copying back
|
||||
recvSizes.resize_nocopy(sendBuffers_.size());
|
||||
|
||||
// Prune send buffers that are not neighbours
|
||||
{
|
||||
labelHashSet keepProcs(neighProcs);
|
||||
|
||||
// Prune send buffers that are not neighbours
|
||||
forAll(sendBuffers_, proci)
|
||||
{
|
||||
if (!keepProcs.contains(proci))
|
||||
{
|
||||
sendBuffers_[proci].clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
finalExchange(wait, recvSizes);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
@ -631,31 +1003,65 @@ void Foam::PstreamBuffers::finishedNeighbourSends
|
||||
const bool wait
|
||||
)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
finishedSends(neighProcs, neighProcs, wait);
|
||||
#else
|
||||
labelList recvSizes;
|
||||
|
||||
// Prune send buffers that are not neighbours
|
||||
{
|
||||
labelHashSet keepProcs(neighProcs);
|
||||
|
||||
// Prune send buffers that are not neighbours
|
||||
forAll(sendBuffers_, proci)
|
||||
{
|
||||
if (!keepProcs.contains(proci))
|
||||
{
|
||||
sendBuffers_[proci].clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
finalExchange(wait, recvSizes);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
void Foam::PstreamBuffers::finishedGathers(const bool wait)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label> recvSizes;
|
||||
finalGatherScatter(true, wait, false, recvSizes);
|
||||
#else
|
||||
labelList recvSizes;
|
||||
finalExchangeGatherScatter(true, wait, false, recvSizes);
|
||||
finalGatherScatter(true, wait, false, recvSizes);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
void Foam::PstreamBuffers::finishedScatters(const bool wait)
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label> recvSizes;
|
||||
finalGatherScatter(false, wait, false, recvSizes);
|
||||
#else
|
||||
labelList recvSizes;
|
||||
finalExchangeGatherScatter(false, wait, false, recvSizes);
|
||||
finalGatherScatter(false, wait, false, recvSizes);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
void Foam::PstreamBuffers::finishedGathers
|
||||
(
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes,
|
||||
#else
|
||||
labelList& recvSizes,
|
||||
#endif
|
||||
const bool wait
|
||||
)
|
||||
{
|
||||
finalExchangeGatherScatter(true, wait, true, recvSizes);
|
||||
finalGatherScatter(true, wait, true, recvSizes);
|
||||
|
||||
if (commsType_ != UPstream::commsTypes::nonBlocking)
|
||||
{
|
||||
@ -673,11 +1079,15 @@ void Foam::PstreamBuffers::finishedGathers
|
||||
|
||||
void Foam::PstreamBuffers::finishedScatters
|
||||
(
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes,
|
||||
#else
|
||||
labelList& recvSizes,
|
||||
#endif
|
||||
const bool wait
|
||||
)
|
||||
{
|
||||
finalExchangeGatherScatter(false, wait, true, recvSizes);
|
||||
finalGatherScatter(false, wait, true, recvSizes);
|
||||
|
||||
if (commsType_ != UPstream::commsTypes::nonBlocking)
|
||||
{
|
||||
|
@ -100,16 +100,27 @@ SourceFiles
|
||||
#define Foam_PstreamBuffers_H
|
||||
|
||||
#include "DynamicList.H"
|
||||
#include "Map.H"
|
||||
#include "UPstream.H"
|
||||
#include "IOstream.H"
|
||||
|
||||
// Transitional
|
||||
#define Foam_PstreamBuffers_map_storage
|
||||
// #define Foam_PstreamBuffers_dense
|
||||
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
#undef Foam_PstreamBuffers_map_storage
|
||||
#endif
|
||||
|
||||
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||
|
||||
namespace Foam
|
||||
{
|
||||
|
||||
// Forward Declarations
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
class bitSet;
|
||||
#endif
|
||||
|
||||
/*---------------------------------------------------------------------------*\
|
||||
Class PstreamBuffers Declaration
|
||||
@ -143,6 +154,19 @@ class PstreamBuffers
|
||||
|
||||
// Buffer storage
|
||||
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
|
||||
//- Send buffers (sparse)
|
||||
Map<DynamicList<char>> sendBuffers_;
|
||||
|
||||
//- Receive buffers (sparse)
|
||||
Map<DynamicList<char>> recvBuffers_;
|
||||
|
||||
//- Current read positions within recvBuffers_ (sparse)
|
||||
Map<label> recvPositions_;
|
||||
|
||||
#else
|
||||
|
||||
//- Send buffers. Size is nProcs()
|
||||
List<DynamicList<char>> sendBuffers_;
|
||||
|
||||
@ -152,6 +176,8 @@ class PstreamBuffers
|
||||
//- Current read positions within recvBuffers_. Size is nProcs()
|
||||
labelList recvPositions_;
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
// Private Member Functions
|
||||
|
||||
@ -160,12 +186,17 @@ class PstreamBuffers
|
||||
void finalExchange
|
||||
(
|
||||
const bool wait,
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes
|
||||
#else
|
||||
labelList& recvSizes
|
||||
#endif
|
||||
);
|
||||
|
||||
//- Mark sends as done.
|
||||
// Only exchange sizes using the sendProcs/recvProcs subset
|
||||
// (nonBlocking comms).
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
void finalExchange
|
||||
(
|
||||
const labelUList& sendProcs,
|
||||
@ -173,6 +204,7 @@ class PstreamBuffers
|
||||
const bool wait,
|
||||
labelList& recvSizes
|
||||
);
|
||||
#endif
|
||||
|
||||
//- For all-to-one or one-to-all
|
||||
// NOTE(review): name aligned with the out-of-class definition and its
// callers, which all use finalGatherScatter - confirm against the full header
void finalGatherScatter
|
||||
@ -180,7 +212,11 @@ class PstreamBuffers
|
||||
const bool isGather,
|
||||
const bool wait,
|
||||
const bool needSizes, // If recvSizes needed or scratch
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes
|
||||
#else
|
||||
labelList& recvSizes
|
||||
#endif
|
||||
);
|
||||
|
||||
|
||||
@ -340,7 +376,7 @@ public:
|
||||
|
||||
//- Maximum receive size, excluding the specified processor rank
|
||||
//- Must call finishedSends() or other finished.. method first!
|
||||
label maxNonLocalRecvCount(const label proci) const;
|
||||
label maxNonLocalRecvCount(const label excludeProci) const;
|
||||
|
||||
//- Number of unconsumed receive bytes for the specified processor.
|
||||
//- Must call finishedSends() or other finished.. method first!
|
||||
@ -392,7 +428,15 @@ public:
|
||||
// \param wait wait for requests to complete (in nonBlocking mode)
|
||||
//
|
||||
// \warning currently only valid for nonBlocking comms.
|
||||
void finishedSends(labelList& recvSizes, const bool wait = true);
|
||||
void finishedSends
|
||||
(
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes,
|
||||
#else
|
||||
labelList& recvSizes,
|
||||
#endif
|
||||
const bool wait = true
|
||||
);
|
||||
|
||||
|
||||
// Functions with restricted neighbours
|
||||
@ -406,12 +450,14 @@ public:
|
||||
// \param wait wait for requests to complete (in nonBlocking mode)
|
||||
//
|
||||
// \warning currently only valid for nonBlocking comms.
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
void finishedSends
|
||||
(
|
||||
const labelUList& sendProcs,
|
||||
const labelUList& recvProcs,
|
||||
const bool wait = true
|
||||
);
|
||||
#endif
|
||||
|
||||
//- Mark sends as done using subset of send/recv ranks
|
||||
//- to exchange data on. Recovers the sizes (bytes) received.
|
||||
@ -424,6 +470,7 @@ public:
|
||||
// \param wait wait for requests to complete (in nonBlocking mode)
|
||||
//
|
||||
// \warning currently only valid for nonBlocking comms.
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
void finishedSends
|
||||
(
|
||||
const labelUList& sendProcs,
|
||||
@ -431,6 +478,7 @@ public:
|
||||
labelList& recvSizes,
|
||||
const bool wait = true
|
||||
);
|
||||
#endif
|
||||
|
||||
//- A caching version that uses a limited send/recv connectivity.
|
||||
//
|
||||
@ -443,6 +491,7 @@ public:
|
||||
// \return True if the send/recv connectivity changed
|
||||
//
|
||||
// \warning currently only valid for nonBlocking comms.
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
bool finishedSends
|
||||
(
|
||||
bitSet& sendConnections,
|
||||
@ -450,6 +499,7 @@ public:
|
||||
DynamicList<label>& recvProcs,
|
||||
const bool wait = true
|
||||
);
|
||||
#endif
|
||||
|
||||
//- Mark sends as done using subset of send/recv ranks
|
||||
//- and recover the sizes (bytes) received.
|
||||
@ -480,7 +530,11 @@ public:
|
||||
void finishedNeighbourSends
|
||||
(
|
||||
const labelUList& neighProcs,
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes,
|
||||
#else
|
||||
labelList& recvSizes,
|
||||
#endif
|
||||
const bool wait = true
|
||||
);
|
||||
|
||||
@ -505,7 +559,15 @@ public:
|
||||
// \param wait wait for requests to complete (in nonBlocking mode)
|
||||
//
|
||||
// \warning currently only valid for nonBlocking comms.
|
||||
void finishedGathers(labelList& recvSizes, const bool wait = true);
|
||||
void finishedGathers
|
||||
(
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes,
|
||||
#else
|
||||
labelList& recvSizes,
|
||||
#endif
|
||||
const bool wait = true
|
||||
);
|
||||
|
||||
//- Mark all sends to sub-procs as done.
|
||||
//
|
||||
@ -525,7 +587,15 @@ public:
|
||||
// \param wait wait for requests to complete (in nonBlocking mode)
|
||||
//
|
||||
// \warning currently only valid for nonBlocking comms.
|
||||
void finishedScatters(labelList& recvSizes, const bool wait = true);
|
||||
void finishedScatters
|
||||
(
|
||||
#ifdef Foam_PstreamBuffers_map_storage
|
||||
Map<label>& recvSizes,
|
||||
#else
|
||||
labelList& recvSizes,
|
||||
#endif
|
||||
const bool wait = true
|
||||
);
|
||||
};
|
||||
|
||||
|
||||
|
@ -43,7 +43,12 @@ Foam::zoneDistribute::zoneDistribute(const fvMesh& mesh)
|
||||
MeshObject<fvMesh, Foam::TopologicalMeshObject, zoneDistribute>(mesh),
|
||||
stencil_(zoneCPCStencil::New(mesh)),
|
||||
globalNumbering_(stencil_.globalNumbering()),
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
send_(UPstream::nProcs()),
|
||||
#else
|
||||
// Default map sizing (as per PstreamBuffers)
|
||||
send_(),
|
||||
#endif
|
||||
pBufs_(UPstream::commsTypes::nonBlocking)
|
||||
{
|
||||
// Don't clear storage on persistent buffer
|
||||
@ -90,6 +95,7 @@ void Foam::zoneDistribute::setUpCommforZone
|
||||
|
||||
if (UPstream::parRun())
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
List<labelHashSet> needed(UPstream::nProcs());
|
||||
|
||||
// Bin according to originating (sending) processor
|
||||
@ -136,6 +142,61 @@ void Foam::zoneDistribute::setUpCommforZone
|
||||
fromProc >> send_[proci];
|
||||
}
|
||||
}
|
||||
#else
|
||||
|
||||
forAllIters(send_, iter)
|
||||
{
|
||||
iter.val().clear();
|
||||
}
|
||||
|
||||
// Bin according to originating (sending) processor
|
||||
for (const label celli : stencil.needsComm())
|
||||
{
|
||||
if (zone[celli])
|
||||
{
|
||||
for (const label gblIdx : stencil_[celli])
|
||||
{
|
||||
const label proci = globalNumbering_.whichProcID(gblIdx);
|
||||
|
||||
if (proci != Pstream::myProcNo())
|
||||
{
|
||||
send_(proci).insert(gblIdx);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stream the send data into PstreamBuffers,
|
||||
|
||||
pBufs_.clear();
|
||||
|
||||
forAllIters(send_, iter)
|
||||
{
|
||||
const label proci = iter.key();
|
||||
auto& indices = iter.val();
|
||||
|
||||
if (proci != UPstream::myProcNo() && !indices.empty())
|
||||
{
|
||||
// Serialize as List
|
||||
UOPstream toProc(proci, pBufs_);
|
||||
toProc << indices;
|
||||
}
|
||||
|
||||
// Clear out old contents
|
||||
indices.clear();
|
||||
}
|
||||
|
||||
pBufs_.finishedSends();
|
||||
|
||||
for (const int proci : pBufs_.allProcs())
|
||||
{
|
||||
if (proci != UPstream::myProcNo() && pBufs_.recvDataCount(proci))
|
||||
{
|
||||
UIPstream fromProc(proci, pBufs_);
|
||||
fromProc >> send_(proci);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -90,6 +90,8 @@ class zoneDistribute
|
||||
//- Global number into index of cells/faces
|
||||
const globalIndex& globalNumbering_;
|
||||
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
|
||||
//- Global cell/face index to send for processor-to-processor comms
|
||||
List<labelList> send_;
|
||||
|
||||
@ -102,6 +104,14 @@ class zoneDistribute
|
||||
//- Parallel [cache]: recv data from these ranks
|
||||
DynamicList<label> recvProcs_;
|
||||
|
||||
#else
|
||||
|
||||
//- Per proc the global cell/face index to send for
|
||||
//- processor-to-processor comms
|
||||
Map<labelHashSet> send_;
|
||||
|
||||
#endif
|
||||
|
||||
//- Persistent set of exchange buffers
|
||||
PstreamBuffers pBufs_;
|
||||
|
||||
|
@ -6,7 +6,7 @@
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2019-2020 DLR
|
||||
Copyright (C) 2020-2022 OpenCFD Ltd.
|
||||
Copyright (C) 2020-2023 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -161,6 +161,8 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
|
||||
|
||||
if (UPstream::parRun())
|
||||
{
|
||||
#ifdef Foam_PstreamBuffers_dense
|
||||
|
||||
if (sendConnections_.empty())
|
||||
{
|
||||
WarningInFunction
|
||||
@ -208,6 +210,48 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
|
||||
neiValues += tmpValues;
|
||||
}
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
pBufs_.clear();
|
||||
|
||||
forAllConstIters(send_, iter)
|
||||
{
|
||||
const label proci = iter.key();
|
||||
const auto& indices = iter.val();
|
||||
|
||||
if (proci != UPstream::myProcNo() && !indices.empty())
|
||||
{
|
||||
// Serialize as Map
|
||||
Map<Type> sendValues(2*indices.size());
|
||||
|
||||
for (const label sendIdx : indices)
|
||||
{
|
||||
sendValues.insert
|
||||
(
|
||||
sendIdx,
|
||||
getLocalValue(phi, globalNumbering_.toLocal(sendIdx))
|
||||
);
|
||||
}
|
||||
|
||||
UOPstream toProc(proci, pBufs_);
|
||||
toProc << sendValues;
|
||||
}
|
||||
}
|
||||
|
||||
pBufs_.finishedSends();
|
||||
|
||||
for (const int proci : pBufs_.allProcs())
|
||||
{
|
||||
if (proci != UPstream::myProcNo() && pBufs_.recvDataCount(proci))
|
||||
{
|
||||
UIPstream fromProc(proci, pBufs_);
|
||||
Map<Type> tmpValues(fromProc);
|
||||
|
||||
neiValues += tmpValues;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
return neiValues;
|
||||
|
Loading…
Reference in New Issue
Block a user