diff --git a/applications/test/parallel-waitSome/Make/files b/applications/test/parallel-waitSome/Make/files new file mode 100644 index 0000000000..b27e308ef6 --- /dev/null +++ b/applications/test/parallel-waitSome/Make/files @@ -0,0 +1,3 @@ +Test-parallel-waitSome.C + +EXE = $(FOAM_USER_APPBIN)/Test-parallel-waitSome diff --git a/applications/test/parallel-waitSome/Make/options b/applications/test/parallel-waitSome/Make/options new file mode 100644 index 0000000000..18e6fe47af --- /dev/null +++ b/applications/test/parallel-waitSome/Make/options @@ -0,0 +1,2 @@ +/* EXE_INC = */ +/* EXE_LIBS = */ diff --git a/applications/test/parallel-waitSome/Test-parallel-waitSome.C b/applications/test/parallel-waitSome/Test-parallel-waitSome.C new file mode 100644 index 0000000000..b17bb6e345 --- /dev/null +++ b/applications/test/parallel-waitSome/Test-parallel-waitSome.C @@ -0,0 +1,328 @@ +/*---------------------------------------------------------------------------*\ + ========= | + \\ / F ield | OpenFOAM: The Open Source CFD Toolbox + \\ / O peration | + \\ / A nd | www.openfoam.com + \\/ M anipulation | +------------------------------------------------------------------------------- + Copyright (C) 2023 OpenCFD Ltd. +------------------------------------------------------------------------------- +License + This file is part of OpenFOAM. + + OpenFOAM is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OpenFOAM is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with OpenFOAM. If not, see . + +Application + Test-parallel-waitSome + +Description + Test polling versus wait-all for processing receive data. + Will not see much difference between -wait-all and -no-polling though + since the master doesn't have enough other work. + +\*---------------------------------------------------------------------------*/ + +#include "List.H" +#include "argList.H" +#include "Time.H" +#include "IPstream.H" +#include "OPstream.H" +#include "IOstreams.H" +#include "Switch.H" +#include "clockTime.H" + +using namespace Foam; + + +// The 'classic' waiting receive, but also only waiting for recv request +template +void waitingReceive +( + const labelRange& recvRequests, + const List>& recvBuffers, + const bool waitAll = false +) +{ + clockTime waitTiming; + + if (waitAll) + { + // Wait for send and recv (assumes recv followed by send) + UPstream::waitRequests(recvRequests.start(), -1); + } + else + { + // Wait for receives only + UPstream::waitRequests(recvRequests.start(), recvRequests.size()); + } + + double waited = waitTiming.timeIncrement(); + if (waited > 1e-3) + { + Pout<< "waited: " << waited << " before processing" << endl; + } + + forAll(recvBuffers, proci) + { + const auto& slice = recvBuffers[proci]; + + if (!slice.empty()) + { + // Process data from proci + Pout<< "proc:" << proci + << ' ' << flatOutput(slice) << nl; + } + } +} + + +// Polling receive +template +void pollingReceive +( + const labelRange& recvRequests, + const UList& recvProcs, + const List>& recvBuffers +) +{ + clockTime waitTiming; + + DynamicList indices(recvRequests.size()); + + if (!recvRequests.empty()) Pout<< "..." << endl; + + for + ( + label loop = 0; + UPstream::waitSomeRequests + ( + recvRequests.start(), + recvRequests.size(), + &indices + ); + ++loop + ) + { + double waited = waitTiming.timeIncrement(); + if (waited <= 1e-3) + { + waited = 0; + } + Pout<< "loop:" << loop + << " waited: " << waited + << " before processing" << endl; + + for (const int idx : indices) + { + const int proci = recvProcs[idx]; + const auto& slice = recvBuffers[proci]; + + // Process data from proci + Pout<< "loop:" << loop << " polled:" << indices.size() + << " proc:" << proci + << ' ' << flatOutput(slice) << endl; + } + Pout<< "..." << endl; + } +} + + +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +int main(int argc, char *argv[]) +{ + argList::noCheckProcessorDirectories(); + argList::addVerboseOption("timings etc"); + argList::addBoolOption("no-polling", "wait all instead of polling"); + argList::addBoolOption("wait-all", "wait all instead of polling"); + argList::addOption("sleep", "s", "change sleep (default: 5)"); + argList::noCheckProcessorDirectories(); + + const label transferSize = 10; + label sleepSeconds = 5; + + #include "setRootCase.H" + + args.readIfPresent("sleep", sleepSeconds); + const bool waitAll = args.found("wait-all"); + const bool nonPolling = args.found("no-polling"); + + if (!Pstream::parRun()) + { + Info<< "\nWarning: not parallel - skipping further tests\n" << endl; + return 0; + } + + Info<< "Calling with sleep=" << sleepSeconds + << ", polling=" << Switch::name(!nonPolling) + << ", wait-all=" << Switch::name(waitAll) << nl; + + labelList sendBuffer; + List recvBuffers; + + if (UPstream::master()) + { + recvBuffers.resize(UPstream::nProcs()); + } + else + { + recvBuffers.resize(1); + } + + clockTime timing; + + const label startOfRequests = UPstream::nRequests(); + + // Setup receives + labelRange recvRequests(UPstream::nRequests(), 0); + DynamicList recvProcs(UPstream::nProcs()); + + if (UPstream::master()) + { + for (const int proci : UPstream::subProcs()) + { + // The rank corresponding to the request + recvProcs.push_back(proci); + auto& slice = recvBuffers[proci]; + slice.resize_nocopy(transferSize); + + UIPstream::read + ( + UPstream::commsTypes::nonBlocking, + proci, + slice + ); + } + } + else + { + const int proci = UPstream::masterNo(); + + if ((UPstream::myProcNo() % 2) == 0) + { + recvProcs.push_back(proci); + auto& slice = recvBuffers[proci]; + slice.resize_nocopy(transferSize); + + UIPstream::read + ( + UPstream::commsTypes::nonBlocking, + proci, + slice + ); + } + } + // OR: recvRequests.size() = (UPstream::nRequests() - recvRequests.start()); + recvRequests += recvProcs.size(); + + + labelList overallRecvRequests + ( + UPstream::listGatherValues