ENH: properly trap any sub-rank changes in allocatePstreamCommunicator

- not currently triggered, but it is possible that communicator allocation
  modifies the list of sub-ranks. Ensure that the correct size is used
  when (re)initialising the linear/tree communication structures
  (sketched below).
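
  The essence of the guard, paraphrased from the UPstream::allocateCommunicator
  hunk further down (allocatePstreamCommunicator(parentIndex, index) is assumed
  to be the underlying call that may adjust the sub-ranks, as the commit title
  suggests; surrounding details are elided):

      label numSubRanks = procIDs_[index].size();   // as requested

      // MPI-level allocation; may adjust procIDs_[index]
      allocatePstreamCommunicator(parentIndex, index);

      if (numSubRanks != procIDs_[index].size())
      {
          // Sub-ranks changed: re-size the comms structures accordingly
          numSubRanks = procIDs_[index].size();
          linearCommunication_[index] = List<commsStruct>(numSubRanks);
          treeCommunication_[index] = List<commsStruct>(numSubRanks);
      }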

STYLE: adjust MPI test applications

- remove some clutter and unneeded grouping.
  Add some ideas for host-only communicators (see the sketch below).
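
  A minimal standalone sketch of the host-only communicator idea in plain MPI
  (outside the OpenFOAM UPstream wrappers), using the same
  MPI_Comm_split_type / MPI_Comm_split combination that the test application
  below exercises; error handling omitted:

      #include <mpi.h>
      #include <cstdio>

      int main(int argc, char** argv)
      {
          MPI_Init(&argc, &argv);

          int world_rank = 0;
          MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

          // All ranks sharing this host (shared-memory domain)
          MPI_Comm hostComm;
          MPI_Comm_split_type
          (
              MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &hostComm
          );

          int host_rank = 0, host_nprocs = 0;
          MPI_Comm_rank(hostComm, &host_rank);
          MPI_Comm_size(hostComm, &host_nprocs);

          // One leader per host; non-leaders receive MPI_COMM_NULL
          MPI_Comm leaderComm;
          MPI_Comm_split
          (
              MPI_COMM_WORLD,
              (host_rank == 0) ? 0 : MPI_UNDEFINED,
              0,
              &leaderComm
          );

          if (leaderComm != MPI_COMM_NULL)
          {
              int leader_rank = 0, leader_nprocs = 0;
              MPI_Comm_rank(leaderComm, &leader_rank);
              MPI_Comm_size(leaderComm, &leader_nprocs);
              std::printf
              (
                  "world %d: host-local %d/%d, leader %d/%d\n",
                  world_rank, host_rank, host_nprocs, leader_rank, leader_nprocs
              );
              MPI_Comm_free(&leaderComm);
          }

          MPI_Comm_free(&hostComm);
          MPI_Finalize();
          return 0;
      }

  Compile with mpicxx and run with mpirun: every rank reports its host-local
  rank, and each host leader additionally reports its rank within the
  leaders-only communicator.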
Mark Olesen 2023-03-16 13:02:36 +01:00
parent d51967d728
commit d826f9259f
3 changed files with 282 additions and 115 deletions


@@ -39,6 +39,7 @@ Description
#include "Tuple2.H"
#include "IOstreams.H"
#include "PstreamReduceOps.H"
#include "SHA1.H"
#include <mpi.h>
using namespace Foam;
@@ -99,51 +100,61 @@ int main(int argc, char *argv[])
labelList subRanks;
UPstream::communicator newComm;
#if 1
// With first ranks
subRanks = identity(UPstream::nProcs(UPstream::commWorld()) / 2);
newComm.reset(UPstream::commWorld(), subRanks);
localRanki = UPstream::myProcNo(newComm);
Pout.prefix() =
(
'[' + Foam::name(myProci) + " a:" + Foam::name(localRanki) + "] "
);
Info<< "procIDs: "
<< flatOutput(UPstream::procID(newComm)) << endl;
rankInfo(newComm);
Pout<< endl;
#endif
#if 1
// With every other rank
subRanks = identity(UPstream::nProcs(UPstream::commWorld()));
for (label& val : subRanks)
if (!args.found("comm-split") && !args.found("host-comm"))
{
if (val % 2) val = -1;
#if 1
// With first ranks
subRanks = identity(UPstream::nProcs(UPstream::commWorld()) / 2);
newComm.reset(UPstream::commWorld(), subRanks);
localRanki = UPstream::myProcNo(newComm);
Pout.prefix() =
(
'[' + Foam::name(myProci) + " a:" + Foam::name(localRanki) + "] "
);
Info<< "procIDs: "
<< flatOutput(UPstream::procID(newComm)) << endl;
rankInfo(newComm);
Pout<< endl;
#endif
#if 1
// With every other rank
subRanks = identity(UPstream::nProcs(UPstream::commWorld()));
for (label& val : subRanks)
{
if (val % 2) val = -1;
}
newComm.reset(UPstream::commWorld(), subRanks);
localRanki = UPstream::myProcNo(newComm);
Pout.prefix() =
(
'[' + Foam::name(myProci) + " b:" + Foam::name(localRanki) + "] "
);
Info<< "procIDs: "
<< flatOutput(UPstream::procID(newComm)) << endl;
rankInfo(newComm);
Pout<< endl;
#endif
}
newComm.reset(UPstream::commWorld(), subRanks);
localRanki = UPstream::myProcNo(newComm);
Pout.prefix() =
(
'[' + Foam::name(myProci) + " b:" + Foam::name(localRanki) + "] "
);
Info<< "procIDs: "
<< flatOutput(UPstream::procID(newComm)) << endl;
rankInfo(newComm);
Pout<< endl;
#endif
if (Pstream::parRun() && args.found("comm-split"))
{
int world_nprocs = 0;
int world_rank = -1;
MPI_Comm_size(MPI_COMM_WORLD, &world_nprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
int host_nprocs = 0;
int host_rank = -1;
MPI_Comm hostComm;
MPI_Comm_split_type
(
@@ -152,15 +163,98 @@ int main(int argc, char *argv[])
0, MPI_INFO_NULL, &hostComm
);
int host_nprocs = 0;
int host_rank = 0;
MPI_Comm_size(hostComm, &host_nprocs);
MPI_Comm_rank(hostComm, &host_rank);
Pout<< nl << "Host comm with "
<< host_rank << " / " << host_nprocs
<< " (using MPI_Comm_split_type)" << endl;
int leader_nprocs = 0;
int leader_rank = -1;
MPI_Comm hostMasterComm;
if (false)
{
// Easy enough to use MPI_Comm_split, but slightly annoying
// that it returns MPI_COMM_NULL for unused ranks...
MPI_Comm hostMasterComm;
MPI_Comm_split
(
MPI_COMM_WORLD,
(host_rank == 0) ? 0 : MPI_UNDEFINED,
0, &hostMasterComm
);
if (hostMasterComm != MPI_COMM_NULL)
{
MPI_Comm_size(hostMasterComm, &leader_nprocs);
MPI_Comm_rank(hostMasterComm, &leader_rank);
}
}
else
{
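// Group-based alternative: flag the host leaders across the world ranks,
// then build a leaders-only communicator from that subgroup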
boolList isHostLeader(world_nprocs, false);
isHostLeader[world_rank] = (host_rank == 0);
MPI_Allgather
(
// recv is also send
MPI_IN_PLACE, 1, MPI_C_BOOL,
isHostLeader.data(), 1, MPI_C_BOOL,
MPI_COMM_WORLD
);
Pout<< "leaders: " << isHostLeader << endl;
DynamicList<int> subRanks(isHostLeader.size());
forAll(isHostLeader, proci)
{
if (isHostLeader[proci])
{
subRanks.push_back(proci);
}
}
// Starting from parent
MPI_Group parent_group;
MPI_Comm_group(MPI_COMM_WORLD, &parent_group);
MPI_Group active_group;
MPI_Group_incl
(
parent_group,
subRanks.size(),
subRanks.cdata(),
&active_group
);
// Create new communicator for this group
MPI_Comm_create_group
(
MPI_COMM_WORLD,
active_group,
UPstream::msgType(),
&hostMasterComm
);
// Groups not needed after this...
MPI_Group_free(&parent_group);
MPI_Group_free(&active_group);
// Non-members obtain MPI_COMM_NULL from MPI_Comm_create_group
if (hostMasterComm != MPI_COMM_NULL)
{
MPI_Comm_size(hostMasterComm, &leader_nprocs);
MPI_Comm_rank(hostMasterComm, &leader_rank);
}
}
Pout<< nl << "[MPI_Comm_split_type]" << nl
<< "Host comm with " << host_rank << " / " << host_nprocs
<< " on " << hostName()
<< " master:" << (host_rank == 0)
<< " leader rank: " << leader_rank
<< " / " << leader_nprocs
<< " host leader:" << (leader_rank == 0)
<< " sub-rank:" << (leader_rank > 0)
<< nl;
if (hostMasterComm != MPI_COMM_NULL)
{
MPI_Comm_free(&hostMasterComm);
}
MPI_Comm_free(&hostComm);
}
if (Pstream::parRun() && args.found("host-comm"))
@@ -171,16 +265,56 @@ int main(int argc, char *argv[])
label numprocs = UPstream::nProcs(UPstream::commGlobal());
// Option 1: using hostnames
// - pro: trivial coding
// - con: unequal lengths, more allocations and 'hops'
stringList hosts(numprocs);
hosts[Pstream::myProcNo(UPstream::commGlobal())] = hostName();
Pstream::gatherList(hosts, UPstream::msgType(), UPstream::commGlobal());
labelList hostIDs_;
// Option 2: using SHA1 of hostnames
// - con: uglier coding (but only needed locally!)
// - pro: fixed digest length enables direct MPI calls
// can avoid Pstream::gatherList() during setup...
SHA1Digest myHostDigest(SHA1(hostName()).digest());
List<SHA1Digest> digests;
if (UPstream::master(UPstream::commGlobal()))
{
digests.resize(numprocs);
}
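// Gather the fixed-length digests as raw bytes
// (only the master allocated a receive buffer above)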
UPstream::mpiGather
(
reinterpret_cast<const char*>(myHostDigest.cdata_bytes()),
SHA1Digest::max_size(), // Num send per proc
digests.data_bytes(), // Recv
SHA1Digest::max_size(), // Num recv per proc
UPstream::commGlobal()
);
// MPI_Gather
// (
// myHostDigest.cdata_bytes(), // Send
// SHA1Digest::max_size(), // Num send per proc
// MPI_BYTE,
// digests.data_bytes(), // Recv
// SHA1Digest::max_size(), // Num recv per proc
// MPI_BYTE,
// 0, // root
// MPI_COMM_WORLD
// );
Info<< "digests: " << digests << nl;
labelList hostIDs_(numprocs);
// Compact
if (Pstream::master(UPstream::commGlobal()))
if (UPstream::master(UPstream::commGlobal()))
{
DynamicList<word> hostNames(numprocs);
hostIDs_.resize_nocopy(numprocs);
forAll(hosts, proci)
{
@@ -188,20 +322,72 @@ int main(int argc, char *argv[])
hostIDs_[proci] = hostNames.find(host);
if (hostIDs_[proci] == -1)
if (hostIDs_[proci] < 0)
{
hostIDs_[proci] = hostNames.size();
// First appearance of host (encode as leader)
hostIDs_[proci] = -(hostNames.size() + 1);
hostNames.push_back(host);
}
}
DynamicList<SHA1Digest> hostDigest(numprocs);
forAll(digests, proci)
{
const SHA1Digest& dig = digests[proci];
hostIDs_[proci] = hostDigest.find(dig);
if (hostIDs_[proci] < 0)
{
// First appearance of host (encode as leader)
hostIDs_[proci] = -(hostDigest.size() + 1);
hostDigest.push_back(dig);
}
}
}
Pstream::broadcasts(UPstream::commGlobal(), hostIDs_);
Info<< "hosts = " << hosts << endl;
Info<< "hostIDs_ = " << hostIDs_ << endl;
UPstream::broadcast
(
hostIDs_.data_bytes(),
hostIDs_.size_bytes(),
UPstream::commGlobal(),
UPstream::masterNo()
);
DynamicList<label> subRanks(numprocs);
// Ranks for world to hostMaster
forAll(hostIDs_, proci)
{
// Is host leader?
if (hostIDs_[proci] < 0)
{
subRanks.push_back(proci);
// Flip back to generic host id
hostIDs_[proci] = -(hostIDs_[proci] + 1);
}
}
// From world to hostMaster
const label hostMasterComm =
UPstream::allocateCommunicator
(
UPstream::commGlobal(),
subRanks,
true
);
const label myHostId =
hostIDs_[Pstream::myProcNo(UPstream::commGlobal())];
DynamicList<label> subRanks;
// Ranks for within a host
subRanks.clear();
forAll(hostIDs_, proci)
{
if (hostIDs_[proci] == myHostId)
@@ -210,7 +396,7 @@ int main(int argc, char *argv[])
}
}
// Allocate new communicator with global communicator as its parent
// The intra-host ranks
const label hostComm =
UPstream::allocateCommunicator
(
@@ -219,11 +405,19 @@ int main(int argc, char *argv[])
true
);
Pout<< nl << "Host comm with "
Pout<< nl << "[manual split]" << nl
<< nl << "Host comm with "
<< UPstream::myProcNo(hostComm)
<< " / " << UPstream::nProcs(hostComm)
<< " on " << hostName()
<< " master:" << UPstream::master(hostComm)
<< " leader rank: " << UPstream::myProcNo(hostMasterComm)
<< " / " << UPstream::nProcs(hostMasterComm)
<< " host leader:" << UPstream::master(hostMasterComm)
<< " sub-rank:" << UPstream::is_subrank(hostMasterComm)
<< nl;
UPstream::freeCommunicator(hostMasterComm, true);
UPstream::freeCommunicator(hostComm, true);
}


@@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2019-2022 OpenCFD Ltd.
Copyright (C) 2019-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@@ -52,11 +52,6 @@ bool startMPI()
int nprocs[3];
int rank[3];
int group_nprocs[3];
int group_rank[3];
MPI_Group mpiGroup;
MPI_Init(nullptr, nullptr);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs[worldComm]);
@@ -65,62 +60,27 @@ bool startMPI()
const bool isMaster = (rank[worldComm] == 0);
const string prefix = '[' + Foam::name(rank[worldComm]) + "] ";
MPI_Comm_group(MPI_COMM_WORLD, &mpiGroup);
MPI_Group_size(mpiGroup, &group_nprocs[worldComm]);
MPI_Group_rank(mpiGroup, &group_rank[worldComm]);
if (isMaster && nprocs[worldComm])
{
std::cout
<< nl << "Using MPI with " << nprocs[worldComm]
<< " procs, group:"
<< group_nprocs[worldComm] << nl
<< "World group: " << Foam::name(mpiGroup) << nl
<< nl << "Using MPI with "
<< nprocs[worldComm] << " procs" << nl
<< nl;
}
MPI_Comm worldMpiComm;
MPI_Comm_dup(MPI_COMM_WORLD, &worldMpiComm);
MPI_Comm_group(MPI_COMM_WORLD, &mpiGroup);
if (isMaster && nprocs[worldComm])
{
std::cout
<< "dup comm group: " << Foam::name(mpiGroup) << nl;
}
MPI_Comm_free(&worldMpiComm);
// May be a bad idea
MPI_Group_free(&mpiGroup);
MPI_Comm_size(MPI_COMM_SELF, &nprocs[selfComm]);
MPI_Comm_rank(MPI_COMM_SELF, &rank[selfComm]);
MPI_Comm_group(MPI_COMM_SELF, &mpiGroup);
MPI_Group_size(mpiGroup, &group_nprocs[selfComm]);
MPI_Group_rank(mpiGroup, &group_rank[selfComm]);
if (isMaster && nprocs[worldComm])
{
std::cout
<< nl
<< "Self group: " << Foam::name(mpiGroup) << nl;
}
// Should be a bad idea
MPI_Group_free(&mpiGroup);
// if (nprocs && isMaster)
{
std::cout
<< prefix
<< "Self: " << rank[selfComm] << " from " << nprocs[selfComm]
<< " procs, group:"
<< group_nprocs[selfComm] << nl;
<< "Self: " << rank[selfComm]
<< " from " << nprocs[selfComm] << " procs" << nl;
}
if (isMaster)


@@ -155,6 +155,14 @@ Foam::label Foam::UPstream::allocateCommunicator
// LIFO pop
index = freeComms_.back();
freeComms_.pop_back();
// Reset existing
myProcNo_[index] = -1;
parentComm_[index] = parentIndex;
procIDs_[index].clear();
linearCommunication_[index].clear();
treeCommunication_[index].clear();
}
else
{
@@ -162,8 +170,9 @@ Foam::label Foam::UPstream::allocateCommunicator
index = parentComm_.size();
myProcNo_.push_back(-1);
parentComm_.push_back(parentIndex);
procIDs_.emplace_back();
parentComm_.push_back(-1);
linearCommunication_.emplace_back();
treeCommunication_.emplace_back();
}
@@ -213,8 +222,6 @@ Foam::label Foam::UPstream::allocateCommunicator
procIds.resize(numSubRanks);
parentComm_[index] = parentIndex;
// Size but do not fill structure - this is done on-the-fly
linearCommunication_[index] = List<commsStruct>(numSubRanks);
treeCommunication_[index] = List<commsStruct>(numSubRanks);
@@ -236,6 +243,14 @@ Foam::label Foam::UPstream::allocateCommunicator
/// myProcNo_[index] = -(myProcNo_[parentIndex]+1);
/// }
/// }
// Did communicator allocation adjust procIDs_ as well?
if (numSubRanks != procIDs_[index].size())
{
numSubRanks = procIDs_[index].size();
linearCommunication_[index] = List<commsStruct>(numSubRanks);
treeCommunication_[index] = List<commsStruct>(numSubRanks);
}
}
return index;
@@ -268,8 +283,8 @@ void Foam::UPstream::freeCommunicator
}
myProcNo_[communicator] = -1;
//procIDs_[communicator].clear();
parentComm_[communicator] = -1;
//procIDs_[communicator].clear();
linearCommunication_[communicator].clear();
treeCommunication_[communicator].clear();
@@ -449,26 +464,24 @@ bool Foam::UPstream::haveThreads_(false);
int Foam::UPstream::msgType_(1);
Foam::DynamicList<int> Foam::UPstream::myProcNo_(10);
Foam::DynamicList<Foam::List<int>> Foam::UPstream::procIDs_(10);
Foam::DynamicList<Foam::label> Foam::UPstream::parentComm_(10);
Foam::DynamicList<Foam::label> Foam::UPstream::freeComms_;
Foam::wordList Foam::UPstream::allWorlds_(Foam::one{}, "");
Foam::labelList Foam::UPstream::worldIDs_(Foam::one{}, 0);
Foam::DynamicList<Foam::List<Foam::UPstream::commsStruct>>
Foam::UPstream::linearCommunication_(10);
Foam::DynamicList<int> Foam::UPstream::myProcNo_(16);
Foam::DynamicList<Foam::List<int>> Foam::UPstream::procIDs_(16);
Foam::DynamicList<Foam::label> Foam::UPstream::parentComm_(16);
Foam::DynamicList<Foam::label> Foam::UPstream::freeComms_;
Foam::DynamicList<Foam::List<Foam::UPstream::commsStruct>>
Foam::UPstream::treeCommunication_(10);
Foam::UPstream::linearCommunication_(16);
Foam::DynamicList<Foam::List<Foam::UPstream::commsStruct>>
Foam::UPstream::treeCommunication_(16);
Foam::label Foam::UPstream::worldComm(0);
Foam::label Foam::UPstream::warnComm(-1);