Problem
In my Java 8 application there are three different classes, each of which contains a similar method dedicated to parallelizing the execution of the business logic. It is important to note that two of these three methods are called from other static methods, while the last one is called from an instance method. Below is the source code of these three parallelizationEngine() methods:
parallelizationEngine #1
/**
 * Runs {@code metaDataEngine} over the unprocessed episodes in parallel.
 *
 * <p>Both lists are first handed to {@code DBAgent.updateItemsStatus} together with the
 * {@code IN_PROCESS_META} status id. The episodes are then cut into contiguous slices of
 * {@code unprocessedEpisodes.size() / cpuCoresNum} items (at least one item per slice); the
 * last task additionally receives the remainder. Every task works on its own copy of its
 * slice and on the complete event list. Failures of individual tasks are logged and do not
 * stop the remaining results from being checked.
 *
 * <p>The slice bounds are derived from the episode list, because that is the list being
 * sliced. Deriving them from the event list (as was done before) yields out-of-range indexes
 * or skipped episodes whenever the two lists differ in size.
 *
 * @param cpuCoresNum         upper bound for the number of tasks; values below 1 are treated as 1
 * @param service             executor the tasks are submitted to via {@code invokeAll}
 * @param dbC_Episodes        collection passed through to the status update and to {@code metaDataEngine}
 * @param dbC_Events          collection passed through to the status update and to {@code metaDataEngine}
 * @param unprocessedEpisodes episodes to be split among the tasks
 * @param unprocessedEvents   events, given unsplit to every task
 */
private static void parallelizationEngine(int cpuCoresNum,
                                          ExecutorService service,
                                          MongoCollection<Document> dbC_Episodes,
                                          MongoCollection<Document> dbC_Events,
                                          ArrayList<Document> unprocessedEpisodes,
                                          ArrayList<Document> unprocessedEvents) {
    // update status of the queried items
    DBAgent.updateItemsStatus(dbC_Episodes, unprocessedEpisodes, FetchStatus.IN_PROCESS_META.getID());
    DBAgent.updateItemsStatus(dbC_Events, unprocessedEvents, FetchStatus.IN_PROCESS_META.getID());

    // a non-positive worker count would divide by zero or make the loop step backwards
    int workers = Math.max(1, cpuCoresNum);
    int total = unprocessedEpisodes.size();
    // at least one item per slice, otherwise the loop below would never advance
    int itemsPerCore = Math.max(1, total / workers);

    List<Callable<Boolean>> callables = new ArrayList<>();
    int itNum = 1;
    for (int from = 0; from < total; from += itemsPerCore) {
        // all remaining items will be processed in the last thread
        boolean lastWorker = (itNum == workers);
        int indexFrom = from;
        int indexTo = lastWorker ? total : from + itemsPerCore;
        int threadId = itNum++;
        callables.add(() -> {
            // the pool thread keeps this name after the task has finished
            Thread.currentThread().setName("im_metaDataFiller_#" + String.format("%03d", threadId));
            metaDataEngine(dbC_Episodes, dbC_Events,
                    new ArrayList<>(unprocessedEpisodes.subList(indexFrom, indexTo)), unprocessedEvents);
            return true;
        });
        if (lastWorker) {
            break;
        }
    }

    try {
        // check if every thread has been processed correctly
        List<Future<Boolean>> futures = service.invokeAll(callables);
        for (Future<Boolean> future : futures) {
            try {
                future.get();
            } catch (ExecutionException e) {
                Logger.printMsg(true, "Exception", "dba.MetaDataFiller.parallelizationEngine(): ", e.getClass().getName() + " : " + e.getMessage());
            }
        }
    } catch (InterruptedException e) {
        Logger.printMsg(true, "Exception", "dba.MetaDataFiller.parallelizationEngine(): ", e.getClass().getName() + " : " + e.getMessage());
        // restore the interrupt flag so that callers can still observe the interruption
        Thread.currentThread().interrupt();
    }
}
parallelizationEngine #2
/**
 * Runs {@code analyzerEngine} over the unprocessed events in parallel.
 *
 * <p>The events are first handed to {@code DBAgent.updateItemsStatus} together with the
 * {@code IN_PROCESS_NLP} status id, then the NLP pipeline and the NER classifier are loaded
 * once and shared by all tasks. The events are cut into contiguous slices of
 * {@code unprocessedEvents.size() / cpuCoresNum} items (at least one item per slice); the last
 * task additionally receives the remainder. Every task works on its own copy of its slice.
 * Failures of individual tasks are logged and do not stop the remaining results from being
 * checked.
 *
 * @param cpuCoresNum       upper bound for the number of tasks; values below 1 are treated as 1
 * @param service           executor the tasks are submitted to via {@code invokeAll}
 * @param dbC_Events        collection passed through to the status update and to {@code analyzerEngine}
 * @param unprocessedEvents events to be split among the tasks
 */
private static void parallelizationEngine(int cpuCoresNum,
                                          ExecutorService service,
                                          MongoCollection<Document> dbC_Events,
                                          ArrayList<Document> unprocessedEvents) {
    // update status of the queried items
    DBAgent.updateItemsStatus(dbC_Events, unprocessedEvents, FetchStatus.IN_PROCESS_NLP.getID());
    StanfordCoreNLP pipeline = loadNLPPipeline();
    NERClassifierCombiner nerCombClassifier = loadNERClassifiers(getNERClassPath());

    // a non-positive worker count would divide by zero or make the loop step backwards
    int workers = Math.max(1, cpuCoresNum);
    int total = unprocessedEvents.size();
    // at least one item per slice, otherwise the loop below would never advance
    int itemsPerCore = Math.max(1, total / workers);

    List<Callable<Boolean>> callables = new ArrayList<>();
    int itNum = 1;
    for (int from = 0; from < total; from += itemsPerCore) {
        // all remaining items will be processed in the last thread
        boolean lastWorker = (itNum == workers);
        int indexFrom = from;
        int indexTo = lastWorker ? total : from + itemsPerCore;
        int threadId = itNum++;
        callables.add(() -> {
            // the pool thread keeps this name after the task has finished
            Thread.currentThread().setName("im_nlpAnalyzer_#" + String.format("%03d", threadId));
            analyzerEngine(pipeline, nerCombClassifier, dbC_Events,
                    new ArrayList<>(unprocessedEvents.subList(indexFrom, indexTo)));
            return true;
        });
        if (lastWorker) {
            break;
        }
    }

    try {
        // check if every thread has been processed correctly
        List<Future<Boolean>> futures = service.invokeAll(callables);
        for (Future<Boolean> future : futures) {
            try {
                future.get();
            } catch (ExecutionException e) {
                Logger.printMsg(true, "Exception", "ir.NLPAnalyzer.parallelizationEngine(): ", e.getClass().getName() + " : " + e.getMessage());
            }
        }
    } catch (InterruptedException e) {
        Logger.printMsg(true, "Exception", "ir.NLPAnalyzer.parallelizationEngine(): ", e.getClass().getName() + " : " + e.getMessage());
        // restore the interrupt flag so that callers can still observe the interruption
        Thread.currentThread().interrupt();
    }
}
parallelizationEngine #3
/**
 * Runs {@code fetcherEngine} of this instance over the unprocessed events in parallel.
 *
 * <p>The events are first handed to {@code DBAgent.updateItemsStatus} together with the
 * {@code IN_PROCESS_FETCH} status id. They are then cut into contiguous slices of
 * {@code unprocessedEvents.size() / cpuCoresNum} items (at least one item per slice); the last
 * task additionally receives the remainder. Every task works on its own copy of its slice.
 * Failures of individual tasks are logged and do not stop the remaining results from being
 * checked.
 *
 * @param cpuCoresNum       upper bound for the number of tasks; values below 1 are treated as 1
 * @param service           executor the tasks are submitted to via {@code invokeAll}
 * @param dbC_Events        collection passed through to the status update and to {@code fetcherEngine}
 * @param unprocessedEvents events to be split among the tasks
 */
private void parallelizationEngine(int cpuCoresNum,
                                   ExecutorService service,
                                   MongoCollection<Document> dbC_Events,
                                   ArrayList<Document> unprocessedEvents) {
    // update status of queried items
    DBAgent.updateItemsStatus(dbC_Events, unprocessedEvents, FetchStatus.IN_PROCESS_FETCH.getID());

    // a non-positive worker count would divide by zero or make the loop step backwards
    int workers = Math.max(1, cpuCoresNum);
    int total = unprocessedEvents.size();
    // at least one item per slice, otherwise the loop below would never advance
    int itemsPerCore = Math.max(1, total / workers);

    List<Callable<Boolean>> callables = new ArrayList<>();
    int itNum = 1;
    for (int from = 0; from < total; from += itemsPerCore) {
        // all remaining items will be processed in the last thread
        boolean lastWorker = (itNum == workers);
        int indexFrom = from;
        int indexTo = lastWorker ? total : from + itemsPerCore;
        int threadId = itNum++;
        callables.add(() -> {
            // the pool thread keeps this name after the task has finished
            Thread.currentThread().setName("im_evFullTextImg_#" + String.format("%03d", threadId));
            this.fetcherEngine(dbC_Events, new ArrayList<>(unprocessedEvents.subList(indexFrom, indexTo)));
            return true;
        });
        if (lastWorker) {
            break;
        }
    }

    try {
        // check if every thread has been processed correctly
        List<Future<Boolean>> futures = service.invokeAll(callables);
        for (Future<Boolean> future : futures) {
            try {
                future.get();
            } catch (ExecutionException e) {
                Logger.printMsg(true, "Exception", "crawler.FullTextImgFetcher.parallelizationEngine(): ", e.getClass().getName() + " : " + e.getMessage());
            }
        }
    } catch (InterruptedException e) {
        Logger.printMsg(true, "Exception", "crawler.FullTextImgFetcher.parallelizationEngine(): ", e.getClass().getName() + " : " + e.getMessage());
        // restore the interrupt flag so that callers can still observe the interruption
        Thread.currentThread().interrupt();
    }
}
As you can see, these three methods are quite similar, and I think it makes sense to refactor this code in order to eliminate the duplication.
The problem is that two of the three methods are static and one is an instance method. Bearing this in mind, is there any way to extract the duplicated code into a separate entity and to pass the method I want to execute within callables.add() as a parameter?
Solution
You can move your method to a utility class and pass it instances of different classes that all implement the same interface.
e.g.
Based on your parallelizationEngine #3:
Utils:
/**
 * Shared parallelization skeleton: creates up to {@code cpuCoresNum} tasks that each invoke
 * the supplied {@code addHandler}, submits them through {@code service.invokeAll} and logs
 * any task failure.
 *
 * <p>The events are first handed to {@code DBAgent.updateItemsStatus} together with the
 * {@code IN_PROCESS_FETCH} status id. The number of tasks is the number of slices of
 * {@code unprocessedEvents.size() / cpuCoresNum} items (at least one item per slice) that fit
 * into the list, capped at {@code cpuCoresNum}.
 *
 * <p>NOTE(review): {@code AddHandler.doSometing()} takes no arguments, so the per-task slice
 * cannot be handed over and every task performs the same handler call. The status id, the
 * thread-name prefix and the log prefix are also still hard-coded. A fully generic version
 * needs the handler to accept the slice and needs those values as parameters.
 *
 * @param cpuCoresNum       upper bound for the number of tasks; values below 1 are treated as 1
 * @param service           executor the tasks are submitted to via {@code invokeAll}
 * @param dbC_Events        collection passed through to the status update
 * @param unprocessedEvents events whose count determines the number of tasks
 * @param addHandler        the work each task performs
 */
public static void parallelizationEngine(int cpuCoresNum,
                                         ExecutorService service,
                                         MongoCollection<Document> dbC_Events,
                                         ArrayList<Document> unprocessedEvents,
                                         AddHandler addHandler) {
    // update status of queried items
    DBAgent.updateItemsStatus(dbC_Events, unprocessedEvents, FetchStatus.IN_PROCESS_FETCH.getID());

    // a non-positive worker count would divide by zero or make the loop step backwards
    int workers = Math.max(1, cpuCoresNum);
    int total = unprocessedEvents.size();
    // at least one item per slice, otherwise the loop below would never advance
    int itemsPerCore = Math.max(1, total / workers);

    List<Callable<Boolean>> callables = new ArrayList<>();
    int itNum = 1;
    for (int from = 0; from < total; from += itemsPerCore) {
        // the last task stands for all remaining items, so no further task is created after it
        boolean lastWorker = (itNum == workers);
        int threadId = itNum++;
        callables.add(() -> {
            // the pool thread keeps this name after the task has finished
            Thread.currentThread().setName("im_metaDataFiller_#" + String.format("%03d", threadId));
            // here, depending on the parameter 'addHandler', different things happen;
            // the call uses the method name exactly as declared by AddHandler
            addHandler.doSometing();
            return true;
        });
        if (lastWorker) {
            break;
        }
    }

    try {
        // check if every thread has been processed correctly
        List<Future<Boolean>> futures = service.invokeAll(callables);
        for (Future<Boolean> future : futures) {
            try {
                future.get();
            } catch (ExecutionException e) {
                Logger.printMsg(true, "Exception", "dba.MetaDataFiller.performMetaDataComputing(): ", e.getClass().getName() + " : " + e.getMessage());
            }
        }
    } catch (InterruptedException e) {
        Logger.printMsg(true, "Exception", "dba.MetaDataFiller.performMetaDataComputing(): ", e.getClass().getName() + " : " + e.getMessage());
        // restore the interrupt flag so that callers can still observe the interruption
        Thread.currentThread().interrupt();
    }
}
/**
 * Callback for the unit of work that a parallelization task performs.
 *
 * <p>It has a single abstract method, so it can be implemented by a class, a lambda or a
 * method reference.
 *
 * <p>NOTE(review): the method name is misspelled ("doSometing"); renaming it requires
 * changing every implementer and caller in the same step.
 */
@FunctionalInterface
interface AddHandler {

    /** Performs the implementation-specific work. */
    void doSometing();
}
And classes implementing the Interface:
class A implements AddHandler{
public void doSometing(){
.. so something
}
}
class B implements AddHandler{
public void doSometing(){
.. so something different form class A
}
}
Note you need to do more!
-
Because the parameter lists of your different parallelizationEngine(..) methods differ slightly, you can write one universal method that takes every parameter needed by any implementation, and then wrap it in methods that take only the required parameters.
-
Because the beginnings of your different parallelizationEngine(..) methods also differ slightly, you can extract that part as well into a method of a parameter object (as I showed for the part inside the for-loop), so that all your implementations become exactly the same and can be reduced to a single implementation.