WIP: PstreamBuffers with hybrid PEX/NBX exchange

CONFIG: enable nonBlockingExchange as default (for testing)

- changes Pstream::exchangeSizes to use NBX instead of all-to-all
Mark Olesen 2023-02-13 14:43:20 +01:00
parent d5c0852de1
commit 5cf5af9d3b
6 changed files with 14 additions and 125 deletions
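For background: NBX (Hoefler, Siebert, Lumsdaine, "Scalable Communication Protocols for Dynamic Sparse Data Exchange") discovers the receive topology without an all-to-all, using synchronous sends, probing, and a nonblocking barrier as a consensus. The sketch below is plain MPI and illustrative only, not the Pstream implementation; the function and variable names are invented.

// Sketch of the NBX idea (illustrative, not the Pstream code).
// Synchronous sends + probe + nonblocking barrier give a consensus
// that all point-to-point size messages have been delivered.
#include <mpi.h>
#include <cstddef>
#include <utility>
#include <vector>

std::vector<std::pair<int, int>> nbxExchangeSizes
(
    const std::vector<std::pair<int, int>>& sends,  // (toRank, sendSize)
    MPI_Comm comm
)
{
    std::vector<std::pair<int, int>> recvs;         // (fromRank, recvSize)
    std::vector<int> sizes(sends.size());
    std::vector<MPI_Request> reqs(sends.size());

    for (std::size_t i = 0; i < sends.size(); ++i)
    {
        sizes[i] = sends[i].second;

        // Synchronous send: completes only once matched by a receive
        MPI_Issend(&sizes[i], 1, MPI_INT, sends[i].first, 0, comm, &reqs[i]);
    }

    MPI_Request barrier = MPI_REQUEST_NULL;
    bool barrierActive = false;
    bool done = false;

    while (!done)
    {
        // Drain any incoming size message
        int flag = 0;
        MPI_Status status;
        MPI_Iprobe(MPI_ANY_SOURCE, 0, comm, &flag, &status);

        if (flag)
        {
            int sz = 0;
            MPI_Recv(&sz, 1, MPI_INT, status.MPI_SOURCE, 0, comm, &status);
            recvs.emplace_back(status.MPI_SOURCE, sz);
        }
        else if (!barrierActive)
        {
            // All local sends matched? Then join the consensus barrier
            int sent = 0;
            MPI_Testall
            (
                static_cast<int>(reqs.size()), reqs.data(),
                &sent, MPI_STATUSES_IGNORE
            );

            if (sent)
            {
                MPI_Ibarrier(comm, &barrier);
                barrierActive = true;
            }
        }
        else
        {
            // Barrier completion => every rank's sends have been matched
            int fin = 0;
            MPI_Test(&barrier, &fin, MPI_STATUS_IGNORE);
            done = (fin != 0);
        }
    }

    return recvs;
}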

etc/controlDict

@@ -130,7 +130,9 @@ OptimisationSwitches
     // Number processors to change to tree communication
     nProcsSimpleSum 0;
 
     // Min numProc to use non-blocking exchange algorithm (Hoefler: NBX)
-    nonBlockingExchange 0;
+    nonBlockingExchange 1;
+
+    // Use hybrid NBX/PEX for PstreamBuffers
+    pbufs.algorithm 0;
 
     // MPI buffer size (bytes)
     // Can override with the MPI_BUFFER_SIZE env variable.
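Both entries live in the OptimisationSwitches dictionary and can be overridden per-installation or per-user. In code they are queried through Foam::debug; a minimal sketch follows (the optimisationSwitch() call is the real API, used in the PstreamBuffers.C hunk below, while the surrounding code is illustrative).

// Illustrative query of an optimisation switch.
#include "debug.H"

// The second argument is the compiled-in default, used when the
// entry is absent from the OptimisationSwitches dictionary
const int pbufsAlgorithm =
    Foam::debug::optimisationSwitch("pbufs.algorithm", 0);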

PstreamBuffers.C

@@ -27,7 +27,6 @@ License
 \*---------------------------------------------------------------------------*/
 
 #include "PstreamBuffers.H"
-#include "bitSet.H"
 #include "debug.H"
 #include "registerSwitch.H"
@@ -36,7 +35,7 @@ License
 int Foam::PstreamBuffers::algorithm
 (
     // Not really the most creative name...
-    Foam::debug::optimisationSwitch("pbufs.algorithm", -1)
+    Foam::debug::optimisationSwitch("pbufs.algorithm", 0)
 );
 
 registerOptSwitch
 (
@@ -134,8 +133,7 @@ void Foam::PstreamBuffers::finalExchange
 void Foam::PstreamBuffers::finalExchange
 (
-    const labelUList& sendProcs,
-    const labelUList& recvProcs,
+    const labelUList& neighProcs,
     const bool wait,
     labelList& recvSizes
 )
@@ -155,7 +153,7 @@ void Foam::PstreamBuffers::finalExchange
         // Preserve self-send, even if not described by neighbourhood
         recvSizes[UPstream::myProcNo(comm_)] = 1;
 
-        for (const label proci : sendProcs)
+        for (const label proci : neighProcs)
         {
             recvSizes[proci] = 1;  // Connected
         }
@@ -172,8 +170,7 @@ void Foam::PstreamBuffers::finalExchange
         // PEX stage 1: exchange sizes (limited neighbourhood)
         Pstream::exchangeSizes
         (
-            sendProcs,
-            recvProcs,
+            neighProcs,
             sendBuffers_,
             recvSizes,
             tag_,
@@ -626,7 +623,7 @@ void Foam::PstreamBuffers::finishedNeighbourSends
     const bool wait
 )
 {
-    finalExchange(neighProcs, neighProcs, wait, recvSizes);
+    finalExchange(neighProcs, wait, recvSizes);
 }
@@ -637,71 +634,7 @@ void Foam::PstreamBuffers::finishedNeighbourSends
 )
 {
     labelList recvSizes;
-    finalExchange(neighProcs, neighProcs, wait, recvSizes);
-}
-
-
-bool Foam::PstreamBuffers::finishedSends
-(
-    bitSet& sendConnections,
-    DynamicList<label>& sendProcs,
-    DynamicList<label>& recvProcs,
-    const bool wait
-)
-{
-    bool changed = (sendConnections.size() != nProcs());
-
-    if (changed)
-    {
-        sendConnections.resize(nProcs());
-    }
-
-    // Update send connections
-    forAll(sendBuffers_, proci)
-    {
-        if (sendConnections.set(proci, !sendBuffers_[proci].empty()))
-        {
-            // The state changed
-            changed = true;
-        }
-    }
-
-    UPstream::reduceOr(changed, comm_);
-
-    if (changed)
-    {
-        // Update send/recv topology
-        labelList recvSizes;
-        finishedSends(recvSizes, wait);  // eg, using all-to-all
-
-        // The send ranks
-        sendProcs.clear();
-        forAll(sendBuffers_, proci)
-        {
-            if (!sendBuffers_[proci].empty())
-            {
-                sendProcs.push_back(proci);
-            }
-        }
-
-        // The recv ranks
-        recvProcs.clear();
-        forAll(recvSizes, proci)
-        {
-            if (recvSizes[proci] > 0)
-            {
-                recvProcs.push_back(proci);
-            }
-        }
-    }
-    else
-    {
-        // Use existing send/recv ranks
-        labelList recvSizes;
-        finalExchange(sendProcs, recvProcs, wait, recvSizes);
-    }
-
-    return changed;
+    finalExchange(neighProcs, wait, recvSizes);
 }
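The "PEX stage 1" comment above refers to the two-stage personalized exchange: sizes are first exchanged within the known neighbourhood, then the payloads are transferred point-to-point. A minimal plain-MPI sketch of stage 1 over a single symmetric neighbour list, matching the collapsed sendProcs/recvProcs to neighProcs signature in this commit (illustrative only, not Pstream::exchangeSizes):

// Illustrative PEX stage 1: exchange message sizes within a known,
// symmetric neighbourhood.
#include <mpi.h>
#include <cstddef>
#include <vector>

void pexExchangeSizes
(
    const std::vector<int>& neighProcs,  // ranks we send to and recv from
    const std::vector<int>& sendSizes,   // sendSizes[i] goes to neighProcs[i]
    std::vector<int>& recvSizes,         // filled with neighProcs[i]'s size
    MPI_Comm comm
)
{
    const std::size_t n = neighProcs.size();
    recvSizes.assign(n, 0);

    std::vector<MPI_Request> reqs(2*n);

    for (std::size_t i = 0; i < n; ++i)
    {
        MPI_Irecv(&recvSizes[i], 1, MPI_INT, neighProcs[i], 0, comm, &reqs[i]);
    }
    for (std::size_t i = 0; i < n; ++i)
    {
        MPI_Isend(&sendSizes[i], 1, MPI_INT, neighProcs[i], 0, comm, &reqs[n+i]);
    }

    MPI_Waitall(static_cast<int>(2*n), reqs.data(), MPI_STATUSES_IGNORE);

    // Stage 2 (not shown): sized point-to-point transfers of the payloads
}

Using a single neighbour list assumes the connectivity is symmetric, which is what the finalExchange() simplification above relies on.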

PstreamBuffers.H

@@ -108,9 +108,6 @@ SourceFiles
 namespace Foam
 {
-
-// Forward Declarations
-class bitSet;
 
 /*---------------------------------------------------------------------------*\
                         Class PstreamBuffers Declaration
 \*---------------------------------------------------------------------------*/
@@ -168,8 +165,7 @@ class PstreamBuffers
         //  (nonBlocking comms).
         void finalExchange
         (
-            const labelUList& sendProcs,
-            const labelUList& recvProcs,
+            const labelUList& neighProcs,
             const bool wait,
             labelList& recvSizes
         );
@@ -435,25 +431,6 @@ public:
             const bool wait = true
         );
 
-        //- A caching version that uses a limited send/recv connectivity.
-        //
-        //  Non-blocking mode: populates receive buffers.
-        //  \param sendConnections on/off for sending ranks
-        //  \param sendProcs ranks used for sends
-        //  \param recvProcs ranks used for recvs
-        //  \param wait wait for requests to complete (in nonBlocking mode)
-        //
-        //  \return True if the send/recv connectivity changed
-        //
-        //  \warning currently only valid for nonBlocking comms.
-        bool finishedSends
-        (
-            bitSet& sendConnections,
-            DynamicList<label>& sendProcs,
-            DynamicList<label>& recvProcs,
-            const bool wait = true
-        );
-
 
     // Gather/scatter modes
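With the caching overload gone, callers that know their connectivity up front use finishedNeighbourSends() instead. A hedged usage sketch, assuming the usual OpenFOAM includes and Foam namespace; the function name and scalar payload are invented for illustration:

// Illustrative only: known symmetric neighbourhood (e.g. processor patches).
#include "PstreamBuffers.H"

void exchangeWithNeighbours(const labelList& neighProcs)
{
    PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);

    for (const label proci : neighProcs)
    {
        UOPstream os(proci, pBufs);
        os << scalar(UPstream::myProcNo());  // placeholder payload
    }

    // Sizes are exchanged within the neighbourhood only (PEX stage 1)
    pBufs.finishedNeighbourSends(neighProcs);

    for (const label proci : neighProcs)
    {
        UIPstream is(proci, pBufs);
        scalar value;
        is >> value;
        // ... use value ...
    }
}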

zoneDistribute.C

@@ -109,9 +109,7 @@ void Foam::zoneDistribute::setUpCommforZone
         }
     }
 
-    // Stream the send data into PstreamBuffers,
-    // which we also use to track the current topology.
-
+    // Stream the send data into PstreamBuffers
     pBufs_.clear();
 
     for (const int proci : pBufs_.allProcs())
@@ -126,7 +124,7 @@ void Foam::zoneDistribute::setUpCommforZone
         }
     }
 
-    pBufs_.finishedSends(sendConnections_, sendProcs_, recvProcs_);
+    pBufs_.finishedSends();
 
     for (const int proci : pBufs_.allProcs())
     {

zoneDistribute.H

@@ -93,15 +93,6 @@ class zoneDistribute
         //- Global cell/face index to send for processor-to-processor comms
         List<labelList> send_;
 
-        //- Parallel [cache]: send connectivity (true/false)
-        bitSet sendConnections_;
-
-        //- Parallel [cache]: send data to these ranks
-        DynamicList<label> sendProcs_;
-
-        //- Parallel [cache]: recv data from these ranks
-        DynamicList<label> recvProcs_;
-
         //- Persistent set of exchange buffers
         PstreamBuffers pBufs_;

zoneDistributeI.H

@@ -6,7 +6,7 @@
  \\/     M anipulation  |
 -------------------------------------------------------------------------------
     Copyright (C) 2019-2020 DLR
-    Copyright (C) 2020-2022 OpenCFD Ltd.
+    Copyright (C) 2020-2023 OpenCFD Ltd.
 -------------------------------------------------------------------------------
 License
     This file is part of OpenFOAM.
@@ -161,18 +161,6 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
     if (UPstream::parRun())
     {
-        if (sendConnections_.empty())
-        {
-            WarningInFunction
-                << "The send/recv connections not initialized - "
-                << "likely that setUpCommforZone() was not called"
-                << endl;
-            // But don't exit/abort for now
-        }
-
-        // Stream the send data into PstreamBuffers,
-        // which we also use to track the current topology.
-
         pBufs_.clear();
 
         for (const int proci : pBufs_.allProcs())
@@ -198,7 +186,7 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
         }
     }
 
-    pBufs_.finishedSends(sendConnections_, sendProcs_, recvProcs_);
+    pBufs_.finishedSends();
 
     for (const int proci : pBufs_.allProcs())
     {
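zoneDistribute consequently falls back to the plain finishedSends() pattern, with size discovery done by all-to-all or NBX according to the switches above. A sketch of that pattern, assuming the usual OpenFOAM includes and Foam namespace; the function name and payloads are invented, and recvDataCount() is assumed to be the buffer query used to test for received data:

// Illustrative only: send/recv with connectivity unknown in advance.
#include "PstreamBuffers.H"

void exchangeAll(const List<labelList>& sendData, List<labelList>& recvData)
{
    PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);

    for (const int proci : pBufs.allProcs())
    {
        if (proci != UPstream::myProcNo() && !sendData[proci].empty())
        {
            UOPstream os(proci, pBufs);
            os << sendData[proci];
        }
    }

    // Sizes discovered via all-to-all or NBX (per the switches above)
    pBufs.finishedSends();

    for (const int proci : pBufs.allProcs())
    {
        if (proci != UPstream::myProcNo() && pBufs.recvDataCount(proci))
        {
            UIPstream is(proci, pBufs);
            is >> recvData[proci];
        }
    }
}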