ENH: skip zero data sends via PstreamBuffers

- in several cases can use the PstreamBuffers recvDataCount(proci)
  on the receiving part of the logic
This commit is contained in:
Mark Olesen 2023-02-14 20:50:53 +01:00
parent fb69a54bc3
commit f215ad15d1
11 changed files with 210 additions and 150 deletions

View File

@ -487,6 +487,18 @@ void Foam::globalPoints::sendPatchPoints
const polyBoundaryMesh& patches = mesh_.boundaryMesh();
const labelPairList& patchInfo = globalTransforms_.patchTransformSign();
// Information to send:
// The patch face
DynamicList<label> patchFaces;
// Index in patch face
DynamicList<label> indexInFace;
// All information I currently hold about the patchPoint
DynamicList<labelPairList> allInfo;
forAll(patches, patchi)
{
const polyPatch& pp = patches[patchi];
@ -500,17 +512,17 @@ void Foam::globalPoints::sendPatchPoints
&& (mergeSeparated || patchInfo[patchi].first() == -1)
)
{
const processorPolyPatch& procPatch =
refCast<const processorPolyPatch>(pp);
const auto& procPatch = refCast<const processorPolyPatch>(pp);
const label nbrProci = procPatch.neighbProcNo();
// Information to send:
// patch face
DynamicList<label> patchFaces(pp.nPoints());
// index in patch face
DynamicList<label> indexInFace(pp.nPoints());
// all information I currently hold about this patchPoint
DynamicList<labelPairList> allInfo(pp.nPoints());
patchFaces.clear();
patchFaces.reserve(pp.nPoints());
indexInFace.clear();
indexInFace.reserve(pp.nPoints());
allInfo.clear();
allInfo.reserve(pp.nPoints());
// Now collect information on all points mentioned in
// changedPoints. Note that these points only should occur on
@ -548,16 +560,20 @@ void Foam::globalPoints::sendPatchPoints
}
}
// Send to neighbour
if (debug)
{
Pout<< " Sending from " << pp.name() << " to "
<< procPatch.neighbProcNo() << " point information:"
<< patchFaces.size() << endl;
}
UOPstream toNeighbour(procPatch.neighbProcNo(), pBufs);
toNeighbour << patchFaces << indexInFace << allInfo;
if (!patchFaces.empty())
{
// Send to neighbour
if (debug)
{
Pout<< " Sending from " << pp.name() << " to proc:"
<< nbrProci << " point information:"
<< patchFaces.size() << endl;
}
UOPstream toNeighbour(nbrProci, pBufs);
toNeighbour << patchFaces << indexInFace << allInfo;
}
}
}
}
@ -594,15 +610,20 @@ void Foam::globalPoints::receivePatchPoints
&& (mergeSeparated || patchInfo[patchi].first() == -1)
)
{
const processorPolyPatch& procPatch =
refCast<const processorPolyPatch>(pp);
const auto& procPatch = refCast<const processorPolyPatch>(pp);
const label nbrProci = procPatch.neighbProcNo();
if (!pBufs.recvDataCount(nbrProci))
{
continue;
}
labelList patchFaces;
labelList indexInFace;
List<labelPairList> nbrInfo;
{
UIPstream fromNeighbour(procPatch.neighbProcNo(), pBufs);
UIPstream fromNeighbour(nbrProci, pBufs);
fromNeighbour >> patchFaces >> indexInFace >> nbrInfo;
}
@ -610,7 +631,7 @@ void Foam::globalPoints::receivePatchPoints
{
Pout<< " On " << pp.name()
<< " Received from "
<< procPatch.neighbProcNo() << " point information:"
<< nbrProci << " point information:"
<< patchFaces.size() << endl;
}

View File

@ -140,6 +140,8 @@ void Foam::syncTools::syncPointMap
const auto& procPatch = *ppp;
const label nbrProci = procPatch.neighbProcNo();
neighbProcs.append(nbrProci);
// Get data per patchPoint in neighbouring point numbers.
const labelList& meshPts = procPatch.meshPoints();
@ -159,15 +161,18 @@ void Foam::syncTools::syncPointMap
}
}
neighbProcs.append(nbrProci);
UOPstream toNbr(nbrProci, pBufs);
toNbr << patchInfo;
if (!patchInfo.empty())
{
UOPstream toNbr(nbrProci, pBufs);
toNbr << patchInfo;
}
}
}
// Limit exchange to involved procs
pBufs.finishedNeighbourSends(neighbProcs);
// Receive and combine.
for (const polyPatch& pp : patches)
{
@ -178,8 +183,16 @@ void Foam::syncTools::syncPointMap
const auto& procPatch = *ppp;
const label nbrProci = procPatch.neighbProcNo();
UIPstream fromNbr(nbrProci, pBufs);
Map<T> nbrPatchInfo(fromNbr);
if (!pBufs.recvDataCount(nbrProci))
{
continue;
}
Map<T> nbrPatchInfo(0);
{
UIPstream fromNbr(nbrProci, pBufs);
fromNbr >> nbrPatchInfo;
}
// Transform
top(procPatch, nbrPatchInfo);
@ -383,6 +396,8 @@ void Foam::syncTools::syncEdgeMap
const auto& procPatch = *ppp;
const label nbrProci = procPatch.neighbProcNo();
neighbProcs.append(nbrProci);
// Get data per patch edge in neighbouring edge.
const edgeList& edges = procPatch.edges();
@ -404,9 +419,11 @@ void Foam::syncTools::syncEdgeMap
}
}
neighbProcs.append(nbrProci);
UOPstream toNbr(nbrProci, pBufs);
toNbr << patchInfo;
if (!patchInfo.empty())
{
UOPstream toNbr(nbrProci, pBufs);
toNbr << patchInfo;
}
}
}
@ -422,10 +439,16 @@ void Foam::syncTools::syncEdgeMap
if (ppp && pp.nEdges())
{
const auto& procPatch = *ppp;
const label nbrProci = procPatch.neighbProcNo();
EdgeMap<T> nbrPatchInfo;
if (!pBufs.recvDataCount(nbrProci))
{
UIPstream fromNbr(procPatch.neighbProcNo(), pBufs);
continue;
}
EdgeMap<T> nbrPatchInfo(0);
{
UIPstream fromNbr(nbrProci, pBufs);
fromNbr >> nbrPatchInfo;
}
@ -1115,6 +1138,8 @@ void Foam::syncTools::syncBoundaryFaceList
const auto& procPatch = *ppp;
const label nbrProci = procPatch.neighbProcNo();
neighbProcs.append(nbrProci);
const SubList<T> fld
(
faceValues,
@ -1122,7 +1147,6 @@ void Foam::syncTools::syncBoundaryFaceList
pp.start()-boundaryOffset
);
neighbProcs.append(nbrProci);
UOPstream toNbr(nbrProci, pBufs);
toNbr << fld;
}
@ -1140,11 +1164,13 @@ void Foam::syncTools::syncBoundaryFaceList
if (ppp && pp.size())
{
const auto& procPatch = *ppp;
const label nbrProci = procPatch.neighbProcNo();
List<T> recvFld(pp.size());
UIPstream fromNbr(procPatch.neighbProcNo(), pBufs);
fromNbr >> recvFld;
List<T> recvFld;
{
UIPstream fromNbr(nbrProci, pBufs);
fromNbr >> recvFld;
}
top(procPatch, recvFld);

View File

@ -318,8 +318,8 @@ void Foam::processorFaPatch::updateMesh(PstreamBuffers& pBufs)
if (Pstream::parRun())
{
labelList nbrPatchEdge(nPoints());
labelList nbrIndexInEdge(nPoints());
labelList nbrPatchEdge;
labelList nbrIndexInEdge;
{
// Note cannot predict exact size since edgeList not (yet) sent as

View File

@ -185,17 +185,13 @@ void Foam::fvMeshSubset::doCoupledPatches
{
const label nbrProci = procPatch->neighbProcNo();
UOPstream toNeighbour(nbrProci, pBufs);
if (!nCellsUsingFace.empty())
{
UOPstream toNeighbour(nbrProci, pBufs);
toNeighbour <<
SubList<label>(nCellsUsingFace, pp.size(), pp.start());
}
else
{
toNeighbour << labelList();
}
}
}
@ -210,6 +206,11 @@ void Foam::fvMeshSubset::doCoupledPatches
{
const label nbrProci = procPatch->neighbProcNo();
if (!pBufs.recvDataCount(nbrProci))
{
continue;
}
UIPstream fromNeighbour(nbrProci, pBufs);
const labelList nbrList(fromNeighbour);

View File

@ -246,11 +246,13 @@ void Foam::RecycleInteraction<CloudType>::postEvolve()
pBufs.finishedSends();
if (!returnReduceOr(pBufs.hasRecvData()))
{
// No parcels to recycle
return;
}
// Not looping, so no early exit needed
//
// if (!returnReduceOr(pBufs.hasRecvData()))
// {
// // No parcels to recycle
// return;
// }
// Retrieve from receive buffers
for (const int proci : pBufs.allProcs())

View File

@ -74,20 +74,28 @@ void Foam::advancingFrontAMI::distributePatches
List<labelList>& faceIDs
) const
{
faces.resize_nocopy(Pstream::nProcs());
points.resize_nocopy(Pstream::nProcs());
faceIDs.resize_nocopy(Pstream::nProcs());
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
for (const int domain : Pstream::allProcs())
{
const labelList& sendElems = map.subMap()[domain];
if (domain != Pstream::myProcNo() && sendElems.size())
if (sendElems.empty())
{
// Safety
faces[domain].clear();
points[domain].clear();
faceIDs[domain].clear();
}
else
{
faceList subFaces(UIndirectList<face>(pp, sendElems));
primitivePatch subPatch
(
SubList<face>(subFaces),
pp.points()
);
primitivePatch subPatch(SubList<face>(subFaces), pp.points());
if (debug & 2)
{
@ -95,42 +103,28 @@ void Foam::advancingFrontAMI::distributePatches
<< " sending faces " << subPatch.faceCentres() << endl;
}
UOPstream toDomain(domain, pBufs);
toDomain
<< subPatch.localFaces() << subPatch.localPoints()
<< gi.toGlobal(sendElems);
if (domain == Pstream::myProcNo())
{
// Do send/receive for myself
faces[domain] = subPatch.localFaces();
points[domain] = subPatch.localPoints();
faceIDs[domain] = gi.toGlobal(sendElems);
}
else
{
// Normal send
UOPstream str(domain, pBufs);
str
<< subPatch.localFaces()
<< subPatch.localPoints()
<< gi.toGlobal(sendElems);
}
}
}
// Start receiving
pBufs.finishedSends();
faces.setSize(Pstream::nProcs());
points.setSize(Pstream::nProcs());
faceIDs.setSize(Pstream::nProcs());
{
// Set up 'send' to myself
const labelList& sendElems = map.subMap()[Pstream::myProcNo()];
faceList subFaces(UIndirectList<face>(pp, sendElems));
primitivePatch subPatch
(
SubList<face>(subFaces),
pp.points()
);
// Receive
if (debug & 2)
{
Pout<< "distributePatches: to processor " << Pstream::myProcNo()
<< " sending faces " << subPatch.faceCentres() << endl;
}
faces[Pstream::myProcNo()] = subPatch.localFaces();
points[Pstream::myProcNo()] = subPatch.localPoints();
faceIDs[Pstream::myProcNo()] = gi.toGlobal(sendElems);
}
// Consume
for (const int domain : Pstream::allProcs())
{
@ -138,9 +132,9 @@ void Foam::advancingFrontAMI::distributePatches
if (domain != Pstream::myProcNo() && recvElems.size())
{
UIPstream str(domain, pBufs);
UIPstream is(domain, pBufs);
str >> faces[domain]
is >> faces[domain]
>> points[domain]
>> faceIDs[domain];
}

View File

@ -537,6 +537,8 @@ void Foam::FaceCellWave<Type, TrackingData>::handleProcPatches()
const auto& procPatch =
refCast<const processorPolyPatch>(mesh_.boundaryMesh()[patchi]);
const label nbrProci = procPatch.neighbProcNo();
// Allocate buffers
label nSendFaces;
labelList sendFaces(procPatch.size());
@ -561,45 +563,50 @@ void Foam::FaceCellWave<Type, TrackingData>::handleProcPatches()
sendFacesInfo
);
if (debug & 2)
if (nSendFaces)
{
Pout<< " Processor patch " << patchi << ' ' << procPatch.name()
<< " communicating with " << procPatch.neighbProcNo()
<< " Sending:" << nSendFaces
<< endl;
}
if (debug & 2)
{
Pout<< " Processor patch " << patchi << ' ' << procPatch.name()
<< " send:" << nSendFaces << " to proc:" << nbrProci
<< endl;
}
UOPstream toNeighbour(procPatch.neighbProcNo(), pBufs_);
//writeFaces(nSendFaces, sendFaces, sendFacesInfo, toNeighbour);
toNeighbour
<< SubList<label>(sendFaces, nSendFaces)
<< SubList<Type>(sendFacesInfo, nSendFaces);
UOPstream os(nbrProci, pBufs_);
os
<< SubList<label>(sendFaces, nSendFaces)
<< SubList<Type>(sendFacesInfo, nSendFaces);
}
}
// Finished sends
pBufs_.finishedNeighbourSends(neighbourProcs);
for (const label patchi : procPatches)
{
const auto& procPatch =
refCast<const processorPolyPatch>(mesh_.boundaryMesh()[patchi]);
// Allocate buffers
const label nbrProci = procPatch.neighbProcNo();
if (!pBufs_.recvDataCount(nbrProci))
{
continue;
}
labelList receiveFaces;
List<Type> receiveFacesInfo;
{
UIPstream fromNeighbour(procPatch.neighbProcNo(), pBufs_);
fromNeighbour >> receiveFaces >> receiveFacesInfo;
UIPstream is(nbrProci, pBufs_);
is >> receiveFaces >> receiveFacesInfo;
}
if (debug & 2)
{
Pout<< " Processor patch " << patchi << ' ' << procPatch.name()
<< " communicating with " << procPatch.neighbProcNo()
<< " Receiving:" << receiveFaces.size()
<< endl;
<< " recv:" << receiveFaces.size() << " from proci:"
<< nbrProci << endl;
}
// Apply transform to received data for non-parallel planes

View File

@ -325,6 +325,8 @@ void Foam::PointEdgeWave<Type, TrackingData>::handleProcPatches()
const auto& procPatch =
refCast<const processorPolyPatch>(mesh_.boundaryMesh()[patchi]);
const label nbrProci = procPatch.neighbProcNo();
patchInfo.clear();
patchInfo.reserve(procPatch.nPoints());
thisPoints.clear();
@ -348,21 +350,25 @@ void Foam::PointEdgeWave<Type, TrackingData>::handleProcPatches()
// Adapt for leaving domain
leaveDomain(procPatch, thisPoints, patchInfo);
//if (debug)
//{
// Pout<< "Processor patch " << patchi << ' ' << procPatch.name()
// << " communicating with " << procPatch.neighbProcNo()
// << " Sending:" << patchInfo.size() << endl;
//}
if (patchInfo.size())
{
//if (debug & 2)
//{
// Pout<< "Processor patch " << patchi << ' ' << procPatch.name()
// << " send:" << patchInfo.size()
// << " to proc:" << nbrProci << endl;
//}
UOPstream toNeighbour(procPatch.neighbProcNo(), pBufs_);
toNeighbour << nbrPoints << patchInfo;
UOPstream os(nbrProci, pBufs_);
os << nbrPoints << patchInfo;
}
}
// Finished sends
pBufs_.finishedNeighbourSends(neighbourProcs);
//
// 2. Receive all point info on processor patches.
//
@ -372,19 +378,25 @@ void Foam::PointEdgeWave<Type, TrackingData>::handleProcPatches()
const auto& procPatch =
refCast<const processorPolyPatch>(mesh_.boundaryMesh()[patchi]);
List<Type> patchInfo;
labelList patchPoints;
const label nbrProci = procPatch.neighbProcNo();
if (!pBufs_.recvDataCount(nbrProci))
{
UIPstream fromNeighbour(procPatch.neighbProcNo(), pBufs_);
fromNeighbour >> patchPoints >> patchInfo;
continue;
}
//if (debug)
labelList patchPoints;
List<Type> patchInfo;
{
UIPstream is(nbrProci, pBufs_);
is >> patchPoints >> patchInfo;
}
//if (debug & 2)
//{
// Pout<< "Processor patch " << patchi << ' ' << procPatch.name()
// << " communicating with " << procPatch.neighbProcNo()
// << " Received:" << patchInfo.size() << endl;
// << " recv:" << patchInfo.size() << " from proc:"
// << nbrProci << endl;
//}
// Apply transform to received data for non-parallel planes

View File

@ -110,7 +110,7 @@ void Foam::processorLODs::box::setRefineFlags
// Identify src boxes that can be refined and send to all remote procs
for (const int proci : Pstream::allProcs())
{
if (proci != Pstream::myProcNo())
if (proci != Pstream::myProcNo() && !boxes_[proci].empty())
{
UOPstream toProc(proci, pBufs);
toProc << nObjectsOfType_ << boxes_[proci] << newToOld_[proci];
@ -123,10 +123,8 @@ void Foam::processorLODs::box::setRefineFlags
// src boxes can/should be refined
for (const int proci : Pstream::allProcs())
{
if (proci == Pstream::myProcNo())
if (proci == Pstream::myProcNo() || !pBufs.recvDataCount(proci))
{
// Not refining boxes I send to myself - will be sending all local
// elements
continue;
}

View File

@ -355,27 +355,26 @@ void Foam::cellCellStencils::inverseDistance::markPatchesAsHoles
{
if (procI != Pstream::myProcNo())
{
//const treeBoundBox& srcBb = srcBbs[procI];
const treeBoundBox& srcPatchBb = srcPatchBbs[procI];
const treeBoundBox& tgtPatchBb = tgtPatchBbs[Pstream::myProcNo()];
if (srcPatchBb.overlaps(tgtPatchBb))
{
UIPstream is(procI, pBufs);
{
treeBoundBox receivedBb(is);
if (srcPatchBb != receivedBb)
{
FatalErrorInFunction
<< "proc:" << procI
<< " srcPatchBb:" << srcPatchBb
<< " receivedBb:" << receivedBb
<< exit(FatalError);
}
}
const treeBoundBox receivedBb(is);
const labelVector zoneDivs(is);
const PackedList<2> srcPatchTypes(is);
// Verify validity
if (srcPatchBb != receivedBb)
{
FatalErrorInFunction
<< "proc:" << procI
<< " srcPatchBb:" << srcPatchBb
<< " receivedBb:" << receivedBb
<< exit(FatalError);
}
forAll(tgtCellMap, tgtCelli)
{
label celli = tgtCellMap[tgtCelli];

View File

@ -273,20 +273,20 @@ void Foam::cellCellStencils::trackingInverseDistance::markPatchesAsHoles
if (srcPatchBb.overlaps(tgtPatchBb))
{
UIPstream is(procI, pBufs);
{
treeBoundBox receivedBb(is);
if (srcPatchBb != receivedBb)
{
FatalErrorInFunction
<< "proc:" << procI
<< " srcPatchBb:" << srcPatchBb
<< " receivedBb:" << receivedBb
<< exit(FatalError);
}
}
const treeBoundBox receivedBb(is);
const labelVector srcDivs(is);
const PackedList<2> srcPatchTypes(is);
// Verify validity
if (srcPatchBb != receivedBb)
{
FatalErrorInFunction
<< "proc:" << procI
<< " srcPatchBb:" << srcPatchBb
<< " receivedBb:" << receivedBb
<< exit(FatalError);
}
forAll(tgtCellMap, tgtCelli)
{
label celli = tgtCellMap[tgtCelli];