Compare commits

...

1 Commits

Author SHA1 Message Date
Mark Olesen
5cf5af9d3b WIP: PstreamBuffers with hybrid PEX/NBX exchange
CONFIG: enable nonBlockingExchange as default (for testing)

- changes Pstream::exchangeSizes to use NBX instead of all-to-all
2023-03-02 20:50:52 +01:00
6 changed files with 14 additions and 125 deletions

View File

@ -130,7 +130,9 @@ OptimisationSwitches
// Number processors to change to tree communication
nProcsSimpleSum 0;
// Min numProc to use non-blocking exchange algorithm (Hoefler: NBX)
nonBlockingExchange 0;
nonBlockingExchange 1;
// Use hybrid NBX/PEX for PstreamBuffers
pbufs.algorithm 0;
// MPI buffer size (bytes)
// Can override with the MPI_BUFFER_SIZE env variable.

View File

@ -27,7 +27,6 @@ License
\*---------------------------------------------------------------------------*/
#include "PstreamBuffers.H"
#include "bitSet.H"
#include "debug.H"
#include "registerSwitch.H"
@ -36,7 +35,7 @@ License
int Foam::PstreamBuffers::algorithm
(
// Not really the most creative name...
Foam::debug::optimisationSwitch("pbufs.algorithm", -1)
Foam::debug::optimisationSwitch("pbufs.algorithm", 0)
);
registerOptSwitch
(
@ -134,8 +133,7 @@ void Foam::PstreamBuffers::finalExchange
void Foam::PstreamBuffers::finalExchange
(
const labelUList& sendProcs,
const labelUList& recvProcs,
const labelUList& neighProcs,
const bool wait,
labelList& recvSizes
)
@ -155,7 +153,7 @@ void Foam::PstreamBuffers::finalExchange
// Preserve self-send, even if not described by neighbourhood
recvSizes[UPstream::myProcNo(comm_)] = 1;
for (const label proci : sendProcs)
for (const label proci : neighProcs)
{
recvSizes[proci] = 1; // Connected
}
@ -172,8 +170,7 @@ void Foam::PstreamBuffers::finalExchange
// PEX stage 1: exchange sizes (limited neighbourhood)
Pstream::exchangeSizes
(
sendProcs,
recvProcs,
neighProcs,
sendBuffers_,
recvSizes,
tag_,
@ -626,7 +623,7 @@ void Foam::PstreamBuffers::finishedNeighbourSends
const bool wait
)
{
finalExchange(neighProcs, neighProcs, wait, recvSizes);
finalExchange(neighProcs, wait, recvSizes);
}
@ -637,71 +634,7 @@ void Foam::PstreamBuffers::finishedNeighbourSends
)
{
labelList recvSizes;
finalExchange(neighProcs, neighProcs, wait, recvSizes);
}
bool Foam::PstreamBuffers::finishedSends
(
bitSet& sendConnections,
DynamicList<label>& sendProcs,
DynamicList<label>& recvProcs,
const bool wait
)
{
bool changed = (sendConnections.size() != nProcs());
if (changed)
{
sendConnections.resize(nProcs());
}
// Update send connections
forAll(sendBuffers_, proci)
{
if (sendConnections.set(proci, !sendBuffers_[proci].empty()))
{
// The state changed
changed = true;
}
}
UPstream::reduceOr(changed, comm_);
if (changed)
{
// Update send/recv topology
labelList recvSizes;
finishedSends(recvSizes, wait); // eg, using all-to-all
// The send ranks
sendProcs.clear();
forAll(sendBuffers_, proci)
{
if (!sendBuffers_[proci].empty())
{
sendProcs.push_back(proci);
}
}
// The recv ranks
recvProcs.clear();
forAll(recvSizes, proci)
{
if (recvSizes[proci] > 0)
{
recvProcs.push_back(proci);
}
}
}
else
{
// Use existing send/recv ranks
labelList recvSizes;
finalExchange(sendProcs, recvProcs, wait, recvSizes);
}
return changed;
finalExchange(neighProcs, wait, recvSizes);
}

View File

@ -108,9 +108,6 @@ SourceFiles
namespace Foam
{
// Forward Declarations
class bitSet;
/*---------------------------------------------------------------------------*\
Class PstreamBuffers Declaration
\*---------------------------------------------------------------------------*/
@ -168,8 +165,7 @@ class PstreamBuffers
// (nonBlocking comms).
void finalExchange
(
const labelUList& sendProcs,
const labelUList& recvProcs,
const labelUList& neighProcs,
const bool wait,
labelList& recvSizes
);
@ -435,25 +431,6 @@ public:
const bool wait = true
);
//- A caching version that uses a limited send/recv connectivity.
//
// Non-blocking mode: populates receive buffers.
// \param sendConnections on/off for sending ranks
// \param sendProcs ranks used for sends
// \param recvProcs ranks used for recvs
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \return True if the send/recv connectivity changed
//
// \warning currently only valid for nonBlocking comms.
bool finishedSends
(
bitSet& sendConnections,
DynamicList<label>& sendProcs,
DynamicList<label>& recvProcs,
const bool wait = true
);
// Gather/scatter modes

View File

@ -109,9 +109,7 @@ void Foam::zoneDistribute::setUpCommforZone
}
}
// Stream the send data into PstreamBuffers,
// which we also use to track the current topology.
// Stream the send data into PstreamBuffers
pBufs_.clear();
for (const int proci : pBufs_.allProcs())
@ -126,7 +124,7 @@ void Foam::zoneDistribute::setUpCommforZone
}
}
pBufs_.finishedSends(sendConnections_, sendProcs_, recvProcs_);
pBufs_.finishedSends();
for (const int proci : pBufs_.allProcs())
{

View File

@ -93,15 +93,6 @@ class zoneDistribute
//- Global cell/face index to send for processor-to-processor comms
List<labelList> send_;
//- Parallel [cache]: send connectivity (true/false)
bitSet sendConnections_;
//- Parallel [cache]: send data to these ranks
DynamicList<label> sendProcs_;
//- Parallel [cache]: recv data from these ranks
DynamicList<label> recvProcs_;
//- Persistent set of exchange buffers
PstreamBuffers pBufs_;

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2019-2020 DLR
Copyright (C) 2020-2022 OpenCFD Ltd.
Copyright (C) 2020-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -161,18 +161,6 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
if (UPstream::parRun())
{
if (sendConnections_.empty())
{
WarningInFunction
<< "The send/recv connections not initialized - "
<< "likely that setUpCommforZone() was not called"
<< endl;
// But don't exit/abort for now
}
// Stream the send data into PstreamBuffers,
// which we also use to track the current topology.
pBufs_.clear();
for (const int proci : pBufs_.allProcs())
@ -198,7 +186,7 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
}
}
pBufs_.finishedSends(sendConnections_, sendProcs_, recvProcs_);
pBufs_.finishedSends();
for (const int proci : pBufs_.allProcs())
{