diff --git a/applications/test/parallel-comm2/Make/files b/applications/test/parallel-comm2/Make/files
new file mode 100644
index 0000000000..a6841ec2a1
--- /dev/null
+++ b/applications/test/parallel-comm2/Make/files
@@ -0,0 +1,3 @@
+Test-parallel-comm2.C
+
+EXE = $(FOAM_USER_APPBIN)/Test-parallel-comm2
diff --git a/applications/test/parallel-comm2/Make/options b/applications/test/parallel-comm2/Make/options
new file mode 100644
index 0000000000..18e6fe47af
--- /dev/null
+++ b/applications/test/parallel-comm2/Make/options
@@ -0,0 +1,2 @@
+/* EXE_INC = */
+/* EXE_LIBS = */
diff --git a/applications/test/parallel-comm2/Test-parallel-comm2.C b/applications/test/parallel-comm2/Test-parallel-comm2.C
new file mode 100644
index 0000000000..934a480c20
--- /dev/null
+++ b/applications/test/parallel-comm2/Test-parallel-comm2.C
@@ -0,0 +1,148 @@
+/*---------------------------------------------------------------------------*\
+ ========= |
+ \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
+ \\ / O peration |
+ \\ / A nd | www.openfoam.com
+ \\/ M anipulation |
+-------------------------------------------------------------------------------
+ Copyright (C) 2022 OpenCFD Ltd.
+-------------------------------------------------------------------------------
+License
+ This file is part of OpenFOAM.
+
+ OpenFOAM is free software: you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with OpenFOAM. If not, see .
+
+Application
+ Test-parallel-comm2
+
+Description
+ Basic communicator tests
+
+\*---------------------------------------------------------------------------*/
+
+#include "argList.H"
+#include "Time.H"
+#include "IPstream.H"
+#include "OPstream.H"
+#include "Pair.H"
+#include "Tuple2.H"
+#include "IOstreams.H"
+#include "PstreamReduceOps.H"
+
+using namespace Foam;
+
+void rankInfo(const label comm)
+{
+ const int ranki = UPstream::myProcNo(comm);
+
+ Pout<< "comm:" << comm
+ << "(parent:" << UPstream::parent(comm) << ')'
+ << " rank:" << ranki
+ << "(sub:" << UPstream::is_subrank(comm)
+ << ") nProcs:" << UPstream::nProcs(comm)
+ << " baseProcNo:" << UPstream::baseProcNo(comm, ranki);
+}
+
+
+// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
+
+int main(int argc, char *argv[])
+{
+ argList::noBanner();
+ argList::noCheckProcessorDirectories();
+ argList::addBoolOption("verbose", "Set debug level");
+
+ // Capture manually. We need values before proper startup
+ int nVerbose = 0;
+ for (int argi = 1; argi < argc; ++argi)
+ {
+ if (strcmp(argv[argi], "-verbose") == 0)
+ {
+ ++nVerbose;
+ }
+ }
+
+ UPstream::debug = nVerbose;
+
+ #include "setRootCase.H"
+
+ Info<< nl
+ << "nProcs = " << UPstream::nProcs()
+ << " with " << UPstream::nComms() << " predefined comm(s)" << nl;
+
+ Info<< nl;
+
+ //- Process IDs within a given communicator
+ Info<< "procIDs: "
+ << flatOutput(UPstream::procID(UPstream::worldComm)) << endl;
+
+ rankInfo(UPstream::worldComm);
+ Pout<< endl;
+
+ const int myProci = UPstream::myProcNo(UPstream::worldComm);
+ int localRanki = myProci;
+
+ labelList subRanks;
+ UPstream::communicator newComm;
+
+ #if 1
+ // With first ranks
+ subRanks = identity(UPstream::nProcs(UPstream::worldComm) / 2);
+
+ newComm.reset(UPstream::worldComm, subRanks);
+ localRanki = UPstream::myProcNo(newComm);
+
+ Pout.prefix() =
+ (
+ '[' + Foam::name(myProci) + " a:" + Foam::name(localRanki) + "] "
+ );
+
+ Info<< "procIDs: "
+ << flatOutput(UPstream::procID(newComm)) << endl;
+
+ rankInfo(newComm);
+ Pout<< endl;
+ #endif
+
+ #if 1
+ // With every other rank
+ subRanks = identity(UPstream::nProcs(UPstream::worldComm));
+
+ for (label& val : subRanks)
+ {
+ if (val % 2) val = -1;
+ }
+
+ newComm.reset(UPstream::worldComm, subRanks);
+ localRanki = UPstream::myProcNo(newComm);
+
+ Pout.prefix() =
+ (
+ '[' + Foam::name(myProci) + " b:" + Foam::name(localRanki) + "] "
+ );
+
+ Info<< "procIDs: "
+ << flatOutput(UPstream::procID(newComm)) << endl;
+
+ rankInfo(newComm);
+ Pout<< endl;
+ #endif
+
+ Info<< "\nEnd\n" << endl;
+
+ return 0;
+}
+
+
+// ************************************************************************* //
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C
index d35520f1e2..d4ef48da5e 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C
@@ -179,24 +179,40 @@ Foam::label Foam::UPstream::allocateCommunicator
// Initialise; overwritten by allocatePstreamCommunicator
myProcNo_[index] = 0;
- const label numSubRanks = subRanks.size();
+ // The selected sub-ranks.
+ // - transcribe from label to int. Treat negative values as 'ignore'
+ // - enforce incremental order (so index is rank in next communicator)
- // Convert from label to int
- procIDs_[index].resize_nocopy(numSubRanks);
- forAll(procIDs_[index], i)
+ auto& procIds = procIDs_[index];
+ procIds.resize_nocopy(subRanks.size());
+
+ label numSubRanks = 0;
+ bool monotonicOrder = true;
+ for (const label subRanki : subRanks)
{
- procIDs_[index][i] = subRanks[i];
-
- // Enforce incremental order (so index is rank in next communicator)
- if (i >= 1 && subRanks[i] <= subRanks[i-1])
+ if (subRanki < 0)
{
- FatalErrorInFunction
- << "subranks not sorted : " << subRanks
- << " when allocating subcommunicator from parent "
- << parentIndex
- << Foam::abort(FatalError);
+ continue;
}
+ if (monotonicOrder && numSubRanks)
+ {
+ monotonicOrder = (procIds[numSubRanks-1] < subRanki);
+ }
+
+ procIds[numSubRanks] = subRanki;
+ ++numSubRanks;
}
+
+ if (!monotonicOrder)
+ {
+ auto last = procIds.begin() + numSubRanks;
+ std::sort(procIds.begin(), last);
+ last = std::unique(procIds.begin(), last);
+ numSubRanks = label(last - procIds.begin());
+ }
+
+ procIds.resize(numSubRanks);
+
parentComm_[index] = parentIndex;
// Size but do not fill structure - this is done on-the-fly
@@ -206,6 +222,20 @@ Foam::label Foam::UPstream::allocateCommunicator
if (doPstream && parRun())
{
allocatePstreamCommunicator(parentIndex, index);
+
+ // Could 'remember' locations of uninvolved ranks
+ /// if (myProcNo_[index] < 0 && parentIndex >= 0)
+ /// {
+ /// // As global rank
+ /// myProcNo_[index] = -(myProcNo_[worldComm]+1);
+ ///
+ /// OR:
+ /// // As parent rank number
+ /// if (myProcNo_[parentIndex] >= 0)
+ /// {
+ /// myProcNo_[index] = -(myProcNo_[parentIndex]+1);
+ /// }
+ /// }
}
return index;
@@ -226,9 +256,9 @@ void Foam::UPstream::freeCommunicator
if (debug)
{
- Pout<< "Communicators : Freeing communicator " << communicator << endl
- << " parent : " << parentComm_[communicator] << endl
- << " myProcNo : " << myProcNo_[communicator] << endl
+ Pout<< "Communicators : Freeing communicator " << communicator
+ << " parent: " << parentComm_[communicator]
+ << " myProcNo: " << myProcNo_[communicator]
<< endl;
}
@@ -252,7 +282,7 @@ void Foam::UPstream::freeCommunicators(const bool doPstream)
{
forAll(myProcNo_, communicator)
{
- if (myProcNo_[communicator] != -1)
+ if (myProcNo_[communicator] >= 0)
{
freeCommunicator(communicator, doPstream);
}
@@ -262,7 +292,7 @@ void Foam::UPstream::freeCommunicators(const bool doPstream)
int Foam::UPstream::baseProcNo(label comm, int procID)
{
- while (parent(comm) >= 0)
+ while (parent(comm) >= 0 && procID >= 0)
{
const auto& parentRanks = UPstream::procID(comm);
procID = parentRanks[procID];
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
index 04ac5f6065..2ad8042b09 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
@@ -250,7 +250,7 @@ private:
public:
- // Declare name of the class and its debug switch
+ //- Declare name of the class and its debug switch
ClassName("UPstream");
@@ -313,11 +313,16 @@ public:
// Member Functions
- //- Allocate a new communicator
+ //- Allocate a new communicator with subRanks of parent communicator
static label allocateCommunicator
(
+ //! The parent communicator
const label parent,
+
+ //! The sub-ranks of parent to use (ignore negative values)
const labelUList& subRanks,
+
+ //! Call allocatePstreamCommunicator
const bool doPstream = true
);
@@ -332,6 +337,7 @@ public:
//- Free all communicators
static void freeCommunicators(const bool doPstream);
+
//- Wrapper class for allocating/freeing communicators
class communicator
{
@@ -339,38 +345,65 @@ public:
public:
- //- Default construct (a placeholder communicator)
- communicator() : comm_(-1) {}
-
- //- Move construct
- communicator(communicator&&) = default;
-
- //- Move assignment
- communicator& operator=(communicator&&) = default;
-
//- No copy construct
communicator(const communicator&) = delete;
//- No copy assignment
void operator=(const communicator&) = delete;
- //- Allocate a communicator from given parent
+ //- Default construct (a placeholder communicator)
+ communicator() noexcept : comm_(-1) {}
+
+ //- Move construct, takes ownership
+ communicator(communicator&& c) : comm_(c.comm_) { c.comm_ = -1; }
+
+ //- Allocate a communicator based on given parent
communicator
(
+ //! The parent communicator
const label parent,
+
+ //! The sub-ranks of parent to use (ignore negative values)
const labelUList& subRanks,
- const bool doPstream
+
+ //! Call allocatePstreamCommunicator
+ const bool doPstream = true
)
:
comm_(allocateCommunicator(parent, subRanks, doPstream))
{}
//- Free allocated communicator and group
- ~communicator()
+ ~communicator() { freeCommunicator(comm_); }
+
+ //- True if communicator is non-negative (ie, was allocated)
+ bool good() const noexcept { return (comm_ >= 0); }
+
+ //- The communicator label
+ label comm() const noexcept { return comm_; }
+
+ //- Free allocated communicator and group
+ void reset() { freeCommunicator(comm_); comm_ = -1; }
+
+ //- Allocate with subRanks of parent communicator
+ void reset(label parent, const labelUList& subRanks)
{
freeCommunicator(comm_);
+ comm_ = allocateCommunicator(parent, subRanks);
}
+ //- Take ownership, free allocated communicator and group.
+ void reset(communicator&& c)
+ {
+ if (comm_ != c.comm_) freeCommunicator(comm_);
+ comm_ = c.comm_;
+ c.comm_ = -1;
+ }
+
+ //- Move assignment, takes ownership
+ void operator=(communicator&& c) { reset(std::move(c)); }
+
+ //- Cast to label - the same as comm()
operator label() const noexcept { return comm_; }
};
@@ -468,12 +501,18 @@ public:
return 0;
}
- //- Am I the master process
+ //- Am I the master rank
static bool master(const label communicator = worldComm)
{
return myProcNo_[communicator] == masterNo();
}
+ //- Is this process a sub-rank on the communicator
+ static bool is_subrank(const label communicator = worldComm)
+ {
+ return myProcNo_[communicator] > masterNo();
+ }
+
//- Number of this process (starting from masterNo() = 0)
static int myProcNo(const label communicator = worldComm)
{
diff --git a/src/Pstream/mpi/PstreamGlobals.C b/src/Pstream/mpi/PstreamGlobals.C
index a5c9f5c87f..f6b09aa4cd 100644
--- a/src/Pstream/mpi/PstreamGlobals.C
+++ b/src/Pstream/mpi/PstreamGlobals.C
@@ -40,17 +40,15 @@ Foam::DynamicList Foam::PstreamGlobals::MPICommunicators_;
Foam::DynamicList Foam::PstreamGlobals::MPIGroups_;
+// * * * * * * * * * * * * * * * Global Functions * * * * * * * * * * * * * //
+
void Foam::PstreamGlobals::checkCommunicator
(
const label comm,
const label toProcNo
)
{
- if
- (
- comm < 0
- || comm >= PstreamGlobals::MPICommunicators_.size()
- )
+ if (comm < 0 || comm >= PstreamGlobals::MPICommunicators_.size())
{
FatalErrorInFunction
<< "toProcNo:" << toProcNo << " : illegal communicator "
diff --git a/src/Pstream/mpi/PstreamGlobals.H b/src/Pstream/mpi/PstreamGlobals.H
index c472379e34..33287c6c9a 100644
--- a/src/Pstream/mpi/PstreamGlobals.H
+++ b/src/Pstream/mpi/PstreamGlobals.H
@@ -6,6 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2013-2015 OpenFOAM Foundation
+ Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@@ -65,9 +66,35 @@ extern DynamicList MPICommunicators_;
// Groups associated with the currrent communicators.
extern DynamicList MPIGroups_;
+
+// * * * * * * * * * * * * * * * Global Functions * * * * * * * * * * * * * //
+
//- Fatal if comm is outside the allocated range
void checkCommunicator(const label comm, const label toProcNo);
+//- Reuse previously freed request locations or push request onto list
+//- of outstanding requests.
+//
+// \return index of request within outstandingRequests_
+inline label push_request(MPI_Request request)
+{
+ label index;
+
+ if (freedRequests_.size())
+ {
+ index = freedRequests_.back();
+ freedRequests_.pop_back();
+ outstandingRequests_[index] = request;
+ }
+ else
+ {
+ index = outstandingRequests_.size();
+ outstandingRequests_.push_back(request);
+ }
+
+ return index;
+}
+
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
diff --git a/src/Pstream/mpi/UIPstreamRead.C b/src/Pstream/mpi/UIPstreamRead.C
index d66285474f..40d67eabc9 100644
--- a/src/Pstream/mpi/UIPstreamRead.C
+++ b/src/Pstream/mpi/UIPstreamRead.C
@@ -218,7 +218,7 @@ Foam::label Foam::UIPstream::read
<< Foam::endl;
}
- PstreamGlobals::outstandingRequests_.append(request);
+ PstreamGlobals::outstandingRequests_.push_back(request);
// Assume the message is completely received.
return bufSize;
diff --git a/src/Pstream/mpi/UOPstreamWrite.C b/src/Pstream/mpi/UOPstreamWrite.C
index 4ee1dc87ad..3c61afecbe 100644
--- a/src/Pstream/mpi/UOPstreamWrite.C
+++ b/src/Pstream/mpi/UOPstreamWrite.C
@@ -159,7 +159,7 @@ bool Foam::UOPstream::write
<< Foam::endl;
}
- PstreamGlobals::outstandingRequests_.append(request);
+ PstreamGlobals::outstandingRequests_.push_back(request);
}
else
{
diff --git a/src/Pstream/mpi/UPstream.C b/src/Pstream/mpi/UPstream.C
index 5ecd11b155..b9e2ab76a1 100644
--- a/src/Pstream/mpi/UPstream.C
+++ b/src/Pstream/mpi/UPstream.C
@@ -687,6 +687,9 @@ void Foam::UPstream::waitRequests(const label start)
<< " outstanding requests starting at " << start << endl;
}
+ // TBD: check for
+ // (start < 0 || start > PstreamGlobals::outstandingRequests_.size())
+
if (PstreamGlobals::outstandingRequests_.size())
{
SubList waitRequests
@@ -698,6 +701,7 @@ void Foam::UPstream::waitRequests(const label start)
profilingPstream::beginTiming();
+ // On success: sets each request to MPI_REQUEST_NULL
if
(
MPI_Waitall
@@ -714,7 +718,7 @@ void Foam::UPstream::waitRequests(const label start)
profilingPstream::addWaitTime();
- resetRequests(start);
+ PstreamGlobals::outstandingRequests_.resize(start);
}
if (debug)
@@ -749,6 +753,7 @@ void Foam::UPstream::waitRequest(const label i)
profilingPstream::beginTiming();
+ // On success: sets request to MPI_REQUEST_NULL
if
(
MPI_Wait
@@ -797,6 +802,7 @@ bool Foam::UPstream::finishedRequest(const label i)
<< Foam::abort(FatalError);
}
+ // On success: sets request to MPI_REQUEST_NULL
int flag;
MPI_Test
(
@@ -821,7 +827,7 @@ int Foam::UPstream::allocateTag(const char* const msg)
if (PstreamGlobals::freedTags_.size())
{
tag = PstreamGlobals::freedTags_.back();
- (void)PstreamGlobals::freedTags_.pop_back();
+ PstreamGlobals::freedTags_.pop_back();
}
else
{
diff --git a/src/Pstream/mpi/UPstreamWrappingTemplates.C b/src/Pstream/mpi/UPstreamWrappingTemplates.C
index 8af13e7e16..65139268f4 100644
--- a/src/Pstream/mpi/UPstreamWrappingTemplates.C
+++ b/src/Pstream/mpi/UPstreamWrappingTemplates.C
@@ -179,16 +179,7 @@ void Foam::PstreamDetail::allReduce
<< Foam::abort(FatalError);
}
- if (PstreamGlobals::freedRequests_.size())
- {
- *requestID = PstreamGlobals::freedRequests_.remove();
- PstreamGlobals::outstandingRequests_[*requestID] = request;
- }
- else
- {
- *requestID = PstreamGlobals::outstandingRequests_.size();
- PstreamGlobals::outstandingRequests_.append(request);
- }
+ *requestID = PstreamGlobals::push_request(request);
}
#endif
@@ -299,16 +290,7 @@ void Foam::PstreamDetail::allToAll
<< Foam::abort(FatalError);
}
- if (PstreamGlobals::freedRequests_.size())
- {
- *requestID = PstreamGlobals::freedRequests_.remove();
- PstreamGlobals::outstandingRequests_[*requestID] = request;
- }
- else
- {
- *requestID = PstreamGlobals::outstandingRequests_.size();
- PstreamGlobals::outstandingRequests_.append(request);
- }
+ *requestID = PstreamGlobals::push_request(request);
}
#endif
@@ -449,16 +431,7 @@ void Foam::PstreamDetail::allToAllv
<< Foam::abort(FatalError);
}
- if (PstreamGlobals::freedRequests_.size())
- {
- *requestID = PstreamGlobals::freedRequests_.remove();
- PstreamGlobals::outstandingRequests_[*requestID] = request;
- }
- else
- {
- *requestID = PstreamGlobals::outstandingRequests_.size();
- PstreamGlobals::outstandingRequests_.append(request);
- }
+ *requestID = PstreamGlobals::push_request(request);
}
#endif
@@ -568,16 +541,7 @@ void Foam::PstreamDetail::gather
<< Foam::abort(FatalError);
}
- if (PstreamGlobals::freedRequests_.size())
- {
- *requestID = PstreamGlobals::freedRequests_.remove();
- PstreamGlobals::outstandingRequests_[*requestID] = request;
- }
- else
- {
- *requestID = PstreamGlobals::outstandingRequests_.size();
- PstreamGlobals::outstandingRequests_.append(request);
- }
+ *requestID = PstreamGlobals::push_request(request);
}
#endif
@@ -686,16 +650,7 @@ void Foam::PstreamDetail::scatter
<< Foam::abort(FatalError);
}
- if (PstreamGlobals::freedRequests_.size())
- {
- *requestID = PstreamGlobals::freedRequests_.remove();
- PstreamGlobals::outstandingRequests_[*requestID] = request;
- }
- else
- {
- *requestID = PstreamGlobals::outstandingRequests_.size();
- PstreamGlobals::outstandingRequests_.append(request);
- }
+ *requestID = PstreamGlobals::push_request(request);
}
#endif
@@ -830,16 +785,7 @@ void Foam::PstreamDetail::gatherv
<< Foam::abort(FatalError);
}
- if (PstreamGlobals::freedRequests_.size())
- {
- *requestID = PstreamGlobals::freedRequests_.remove();
- PstreamGlobals::outstandingRequests_[*requestID] = request;
- }
- else
- {
- *requestID = PstreamGlobals::outstandingRequests_.size();
- PstreamGlobals::outstandingRequests_.append(request);
- }
+ *requestID = PstreamGlobals::push_request(request);
}
#endif
@@ -968,16 +914,7 @@ void Foam::PstreamDetail::scatterv
<< Foam::abort(FatalError);
}
- if (PstreamGlobals::freedRequests_.size())
- {
- *requestID = PstreamGlobals::freedRequests_.remove();
- PstreamGlobals::outstandingRequests_[*requestID] = request;
- }
- else
- {
- *requestID = PstreamGlobals::outstandingRequests_.size();
- PstreamGlobals::outstandingRequests_.append(request);
- }
+ *requestID = PstreamGlobals::push_request(request);
}
#endif