ENH: add slice/range support to UPstream::waitSomeRequests()

- this simplifies polling receives and allows separation from
  the sends

ENH: add UPstream::removeRequests(pos, len)

- cancel/free of outstanding requests and remove segment from the
  internal list of outstanding requests
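
A rough sketch of the intended calling pattern (within namespace Foam;
the names startRequest, nRecvs and processData() are illustrative only,
not part of this commit):

    // Record the request position before posting the receives
    const label startRequest = UPstream::nRequests();

    // ... post 'nRecvs' non-blocking receives, then the sends ...

    // Poll the receive slice only, processing whatever has completed
    DynamicList<int> indices;
    while (UPstream::waitSomeRequests(startRequest, nRecvs, &indices))
    {
        for (const int idx : indices)
        {
            processData(idx);  // hypothetical handler for (startRequest+idx)
        }
    }

    // Or discard the slice entirely: cancel/free the requests and
    // remove them from the internal list of outstanding requests
    UPstream::removeRequests(startRequest, nRecvs);
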
commit 32903d337e (parent f717f79833)
Author: Mark Olesen
Date:   2023-07-26 09:05:34 +02:00

8 changed files with 525 additions and 21 deletions

View File

@@ -0,0 +1,3 @@
Test-parallel-waitSome.C
EXE = $(FOAM_USER_APPBIN)/Test-parallel-waitSome

View File

@@ -0,0 +1,2 @@
/* EXE_INC = */
/* EXE_LIBS = */

View File

@@ -0,0 +1,328 @@
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
-------------------------------------------------------------------------------
    Copyright (C) 2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.

Application
    Test-parallel-waitSome

Description
    Test polling versus wait-all for processing receive data.
    Will not see much difference between -wait-all and -no-polling though,
    since the master does not have enough other work.

\*---------------------------------------------------------------------------*/

#include "List.H"
#include "argList.H"
#include "Time.H"
#include "IPstream.H"
#include "OPstream.H"
#include "IOstreams.H"
#include "Switch.H"
#include "clockTime.H"
using namespace Foam;
// The 'classic' waiting receive, but also only waiting for recv request
template<class Type>
void waitingReceive
(
const labelRange& recvRequests,
const List<List<Type>>& recvBuffers,
const bool waitAll = false
)
{
clockTime waitTiming;
if (waitAll)
{
// Wait for send and recv (assumes recv followed by send)
UPstream::waitRequests(recvRequests.start(), -1);
}
else
{
// Wait for receives only
UPstream::waitRequests(recvRequests.start(), recvRequests.size());
}
double waited = waitTiming.timeIncrement();
if (waited > 1e-3)
{
Pout<< "waited: " << waited << " before processing" << endl;
}
forAll(recvBuffers, proci)
{
const auto& slice = recvBuffers[proci];
if (!slice.empty())
{
// Process data from proci
Pout<< "proc:" << proci
<< ' ' << flatOutput(slice) << nl;
}
}
}
// Polling receive
template<class Type>
void pollingReceive
(
    const labelRange& recvRequests,
    const UList<int>& recvProcs,
    const List<List<Type>>& recvBuffers
)
{
    clockTime waitTiming;

    DynamicList<int> indices(recvRequests.size());

    if (!recvRequests.empty()) Pout<< "..." << endl;

    for
    (
        label loop = 0;
        UPstream::waitSomeRequests
        (
            recvRequests.start(),
            recvRequests.size(),
            &indices
        );
        ++loop
    )
    {
        double waited = waitTiming.timeIncrement();
        if (waited <= 1e-3)
        {
            waited = 0;
        }

        Pout<< "loop:" << loop
            << " waited: " << waited
            << " before processing" << endl;

        for (const int idx : indices)
        {
            const int proci = recvProcs[idx];
            const auto& slice = recvBuffers[proci];

            // Process data from proci
            Pout<< "loop:" << loop << " polled:" << indices.size()
                << " proc:" << proci
                << ' ' << flatOutput(slice) << endl;
        }

        Pout<< "..." << endl;
    }
}

// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

int main(int argc, char *argv[])
{
    argList::noCheckProcessorDirectories();
    argList::addVerboseOption("timings etc");
    argList::addBoolOption("no-polling", "wait all instead of polling");
    argList::addBoolOption("wait-all", "wait all instead of polling");
    argList::addOption("sleep", "s", "change sleep (default: 5)");

    const label transferSize = 10;
    label sleepSeconds = 5;

    #include "setRootCase.H"

    args.readIfPresent("sleep", sleepSeconds);

    const bool waitAll = args.found("wait-all");
    const bool nonPolling = args.found("no-polling");

    if (!Pstream::parRun())
    {
        Info<< "\nWarning: not parallel - skipping further tests\n" << endl;
        return 0;
    }

    Info<< "Calling with sleep=" << sleepSeconds
        << ", polling=" << Switch::name(!nonPolling)
        << ", wait-all=" << Switch::name(waitAll) << nl;

    labelList sendBuffer;
    List<labelList> recvBuffers;

    if (UPstream::master())
    {
        recvBuffers.resize(UPstream::nProcs());
    }
    else
    {
        recvBuffers.resize(1);
    }

    clockTime timing;

    const label startOfRequests = UPstream::nRequests();

    // Setup receives
    labelRange recvRequests(UPstream::nRequests(), 0);
    DynamicList<int> recvProcs(UPstream::nProcs());

    if (UPstream::master())
    {
        for (const int proci : UPstream::subProcs())
        {
            // The rank corresponding to the request
            recvProcs.push_back(proci);

            auto& slice = recvBuffers[proci];
            slice.resize_nocopy(transferSize);

            UIPstream::read
            (
                UPstream::commsTypes::nonBlocking,
                proci,
                slice
            );
        }
    }
    else
    {
        const int proci = UPstream::masterNo();

        // Only the even-numbered ranks receive back from the master
        if ((UPstream::myProcNo() % 2) == 0)
        {
            recvProcs.push_back(proci);

            auto& slice = recvBuffers[proci];
            slice.resize_nocopy(transferSize);

            UIPstream::read
            (
                UPstream::commsTypes::nonBlocking,
                proci,
                slice
            );
        }
    }

    // OR: recvRequests.size() = (UPstream::nRequests() - recvRequests.start());
    recvRequests += recvProcs.size();

    labelList overallRecvRequests
    (
        UPstream::listGatherValues<label>(recvRequests.size())
    );

    Info<< "Number of recv requests: "
        << flatOutput(overallRecvRequests) << nl << nl;

    // Setup sends
    sendBuffer.resize_nocopy(transferSize);
    sendBuffer = UPstream::myProcNo();

    const auto startBufferSend = [&]() -> void
    {
        if (sleepSeconds > 0)
        {
            // Dispatch some immediately, others with a delay
            if ((UPstream::myProcNo() % 2) == 0)
            {
                sleep(sleepSeconds);
            }
            else if ((UPstream::myProcNo() % 3) == 0)
            {
                sleep(1.5*sleepSeconds);
            }
        }

        UOPstream::write
        (
            UPstream::commsTypes::nonBlocking,
            UPstream::masterNo(),
            sendBuffer
        );
    };

    if (UPstream::master())
    {
        for (const int proci : UPstream::subProcs())
        {
            // Match the receives: only the even-numbered ranks posted one
            if ((proci % 2) == 0)
            {
                UOPstream::write
                (
                    UPstream::commsTypes::nonBlocking,
                    proci,
                    sendBuffer
                );
            }
        }
    }
    else if (waitAll)
    {
        startBufferSend();
    }

    // Some skulduggery to get a differential in timings...
    const int nloops = (UPstream::master() ? 1 : 2);

    for (int loopi = 0; loopi < nloops; ++loopi)
    {
        if (waitAll || nonPolling)
        {
            waitingReceive(recvRequests, recvBuffers, waitAll);
        }
        else
        {
            pollingReceive(recvRequests, recvProcs, recvBuffers);
        }

        // Timing for processing all the receives
        if (args.verbose())
        {
            Pout<< "receive: " << timing.timeIncrement() << 's' << endl;
        }

        if (!UPstream::master() && loopi == 0 && !waitAll)
        {
            startBufferSend();
        }
    }

    if (args.verbose())
    {
        Pout<< "timing: " << timing.elapsedTime() << 's' << endl;
    }

    // Final
    UPstream::waitRequests(startOfRequests);

    Info<< "End\n" << endl;
    return 0;
}


// ************************************************************************* //
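
A typical invocation for experimenting with this test (assuming it has been
compiled and an MPI environment is available; the rank count is arbitrary):

    mpirun -np 4 Test-parallel-waitSome -parallel -verbose
    mpirun -np 4 Test-parallel-waitSome -parallel -wait-all -verbose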

View File

@@ -604,6 +604,17 @@ public:
        //  A no-op if parRun() == false or list is empty
        static void cancelRequests(UList<UPstream::Request>& requests);

        //- Non-blocking comms: cancel/free outstanding requests
        //- (from position onwards) and remove from internal list of requests.
        //- Corresponds to MPI_Cancel() + MPI_Request_free()
        //  A no-op if parRun() == false,
        //  if the position is out-of-range [0 to nRequests()],
        //  or the internal list of requests is empty.
        //
        //  \param pos starting position within the internal list of requests
        //  \param len length of slice to remove (negative = until the end)
        static void removeRequests(const label pos, label len = -1);

        //- Non-blocking comms: free outstanding request.
        //- Corresponds to MPI_Request_free()
        //  A no-op if parRun() == false
@@ -656,13 +667,32 @@ public:
        //  \returns false if all requests have already been handled
        //
        //  \param pos starting position within the internal list of requests
        //  \param len length of slice to check (negative = until the end)
        //  \param[out] indices the completed request indices relative to the
-       //      starting position. The is an optional parameter, which can be
+       //      starting position. This is an optional parameter that can be
        //      used to recover the indices or simply to avoid reallocations
        //      when calling within a loop.
        static bool waitSomeRequests
        (
            const label pos,
            label len = -1,
            DynamicList<int>* indices = nullptr
        );

        //- Wait until some requests have finished.
        //- Corresponds to MPI_Waitsome()
        //  A no-op and returns false if parRun() == false,
        //  the list is empty,
        //  or if all the requests have already been handled.
        //
        //  \param requests the requests
        //  \param[out] indices the completed request indices relative to the
        //      starting position. This is an optional parameter that can be
        //      used to recover the indices or simply to avoid reallocations
        //      when calling within a loop.
        static bool waitSomeRequests
        (
            UList<UPstream::Request>& requests,
            DynamicList<int>* indices = nullptr
        );
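
A minimal sketch of how the list-based overload might be used (the request
handles and their collection are illustrative; it assumes non-blocking calls
that return a UPstream::Request handle):

    DynamicList<UPstream::Request> requests;
    // ... each non-blocking call appends its UPstream::Request handle ...

    DynamicList<int> indices;
    while (UPstream::waitSomeRequests(requests, &indices))
    {
        for (const int idx : indices)
        {
            // requests[idx] has completed (its handle is now reset)
        }
    }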

View File

@@ -134,7 +134,7 @@ void Foam::LduMatrix<Type, DType, LUType>::updateMatrixInterfaces
            bool pollingActive = (UPstream::nPollProcInterfaces < 0);
            (
                pollingActive
-            && UPstream::waitSomeRequests(startRequest, &indices)
+            && UPstream::waitSomeRequests(startRequest, -1, &indices)
            );
            /*nil*/
        )

View File

@@ -144,7 +144,7 @@ void Foam::lduMatrix::updateMatrixInterfaces
            bool pollingActive = (UPstream::nPollProcInterfaces < 0);
            (
                pollingActive
-            && UPstream::waitSomeRequests(startRequest, &indices)
+            && UPstream::waitSomeRequests(startRequest, -1, &indices)
            );
            /*nil*/
        )

View File

@@ -59,6 +59,8 @@ void Foam::UPstream::cancelRequest(const label i) {}
void Foam::UPstream::cancelRequest(UPstream::Request&) {}
void Foam::UPstream::cancelRequests(UList<UPstream::Request>&) {}

void Foam::UPstream::removeRequests(const label pos, label len) {}

void Foam::UPstream::freeRequest(UPstream::Request&) {}
void Foam::UPstream::freeRequests(UList<UPstream::Request>&) {}
@@ -73,6 +75,17 @@ bool Foam::UPstream::waitAnyRequest(const label pos, label len)

bool Foam::UPstream::waitSomeRequests
(
    const label pos,
    label len,
    DynamicList<int>* indices
)
{
    if (indices) indices->clear();
    return false;
}


bool Foam::UPstream::waitSomeRequests
(
    UList<UPstream::Request>& requests,
    DynamicList<int>* indices
)
{

View File

@@ -153,6 +153,47 @@ void Foam::UPstream::cancelRequests(UList<UPstream::Request>& requests)
}


void Foam::UPstream::removeRequests(const label pos, label len)
{
    // No-op for non-parallel, no pending requests or out-of-range
    if
    (
        !UPstream::parRun()
     || (pos < 0 || pos >= PstreamGlobals::outstandingRequests_.size())
     || !len
    )
    {
        return;
    }

    label count = (PstreamGlobals::outstandingRequests_.size() - pos);

    // Apply range-checking on slice with (len < 0) behaving like npos
    // (ie, the rest of the list)
    if (len >= 0 && len < count)
    {
        // A non-trailing slice
        count = len;
    }
    // Have count >= 1

    const labelRange range(pos, count);

    for (const label i : range)
    {
        auto& request = PstreamGlobals::outstandingRequests_[i];

        if (MPI_REQUEST_NULL != request)  // Active handle is mandatory
        {
            MPI_Cancel(&request);
            MPI_Request_free(&request);  //<- Sets to MPI_REQUEST_NULL
        }
    }

    // Remove from list of outstanding requests and move down
    PstreamGlobals::outstandingRequests_.remove(range);
}


void Foam::UPstream::freeRequest(UPstream::Request& req)
{
    // No-op for non-parallel
@@ -214,7 +255,7 @@ void Foam::UPstream::waitRequests(const label pos, label len)
    }

    label count = (PstreamGlobals::outstandingRequests_.size() - pos);
-   bool trim = true;  // Trim the trailing part of the list
+   bool trim = true;  // Can trim the trailing part of the list

    // Apply range-checking on slice with (len < 0) behaving like npos
    // (ie, the rest of the list)
@@ -348,7 +389,7 @@ bool Foam::UPstream::waitAnyRequest(const label pos, label len)
    if (UPstream::debug)
    {
-       Pout<< "UPstream::waitAnyRequest : starting wait for some of "
+       Pout<< "UPstream::waitAnyRequest : starting wait for any of "
            << count << " requests starting at " << pos << endl;
    }
@@ -378,6 +419,7 @@ bool Foam::UPstream::waitAnyRequest(const label pos, label len)

bool Foam::UPstream::waitSomeRequests
(
    const label pos,
    label len,
    DynamicList<int>* indices
)
{
@@ -386,13 +428,10 @@ bool Foam::UPstream::waitSomeRequests
    (
        !UPstream::parRun()
     || (pos < 0 || pos >= PstreamGlobals::outstandingRequests_.size())
-    // || !len
+    || !len
    )
    {
-       if (indices)
-       {
-           indices->clear();
-       }
+       if (indices) indices->clear();
        return false;
    }
@@ -400,25 +439,24 @@ bool Foam::UPstream::waitSomeRequests
    // Apply range-checking on slice with (len < 0) behaving like npos
    // (ie, the rest of the list)
-   // if (len >= 0 && len < count)
-   // {
-   //     // A non-trailing slice
-   //     count = len;
-   // }
+   if (len >= 0 && len < count)
+   {
+       // A non-trailing slice
+       count = len;
+   }
    // Have count >= 1

    auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos);

    if (UPstream::debug)
    {
-       Pout<< "UPstream:waitSomeRequest : starting wait for any of "
+       Pout<< "UPstream:waitSomeRequest : starting wait for some of "
            << count << " requests starting at " << pos << endl;
    }

    // Local temporary storage, or return via calling parameter
    List<int> tmpIndices;
    if (indices)
    {
        indices->resize_nocopy(count);
@@ -454,10 +492,7 @@ bool Foam::UPstream::waitSomeRequests
    if (outcount == MPI_UNDEFINED || outcount < 1)
    {
        // No active request handles
-       if (indices)
-       {
-           indices->clear();
-       }
+       if (indices) indices->clear();
        return false;
    }
@@ -470,6 +505,99 @@ bool Foam::UPstream::waitSomeRequests
}


bool Foam::UPstream::waitSomeRequests
(
    UList<UPstream::Request>& requests,
    DynamicList<int>* indices
)
{
    // No-op for non-parallel or no pending requests
    if (!UPstream::parRun() || requests.empty())
    {
        if (indices) indices->clear();
        return false;
    }

    // Looks ugly but is legitimate since UPstream::Request is an intptr_t,
    // which is always large enough to hold an MPI_Request (int or pointer)

    label count = 0;
    auto* waitRequests = reinterpret_cast<MPI_Request*>(requests.data());

    for (auto& req : requests)
    {
        waitRequests[count] = PstreamDetail::Request::get(req);
        ++count;
    }

    // Local temporary storage, or return via calling parameter
    List<int> tmpIndices;
    if (indices)
    {
        indices->resize_nocopy(count);
    }
    else
    {
        tmpIndices.resize(count);
    }

    if (UPstream::debug)
    {
        Pout<< "UPstream:waitSomeRequest : starting wait for some of "
            << requests.size() << " requests" << endl;
    }

    profilingPstream::beginTiming();

    // On success: sets non-blocking requests to MPI_REQUEST_NULL
    int outcount = 0;
    if
    (
        MPI_Waitsome
        (
            count,
            waitRequests,
            &outcount,
            (indices ? indices->data() : tmpIndices.data()),
            MPI_STATUSES_IGNORE
        )
    )
    {
        FatalErrorInFunction
            << "MPI_Waitsome returned with error"
            << Foam::abort(FatalError);
    }

    profilingPstream::addWaitTime();

    if (outcount == MPI_UNDEFINED || outcount < 1)
    {
        // No active request handles
        if (indices) indices->clear();

        // Everything handled or inactive, reset all to MPI_REQUEST_NULL
        requests = UPstream::Request(MPI_REQUEST_NULL);
        return false;
    }

    if (indices)
    {
        indices->resize(outcount);
    }

    // Transcribe MPI_Request back into UPstream::Request
    // - do in reverse order - see note in finishedRequests()
    {
        for (label i = requests.size()-1; i >= 0; --i)
        {
            requests[i] = UPstream::Request(waitRequests[i]);
        }
    }

    return true;
}


Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>& requests)
{
    // No-op for non-parallel or no pending requests