From e453f0bf02997d5c0fad3865f7ffda58328d52e9 Mon Sep 17 00:00:00 2001 From: mattijs Date: Mon, 12 Dec 2016 17:32:24 +0000 Subject: [PATCH 1/3] ENH: Pstream: added maxCommsSize setting to do (unstructured) parallel transfers in blocks. This is controlled by the setting maxCommsSize in etc/controlDict which specifies the max number of bytes per exchange. If set to <= 0 it is ignored. This max size of messages is important when doing e.g. load balancing which can send over whole meshes. --- src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H | 34 +- .../db/IOstreams/Pstreams/PstreamReduceOps.H | 3 +- src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C | 12 + src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H | 3 + src/OpenFOAM/db/IOstreams/Pstreams/exchange.C | 300 +++++++++++++++--- .../db/dynamicLibrary/codedBase/codedBase.C | 1 + src/Pstream/dummy/UPstream.C | 4 +- src/Pstream/mpi/UPstream.C | 4 +- 8 files changed, 308 insertions(+), 53 deletions(-) diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H index e2ec02e88b..2e2f015403 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H @@ -3,7 +3,7 @@ \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | \\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation - \\/ M anipulation | + \\/ M anipulation | Copyright (C) 2016 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -55,6 +55,37 @@ class Pstream : public UPstream { + // Private Static Functions + + //- Exchange contiguous data. Sends sendData, receives into + // recvData. If block=true will wait for all transfers to finish. + // Data provided and received as container. + template + static void exchangeContainer + ( + const UList& sendBufs, + const labelUList& recvSizes, + List& recvBufs, + const int tag, + const label comm, + const bool block + ); + + //- Exchange contiguous data. Sends sendData, receives into + // recvData. If block=true will wait for all transfers to finish. + // Data provided and received as pointers. + template + static void exchangeBuf + ( + const labelUList& sendSizes, // number of T, not number of char + const UList& sendBufs, + const labelUList& recvSizes, // number of T, not number of char + List& recvBufs, + const int tag, + const label comm, + const bool block + ); + protected: @@ -63,6 +94,7 @@ protected: //- Transfer buffer DynamicList buf_; + public: // Declare name of the class and its debug switch diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H index 53c4b6f344..51fb00060b 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H @@ -3,7 +3,7 @@ \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | \\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation - \\/ M anipulation | + \\/ M anipulation | Copyright (C) 2016 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -32,7 +32,6 @@ Description #ifndef PstreamReduceOps_H #define PstreamReduceOps_H -#include "Pstream.H" #include "ops.H" #include "vector2D.H" diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C index d8954cb1c9..77692890b3 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C @@ -464,4 +464,16 @@ registerOptSwitch ); +int Foam::UPstream::maxCommsSize +( + Foam::debug::optimisationSwitch("maxCommsSize", 0) +); +registerOptSwitch +( + "maxCommsSize", + int, + Foam::UPstream::maxCommsSize +); + + // ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H index 609155d492..e2fb8a7701 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H @@ -268,6 +268,9 @@ public: //- Number of polling cycles in processor updates static int nPollProcInterfaces; + //- Optional maximum message size (bytes) + static int maxCommsSize; + //- Default communicator (all processors) static label worldComm; diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/exchange.C b/src/OpenFOAM/db/IOstreams/Pstreams/exchange.C index 3a3436cf27..34114a29a4 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/exchange.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/exchange.C @@ -3,7 +3,7 @@ \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | \\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation - \\/ M anipulation | + \\/ M anipulation | Copyright (C) 2016 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -28,11 +28,157 @@ Description #include "Pstream.H" #include "contiguous.H" -#include "PstreamCombineReduceOps.H" -#include "UPstream.H" +#include "PstreamReduceOps.H" // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // +template +void Foam::Pstream::exchangeContainer +( + const UList& sendBufs, + const labelUList& recvSizes, + List& recvBufs, + const int tag, + const label comm, + const bool block +) +{ + label startOfRequests = Pstream::nRequests(); + + // Set up receives + // ~~~~~~~~~~~~~~~ + + forAll(recvSizes, proci) + { + if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0) + { + UIPstream::read + ( + UPstream::nonBlocking, + proci, + reinterpret_cast(recvBufs[proci].begin()), + recvSizes[proci]*sizeof(T), + tag, + comm + ); + } + } + + + // Set up sends + // ~~~~~~~~~~~~ + + forAll(sendBufs, proci) + { + if (proci != Pstream::myProcNo(comm) && sendBufs[proci].size() > 0) + { + if + ( + !UOPstream::write + ( + UPstream::nonBlocking, + proci, + reinterpret_cast(sendBufs[proci].begin()), + sendBufs[proci].size()*sizeof(T), + tag, + comm + ) + ) + { + FatalErrorInFunction + << "Cannot send outgoing message. " + << "to:" << proci << " nBytes:" + << label(sendBufs[proci].size()*sizeof(T)) + << Foam::abort(FatalError); + } + } + } + + + // Wait for all to finish + // ~~~~~~~~~~~~~~~~~~~~~~ + + if (block) + { + Pstream::waitRequests(startOfRequests); + } +} + + +template +void Foam::Pstream::exchangeBuf +( + const labelUList& sendSizes, + const UList& sendBufs, + const labelUList& recvSizes, + List& recvBufs, + const int tag, + const label comm, + const bool block +) +{ + label startOfRequests = Pstream::nRequests(); + + // Set up receives + // ~~~~~~~~~~~~~~~ + + forAll(recvSizes, proci) + { + if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0) + { + UIPstream::read + ( + UPstream::nonBlocking, + proci, + recvBufs[proci], + recvSizes[proci]*sizeof(T), + tag, + comm + ); + } + } + + + // Set up sends + // ~~~~~~~~~~~~ + + forAll(sendBufs, proci) + { + if (proci != Pstream::myProcNo(comm) && sendSizes[proci] > 0) + { + if + ( + !UOPstream::write + ( + UPstream::nonBlocking, + proci, + sendBufs[proci], + sendSizes[proci]*sizeof(T), + tag, + comm + ) + ) + { + FatalErrorInFunction + << "Cannot send outgoing message. " + << "to:" << proci << " nBytes:" + << label(sendSizes[proci]*sizeof(T)) + << Foam::abort(FatalError); + } + } + } + + + // Wait for all to finish + // ~~~~~~~~~~~~~~~~~~~~~~ + + if (block) + { + Pstream::waitRequests(startOfRequests); + } +} + + template void Foam::Pstream::exchange ( @@ -63,11 +209,7 @@ void Foam::Pstream::exchange if (UPstream::parRun() && UPstream::nProcs(comm) > 1) { - label startOfRequests = Pstream::nRequests(); - - // Set up receives - // ~~~~~~~~~~~~~~~ - + // Presize all receive buffers forAll(recvSizes, proci) { label nRecv = recvSizes[proci]; @@ -75,55 +217,121 @@ void Foam::Pstream::exchange if (proci != Pstream::myProcNo(comm) && nRecv > 0) { recvBufs[proci].setSize(nRecv); - UIPstream::read - ( - UPstream::nonBlocking, - proci, - reinterpret_cast(recvBufs[proci].begin()), - nRecv*sizeof(T), - tag, - comm - ); } } - - // Set up sends - // ~~~~~~~~~~~~ - - forAll(sendBufs, proci) + if (Pstream::maxCommsSize <= 0) { - if (proci != Pstream::myProcNo(comm) && sendBufs[proci].size() > 0) + // Do the exchanging in one go + exchangeContainer + ( + sendBufs, + recvSizes, + recvBufs, + tag, + comm, + block + ); + } + else + { + // Determine the number of chunks to send. Note that we + // only have to look at the sending data since we are + // guaranteed that some processor's sending size is some other + // processor's receive size. Also we can ignore any local comms. + + label maxNSend = 0; + forAll(sendBufs, proci) { - if - ( - !UOPstream::write - ( - UPstream::nonBlocking, - proci, - reinterpret_cast(sendBufs[proci].begin()), - sendBufs[proci].size()*sizeof(T), - tag, - comm - ) - ) + if (proci != Pstream::myProcNo(comm)) { - FatalErrorInFunction - << "Cannot send outgoing message. " - << "to:" << proci << " nBytes:" - << label(sendBufs[proci].size()*sizeof(T)) - << Foam::abort(FatalError); + maxNSend = max(maxNSend, sendBufs[proci].size()); } } - } + + const label maxNBytes = sizeof(T)*maxNSend; + + // We need to send maxNBytes bytes so the number of iterations: + // maxNBytes iterations + // --------- ---------- + // 0 0 + // 1..maxCommsSize 1 + // maxCommsSize+1..2*maxCommsSize 2 + // etc. + + label nIter; + if (maxNBytes == 0) + { + nIter = 0; + } + else + { + nIter = (maxNBytes-1)/Pstream::maxCommsSize+1; + } + reduce(nIter, maxOp