ENH: add node-based gatherList()

This commit is contained in:
Mark Olesen 2025-03-17 15:52:22 +01:00 committed by Mark OLESEN
parent c4b261c615
commit a01f3ed8b7
2 changed files with 150 additions and 1 deletions

View File

@ -459,6 +459,17 @@ public:
const int communicator
);
//- Gather data, keeping individual values separate.
//  Node-based (topology-aware) variant: values are first gathered
//  within each node and then gathered between the node leaders.
//
//  Only applied when the communicator is parallel, topo-control is
//  enabled for gatherList and node communicators are in use for the
//  given communicator. Otherwise nothing is communicated and the
//  caller is expected to use a different gather algorithm.
//
//  Aborts with a FatalError if the topo algorithm is applied and the
//  list has fewer entries than the number of ranks in the communicator.
//
//  \returns True if topo algorithm was applied
template<class T>
static bool gatherList_topo_algorithm
(
//! [in,out] List with (at least) one entry per rank of the communicator
UList<T>& values,
//! Message tag passed through to the underlying send/receive calls
const int tag,
//! Communicator whose ranks are being gathered
const int communicator
);
//- Implementation: inverse of gatherList_algorithm
template<class T>
static void scatterList_algorithm

View File

@ -252,6 +252,127 @@ void Foam::Pstream::gatherList_algorithm
}
// Node-aware gatherList.
//
// Works in two stages:
//   1. gather within the local node (linear communication schedule)
//   2. gather the per-node sections between the node leaders
//
// Nothing is communicated unless the communicator is parallel,
// topo-control is enabled for gatherList and node communicators are
// in use for the communicator.
//
// \param[in,out] values  list with at least one entry per rank
// \param tag  message tag forwarded to all send/receive calls
// \param communicator  communicator whose ranks are being gathered
//
// \returns True if the topological algorithm was applied
template<class T>
bool Foam::Pstream::gatherList_topo_algorithm
(
    UList<T>& values,
    const int tag,
    const int communicator
)
{
    // Guard: bail out early when topological gathering is not applicable.
    // Conditions are tested in the same order (with short-circuiting)
    // as a plain conjunction would be.
    if
    (
        !UPstream::is_parallel(communicator)
     || !UPstream::usingTopoControl(UPstream::topoControls::gatherList)
     || !UPstream::usingNodeComms(communicator)
    )
    {
        return false;
    }


    // Sanity check: need (at least) one slot per rank
    if (FOAM_UNLIKELY(values.size() < UPstream::nProcs(communicator)))
    {
        FatalErrorInFunction
            << "List of values:" << values.size()
            << " < numProcs:" << UPstream::nProcs(communicator) << nl
            << Foam::abort(FatalError);
    }

    // Node-wise offsets into the overall list:
    // the section for entry i is [offsets[i], offsets[i+1])
    const auto& nodeOffsets = UPstream::interNode_offsets();

    // Range of processors associated with the local node
    const auto& localRange = UPstream::localNode_parentProcs();

    // Sub-view of the values belonging to the local node
    auto localValues = values.slice(localRange.start(), localRange.size());


    // Stage 1: intra-node gather
    // - uses the linear schedule (intra-node communication assumed fast)
    {
        const auto localComm = UPstream::commLocalNode();

        if (UPstream::is_parallel(localComm))
        {
            constexpr bool useLinear = true;

            Pstream::gatherList_algorithm<T>
            (
                UPstream::whichCommunication(localComm, useLinear),
                localValues,
                tag,
                localComm
            );
        }
    }


    // Stage 2: inter-node gather (between node leaders)
    // - effectively a gatherv, since the number of cores per node
    //   need not be identical
    // - closely resembles globalIndex::gather
    {
        const auto leaderComm = UPstream::commInterNode();

        if (UPstream::is_parallel(leaderComm))
        {
            if (!UPstream::master(leaderComm))
            {
                // Not the master: send the local-node section to the master
                if constexpr (is_contiguous_v<T>)
                {
                    UOPstream::write
                    (
                        UPstream::commsTypes::scheduled,
                        UPstream::masterNo(),
                        localValues,
                        tag,
                        leaderComm
                    );
                }
                else
                {
                    OPstream::send
                    (
                        localValues,
                        UPstream::commsTypes::scheduled,
                        UPstream::masterNo(),
                        tag,
                        leaderComm
                    );
                }
            }
            else
            {
                // Master: receive one section from each rank listed
                // by subProcs(), directly into its slot of the full list
                for (const int nodei : UPstream::subProcs(leaderComm))
                {
                    auto recvSlot = values.slice
                    (
                        nodeOffsets[nodei],
                        nodeOffsets[nodei+1] - nodeOffsets[nodei]
                    );

                    // Most likely non-contiguous data here,
                    // otherwise mpiGather() would have been used instead
                    if constexpr (is_contiguous_v<T>)
                    {
                        UIPstream::read
                        (
                            UPstream::commsTypes::scheduled,
                            nodei,
                            recvSlot,
                            tag,
                            leaderComm
                        );
                    }
                    else
                    {
                        IPstream::recv(recvSlot, nodei, tag, leaderComm);
                    }
                }
            }
        }
    }

    return true;
}
template<class T>
void Foam::Pstream::scatterList_algorithm
(
@ -448,7 +569,15 @@ void Foam::Pstream::gatherList
auto* ptr = values.data() + UPstream::myProcNo(communicator);
UPstream::mpiGather(ptr, ptr, 1, communicator);
}
else
else if
(
!Pstream::gatherList_topo_algorithm
(
values,
tag,
communicator
)
)
{
// Communication order
const auto& commOrder = UPstream::whichCommunication(communicator);
@ -486,6 +615,15 @@ void Foam::Pstream::allGatherList
}
else
{
// IMPORTANT: always call the *_algorithm() versions here and
// never the base versions [eg, Pstream::gatherList()] since
// the communication order must be absolutely identical
// for the gatherList and scatterList, otherwise the results will
// not replicate the allGather behaviour.
//
// This also means that we must avoid the gatherList_topo_algorithm()
// as well, since this does not pair well with scatterList_algorithm()
// Communication order
const auto& commOrder = UPstream::whichCommunication(communicator);