Problem
I have a batch job that contains a lot of items. I need to split these items into 4 categories. Items in categories 1 and 2 should be moved to separate jobs as is; items in categories 3 and 4 should additionally be checked for validity and split into separate jobs, with invalid items moved to a separate job of their own.
A rough diagram of the expected flow looks like this:
So far I have come up with a prototype that looks like this:
import java.util.*;
import java.util.stream.*;

public class MyClass {
    public static void main(String[] args) {
        Job job = new Job();
        job.items = Arrays.asList(
            new Item(ItemCategory.CAT_1, "1"),
            new Item(ItemCategory.CAT_2, "2"),
            new Item(ItemCategory.CAT_3, "3"),
            new Item(ItemCategory.CAT_1, "4"),
            new Item(ItemCategory.CAT_2, "5"),
            new Item(ItemCategory.CAT_3, "6"),
            new Item(ItemCategory.CAT_4, "7"),
            new Item(ItemCategory.CAT_4, "8")
        );
        /* Expected output (one line per list):
        [1:CAT_1, 4:CAT_1]
        [2:CAT_2, 5:CAT_2]
        [6:CAT_3]
        [8:CAT_4]
        [3:CAT_3, 7:CAT_4]
        */
        List<List<Item>> categorizedItems = job.items.parallelStream().collect(
            // indices 0-3: CAT_1, CAT_2, valid CAT_3, valid CAT_4; index 4 aggregates invalid items
            () -> Arrays.asList(new ArrayList<>(), new ArrayList<>(), new ArrayList<>(), new ArrayList<>(), new ArrayList<>()),
            (list, item) -> {
                switch (item.category) {
                    case CAT_1: list.get(0).add(item); break;
                    case CAT_2: list.get(1).add(item); break;
                    case CAT_3: {
                        if (isValid(item.id)) {
                            list.get(2).add(item);
                        } else {
                            list.get(4).add(item);
                        }
                        break;
                    }
                    case CAT_4: {
                        if (isValid(item.id)) {
                            list.get(3).add(item);
                        } else {
                            list.get(4).add(item);
                        }
                        break;
                    }
                }
            },
            // combiner: append the right-hand partial result to the left-hand one, list by list
            (list1, list2) -> {
                list1.get(0).addAll(list2.get(0));
                list1.get(1).addAll(list2.get(1));
                list1.get(2).addAll(list2.get(2));
                list1.get(3).addAll(list2.get(3));
                list1.get(4).addAll(list2.get(4));
            }
        );
        categorizedItems.forEach(System.out::println);
    }

    // Placeholder validity rule: an id is valid when it is even.
    public static boolean isValid(String id) {
        return Integer.parseInt(id) % 2 == 0;
    }

    public static class Item {
        ItemCategory category;
        String id;

        Item(ItemCategory category, String id) {
            this.id = id;
            this.category = category;
        }

        @Override
        public String toString() {
            return id + ":" + category;
        }
    }

    public enum ItemCategory {
        CAT_1, CAT_2, CAT_3, CAT_4;
    }

    public static class Job {
        List<Item> items;
    }
}
Is it a good idea to do everything in a single stream? Or is it better, for example, to split the items into 4 categories first and then process categories 3 and 4 separately?
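To make the second option concrete, here is a minimal, hypothetical sketch of the two-step variant: group by category with Collectors.groupingBy first, then split only categories 3 and 4 by validity with Collectors.partitioningBy. The class TwoStepSplit and the helpers byValidity, split and sample are illustrative names, and isValid reuses the question's even-id placeholder rule. It returns the same list-of-lists layout as the prototype.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class TwoStepSplit {
    enum ItemCategory { CAT_1, CAT_2, CAT_3, CAT_4 }

    static class Item {
        final ItemCategory category;
        final String id;

        Item(ItemCategory category, String id) {
            this.category = category;
            this.id = id;
        }

        @Override
        public String toString() {
            return id + ":" + category;
        }
    }

    // Placeholder validity rule copied from the question: even ids are valid.
    static boolean isValid(String id) {
        return Integer.parseInt(id) % 2 == 0;
    }

    // Splits one category's items into {true: valid, false: invalid}.
    static Map<Boolean, List<Item>> byValidity(List<Item> items) {
        return items.stream().collect(Collectors.partitioningBy(item -> isValid(item.id)));
    }

    // Step 1: group by category. Step 2: validate only categories 3 and 4.
    // Layout matches the prototype: [CAT_1, CAT_2, valid CAT_3, valid CAT_4, invalid].
    static List<List<Item>> split(List<Item> items) {
        Map<ItemCategory, List<Item>> byCategory =
                items.stream().collect(Collectors.groupingBy(item -> item.category));

        // partitioningBy always produces both the true and the false key.
        Map<Boolean, List<Item>> cat3 =
                byValidity(byCategory.getOrDefault(ItemCategory.CAT_3, Collections.emptyList()));
        Map<Boolean, List<Item>> cat4 =
                byValidity(byCategory.getOrDefault(ItemCategory.CAT_4, Collections.emptyList()));

        List<Item> invalid = new ArrayList<>(cat3.get(false));
        invalid.addAll(cat4.get(false));

        return Arrays.asList(
                byCategory.getOrDefault(ItemCategory.CAT_1, Collections.emptyList()),
                byCategory.getOrDefault(ItemCategory.CAT_2, Collections.emptyList()),
                cat3.get(true),
                cat4.get(true),
                invalid);
    }

    static List<Item> sample() {
        return Arrays.asList(
                new Item(ItemCategory.CAT_1, "1"), new Item(ItemCategory.CAT_2, "2"),
                new Item(ItemCategory.CAT_3, "3"), new Item(ItemCategory.CAT_1, "4"),
                new Item(ItemCategory.CAT_2, "5"), new Item(ItemCategory.CAT_3, "6"),
                new Item(ItemCategory.CAT_4, "7"), new Item(ItemCategory.CAT_4, "8"));
    }

    public static void main(String[] args) {
        split(sample()).forEach(System.out::println);
    }
}
```

This walks categories 3 and 4 twice (once to group, once to partition), which may or may not matter depending on the size of the job, but each step is simple enough to read and test on its own, and isValid is only ever called for the categories that need it.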
Solution
Your collector is reinventing the wheel. There already exists a collector that partitions the collected items into groups, Collectors.groupingBy, which you could use like this:
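Map<ItemCategory, List<Item>> categorizedItems = job.items.parallelStream()
    .collect(
        Collectors.groupingBy(
            item -> (item.category == ItemCategory.CAT_3 || item.category == ItemCategory.CAT_4)
                    && !isValid(item.id)
                ? ItemCategory.CAT_5 : item.category
        )
    );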
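A self-contained version of that groupingBy call might look like the following sketch. It assumes CAT_5 has been added to ItemCategory as the bucket for invalid CAT_3/CAT_4 items; GroupingBySketch, classify and sample are hypothetical names, and isValid is the question's placeholder rule. Pulling the classifier into its own method keeps the lambda readable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupingBySketch {
    // CAT_5 is the extra "invalid" category (assumed addition to the enum).
    enum ItemCategory { CAT_1, CAT_2, CAT_3, CAT_4, CAT_5 }

    static class Item {
        final ItemCategory category;
        final String id;

        Item(ItemCategory category, String id) {
            this.category = category;
            this.id = id;
        }

        @Override
        public String toString() {
            return id + ":" + category;
        }
    }

    // Placeholder validity rule from the question: even ids are valid.
    static boolean isValid(String id) {
        return Integer.parseInt(id) % 2 == 0;
    }

    // CAT_3/CAT_4 items that fail validation are keyed under CAT_5;
    // every other item is keyed under its own category. Items are not modified.
    static ItemCategory classify(Item item) {
        boolean needsCheck = item.category == ItemCategory.CAT_3
                || item.category == ItemCategory.CAT_4;
        return needsCheck && !isValid(item.id) ? ItemCategory.CAT_5 : item.category;
    }

    static Map<ItemCategory, List<Item>> group(List<Item> items) {
        return items.parallelStream().collect(Collectors.groupingBy(GroupingBySketch::classify));
    }

    static List<Item> sample() {
        return Arrays.asList(
                new Item(ItemCategory.CAT_1, "1"), new Item(ItemCategory.CAT_2, "2"),
                new Item(ItemCategory.CAT_3, "3"), new Item(ItemCategory.CAT_1, "4"),
                new Item(ItemCategory.CAT_2, "5"), new Item(ItemCategory.CAT_3, "6"),
                new Item(ItemCategory.CAT_4, "7"), new Item(ItemCategory.CAT_4, "8"));
    }

    public static void main(String[] args) {
        Map<ItemCategory, List<Item>> grouped = group(sample());
        // Print in enum order; the map itself is a HashMap with no defined iteration order.
        for (ItemCategory category : ItemCategory.values()) {
            System.out.println(category + " -> " + grouped.get(category));
        }
    }
}
```

Items keep their original category and only the map key changes, so 3:CAT_3 ends up in the CAT_5 list. Because groupingBy is not a concurrent collector, each list keeps the stream's encounter order even on a parallel stream.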
Of course, this will return a Map<ItemCategory, List<Item>>. You can transform this back to a list of lists if needed.
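If the rest of the pipeline expects the prototype's list-of-lists shape, one possible conversion is to walk the enum constants in declaration order. This sketch assumes ItemCategory already contains CAT_5 as its last constant, so the invalid items land in the last list as in the question; MapToLists, toListOfLists and sampleGrouping are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class MapToLists {
    enum ItemCategory { CAT_1, CAT_2, CAT_3, CAT_4, CAT_5 }

    static class Item {
        final ItemCategory category;
        final String id;

        Item(ItemCategory category, String id) {
            this.category = category;
            this.id = id;
        }

        @Override
        public String toString() {
            return id + ":" + category;
        }
    }

    // One list per enum constant, in declaration order; absent categories become empty lists.
    static List<List<Item>> toListOfLists(Map<ItemCategory, List<Item>> grouped) {
        return Arrays.stream(ItemCategory.values())
                .map(category -> grouped.getOrDefault(category, Collections.emptyList()))
                .collect(Collectors.toList());
    }

    // A hand-built grouping result with no CAT_2 or CAT_4 entries.
    static Map<ItemCategory, List<Item>> sampleGrouping() {
        Map<ItemCategory, List<Item>> grouped = new HashMap<>();
        grouped.put(ItemCategory.CAT_1,
                Arrays.asList(new Item(ItemCategory.CAT_1, "1"), new Item(ItemCategory.CAT_1, "4")));
        grouped.put(ItemCategory.CAT_3, Arrays.asList(new Item(ItemCategory.CAT_3, "6")));
        grouped.put(ItemCategory.CAT_5, Arrays.asList(new Item(ItemCategory.CAT_3, "3")));
        return grouped;
    }

    public static void main(String[] args) {
        // prints [[1:CAT_1, 4:CAT_1], [], [6:CAT_3], [], [3:CAT_3]]
        System.out.println(toListOfLists(sampleGrouping()));
    }
}
```

getOrDefault fills in an empty list for categories that had no items, which groupingBy would otherwise leave out of the map entirely, so the list indices stay stable regardless of the input.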
And of course, you would need to add CAT_5 to the ItemCategory enum. You might be tempted to use null as the category 5 key instead, but Collectors.groupingBy rejects null keys with a NullPointerException (and null keys make some people’s skin crawl anyway).
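Before going the null-key route, it is worth knowing that Collectors.groupingBy does not accept a null key: its accumulator throws a NullPointerException as soon as the classifier returns null. The small, hypothetical check below (NullKeyDemo and groupingByRejectsNullKey are illustrative names) shows this with plain strings.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class NullKeyDemo {
    // Returns true if groupingBy refused a null classifier result with a NullPointerException.
    static boolean groupingByRejectsNullKey() {
        List<String> words = Arrays.asList("ok", "bad");
        try {
            // The classifier maps "bad" to a null key.
            words.stream().collect(Collectors.groupingBy(w -> w.equals("bad") ? null : w));
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(groupingByRejectsNullKey()); // prints true
    }
}
```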
If the order of the items does not matter in any of the groups (not just in the category 5 group), then groupingByConcurrent can give better parallel stream performance.
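A sketch of the concurrent variant, assuming ItemCategory has been extended with CAT_5 and using the question's placeholder isValid rule; ConcurrentGroupingSketch, classify and sample are hypothetical names. Only the collector changes compared with a plain groupingBy version.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

class ConcurrentGroupingSketch {
    enum ItemCategory { CAT_1, CAT_2, CAT_3, CAT_4, CAT_5 }

    static class Item {
        final ItemCategory category;
        final String id;

        Item(ItemCategory category, String id) {
            this.category = category;
            this.id = id;
        }

        @Override
        public String toString() {
            return id + ":" + category;
        }
    }

    // Placeholder validity rule from the question: even ids are valid.
    static boolean isValid(String id) {
        return Integer.parseInt(id) % 2 == 0;
    }

    // Invalid CAT_3/CAT_4 items go under the CAT_5 key; everything else keeps its category.
    static ItemCategory classify(Item item) {
        boolean needsCheck = item.category == ItemCategory.CAT_3
                || item.category == ItemCategory.CAT_4;
        return needsCheck && !isValid(item.id) ? ItemCategory.CAT_5 : item.category;
    }

    // groupingByConcurrent is CONCURRENT and UNORDERED, so a parallel stream accumulates
    // into one shared ConcurrentMap; item order inside each list is not guaranteed.
    static ConcurrentMap<ItemCategory, List<Item>> group(List<Item> items) {
        return items.parallelStream()
                .collect(Collectors.groupingByConcurrent(ConcurrentGroupingSketch::classify));
    }

    static List<Item> sample() {
        return Arrays.asList(
                new Item(ItemCategory.CAT_1, "1"), new Item(ItemCategory.CAT_2, "2"),
                new Item(ItemCategory.CAT_3, "3"), new Item(ItemCategory.CAT_1, "4"),
                new Item(ItemCategory.CAT_2, "5"), new Item(ItemCategory.CAT_3, "6"),
                new Item(ItemCategory.CAT_4, "7"), new Item(ItemCategory.CAT_4, "8"));
    }

    public static void main(String[] args) {
        ConcurrentMap<ItemCategory, List<Item>> grouped = group(sample());
        for (ItemCategory category : ItemCategory.values()) {
            System.out.println(category + " -> " + grouped.get(category));
        }
    }
}
```

Any speed-up comes from skipping the per-thread map merging that the non-concurrent groupingBy performs; whether that is noticeable for a given job size is worth measuring before switching.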