ENH: expand UPstream communicator support

- split/duplicate functionality

- rework inter-node/intra-node handling to allow selection of
  splitting based on 'shared' or hostname (default).

- always creates node communicators at startup:
  * commInterNode() - between nodes
  * commLocalNode() - within a node

- world-comm is now always a duplicate of MPI_COMM_WORLD to provide
  better separation from other processes.

NB:
   the inter-node comm is a slight exception among the communicators
   in that we always retain its list of (global) ranks, even if the
   local process is not in that communicator.
   This can help when constructing topology-aware patterns.
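
   As a sketch of why this is useful (illustration only, not part of the
   commit; labelRange and the UPstream calls are as they appear in the
   diff below), any rank can recover the world-rank span of its own node
   from the retained leader list without any communication:

   // Hypothetical helper: world-rank range covered by the local node.
   // Relies on procID(commInterNode()) being retained on every rank.
   Foam::labelRange localNodeRange()
   {
       using namespace Foam;

       const auto& leaders = UPstream::procID(UPstream::commInterNode());
       const label numProc = UPstream::nProcs(UPstream::commConstWorld());
       const label myRank = UPstream::myProcNo(UPstream::commConstWorld());

       // The last leader at or below my own world rank
       label nodei = 0;
       while (nodei+1 < leaders.size() && leaders[nodei+1] <= myRank)
       {
           ++nodei;
       }

       const label begin = leaders[nodei];
       const label end =
           (nodei+1 < leaders.size() ? leaders[nodei+1] : numProc);

       return labelRange(begin, end - begin);
   }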

FIX: non-participating ranks still had knowledge of their potential siblings

- after creation by group, the procIDs_ of non-participating ranks
  should be empty (except for the inter-node exception noted above)
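
A minimal usage sketch of the reworked allocation API (not part of the
commit itself; the even/odd colour split is an arbitrary illustration,
but the function names follow the API introduced in the diff below):

    // Node communicators now exist from startup; no allocation needed
    const label interNode = UPstream::commInterNode();  // between nodes
    const label localNode = UPstream::commLocalNode();  // within a node
    Info<< "nodes: " << UPstream::numNodes() << nl;

    // Duplicate of the world communicator
    const label dupComm = UPstream::dupCommunicator(UPstream::worldComm);

    // Bottom-up split: each rank only supplies a colour (negative = ignore)
    const int colour = (UPstream::myProcNo(UPstream::worldComm) % 2);
    const label evenOdd =
        UPstream::splitCommunicator(UPstream::worldComm, colour);

    // ... use the communicators ...

    UPstream::freeCommunicator(evenOdd);
    UPstream::freeCommunicator(dupComm);

In practice the RAII factory methods UPstream::communicator::duplicate()
and UPstream::communicator::split() (see the UPstream.H changes) avoid
the explicit freeCommunicator() calls.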
Mark Olesen 2025-02-07 13:51:39 +01:00
parent 579f8ef7c6
commit 831a55f1ba
32 changed files with 2270 additions and 1087 deletions

View File

@ -0,0 +1,3 @@
Test-globalIndex3.cxx
EXE = $(FOAM_USER_APPBIN)/Test-globalIndex3

View File

@ -0,0 +1,4 @@
include $(GENERAL_RULES)/mpi-rules
EXE_INC = $(PFLAGS) $(PINC)
EXE_LIBS = $(PLIBS)

View File

@ -0,0 +1,578 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2025 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Application
Test-globalIndex3
Description
Tests for globalIndex with node-wise splitting
\*---------------------------------------------------------------------------*/
#include "globalIndex.H"
#include "globalMeshData.H"
#include "argList.H"
#include "Time.H"
#include "polyMesh.H"
#include "IndirectList.H"
#include "IOstreams.H"
#include "Random.H"
#include "openfoam_mpi.H"
// pre-scan for "-split-size NUM"
int option_splitsize(int argc, char *argv[])
{
int ivalue = -1;
for (int argi = 1; argi < argc-1; ++argi)
{
if (strcmp(argv[argi], "-split-size") == 0)
{
++argi;
ivalue = atoi(argv[argi]);
}
}
return ivalue;
}
using namespace Foam;
template<class T>
void printList(Ostream& os, const UList<T>& list)
{
os << list.size() << " " << flatOutput(list) << nl;
}
void printGlobalIndex(Ostream& os, const globalIndex& gi)
{
printList(os, gi.offsets());
}
template<class ProcIDsContainer, class Type>
void globalIndexGather
(
const labelUList& off, // needed on master only
const label comm,
const ProcIDsContainer& procIDs,
const UList<Type>& fld,
UList<Type>& allFld, // must be adequately sized on master
const int tag,
UPstream::commsTypes commsType,
bool useWindow = false
)
{
// low-level: no parRun guard
const int masterProci = procIDs.size() ? procIDs[0] : 0;
// Protection for disjoint calls
if (FOAM_UNLIKELY(!UPstream::is_rank(comm)))
{
FatalErrorInFunction
<< "Calling with process not on the communicator"
<< Foam::abort(FatalError);
}
// Require contiguous data for non-blocking
if constexpr (!is_contiguous_v<Type>)
{
if (commsType == UPstream::commsTypes::nonBlocking)
{
commsType = UPstream::commsTypes::scheduled;
}
}
const label startOfRequests = UPstream::nRequests();
// Very hard-coded at the moment
int returnCode = MPI_SUCCESS;
const int nCmpts = pTraits<Type>::nComponents;
MPI_Win win;
MPI_Datatype dataType = MPI_DOUBLE;
if (useWindow)
{
using cmptType = typename pTraits<Type>::cmptType;
if (std::is_same<float, cmptType>::value)
{
dataType = MPI_FLOAT;
}
else if (std::is_same<double, cmptType>::value)
{
dataType = MPI_DOUBLE;
}
else
{
// Not supported
useWindow = false;
}
}
if (useWindow)
{
MPI_Comm mpiComm =
PstreamUtils::Cast::to_mpi(UPstream::Communicator::lookup(comm));
char commName[MPI_MAX_OBJECT_NAME];
int nameLen = 0;
if
(
MPI_COMM_NULL != mpiComm
&& MPI_SUCCESS == MPI_Comm_get_name(mpiComm, commName, &nameLen)
&& (nameLen > 0)
)
{
Pout<< "window on " << commName << nl;
}
if (UPstream::myProcNo(comm) == masterProci || fld.empty())
{
// Collective
returnCode = MPI_Win_create
(
nullptr,
0,
1, // disp_units
MPI_INFO_NULL,
mpiComm,
&win
);
}
else
{
// Collective
returnCode = MPI_Win_create
(
const_cast<char *>(fld.cdata_bytes()),
fld.size_bytes(),
sizeof(Type), // disp_units
MPI_INFO_NULL,
mpiComm,
&win
);
}
if (MPI_SUCCESS != returnCode || MPI_WIN_NULL == win)
{
FatalErrorInFunction
<< "MPI_Win_create() failed"
<< Foam::abort(FatalError);
// return nullptr;
}
}
if (UPstream::myProcNo(comm) == masterProci)
{
const label total = off.back(); // == totalSize()
if (allFld.size() < total)
{
FatalErrorInFunction
<< "[out] UList size=" << allFld.size()
<< " too small to receive " << total << nl
<< Foam::abort(FatalError);
}
// Assign my local data - respect offset information
// so that we can request 0 entries to be copied.
// Also handle the case where we have a slice of the full
// list.
{
SubList<Type> dst(allFld, off[1]-off[0], off[0]);
SubList<Type> src(fld, off[1]-off[0]);
if (!dst.empty() && (dst.data() != src.data()))
{
dst = src;
}
}
if (useWindow)
{
MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
}
for (label i = 1; i < procIDs.size(); ++i)
{
SubList<Type> slot(allFld, off[i+1]-off[i], off[i]);
if (slot.empty())
{
// Nothing to do
}
else if (useWindow)
{
returnCode = MPI_Get
(
// origin
slot.data(),
slot.size()*(nCmpts),
dataType,
// target
procIDs[i],
0, // displacement
slot.size()*(nCmpts),
dataType,
win
);
if (MPI_SUCCESS != returnCode)
{
FatalErrorInFunction
<< "MPI_Get failed"
<< Foam::abort(FatalError);
// return nullptr;
}
}
else if constexpr (is_contiguous_v<Type>)
{
UIPstream::read
(
commsType,
procIDs[i],
slot,
tag,
comm
);
}
else
{
IPstream::recv(slot, procIDs[i], tag, comm);
}
}
if (useWindow)
{
MPI_Win_unlock_all(win);
}
}
else if (!useWindow)
{
if (fld.empty())
{
// Nothing to do
}
else if constexpr (is_contiguous_v<Type>)
{
UOPstream::write
(
commsType,
masterProci,
fld,
tag,
comm
);
}
else
{
OPstream::send(fld, commsType, masterProci, tag, comm);
}
}
if (useWindow)
{
// Collective
MPI_Win_free(&win);
}
if (commsType == UPstream::commsTypes::nonBlocking)
{
// Wait for outstanding requests
UPstream::waitRequests(startOfRequests);
}
}
// Report inter-node/intra-node offsets
static void reportOffsets(const globalIndex& gi)
{
labelList interNodeOffsets;
labelList localNodeOffsets;
labelRange nodeRange;
const label numProc = UPstream::nProcs(UPstream::commConstWorld());
gi.splitNodeOffsets
(
interNodeOffsets,
localNodeOffsets,
UPstream::worldComm
);
const auto interNodeComm = UPstream::commInterNode();
// Only communicate to the node leaders
labelList allOffsets;
if (UPstream::is_rank(interNodeComm))
{
// Send top-level offsets to the node leaders
if (UPstream::master(interNodeComm))
{
allOffsets = gi.offsets();
}
else // ie, UPstream::is_subrank(interNodeComm)
{
allOffsets.resize_nocopy(numProc+1);
}
UPstream::broadcast
(
allOffsets.data_bytes(),
allOffsets.size_bytes(),
interNodeComm
);
}
// Ranges (node leaders only)
if (UPstream::is_rank(interNodeComm))
{
const auto& procIds = UPstream::procID(interNodeComm);
const int ranki = UPstream::myProcNo(interNodeComm);
// For reporting
nodeRange.reset
(
procIds[ranki],
(
(ranki+1 < procIds.size() ? procIds[ranki+1] : numProc)
- procIds[ranki]
)
);
}
Pout<< "node-range: " << nodeRange << nl;
Pout<< "all-offset: "; printList(Pout, allOffsets);
Pout<< "inter-offset: "; printList(Pout, interNodeOffsets);
Pout<< "intra-offset: "; printList(Pout, localNodeOffsets);
}
template<class Type>
void globalIndexGather
(
const globalIndex& gi,
const UList<Type>& sendData,
List<Type>& allData,
const int tag,
const UPstream::commsTypes commsType,
const label comm = UPstream::worldComm,
bool useWindow = false
)
{
if (!UPstream::parRun())
{
// Serial: direct copy
allData = sendData;
return;
}
if (UPstream::master(comm))
{
allData.resize_nocopy(gi.offsets().back()); // == totalSize()
}
else
{
allData.clear(); // zero-size on non-master
}
const auto& offsets = gi.offsets(); // needed on master only
Info<< "Using node-comms: " << UPstream::usingNodeComms(comm) << nl;
const auto interNodeComm = UPstream::commInterNode();
const auto localNodeComm = UPstream::commLocalNode();
if (UPstream::usingNodeComms(comm))
{
// Stage 0 : The inter-node/intra-node offsets
labelList interNodeOffsets;
labelList localNodeOffsets;
gi.splitNodeOffsets(interNodeOffsets, localNodeOffsets, comm);
// The first node re-uses the output (allData) when collecting
// content. All other nodes require temporary node-local storage.
List<Type> tmpNodeData;
if (UPstream::is_subrank(interNodeComm))
{
tmpNodeData.resize(localNodeOffsets.back());
}
List<Type>& nodeData =
(
UPstream::master(interNodeComm) ? allData : tmpNodeData
);
// Stage 1 : Gather data within the node
{
globalIndexGather
(
localNodeOffsets, // (master only)
localNodeComm,
UPstream::allProcs(localNodeComm),
sendData,
nodeData,
tag,
commsType,
useWindow
);
}
// Stage 2 : Gather data between nodes
if (UPstream::is_rank(interNodeComm))
{
globalIndexGather
(
interNodeOffsets, // (master only)
interNodeComm,
UPstream::allProcs(interNodeComm),
nodeData,
allData,
tag,
commsType,
useWindow
);
}
}
else
{
globalIndexGather
(
offsets, // needed on master only
comm,
UPstream::allProcs(comm), // All communicator ranks
sendData,
allData,
tag,
commsType,
useWindow
);
}
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// Main program:
int main(int argc, char *argv[])
{
argList::noCheckProcessorDirectories();
argList::addVerboseOption("Set UPstream::debug level");
argList::addOption("split-size", "NUM", "split with ncores/node");
argList::addBoolOption("builtin", "only use builtin globalIndex::gather");
argList::addBoolOption("window", "get data via window");
// Check -verbose before initialisation
UPstream::debug = argList::verbose(argc, argv);
// Check -split-size before initialisation
{
int splitSize = option_splitsize(argc, argv);
if (splitSize >= 0)
{
UPstream::nodeCommsControl_ = splitSize;
}
}
#include "setRootCase.H"
const bool useLocalComms = UPstream::usingNodeComms();
bool useWindow = args.found("window");
bool useBuiltin = args.found("builtin");
Info<< nl
<< "Getting local-comms: " << Switch::name(useLocalComms) << nl
<< "Getting data with window: " << Switch::name(useWindow) << nl
<< nl;
if (useWindow && useBuiltin)
{
Info<< "Selected '-window' and '-builtin' : ignoring -builtin'"
<< nl;
useBuiltin = false;
}
Random rng(31 + 2*UPstream::myProcNo());
const label localSize = (5*rng.position<label>(1, 15));
globalIndex globIndex
(
globalIndex::gatherOnly{},
localSize,
UPstream::commWorld()
);
Info<< "global-index: ";
printGlobalIndex(Info, globIndex);
reportOffsets(globIndex);
Field<scalar> allData;
Field<scalar> localFld(localSize, scalar(UPstream::myProcNo()));
if (useBuiltin)
{
globIndex.gather
(
localFld,
allData,
UPstream::msgType(),
UPstream::commsTypes::nonBlocking,
UPstream::commWorld()
);
}
else
{
globalIndexGather
(
globIndex,
localFld,
allData,
UPstream::msgType(),
UPstream::commsTypes::nonBlocking,
UPstream::commWorld(),
useWindow
);
}
Pout<< "local: " << flatOutput(localFld) << nl;
Info<< "field: " << flatOutput(allData) << nl;
Info<< "\nEnd\n" << endl;
return 0;
}
// ************************************************************************* //
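
As a worked illustration of the two-stage gather above (hypothetical sizes,
assuming splitNodeOffsets() yields per-node and per-leader offsets as its use
in globalIndexGather() suggests): with 8 ranks on 2 nodes and local sizes
(5 10 15 20 25 30 35 40), the global offsets are (0 5 15 30 50 75 105 140 180);
node 0 gathers with localNodeOffsets (0 5 15 30 50), node 1 with
(0 25 55 90 130), and the node leaders then gather the two node blocks using
interNodeOffsets (0 50 180).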

View File

@ -0,0 +1,3 @@
Test-nodeTopology.cxx
EXE = $(FOAM_USER_APPBIN)/Test-nodeTopology

View File

@ -0,0 +1,2 @@
/* EXE_INC = */
/* EXE_LIBS = */

View File

@ -0,0 +1,198 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2025 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Application
Test-nodeTopology
Description
Simple reporting of node topology
\*---------------------------------------------------------------------------*/
#include "argList.H"
#include "IOstreams.H"
using namespace Foam;
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
int main(int argc, char *argv[])
{
argList::noBanner();
argList::noCheckProcessorDirectories();
argList::addOption
(
"numProcs",
"int",
"Num of ranks to simulate (default: 16)"
);
argList::addOption
(
"cores",
"int",
"Num of cores to simulate (default: 4)"
);
#include "setRootCase.H"
label nProcs = UPstream::nProcs(UPstream::worldComm);
List<int> interNodeProcs_fake;
if (UPstream::parRun())
{
if (args.found("numProcs"))
{
InfoErr<< "ignoring -np option in parallel" << nl;
}
if (args.found("cores"))
{
InfoErr<< "ignoring -cores option in parallel" << nl;
}
}
else
{
// serial
nProcs = args.getOrDefault<label>("numProcs", 16);
label nCores = args.getOrDefault<label>("cores", 4);
if (nCores > 1 && nCores < nProcs)
{
const label numNodes
= (nProcs/nCores) + ((nProcs % nCores) ? 1 : 0);
interNodeProcs_fake.resize(numNodes);
for (label nodei = 0; nodei < numNodes; ++nodei)
{
interNodeProcs_fake[nodei] = nodei * nCores;
}
}
}
const List<int>& interNodeProcs =
(
UPstream::parRun()
? UPstream::procID(UPstream::commInterNode())
: interNodeProcs_fake
);
// Generate the graph
if (UPstream::master(UPstream::worldComm))
{
auto& os = Info.stream();
os << "// node topology graph:" << nl;
os.beginBlock("graph");
// Prefer left-to-right layout for large graphs
os << indent << "rankdir=LR" << nl;
int pos = 0;
// First level are the inter-node connections
const label parent = 0;
for (const auto proci : interNodeProcs)
{
if (parent == proci) continue;
if (pos)
{
os << " ";
}
else
{
os << indent;
}
os << parent << " -- " << proci;
if (++pos >= 4) // Max 4 items per line
{
pos = 0;
os << nl;
}
}
if (pos)
{
pos = 0;
os << nl;
}
// Next level are within the nodes
for (label nodei = 0; nodei < interNodeProcs.size(); ++nodei)
{
pos = 0;
label firstProc = interNodeProcs[nodei];
const label lastProc =
(
(nodei+1 < interNodeProcs.size())
? interNodeProcs[nodei+1]
: nProcs
);
os << indent << "// inter-node " << nodei
<< " [" << firstProc
<< ".." << lastProc-1 << "]" << nl;
for (label proci = firstProc; proci < lastProc; ++proci)
{
if (firstProc == proci) continue;
if (pos)
{
os << " ";
}
else
{
os << indent;
}
os << firstProc << " -- " << proci;
if (++pos >= 4) // Max 4 items per line
{
pos = 0;
os << nl;
}
}
if (pos)
{
pos = 0;
os << nl;
}
}
os.endBlock();
os << "// end graph" << nl;
}
InfoErr << "\nDone" << nl;
return 0;
}
// ************************************************************************* //
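
For orientation, a hypothetical serial invocation such as
Test-nodeTopology -numProcs 8 -cores 4 simulates two nodes with leaders 0
and 4, so the emitted graph should contain the inter-node edge 0 -- 4
followed by the intra-node edges 0 -- 1, 0 -- 2, 0 -- 3 and 4 -- 5, 4 -- 6,
4 -- 7 (exact line breaks depend on the four-items-per-line wrapping above).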

View File

@ -158,7 +158,7 @@ int main(int argc, char *argv[])
for (label count = 0; count < repeat; ++count)
{
label comm = UPstream::allocateCommunicator(UPstream::worldComm, top);
label comm = UPstream::newCommunicator(UPstream::worldComm, top);
scalar localValue = 111*UPstream::myProcNo(UPstream::worldComm);

View File

@ -68,14 +68,14 @@ int main(int argc, char *argv[])
argList::noCheckProcessorDirectories();
argList::addBoolOption("info", "information");
argList::addBoolOption("print-tree", "Report tree(s) as graph");
argList::addBoolOption("comm-split", "Test simple comm split");
argList::addBoolOption("mpi-host-comm", "Test DIY host-comm split");
argList::addBoolOption("no-test", "Disable general tests");
argList::addBoolOption("host-comm", "Test Pstream host-comm");
argList::addBoolOption("host-broadcast", "Test host-base broadcasts");
#include "setRootCase.H"
const bool optPrintTree = args.found("print-tree");
bool generalTest = !args.found("no-test");
Info<< nl
<< "parallel:" << UPstream::parRun()
@ -89,6 +89,18 @@ int main(int argc, char *argv[])
UPstream::printCommTree(UPstream::commWorld());
}
if (UPstream::parRun())
{
Pout<< "world ranks: 0.."
<< UPstream::nProcs(UPstream::commWorld())-1 << nl;
Pout<< "inter-node ranks: " << UPstream::numNodes() << ' '
<< flatOutput(UPstream::procID(UPstream::commInterNode())) << nl;
Pout<< "local-node ranks: "
<< flatOutput(UPstream::procID(UPstream::commLocalNode())) << nl;
}
if (args.found("info"))
{
Info<< nl;
@ -104,334 +116,29 @@ int main(int argc, char *argv[])
Pout<< endl;
}
bool generalTest = true;
if (UPstream::parRun() && args.found("comm-split"))
{
generalTest = false;
int world_nprocs = 0;
int world_rank = -1;
MPI_Comm_size(MPI_COMM_WORLD, &world_nprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
int host_nprocs = 0;
int host_rank = -1;
MPI_Comm commIntraHost;
MPI_Comm_split_type
(
MPI_COMM_WORLD,
MPI_COMM_TYPE_SHARED, // OMPI_COMM_TYPE_NODE
0, MPI_INFO_NULL, &commIntraHost
);
MPI_Comm_size(commIntraHost, &host_nprocs);
MPI_Comm_rank(commIntraHost, &host_rank);
int leader_nprocs = 0;
int leader_rank = -1;
MPI_Comm commInterHost;
if (false)
{
// Easy enough to use MPI_Comm_split, but slightly annoying
// that it returns MPI_COMM_NULL for unused ranks...
MPI_Comm commInterHost;
MPI_Comm_split
(
MPI_COMM_WORLD,
(host_rank == 0) ? 0 : MPI_UNDEFINED,
0, &commInterHost
);
if (commInterHost != MPI_COMM_NULL)
{
MPI_Comm_size(commInterHost, &leader_nprocs);
MPI_Comm_rank(commInterHost, &leader_rank);
}
}
else
{
boolList isHostLeader(world_nprocs, false);
isHostLeader[world_rank] = (host_rank == 0);
MPI_Allgather
(
// recv is also send
MPI_IN_PLACE, 1, MPI_C_BOOL,
isHostLeader.data(), 1, MPI_C_BOOL,
MPI_COMM_WORLD
);
Pout<< "leaders: " << isHostLeader << endl;
DynamicList<int> subRanks(isHostLeader.size());
forAll(isHostLeader, proci)
{
if (isHostLeader[proci])
{
subRanks.push_back(proci);
}
}
// Starting from parent
MPI_Group parent_group;
MPI_Comm_group(MPI_COMM_WORLD, &parent_group);
MPI_Group active_group;
MPI_Group_incl
(
parent_group,
subRanks.size(),
subRanks.cdata(),
&active_group
);
// Create new communicator for this group
MPI_Comm_create_group
(
MPI_COMM_WORLD,
active_group,
UPstream::msgType(),
&commInterHost
);
// Groups not needed after this...
MPI_Group_free(&parent_group);
MPI_Group_free(&active_group);
MPI_Comm_size(commInterHost, &leader_nprocs);
MPI_Comm_rank(commInterHost, &leader_rank);
}
Pout<< nl << "[MPI_Comm_split_type]" << nl
<< "Host rank " << host_rank << " / " << host_nprocs
<< " on " << hostName()
<< " inter-rank: " << leader_rank << " / " << leader_nprocs
<< " host leader:" << (leader_rank == 0)
<< " sub-rank:" << (leader_rank > 0)
<< nl;
if (commInterHost != MPI_COMM_NULL)
{
MPI_Comm_free(&commInterHost);
}
if (commIntraHost != MPI_COMM_NULL)
{
MPI_Comm_free(&commIntraHost);
}
}
if (UPstream::parRun() && args.found("mpi-host-comm"))
{
generalTest = false;
// Host communicator, based on the current world communicator
// Use hostname
// Lowest rank per hostname is the IO rank
label numprocs = UPstream::nProcs(UPstream::commGlobal());
// Option 1: using hostnames
// - pro: trivial coding
// - con: unequal lengths, more allocations and 'hops'
stringList hosts(numprocs);
hosts[Pstream::myProcNo(UPstream::commGlobal())] = hostName();
Pstream::gatherList(hosts, UPstream::msgType(), UPstream::commGlobal());
// Option 2: using SHA1 of hostnames
// - con: uglier coding (but only needed locally!)
// - pro: fixed digest length enables direct MPI calls
// can avoid Pstream::gatherList() during setup...
List<SHA1Digest> digests;
if (UPstream::master(UPstream::commGlobal()))
{
digests.resize(numprocs);
}
{
const SHA1Digest myDigest(SHA1(hostName()).digest());
UPstream::mpiGather
(
myDigest.cdata_bytes(), // Send
digests.data_bytes(), // Recv
SHA1Digest::max_size(), // Num send/recv per rank
UPstream::commGlobal()
);
}
labelList hostIDs(numprocs);
DynamicList<label> subRanks(numprocs);
Info<< "digests: " << digests << nl;
// Compact numbering
if (UPstream::master(UPstream::commGlobal()))
{
DynamicList<word> hostNames(numprocs);
forAll(hosts, proci)
{
const word& host = hosts[proci];
hostIDs[proci] = hostNames.find(host);
if (hostIDs[proci] < 0)
{
// First appearance of host (encode as leader)
hostIDs[proci] = -(hostNames.size() + 1);
hostNames.push_back(host);
}
}
hostIDs = -1;
DynamicList<SHA1Digest> uniqDigests(numprocs);
forAll(digests, proci)
{
const SHA1Digest& dig = digests[proci];
hostIDs[proci] = uniqDigests.find(dig);
if (hostIDs[proci] < 0)
{
// First appearance of host (encode as leader)
hostIDs[proci] = -(uniqDigests.size() + 1);
uniqDigests.push_back(dig);
}
}
}
Info<< "hosts = " << hosts << endl;
Info<< "hostIDs = " << hostIDs << endl;
UPstream::broadcast
(
hostIDs.data_bytes(),
hostIDs.size_bytes(),
UPstream::commGlobal(),
UPstream::masterNo()
);
// Ranks for world to inter-host communicator
// - very straightforward
#if 0
subRanks.clear();
forAll(hostIDs, proci)
{
// Is host leader?
if (hostIDs[proci] < 0)
{
subRanks.push_back(proci);
// Flip back to generic host id
hostIDs[proci] = -(hostIDs[proci] + 1);
}
}
// From world to hostMaster
const label commInterHost =
UPstream::allocateCommunicator(UPstream::commGlobal(), subRanks);
#endif
const label myWorldProci = UPstream::myProcNo(UPstream::commGlobal());
label myHostId = hostIDs[myWorldProci];
if (myHostId < 0) myHostId = -(myHostId + 1); // Flip to generic id
// Ranks for within a host
subRanks.clear();
forAll(hostIDs, proci)
{
label id = hostIDs[proci];
if (id < 0) id = -(id + 1); // Flip to generic id
if (id == myHostId)
{
subRanks.push_back(proci);
}
}
// The intra-host ranks
const label commIntraHost =
UPstream::allocateCommunicator(UPstream::commGlobal(), subRanks);
// Test what if we have intra-host comm and we want host-master
List<bool> isHostMaster(numprocs, false);
if (UPstream::master(commIntraHost))
{
isHostMaster[myWorldProci] = true;
}
UPstream::mpiAllGather
(
isHostMaster.data_bytes(),
sizeof(bool),
UPstream::commGlobal()
);
// Ranks for world to hostMaster
// - very straightforward
subRanks.clear();
forAll(isHostMaster, proci)
{
if (isHostMaster[proci])
{
subRanks.push_back(proci);
}
}
// From world to hostMaster
const label commInterHost =
UPstream::allocateCommunicator(UPstream::commGlobal(), subRanks);
Pout<< nl << "[manual split]" << nl
<< nl << "Host rank " << UPstream::myProcNo(commIntraHost)
<< " / " << UPstream::nProcs(commIntraHost)
<< " on " << hostName()
<< ", inter-rank: " << UPstream::myProcNo(commInterHost)
<< " / " << UPstream::nProcs(commInterHost)
<< " host leader:" << UPstream::master(commInterHost)
<< " sub-rank:" << UPstream::is_subrank(commInterHost)
<< nl;
UPstream::freeCommunicator(commInterHost);
UPstream::freeCommunicator(commIntraHost);
}
if (UPstream::parRun() && args.found("host-comm"))
{
generalTest = false;
Info<< nl << "[pstream host-comm]" << nl << endl;
const label commInterHost = UPstream::commInterHost();
const label commIntraHost = UPstream::commIntraHost();
const label commInterNode = UPstream::commInterNode();
const label commLocalNode = UPstream::commLocalNode();
Pout<< "Host rank " << UPstream::myProcNo(commIntraHost)
<< " / " << UPstream::nProcs(commIntraHost)
Pout<< "Host rank " << UPstream::myProcNo(commLocalNode)
<< " / " << UPstream::nProcs(commLocalNode)
<< " on " << hostName()
<< ", inter-rank: " << UPstream::myProcNo(commInterHost)
<< " / " << UPstream::nProcs(commInterHost)
<< ", host leader:" << UPstream::master(commInterHost)
<< " sub-rank:" << UPstream::is_subrank(commInterHost)
<< ", inter-rank: " << UPstream::myProcNo(commInterNode)
<< " / " << UPstream::nProcs(commInterNode)
<< ", host leader:" << UPstream::master(commInterNode)
<< " sub-rank:" << UPstream::is_subrank(commInterNode)
<< endl;
{
Info<< "host-master: "
<< UPstream::whichCommunication(commInterHost) << endl;
<< UPstream::whichCommunication(commInterNode) << endl;
UPstream::printCommTree(commInterHost);
UPstream::printCommTree(commIntraHost);
UPstream::printCommTree(commInterNode);
UPstream::printCommTree(commLocalNode);
}
}
@ -440,32 +147,32 @@ int main(int argc, char *argv[])
generalTest = false;
Info<< nl << "[pstream host-broadcast]" << nl << endl;
const label commInterHost = UPstream::commInterHost();
const label commIntraHost = UPstream::commIntraHost();
const label commInterNode = UPstream::commInterNode();
const label commLocalNode = UPstream::commLocalNode();
Pout<< "world rank: " << UPstream::myProcNo(UPstream::commWorld())
<< " host-leader rank: "
<< UPstream::myProcNo(UPstream::commInterHost())
<< UPstream::myProcNo(UPstream::commInterNode())
<< " intra-host rank: "
<< UPstream::myProcNo(UPstream::commIntraHost())
<< UPstream::myProcNo(UPstream::commLocalNode())
<< endl;
label value1(0), value2(0), value3(0);
label hostIndex = UPstream::myProcNo(commInterHost);
label hostIndex = UPstream::myProcNo(commInterNode);
if (UPstream::master(commInterHost))
if (UPstream::master(commInterNode))
{
value1 = 100;
value2 = 200;
}
if (UPstream::master(commIntraHost))
if (UPstream::master(commLocalNode))
{
value3 = 300;
}
Pstream::broadcast(value1, commInterHost);
Pstream::broadcast(value2, commIntraHost);
Pstream::broadcast(hostIndex, commIntraHost);
Pstream::broadcast(value1, commInterNode);
Pstream::broadcast(value2, commLocalNode);
Pstream::broadcast(hostIndex, commLocalNode);
Pout<< "host: " << hostIndex
<< " broadcast 1: "
@ -474,7 +181,7 @@ int main(int argc, char *argv[])
<< value3 << endl;
// re-broadcast
Pstream::broadcast(value1, commIntraHost);
Pstream::broadcast(value1, commLocalNode);
Pout<< "host: " << hostIndex
<< " broadcast 2: "
<< value1 << endl;
@ -483,42 +190,42 @@ int main(int argc, char *argv[])
label reduced1 = value1;
label reduced2 = value1;
reduce
Foam::reduce
(
reduced1,
sumOp<label>(),
UPstream::msgType(),
commIntraHost
commLocalNode
);
reduce
Foam::reduce
(
reduced2,
sumOp<label>(),
UPstream::msgType(),
commInterHost
commInterNode
);
Pout<< "value1: (host) " << reduced1
<< " (leader) " << reduced2 << endl;
// Pout<< "ranks: " << UPstream::nProcs(commInterHost) << endl;
// Pout<< "ranks: " << UPstream::nProcs(commInterNode) << endl;
wordList strings;
if (UPstream::is_rank(commInterHost))
if (UPstream::is_rank(commInterNode))
{
strings.resize(UPstream::nProcs(commInterHost));
strings[UPstream::myProcNo(commInterHost)] = name(pid());
strings.resize(UPstream::nProcs(commInterNode));
strings[UPstream::myProcNo(commInterNode)] = name(pid());
}
// Some basic gather/scatter
Pstream::allGatherList(strings, UPstream::msgType(), commInterHost);
Pstream::allGatherList(strings, UPstream::msgType(), commInterNode);
Pout<< "pids " << flatOutput(strings) << endl;
Foam::reverse(strings);
Pstream::broadcast(strings, commIntraHost);
Pstream::broadcast(strings, commLocalNode);
Pout<< "PIDS " << flatOutput(strings) << endl;
}

View File

@ -135,6 +135,18 @@ OptimisationSwitches
// Default communication type (nonBlocking | scheduled | buffered)
commsType nonBlocking;
// Use host/node topology-aware routines
// 0: disabled
// 1: split by hostname [default]
// 2: split by shared memory
// >=4: (debug/manual) split with given number per node
nodeComms 1;
// Minimum number of nodes before topology-aware routines are enabled
// <= 2 : always
// >= 3 : only when at least this many nodes are in use
nodeComms.min 0;
// Transfer double as float for processor boundaries. Mostly defunct.
floatTransfer 0;
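
For example, to use shared-memory based splitting but only engage the
topology-aware paths on runs spanning at least four nodes, one might set
(an illustrative combination, not a recommended default):

    nodeComms       2;
    nodeComms.min   4;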

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2015-2023 OpenCFD Ltd.
Copyright (C) 2015-2025 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -58,82 +58,6 @@ Foam::UPstream::commsTypeNames
});
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
namespace Foam
{
// Determine host grouping.
// Uses SHA1 of hostname instead of MPI_Comm_split or MPI_Comm_split_type
// for two reasons:
// - Comm_split returns an MPI_COMM_NULL on non-participating process
// which does not easily fit into the OpenFOAM framework
//
// - use the SHA1 of hostname allows a single MPI_Gather, determination of
// the inter-host vs intra-host (on the master) followed by a single
// broadcast of integers.
//
// Returns: the unique host indices with the leading hosts encoded
// with negative values
static List<int> getHostGroupIds(const label parentCommunicator)
{
const label numProcs = UPstream::nProcs(parentCommunicator);
List<SHA1Digest> digests;
if (UPstream::master(parentCommunicator))
{
digests.resize(numProcs);
}
// Could also add lowercase etc, but since hostName()
// will be consistent within the same node, there is no need.
SHA1Digest myDigest(SHA1(hostName()).digest());
// The fixed-length digest allows use of MPI_Gather
UPstream::mpiGather
(
myDigest.cdata_bytes(), // Send
digests.data_bytes(), // Recv
SHA1Digest::size_bytes(), // Num send/recv data per rank
parentCommunicator
);
List<int> hostIDs(numProcs);
// Compact numbering of hosts.
if (UPstream::master(parentCommunicator))
{
DynamicList<SHA1Digest> uniqDigests;
forAll(digests, proci)
{
const SHA1Digest& dig = digests[proci];
hostIDs[proci] = uniqDigests.find(dig);
if (hostIDs[proci] < 0)
{
// First appearance of host. Encode as leader
hostIDs[proci] = -(uniqDigests.size() + 1);
uniqDigests.push_back(dig);
}
}
}
UPstream::broadcast
(
hostIDs.data_bytes(),
hostIDs.size_bytes(),
parentCommunicator,
UPstream::masterNo()
);
return hostIDs;
}
} // End namespace Foam
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
@ -158,7 +82,7 @@ void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
freeCommunicator(UPstream::commGlobal());
// 0: COMM_WORLD : commWorld() / commGlobal()
comm = allocateCommunicator(-1, singleProc, false);
comm = newCommunicator(-1, singleProc, false);
if (comm != UPstream::commGlobal())
{
// Failed sanity check
@ -169,7 +93,7 @@ void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
}
// 1: COMM_SELF
comm = allocateCommunicator(-2, singleProc, false);
comm = newCommunicator(-2, singleProc, false);
if (comm != UPstream::commSelf())
{
// Failed sanity check
@ -192,7 +116,7 @@ void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
freeCommunicator(UPstream::commGlobal());
// 0: COMM_WORLD : commWorld() / commGlobal()
comm = allocateCommunicator(-1, labelRange(nProcs), true);
comm = newCommunicator(-1, labelRange(nProcs), true);
if (comm != UPstream::commGlobal())
{
// Failed sanity check
@ -202,10 +126,12 @@ void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
<< Foam::exit(FatalError);
}
const int globalRanki = UPstream::myProcNo(UPstream::commGlobal());
// 1: COMM_SELF
// - Processor number wrt world communicator
singleProc.start() = UPstream::myProcNo(UPstream::commGlobal());
comm = allocateCommunicator(-2, singleProc, true);
singleProc.start() = globalRanki;
comm = newCommunicator(-2, singleProc, true);
if (comm != UPstream::commSelf())
{
// Failed sanity check
@ -215,7 +141,7 @@ void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
<< Foam::exit(FatalError);
}
Pout.prefix() = '[' + Foam::name(myProcNo(commGlobal())) + "] ";
Pout.prefix() = '[' + std::to_string(globalRanki) + "] ";
Perr.prefix() = Pout.prefix();
}
@ -243,6 +169,7 @@ Foam::label Foam::UPstream::getAvailableCommIndex(const label parentIndex)
parentComm_[index] = parentIndex;
procIDs_[index].clear();
// Sizing and filling are demand-driven
linearCommunication_[index].clear();
treeCommunication_[index].clear();
}
@ -255,6 +182,7 @@ Foam::label Foam::UPstream::getAvailableCommIndex(const label parentIndex)
parentComm_.push_back(parentIndex);
procIDs_.emplace_back();
// Sizing and filling are demand-driven
linearCommunication_.emplace_back();
treeCommunication_.emplace_back();
}
@ -263,7 +191,7 @@ Foam::label Foam::UPstream::getAvailableCommIndex(const label parentIndex)
}
Foam::label Foam::UPstream::allocateCommunicator
Foam::label Foam::UPstream::newCommunicator
(
const label parentIndex,
const labelRange& subRanks,
@ -274,57 +202,46 @@ Foam::label Foam::UPstream::allocateCommunicator
if (debug)
{
Perr<< "Allocating communicator " << index << nl
<< " parent : " << parentIndex << nl
<< " procs : " << subRanks << nl
Perr<< "Allocate communicator ["
<< index << "] from [" << parentIndex
<< "] ranks : " << subRanks << nl
<< endl;
}
// Initially treat as master,
// overwritten by allocateCommunicatorComponents
myProcNo_[index] = UPstream::masterNo();
auto& procIds = procIDs_[index];
// The selected sub-ranks.
// - transcribe from label to int
// - already in incremental order
auto& procIds = procIDs_[index];
procIds.resize_nocopy(subRanks.size());
label numSubRanks = 0;
for (const label subRanki : subRanks)
// - already in monotonic order
if
(
(withComponents && UPstream::parRun())
? (parentIndex < 0 || subRanks.contains(myProcNo_[parentIndex]))
: !subRanks.empty()
)
{
procIds[numSubRanks] = subRanki;
++numSubRanks;
procIds.resize_nocopy(subRanks.size());
std::iota(procIds.begin(), procIds.end(), subRanks.start());
}
else
{
// Not involved
procIds.clear();
}
// Sizing and filling are demand-driven
linearCommunication_[index].clear();
treeCommunication_[index].clear();
if (withComponents && parRun())
if (withComponents && UPstream::parRun())
{
allocateCommunicatorComponents(parentIndex, index);
// Could 'remember' locations of uninvolved ranks
/// if (myProcNo_[index] < 0 && parentIndex >= 0)
/// {
/// // As global rank
/// myProcNo_[index] = -(myProcNo_[worldComm]+1);
///
/// OR:
/// // As parent rank number
/// if (myProcNo_[parentIndex] >= 0)
/// {
/// myProcNo_[index] = -(myProcNo_[parentIndex]+1);
/// }
/// }
}
return index;
}
Foam::label Foam::UPstream::allocateCommunicator
Foam::label Foam::UPstream::newCommunicator
(
const label parentIndex,
const labelUList& subRanks,
@ -335,236 +252,288 @@ Foam::label Foam::UPstream::allocateCommunicator
if (debug)
{
Perr<< "Allocating communicator " << index << nl
<< " parent : " << parentIndex << nl
<< " procs : " << flatOutput(subRanks) << nl
Perr<< "Allocate communicator ["
<< index << "] from [" << parentIndex
<< "] ranks : " << flatOutput(subRanks) << nl
<< endl;
}
// Initially treat as master,
// overwritten by allocateCommunicatorComponents
myProcNo_[index] = UPstream::masterNo();
auto& procIds = procIDs_[index];
// The selected sub-ranks.
// - transcribe from label to int. Treat negative values as 'ignore'
// - enforce incremental order (so index is rank in next communicator)
auto& procIds = procIDs_[index];
procIds.resize_nocopy(subRanks.size());
label numSubRanks = 0;
bool monotonicOrder = true;
for (const label subRanki : subRanks)
// - transcribe from label to int
// - sort into monotonic order (if needed)
if
(
(withComponents && UPstream::parRun())
? (parentIndex < 0 || subRanks.contains(myProcNo_[parentIndex]))
: !subRanks.empty()
)
{
if (subRanki < 0)
procIds.resize_nocopy(subRanks.size());
label count = 0;
bool monotonicOrder = true;
for (const auto ranki : subRanks)
{
continue;
}
if (monotonicOrder && numSubRanks)
{
monotonicOrder = (procIds[numSubRanks-1] < subRanki);
if (ranki < 0)
{
continue;
}
// Could also flag/ignore out-of-range ranks
// (ranki >= numProcs)
if (monotonicOrder && count)
{
monotonicOrder = (procIds[count-1] < ranki);
}
procIds[count] = ranki;
++count;
}
procIds[numSubRanks] = subRanki;
++numSubRanks;
}
if (!monotonicOrder)
{
auto last = procIds.begin() + count;
std::sort(procIds.begin(), last);
last = std::unique(procIds.begin(), last);
count = label(last - procIds.begin());
}
if (!monotonicOrder)
procIds.resize(count);
}
else
{
auto last = procIds.begin() + numSubRanks;
std::sort(procIds.begin(), last);
last = std::unique(procIds.begin(), last);
numSubRanks = label(last - procIds.begin());
// Not involved
procIds.clear();
}
procIds.resize(numSubRanks);
// Sizing and filling are demand-driven
linearCommunication_[index].clear();
treeCommunication_[index].clear();
if (withComponents && parRun())
if (withComponents && UPstream::parRun())
{
allocateCommunicatorComponents(parentIndex, index);
// Could 'remember' locations of uninvolved ranks
/// if (myProcNo_[index] < 0 && parentIndex >= 0)
/// {
/// // As global rank
/// myProcNo_[index] = -(myProcNo_[worldComm]+1);
///
/// OR:
/// // As parent rank number
/// if (myProcNo_[parentIndex] >= 0)
/// {
/// myProcNo_[index] = -(myProcNo_[parentIndex]+1);
/// }
/// }
}
return index;
}
Foam::label Foam::UPstream::allocateInterHostCommunicator
Foam::label Foam::UPstream::dupCommunicator
(
const label parentCommunicator
const label parentIndex
)
{
List<int> hostIDs = getHostGroupIds(parentCommunicator);
DynamicList<label> subRanks(hostIDs.size());
// From master to host-leader. Ranks between hosts.
forAll(hostIDs, proci)
{
// Is host leader?
if (hostIDs[proci] < 0)
{
subRanks.push_back(proci);
}
}
return allocateCommunicator(parentCommunicator, subRanks);
}
Foam::label Foam::UPstream::allocateIntraHostCommunicator
(
const label parentCommunicator
)
{
List<int> hostIDs = getHostGroupIds(parentCommunicator);
DynamicList<label> subRanks(hostIDs.size());
// Intra-host ranks. Ranks within a host
int myHostId = hostIDs[UPstream::myProcNo(parentCommunicator)];
if (myHostId < 0) myHostId = -(myHostId + 1); // Flip to generic id
forAll(hostIDs, proci)
{
int id = hostIDs[proci];
if (id < 0) id = -(id + 1); // Flip to generic id
if (id == myHostId)
{
subRanks.push_back(proci);
}
}
return allocateCommunicator(parentCommunicator, subRanks);
}
bool Foam::UPstream::allocateHostCommunicatorPairs()
{
// Use the world communicator (not global communicator)
const label parentCommunicator = worldComm;
// Skip if non-parallel
if (!parRun())
{
return false;
}
if (interHostComm_ >= 0 || intraHostComm_ >= 0)
#ifdef FULLDEBUG
if (FOAM_UNLIKELY(parentIndex < 0))
{
// Failed sanity check
FatalErrorInFunction
<< "Host communicator(s) already created!" << endl
<< "Attempted to duplicate an invalid communicator: "
<< parentIndex
<< Foam::exit(FatalError);
return false;
}
#endif
interHostComm_ = getAvailableCommIndex(parentCommunicator);
intraHostComm_ = getAvailableCommIndex(parentCommunicator);
// Sorted order, purely cosmetic
if (intraHostComm_ < interHostComm_)
{
std::swap(intraHostComm_, interHostComm_);
}
// Overwritten later
myProcNo_[intraHostComm_] = UPstream::masterNo();
myProcNo_[interHostComm_] = UPstream::masterNo();
const label index = getAvailableCommIndex(parentIndex);
if (debug)
{
Perr<< "Allocating host communicators "
<< interHostComm_ << ", " << intraHostComm_ << nl
<< " parent : " << parentCommunicator << nl
Perr<< "Duplicate communicator ["
<< index << "] from [" << parentIndex << "]" << endl;
}
// Initially treat as unknown,
// overwritten by dupCommunicatorComponents
myProcNo_[index] = -1;
procIDs_[index].clear();
if (UPstream::parRun())
{
dupCommunicatorComponents(parentIndex, index);
}
return index;
}
Foam::label Foam::UPstream::splitCommunicator
(
const label parentIndex,
const int colour
)
{
#ifdef FULLDEBUG
if (FOAM_UNLIKELY(parentIndex < 0))
{
// Failed sanity check
FatalErrorInFunction
<< "Attempted to split an invalid communicator: "
<< parentIndex
<< Foam::exit(FatalError);
}
#endif
const label index = getAvailableCommIndex(parentIndex);
if (debug)
{
Perr<< "Split communicator ["
<< index << "] from [" << parentIndex
<< "] using colour=" << colour << endl;
}
// Initially treat as unknown,
// overwritten by splitCommunicatorComponents
myProcNo_[index] = -1;
procIDs_[index].clear();
if (UPstream::parRun())
{
splitCommunicatorComponents(parentIndex, index, colour);
}
return index;
}
bool Foam::UPstream::setHostCommunicators(const int numPerNode)
{
// Uses the world communicator (not global communicator)
// Skip if non-parallel
if (!UPstream::parRun())
{
numNodes_ = 1;
return false;
}
if (FOAM_UNLIKELY(commInterNode_ >= 0 || commLocalNode_ >= 0))
{
// Failed sanity check
FatalErrorInFunction
<< "Node communicator(s) already created!" << endl
<< Foam::abort(FatalError);
return false;
}
commInterNode_ = getAvailableCommIndex(constWorldComm_);
commLocalNode_ = getAvailableCommIndex(constWorldComm_);
// Overwritten later
myProcNo_[commInterNode_] = UPstream::masterNo();
myProcNo_[commLocalNode_] = UPstream::masterNo();
// Sorted order, purely cosmetic
if (commLocalNode_ < commInterNode_)
{
std::swap(commLocalNode_, commInterNode_);
}
if (debug)
{
Perr<< "Allocating node communicators "
<< commInterNode_ << ", " << commLocalNode_
<< " on parent : " << constWorldComm_ << nl
<< endl;
}
List<int> hostIDs = getHostGroupIds(parentCommunicator);
const int worldRank = UPstream::myProcNo(constWorldComm_);
const int worldSize = UPstream::nProcs(constWorldComm_);
DynamicList<int> subRanks(hostIDs.size());
// From master to host-leader. Ranks between hosts.
if (numPerNode > 1)
{
subRanks.clear();
forAll(hostIDs, proci)
// Manual splitting based on given number of ranks per node
const int myNodeId = (worldRank/numPerNode);
// Establish the topology
{
// Is host leader?
if (hostIDs[proci] < 0)
DynamicList<int> nodeGroup(numPerNode);
DynamicList<int> nodeLeader(1+worldSize/numPerNode);
for (int proci = 0; proci < worldSize; ++proci)
{
subRanks.push_back(proci);
if (myNodeId == (proci/numPerNode))
{
nodeGroup.push_back(proci);
}
// Flip to generic host id
hostIDs[proci] = -(hostIDs[proci] + 1);
if ((proci % numPerNode) == 0)
{
// Local rank 0 is a node leader
nodeLeader.push_back(proci);
}
}
procIDs_[commInterNode_] = std::move(nodeLeader);
procIDs_[commLocalNode_] = std::move(nodeGroup);
}
}
else
{
// Determine inter-node/intra-node grouping based on the SHA1 of the
// hostnames. This allows a single initial Allgather to establish
// the overall topology. The alternative is to use MPI_Comm_split_type()
// on SHARED and then MPI_Comm_split() on the leader ranks.
const label index = interHostComm_;
// Could also add lowercase etc, but since hostName()
// will be consistent within the same node, there is no need.
const SHA1Digest myDigest(SHA1(hostName()).digest());
// Direct copy (subRanks is also int)
procIDs_[index] = subRanks;
List<SHA1Digest> digests(worldSize);
digests[worldRank] = myDigest;
// Implicitly: withComponents = true
if (parRun()) // Already checked...
// The fixed-length digest allows use of MPI_Allgather.
UPstream::mpiAllGather
(
digests.data_bytes(), // Send/Recv
SHA1Digest::size_bytes(), // Num send/recv data per rank
UPstream::constWorldComm_
);
// Establish the topology
{
allocateCommunicatorComponents(parentCommunicator, index);
}
DynamicList<int> nodeGroup(64);
DynamicList<int> nodeLeader(64);
DynamicList<SHA1Digest> uniqDigests(64);
// Sizing and filling are demand-driven
linearCommunication_[index].clear();
treeCommunication_[index].clear();
for (int proci = 0; proci < worldSize; ++proci)
{
const auto& dig = digests[proci];
if (myDigest == dig)
{
nodeGroup.push_back(proci);
}
if (!uniqDigests.contains(dig))
{
// First appearance of host
uniqDigests.push_back(dig);
nodeLeader.push_back(proci);
}
}
procIDs_[commInterNode_] = std::move(nodeLeader);
procIDs_[commLocalNode_] = std::move(nodeGroup);
}
}
// Intra-host ranks. Ranks within a host
{
int myHostId = hostIDs[UPstream::myProcNo(parentCommunicator)];
if (myHostId < 0) myHostId = -(myHostId + 1); // Flip to generic id
subRanks.clear();
forAll(hostIDs, proci)
{
int id = hostIDs[proci];
if (id < 0) id = -(id + 1); // Flip to generic id
// Capture the size (number of nodes) before doing anything further
numNodes_ = procIDs_[commInterNode_].size();
if (id == myHostId)
{
subRanks.push_back(proci);
}
}
// ~~~~~~~~~
// IMPORTANT
// ~~~~~~~~~
// Always retain knowledge of the inter-node leaders,
// even if this process is not on that communicator.
// This will help when constructing topology-aware communication.
const label index = intraHostComm_;
// Direct copy (subRanks is also int)
procIDs_[index] = subRanks;
// Implicitly: withComponents = true
if (parRun()) // Already checked...
{
allocateCommunicatorComponents(parentCommunicator, index);
}
// Sizing and filling are demand-driven
linearCommunication_[index].clear();
treeCommunication_[index].clear();
}
// Allocate backend MPI components
allocateCommunicatorComponents(constWorldComm_, commInterNode_);
allocateCommunicatorComponents(constWorldComm_, commLocalNode_);
return true;
}
@ -582,10 +551,6 @@ void Foam::UPstream::freeCommunicator
return;
}
// Update demand-driven communicators
if (interHostComm_ == communicator) interHostComm_ = -1;
if (intraHostComm_ == communicator) intraHostComm_ = -1;
if (debug)
{
Perr<< "Communicators : Freeing communicator " << communicator
@ -688,45 +653,23 @@ void Foam::UPstream::printCommTree(const label communicator)
}
Foam::label Foam::UPstream::commIntraHost()
bool Foam::UPstream::usingNodeComms(const label communicator)
{
if (!parRun())
{
return worldComm; // Don't know anything better to return
}
if (intraHostComm_ < 0)
{
allocateHostCommunicatorPairs();
}
return intraHostComm_;
}
// Starting point must be "real" world-communicator
// ("real" means without any local trickery with worldComm)
// Avoid corner cases:
// - everything is on one node
// - everything is on different nodes
Foam::label Foam::UPstream::commInterHost()
{
if (!parRun())
{
return worldComm; // Don't know anything better to return
}
if (interHostComm_ < 0)
{
allocateHostCommunicatorPairs();
}
return interHostComm_;
}
bool Foam::UPstream::hasHostComms()
{
return (intraHostComm_ >= 0 || interHostComm_ >= 0);
}
void Foam::UPstream::clearHostComms()
{
// Always with Pstream
freeCommunicator(intraHostComm_, true);
freeCommunicator(interHostComm_, true);
return
(
parRun_ && (constWorldComm_ == communicator)
&& (nodeCommsControl_ > 0)
// More than one node and above defined threshold
&& (numNodes_ > 1) && (numNodes_ >= nodeCommsMin_)
// Some processes do share nodes
&& (numNodes_ < procIDs_[constWorldComm_].size())
);
}
@ -756,10 +699,12 @@ Foam::DynamicList<Foam::List<Foam::UPstream::commsStruct>>
Foam::UPstream::treeCommunication_(16);
Foam::label Foam::UPstream::intraHostComm_(-1);
Foam::label Foam::UPstream::interHostComm_(-1);
Foam::label Foam::UPstream::constWorldComm_(0);
Foam::label Foam::UPstream::numNodes_(1);
Foam::label Foam::UPstream::commInterNode_(-1);
Foam::label Foam::UPstream::commLocalNode_(-1);
Foam::label Foam::UPstream::worldComm(0);
Foam::label Foam::UPstream::worldComm(0); // Initially same as constWorldComm_
Foam::label Foam::UPstream::warnComm(-1);
@ -767,16 +712,39 @@ Foam::label Foam::UPstream::warnComm(-1);
// These are overwritten in parallel mode (by UPstream::setParRun())
const Foam::label nPredefinedComm = []()
{
// 0: COMM_WORLD : commWorld() / commGlobal()
(void) Foam::UPstream::allocateCommunicator(-1, Foam::labelRange(1), false);
// 0: COMM_WORLD : commGlobal(), constWorldComm_, worldComm
(void) Foam::UPstream::newCommunicator(-1, Foam::labelRange(1), false);
// 1: COMM_SELF
(void) Foam::UPstream::allocateCommunicator(-2, Foam::labelRange(1), false);
(void) Foam::UPstream::newCommunicator(-2, Foam::labelRange(1), false);
return Foam::UPstream::nComms();
}();
int Foam::UPstream::nodeCommsControl_
(
Foam::debug::optimisationSwitch("nodeComms", 1)
);
registerOptSwitch
(
"nodeComms",
int,
Foam::UPstream::nodeCommsControl_
);
int Foam::UPstream::nodeCommsMin_
(
Foam::debug::optimisationSwitch("nodeComms.min", 0)
);
registerOptSwitch
(
"nodeComms.min",
int,
Foam::UPstream::nodeCommsMin_
);
bool Foam::UPstream::floatTransfer
(
Foam::debug::optimisationSwitch("floatTransfer", 0)

View File

@ -220,18 +220,28 @@ private:
//- Standard transfer message type
static int msgType_;
//- Index to the world-communicator as defined at startup
//- (after any multi-world definitions).
//- Is unaffected by any later changes to worldComm.
static label constWorldComm_;
//- The number of shared/host nodes in the (const) world communicator.
static label numNodes_;
//- Index to the inter-node communicator (between nodes),
//- defined based on constWorldComm_
static label commInterNode_;
//- Index to the intra-host communicator (within a node),
//- defined based on constWorldComm_
static label commLocalNode_;
//- Names of all worlds
static wordList allWorlds_;
//- Per processor the world index (into allWorlds_)
static labelList worldIDs_;
//- Intra-host communicator
static label intraHostComm_;
//- Inter-host communicator (between host leaders)
static label interHostComm_;
// Communicator specific data
@ -259,24 +269,59 @@ private:
//- Set data for parallel running
static void setParRun(const label nProcs, const bool haveThreads);
//- Initialise entries for new communicator. Return the index
//- Initialise entries for new communicator.
//
// Resets corresponding entry in myProcNo_, procIDs_,
// linearCommunication_, treeCommunication_
// \return the communicator index
static label getAvailableCommIndex(const label parentIndex);
//- Allocate MPI components of communicator with given index
//- Define inter-host/intra-host communicators (uses commConstWorld).
// Optionally specify a given number per node.
static bool setHostCommunicators(const int numPerNode = 0);
//- Define inter-host/intra-host communicators based on
//- shared-memory information. Uses comm-world.
static bool setSharedMemoryCommunicators();
//- Allocate MPI components of communicator with given index.
// This represents a "top-down" approach, creating a communicator
// based on the procIDs_ groupings.
//
// Modifies myProcNo_, reads and modifies procIDs_
static void allocateCommunicatorComponents
(
const label parentIndex,
const label index
);
//- Allocate MPI components as duplicate of the parent communicator
//
// Modifies myProcNo_, procIDs_
static void dupCommunicatorComponents
(
const label parentIndex,
const label index
);
//- Allocate MPI components for the given index by splitting
//- the parent communicator on the given \em colour.
// This represents a "bottom-up" approach, when the individual ranks
// only know which group they should belong to, but don't yet know
// which other ranks will be in their group.
//
// Modifies myProcNo_, procIDs_
static void splitCommunicatorComponents
(
const label parentIndex,
const label index,
const int colour
);
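// Illustration only (not part of this header): the same grouping can be
// reached top-down or bottom-up. Assuming the first half of the world
// ranks should form a communicator:
//
//     // top-down: every rank passes the (same) explicit rank list
//     labelList ranks(identity(UPstream::nProcs()/2));
//     label a = UPstream::newCommunicator(UPstream::worldComm, ranks);
//
//     // bottom-up: every rank only supplies its own colour
//     const int colour =
//         (2*UPstream::myProcNo() < UPstream::nProcs()) ? 0 : -1;
//     label b = UPstream::splitCommunicator(UPstream::worldComm, colour);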
//- Free MPI components of communicator.
// Does not touch the first two communicators (SELF, WORLD)
static void freeCommunicatorComponents(const label index);
//- Allocate inter-host, intra-host communicators
//- with comm-world as parent
static bool allocateHostCommunicatorPairs();
public:
@ -286,6 +331,18 @@ public:
// Static Data
//- Use of host/node topology-aware routines
// 0: disabled
// 1: split by hostname [default]
// 2: split by shared memory
// >=4: (debug) split with given number per node
static int nodeCommsControl_;
//- Minimum number of nodes before topology-aware routines are enabled
// <= 2 : always
// >= 3 : when there are at least this many nodes
static int nodeCommsMin_;
//- Should compact transfer be used in which floats replace doubles
//- reducing the bandwidth requirement at the expense of some loss
//- in accuracy
@ -323,12 +380,19 @@ public:
//- Debugging: warn for use of any communicator differing from warnComm
static label warnComm;
//- Communicator for all ranks, irrespective of any local worlds
//- Communicator for all ranks, irrespective of any local worlds.
// This value \em never changes during a simulation.
static constexpr label commGlobal() noexcept { return 0; }
//- Communicator within the current rank only
// This value \em never changes during a simulation.
static constexpr label commSelf() noexcept { return 1; }
//- Communicator for all ranks (respecting any local worlds).
// This value \em never changes after startup. Unlike the commWorld()
// which can be temporarily overridden.
static label commConstWorld() noexcept { return constWorldComm_; }
//- Communicator for all ranks (respecting any local worlds)
static label commWorld() noexcept { return worldComm; }
@ -343,6 +407,7 @@ public:
//- Alter communicator debugging setting.
//- Warns for use of any communicator differing from specified.
//- Negative values disable.
// \returns the previous warn index
static label commWarn(const label communicator) noexcept
{
@ -360,17 +425,33 @@ public:
// Host Communicators
//- Demand-driven: Intra-host communicator (respects any local worlds)
static label commIntraHost();
//- Communicator between nodes/hosts (respects any local worlds)
static label commInterNode() noexcept
{
return (parRun_ ? commInterNode_ : constWorldComm_);
}
//- Demand-driven: Inter-host communicator (respects any local worlds)
static label commInterHost();
//- Communicator within the node/host (respects any local worlds)
static label commLocalNode() noexcept
{
return (parRun_ ? commLocalNode_ : constWorldComm_);
}
//- Test for presence of any intra or inter host communicators
static bool hasHostComms();
//- Both inter-node and local-node communicators have been created
static bool hasNodeCommunicators() noexcept
{
return
(
(commInterNode_ > constWorldComm_)
&& (commLocalNode_ > constWorldComm_)
);
}
//- Remove any existing intra and inter host communicators
static void clearHostComms();
//- True if node topology-aware routines have been enabled,
//- it is running in parallel, the starting point is the
//- world-communicator and it is not an odd corner case
//- (ie, all processes on one node, or all processes on different nodes)
static bool usingNodeComms(const label communicator = worldComm);
// Constructors
@ -384,9 +465,8 @@ public:
// Member Functions
//- Allocate new communicator with contiguous sub-ranks
//- on the parent communicator.
static label allocateCommunicator
//- Create new communicator with sub-ranks on the parent communicator
static label newCommunicator
(
//! The parent communicator
const label parent,
@ -398,8 +478,8 @@ public:
const bool withComponents = true
);
//- Allocate new communicator with sub-ranks on the parent communicator
static label allocateCommunicator
//- Create new communicator with sub-ranks on the parent communicator
static label newCommunicator
(
//! The parent communicator
const label parent,
@ -411,6 +491,28 @@ public:
const bool withComponents = true
);
//- Duplicate the parent communicator
//
// Always calls dupCommunicatorComponents() internally
static label dupCommunicator
(
//! The parent communicator
const label parent
);
//- Allocate a new communicator by splitting the parent communicator
//- on the given \em colour.
// Always calls splitCommunicatorComponents() internally
static label splitCommunicator
(
//! The parent communicator
const label parent,
//! The colouring to select which ranks to include.
//! Negative values correspond to 'ignore'
const int colour
);
//- Free a previously allocated communicator.
// Ignores placeholder (negative) communicators.
static void freeCommunicator
@ -419,19 +521,6 @@ public:
const bool withComponents = true
);
//- Allocate an inter-host communicator
static label allocateInterHostCommunicator
(
const label parentCommunicator = worldComm
);
//- Allocate an intra-host communicator
static label allocateIntraHostCommunicator
(
const label parentCommunicator = worldComm
);
//- Wrapper class for allocating/freeing communicators. Always invokes
//- allocateCommunicatorComponents() and freeCommunicatorComponents()
class communicator
@ -457,12 +546,11 @@ public:
(
//! The parent communicator
const label parentComm,
//! The contiguous sub-ranks of parent to use
const labelRange& subRanks
)
:
comm_(UPstream::allocateCommunicator(parentComm, subRanks))
comm_(UPstream::newCommunicator(parentComm, subRanks))
{}
//- Allocate communicator for sub-ranks on given parent
@ -470,14 +558,38 @@ public:
(
//! The parent communicator
const label parentComm,
//! The sub-ranks of parent to use (negative values ignored)
const labelUList& subRanks
)
:
comm_(UPstream::allocateCommunicator(parentComm, subRanks))
comm_(UPstream::newCommunicator(parentComm, subRanks))
{}
//- Factory Method :
//- Duplicate the given communicator
static communicator duplicate(const label parentComm)
{
communicator c;
c.comm_ = UPstream::dupCommunicator(parentComm);
return c;
}
//- Factory Method :
//- Split the communicator on the given \em colour.
static communicator split
(
//! The parent communicator
const label parentComm,
//! The colouring to select which ranks to include.
//! Negative values correspond to 'ignore'
const int colour
)
{
communicator c;
c.comm_ = UPstream::splitCommunicator(parentComm, colour);
return c;
}
//- Free allocated communicator
~communicator() { UPstream::freeCommunicator(comm_); }
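The same operations via the wrapper class, as a sketch (freeing is handled by the destructor; the half/half colouring is an arbitrary example and it assumes the usual implicit conversion of the wrapper to its communicator label):

    {
        // Duplicate of world, freed automatically at end of scope
        auto dupComm = UPstream::communicator::duplicate(UPstream::worldComm);

        // Keep the first half of the ranks; the others pass a negative
        // colour and are excluded
        const bool lower =
            (2*UPstream::myProcNo(UPstream::worldComm) < UPstream::nProcs());

        auto halfComm =
            UPstream::communicator::split(UPstream::worldComm, (lower ? 0 : -1));

        if (UPstream::is_rank(halfComm))
        {
            // ... communicate on halfComm ...
        }
    }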
@ -498,14 +610,14 @@ public:
void reset(label parent, const labelRange& subRanks)
{
UPstream::freeCommunicator(comm_);
comm_ = UPstream::allocateCommunicator(parent, subRanks);
comm_ = UPstream::newCommunicator(parent, subRanks);
}
//- Allocate with sub-ranks of parent communicator
void reset(label parent, const labelUList& subRanks)
{
UPstream::freeCommunicator(comm_);
comm_ = UPstream::allocateCommunicator(parent, subRanks);
comm_ = UPstream::newCommunicator(parent, subRanks);
}
//- Take ownership, free allocated communicator
@ -805,7 +917,7 @@ public:
}
//- Rank of this process in the communicator (starting from masterNo()).
//- Can be negative if the process is not a rank in the communicator
//- Negative if the process is not a rank in the communicator.
static int myProcNo(const label communicator = worldComm)
{
return myProcNo_[communicator];
@ -817,11 +929,11 @@ public:
return myProcNo_[communicator] == masterNo();
}
//- True if process corresponds to any rank (master or sub-rank)
//- True if process corresponds to \b any rank (master or sub-rank)
//- in the given communicator
static bool is_rank(const label communicator = worldComm)
{
return myProcNo_[communicator] >= 0;
return myProcNo_[communicator] >= masterNo();
}
//- True if process corresponds to a sub-rank in the given communicator
@ -842,6 +954,12 @@ public:
);
}
//- The number of shared/host nodes in the (const) world communicator.
static label numNodes() noexcept
{
return numNodes_;
}
//- The parent communicator
static label parent(const label communicator)
{
@ -931,7 +1049,7 @@ public:
(
np <= 1
? List<commsStruct>::null()
: np < nProcsSimpleSum
: (np <= 2 || np < nProcsSimpleSum)
? linearCommunication(communicator)
: treeCommunication(communicator)
);
@ -983,7 +1101,7 @@ public:
static void shutdown(int errNo = 0);
//- Call MPI_Abort with no other checks or cleanup
static void abort();
static void abort(int errNo = 1);
//- Shutdown (finalize) MPI as required and exit program with errNo.
static void exit(int errNo = 1);
@ -1205,27 +1323,43 @@ public:
// Housekeeping
//- Create new communicator with sub-ranks on the parent communicator
// \deprecated(2025-02)
static label allocateCommunicator
(
const label parent,
const labelRange& subRanks,
const bool withComponents = true
)
{
return newCommunicator(parent, subRanks, withComponents);
}
//- Create new communicator with sub-ranks on the parent communicator
// \deprecated(2025-02)
static label allocateCommunicator
(
const label parent,
const labelUList& subRanks,
const bool withComponents = true
)
{
return newCommunicator(parent, subRanks, withComponents);
}
//- Communicator between nodes (respects any local worlds)
FOAM_DEPRECATED_FOR(2025-02, "commInterNode()")
static label commInterHost() noexcept { return commInterNode(); }
//- Communicator within the node (respects any local worlds)
FOAM_DEPRECATED_FOR(2025-02, "commLocalNode()")
static label commIntraHost() noexcept { return commLocalNode(); }
//- Wait for all requests to finish.
// \deprecated(2023-01) Probably not what you want.
// Should normally be restricted to a particular starting request.
FOAM_DEPRECATED_FOR(2023-01, "waitRequests(int) method")
static void waitRequests() { waitRequests(0); }
//- Process index of first sub-process
// \deprecated(2020-09) use subProcs() method instead
FOAM_DEPRECATED_FOR(2020-09, "subProcs() method")
static constexpr int firstSlave() noexcept
{
return 1;
}
//- Process index of last sub-process
// \deprecated(2020-09) use subProcs() method instead
FOAM_DEPRECATED_FOR(2020-09, "subProcs() or allProcs() method")
static int lastSlave(const label communicator = worldComm)
{
return nProcs(communicator) - 1;
}
};

View File

@ -28,6 +28,178 @@ License
#include "UPstream.H"
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
namespace Foam
{
// This outputs as depth-first, but graphviz sorts that for us
static void printGraph_impl
(
Ostream& os,
const UList<UPstream::commsStruct>& comms,
const label proci,
label depth,
const label maxDepth = 1024
)
{
if (proci >= comms.size())
{
// Corner case when only a single rank involved
// (eg, for node-local communicator)
return;
}
const auto& below = comms[proci].below();
if (proci == 0)
{
os << nl << "// communication graph:" << nl;
os.beginBlock("graph");
// Prefer left-to-right layout for large graphs
os << indent << "rankdir=LR" << nl;
if (below.empty())
{
// A graph with a single-node (eg, self-comm)
os << indent << proci << nl;
}
}
int pos = 0;
for (const auto nbrProci : below)
{
if (pos)
{
os << " ";
}
else
{
os << indent;
}
os << proci << " -- " << nbrProci;
if (++pos >= 4) // Max 4 items per line
{
pos = 0;
os << nl;
}
}
if (pos)
{
os << nl;
}
// Limit the maximum depth
++depth;
if (depth >= maxDepth && (proci != 0))
{
return;
}
for (const auto nbrProci : below)
{
// if (proci == nbrProci) continue; // Extreme safety!
printGraph_impl(os, comms, nbrProci, depth, maxDepth);
}
if (proci == 0)
{
os.endBlock();
os << "// end graph" << nl;
}
}
} // End namespace Foam
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// Create a tree-like schedule. For 8 procs:
// (level 0)
// 0 receives from 1
// 2 receives from 3
// 4 receives from 5
// 6 receives from 7
// (level 1)
// 0 receives from 2
// 4 receives from 6
// (level 2)
// 0 receives from 4
//
// The sends/receives for all levels are collected per processor
// (one send per processor; multiple receives possible) creating
// a table:
//
// So per processor:
// proc receives from sends to
// ---- ------------- --------
// 0 1,2,4 -
// 1 - 0
// 2 3 0
// 3 - 2
// 4 5 0
// 5 - 4
// 6 7 4
// 7 - 6
namespace Foam
{
static label simpleTree
(
const label procID,
const label numProcs,
DynamicList<label>& below,
DynamicList<label>& allBelow
)
{
label above(-1);
for (label mod = 2, step = 1; step < numProcs; step = mod)
{
mod = step * 2;
if (procID % mod)
{
// The rank above
above = procID - (procID % mod);
break;
}
else
{
for
(
label j = procID + step;
j < numProcs && j < procID + mod;
j += step
)
{
below.push_back(j);
}
for
(
label j = procID + step;
j < numProcs && j < procID + mod;
j++
)
{
allBelow.push_back(j);
}
}
}
return above;
}
} // End namespace Foam
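A small worked example of the helper (illustrative only), matching the 8-rank table above:

    DynamicList<label> below, allBelow;

    // Rank 6 of 8: receives from 7, sends to 4
    label above = simpleTree(6, 8, below, allBelow);
    // -> above == 4, below == (7), allBelow == (7)

    below.clear();
    allBelow.clear();

    // Rank 0 of 8: receives directly from 1,2,4
    above = simpleTree(0, 8, below, allBelow);
    // -> above == -1, below == (1 2 4), allBelow == (1 2 3 4 5 6 7)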
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::UPstream::commsStruct::commsStruct
@ -91,7 +263,6 @@ Foam::UPstream::commsStruct::commsStruct
// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
// This outputs as depth-first, but graphviz sorts that for us
void Foam::UPstream::commsStruct::printGraph
(
Ostream& os,
@ -99,59 +270,13 @@ void Foam::UPstream::commsStruct::printGraph
const label proci
)
{
// if (proci >= comms.size()) return; // Extreme safety!
// Print graph - starting at depth 0
// Avoid corner case when only a single rank involved
// (eg, for node-local communicator)
const auto& below = comms[proci].below();
if (proci == 0)
if (proci < comms.size())
{
os << nl << "// communication graph:" << nl;
os.beginBlock("graph");
if (below.empty())
{
// A graph with a single-node (eg, self-comm)
os << indent << proci << nl;
}
}
int pos = 0;
for (const label nbrProci : below)
{
if (pos)
{
os << " ";
}
else
{
os << indent;
}
os << proci << " -- " << nbrProci;
if (++pos >= 4) // Max 4 items per line
{
pos = 0;
os << nl;
}
}
if (pos)
{
os << nl;
}
for (const label nbrProci : below)
{
// if (proci == nbrProci) continue; // Extreme safety!
printGraph(os, comms, nbrProci);
}
if (proci == 0)
{
os.endBlock();
os << "// end graph" << nl;
printGraph_impl(os, comms, proci, 0);
}
}
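For reference, the output for the 8-rank tree pattern described above might resemble the following (illustrative only; the exact spacing depends on the stream indentation):

    // communication graph:
    graph
    {
        rankdir=LR
        0 -- 1 0 -- 2 0 -- 4
        2 -- 3
        4 -- 5 4 -- 6
        6 -- 7
    }
    // end graph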
@ -181,88 +306,37 @@ void Foam::UPstream::commsStruct::reset
{
reset();
label above(-1);
DynamicList<label> below;
DynamicList<label> allBelow;
if (numProcs < UPstream::nProcsSimpleSum)
if (numProcs <= 2 || numProcs < UPstream::nProcsSimpleSum)
{
// Linear schedule
// Linear communication pattern
label above(-1);
labelList below;
if (procID == 0)
{
below = identity(numProcs-1, 1);
allBelow = below;
}
else
{
above = 0;
}
*this = UPstream::commsStruct(numProcs, procID, above, below, below);
return;
}
else
{
// Use tree like schedule. For 8 procs:
// (level 0)
// 0 receives from 1
// 2 receives from 3
// 4 receives from 5
// 6 receives from 7
// (level 1)
// 0 receives from 2
// 4 receives from 6
// (level 2)
// 0 receives from 4
//
// The sends/receives for all levels are collected per processor
// (one send per processor; multiple receives possible) creating
// a table:
//
// So per processor:
// proc receives from sends to
// ---- ------------- --------
// 0 1,2,4 -
// 1 - 0
// 2 3 0
// 3 - 2
// 4 5 0
// 5 - 4
// 6 7 4
// 7 - 6
label mod = 0;
for (label step = 1; step < numProcs; step = mod)
{
mod = step * 2;
// Simple tree communication pattern
DynamicList<label> below;
DynamicList<label> allBelow;
if (procID % mod)
{
above = procID - (procID % mod);
break;
}
else
{
for
(
label j = procID + step;
j < numProcs && j < procID + mod;
j += step
)
{
below.push_back(j);
}
for
(
label j = procID + step;
j < numProcs && j < procID + mod;
j++
)
{
allBelow.push_back(j);
}
}
}
}
label above = simpleTree
(
procID,
numProcs,
below,
allBelow
);
*this = UPstream::commsStruct(numProcs, procID, above, below, allBelow);
}

View File

@ -2093,8 +2093,24 @@ void Foam::argList::parse
Info<< "Roots : " << roots << nl;
}
}
Info<< "Pstream initialized with:" << nl
<< " floatTransfer : "
<< " node communication : ";
if (UPstream::nodeCommsControl_ > 0)
{
Info<< Switch::name(UPstream::usingNodeComms())
<< " [min=" << UPstream::nodeCommsMin_
<< ", type=" << UPstream::nodeCommsControl_
<< "]";
}
else
{
Info<< "disabled";
}
Info<< " (" << UPstream::nProcs() << " ranks, "
<< UPstream::numNodes() << " nodes)" << nl;
Info<< " floatTransfer : "
<< Switch::name(UPstream::floatTransfer) << nl
<< " maxCommsSize : "
<< UPstream::maxCommsSize << nl

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2017-2018 OpenFOAM Foundation
Copyright (C) 2019-2023 OpenCFD Ltd.
Copyright (C) 2019-2025 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -290,14 +290,7 @@ Foam::OFstreamCollator::OFstreamCollator(const off_t maxBufferSize)
maxBufferSize_(maxBufferSize),
threadRunning_(false),
localComm_(UPstream::worldComm),
threadComm_
(
UPstream::allocateCommunicator
(
localComm_,
labelRange(UPstream::nProcs(localComm_))
)
)
threadComm_(UPstream::dupCommunicator(localComm_))
{}
@ -310,14 +303,7 @@ Foam::OFstreamCollator::OFstreamCollator
maxBufferSize_(maxBufferSize),
threadRunning_(false),
localComm_(comm),
threadComm_
(
UPstream::allocateCommunicator
(
localComm_,
labelRange(UPstream::nProcs(localComm_))
)
)
threadComm_(UPstream::dupCommunicator(localComm_))
{}

View File

@ -236,7 +236,7 @@ static Tuple2<label, labelList> getCommPattern()
if (UPstream::parRun() && commAndIORanks.second().size() > 1)
{
// Multiple masters: ranks for my IO range
commAndIORanks.first() = UPstream::allocateCommunicator
commAndIORanks.first() = UPstream::newCommunicator
(
UPstream::worldComm,
fileOperation::subRanks(commAndIORanks.second())

View File

@ -84,7 +84,7 @@ static Tuple2<label, labelList> getCommPattern()
if (UPstream::parRun() && commAndIORanks.second().size() > 1)
{
// Multiple masters: ranks for my IO range
commAndIORanks.first() = UPstream::allocateCommunicator
commAndIORanks.first() = UPstream::newCommunicator
(
UPstream::worldComm,
fileOperation::subRanks(commAndIORanks.second())

View File

@ -362,7 +362,7 @@ Foam::fileOperation::New_impl
// Warning: MS-MPI currently uses MPI_Comm_create() instead of
// MPI_Comm_create_group() so it will block there!
commAndIORanks.first() = UPstream::allocateCommunicator
commAndIORanks.first() = UPstream::newCommunicator
(
UPstream::worldComm,
siblings

View File

@ -106,54 +106,44 @@ Foam::labelRange Foam::fileOperation::subRanks(const labelUList& mainIOranks)
Foam::labelList Foam::fileOperation::getGlobalHostIORanks()
{
const label numProcs = UPstream::nProcs(UPstream::worldComm);
// Very similar to the code in UPstream::setHostCommunicators()
// except we need the leader information on *all* ranks!
// Use hostname
// Lowest rank per hostname is the IO rank
List<SHA1Digest> digests;
if (UPstream::master(UPstream::worldComm))
{
digests.resize(numProcs);
}
const label myProci = UPstream::myProcNo(UPstream::worldComm);
const label numProc = UPstream::nProcs(UPstream::worldComm);
// Could also add lowercase etc, but since hostName()
// will be consistent within the same node, there is no need.
SHA1Digest myDigest(SHA1(hostName()).digest());
const SHA1Digest myDigest(SHA1(hostName()).digest());
// The fixed-length digest allows use of MPI_Gather
UPstream::mpiGather
List<SHA1Digest> digests(numProc);
digests[myProci] = myDigest;
// The fixed-length digest allows use of MPI_Allgather.
UPstream::mpiAllGather
(
myDigest.cdata_bytes(), // Send
digests.data_bytes(), // Recv
digests.data_bytes(), // Send/Recv
SHA1Digest::size_bytes(), // Num send/recv per rank
UPstream::worldComm
);
labelList ranks;
DynamicList<label> dynRanks;
if (UPstream::master(UPstream::worldComm))
DynamicList<label> hostLeaders(UPstream::numNodes());
hostLeaders.push_back(0); // Always include master
for (label previ = 0, proci = 1; proci < digests.size(); ++proci)
{
dynRanks.reserve(numProcs);
dynRanks.push_back(0); // Always include master
label previ = 0;
for (label proci = 1; proci < digests.size(); ++proci)
if (digests[previ] != digests[proci])
{
if (digests[proci] != digests[previ])
{
dynRanks.push_back(proci);
previ = proci;
}
hostLeaders.push_back(proci);
previ = proci;
}
ranks.transfer(dynRanks);
}
Pstream::broadcast(ranks, UPstream::worldComm);
return ranks;
return labelList(std::move(hostLeaders));
// Alternative is to recover information from commInterNode()
// and broadcast via commLocalNode()
}
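A short worked illustration of the scan above, with hypothetical hostnames:

    // rank : 0      1      2      3      4      5
    // host : nodeA  nodeA  nodeB  nodeB  nodeB  nodeC
    //
    // The digest changes at ranks 2 and 5, so the collected IO ranks
    // (lowest rank per host) are (0 2 5).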

View File

@ -612,7 +612,7 @@ static Tuple2<label, labelList> getCommPattern()
if (UPstream::parRun() && commAndIORanks.second().size() > 1)
{
// Multiple masters: ranks for my IO range
commAndIORanks.first() = UPstream::allocateCommunicator
commAndIORanks.first() = UPstream::newCommunicator
(
UPstream::worldComm,
fileOperation::subRanks(commAndIORanks.second())

View File

@ -85,7 +85,7 @@ static Tuple2<label, labelList> getCommPattern()
if (UPstream::parRun() && commAndIORanks.second().size() > 1)
{
// Multiple masters: ranks for my IO range
commAndIORanks.first() = UPstream::allocateCommunicator
commAndIORanks.first() = UPstream::newCommunicator
(
UPstream::worldComm,
fileOperation::subRanks(commAndIORanks.second())

View File

@ -204,7 +204,7 @@ static Tuple2<label, labelList> getCommPattern()
if (UPstream::parRun() && commAndIORanks.second().size() > 1)
{
// Multiple masters: ranks for my IO range
commAndIORanks.first() = UPstream::allocateCommunicator
commAndIORanks.first() = UPstream::newCommunicator
(
UPstream::worldComm,
fileOperation::subRanks(commAndIORanks.second())

View File

@ -119,7 +119,7 @@ bool Foam::eagerGAMGProcAgglomeration::agglomerate()
// Communicator for the processor-agglomerated matrix
comms_.push_back
(
UPstream::allocateCommunicator
UPstream::newCommunicator
(
levelComm,
masterProcs

View File

@ -167,7 +167,7 @@ bool Foam::manualGAMGProcAgglomeration::agglomerate()
// Communicator for the processor-agglomerated matrix
comms_.push_back
(
UPstream::allocateCommunicator
UPstream::newCommunicator
(
levelMesh.comm(),
coarseToMaster

View File

@ -194,7 +194,7 @@ bool Foam::masterCoarsestGAMGProcAgglomeration::agglomerate()
// Communicator for the processor-agglomerated matrix
comms_.push_back
(
UPstream::allocateCommunicator
UPstream::newCommunicator
(
levelComm,
masterProcs

View File

@ -286,7 +286,7 @@ bool Foam::procFacesGAMGProcAgglomeration::agglomerate()
// Communicator for the processor-agglomerated matrix
comms_.push_back
(
UPstream::allocateCommunicator
UPstream::newCommunicator
(
levelComm,
masterProcs

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2016 OpenFOAM Foundation
Copyright (C) 2018-2023 OpenCFD Ltd.
Copyright (C) 2018-2025 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -263,6 +263,117 @@ Foam::globalIndex::calcRanges
}
bool Foam::globalIndex::splitNodeOffsets
(
labelList& interNodeOffsets,
labelList& localNodeOffsets,
const label communicator,
const bool absoluteLocalNodeOffsets
) const
{
// Require const-world as the starting point
if (!UPstream::parRun() || communicator != UPstream::commConstWorld())
{
interNodeOffsets.clear();
localNodeOffsets.clear();
return false;
}
const auto interNodeComm = UPstream::commInterNode();
// Only generate information on the node leaders
if (!UPstream::is_rank(interNodeComm))
{
interNodeOffsets.clear();
localNodeOffsets.clear();
return true; // Not involved, but return true to match others...
}
const label numProc = UPstream::nProcs(UPstream::commConstWorld());
const auto& procIds = UPstream::procID(interNodeComm);
const int ranki = UPstream::myProcNo(interNodeComm);
if (FOAM_UNLIKELY(procIds.empty()))
{
// Should not happen...
interNodeOffsets.clear();
localNodeOffsets.clear();
return true; // Return true to match others...
}
// The inter-node offsets are taken from the node-specific segment of
// the overall offsets, but avoid MPI_Scatterv (slow, does not handle
// overlaps) and use MPI_Bcast() instead.
// Send top-level offsets to the node leaders.
// Could also be a mutable operation and use offsets_ directly.
//
// - number of overall offsets is always (nProc+1) [worldComm]
labelList allOffsets;
if (UPstream::master(interNodeComm))
{
allOffsets = offsets_;
}
else // ie, UPstream::is_subrank(interNodeComm)
{
allOffsets.resize_nocopy(numProc+1);
}
UPstream::broadcast
(
allOffsets.data_bytes(),
allOffsets.size_bytes(),
interNodeComm
);
if (FOAM_UNLIKELY(allOffsets.empty()))
{
// Should not happen...
interNodeOffsets.clear();
localNodeOffsets.clear();
return true; // Return true to match others...
}
// The local node span
const label firstProc = procIds[ranki];
const label lastProc =
(
(ranki+1 < procIds.size())
? procIds[ranki+1]
: numProc
);
// Offsets (within a node)
localNodeOffsets = allOffsets.slice
(
firstProc,
(lastProc - firstProc) + 1 // +1 since offsets
);
if (!absoluteLocalNodeOffsets && !localNodeOffsets.empty())
{
const auto start0 = localNodeOffsets.front();
for (auto& val : localNodeOffsets)
{
val -= start0;
}
}
// Offsets (between nodes)
interNodeOffsets.resize_nocopy(procIds.size()+1);
{
forAll(procIds, i)
{
interNodeOffsets[i] = allOffsets[procIds[i]];
}
interNodeOffsets.back() = allOffsets.back();
}
return true;
}
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::globalIndex::globalIndex(Istream& is)

View File

@ -582,6 +582,23 @@ public:
const bool checkOverflow = false
);
//- Split the top-level offsets into inter-node and local-node
//- components suitable for a two-stage hierarchy.
bool splitNodeOffsets
(
//! [out] Offsets between nodes (only non-empty on node leaders)
labelList& interNodeOffsets,
//! [out] Offsets within a node (only non-empty on node leaders)
labelList& localNodeOffsets,
//! The communicator. Must resolve to const world-comm
const label communicator = UPstream::worldComm,
//! Retain absolute values for the localNode offsets
const bool absoluteLocalNodeOffsets = false
) const;
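A hedged usage sketch (not from the sources; 'localSize' is a placeholder for the local number of elements):

    // Offsets gathered over the (const) world communicator
    const globalIndex gi(localSize);

    labelList interNodeOffsets;
    labelList localNodeOffsets;

    if (gi.splitNodeOffsets(interNodeOffsets, localNodeOffsets))
    {
        if (UPstream::is_rank(UPstream::commInterNode()))
        {
            // Node leader:
            // - localNodeOffsets : offsets for the ranks of this node
            // - interNodeOffsets : one offset per node, plus the end offset
        }
        else
        {
            // Not a node leader: both lists are empty
        }
    }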
// Low-level gather routines
//- Collect single values in processor order on master (== procIDs[0]).
// Handles contiguous/non-contiguous data.
template<class ProcIDsContainer, class Type>

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2018 OpenFOAM Foundation
Copyright (C) 2016-2023 OpenCFD Ltd.
Copyright (C) 2016-2025 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -55,6 +55,12 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
}
bool Foam::UPstream::setSharedMemoryCommunicators()
{
return false;
}
void Foam::UPstream::shutdown(int errNo)
{}
@ -66,7 +72,7 @@ void Foam::UPstream::exit(int errNo)
}
void Foam::UPstream::abort()
void Foam::UPstream::abort(int errNo)
{
// No MPI - just abort
std::abort();
@ -77,13 +83,29 @@ void Foam::UPstream::abort()
void Foam::UPstream::allocateCommunicatorComponents
(
const label,
const label
const label parentIndex,
const label index
)
{}
void Foam::UPstream::freeCommunicatorComponents(const label)
void Foam::UPstream::dupCommunicatorComponents
(
const label parentIndex,
const label index
)
{}
void Foam::UPstream::splitCommunicatorComponents
(
const label parentIndex,
const label index,
int colour
)
{}
void Foam::UPstream::freeCommunicatorComponents(const label index)
{}

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2013-2015 OpenFOAM Foundation
Copyright (C) 2023 OpenCFD Ltd.
Copyright (C) 2023-2025 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -35,22 +35,27 @@ Foam::DynamicList<MPI_Comm> Foam::PstreamGlobals::MPICommunicators_;
Foam::DynamicList<MPI_Request> Foam::PstreamGlobals::outstandingRequests_;
// * * * * * * * * * * * * * * * Global Functions * * * * * * * * * * * * * //
// * * * * * * * * * * * * * * * Communicators * * * * * * * * * * * * * * * //
void Foam::PstreamGlobals::checkCommunicator
(
const label comm,
const label toProcNo
)
void Foam::PstreamGlobals::initCommunicator(const label index)
{
if (comm < 0 || comm >= PstreamGlobals::MPICommunicators_.size())
if (FOAM_UNLIKELY(index < 0 || index > MPICommunicators_.size()))
{
FatalErrorInFunction
<< "toProcNo:" << toProcNo << " : illegal communicator "
<< comm << nl
<< "Communicator should be within range [0,"
<< PstreamGlobals::MPICommunicators_.size()
<< ')' << abort(FatalError);
<< "PstreamGlobals out of sync with UPstream data. Problem."
<< Foam::abort(FatalError);
}
else if (index == MPICommunicators_.size())
{
// Extend storage with null values
pendingMPIFree_.emplace_back(false);
MPICommunicators_.emplace_back(MPI_COMM_NULL);
}
else
{
// Init with null values
pendingMPIFree_[index] = false;
MPICommunicators_[index] = MPI_COMM_NULL;
}
}

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2013-2015 OpenFOAM Foundation
Copyright (C) 2022-2023 OpenCFD Ltd.
Copyright (C) 2022-2025 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -50,6 +50,8 @@ namespace Foam
namespace PstreamGlobals
{
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
// Track if MPI_Comm_free is needed for communicator index in MPICommunicators_
extern DynamicList<bool> pendingMPIFree_;
@ -61,11 +63,27 @@ extern DynamicList<MPI_Comm> MPICommunicators_;
extern DynamicList<MPI_Request> outstandingRequests_;
// * * * * * * * * * * * * * * * Global Functions * * * * * * * * * * * * * //
// * * * * * * * * * * * * * * * Communicators * * * * * * * * * * * * * * * //
//- Fatal if comm is outside the allocated range
void checkCommunicator(const label comm, const label toProcNo);
//- Initialize bookkeeping for MPI communicator index
void initCommunicator(const label index);
//- Fatal if communicator is outside the allocated range
inline void checkCommunicator(int comm, int rank)
{
if (FOAM_UNLIKELY(comm < 0 || comm >= MPICommunicators_.size()))
{
FatalErrorInFunction
<< "rank:" << rank << " : illegal communicator "
<< comm << nl
<< "Communicator should be within range [0,"
<< MPICommunicators_.size()
<< ')' << Foam::abort(FatalError);
}
}
// * * * * * * * * * * * * * * * * Requests * * * * * * * * * * * * * * * * //
//- Reset UPstream::Request to null and/or the index of the outstanding
//- request to -1.

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2016-2024 OpenCFD Ltd.
Copyright (C) 2016-2025 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -26,14 +26,13 @@ License
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "PstreamReduceOps.H"
#include "UPstream.H"
#include "PstreamGlobals.H"
#include "profilingPstream.H"
#include "int.H"
#include "UPstreamWrapping.H"
#include "collatedFileOperation.H"
#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <memory>
@ -197,9 +196,8 @@ bool Foam::UPstream::initNull()
bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
{
int numprocs = 0, myRank = 0;
int provided_thread_support = 0;
int flag = 0;
int provided_thread_support = 0;
MPI_Finalized(&flag);
if (flag)
@ -231,19 +229,25 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
{
Perr<< "UPstream::init : was already initialized\n";
}
MPI_Query_thread(&provided_thread_support);
}
else
{
// (SINGLE | FUNNELED | SERIALIZED | MULTIPLE)
int required_thread_support =
(
needsThread
? MPI_THREAD_MULTIPLE
: MPI_THREAD_SINGLE
);
MPI_Init_thread
(
&argc,
&argv,
(
needsThread
? MPI_THREAD_MULTIPLE
: MPI_THREAD_SINGLE
),
&provided_thread_support
required_thread_support,
&provided_thread_support
);
ourMpi = true;
@ -251,26 +255,26 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
// Check argument list for local world
label worldIndex = -1;
word world;
for (int argi = 1; argi < argc; ++argi)
{
if (strcmp(argv[argi], "-world") == 0)
{
worldIndex = argi++;
if (argi >= argc)
worldIndex = argi;
if (argi+1 >= argc)
{
FatalErrorInFunction
<< "Missing world name to argument \"world\""
<< "Missing world name for option '-world'" << nl
<< Foam::abort(FatalError);
}
world = argv[argi];
break;
}
}
// Filter 'world' option
// Extract world name and filter out '-world <name>' from argv list
word worldName;
if (worldIndex != -1)
{
worldName = argv[worldIndex+1];
for (label i = worldIndex+2; i < argc; i++)
{
argv[i-2] = argv[i];
@ -278,14 +282,15 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
argc -= 2;
}
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
int numProcs = 0, globalRanki = 0;
MPI_Comm_rank(MPI_COMM_WORLD, &globalRanki);
MPI_Comm_size(MPI_COMM_WORLD, &numProcs);
if (UPstream::debug)
{
Perr<< "UPstream::init :"
<< " thread-support : requested:" << needsThread
<< " obtained:"
<< " provided:"
<< (
(provided_thread_support == MPI_THREAD_SINGLE)
? "SINGLE"
@ -295,12 +300,12 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
? "MULTIPLE"
: "other"
)
<< " procs:" << numprocs
<< " rank:" << myRank
<< " world:" << world << endl;
<< " procs:" << numProcs
<< " rank:" << globalRanki
<< " world:" << worldName << endl;
}
if (worldIndex == -1 && numprocs <= 1)
if (worldIndex == -1 && numProcs <= 1)
{
FatalErrorInFunction
<< "attempt to run parallel on 1 processor"
@ -308,46 +313,78 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
}
// Initialise parallel structure
setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);
setParRun(numProcs, provided_thread_support == MPI_THREAD_MULTIPLE);
if (worldIndex != -1)
{
// Using local worlds.
// During startup, so commWorld() == commGlobal()
const auto mpiGlobalComm =
PstreamGlobals::MPICommunicators_[UPstream::commGlobal()];
wordList worlds(numprocs);
worlds[UPstream::myProcNo(UPstream::commGlobal())] = world;
Pstream::gatherList
(
worlds,
UPstream::msgType(),
UPstream::commGlobal()
);
// Gather the names of all worlds and determine unique names/indices.
//
// Minimize communication and use low-level MPI to avoid relying on
// any OpenFOAM structures which have not yet been created
// Compact
if (UPstream::master(UPstream::commGlobal()))
{
DynamicList<word> worldNames(numprocs);
worldIDs_.resize_nocopy(numprocs);
// Include a trailing nul character in the lengths
int stride = int(worldName.size()) + 1;
forAll(worlds, proci)
// Use identical size on all ranks (avoids MPI_Allgatherv)
MPI_Allreduce
(
MPI_IN_PLACE,
&stride,
1,
MPI_INT,
MPI_MAX,
mpiGlobalComm
);
// Gather as an extended C-string with embedded nul characters
auto buffer_storage = std::make_unique<char[]>(numProcs*stride);
char* allStrings = buffer_storage.get();
// Fill in local value, slot starts at (rank*stride)
{
const word& world = worlds[proci];
char* slot = (allStrings + (globalRanki*stride));
std::fill_n(slot, stride, '\0');
std::copy_n(worldName.data(), worldName.size(), slot);
}
worldIDs_[proci] = worldNames.find(world);
// Gather everything into the extended C-string
MPI_Allgather
(
MPI_IN_PLACE, 0, MPI_CHAR,
allStrings, stride, MPI_CHAR,
mpiGlobalComm
);
worldIDs_.resize_nocopy(numProcs);
// Transcribe and compact (unique world names)
DynamicList<word> uniqWorlds(numProcs);
for (label proci = 0; proci < numProcs; ++proci)
{
// Create from C-string at slot=(rank*stride),
// relying on the embedded nul chars
word world(allStrings + (proci*stride));
worldIDs_[proci] = uniqWorlds.find(world);
if (worldIDs_[proci] == -1)
{
worldIDs_[proci] = worldNames.size();
worldNames.push_back(world);
worldIDs_[proci] = uniqWorlds.size();
uniqWorlds.push_back(std::move(world));
}
}
allWorlds_.transfer(worldNames);
allWorlds_ = std::move(uniqWorlds);
}
Pstream::broadcasts(UPstream::commGlobal(), allWorlds_, worldIDs_);
const label myWorldId =
worldIDs_[UPstream::myProcNo(UPstream::commGlobal())];
const label myWorldId = worldIDs_[globalRanki];
DynamicList<label> subRanks;
forAll(worldIDs_, proci)
@ -358,54 +395,107 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
}
}
// Allocate new communicator with comm-global as its parent
const label subComm =
UPstream::allocateCommunicator(UPstream::commGlobal(), subRanks);
// New local-world communicator with comm-global as its parent.
// - the updated (const) world comm does not change after this.
UPstream::constWorldComm_ =
UPstream::newCommunicator(UPstream::commGlobal(), subRanks);
// Override worldComm
UPstream::worldComm = subComm;
// For testing: warn use of non-worldComm
UPstream::warnComm = UPstream::worldComm;
UPstream::worldComm = UPstream::constWorldComm_;
UPstream::warnComm = UPstream::constWorldComm_;
const int worldRanki = UPstream::myProcNo(UPstream::constWorldComm_);
// MPI_COMM_SELF : the processor number wrt the new world communicator
if (procIDs_[UPstream::commSelf()].size())
{
procIDs_[UPstream::commSelf()].front() =
UPstream::myProcNo(subComm);
procIDs_[UPstream::commSelf()].front() = worldRanki;
}
// Name the old world communicator as '<openfoam:global>'
// - it is the inter-world communicator
if (MPI_COMM_NULL != mpiGlobalComm)
{
MPI_Comm_set_name(mpiGlobalComm, "<openfoam:global>");
}
const auto mpiWorldComm =
PstreamGlobals::MPICommunicators_[UPstream::constWorldComm_];
if (MPI_COMM_NULL != mpiWorldComm)
{
MPI_Comm_set_name(mpiWorldComm, ("world=" + worldName).data());
}
if (UPstream::debug)
{
// Check
int subNumProcs, subRank;
MPI_Comm_size
(
PstreamGlobals::MPICommunicators_[subComm],
&subNumProcs
);
MPI_Comm_rank
(
PstreamGlobals::MPICommunicators_[subComm],
&subRank
);
int newRanki, newSize;
MPI_Comm_rank(mpiWorldComm, &newRanki);
MPI_Comm_size(mpiWorldComm, &newSize);
Perr<< "UPstream::init : in world:" << world
<< " using local communicator:" << subComm
<< " rank " << subRank
<< " of " << subNumProcs
<< endl;
Perr<< "UPstream::init : in world:" << worldName
<< " using local communicator:" << constWorldComm_
<< " rank " << newRanki << " of " << newSize << endl;
}
// Override Pout prefix (move to setParRun?)
Pout.prefix() = '[' + world + '/' + name(myProcNo(subComm)) + "] ";
Pout.prefix() = '[' + worldName + '/' + Foam::name(worldRanki) + "] ";
Perr.prefix() = Pout.prefix();
}
else
{
// All processors use world 0
worldIDs_.resize_nocopy(numprocs);
worldIDs_.resize_nocopy(numProcs);
worldIDs_ = 0;
const auto mpiWorldComm =
PstreamGlobals::MPICommunicators_[UPstream::constWorldComm_];
// Name the world communicator as '<openfoam:world>'
if (MPI_COMM_NULL != mpiWorldComm)
{
MPI_Comm_set_name(mpiWorldComm, "<openfoam:world>");
}
}
// Define inter-node and intra-node communicators
if (UPstream::nodeCommsControl_ >= 4)
{
// Debugging: split with given number per node
setHostCommunicators(UPstream::nodeCommsControl_);
}
#ifndef MSMPI_VER /* Uncertain if this would work with MSMPI */
else if (UPstream::nodeCommsControl_ == 2)
{
// Defined based on shared-memory hardware information
setSharedMemoryCommunicators();
}
#endif
else
{
// Defined based on hostname, even if nominally disabled
setHostCommunicators();
}
// Provide some names for these communicators
if (MPI_COMM_NULL != PstreamGlobals::MPICommunicators_[commInterNode_])
{
MPI_Comm_set_name
(
PstreamGlobals::MPICommunicators_[commInterNode_],
"<openfoam:inter-node>"
);
}
if (MPI_COMM_NULL != PstreamGlobals::MPICommunicators_[commLocalNode_])
{
MPI_Comm_set_name
(
PstreamGlobals::MPICommunicators_[commLocalNode_],
"<openfoam:local-node>"
);
}
attachOurBuffers();
@ -455,7 +545,7 @@ void Foam::UPstream::shutdown(int errNo)
if (errNo != 0)
{
MPI_Abort(MPI_COMM_WORLD, errNo);
UPstream::abort(errNo);
return;
}
@ -515,9 +605,26 @@ void Foam::UPstream::exit(int errNo)
}
void Foam::UPstream::abort()
void Foam::UPstream::abort(int errNo)
{
MPI_Abort(MPI_COMM_WORLD, 1);
// TBD: only abort on our own communicator?
#if 0
MPI_Comm abortComm = MPI_COMM_WORLD;
const label index = UPstream::commGlobal();
if (index > 0 && index < PstreamGlobals::MPICommunicators_.size())
{
abortComm = PstreamGlobals::MPICommunicators_[index];
if (MPI_COMM_NULL == abortComm)
{
abortComm = MPI_COMM_WORLD;
}
}
MPI_Abort(abortComm, errNo);
#endif
MPI_Abort(MPI_COMM_WORLD, errNo);
}
@ -529,19 +636,9 @@ void Foam::UPstream::allocateCommunicatorComponents
const label index
)
{
if (index == PstreamGlobals::MPICommunicators_.size())
{
// Extend storage with null values
PstreamGlobals::pendingMPIFree_.emplace_back(false);
PstreamGlobals::MPICommunicators_.emplace_back(MPI_COMM_NULL);
}
else if (index > PstreamGlobals::MPICommunicators_.size())
{
FatalErrorInFunction
<< "PstreamGlobals out of sync with UPstream data. Problem."
<< Foam::exit(FatalError);
}
PstreamGlobals::initCommunicator(index);
int returnCode = MPI_SUCCESS;
if (parentIndex == -1)
{
@ -554,27 +651,19 @@ void Foam::UPstream::allocateCommunicatorComponents
<< UPstream::commGlobal()
<< Foam::exit(FatalError);
}
auto& mpiNewComm = PstreamGlobals::MPICommunicators_[index];
PstreamGlobals::pendingMPIFree_[index] = false;
PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
// PstreamGlobals::pendingMPIFree_[index] = false;
// PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
// TBD: MPI_Comm_dup(MPI_COMM_WORLD, ...);
// with pendingMPIFree_[index] = true
// Note: freeCommunicatorComponents() may need an update
PstreamGlobals::pendingMPIFree_[index] = true;
MPI_Comm_dup(MPI_COMM_WORLD, &mpiNewComm);
MPI_Comm_rank
(
PstreamGlobals::MPICommunicators_[index],
&myProcNo_[index]
);
MPI_Comm_rank(mpiNewComm, &myProcNo_[index]);
// Set the number of ranks to the actual number
int numProcs;
MPI_Comm_size
(
PstreamGlobals::MPICommunicators_[index],
&numProcs
);
int numProcs = 0;
MPI_Comm_size(mpiNewComm, &numProcs);
// identity [0-numProcs], as 'int'
procIDs_[index].resize_nocopy(numProcs);
@ -589,21 +678,6 @@ void Foam::UPstream::allocateCommunicatorComponents
MPI_Comm_rank(MPI_COMM_SELF, &myProcNo_[index]);
// Number of ranks is always 1 (self communicator)
#ifdef FULLDEBUG
int numProcs;
MPI_Comm_size(MPI_COMM_SELF, &numProcs);
if (numProcs != 1)
{
// Already finalized - this is an error
FatalErrorInFunction
<< "MPI_COMM_SELF had " << numProcs << " != 1 ranks!\n"
<< Foam::abort(FatalError);
}
#endif
// For MPI_COMM_SELF : the process IDs within the world communicator.
// Uses MPI_COMM_WORLD in case called before UPstream::commGlobal()
// was initialized
@ -613,17 +687,20 @@ void Foam::UPstream::allocateCommunicatorComponents
}
else
{
// General sub-communicator
// General sub-communicator.
// Create based on the groupings predefined by procIDs_
const auto mpiParentComm =
PstreamGlobals::MPICommunicators_[parentIndex];
auto& mpiNewComm =
PstreamGlobals::MPICommunicators_[index];
PstreamGlobals::pendingMPIFree_[index] = true;
// Starting from parent
MPI_Group parent_group;
MPI_Comm_group
(
PstreamGlobals::MPICommunicators_[parentIndex],
&parent_group
);
MPI_Comm_group(mpiParentComm, &parent_group);
MPI_Group active_group;
MPI_Group_incl
@ -638,18 +715,18 @@ void Foam::UPstream::allocateCommunicatorComponents
// ms-mpi (10.0 and others?) does not have MPI_Comm_create_group
MPI_Comm_create
(
PstreamGlobals::MPICommunicators_[parentIndex],
mpiParentComm,
active_group,
&PstreamGlobals::MPICommunicators_[index]
&mpiNewComm
);
#else
// Create new communicator for this group
MPI_Comm_create_group
(
PstreamGlobals::MPICommunicators_[parentIndex],
mpiParentComm,
active_group,
UPstream::msgType(),
&PstreamGlobals::MPICommunicators_[index]
&mpiNewComm
);
#endif
@ -657,27 +734,34 @@ void Foam::UPstream::allocateCommunicatorComponents
MPI_Group_free(&parent_group);
MPI_Group_free(&active_group);
if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL)
if (MPI_COMM_NULL == mpiNewComm)
{
// No communicator created
// This process is not involved in the new communication pattern
myProcNo_[index] = -1;
PstreamGlobals::pendingMPIFree_[index] = false;
// ~~~~~~~~~
// IMPORTANT
// ~~~~~~~~~
// Always retain knowledge of the inter-node leaders,
// even if this process is not on that communicator.
// This will help when constructing topology-aware communication.
if (index != commInterNode_)
{
procIDs_[index].clear();
}
}
else
{
if
(
MPI_Comm_rank
(
PstreamGlobals::MPICommunicators_[index],
&myProcNo_[index]
)
)
returnCode = MPI_Comm_rank(mpiNewComm, &myProcNo_[index]);
if (FOAM_UNLIKELY(MPI_SUCCESS != returnCode))
{
FatalErrorInFunction
<< "Problem :"
<< " when allocating communicator at " << index
<< " from ranks " << procIDs_[index]
<< " from ranks " << flatOutput(procIDs_[index])
<< " of parent " << parentIndex
<< " cannot find my own rank"
<< Foam::exit(FatalError);
@ -687,6 +771,99 @@ void Foam::UPstream::allocateCommunicatorComponents
}
void Foam::UPstream::dupCommunicatorComponents
(
const label parentIndex,
const label index
)
{
PstreamGlobals::initCommunicator(index);
PstreamGlobals::pendingMPIFree_[index] = true;
MPI_Comm_dup
(
PstreamGlobals::MPICommunicators_[parentIndex],
&PstreamGlobals::MPICommunicators_[index]
);
myProcNo_[index] = myProcNo_[parentIndex];
procIDs_[index] = procIDs_[parentIndex];
}
void Foam::UPstream::splitCommunicatorComponents
(
const label parentIndex,
const label index,
int colour
)
{
PstreamGlobals::initCommunicator(index);
// ------------------------------------------------------------------------
// Create sub-communicator according to its colouring
// => MPI_Comm_split().
// Since other parts of OpenFOAM may still need a view of the siblings:
// => MPI_Group_translate_ranks().
//
// The MPI_Group_translate_ranks() step can be replaced with an
// MPI_Allgather() of the involved parent ranks (since we always maintain
// the relative rank order when splitting).
//
// Since MPI_Comm_split() already does an MPI_Allgather() internally
// to pick out the colours (and do any sorting), we can simply
// do the same thing:
//
// Do the Allgather first and pick out identical colours to define the
// group and create a communicator based on that.
//
// This is no worse than the Allgather communication overhead of using
// MPI_Comm_split() and saves the extra translate_ranks step.
// ------------------------------------------------------------------------
const auto mpiParentComm = PstreamGlobals::MPICommunicators_[parentIndex];
int parentRank = 0;
int parentSize = 0;
MPI_Comm_rank(mpiParentComm, &parentRank);
MPI_Comm_size(mpiParentComm, &parentSize);
// Initialize, first marking the 'procIDs_' with the colours
auto& procIds = procIDs_[index];
myProcNo_[index] = -1;
procIds.resize_nocopy(parentSize);
procIds[parentRank] = colour;
MPI_Allgather
(
MPI_IN_PLACE, 0, MPI_INT,
procIds.data(), 1, MPI_INT,
mpiParentComm
);
if (colour < 0)
{
procIds.clear();
}
else
{
auto last =
std::copy_if
(
procIds.cbegin(),
procIds.cend(),
procIds.begin(),
[=](int c){ return (c == colour); }
);
procIds.resize(std::distance(procIds.begin(), last));
}
allocateCommunicatorComponents(parentIndex, index);
}
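For comparison only (not part of the sources): the conventional alternative mentioned in the comment above would be roughly the following, after which the parent ranks of the siblings would still need to be recovered (eg, via MPI_Group_translate_ranks()):

    MPI_Comm newComm = MPI_COMM_NULL;
    MPI_Comm_split
    (
        mpiParentComm,
        (colour < 0 ? MPI_UNDEFINED : colour),  // MPI_UNDEFINED -> not included
        parentRank,                             // keep the relative rank order
        &newComm
    );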
void Foam::UPstream::freeCommunicatorComponents(const label index)
{
if (UPstream::debug)
@ -717,6 +894,164 @@ void Foam::UPstream::freeCommunicatorComponents(const label index)
}
bool Foam::UPstream::setSharedMemoryCommunicators()
{
// Uses the world communicator (not global communicator)
// Skip if non-parallel
if (!UPstream::parRun())
{
numNodes_ = 1;
return false;
}
if (FOAM_UNLIKELY(commInterNode_ >= 0 || commLocalNode_ >= 0))
{
// Failed sanity check
FatalErrorInFunction
<< "Node communicator(s) already created!" << endl
<< Foam::abort(FatalError);
return false;
}
commInterNode_ = getAvailableCommIndex(constWorldComm_);
commLocalNode_ = getAvailableCommIndex(constWorldComm_);
PstreamGlobals::initCommunicator(commInterNode_);
PstreamGlobals::initCommunicator(commLocalNode_);
// Overwritten later
myProcNo_[commInterNode_] = UPstream::masterNo();
myProcNo_[commLocalNode_] = UPstream::masterNo();
// Sorted order, purely cosmetic
if (commLocalNode_ < commInterNode_)
{
std::swap(commLocalNode_, commInterNode_);
}
if (debug)
{
Perr<< "Allocating node communicators "
<< commInterNode_ << ", " << commLocalNode_ << nl
<< " parent : " << constWorldComm_ << nl
<< endl;
}
const auto mpiParentComm =
PstreamGlobals::MPICommunicators_[constWorldComm_];
auto& mpiLocalNode =
PstreamGlobals::MPICommunicators_[commLocalNode_];
int parentRank = 0;
int parentSize = 0;
MPI_Comm_rank(mpiParentComm, &parentRank);
MPI_Comm_size(mpiParentComm, &parentSize);
List<int> nodeLeaders(parentSize);
nodeLeaders = -1;
MPI_Comm_split_type
(
mpiParentComm,
MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL,
&mpiLocalNode
);
if (FOAM_UNLIKELY(MPI_COMM_NULL == mpiLocalNode))
{
// This process is not involved in an intra-host communication?
// - should never happen!
const label index = commLocalNode_;
PstreamGlobals::pendingMPIFree_[index] = false;
myProcNo_[index] = -1;
procIDs_[index].clear();
FatalErrorInFunction
<< "Comm_split_type(shared) failed\n"
<< Foam::abort(FatalError);
}
else
{
// This process is involved in intra-host communication
const label index = commLocalNode_;
auto& procIds = procIDs_[index];
PstreamGlobals::pendingMPIFree_[index] = true;
int localRank = 0;
int localSize = 0;
MPI_Comm_rank(mpiLocalNode, &localRank);
MPI_Comm_size(mpiLocalNode, &localSize);
if (localRank == 0)
{
// This process is a host leader - mark its position
nodeLeaders[parentRank] = parentRank;
}
procIds.resize_nocopy(localSize);
procIds[localRank] = UPstream::myProcNo(UPstream::constWorldComm_);
// OR: procIds[localRank] = parentRank;
// Get all of the siblings (within the node)
MPI_Allgather
(
MPI_IN_PLACE, 0, MPI_INT,
procIds.data(), 1, MPI_INT,
mpiLocalNode
);
}
// Get all of the host-leader information and find who they are.
{
auto& procIds = procIDs_[commInterNode_];
MPI_Allgather
(
MPI_IN_PLACE, 0, MPI_INT,
nodeLeaders.data(), 1, MPI_INT,
mpiParentComm
);
// Capture the size (number of nodes) before doing anything further
numNodes_ = std::count_if
(
nodeLeaders.cbegin(),
nodeLeaders.cend(),
[](int rank){ return (rank >= 0); }
);
// ~~~~~~~~~
// IMPORTANT
// ~~~~~~~~~
// Always retain knowledge of the inter-node leaders,
// even if this process is not on that communicator.
// This will help when constructing topology-aware communication.
procIds.resize_nocopy(numNodes_);
std::copy_if
(
nodeLeaders.cbegin(),
nodeLeaders.cend(),
procIds.begin(),
[](int rank){ return (rank >= 0); }
);
}
// From master to host-leader. Ranks between hosts.
allocateCommunicatorComponents(UPstream::worldComm, commInterNode_);
return true;
}
void Foam::UPstream::barrier(const label communicator, UPstream::Request* req)
{
// No-op for non-parallel or not on communicator

View File

@ -162,7 +162,7 @@ Foam::label Foam::multiWorldConnections::createCommunicator(const edge& worlds)
}
// Allocate new communicator with global world
comm = UPstream::allocateCommunicator(UPstream::commGlobal(), subRanks);
comm = UPstream::newCommunicator(UPstream::commGlobal(), subRanks);
if (debug & 2)
{