ENH: add request-driven polling/consumption of processor interfaces
- with (nPollProcInterfaces < 0) it does the following:
  - loop, waiting for some requests to finish
  - for each out-of-date interface, check if its associated requests have now finished (i.e., the ready() check).
  - if ready() -> call updateInterfaceMatrix()

In contrast to (nPollProcInterfaces > 0), which loops a specified number of times with several calls to MPI_Test each time, the (nPollProcInterfaces < 0) variant relies on internal MPI looping within MPI_Waitsome to progress communication.

The actual dispatch still remains non-deterministic (i.e., waiting for some requests to finish does not mean that any particular interface is eligible for update, nor that interfaces become eligible in any particular order). However, using Waitsome places the tight looping into the MPI layer, which results in fewer calls and eliminates behaviour dependent on the value of nPollProcInterfaces.

TUT: add polling to windAroundBuildings case (for testing purposes)
This commit is contained in:
parent
e1cb12509e
commit
1f5cf3958b
@ -115,23 +115,70 @@ void Foam::LduMatrix<Type, DType, LUType>::updateMatrixInterfaces
|
||||
{
|
||||
const UPstream::commsTypes commsType = UPstream::defaultCommsType;
|
||||
|
||||
// Block until sends/receives have finished
|
||||
if (commsType == UPstream::commsTypes::nonBlocking)
|
||||
if
|
||||
(
|
||||
commsType == UPstream::commsTypes::nonBlocking
|
||||
&& UPstream::nPollProcInterfaces
|
||||
)
|
||||
{
|
||||
UPstream::waitRequests(startRequest);
|
||||
// Wait for some interface requests to become available and
|
||||
// consume them. No guarantee that the finished requests actually
|
||||
// correspond to any particular interface, but it is reasonably
|
||||
// probable that some interfaces will be able to start consumption
|
||||
// without waiting for all requests.
|
||||
|
||||
DynamicList<int> indices; // (work array)
|
||||
while
|
||||
(
|
||||
UPstream::nPollProcInterfaces < 0
|
||||
&& UPstream::waitSomeRequests(startRequest, &indices)
|
||||
)
|
||||
{
|
||||
forAll(interfaces_, interfacei)
|
||||
{
|
||||
auto* intf = interfaces_.get(interfacei);
|
||||
|
||||
if (intf && !intf->updatedMatrix() && intf->ready())
|
||||
{
|
||||
intf->updateInterfaceMatrix
|
||||
(
|
||||
result,
|
||||
add,
|
||||
lduMesh_.lduAddr(),
|
||||
interfacei,
|
||||
psiif,
|
||||
interfaceCoeffs[interfacei],
|
||||
commsType
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if
|
||||
(
|
||||
commsType == UPstream::commsTypes::blocking
|
||||
|| commsType == UPstream::commsTypes::nonBlocking
|
||||
)
|
||||
{
|
||||
// Wait until sends/receives have finished.
|
||||
// - effectively a no-op (without waiting) if already completed.
|
||||
if (commsType == UPstream::commsTypes::nonBlocking)
|
||||
{
|
||||
UPstream::waitRequests(startRequest);
|
||||
}
|
||||
|
||||
// Check/no-check for updatedMatrix() ?
|
||||
const bool noCheck = (commsType == UPstream::commsTypes::blocking);
|
||||
|
||||
forAll(interfaces_, interfacei)
|
||||
{
|
||||
if (interfaces_.set(interfacei))
|
||||
auto* intf = interfaces_.get(interfacei);
|
||||
|
||||
if (intf && (noCheck || !intf->updatedMatrix()))
|
||||
{
|
||||
interfaces_[interfacei].updateInterfaceMatrix
|
||||
intf->updateInterfaceMatrix
|
||||
(
|
||||
result,
|
||||
add,
|
||||
|
@ -118,46 +118,68 @@ void Foam::lduMatrix::updateMatrixInterfaces
|
||||
{
|
||||
const UPstream::commsTypes commsType = UPstream::defaultCommsType;
|
||||
|
||||
if (commsType == UPstream::commsTypes::blocking)
|
||||
if
|
||||
(
|
||||
commsType == UPstream::commsTypes::nonBlocking
|
||||
&& UPstream::nPollProcInterfaces
|
||||
)
|
||||
{
|
||||
forAll(interfaces, interfacei)
|
||||
// Wait for some interface requests to become available and
|
||||
// consume them. No guarantee that the finished requests actually
|
||||
// correspond to any particular interface, but it is reasonably
|
||||
// probable that some interfaces will be able to start consumption
|
||||
// without waiting for all requests.
|
||||
|
||||
DynamicList<int> indices; // (work array)
|
||||
while
|
||||
(
|
||||
UPstream::nPollProcInterfaces < 0
|
||||
&& UPstream::waitSomeRequests(startRequest, &indices)
|
||||
)
|
||||
{
|
||||
if (interfaces.set(interfacei))
|
||||
forAll(interfaces, interfacei)
|
||||
{
|
||||
interfaces[interfacei].updateInterfaceMatrix
|
||||
(
|
||||
result,
|
||||
add,
|
||||
mesh().lduAddr(),
|
||||
interfacei,
|
||||
psiif,
|
||||
coupleCoeffs[interfacei],
|
||||
cmpt,
|
||||
commsType
|
||||
);
|
||||
auto* intf = interfaces.get(interfacei);
|
||||
|
||||
if (intf && !intf->updatedMatrix() && intf->ready())
|
||||
{
|
||||
intf->updateInterfaceMatrix
|
||||
(
|
||||
result,
|
||||
add,
|
||||
mesh().lduAddr(),
|
||||
interfacei,
|
||||
psiif,
|
||||
coupleCoeffs[interfacei],
|
||||
cmpt,
|
||||
commsType
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (commsType == UPstream::commsTypes::nonBlocking)
|
||||
{
|
||||
// Try and consume interfaces as they become available
|
||||
bool allUpdated = false;
|
||||
|
||||
for (label i=0; i<UPstream::nPollProcInterfaces; i++)
|
||||
// [OLDER]
|
||||
// Alternative for consuming interfaces as they become available.
|
||||
// Within the loop, the ready() calls an MPI_Test
|
||||
// that can trigger progression. However, a bit less reliably
|
||||
// (and less efficient) since it implies multiple calls to
|
||||
// MPI_Test to progress the MPI transfer, but no guarantee that
|
||||
// any of them will actually complete within nPollProcInterfaces
|
||||
// calls.
|
||||
|
||||
for (label i = 0; i < UPstream::nPollProcInterfaces; ++i)
|
||||
{
|
||||
allUpdated = true;
|
||||
bool allUpdated = true;
|
||||
|
||||
forAll(interfaces, interfacei)
|
||||
{
|
||||
if
|
||||
(
|
||||
interfaces.set(interfacei)
|
||||
&& !interfaces[interfacei].updatedMatrix()
|
||||
)
|
||||
auto* intf = interfaces.get(interfacei);
|
||||
|
||||
if (intf && !intf->updatedMatrix())
|
||||
{
|
||||
if (interfaces[interfacei].ready())
|
||||
if (intf->ready())
|
||||
{
|
||||
interfaces[interfacei].updateInterfaceMatrix
|
||||
intf->updateInterfaceMatrix
|
||||
(
|
||||
result,
|
||||
add,
|
||||
@ -178,35 +200,36 @@ void Foam::lduMatrix::updateMatrixInterfaces
|
||||
|
||||
if (allUpdated)
|
||||
{
|
||||
break;
|
||||
break; // Early exit
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Block for everything
|
||||
if (Pstream::parRun())
|
||||
|
||||
if
|
||||
(
|
||||
commsType == UPstream::commsTypes::blocking
|
||||
|| commsType == UPstream::commsTypes::nonBlocking
|
||||
)
|
||||
{
|
||||
// Wait until sends/receives have finished.
|
||||
// - effectively a no-op (without waiting) if already completed.
|
||||
if (commsType == UPstream::commsTypes::nonBlocking)
|
||||
{
|
||||
if (allUpdated)
|
||||
{
|
||||
// All received. Just remove all outstanding requests
|
||||
UPstream::resetRequests(startRequest);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Block for all requests and remove storage
|
||||
UPstream::waitRequests(startRequest);
|
||||
}
|
||||
UPstream::waitRequests(startRequest);
|
||||
}
|
||||
|
||||
// Consume
|
||||
// Check/no-check for updatedMatrix() ?
|
||||
const bool noCheck = (commsType == UPstream::commsTypes::blocking);
|
||||
|
||||
// Consume anything still outstanding
|
||||
forAll(interfaces, interfacei)
|
||||
{
|
||||
if
|
||||
(
|
||||
interfaces.set(interfacei)
|
||||
&& !interfaces[interfacei].updatedMatrix()
|
||||
)
|
||||
auto* intf = interfaces.get(interfacei);
|
||||
|
||||
if (intf && (noCheck || !intf->updatedMatrix()))
|
||||
{
|
||||
interfaces[interfacei].updateInterfaceMatrix
|
||||
intf->updateInterfaceMatrix
|
||||
(
|
||||
result,
|
||||
add,
|
||||
@ -282,7 +305,7 @@ void Foam::lduMatrix::updateMatrixInterfaces
|
||||
psiif,
|
||||
coupleCoeffs[interfacei],
|
||||
cmpt,
|
||||
Pstream::commsTypes::blocking
|
||||
UPstream::commsTypes::blocking
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -9,7 +9,8 @@ runApplication decomposePar
|
||||
|
||||
restore0Dir -processor
|
||||
|
||||
runParallel $(getApplication)
|
||||
# Test polling interfaces
|
||||
runParallel $(getApplication) -opt-switch nPollProcInterfaces=-1
|
||||
|
||||
runApplication reconstructPar
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user