From c4b261c6157f90c38f255fa40da8a6df19b9a909 Mon Sep 17 00:00:00 2001 From: Mark Olesen Date: Mon, 17 Mar 2025 15:43:58 +0100 Subject: [PATCH] ENH: add node-based gather(), listGather(), mapGather() --- src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H | 38 ++++ .../db/IOstreams/Pstreams/PstreamGather.txx | 192 +++++++++++++++++- 2 files changed, 227 insertions(+), 3 deletions(-) diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H index 70294111f3..fb964d25e7 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H @@ -155,6 +155,19 @@ public: const int communicator ); + //- Implementation: gather (reduce) single element data onto + //- UPstream::masterNo() using a topo algorithm. + // \returns True if topo algorithm was applied + template + static bool gather_topo_algorithm + ( + //! [in,out] + T& value, + BinaryOp bop, + const int tag, + const int communicator + ); + //- Gather (reduce) data, applying \c bop to combine \c value //- from different processors. The basis for Foam::reduce(). // A no-op for non-parallel. @@ -256,6 +269,19 @@ public: const int communicator ); + //- Implementation: gather (reduce) list element data onto + //- UPstream::masterNo() using a topo algorithm. + // \returns True if topo algorithm was applied + template + static bool listGather_topo_algorithm + ( + //! [in,out] + UList& values, + BinaryOp bop, + const int tag, + const int communicator + ); + //- Gather (reduce) list elements, //- applying \c bop to each list element // @@ -337,6 +363,18 @@ public: const int communicator ); + //- Implementation: gather (reduce) Map/HashTable containers onto + //- UPstream::masterNo() using a topo algorithm. + // \returns True if topo algorithm was applied + template + static bool mapGather_topo_algorithm + ( + Container& values, + BinaryOp bop, + const int tag, + const int communicator + ); + //- Gather (reduce) Map/HashTable containers, //- applying \c bop to combine entries from different processors. // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.txx b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.txx index 3b5e34c973..6ddb82f2f9 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.txx +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.txx @@ -147,6 +147,59 @@ void Foam::Pstream::gather_algorithm } +template +bool Foam::Pstream::gather_topo_algorithm +( + T& value, + BinaryOp bop, + const int tag, + const int communicator +) +{ + const bool withTopo = + ( + UPstream::is_parallel(communicator) + && UPstream::usingTopoControl(UPstream::topoControls::combine) + && UPstream::usingNodeComms(communicator) + ); + + if (withTopo) + { + // Topological reduce + // - linear for local-node (assume communication is fast) + // - tree for inter-node (no assumption about speed) + + using control = std::pair; + + for + ( + auto [subComm, linear] : + { + // 1: within node + control{ UPstream::commLocalNode(), true }, + // 2: between nodes + control{ UPstream::commInterNode(), false } + } + ) + { + if (UPstream::is_parallel(subComm)) + { + Pstream::gather_algorithm + ( + UPstream::whichCommunication(subComm, linear), + value, + bop, + tag, + subComm + ); + } + } + } + + return withTopo; +} + + template void Foam::Pstream::gather ( @@ -172,7 +225,16 @@ void Foam::Pstream::gather communicator ); } - else + else if + ( + !Pstream::gather_topo_algorithm + ( + value, + bop, + tag, + communicator + ) + ) { // Communication order const auto& commOrder = UPstream::whichCommunication(communicator); @@ -337,6 +399,59 @@ void Foam::Pstream::listGather_algorithm } +template +bool Foam::Pstream::listGather_topo_algorithm +( + UList& values, + BinaryOp bop, + const int tag, + const label communicator +) +{ + const bool withTopo = + ( + UPstream::is_parallel(communicator) && !values.empty() + && UPstream::usingTopoControl(UPstream::topoControls::combine) + && UPstream::usingNodeComms(communicator) + ); + + if (withTopo) + { + // Topological reduce + // - linear for local-node (assume communication is fast) + // - tree for inter-node (no assumption about speed) + + using control = std::pair; + + for + ( + auto [subComm, linear] : + { + // 1: within node + control{ UPstream::commLocalNode(), true }, + // 2: between nodes + control{ UPstream::commInterNode(), false } + } + ) + { + if (UPstream::is_parallel(subComm)) + { + Pstream::listGather_algorithm + ( + UPstream::whichCommunication(subComm, linear), + values, + bop, + tag, + subComm + ); + } + } + } + + return withTopo; +} + + template void Foam::Pstream::listGather ( @@ -373,7 +488,16 @@ void Foam::Pstream::listGather communicator ); } - else + else if + ( + !Pstream::listGather_topo_algorithm + ( + values, + bop, + tag, + communicator + ) + ) { // Communication order const auto& commOrder = UPstream::whichCommunication(communicator); @@ -548,6 +672,59 @@ void Foam::Pstream::mapGather_algorithm } +template +bool Foam::Pstream::mapGather_topo_algorithm +( + Container& values, + BinaryOp bop, + const int tag, + const int communicator +) +{ + const bool withTopo = + ( + UPstream::is_parallel(communicator) + && UPstream::usingTopoControl(UPstream::topoControls::mapGather) + && UPstream::usingNodeComms(communicator) + ); + + if (withTopo) + { + // Topological reduce + // - linear for local-node (assume communication is fast) + // - tree for inter-node (no assumption about speed) + + using control = std::pair; + + for + ( + auto [subComm, linear] : + { + // 1: within node + control{ UPstream::commLocalNode(), true }, + // 2: between nodes + control{ UPstream::commInterNode(), false } + } + ) + { + if (UPstream::is_parallel(subComm)) + { + Pstream::mapGather_algorithm + ( + UPstream::whichCommunication(subComm, linear), + values, + bop, + tag, + subComm + ); + } + } + } + + return withTopo; +} + + template void Foam::Pstream::mapGather ( @@ -562,7 +739,16 @@ void Foam::Pstream::mapGather // Nothing to do return; } - else + else if + ( + !Pstream::mapGather_topo_algorithm + ( + values, + bop, + tag, + communicator + ) + ) { // Communication order const auto& commOrder = UPstream::whichCommunication(communicator);