ENH: add separate tracking of MPI_Comm_free, MPI_Group_free pending

- permits distinction between communicators/groups that were user-created
  (eg, MPI_Comm_create) versus those queried from MPI. Previously simply
  relied on non-null values, but that is too fragile

ENH: support List<Request> version of UPstream::finishedRequests

- allows more independent algorithms

ENH: added UPstream::probeMessage(...). Blocking or non-blocking
parent ab4c5f25ac
commit 068ab8ccc7
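The rationale for the first change can be shown with a small standalone sketch (all names here are illustrative, not from the OpenFOAM sources): a queried handle such as MPI_COMM_WORLD and a user-created communicator are both non-null, so only an explicit ownership mask can record which ones must actually be freed.

    #include <mpi.h>

    enum pendingType : unsigned
    {
        NonePending  = 0u,
        CommPending  = 1u,
        GroupPending = 2u
    };

    struct commEntry
    {
        MPI_Comm comm = MPI_COMM_NULL;
        unsigned pending = NonePending;  // which frees this entry owns
    };

    void destroy(commEntry& e)
    {
        // A non-null handle alone proves nothing: MPI_COMM_WORLD is
        // non-null but must never be freed. The bitmask records ownership.
        if ((MPI_COMM_NULL != e.comm) && (e.pending & CommPending))
        {
            MPI_Comm_free(&e.comm);  // sets the handle to MPI_COMM_NULL
        }
        e.pending = NonePending;
    }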
@@ -1,2 +1,4 @@
-/* EXE_INC = */
-/* EXE_LIBS = */
+include $(GENERAL_RULES)/mpi-rules
+
+EXE_INC = $(PFLAGS) $(PINC) $(c++LESSWARN)
+EXE_LIBS = $(PLIBS)
@@ -5,7 +5,7 @@
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
-------------------------------------------------------------------------------
-    Copyright (C) 2022 OpenCFD Ltd.
+    Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.
@@ -39,6 +39,7 @@ Description
#include "Tuple2.H"
#include "IOstreams.H"
#include "PstreamReduceOps.H"
#include <mpi.h>

using namespace Foam;

@@ -62,6 +63,8 @@ int main(int argc, char *argv[])
    argList::noBanner();
    argList::noCheckProcessorDirectories();
    argList::addBoolOption("verbose", "Set debug level");
    argList::addBoolOption("comm-split", "Test simple comm split");
    argList::addBoolOption("host-comm", "Test DIY host-comm split");

    // Capture manually. We need values before proper startup
    int nVerbose = 0;
@@ -139,6 +142,91 @@ int main(int argc, char *argv[])
    Pout<< endl;
    #endif

    if (Pstream::parRun() && args.found("comm-split"))
    {
        MPI_Comm hostComm;
        MPI_Comm_split_type
        (
            MPI_COMM_WORLD,
            MPI_COMM_TYPE_SHARED,  // OMPI_COMM_TYPE_NODE
            0, MPI_INFO_NULL, &hostComm
        );

        int host_nprocs = 0;
        int host_rank = 0;
        MPI_Comm_size(hostComm, &host_nprocs);
        MPI_Comm_rank(hostComm, &host_rank);

        Pout<< nl << "Host comm with "
            << host_rank << " / " << host_nprocs
            << " (using MPI_Comm_split_type)" << endl;

        MPI_Comm_free(&hostComm);
    }
    if (Pstream::parRun() && args.found("host-comm"))
    {
        // Host communicator, based on the current worldComm
        // Use hostname
        // Lowest rank per hostname is the IO rank

        label numprocs = UPstream::nProcs(UPstream::globalComm);

        stringList hosts(numprocs);
        hosts[Pstream::myProcNo(UPstream::globalComm)] = hostName();

        labelList hostIDs_;

        // Compact
        if (Pstream::master(UPstream::globalComm))
        {
            DynamicList<word> hostNames(numprocs);
            hostIDs_.resize_nocopy(numprocs);

            forAll(hosts, proci)
            {
                const word& host = hosts[proci];

                hostIDs_[proci] = hostNames.find(host);

                if (hostIDs_[proci] == -1)
                {
                    hostIDs_[proci] = hostNames.size();
                    hostNames.push_back(host);
                }
            }
        }

        Pstream::broadcasts(UPstream::globalComm, hostIDs_);

        const label myHostId =
            hostIDs_[Pstream::myProcNo(UPstream::globalComm)];

        DynamicList<label> subRanks;
        forAll(hostIDs_, proci)
        {
            if (hostIDs_[proci] == myHostId)
            {
                subRanks.push_back(proci);
            }
        }

        // Allocate new communicator with globalComm as its parent
        const label hostComm =
            UPstream::allocateCommunicator
            (
                UPstream::globalComm,  // parent
                subRanks,
                true
            );

        Pout<< nl << "Host comm with "
            << UPstream::myProcNo(hostComm)
            << " / " << UPstream::nProcs(hostComm)
            << nl;

        UPstream::freeCommunicator(hostComm, true);
    }

    Info<< "\nEnd\n" << endl;

    return 0;
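For comparison with the two variants tested above, a hostname-based split can also be done with plain MPI_Comm_split (a sketch with illustrative names, not part of this commit; note that hash collisions could incorrectly merge unrelated hosts, so real code would need to verify the result):

    #include <mpi.h>
    #include <unistd.h>     // gethostname()
    #include <functional>
    #include <string>

    MPI_Comm hostCommByName()
    {
        char name[256] = {};
        gethostname(name, sizeof(name) - 1);

        // Clamp the hash to a non-negative int, as MPI colours require
        const int colour =
            int(std::hash<std::string>{}(name) & 0x7FFFFFFF);

        int rank = 0;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);

        MPI_Comm comm = MPI_COMM_NULL;
        MPI_Comm_split(MPI_COMM_WORLD, colour, rank, &comm);
        return comm;   // caller must MPI_Comm_free()
    }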
@@ -1,2 +1,4 @@
-/* EXE_INC = */
-/* EXE_LIBS = */
+include $(GENERAL_RULES)/mpi-rules
+
+EXE_INC = $(PFLAGS) $(PINC) $(c++LESSWARN)
+EXE_LIBS = $(PLIBS)
@@ -33,6 +33,7 @@ Description
#include "polyMesh.H"
#include "globalMeshData.H"
#include "OFstream.H"
#include <mpi.h>

using namespace Foam;

@@ -42,11 +43,25 @@ using namespace Foam;
int main(int argc, char *argv[])
{
    argList::noFunctionObjects();
    argList::addBoolOption("verbose", "Set debug level");
    argList::addBoolOption("comm-graph", "Test simple graph communicator");
    argList::addNote
    (
        "Create graph of OpenFOAM mesh connections"
    );

    // Capture manually. We need values before proper startup
    int nVerbose = 0;
    for (int argi = 1; argi < argc; ++argi)
    {
        if (strcmp(argv[argi], "-verbose") == 0)
        {
            ++nVerbose;
        }
    }

    UPstream::debug = nVerbose;

    #include "setRootCase.H"

    if (!Pstream::parRun())
@@ -105,6 +120,127 @@ int main(int argc, char *argv[])
            << "Use neato, circo or fdp graphviz tools" << nl;
    }

    if (Pstream::parRun() && args.found("comm-graph"))
    {
        Info<< nl;

        // Local neighbours
        const labelList& neighbours =
            mesh.globalData().topology().procNeighbours();

        Pout<< "Neighbours: " << flatOutput(neighbours) << endl;

        // As integer values
        List<int> connected(neighbours.size());
        List<int> weights(neighbours.size());
        forAll(neighbours, i)
        {
            connected[i] = neighbours[i];
            weights[i] = 1;
        }

        MPI_Comm topoComm;

        int mpiErrorCode =
            MPI_Dist_graph_create_adjacent
            (
                MPI_COMM_WORLD,
                // Connections into this rank
                connected.size(), connected.cdata(), MPI_UNWEIGHTED,
                // Connections out of this rank
                connected.size(), connected.cdata(), MPI_UNWEIGHTED,
                MPI_INFO_NULL,
                0,  // no reordering (apparently broken anyhow)
                &topoComm
            );

        if (mpiErrorCode)
        {
            FatalError
                << "Failed to create topo communicator. Error:"
                << mpiErrorCode << exit(FatalError);
        }

        int topo_rank = 0;
        int topo_nprocs = 0;
        int topo_inCount = 0;
        int topo_outCount = 0;
        int topo_isWeighted = 0;
        MPI_Comm_rank(topoComm, &topo_rank);
        MPI_Comm_size(topoComm, &topo_nprocs);

        {
            int topo_type = 0;
            MPI_Topo_test(topoComm, &topo_type);

            if (MPI_CART == topo_type)
            {
                Info<< "MPI topology : Cartesian" << endl;
            }
            else if (MPI_GRAPH == topo_type)
            {
                Info<< "MPI topology : Graph" << endl;
            }
            else if (MPI_DIST_GRAPH == topo_type)
            {
                Info<< "MPI topology : Distributed graph" << endl;
            }
            else
            {
                Info<< "MPI topology : None" << endl;
            }
        }

        MPI_Dist_graph_neighbors_count
        (
            topoComm,
            &topo_inCount,
            &topo_outCount,
            &topo_isWeighted
        );

        Pout<< "Topo comm with "
            << topo_rank << " / " << topo_nprocs
            << " from " << connected.size() << flatOutput(connected)
            << " numNbr:" << topo_inCount
            << nl;


        List<int> myPatchIds(neighbours.size());
        forAll(myPatchIds, i)
        {
            // Patches to neighbours
            myPatchIds[i] =
                mesh.globalData().topology().procPatchLookup(neighbours[i]);
        }

        List<int> nbrPatchIds(neighbours.size(), Zero);

        mpiErrorCode = MPI_Neighbor_alltoall
        (
            myPatchIds.data(),
            1,  // one element per neighbour
            MPI_INT,
            nbrPatchIds.data(),
            1,  // one element per neighbour
            MPI_INT,
            topoComm
        );

        if (mpiErrorCode)
        {
            FatalError
                << "MPI Error: " << mpiErrorCode << exit(FatalError);
        }

        Pout<< "proc neighbours:" << flatOutput(neighbours)
            << " my patches:" << flatOutput(myPatchIds)
            << " their patches:" << flatOutput(nbrPatchIds)
            << endl;

        MPI_Comm_free(&topoComm);
    }

    Info<< nl << "End\n" << endl;

    return 0;
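A companion collective that the same topoComm would support (a sketch with assumed names, not in this commit): MPI_Neighbor_allgather sends one identical value to every graph neighbour, whereas the MPI_Neighbor_alltoall above sends a distinct value to each.

    #include <mpi.h>
    #include <vector>

    // 'topoComm' is assumed to be a distributed-graph communicator, as
    // created by MPI_Dist_graph_create_adjacent in the test above, and
    // 'degree' its neighbour count from MPI_Dist_graph_neighbors_count.
    std::vector<int> gatherNeighbourRanks(MPI_Comm topoComm, int degree)
    {
        int myRank = 0;
        MPI_Comm_rank(topoComm, &myRank);

        std::vector<int> nbrRanks(degree, -1);

        // Every neighbour receives this rank's id; we receive one id from
        // each of our 'degree' neighbours, in graph order.
        MPI_Neighbor_allgather
        (
            &myRank, 1, MPI_INT,
            nbrRanks.data(), 1, MPI_INT,
            topoComm
        );

        return nbrRanks;
    }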
@@ -6,7 +6,7 @@
     \\/     M anipulation  |
-------------------------------------------------------------------------------
    Copyright (C) 2011-2017 OpenFOAM Foundation
-    Copyright (C) 2015-2022 OpenCFD Ltd.
+    Copyright (C) 2015-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.
@@ -161,11 +161,11 @@ Foam::label Foam::UPstream::allocateCommunicator
        // Extend storage
        index = parentComm_.size();

-        myProcNo_.append(-1);
-        procIDs_.append(List<int>());
-        parentComm_.append(-1);
-        linearCommunication_.append(List<commsStruct>());
-        treeCommunication_.append(List<commsStruct>());
+        myProcNo_.push_back(-1);
+        procIDs_.emplace_back();
+        parentComm_.push_back(-1);
+        linearCommunication_.emplace_back();
+        treeCommunication_.emplace_back();
    }

    if (debug)
@@ -292,7 +292,7 @@ void Foam::UPstream::freeCommunicators(const bool doPstream)

int Foam::UPstream::baseProcNo(label comm, int procID)
{
-    while (parent(comm) >= 0 && procID >= 0)
+    while (UPstream::parent(comm) >= 0 && procID >= 0)
    {
        const auto& parentRanks = UPstream::procID(comm);
        procID = parentRanks[procID];
@@ -305,14 +305,14 @@ int Foam::UPstream::baseProcNo(label comm, int procID)

Foam::label Foam::UPstream::procNo(const label comm, const int baseProcID)
{
-    const auto& parentRanks = procID(comm);
-    label parentComm = parent(comm);
+    const auto& parentRanks = UPstream::procID(comm);
+    label parentComm = UPstream::parent(comm);

    int procID = baseProcID;

    if (parentComm >= 0)
    {
-        procID = procNo(parentComm, baseProcID);
+        procID = UPstream::procNo(parentComm, baseProcID);
    }

    return parentRanks.find(procID);
@@ -327,7 +327,7 @@ Foam::label Foam::UPstream::procNo
)
{
    label physProcID = UPstream::baseProcNo(currentComm, currentProcID);
-    return procNo(comm, physProcID);
+    return UPstream::procNo(comm, physProcID);
}


@@ -463,6 +463,23 @@ public:
            UPstream::Request* req = nullptr
        );

        //- Probe for an incoming message.
        //
        //  \param commsType Blocking or not
        //  \param fromProcNo The source rank (negative == ANY_SOURCE)
        //  \param tag The source message tag
        //  \param communicator The communicator index
        //
        //  \returns source rank and message size (bytes)
        //      and (-1, 0) on failure
        static std::pair<int,int> probeMessage
        (
            const UPstream::commsTypes commsType,
            const int fromProcNo,
            const int tag = UPstream::msgType(),
            const label communicator = worldComm
        );


    // Non-blocking comms

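A possible call-site for the new probe, assuming the surrounding OpenFOAM context (the receive step is only sketched):

    // Poll for any pending message on worldComm (hypothetical usage)
    const std::pair<int,int> probed =
        UPstream::probeMessage
        (
            UPstream::commsTypes::nonBlocking,
            -1,                     // negative == ANY_SOURCE
            UPstream::msgType(),
            UPstream::worldComm
        );

    if (probed.first >= 0)      // (-1, 0) means nothing pending (or failure)
    {
        const int fromProcNo = probed.first;
        List<char> buf(probed.second);   // size in bytes, from the probe

        // ... now receive from 'fromProcNo' into 'buf', eg via UIPstream
    }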
@@ -504,6 +521,10 @@ public:
        //  or for a null-request
        static bool finishedRequest(UPstream::Request& req);

        //- Non-blocking comms: have all requests finished?
        //  A no-op and returns true if parRun() == false or list is empty
        static bool finishedRequests(UList<UPstream::Request>& requests);

        static int allocateTag(const char* const msg = nullptr);
        static void freeTag(const int tag, const char* const msg = nullptr);

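The List<Request> overload suggests a polling pattern along these lines (a sketch; doLocalWork() is a hypothetical placeholder):

    DynamicList<UPstream::Request> requests;

    // ... initiate non-blocking operations, each appending its request ...

    // Poll instead of blocking in waitRequests(): overlap local work
    // with communication until everything has completed
    while (!UPstream::finishedRequests(requests))
    {
        doLocalWork();
    }
    // On completion, all entries have been reset to null requests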
@@ -98,4 +98,17 @@ void Foam::UPstream::barrier(const label communicator, UPstream::Request* req)
{}


std::pair<int,int>
Foam::UPstream::probeMessage
(
    const UPstream::commsTypes commsType,
    const int fromProcNo,
    const int tag,
    const label comm
)
{
    return std::pair<int,int>(-1, 0);
}


// ************************************************************************* //

@@ -61,6 +61,10 @@ void Foam::UPstream::waitRequest(UPstream::Request&) {}

bool Foam::UPstream::finishedRequest(const label i) { return true; }
bool Foam::UPstream::finishedRequest(UPstream::Request&) { return true; }
bool Foam::UPstream::finishedRequests(UList<UPstream::Request>&)
{
    return true;
}


// ************************************************************************* //

@@ -6,6 +6,7 @@
     \\/     M anipulation  |
-------------------------------------------------------------------------------
    Copyright (C) 2013-2015 OpenFOAM Foundation
    Copyright (C) 2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.
@@ -29,6 +30,8 @@ License

// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //

Foam::DynamicList<unsigned> Foam::PstreamGlobals::pendingMPIFree_;

Foam::DynamicList<MPI_Comm> Foam::PstreamGlobals::MPICommunicators_;
Foam::DynamicList<MPI_Group> Foam::PstreamGlobals::MPIGroups_;

@@ -49,11 +49,24 @@ namespace Foam
namespace PstreamGlobals
{

//- Internal enumeration to track the state of MPI_Comm, MPI_Group allocation
//  Handled as bitmasks
enum allocationTypes : unsigned
{
    NonePending = 0u,   // No MPI free is pending
    CommPending = 1u,   // MPI_Comm_free() is needed
    GroupPending = 2u   // MPI_Group_free() is needed
};

// Track if MPI_Comm_free and/or MPI_Group_free are pending for
// each communicator index (indexes into MPICommunicators_, MPIGroups_)
extern DynamicList<unsigned> pendingMPIFree_;

// Current communicators, which may be allocated or predefined
// (eg, MPI_COMM_SELF, MPI_COMM_WORLD)
extern DynamicList<MPI_Comm> MPICommunicators_;

// Groups associated with the current communicators.
// Groups used to create communicators
extern DynamicList<MPI_Group> MPIGroups_;

//- Outstanding non-blocking operations.
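Condensed from later hunks of this commit, the lifecycle of the new bitmask looks like this (illustrative excerpts, not a compilable unit):

    // 1. Predefined handles (WORLD, SELF): nothing will ever need freeing
    pendingMPIFree_[index] = PstreamGlobals::NonePending;

    // 2. User-created sub-communicator: both frees are pending
    pendingMPIFree_[index] =
        (PstreamGlobals::CommPending | PstreamGlobals::GroupPending);

    // 3. Group created, but rank not in the sub-communicator: group only
    pendingMPIFree_[index] = PstreamGlobals::GroupPending;

    // 4. On free, test each bit before calling the matching MPI function
    if (pendingMPIFree_[index] & PstreamGlobals::CommPending)
    {
        MPI_Comm_free(&MPICommunicators_[index]);  // -> MPI_COMM_NULL
    }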
@@ -37,6 +37,7 @@ License
#include <cstring>
#include <cstdlib>
#include <csignal>
#include <numeric>

// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //

@@ -502,15 +503,17 @@ void Foam::UPstream::allocatePstreamCommunicator
    const label index
)
{
-    if (index == PstreamGlobals::MPIGroups_.size())
+    if (index == PstreamGlobals::MPICommunicators_.size())
    {
-        // Extend storage with dummy values
-        MPI_Comm newComm = MPI_COMM_NULL;
-        MPI_Group newGroup = MPI_GROUP_NULL;
-        PstreamGlobals::MPIGroups_.push_back(newGroup);
-        PstreamGlobals::MPICommunicators_.push_back(newComm);
+        // Extend storage with null values

+        PstreamGlobals::
+            pendingMPIFree_.emplace_back(PstreamGlobals::NonePending);

+        PstreamGlobals::MPICommunicators_.emplace_back(MPI_COMM_NULL);
+        PstreamGlobals::MPIGroups_.emplace_back(MPI_GROUP_NULL);
    }
-    else if (index > PstreamGlobals::MPIGroups_.size())
+    else if (index > PstreamGlobals::MPICommunicators_.size())
    {
        FatalErrorInFunction
            << "PstreamGlobals out of sync with UPstream data. Problem."
@@ -530,27 +533,40 @@ void Foam::UPstream::allocatePstreamCommunicator
                << Foam::exit(FatalError);
        }

        PstreamGlobals::pendingMPIFree_[index] = PstreamGlobals::NonePending;
        PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
-        MPI_Comm_group(MPI_COMM_WORLD, &PstreamGlobals::MPIGroups_[index]);
-        MPI_Comm_rank(MPI_COMM_WORLD, &myProcNo_[index]);
+        PstreamGlobals::MPIGroups_[index] = MPI_GROUP_NULL;

        // TBD: MPI_Comm_dup(MPI_COMM_WORLD, ...);
        // with pendingMPIFree_[index] = CommPending ...
        // Note: freePstreamCommunicator may need an update

+        MPI_Comm_rank
+        (
+            PstreamGlobals::MPICommunicators_[index],
+            &myProcNo_[index]
+        );

        // Set the number of ranks to the actual number
        int numProcs;
-        MPI_Comm_size(MPI_COMM_WORLD, &numProcs);
+        MPI_Comm_size
+        (
+            PstreamGlobals::MPICommunicators_[index],
+            &numProcs
+        );

        //procIDs_[index] = identity(numProcs);
        // identity [0-numProcs], as 'int'
        procIDs_[index].resize_nocopy(numProcs);
-        forAll(procIDs_[index], i)
-        {
-            procIDs_[index][i] = i;
-        }
+        std::iota(procIDs_[index].begin(), procIDs_[index].end(), 0);
    }
    else if (parentIndex == -2)
    {
        // Self communicator

        PstreamGlobals::pendingMPIFree_[index] = PstreamGlobals::NonePending;
        PstreamGlobals::MPICommunicators_[index] = MPI_COMM_SELF;
-        MPI_Comm_group(MPI_COMM_SELF, &PstreamGlobals::MPIGroups_[index]);
+        PstreamGlobals::MPIGroups_[index] = MPI_GROUP_NULL;

        MPI_Comm_rank(MPI_COMM_SELF, &myProcNo_[index]);

        // Number of ranks is always 1 (self communicator)
@@ -573,6 +589,11 @@ void Foam::UPstream::allocatePstreamCommunicator
    }
    else
    {
        // General sub-communicator

        PstreamGlobals::pendingMPIFree_[index]
            = (PstreamGlobals::CommPending | PstreamGlobals::GroupPending);

        // Create new group
        MPI_Group_incl
        (
@@ -603,7 +624,10 @@ void Foam::UPstream::allocatePstreamCommunicator

        if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL)
        {
            // No communicator created, group only
            myProcNo_[index] = -1;
            PstreamGlobals::
                pendingMPIFree_[index] = PstreamGlobals::GroupPending;
        }
        else
        {
@@ -629,30 +653,48 @@ void Foam::UPstream::allocatePstreamCommunicator
}


-void Foam::UPstream::freePstreamCommunicator(const label communicator)
+void Foam::UPstream::freePstreamCommunicator(const label index)
{
    // Skip placeholders and pre-defined (not allocated) communicators

    if (UPstream::debug)
    {
-        Pout<< "freePstreamCommunicator: " << communicator
+        Pout<< "freePstreamCommunicator: " << index
            << " from " << PstreamGlobals::MPICommunicators_.size() << endl;
    }

    // Not touching the first two communicators (SELF, WORLD)
-    if (communicator > 1)
+    if (index > 1)
    {
-        if (MPI_COMM_NULL != PstreamGlobals::MPICommunicators_[communicator])
+        if
+        (
+            (MPI_COMM_NULL != PstreamGlobals::MPICommunicators_[index])
+         &&
+            (
+                PstreamGlobals::pendingMPIFree_[index]
+              & PstreamGlobals::CommPending
+            )
+        )
        {
            // Free communicator. Sets communicator to MPI_COMM_NULL
-            MPI_Comm_free(&PstreamGlobals::MPICommunicators_[communicator]);
+            MPI_Comm_free(&PstreamGlobals::MPICommunicators_[index]);
        }

-        if (MPI_GROUP_NULL != PstreamGlobals::MPIGroups_[communicator])
+        if
+        (
+            (MPI_GROUP_NULL != PstreamGlobals::MPIGroups_[index])
+         &&
+            (
+                PstreamGlobals::pendingMPIFree_[index]
+              & PstreamGlobals::GroupPending
+            )
+        )
        {
            // Free group. Sets group to MPI_GROUP_NULL
-            MPI_Group_free(&PstreamGlobals::MPIGroups_[communicator]);
+            MPI_Group_free(&PstreamGlobals::MPIGroups_[index]);
        }

        PstreamGlobals::pendingMPIFree_[index] = PstreamGlobals::NonePending;
    }
}

@@ -705,6 +747,7 @@ void Foam::UPstream::barrier(const label communicator, UPstream::Request* req)
    {
        MPI_Request request;

        // Non-blocking
        if
        (
            MPI_Ibarrier
@@ -723,6 +766,7 @@ void Foam::UPstream::barrier(const label communicator, UPstream::Request* req)
    }
    else
    {
        // Blocking
        if
        (
            MPI_Barrier
@@ -739,4 +783,77 @@ void Foam::UPstream::barrier(const label communicator, UPstream::Request* req)
}


std::pair<int,int>
Foam::UPstream::probeMessage
(
    const UPstream::commsTypes commsType,
    const int fromProcNo,
    const int tag,
    const label comm
)
{
    std::pair<int,int> result(-1, 0);

    if (!UPstream::parRun())
    {
        return result;
    }

    const int source = (fromProcNo < 0) ? MPI_ANY_SOURCE : fromProcNo;
    // Supporting MPI_ANY_TAG is not particularly useful...

    int flag = 0;
    MPI_Status status;

    if (UPstream::commsTypes::blocking == commsType)
    {
        // Blocking
        if
        (
            MPI_Probe
            (
                source,
                tag,
                PstreamGlobals::MPICommunicators_[comm],
                &status
            )
        )
        {
            FatalErrorInFunction
                << "MPI_Probe returned with error"
                << Foam::abort(FatalError);
        }
        flag = 1;
    }
    else
    {
        // Non-blocking
        if
        (
            MPI_Iprobe
            (
                source,
                tag,
                PstreamGlobals::MPICommunicators_[comm],
                &flag,
                &status
            )
        )
        {
            FatalErrorInFunction
                << "MPI_Iprobe returned with error"
                << Foam::abort(FatalError);
        }
    }

    if (flag)
    {
        result.first = status.MPI_SOURCE;
        MPI_Get_count(&status, MPI_BYTE, &result.second);
    }

    return result;
}


// ************************************************************************* //

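The probe-then-receive idiom that probeMessage wraps, in raw MPI terms (a sketch; 'tag' and 'comm' are assumed to be defined by the caller):

    MPI_Status status;
    MPI_Probe(MPI_ANY_SOURCE, tag, comm, &status);  // blocking probe

    int nBytes = 0;
    MPI_Get_count(&status, MPI_BYTE, &nBytes);      // size the buffer to fit

    std::vector<char> buffer(nBytes);
    MPI_Recv
    (
        buffer.data(), nBytes, MPI_BYTE,
        status.MPI_SOURCE,      // receive from whichever rank was probed
        status.MPI_TAG,
        comm,
        MPI_STATUS_IGNORE
    );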
@@ -138,21 +138,24 @@ void Foam::UPstream::waitRequests(UList<UPstream::Request>& requests)

    for (auto& req : requests)
    {
-        if (req.good())
+        MPI_Request request = PstreamDetail::Request::get(req);
+
+        if (MPI_REQUEST_NULL != request)
        {
-            waitRequests[count] = PstreamDetail::Request::get(req);
+            waitRequests[count] = request;
            ++count;
        }
    }

    if (!count)
    {
        // Early exit: no non-null requests found
        return;
    }

    profilingPstream::beginTiming();

-    // On success: sets request to MPI_REQUEST_NULL
+    // On success: sets each request to MPI_REQUEST_NULL
    if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE))
    {
        FatalErrorInFunction
@@ -163,10 +166,7 @@ void Foam::UPstream::waitRequests(UList<UPstream::Request>& requests)
    profilingPstream::addWaitTime();

    // Everything handled, reset all to MPI_REQUEST_NULL
-    for (auto& req : requests)
-    {
-        req.reset();
-    }
+    requests = UPstream::Request(MPI_REQUEST_NULL);
}

@@ -190,14 +190,16 @@ void Foam::UPstream::waitRequests(UList<UPstream::Request>& requests)
///        waitRequests[count] = PstreamDetail::Request::get(req1);
///        if (MPI_REQUEST_NULL != waitRequests[count])
///        {
-///            req1.reset();
+///            // Flag in advance as being handled
+///            req1 = UPstream::Request(MPI_REQUEST_NULL);
///            ++count;
///        }
///
///        waitRequests[count] = PstreamDetail::Request::get(req2);
///        if (MPI_REQUEST_NULL != waitRequests[count])
///        {
-///            req2.reset();
+///            // Flag in advance as being handled
+///            req2 = UPstream::Request(MPI_REQUEST_NULL);
///            ++count;
///        }
///
@@ -208,7 +210,7 @@ void Foam::UPstream::waitRequests(UList<UPstream::Request>& requests)
///
///    profilingPstream::beginTiming();
///
-///    // On success: sets request to MPI_REQUEST_NULL
+///    // On success: sets each request to MPI_REQUEST_NULL
///    if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE))
///    {
///        FatalErrorInFunction
@@ -297,7 +299,8 @@ void Foam::UPstream::waitRequest(UPstream::Request& req)

    profilingPstream::addWaitTime();

-    req.reset();  // Handled, reset to MPI_REQUEST_NULL
+    // Handled, reset to MPI_REQUEST_NULL
+    req = UPstream::Request(MPI_REQUEST_NULL);
}

@@ -363,8 +366,69 @@ bool Foam::UPstream::finishedRequest(UPstream::Request& req)

    if (flag)
    {
-        // Done - reset to MPI_REQUEST_NULL
-        req.reset();
+        // Success: reset request to MPI_REQUEST_NULL
+        req = UPstream::Request(MPI_REQUEST_NULL);
    }

    return flag != 0;
}


bool Foam::UPstream::finishedRequests(UList<UPstream::Request>& requests)
{
    // No-op for non-parallel or no pending requests
    if (!UPstream::parRun() || requests.empty())
    {
        return true;
    }

    // Looks ugly but is legitimate since UPstream::Request is an intptr_t,
    // which is always large enough to hold an MPI_Request (int or pointer)

    label count = 0;
    auto* waitRequests = reinterpret_cast<MPI_Request*>(requests.data());

    for (auto& req : requests)
    {
        MPI_Request request = PstreamDetail::Request::get(req);

        if (MPI_REQUEST_NULL != request)
        {
            waitRequests[count] = request;
            ++count;
        }
    }

    if (!count)
    {
        // Early exit: no non-null requests found
        return true;
    }

    // On success: sets each request to MPI_REQUEST_NULL
    // On failure: no request is modified
    int flag = 0;
    MPI_Testall(count, waitRequests, &flag, MPI_STATUSES_IGNORE);

    if (flag)
    {
        // Success: reset all requests to MPI_REQUEST_NULL
        requests = UPstream::Request(MPI_REQUEST_NULL);
    }
    else
    {
        // Not all done. Recover wrapped representation but in reverse order
        // since sizeof(MPI_Request) can be smaller than
        // sizeof(UPstream::Request::value_type)
        // eg, mpich has MPI_Request as 'int'
        //
        // This is uglier than we'd like, but much better than allocating
        // and freeing a scratch buffer each time we query things.

        while (--count >= 0)
        {
            requests[count] = UPstream::Request(waitRequests[count]);
        }
    }

    return flag != 0;
}
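The reinterpret_cast above leans on the size relationship spelled out in the comment; a compile-time guard (hypothetical, not part of this commit) would make the assumption explicit:

    // Guard the aliasing assumption: an MPI_Request (int for MPICH, a
    // pointer for Open MPI) must fit inside the intptr_t-based wrapper
    static_assert
    (
        sizeof(MPI_Request) <= sizeof(UPstream::Request),
        "MPI_Request must fit inside the UPstream::Request wrapper"
    );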