From 1f5cf3958bc8743af25c8870f84082258c5b4f5a Mon Sep 17 00:00:00 2001 From: Mark Olesen Date: Tue, 11 Apr 2023 10:28:34 +0200 Subject: [PATCH] ENH: add request-driven polling/consumption of processor interfaces - with (nPollProcInterfaces < 0) it does the following: - loop, waiting for some requests to finish - for each out-of-date interface, check if its associated requests have now finished (ie, the ready() check). - if ready() -> call updateInterfaceMatrix() In contrast to (nPollProcInterfaces > 0) which loops a specified number of times with several calls to MPI_Test each time, the (nPollProcInterfaces < 0) variant relies on internal MPI looping within MPI_Waitsome to progress communication. The actual dispatch still remains non-deterministic (ie, waiting for some requests to finish does not mean that any particular interface is eligible for update, or in any particular order). However, using Waitsome places the tight looping into the MPI layer, which results in few calls and eliminates behaviour dependent on the value of nPollProcInterfaces. TUT: add polling to windAroundBuildings case (for testing purposes) --- .../LduMatrixUpdateMatrixInterfaces.C | 57 ++++++++- .../lduMatrixUpdateMatrixInterfaces.C | 121 +++++++++++------- .../windAroundBuildings/Allrun-parallel | 3 +- 3 files changed, 126 insertions(+), 55 deletions(-) diff --git a/src/OpenFOAM/matrices/LduMatrix/LduMatrix/LduMatrixUpdateMatrixInterfaces.C b/src/OpenFOAM/matrices/LduMatrix/LduMatrix/LduMatrixUpdateMatrixInterfaces.C index 1e36a82086..5376a1947b 100644 --- a/src/OpenFOAM/matrices/LduMatrix/LduMatrix/LduMatrixUpdateMatrixInterfaces.C +++ b/src/OpenFOAM/matrices/LduMatrix/LduMatrix/LduMatrixUpdateMatrixInterfaces.C @@ -115,23 +115,70 @@ void Foam::LduMatrix::updateMatrixInterfaces { const UPstream::commsTypes commsType = UPstream::defaultCommsType; - // Block until sends/receives have finished - if (commsType == UPstream::commsTypes::nonBlocking) + if + ( + commsType == UPstream::commsTypes::nonBlocking + && UPstream::nPollProcInterfaces + ) { - UPstream::waitRequests(startRequest); + // Wait for some interface requests to become available and + // consume them. No guarantee that the finished requests actually + // correspond to any particular interface, but it is reasonably + // probable that some interfaces will be able to start consumption + // without waiting for all requests. + + DynamicList indices; // (work array) + while + ( + UPstream::nPollProcInterfaces < 0 + && UPstream::waitSomeRequests(startRequest, &indices) + ) + { + forAll(interfaces_, interfacei) + { + auto* intf = interfaces_.get(interfacei); + + if (intf && !intf->updatedMatrix() && intf->ready()) + { + intf->updateInterfaceMatrix + ( + result, + add, + lduMesh_.lduAddr(), + interfacei, + psiif, + interfaceCoeffs[interfacei], + commsType + ); + } + } + } } + if ( commsType == UPstream::commsTypes::blocking || commsType == UPstream::commsTypes::nonBlocking ) { + // Wait until sends/receives have finished. + // - effectively a no-op (without waiting) if already completed. + if (commsType == UPstream::commsTypes::nonBlocking) + { + UPstream::waitRequests(startRequest); + } + + // Check/no-check for updatedMatrix() ? + const bool noCheck = (commsType == UPstream::commsTypes::blocking); + forAll(interfaces_, interfacei) { - if (interfaces_.set(interfacei)) + auto* intf = interfaces_.get(interfacei); + + if (intf && (noCheck || !intf->updatedMatrix())) { - interfaces_[interfacei].updateInterfaceMatrix + intf->updateInterfaceMatrix ( result, add, diff --git a/src/OpenFOAM/matrices/lduMatrix/lduMatrix/lduMatrixUpdateMatrixInterfaces.C b/src/OpenFOAM/matrices/lduMatrix/lduMatrix/lduMatrixUpdateMatrixInterfaces.C index d2e78b4cba..1d20ed7023 100644 --- a/src/OpenFOAM/matrices/lduMatrix/lduMatrix/lduMatrixUpdateMatrixInterfaces.C +++ b/src/OpenFOAM/matrices/lduMatrix/lduMatrix/lduMatrixUpdateMatrixInterfaces.C @@ -118,46 +118,68 @@ void Foam::lduMatrix::updateMatrixInterfaces { const UPstream::commsTypes commsType = UPstream::defaultCommsType; - if (commsType == UPstream::commsTypes::blocking) + if + ( + commsType == UPstream::commsTypes::nonBlocking + && UPstream::nPollProcInterfaces + ) { - forAll(interfaces, interfacei) + // Wait for some interface requests to become available and + // consume them. No guarantee that the finished requests actually + // correspond to any particular interface, but it is reasonably + // probable that some interfaces will be able to start consumption + // without waiting for all requests. + + DynamicList indices; // (work array) + while + ( + UPstream::nPollProcInterfaces < 0 + && UPstream::waitSomeRequests(startRequest, &indices) + ) { - if (interfaces.set(interfacei)) + forAll(interfaces, interfacei) { - interfaces[interfacei].updateInterfaceMatrix - ( - result, - add, - mesh().lduAddr(), - interfacei, - psiif, - coupleCoeffs[interfacei], - cmpt, - commsType - ); + auto* intf = interfaces.get(interfacei); + + if (intf && !intf->updatedMatrix() && intf->ready()) + { + intf->updateInterfaceMatrix + ( + result, + add, + mesh().lduAddr(), + interfacei, + psiif, + coupleCoeffs[interfacei], + cmpt, + commsType + ); + } } } - } - else if (commsType == UPstream::commsTypes::nonBlocking) - { - // Try and consume interfaces as they become available - bool allUpdated = false; - for (label i=0; iupdatedMatrix()) { - if (interfaces[interfacei].ready()) + if (intf->ready()) { - interfaces[interfacei].updateInterfaceMatrix + intf->updateInterfaceMatrix ( result, add, @@ -178,35 +200,36 @@ void Foam::lduMatrix::updateMatrixInterfaces if (allUpdated) { - break; + break; // Early exit } } + } - // Block for everything - if (Pstream::parRun()) + + if + ( + commsType == UPstream::commsTypes::blocking + || commsType == UPstream::commsTypes::nonBlocking + ) + { + // Wait until sends/receives have finished. + // - effectively a no-op (without waiting) if already completed. + if (commsType == UPstream::commsTypes::nonBlocking) { - if (allUpdated) - { - // All received. Just remove all outstanding requests - UPstream::resetRequests(startRequest); - } - else - { - // Block for all requests and remove storage - UPstream::waitRequests(startRequest); - } + UPstream::waitRequests(startRequest); } - // Consume + // Check/no-check for updatedMatrix() ? + const bool noCheck = (commsType == UPstream::commsTypes::blocking); + + // Consume anything still outstanding forAll(interfaces, interfacei) { - if - ( - interfaces.set(interfacei) - && !interfaces[interfacei].updatedMatrix() - ) + auto* intf = interfaces.get(interfacei); + + if (intf && (noCheck || !intf->updatedMatrix())) { - interfaces[interfacei].updateInterfaceMatrix + intf->updateInterfaceMatrix ( result, add, @@ -282,7 +305,7 @@ void Foam::lduMatrix::updateMatrixInterfaces psiif, coupleCoeffs[interfacei], cmpt, - Pstream::commsTypes::blocking + UPstream::commsTypes::blocking ); } } diff --git a/tutorials/incompressible/simpleFoam/windAroundBuildings/Allrun-parallel b/tutorials/incompressible/simpleFoam/windAroundBuildings/Allrun-parallel index c391fe6cca..96d81ea377 100755 --- a/tutorials/incompressible/simpleFoam/windAroundBuildings/Allrun-parallel +++ b/tutorials/incompressible/simpleFoam/windAroundBuildings/Allrun-parallel @@ -9,7 +9,8 @@ runApplication decomposePar restore0Dir -processor -runParallel $(getApplication) +# Test polling interfaces +runParallel $(getApplication) -opt-switch nPollProcInterfaces=-1 runApplication reconstructPar