ENH: reduce overhead of decomposedBlockData IO
- OCharStream for serializing - skip intermediate blocks without reading - support character spans - read and distribute with direct non-blocking send/recv instead of PstreamBuffers or with IPstream/OPstream streaming operators. - non-blocking gather/write when using intermediate buffer space
This commit is contained in:
parent
f1ece719b0
commit
90d8f45298
File diff suppressed because it is too large
Load Diff
@ -135,23 +135,15 @@ protected:
|
||||
|
||||
// Protected Member Functions
|
||||
|
||||
//- Helper: determine number of processors whose recvSizes fits
|
||||
//- into maxBufferSize
|
||||
static label calcNumProcs
|
||||
(
|
||||
const label comm,
|
||||
const off_t maxBufferSize,
|
||||
const labelUList& recvSizes,
|
||||
const label startProci
|
||||
);
|
||||
|
||||
//- Read data into *this. ISstream is only valid on master.
|
||||
//- Read data (on master) and transmit.
|
||||
static bool readBlocks
|
||||
(
|
||||
const label comm,
|
||||
// [in] The input stream (only valid on master)
|
||||
autoPtr<ISstream>& isPtr,
|
||||
List<char>& contentChars,
|
||||
const UPstream::commsTypes commsType
|
||||
// [out] The processor local data
|
||||
List<char>& localData,
|
||||
const UPstream::commsTypes commsType /* unused */
|
||||
);
|
||||
|
||||
//- Helper: skip a block of (binary) character data
|
||||
@ -277,6 +269,19 @@ public:
|
||||
}
|
||||
|
||||
//- Helper: write block of (binary) character content
|
||||
// Housekeeping
|
||||
static std::streamoff writeBlockEntry
|
||||
(
|
||||
OSstream& os,
|
||||
const label blocki,
|
||||
const stdFoam::span<char>& s
|
||||
)
|
||||
{
|
||||
return writeBlockEntry(os, blocki, s.data(), s.size());
|
||||
}
|
||||
|
||||
//- Helper: write block of (binary) character content
|
||||
// Housekeeping
|
||||
static std::streamoff writeBlockEntry
|
||||
(
|
||||
OSstream& os,
|
||||
@ -307,41 +312,37 @@ public:
|
||||
);
|
||||
|
||||
//- Read master header information (into headerIO) and return
|
||||
//- data in stream. Note: isPtr is only valid on master.
|
||||
//- data in stream.
|
||||
static autoPtr<ISstream> readBlocks
|
||||
(
|
||||
const label comm,
|
||||
const fileName& fName,
|
||||
//! [in] The input stream (only valid on master)
|
||||
autoPtr<ISstream>& isPtr,
|
||||
//! [out] header information
|
||||
IOobject& headerIO,
|
||||
const UPstream::commsTypes commsType
|
||||
const UPstream::commsTypes commsType /* unused */
|
||||
);
|
||||
|
||||
//- Helper: gather single label. Note: using native Pstream.
|
||||
// datas sized with num procs but undefined contents on
|
||||
// slaves
|
||||
static void gather
|
||||
(
|
||||
const label comm,
|
||||
const label data,
|
||||
labelList& datas
|
||||
);
|
||||
|
||||
//- Helper: gather data from (subset of) slaves.
|
||||
//- Helper: gather data from (subset of) sub-ranks.
|
||||
// In non-blocking mode it sets up send/recv for non-empty content.
|
||||
// In blocking/scheduled mode it uses MPI_Gatherv to collect data.
|
||||
//
|
||||
// Returns:
|
||||
// - recvData : received data
|
||||
// - recvData : the received data
|
||||
// - recvOffsets : offset in data. recvOffsets is nProcs+1
|
||||
static void gatherSlaveData
|
||||
static void gatherProcData
|
||||
(
|
||||
const label comm,
|
||||
const UList<char>& data,
|
||||
const labelUList& recvSizes,
|
||||
const UList<char>& localData, //!< [in] required on all procs
|
||||
const labelUList& recvSizes, //!< [in] only required on master
|
||||
|
||||
const labelRange& fromProcs,
|
||||
const labelRange& whichProcs, //!< [in] required on all procs
|
||||
|
||||
List<int>& recvOffsets,
|
||||
DynamicList<char>& recvData
|
||||
List<int>& recvOffsets, //!< [out] only relevant on master
|
||||
DynamicList<char>& recvData, //!< [out] only relevant on master
|
||||
|
||||
const UPstream::commsTypes commsType
|
||||
);
|
||||
|
||||
//- Write *this. Ostream only valid on master.
|
||||
@ -349,19 +350,98 @@ public:
|
||||
static bool writeBlocks
|
||||
(
|
||||
const label comm,
|
||||
|
||||
//! [in] output stream (relevant on master)
|
||||
autoPtr<OSstream>& osPtr,
|
||||
//! [out] start offsets to each block (relevant on master),
|
||||
//! ignored if List::null() type
|
||||
List<std::streamoff>& blockOffset,
|
||||
|
||||
const UList<char>& masterData,
|
||||
const UList<char>& localData, //!< [in] required on all procs
|
||||
const labelUList& recvSizes, //!< [in] only required on master
|
||||
|
||||
const labelUList& recvSizes,
|
||||
|
||||
// Optional slave data (on master)
|
||||
const UPtrList<SubList<char>>& slaveData,
|
||||
//! Optional proc data (only written on master)
|
||||
//! but \b must also be symmetrically defined (empty/non-empty)
|
||||
//! on all ranks
|
||||
const UList<stdFoam::span<char>>& procData,
|
||||
|
||||
const UPstream::commsTypes commsType,
|
||||
const bool syncReturnState = true
|
||||
);
|
||||
|
||||
|
||||
// Housekeeping
|
||||
|
||||
//- Write *this. Ostream only valid on master.
|
||||
// Returns offsets of processor blocks in blockOffset
|
||||
FOAM_DEPRECATED_FOR(2023-09, "write with char span instead")
|
||||
static bool writeBlocks
|
||||
(
|
||||
const label comm,
|
||||
autoPtr<OSstream>& osPtr,
|
||||
List<std::streamoff>& blockOffset,
|
||||
|
||||
const UList<char>& localData, // [in] required on all procs
|
||||
const labelUList& recvSizes, // [in] only required on master
|
||||
|
||||
// Optional proc data (only written on master)
|
||||
// but \b must also be symmetrically defined (empty/non-empty)
|
||||
// on all ranks
|
||||
const UPtrList<SubList<char>>& procData,
|
||||
|
||||
const UPstream::commsTypes commsType,
|
||||
const bool syncReturnState = true
|
||||
)
|
||||
{
|
||||
// Transcribe to span<char>
|
||||
List<stdFoam::span<char>> spans(procData.size());
|
||||
forAll(procData, proci)
|
||||
{
|
||||
if (procData.test(proci))
|
||||
{
|
||||
spans[proci] = stdFoam::span<char>
|
||||
(
|
||||
const_cast<char*>(procData[proci].cdata()),
|
||||
procData[proci].size()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return decomposedBlockData::writeBlocks
|
||||
(
|
||||
comm,
|
||||
osPtr,
|
||||
blockOffset,
|
||||
localData,
|
||||
recvSizes,
|
||||
spans,
|
||||
commsType,
|
||||
syncReturnState
|
||||
);
|
||||
}
|
||||
|
||||
//- Deprecated(2023-09) - consider UPstream::listGatherValue
|
||||
// The only difference is that this gather also resizes the output
|
||||
// on the non-master procs
|
||||
// \deprecated(2023-09) - consider UPstream::listGatherValue
|
||||
FOAM_DEPRECATED_FOR(2023-09, "consider UPstream::listGatherValue()")
|
||||
static void gather
|
||||
(
|
||||
const label comm,
|
||||
const label localValue,
|
||||
labelList& allValues
|
||||
)
|
||||
{
|
||||
allValues.resize_nocopy(UPstream::nProcs(comm));
|
||||
|
||||
UPstream::mpiGather
|
||||
(
|
||||
reinterpret_cast<const char*>(&localValue),
|
||||
allValues.data_bytes(),
|
||||
sizeof(label), // The send/recv size per rank
|
||||
comm
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user