/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2016-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "PstreamReduceOps.H"
#include "PstreamGlobals.H"
#include "profilingPstream.H"
#include "int.H"
#include "UPstreamWrapping.H"
#include "collatedFileOperation.H"
#include <cstring>
#include <cstdlib>
#include <csignal>
#include <memory>
#include <numeric>
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
// The min value and default for MPI buffer length (in bytes)
constexpr int minBufLen = 20000000;
// Track size of attached MPI buffer
static int attachedBufLen = 0;
// Track if we initialized MPI
static bool ourMpi = false;
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
// Attach user-defined send buffer
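// Note: the attached buffer is what MPI uses for buffered-mode sends
// (MPI_Bsend / MPI_Ibsend).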
static void attachOurBuffers()
{
#ifndef SGIMPI
if (attachedBufLen)
{
return; // Already attached
}
// Use UPstream::mpiBufferSize (optimisationSwitch),
// but allow override with MPI_BUFFER_SIZE env variable (int value)
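// Eg (illustrative only, POSIX shell):  export MPI_BUFFER_SIZE=200000000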
int len = 0;
const std::string str(Foam::getEnv("MPI_BUFFER_SIZE"));
if (str.empty() || !Foam::read(str, len) || len <= 0)
{
len = Foam::UPstream::mpiBufferSize;
}
if (len < minBufLen)
{
len = minBufLen;
}
char* buf = new char[len];
if (MPI_SUCCESS == MPI_Buffer_attach(buf, len))
{
// Properly attached
attachedBufLen = len;
if (Foam::UPstream::debug)
{
Foam::Pout<< "UPstream::init : buffer-size " << len << '\n';
}
}
else
{
delete[] buf;
Foam::Pout<< "UPstream::init : could not attach buffer\n";
}
#endif
}
// Remove an existing user-defined send buffer
// IMPORTANT:
// This operation will block until all messages currently in the
// buffer have been transmitted.
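// MPI_Buffer_detach also returns the address and size of the buffer that
// was attached, which is what allows it to be freed again below.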
static void detachOurBuffers()
{
#ifndef SGIMPI
if (!attachedBufLen)
{
return; // Nothing to detach
}
// Some MPI notes suggest that the return code is MPI_SUCCESS when
// no buffer is attached.
// Be extra careful and require a non-zero size as well.
char* buf = nullptr;
int len = 0;
if (MPI_SUCCESS == MPI_Buffer_detach(&buf, &len) && len)
{
// This was presumably the buffer that we attached
// and not someone else.
delete[] buf;
}
// Nothing attached
attachedBufLen = 0;
#endif
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// NOTE:
// Valid parallel options vary between implementations, but flag common ones.
// If they are not removed by MPI_Init(), the subsequent argument processing
// will notice that they are wrong.
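// Several of these (p4pg, p4wd, p4amslave, p4yourname) appear to originate
// from the classic MPICH 'p4' style mpirun launchers.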
void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
{
validParOptions.insert("np", "");
validParOptions.insert("p4pg", "PI file");
validParOptions.insert("p4wd", "directory");
validParOptions.insert("p4amslave", "");
validParOptions.insert("p4yourname", "hostname");
validParOptions.insert("machinefile", "machine file");
}
bool Foam::UPstream::initNull()
{
int flag = 0;
MPI_Finalized(&flag);
if (flag)
{
// Already finalized - this is an error
FatalErrorInFunction
<< "MPI was already finalized - cannot perform MPI_Init\n"
<< Foam::abort(FatalError);
return false;
}
MPI_Initialized(&flag);
if (flag)
{
if (UPstream::debug)
{
Pout<< "UPstream::initNull : was already initialized\n";
}
}
else
{
// Not already initialized
MPI_Init_thread
(
nullptr, // argc
nullptr, // argv
MPI_THREAD_SINGLE,
&flag // provided_thread_support
);
ourMpi = true;
}
// Could also attach buffers etc.
return true;
}
bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
{
int numprocs = 0, myRank = 0;
int provided_thread_support = 0;
int flag = 0;
MPI_Finalized(&flag);
if (flag)
{
// Already finalized - this is an error
FatalErrorInFunction
<< "MPI was already finalized - cannot perform MPI_Init" << endl
<< Foam::abort(FatalError);
return false;
}
MPI_Initialized(&flag);
if (flag)
{
// Already initialized.
// Warn if we've called twice, but skip if initialized externally
if (ourMpi)
{
WarningInFunction
<< "MPI was already initialized - cannot perform MPI_Init" << nl
<< "This could indicate an application programming error!"
<< endl;
return true;
}
else if (UPstream::debug)
{
Pout<< "UPstream::init : was already initialized\n";
}
}
else
{
MPI_Init_thread
(
&argc,
&argv,
(
needsThread
? MPI_THREAD_MULTIPLE
: MPI_THREAD_SINGLE
),
&provided_thread_support
);
ourMpi = true;
}
// Check argument list for local world
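// Eg (illustrative MPMD launch, application names hypothetical):
//     mpirun -np 2 solverA -world left : -np 2 solverB -world right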
label worldIndex = -1;
word world;
for (int argi = 1; argi < argc; ++argi)
{
if (strcmp(argv[argi], "-world") == 0)
{
worldIndex = argi++;
if (argi >= argc)
{
FatalErrorInFunction
<< "Missing world name to argument \"world\""
<< Foam::abort(FatalError);
}
world = argv[argi];
break;
}
}
// Filter 'world' option
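// Remove the two entries ('-world' and its value) by shifting the
// remaining arguments two positions to the left, so that subsequent
// argument parsing does not see them.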
if (worldIndex != -1)
{
for (label i = worldIndex+2; i < argc; i++)
{
argv[i-2] = argv[i];
}
argc -= 2;
}
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
if (UPstream::debug)
{
Pout<< "UPstream::init :"
<< " thread-support : requested:" << needsThread
<< " obtained:"
<< (
(provided_thread_support == MPI_THREAD_SINGLE)
? "SINGLE"
: (provided_thread_support == MPI_THREAD_SERIALIZED)
? "SERIALIZED"
: (provided_thread_support == MPI_THREAD_MULTIPLE)
? "MULTIPLE"
: "other"
)
<< " procs:" << numprocs
<< " rank:" << myRank
<< " world:" << world << endl;
}
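// Without -world, a single-rank 'parallel' run is meaningless and is
// treated as an error.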
if (worldIndex == -1 && numprocs <= 1)
{
FatalErrorInFunction
<< "attempt to run parallel on 1 processor"
<< Foam::abort(FatalError);
}
// Initialise parallel structure
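// setParRun() records the number of ranks and whether full
// (MPI_THREAD_MULTIPLE) thread support was actually obtained.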
setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);
if (worldIndex != -1)
{
// During startup, so commWorld() == commGlobal()
wordList worlds(numprocs);
worlds[UPstream::myProcNo(UPstream::commGlobal())] = world;
Pstream::gatherList
(
worlds,
UPstream::msgType(),
UPstream::commGlobal()
);
// Compact
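// Assign each distinct world name a compact index; worldIDs_[proci]
// then holds the world index for global rank proci.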
if (UPstream::master(UPstream::commGlobal()))
{
DynamicList<word> worldNames(numprocs);
worldIDs_.resize_nocopy(numprocs);
forAll(worlds, proci)
{
const word& world = worlds[proci];
worldIDs_[proci] = worldNames.find(world);
if (worldIDs_[proci] == -1)
{
worldIDs_[proci] = worldNames.size();
worldNames.push_back(world);
}
}
allWorlds_.transfer(worldNames);
}
Pstream::broadcasts(UPstream::commGlobal(), allWorlds_, worldIDs_);
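// After the broadcast every rank knows the complete world list and the
// rank-to-world mapping.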
const label myWorldId =
worldIDs_[UPstream::myProcNo(UPstream::commGlobal())];
DynamicList