ENH: promote IFstream::readContents() to public static function

- reads file contents into a DynamicList<char>, which can then be
  broadcast, moved to a ICharStream etc.
This commit is contained in:
Mark Olesen 2024-03-06 14:09:00 +01:00
parent 7006056eae
commit e7f0628d79
5 changed files with 165 additions and 330 deletions

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2023 OpenCFD Ltd.
Copyright (C) 2023-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -29,219 +29,12 @@ Description
\*---------------------------------------------------------------------------*/
#include "argList.H"
#include "OSspecific.H" // For fileSize()
#include "Fstream.H"
#include "Pstream.H"
#include "SpanStream.H"
#include <limits>
using namespace Foam;
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
bool optUseSeek = false;
bool optVerbose = false;
// Get file contents. Usually master-only and broadcast
static List<char> slurpFile
(
const fileName& pathname,
const bool parallel = UPstream::parRun(),
const bool masterOnly = true
)
{
Info<< "slurp master-only:" << masterOnly
<< " broadcast:" << (masterOnly && parallel)
<< " seek:" << optUseSeek
<< " file: " << pathname << nl;
if (optUseSeek)
{
Info<< "Rewinding gzstream does not work..." << nl;
}
// -------------------------
List<char> buffer;
ifstreamPointer ifp;
if (UPstream::master() || !masterOnly)
{
ifp.open(pathname);
}
if (ifp && ifp->good())
{
Info<< "compressed:"
<< (IOstreamOption::COMPRESSED == ifp.whichCompression()) << nl;
#if 0
uint64_t inputSize = Foam::fileSize(pathname);
if (IOstreamOption::COMPRESSED == ifp.whichCompression())
{
ifp->ignore(std::numeric_limits<std::streamsize>::max());
const std::streamsize nread = ifp->gcount();
if (nread == std::numeric_limits<std::streamsize>::max())
{
FatalErrorInFunction
<< "Failed call to ignore()" << nl
<< exit(FatalError);
}
inputSize = ifp->gcount();
if (optUseSeek)
{
// Rewinding gzstream does not really work...
ifp->rdbuf()->pubseekpos(0, std::ios_base::in);
}
else
{
// Open it again - gzstream rewinding is unreliable...
ifp.open(pathname);
}
}
buffer.resize(label(inputSize));
ifp->read(buffer.data(), buffer.size_bytes());
const std::streamsize nread = ifp->gcount();
if (nread == std::numeric_limits<std::streamsize>::max())
{
FatalErrorInFunction
<< "Failed call to read()" << nl
<< exit(FatalError);
}
buffer.resize(label(nread)); // Extra safety (paranoid)
#else
if (IOstreamOption::COMPRESSED == ifp.whichCompression())
{
// For compressed files we do not have any idea how large
// the result will be. So read chunk-wise.
// Using the compressed size for the chunk size:
// 50% compression = 2 iterations
// 66% compression = 3 iterations
// ...
const auto inputSize = Foam::fileSize(pathname + ".gz");
const uint64_t chunkSize =
(
(inputSize <= 1024)
? uint64_t(4096)
: uint64_t(2*inputSize)
);
uint64_t beg = 0;
bool normalExit = false;
for (int iter = 1; iter < 100000; ++iter)
{
if (optVerbose)
{
Info<< "iter " << iter << nl;
Info<< "chunk " << label(chunkSize) << nl;
Info<< "size " << label(iter * chunkSize) << nl;
}
buffer.resize(label(iter * chunkSize));
ifp->read(buffer.data() + beg, chunkSize);
const std::streamsize nread = ifp->gcount();
if (optVerbose)
{
Info<< "nread: " << nread << nl;
}
if
(
nread < 0
|| nread == std::numeric_limits<std::streamsize>::max()
)
{
if (iter == 0)
{
FatalErrorInFunction
<< "Failed call to read()" << nl
<< exit(FatalError);
}
break;
}
else
{
beg += uint64_t(nread);
if (nread >= 0 && uint64_t(nread) < chunkSize)
{
normalExit = true;
if (optVerbose)
{
Info<< "stopped after "
<< iter << " iterations" << nl;
}
buffer.resize(label(beg));
break;
}
}
}
if (!normalExit)
{
FatalErrorInFunction
<< "Abnormal exit" << nl
<< exit(FatalError);
}
}
else
{
const auto inputSize = Foam::fileSize(pathname);
if (inputSize >= 0)
{
buffer.resize(label(inputSize));
ifp->read(buffer.data(), buffer.size_bytes());
const std::streamsize nread = ifp->gcount();
if
(
nread < 0
|| nread == std::numeric_limits<std::streamsize>::max()
)
{
FatalErrorInFunction
<< "Failed call to read()" << nl
<< exit(FatalError);
}
buffer.resize(label(nread)); // Extra safety (paranoid)
}
}
#endif
}
// Done with input file
ifp.reset(nullptr);
if (parallel && masterOnly)
{
// On the assumption of larger files,
// prefer two broadcasts instead of serialization
Pstream::broadcastList(buffer);
}
return buffer;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// Main program:
@ -250,9 +43,8 @@ int main(int argc, char *argv[])
argList::noBanner();
argList::noFunctionObjects();
argList::noCheckProcessorDirectories();
argList::addBoolOption("seek", "seek with gzstream (fails!)");
argList::addVerboseOption("addition information");
argList::addBoolOption("seek", "seek with gzstream");
argList::addVerboseOption("additional information");
argList::addBoolOption("fail", "fail if file cannot be opened");
argList::addBoolOption("no-broadcast", "suppress broadcast contents");
argList::addNote("Test master-only reading (with broadcast)");
@ -262,8 +54,7 @@ int main(int argc, char *argv[])
#include "setRootCase.H"
const bool syncPar = (UPstream::parRun() && !args.found("no-broadcast"));
optUseSeek = args.found("seek");
optVerbose = args.verbose();
const bool optFail = args.found("fail");
auto srcName = args.get<fileName>(1);
@ -276,7 +67,33 @@ int main(int argc, char *argv[])
ICharStream is;
{
List<char> buffer(slurpFile(srcName, syncPar));
DynamicList<char> buffer;
if (UPstream::master() || !syncPar)
{
if (optFail)
{
IFstream ifs(srcName, IOstreamOption::BINARY);
if (!ifs.good())
{
FatalIOErrorInFunction(srcName)
<< "Cannot open file " << srcName
<< exit(FatalIOError);
}
buffer = IFstream::readContents(ifs);
}
else
{
buffer = IFstream::readContents(srcName);
}
}
if (syncPar)
{
// Prefer two broadcasts instead of serialize/de-serialize
Pstream::broadcastList(buffer);
}
is.swap(buffer);
}

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2016 OpenFOAM Foundation
Copyright (C) 2017-2023 OpenCFD Ltd.
Copyright (C) 2017-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -37,6 +37,119 @@ namespace Foam
}
// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
Foam::DynamicList<char>
Foam::IFstream::readContents(IFstream& ifs)
{
DynamicList<char> buffer;
const auto inputSize = ifs.fileSize();
if (inputSize <= 0)
{
// Nothing to read
}
else if (IOstreamOption::COMPRESSED == ifs.compression())
{
auto& iss = ifs.stdStream();
// For compressed files, no idea how large the result will be.
// So read chunk-wise.
// Using the compressed size for the chunk size:
// 50% compression = 2 iterations
// 66% compression = 3 iterations
// ...
const uint64_t chunkSize =
(
(inputSize <= 1024)
? uint64_t(4096)
: uint64_t(2*inputSize)
);
uint64_t beg = 0;
for (int iter = 1; iter < 100000; ++iter)
{
// Manual resizing to use incremental vs doubling
buffer.setCapacity(label(iter * chunkSize));
buffer.resize(buffer.capacity());
ifs.readRaw(buffer.data() + beg, chunkSize);
const std::streamsize nread = iss.gcount();
if
(
nread < 0
|| nread == std::numeric_limits<std::streamsize>::max()
)
{
// Failed, but treat as normal 'done'
buffer.resize(label(beg));
break;
}
else
{
beg += uint64_t(nread);
if (nread >= 0 && uint64_t(nread) < chunkSize)
{
// normalExit = true;
buffer.resize(label(beg));
break;
}
}
}
}
else
{
// UNCOMPRESSED
{
auto& iss = ifs.stdStream();
buffer.setCapacity(label(inputSize));
buffer.resize(buffer.capacity());
ifs.readRaw(buffer.data(), buffer.size_bytes());
const std::streamsize nread = iss.gcount();
if
(
nread < 0
|| nread == std::numeric_limits<std::streamsize>::max()
)
{
// Failed, but treat as normal 'done'
buffer.clear();
}
else
{
buffer.resize(label(nread)); // Safety
}
}
}
return buffer;
}
Foam::DynamicList<char>
Foam::IFstream::readContents(const fileName& pathname)
{
if (!pathname.empty())
{
IFstream ifs(pathname, IOstreamOption::BINARY);
if (ifs.good())
{
return readContents(ifs);
}
}
return DynamicList<char>();
}
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::IFstream::IFstream
@ -197,7 +310,7 @@ Foam::IFstream& Foam::IFstream::operator()() const
else
{
FatalIOErrorInFunction(*this)
<< "file " << this->name() << " does not exist"
<< "File " << this->name() << " does not exist"
<< exit(FatalIOError);
}
}

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011 OpenFOAM Foundation
Copyright (C) 2017-2023 OpenCFD Ltd.
Copyright (C) 2017-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -41,6 +41,7 @@ SourceFiles
#include "ISstream.H"
#include "className.H"
#include "fstreamPointer.H"
#include "DynamicList.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -54,7 +55,7 @@ namespace Foam
class IFstream
:
private Foam::ifstreamPointer,
public ISstream
public Foam::ISstream
{
public:
@ -86,6 +87,16 @@ public:
~IFstream() = default;
// Static Functions
//- Get file contents from specified file (compressed/uncompressed).
//- Returns an empty list if the file cannot be opened.
static DynamicList<char> readContents(const fileName& pathname);
//- Get file contents from IFstream (assumed to be rewound)
static DynamicList<char> readContents(IFstream& ifs);
// Member Functions
//- Get character(s)

View File

@ -60,9 +60,6 @@ bool Foam::ofstreamPointer::supports_gz() noexcept
}
// Future: List<char> slurpFile(....);
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::ifstreamPointer::ifstreamPointer

View File

@ -83,101 +83,6 @@ namespace fileOperations
}
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
namespace Foam
{
// Get file contents (compressed or uncompressed)
static DynamicList<char> slurpFile(IFstream& ifs)
{
DynamicList<char> buffer;
auto& iss = ifs.stdStream();
const auto inputSize = ifs.fileSize();
if (IOstreamOption::COMPRESSED == ifs.compression())
{
// For compressed files, no idea how large the result will be.
// So read chunk-wise.
// Using the compressed size for the chunk size:
// 50% compression = 2 iterations
// 66% compression = 3 iterations
// ...
const uint64_t chunkSize =
(
(inputSize <= 1024)
? uint64_t(4096)
: uint64_t(2*inputSize)
);
uint64_t beg = 0;
for (int iter = 1; iter < 100000; ++iter)
{
// Manual resizing to use incremental vs doubling
buffer.setCapacity(label(iter * chunkSize));
buffer.resize(buffer.capacity());
ifs.readRaw(buffer.data() + beg, chunkSize);
const std::streamsize nread = iss.gcount();
if
(
nread < 0
|| nread == std::numeric_limits<std::streamsize>::max()
)
{
// Failed, but treat as normal 'done'
buffer.resize(label(beg));
break;
}
else
{
beg += uint64_t(nread);
if (nread >= 0 && uint64_t(nread) < chunkSize)
{
// normalExit = true;
buffer.resize(label(beg));
break;
}
}
}
}
else
{
if (inputSize >= 0)
{
buffer.setCapacity(label(inputSize));
buffer.resize(buffer.capacity());
ifs.readRaw(buffer.data(), buffer.size_bytes());
const std::streamsize nread = iss.gcount();
if
(
nread < 0
|| nread == std::numeric_limits<std::streamsize>::max()
)
{
// Failed, but treat as normal 'done'
buffer.clear();
}
else
{
buffer.resize(label(nread)); // Safety
}
}
}
return buffer;
}
} // End namespace Foam
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
Foam::word
@ -535,33 +440,25 @@ void Foam::fileOperations::masterUncollatedFileOperation::readAndSend
{
FatalIOErrorInFunction(filePath)
<< "Cannot open file " << filePath
//<< " using communicator " << pBufs.comm()
//<< " ioRanks:" << UPstream::procID(pBufs.comm())
<< exit(FatalIOError);
}
// Read file contents (compressed or uncompressed) into a character buffer
DynamicList<char> buf(IFstream::readContents(ifs));
if (debug)
{
Info<< "masterUncollatedFileOperation::readAndSend :"
<< " compressed:" << bool(ifs.compression()) << " "
<< filePath << endl;
<< filePath
<< " (compressed:" << bool(ifs.compression())
<< ") : " << " bytes" << endl;
}
// Read file contents (compressed or uncompressed) into a character buffer
DynamicList<char> buf(slurpFile(ifs));
for (const label proci : recvProcs)
{
UOPstream os(proci, pBufs);
os.write(buf.cdata_bytes(), buf.size_bytes());
}
if (debug)
{
Info<< "masterUncollatedFileOperation::readStream :"
<< " From " << filePath << " sent " << buf.size()
<< " bytes" << endl;
}
}