ENH: improve send/recv robustness in the presence of processorCyclic (#2814)

- the changes introduced in f215ad15d1 aim to reduce unnecessary
  point-to-point communication. However, if processorCyclic boundaries
  are also involved, there are multiple connections between a given
  pair of processors, and simply skipping empty sends will cause
  synchronization problems.

  Eg,

    On the send side:
        patch0to1_a has zero-size data (doesn't send) while patch0to1_b
        does send (to the same processor).

    On the receive side:
        patch1to0_a receives the data intended for patch1to0_b!
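
  To see why, note that all messages between a given pair of ranks
  travel over what is effectively one ordered channel, so receives are
  matched to sends purely by position. A minimal standalone sketch of
  the mismatch (plain C++ with a FIFO standing in for the channel; not
  the OpenFOAM API):

    #include <cassert>
    #include <queue>
    #include <string>

    int main()
    {
        // One ordered channel models all rank 0 -> rank 1 traffic
        std::queue<std::string> channel;

        // Two patches connect rank 0 to rank 1 (processor + processorCyclic).
        // patch0to1_a is empty and skipped; patch0to1_b sends:
        // channel.push("for patch1to0_a");   // skipped: empty send
        channel.push("for patch1to0_b");

        // Rank 1 still receives patch by patch, in patch order, so the
        // receive posted for patch1to0_a consumes patch1to0_b's message
        assert(channel.front() == "for patch1to0_b");   // wrong pairing
        channel.pop();

        // ...and patch1to0_b now finds the channel empty (desynchronised)
        assert(channel.empty());
        return 0;
    }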

  Remedy
  ======
     Stream all of the send data into PstreamBuffers (regardless of
     whether it is empty or non-empty), but track each send as a
     single bit: empty (0) or non-empty (1).

     Afterwards, reset the buffer slots that only received empty data.
     This adds a small local overhead but avoids as much communication
     as possible.
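
  The same toy model with the remedy applied (standalone C++; the
  std::map stands in for PstreamBuffers, and isActiveSend plus the
  final erase() mirror the Map<int> / pBufs.clearSend() usage in the
  diffs below):

    #include <iostream>
    #include <map>
    #include <string>
    #include <utility>
    #include <vector>

    int main()
    {
        // Mock of PstreamBuffers: one send buffer per destination rank
        std::map<int, std::string> sendBuf;

        // Has non-zero data sent (one flag per destination rank)
        std::map<int, int> isActiveSend;

        // (rank, payload) per patch; rank 1 is reached via two patches
        const std::vector<std::pair<int, std::string>> patchSends =
        {
            {1, ""},          // patch0to1_a: empty, but still streamed
            {1, "payload_b"}, // patch0to1_b: non-empty
            {2, ""},          // a rank that only ever gets empty data
        };

        for (const auto& [nbrProci, payload] : patchSends)
        {
            // Always stream: even an empty payload serialises its size,
            // so it occupies a message slot and receives stay paired
            sendBuf[nbrProci] += '[' + std::to_string(payload.size())
                + ':' + payload + ']';

            // Track the send as a bit: empty (0) or non-empty (1)
            isActiveSend[nbrProci] |= int(!payload.empty());
        }

        // Reset the slots that only ever received empty data
        // (the role of pBufs.clearSend(proci) in the diffs below)
        for (const auto& [proci, active] : isActiveSend)
        {
            if (!active)
            {
                sendBuf.erase(proci);
            }
        }

        // Only rank 1 is actually communicated with, and its two
        // message slots remain in patch order
        for (const auto& [proci, buf] : sendBuf)
        {
            std::cout << "send to rank " << proci << ": " << buf << '\n';
        }
        return 0;
    }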
Author: Mark Olesen, 2023-06-21 20:38:44 +02:00 (committed by Andrew Heather)
parent 0411a75e24
commit 3dee3438d5
6 changed files with 187 additions and 66 deletions


@@ -487,6 +487,9 @@ void Foam::globalPoints::sendPatchPoints
     const polyBoundaryMesh& patches = mesh_.boundaryMesh();
     const labelPairList& patchInfo = globalTransforms_.patchTransformSign();
 
+    // Reset send/recv information
+    pBufs.clear();
+
     // Information to send:
     // The patch face
@@ -499,6 +502,19 @@ void Foam::globalPoints::sendPatchPoints
     DynamicList<labelPairList> allInfo;
 
+    // Reduce communication by only sending non-zero data,
+    // but with multiply-connected processor/processor
+    // (eg, processorCyclic) also need to send zero information
+    // to keep things synchronised
+
+    // Has non-zero data sent
+    Map<int> isActiveSend(0);
+    if (UPstream::parRun())
+    {
+        isActiveSend.resize(2*min(patches.size(), pBufs.nProcs()));
+    }
+
     forAll(patches, patchi)
     {
         const polyPatch& pp = patches[patchi];
@@ -508,7 +524,7 @@ void Foam::globalPoints::sendPatchPoints
 
         if
         (
-            (Pstream::parRun() && isA<processorPolyPatch>(pp))
+            (UPstream::parRun() && isA<processorPolyPatch>(pp))
         &&  (mergeSeparated || patchInfo[patchi].first() == -1)
         )
         {
@@ -561,21 +577,32 @@ void Foam::globalPoints::sendPatchPoints
             }
 
-            if (!patchFaces.empty())
+            // Send to neighbour
             {
+                UOPstream toNbr(nbrProci, pBufs);
+                toNbr << patchFaces << indexInFace << allInfo;
+
+                // Record if send is required (data are non-zero)
+                isActiveSend(nbrProci) |= int(!patchFaces.empty());
+
                 if (debug)
                 {
-                    Pout<< "    Sending from " << pp.name() << " to proc:"
+                    Pout<< "Sending from " << pp.name() << " to proc:"
                         << nbrProci << " point information:"
                         << patchFaces.size() << endl;
                 }
-
-                UOPstream toNeighbour(nbrProci, pBufs);
-                toNeighbour << patchFaces << indexInFace << allInfo;
             }
         }
     }
+
+    // Eliminate unnecessary sends
+    forAllConstIters(isActiveSend, iter)
+    {
+        if (!iter.val())
+        {
+            pBufs.clearSend(iter.key());
+        }
+    }
 }
@@ -606,7 +633,7 @@ void Foam::globalPoints::receivePatchPoints
 
         if
         (
-            (Pstream::parRun() && isA<processorPolyPatch>(pp))
+            (UPstream::parRun() && isA<processorPolyPatch>(pp))
         &&  (mergeSeparated || patchInfo[patchi].first() == -1)
         )
         {
@@ -615,6 +642,7 @@ void Foam::globalPoints::receivePatchPoints
 
             if (!pBufs.recvDataCount(nbrProci))
             {
+                // Nothing to receive
                 continue;
             }
@@ -623,8 +651,8 @@ void Foam::globalPoints::receivePatchPoints
             List<labelPairList> nbrInfo;
             {
-                UIPstream fromNeighbour(nbrProci, pBufs);
-                fromNeighbour >> patchFaces >> indexInFace >> nbrInfo;
+                UIPstream fromNbr(nbrProci, pBufs);
+                fromNbr >> patchFaces >> indexInFace >> nbrInfo;
             }
 
             if (debug)
@@ -929,8 +957,6 @@ void Foam::globalPoints::calculateSharedPoints
     // Do one exchange iteration to get neighbour points.
     {
-        pBufs.clear();
-
         sendPatchPoints
         (
             mergeSeparated,
@@ -962,8 +988,6 @@ void Foam::globalPoints::calculateSharedPoints
     do
     {
-        pBufs.clear();
-
         sendPatchPoints
         (
             mergeSeparated,


@@ -130,7 +130,15 @@ void Foam::syncTools::syncPointMap
     DynamicList<label> neighbProcs;
     PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
 
-    // Send
+    // Sample and send.
+    // Reduce communication by only sending non-zero data,
+    // but with multiply-connected processor/processor
+    // (eg, processorCyclic) also need to send zero information
+    // to keep things synchronised
+
+    // If data needs to be sent
+    Map<int> isActiveSend(2*min(patches.size(), pBufs.nProcs()));
+
     for (const polyPatch& pp : patches)
     {
         const auto* ppp = isA<processorPolyPatch>(pp);
@@ -140,10 +148,10 @@ void Foam::syncTools::syncPointMap
         const auto& procPatch = *ppp;
         const label nbrProci = procPatch.neighbProcNo();
 
-        neighbProcs.push_back(nbrProci);
+        // Neighbour connectivity
+        neighbProcs.push_uniq(nbrProci);
 
         // Get data per patchPoint in neighbouring point numbers.
 
         const labelList& meshPts = procPatch.meshPoints();
         const labelList& nbrPts = procPatch.neighbPoints();
@@ -161,14 +169,27 @@ void Foam::syncTools::syncPointMap
             }
         }
 
-        if (!patchInfo.empty())
+        // Send to neighbour
         {
             UOPstream toNbr(nbrProci, pBufs);
             toNbr << patchInfo;
+
+            // Record if send is required (non-empty data)
+            isActiveSend(nbrProci) |= int(!patchInfo.empty());
         }
     }
 
+    // Eliminate unnecessary sends
+    forAllConstIters(isActiveSend, iter)
+    {
+        if (!iter.val())
+        {
+            pBufs.clearSend(iter.key());
+        }
+    }
+
     // Limit exchange to involved procs
     pBufs.finishedNeighbourSends(neighbProcs);
@@ -185,6 +206,7 @@ void Foam::syncTools::syncPointMap
 
         if (!pBufs.recvDataCount(nbrProci))
         {
+            // Nothing to receive
             continue;
         }
@@ -386,7 +408,15 @@ void Foam::syncTools::syncEdgeMap
     DynamicList<label> neighbProcs;
     PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
 
-    // Send
+    // Sample and send.
+    // Reduce communication by only sending non-zero data,
+    // but with multiply-connected processor/processor
+    // (eg, processorCyclic) also need to send zero information
+    // to keep things synchronised
+
+    // If data needs to be sent
+    Map<int> isActiveSend(2*min(patches.size(), pBufs.nProcs()));
+
     for (const polyPatch& pp : patches)
     {
         const auto* ppp = isA<processorPolyPatch>(pp);
@@ -396,10 +426,10 @@ void Foam::syncTools::syncEdgeMap
         const auto& procPatch = *ppp;
         const label nbrProci = procPatch.neighbProcNo();
 
-        neighbProcs.push_back(nbrProci);
+        // Neighbour connectivity
+        neighbProcs.push_uniq(nbrProci);
 
         // Get data per patch edge in neighbouring edge.
 
         const edgeList& edges = procPatch.edges();
         const labelList& meshPts = procPatch.meshPoints();
         const labelList& nbrPts = procPatch.neighbPoints();
@@ -419,14 +449,26 @@ void Foam::syncTools::syncEdgeMap
             }
         }
 
-        if (!patchInfo.empty())
+        // Send to neighbour
         {
             UOPstream toNbr(nbrProci, pBufs);
             toNbr << patchInfo;
+
+            // Record if send is required (non-empty data)
+            isActiveSend(nbrProci) |= int(!patchInfo.empty());
         }
     }
 
+    // Eliminate unnecessary sends
+    forAllConstIters(isActiveSend, iter)
+    {
+        if (!iter.val())
+        {
+            pBufs.clearSend(iter.key());
+        }
+    }
+
     // Limit exchange to involved procs
     pBufs.finishedNeighbourSends(neighbProcs);
@@ -443,6 +485,7 @@ void Foam::syncTools::syncEdgeMap
 
         if (!pBufs.recvDataCount(nbrProci))
         {
+            // Nothing to receive
             continue;
         }
@@ -1138,7 +1181,8 @@ void Foam::syncTools::syncBoundaryFaceList
             const auto& procPatch = *ppp;
             const label nbrProci = procPatch.neighbProcNo();
 
-            neighbProcs.push_back(nbrProci);
+            // Neighbour connectivity
+            neighbProcs.push_uniq(nbrProci);
 
             const SubList<T> fld
             (


@@ -172,24 +172,23 @@ void Foam::fvMeshSubset::doCoupledPatches
     label nUncoupled = 0;
 
-    if (syncPar && Pstream::parRun())
+    if (syncPar && UPstream::parRun())
     {
-        PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
+        PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
 
         // Send face usage across processor patches
-        for (const polyPatch& pp : oldPatches)
+        if (!nCellsUsingFace.empty())
         {
-            const auto* procPatch = isA<processorPolyPatch>(pp);
-
-            if (procPatch)
+            for (const polyPatch& pp : oldPatches)
             {
-                const label nbrProci = procPatch->neighbProcNo();
+                const auto* procPatch = isA<processorPolyPatch>(pp);
 
-                if (!nCellsUsingFace.empty())
+                if (procPatch)
                 {
-                    UOPstream toNeighbour(nbrProci, pBufs);
+                    const label nbrProci = procPatch->neighbProcNo();
 
-                    toNeighbour <<
+                    UOPstream toNbr(nbrProci, pBufs);
+                    toNbr <<
                         SubList<label>(nCellsUsingFace, pp.size(), pp.start());
                 }
             }
@@ -208,19 +207,18 @@ void Foam::fvMeshSubset::doCoupledPatches
                 if (!pBufs.recvDataCount(nbrProci))
                 {
+                    // Nothing to receive
                     continue;
                 }
 
-                UIPstream fromNeighbour(nbrProci, pBufs);
-                const labelList nbrList(fromNeighbour);
-
-                // Combine with this side.
-                if (!nCellsUsingFace.empty())
+                labelList nbrCellsUsingFace;
                 {
-                    const labelList& nbrCellsUsingFace(nbrList);
+                    UIPstream fromNbr(nbrProci, pBufs);
+                    fromNbr >> nbrCellsUsingFace;
+                }
 
+                if (!nCellsUsingFace.empty() && !nbrCellsUsingFace.empty())
+                {
+                    // Combine with this side.
                     forAll(pp, i)


@@ -532,6 +532,19 @@ void Foam::FaceCellWave<Type, TrackingData>::handleProcPatches()
     // Reset buffers
     pBufs_.clear();
 
+    // Information to send:
+    DynamicList<Type> sendFacesInfo;
+    DynamicList<label> sendFaces;
+
+    // Reduce communication by only sending non-zero data,
+    // but with multiply-connected processor/processor
+    // (eg, processorCyclic) also need to send zero information
+    // to keep things synchronised
+
+    // If data needs to be sent
+    Map<int> isActiveSend(2*neighbourProcs.size());
+
     for (const label patchi : procPatches)
     {
         const auto& procPatch =
@@ -539,13 +552,12 @@ void Foam::FaceCellWave<Type, TrackingData>::handleProcPatches()
 
         const label nbrProci = procPatch.neighbProcNo();
 
-        // Allocate buffers
-        label nSendFaces;
-        labelList sendFaces(procPatch.size());
-        List<Type> sendFacesInfo(procPatch.size());
+        // Resize buffers
+        sendFaces.resize_nocopy(procPatch.size());
+        sendFacesInfo.resize_nocopy(procPatch.size());
 
         // Determine which faces changed on current patch
-        nSendFaces = getChangedPatchFaces
+        const label nSendFaces = getChangedPatchFaces
         (
             procPatch,
             0,
@@ -554,6 +566,10 @@ void Foam::FaceCellWave<Type, TrackingData>::handleProcPatches()
             sendFacesInfo
         );
 
+        // Shrink
+        sendFaces.resize(nSendFaces);
+        sendFacesInfo.resize(nSendFaces);
+
         // Adapt wallInfo for leaving domain
         leaveDomain
         (
@@ -563,19 +579,29 @@ void Foam::FaceCellWave<Type, TrackingData>::handleProcPatches()
             sendFacesInfo
         );
 
-        if (nSendFaces)
+        // Send to neighbour
         {
+            UOPstream toNbr(nbrProci, pBufs_);
+            toNbr << sendFaces << sendFacesInfo;
+
+            // Record if send is required (non-empty data)
+            isActiveSend(nbrProci) |= int(!sendFaces.empty());
+
             if (debug & 2)
             {
                 Pout<< " Processor patch " << patchi << ' ' << procPatch.name()
-                    << " send:" << nSendFaces << " to proc:" << nbrProci
+                    << " send:" << sendFaces.size() << " to proc:" << nbrProci
                     << endl;
             }
-
-            UOPstream os(nbrProci, pBufs_);
-            os
-                << SubList<label>(sendFaces, nSendFaces)
-                << SubList<Type>(sendFacesInfo, nSendFaces);
         }
     }
+
+    // Eliminate unnecessary sends
+    forAllConstIters(isActiveSend, iter)
+    {
+        if (!iter.val())
+        {
+            pBufs_.clearSend(iter.key());
+        }
+    }
@@ -591,6 +617,7 @@ void Foam::FaceCellWave<Type, TrackingData>::handleProcPatches()
 
         if (!pBufs_.recvDataCount(nbrProci))
         {
+            // Nothing to receive
            continue;
         }
@@ -602,6 +629,8 @@ void Foam::FaceCellWave<Type, TrackingData>::handleProcPatches()
             is >> receiveFaces >> receiveFacesInfo;
         }
 
+        const label nReceiveFaces = receiveFaces.size();
+
         if (debug & 2)
         {
             Pout<< " Processor patch " << patchi << ' ' << procPatch.name()
@@ -615,7 +644,7 @@ void Foam::FaceCellWave<Type, TrackingData>::handleProcPatches()
             transform
             (
                 procPatch.forwardT(),
-                receiveFaces.size(),
+                nReceiveFaces,
                 receiveFacesInfo
             );
         }
@@ -624,7 +653,7 @@ void Foam::FaceCellWave<Type, TrackingData>::handleProcPatches()
         enterDomain
         (
             procPatch,
-            receiveFaces.size(),
+            nReceiveFaces,
             receiveFaces,
             receiveFacesInfo
         );
@@ -633,7 +662,7 @@ void Foam::FaceCellWave<Type, TrackingData>::handleProcPatches()
         mergeFaceInfo
         (
             procPatch,
-            receiveFaces.size(),
+            nReceiveFaces,
             receiveFaces,
             receiveFacesInfo
         );
@@ -656,12 +685,11 @@ void Foam::FaceCellWave<Type, TrackingData>::handleCyclicPatches()
         const auto& nbrPatch = cycPatch.neighbPatch();
 
         // Allocate buffers
-        label nReceiveFaces;
         labelList receiveFaces(patch.size());
         List<Type> receiveFacesInfo(patch.size());
 
         // Determine which faces changed
-        nReceiveFaces = getChangedPatchFaces
+        const label nReceiveFaces = getChangedPatchFaces
         (
             nbrPatch,
             0,


@@ -320,6 +320,15 @@ void Foam::PointEdgeWave<Type, TrackingData>::handleProcPatches()
     DynamicList<label> thisPoints;
     DynamicList<label> nbrPoints;
 
+    // Reduce communication by only sending non-zero data,
+    // but with multiply-connected processor/processor
+    // (eg, processorCyclic) also need to send zero information
+    // to keep things synchronised
+
+    // If data needs to be sent
+    Map<int> isActiveSend(2*neighbourProcs.size());
+
     for (const label patchi : procPatches)
     {
         const auto& procPatch =
@@ -329,8 +338,10 @@ void Foam::PointEdgeWave<Type, TrackingData>::handleProcPatches()
 
         patchInfo.clear();
         patchInfo.reserve(procPatch.nPoints());
 
         thisPoints.clear();
+        thisPoints.reserve(procPatch.nPoints());
 
         nbrPoints.clear();
+        nbrPoints.reserve(procPatch.nPoints());
@@ -350,20 +361,31 @@ void Foam::PointEdgeWave<Type, TrackingData>::handleProcPatches()
 
         // Adapt for leaving domain
         leaveDomain(procPatch, thisPoints, patchInfo);
 
-        if (patchInfo.size())
+        // Send to neighbour
         {
+            UOPstream toNbr(nbrProci, pBufs_);
+            toNbr << nbrPoints << patchInfo;
+
+            // Record if send is required (non-empty data)
+            isActiveSend(nbrProci) |= int(!patchInfo.empty());
+
             //if (debug & 2)
             //{
             //    Pout<< "Processor patch " << patchi << ' ' << procPatch.name()
             //        << " send:" << patchInfo.size()
             //        << " to proc:" << nbrProci << endl;
             //}
-
-            UOPstream os(nbrProci, pBufs_);
-            os << nbrPoints << patchInfo;
         }
     }
 
+    // Eliminate unnecessary sends
+    forAllConstIters(isActiveSend, iter)
+    {
+        if (!iter.val())
+        {
+            pBufs_.clearSend(iter.key());
+        }
+    }
+
     // Finished sends
     pBufs_.finishedNeighbourSends(neighbourProcs);
@@ -382,6 +404,7 @@ void Foam::PointEdgeWave<Type, TrackingData>::handleProcPatches()
 
         if (!pBufs_.recvDataCount(nbrProci))
         {
+            // Nothing to receive
            continue;
         }


@@ -497,7 +497,7 @@ Foam::DynamicList<Foam::label> Foam::isoAdvection::syncProcPatches
     if (Pstream::parRun())
     {
         DynamicList<label> neighProcs;
-        PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
+        PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
 
         // Send
         for (const label patchi : procPatchLabels_)
@@ -506,8 +506,8 @@ Foam::DynamicList<Foam::label> Foam::isoAdvection::syncProcPatches
                 refCast<const processorPolyPatch>(patches[patchi]);
             const label nbrProci = procPatch.neighbProcNo();
 
-            neighProcs.append(nbrProci);
-            UOPstream toNbr(nbrProci, pBufs);
+            // Neighbour connectivity
+            neighProcs.push_uniq(nbrProci);
 
             const scalarField& pFlux = dVf.boundaryField()[patchi];
             const List<label>& surfCellFacesOnProcPatch =
@@ -519,6 +519,7 @@ Foam::DynamicList<Foam::label> Foam::isoAdvection::syncProcPatches
                 surfCellFacesOnProcPatch
             );
 
+            UOPstream toNbr(nbrProci, pBufs);
             toNbr << surfCellFacesOnProcPatch << dVfPatch;
         }
@@ -533,11 +534,14 @@ Foam::DynamicList<Foam::label> Foam::isoAdvection::syncProcPatches
                 refCast<const processorPolyPatch>(patches[patchi]);
             const label nbrProci = procPatch.neighbProcNo();
 
-            UIPstream fromNeighb(nbrProci, pBufs);
             List<label> faceIDs;
             List<scalar> nbrdVfs;
-            fromNeighb >> faceIDs >> nbrdVfs;
+
+            {
+                UIPstream fromNbr(nbrProci, pBufs);
+                fromNbr >> faceIDs >> nbrdVfs;
+            }
 
             if (returnSyncedFaces)
             {
                 List<label> syncedFaceI(faceIDs);