ENH: direct support for broadcast of bitSet

- the internal data are contiguous so can broadcast size and internals
  directly without an intermediate stream.

ENH: split out broadcast time for profilingPstream information

STYLE: minor Pstream cleanup

- UPstream::commsType_ from protected to private, since it already has
  inlined noexcept getters/setters that should be used.

- don't pass the unused/unneeded tag into low-level MPI reduction templates.
  Document where tags are not needed.

- internals were calling Pstream::broadcast instead of UPstream::broadcast
This commit is contained in:
Mark Olesen 2022-03-03 14:21:31 +01:00 committed by Andrew Heather
parent 341d9c402d
commit e11fde900c
21 changed files with 214 additions and 165 deletions

View File

@ -34,6 +34,7 @@ Description
#include "List.H"
#include "argList.H"
#include "Time.H"
#include "bitSet.H"
#include "vector.H"
#include "IPstream.H"
#include "OPstream.H"
@ -74,6 +75,16 @@ void testBroadcast(List<T>& values)
}
//- Print the size and raw storage of a bitSet on each rank, broadcast it,
//- then print the same information again.
void testBroadcast(bitSet& values)
{
    // Shared printer for the before/after state of the bitSet
    const auto report = [&values](const char* stage)
    {
        Pout<< stage
            << values.size() << ": " << flatOutput(values.values()) << endl;
    };

    report("pre-broadcast: ");

    Pstream::broadcast(values);

    report("post-broadcast: ");
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
int main(int argc, char *argv[])
@ -149,6 +160,22 @@ int main(int argc, char *argv[])
testBroadcast(values);
}
// Test case: broadcast of a bitSet whose content differs per rank
{
bitSet values;
if (Pstream::master())
{
// Master: bits 1, 4, 8 set, then sized to 10 bits
values.set(labelList({1, 4, 8}));
values.resize(10);
}
else
{
// Just something different
// Other ranks: bits 0, 2 set, then sized to 5 bits
values.set(labelList({0, 2}));
values.resize(5);
}
// Prints size and storage before and after the broadcast
testBroadcast(values);
}
Info<< "End\n" << endl;
return 0;

View File

@ -102,10 +102,10 @@ public:
//- Construct for broadcast root, optional buffer size, read format
IPBstream
(
const commsTypes commsType,
const int fromProcNo, //!< UPstream::masterNo() - root procNo
const commsTypes commsType, //!< ignored
const int rootProcNo, //!< normally UPstream::masterNo()
const label bufSize = 0,
const int tag = UPstream::msgType(),
const int tag = UPstream::msgType(), //!< ignored
const label comm = UPstream::worldComm,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
@ -114,7 +114,7 @@ public:
//- write format
explicit IPBstream
(
const int fromProcNo, //!< UPstream::masterNo() - root procNo
const int rootProcNo, //!< normally UPstream::masterNo()
const label comm = UPstream::worldComm,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);

View File

@ -89,13 +89,13 @@ public:
// Constructors
//- Construct for broadcast root, optional buffer size, read format
//- Construct for broadcast root, optional buffer size, write format
OPBstream
(
const commsTypes commsType,
const int toProcNo, //!< UPstream::masterNo() - root procNo
const commsTypes commsType, //!< ignored
const int rootProcNo, //!< normally UPstream::masterNo()
const label bufSize = 0,
const int tag = UPstream::msgType(),
const int tag = UPstream::msgType(), //!< ignored
const label comm = UPstream::worldComm,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
@ -104,7 +104,7 @@ public:
//- write format
explicit OPBstream
(
const int toProcNo, //!< UPstream::masterNo() - root procNo
const int rootProcNo, //!< normally UPstream::masterNo()
const label comm = UPstream::worldComm,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);

View File

@ -5,7 +5,8 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2012 OpenFOAM Foundation
Copyright (C) 2011 OpenFOAM Foundation
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -26,12 +27,49 @@ License
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "bitSet.H"
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
namespace Foam
{
defineTypeNameAndDebug(Pstream, 0);
defineTypeNameAndDebug(Pstream, 0);
}
// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
// Broadcast a bitSet from the master rank of the communicator.
// First the number of bits is sent, then the packed storage as raw bytes.
// Does nothing unless this is a parallel run with more than one rank
// in the communicator.
void Foam::Pstream::broadcast
(
    bitSet& values,
    const label comm
)
{
    // Nothing to exchange: serial run or single-rank communicator
    if (!UPstream::parRun() || !(UPstream::nProcs(comm) > 1))
    {
        return;
    }

    // Step 1: everyone learns the size held by the broadcast root
    label nBits = values.size();

    UPstream::broadcast
    (
        reinterpret_cast<char*>(&nBits),
        sizeof(label),
        comm,
        UPstream::masterNo()
    );

    // Step 2: size the local storage to match (a no-op on master)
    values.resize_nocopy(nBits);

    // Step 3: transfer the storage itself, skipped when empty
    if (nBits != 0)
    {
        UPstream::broadcast
        (
            values.data_bytes(),
            values.size_bytes(),
            comm,
            UPstream::masterNo()
        );
    }
}

View File

@ -54,6 +54,9 @@ SourceFiles
namespace Foam
{
// Forward Declarations
class bitSet;
/*---------------------------------------------------------------------------*\
Class Pstream Declaration
\*---------------------------------------------------------------------------*/
@ -166,6 +169,14 @@ public:
const label comm = UPstream::worldComm
);
//- Broadcast bitSet values
//- to all processes in communicator.
static void broadcast
(
bitSet& values,
const label comm = UPstream::worldComm
);
// Gather

View File

@ -29,7 +29,7 @@ License
#include "IPstream.H"
#include "contiguous.H"
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
template<class T>
void Foam::Pstream::genericBroadcast(T& value, const label comm)
@ -80,7 +80,7 @@ void Foam::Pstream::broadcast(List<T>& values, const label comm)
}
else if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{
// Broadcast the size of the list
// Broadcast the size
label len(values.size());
UPstream::broadcast
(
@ -114,7 +114,7 @@ void Foam::Pstream::broadcast(DynamicList<T, SizeMin>& values, const label comm)
}
else if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{
// Broadcast the size of the list
// Broadcast the size
label len(values.size());
UPstream::broadcast
(

View File

@ -171,7 +171,7 @@ void reduce
(
bool& value,
const andOp<bool>&,
const int tag = UPstream::msgType(),
const int tag = UPstream::msgType(), /*!< (ignored) */
const label comm = UPstream::worldComm
);
@ -180,7 +180,7 @@ void reduce
(
bool& value,
const orOp<bool>&,
const int tag = UPstream::msgType(),
const int tag = UPstream::msgType(), /*!< (ignored) */
const label comm = UPstream::worldComm
);
@ -197,7 +197,7 @@ void reduce \
( \
Native& value, \
const minOp<Native>&, \
const int tag = UPstream::msgType(), \
const int tag = UPstream::msgType(), /*!< (ignored) */ \
const label comm = UPstream::worldComm \
); \
\
@ -206,7 +206,7 @@ void reduce \
( \
Native& value, \
const maxOp<Native>&, \
const int tag = UPstream::msgType(), \
const int tag = UPstream::msgType(), /*!< (ignored) */ \
const label comm = UPstream::worldComm \
); \
\
@ -215,7 +215,7 @@ void reduce \
( \
Native& value, \
const sumOp<Native>&, \
const int tag = UPstream::msgType(), \
const int tag = UPstream::msgType(), /*!< (ignored) */ \
const label comm = UPstream::worldComm \
); \
\
@ -225,7 +225,7 @@ void reduce \
Native values[], \
const int size, \
const sumOp<Native>&, \
const int tag, \
const int tag, /*!< (ignored) */ \
const label comm \
); \
\
@ -235,7 +235,7 @@ inline void reduce \
( \
FixedList<Native, N>& values, \
const sumOp<Native>&, \
const int tag = UPstream::msgType(), \
const int tag = UPstream::msgType(), /*!< (ignored) */ \
const label comm = UPstream::worldComm \
) \
{ \
@ -265,7 +265,7 @@ void sumReduce \
( \
Native& value, \
label& count, \
const int tag = UPstream::msgType(), \
const int tag = UPstream::msgType(), /*!< (ignored) */ \
const label comm = UPstream::worldComm \
); \
\
@ -274,7 +274,7 @@ void reduce \
( \
Native& value, \
const sumOp<Native>&, \
const int tag, \
const int tag, /*!< (ignored) */ \
const label comm, \
label& requestID \
); \
@ -285,7 +285,7 @@ void reduce \
Native values[], \
const int size, \
const sumOp<Native>&, \
const int tag, \
const int tag, /*!< (ignored) */ \
const label comm, \
label& requestID \
);

View File

@ -290,13 +290,13 @@ public:
//- and IO format
UIPBstream
(
const commsTypes commsType, //!< ignored
const int fromProcNo, //!< UPstream::masterNo()
const commsTypes commsType, //!< ignored
const int rootProcNo, //!< normally UPstream::masterNo()
DynamicList<char>& receiveBuf,
label& receiveBufPosition,
const int tag = UPstream::msgType(),
const int tag = UPstream::msgType(), //!< ignored
const label comm = UPstream::worldComm,
const bool clearAtEnd = false, // destroy receiveBuf if at end
const bool clearAtEnd = false, //!< destroy receiveBuf if at end
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
@ -317,8 +317,8 @@ public:
// \return the message size
static label read
(
const commsTypes commsTypes, //!< ignored
const int rootProcNo, //!< UPstream::masterNo()
const commsTypes commsTypes, //!< ignored
const int rootProcNo, //!< normally UPstream::masterNo()
char* buf,
const std::streamsize bufSize,
const int tag = UPstream::msgType(), //!< ignored

View File

@ -364,10 +364,10 @@ public:
//- and IO format
UOPBstream
(
const commsTypes commsType, //!< ignored
const int toProcNo, //!< UPstream::masterNo()
const commsTypes commsType, //!< ignored
const int toProcNo, //!< normally UPstream::masterNo()
DynamicList<char>& sendBuf,
const int tag = UPstream::msgType(), //!< ignored
const int tag = UPstream::msgType(), //!< ignored
const label comm = UPstream::worldComm,
const bool sendAtDestruct = true,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
@ -390,11 +390,11 @@ public:
// \return True on success
static bool write
(
const commsTypes commsType, //!< ignored
const int rootProcNo, //!< UPstream::masterNo()
const commsTypes commsType, //!< ignored
const int rootProcNo, //!< normally UPstream::masterNo()
const char* buf,
const std::streamsize bufSize,
const int tag = UPstream::msgType(), //!< ignored
const int tag = UPstream::msgType(), //!< ignored
const label comm = UPstream::worldComm
);
};

View File

@ -177,6 +177,12 @@ public:
private:
// Private Data
//- Communications type of this stream
commsTypes commsType_;
// Private Static Data
//- By default this is not a parallel run
@ -250,14 +256,6 @@ private:
);
protected:
// Protected Data
//- Communications type of this stream
commsTypes commsType_;
public:
// Declare name of the class and its debug switch
@ -271,8 +269,7 @@ public:
//- in accuracy
static bool floatTransfer;
//- Number of processors at which the sum algorithm changes from linear
//- to tree
//- Number of processors to change from linear to tree communication
static int nProcsSimpleSum;
//- Default commsType

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2019-2020 OpenCFD Ltd.
Copyright (C) 2019-2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -35,8 +35,8 @@ SourceFiles
\*---------------------------------------------------------------------------*/
#ifndef profilingPstream_H
#define profilingPstream_H
#ifndef Foam_profilingPstream_H
#define Foam_profilingPstream_H
#include "cpuTime.H"
#include "FixedList.H"
@ -62,13 +62,14 @@ public:
{
GATHER = 0,
SCATTER,
BROADCAST,
REDUCE,
WAIT,
ALL_TO_ALL
};
//- The timing values
typedef FixedList<double, 5> timingList;
typedef FixedList<double, 6> timingList;
private:
@ -165,6 +166,12 @@ public:
addTime(SCATTER);
}
//- Add time increment to broadcastTime
inline static void addBroadcastTime()
{
addTime(BROADCAST);
}
//- Add time increment to reduceTime
inline static void addReduceTime()
{

View File

@ -27,7 +27,6 @@ License
#include "UIPstream.H"
#include "PstreamGlobals.H"
#include "profilingPstream.H"
#include "IOstreams.H"
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //

View File

@ -77,7 +77,7 @@ void Foam::UIPstream::bufferIPCrecv()
messageSize_ = UIPstream::read
(
commsType_,
commsType(),
fromProcNo_,
recvBuf_.data(),
recvBuf_.capacity(),

View File

@ -27,7 +27,6 @@ License
#include "UOPstream.H"
#include "PstreamGlobals.H"
#include "profilingPstream.H"
#include <mpi.h>
@ -69,7 +68,7 @@ bool Foam::UOPBstream::bufferIPCsend()
{
if
(
!Pstream::broadcast
!UPstream::broadcast
(
sendBuf_.data(),
sendBuf_.size(), // same as bufSize

View File

@ -38,7 +38,7 @@ bool Foam::UOPstream::bufferIPCsend()
{
return UOPstream::write
(
commsType_,
commsType(),
toProcNo_,
sendBuf_.cdata(),
sendBuf_.size(),

View File

@ -77,14 +77,7 @@ bool Foam::UPstream::broadcast
PstreamGlobals::MPICommunicators_[communicator]
);
if (rootProcNo == UPstream::myProcNo(communicator))
{
profilingPstream::addScatterTime();
}
else
{
profilingPstream::addGatherTime();
}
profilingPstream::addBroadcastTime();
return !failed;
}

View File

@ -40,13 +40,13 @@ void Foam::reduce
(
bool& value,
const andOp<bool>&,
const int tag,
const int tag, /* (unused) */
const label comm
)
{
// This can also work:
// PstreamDetail::allReduce(&value, 1, MPI_BYTE, MPI_BAND, tag, comm);
PstreamDetail::allReduce(&value, 1, MPI_C_BOOL, MPI_LAND, tag, comm);
// PstreamDetail::allReduce(&value, 1, MPI_BYTE, MPI_BAND, comm);
PstreamDetail::allReduce(&value, 1, MPI_C_BOOL, MPI_LAND, comm);
}
@ -54,13 +54,13 @@ void Foam::reduce
(
bool& value,
const orOp<bool>&,
const int tag,
const int tag, /* (unused) */
const label comm
)
{
// This can also work:
// PstreamDetail::allReduce(&value, 1, MPI_BYTE, MPI_BOR, tag, comm);
PstreamDetail::allReduce(&value, 1, MPI_C_BOOL, MPI_LOR, tag, comm);
// PstreamDetail::allReduce(&value, 1, MPI_BYTE, MPI_BOR, comm);
PstreamDetail::allReduce(&value, 1, MPI_C_BOOL, MPI_LOR, comm);
}
@ -75,13 +75,13 @@ void Foam::reduce \
( \
Native& value, \
const minOp<Native>&, \
const int tag, \
const int tag, /* (unused) */ \
const label comm \
) \
{ \
PstreamDetail::allReduce<Native> \
( \
&value, 1, TaggedType, MPI_MIN, tag, comm \
&value, 1, TaggedType, MPI_MIN, comm \
); \
} \
\
@ -89,13 +89,13 @@ void Foam::reduce \
( \
Native& value, \
const maxOp<Native>&, \
const int tag, \
const int tag, /* (unused) */ \
const label comm \
) \
{ \
PstreamDetail::allReduce<Native> \
( \
&value, 1, TaggedType, MPI_MAX, tag, comm \
&value, 1, TaggedType, MPI_MAX, comm \
); \
} \
\
@ -103,13 +103,13 @@ void Foam::reduce \
( \
Native& value, \
const sumOp<Native>&, \
const int tag, \
const int tag, /* (unused) */ \
const label comm \
) \
{ \
PstreamDetail::allReduce<Native> \
( \
&value, 1, TaggedType, MPI_SUM, tag, comm \
&value, 1, TaggedType, MPI_SUM, comm \
); \
} \
\
@ -118,13 +118,13 @@ void Foam::reduce \
Native values[], \
const int size, \
const sumOp<Native>&, \
const int tag, \
const int tag, /* (unused) */ \
const label comm \
) \
{ \
PstreamDetail::allReduce<Native> \
( \
values, size, TaggedType, MPI_SUM, tag, comm \
values, size, TaggedType, MPI_SUM, comm \
); \
} \
@ -150,7 +150,7 @@ void Foam::sumReduce \
( \
Native& value, \
label& count, \
const int tag, \
const int tag, /* (unused) */ \
const label comm \
) \
{ \
@ -162,7 +162,7 @@ void Foam::sumReduce \
\
PstreamDetail::allReduce<Native> \
( \
values, 2, TaggedType, MPI_SUM, tag, comm \
values, 2, TaggedType, MPI_SUM, comm \
); \
\
value = values[0]; \
@ -174,7 +174,7 @@ void Foam::reduce \
( \
Native& value, \
const sumOp<Native>&, \
const int tag, \
const int tag, /* (unused) */ \
const label comm, \
label& requestID \
) \
@ -190,7 +190,7 @@ void Foam::reduce \
Native values[], \
const int size, \
const sumOp<Native>&, \
const int tag, \
const int tag, /* (unused) */ \
const label comm, \
label& requestID \
) \

View File

@ -70,7 +70,6 @@ void allReduce
int count,
MPI_Datatype datatype,
MPI_Op optype,
const int tag,
const label communicator
);

View File

@ -58,7 +58,7 @@ void Foam::PstreamDetail::allBroadcast
PstreamGlobals::MPICommunicators_[communicator]
);
profilingPstream::addScatterTime();
profilingPstream::addBroadcastTime();
}
@ -69,7 +69,6 @@ void Foam::PstreamDetail::allReduce
int count,
MPI_Datatype datatype,
MPI_Op optype,
const int tag,
const label communicator
)
{

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2019 OpenCFD Ltd.
Copyright (C) 2019-2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -50,45 +50,6 @@ namespace functionObjects
);
} // End namespace functionObject
// Processor and time for each of: -min -max -sum
typedef FixedList<Tuple2<label, scalar>, 3> statData;
//- Reduction class. If x and y are not equal assign value.
struct statsEqOp
{
void operator()
(
FixedList<statData, 2>& xStats,
const FixedList<statData, 2>& yStats
) const
{
forAll(xStats, i)
{
statData& x = xStats[i];
const statData& y = yStats[i];
// 0 : min
// 1 : max
// 2 : sum
if (x[0].second() > y[0].second())
{
x[0].second() = y[0].second();
x[0].first() = y[0].first();
}
if (x[1].second() < y[1].second())
{
x[1].second() = y[1].second();
x[1].first() = y[1].first();
}
x[2].second() += y[2].second();
x[2].first()++;
}
}
};
} // End namespace Foam
@ -124,57 +85,59 @@ void Foam::functionObjects::parProfiling::report()
return;
}
typedef FixedList<Tuple2<label, scalar>, 3> statData;
FixedList<statData, 2> times;
// (Time, Processor) for each of: min/max/sum
typedef FixedList<Tuple2<double, int>, 3> statData;
typedef FixedList<statData, 2> statDataTimes;
// Reduction: if x and y are unequal assign value.
// Combine operator: merges yStats into xStats in place.
// Each statData holds three (time, processor) tuples; comparisons and
// accumulation use first() (the time).
auto statsEqOp = [](statDataTimes& xStats, const statDataTimes& yStats)
{
forAll(xStats, i)
{
statData& x = xStats[i];
const statData& y = yStats[i];
// 0: min, 1: max, 2: total (or avg)
// Slot 0: keep whichever tuple has the smaller time
if (x[0].first() > y[0].first())
{
x[0] = y[0];
}
// Slot 1: keep whichever tuple has the larger time
if (x[1].first() < y[1].first())
{
x[1] = y[1];
}
// Slot 2: accumulate the time only; second() is left untouched
x[2].first() += y[2].first();
}
};
statDataTimes times;
{
const scalar masterTime =
const double masterTime =
(
profilingPstream::times(profilingPstream::REDUCE)
+ profilingPstream::times(profilingPstream::GATHER)
+ profilingPstream::times(profilingPstream::SCATTER)
// Include broadcast with reduce instead of all-to-all
+ profilingPstream::times(profilingPstream::BROADCAST)
);
statData& reduceStats = times[0];
Tuple2<label, scalar>& minTime = reduceStats[0];
minTime.first() = Pstream::myProcNo();
minTime.second() = masterTime;
Tuple2<label, scalar>& maxTime = reduceStats[1];
maxTime.first() = Pstream::myProcNo();
maxTime.second() = masterTime;
Tuple2<label, scalar>& sumTime = reduceStats[2];
sumTime.first() = 1;
sumTime.second() = masterTime;
times[0] = Tuple2<double, int>(masterTime, Pstream::myProcNo());
}
{
const scalar allTime =
const double allTime =
(
profilingPstream::times(profilingPstream::WAIT)
+ profilingPstream::times(profilingPstream::ALL_TO_ALL)
);
statData& allToAllStats = times[1];
Tuple2<label, scalar>& minTime = allToAllStats[0];
minTime.first() = Pstream::myProcNo();
minTime.second() = allTime;
Tuple2<label, scalar>& maxTime = allToAllStats[1];
maxTime.first() = Pstream::myProcNo();
maxTime.second() = allTime;
Tuple2<label, scalar>& sumTime = allToAllStats[2];
sumTime.first() = 1;
sumTime.second() = allTime;
times[1] = Tuple2<double, int>(allTime, Pstream::myProcNo());
}
profilingPstream::suspend();
Pstream::combineGather(times, statsEqOp());
Pstream::combineGather(times, statsEqOp);
profilingPstream::resume();
@ -184,21 +147,23 @@ void Foam::functionObjects::parProfiling::report()
const statData& reduceStats = times[0];
const statData& allToAllStats = times[1];
scalar reduceAvg = reduceStats[2].second()/Pstream::nProcs();
scalar allToAllAvg = allToAllStats[2].second()/Pstream::nProcs();
double reduceAvg = reduceStats[2].first()/Pstream::nProcs();
double allToAllAvg = allToAllStats[2].first()/Pstream::nProcs();
Info<< type() << ':' << nl
<< incrIndent
<< indent << "reduce : avg = " << reduceAvg << 's' << nl
<< indent << " min = " << reduceStats[0].second()
<< "s (processor " << reduceStats[0].first() << ')' << nl
<< indent << " max = " << reduceStats[1].second()
<< "s (processor " << reduceStats[1].first() << ')' << nl
<< indent << " min = " << reduceStats[0].first()
<< "s (processor " << reduceStats[0].second() << ')' << nl
<< indent << " max = " << reduceStats[1].first()
<< "s (processor " << reduceStats[1].second() << ')' << nl
<< indent << "all-all : avg = " << allToAllAvg << 's' << nl
<< indent << " min = " << allToAllStats[0].second()
<< "s (processor " << allToAllStats[0].first() << ')' << nl
<< indent << " max = " << allToAllStats[1].second()
<< "s (processor " << allToAllStats[1].first() << ')'
<< indent << " min = " << allToAllStats[0].first()
<< "s (processor " << allToAllStats[0].second() << ')' << nl
<< indent << " max = " << allToAllStats[1].first()
<< "s (processor " << allToAllStats[1].second() << ')'
<< decrIndent << endl;
}
}

View File

@ -0,0 +1,15 @@
// -*- C++ -*-
// Function object entry enabling the parProfiling report
profiling
{
// Function object type and the library that provides it
type parProfiling;
libs (utilityFunctionObjects);
// Report stats on exit only (instead of every time step)
executeControl onEnd;
// No write step is triggered
writeControl none;
}
// ************************************************************************* //