ENH: clearer separation of MPI initialize/finalize stages (#2774)

ENH: support transfer from a wrapped MPI request to global list

- allows coding with a list of UPstream::Request and subsequently either
  retaining that list or transferring it into the global list.
Mark Olesen 2023-05-05 11:16:13 +02:00
parent 9d2ae3da67
commit 639b800049
9 changed files with 165 additions and 75 deletions
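
In practice, the new usage pattern looks roughly like the following sketch (assembled from the test changes below, inside a parallel OpenFOAM application with the usual Pstream headers; the buffer layout and the choice of which request to transfer are illustrative only):

    const label startOfRequests = UPstream::nRequests();

    DynamicList<UPstream::Request> requests;    // locally managed requests
    List<labelPair> buffer(UPstream::nProcs());

    if (UPstream::master())
    {
        for (const int proci : UPstream::subProcs())
        {
            UIPstream::read
            (
                UPstream::commsTypes::nonBlocking,
                requests.emplace_back(),    // capture into the local list
                proci,
                buffer.slice(proci, 1)
            );
        }

        // Transfer one request from the local list to the global list.
        // The transferred wrapper is reset to a null request.
        UPstream::addRequest(requests.back());
        requests.pop_back();
    }
    // (sub-processes post the matching non-blocking sends, see the test below)

    UPstream::waitRequests(startOfRequests);    // wait on the global list
    UPstream::waitRequests(requests);           // wait on the remaining local requests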


@@ -112,13 +112,20 @@ int main(int argc, char *argv[])
argList::addBoolOption("iter");
argList::addBoolOption("swap");
argList::addBoolOption("default", "reinstate default tests");
argList::addBoolOption("no-wait", "test with skipping request waits");
argList::addNote("runs default tests or specified ones only");
#include "setRootCase.H"
const bool optNowaiting = args.found("no-wait");
// Run default tests, unless only specific tests are requested
const bool defaultTests =
args.found("default") || args.options().empty();
(
args.found("default")
|| args.options().empty()
|| (optNowaiting && args.options().size())
);
typedef FixedList<scalar,2> scalar2Type;
@@ -307,27 +314,53 @@ int main(int argc, char *argv[])
List<FixedList<label, 2>> list6{{0, 1}, {2, 3}};
Info<< "list6: " << list6 << nl;
if (Pstream::parRun())
if (UPstream::parRun())
{
// Fixed buffer would also work, but want to test using UList
List<labelPair> buffer;
DynamicList<UPstream::Request> requests;
const label numProcs = UPstream::nProcs();
const label startOfRequests = UPstream::nRequests();
if (Pstream::master())
// NOTE: also test a mix of local and global requests...
UPstream::Request singleRequest;
if (UPstream::master())
{
buffer.resize(UPstream::nProcs());
// Use local requests here
requests.reserve(numProcs);
buffer.resize(numProcs);
buffer[0] = labelPair(0, UPstream::myProcNo());
for (const int proci : Pstream::subProcs())
for (const int proci : UPstream::subProcs())
{
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
requests.emplace_back(),
proci,
buffer.slice(proci, 1)
);
}
if (requests.size() > 1)
{
// Or just wait on it as a single request...
singleRequest = requests.back();
requests.pop_back();
}
if (requests.size() > 2)
{
// Peel off a few from local -> global
// the order will not matter (it is an MPI_Waitall)
UPstream::addRequest(requests.back()); requests.pop_back();
UPstream::addRequest(requests.back()); requests.pop_back();
}
}
else
{
@@ -336,16 +369,34 @@ int main(int argc, char *argv[])
Perr<< "Sending to master: " << buffer << endl;
// Capture the request and transfer to the global list
// (for testing)
UOPstream::write
(
UPstream::commsTypes::nonBlocking,
singleRequest,
UPstream::masterNo(),
buffer.slice(0, 1) // OK
/// buffer // Also OK
);
// if (singleRequest.good())
{
UPstream::addRequest(singleRequest);
}
}
UPstream::waitRequests(startOfRequests);
Pout<< "Pending requests [" << numProcs << " procs] global="
<< (UPstream::nRequests() - startOfRequests)
<< " local=" << requests.size()
<< " single=" << singleRequest.good() << nl;
if (!optNowaiting)
{
UPstream::waitRequests(startOfRequests);
}
UPstream::waitRequests(requests);
UPstream::waitRequest(singleRequest);
Info<< "Gathered: " << buffer << endl;
}


@@ -543,7 +543,11 @@ public:
);
// Requests (non-blocking comms)
// Requests (non-blocking comms).
// Pending requests are usually handled as an internal (global) list,
// since this simplifies the overall tracking and provides a convenient
// wrapping to avoid exposing MPI-specific types. Requests can also be
// handled individually via a wrapped UPstream::Request.
//- Number of outstanding requests (on the internal list of requests)
static label nRequests() noexcept;
@@ -553,6 +557,10 @@ public:
// A no-op for out-of-range values.
static void resetRequests(const label n);
//- Transfer the (wrapped) MPI request to the internal global list.
// A no-op for non-parallel. No special treatment for null requests.
static void addRequest(UPstream::Request& req);
//- Wait until all requests (from position onwards) have finished.
//- Corresponds to MPI_Waitall()
// A no-op if parRun() == false,

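A minimal sketch of the addRequest() transfer documented above (hypothetical calling code; 'buffer' and 'startOfRequests' are placeholders taken from the test further up):

    // Capture the request of a non-blocking send in a wrapper
    UPstream::Request req;

    UOPstream::write
    (
        UPstream::commsTypes::nonBlocking,
        req,
        UPstream::masterNo(),
        buffer.slice(0, 1)
    );

    if (req.good())
    {
        // Either wait on it directly:  UPstream::waitRequest(req);
        // or transfer it to the internal (global) list:
        UPstream::addRequest(req);

        // The wrapper has been reset to a null request by the transfer;
        // the underlying MPI request is now waited on via the global list:
        UPstream::waitRequests(startOfRequests);
    }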

@@ -79,11 +79,11 @@ namespace Foam
error::printStack(Perr);
std::abort();
}
else if (Pstream::parRun())
else if (UPstream::parRun())
{
Perr<< nl << err << nl
<< "\nFOAM parallel run exiting\n" << endl;
Pstream::exit(1);
UPstream::exit(1);
}
else
{


@@ -958,7 +958,7 @@ Foam::argList::argList
<< " -help-full' for extended usage" << nl
<< nl;
Pstream::exit(1); // works for serial and parallel
UPstream::exit(1); // works for serial and parallel
}
commandLine_ += ' ';
@@ -1139,7 +1139,7 @@ void Foam::argList::parse
foamVersion::printBuildInfo(Info.stdStream(), false);
FatalError.write(Info, false);
Pstream::exit(1); // works for serial and parallel
UPstream::exit(1); // works for serial and parallel
}
if (initialise)


@@ -78,7 +78,7 @@ Foam::fileOperations::fileOperationInitialise::fileOperationInitialise
<< "Error: option '-ioRanks' requires a list of"
" IO ranks as argument" << nl << nl;
//Pstream::exit(1); // works for serial and parallel
//UPstream::exit(1); // works for serial and parallel
}
}
}


@@ -53,6 +53,8 @@ Foam::label Foam::UPstream::nRequests() noexcept { return 0; }
void Foam::UPstream::resetRequests(const label n) {}
void Foam::UPstream::addRequest(UPstream::Request& req) {}
void Foam::UPstream::waitRequests(const label pos, label len) {}
void Foam::UPstream::waitRequests(UList<UPstream::Request>&) {}


@@ -37,15 +37,16 @@ License
#include <cstring>
#include <cstdlib>
#include <csignal>
#include <memory>
#include <numeric>
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
// The min value and default for MPI buffers length
// The min value and default for MPI buffer length
constexpr int minBufLen = 20000000;
// Track if we have attached MPI buffers
static bool ourBuffers = false;
// Track size of attached MPI buffer
static int attachedBufLen = 0;
// Track if we initialized MPI
static bool ourMpi = false;
@@ -53,18 +54,18 @@ static bool ourMpi = false;
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
// Attach user-defined send buffer
static void attachOurBuffers()
{
if (ourBuffers)
#ifndef SGIMPI
if (attachedBufLen)
{
return; // Already attached
}
ourBuffers = true;
// Use UPstream::mpiBufferSize (optimisationSwitch),
// but allow override with MPI_BUFFER_SIZE env variable (int value)
#ifndef SGIMPI
int len = 0;
const std::string str(Foam::getEnv("MPI_BUFFER_SIZE"));
@@ -78,14 +79,19 @@ static void attachOurBuffers()
len = minBufLen;
}
if (Foam::UPstream::debug)
{
Foam::Pout<< "UPstream::init : buffer-size " << len << '\n';
}
char* buf = new char[len];
if (MPI_SUCCESS != MPI_Buffer_attach(buf, len))
if (MPI_SUCCESS == MPI_Buffer_attach(buf, len))
{
// Properly attached
attachedBufLen = len;
if (Foam::UPstream::debug)
{
Foam::Pout<< "UPstream::init : buffer-size " << len << '\n';
}
}
else
{
delete[] buf;
Foam::Pout<< "UPstream::init : could not attach buffer\n";
@@ -94,26 +100,31 @@ static void attachOurBuffers()
}
// Remove an existing user-defined send buffer
static void detachOurBuffers()
{
if (!ourBuffers)
#ifndef SGIMPI
if (!attachedBufLen)
{
return; // Nothing to detach
}
ourBuffers = false;
// Some MPI notes suggest that the return code is MPI_SUCCESS when
// no buffer is attached.
// Be extra careful and require a non-zero size as well.
#ifndef SGIMPI
int len = 0;
char* buf = nullptr;
int len = 0;
if (MPI_SUCCESS == MPI_Buffer_detach(&buf, &len) && len)
{
// This was presumably the buffer that we attached
// and not someone else.
delete[] buf;
}
// Nothing attached
attachedBufLen = 0;
#endif
}
@@ -390,41 +401,49 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
void Foam::UPstream::shutdown(int errNo)
{
if (debug)
{
Pout<< "UPstream::shutdown\n";
}
int flag = 0;
MPI_Initialized(&flag);
if (!flag)
{
// No MPI initialized - we are done
// MPI not initialized - we have nothing to do
return;
}
MPI_Finalized(&flag);
if (flag)
{
// Already finalized elsewhere?
// MPI already finalized - we have nothing to do
if (ourMpi)
{
WarningInFunction
<< "MPI was already finalized (by a connected program?)\n";
}
else if (debug)
else if (debug && errNo == 0)
{
Pout<< "UPstream::shutdown : was already finalized\n";
}
ourMpi = false;
return;
}
else
if (!ourMpi)
{
detachOurBuffers();
WarningInFunction
<< "Finalizing MPI, but was initialized elsewhere\n";
}
ourMpi = false;
// Regular cleanup
// ---------------
if (debug)
{
Pout<< "UPstream::shutdown\n";
}
// Warn about any outstanding requests
// Check for any outstanding requests
{
label nOutstanding = 0;
@@ -432,49 +451,40 @@ void Foam::UPstream::shutdown(int errNo)
{
if (MPI_REQUEST_NULL != request)
{
// TBD: MPI_Cancel(&request); MPI_Request_free(&request);
++nOutstanding;
}
}
PstreamGlobals::outstandingRequests_.clear();
if (nOutstanding)
{
WarningInFunction
<< "There were still " << nOutstanding
<< " outstanding MPI requests." << nl
<< "Which means your code exited before doing a "
<< " UPstream::waitRequests()." << nl
<< "This should not happen for a normal code exit."
<< nl;
<< "Still have " << nOutstanding
<< " outstanding MPI requests."
<< " Should not happen for a normal code exit."
<< endl;
}
PstreamGlobals::outstandingRequests_.clear();
}
// TBD: skip these for errNo != 0 ?
{
detachOurBuffers();
forAllReverse(myProcNo_, communicator)
{
freeCommunicatorComponents(communicator);
}
}
// Clean mpi communicators
forAllReverse(myProcNo_, communicator)
if (errNo == 0)
{
freeCommunicatorComponents(communicator);
MPI_Finalize();
}
if (!flag)
else
{
// MPI not already finalized
if (!ourMpi)
{
WarningInFunction
<< "Finalizing MPI, but was initialized elsewhere\n";
}
if (errNo == 0)
{
MPI_Finalize();
}
else
{
// Abort only locally or world?
MPI_Abort(MPI_COMM_WORLD, errNo);
}
MPI_Abort(MPI_COMM_WORLD, errNo);
}
}

View File

@@ -69,6 +69,25 @@ void Foam::UPstream::resetRequests(const label n)
}
void Foam::UPstream::addRequest(UPstream::Request& req)
{
// No-op for non-parallel
if (!UPstream::parRun())
{
return;
}
// Transcribe as an MPI_Request
PstreamGlobals::outstandingRequests_.push_back
(
PstreamDetail::Request::get(req)
);
// Invalidate parameter
req = UPstream::Request(MPI_REQUEST_NULL);
}
void Foam::UPstream::waitRequests(const label pos, label len)
{
// No-op for non-parallel, no pending requests or out-of-range
@@ -128,9 +147,9 @@ void Foam::UPstream::waitRequests(const label pos, label len)
profilingPstream::addWaitTime();
// ie, resetRequests(pos)
if (trim)
{
// Trim the length of outstanding requests
PstreamGlobals::outstandingRequests_.resize(pos);
}


@@ -3897,10 +3897,10 @@ Foam::snappyLayerDriver::makeMeshMover
<< errorMsg.c_str() << nl << nl
<< "Exiting dry-run" << nl << endl;
if (Pstream::parRun())
if (UPstream::parRun())
{
Perr<< "\nFOAM parallel run exiting\n" << endl;
Pstream::exit(0);
UPstream::exit(0);
}
else
{