ENH: extend wrapping of MPI calls to more data types

- supports gatherv of label and scalar types

- combine blocking and non-blocking interfaces in UPstreamWrapping
  (code reduction).

DEFEATURE: remove unused UPstream allToAllv wrapping

- backend interface preserved in UPstreamWrapping

COMP: add genericListBroadcast - simplifies code
Mark Olesen 2022-03-28 14:13:11 +02:00
parent d38de84d21
commit 87e3b196b0
19 changed files with 1781 additions and 1005 deletions

View File

@@ -143,6 +143,15 @@ public:
const label comm = UPstream::worldComm
);
//- Generic broadcast multiple values (contiguous or non-contiguous)
//- to all processes in communicator.
template<class ListType>
static void genericListBroadcast
(
ListType& values,
const label comm = UPstream::worldComm
);
//- Broadcast value (contiguous or non-contiguous)
//- to all processes in communicator.
template<class T>

View File

@@ -51,6 +51,40 @@ void Foam::Pstream::genericBroadcast(T& value, const label comm)
}
template<class ListType>
void Foam::Pstream::genericListBroadcast(ListType& values, const label comm)
{
if (!is_contiguous<typename ListType::value_type>::value)
{
Pstream::genericBroadcast(values, comm);
}
else if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{
// Broadcast the size
label len(values.size());
UPstream::broadcast
(
reinterpret_cast<char*>(&len),
sizeof(label),
comm,
UPstream::masterNo()
);
values.resize_nocopy(len); // A no-op on master
if (len)
{
UPstream::broadcast
(
values.data_bytes(),
values.size_bytes(),
comm,
UPstream::masterNo()
);
}
}
}
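For orientation, a minimal usage sketch of the consolidated broadcast (a hypothetical test driver, not part of this commit): for contiguous element types the list size is broadcast first, receivers resize to match, and the payload then travels as raw bytes; non-contiguous types fall back to the stream-serialising genericBroadcast.

    #include "argList.H"
    #include "Pstream.H"
    #include "scalarList.H"

    using namespace Foam;

    int main(int argc, char *argv[])
    {
        #include "setRootCase.H"   // creates 'args', initialises parallel comms

        // Only the master provides content
        scalarList values;
        if (Pstream::master())
        {
            values = scalarList({1.5, 2.5, 3.5});
        }

        // Non-master ranks are resized to match, then receive the bytes
        Pstream::broadcast(values);

        Pout<< "rank " << Pstream::myProcNo() << " : " << values << nl;

        return 0;
    }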
template<class T>
void Foam::Pstream::broadcast(T& value, const label comm)
{
@@ -74,68 +108,14 @@ void Foam::Pstream::broadcast(T& value, const label comm)
template<class T>
void Foam::Pstream::broadcast(List<T>& values, const label comm)
{
if (!is_contiguous<T>::value)
{
Pstream::genericBroadcast(values, comm);
}
else if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{
// Broadcast the size
label len(values.size());
UPstream::broadcast
(
reinterpret_cast<char*>(&len),
sizeof(label),
comm,
UPstream::masterNo()
);
values.resize_nocopy(len); // A no-op on master
if (len)
{
UPstream::broadcast
(
values.data_bytes(),
values.size_bytes(),
comm,
UPstream::masterNo()
);
}
}
Pstream::genericListBroadcast(values, comm);
}
template<class T, int SizeMin>
void Foam::Pstream::broadcast(DynamicList<T, SizeMin>& values, const label comm)
{
if (!is_contiguous<T>::value)
{
Pstream::genericBroadcast(values, comm);
}
else if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{
// Broadcast the size
label len(values.size());
UPstream::broadcast
(
reinterpret_cast<char*>(&len),
sizeof(label),
comm,
UPstream::masterNo()
);
values.resize_nocopy(len); // A no-op on master
if (len)
{
UPstream::broadcast
(
values.data_bytes(),
values.size_bytes(),
comm,
UPstream::masterNo()
);
}
}
Pstream::genericListBroadcast(values, comm);
}

View File

@@ -45,7 +45,8 @@ namespace Foam
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
//- Reduce operation with user specified communication schedule
//- Reduce inplace (cf. MPI Allreduce)
//- using specified communication schedule.
template<class T, class BinaryOp>
void reduce
(
@@ -66,7 +67,8 @@ void reduce
}
//- Reduce (inplace) using either linear or tree communication schedule
//- Reduce inplace (cf. MPI Allreduce)
//- using linear/tree communication schedule
template<class T, class BinaryOp>
void reduce
(
@@ -99,7 +101,8 @@ T returnReduce
}
//- Reduce with sum of both value and count (for averaging)
//- Reduce inplace (cf. MPI Allreduce)
//- the sum of both value and count (for averaging)
template<class T>
void sumReduce
(
@@ -117,7 +120,8 @@ void sumReduce
}
//- Reduce multiple values (identical size on all processes!)
//- Reduce inplace (cf. MPI Allreduce)
//- multiple values (identical size on all processes!)
template<class T, class BinaryOp>
void reduce
(
@@ -131,7 +135,8 @@ void reduce
NotImplemented;
}
//- Non-blocking reduce single value. Sets request.
//- Non-blocking reduce inplace (cf. MPI Iallreduce)
//- single value. Sets request.
template<class T, class BinaryOp>
void reduce
(
@@ -145,8 +150,9 @@ void reduce
NotImplemented;
}
//- Non-blocking reduce multiple values (identical size on all processes!)
//- Sets request.
//- Non-blocking reduce inplace (cf. MPI Iallreduce)
//- of multiple values (same size on all processes!)
// Sets request.
template<class T, class BinaryOp>
void reduce
(
@@ -166,7 +172,7 @@ void reduce
// Specialisations for bool
//- Logical (and) reduction
//- Logical (and) reduction (cf. MPI Allreduce)
void reduce
(
bool& value,
@@ -175,7 +181,7 @@ void reduce
const label comm = UPstream::worldComm
);
//- Logical (or) reduction
//- Logical (or) reduction (cf. MPI Allreduce)
void reduce
(
bool& value,
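A usage sketch for the reduction helpers documented above (hypothetical call sites; assumes a parallel run):

    #include "argList.H"
    #include "PstreamReduceOps.H"

    using namespace Foam;

    int main(int argc, char *argv[])
    {
        #include "setRootCase.H"

        // In-place reduction across all ranks (cf. MPI_Allreduce)
        scalar localSum = scalar(Pstream::myProcNo() + 1);
        reduce(localSum, sumOp<scalar>());

        // Functional form: the local value stays put, a reduced copy returns
        const scalar maxRank =
            returnReduce(scalar(Pstream::myProcNo()), maxOp<scalar>());

        Pout<< "sum:" << localSum << " max:" << maxRank << nl;
        return 0;
    }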

View File

@@ -585,78 +585,89 @@ public:
//- Shutdown (finalize) MPI as required and exit program with errNo.
static void exit(int errNo = 1);
//- Exchange label with all processors (in the communicator).
// sendData[proci] is the label to send to proci.
//- Exchange integer data with all processors (in the communicator).
// \c sendData[proci] is the value to send to proci.
// After return recvData contains the data from the other processors.
static void allToAll
(
const labelUList& sendData,
labelUList& recvData,
const UList<int32_t>& sendData,
UList<int32_t>& recvData,
const label communicator = worldComm
);
//- Exchange data with all processors (in the communicator)
// sendSizes, sendOffsets give (per processor) the slice of
// sendData to send, similarly recvSizes, recvOffsets give the slice
// of recvData to receive
//- Exchange integer data with all processors (in the communicator).
// \c sendData[proci] is the value to send to proci.
// After return recvData contains the data from the other processors.
static void allToAll
(
const char* sendData,
const UList<int>& sendSizes,
const UList<int>& sendOffsets,
char* recvData,
const UList<int>& recvSizes,
const UList<int>& recvOffsets,
const UList<int64_t>& sendData,
UList<int64_t>& recvData,
const label communicator = worldComm
);
//- Receive data from all processors on the master (low-level)
static void mpiGather
(
const char* sendData,
int sendSize,
char* recvData,
int recvSize,
const label communicator = worldComm
// Low-level gather/scatter routines
#undef Pstream_CommonRoutines
#define Pstream_CommonRoutines(Native) \
\
/*! \brief Receive identically-sized \c Native data from all ranks */ \
static void mpiGather \
( \
const Native* sendData, \
int sendCount, \
Native* recvData, \
int recvCount, \
const label communicator = worldComm \
); \
\
/*! \brief Send identically-sized \c Native data to all ranks */ \
static void mpiScatter \
( \
const Native* sendData, \
int sendCount, \
Native* recvData, \
int recvCount, \
const label communicator = worldComm \
); \
Pstream_CommonRoutines(char);
#undef Pstream_CommonRoutines
#define Pstream_CommonRoutines(Native) \
\
/*! \brief Receive variable length \c Native data from all ranks */ \
static void gather \
( \
const Native* sendData, \
int sendCount, /*!< Ignored on master if recvCounts[0] == 0 */ \
Native* recvData, /*!< Ignored on non-root rank */ \
const UList<int>& recvCounts, /*!< Ignored on non-root rank */ \
const UList<int>& recvOffsets, /*!< Ignored on non-root rank */ \
const label communicator = worldComm \
); \
\
/*! \brief Send variable length \c Native data to all ranks */ \
static void scatter \
( \
const Native* sendData, /*!< Ignored on non-root rank */ \
const UList<int>& sendCounts, /*!< Ignored on non-root rank */ \
const UList<int>& sendOffsets, /*!< Ignored on non-root rank */ \
Native* recvData, \
int recvCount, \
const label communicator = worldComm \
);
//- Send data to all processors from master (low-level)
static void mpiScatter
(
const char* sendData,
int sendSize,
char* recvData,
int recvSize,
const label communicator = worldComm
);
//- Receive data from all processors on the master
static void gather
(
const char* sendData,
int sendSize,
char* recvData,
const UList<int>& recvSizes,
const UList<int>& recvOffsets,
const label communicator = worldComm
);
//- Send data to all processors from the root of the communicator
static void scatter
(
const char* sendData,
const UList<int>& sendSizes,
const UList<int>& sendOffsets,
char* recvData,
int recvSize,
const label communicator = worldComm
);
Pstream_CommonRoutines(char);
Pstream_CommonRoutines(int32_t);
Pstream_CommonRoutines(int64_t);
Pstream_CommonRoutines(uint32_t);
Pstream_CommonRoutines(uint64_t);
Pstream_CommonRoutines(float);
Pstream_CommonRoutines(double);
#undef Pstream_CommonRoutines
// Gather single, contiguous value(s)
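To illustrate the variable-length (gatherv-style) interface generated above, a hypothetical call site; the per-rank counts are invented for the sketch, whereas a real caller would typically gather them from the ranks first:

    #include "argList.H"
    #include "Pstream.H"

    using namespace Foam;

    int main(int argc, char *argv[])
    {
        #include "setRootCase.H"

        const label nProcs = UPstream::nProcs();

        // Each rank contributes (myProcNo + 1) values
        List<double> sendData(UPstream::myProcNo() + 1, double(UPstream::myProcNo()));

        List<int> recvCounts;
        List<int> recvOffsets;
        List<double> recvData;

        if (UPstream::master())
        {
            recvCounts.resize(nProcs);
            recvOffsets.resize(nProcs + 1);  // one larger: eases later looping

            int total = 0;
            for (label proci = 0; proci < nProcs; ++proci)
            {
                recvCounts[proci] = proci + 1;   // sizes known a priori here
                recvOffsets[proci] = total;
                total += recvCounts[proci];
            }
            recvOffsets[nProcs] = total;
            recvData.resize(total);
        }

        UPstream::gather
        (
            sendData.cdata(),
            int(sendData.size()),
            recvData.data(),    // ignored on non-root ranks
            recvCounts,         // ignored on non-root ranks
            recvOffsets,        // ignored on non-root ranks
            UPstream::worldComm
        );

        if (UPstream::master())
        {
            Pout<< "gathered " << recvData.size() << " values" << nl;
        }

        return 0;
    }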

View File

@@ -1,5 +1,7 @@
UPstream.C
UPstreamAllToAll.C
UPstreamBroadcast.C
UPstreamGatherScatter.C
UPstreamReduce.C
UIPstreamRead.C

View File

@@ -75,75 +75,6 @@ void Foam::UPstream::abort()
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
void Foam::UPstream::allToAll
(
const labelUList& sendData,
labelUList& recvData,
const label communicator
)
{
recvData.deepCopy(sendData);
}
void Foam::UPstream::mpiGather
(
const char* sendData,
int sendSize,
char* recvData,
int recvSize,
const label communicator
)
{
std::memmove(recvData, sendData, sendSize);
}
void Foam::UPstream::mpiScatter
(
const char* sendData,
int sendSize,
char* recvData,
int recvSize,
const label communicator
)
{
std::memmove(recvData, sendData, sendSize);
}
void Foam::UPstream::gather
(
const char* sendData,
int sendSize,
char* recvData,
const UList<int>& recvSizes,
const UList<int>& recvOffsets,
const label communicator
)
{
std::memmove(recvData, sendData, sendSize);
}
void Foam::UPstream::scatter
(
const char* sendData,
const UList<int>& sendSizes,
const UList<int>& sendOffsets,
char* recvData,
int recvSize,
const label communicator
)
{
std::memmove(recvData, sendData, recvSize);
}
void Foam::UPstream::allocatePstreamCommunicator
(
const label,

View File

@@ -0,0 +1,84 @@
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "UPstream.H"
#include <cinttypes>
#include <cstring> // memmove
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#undef Pstream_CommonRoutines
#define Pstream_CommonRoutines(Native) \
void Foam::UPstream::allToAll \
( \
const UList<Native>& sendData, \
UList<Native>& recvData, \
const label comm \
) \
{ \
recvData.deepCopy(sendData); \
} \
Pstream_CommonRoutines(int32_t);
Pstream_CommonRoutines(int64_t);
#undef Pstream_CommonRoutines
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#undef Pstream_CommonRoutines
#define Pstream_CommonRoutines(Native) \
void Foam::UPstream::allToAll \
( \
const Native* sendData, \
const UList<int>& sendCounts, \
const UList<int>& sendOffsets, \
Native* recvData, \
const UList<int>& recvCounts, \
const UList<int>& recvOffsets, \
const label comm \
) \
{ \
if (recvCounts[0] != sendCounts[0]) \
{ \
FatalErrorInFunction \
<< "Number to send " << sendCounts[0] \
<< " does not equal number to receive " << recvCounts[0] \
<< Foam::abort(FatalError); \
} \
std::memmove(recvData, sendData, recvCounts[0]*sizeof(Native)); \
}
// Unused: Pstream_CommonRoutines(char);
#undef Pstream_CommonRoutines
// ************************************************************************* //

View File

@@ -33,7 +33,7 @@ bool Foam::UPstream::broadcast
(
char* buf,
const std::streamsize bufSize,
const label communicator,
const label comm,
const int rootProcNo
)
{

View File

@@ -0,0 +1,112 @@
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "UPstream.H"
#include <cstring> // memmove
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#undef Pstream_CommonRoutines
#define Pstream_CommonRoutines(Native) \
void Foam::UPstream::mpiGather \
( \
const Native* sendData, \
int sendCount, \
\
Native* recvData, \
int recvCount, \
const label comm \
) \
{ \
std::memmove(recvData, sendData, recvCount*sizeof(Native)); \
} \
\
\
void Foam::UPstream::mpiScatter \
( \
const Native* sendData, \
int sendCount, \
\
Native* recvData, \
int recvCount, \
const label comm \
) \
{ \
std::memmove(recvData, sendData, recvCount*sizeof(Native)); \
}
Pstream_CommonRoutines(char);
#undef Pstream_CommonRoutines
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#undef Pstream_CommonRoutines
#define Pstream_CommonRoutines(Native) \
void Foam::UPstream::gather \
( \
const Native* sendData, \
int sendCount, \
\
Native* recvData, \
const UList<int>& recvCounts, \
const UList<int>& recvOffsets, \
const label comm \
) \
{ \
/* recvCounts[0] may be invalid - use sendCount instead */ \
std::memmove(recvData, sendData, sendCount*sizeof(Native)); \
} \
\
void Foam::UPstream::scatter \
( \
const Native* sendData, \
const UList<int>& sendCounts, \
const UList<int>& sendOffsets, \
\
Native* recvData, \
int recvCount, \
const label comm \
) \
{ \
std::memmove(recvData, sendData, recvCount*sizeof(Native)); \
}
//TBD: Pstream_CommonRoutines(bool);
Pstream_CommonRoutines(char);
Pstream_CommonRoutines(int32_t);
Pstream_CommonRoutines(int64_t);
Pstream_CommonRoutines(uint32_t);
Pstream_CommonRoutines(uint64_t);
Pstream_CommonRoutines(float);
Pstream_CommonRoutines(double);
#undef Pstream_CommonRoutines
// ************************************************************************* //

View File

@@ -1,6 +1,8 @@
PstreamGlobals.C
UPstream.C
UPstreamAllToAll.C
UPstreamBroadcast.C
UPstreamGatherScatter.C
UPstreamReduce.C
UIPstreamRead.C

View File

@@ -31,7 +31,7 @@ License
#include "PstreamGlobals.H"
#include "profilingPstream.H"
#include "SubList.H"
#include "allReduce.H"
#include "UPstreamWrapping.H"
#include "int.H"
#include "collatedFileOperation.H"
@@ -482,428 +482,6 @@ void Foam::UPstream::abort()
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
void Foam::UPstream::allToAll
(
const labelUList& sendData,
labelUList& recvData,
const label communicator
)
{
const label np = nProcs(communicator);
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** allToAll :"
<< " np:" << np
<< " sendData:" << sendData.size()
<< " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
if (sendData.size() != np || recvData.size() != np)
{
FatalErrorInFunction
<< "Size of sendData " << sendData.size()
<< " or size of recvData " << recvData.size()
<< " is not equal to the number of processors in the domain "
<< np
<< Foam::abort(FatalError);
}
if (!UPstream::parRun())
{
recvData.deepCopy(sendData);
}
else
{
profilingPstream::beginTiming();
if
(
MPI_Alltoall
(
// NOTE: const_cast is a temporary hack for
// backward-compatibility with versions of OpenMPI < 1.7.4
const_cast<label*>(sendData.cdata()),
sizeof(label),
MPI_BYTE,
recvData.data(),
sizeof(label),
MPI_BYTE,
PstreamGlobals::MPICommunicators_[communicator]
)
)
{
FatalErrorInFunction
<< "MPI_Alltoall failed for " << sendData
<< " on communicator " << communicator
<< Foam::abort(FatalError);
}
profilingPstream::addAllToAllTime();
}
}
void Foam::UPstream::allToAll
(
const char* sendData,
const UList<int>& sendSizes,
const UList<int>& sendOffsets,
char* recvData,
const UList<int>& recvSizes,
const UList<int>& recvOffsets,
const label communicator
)
{
const label np = nProcs(communicator);
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** MPI_Alltoallv :"
<< " sendSizes:" << sendSizes
<< " sendOffsets:" << sendOffsets
<< " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
if
(
sendSizes.size() != np
|| sendOffsets.size() != np
|| recvSizes.size() != np
|| recvOffsets.size() != np
)
{
FatalErrorInFunction
<< "Size of sendSize " << sendSizes.size()
<< ", sendOffsets " << sendOffsets.size()
<< ", recvSizes " << recvSizes.size()
<< " or recvOffsets " << recvOffsets.size()
<< " is not equal to the number of processors in the domain "
<< np
<< Foam::abort(FatalError);
}
if (!UPstream::parRun())
{
if (recvSizes[0] != sendSizes[0])
{
FatalErrorInFunction
<< "Bytes to send " << sendSizes[0]
<< " does not equal bytes to receive " << recvSizes[0]
<< Foam::abort(FatalError);
}
std::memmove(recvData, &sendData[sendOffsets[0]], recvSizes[0]);
}
else
{
profilingPstream::beginTiming();
if
(
MPI_Alltoallv
(
const_cast<char*>(sendData),
const_cast<int*>(sendSizes.cdata()),
const_cast<int*>(sendOffsets.cdata()),
MPI_BYTE,
recvData,
const_cast<int*>(recvSizes.cdata()),
const_cast<int*>(recvOffsets.cdata()),
MPI_BYTE,
PstreamGlobals::MPICommunicators_[communicator]
)
)
{
FatalErrorInFunction
<< "MPI_Alltoallv failed for sendSizes " << sendSizes
<< " recvSizes " << recvSizes
<< " communicator " << communicator
<< Foam::abort(FatalError);
}
profilingPstream::addAllToAllTime();
}
}
void Foam::UPstream::mpiGather
(
const char* sendData,
int sendSize,
char* recvData,
int recvSize,
const label communicator
)
{
const label np = nProcs(communicator);
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** MPI_Gather :"
<< " np:" << np
<< " recvSize:" << recvSize
<< " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
if (!UPstream::parRun())
{
std::memmove(recvData, sendData, recvSize);
}
else
{
profilingPstream::beginTiming();
if
(
MPI_Gather
(
const_cast<char*>(sendData),
sendSize,
MPI_BYTE,
recvData,
recvSize,
MPI_BYTE,
0,
MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
)
)
{
FatalErrorInFunction
<< "MPI_Gather failed for sendSize " << sendSize
<< " recvSize " << recvSize
<< " communicator " << communicator
<< Foam::abort(FatalError);
}
profilingPstream::addGatherTime();
}
}
void Foam::UPstream::mpiScatter
(
const char* sendData,
int sendSize,
char* recvData,
int recvSize,
const label communicator
)
{
const label np = nProcs(communicator);
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** MPI_Scatter :"
<< " np:" << np
<< " recvSize:" << recvSize
<< " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
if (!UPstream::parRun())
{
std::memmove(recvData, sendData, recvSize);
}
else
{
profilingPstream::beginTiming();
if
(
MPI_Scatter
(
const_cast<char*>(sendData),
sendSize,
MPI_BYTE,
recvData,
recvSize,
MPI_BYTE,
0,
MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
)
)
{
FatalErrorInFunction
<< "MPI_Scatter failed for sendSize " << sendSize
<< " recvSize " << recvSize
<< " communicator " << communicator
<< Foam::abort(FatalError);
}
profilingPstream::addScatterTime();
}
}
void Foam::UPstream::gather
(
const char* sendData,
int sendSize,
char* recvData,
const UList<int>& recvSizes,
const UList<int>& recvOffsets,
const label communicator
)
{
const label np = nProcs(communicator);
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** MPI_Gatherv :"
<< " np:" << np
<< " recvSizes:" << recvSizes
<< " recvOffsets:" << recvOffsets
<< " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
if
(
UPstream::master(communicator)
&& (recvSizes.size() != np || recvOffsets.size() < np)
)
{
// Note: allow recvOffsets to be e.g. 1 larger than np so we
// can easily loop over the result
FatalErrorInFunction
<< "Size of recvSizes " << recvSizes.size()
<< " or recvOffsets " << recvOffsets.size()
<< " is not equal to the number of processors in the domain "
<< np
<< Foam::abort(FatalError);
}
if (!UPstream::parRun())
{
// recvSizes[0] may be invalid - use sendSize instead
std::memmove(recvData, sendData, sendSize);
}
else
{
profilingPstream::beginTiming();
if
(
MPI_Gatherv
(
const_cast<char*>(sendData),
sendSize,
MPI_BYTE,
recvData,
const_cast<int*>(recvSizes.cdata()),
const_cast<int*>(recvOffsets.cdata()),
MPI_BYTE,
0,
MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
)
)
{
FatalErrorInFunction
<< "MPI_Gatherv failed for sendSize " << sendSize
<< " recvSizes " << recvSizes
<< " communicator " << communicator
<< Foam::abort(FatalError);
}
profilingPstream::addGatherTime();
}
}
void Foam::UPstream::scatter
(
const char* sendData,
const UList<int>& sendSizes,
const UList<int>& sendOffsets,
char* recvData,
int recvSize,
const label communicator
)
{
const label np = nProcs(communicator);
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** MPI_Scatterv :"
<< " np:" << np
<< " sendSizes:" << sendSizes
<< " sendOffsets:" << sendOffsets
<< " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
if
(
UPstream::master(communicator)
&& (sendSizes.size() != np || sendOffsets.size() != np)
)
{
FatalErrorInFunction
<< "Size of sendSizes " << sendSizes.size()
<< " or sendOffsets " << sendOffsets.size()
<< " is not equal to the number of processors in the domain "
<< np
<< Foam::abort(FatalError);
}
if (!UPstream::parRun())
{
std::memmove(recvData, sendData, recvSize);
}
else
{
profilingPstream::beginTiming();
if
(
MPI_Scatterv
(
const_cast<char*>(sendData),
const_cast<int*>(sendSizes.cdata()),
const_cast<int*>(sendOffsets.cdata()),
MPI_BYTE,
recvData,
recvSize,
MPI_BYTE,
0,
MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
)
)
{
FatalErrorInFunction
<< "MPI_Scatterv failed for sendSizes " << sendSizes
<< " sendOffsets " << sendOffsets
<< " communicator " << communicator
<< Foam::abort(FatalError);
}
profilingPstream::addScatterTime();
}
}
void Foam::UPstream::allocatePstreamCommunicator
(
const label parentIndex,

View File

@@ -0,0 +1,86 @@
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "UPstreamWrapping.H"
#include <mpi.h>
#include <cinttypes>
#include <cstring> // memmove
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#undef Pstream_CommonRoutines
#define Pstream_CommonRoutines(Native, TaggedType) \
void Foam::UPstream::allToAll \
( \
const UList<Native>& sendData, \
UList<Native>& recvData, \
const label comm \
) \
{ \
PstreamDetail::allToAll \
( \
sendData, recvData, TaggedType, comm \
); \
} \
Pstream_CommonRoutines(int32_t, MPI_INT32_T);
Pstream_CommonRoutines(int64_t, MPI_INT64_T);
#undef Pstream_CommonRoutines
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#undef Pstream_CommonRoutines
#define Pstream_CommonRoutines(Native, TaggedType) \
void Foam::UPstream::allToAll \
( \
const Native* sendData, \
const UList<int>& sendCounts, \
const UList<int>& sendOffsets, \
Native* recvData, \
const UList<int>& recvCounts, \
const UList<int>& recvOffsets, \
const label comm \
) \
{ \
PstreamDetail::allToAllv \
( \
sendData, sendCounts, sendOffsets, \
recvData, recvCounts, recvOffsets, \
TaggedType, comm \
); \
}
// Unused: Pstream_CommonRoutines(char, MPI_BYTE);
#undef Pstream_CommonRoutines
// ************************************************************************* //
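A brief usage sketch of the retained one-element-per-rank exchange (hypothetical; a labelList binds to whichever of the int32_t/int64_t overloads matches the label type):

    #include "argList.H"
    #include "Pstream.H"

    using namespace Foam;

    int main(int argc, char *argv[])
    {
        #include "setRootCase.H"

        // sendSizes[proci] : number of items this rank would send to proci
        labelList sendSizes(UPstream::nProcs(), label(UPstream::myProcNo()));
        labelList recvSizes(UPstream::nProcs());

        // Afterwards, recvSizes[proci] holds what rank proci will send here
        UPstream::allToAll(sendSizes, recvSizes);

        Pout<< "recvSizes: " << recvSizes << nl;
        return 0;
    }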

View File

@@ -37,29 +37,29 @@ bool Foam::UPstream::broadcast
(
char* buf,
const std::streamsize bufSize,
const label communicator,
const label comm,
const int rootProcNo
)
{
if (!UPstream::parRun() || UPstream::nProcs(communicator) < 2)
if (!UPstream::parRun() || UPstream::nProcs(comm) < 2)
{
// Nothing to do - ignore
return true;
}
//Needed? PstreamGlobals::checkCommunicator(communicator, rootProcNo);
//Needed? PstreamGlobals::checkCommunicator(comm, rootProcNo);
if (debug)
{
Pout<< "UPstream::broadcast : root:" << rootProcNo
<< " comm:" << communicator
<< " comm:" << comm
<< " size:" << label(bufSize)
<< Foam::endl;
}
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
if (UPstream::warnComm != -1 && comm != UPstream::warnComm)
{
Pout<< "UPstream::broadcast : root:" << rootProcNo
<< " comm:" << communicator
<< " comm:" << comm
<< " size:" << label(bufSize)
<< " warnComm:" << UPstream::warnComm
<< Foam::endl;
@@ -74,7 +74,7 @@ bool Foam::UPstream::broadcast
bufSize,
MPI_BYTE,
rootProcNo,
PstreamGlobals::MPICommunicators_[communicator]
PstreamGlobals::MPICommunicators_[comm]
);
profilingPstream::addBroadcastTime();

View File

@@ -0,0 +1,133 @@
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "UPstreamWrapping.H"
#include <mpi.h>
#include <cinttypes>
#include <cstring> // memmove
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#undef Pstream_CommonRoutines
#define Pstream_CommonRoutines(Native, TaggedType) \
void Foam::UPstream::mpiGather \
( \
const Native* sendData, \
int sendCount, \
\
Native* recvData, \
int recvCount, \
const label comm \
) \
{ \
PstreamDetail::gather \
( \
sendData, sendCount, recvData, recvCount, \
TaggedType, comm \
); \
} \
\
\
void Foam::UPstream::mpiScatter \
( \
const Native* sendData, \
int sendCount, \
\
Native* recvData, \
int recvCount, \
const label comm \
) \
{ \
PstreamDetail::scatter \
( \
sendData, sendCount, recvData, recvCount, \
TaggedType, comm \
); \
}
Pstream_CommonRoutines(char, MPI_BYTE);
#undef Pstream_CommonRoutines
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#undef Pstream_CommonRoutines
#define Pstream_CommonRoutines(Native, TaggedType) \
void Foam::UPstream::gather \
( \
const Native* sendData, \
int sendCount, \
\
Native* recvData, \
const UList<int>& recvCounts, \
const UList<int>& recvOffsets, \
const label comm \
) \
{ \
PstreamDetail::gatherv \
( \
sendData, sendCount, \
recvData, recvCounts, recvOffsets, \
TaggedType, comm \
); \
} \
\
void Foam::UPstream::scatter \
( \
const Native* sendData, \
const UList<int>& sendCounts, \
const UList<int>& sendOffsets, \
\
Native* recvData, \
int recvCount, \
const label comm \
) \
{ \
PstreamDetail::scatterv \
( \
sendData, sendCounts, sendOffsets, \
recvData, recvCount, \
TaggedType, comm \
); \
}
//TBD: Pstream_CommonRoutines(bool, MPI_C_BOOL);
Pstream_CommonRoutines(char, MPI_BYTE);
Pstream_CommonRoutines(int32_t, MPI_INT32_T);
Pstream_CommonRoutines(int64_t, MPI_INT64_T);
Pstream_CommonRoutines(uint32_t, MPI_UINT32_T);
Pstream_CommonRoutines(uint64_t, MPI_UINT64_T);
Pstream_CommonRoutines(float, MPI_FLOAT);
Pstream_CommonRoutines(double, MPI_DOUBLE);
#undef Pstream_CommonRoutines
// ************************************************************************* //

View File

@@ -27,7 +27,7 @@ License
#include "Pstream.H"
#include "PstreamReduceOps.H"
#include "allReduce.H"
#include "UPstreamWrapping.H"
#include <mpi.h>
#include <cinttypes>
@@ -179,9 +179,9 @@ void Foam::reduce \
label& requestID \
) \
{ \
PstreamDetail::iallReduce<Native> \
PstreamDetail::allReduce<Native> \
( \
&value, 1, TaggedType, MPI_SUM, comm, requestID \
&value, 1, TaggedType, MPI_SUM, comm, &requestID \
); \
} \
\
@@ -195,9 +195,9 @@ void Foam::reduce \
label& requestID \
) \
{ \
PstreamDetail::iallReduce<Native> \
PstreamDetail::allReduce<Native> \
( \
values, size, TaggedType, MPI_SUM, comm, requestID \
values, size, TaggedType, MPI_SUM, comm, &requestID \
); \
}

View File

@@ -0,0 +1,202 @@
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
-------------------------------------------------------------------------------
Copyright (C) 2012-2016 OpenFOAM Foundation
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Namespace
Foam::PstreamDetail
Description
Some implementation details for Pstream and/or MPI.
InNamespace
Foam::PstreamDetail
Description
Functions to wrap MPI_Bcast, MPI_Allreduce, MPI_Iallreduce etc.
SourceFiles
UPstreamWrappingTemplates.C
\*---------------------------------------------------------------------------*/
#ifndef Foam_UPstreamWrapping_H
#define Foam_UPstreamWrapping_H
#include "UPstream.H"
#include <mpi.h>
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
namespace PstreamDetail
{
// MPI_Bcast, using root=0
template<class Type>
void broadcast0
(
Type* values,
int count,
MPI_Datatype datatype,
const label comm
);
// MPI_Reduce, using root=0
template<class Type>
void reduce0
(
Type* values,
int count,
MPI_Datatype datatype,
MPI_Op optype,
const label comm
);
// MPI_Allreduce or MPI_Iallreduce
template<class Type>
void allReduce
(
Type* values,
int count,
MPI_Datatype datatype,
MPI_Op optype,
const label comm, // Communicator
label* requestID = nullptr // Non-null for MPI_Iallreduce
);
// MPI_Alltoall or MPI_Ialltoall with one element per rank
template<class Type>
void allToAll
(
const UList<Type>& sendData,
UList<Type>& recvData,
MPI_Datatype datatype,
const label comm, // Communicator
label* requestID = nullptr // Non-null for MPI_Ialltoall
);
// MPI_Alltoallv or MPI_Ialltoallv
template<class Type>
void allToAllv
(
const Type* sendData,
const UList<int>& sendCounts,
const UList<int>& sendOffsets,
Type* recvData,
const UList<int>& recvCounts,
const UList<int>& recvOffsets,
MPI_Datatype datatype,
const label comm, // Communicator
label* requestID = nullptr // Non-null for MPI_Ialltoallv
);
// MPI_Gather or MPI_Igather
template<class Type>
void gather
(
const Type* sendData,
int sendCount,
Type* recvData, // Ignored on non-root rank
int recvCount, // Ignored on non-root rank
MPI_Datatype datatype, // The send/recv data type
const label comm, // Communicator
label* requestID = nullptr // Non-null for MPI_Igather
);
// MPI_Scatter or MPI_Iscatter
template<class Type>
void scatter
(
const Type* sendData, // Ignored on non-root rank
int sendCount, // Ignored on non-root rank
Type* recvData,
int recvCount,
MPI_Datatype datatype, // The send/recv data type
const label comm, // Communicator
label* requestID = nullptr // Non-null for MPI_Iscatter
);
// MPI_Gatherv or MPI_Igatherv
template<class Type>
void gatherv
(
const Type* sendData,
int sendCount, // Ignored on master if recvCounts[0] == 0
Type* recvData, // Ignored on non-root rank
const UList<int>& recvCounts, // Ignored on non-root rank
const UList<int>& recvOffsets, // Ignored on non-root rank
MPI_Datatype datatype, // The send/recv data type
const label comm, // Communicator
label* requestID = nullptr // Non-null for MPI_Igatherv
);
// MPI_Scatterv or MPI_Iscatterv
template<class Type>
void scatterv
(
const Type* sendData, // Ignored on non-root rank
const UList<int>& sendCounts, // Ignored on non-root rank
const UList<int>& sendOffsets, // Ignored on non-root rank
Type* recvData,
int recvCount,
MPI_Datatype datatype, // The send/recv data type
const label comm, // Communicator
label* requestID = nullptr // Non-null for MPI_Iscatterv
);
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace PstreamDetail
} // End namespace Foam
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#ifdef NoRepository
#include "UPstreamWrappingTemplates.C"
#endif
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#endif
// ************************************************************************* //
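The templates implementing these declarations (presumably the suppressed UPstreamWrappingTemplates.C diff below) merge the former blocking/non-blocking pairs such as allReduce/iallReduce into single routines, selected by whether requestID is null. Roughly, the dispatch follows this simplified sketch (profiling hooks, error handling and request recycling elided):

    template<class Type>
    void Foam::PstreamDetail::allReduce
    (
        Type* values,
        int count,
        MPI_Datatype datatype,
        MPI_Op optype,
        const label comm,
        label* requestID
    )
    {
        if (!UPstream::parRun())
        {
            return;
        }

        if (requestID != nullptr)
        {
            // Non-blocking: start MPI_Iallreduce, register the MPI_Request
            // and hand back an OpenFOAM request index through *requestID
            MPI_Request request;
            MPI_Iallreduce
            (
                MPI_IN_PLACE, values, count, datatype, optype,
                PstreamGlobals::MPICommunicators_[comm], &request
            );

            *requestID = PstreamGlobals::outstandingRequests_.size();
            PstreamGlobals::outstandingRequests_.append(request);
        }
        else
        {
            // Blocking equivalent
            MPI_Allreduce
            (
                MPI_IN_PLACE, values, count, datatype, optype,
                PstreamGlobals::MPICommunicators_[comm]
            );
        }
    }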

File diff suppressed because it is too large

View File

@@ -1,117 +0,0 @@
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
-------------------------------------------------------------------------------
Copyright (C) 2012-2016 OpenFOAM Foundation
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Namespace
Foam::PstreamDetail
Description
Some implementation details for Pstream and/or MPI.
InNamespace
Foam::PstreamDetail
Description
Functions to wrap MPI_Bcast, MPI_Allreduce, MPI_Iallreduce
SourceFiles
allReduceTemplates.C
\*---------------------------------------------------------------------------*/
#ifndef Foam_allReduce_H
#define Foam_allReduce_H
#include "UPstream.H"
#include <mpi.h>
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
namespace PstreamDetail
{
// MPI_Bcast, using root=0
template<class Type>
void broadcast0
(
Type* values,
int count,
MPI_Datatype datatype,
const label communicator
);
// MPI_Reduce, using root=0
template<class Type>
void reduce0
(
Type* values,
int count,
MPI_Datatype datatype,
MPI_Op optype,
const label communicator
);
// MPI_Allreduce
template<class Type>
void allReduce
(
Type* values,
int count,
MPI_Datatype datatype,
MPI_Op optype,
const label communicator
);
// MPI_Iallreduce
template<class Type>
void iallReduce
(
Type* values,
int count,
MPI_Datatype datatype,
MPI_Op optype,
const label communicator,
label& requestID
);
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace PstreamDetail
} // End namespace Foam
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#ifdef NoRepository
#include "allReduceTemplates.C"
#endif
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#endif
// ************************************************************************* //

View File

@@ -1,261 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2012-2015 OpenFOAM Foundation
Copyright (C) 2019-2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "allReduce.H"
#include "profilingPstream.H"
#include "PstreamGlobals.H"
// * * * * * * * * * * * * * * * Global Functions * * * * * * * * * * * * * //
template<class Type>
void Foam::PstreamDetail::broadcast0
(
Type* values,
int count,
MPI_Datatype datatype,
const label communicator
)
{
if (!UPstream::parRun())
{
return;
}
profilingPstream::beginTiming();
// const int retval =
MPI_Bcast
(
values,
count,
datatype,
0, // (root process) is master == UPstream::masterNo()
PstreamGlobals::MPICommunicators_[communicator]
);
profilingPstream::addBroadcastTime();
}
template<class Type>
void Foam::PstreamDetail::reduce0
(
Type* values,
int count,
MPI_Datatype datatype,
MPI_Op optype,
const label communicator
)
{
if (!UPstream::parRun())
{
return;
}
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:";
if (count == 1)
{
Pout<< (*values);
}
else
{
Pout<< UList<Type>(values, count);
}
Pout<< " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm << endl;
error::printStack(Pout);
}
profilingPstream::beginTiming();
// const int retval =
MPI_Reduce
(
MPI_IN_PLACE,
values,
count,
datatype,
optype,
0, // (root process) is master == UPstream::masterNo()
PstreamGlobals::MPICommunicators_[communicator]
);
profilingPstream::addReduceTime();
}
template<class Type>
void Foam::PstreamDetail::allReduce
(
Type* values,
int count,
MPI_Datatype datatype,
MPI_Op optype,
const label communicator
)
{
if (!UPstream::parRun())
{
return;
}
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:";
if (count == 1)
{
Pout<< (*values);
}
else
{
Pout<< UList<Type>(values, count);
}
Pout<< " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm << endl;
error::printStack(Pout);
}
profilingPstream::beginTiming();
// const int retval =
MPI_Allreduce
(
MPI_IN_PLACE,
values,
count,
datatype,
optype,
PstreamGlobals::MPICommunicators_[communicator]
);
profilingPstream::addReduceTime();
}
template<class Type>
void Foam::PstreamDetail::iallReduce
(
Type* values,
int count,
MPI_Datatype datatype,
MPI_Op optype,
const label communicator,
label& requestID
)
{
if (!UPstream::parRun())
{
return;
}
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** non-blocking reducing:";
if (count == 1)
{
Pout<< (*values);
}
else
{
Pout<< UList<Type>(values, count);
}
Pout<< " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm << endl;
error::printStack(Pout);
}
profilingPstream::beginTiming();
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
MPI_Request request;
if
(
MPI_Iallreduce
(
MPI_IN_PLACE,
values,
count,
datatype,
optype,
PstreamGlobals::MPICommunicators_[communicator],
&request
)
)
{
FatalErrorInFunction
<< "MPI_Iallreduce failed for "
<< UList<Type>(values, count)
<< Foam::abort(FatalError);
}
if (PstreamGlobals::freedRequests_.size())
{
requestID = PstreamGlobals::freedRequests_.remove();
PstreamGlobals::outstandingRequests_[requestID] = request;
}
else
{
requestID = PstreamGlobals::outstandingRequests_.size();
PstreamGlobals::outstandingRequests_.append(request);
}
if (UPstream::debug)
{
Pout<< "UPstream::allocateRequest for non-blocking reduce"
<< " : request:" << requestID << endl;
}
#else
// Non-blocking not yet implemented in mpi
if
(
MPI_Allreduce
(
MPI_IN_PLACE,
values,
count,
datatype,
optype,
PstreamGlobals::MPICommunicators_[communicator]
)
)
{
FatalErrorInFunction
<< "MPI_Allreduce failed for "
<< UList<Type>(values, count)
<< Foam::abort(FatalError);
}
requestID = -1;
#endif
profilingPstream::addReduceTime();
}
// ************************************************************************* //