Histogram of scalarField (gathering values from multiple processes?)

Hello fellow palabos users,
dear developers,

For my current project, I want to monitor the evolution of the wall shear stress during a simulation in the form of a histogram.

At first I thought I would simply pass the address of a single std::vector to my processing functional, which would then grow as all processes push_back to it:

[code="cpp"]
/* ******** MaskedBoxStressNormVectorFunctional2D.h ******** */
template<typename T1, typename T2>
class MaskedBoxStressNormVectorFunctional2D : public BoxProcessingFunctional2D_SS<T1,T2>
{
public:
    MaskedBoxStressNormVectorFunctional2D(std::vector<T2>* wallStressVec_, int flag_);
    virtual void process( Box2D domain,
                          ScalarField2D<T1>& mask,
                          ScalarField2D<T2>& stressNormField );
    virtual MaskedBoxStressNormVectorFunctional2D<T1,T2>* clone() const;
    virtual void getTypeOfModification(std::vector<modif::ModifT>& modified) const {
        modified[0] = modif::nothing;
        modified[1] = modif::nothing;
    }
private:
    T1 flag;
    std::vector<T2>* wallStressVec;
};

/* ******** MaskedBoxStressNormVectorFunctional2D.hh ******** */
template<typename T1, typename T2>
MaskedBoxStressNormVectorFunctional2D<T1,T2>::MaskedBoxStressNormVectorFunctional2D (
        std::vector<T2>* wallStressVec_, int flag_ )
    : flag(flag_), wallStressVec(wallStressVec_)
{ }

template<typename T1, typename T2>
void MaskedBoxStressNormVectorFunctional2D<T1,T2>::process (
        Box2D domain,
        ScalarField2D<T1>& mask,
        ScalarField2D<T2>& stressNormField )
{
    Dot2D offset = computeRelativeDisplacement(mask, stressNormField);
    for (plint iX=domain.x0; iX<=domain.x1; ++iX) {
        for (plint iY=domain.y0; iY<=domain.y1; ++iY) {
            if (mask.get(iX,iY)==flag) {
                // pcout << "get wall stress, which is = " << stressNormField.get(iX,iY) << std::endl;
                wallStressVec->push_back(stressNormField.get(iX+offset.x, iY+offset.y));
                // pcout << " process: size of wall stress vec = " << wallStressVec->size() << std::endl;
            }
        }
    }
}

template<typename T1, typename T2>
MaskedBoxStressNormVectorFunctional2D<T1,T2>* MaskedBoxStressNormVectorFunctional2D<T1,T2>::clone() const
{
    return new MaskedBoxStressNormVectorFunctional2D<T1,T2>(*this);
}
[/code]
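For completeness, this is roughly how I apply the functional from the main program (just a sketch: wallFlag, maskField and stressNorm stand in for my actual flag value and MultiScalarField2D instances):

[code="cpp"]
// Sketch of the call site; T is the usual floating-point type.
std::vector<T> wallStressVec;
applyProcessingFunctional (
    new MaskedBoxStressNormVectorFunctional2D<int,T>(&wallStressVec, wallFlag),
    stressNorm.getBoundingBox(), maskField, stressNorm );
// On a single process, wallStressVec now holds all masked stress values.
pcout << "collected " << wallStressVec.size() << " wall stress values" << std::endl;
[/code]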



For 1 or 2 processes this seems to work fine.
However, as I increase the number of processes, strange things happen and I eventually end up with a segfault.
I suspect this is because the vector's address is not accessible to all processes.

Palabos uses its 'reductive processing functionals' to compute and combine values across all processes, and this seems to be the preferred approach.
However, only the statistics 'average', 'sum', 'min' & 'max' are implemented in the Palabos source code...
So I thought I might as well add a new one:

I tried to adapt the implemented [i]MaskedBoxScalarAverageFunctional2D[/i] in a way so it adds all values to a private vector which should be accessible using a public function similar to [i]getAverageScalar()[/i].

[code="cpp"]
/* ******** MaskedBoxScalarHistogramFunctional2D ******* */
template<typename T>
class MaskedBoxScalarHistogramFunctional2D : public ReductiveBoxProcessingFunctional2D_SS<T,int> {
public:
    MaskedBoxScalarHistogramFunctional2D(int flag_)
        : histogramScalarId(this->getStatistics().subscribeHistogram()),
          flag(flag_)
    { }

    void process ( Box2D domain,
                   ScalarField2D<T>& scalarField,
                   ScalarField2D<int>& mask ) {
        Dot2D offset = computeRelativeDisplacement(scalarField, mask);
        BlockStatistics& statistics = this->getStatistics();
        for (plint iX=domain.x0; iX<=domain.x1; ++iX) {
            for (plint iY=domain.y0; iY<=domain.y1; ++iY) {
                if (mask.get(iX+offset.x, iY+offset.y)==flag) {
                    statistics.gatherHistogram(histogramScalarId, (T)scalarField.get(iX,iY));
                    statistics.incrementStats();
                }
            }
        }
    }

    MaskedBoxScalarHistogramFunctional2D<T>* clone() const {
        return new MaskedBoxScalarHistogramFunctional2D<T>(*this);
    }

    virtual void getTypeOfModification(std::vector<modif::ModifT>& modified) const {
        modified[0] = modif::nothing;
        modified[1] = modif::nothing;
    }

    std::vector<T> getHistogramScalar() const {
        // The histogram is internally stored as floating-point values. If T is
        //   an integer type, the values are converted element by element here.
        std::vector<double> histogram = this->getStatistics().getHistogram(histogramScalarId);
        return std::vector<T>(histogram.begin(), histogram.end());
    }
private:
    plint histogramScalarId;
    int flag;
};
[/code]
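The intended call site mirrors the way computeAverage() uses MaskedBoxScalarAverageFunctional2D internally; here is a sketch, assuming the new histogram plumbing in BlockStatistics (subscribeHistogram(), gatherHistogram(), getHistogram()) actually exists:

[code="cpp"]
// Sketch: apply the reductive functional and read back the combined result.
// stressNorm is a MultiScalarField2D<T>, maskField a MultiScalarField2D<int>.
MaskedBoxScalarHistogramFunctional2D<T> functional(wallFlag);
applyProcessingFunctional(functional, stressNorm.getBoundingBox(), stressNorm, maskField);
std::vector<T> wallStressHistogram = functional.getHistogramScalar();
[/code]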

In the course of this I found myself changing the structure of ReductiveBoxProcessingFunctionals in general, and now I'm stuck in one of the deepest, darkest parts of Palabos: the generic implementation of the MPI communication between processes in src/parallelism/parallelStatistics.cpp.

So instead of learning how to use MPI and altering the Palabos source code (another time maybe), I hope that one of you might suggest a simpler, more elegant approach.

How would you gather the values of a ScalarField (e.g. the norm of the shear stress) from all processes into a single std::vector?

Cheers,
david

Hi again,

I continued altering the way Palabos implements cross-process statistics and changed the following files.

in ‘atomicBlock’ directory:
[ul]
atomicBlock2D.cpp
atomicBlock2D.h
atomicBlock3D.cpp
atomicBlock3D.h
blockLattice2D.hh
[/ul]
in ‘core’ directory:
[ul]
blockStatistics.cpp
blockStatistics.h
[/ul]
in ‘multiBlock’ directory:
[ul]
combinedStatistics.cpp
combinedStatistics.h
multiBlock2D.cpp
multiBlock2D.h
multiBlock3D.cpp
multiBlock3D.h
[/ul]
in ‘parallelism’ directory:
[ul]
mpiManager.h
parallelStatistics.cpp
parallelStatistics
[/ul]

The most important being:

combinedStatistics.cpp

[code="cpp"]

void CombinedStatistics::computeLocalHistogram (
        std::vector<BlockStatistics const*> const& individualStatistics,
        std::vector<std::vector<double>>& histObservables,
        std::vector<double>& sumWeightsHist ) const
{
    // For each "histogram observable"
    for (pluint iHist=0; iHist<histObservables.size(); ++iHist) {
        histObservables[iHist].clear();
        sumWeightsHist[iHist] = 0.;
        // Compute local histogram
        for (pluint iStat=0; iStat<individualStatistics.size(); ++iStat) {
            std::vector<double> tmpIndividualHist = individualStatistics[iStat]->getHistogram(iHist);
            histObservables[iHist].insert (
                histObservables[iHist].end(), tmpIndividualHist.begin(), tmpIndividualHist.end() );
        }
        sumWeightsHist[iHist] = histObservables[iHist].size();
    }
}

/* ************** other statistics unchanged ***************** */

void CombinedStatistics::combine (
        std::vector<BlockStatistics const*>& individualStatistics,
        BlockStatistics& result ) const
{
    // Local histograms
    std::vector<std::vector<double>> histObservables(result.getHistogramVect().size());
    std::vector<double> sumWeightsHist(result.getHistogramVect().size());
    computeLocalHistogram(individualStatistics, histObservables, sumWeightsHist);

    /* ************** other statistics unchanged ***************** */

    // Compute global, cross-core statistics
    this->reduceStatistics (
            averageObservables, sumWeights,
            histObservables, sumWeightsHist,
            sumObservables,
            maxObservables,
            intSumObservables );

    // Update public statistics in resulting block
    result.evaluate (
            averageObservables, histObservables, sumObservables, maxObservables, intSumObservables, 0 );
}

SerialCombinedStatistics* SerialCombinedStatistics::clone() const {
    return new SerialCombinedStatistics(*this);
}

void SerialCombinedStatistics::reduceStatistics (
        std::vector<double>& averageObservables,
        std::vector<double>& sumWeights,
        std::vector<std::vector<double>>& histObservables,
        std::vector<double>& sumWeightsHist,
        std::vector<double>& sumObservables,
        std::vector<double>& maxObservables,
        std::vector<plint>& intSumObservables ) const
{
    // Nothing to reduce in the serial case.
}
[/code]



and parallelStatistics.cpp
[code="cpp"]
void ParallelCombinedStatistics::reduceStatistics (
            std::vector<double>& averageObservables,
            std::vector<double>& sumWeights,
            std::vector<std::vector<double>>& histObservables,
            std::vector<double>& sumWeightsHist,
            std::vector<double>& sumObservables,
            std::vector<double>& maxObservables,
            std::vector<plint>& intSumObservables ) const
{

/* ************** other statistics unchanged ***************** */

    // Histogram
    for (pluint iHist=0; iHist<histObservables.size(); ++iHist) {
        int numProcesses = global::mpi().getSize();
        int globalHistogramLength = 0;

        // Each process tells the root how many elements it holds
        // count has to be double, because gatherv_impl takes values from
        // sumWeightHist[iHist] which are of type double
        std::vector<double> counts(numProcesses); 
        // Displacements in the receive buffer for MPI_GATHERV
        std::vector<int> dispsSimple(numProcesses); 
        // Displacement = {0,1,2,3,4,...}
        for (int i = 0; i < numProcesses; i++) dispsSimple[i] = i;
        global::mpi().gatherv_impl(&sumWeightsHist[iHist], 1, &counts[0], &numProcesses, &dispsSimple[0], 0);
        // total number of to be collected values:
        double globalWeightHist = 0;
        for (int i = 0; i < numProcesses; i++) globalWeightHist += int(counts.at(i) );
        // To check whether gatherv_impl worked correctly / numProcesses is correct:
        double test_globalWeightHist = 0;
        global::mpi().reduce(sumWeightsHist[iHist], test_globalWeightHist, MPI_SUM);
        if( globalWeightHist != test_globalWeightHist ) exit(1);
        // for next steps, convert to int:
        else globalHistogramLength = int(globalWeightHist);

        // Collection of all histogram values
        std::vector<double> globalHistogram(globalHistogramLength);
        // Displacements in the receive buffer for MPI_GATHERV
        std::vector<int> disps(numProcesses);
        for (int i = 0; i < numProcesses; i++) disps[i] = (i > 0) ? (disps.at(i-1) + (int)counts.at(i-1) ) : 0;
        global::mpi().gatherv_impl(
            &histObservables[iHist][0], int(sumWeightsHist[iHist]),
            &globalHistogram[0], &globalHistogramLength, &disps[0], 0);
//        global::mpi().bCast(&globalHistogram[0], globalHistogramLength);
        histObservables[iHist] = globalHistogram;
    }
}
[/code]
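For reference, this is the plain-MPI pattern that, as far as I understand it, my gatherv_impl wrapper has to reproduce: the per-rank counts are gathered as an int array (one entry per rank), the displacements are the running sums of those counts, and MPI_Gatherv then collects the values themselves. A self-contained sketch, independent of Palabos (in the Palabos version the counts come from sumWeightsHist and the values from histObservables[iHist]):

[code="cpp"]
#include <mpi.h>
#include <vector>

// Gather the local values of every rank into one vector on the root.
// Illustration of the standard MPI_Gatherv recipe only; the vector is
// passed by value to keep the sketch simple.
std::vector<double> gatherAllValues(std::vector<double> localValues, MPI_Comm comm)
{
    int rank = 0, numProcesses = 1;
    MPI_Comm_rank(comm, &rank);
    MPI_Comm_size(comm, &numProcesses);

    // 1. Every rank reports how many values it holds (MPI counts are ints).
    int localCount = (int) localValues.size();
    std::vector<int> counts(numProcesses, 0);
    MPI_Gather(&localCount, 1, MPI_INT, &counts[0], 1, MPI_INT, 0, comm);

    // 2. The root computes each rank's offset in the receive buffer:
    //    disps[i] = counts[0] + ... + counts[i-1].
    std::vector<int> disps(numProcesses, 0);
    int totalCount = 0;
    for (int i = 0; i < numProcesses; ++i) {
        disps[i] = totalCount;
        totalCount += counts[i];
    }

    // 3. Variable-length gather of the actual values.
    std::vector<double> globalValues(totalCount > 0 ? totalCount : 1);
    MPI_Gatherv(localCount > 0 ? &localValues[0] : 0, localCount, MPI_DOUBLE,
                &globalValues[0], &counts[0], &disps[0], MPI_DOUBLE, 0, comm);
    globalValues.resize(rank == 0 ? totalCount : 0);
    return globalValues;  // non-empty only on the root
}
[/code]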

Obviously, I had to change a lot of other files to get this to compile.
Unfortunately, it only runs on a single processor.

Any MPI experts who want to share some insights?
Dear Palabos developers, is this something you’d like to implement in future releases?
If so, I’d happily share the rest of my changes.
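To give a first impression, the additions to BlockStatistics (blockStatistics.h / blockStatistics.cpp) essentially follow the existing subscribeAverage() / gatherAverage() pattern. A simplified sketch, not the exact code (the member names tmpHistograms and histograms are placeholders):

[code="cpp"]
// Sketch of the member functions added to BlockStatistics.
// tmpHistograms collects the process-local values during process();
// histograms holds the publicly readable values after evaluate().
plint BlockStatistics::subscribeHistogram() {
    tmpHistograms.push_back(std::vector<double>());
    histograms.push_back(std::vector<double>());
    return (plint) tmpHistograms.size() - 1;
}

void BlockStatistics::gatherHistogram(plint whichHistogram, double value) {
    tmpHistograms[whichHistogram].push_back(value);
}

std::vector<double> const& BlockStatistics::getHistogram(plint whichHistogram) const {
    return histograms[whichHistogram];
}

std::vector<std::vector<double>>& BlockStatistics::getHistogramVect() {
    return histograms;
}
[/code]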

Cheers, david