ENH: bundle together freed/outstanding MPI request handling

- improve communicator wrapping
- avoid indexing errors when using partial subranks
- UPstream::is_subrank (for testing partial subranks)
This commit is contained in:
Mark Olesen 2022-12-06 14:00:53 +01:00
parent d5e82f072e
commit b69db76b67
11 changed files with 302 additions and 112 deletions

View File

@ -0,0 +1,3 @@
Test-parallel-comm2.C
EXE = $(FOAM_USER_APPBIN)/Test-parallel-comm2

View File

@ -0,0 +1,2 @@
/* EXE_INC = */
/* EXE_LIBS = */

View File

@ -0,0 +1,148 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Application
Test-parallel-comm2
Description
Basic communicator tests
\*---------------------------------------------------------------------------*/
#include "argList.H"
#include "Time.H"
#include "IPstream.H"
#include "OPstream.H"
#include "Pair.H"
#include "Tuple2.H"
#include "IOstreams.H"
#include "PstreamReduceOps.H"
using namespace Foam;
void rankInfo(const label comm)
{
const int ranki = UPstream::myProcNo(comm);
Pout<< "comm:" << comm
<< "(parent:" << UPstream::parent(comm) << ')'
<< " rank:" << ranki
<< "(sub:" << UPstream::is_subrank(comm)
<< ") nProcs:" << UPstream::nProcs(comm)
<< " baseProcNo:" << UPstream::baseProcNo(comm, ranki);
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
int main(int argc, char *argv[])
{
argList::noBanner();
argList::noCheckProcessorDirectories();
argList::addBoolOption("verbose", "Set debug level");
// Capture manually. We need values before proper startup
int nVerbose = 0;
for (int argi = 1; argi < argc; ++argi)
{
if (strcmp(argv[argi], "-verbose") == 0)
{
++nVerbose;
}
}
UPstream::debug = nVerbose;
#include "setRootCase.H"
Info<< nl
<< "nProcs = " << UPstream::nProcs()
<< " with " << UPstream::nComms() << " predefined comm(s)" << nl;
Info<< nl;
//- Process IDs within a given communicator
Info<< "procIDs: "
<< flatOutput(UPstream::procID(UPstream::worldComm)) << endl;
rankInfo(UPstream::worldComm);
Pout<< endl;
const int myProci = UPstream::myProcNo(UPstream::worldComm);
int localRanki = myProci;
labelList subRanks;
UPstream::communicator newComm;
#if 1
// With first ranks
subRanks = identity(UPstream::nProcs(UPstream::worldComm) / 2);
newComm.reset(UPstream::worldComm, subRanks);
localRanki = UPstream::myProcNo(newComm);
Pout.prefix() =
(
'[' + Foam::name(myProci) + " a:" + Foam::name(localRanki) + "] "
);
Info<< "procIDs: "
<< flatOutput(UPstream::procID(newComm)) << endl;
rankInfo(newComm);
Pout<< endl;
#endif
#if 1
// With every other rank
subRanks = identity(UPstream::nProcs(UPstream::worldComm));
for (label& val : subRanks)
{
if (val % 2) val = -1;
}
newComm.reset(UPstream::worldComm, subRanks);
localRanki = UPstream::myProcNo(newComm);
Pout.prefix() =
(
'[' + Foam::name(myProci) + " b:" + Foam::name(localRanki) + "] "
);
Info<< "procIDs: "
<< flatOutput(UPstream::procID(newComm)) << endl;
rankInfo(newComm);
Pout<< endl;
#endif
Info<< "\nEnd\n" << endl;
return 0;
}
// ************************************************************************* //

View File

@ -179,24 +179,40 @@ Foam::label Foam::UPstream::allocateCommunicator
// Initialise; overwritten by allocatePstreamCommunicator
myProcNo_[index] = 0;
const label numSubRanks = subRanks.size();
// The selected sub-ranks.
// - transcribe from label to int. Treat negative values as 'ignore'
// - enforce incremental order (so index is rank in next communicator)
// Convert from label to int
procIDs_[index].resize_nocopy(numSubRanks);
forAll(procIDs_[index], i)
auto& procIds = procIDs_[index];
procIds.resize_nocopy(subRanks.size());
label numSubRanks = 0;
bool monotonicOrder = true;
for (const label subRanki : subRanks)
{
procIDs_[index][i] = subRanks[i];
// Enforce incremental order (so index is rank in next communicator)
if (i >= 1 && subRanks[i] <= subRanks[i-1])
if (subRanki < 0)
{
FatalErrorInFunction
<< "subranks not sorted : " << subRanks
<< " when allocating subcommunicator from parent "
<< parentIndex
<< Foam::abort(FatalError);
continue;
}
if (monotonicOrder && numSubRanks)
{
monotonicOrder = (procIds[numSubRanks-1] < subRanki);
}
procIds[numSubRanks] = subRanki;
++numSubRanks;
}
if (!monotonicOrder)
{
auto last = procIds.begin() + numSubRanks;
std::sort(procIds.begin(), last);
last = std::unique(procIds.begin(), last);
numSubRanks = label(last - procIds.begin());
}
procIds.resize(numSubRanks);
parentComm_[index] = parentIndex;
// Size but do not fill structure - this is done on-the-fly
@ -206,6 +222,20 @@ Foam::label Foam::UPstream::allocateCommunicator
if (doPstream && parRun())
{
allocatePstreamCommunicator(parentIndex, index);
// Could 'remember' locations of uninvolved ranks
/// if (myProcNo_[index] < 0 && parentIndex >= 0)
/// {
/// // As global rank
/// myProcNo_[index] = -(myProcNo_[worldComm]+1);
///
/// OR:
/// // As parent rank number
/// if (myProcNo_[parentIndex] >= 0)
/// {
/// myProcNo_[index] = -(myProcNo_[parentIndex]+1);
/// }
/// }
}
return index;
@ -226,9 +256,9 @@ void Foam::UPstream::freeCommunicator
if (debug)
{
Pout<< "Communicators : Freeing communicator " << communicator << endl
<< " parent : " << parentComm_[communicator] << endl
<< " myProcNo : " << myProcNo_[communicator] << endl
Pout<< "Communicators : Freeing communicator " << communicator
<< " parent: " << parentComm_[communicator]
<< " myProcNo: " << myProcNo_[communicator]
<< endl;
}
@ -252,7 +282,7 @@ void Foam::UPstream::freeCommunicators(const bool doPstream)
{
forAll(myProcNo_, communicator)
{
if (myProcNo_[communicator] != -1)
if (myProcNo_[communicator] >= 0)
{
freeCommunicator(communicator, doPstream);
}
@ -262,7 +292,7 @@ void Foam::UPstream::freeCommunicators(const bool doPstream)
int Foam::UPstream::baseProcNo(label comm, int procID)
{
while (parent(comm) >= 0)
while (parent(comm) >= 0 && procID >= 0)
{
const auto& parentRanks = UPstream::procID(comm);
procID = parentRanks[procID];

View File

@ -250,7 +250,7 @@ private:
public:
// Declare name of the class and its debug switch
//- Declare name of the class and its debug switch
ClassName("UPstream");
@ -313,11 +313,16 @@ public:
// Member Functions
//- Allocate a new communicator
//- Allocate a new communicator with subRanks of parent communicator
static label allocateCommunicator
(
//! The parent communicator
const label parent,
//! The sub-ranks of parent to use (ignore negative values)
const labelUList& subRanks,
//! Call allocatePstreamCommunicator
const bool doPstream = true
);
@ -332,6 +337,7 @@ public:
//- Free all communicators
static void freeCommunicators(const bool doPstream);
//- Wrapper class for allocating/freeing communicators
class communicator
{
@ -339,38 +345,65 @@ public:
public:
//- Default construct (a placeholder communicator)
communicator() : comm_(-1) {}
//- Move construct
communicator(communicator&&) = default;
//- Move assignment
communicator& operator=(communicator&&) = default;
//- No copy construct
communicator(const communicator&) = delete;
//- No copy assignment
void operator=(const communicator&) = delete;
//- Allocate a communicator from given parent
//- Default construct (a placeholder communicator)
communicator() noexcept : comm_(-1) {}
//- Move construct, takes ownership
communicator(communicator&& c) : comm_(c.comm_) { c.comm_ = -1; }
//- Allocate a communicator based on given parent
communicator
(
//! The parent communicator
const label parent,
//! The sub-ranks of parent to use (ignore negative values)
const labelUList& subRanks,
const bool doPstream
//! Call allocatePstreamCommunicator
const bool doPstream = true
)
:
comm_(allocateCommunicator(parent, subRanks, doPstream))
{}
//- Free allocated communicator and group
~communicator()
~communicator() { freeCommunicator(comm_); }
//- True if communicator is non-negative (ie, was allocated)
bool good() const noexcept { return (comm_ >= 0); }
//- The communicator label
label comm() const noexcept { return comm_; }
//- Free allocated communicator and group
void reset() { freeCommunicator(comm_); comm_ = -1; }
//- Allocate with subRanks of parent communicator
void reset(label parent, const labelUList& subRanks)
{
freeCommunicator(comm_);
comm_ = allocateCommunicator(parent, subRanks);
}
//- Take ownership, free allocated communicator and group.
void reset(communicator&& c)
{
if (comm_ != c.comm_) freeCommunicator(comm_);
comm_ = c.comm_;
c.comm_ = -1;
}
//- Move assignment, takes ownership
void operator=(communicator&& c) { reset(std::move(c)); }
//- Cast to label - the same as comm()
operator label() const noexcept { return comm_; }
};
@ -468,12 +501,18 @@ public:
return 0;
}
//- Am I the master process
//- Am I the master rank
static bool master(const label communicator = worldComm)
{
return myProcNo_[communicator] == masterNo();
}
//- Is this process a sub-rank on the communicator
static bool is_subrank(const label communicator = worldComm)
{
return myProcNo_[communicator] > masterNo();
}
//- Number of this process (starting from masterNo() = 0)
static int myProcNo(const label communicator = worldComm)
{

View File

@ -40,17 +40,15 @@ Foam::DynamicList<MPI_Comm> Foam::PstreamGlobals::MPICommunicators_;
Foam::DynamicList<MPI_Group> Foam::PstreamGlobals::MPIGroups_;
// * * * * * * * * * * * * * * * Global Functions * * * * * * * * * * * * * //
void Foam::PstreamGlobals::checkCommunicator
(
const label comm,
const label toProcNo
)
{
if
(
comm < 0
|| comm >= PstreamGlobals::MPICommunicators_.size()
)
if (comm < 0 || comm >= PstreamGlobals::MPICommunicators_.size())
{
FatalErrorInFunction
<< "toProcNo:" << toProcNo << " : illegal communicator "

View File

@ -6,6 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2013-2015 OpenFOAM Foundation
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -65,9 +66,35 @@ extern DynamicList<MPI_Comm> MPICommunicators_;
// Groups associated with the currrent communicators.
extern DynamicList<MPI_Group> MPIGroups_;
// * * * * * * * * * * * * * * * Global Functions * * * * * * * * * * * * * //
//- Fatal if comm is outside the allocated range
void checkCommunicator(const label comm, const label toProcNo);
//- Reuse previously freed request locations or push request onto list
//- of outstanding requests.
//
// \return index of request within outstandingRequests_
inline label push_request(MPI_Request request)
{
label index;
if (freedRequests_.size())
{
index = freedRequests_.back();
freedRequests_.pop_back();
outstandingRequests_[index] = request;
}
else
{
index = outstandingRequests_.size();
outstandingRequests_.push_back(request);
}
return index;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

View File

@ -218,7 +218,7 @@ Foam::label Foam::UIPstream::read
<< Foam::endl;
}
PstreamGlobals::outstandingRequests_.append(request);
PstreamGlobals::outstandingRequests_.push_back(request);
// Assume the message is completely received.
return bufSize;

View File

@ -159,7 +159,7 @@ bool Foam::UOPstream::write
<< Foam::endl;
}
PstreamGlobals::outstandingRequests_.append(request);
PstreamGlobals::outstandingRequests_.push_back(request);
}
else
{

View File

@ -687,6 +687,9 @@ void Foam::UPstream::waitRequests(const label start)
<< " outstanding requests starting at " << start << endl;
}
// TBD: check for
// (start < 0 || start > PstreamGlobals::outstandingRequests_.size())
if (PstreamGlobals::outstandingRequests_.size())
{
SubList<MPI_Request> waitRequests
@ -698,6 +701,7 @@ void Foam::UPstream::waitRequests(const label start)
profilingPstream::beginTiming();
// On success: sets each request to MPI_REQUEST_NULL
if
(
MPI_Waitall
@ -714,7 +718,7 @@ void Foam::UPstream::waitRequests(const label start)
profilingPstream::addWaitTime();
resetRequests(start);
PstreamGlobals::outstandingRequests_.resize(start);
}
if (debug)
@ -749,6 +753,7 @@ void Foam::UPstream::waitRequest(const label i)
profilingPstream::beginTiming();
// On success: sets request to MPI_REQUEST_NULL
if
(
MPI_Wait
@ -797,6 +802,7 @@ bool Foam::UPstream::finishedRequest(const label i)
<< Foam::abort(FatalError);
}
// On success: sets request to MPI_REQUEST_NULL
int flag;
MPI_Test
(
@ -821,7 +827,7 @@ int Foam::UPstream::allocateTag(const char* const msg)
if (PstreamGlobals::freedTags_.size())
{
tag = PstreamGlobals::freedTags_.back();
(void)PstreamGlobals::freedTags_.pop_back();
PstreamGlobals::freedTags_.pop_back();
}
else
{

View File

@ -179,16 +179,7 @@ void Foam::PstreamDetail::allReduce
<< Foam::abort(FatalError);
}
if (PstreamGlobals::freedRequests_.size())
{
*requestID = PstreamGlobals::freedRequests_.remove();
PstreamGlobals::outstandingRequests_[*requestID] = request;
}
else
{
*requestID = PstreamGlobals::outstandingRequests_.size();
PstreamGlobals::outstandingRequests_.append(request);
}
*requestID = PstreamGlobals::push_request(request);
}
#endif
@ -299,16 +290,7 @@ void Foam::PstreamDetail::allToAll
<< Foam::abort(FatalError);
}
if (PstreamGlobals::freedRequests_.size())
{
*requestID = PstreamGlobals::freedRequests_.remove();
PstreamGlobals::outstandingRequests_[*requestID] = request;
}
else
{
*requestID = PstreamGlobals::outstandingRequests_.size();
PstreamGlobals::outstandingRequests_.append(request);
}
*requestID = PstreamGlobals::push_request(request);
}
#endif
@ -449,16 +431,7 @@ void Foam::PstreamDetail::allToAllv
<< Foam::abort(FatalError);
}
if (PstreamGlobals::freedRequests_.size())
{
*requestID = PstreamGlobals::freedRequests_.remove();
PstreamGlobals::outstandingRequests_[*requestID] = request;
}
else
{
*requestID = PstreamGlobals::outstandingRequests_.size();
PstreamGlobals::outstandingRequests_.append(request);
}
*requestID = PstreamGlobals::push_request(request);
}
#endif
@ -568,16 +541,7 @@ void Foam::PstreamDetail::gather
<< Foam::abort(FatalError);
}
if (PstreamGlobals::freedRequests_.size())
{
*requestID = PstreamGlobals::freedRequests_.remove();
PstreamGlobals::outstandingRequests_[*requestID] = request;
}
else
{
*requestID = PstreamGlobals::outstandingRequests_.size();
PstreamGlobals::outstandingRequests_.append(request);
}
*requestID = PstreamGlobals::push_request(request);
}
#endif
@ -686,16 +650,7 @@ void Foam::PstreamDetail::scatter
<< Foam::abort(FatalError);
}
if (PstreamGlobals::freedRequests_.size())
{
*requestID = PstreamGlobals::freedRequests_.remove();
PstreamGlobals::outstandingRequests_[*requestID] = request;
}
else
{
*requestID = PstreamGlobals::outstandingRequests_.size();
PstreamGlobals::outstandingRequests_.append(request);
}
*requestID = PstreamGlobals::push_request(request);
}
#endif
@ -830,16 +785,7 @@ void Foam::PstreamDetail::gatherv
<< Foam::abort(FatalError);
}
if (PstreamGlobals::freedRequests_.size())
{
*requestID = PstreamGlobals::freedRequests_.remove();
PstreamGlobals::outstandingRequests_[*requestID] = request;
}
else
{
*requestID = PstreamGlobals::outstandingRequests_.size();
PstreamGlobals::outstandingRequests_.append(request);
}
*requestID = PstreamGlobals::push_request(request);
}
#endif
@ -968,16 +914,7 @@ void Foam::PstreamDetail::scatterv
<< Foam::abort(FatalError);
}
if (PstreamGlobals::freedRequests_.size())
{
*requestID = PstreamGlobals::freedRequests_.remove();
PstreamGlobals::outstandingRequests_[*requestID] = request;
}
else
{
*requestID = PstreamGlobals::outstandingRequests_.size();
PstreamGlobals::outstandingRequests_.append(request);
}
*requestID = PstreamGlobals::push_request(request);
}
#endif