ENH: add request-driven polling/consumption of processor interfaces

- with (nPollProcInterfaces < 0) it does the following:

  - loop, waiting for some requests to finish
  - for each out-of-date interface, check if its associated
    requests have now finished (ie, the ready() check).
  - if ready() -> call updateInterfaceMatrix()

  In contrast to (nPollProcInterfaces > 0), which loops a specified
  number of times with several calls to MPI_Test each time, the
  (nPollProcInterfaces < 0) variant relies on the internal MPI looping
  within MPI_Waitsome to progress communication.

  The actual dispatch remains non-deterministic (ie, waiting for
  some requests to finish does not mean that any particular interface
  is eligible for update, or that updates occur in any particular
  order). However, using Waitsome places the tight looping inside the
  MPI layer, which results in fewer calls and removes the behavioural
  dependency on the value of nPollProcInterfaces. (Both consumption
  patterns are sketched below.)

TUT: add polling to windAroundBuildings case (for testing purposes)
Mark Olesen, 2023-04-11 10:28:34 +02:00
parent e1cb12509e, commit 1f5cf3958b
3 changed files with 126 additions and 55 deletions
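
For illustration, a minimal self-contained MPI sketch of the request-driven
consumption pattern that UPstream::waitSomeRequests() wraps. It assumes,
purely for simplicity, one outstanding request per interface (in OpenFOAM
the requests do not map one-to-one onto interfaces, hence the extra
updatedMatrix()/ready() checks in the real code), and consumeInterface()
is a hypothetical stand-in for updateInterfaceMatrix():

    #include <mpi.h>
    #include <cstdio>
    #include <vector>

    // Hypothetical stand-in for updateInterfaceMatrix()
    static void consumeInterface(int interfacei)
    {
        std::printf("consume interface %d\n", interfacei);
    }

    // Consume interfaces as their requests complete.
    // MPI_Waitsome blocks until at least one request has finished, so the
    // tight progression loop runs inside the MPI layer, not in user code.
    void consumeAsCompleted(std::vector<MPI_Request>& requests)
    {
        std::vector<int> indices(requests.size());  // (work array)
        int outcount = 0;

        while
        (
            MPI_Waitsome
            (
                int(requests.size()), requests.data(),
                &outcount, indices.data(), MPI_STATUSES_IGNORE
            ) == MPI_SUCCESS
         && outcount != MPI_UNDEFINED  // MPI_UNDEFINED: no active requests
        )
        {
            for (int i = 0; i < outcount; ++i)
            {
                // Completed entries are reset to MPI_REQUEST_NULL, so
                // they are not waited on again in the next iteration
                consumeInterface(indices[i]);
            }
        }
    }

The dispatch order is whatever order the requests happen to complete in,
which is exactly the non-determinism noted in the commit message.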

@@ -115,23 +115,70 @@ void Foam::LduMatrix<Type, DType, LUType>::updateMatrixInterfaces
 {
     const UPstream::commsTypes commsType = UPstream::defaultCommsType;
 
-    // Block until sends/receives have finished
-    if (commsType == UPstream::commsTypes::nonBlocking)
+    if
+    (
+        commsType == UPstream::commsTypes::nonBlocking
+     && UPstream::nPollProcInterfaces
+    )
     {
-        UPstream::waitRequests(startRequest);
+        // Wait for some interface requests to become available and
+        // consume them. No guarantee that the finished requests actually
+        // correspond to any particular interface, but it is reasonably
+        // probable that some interfaces will be able to start consumption
+        // without waiting for all requests.
+
+        DynamicList<int> indices;  // (work array)
+
+        while
+        (
+            UPstream::nPollProcInterfaces < 0
+         && UPstream::waitSomeRequests(startRequest, &indices)
+        )
+        {
+            forAll(interfaces_, interfacei)
+            {
+                auto* intf = interfaces_.get(interfacei);
+
+                if (intf && !intf->updatedMatrix() && intf->ready())
+                {
+                    intf->updateInterfaceMatrix
+                    (
+                        result,
+                        add,
+                        lduMesh_.lduAddr(),
+                        interfacei,
+                        psiif,
+                        interfaceCoeffs[interfacei],
+                        commsType
+                    );
+                }
+            }
+        }
     }
 
     if
     (
         commsType == UPstream::commsTypes::blocking
      || commsType == UPstream::commsTypes::nonBlocking
     )
     {
+        // Wait until sends/receives have finished.
+        // - effectively a no-op (without waiting) if already completed.
+        if (commsType == UPstream::commsTypes::nonBlocking)
+        {
+            UPstream::waitRequests(startRequest);
+        }
+
+        // Check/no-check for updatedMatrix() ?
+        const bool noCheck = (commsType == UPstream::commsTypes::blocking);
+
         forAll(interfaces_, interfacei)
         {
-            if (interfaces_.set(interfacei))
+            auto* intf = interfaces_.get(interfacei);
+
+            if (intf && (noCheck || !intf->updatedMatrix()))
             {
-                interfaces_[interfacei].updateInterfaceMatrix
+                intf->updateInterfaceMatrix
                 (
                     result,
                     add,

@@ -118,46 +118,68 @@ void Foam::lduMatrix::updateMatrixInterfaces
 {
     const UPstream::commsTypes commsType = UPstream::defaultCommsType;
 
-    if (commsType == UPstream::commsTypes::blocking)
+    if
+    (
+        commsType == UPstream::commsTypes::nonBlocking
+     && UPstream::nPollProcInterfaces
+    )
     {
-        forAll(interfaces, interfacei)
+        // Wait for some interface requests to become available and
+        // consume them. No guarantee that the finished requests actually
+        // correspond to any particular interface, but it is reasonably
+        // probable that some interfaces will be able to start consumption
+        // without waiting for all requests.
+
+        DynamicList<int> indices;  // (work array)
+
+        while
+        (
+            UPstream::nPollProcInterfaces < 0
+         && UPstream::waitSomeRequests(startRequest, &indices)
+        )
         {
-            if (interfaces.set(interfacei))
+            forAll(interfaces, interfacei)
             {
-                interfaces[interfacei].updateInterfaceMatrix
-                (
-                    result,
-                    add,
-                    mesh().lduAddr(),
-                    interfacei,
-                    psiif,
-                    coupleCoeffs[interfacei],
-                    cmpt,
-                    commsType
-                );
+                auto* intf = interfaces.get(interfacei);
+
+                if (intf && !intf->updatedMatrix() && intf->ready())
+                {
+                    intf->updateInterfaceMatrix
+                    (
+                        result,
+                        add,
+                        mesh().lduAddr(),
+                        interfacei,
+                        psiif,
+                        coupleCoeffs[interfacei],
+                        cmpt,
+                        commsType
+                    );
+                }
             }
         }
-    }
-    else if (commsType == UPstream::commsTypes::nonBlocking)
-    {
-        // Try and consume interfaces as they become available
-        bool allUpdated = false;
 
-        for (label i=0; i<UPstream::nPollProcInterfaces; i++)
+        // [OLDER]
+        // Alternative for consuming interfaces as they become available.
+        // Within the loop, the ready() check calls an MPI_Test that can
+        // trigger progression. However, it is a bit less reliable (and
+        // less efficient) since it implies multiple calls to MPI_Test to
+        // progress the MPI transfer, with no guarantee that any of them
+        // will actually complete within nPollProcInterfaces calls.
+
+        for (label i = 0; i < UPstream::nPollProcInterfaces; ++i)
         {
-            allUpdated = true;
+            bool allUpdated = true;
 
             forAll(interfaces, interfacei)
             {
-                if
-                (
-                    interfaces.set(interfacei)
-                 && !interfaces[interfacei].updatedMatrix()
-                )
+                auto* intf = interfaces.get(interfacei);
+
+                if (intf && !intf->updatedMatrix())
                 {
-                    if (interfaces[interfacei].ready())
+                    if (intf->ready())
                     {
-                        interfaces[interfacei].updateInterfaceMatrix
+                        intf->updateInterfaceMatrix
                         (
                             result,
                             add,
@@ -178,35 +200,36 @@ void Foam::lduMatrix::updateMatrixInterfaces
             if (allUpdated)
             {
-                break;
+                break;  // Early exit
             }
         }
+    }
 
-        // Block for everything
-        if (Pstream::parRun())
-        {
-            if (allUpdated)
-            {
-                // All received. Just remove all outstanding requests
-                UPstream::resetRequests(startRequest);
-            }
-            else
-            {
-                // Block for all requests and remove storage
-                UPstream::waitRequests(startRequest);
-            }
-        }
+    if
+    (
+        commsType == UPstream::commsTypes::blocking
+     || commsType == UPstream::commsTypes::nonBlocking
+    )
+    {
+        // Wait until sends/receives have finished.
+        // - effectively a no-op (without waiting) if already completed.
+        if (commsType == UPstream::commsTypes::nonBlocking)
+        {
+            UPstream::waitRequests(startRequest);
+        }
 
-        // Consume
+        // Check/no-check for updatedMatrix() ?
+        const bool noCheck = (commsType == UPstream::commsTypes::blocking);
+
+        // Consume anything still outstanding
         forAll(interfaces, interfacei)
         {
-            if
-            (
-                interfaces.set(interfacei)
-             && !interfaces[interfacei].updatedMatrix()
-            )
+            auto* intf = interfaces.get(interfacei);
+
+            if (intf && (noCheck || !intf->updatedMatrix()))
             {
-                interfaces[interfacei].updateInterfaceMatrix
+                intf->updateInterfaceMatrix
                 (
                     result,
                     add,
@@ -282,7 +305,7 @@ void Foam::lduMatrix::updateMatrixInterfaces
                     psiif,
                     coupleCoeffs[interfacei],
                     cmpt,
-                    Pstream::commsTypes::blocking
+                    UPstream::commsTypes::blocking
                 );
             }
         }
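
For contrast, the same kind of illustration (same assumptions, same
hypothetical consumeInterface()) of the bounded MPI_Test polling that the
(nPollProcInterfaces > 0) branch and the [OLDER] comment above describe,
including the early exit and the final blocking wait for anything still
outstanding:

    #include <mpi.h>
    #include <vector>

    void consumeInterface(int interfacei);  // hypothetical consumer

    void pollAndConsume(std::vector<MPI_Request>& requests, int nPoll)
    {
        std::vector<bool> done(requests.size(), false);

        for (int poll = 0; poll < nPoll; ++poll)
        {
            bool allUpdated = true;

            for (std::size_t i = 0; i < requests.size(); ++i)
            {
                if (done[i]) continue;

                int flag = 0;
                // Each MPI_Test may progress communication internally,
                // but nothing guarantees completion within nPoll sweeps
                MPI_Test(&requests[i], &flag, MPI_STATUS_IGNORE);

                if (flag)
                {
                    done[i] = true;
                    consumeInterface(int(i));
                }
                else
                {
                    allUpdated = false;
                }
            }

            if (allUpdated)
            {
                break;  // Early exit
            }
        }

        // Block for anything still outstanding (entries that already
        // completed are MPI_REQUEST_NULL and are skipped by MPI_Waitall)
        MPI_Waitall(int(requests.size()), requests.data(), MPI_STATUSES_IGNORE);
    }

The final consumption pass that the real code performs after the wait is
omitted here for brevity.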

@@ -9,7 +9,8 @@ runApplication decomposePar
 restore0Dir -processor
 
 runParallel $(getApplication)
 
+# Test polling interfaces
+runParallel $(getApplication) -opt-switch nPollProcInterfaces=-1
 
 runApplication reconstructPar