ENH: reduce overhead of masterOFstream

- use OCharStream instead of OStringStream to avoid copying char data.

- direct non-blocking send/recv with probing instead of PstreamBuffers
  to avoid serialization/de-serialization of char data and reduce the
  memory footprint somewhat.

- polling dispatch to write file content as it becomes available,
  which should improve communication and IO overlap
This commit is contained in:
Mark Olesen 2023-09-05 18:23:58 +02:00
parent dce009cef1
commit f1ece719b0
2 changed files with 128 additions and 61 deletions

View File

@ -29,7 +29,7 @@ License
#include "masterOFstream.H"
#include "OFstream.H"
#include "OSspecific.H"
#include "PstreamBuffers.H"
#include "Pstream.H"
#include "masterUncollatedFileOperation.H"
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
@ -38,10 +38,10 @@ void Foam::masterOFstream::checkWrite
(
const fileName& fName,
const char* str,
std::streamsize len
const std::streamsize len
)
{
if (!len)
if (!str || !len)
{
// Can probably skip all of this if there is nothing to write
return;
@ -63,9 +63,7 @@ void Foam::masterOFstream::checkWrite
<< exit(FatalIOError);
}
// Use writeRaw() instead of writeQuoted(string,false) to output
// characters directly.
// Write characters directly to std::ostream
os.writeRaw(str, len);
if (!os.good())
@ -77,97 +75,159 @@ void Foam::masterOFstream::checkWrite
}
void Foam::masterOFstream::checkWrite
(
const fileName& fName,
const std::string& s
)
{
checkWrite(fName, s.data(), s.length());
}
void Foam::masterOFstream::commit()
{
// Take ownership of serialized content, without copying or reallocation
DynamicList<char> charData(OCharStream::release());
if (UPstream::parRun())
{
// Ignore content if not writing (reduces communication)
if (!writeOnProc_)
{
charData.clear();
}
List<fileName> filePaths(UPstream::nProcs(comm_));
filePaths[UPstream::myProcNo(comm_)] = pathName_;
Pstream::gatherList(filePaths, UPstream::msgType(), comm_);
// Test for identical output paths
bool uniform =
(
UPstream::master(comm_)
&& fileOperation::uniformFile(filePaths)
? fileOperation::uniformFile(filePaths)
: true
);
Pstream::broadcast(uniform, comm_);
if (uniform)
{
// Identical file paths - write on master
if (UPstream::master(comm_) && writeOnProc_)
{
checkWrite(pathName_, this->str());
checkWrite(pathName_, charData);
}
this->reset();
return;
}
// Different files
PstreamBuffers pBufs(comm_);
// ---------------
// Current strategy is to setup all non-blocking send/recv
// using the probed message size to establish the recv size
// (to avoid an additional communication of the sizes).
//
// For ranks with writeOnProc=false, the message size is 0.
if (!UPstream::master(comm_))
// An alternative approach would be to gather recv sizes
// to avoid zero-sized messages and/or use double buffering
// to recv into a buffer and write.
//
// const labelList recvSizes
// (
// UPstream::listGatherValues<label>
// (
// (UPstream::is_subrank(comm_) ? charData.size() : label(0)),
// comm_
// )
// );
const label startOfRequests = UPstream::nRequests();
// Some unique tag for this read/write/probe grouping
const int messageTag = UPstream::msgType() + 256;
if (UPstream::is_subrank(comm_))
{
if (writeOnProc_)
{
// Send buffer to master
string s(this->str());
UOPstream os(UPstream::masterNo(), pBufs);
os.write(s.data(), s.length());
}
this->reset(); // Done with contents
// Send to master. When (!writeOnProc_) it is zero-sized.
UOPstream::write
(
UPstream::commsTypes::nonBlocking,
UPstream::masterNo(),
charData.cdata_bytes(),
charData.size_bytes(),
messageTag,
comm_
);
}
pBufs.finishedGathers();
if (UPstream::master(comm_))
else if (UPstream::master(comm_))
{
// The receive slots
List<List<char>> procBuffers(UPstream::nProcs(comm_));
const auto recvProcs = UPstream::subProcs(comm_);
for (const int proci : recvProcs)
{
auto& procSlice = procBuffers[proci];
// Probe the message size
std::pair<int, int64_t> probed =
UPstream::probeMessage
(
UPstream::commsTypes::scheduled, // blocking call
proci,
messageTag,
comm_
);
procSlice.resize_nocopy(probed.second);
// Receive content (can also be zero-sized)
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
proci,
procSlice.data_bytes(),
procSlice.size_bytes(),
messageTag,
comm_
);
}
if (writeOnProc_)
{
// Write master data
checkWrite(filePaths[UPstream::masterNo()], this->str());
// Write non-empty master data
checkWrite(pathName_, charData);
}
this->reset(); // Done with contents
// Allocate large enough to read without resizing
List<char> buf(pBufs.maxRecvCount());
for (const int proci : UPstream::subProcs(comm_))
// Poll for completed receive requests and dispatch
DynamicList<int> indices(recvProcs.size());
while
(
UPstream::waitSomeRequests
(
startOfRequests,
recvProcs.size(),
&indices
)
)
{
const std::streamsize count(pBufs.recvDataCount(proci));
if (count)
for (const int idx : indices)
{
UIPstream is(proci, pBufs);
const int proci = recvProcs[idx];
auto& procSlice = procBuffers[proci];
is.read(buf.data(), count);
checkWrite(filePaths[proci], buf.cdata(), count);
if (!procSlice.empty())
{
// Write non-empty sub-proc data
checkWrite(filePaths[proci], procSlice);
}
// Eager cleanup?
// TBD: procSlice.clear();
}
}
}
UPstream::waitRequests(startOfRequests);
}
else
{
checkWrite(pathName_, this->str());
this->reset();
// Write (non-empty) data
checkWrite(pathName_, charData);
}
// This method is only called once (internally)
// so no need to clear/flush old buffered data
}
@ -183,7 +243,7 @@ Foam::masterOFstream::masterOFstream
const bool writeOnProc
)
:
OStringStream(streamOpt),
OCharStream(streamOpt),
pathName_(pathName),
atomic_(atomic),
compression_(streamOpt.compression()),

View File

@ -41,7 +41,7 @@ SourceFiles
#ifndef Foam_masterOFstream_H
#define Foam_masterOFstream_H
#include "StringStream.H"
#include "SpanStream.H"
#include "UPstream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -55,7 +55,7 @@ namespace Foam
class masterOFstream
:
public OStringStream
public OCharStream
{
// Private Data
@ -85,13 +85,20 @@ class masterOFstream
(
const fileName& fName,
const char* str,
std::streamsize len
const std::streamsize len
);
//- Open file with checking and write append contents
void checkWrite(const fileName& fName, const std::string& s);
void checkWrite
(
const fileName& fName,
const UList<char>& charData
)
{
checkWrite(fName, charData.cdata(), charData.size_bytes());
}
//- Commit buffered information, including parallel gather as required
//- Commit buffered information, including communication as required
void commit();