# Block Bootstrap Estimation in Java – Part 2


Problem

The following is my attempt at parallelizing (I learned about parallel computing over the last weekend, so I'm very new to it) and making more efficient the code from my previous question, Block Bootstrap Estimation using Java. The input file test.txt can be found at https://drive.google.com/open?id=1vLBoNmFyh4alDZt1eoJpavuEwWlPZSKX (please download the file directly if you wish to test it; it has some oddities that you won't notice even in Notepad). This is a small 10 x 10 dataset with `maxBlockSize = 10`, but to give an idea of scale, the code needs to handle twenty 5000 x 3000 datasets with `maxBlockSize = 3000`.

```java
import java.io.FileInputStream;
import java.lang.Math;
import java.util.Scanner;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.FileOutputStream;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.DoubleStream;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;
import java.lang.InterruptedException;

public class BlockBootstrapTestParallel {

    // Sum of a subarray, based on B(x, i, L) -- i is one-indexing
    public static double sum(double[] x, int i, int L) {
        return IntStream.range(i, i + L)
                .parallel()
                .mapToDouble(idx -> x[idx - 1])
                .sum();
    }

    // Mean of a subarray, based on B(x, i, L) -- i is one-indexing
    public static double mean(double[] x, int i, int L) {
        return IntStream.range(i, i + L)
                .parallel()
                .mapToDouble(idx -> x[idx - 1])
                .average()
                .orElse(0);
    }

    // Compute MBB mean
    public static double mbbMu(double[] x, int L) {
        return IntStream.range(0, x.length - L + 1)
                .parallel()
                .mapToDouble(idx -> mean(x, idx + 1, L))
                .average()
                .orElse(0);
    }

    // Compute MBB variance
    public static double mbbVariance(double[] x, int L, double alpha) {
        return IntStream.range(0, x.length - L + 1)
                .parallel()
                .mapToDouble(idx -> (Math.pow(L, alpha) * Math.pow(mean(x, idx + 1, L) - mbbMu(x, L), 2)))
                .average()
                .orElse(0);
    }

    // Compute NBB mean
    public static double nbbMu(double[] x, int L) {
        return IntStream.range(0, x.length / L)
                .parallel()
                .mapToDouble(idx -> (mean(x, 1 + ((idx + 1) - 1) * L, L)))
                .average()
                .orElse(0);
    }

    // Compute NBB variance
    public static double nbbVariance(double[] x, int L, double alpha) {

        double varSum = IntStream.range(0, x.length / L)
                .parallel()
                .mapToDouble(idx -> (Math.pow(mean(x, 1 + ((idx + 1) - 1) * L, L) - nbbMu(x, L), 2)))
                .average()
                .orElse(0);

        return Math.pow((double) L, alpha) * varSum;

    }

    // factorial implementation
    public static double factorial(int x) {
        double[] fact = {1.0, 1.0, 2.0, 6.0, 24.0, 120.0, 720.0, 5040.0, 40320.0, 362880.0, 3628800.0};
        return fact[x];
    }

    // Hermite polynomial
    public static double H(double x, int p) {
        double out = IntStream.range(0, (p / 2) + 1)
                .parallel()
                .mapToDouble(idx -> (Math.pow(-1, idx) * Math.pow(x, p - (2 * idx)) /
                        ((factorial(idx) * factorial(p - (2 * idx))) * (1L << idx))))
                .sum();
        out *= factorial(p);
        return out;
    }

    // Row means
    public static double[] rowMeans(double[][] x, int nrows, int ncols) {
        return IntStream.range(0, nrows)
                .parallel()
                .mapToDouble(idx -> (mean(x[idx], 1, ncols)))
                .toArray();
    }

    public static void duration(long start, long end) {
        System.out.println("Total execution time: " + (((double)(end - start))/60000) + " minutes");
    }

    public static void main(String[] argv) throws InterruptedException, BrokenBarrierException, IOException {
        final long start = System.currentTimeMillis();
        FileInputStream fileIn = new FileInputStream("test.txt");
        FileOutputStream fileOutMBB = new FileOutputStream("MBB_test.txt");
        FileOutputStream fileOutNBB = new FileOutputStream("NBB_test.txt");
        FileOutputStream fileOutMean = new FileOutputStream("means_test.txt");

        Scanner scnr = new Scanner(fileIn);
        PrintWriter outFSMBB = new PrintWriter(fileOutMBB);
        PrintWriter outFSNBB = new PrintWriter(fileOutNBB);
        PrintWriter outFSmean = new PrintWriter(fileOutMean);

        // These variables are taken from the command line, but are inputted here for ease of use.
        int rows = 10;
        int cols = 10;
        int maxBlockSize = 10; // this could potentially be any value <= cols
        int p = 1;
        double alpha = 0.1;
        double[][] timeSeries = new double[rows][cols];

        // read in the file, and perform the H_p(x) transformation
        for (int i = 0; i < rows; i++) {
            for (int j = 0; j < cols; j++) {
                timeSeries[i][j] = H(scnr.nextDouble(), p);
            }
            scnr.next(); // skip null terminator
        }

        // row means
        double[] sampleMeans = rowMeans(timeSeries, rows, cols);
        for (int i = 0; i < rows; i++) {
            outFSmean.print(sampleMeans[i] + " ");
        }
        outFSmean.println();
        outFSmean.close();

        // three parties: the MBB thread, the NBB thread and the main thread
        final CyclicBarrier gate = new CyclicBarrier(3);

        Thread mbbThread = new Thread(() -> {
            try {
                gate.await();
                for (int j = 0; j < rows; j++) {
                    for (int m = 0; m < maxBlockSize; m++) {
                        outFSMBB.print(mbbVariance(timeSeries[j], m + 1, alpha) + " ");
                    }
                    outFSMBB.println();
                }
                outFSMBB.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });

        Thread nbbThread = new Thread(() -> {
            try {
                gate.await();
                for (int j = 0; j < rows; j++) {
                    for (int m = 0; m < maxBlockSize; m++) {
                        outFSNBB.print(nbbVariance(timeSeries[j], m + 1, alpha) + " ");
                    }
                    outFSNBB.println();
                }
                outFSNBB.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });

        mbbThread.start();
        nbbThread.start();
        gate.await();

        // wait for threads to die
        mbbThread.join();
        nbbThread.join();

        duration(start, System.currentTimeMillis());
    }
}
```

If it helps, I have 6 cores available (not 8, as I first wrote) with 64GB of RAM, as well as two GPUs that I have no idea how to use (Intel UHD Graphics 630, NVIDIA Quadro P620). I'll be looking into how to use these over the next few days if I have to.

Solution

You’ve got inefficiency here:

```java
// Sum of a subarray, based on B(x, i, L) -- i is one-indexing
public static double sum(double[] x, int i, int L) {
    return IntStream.range(i, i + L)
            .parallel()
            .mapToDouble(idx -> x[idx - 1])
            .sum();
}
```

I don’t know how big `L` is, but you are doing `idx - 1` that many times, distributed over all of the threads. You can easily move this out of the loop …

```java
return IntStream.range(i - 1, i + L - 1)
        .parallel()
        .mapToDouble(idx -> x[idx])
        .sum();
```

… so the subtraction is done exactly twice.

Parallel stream in a parallel stream:

```java
public static double mbbMu(double[] x, int L) {
    return IntStream.range(0, x.length - L + 1)
            .parallel()
            .mapToDouble(idx -> mean(x, idx + 1, L))
            .average()
            .orElse(0);
}
```

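The `mean(...)` function runs its own parallel stream, and `mbbMu(...)` calls it from inside another parallel stream. Both streams submit their work to the same common `ForkJoinPool`, which by default has one worker thread fewer than the number of available processors, so you don't literally get 8*8 threads on 8 cores. What you do get is parallel overhead on every tiny inner call: each one is split into subtasks, scheduled and joined, all competing with the outer stream for the same handful of threads.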

Eeek!

Use a parallel stream only for the outermost loop, and leave all inner streams serial.
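A minimal sketch of that advice applied to `mbbMu`, assuming the question's one-based `mean(x, i, L)` convention: only the outer stream over window start positions is parallel, and the window mean is a plain serial loop. The class name `SerialInnerMean` and its `main` method are hypothetical, added only so the example runs on its own.

```java
import java.util.stream.IntStream;

public class SerialInnerMean {

    // Mean of x[i-1 .. i-1+L), with i one-based as in the question.
    // Plain serial loop: no stream or fork/join overhead per window.
    static double mean(double[] x, int i, int L) {
        double s = 0.0;
        for (int k = i - 1; k < i - 1 + L; k++) {
            s += x[k];
        }
        return s / L;
    }

    // The loop over window start positions is the only parallel stream.
    static double mbbMu(double[] x, int L) {
        return IntStream.range(0, x.length - L + 1)
                .parallel()
                .mapToDouble(idx -> mean(x, idx + 1, L))
                .average()
                .orElse(0);
    }

    public static void main(String[] args) {
        double[] x = {1, 2, 3, 4, 5};
        // Windows of length 2 have means 1.5, 2.5, 3.5, 4.5; their average is 3.0
        System.out.println(mbbMu(x, 2));
    }
}
```

In the full program the outermost loop is arguably the one over rows in `main`, so it may pay off even more to parallelize there and keep `mbbMu` entirely serial.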

Try losing the indexing altogether, and stream the values directly with:

```java
// requires: import java.util.Arrays;
public static double sum(double[] x, int i, int L) {
    return Arrays.stream(x).skip(i - 1).limit(L).sum();
}
```
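`Arrays.stream` also has a range overload, `Arrays.stream(double[] array, int startInclusive, int endExclusive)`, which streams just the slice with no `skip`/`limit` stage. A small sketch, keeping the question's one-based `i`; the class name `RangeSum` is hypothetical, and `mean` assumes `L >= 1` (otherwise `getAsDouble()` throws).

```java
import java.util.Arrays;

public class RangeSum {

    // Sum of x[i-1 .. i-1+L), i one-based as in the question.
    // The range overload streams exactly that slice, serially.
    static double sum(double[] x, int i, int L) {
        return Arrays.stream(x, i - 1, i - 1 + L).sum();
    }

    // Mean of the same slice; assumes L >= 1 so the average is present.
    static double mean(double[] x, int i, int L) {
        return Arrays.stream(x, i - 1, i - 1 + L).average().getAsDouble();
    }

    public static void main(String[] args) {
        double[] x = {1, 2, 3, 4, 5};
        System.out.println(sum(x, 2, 3));  // 2 + 3 + 4 = 9.0
        System.out.println(mean(x, 2, 3)); // 9 / 3 = 3.0
    }
}
```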

Avoid unnecessary work.

```java
// Compute MBB variance
public static double mbbVariance(double[] x, int L, double alpha) {
    return IntStream.range(0, x.length - L + 1)
            .parallel()
            .mapToDouble(idx -> (Math.pow(L, alpha) * Math.pow(mean(x, idx + 1, L) - mbbMu(x, L), 2)))
            .average()
            .orElse(0);
}
```

How many times does this calculate `Math.pow(L, alpha)`? How many times is the value different? Maybe you want to move it out of the loop.
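The same question applies to `mbbMu(x, L)`. It is a full pass over every window, and the original lambda recomputes it once per window, even though its value never changes inside the loop. Here is a sketch that computes both loop invariants once, before the stream, and makes the same change to `nbbVariance`/`nbbMu`. It uses a serial window mean and simplifies `1 + ((idx + 1) - 1) * L` to `1 + idx * L`. The class name `HoistedVariance` is hypothetical. Factoring `Math.pow(L, alpha)` out of the average is mathematically equivalent, although the last bits of rounding can differ.

```java
import java.util.stream.IntStream;

public class HoistedVariance {

    // One-based window mean, serial (inner loops stay serial).
    static double mean(double[] x, int i, int L) {
        double s = 0.0;
        for (int k = i - 1; k < i - 1 + L; k++) {
            s += x[k];
        }
        return s / L;
    }

    static double mbbMu(double[] x, int L) {
        return IntStream.range(0, x.length - L + 1)
                .parallel()
                .mapToDouble(idx -> mean(x, idx + 1, L))
                .average()
                .orElse(0);
    }

    // Loop invariants computed once, not once per window.
    static double mbbVariance(double[] x, int L, double alpha) {
        final double scale = Math.pow(L, alpha);
        final double mu = mbbMu(x, L);
        return scale * IntStream.range(0, x.length - L + 1)
                .parallel()
                .mapToDouble(idx -> {
                    double d = mean(x, idx + 1, L) - mu;
                    return d * d;
                })
                .average()
                .orElse(0);
    }

    static double nbbMu(double[] x, int L) {
        return IntStream.range(0, x.length / L)
                .parallel()
                .mapToDouble(idx -> mean(x, 1 + idx * L, L))
                .average()
                .orElse(0);
    }

    // Same hoisting for the non-overlapping blocks.
    static double nbbVariance(double[] x, int L, double alpha) {
        final double mu = nbbMu(x, L);
        double varSum = IntStream.range(0, x.length / L)
                .parallel()
                .mapToDouble(idx -> {
                    double d = mean(x, 1 + idx * L, L) - mu;
                    return d * d;
                })
                .average()
                .orElse(0);
        return Math.pow(L, alpha) * varSum;
    }

    public static void main(String[] args) {
        double[] x = {1, 2, 3, 4, 5};
        System.out.println(mbbVariance(x, 2, 0.0)); // 1.25
        System.out.println(nbbVariance(x, 2, 0.0)); // 1.0
    }
}
```

With `x.length - L + 1` windows of length `L`, this turns each `mbbVariance` call from a quadratic number of window means into a linear one, which matters far more at `maxBlockSize = 3000` than the `Math.pow` call itself.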