Compare commits

...

1 Commits

Author SHA1 Message Date
Mark Olesen
00e2d3e4ef WIP: mapped PstreamBuffers with NBX
ENH: sparse storage and data exchange for PstreamBuffers

- change the underlying storage from a numProcs list of buffers to a
  Map of buffers. The reduced memory footprint on large systems is
  on aspect but the primary motivation is to more easily support
  sparse data exchange patterns.
  The Map storage for PstreamBuffers corresponds to a non-blocking
  consensus exchange of sizes that automatically propagates through
  different parts of the code and avoids all-to-all.

CONFIG: enable nonBlockingExchange as default (for testing)

- this changes the Pstream::exchangeSizes to use NBX instead of
  all-to-all, even for List containers.
2023-03-02 12:49:20 +01:00
6 changed files with 629 additions and 34 deletions

View File

@ -130,7 +130,7 @@ OptimisationSwitches
// Number processors to change to tree communication
nProcsSimpleSum 0;
// Min numProc to use non-blocking exchange algorithm (Hoeffler: NBX)
nonBlockingExchange 0;
nonBlockingExchange 1;
// MPI buffer size (bytes)
// Can override with the MPI_BUFFER_SIZE env variable.

View File

@ -27,14 +27,52 @@ License
\*---------------------------------------------------------------------------*/
#include "PstreamBuffers.H"
#ifdef Foam_PstreamBuffers_dense
#include "bitSet.H"
#endif
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
namespace Foam
{
#ifdef Foam_PstreamBuffers_map_storage
//- Retrieve size of specified buffer, first checking for existence
static inline label getBufferSize
(
const Map<DynamicList<char>>& buffers,
const label proci
)
{
const auto iter = buffers.cfind(proci);
return (iter.good() ? iter.val().size() : 0);
}
#else
//- Retrieve size of specified buffer, no access checking
static inline label getBufferSize
(
const UList<DynamicList<char>>& buffers,
const label proci
)
{
return buffers[proci].size();
}
#endif
} // End namespace Foam
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
void Foam::PstreamBuffers::finalExchange
(
const bool wait,
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes
#else
labelList& recvSizes
#endif
)
{
// Could also check that it is not called twice
@ -43,8 +81,35 @@ void Foam::PstreamBuffers::finalExchange
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
// Dense storage uses all-to-all
Pstream::exchangeSizes(sendBuffers_, recvSizes, comm_);
// Use PEX algorithm
#ifdef Foam_PstreamBuffers_map_storage
// PEX stage 1: exchange sizes (non-blocking consensus)
Pstream::exchangeSizes
(
sendBuffers_,
recvSizes,
tag_,
comm_
);
#else
// Like Pstream::exchangeSizes
labelList sendSizes(nProcs_);
forAll(sendBuffers_, proci)
{
sendSizes[proci] = sendBuffers_[proci].size();
}
recvSizes.resize_nocopy(nProcs_);
// PEX stage 1: exchange sizes (non-blocking consensus)
UPstream::allToAllConsensus
(
sendSizes,
recvSizes,
(tag_ + 314159), // some unique tag?
comm_
);
#endif
Pstream::exchange<DynamicList<char>, char>
(
@ -59,6 +124,7 @@ void Foam::PstreamBuffers::finalExchange
}
#ifdef Foam_PstreamBuffers_dense
void Foam::PstreamBuffers::finalExchange
(
const labelUList& sendProcs,
@ -94,14 +160,19 @@ void Foam::PstreamBuffers::finalExchange
);
}
}
#endif
void Foam::PstreamBuffers::finalExchangeGatherScatter
void Foam::PstreamBuffers::finalGatherScatter
(
const bool isGather,
const bool wait,
const bool needSizes,
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes
#else
labelList& recvSizes
#endif
)
{
// Could also check that it is not called twice
@ -114,10 +185,20 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter
// Only send to master [0]. Master is also allowed to 'send' to itself
#ifdef Foam_PstreamBuffers_map_storage
forAllIters(sendBuffers_, iter)
{
if (iter.key() != 0)
{
iter.val().clear();
}
}
#else
for (label proci=1; proci < sendBuffers_.size(); ++proci)
{
sendBuffers_[proci].clear();
}
#endif
}
else
{
@ -133,40 +214,100 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
#ifdef Foam_PstreamBuffers_map_storage
labelList recvCount;
#else
labelList& recvCount = recvSizes;
#endif
if (isGather)
{
// gather mode (all-to-one): master [0] <- everyone
recvSizes =
UPstream::listGatherValues(sendBuffers_[0].size(), comm_);
const label nSend = getBufferSize(sendBuffers_, 0);
recvCount = UPstream::listGatherValues(nSend, comm_);
#ifdef Foam_PstreamBuffers_map_storage
// Transcribe recv count from list to map
recvSizes.clear();
if (UPstream::master(comm_))
{
for (label proci=1; proci < recvCount.size(); ++proci)
{
if (recvCount[proci] > 0)
{
recvSizes.insert(proci, recvCount[proci]);
}
}
}
#else
if (!UPstream::master(comm_))
{
recvSizes.resize_nocopy(nProcs_);
recvSizes = Zero;
}
#endif
}
else
{
// scatter mode (one-to-all): master [0] -> everyone
recvSizes.resize_nocopy(nProcs_);
if (UPstream::master(comm_))
{
recvCount.resize(nProcs_, Zero);
#ifdef Foam_PstreamBuffers_map_storage
forAllConstIters(sendBuffers_, iter)
{
recvCount[iter.key()] = iter.val().size();
}
#else
forAll(sendBuffers_, proci)
{
recvCount[proci] = sendBuffers_[proci].size();
}
#endif
}
else
{
// Scattering, so non-master sends nothing
recvCount = Zero;
#ifdef Foam_PstreamBuffers_map_storage
recvSizes.clear();
recvSizes.resize_nocopy(nProcs_);
#else
recvSizes = Zero;
#endif
}
const label nRecv(UPstream::listScatterValues(recvCount, comm_));
if (UPstream::master(comm_))
{
forAll(sendBuffers_, proci)
{
recvSizes[proci] = sendBuffers_[proci].size();
}
#ifdef Foam_PstreamBuffers_map_storage
recvSizes.clear();
#else
recvSizes = Zero;
#endif
}
else
{
#ifdef Foam_PstreamBuffers_map_storage
recvSizes.clear();
const label myRecv(UPstream::listScatterValues(recvSizes, comm_));
recvSizes = Zero;
recvSizes[0] = myRecv;
if (nRecv)
{
recvSizes.insert(0, nRecv);
}
#else
recvSizes = Zero;
recvSizes[0] = nRecv;
#endif
}
}
Pstream::exchange<DynamicList<char>, char>
(
sendBuffers_,
@ -197,9 +338,18 @@ Foam::PstreamBuffers::PstreamBuffers
tag_(tag),
comm_(communicator),
nProcs_(UPstream::nProcs(comm_)),
#ifdef Foam_PstreamBuffers_map_storage
// Default sizing (128) is probably OK.
// Meshes often have 16-20 neighbours (avg) and 100 neighbours (max)
sendBuffers_(),
recvBuffers_(),
recvPositions_()
#else
sendBuffers_(nProcs_),
recvBuffers_(nProcs_),
recvPositions_(nProcs_, Zero)
#endif
{}
@ -207,7 +357,23 @@ Foam::PstreamBuffers::PstreamBuffers
Foam::PstreamBuffers::~PstreamBuffers()
{
// Check that all data has been consumed.
// Check that all data has been consumed
#ifdef Foam_PstreamBuffers_map_storage
forAllConstIters(recvBuffers_, iter)
{
const label proci = iter.key();
const label len = iter.val().size();
const label pos = recvPositions_.lookup(proci, len);
if (pos < len)
{
FatalErrorInFunction
<< "Message from processor " << proci
<< " Only consumed " << pos << " of " << len << " bytes" << nl
<< Foam::abort(FatalError);
}
}
#else
forAll(recvBuffers_, proci)
{
const label pos = recvPositions_[proci];
@ -221,6 +387,7 @@ Foam::PstreamBuffers::~PstreamBuffers()
<< Foam::abort(FatalError);
}
}
#endif
}
@ -231,7 +398,11 @@ Foam::DynamicList<char>& Foam::PstreamBuffers::accessSendBuffer
const label proci
)
{
#ifdef Foam_PstreamBuffers_map_storage
return sendBuffers_(proci); // Created on demand if needed
#else
return sendBuffers_[proci];
#endif
}
@ -240,13 +411,21 @@ Foam::DynamicList<char>& Foam::PstreamBuffers::accessRecvBuffer
const label proci
)
{
#ifdef Foam_PstreamBuffers_map_storage
return recvBuffers_(proci); // Created on demand if needed
#else
return recvBuffers_[proci];
#endif
}
Foam::label& Foam::PstreamBuffers::accessRecvPosition(const label proci)
{
#ifdef Foam_PstreamBuffers_map_storage
return recvPositions_(proci, 0); // Created on demand if needed
#else
return recvPositions_[proci];
#endif
}
@ -254,20 +433,38 @@ Foam::label& Foam::PstreamBuffers::accessRecvPosition(const label proci)
void Foam::PstreamBuffers::clearSends()
{
#ifdef Foam_PstreamBuffers_map_storage
forAllIters(sendBuffers_, iter)
{
iter.val().clear();
}
#else
for (DynamicList<char>& buf : sendBuffers_)
{
buf.clear();
}
#endif
}
void Foam::PstreamBuffers::clearRecvs()
{
#ifdef Foam_PstreamBuffers_map_storage
forAllIters(recvBuffers_, iter)
{
iter.val().clear();
}
forAllIters(recvPositions_, iter)
{
iter.val() = 0;
}
#else
for (DynamicList<char>& buf : recvBuffers_)
{
buf.clear();
}
recvPositions_ = Zero;
#endif
}
@ -281,14 +478,41 @@ void Foam::PstreamBuffers::clear()
void Foam::PstreamBuffers::clearSend(const label proci)
{
#ifdef Foam_PstreamBuffers_map_storage
{
auto iter = sendBuffers_.find(proci);
if (iter.good())
{
iter.val().clear();
}
}
#else
sendBuffers_[proci].clear();
#endif
}
void Foam::PstreamBuffers::clearRecv(const label proci)
{
#ifdef Foam_PstreamBuffers_map_storage
{
auto iter = recvBuffers_.find(proci);
if (iter.good())
{
iter.val().clear();
}
}
{
auto iter = recvPositions_.find(proci);
if (iter.good())
{
iter.val() = 0;
}
}
#else
recvBuffers_[proci].clear();
recvPositions_[proci] = 0;
#endif
}
@ -296,6 +520,21 @@ void Foam::PstreamBuffers::clearStorage()
{
// Could also clear out entire sendBuffers_, recvBuffers_ and reallocate.
// Not sure if it makes much difference
#ifdef Foam_PstreamBuffers_map_storage
forAllIters(sendBuffers_, iter)
{
iter.val().clearStorage();
}
forAllIters(recvBuffers_, iter)
{
iter.val().clearStorage();
}
forAllIters(recvPositions_, iter)
{
iter.val() = 0;
}
#else
for (DynamicList<char>& buf : sendBuffers_)
{
buf.clearStorage();
@ -305,6 +544,7 @@ void Foam::PstreamBuffers::clearStorage()
buf.clearStorage();
}
recvPositions_ = Zero;
#endif
finishedSendsCalled_ = false;
}
@ -312,6 +552,15 @@ void Foam::PstreamBuffers::clearStorage()
bool Foam::PstreamBuffers::hasSendData() const
{
#ifdef Foam_PstreamBuffers_map_storage
forAllConstIters(sendBuffers_, iter)
{
if (!iter.val().empty())
{
return true;
}
}
#else
for (const DynamicList<char>& buf : sendBuffers_)
{
if (!buf.empty())
@ -319,6 +568,7 @@ bool Foam::PstreamBuffers::hasSendData() const
return true;
}
}
#endif
return false;
}
@ -327,6 +577,18 @@ bool Foam::PstreamBuffers::hasRecvData() const
{
if (finishedSendsCalled_)
{
#ifdef Foam_PstreamBuffers_map_storage
forAllConstIters(recvBuffers_, iter)
{
const label proci = iter.key();
const label len = iter.val().size();
if (recvPositions_.lookup(proci, 0) < len)
{
return true;
}
}
#else
forAll(recvBuffers_, proci)
{
if (recvPositions_[proci] < recvBuffers_[proci].size())
@ -334,6 +596,7 @@ bool Foam::PstreamBuffers::hasRecvData() const
return true;
}
}
#endif
}
#ifdef FULLDEBUG
else
@ -349,7 +612,7 @@ bool Foam::PstreamBuffers::hasRecvData() const
Foam::label Foam::PstreamBuffers::sendDataCount(const label proci) const
{
return sendBuffers_[proci].size();
return getBufferSize(sendBuffers_, proci);
}
@ -357,8 +620,16 @@ Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const
{
if (finishedSendsCalled_)
{
#ifdef Foam_PstreamBuffers_map_storage
#else
const label len
(
getBufferSize(recvBuffers_, proci)
- recvPositions_.lookup(proci, 0)
);
#else
const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
#endif
if (len > 0)
{
return len;
@ -378,10 +649,25 @@ Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const
Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
{
labelList counts(recvPositions_.size(), Zero);
labelList counts(nProcs_, Zero);
if (finishedSendsCalled_)
{
#ifdef Foam_PstreamBuffers_map_storage
forAllConstIters(recvBuffers_, iter)
{
const label proci = iter.key();
const label len
(
iter.val().size() - recvPositions_.lookup(proci, 0)
);
if (len > 0)
{
counts[proci] = len;
}
}
#else
forAll(recvBuffers_, proci)
{
const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
@ -391,6 +677,7 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
counts[proci] = len;
}
}
#endif
}
#ifdef FULLDEBUG
else
@ -404,20 +691,35 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
}
Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount(const label proci) const
Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount
(
const label excludeProci
) const
{
label maxLen = 0;
if (finishedSendsCalled_)
{
forAll(recvBuffers_, idx)
#ifdef Foam_PstreamBuffers_map_storage
forAllConstIters(recvBuffers_, iter)
{
const label len(recvBuffers_[idx].size() - recvPositions_[idx]);
if (idx != proci)
const label proci = iter.key();
if (excludeProci != proci)
{
label len(iter.val().size() - recvPositions_.lookup(proci, 0));
maxLen = max(maxLen, len);
}
}
#else
forAll(recvBuffers_, proci)
{
if (excludeProci != proci)
{
label len(recvBuffers_[proci].size() - recvPositions_[proci]);
maxLen = max(maxLen, len);
}
}
#endif
}
#ifdef FULLDEBUG
else
@ -449,6 +751,24 @@ Foam::PstreamBuffers::peekRecvData(const label proci) const
{
if (finishedSendsCalled_)
{
#ifdef Foam_PstreamBuffers_map_storage
const auto iter = recvBuffers_.cfind(proci);
if (iter.good())
{
const label pos = recvPositions_.lookup(proci, 0);
const label len = iter.val().size();
if (pos < len)
{
return UList<char>
(
const_cast<char*>(iter.val().cbegin(pos)),
(len - pos)
);
}
}
#else
const label pos = recvPositions_[proci];
const label len = recvBuffers_[proci].size();
@ -460,6 +780,7 @@ Foam::PstreamBuffers::peekRecvData(const label proci) const
(len - pos)
);
}
#endif
}
#ifdef FULLDEBUG
else
@ -483,14 +804,22 @@ bool Foam::PstreamBuffers::allowClearRecv(bool on) noexcept
void Foam::PstreamBuffers::finishedSends(const bool wait)
{
#ifdef Foam_PstreamBuffers_map_storage
Map<label> recvSizes;
#else
labelList recvSizes;
#endif
finalExchange(wait, recvSizes);
}
void Foam::PstreamBuffers::finishedSends
(
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes,
#else
labelList& recvSizes,
#endif
const bool wait
)
{
@ -510,6 +839,7 @@ void Foam::PstreamBuffers::finishedSends
}
#ifdef Foam_PstreamBuffers_dense
void Foam::PstreamBuffers::finishedSends
(
const labelUList& sendProcs,
@ -612,16 +942,58 @@ bool Foam::PstreamBuffers::finishedSends
return changed;
}
#endif
void Foam::PstreamBuffers::finishedNeighbourSends
(
const labelUList& neighProcs,
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes,
#else
labelList& recvSizes,
#endif
const bool wait
)
{
finishedSends(neighProcs, neighProcs, recvSizes, wait);
#ifdef Foam_PstreamBuffers_map_storage
recvSizes.clear();
for (const label proci : neighProcs)
{
recvSizes.insert(proci, 0);
}
// Prune any send buffers that are not neighbours
forAllIters(sendBuffers_, iter)
{
if (!recvSizes.contains(iter.key()))
{
iter.val().clear();
}
}
finalExchange(wait, recvSizes);
#else
// Resize for copying back
recvSizes.resize_nocopy(sendBuffers_.size());
// Prune send buffers that are not neighbours
{
labelHashSet keepProcs(neighProcs);
// Prune send buffers that are not neighbours
forAll(sendBuffers_, proci)
{
if (!keepProcs.contains(proci))
{
sendBuffers_[proci].clear();
}
}
}
finalExchange(wait, recvSizes);
#endif
}
@ -631,31 +1003,65 @@ void Foam::PstreamBuffers::finishedNeighbourSends
const bool wait
)
{
#ifdef Foam_PstreamBuffers_map_storage
finishedSends(neighProcs, neighProcs, wait);
#else
labelList recvSizes;
// Prune send buffers that are not neighbours
{
labelHashSet keepProcs(neighProcs);
// Prune send buffers that are not neighbours
forAll(sendBuffers_, proci)
{
if (!keepProcs.contains(proci))
{
sendBuffers_[proci].clear();
}
}
}
finalExchange(wait, recvSizes);
#endif
}
void Foam::PstreamBuffers::finishedGathers(const bool wait)
{
#ifdef Foam_PstreamBuffers_map_storage
Map<label> recvSizes;
finalGatherScatter(true, wait, false, recvSizes);
#else
labelList recvSizes;
finalExchangeGatherScatter(true, wait, false, recvSizes);
finalGatherScatter(true, wait, false, recvSizes);
#endif
}
void Foam::PstreamBuffers::finishedScatters(const bool wait)
{
#ifdef Foam_PstreamBuffers_map_storage
Map<label> recvSizes;
finalGatherScatter(false, wait, false, recvSizes);
#else
labelList recvSizes;
finalExchangeGatherScatter(false, wait, false, recvSizes);
finalGatherScatter(false, wait, false, recvSizes);
#endif
}
void Foam::PstreamBuffers::finishedGathers
(
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes,
#else
labelList& recvSizes,
#endif
const bool wait
)
{
finalExchangeGatherScatter(true, wait, true, recvSizes);
finalGatherScatter(true, wait, true, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
@ -673,11 +1079,15 @@ void Foam::PstreamBuffers::finishedGathers
void Foam::PstreamBuffers::finishedScatters
(
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes,
#else
labelList& recvSizes,
#endif
const bool wait
)
{
finalExchangeGatherScatter(false, wait, true, recvSizes);
finalGatherScatter(false, wait, true, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{

View File

@ -100,16 +100,27 @@ SourceFiles
#define Foam_PstreamBuffers_H
#include "DynamicList.H"
#include "Map.H"
#include "UPstream.H"
#include "IOstream.H"
// Transitional
#define Foam_PstreamBuffers_map_storage
// #define Foam_PstreamBuffers_dense
#ifdef Foam_PstreamBuffers_dense
#undef Foam_PstreamBuffers_map_storage
#endif
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
// Forward Declarations
#ifdef Foam_PstreamBuffers_dense
class bitSet;
#endif
/*---------------------------------------------------------------------------*\
Class PstreamBuffers Declaration
@ -143,6 +154,19 @@ class PstreamBuffers
// Buffer storage
#ifdef Foam_PstreamBuffers_map_storage
//- Send buffers (sparse)
Map<DynamicList<char>> sendBuffers_;
//- Receive buffers (sparse)
Map<DynamicList<char>> recvBuffers_;
//- Current read positions within recvBuffers_ (sparse)
Map<label> recvPositions_;
#else
//- Send buffers. Size is nProcs()
List<DynamicList<char>> sendBuffers_;
@ -152,6 +176,8 @@ class PstreamBuffers
//- Current read positions within recvBuffers_. Size is nProcs()
labelList recvPositions_;
#endif
// Private Member Functions
@ -160,12 +186,17 @@ class PstreamBuffers
void finalExchange
(
const bool wait,
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes
#else
labelList& recvSizes
#endif
);
//- Mark sends as done.
// Only exchange sizes using the sendProcs/recvProcs subset
// (nonBlocking comms).
#ifdef Foam_PstreamBuffers_dense
void finalExchange
(
const labelUList& sendProcs,
@ -173,6 +204,7 @@ class PstreamBuffers
const bool wait,
labelList& recvSizes
);
#endif
//- For all-to-one or one-to-all
void finalExchangeGatherScatter
@ -180,7 +212,11 @@ class PstreamBuffers
const bool isGather,
const bool wait,
const bool needSizes, // If recvSizes needed or scratch
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes
#else
labelList& recvSizes
#endif
);
@ -340,7 +376,7 @@ public:
//- Maximum receive size, excluding the specified processor rank
//- Must call finishedSends() or other finished.. method first!
label maxNonLocalRecvCount(const label proci) const;
label maxNonLocalRecvCount(const label excludeProci) const;
//- Number of unconsumed receive bytes for the specified processor.
//- Must call finishedSends() or other finished.. method first!
@ -392,7 +428,15 @@ public:
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
void finishedSends(labelList& recvSizes, const bool wait = true);
void finishedSends
(
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes,
#else
labelList& recvSizes,
#endif
const bool wait = true
);
// Functions with restricted neighbours
@ -406,12 +450,14 @@ public:
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
#ifdef Foam_PstreamBuffers_dense
void finishedSends
(
const labelUList& sendProcs,
const labelUList& recvProcs,
const bool wait = true
);
#endif
//- Mark sends as done using subset of send/recv ranks
//- to exchange data on. Recovers the sizes (bytes) received.
@ -424,6 +470,7 @@ public:
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
#ifdef Foam_PstreamBuffers_dense
void finishedSends
(
const labelUList& sendProcs,
@ -431,6 +478,7 @@ public:
labelList& recvSizes,
const bool wait = true
);
#endif
//- A caching version that uses a limited send/recv connectivity.
//
@ -443,6 +491,7 @@ public:
// \return True if the send/recv connectivity changed
//
// \warning currently only valid for nonBlocking comms.
#ifdef Foam_PstreamBuffers_dense
bool finishedSends
(
bitSet& sendConnections,
@ -450,6 +499,7 @@ public:
DynamicList<label>& recvProcs,
const bool wait = true
);
#endif
//- Mark sends as done using subset of send/recv ranks
//- and recover the sizes (bytes) received.
@ -480,7 +530,11 @@ public:
void finishedNeighbourSends
(
const labelUList& neighProcs,
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes,
#else
labelList& recvSizes,
#endif
const bool wait = true
);
@ -505,7 +559,15 @@ public:
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
void finishedGathers(labelList& recvSizes, const bool wait = true);
void finishedGathers
(
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes,
#else
labelList& recvSizes,
#endif
const bool wait = true
);
//- Mark all sends to sub-procs as done.
//
@ -525,7 +587,15 @@ public:
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
void finishedScatters(labelList& recvSizes, const bool wait = true);
void finishedScatters
(
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes,
#else
labelList& recvSizes,
#endif
const bool wait = true
);
};

View File

@ -43,7 +43,12 @@ Foam::zoneDistribute::zoneDistribute(const fvMesh& mesh)
MeshObject<fvMesh, Foam::TopologicalMeshObject, zoneDistribute>(mesh),
stencil_(zoneCPCStencil::New(mesh)),
globalNumbering_(stencil_.globalNumbering()),
#ifdef Foam_PstreamBuffers_dense
send_(UPstream::nProcs()),
#else
// Default map sizing (as per PstreamBuffers)
send_(),
#endif
pBufs_(UPstream::commsTypes::nonBlocking)
{
// Don't clear storage on persistent buffer
@ -90,6 +95,7 @@ void Foam::zoneDistribute::setUpCommforZone
if (UPstream::parRun())
{
#ifdef Foam_PstreamBuffers_dense
List<labelHashSet> needed(UPstream::nProcs());
// Bin according to originating (sending) processor
@ -136,6 +142,61 @@ void Foam::zoneDistribute::setUpCommforZone
fromProc >> send_[proci];
}
}
#else
forAllIters(send_, iter)
{
iter.val().clear();
}
// Bin according to originating (sending) processor
for (const label celli : stencil.needsComm())
{
if (zone[celli])
{
for (const label gblIdx : stencil_[celli])
{
const label proci = globalNumbering_.whichProcID(gblIdx);
if (proci != Pstream::myProcNo())
{
send_(proci).insert(gblIdx);
}
}
}
}
// Stream the send data into PstreamBuffers,
pBufs_.clear();
forAllIters(send_, iter)
{
const label proci = iter.key();
auto& indices = iter.val();
if (proci != UPstream::myProcNo() && !indices.empty())
{
// Serialize as List
UOPstream toProc(proci, pBufs_);
toProc << indices;
}
// Clear out old contents
indices.clear();
}
pBufs_.finishedSends();
for (const int proci : pBufs_.allProcs())
{
if (proci != UPstream::myProcNo() && pBufs_.recvDataCount(proci))
{
UIPstream fromProc(proci, pBufs_);
fromProc >> send_(proci);
}
}
#endif
}
}

View File

@ -90,6 +90,8 @@ class zoneDistribute
//- Global number into index of cells/faces
const globalIndex& globalNumbering_;
#ifdef Foam_PstreamBuffers_dense
//- Global cell/face index to send for processor-to-processor comms
List<labelList> send_;
@ -102,6 +104,14 @@ class zoneDistribute
//- Parallel [cache]: recv data from these ranks
DynamicList<label> recvProcs_;
#else
//- Per proc the global cell/face index to send for
//- processor-to-processor comms
Map<labelHashSet> send_;
#endif
//- Persistent set of exchange buffers
PstreamBuffers pBufs_;

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2019-2020 DLR
Copyright (C) 2020-2022 OpenCFD Ltd.
Copyright (C) 2020-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -161,6 +161,8 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
if (UPstream::parRun())
{
#ifdef Foam_PstreamBuffers_dense
if (sendConnections_.empty())
{
WarningInFunction
@ -208,6 +210,48 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
neiValues += tmpValues;
}
}
#else
pBufs_.clear();
forAllConstIters(send_, iter)
{
const label proci = iter.key();
const auto& indices = iter.val();
if (proci != UPstream::myProcNo() && !indices.empty())
{
// Serialize as Map
Map<Type> sendValues(2*indices.size());
for (const label sendIdx : indices)
{
sendValues.insert
(
sendIdx,
getLocalValue(phi, globalNumbering_.toLocal(sendIdx))
);
}
UOPstream toProc(proci, pBufs_);
toProc << sendValues;
}
}
pBufs_.finishedSends();
for (const int proci : pBufs_.allProcs())
{
if (proci != UPstream::myProcNo() && pBufs_.recvDataCount(proci))
{
UIPstream fromProc(proci, pBufs_);
Map<Type> tmpValues(fromProc);
neiValues += tmpValues;
}
}
#endif
}
return neiValues;