ENH: cleanup broadcast streams
- remove unused/unusable broadcast stream constructors/methods - provide OPBstream::sends() and IPBstream::recvs() methods, refactored from Pstream::broadcasts. These will always use serializations, even for contiguous content. - additional methods to support special handling of zero-sized lists. For example, if (UPstream::master(comm)) { if (list.empty()) OPBstream::send(Foam::zero, comm); else OPBstream::send(list, comm); } else { IPBstream is(comm); if (is.remaining()) { is >> list; } else { list.clear(); } } This avoids serialization of an empty list and the resulting double broadcast (size + content), using instead a single broadcast (size). STYLE: more consistency in communicator types (int vs label)
This commit is contained in:
parent
be30598e3d
commit
d4b5280742
@ -5,7 +5,7 @@
|
||||
\\ / A nd | www.openfoam.com
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2022-2024 OpenCFD Ltd.
|
||||
Copyright (C) 2022-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -33,24 +33,21 @@ License
|
||||
|
||||
Foam::UIPBstream::UIPBstream
|
||||
(
|
||||
const UPstream::commsTypes commsType,
|
||||
const int rootProcNo,
|
||||
DynamicList<char>& receiveBuf,
|
||||
label& receiveBufPosition,
|
||||
const int tag,
|
||||
const label comm,
|
||||
const int communicator,
|
||||
const bool clearAtEnd,
|
||||
IOstreamOption::streamFormat fmt
|
||||
)
|
||||
:
|
||||
UIPstreamBase
|
||||
(
|
||||
commsType, // irrelevant
|
||||
rootProcNo, // normally UPstream::masterNo()
|
||||
UPstream::commsTypes::scheduled, // irrelevant
|
||||
UPstream::masterNo(), // irrelevant
|
||||
receiveBuf,
|
||||
receiveBufPosition,
|
||||
tag, // irrelevant
|
||||
comm,
|
||||
UPstream::msgType(), // irrelevant
|
||||
communicator,
|
||||
clearAtEnd,
|
||||
fmt
|
||||
)
|
||||
@ -61,64 +58,20 @@ Foam::UIPBstream::UIPBstream
|
||||
|
||||
Foam::IPBstream::IPBstream
|
||||
(
|
||||
const UPstream::commsTypes commsType,
|
||||
const int rootProcNo,
|
||||
const label bufSize,
|
||||
const int tag,
|
||||
const label comm,
|
||||
const int communicator,
|
||||
IOstreamOption::streamFormat fmt
|
||||
)
|
||||
:
|
||||
Pstream(commsType, bufSize),
|
||||
Pstream(UPstream::commsTypes::scheduled), // type is irrelevant
|
||||
UIPBstream
|
||||
(
|
||||
commsType, // irrelevant
|
||||
rootProcNo, // normally UPstream::masterNo()
|
||||
Pstream::transferBuf_,
|
||||
UIPstreamBase::storedRecvBufPos_, // Internal only
|
||||
tag, // irrelevant
|
||||
comm,
|
||||
communicator,
|
||||
false, // Do not clear Pstream::transferBuf_ if at end
|
||||
fmt
|
||||
)
|
||||
{}
|
||||
|
||||
|
||||
Foam::IPBstream::IPBstream
|
||||
(
|
||||
const int rootProcNo,
|
||||
const label comm,
|
||||
IOstreamOption::streamFormat fmt
|
||||
)
|
||||
:
|
||||
IPBstream
|
||||
(
|
||||
UPstream::commsTypes::scheduled, // irrelevant
|
||||
rootProcNo,
|
||||
label(0), // bufSize
|
||||
UPstream::msgType(), // irrelevant
|
||||
comm,
|
||||
fmt
|
||||
)
|
||||
{}
|
||||
|
||||
|
||||
Foam::IPBstream::IPBstream
|
||||
(
|
||||
const label comm,
|
||||
IOstreamOption::streamFormat fmt
|
||||
)
|
||||
:
|
||||
IPBstream
|
||||
(
|
||||
UPstream::commsTypes::scheduled, // irrelevant
|
||||
UPstream::masterNo(), // rootProcNo
|
||||
label(0), // bufSize
|
||||
UPstream::msgType(), // irrelevant
|
||||
comm,
|
||||
fmt
|
||||
)
|
||||
{}
|
||||
|
||||
|
||||
// ************************************************************************* //
|
||||
|
@ -6,7 +6,7 @@
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2011-2013 OpenFOAM Foundation
|
||||
Copyright (C) 2021-2024 OpenCFD Ltd.
|
||||
Copyright (C) 2021-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -116,31 +116,11 @@ public:
|
||||
|
||||
// Constructors
|
||||
|
||||
//- Construct for broadcast root, optional buffer size, read format
|
||||
IPBstream
|
||||
(
|
||||
const UPstream::commsTypes, //!< ignored
|
||||
const int rootProcNo, //!< normally UPstream::masterNo()
|
||||
const label bufSize = 0,
|
||||
const int tag = UPstream::msgType(), //!< ignored
|
||||
const label comm = UPstream::worldComm,
|
||||
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
|
||||
);
|
||||
|
||||
//- Construct for broadcast root and communicator,
|
||||
//- with optional read format
|
||||
IPBstream
|
||||
(
|
||||
const int rootProcNo, //!< normally UPstream::masterNo()
|
||||
const label comm,
|
||||
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
|
||||
);
|
||||
|
||||
//- Construct with optional communicator and read format.
|
||||
//- Uses UPstream::masterNo() root
|
||||
explicit IPBstream
|
||||
(
|
||||
const label comm = UPstream::worldComm,
|
||||
const int communicator = UPstream::worldComm,
|
||||
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
|
||||
);
|
||||
|
||||
@ -154,12 +134,33 @@ public:
|
||||
static void recv
|
||||
(
|
||||
Type& value,
|
||||
const label comm = UPstream::worldComm,
|
||||
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
|
||||
const int communicator = UPstream::worldComm
|
||||
)
|
||||
{
|
||||
IPBstream is(comm, fmt);
|
||||
is >> value;
|
||||
IPBstream is(communicator);
|
||||
{
|
||||
is >> value;
|
||||
}
|
||||
}
|
||||
|
||||
//- Receive (from broadcast) a buffer and deserialize
|
||||
//- multiple items.
|
||||
//- Uses \c operator>> for de-serialization
|
||||
template<class Type, class... Args>
|
||||
static void recvs
|
||||
(
|
||||
const int communicator,
|
||||
Type& value,
|
||||
Args&&... values
|
||||
)
|
||||
{
|
||||
IPBstream is(communicator);
|
||||
{
|
||||
Detail::inputLoop(is, value, std::forward<Args>(values)...);
|
||||
// Depending on compiler support:
|
||||
// Unpack via fold expression
|
||||
// (((is >> value) >> std::forward<Args>(values)), ...);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -5,7 +5,7 @@
|
||||
\\ / A nd | www.openfoam.com
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2022-2024 OpenCFD Ltd.
|
||||
Copyright (C) 2022-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -33,22 +33,19 @@ License
|
||||
|
||||
Foam::UOPBstream::UOPBstream
|
||||
(
|
||||
const UPstream::commsTypes commsType,
|
||||
const int rootProcNo,
|
||||
DynamicList<char>& sendBuf,
|
||||
const int tag,
|
||||
const label comm,
|
||||
const int communicator,
|
||||
const bool sendAtDestruct,
|
||||
IOstreamOption::streamFormat fmt
|
||||
)
|
||||
:
|
||||
UOPstreamBase
|
||||
(
|
||||
commsType, // irrelevant
|
||||
rootProcNo, // normally UPstream::masterNo()
|
||||
UPstream::commsTypes::scheduled, // irrelevant
|
||||
UPstream::masterNo(), // irrelevant
|
||||
sendBuf,
|
||||
tag, // irrelevant
|
||||
comm,
|
||||
UPstream::msgType(), // irrelevant
|
||||
communicator,
|
||||
sendAtDestruct,
|
||||
fmt
|
||||
)
|
||||
@ -57,65 +54,21 @@ Foam::UOPBstream::UOPBstream
|
||||
|
||||
Foam::OPBstream::OPBstream
|
||||
(
|
||||
const UPstream::commsTypes commsType,
|
||||
const int rootProcNo,
|
||||
const label bufSize,
|
||||
const int tag,
|
||||
const label comm,
|
||||
const int communicator,
|
||||
IOstreamOption::streamFormat fmt
|
||||
)
|
||||
:
|
||||
Pstream(commsType, bufSize),
|
||||
Pstream(UPstream::commsTypes::scheduled), // type is irrelevant
|
||||
UOPBstream
|
||||
(
|
||||
commsType, // irrelevant
|
||||
rootProcNo, // normally UPstream::masterNo()
|
||||
Pstream::transferBuf_,
|
||||
tag, // irrelevant
|
||||
comm,
|
||||
communicator,
|
||||
true, // sendAtDestruct
|
||||
fmt
|
||||
)
|
||||
{}
|
||||
|
||||
|
||||
Foam::OPBstream::OPBstream
|
||||
(
|
||||
const int rootProcNo,
|
||||
const label comm,
|
||||
IOstreamOption::streamFormat fmt
|
||||
)
|
||||
:
|
||||
OPBstream
|
||||
(
|
||||
UPstream::commsTypes::scheduled, // irrelevant
|
||||
rootProcNo,
|
||||
label(0), // bufSize
|
||||
UPstream::msgType(), // irrelevant
|
||||
comm,
|
||||
fmt
|
||||
)
|
||||
{}
|
||||
|
||||
|
||||
Foam::OPBstream::OPBstream
|
||||
(
|
||||
const label comm,
|
||||
IOstreamOption::streamFormat fmt
|
||||
)
|
||||
:
|
||||
OPBstream
|
||||
(
|
||||
UPstream::commsTypes::scheduled, // irrelevant
|
||||
UPstream::masterNo(), // rootProcNo
|
||||
label(0), // bufSize
|
||||
UPstream::msgType(), // irrelevant
|
||||
comm,
|
||||
fmt
|
||||
)
|
||||
{}
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
|
||||
|
||||
Foam::UOPBstream::~UOPBstream()
|
||||
@ -125,8 +78,7 @@ Foam::UOPBstream::~UOPBstream()
|
||||
if (!bufferIPCsend())
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "Failed broadcast message of size "
|
||||
<< sendBuf_.size() << " root: " << toProcNo_
|
||||
<< "Failed broadcast message of size " << sendBuf_.size()
|
||||
<< Foam::abort(FatalError);
|
||||
}
|
||||
}
|
||||
|
@ -6,7 +6,7 @@
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2011-2013 OpenFOAM Foundation
|
||||
Copyright (C) 2021-2024 OpenCFD Ltd.
|
||||
Copyright (C) 2021-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -133,49 +133,51 @@ public:
|
||||
|
||||
// Constructors
|
||||
|
||||
//- Construct for broadcast root, optional buffer size, write format
|
||||
OPBstream
|
||||
(
|
||||
const UPstream::commsTypes, //!< ignored
|
||||
const int rootProcNo, //!< normally UPstream::masterNo()
|
||||
const label bufSize = 0,
|
||||
const int tag = UPstream::msgType(), //!< ignored
|
||||
const label comm = UPstream::worldComm,
|
||||
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
|
||||
);
|
||||
|
||||
//- Construct for broadcast root and communicator,
|
||||
//- with optional write format
|
||||
OPBstream
|
||||
(
|
||||
const int rootProcNo, //!< normally UPstream::masterNo()
|
||||
const label comm,
|
||||
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
|
||||
);
|
||||
|
||||
//- Construct with optional communicator and write format.
|
||||
//- Uses UPstream::masterNo() root
|
||||
explicit OPBstream
|
||||
(
|
||||
const label comm = UPstream::worldComm,
|
||||
const int communicator = UPstream::worldComm,
|
||||
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
|
||||
);
|
||||
|
||||
|
||||
// Static Functions
|
||||
|
||||
//- Use all send methods from base
|
||||
using UOPBstream::send;
|
||||
|
||||
//- Serialize a value and broadcast (root == UPstream::masterNo()).
|
||||
//- Uses \c operator<< for serialization
|
||||
template<class Type>
|
||||
static void send
|
||||
(
|
||||
const Type& value,
|
||||
const label comm = UPstream::worldComm,
|
||||
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
|
||||
const int communicator = UPstream::worldComm
|
||||
)
|
||||
{
|
||||
OPBstream os(comm, fmt);
|
||||
os << value;
|
||||
OPBstream os(communicator);
|
||||
{
|
||||
os << value;
|
||||
}
|
||||
}
|
||||
|
||||
//- Serialize multiple items and broadcast the buffer
|
||||
//- Uses \c operator<< for serialization
|
||||
template<class Type, class... Args>
|
||||
static void sends
|
||||
(
|
||||
const int communicator,
|
||||
Type& value,
|
||||
Args&&... values
|
||||
)
|
||||
{
|
||||
OPBstream os(communicator);
|
||||
{
|
||||
Detail::outputLoop(os, value, std::forward<Args>(values)...);
|
||||
// Depending on compiler support:
|
||||
// Pack via fold expression
|
||||
// (((os << value) << std::forward<Args>(values)), ...);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -104,13 +104,18 @@ public:
|
||||
static void broadcast
|
||||
(
|
||||
Type& value,
|
||||
const label comm = UPstream::worldComm
|
||||
const int communicator = UPstream::worldComm
|
||||
);
|
||||
|
||||
//- Broadcast multiple items to all communicator ranks.
|
||||
//- Does nothing in \b non-parallel.
|
||||
template<class Type, class... Args>
|
||||
static void broadcasts(const label comm, Type& arg1, Args&&... args);
|
||||
static void broadcasts
|
||||
(
|
||||
const int communicator,
|
||||
Type& value,
|
||||
Args&&... values
|
||||
);
|
||||
|
||||
//- Broadcast list content (contiguous or non-contiguous) to all
|
||||
//- communicator ranks. Does nothing in \b non-parallel.
|
||||
@ -120,7 +125,7 @@ public:
|
||||
static void broadcastList
|
||||
(
|
||||
ListType& list,
|
||||
const label comm = UPstream::worldComm
|
||||
const int communicator = UPstream::worldComm
|
||||
);
|
||||
|
||||
|
||||
|
@ -25,109 +25,136 @@ License
|
||||
|
||||
\*---------------------------------------------------------------------------*/
|
||||
|
||||
#include "OPstream.H"
|
||||
#include "IPstream.H"
|
||||
#include "contiguous.H"
|
||||
#include "OPstream.H"
|
||||
|
||||
// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
|
||||
|
||||
template<class Type>
|
||||
void Foam::Pstream::broadcast(Type& value, const label comm)
|
||||
void Foam::Pstream::broadcast
|
||||
(
|
||||
Type& value,
|
||||
const int communicator
|
||||
)
|
||||
{
|
||||
if constexpr (is_contiguous_v<Type>)
|
||||
if (!UPstream::is_parallel(communicator))
|
||||
{
|
||||
return;
|
||||
}
|
||||
else if constexpr (is_contiguous_v<Type>)
|
||||
{
|
||||
// Note: contains parallel guard internally
|
||||
UPstream::broadcast
|
||||
(
|
||||
reinterpret_cast<char*>(&value),
|
||||
sizeof(Type),
|
||||
comm
|
||||
communicator
|
||||
);
|
||||
}
|
||||
else if (UPstream::is_parallel(comm))
|
||||
else
|
||||
{
|
||||
if (UPstream::master(comm))
|
||||
if (UPstream::master(communicator))
|
||||
{
|
||||
OPBstream os(comm);
|
||||
os << value;
|
||||
OPBstream::send(value, communicator);
|
||||
}
|
||||
else // UPstream::is_subrank(comm)
|
||||
else
|
||||
{
|
||||
IPBstream is(comm);
|
||||
is >> value;
|
||||
IPBstream::recv(value, communicator);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template<class Type, class... Args>
|
||||
void Foam::Pstream::broadcasts(const label comm, Type& arg1, Args&&... args)
|
||||
void Foam::Pstream::broadcasts
|
||||
(
|
||||
const int communicator,
|
||||
Type& value,
|
||||
Args&&... values
|
||||
)
|
||||
{
|
||||
if (UPstream::is_parallel(comm))
|
||||
if (!UPstream::is_parallel(communicator))
|
||||
{
|
||||
if (UPstream::master(comm))
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (UPstream::master(communicator))
|
||||
{
|
||||
OPBstream os(comm);
|
||||
Detail::outputLoop(os, arg1, std::forward<Args>(args)...);
|
||||
OPBstream::sends
|
||||
(
|
||||
communicator,
|
||||
value,
|
||||
std::forward<Args>(values)...
|
||||
);
|
||||
}
|
||||
else // UPstream::is_subrank(comm)
|
||||
else
|
||||
{
|
||||
IPBstream is(comm);
|
||||
Detail::inputLoop(is, arg1, std::forward<Args>(args)...);
|
||||
IPBstream::recvs
|
||||
(
|
||||
communicator,
|
||||
value,
|
||||
std::forward<Args>(values)...
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template<class ListType>
|
||||
void Foam::Pstream::broadcastList(ListType& list, const label comm)
|
||||
void Foam::Pstream::broadcastList
|
||||
(
|
||||
ListType& list,
|
||||
const int communicator
|
||||
)
|
||||
{
|
||||
if constexpr (is_contiguous_v<typename ListType::value_type>)
|
||||
if (!UPstream::is_parallel(communicator))
|
||||
{
|
||||
return;
|
||||
}
|
||||
else if constexpr (is_contiguous_v<typename ListType::value_type>)
|
||||
{
|
||||
// List data are contiguous
|
||||
// 1. broadcast the size
|
||||
// 2. resize for receiver list
|
||||
// 3. broadcast contiguous contents
|
||||
|
||||
if (UPstream::is_parallel(comm))
|
||||
{
|
||||
label len(list.size());
|
||||
label len(list.size());
|
||||
|
||||
UPstream::broadcast
|
||||
(
|
||||
reinterpret_cast<char*>(&len),
|
||||
sizeof(label),
|
||||
communicator
|
||||
);
|
||||
|
||||
if (UPstream::is_subrank(communicator))
|
||||
{
|
||||
list.resize_nocopy(len);
|
||||
}
|
||||
|
||||
if (len)
|
||||
{
|
||||
UPstream::broadcast
|
||||
(
|
||||
reinterpret_cast<char*>(&len),
|
||||
sizeof(label),
|
||||
comm
|
||||
list.data_bytes(),
|
||||
list.size_bytes(),
|
||||
communicator
|
||||
);
|
||||
|
||||
if (UPstream::is_subrank(comm))
|
||||
{
|
||||
list.resize_nocopy(len);
|
||||
}
|
||||
|
||||
if (len)
|
||||
{
|
||||
UPstream::broadcast
|
||||
(
|
||||
list.data_bytes(),
|
||||
list.size_bytes(),
|
||||
comm
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (UPstream::is_parallel(comm))
|
||||
else
|
||||
{
|
||||
// List data are non-contiguous - serialize/de-serialize
|
||||
|
||||
if (UPstream::master(comm))
|
||||
if (UPstream::master(communicator))
|
||||
{
|
||||
OPBstream os(comm);
|
||||
OPBstream os(communicator);
|
||||
os << list;
|
||||
}
|
||||
else // UPstream::is_subrank(comm)
|
||||
else
|
||||
{
|
||||
IPBstream is(comm);
|
||||
IPBstream is(communicator);
|
||||
is >> list;
|
||||
}
|
||||
}
|
||||
@ -148,11 +175,11 @@ template<class Type>
|
||||
Type returnBroadcast
|
||||
(
|
||||
const Type& value,
|
||||
const label comm = UPstream::worldComm
|
||||
const int communicator = UPstream::worldComm
|
||||
)
|
||||
{
|
||||
Type work(value);
|
||||
Pstream::broadcast(work, comm);
|
||||
Pstream::broadcast(work, communicator);
|
||||
return work;
|
||||
}
|
||||
|
||||
|
@ -6,7 +6,7 @@
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2011-2013 OpenFOAM Foundation
|
||||
Copyright (C) 2017-2024 OpenCFD Ltd.
|
||||
Copyright (C) 2017-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -165,6 +165,9 @@ public:
|
||||
return std::ios_base::fmtflags(0);
|
||||
}
|
||||
|
||||
//- The number of characters remaining in the get buffer
|
||||
label remaining() const noexcept;
|
||||
|
||||
|
||||
// Read Functions
|
||||
|
||||
@ -447,17 +450,13 @@ public:
|
||||
|
||||
// Constructors
|
||||
|
||||
//- Construct given process index to read from using the given
|
||||
//- attached receive buffer, optional communication characteristics
|
||||
//- and IO format
|
||||
//- Construct using the given attached receive buffer,
|
||||
// optional communication characteristics and IO format
|
||||
UIPBstream
|
||||
(
|
||||
const UPstream::commsTypes, //!< irrelevant
|
||||
const int rootProcNo, //!< normally UPstream::masterNo()
|
||||
DynamicList<char>& receiveBuf,
|
||||
label& receiveBufPosition,
|
||||
const int tag = UPstream::msgType(), //!< irrelevant
|
||||
const label comm = UPstream::worldComm,
|
||||
const int communicator = UPstream::worldComm,
|
||||
const bool clearAtEnd = false, //!< destroy receiveBuf if at end
|
||||
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
|
||||
);
|
||||
@ -465,25 +464,6 @@ public:
|
||||
|
||||
//- Destructor
|
||||
virtual ~UIPBstream() = default;
|
||||
|
||||
|
||||
// Member Functions
|
||||
|
||||
//- Use all read methods from base
|
||||
using UIPstreamBase::read;
|
||||
|
||||
|
||||
// Static Functions
|
||||
|
||||
//- Wrapped version of UPstream::broadcast
|
||||
// \return the message size (bytes read). May change in the future
|
||||
static std::streamsize read
|
||||
(
|
||||
const int rootProcNo, //!< normally UPstream::masterNo()
|
||||
char* buf,
|
||||
const std::streamsize bufSize,
|
||||
const label comm = UPstream::worldComm
|
||||
);
|
||||
};
|
||||
|
||||
|
||||
|
@ -548,6 +548,18 @@ bool Foam::UIPstreamBase::beginRawRead()
|
||||
/// return recvBufPos_;
|
||||
/// }
|
||||
|
||||
Foam::label Foam::UIPstreamBase::remaining() const noexcept
|
||||
{
|
||||
if (messageSize_ && (recvBufPos_ < recvBuf_.size()))
|
||||
{
|
||||
return (recvBuf_.size() - recvBufPos_);
|
||||
}
|
||||
else
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void Foam::UIPstreamBase::rewind()
|
||||
{
|
||||
|
@ -6,7 +6,7 @@
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2011-2014 OpenFOAM Foundation
|
||||
Copyright (C) 2017-2024 OpenCFD Ltd.
|
||||
Copyright (C) 2017-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -481,16 +481,12 @@ public:
|
||||
|
||||
// Constructors
|
||||
|
||||
//- Construct given process index to write to using the given
|
||||
//- attached send buffer, optional communication characteristics
|
||||
//- and IO format
|
||||
UOPBstream
|
||||
//- Construct with attached send buffer,
|
||||
//- optional communication characteristics and IO format
|
||||
explicit UOPBstream
|
||||
(
|
||||
const UPstream::commsTypes, //!< irrelevant
|
||||
const int toProcNo, //!< normally UPstream::masterNo()
|
||||
DynamicList<char>& sendBuf,
|
||||
const int tag = UPstream::msgType(), //!< irrelevant
|
||||
const label comm = UPstream::worldComm,
|
||||
const int communicator = UPstream::worldComm,
|
||||
const bool sendAtDestruct = true,
|
||||
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
|
||||
);
|
||||
@ -502,21 +498,9 @@ public:
|
||||
|
||||
// Member Functions
|
||||
|
||||
//- Use all write methods from base
|
||||
using UOPstreamBase::write;
|
||||
|
||||
|
||||
// Static Functions
|
||||
|
||||
//- Wrapped version of UPstream::broadcast with const-cast
|
||||
// \return True on success
|
||||
static bool write
|
||||
(
|
||||
const int rootProcNo, //!< normally UPstream::masterNo()
|
||||
const char* buf,
|
||||
const std::streamsize bufSize,
|
||||
const label comm = UPstream::worldComm
|
||||
);
|
||||
//- Broadcast a zero value (buffer) size that can be matched
|
||||
//- by the UIPBstream constructor.
|
||||
static void send(Foam::zero, const int communicator);
|
||||
};
|
||||
|
||||
|
||||
|
@ -5,7 +5,7 @@
|
||||
\\ / A nd | www.openfoam.com
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2022-2024 OpenCFD Ltd.
|
||||
Copyright (C) 2022-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -35,19 +35,4 @@ void Foam::UIPBstream::bufferIPCrecv()
|
||||
}
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
|
||||
|
||||
std::streamsize Foam::UIPBstream::read
|
||||
(
|
||||
const int rootProcNo,
|
||||
char* buf,
|
||||
const std::streamsize bufSize,
|
||||
const label comm
|
||||
)
|
||||
{
|
||||
NotImplemented;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
// ************************************************************************* //
|
||||
|
@ -5,7 +5,7 @@
|
||||
\\ / A nd | www.openfoam.com
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2022-2023 OpenCFD Ltd.
|
||||
Copyright (C) 2022-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -38,17 +38,8 @@ bool Foam::UOPBstream::bufferIPCsend()
|
||||
|
||||
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
|
||||
|
||||
bool Foam::UOPBstream::write
|
||||
(
|
||||
const int rootProcNo,
|
||||
const char* buf,
|
||||
const std::streamsize bufSize,
|
||||
const label comm
|
||||
)
|
||||
{
|
||||
NotImplemented;
|
||||
return false;
|
||||
}
|
||||
void Foam::UOPBstream::send(Foam::zero, const int communicator)
|
||||
{}
|
||||
|
||||
|
||||
// ************************************************************************* //
|
||||
|
@ -203,6 +203,25 @@ inline void push_request
|
||||
}
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * * Convenience Methods * * * * * * * * * * * * * //
|
||||
|
||||
//- Broadcast a single int64 value.
|
||||
//
|
||||
// Ensures consistent data types. Used within the following:
|
||||
// - UIPBstream::bufferIPCrecv()
|
||||
// - UOPBstream::bufferIPCsend()
|
||||
// - UOPBstream::send(Foam::zero, ...)
|
||||
|
||||
inline bool broadcast_int64(int64_t& value, int comm)
|
||||
{
|
||||
return
|
||||
(
|
||||
MPI_SUCCESS
|
||||
== MPI_Bcast(&value, 1, MPI_INT64_T, 0, MPICommunicators_[comm])
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||
|
||||
} // End namespace PstreamGlobals
|
||||
|
@ -5,7 +5,7 @@
|
||||
\\ / A nd | www.openfoam.com
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2022-2024 OpenCFD Ltd.
|
||||
Copyright (C) 2022-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -26,8 +26,8 @@ License
|
||||
\*---------------------------------------------------------------------------*/
|
||||
|
||||
#include "UIPstream.H"
|
||||
#include "PstreamGlobals.H"
|
||||
#include "IOstreams.H"
|
||||
#include "PstreamGlobals.H"
|
||||
|
||||
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
|
||||
|
||||
@ -37,33 +37,32 @@ void Foam::UIPBstream::bufferIPCrecv()
|
||||
// 1. for the data size
|
||||
// 2. for the data itself
|
||||
|
||||
// Expected message size, similar to MPI_Probe
|
||||
// Same type must be expected in UOPBstream::bufferIPCsend()
|
||||
std::streamsize bufSize(0);
|
||||
|
||||
// Broadcast #1 - data size
|
||||
if
|
||||
(
|
||||
!UPstream::broadcast
|
||||
(
|
||||
reinterpret_cast<char*>(&bufSize),
|
||||
sizeof(std::streamsize),
|
||||
comm_,
|
||||
fromProcNo_ //< is actually rootProcNo
|
||||
)
|
||||
)
|
||||
// Same data type must be used in UOPBstream::bufferIPCsend()
|
||||
|
||||
int64_t count(0);
|
||||
if (!PstreamGlobals::broadcast_int64(count, comm_))
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "MPI_Bcast failure receiving buffer size" << nl
|
||||
<< "Broadcast failure receiving buffer size" << nl
|
||||
<< " comm:" << comm_ << nl
|
||||
<< Foam::abort(FatalError);
|
||||
}
|
||||
|
||||
if (UPstream::debug)
|
||||
// This is not actually possible - sender uses List::size()
|
||||
//
|
||||
// if (FOAM_UNLIKELY(count > int64_t(UList<char>::max_size())))
|
||||
// {
|
||||
// FatalErrorInFunction
|
||||
// << "Broadcast list size larger than UList<char>::max_size()"
|
||||
// << Foam::abort(FatalError);
|
||||
// }
|
||||
|
||||
if (FOAM_UNLIKELY(UPstream::debug))
|
||||
{
|
||||
Perr<< "UOPBstream IPC read buffer :"
|
||||
<< " root:" << fromProcNo_
|
||||
Perr<< "UIPBstream IPC read buffer :"
|
||||
<< " comm:" << comm_
|
||||
<< " probed size:" << label(bufSize)
|
||||
<< " probed size:" << label(count)
|
||||
<< " wanted size:" << recvBuf_.capacity()
|
||||
<< Foam::endl;
|
||||
}
|
||||
@ -71,33 +70,32 @@ void Foam::UIPBstream::bufferIPCrecv()
|
||||
|
||||
// Set buffer size, avoiding any copying and resize doubling etc.
|
||||
recvBuf_.clear();
|
||||
if (recvBuf_.capacity() < label(bufSize))
|
||||
if (recvBuf_.capacity() < label(count))
|
||||
{
|
||||
recvBuf_.setCapacity_nocopy(label(bufSize));
|
||||
recvBuf_.setCapacity_nocopy(label(count));
|
||||
}
|
||||
recvBuf_.resize_nocopy(label(bufSize));
|
||||
recvBuf_.resize_nocopy(label(count));
|
||||
|
||||
// This is the only real information we can trust
|
||||
messageSize_ = label(bufSize);
|
||||
messageSize_ = label(count);
|
||||
|
||||
|
||||
// Broadcast #2 - data content
|
||||
// - skip if there is no data to receive
|
||||
if
|
||||
(
|
||||
(bufSize > 0)
|
||||
(count > 0) // ie, not empty
|
||||
&& !UPstream::broadcast
|
||||
(
|
||||
recvBuf_.data(),
|
||||
recvBuf_.size(), // same as bufSize
|
||||
comm_,
|
||||
fromProcNo_ //< is actually rootProcNo
|
||||
recvBuf_.size(), // same as count
|
||||
comm_
|
||||
)
|
||||
)
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "MPI_Bcast failure receiving buffer data:"
|
||||
<< recvBuf_.size() << nl
|
||||
<< "Broadcast failure receiving buffer data:"
|
||||
<< recvBuf_.size() << " comm:" << comm_ << nl
|
||||
<< Foam::abort(FatalError);
|
||||
}
|
||||
|
||||
@ -108,29 +106,4 @@ void Foam::UIPBstream::bufferIPCrecv()
|
||||
}
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
|
||||
|
||||
std::streamsize Foam::UIPBstream::read
|
||||
(
|
||||
const int rootProcNo,
|
||||
char* buf,
|
||||
const std::streamsize bufSize,
|
||||
const label comm
|
||||
)
|
||||
{
|
||||
if
|
||||
(
|
||||
!UPstream::broadcast(buf, bufSize, comm, rootProcNo)
|
||||
)
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "MPI_Bcast failure receiving data:" << label(bufSize) << nl
|
||||
<< Foam::abort(FatalError);
|
||||
return 0;
|
||||
}
|
||||
|
||||
return bufSize;
|
||||
}
|
||||
|
||||
|
||||
// ************************************************************************* //
|
||||
|
@ -5,7 +5,7 @@
|
||||
\\ / A nd | www.openfoam.com
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2022-2023 OpenCFD Ltd.
|
||||
Copyright (C) 2022-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -39,55 +39,35 @@ bool Foam::UOPBstream::bufferIPCsend()
|
||||
|
||||
PstreamGlobals::checkCommunicator(comm_, toProcNo_);
|
||||
|
||||
// Same type must be expected in UIPBstream::bufferIPCrecv()
|
||||
std::streamsize bufSize(sendBuf_.size());
|
||||
|
||||
// TODO: some corrective action
|
||||
#if 0
|
||||
if (bufSize > std::streamsize(INT_MAX))
|
||||
{
|
||||
Perr<< "UOPBstream::write() :"
|
||||
<< " exceeds INT_MAX bytes" << Foam::endl;
|
||||
error::printStack(Perr);
|
||||
}
|
||||
#endif
|
||||
|
||||
// Broadcast #1 - data size
|
||||
if
|
||||
(
|
||||
!UPstream::broadcast
|
||||
(
|
||||
reinterpret_cast<char*>(&bufSize),
|
||||
sizeof(std::streamsize),
|
||||
comm_,
|
||||
toProcNo_ //< is actually rootProcNo
|
||||
)
|
||||
)
|
||||
// Same data type must be used in UIPBstream::bufferIPCrecv()
|
||||
|
||||
int64_t count(sendBuf_.size());
|
||||
if (!PstreamGlobals::broadcast_int64(count, comm_))
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "MPI_Bcast failure sending buffer size:" << bufSize << nl
|
||||
<< "Broadcast failure sending buffer size:"
|
||||
<< label(count) << " comm:" << comm_ << nl
|
||||
<< Foam::abort(FatalError);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
// Broadcast #2 - data content
|
||||
// - skip if there is no data to send
|
||||
if
|
||||
(
|
||||
(bufSize > 0)
|
||||
(count > 0) // ie, not empty
|
||||
&& !UPstream::broadcast
|
||||
(
|
||||
sendBuf_.data(),
|
||||
sendBuf_.size(), // same as bufSize
|
||||
comm_,
|
||||
toProcNo_ //< is actually rootProcNo
|
||||
sendBuf_.size(), // same as count
|
||||
comm_
|
||||
)
|
||||
)
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "MPI_Bcast failure sending buffer data:"
|
||||
<< sendBuf_.size() << nl
|
||||
<< "Broadcast failure sending buffer data:"
|
||||
<< sendBuf_.size() << " comm:" << comm_ << nl
|
||||
<< Foam::abort(FatalError);
|
||||
return false;
|
||||
}
|
||||
@ -98,26 +78,10 @@ bool Foam::UOPBstream::bufferIPCsend()
|
||||
|
||||
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
|
||||
|
||||
bool Foam::UOPBstream::write
|
||||
(
|
||||
const int rootProcNo,
|
||||
const char* buf,
|
||||
const std::streamsize bufSize,
|
||||
const label comm
|
||||
)
|
||||
void Foam::UOPBstream::send(Foam::zero, const int communicator)
|
||||
{
|
||||
if
|
||||
(
|
||||
!UPstream::broadcast(const_cast<char*>(buf), bufSize, comm, rootProcNo)
|
||||
)
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "MPI_Bcast failure sending buffer data:" << label(bufSize) << nl
|
||||
<< Foam::abort(FatalError);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
int64_t count(0);
|
||||
PstreamGlobals::broadcast_int64(count, communicator);
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user