ENH: consolidate PstreamBuffers reduced communication bookkeeping

- can reduce communication by only sending non-zero data (especially
  when using NBX for size exchanges), but proper synchronisation with
  multiply-connected processor/processor patches (eg, processorCyclic)
  may still require speculative sends.

  Can now setup for PstreamBuffers 'registered' sends to avoid
  ad hoc bookkeeping within the caller.
This commit is contained in:
Mark Olesen 2023-07-05 12:44:15 +02:00
parent a1e34bb251
commit f398d7b313
6 changed files with 169 additions and 156 deletions

View File

@ -55,15 +55,23 @@ static constexpr int algorithm_full_NBX = 1; // Very experimental
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
inline void Foam::PstreamBuffers::initFinalExchange()
{
// Could also check that it is not called twice
// but that is used for overlapping send/recv (eg, overset)
finishedSendsCalled_ = true;
clearUnregistered();
}
void Foam::PstreamBuffers::finalExchange
(
const bool wait,
labelList& recvSizes
)
{
// Could also check that it is not called twice
// but that is used for overlapping send/recv (eg, overset)
finishedSendsCalled_ = true;
initFinalExchange();
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
@ -140,9 +148,7 @@ void Foam::PstreamBuffers::finalExchange
labelList& recvSizes
)
{
// Could also check that it is not called twice
// but that is used for overlapping send/recv (eg, overset)
finishedSendsCalled_ = true;
initFinalExchange();
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
@ -201,9 +207,7 @@ void Foam::PstreamBuffers::finalGatherScatter
labelList& recvSizes
)
{
// Could also check that it is not called twice
// but that is used for overlapping send/recv (eg, overset)
finishedSendsCalled_ = true;
initFinalExchange();
if (isGather)
{
@ -316,7 +320,7 @@ Foam::PstreamBuffers::~PstreamBuffers()
const label pos = recvPositions_[proci];
const label len = recvBuffers_[proci].size();
if (pos < len)
if (pos >= 0 && pos < len)
{
FatalErrorInFunction
<< "Message from processor " << proci
@ -382,9 +386,27 @@ void Foam::PstreamBuffers::clear()
}
void Foam::PstreamBuffers::clearUnregistered()
{
for (label proci = 0; proci < nProcs_; ++proci)
{
if (recvPositions_[proci] < 0)
{
recvPositions_[proci] = 0;
sendBuffers_[proci].clear();
}
}
}
void Foam::PstreamBuffers::clearSend(const label proci)
{
sendBuffers_[proci].clear();
if (recvPositions_[proci] < 0)
{
// Reset the unregistered flag
recvPositions_[proci] = 0;
}
}
@ -413,6 +435,30 @@ void Foam::PstreamBuffers::clearStorage()
}
void Foam::PstreamBuffers::initRegisterSend()
{
if (!finishedSendsCalled_)
{
for (label proci = 0; proci < nProcs_; ++proci)
{
sendBuffers_[proci].clear();
// Mark send buffer as 'unregistered'
recvPositions_[proci] = -1;
}
}
}
void Foam::PstreamBuffers::registerSend(const label proci, const bool toggleOn)
{
// Clear the 'unregistered' flag
if (toggleOn && recvPositions_[proci] < 0)
{
recvPositions_[proci] = 0;
}
}
bool Foam::PstreamBuffers::hasSendData() const
{
for (const DynamicList<char>& buf : sendBuffers_)

View File

@ -63,7 +63,7 @@ Description
}
\endcode
There are additional special versions of finishedSends() for
There are special versions of finishedSends() for
restricted neighbour communication as well as for special
one-to-all and all-to-one communication patterns.
For example,
@ -89,6 +89,40 @@ Description
}
\endcode
Additionally there are some situations that use speculative sends
that may not actually be required. In this case, it is possible to
mark all sends as initially \em unregistered and subsequently
mark the "real" sends as \em registered.
For example,
\code
PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
pBufs.initRegisterSend();
for (const polyPatch& pp : patches)
{
const auto* ppp = isA<processorPolyPatch>(pp);
if (ppp)
{
const label nbrProci = ppp->neighbProcNo();
// Gather some patch information...
UOPstream toNbr(nbrProci, pBufs);
toNbr << patchInfo;
// The send is needed if patchInfo is non-empty
pBufs.registerSend(nbrProci, !patchInfo.empty());
}
}
// optional: pBufs.clearUnregistered();
pBufs.finishedSends();
...
\endcode
SourceFiles
PstreamBuffers.C
@ -150,11 +184,15 @@ class PstreamBuffers
List<DynamicList<char>> recvBuffers_;
//- Current read positions within recvBuffers_. Size is nProcs()
// This list is also misused for registerSend() bookkeeping
labelList recvPositions_;
// Private Member Functions
//- Clear 'unregistered' send buffers, tag as being send-ready
inline void initFinalExchange();
//- Mark all sends as having been done.
// This will start receives (nonBlocking comms).
void finalExchange
@ -381,6 +419,21 @@ public:
bool allowClearRecv(bool on) noexcept;
// Registered Sending
//- Initialise registerSend() bookkeeping by mark all send buffers
//- as 'unregistered'
// Usually called immediately after construction or clear().
void initRegisterSend();
//- Toggle an individual send buffer as 'registered'.
//- The setting is sticky (does not turn off)
void registerSend(const label proci, const bool toggleOn = true);
//- Clear any 'unregistered' send buffers.
void clearUnregistered();
// Regular Functions
//- Mark sends as done

View File

@ -487,8 +487,15 @@ void Foam::globalPoints::sendPatchPoints
const polyBoundaryMesh& patches = mesh_.boundaryMesh();
const labelPairList& patchInfo = globalTransforms_.patchTransformSign();
// Reset send/recv information
// Reduce communication by only sending non-zero data,
// but with multiply-connected processor/processor
// (eg, processorCyclic) also need to send zero information
// to keep things synchronised
// Reset buffers, initialize for registerSend() bookkeeping
pBufs.clear();
pBufs.initRegisterSend();
// Information to send:
@ -502,19 +509,6 @@ void Foam::globalPoints::sendPatchPoints
DynamicList<labelPairList> allInfo;
// Reduce communication by only sending non-zero data,
// but with multiply-connected processor/processor
// (eg, processorCyclic) also need to send zero information
// to keep things synchronised
// Has non-zero data sent
Map<int> isActiveSend(0);
if (UPstream::parRun())
{
isActiveSend.resize(2*min(patches.size(),pBufs.nProcs()));
}
forAll(patches, patchi)
{
const polyPatch& pp = patches[patchi];
@ -583,7 +577,7 @@ void Foam::globalPoints::sendPatchPoints
toNbr << patchFaces << indexInFace << allInfo;
// Record if send is required (data are non-zero)
isActiveSend(nbrProci) |= int(!patchFaces.empty());
pBufs.registerSend(nbrProci, !patchFaces.empty());
if (debug)
{
@ -595,14 +589,8 @@ void Foam::globalPoints::sendPatchPoints
}
}
// Eliminate unnecessary sends
forAllConstIters(isActiveSend, iter)
{
if (!iter.val())
{
pBufs.clearSend(iter.key());
}
}
// Discard unnecessary (unregistered) sends
pBufs.clearUnregistered();
}

View File

@ -132,14 +132,16 @@ void Foam::syncTools::syncPointMap
DynamicList<label> neighbProcs(patches.nProcessorPatches());
PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
// Sample and send.
// Reduce communication by only sending non-zero data,
// but with multiply-connected processor/processor
// (eg, processorCyclic) also need to send zero information
// to keep things synchronised
// If data needs to be sent (index corresponding to neighbProcs)
DynamicList<bool> isActiveSend(neighbProcs.capacity());
// Initialize for registerSend() bookkeeping
pBufs.initRegisterSend();
// Sample and send.
for (const polyPatch& pp : patches)
{
@ -168,45 +170,22 @@ void Foam::syncTools::syncPointMap
}
}
const bool hasSendData = (!patchInfo.empty());
// Neighbour connectivity (push_uniq)
// - record if send is required (non-empty data)
{
label nbrIndex = neighbProcs.find(nbrProci);
if (nbrIndex < 0)
{
nbrIndex = neighbProcs.size();
neighbProcs.push_back(nbrProci);
isActiveSend.push_back(false);
}
if (hasSendData)
{
isActiveSend[nbrIndex] = true;
}
}
// Neighbour connectivity
neighbProcs.push_uniq(nbrProci);
// Send to neighbour
{
UOPstream toNbr(nbrProci, pBufs);
toNbr << patchInfo;
// Record if send is required (data are non-zero)
pBufs.registerSend(nbrProci, !patchInfo.empty());
}
}
}
// Eliminate unnecessary sends
forAll(neighbProcs, nbrIndex)
{
if (!isActiveSend[nbrIndex])
{
pBufs.clearSend(neighbProcs[nbrIndex]);
}
}
// Limit exchange to involved procs
// Limit exchange to involved procs.
// - automatically discards unnecessary (unregistered) sends
pBufs.finishedNeighbourSends(neighbProcs);
@ -426,14 +405,16 @@ void Foam::syncTools::syncEdgeMap
DynamicList<label> neighbProcs(patches.nProcessorPatches());
PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
// Sample and send.
// Reduce communication by only sending non-zero data,
// but with multiply-connected processor/processor
// (eg, processorCyclic) also need to send zero information
// to keep things synchronised
// If data needs to be sent (index corresponding to neighbProcs)
DynamicList<bool> isActiveSend(neighbProcs.capacity());
// Initialize for registerSend() bookkeeping
pBufs.initRegisterSend();
// Sample and send.
for (const polyPatch& pp : patches)
{
@ -465,44 +446,22 @@ void Foam::syncTools::syncEdgeMap
}
const bool hasSendData = (!patchInfo.empty());
// Neighbour connectivity (push_uniq)
// and record if send is required (non-empty data)
{
label nbrIndex = neighbProcs.find(nbrProci);
if (nbrIndex < 0)
{
nbrIndex = neighbProcs.size();
neighbProcs.push_back(nbrProci);
isActiveSend.push_back(false);
}
if (hasSendData)
{
isActiveSend[nbrIndex] = true;
}
}
// Neighbour connectivity
neighbProcs.push_uniq(nbrProci);
// Send to neighbour
{
UOPstream toNbr(nbrProci, pBufs);
toNbr << patchInfo;
// Record if send is required (data are non-zero)
pBufs.registerSend(nbrProci, !patchInfo.empty());
}
}
}
// Eliminate unnecessary sends
forAll(neighbProcs, nbrIndex)
{
if (!isActiveSend[nbrIndex])
{
pBufs.clearSend(neighbProcs[nbrIndex]);
}
}
// Limit exchange to involved procs
// - automatically discards unnecessary (unregistered) sends
pBufs.finishedNeighbourSends(neighbProcs);

View File

@ -529,21 +529,19 @@ void Foam::FaceCellWave<Type, TrackingData>::handleProcPatches()
// Which processors this processor is connected to
const labelList& neighbourProcs = pData.topology().procNeighbours();
// Reset buffers
pBufs_.clear();
// Information to send:
DynamicList<Type> sendFacesInfo;
DynamicList<label> sendFaces;
// Reduce communication by only sending non-zero data,
// but with multiply-connected processor/processor
// (eg, processorCyclic) also need to send zero information
// to keep things synchronised
// If data needs to be sent (index corresponding to neighbourProcs)
List<bool> isActiveSend(neighbourProcs.size(), false);
// Reset buffers, initialize for registerSend() bookkeeping
pBufs_.clear();
pBufs_.initRegisterSend();
// Information to send
DynamicList<Type> sendFacesInfo;
DynamicList<label> sendFaces;
for (const label patchi : procPatches)
{
@ -579,21 +577,14 @@ void Foam::FaceCellWave<Type, TrackingData>::handleProcPatches()
sendFacesInfo
);
// Record if send is required (non-empty data)
if (!sendFaces.empty())
{
const label nbrIndex = neighbourProcs.find(nbrProci);
if (nbrIndex >= 0) // Safety check (should be unnecessary)
{
isActiveSend[nbrIndex] = true;
}
}
// Send to neighbour
{
UOPstream toNbr(nbrProci, pBufs_);
toNbr << sendFaces << sendFacesInfo;
// Record if send is required (data are non-zero)
pBufs_.registerSend(nbrProci, !sendFaces.empty());
if (debug & 2)
{
Pout<< " Processor patch " << patchi << ' ' << procPatch.name()
@ -603,16 +594,8 @@ void Foam::FaceCellWave<Type, TrackingData>::handleProcPatches()
}
}
// Eliminate unnecessary sends
forAll(neighbourProcs, nbrIndex)
{
if (!isActiveSend[nbrIndex])
{
pBufs_.clearSend(neighbourProcs[nbrIndex]);
}
}
// Limit exchange to involved procs
// - automatically discards unnecessary (unregistered) sends
pBufs_.finishedNeighbourSends(neighbourProcs);

View File

@ -313,21 +313,20 @@ void Foam::PointEdgeWave<Type, TrackingData>::handleProcPatches()
// Which processors this processor is connected to
const labelList& neighbourProcs = pData.topology().procNeighbours();
// Reset buffers
pBufs_.clear();
DynamicList<Type> patchInfo;
DynamicList<label> thisPoints;
DynamicList<label> nbrPoints;
// Reduce communication by only sending non-zero data,
// but with multiply-connected processor/processor
// (eg, processorCyclic) also need to send zero information
// to keep things synchronised
// If data needs to be sent (index corresponding to neighbourProcs)
List<bool> isActiveSend(neighbourProcs.size(), false);
// Reset buffers, initialize for registerSend() bookkeeping
pBufs_.clear();
pBufs_.initRegisterSend();
// Information to send
DynamicList<Type> patchInfo;
DynamicList<label> thisPoints;
DynamicList<label> nbrPoints;
for (const label patchi : procPatches)
{
@ -361,21 +360,14 @@ void Foam::PointEdgeWave<Type, TrackingData>::handleProcPatches()
// Adapt for leaving domain
leaveDomain(procPatch, thisPoints, patchInfo);
// Record if send is required (non-empty data)
if (!patchInfo.empty())
{
const label nbrIndex = neighbourProcs.find(nbrProci);
if (nbrIndex >= 0) // Safety check (should be unnecessary)
{
isActiveSend[nbrIndex] = true;
}
}
// Send to neighbour
{
UOPstream toNbr(nbrProci, pBufs_);
toNbr << nbrPoints << patchInfo;
// Record if send is required (data are non-zero)
pBufs_.registerSend(nbrProci, !patchInfo.empty());
//if (debug & 2)
//{
// Pout<< "Processor patch " << patchi << ' ' << procPatch.name()
@ -385,16 +377,8 @@ void Foam::PointEdgeWave<Type, TrackingData>::handleProcPatches()
}
}
// Eliminate unnecessary sends
forAll(neighbourProcs, nbrIndex)
{
if (!isActiveSend[nbrIndex])
{
pBufs_.clearSend(neighbourProcs[nbrIndex]);
}
}
// Limit exchange to involved procs
// - automatically discards unnecessary (unregistered) sends
pBufs_.finishedNeighbourSends(neighbourProcs);