ENH: cleanup Pstream internal names, cull unneeded parameters

- Pstreams can be ascii/binary but are always currentVersion

- rename UIPstream externalBuf_ to 'recvBuf_' for similar naming as
  PstreamBuffers and symmetry with UOPstream::sendBuf_

- specific enum size for commsType (for more compact structures in the
  future). Default construct lists items.

BUG: possible incidental indexing in UIPstream::read(char*, std::streamsize)

- raw reading had been split into beginRawRead(), readRaw().
  However, this could change the current input position (due to word
  boundary alignment), even if the expected count is zero. Make a
  no-op for count == 0. This mirrors UOPstream::write behaviour.
This commit is contained in:
Mark Olesen 2021-10-19 11:27:39 +02:00
parent 609fb366e3
commit 1f20747b1a
17 changed files with 313 additions and 290 deletions

View File

@ -37,8 +37,7 @@ Foam::IPstream::IPstream
const label bufSize,
const int tag,
const label comm,
IOstreamOption::streamFormat fmt,
IOstreamOption::versionNumber ver
IOstreamOption::streamFormat fmt
)
:
Pstream(commsType, bufSize),
@ -46,15 +45,14 @@ Foam::IPstream::IPstream
(
commsType,
fromProcNo,
buf_,
externalBufPosition_,
tag, // tag
Pstream::transferBuf_,
transferBufPosition_,
tag,
comm,
false, // do not clear buf_ if at end
fmt,
ver
false, // Do not clear Pstream::transferBuf_ if at end
fmt
),
externalBufPosition_(0)
transferBufPosition_(0)
{}

View File

@ -56,16 +56,17 @@ class IPstream
public Pstream,
public UIPstream
{
// Private Data
//- Receive index
label externalBufPosition_;
//- Receive index into Pstream::transferBuf_
label transferBufPosition_;
public:
// Constructors
//- Construct given process index to read from and optional buffer size,
//- read format and IO version
//- Construct given process index to read from
//- and optional buffer size, read format
IPstream
(
const commsTypes commsType,
@ -73,8 +74,7 @@ public:
const label bufSize = 0,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY,
IOstreamOption::versionNumber ver = IOstreamOption::currentVersion
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
};

View File

@ -37,12 +37,20 @@ Foam::OPstream::OPstream
const label bufSize,
const int tag,
const label comm,
IOstreamOption::streamFormat fmt,
IOstreamOption::versionNumber ver
IOstreamOption::streamFormat fmt
)
:
Pstream(commsType, bufSize),
UOPstream(commsType, toProcNo, buf_, tag, comm, true, fmt, ver)
UOPstream
(
commsType,
toProcNo,
Pstream::transferBuf_,
tag,
comm,
true, // sendAtDestruct
fmt
)
{}

View File

@ -60,8 +60,8 @@ public:
// Constructors
//- Construct given process index to send to and optional buffer size,
//- write format and IO version
//- Construct for given process index to send to
//- and optional buffer size, write format
OPstream
(
const commsTypes commsType,
@ -69,8 +69,7 @@ public:
const label bufSize = 0,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY,
IOstreamOption::versionNumber ver = IOstreamOption::currentVersion
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
};

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2016 OpenFOAM Foundation
Copyright (C) 2016 OpenCFD Ltd.
Copyright (C) 2016-2021 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -32,8 +32,8 @@ Description
SourceFiles
Pstream.C
gatherScatter.C
combineGatherScatter.C
gatherScatter.C
gatherScatterList.C
exchange.C
@ -92,10 +92,10 @@ class Pstream
protected:
// Protected data
// Protected Data
//- Transfer buffer
DynamicList<char> buf_;
//- Allocated transfer buffer (can be used for send or receive)
DynamicList<char> transferBuf_;
public:
@ -107,26 +107,25 @@ public:
// Constructors
//- Construct given optional buffer size
Pstream
explicit Pstream
(
const commsTypes commsType,
const label bufSize = 0
)
:
UPstream(commsType),
buf_(0)
UPstream(commsType)
{
if (bufSize)
{
buf_.setCapacity(bufSize + 2*sizeof(scalar) + 1);
transferBuf_.setCapacity(bufSize + 2*sizeof(scalar) + 1);
}
}
// Gather and scatter
//- Gather data. Apply bop to combine Value
// from different processors
//- Gather data.
//- Apply bop to combine Value from different processors
template<class T, class BinaryOp>
static void gather
(
@ -166,6 +165,7 @@ public:
const label comm = Pstream::worldComm
);
// Combine variants. Inplace combine values from processors.
// (Uses construct from Istream instead of <<)
@ -208,6 +208,7 @@ public:
const label comm = Pstream::worldComm
);
// Combine variants working on whole List at a time.
template<class T, class CombineOp>
@ -249,6 +250,7 @@ public:
const label comm = Pstream::worldComm
);
// Combine variants working on whole map at a time. Container needs to
// have iterators and find() defined.

View File

@ -28,11 +28,6 @@ License
#include "PstreamBuffers.H"
/* * * * * * * * * * * * * * * Static Member Data * * * * * * * * * * * * * */
Foam::DynamicList<char> Foam::PstreamBuffers::nullBuf(0);
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::PstreamBuffers::PstreamBuffers
@ -40,18 +35,16 @@ Foam::PstreamBuffers::PstreamBuffers
const UPstream::commsTypes commsType,
const int tag,
const label comm,
IOstreamOption::streamFormat fmt,
IOstreamOption::versionNumber ver
IOstreamOption::streamFormat fmt
)
:
commsType_(commsType),
tag_(tag),
comm_(comm),
format_(fmt),
version_(ver),
sendBuf_(UPstream::nProcs(comm)),
recvBuf_(UPstream::nProcs(comm)),
recvBufPos_(UPstream::nProcs(comm), 0),
recvBufPos_(UPstream::nProcs(comm), Zero),
finishedSendsCalled_(false)
{}
@ -67,9 +60,8 @@ Foam::PstreamBuffers::~PstreamBuffers()
{
FatalErrorInFunction
<< "Message from processor " << proci
<< " not fully consumed. messageSize:" << recvBuf_[proci].size()
<< " bytes of which only " << recvBufPos_[proci]
<< " consumed."
<< " Only consumed " << recvBufPos_[proci] << " of "
<< recvBuf_[proci].size() << " bytes" << nl
<< Foam::abort(FatalError);
}
}
@ -80,6 +72,7 @@ Foam::PstreamBuffers::~PstreamBuffers()
void Foam::PstreamBuffers::finishedSends(const bool block)
{
// Could also check that it is not called twice
finishedSendsCalled_ = true;
if (commsType_ == UPstream::commsTypes::nonBlocking)
@ -98,6 +91,7 @@ void Foam::PstreamBuffers::finishedSends(const bool block)
void Foam::PstreamBuffers::finishedSends(labelList& recvSizes, const bool block)
{
// Could also check that it is not called twice
finishedSendsCalled_ = true;
if (commsType_ == UPstream::commsTypes::nonBlocking)

View File

@ -91,79 +91,79 @@ class PstreamBuffers
friend class UOPstream;
friend class UIPstream;
// Private data
// Private Data
//- Communications type of this stream
const UPstream::commsTypes commsType_;
//- The transfer message type
const int tag_;
//- Communicator
const label comm_;
//- Buffer format (ascii | binary)
const IOstreamOption::streamFormat format_;
const IOstreamOption::versionNumber version_;
//- Send buffer
List<DynamicList<char>> sendBuf_;
//- Receive buffer
List<DynamicList<char>> recvBuf_;
//- Read position in recvBuf_
//- Current read positions within recvBuf_
labelList recvBufPos_;
//- Track if sends are complete
bool finishedSendsCalled_;
public:
// Static data
static DynamicList<char> nullBuf;
// Constructors
//- Construct given comms type,
// write format and IO version
PstreamBuffers
//- Construct given comms type, communication options, IO format
explicit PstreamBuffers
(
const UPstream::commsTypes commsType,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
IOstreamOption::streamFormat vmt = IOstreamOption::BINARY,
IOstreamOption::versionNumber ver = IOstreamOption::currentVersion
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
//- Destructor
//- Destructor - checks that all data have been consumed
~PstreamBuffers();
// Member functions
// Member Functions
int tag() const
//- The transfer message type
int tag() const noexcept
{
return tag_;
}
label comm() const
//- Communicator
label comm() const noexcept
{
return comm_;
}
//- Mark all sends as having been done. This will start receives
// in non-blocking mode. If block will wait for all transfers to
// finish (only relevant for nonBlocking mode)
//- Mark all sends as having been done.
// This will start receives in non-blocking mode.
// If block will wait for all transfers to finish
// (only relevant for nonBlocking mode)
void finishedSends(const bool block = true);
//- Mark all sends as having been done. Same as above but also returns
// sizes (bytes) received. Note:currently only valid for
// non-blocking.
//- Mark all sends as having been done.
// Same as above but also returns sizes (bytes) received.
// \note currently only valid for non-blocking.
void finishedSends(labelList& recvSizes, const bool block = true);
//- Clear storage and reset
//- Reset (clear) individual buffers and reset state.
// Does not clear buffer storage
void clear();
};

View File

@ -57,14 +57,26 @@ inline static void processFlags(Istream& is, int flagMask)
}
}
} // End anonymous namespace
// Return the position with word boundary alignment
inline static label byteAlign(const label pos, const size_t align)
{
return
(
(align > 1)
? (align + ((pos - 1) & ~(align - 1)))
: pos
);
}
} // End namespace Foam
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
inline void Foam::UIPstream::checkEof()
{
if (externalBufPosition_ == messageSize_)
if (recvBufPos_ == messageSize_)
{
setEof();
}
@ -73,11 +85,7 @@ inline void Foam::UIPstream::checkEof()
inline void Foam::UIPstream::prepareBuffer(const size_t align)
{
if (align > 1)
{
externalBufPosition_ =
align + ((externalBufPosition_ - 1) & ~(align - 1));
}
recvBufPos_ = byteAlign(recvBufPos_, align);
}
@ -86,8 +94,8 @@ inline void Foam::UIPstream::readFromBuffer(T& val)
{
prepareBuffer(sizeof(T));
val = reinterpret_cast<T&>(externalBuf_[externalBufPosition_]);
externalBufPosition_ += sizeof(T);
val = reinterpret_cast<T&>(recvBuf_[recvBufPos_]);
recvBufPos_ += sizeof(T);
checkEof();
}
@ -98,7 +106,7 @@ inline void Foam::UIPstream::readFromBuffer
const size_t count
)
{
const char* const __restrict__ buf = &externalBuf_[externalBufPosition_];
const char* const __restrict__ buf = &recvBuf_[recvBufPos_];
char* const __restrict__ output = reinterpret_cast<char*>(data);
for (size_t i = 0; i < count; ++i)
@ -106,12 +114,12 @@ inline void Foam::UIPstream::readFromBuffer
output[i] = buf[i];
}
externalBufPosition_ += count;
recvBufPos_ += count;
checkEof();
}
inline Foam::Istream& Foam::UIPstream::readStringFromBuffer(std::string& str)
inline Foam::Istream& Foam::UIPstream::readString(std::string& str)
{
// Use std::string::assign() to copy content, including '\0'.
// Stripping (when desired) is the responsibility of the sending side.
@ -121,8 +129,8 @@ inline Foam::Istream& Foam::UIPstream::readStringFromBuffer(std::string& str)
if (len)
{
str.assign(&externalBuf_[externalBufPosition_], len);
externalBufPosition_ += len;
str.assign(&recvBuf_[recvBufPos_], len);
recvBufPos_ += len;
checkEof();
}
else
@ -144,11 +152,11 @@ Foam::UIPstream::~UIPstream()
{
Pout<< "UIPstream::~UIPstream() : tag:" << tag_
<< " fromProcNo:" << fromProcNo_
<< " clearing externalBuf_ of size "
<< externalBuf_.size()
<< " clearing receive buffer of size "
<< recvBuf_.size()
<< " messageSize_:" << messageSize_ << endl;
}
externalBuf_.clearStorage();
recvBuf_.clearStorage();
}
}
@ -232,7 +240,7 @@ Foam::Istream& Foam::UIPstream::read(token& t)
case token::tokenType::DIRECTIVE :
{
word val;
if (readStringFromBuffer(val))
if (readString(val))
{
if (token::compound::isCompound(val))
{
@ -258,7 +266,7 @@ Foam::Istream& Foam::UIPstream::read(token& t)
case token::tokenType::VERBATIM :
{
string val;
if (readStringFromBuffer(val))
if (readString(val))
{
t = std::move(val);
t.setType(token::tokenType(c));
@ -335,8 +343,8 @@ Foam::Istream& Foam::UIPstream::read(token& t)
Foam::Istream& Foam::UIPstream::read(char& c)
{
c = externalBuf_[externalBufPosition_];
++externalBufPosition_;
c = recvBuf_[recvBufPos_];
++recvBufPos_;
checkEof();
return *this;
}
@ -344,13 +352,13 @@ Foam::Istream& Foam::UIPstream::read(char& c)
Foam::Istream& Foam::UIPstream::read(word& str)
{
return readStringFromBuffer(str);
return readString(str);
}
Foam::Istream& Foam::UIPstream::read(string& str)
{
return readStringFromBuffer(str);
return readString(str);
}
@ -377,9 +385,14 @@ Foam::Istream& Foam::UIPstream::read(doubleScalar& val)
Foam::Istream& Foam::UIPstream::read(char* data, std::streamsize count)
{
beginRawRead();
readRaw(data, count);
endRawRead();
if (count)
{
// For count == 0, a no-op
// - see UOPstream::write(const char*, streamsize)
beginRawRead();
readRaw(data, count);
endRawRead();
}
return *this;
}
@ -405,7 +418,9 @@ bool Foam::UIPstream::beginRawRead()
<< Foam::abort(FatalError);
}
// Alignment = 8, as per read(const char*, streamsize)
// Align on word boundary (64-bit)
// - as per read(const char*, streamsize)
// The check for zero-size will have been done by the caller
prepareBuffer(8);
return true;
@ -414,7 +429,7 @@ bool Foam::UIPstream::beginRawRead()
void Foam::UIPstream::rewind()
{
externalBufPosition_ = 0;
recvBufPos_ = 0;
}

View File

@ -63,9 +63,9 @@ class UIPstream
int fromProcNo_;
DynamicList<char>& externalBuf_;
DynamicList<char>& recvBuf_;
label& externalBufPosition_;
label& recvBufPos_;
const int tag_;
@ -78,45 +78,45 @@ class UIPstream
// Private Member Functions
//- Check the bufferPosition against messageSize_ for EOF
//- Check buffer position against messageSize_ for EOF
inline void checkEof();
//- Prepare transfer buffer by adjusting alignment
//- Prepare receive buffer by adjusting alignment
inline void prepareBuffer(const size_t align);
//- Read a T from the transfer buffer
//- Read a T from the receive buffer
template<class T>
inline void readFromBuffer(T& val);
//- Read count bytes of data from the transfer buffer.
//- Read count bytes of data from the receive buffer.
// Prior data alignment is done by prepareBuffer
inline void readFromBuffer(void* data, const size_t count);
//- Read string length and its content.
inline Istream& readStringFromBuffer(std::string& str);
//- Read string length and string content
inline Istream& readString(std::string& str);
public:
// Constructors
//- Construct given process index to read from and optional buffer size,
//- read format and IO version
//- Construct given process index to read from using the given
//- attached receive buffer, optional communication characteristics
//- and IO format
UIPstream
(
const commsTypes commsType,
const int fromProcNo,
DynamicList<char>& externalBuf,
label& externalBufPosition,
DynamicList<char>& receiveBuf,
label& receiveBufPosition,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool clearAtEnd = false, // destroy externalBuf if at end
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY,
IOstreamOption::versionNumber ver = IOstreamOption::currentVersion
const bool clearAtEnd = false, // destroy receiveBuf if at end
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
//- Construct given buffers
UIPstream(const int fromProcNo, PstreamBuffers&);
UIPstream(const int fromProcNo, PstreamBuffers& buffers);
//- Destructor

View File

@ -31,6 +31,25 @@ License
#include "token.H"
#include <cctype>
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
namespace Foam
{
// Return the position with word boundary alignment
inline static label byteAlign(const label pos, const size_t align)
{
return
(
(align > 1)
? (align + ((pos - 1) & ~(align - 1)))
: pos
);
}
} // End namespace Foam
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
inline void Foam::UOPstream::prepareBuffer
@ -44,14 +63,8 @@ inline void Foam::UOPstream::prepareBuffer
return;
}
// The current output position
label pos = sendBuf_.size();
if (align > 1)
{
// Align output position. Pads sendBuf_.size() - oldPos characters.
pos = align + ((pos - 1) & ~(align - 1));
}
// Align for the next output position
const label pos = byteAlign(sendBuf_.size(), align);
// Extend buffer (as required)
sendBuf_.reserve(max(1000, label(pos + count)));
@ -68,16 +81,6 @@ inline void Foam::UOPstream::writeToBuffer(const T& val)
}
inline void Foam::UOPstream::writeToBuffer(const char& c)
{
if (!sendBuf_.capacity())
{
sendBuf_.setCapacity(1000);
}
sendBuf_.append(c);
}
inline void Foam::UOPstream::writeToBuffer
(
const void* data,
@ -98,7 +101,7 @@ inline void Foam::UOPstream::writeToBuffer
// Extend the addressable range for direct pointer access
sendBuf_.resize(pos + count);
char* const __restrict__ buf = (sendBuf_.begin() + pos);
char* const __restrict__ buf = (sendBuf_.data() + pos);
const char* const __restrict__ input = reinterpret_cast<const char*>(data);
for (size_t i = 0; i < count; ++i)
@ -108,11 +111,21 @@ inline void Foam::UOPstream::writeToBuffer
}
inline void Foam::UOPstream::writeStringToBuffer(const std::string& str)
inline void Foam::UOPstream::putChar(const char c)
{
if (!sendBuf_.capacity())
{
sendBuf_.setCapacity(1000);
}
sendBuf_.append(c);
}
inline void Foam::UOPstream::putString(const std::string& str)
{
const size_t len = str.size();
writeToBuffer(len);
writeToBuffer(str.data(), len, 1);
writeToBuffer(str.data(), len, 1); // no-op when len == 0
}
@ -126,12 +139,11 @@ Foam::UOPstream::UOPstream
const int tag,
const label comm,
const bool sendAtDestruct,
IOstreamOption::streamFormat fmt,
IOstreamOption::versionNumber ver
IOstreamOption::streamFormat fmt
)
:
UPstream(commsType),
Ostream(fmt, ver),
Ostream(fmt, IOstreamOption::currentVersion),
toProcNo_(toProcNo),
sendBuf_(sendBuf),
tag_(tag),
@ -146,7 +158,7 @@ Foam::UOPstream::UOPstream
Foam::UOPstream::UOPstream(const int toProcNo, PstreamBuffers& buffers)
:
UPstream(buffers.commsType_),
Ostream(buffers.format_, buffers.version_),
Ostream(buffers.format_, IOstreamOption::currentVersion),
toProcNo_(toProcNo),
sendBuf_(buffers.sendBuf_[toProcNo]),
tag_(buffers.tag_),
@ -196,8 +208,8 @@ bool Foam::UOPstream::write(const token& tok)
{
case token::tokenType::FLAG :
{
writeToBuffer(char(token::tokenType::FLAG));
writeToBuffer(char(tok.flagToken()));
putChar(token::tokenType::FLAG);
putChar(tok.flagToken());
return true;
}
@ -206,8 +218,8 @@ bool Foam::UOPstream::write(const token& tok)
case token::tokenType::WORD :
case token::tokenType::DIRECTIVE :
{
writeToBuffer(char(tok.type()));
writeStringToBuffer(tok.wordToken());
putChar(tok.type());
putString(tok.wordToken());
return true;
}
@ -218,8 +230,8 @@ bool Foam::UOPstream::write(const token& tok)
case token::tokenType::VARIABLE :
case token::tokenType::VERBATIM :
{
writeToBuffer(char(tok.type()));
writeStringToBuffer(tok.stringToken());
putChar(tok.type());
putString(tok.stringToken());
return true;
}
@ -236,7 +248,7 @@ Foam::Ostream& Foam::UOPstream::write(const char c)
{
if (!isspace(c))
{
writeToBuffer(c);
putChar(c);
}
return *this;
@ -262,8 +274,8 @@ Foam::Ostream& Foam::UOPstream::write(const char* str)
Foam::Ostream& Foam::UOPstream::write(const word& str)
{
writeToBuffer(char(token::tokenType::WORD));
writeStringToBuffer(str);
putChar(token::tokenType::WORD);
putString(str);
return *this;
}
@ -271,8 +283,8 @@ Foam::Ostream& Foam::UOPstream::write(const word& str)
Foam::Ostream& Foam::UOPstream::write(const string& str)
{
writeToBuffer(char(token::tokenType::STRING));
writeStringToBuffer(str);
putChar(token::tokenType::STRING);
putString(str);
return *this;
}
@ -286,13 +298,13 @@ Foam::Ostream& Foam::UOPstream::writeQuoted
{
if (quoted)
{
writeToBuffer(char(token::tokenType::STRING));
putChar(token::tokenType::STRING);
}
else
{
writeToBuffer(char(token::tokenType::WORD));
putChar(token::tokenType::WORD);
}
writeStringToBuffer(str);
putString(str);
return *this;
}
@ -300,7 +312,7 @@ Foam::Ostream& Foam::UOPstream::writeQuoted
Foam::Ostream& Foam::UOPstream::write(const int32_t val)
{
writeToBuffer(char(token::tokenType::LABEL));
putChar(token::tokenType::LABEL);
writeToBuffer(val);
return *this;
}
@ -308,7 +320,7 @@ Foam::Ostream& Foam::UOPstream::write(const int32_t val)
Foam::Ostream& Foam::UOPstream::write(const int64_t val)
{
writeToBuffer(char(token::tokenType::LABEL));
putChar(token::tokenType::LABEL);
writeToBuffer(val);
return *this;
}
@ -316,7 +328,7 @@ Foam::Ostream& Foam::UOPstream::write(const int64_t val)
Foam::Ostream& Foam::UOPstream::write(const floatScalar val)
{
writeToBuffer(char(token::tokenType::FLOAT));
putChar(token::tokenType::FLOAT);
writeToBuffer(val);
return *this;
}
@ -324,7 +336,7 @@ Foam::Ostream& Foam::UOPstream::write(const floatScalar val)
Foam::Ostream& Foam::UOPstream::write(const doubleScalar val)
{
writeToBuffer(char(token::tokenType::DOUBLE));
putChar(token::tokenType::DOUBLE);
writeToBuffer(val);
return *this;
}
@ -339,6 +351,7 @@ Foam::Ostream& Foam::UOPstream::write(const char* data, std::streamsize count)
<< Foam::abort(FatalError);
}
// Align on word boundary (64-bit)
writeToBuffer(data, count, 8);
return *this;
@ -370,7 +383,8 @@ bool Foam::UOPstream::beginRawWrite(std::streamsize count)
<< Foam::abort(FatalError);
}
// Alignment = 8, as per write(const char*, streamsize)
// Align on word boundary (64-bit)
// - as per write(const char*, streamsize)
prepareBuffer(count, 8);
return true;

View File

@ -75,18 +75,16 @@ class UOPstream
// Private Member Functions
//- Prepare buffer for count bytes of output at specified alignment.
//- Prepare send buffer for count bytes of output,
//- with specified alignment.
inline void prepareBuffer(const size_t count, const size_t align);
//- Write data to the transfer buffer
//- Generic write data to the send buffer, aligned by sizeof(T)
template<class T>
inline void writeToBuffer(const T& val);
//- Write a char to the transfer buffer
inline void writeToBuffer(const char& c);
//- Write count bytes of data to the transfer buffer
// using align byte alignment
//- Write count bytes of data to the send buffer,
//- using align byte alignment
inline void writeToBuffer
(
const void* data,
@ -94,17 +92,21 @@ class UOPstream
const size_t align
);
//- Write string length and content.
//- Add a single char to the send buffer. No alignment needed
inline void putChar(const char c);
//- Write string length and string content.
// The content includes the trailing nul char.
inline void writeStringToBuffer(const std::string& str);
inline void putString(const std::string& str);
public:
// Constructors
//- Construct given process index to send to and optional buffer size,
//- write format and IO version
//- Construct given process index to write to using the given
//- attached send buffer, optional communication characteristics
//- and IO format
UOPstream
(
const commsTypes commsType,
@ -113,15 +115,14 @@ public:
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool sendAtDestruct = true,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY,
IOstreamOption::versionNumber ver = IOstreamOption::currentVersion
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
//- Construct given buffers
UOPstream(const int toProcNo, PstreamBuffers& buffers);
//- Destructor
//- Destructor.
~UOPstream();

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2015-2020 OpenCFD Ltd.
Copyright (C) 2015-2021 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -106,7 +106,7 @@ Foam::label Foam::UPstream::allocateCommunicator
label index;
if (!freeComms_.empty())
{
index = freeComms_.pop();
index = freeComms_.remove(); // LIFO pop
}
else
{
@ -114,10 +114,10 @@ Foam::label Foam::UPstream::allocateCommunicator
index = parentCommunicator_.size();
myProcNo_.append(-1);
procIDs_.append(List<int>(0));
procIDs_.append(List<int>());
parentCommunicator_.append(-1);
linearCommunication_.append(List<commsStruct>(0));
treeCommunication_.append(List<commsStruct>(0));
linearCommunication_.append(List<commsStruct>());
treeCommunication_.append(List<commsStruct>());
}
if (debug)
@ -186,7 +186,7 @@ void Foam::UPstream::freeCommunicator
linearCommunication_[communicator].clear();
treeCommunication_[communicator].clear();
freeComms_.push(communicator);
freeComms_.append(communicator); // LIFO push
}
@ -365,14 +365,14 @@ bool Foam::UPstream::haveThreads_(false);
int Foam::UPstream::msgType_(1);
Foam::LIFOStack<Foam::label> Foam::UPstream::freeComms_;
Foam::DynamicList<int> Foam::UPstream::myProcNo_(10);
Foam::DynamicList<Foam::List<int>> Foam::UPstream::procIDs_(10);
Foam::DynamicList<Foam::label> Foam::UPstream::parentCommunicator_(10);
Foam::DynamicList<Foam::label> Foam::UPstream::freeComms_;
Foam::wordList Foam::UPstream::allWorlds_(Foam::one{}, "");
Foam::labelList Foam::UPstream::worldIDs_(Foam::one{}, 0);

View File

@ -48,7 +48,6 @@ SourceFiles
#include "string.H"
#include "Enum.H"
#include "ListOps.H"
#include "LIFOStack.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -67,7 +66,7 @@ public:
typedef IntRange<int> rangeType;
//- Types of communications
enum class commsTypes
enum class commsTypes : char
{
blocking,
scheduled,
@ -83,7 +82,7 @@ public:
//- Structure for communicating between processors
class commsStruct
{
// Private data
// Private Data
//- procID of above processor
label above_;
@ -103,7 +102,7 @@ public:
// Constructors
//- Construct null
//- Default construct. Above == -1
commsStruct();
//- Construct from components
@ -128,33 +127,30 @@ public:
// Member Functions
// Access
label above() const noexcept
{
return above_;
}
label above() const
{
return above_;
}
const labelList& below() const noexcept
{
return below_;
}
const labelList& below() const
{
return below_;
}
const labelList& allBelow() const noexcept
{
return allBelow_;
}
const labelList& allBelow() const
{
return allBelow_;
}
const labelList& allNotBelow() const
{
return allNotBelow_;
}
const labelList& allNotBelow() const noexcept
{
return allNotBelow_;
}
// Member operators
// Member Operators
bool operator==(const commsStruct&) const;
bool operator!=(const commsStruct&) const;
@ -183,7 +179,7 @@ public:
private:
// Private Data
// Private Static Data
//- By default this is not a parallel run
static bool parRun_;
@ -203,9 +199,6 @@ private:
// Communicator specific data
//- Free communicators
static LIFOStack<label> freeComms_;
//- My processor number
static DynamicList<int> myProcNo_;
@ -215,6 +208,9 @@ private:
//- Parent communicator
static DynamicList<label> parentCommunicator_;
//- Free communicators
static DynamicList<label> freeComms_;
//- Linear communication schedule
static DynamicList<List<commsStruct>> linearCommunication_;
@ -263,6 +259,7 @@ protected:
//- Communications type of this stream
commsTypes commsType_;
public:
// Declare name of the class and its debug switch
@ -301,8 +298,8 @@ public:
// Constructors
//- Construct given optional buffer size
UPstream(const commsTypes commsType)
//- Construct for given communication type
explicit UPstream(const commsTypes commsType)
:
commsType_(commsType)
{}
@ -356,7 +353,7 @@ public:
freeCommunicator(comm_);
}
operator label() const
operator label() const noexcept
{
return comm_;
}
@ -424,21 +421,22 @@ public:
//- Set as parallel run on/off.
// \return the previous value
static bool parRun(const bool on)
static bool parRun(const bool on) noexcept
{
bool old(parRun_);
parRun_ = on;
return old;
}
//- Test if this a parallel run, or allow modify access
static bool& parRun()
//- Test if this a parallel run
// Modify access is deprecated
static bool& parRun() noexcept
{
return parRun_;
}
//- Have support for threads
static bool haveThreads()
static bool haveThreads() noexcept
{
return haveThreads_;
}
@ -482,13 +480,13 @@ public:
// Worlds
//- All worlds
static const wordList& allWorlds()
static const wordList& allWorlds() noexcept
{
return allWorlds_;
}
//- worldID (index in allWorlds) of all processes
static const labelList& worldIDs()
static const labelList& worldIDs() noexcept
{
return worldIDs_;
}
@ -539,24 +537,24 @@ public:
}
//- Message tag of standard messages
static int& msgType()
static int& msgType() noexcept
{
return msgType_;
}
//- Get the communications type of the stream
commsTypes commsType() const
commsTypes commsType() const noexcept
{
return commsType_;
}
//- Set the communications type of the stream
commsTypes commsType(const commsTypes ct)
commsTypes commsType(const commsTypes ct) noexcept
{
commsTypes oldCommsType = commsType_;
commsTypes old(commsType_);
commsType_ = ct;
return oldCommsType;
return old;
}

View File

@ -6,6 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2016 OpenFOAM Foundation
Copyright (C) 2021 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -26,16 +27,15 @@ License
\*---------------------------------------------------------------------------*/
#include "UPstream.H"
#include "boolList.H"
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::UPstream::commsStruct::commsStruct()
:
above_(-1),
below_(0),
allBelow_(0),
allNotBelow_(0)
below_(),
allBelow_(),
allNotBelow_()
{}

View File

@ -37,20 +37,19 @@ Foam::UIPstream::UIPstream
(
const commsTypes commsType,
const int fromProcNo,
DynamicList<char>& externalBuf,
label& externalBufPosition,
DynamicList<char>& receiveBuf,
label& receiveBufPosition,
const int tag,
const label comm,
const bool clearAtEnd,
IOstreamOption::streamFormat fmt,
IOstreamOption::versionNumber ver
IOstreamOption::streamFormat fmt
)
:
UPstream(commsType),
Istream(fmt, ver),
Istream(fmt, IOstreamOption::currentVersion),
fromProcNo_(fromProcNo),
externalBuf_(externalBuf),
externalBufPosition_(externalBufPosition),
recvBuf_(receiveBuf),
recvBufPos_(receiveBufPosition),
tag_(tag),
comm_(comm),
clearAtEnd_(clearAtEnd),
@ -63,10 +62,10 @@ Foam::UIPstream::UIPstream
Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
:
UPstream(buffers.commsType_),
Istream(buffers.format_, buffers.version_),
Istream(buffers.format_, IOstreamOption::currentVersion),
fromProcNo_(fromProcNo),
externalBuf_(buffers.recvBuf_[fromProcNo]),
externalBufPosition_(buffers.recvBufPos_[fromProcNo]),
recvBuf_(buffers.recvBuf_[fromProcNo]),
recvBufPos_(buffers.recvBufPos_[fromProcNo]),
tag_(buffers.tag_),
comm_(buffers.comm_),
clearAtEnd_(true),

View File

@ -42,20 +42,19 @@ Foam::UIPstream::UIPstream
(
const commsTypes commsType,
const int fromProcNo,
DynamicList<char>& externalBuf,
label& externalBufPosition,
DynamicList<char>& receiveBuf,
label& receiveBufPosition,
const int tag,
const label comm,
const bool clearAtEnd,
IOstreamOption::streamFormat fmt,
IOstreamOption::versionNumber ver
IOstreamOption::streamFormat fmt
)
:
UPstream(commsType),
Istream(fmt, ver),
Istream(fmt, IOstreamOption::currentVersion),
fromProcNo_(fromProcNo),
externalBuf_(externalBuf),
externalBufPosition_(externalBufPosition),
recvBuf_(receiveBuf),
recvBufPos_(receiveBufPosition),
tag_(tag),
comm_(comm),
clearAtEnd_(clearAtEnd),
@ -66,29 +65,26 @@ Foam::UIPstream::UIPstream
if (commsType == commsTypes::nonBlocking)
{
// Message is already received into externalBuf
// Message is already received into buffer
}
else
{
MPI_Status status;
label wantedSize = externalBuf_.capacity();
if (debug)
{
Pout<< "UIPstream::UIPstream : read from:" << fromProcNo
<< " tag:" << tag << " comm:" << comm_
<< " wanted size:" << wantedSize
Pout<< "UIPstream::UIPstream :"
<< " read from:" << fromProcNo
<< " tag:" << tag_ << " comm:" << comm_
<< " wanted size:" << recvBuf_.capacity()
<< Foam::endl;
}
// If the buffer size is not specified, probe the incoming message
// and set it
if (!wantedSize)
// No buffer size allocated/specified - probe size of incoming message
if (!recvBuf_.capacity())
{
profilingPstream::beginTiming();
MPI_Status status;
MPI_Probe
(
fromProcNo_,
@ -101,13 +97,12 @@ Foam::UIPstream::UIPstream
// Assume these are from gathers ...
profilingPstream::addGatherTime();
externalBuf_.setCapacity(messageSize_);
wantedSize = messageSize_;
recvBuf_.resize(messageSize_);
if (debug)
{
Pout<< "UIPstream::UIPstream : probed size:" << wantedSize
<< Foam::endl;
Pout<< "UIPstream::UIPstream : probed size:"
<< messageSize_ << Foam::endl;
}
}
@ -115,14 +110,14 @@ Foam::UIPstream::UIPstream
(
commsType,
fromProcNo_,
externalBuf_.data(),
wantedSize,
recvBuf_.data(),
recvBuf_.capacity(),
tag_,
comm_
);
// Set addressed size. Leave actual allocated memory intact.
externalBuf_.setSize(messageSize_);
recvBuf_.resize(messageSize_);
if (!messageSize_)
{
@ -135,10 +130,10 @@ Foam::UIPstream::UIPstream
Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
:
UPstream(buffers.commsType_),
Istream(buffers.format_, buffers.version_),
Istream(buffers.format_, IOstreamOption::currentVersion),
fromProcNo_(fromProcNo),
externalBuf_(buffers.recvBuf_[fromProcNo]),
externalBufPosition_(buffers.recvBufPos_[fromProcNo]),
recvBuf_(buffers.recvBuf_[fromProcNo]),
recvBufPos_(buffers.recvBufPos_[fromProcNo]),
tag_(buffers.tag_),
comm_(buffers.comm_),
clearAtEnd_(true),
@ -162,8 +157,8 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
if (commsType() == commsTypes::nonBlocking)
{
// Message is already received into externalBuf
messageSize_ = buffers.recvBuf_[fromProcNo].size();
// Message is already received into buffer
messageSize_ = recvBuf_.size();
if (debug)
{
@ -176,25 +171,22 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
}
else
{
MPI_Status status;
label wantedSize = externalBuf_.capacity();
if (debug)
{
Pout<< "UIPstream::UIPstream PstreamBuffers :"
<< " read from:" << fromProcNo
<< " tag:" << tag_ << " comm:" << comm_
<< " wanted size:" << wantedSize
<< " wanted size:" << recvBuf_.capacity()
<< Foam::endl;
}
// If the buffer size is not specified, probe the incoming message
// and set it
if (!wantedSize)
// No buffer size allocated/specified - probe size of incoming message
if (!recvBuf_.capacity())
{
profilingPstream::beginTiming();
MPI_Status status;
MPI_Probe
(
fromProcNo_,
@ -207,13 +199,12 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
// Assume these are from gathers ...
profilingPstream::addGatherTime();
externalBuf_.setCapacity(messageSize_);
wantedSize = messageSize_;
recvBuf_.resize(messageSize_);
if (debug)
{
Pout<< "UIPstream::UIPstream PstreamBuffers : probed size:"
<< wantedSize << Foam::endl;
<< messageSize_ << Foam::endl;
}
}
@ -221,14 +212,14 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
(
commsType(),
fromProcNo_,
externalBuf_.data(),
wantedSize,
recvBuf_.data(),
recvBuf_.capacity(),
tag_,
comm_
);
// Set addressed size. Leave actual allocated memory intact.
externalBuf_.setSize(messageSize_);
recvBuf_.resize(messageSize_);
if (!messageSize_)
{
@ -271,7 +262,11 @@ Foam::label Foam::UIPstream::read
profilingPstream::beginTiming();
if (commsType == commsTypes::blocking || commsType == commsTypes::scheduled)
if
(
commsType == commsTypes::blocking
|| commsType == commsTypes::scheduled
)
{
MPI_Status status;

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2019 OpenCFD Ltd.
Copyright (C) 2019-2021 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -81,9 +81,9 @@ bool Foam::UOPstream::write
const_cast<char*>(buf),
bufSize,
MPI_BYTE,
toProcNo, //procID(toProcNo),
toProcNo,
tag,
PstreamGlobals::MPICommunicators_[communicator] //MPI_COMM_WORLD
PstreamGlobals::MPICommunicators_[communicator]
);
// Assume these are from scatters ...
@ -104,9 +104,9 @@ bool Foam::UOPstream::write
const_cast<char*>(buf),
bufSize,
MPI_BYTE,
toProcNo, //procID(toProcNo),
toProcNo,
tag,
PstreamGlobals::MPICommunicators_[communicator] //MPI_COMM_WORLD
PstreamGlobals::MPICommunicators_[communicator]
);
// Assume these are from scatters ...
@ -129,9 +129,9 @@ bool Foam::UOPstream::write
const_cast<char*>(buf),
bufSize,
MPI_BYTE,
toProcNo, //procID(toProcNo),
toProcNo,
tag,
PstreamGlobals::MPICommunicators_[communicator],//MPI_COMM_WORLD,
PstreamGlobals::MPICommunicators_[communicator],
&request
);