Business logic parallelization engine

Posted on

Problem

In my Java 8 application there are three different classes, and each of them contains one similar method dedicated to parallelizing the execution of the business logic. Note that two of these three methods are called from other static methods, while the last one is called from an instance method. Below is the source code of these three parallelizationEngine() methods:

parallelizationEngine #1

/**
 * Marks the given episodes and events as {@code IN_PROCESS_META}, splits
 * {@code unprocessedEpisodes} into at most {@code cpuCoresNum} contiguous chunks and runs
 * {@code metaDataEngine} for every chunk on {@code service}, blocking until all chunks finish.
 *
 * <p>Every chunk receives a copy of its slice of the episodes together with the complete
 * {@code unprocessedEvents} list. Chunks hold {@code max(1, episodes / cpuCoresNum)} items,
 * except chunk number {@code cpuCoresNum}, which takes all remaining items. A chunk that fails
 * is logged and does not stop the others.
 *
 * @param cpuCoresNum         maximum number of chunks; must be at least 1
 * @param service             executor the chunks are submitted to
 * @param dbC_Episodes        collection whose episode status is updated
 * @param dbC_Events          collection whose event status is updated
 * @param unprocessedEpisodes episodes to process; this is the list that gets partitioned
 * @param unprocessedEvents   events handed in full to every chunk
 * @throws IllegalArgumentException if {@code cpuCoresNum < 1}
 */
private static void parallelizationEngine(int cpuCoresNum,
                                          ExecutorService service,
                                          MongoCollection<Document> dbC_Episodes,
                                          MongoCollection<Document> dbC_Events,
                                          ArrayList<Document> unprocessedEpisodes,
                                          ArrayList<Document> unprocessedEvents) {

    // 0 would divide by zero; a negative value would make the chunk loop below never terminate.
    // Checked before the status update so invalid input does not leave items marked "in process".
    if (cpuCoresNum < 1) {
        throw new IllegalArgumentException("cpuCoresNum must be >= 1, was " + cpuCoresNum);
    }

    // update status of the queried items
    DBAgent.updateItemsStatus(dbC_Episodes, unprocessedEpisodes, FetchStatus.IN_PROCESS_META.getID());
    DBAgent.updateItemsStatus(dbC_Events, unprocessedEvents, FetchStatus.IN_PROCESS_META.getID());

    // The episodes are what gets sliced below, so the chunk bounds must come from their size.
    // Using unprocessedEvents.size() here threw IndexOutOfBoundsException when there were fewer
    // episodes than events, and silently skipped episodes when there were more.
    int itemsCount = unprocessedEpisodes.size();

    // at least one item per chunk, so fewer items than cores still yields finite chunks
    int itemsPerCore = Math.max(1, itemsCount / cpuCoresNum);

    List<Callable<Boolean>> callables = new ArrayList<>();

    for (int from = 0, threadId = 1; from < itemsCount; threadId++) {
        int indexFrom = from;
        // all remaining items will be processed in the last thread
        int indexTo = (threadId == cpuCoresNum) ? itemsCount : from + itemsPerCore;
        String threadName = "im_metaDataFiller_#" + String.format("%03d", threadId);

        callables.add(() -> {
            Thread.currentThread().setName(threadName);
            metaDataEngine(dbC_Episodes, dbC_Events, new ArrayList<>(unprocessedEpisodes.subList(indexFrom, indexTo)), unprocessedEvents);
            return true;
        });

        from = indexTo;
    }

    try {
        // check if every thread has been processed correctly
        List<Future<Boolean>> futures = service.invokeAll(callables);

        for (Future<Boolean> future : futures) {
            try {
                future.get();
            } catch (ExecutionException e) {
                Logger.printMsg(true, "Exception", "dba.MetaDataFiller.parallelizationEngine(): ", e.getClass().getName() + " : " + e.getMessage());
            }
        }
    } catch (InterruptedException e) {
        // restore the interrupt status so callers (and the executor) can still observe it
        Thread.currentThread().interrupt();
        Logger.printMsg(true, "Exception", "dba.MetaDataFiller.parallelizationEngine(): ", e.getClass().getName() + " : " + e.getMessage());
    }
}

parallelizationEngine #2

/**
 * Marks {@code unprocessedEvents} as {@code IN_PROCESS_NLP}, loads the NLP pipeline and NER
 * classifiers once, splits the events into at most {@code cpuCoresNum} contiguous chunks and
 * runs {@code analyzerEngine} for every chunk on {@code service}, blocking until all finish.
 *
 * <p>Chunks hold {@code max(1, events / cpuCoresNum)} items, except chunk number
 * {@code cpuCoresNum}, which takes all remaining items. A chunk that fails is logged and does
 * not stop the others.
 *
 * @param cpuCoresNum       maximum number of chunks; must be at least 1
 * @param service           executor the chunks are submitted to
 * @param dbC_Events        collection whose event status is updated
 * @param unprocessedEvents events to partition and analyze
 * @throws IllegalArgumentException if {@code cpuCoresNum < 1}
 */
private static void parallelizationEngine(int cpuCoresNum,
                                          ExecutorService service,
                                          MongoCollection<Document> dbC_Events,
                                          ArrayList<Document> unprocessedEvents) {

    // 0 would divide by zero; a negative value would make the chunk loop below never terminate.
    // Checked first so invalid input neither marks items "in process" nor loads the models.
    if (cpuCoresNum < 1) {
        throw new IllegalArgumentException("cpuCoresNum must be >= 1, was " + cpuCoresNum);
    }

    // update status of the queried items
    DBAgent.updateItemsStatus(dbC_Events, unprocessedEvents, FetchStatus.IN_PROCESS_NLP.getID());

    // NOTE(review): the same pipeline and classifier instances are handed to every chunk, i.e. used
    // from several threads at once — confirm they are safe for concurrent use.
    StanfordCoreNLP pipeline = loadNLPPipeline();
    NERClassifierCombiner nerCombClassifier = loadNERClassifiers(getNERClassPath());

    int itemsCount = unprocessedEvents.size();

    // at least one item per chunk, so fewer items than cores still yields finite chunks
    int itemsPerCore = Math.max(1, itemsCount / cpuCoresNum);

    List<Callable<Boolean>> callables = new ArrayList<>();

    for (int from = 0, threadId = 1; from < itemsCount; threadId++) {
        int indexFrom = from;
        // all remaining items will be processed in the last thread
        int indexTo = (threadId == cpuCoresNum) ? itemsCount : from + itemsPerCore;
        String threadName = "im_nlpAnalyzer_#" + String.format("%03d", threadId);

        callables.add(() -> {
            Thread.currentThread().setName(threadName);
            analyzerEngine(pipeline, nerCombClassifier, dbC_Events, new ArrayList<>(unprocessedEvents.subList(indexFrom, indexTo)));
            return true;
        });

        from = indexTo;
    }

    try {
        // check if every thread has been processed correctly
        List<Future<Boolean>> futures = service.invokeAll(callables);

        for (Future<Boolean> future : futures) {
            try {
                future.get();
            } catch (ExecutionException e) {
                Logger.printMsg(true, "Exception", "ir.NLPAnalyzer.parallelizationEngine(): ", e.getClass().getName() + " : " + e.getMessage());
            }
        }
    } catch (InterruptedException e) {
        // restore the interrupt status so callers (and the executor) can still observe it
        Thread.currentThread().interrupt();
        Logger.printMsg(true, "Exception", "ir.NLPAnalyzer.parallelizationEngine(): ", e.getClass().getName() + " : " + e.getMessage());
    }
}

parallelizationEngine #3

/**
 * Marks {@code unprocessedEvents} as {@code IN_PROCESS_FETCH}, splits them into at most
 * {@code cpuCoresNum} contiguous chunks and runs {@code fetcherEngine} on this instance for
 * every chunk on {@code service}, blocking until all chunks finish.
 *
 * <p>Chunks hold {@code max(1, events / cpuCoresNum)} items, except chunk number
 * {@code cpuCoresNum}, which takes all remaining items. A chunk that fails is logged and does
 * not stop the others.
 *
 * @param cpuCoresNum       maximum number of chunks; must be at least 1
 * @param service           executor the chunks are submitted to
 * @param dbC_Events        collection whose event status is updated
 * @param unprocessedEvents events to partition and fetch
 * @throws IllegalArgumentException if {@code cpuCoresNum < 1}
 */
private void parallelizationEngine(int cpuCoresNum,
                                   ExecutorService service,
                                   MongoCollection<Document> dbC_Events,
                                   ArrayList<Document> unprocessedEvents) {

    // 0 would divide by zero; a negative value would make the chunk loop below never terminate.
    // Checked before the status update so invalid input does not leave items marked "in process".
    if (cpuCoresNum < 1) {
        throw new IllegalArgumentException("cpuCoresNum must be >= 1, was " + cpuCoresNum);
    }

    // update status of queried items
    DBAgent.updateItemsStatus(dbC_Events, unprocessedEvents, FetchStatus.IN_PROCESS_FETCH.getID());

    int itemsCount = unprocessedEvents.size();

    // at least one item per chunk, so fewer items than cores still yields finite chunks
    int itemsPerCore = Math.max(1, itemsCount / cpuCoresNum);

    List<Callable<Boolean>> callables = new ArrayList<>();

    for (int from = 0, threadId = 1; from < itemsCount; threadId++) {
        int indexFrom = from;
        // all remaining items will be processed in the last thread
        int indexTo = (threadId == cpuCoresNum) ? itemsCount : from + itemsPerCore;
        String threadName = "im_evFullTextImg_#" + String.format("%03d", threadId);

        callables.add(() -> {
            Thread.currentThread().setName(threadName);
            this.fetcherEngine(dbC_Events, new ArrayList<>(unprocessedEvents.subList(indexFrom, indexTo)));
            return true;
        });

        from = indexTo;
    }

    try {
        // check if every thread has been processed correctly
        List<Future<Boolean>> futures = service.invokeAll(callables);

        for (Future<Boolean> future : futures) {
            try {
                future.get();
            } catch (ExecutionException e) {
                Logger.printMsg(true, "Exception", "crawler.FullTextImgFetcher.parallelizationEngine(): ", e.getClass().getName() + " : " + e.getMessage());
            }
        }
    } catch (InterruptedException e) {
        // restore the interrupt status so callers (and the executor) can still observe it
        Thread.currentThread().interrupt();
        Logger.printMsg(true, "Exception", "crawler.FullTextImgFetcher.parallelizationEngine(): ", e.getClass().getName() + " : " + e.getMessage());
    }
}

As you can see, these three methods are very similar, and I think it would make sense to refactor this code to eliminate the duplication.

The problem is that two of the three methods are static and one is an instance method. Bearing this in mind, is there any way to extract the duplicated code into a separate entity and to pass the method I want to execute within callables.add() as a parameter?

Solution

You can move your method to a Utils class and pass it instances of different classes that all implement the same interface.
e.g.

Based on your parallelizationEngine #3:

Utils:

/**
 * Marks {@code unprocessedEvents} as {@code IN_PROCESS_FETCH}, splits the list into at most
 * {@code cpuCoresNum} contiguous chunks and calls {@code addHandler.doSometing()} once per chunk
 * on {@code service}, blocking until every chunk has finished.
 *
 * <p>{@code AddHandler.doSometing()} takes no arguments, so the handler cannot tell which items
 * its chunk contains. When the work needs the items, use the overload that accepts a chunk
 * processor instead.
 *
 * @param cpuCoresNum       maximum number of chunks; must be at least 1
 * @param service           executor the chunks are submitted to
 * @param dbC_Events        collection whose item status is updated before processing
 * @param unprocessedEvents items to partition
 * @param addHandler        work executed once per chunk
 * @throws IllegalArgumentException if {@code cpuCoresNum < 1}
 */
public static void parallelizationEngine(int cpuCoresNum,
                               ExecutorService service,
                               MongoCollection<Document> dbC_Events,
                               ArrayList<Document> unprocessedEvents,
                               AddHandler addHandler) {
  // validate before the status update so invalid input does not leave items marked "in process"
  if (cpuCoresNum < 1) {
    throw new IllegalArgumentException("cpuCoresNum must be >= 1, was " + cpuCoresNum);
  }

  // update status of queried items
  DBAgent.updateItemsStatus(dbC_Events, unprocessedEvents, FetchStatus.IN_PROCESS_FETCH.getID());

  // The interface declares doSometing(); calling doSomething() (as before) did not compile.
  parallelizationEngine(cpuCoresNum, service, unprocessedEvents, "im_metaDataFiller_#",
      "Utils.parallelizationEngine(): ", chunk -> addHandler.doSometing());
}

/**
 * Splits {@code items} into at most {@code cpuCoresNum} contiguous chunks, runs
 * {@code chunkProcessor} for each chunk on {@code service} and blocks until all chunks finish.
 *
 * <p>This is the shared core the individual engines can delegate to. Each caller keeps its own
 * status updates and model loading, and passes its per-chunk work as a lambda or method
 * reference. This works equally well for static and instance methods.
 *
 * <p>Chunks hold {@code max(1, items.size() / cpuCoresNum)} items each, except chunk number
 * {@code cpuCoresNum}, which takes all remaining items. Each chunk is a fresh copy of its slice.
 * While running, a worker thread is renamed to {@code threadNamePrefix} followed by the
 * zero-padded chunk number. A chunk that throws is logged under {@code logTag} and does not stop
 * the others.
 *
 * @param cpuCoresNum      maximum number of chunks; must be at least 1
 * @param service          executor the chunks are submitted to
 * @param items            items to partition; not modified
 * @param threadNamePrefix prefix of the worker thread names
 * @param logTag           location tag used when logging failures
 * @param chunkProcessor   work executed for every chunk, receiving that chunk's items
 * @throws IllegalArgumentException if {@code cpuCoresNum < 1}
 */
public static void parallelizationEngine(int cpuCoresNum,
                               ExecutorService service,
                               List<Document> items,
                               String threadNamePrefix,
                               String logTag,
                               java.util.function.Consumer<List<Document>> chunkProcessor) {
  // 0 would divide by zero; a negative value would make the chunk loop below never terminate
  if (cpuCoresNum < 1) {
    throw new IllegalArgumentException("cpuCoresNum must be >= 1, was " + cpuCoresNum);
  }

  int itemsCount = items.size();

  // at least one item per chunk, so fewer items than cores still yields finite chunks
  int itemsPerCore = Math.max(1, itemsCount / cpuCoresNum);

  List<Callable<Boolean>> callables = new ArrayList<>();
  for (int from = 0, threadId = 1; from < itemsCount; threadId++) {
    // all remaining items will be processed in the last thread
    int to = (threadId == cpuCoresNum) ? itemsCount : from + itemsPerCore;
    List<Document> chunk = new ArrayList<>(items.subList(from, to));
    String threadName = threadNamePrefix + String.format("%03d", threadId);

    callables.add(() -> {
      Thread.currentThread().setName(threadName);
      chunkProcessor.accept(chunk);
      return true;
    });

    from = to;
  }

  try {
    // check if every thread has been processed correctly
    List<Future<Boolean>> futures = service.invokeAll(callables);

    for (Future<Boolean> future : futures) {
      try {
        future.get();
      } catch (ExecutionException e) {
        Logger.printMsg(true, "Exception", logTag, e.getClass().getName() + " : " + e.getMessage());
      }
    }
  } catch (InterruptedException e) {
    // restore the interrupt status so callers (and the executor) can still observe it
    Thread.currentThread().interrupt();
    Logger.printMsg(true, "Exception", logTag, e.getClass().getName() + " : " + e.getMessage());
  }
}


/**
 * A unit of work that the shared parallelization engine can run on a worker thread.
 *
 * <p>The method keeps its existing spelling, {@code doSometing}, because implementations
 * override it under that name.
 */
@FunctionalInterface
interface AddHandler {

  /** Performs this handler's work. */
  void doSometing();
}

And classes implementing the Interface:

class A implements AddHandler{

  public void doSometing(){
    .. so something 
  }
}

class B implements AddHandler{

  public void doSometing(){
    .. so something different form class A
  }
}

Note that you need to do more:

  • Because the parameter lists of your parallelizationEngine(..) methods differ slightly, you can write one universal method that takes all parameters needed by any implementation, and then wrap it in methods that take only the required parameters.

  • Because the beginning of your parallelizationEngine(..) methods also differs slightly, you can extract that part into a method of a parameter as well (as shown above for the body of the for-loop). This makes all your implementations exactly the same, so you can reduce them to a single implementation.

Leave a Reply

Your email address will not be published. Required fields are marked *