Block Bootstrap Estimation in Java – Part 2


Problem

The following is my attempt at parallelizing (I only learned about parallel computing last weekend, so I’m very new to it) and speeding up the code from my previous question, Block Bootstrap Estimation using Java. The input file test.txt can be found at https://drive.google.com/open?id=1vLBoNmFyh4alDZt1eoJpavuEwWlPZSKX (please download the file directly if you want to test it; it has some quirks that you won’t notice even in Notepad). This is a small 10 x 10 dataset with maxBlockSize = 10, but to give an idea of scale, the code needs to handle twenty 5000 x 3000 datasets with maxBlockSize = 3000.

import java.io.FileInputStream;
import java.lang.Math;
import java.util.Scanner;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.FileOutputStream;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.DoubleStream;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;
import java.lang.InterruptedException;

public class BlockBootstrapTestParallel {

    // Sum of a subarray, based on B(x, i, L) -- i is one-indexing
    public static double sum(double[] x, int i, int L) {
        return IntStream.range(i, i + L)
                        .parallel()
                        .mapToDouble(idx -> x[idx - 1])
                        .sum();
    }

    // Mean of a subarray, based on B(x, i, L) -- i is one-indexing
    public static double mean(double[] x, int i, int L) {
        return IntStream.range(i, i + L)
                        .parallel()
                        .mapToDouble(idx -> x[idx - 1])
                        .average()
                        .orElse(0);
    }

    // Compute MBB mean
    public static double mbbMu(double[] x, int L) {     
        return IntStream.range(0, x.length - L + 1)
                        .parallel()
                        .mapToDouble(idx -> mean(x, idx + 1, L))
                        .average()
                        .orElse(0);
    }

    // Compute MBB variance
    public static double mbbVariance(double[] x, int L, double alpha) {
        return IntStream.range(0, x.length - L + 1)
                        .parallel()
                        .mapToDouble(idx -> (Math.pow(L, alpha) * Math.pow(mean(x, idx + 1, L) - mbbMu(x, L), 2)))
                        .average()
                        .orElse(0);
    }

    // Compute NBB mean
    public static double nbbMu(double[] x, int L) {
        return IntStream.range(0, x.length / L)
                        .parallel()
                        .mapToDouble(idx -> (mean(x, 1 + ((idx + 1) - 1) * L, L)))
                        .average()
                        .orElse(0);
    }

    // Compute NBB variance
    public static double nbbVariance(double[] x, int L, double alpha) {

        double varSum = IntStream.range(0, x.length / L)
                                 .parallel()
                                 .mapToDouble(idx -> (Math.pow(mean(x, 1 + ((idx + 1) - 1) * L, L) - nbbMu(x, L), 2)))
                                 .average()
                                 .orElse(0);

        return Math.pow((double) L, alpha) * varSum;

    }

    // factorial implementation
    public static double factorial(int x) {
        double[] fact = {1.0, 1.0, 2.0, 6.0, 24.0, 120.0, 720.0, 5040.0, 40320.0, 362880.0, 3628800.0};
        return fact[x];
    }

    // Hermite polynomial
    public static double H(double x, int p) {
        double out = IntStream.range(0, (p / 2) + 1)
                              .parallel()
                              .mapToDouble(idx -> (Math.pow(-1, idx) * Math.pow(x, p - (2 * idx)) / 
                                                    ((factorial(idx) * factorial(p - (2 * idx))) * (1L << idx))))
                              .sum();
        out *= factorial(p);
        return out;
    }

    // Row means
    public static double[] rowMeans(double[][] x, int nrows, int ncols) {
        return IntStream.range(0, nrows)
                        .parallel()
                        .mapToDouble(idx -> (mean(x[idx], 1, ncols)))
                        .toArray();
    }

    public static void duration(long start, long end) {
        System.out.println("Total execution time: " + (((double)(end - start))/60000) + " minutes");
    }


    public static void main(String[] argv) throws InterruptedException, BrokenBarrierException, IOException {
        final long start = System.currentTimeMillis();
        FileInputStream fileIn = new FileInputStream("test.txt");
        FileOutputStream fileOutMBB = new FileOutputStream("MBB_test.txt");
        FileOutputStream fileOutNBB = new FileOutputStream("NBB_test.txt");
        FileOutputStream fileOutMean = new FileOutputStream("means_test.txt");

        Scanner scnr = new Scanner(fileIn);
        PrintWriter outFSMBB = new PrintWriter(fileOutMBB);
        PrintWriter outFSNBB = new PrintWriter(fileOutNBB);
        PrintWriter outFSmean = new PrintWriter(fileOutMean);

        // These variables are taken from the command line, but are inputted here for ease of use.
        int rows = 10;
        int cols = 10;
        int maxBlockSize = 10; // this could potentially be any value <= cols
        int p = 1;
        double alpha = 0.1;
        double[][] timeSeries = new double[rows][cols];

        // read in the file, and perform the H_p(x) transformation
        for (int i = 0; i < rows; i++) {
            for (int j = 0; j < cols; j++) {
                timeSeries[i][j] = H(scnr.nextDouble(), p);
            }
            scnr.next(); // skip null terminator
        }

        // row means
        double[] sampleMeans = rowMeans(timeSeries, rows, cols);
        for (int i = 0; i < rows; i++) {
            outFSmean.print(sampleMeans[i] + " ");
        }
        outFSmean.println();
        outFSmean.close();

        final CyclicBarrier gate = new CyclicBarrier(3);

        Thread mbbThread = new Thread(() -> {
            try {
                gate.await();
                for (int j = 0; j < rows; j++) {
                    for (int m = 0; m < maxBlockSize; m++) {
                        outFSMBB.print(mbbVariance(timeSeries[j], m + 1, alpha) + " ");
                    }
                    outFSMBB.println();
                }
                outFSMBB.close();
            } catch (InterruptedException e) {
                System.out.println("Main Thread interrupted!");
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                System.out.println("Main Thread interrupted!");
                e.printStackTrace();
            }
        });

        Thread nbbThread = new Thread(() -> {
            try {
                gate.await();
                for (int j = 0; j < rows; j++) {
                    for (int m = 0; m < maxBlockSize; m++) {
                        outFSNBB.print(nbbVariance(timeSeries[j], m + 1, alpha) + " ");
                    }
                    outFSNBB.println();
                }
                outFSNBB.close();
            } catch (InterruptedException e) {
                System.out.println("Main Thread interrupted!");
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                System.out.println("Main Thread interrupted!");
                e.printStackTrace();
            }
        });

        // start threads simultaneously
        mbbThread.start();
        nbbThread.start();
        gate.await();

        // wait for threads to die 
        mbbThread.join();
        nbbThread.join();

        duration(start, System.currentTimeMillis());
    }
}

If it helps, I have 6 cores (I originally wrote 8) and 64 GB of RAM, as well as two GPUs that I have no idea how to use (Intel UHD Graphics 630, NVIDIA Quadro P620). I’ll look into how to use them over the next few days if I have to.

Solution

You’ve got inefficiency here:

    // Sum of a subarray, based on B(x, i, L) -- i is one-indexing
    public static double sum(double[] x, int i, int L) {
        return IntStream.range(i, i + L)
                        .parallel()
                        .mapToDouble(idx -> x[idx - 1])
                        .sum();
    }

I don’t know how big L is, but you are doing idx - 1 that many times, distributed over all of the threads. You can easily move this out of the loop …

        return IntStream.range(i - 1, i + L - 1)
                        .parallel()
                        .mapToDouble(idx -> x[idx])
                        .sum();

… so the subtraction is done exactly twice.
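The same "do it once" idea can be pushed much further (this is my own suggestion, not something the question’s code does). Every block sum that the MBB and NBB code needs is the difference of two running totals, so a prefix-sum array built once per row turns each O(L) `sum(x, i, L)` call into an O(1) subtraction. With maxBlockSize = 3000 that matters a lot. The class and method names (`PrefixBlockSums`, `blockSum`, `blockMean`) are hypothetical; the one-based `i` follows the question. A minimal sketch:

```java
// Sketch: O(1) block sums via a prefix-sum array (hypothetical helper, not from the question).
final class PrefixBlockSums {
    // prefix[k] = x[0] + ... + x[k-1], with prefix[0] = 0
    private final double[] prefix;

    PrefixBlockSums(double[] x) {
        prefix = new double[x.length + 1];
        for (int k = 0; k < x.length; k++) {
            prefix[k + 1] = prefix[k] + x[k];
        }
    }

    // Sum of B(x, i, L) with one-based i, i.e. x[i-1] .. x[i+L-2]
    double blockSum(int i, int L) {
        return prefix[i - 1 + L] - prefix[i - 1];
    }

    // Mean of B(x, i, L) with one-based i
    double blockMean(int i, int L) {
        return blockSum(i, L) / L;
    }

    public static void main(String[] args) {
        double[] x = {1.0, 2.0, 3.0, 4.0, 5.0};
        PrefixBlockSums p = new PrefixBlockSums(x);
        System.out.println(p.blockSum(2, 3));  // x[1] + x[2] + x[3] = 9.0
        System.out.println(p.blockMean(1, 5)); // 15.0 / 5 = 3.0
    }
}
```

One caveat: subtracting two large running totals can lose some precision compared with summing each block directly, so it’s worth checking the results against the original code on real data.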


Parallel stream in a parallel stream:

    public static double mbbMu(double[] x, int L) {
        return IntStream.range(0, x.length - L + 1)
                        .parallel()
                        .mapToDouble(idx -> mean(x, idx + 1, L))
                        .average()
                        .orElse(0);
    }

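The mean(...) function runs a parallel stream, and this function also runs a parallel stream. So you’ve got parallel overhead, plus resource contention. With 8 cores, the outer stream splits its range into tasks on the shared fork/join pool, and each of those tasks splits its inner range into still more tasks on the same pool. Java won’t literally create 8*8 threads (every parallel stream runs on ForkJoinPool.commonPool(), which by default has one fewer worker than there are cores, plus the calling thread), but you end up with a flood of tiny tasks whose splitting, scheduling and joining cost more than the handful of additions each one does.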

Eeek!
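If you want to see where nested parallel streams actually run, the sketch below records every thread that executes the inner body. The class and method names are made up for illustration. With the default configuration, every recorded thread should be either the calling thread or a worker of `ForkJoinPool.commonPool()`: the nesting multiplies the number of tasks, not the number of pools.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

// Illustration only: which threads run the body of a nested parallel stream?
final class NestedParallelThreads {

    // True if t is the calling thread or a worker of the shared common pool
    static boolean isCallerOrCommonPool(Thread t, Thread caller) {
        return t == caller
            || (t instanceof ForkJoinWorkerThread
                && ((ForkJoinWorkerThread) t).getPool() == ForkJoinPool.commonPool());
    }

    // Runs a parallel stream inside a parallel stream and returns the
    // distinct threads that executed the inner body
    static Set<Thread> threadsUsed(int outer, int inner) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        IntStream.range(0, outer).parallel().forEach(i ->
            IntStream.range(0, inner).parallel().forEach(j ->
                seen.add(Thread.currentThread())));
        return seen;
    }

    public static void main(String[] args) {
        Thread caller = Thread.currentThread();
        Set<Thread> used = threadsUsed(64, 10_000);
        boolean allShared = used.stream().allMatch(t -> isCallerOrCommonPool(t, caller));
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("distinct threads used: " + used.size());
        // expected to be true with the default common-pool setup
        System.out.println("all on caller or common pool: " + allShared);
    }
}
```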

Use parallel streams on the outermost streams, and leave all inner streams as serial.
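Applied to `mbbMu`, that advice might look like the sketch below: the stream over block start positions stays parallel, and `mean` becomes a plain serial loop. The class name is mine; the method names and one-based `i` follow the question. In the full program it may pay to go further out still and parallelize over rows (or block sizes), since that’s where most of the independent work is.

```java
import java.util.stream.IntStream;

// Sketch: parallel outer stream, serial inner work.
final class OuterParallelMbb {

    // Mean of B(x, i, L) with one-based i, computed with a simple serial loop
    static double mean(double[] x, int i, int L) {
        double s = 0.0;
        for (int k = i - 1; k < i - 1 + L; k++) {
            s += x[k];
        }
        return s / L;
    }

    // MBB mean: average of all overlapping block means; only this stream is parallel
    static double mbbMu(double[] x, int L) {
        return IntStream.rangeClosed(1, x.length - L + 1)
                        .parallel()
                        .mapToDouble(i -> mean(x, i, L))
                        .average()
                        .orElse(0);
    }

    public static void main(String[] args) {
        double[] x = {1.0, 2.0, 3.0, 4.0};
        // blocks of length 2 have means 1.5, 2.5, 3.5, whose average is 2.5
        System.out.println(mbbMu(x, 2));
    }
}
```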


Try losing the indexing altogether, and stream the values directly with:

    import java.util.Arrays;

    public static double sum(double[] x, int i, int L) {
        // i is one-indexed: skip the first i - 1 elements, then sum the next L
        return Arrays.stream(x).skip(i - 1).limit(L).sum();
    }
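A variant of the same idea: `Arrays.stream` also has an overload that takes a start (inclusive) and end (exclusive) index, so the stream covers only the block itself instead of the whole array trimmed with `skip`/`limit`. The class name below is made up; `sum` and `mean` keep the question’s one-based `i`.

```java
import java.util.Arrays;

// Sketch: stream just the block using the ranged Arrays.stream overload.
final class RangeStreamSum {

    // Sum of B(x, i, L) with one-based i: elements x[i-1] .. x[i+L-2]
    static double sum(double[] x, int i, int L) {
        return Arrays.stream(x, i - 1, i - 1 + L).sum();
    }

    // Mean of the same block
    static double mean(double[] x, int i, int L) {
        return Arrays.stream(x, i - 1, i - 1 + L).average().orElse(0);
    }

    public static void main(String[] args) {
        double[] x = {1.0, 2.0, 3.0, 4.0, 5.0};
        System.out.println(sum(x, 2, 3));  // 2.0 + 3.0 + 4.0 = 9.0
        System.out.println(mean(x, 4, 2)); // (4.0 + 5.0) / 2 = 4.5
    }
}
```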

Avoid unnecessary work.

    // Compute MBB variance
    public static double mbbVariance(double[] x, int L, double alpha) {
        return IntStream.range(0, x.length - L + 1)
                        .parallel()
                        .mapToDouble(idx -> (Math.pow(L, alpha) * Math.pow(mean(x, idx + 1, L) - mbbMu(x, L), 2)))
                        .average()
                        .orElse(0);
    }

How many times does this calculate Math.pow(L, alpha)? How many times is the value different? Maybe you want to move it out of the loop.
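The same question applies, with much bigger stakes, to `mbbMu(x, L)`: it is re-evaluated for every block, and each evaluation walks every block again (`nbbVariance` has the same pattern with `nbbMu`). Here is a sketch with both loop invariants computed once per call. Pulling the constant factor out of the average is mathematically equivalent to the original, though the last bits may differ due to floating-point rounding. The class name is mine, and `mean` is a serial helper written for this example.

```java
import java.util.stream.IntStream;

// Sketch: mbbVariance with loop-invariant values hoisted out of the stream.
final class HoistedMbbVariance {

    // Mean of B(x, i, L) with one-based i, computed serially
    static double mean(double[] x, int i, int L) {
        double s = 0.0;
        for (int k = i - 1; k < i - 1 + L; k++) {
            s += x[k];
        }
        return s / L;
    }

    // MBB mean: average of all overlapping block means
    static double mbbMu(double[] x, int L) {
        return IntStream.rangeClosed(1, x.length - L + 1)
                        .mapToDouble(i -> mean(x, i, L))
                        .average()
                        .orElse(0);
    }

    // Same formula as the question's mbbVariance, but L^alpha and the MBB mean
    // are evaluated once per call instead of once per block
    static double mbbVariance(double[] x, int L, double alpha) {
        final double scale = Math.pow(L, alpha);
        final double mu = mbbMu(x, L);
        double avgSq = IntStream.rangeClosed(1, x.length - L + 1)
                                .parallel()
                                .mapToDouble(i -> {
                                    double d = mean(x, i, L) - mu;
                                    return d * d;
                                })
                                .average()
                                .orElse(0);
        return scale * avgSq;
    }

    public static void main(String[] args) {
        double[] x = {1.0, 2.0, 3.0, 4.0};
        // L = 2: block means 1.5, 2.5, 3.5; mu = 2.5; squared deviations 1, 0, 1.
        // alpha = 0 makes the scale factor 1, so the result is 2/3.
        System.out.println(mbbVariance(x, 2, 0.0));
    }
}
```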
