ENH: support independent handling of MPI requests (#2674)

- UPstream::Request wrapping class provides an opaque wrapper for
  vendor MPI_Request values, independent of global lists.

ENH: support for MPI barrier (blocking or non-blocking)
ENH: support for MPI sync-send variants

STYLE: deprecate waitRequests() without a position parameter

- in many cases this can indicate a problem in the program logic since
  normally the startOfRequests should be tracked locally.
This commit is contained in:
Mark Olesen 2023-01-10 12:14:31 +01:00
parent 2d4ecc4326
commit 568ced68e2
35 changed files with 939 additions and 223 deletions

View File

@ -50,9 +50,8 @@ using namespace Foam;
int main(int argc, char *argv[])
{
#include "setRootCase.H"
#include "createTime.H"
argList::noCheckProcessorDirectories();
argList args(argc, argv);
// Test PstreamBuffers
// ~~~~~~~~~~~~~~~~~~~
@ -83,13 +82,13 @@ int main(int argc, char *argv[])
if (Pstream::master())
{
// Collect my own data
allData.append(data);
allData.push_back(data);
for (const int proci : Pstream::subProcs())
{
Perr << "master receiving from " << proci << endl;
UIPstream fromProc(proci, pBufs);
allData.append(vector(fromProc));
allData.push_back(vector(fromProc));
}
}
@ -102,7 +101,7 @@ int main(int argc, char *argv[])
{
Perr << "master sending to " << proci << endl;
UOPstream toProc(proci, pBufs);
toSlave << allData;
toProc << allData;
}
}
@ -125,13 +124,27 @@ int main(int argc, char *argv[])
scalar data1 = 1.0;
label request1 = -1;
{
Foam::reduce(data1, sumOp<scalar>(), UPstream::msgType(), request1);
Foam::reduce
(
data1,
sumOp<scalar>(),
UPstream::msgType(),
UPstream::worldComm,
request1
);
}
scalar data2 = 0.1;
label request2 = -1;
UPstream::Request request2;
{
Foam::reduce(data2, sumOp<scalar>(), UPstream::msgType(), request2);
Foam::reduce
(
data2,
sumOp<scalar>(),
UPstream::msgType(),
UPstream::worldComm,
request2
);
}
@ -168,23 +181,23 @@ int main(int argc, char *argv[])
if (request1 != -1)
{
Pout<< "Waiting for non-blocking reduce with request " << request1
<< endl;
Pstream::waitRequest(request1);
Pout<< "Waiting for non-blocking reduce with request "
<< request1 << endl;
UPstream::waitRequest(request1);
}
Info<< "Reduced data1:" << data1 << endl;
if (request2 != -1)
if (request2.good())
{
Pout<< "Waiting for non-blocking reduce with request " << request1
<< endl;
Pstream::waitRequest(request2);
Pout<< "Waiting for non-blocking reduce with request "
<< Foam::name(request2.pointer()) << endl;
UPstream::waitRequest(request2);
}
Info<< "Reduced data2:" << data2 << endl;
// Clear any outstanding requests
Pstream::resetRequests(0);
// Clear all outstanding requests
UPstream::resetRequests(0);
Info<< "End\n" << endl;

View File

@ -33,7 +33,7 @@ License
Foam::UIPBstream::UIPBstream
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int fromProcNo,
DynamicList<char>& receiveBuf,
label& receiveBufPosition,
@ -61,7 +61,7 @@ Foam::UIPBstream::UIPBstream
Foam::IPBstream::IPBstream
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int fromProcNo,
const label bufSize,
const int tag,

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2013 OpenFOAM Foundation
Copyright (C) 2021-2022 OpenCFD Ltd.
Copyright (C) 2021-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -69,7 +69,7 @@ public:
//- and optional buffer size, read format
IPstream
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int fromProcNo,
const label bufSize = 0,
const int tag = UPstream::msgType(),
@ -102,7 +102,7 @@ public:
//- Construct for broadcast root, optional buffer size, read format
IPBstream
(
const commsTypes commsType, //!< ignored
const UPstream::commsTypes, //!< ignored
const int rootProcNo, //!< normally UPstream::masterNo()
const label bufSize = 0,
const int tag = UPstream::msgType(), //!< ignored

View File

@ -33,7 +33,7 @@ License
Foam::UIPstream::UIPstream
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int fromProcNo,
DynamicList<char>& receiveBuf,
label& receiveBufPosition,
@ -55,7 +55,7 @@ Foam::UIPstream::UIPstream
fmt
)
{
if (commsType == commsTypes::nonBlocking)
if (commsType == UPstream::commsTypes::nonBlocking)
{
// Message is already received into buffer
}
@ -70,7 +70,7 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
:
UIPstreamBase(fromProcNo, buffers)
{
if (commsType() == commsTypes::nonBlocking)
if (commsType() == UPstream::commsTypes::nonBlocking)
{
// Message is already received into buffer
messageSize_ = recvBuf_.size();
@ -93,7 +93,7 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
Foam::IPstream::IPstream
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int fromProcNo,
const label bufSize,
const int tag,

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -33,7 +33,7 @@ License
Foam::UOPBstream::UOPBstream
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int toProcNo,
DynamicList<char>& sendBuf,
const int tag,
@ -48,7 +48,7 @@ Foam::UOPBstream::UOPBstream
Foam::OPBstream::OPBstream
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int toProcNo,
const label bufSize,
const int tag,

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2013 OpenFOAM Foundation
Copyright (C) 2021-2022 OpenCFD Ltd.
Copyright (C) 2021-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -64,7 +64,7 @@ public:
//- and optional buffer size, write format
OPstream
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int toProcNo,
const label bufSize = 0,
const int tag = UPstream::msgType(),
@ -92,7 +92,7 @@ public:
//- Construct for broadcast root, optional buffer size, write format
OPBstream
(
const commsTypes commsType, //!< ignored
const UPstream::commsTypes, //!< ignored
const int rootProcNo, //!< normally UPstream::masterNo()
const label bufSize = 0,
const int tag = UPstream::msgType(), //!< ignored

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011 OpenFOAM Foundation
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -33,7 +33,7 @@ License
Foam::UOPstream::UOPstream
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int toProcNo,
DynamicList<char>& sendBuf,
const int tag,
@ -54,7 +54,7 @@ Foam::UOPstream::UOPstream(const int toProcNo, PstreamBuffers& buffers)
Foam::OPstream::OPstream
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int toProcNo,
const label bufSize,
const int tag,

View File

@ -102,13 +102,13 @@ protected:
public:
// Declare name of the class and its debug switch
//- Declare name of the class and its debug switch
ClassName("Pstream");
// Constructors
//- Construct for given commsTypes, with optional buffer size
//- Construct for given communication type, with optional buffer size
explicit Pstream
(
const UPstream::commsTypes commsType,

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2016 OpenFOAM Foundation
Copyright (C) 2016-2022 OpenCFD Ltd.
Copyright (C) 2016-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -101,6 +101,21 @@ void reduce
NotImplemented;
}
//- Non-blocking reduce inplace (cf. MPI Iallreduce)
//- single value. Sets request.
template<class T, class BinaryOp>
void reduce
(
T& Value,
const BinaryOp&,
const int tag,
const label comm,
UPstream::Request& req
)
{
NotImplemented;
}
//- Non-blocking reduce inplace (cf. MPI Iallreduce)
//- single value. Sets request.
template<class T, class BinaryOp>
@ -117,8 +132,23 @@ void reduce
}
//- Non-blocking reduce inplace (cf. MPI Iallreduce)
//- of multiple values (same size on all processes!)
// Sets request.
//- of multiple values (same size on all processes!). Sets request.
template<class T, class BinaryOp>
void reduce
(
T values[],
const int size,
const BinaryOp&,
const int tag,
const label comm,
UPstream::Request& req
)
{
NotImplemented;
}
//- Non-blocking reduce inplace (cf. MPI Iallreduce)
//- of multiple values (same size on all processes!). Sets request.
template<class T, class BinaryOp>
void reduce
(
@ -272,6 +302,18 @@ Pstream_CommonReductions(Native); \
\
/*! \brief Non-blocking reduce (sum) multiple Native values. Sets request */ \
void reduce \
( \
Native values[], \
const int size, \
const sumOp<Native>&, \
const int tag, /*!< (ignored) */ \
const label comm, \
UPstream::Request& req /*!< [out] request information */ \
); \
\
/*! \brief Non-blocking reduce (sum) multiple Native values. Sets request */ \
/*! \deprecated prefer version with UPstream::Request */ \
void reduce \
( \
Native values[], \
const int size, \
@ -283,6 +325,17 @@ void reduce \
\
/*! \brief Non-blocking reduce (sum) single Native value. Sets request */ \
void reduce \
( \
Native& value, \
const sumOp<Native>&, \
const int tag, /*!< (ignored) */ \
const label comm, \
UPstream::Request& req /*!< [out] request information */ \
); \
\
/*! \brief Non-blocking reduce (sum) single Native value. Sets request */ \
/*! \deprecated prefer version with UPstream::Request */ \
void reduce \
( \
Native& value, \
const sumOp<Native>&, \

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2013 OpenFOAM Foundation
Copyright (C) 2017-2022 OpenCFD Ltd.
Copyright (C) 2017-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -107,7 +107,7 @@ protected:
//- and IO format
UIPstreamBase
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int fromProcNo,
DynamicList<char>& receiveBuf,
label& receiveBufPosition,
@ -225,7 +225,7 @@ public:
//- and IO format
UIPstream
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int fromProcNo,
DynamicList<char>& receiveBuf,
label& receiveBufPosition,
@ -255,13 +255,40 @@ public:
// \return the message size
static label read
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int fromProcNo,
char* buf,
const std::streamsize bufSize,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm
const label comm = UPstream::worldComm,
//! [out] request information (for non-blocking)
UPstream::Request* req = nullptr
);
//- Read buffer contents (non-blocking) from given processor
// \return the message size
inline static label read
(
//! [out] request information
UPstream::Request& req,
const int fromProcNo,
char* buf,
const std::streamsize bufSize,
const int tag,
const label communicator
)
{
return UIPstream::read
(
UPstream::commsTypes::nonBlocking,
fromProcNo,
buf,
bufSize,
tag,
communicator,
&req
);
}
};
@ -290,7 +317,7 @@ public:
//- and IO format
UIPBstream
(
const commsTypes commsType, //!< ignored
const UPstream::commsTypes, //!< ignored
const int rootProcNo, //!< normally UPstream::masterNo()
DynamicList<char>& receiveBuf,
label& receiveBufPosition,
@ -317,11 +344,9 @@ public:
// \return the message size
static label read
(
const commsTypes commsTypes, //!< ignored
const int rootProcNo, //!< normally UPstream::masterNo()
const int rootProcNo, //!< normally UPstream::masterNo()
char* buf,
const std::streamsize bufSize,
const int tag = UPstream::msgType(), //!< ignored
const label comm = UPstream::worldComm
);
};

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2015 OpenFOAM Foundation
Copyright (C) 2017-2022 OpenCFD Ltd.
Copyright (C) 2017-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -146,7 +146,7 @@ inline Foam::Istream& Foam::UIPstreamBase::readString(std::string& str)
Foam::UIPstreamBase::UIPstreamBase
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int fromProcNo,
DynamicList<char>& receiveBuf,
label& receiveBufPosition,

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2014 OpenFOAM Foundation
Copyright (C) 2017-2022 OpenCFD Ltd.
Copyright (C) 2017-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -110,7 +110,7 @@ protected:
//- and IO format
UOPstreamBase
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int toProcNo,
DynamicList<char>& sendBuf,
const int tag = UPstream::msgType(),
@ -298,7 +298,7 @@ public:
//- and IO format
UOPstream
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int toProcNo,
DynamicList<char>& sendBuf,
const int tag = UPstream::msgType(),
@ -332,8 +332,38 @@ public:
const char* buf,
const std::streamsize bufSize,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm
const label comm = UPstream::worldComm,
//! [out] request information (for non-blocking)
UPstream::Request* req = nullptr,
const UPstream::sendModes sendMode = UPstream::sendModes::normal
);
//- Write buffer contents (non-blocking) to given processor
// \return True on success
inline static bool write
(
//! [out] request information
UPstream::Request& req,
const int toProcNo,
const char* buf,
const std::streamsize bufSize,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const UPstream::sendModes sendMode = UPstream::sendModes::normal
)
{
return UOPstream::write
(
UPstream::commsTypes::nonBlocking,
toProcNo,
buf,
bufSize,
tag,
comm,
&req,
sendMode
);
}
};
@ -364,7 +394,7 @@ public:
//- and IO format
UOPBstream
(
const commsTypes commsType, //!< ignored
const UPstream::commsTypes, //!< ignored
const int toProcNo, //!< normally UPstream::masterNo()
DynamicList<char>& sendBuf,
const int tag = UPstream::msgType(), //!< ignored
@ -390,11 +420,9 @@ public:
// \return True on success
static bool write
(
const commsTypes commsType, //!< ignored
const int rootProcNo, //!< normally UPstream::masterNo()
const int rootProcNo, //!< normally UPstream::masterNo()
const char* buf,
const std::streamsize bufSize,
const int tag = UPstream::msgType(), //!< ignored
const label comm = UPstream::worldComm
);
};

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2016-2022 OpenCFD Ltd.
Copyright (C) 2016-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -117,7 +117,7 @@ inline void Foam::UOPstreamBase::putChar(const char c)
{
sendBuf_.setCapacity(1000);
}
sendBuf_.append(c);
sendBuf_.push_back(c);
}
@ -133,7 +133,7 @@ inline void Foam::UOPstreamBase::putString(const std::string& str)
Foam::UOPstreamBase::UOPstreamBase
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int toProcNo,
DynamicList<char>& sendBuf,
const int tag,

View File

@ -519,6 +519,8 @@ Foam::UPstream::commsTypes Foam::UPstream::defaultCommsType
)
);
//! \cond file_scope
namespace Foam
{
//- Registered reader for UPstream::defaultCommsType
@ -549,6 +551,7 @@ namespace Foam
addcommsTypeToOpt addcommsTypeToOpt_("commsType");
}
//! \endcond
int Foam::UPstream::nPollProcInterfaces
(

View File

@ -52,6 +52,10 @@ SourceFiles
namespace Foam
{
//- Implementation details for UPstream/Pstream/MPI etc.
namespace PstreamDetail {}
/*---------------------------------------------------------------------------*\
Class UPstream Declaration
\*---------------------------------------------------------------------------*/
@ -63,7 +67,7 @@ public:
//- Int ranges are used for MPI ranks (processes)
typedef IntRange<int> rangeType;
//- Types of communications
//- Communications types
enum class commsTypes : char
{
blocking, //!< "blocking" : (MPI_Bsend, MPI_Recv)
@ -74,9 +78,21 @@ public:
//- Enumerated names for the communication types
static const Enum<commsTypes> commsTypeNames;
//- Different MPI-send modes (ignored for commsTypes::blocking)
enum class sendModes : char
{
normal, //!< (MPI_Send, MPI_Isend)
sync //!< (MPI_Ssend, MPI_Issend)
};
// Public Classes
// Forward Declarations
//- Wrapper for MPI_Request
class Request;
//- Structure for communicating between processors
class commsStruct
{
@ -100,8 +116,8 @@ public:
// Constructors
//- Default construct. Above == -1
commsStruct() noexcept;
//- Default construct with above == -1
commsStruct() noexcept : above_(-1) {}
//- Construct from components
commsStruct
@ -166,7 +182,6 @@ public:
friend Ostream& operator<<(Ostream&, const commsStruct&);
};
//- combineReduce operator for lists. Used for counting.
struct listEq
{
@ -305,7 +320,7 @@ public:
// Constructors
//- Construct for given communication type
explicit UPstream(const commsTypes commsType)
explicit UPstream(const commsTypes commsType) noexcept
:
commsType_(commsType)
{}
@ -441,6 +456,13 @@ public:
// Fatal if MPI has already been finalized.
static bool initNull();
//- Impose a synchronisation barrier (optionally non-blocking)
static void barrier
(
const label communicator,
UPstream::Request* req = nullptr
);
// Non-blocking comms
@ -452,10 +474,14 @@ public:
// A no-op for out-of-range values.
static void resetRequests(const label n);
//- Wait until all requests (from start onwards) have finished.
//- Wait until all requests (from position onwards) have finished.
// A no-op if parRun() == false, if there are no pending requests
// or if the start is out-of-range (0 to nRequests)
static void waitRequests(const label start = 0);
static void waitRequests(const label pos);
//- Wait until all requests have finished.
// A no-op if parRun() == false or the list is empty
static void waitRequests(UList<UPstream::Request>& requests);
//- Wait until request i has finished.
// A no-op if parRun() == false,
@ -463,12 +489,21 @@ public:
// or if the index is out-of-range (0 to nRequests)
static void waitRequest(const label i);
//- Wait until specified request has finished.
// A no-op if parRun() == false or for a null-request
static void waitRequest(UPstream::Request& req);
//- Non-blocking comms: has request i finished?
// A no-op and returns true if parRun() == false,
// there are no pending requests,
// or if the index is out-of-range (0 to nRequests)
static bool finishedRequest(const label i);
//- Non-blocking comms: has request finished?
// A no-op and returns true if parRun() == false
// or for a null-request
static bool finishedRequest(UPstream::Request& req);
static int allocateTag(const char* const msg = nullptr);
static void freeTag(const int tag, const char* const msg = nullptr);
@ -789,6 +824,12 @@ public:
// Housekeeping
//- Wait for all requests to finish.
// \deprecated(2023-01) Probably not what you want.
// Should normally be restricted to a particular starting request.
FOAM_DEPRECATED_FOR(2023-01, "waitRequests(int) method")
static void waitRequests() { waitRequests(0); }
//- Process index of first sub-process
// \deprecated(2020-09) use subProcs() method instead
static constexpr int firstSlave() noexcept
@ -805,8 +846,114 @@ public:
};
/*---------------------------------------------------------------------------*\
Class UPstream::Request Declaration
\*---------------------------------------------------------------------------*/
//- An opaque wrapper for MPI_Request with a vendor-independent
//- representation independent of any \c <mpi.h> header
//
// The MPI standard states that MPI_Request is always an opaque object.
// Generally it is either an integer (eg, mpich) or a pointer (eg, openmpi).
class UPstream::Request
{
public:
// Public Types
//- Storage for MPI_Request (as integer or pointer)
typedef std::intptr_t value_type;
private:
// Private Data
//- The MPI_Request (as wrapped value)
value_type value_;
public:
// Generated Methods
//- Copy construct
Request(const Request&) noexcept = default;
//- Move construct
Request(Request&&) noexcept = default;
//- Copy assignment
Request& operator=(const Request&) noexcept = default;
//- Move assignment
Request& operator=(Request&&) noexcept = default;
// Member Operators
//- Test for equality
bool operator==(const Request& rhs) const noexcept
{
return (value_ == rhs.value_);
}
//- Test for inequality
bool operator!=(const Request& rhs) const noexcept
{
return (value_ != rhs.value_);
}
// Constructors
//- Default construct as MPI_REQUEST_NULL
Request() noexcept;
//- Construct from MPI_Request (as pointer type)
explicit Request(const void* p) noexcept
:
value_(reinterpret_cast<value_type>(p))
{}
//- Construct from MPI_Request (as integer type)
explicit Request(value_type val) noexcept
:
value_(val)
{}
// Member Functions
//- Return raw value
value_type value() const noexcept { return value_; }
//- Return as pointer value
const void* pointer() const noexcept
{
return reinterpret_cast<const void*>(value_);
}
//- True if not equal to MPI_REQUEST_NULL
bool good() const noexcept;
//- Reset to default constructed value (MPI_REQUEST_NULL)
void reset() noexcept;
//- Same as calling UPstream::waitRequest()
void wait() { UPstream::waitRequest(*this); }
//- Same as calling UPstream::finishedRequest()
bool finished() { return UPstream::finishedRequest(*this); }
};
// * * * * * * * * * * * * * * * IOstream Operators * * * * * * * * * * * * //
Ostream& operator<<(Ostream&, const UPstream::commsStruct&);
// * * * * * * * * * * * * Template Specialisations * * * * * * * * * * * * //
// Template specialisation for access of commsStruct
template<>
UPstream::commsStruct&

View File

@ -30,15 +30,6 @@ License
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::UPstream::commsStruct::commsStruct() noexcept
:
above_(-1),
below_(),
allBelow_(),
allNotBelow_()
{}
Foam::UPstream::commsStruct::commsStruct
(
const label above,

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -39,12 +39,10 @@ void Foam::UIPBstream::bufferIPCrecv()
Foam::label Foam::UIPBstream::read
(
const commsTypes commsType,
const int rootProcNo,
char* buf,
const std::streamsize bufSize,
const int tag,
const label communicator
const label comm
)
{
NotImplemented;

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2015 OpenFOAM Foundation
Copyright (C) 2021-2022 OpenCFD Ltd.
Copyright (C) 2021-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -40,12 +40,13 @@ void Foam::UIPstream::bufferIPCrecv()
Foam::label Foam::UIPstream::read
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int fromProcNo,
char* buf,
const std::streamsize bufSize,
const int tag,
const label communicator
const label communicator,
UPstream::Request* req
)
{
NotImplemented;

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -40,12 +40,10 @@ bool Foam::UOPBstream::bufferIPCsend()
bool Foam::UOPBstream::write
(
const commsTypes commsType,
const int rootProcNo,
const char* buf,
const std::streamsize bufSize,
const int tag,
const label communicator
const label comm
)
{
NotImplemented;

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2015 OpenFOAM Foundation
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -46,7 +46,9 @@ bool Foam::UOPstream::write
const char* buf,
const std::streamsize bufSize,
const int tag,
const label communicator
const label communicator,
UPstream::Request* req,
const UPstream::sendModes sendMode
)
{
NotImplemented;

View File

@ -94,5 +94,8 @@ void Foam::UPstream::freeTag(const int tag, const char* const msg)
{}
void Foam::UPstream::barrier(const label communicator, UPstream::Request* req)
{}
// ************************************************************************* //

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -133,6 +133,18 @@ void Foam::reduce \
Pstream_CommonReductions(Native); \
\
void Foam::reduce \
( \
Native values[], \
const int size, \
const sumOp<Native>&, \
const int tag, \
const label comm, \
UPstream::Request& req \
) \
{} \
\
/* Deprecated: prefer version with UPstream::Request */ \
void Foam::reduce \
( \
Native values[], \
const int size, \
@ -144,6 +156,17 @@ void Foam::reduce \
{} \
\
void Foam::reduce \
( \
Native& value, \
const sumOp<Native>&, \
const int tag, \
const label comm, \
UPstream::Request& req \
) \
{} \
\
/* Deprecated: prefer version with UPstream::Request */ \
void Foam::reduce \
( \
Native& value, \
const sumOp<Native>&, \

View File

@ -27,17 +27,40 @@ License
#include "UPstream.H"
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::UPstream::Request::Request() noexcept
:
UPstream::Request(nullptr)
{}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::UPstream::Request::good() const noexcept
{
return false;
}
void Foam::UPstream::Request::reset() noexcept
{}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
Foam::label Foam::UPstream::nRequests() noexcept { return 0; }
void Foam::UPstream::resetRequests(const label n) {}
void Foam::UPstream::waitRequests(const label start) {}
void Foam::UPstream::waitRequests(const label pos) {}
void Foam::UPstream::waitRequests(UList<UPstream::Request>&) {}
void Foam::UPstream::waitRequest(const label i) {}
void Foam::UPstream::waitRequest(UPstream::Request&) {}
bool Foam::UPstream::finishedRequest(const label i) { return true; }
bool Foam::UPstream::finishedRequest(UPstream::Request&) { return true; }
// ************************************************************************* //

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -113,11 +113,9 @@ void Foam::UIPBstream::bufferIPCrecv()
Foam::label Foam::UIPBstream::read
(
const commsTypes commsType,
const int rootProcNo,
char* buf,
const std::streamsize bufSize,
const int tag,
const label comm
)
{

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2019-2021 OpenCFD Ltd.
Copyright (C) 2019-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -31,8 +31,6 @@ License
#include "profilingPstream.H"
#include "IOstreams.H"
#include <mpi.h>
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
void Foam::UIPstream::bufferIPCrecv()
@ -99,12 +97,13 @@ void Foam::UIPstream::bufferIPCrecv()
Foam::label Foam::UIPstream::read
(
const commsTypes commsType,
const UPstream::commsTypes commsType,
const int fromProcNo,
char* buf,
const std::streamsize bufSize,
const int tag,
const label communicator
const label communicator,
UPstream::Request* req
)
{
if (debug)
@ -130,8 +129,8 @@ Foam::label Foam::UIPstream::read
if
(
commsType == commsTypes::blocking
|| commsType == commsTypes::scheduled
commsType == UPstream::commsTypes::blocking
|| commsType == UPstream::commsTypes::scheduled
)
{
MPI_Status status;
@ -182,7 +181,7 @@ Foam::label Foam::UIPstream::read
return messageSize;
}
else if (commsType == commsTypes::nonBlocking)
else if (commsType == UPstream::commsTypes::nonBlocking)
{
MPI_Request request;
@ -214,11 +213,19 @@ Foam::label Foam::UIPstream::read
Pout<< "UIPstream::read : started read from:" << fromProcNo
<< " tag:" << tag << " read size:" << label(bufSize)
<< " commsType:" << UPstream::commsTypeNames[commsType]
<< " request:" << PstreamGlobals::outstandingRequests_.size()
<<
(req ? label(-1) : PstreamGlobals::outstandingRequests_.size())
<< Foam::endl;
}
PstreamGlobals::outstandingRequests_.push_back(request);
if (req)
{
*req = UPstream::Request(request);
}
else
{
PstreamGlobals::outstandingRequests_.push_back(request);
}
// Assume the message is completely received.
return bufSize;

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -28,8 +28,6 @@ License
#include "UOPstream.H"
#include "PstreamGlobals.H"
#include <mpi.h>
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
bool Foam::UOPBstream::bufferIPCsend()
@ -92,11 +90,9 @@ bool Foam::UOPBstream::bufferIPCsend()
bool Foam::UOPBstream::write
(
const commsTypes commsType, /* unused */
const int rootProcNo,
const char* buf,
const std::streamsize bufSize,
const int tag, /* unused */
const label comm
)
{

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2019-2022 OpenCFD Ltd.
Copyright (C) 2019-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -30,8 +30,6 @@ License
#include "PstreamGlobals.H"
#include "profilingPstream.H"
#include <mpi.h>
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
bool Foam::UOPstream::bufferIPCsend()
@ -57,7 +55,9 @@ bool Foam::UOPstream::write
const char* buf,
const std::streamsize bufSize,
const int tag,
const label communicator
const label communicator,
UPstream::Request* req,
const UPstream::sendModes sendMode
)
{
if (debug)
@ -87,7 +87,7 @@ bool Foam::UOPstream::write
profilingPstream::beginTiming();
if (commsType == commsTypes::blocking)
if (commsType == UPstream::commsTypes::blocking)
{
failed = MPI_Bsend
(
@ -110,17 +110,32 @@ bool Foam::UOPstream::write
<< Foam::endl;
}
}
else if (commsType == commsTypes::scheduled)
else if (commsType == UPstream::commsTypes::scheduled)
{
failed = MPI_Send
(
const_cast<char*>(buf),
bufSize,
MPI_BYTE,
toProcNo,
tag,
PstreamGlobals::MPICommunicators_[communicator]
);
if (UPstream::sendModes::sync == sendMode)
{
failed = MPI_Ssend
(
const_cast<char*>(buf),
bufSize,
MPI_BYTE,
toProcNo,
tag,
PstreamGlobals::MPICommunicators_[communicator]
);
}
else
{
failed = MPI_Send
(
const_cast<char*>(buf),
bufSize,
MPI_BYTE,
toProcNo,
tag,
PstreamGlobals::MPICommunicators_[communicator]
);
}
// Assume these are from scatters ...
profilingPstream::addScatterTime();
@ -133,20 +148,36 @@ bool Foam::UOPstream::write
<< Foam::endl;
}
}
else if (commsType == commsTypes::nonBlocking)
else if (commsType == UPstream::commsTypes::nonBlocking)
{
MPI_Request request;
failed = MPI_Isend
(
const_cast<char*>(buf),
bufSize,
MPI_BYTE,
toProcNo,
tag,
PstreamGlobals::MPICommunicators_[communicator],
&request
);
if (UPstream::sendModes::sync == sendMode)
{
failed = MPI_Issend
(
const_cast<char*>(buf),
bufSize,
MPI_BYTE,
toProcNo,
tag,
PstreamGlobals::MPICommunicators_[communicator],
&request
);
}
else
{
failed = MPI_Isend
(
const_cast<char*>(buf),
bufSize,
MPI_BYTE,
toProcNo,
tag,
PstreamGlobals::MPICommunicators_[communicator],
&request
);
}
profilingPstream::addWaitTime();
@ -155,11 +186,20 @@ bool Foam::UOPstream::write
Pout<< "UOPstream::write : started write to:" << toProcNo
<< " tag:" << tag << " size:" << label(bufSize)
<< " commType:" << UPstream::commsTypeNames[commsType]
<< " request:" << PstreamGlobals::outstandingRequests_.size()
<< " request:"
<<
(req ? label(-1) : PstreamGlobals::outstandingRequests_.size())
<< Foam::endl;
}
PstreamGlobals::outstandingRequests_.push_back(request);
if (req)
{
*req = UPstream::Request(request);
}
else
{
PstreamGlobals::outstandingRequests_.push_back(request);
}
}
else
{

View File

@ -693,4 +693,50 @@ void Foam::UPstream::freeTag(const int tag, const char* const msg)
}
void Foam::UPstream::barrier(const label communicator, UPstream::Request* req)
{
// No-op for non-parallel
if (!UPstream::parRun())
{
return;
}
if (req)
{
MPI_Request request;
if
(
MPI_Ibarrier
(
PstreamGlobals::MPICommunicators_[communicator],
&request
)
)
{
FatalErrorInFunction
<< "MPI_Ibarrier returned with error"
<< Foam::abort(FatalError);
}
*req = UPstream::Request(request);
}
else
{
if
(
MPI_Barrier
(
PstreamGlobals::MPICommunicators_[communicator]
)
)
{
FatalErrorInFunction
<< "MPI_Barrier returned with error"
<< Foam::abort(FatalError);
}
}
}
// ************************************************************************* //

View File

@ -28,7 +28,6 @@ License
#include "Pstream.H"
#include "UPstreamWrapping.H"
#include <mpi.h>
#include <cinttypes>
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

View File

@ -29,8 +29,6 @@ License
#include "PstreamGlobals.H"
#include "profilingPstream.H"
#include <mpi.h>
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::UPstream::broadcast

View File

@ -28,7 +28,6 @@ License
#include "Pstream.H"
#include "UPstreamWrapping.H"
#include <mpi.h>
#include <cinttypes>
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -29,7 +29,6 @@ License
#include "PstreamReduceOps.H"
#include "UPstreamWrapping.H"
#include <mpi.h>
#include <cinttypes>
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -177,6 +176,23 @@ void Foam::reduce \
Pstream_CommonReductions(Native, TaggedType); \
\
void Foam::reduce \
( \
Native values[], \
const int size, \
const sumOp<Native>&, \
const int tag, /* (unused) */ \
const label comm, \
UPstream::Request& req \
) \
{ \
PstreamDetail::allReduce<Native> \
( \
values, size, TaggedType, MPI_SUM, comm, &req, nullptr \
); \
} \
\
/* Deprecated: prefer version with UPstream::Request */ \
void Foam::reduce \
( \
Native values[], \
const int size, \
@ -188,11 +204,27 @@ void Foam::reduce \
{ \
PstreamDetail::allReduce<Native> \
( \
values, size, TaggedType, MPI_SUM, comm, &requestID \
values, size, TaggedType, MPI_SUM, comm, nullptr, &requestID \
); \
} \
\
void Foam::reduce \
( \
Native& value, \
const sumOp<Native>&, \
const int tag, /* (unused) */ \
const label comm, \
UPstream::Request& req \
) \
{ \
PstreamDetail::allReduce<Native> \
( \
&value, 1, TaggedType, MPI_SUM, comm, &req, nullptr \
); \
} \
\
/* Deprecated: prefer version with UPstream::Request */ \
void Foam::reduce \
( \
Native& value, \
const sumOp<Native>&, \
@ -203,7 +235,7 @@ void Foam::reduce \
{ \
PstreamDetail::allReduce<Native> \
( \
&value, 1, TaggedType, MPI_SUM, comm, &requestID \
&value, 1, TaggedType, MPI_SUM, comm, nullptr, &requestID \
); \
} \
\

View File

@ -30,6 +30,28 @@ License
#include "PstreamGlobals.H"
#include "profilingPstream.H"
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::UPstream::Request::Request() noexcept
:
UPstream::Request(MPI_REQUEST_NULL)
{}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::UPstream::Request::good() const noexcept
{
return MPI_REQUEST_NULL != PstreamDetail::Request::get(*this);
}
void Foam::UPstream::Request::reset() noexcept
{
*this = UPstream::Request(MPI_REQUEST_NULL);
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
Foam::label Foam::UPstream::nRequests() noexcept
@ -47,26 +69,35 @@ void Foam::UPstream::resetRequests(const label n)
}
void Foam::UPstream::waitRequests(const label start)
void Foam::UPstream::waitRequests(const label pos)
{
// No-op for non-parallel, no pending requests or out-of-range
if
(
!UPstream::parRun()
|| start < 0
|| start >= PstreamGlobals::outstandingRequests_.size()
|| pos < 0
|| pos >= PstreamGlobals::outstandingRequests_.size()
/// || !len
)
{
return;
}
const label count = (PstreamGlobals::outstandingRequests_.size() - start);
auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + start);
label count = (PstreamGlobals::outstandingRequests_.size() - pos);
/// // Treat len < 0 like npos (ie, the rest of the list) but also
/// // apply range checking to avoid bad slices
/// if (len > 0 && len < count)
/// {
/// count = len;
/// }
auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
if (UPstream::debug)
{
Pout<< "UPstream::waitRequests : starting wait for "
<< count << " requests starting at " << start << endl;
<< count << " requests starting at " << pos << endl;
}
profilingPstream::beginTiming();
@ -81,8 +112,8 @@ void Foam::UPstream::waitRequests(const label start)
profilingPstream::addWaitTime();
// ie, resetRequests(start)
PstreamGlobals::outstandingRequests_.resize(start);
// ie, resetRequests(pos)
PstreamGlobals::outstandingRequests_.resize(pos);
if (UPstream::debug)
{
@ -91,6 +122,104 @@ void Foam::UPstream::waitRequests(const label start)
}
void Foam::UPstream::waitRequests(UList<UPstream::Request>& requests)
{
// No-op for non-parallel or no pending requests
if (!UPstream::parRun() || requests.empty())
{
return;
}
// Looks ugly but is legitimate since UPstream::Request is an intptr_t,
// which is always large enough to hold an MPI_Request (int or pointer)
label count = 0;
auto* waitRequests = reinterpret_cast<MPI_Request*>(requests.data());
for (auto& req : requests)
{
if (req.good())
{
waitRequests[count] = PstreamDetail::Request::get(req);
++count;
}
}
if (!count)
{
return;
}
profilingPstream::beginTiming();
// On success: sets request to MPI_REQUEST_NULL
if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE))
{
FatalErrorInFunction
<< "MPI_Waitall returned with error"
<< Foam::abort(FatalError);
}
profilingPstream::addWaitTime();
// Everything handled, reset all to MPI_REQUEST_NULL
for (auto& req : requests)
{
req.reset();
}
}
// FUTURE?
//
/// void Foam::UPstream::waitRequests
/// (
/// UPstream::Request& req1,
/// UPstream::Request& req2
/// )
/// {
/// // No-op for non-parallel
/// if (!UPstream::parRun())
/// {
/// return;
/// }
///
/// int count = 0;
/// MPI_Request waitRequests[2];
///
/// waitRequests[count] = PstreamDetail::Request::get(req1);
/// if (MPI_REQUEST_NULL != waitRequests[count])
/// {
/// req1.reset();
/// ++count;
/// }
///
/// waitRequests[count] = PstreamDetail::Request::get(req2);
/// if (MPI_REQUEST_NULL != waitRequests[count])
/// {
/// req2.reset();
/// ++count;
/// }
///
/// if (!count)
/// {
/// return;
/// }
///
/// profilingPstream::beginTiming();
///
/// // On success: sets request to MPI_REQUEST_NULL
/// if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE))
/// {
/// FatalErrorInFunction
/// << "MPI_Waitall returned with error"
/// << Foam::abort(FatalError);
/// }
///
/// profilingPstream::addWaitTime();
/// }
void Foam::UPstream::waitRequest(const label i)
{
// No-op for non-parallel, or out-of-range (eg, placeholder indices)
@ -141,6 +270,37 @@ void Foam::UPstream::waitRequest(const label i)
}
void Foam::UPstream::waitRequest(UPstream::Request& req)
{
// No-op for non-parallel
if (!UPstream::parRun())
{
return;
}
MPI_Request request = PstreamDetail::Request::get(req);
// No-op for null request
if (MPI_REQUEST_NULL == request)
{
return;
}
profilingPstream::beginTiming();
if (MPI_Wait(&request, MPI_STATUS_IGNORE))
{
FatalErrorInFunction
<< "MPI_Wait returned with error"
<< Foam::abort(FatalError);
}
profilingPstream::addWaitTime();
req.reset(); // Handled, reset to MPI_REQUEST_NULL
}
bool Foam::UPstream::finishedRequest(const label i)
{
// No-op for non-parallel, or out-of-range (eg, placeholder indices)
@ -182,4 +342,33 @@ bool Foam::UPstream::finishedRequest(const label i)
}
bool Foam::UPstream::finishedRequest(UPstream::Request& req)
{
// No-op for non-parallel
if (!UPstream::parRun())
{
return true;
}
MPI_Request request = PstreamDetail::Request::get(req);
// No-op for null request
if (MPI_REQUEST_NULL == request)
{
return true;
}
int flag = 0;
MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
if (flag)
{
// Done - reset to MPI_REQUEST_NULL
req.reset();
}
return flag != 0;
}
// ************************************************************************* //

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2012-2016 OpenFOAM Foundation
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -24,12 +24,6 @@ License
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Namespace
Foam::PstreamDetail
Description
Some implementation details for Pstream and/or MPI.
InNamespace
Foam::PstreamDetail
@ -54,6 +48,27 @@ namespace Foam
namespace PstreamDetail
{
// Helper for casting to MPI_Request
struct Request
{
// To pointer
template<typename Type = MPI_Request>
static typename std::enable_if<std::is_pointer<Type>::value, Type>::type
get(const UPstream::Request& req) noexcept
{
return reinterpret_cast<Type>(req.value());
}
// To integer
template<typename Type = MPI_Request>
static typename std::enable_if<std::is_integral<Type>::value, Type>::type
get(const UPstream::Request& req) noexcept
{
return static_cast<Type>(req.value());
}
};
// MPI_Bcast, using root=0
template<class Type>
void broadcast0
@ -84,7 +99,8 @@ void allReduce
MPI_Datatype datatype,
MPI_Op optype,
const label comm, // Communicator
label* requestID = nullptr // Non-null for MPI_Iallreduce
UPstream::Request* req = nullptr, // Non-null for non-blocking
label* requestID = nullptr // (alternative to UPstream::Request)
);
@ -96,7 +112,8 @@ void allToAll
UList<Type>& recvData,
MPI_Datatype datatype,
const label comm, // Communicator
label* requestID = nullptr // Non-null for MPI_Ialltoall
UPstream::Request* req = nullptr, // Non-null for non-blocking
label* requestID = nullptr // (alternative to UPstream::Request)
);
@ -114,7 +131,8 @@ void allToAllv
MPI_Datatype datatype,
const label comm, // Communicator
label* requestID = nullptr // Non-null for MPI_Ialltoallv
UPstream::Request* req = nullptr, // Non-null for non-blocking
label* requestID = nullptr // (alternative to UPstream::Request)
);
@ -130,7 +148,8 @@ void gather
MPI_Datatype datatype, // The send/recv data type
const label comm, // Communicator
label* requestID = nullptr // Non-null for MPI_Igather
UPstream::Request* req = nullptr, // Non-null for non-blocking
label* requestID = nullptr // (alternative to UPstream::Request)
);
@ -146,7 +165,8 @@ void scatter
MPI_Datatype datatype, // The send/recv data type
const label comm, // Communicator
label* requestID = nullptr // Non-null for MPI_Iscatter
UPstream::Request* req = nullptr, // Non-null for non-blocking
label* requestID = nullptr // (alternative to UPstream::Request)
);
@ -163,7 +183,8 @@ void gatherv
MPI_Datatype datatype, // The send/recv data type
const label comm, // Communicator
label* requestID = nullptr // Non-null for MPI_Igatherv
UPstream::Request* req = nullptr, // Non-null for non-blocking
label* requestID = nullptr // (alternative to UPstream::Request)
);
@ -180,7 +201,8 @@ void scatterv
MPI_Datatype datatype, // The send/recv data type
const label comm, // Communicator
label* requestID = nullptr // Non-null for MPI_Igatherv
UPstream::Request* req = nullptr, // Non-null for non-blocking
label* requestID = nullptr // (alternative to UPstream::Request)
);

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2012-2015 OpenFOAM Foundation
Copyright (C) 2019-2022 OpenCFD Ltd.
Copyright (C) 2019-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -119,6 +119,8 @@ void Foam::PstreamDetail::allReduce
MPI_Datatype datatype,
MPI_Op optype,
const label comm,
UPstream::Request* req,
label* requestID
)
{
@ -127,9 +129,11 @@ void Foam::PstreamDetail::allReduce
return;
}
const bool immediate = (req || requestID);
if (UPstream::warnComm != -1 && comm != UPstream::warnComm)
{
if (requestID != nullptr)
if (immediate)
{
Pout<< "** MPI_Iallreduce (non-blocking):";
}
@ -155,10 +159,11 @@ void Foam::PstreamDetail::allReduce
bool handled(false);
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
if (requestID != nullptr)
if (immediate)
{
handled = true;
MPI_Request request;
if
(
MPI_Iallreduce
@ -179,16 +184,23 @@ void Foam::PstreamDetail::allReduce
<< Foam::abort(FatalError);
}
*requestID = PstreamGlobals::push_request(request);
if (req)
{
*req = UPstream::Request(request);
if (requestID) *requestID = -1;
}
else
{
*requestID = PstreamGlobals::push_request(request);
}
}
#endif
if (!handled)
{
if (requestID != nullptr)
{
*requestID = -1;
}
if (req) req->reset();
if (requestID) *requestID = -1;
if
(
MPI_Allreduce
@ -220,14 +232,18 @@ void Foam::PstreamDetail::allToAll
UList<Type>& recvData,
MPI_Datatype datatype,
const label comm,
UPstream::Request* req,
label* requestID
)
{
const bool immediate = (req || requestID);
const label np = UPstream::nProcs(comm);
if (UPstream::warnComm != -1 && comm != UPstream::warnComm)
{
if (requestID != nullptr)
if (immediate)
{
Pout<< "** MPI_Ialltoall (non-blocking):";
}
@ -262,11 +278,12 @@ void Foam::PstreamDetail::allToAll
bool handled(false);
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
if (requestID != nullptr)
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
if (immediate)
{
handled = true;
MPI_Request request;
if
(
MPI_Ialltoall
@ -290,17 +307,23 @@ void Foam::PstreamDetail::allToAll
<< Foam::abort(FatalError);
}
*requestID = PstreamGlobals::push_request(request);
if (req)
{
*req = UPstream::Request(request);
if (requestID) *requestID = -1;
}
else
{
*requestID = PstreamGlobals::push_request(request);
}
}
#endif
if (!handled)
{
if (requestID != nullptr)
{
*requestID = -1;
}
if (req) req->reset();
if (requestID) *requestID = -1;
if
(
MPI_Alltoall
@ -341,14 +364,18 @@ void Foam::PstreamDetail::allToAllv
MPI_Datatype datatype,
const label comm,
UPstream::Request* req,
label* requestID
)
{
const bool immediate = (req || requestID);
const label np = UPstream::nProcs(comm);
if (UPstream::warnComm != -1 && comm != UPstream::warnComm)
{
if (requestID != nullptr)
if (immediate)
{
Pout<< "** MPI_Ialltoallv (non-blocking):";
}
@ -402,7 +429,7 @@ void Foam::PstreamDetail::allToAllv
bool handled(false);
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
if (requestID != nullptr)
if (immediate)
{
handled = true;
MPI_Request request;
@ -431,16 +458,23 @@ void Foam::PstreamDetail::allToAllv
<< Foam::abort(FatalError);
}
*requestID = PstreamGlobals::push_request(request);
if (req)
{
*req = UPstream::Request(request);
if (requestID) *requestID = -1;
}
else
{
*requestID = PstreamGlobals::push_request(request);
}
}
#endif
if (!handled)
{
if (requestID != nullptr)
{
*requestID = -1;
}
if (req) req->reset();
if (requestID) *requestID = -1;
if
(
MPI_Alltoallv
@ -480,6 +514,8 @@ void Foam::PstreamDetail::gather
MPI_Datatype datatype,
const label comm,
UPstream::Request* req,
label* requestID
)
{
@ -489,11 +525,13 @@ void Foam::PstreamDetail::gather
return;
}
const bool immediate = (req || requestID);
const label np = UPstream::nProcs(comm);
if (UPstream::warnComm != -1 && comm != UPstream::warnComm)
{
if (requestID != nullptr)
if (immediate)
{
Pout<< "** MPI_Igather (non-blocking):";
}
@ -514,10 +552,11 @@ void Foam::PstreamDetail::gather
bool handled(false);
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
if (requestID != nullptr)
if (immediate)
{
handled = true;
MPI_Request request;
if
(
MPI_Igather
@ -541,16 +580,23 @@ void Foam::PstreamDetail::gather
<< Foam::abort(FatalError);
}
*requestID = PstreamGlobals::push_request(request);
if (req)
{
*req = UPstream::Request(request);
if (requestID) *requestID = -1;
}
else
{
*requestID = PstreamGlobals::push_request(request);
}
}
#endif
if (!handled)
{
if (requestID != nullptr)
{
*requestID = -1;
}
if (req) req->reset();
if (requestID) *requestID = -1;
if
(
MPI_Gather
@ -589,6 +635,8 @@ void Foam::PstreamDetail::scatter
MPI_Datatype datatype,
const label comm,
UPstream::Request* req,
label* requestID
)
{
@ -598,11 +646,13 @@ void Foam::PstreamDetail::scatter
return;
}
const bool immediate = (req || requestID);
const label np = UPstream::nProcs(comm);
if (UPstream::warnComm != -1 && comm != UPstream::warnComm)
{
if (requestID != nullptr)
if (immediate)
{
Pout<< "** MPI_Iscatter (non-blocking):";
}
@ -623,10 +673,11 @@ void Foam::PstreamDetail::scatter
bool handled(false);
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
if (requestID != nullptr)
if (immediate)
{
handled = true;
MPI_Request request;
if
(
MPI_Iscatter
@ -650,16 +701,23 @@ void Foam::PstreamDetail::scatter
<< Foam::abort(FatalError);
}
*requestID = PstreamGlobals::push_request(request);
if (req)
{
*req = UPstream::Request(request);
if (requestID) *requestID = -1;
}
else
{
*requestID = PstreamGlobals::push_request(request);
}
}
#endif
if (!handled)
{
if (requestID != nullptr)
{
*requestID = -1;
}
if (req) req->reset();
if (requestID) *requestID = -1;
if
(
MPI_Scatter
@ -676,7 +734,7 @@ void Foam::PstreamDetail::scatter
)
{
FatalErrorInFunction
<< "MPI_Iscatter [comm: " << comm << "] failed."
<< "MPI_Scatter [comm: " << comm << "] failed."
<< " sendCount " << sendCount
<< " recvCount " << recvCount
<< Foam::abort(FatalError);
@ -699,6 +757,8 @@ void Foam::PstreamDetail::gatherv
MPI_Datatype datatype,
const label comm,
UPstream::Request* req,
label* requestID
)
{
@ -709,11 +769,13 @@ void Foam::PstreamDetail::gatherv
return;
}
const bool immediate = (req || requestID);
const label np = UPstream::nProcs(comm);
if (UPstream::warnComm != -1 && comm != UPstream::warnComm)
{
if (requestID != nullptr)
if (immediate)
{
Pout<< "** MPI_Igatherv (non-blocking):";
}
@ -757,10 +819,11 @@ void Foam::PstreamDetail::gatherv
bool handled(false);
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
if (requestID != nullptr)
if (immediate)
{
handled = true;
MPI_Request request;
if
(
MPI_Igatherv
@ -785,16 +848,23 @@ void Foam::PstreamDetail::gatherv
<< Foam::abort(FatalError);
}
*requestID = PstreamGlobals::push_request(request);
if (req)
{
*req = UPstream::Request(request);
if (requestID) *requestID = -1;
}
else
{
*requestID = PstreamGlobals::push_request(request);
}
}
#endif
if (!handled)
{
if (requestID != nullptr)
{
*requestID = -1;
}
if (req) req->reset();
if (requestID) *requestID = -1;
if
(
MPI_Gatherv
@ -835,6 +905,8 @@ void Foam::PstreamDetail::scatterv
MPI_Datatype datatype,
const label comm,
UPstream::Request* req,
label* requestID
)
{
@ -844,11 +916,13 @@ void Foam::PstreamDetail::scatterv
return;
}
const bool immediate = (req || requestID);
const label np = UPstream::nProcs(comm);
if (UPstream::warnComm != -1 && comm != UPstream::warnComm)
{
if (requestID != nullptr)
if (immediate)
{
Pout<< "** MPI_Iscatterv (non-blocking):";
}
@ -886,10 +960,11 @@ void Foam::PstreamDetail::scatterv
bool handled(false);
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
if (requestID != nullptr)
if (immediate)
{
handled = true;
MPI_Request request;
if
(
MPI_Iscatterv
@ -914,16 +989,23 @@ void Foam::PstreamDetail::scatterv
<< Foam::abort(FatalError);
}
*requestID = PstreamGlobals::push_request(request);
if (req)
{
*req = UPstream::Request(request);
if (requestID) *requestID = -1;
}
else
{
*requestID = PstreamGlobals::push_request(request);
}
}
#endif
if (!handled)
{
if (requestID != nullptr)
{
*requestID = -1;
}
if (req) req->reset();
if (requestID) *requestID = -1;
if
(
MPI_Scatterv