Problem
Having a look at my code, is there a way to be losing elements?
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
public class Main {
private final static Map<String, Set<String>> data = Maps.newConcurrentMap();
public static void main(String[] args) throws InterruptedException {
new Main().init("X");
}
public void init(String code) throws InterruptedException {
data.put(code, Sets.newConcurrentHashSet());
subscribeToDataFrom(code);
Set<String> elements = doHttpRequest();
data.get(code).addAll(elements);
data.get(code)
.stream()
.map(e -> e + "_MAPPED")
.forEach(System.out::println);
data.remove(code);
}
private void subscribeToDataFrom(String code) {
Runnable r = () -> {
while (true) {
onMessageFromProvider(code, UUID.randomUUID().toString());
}
};
new Thread(r).start();
new Thread(r).start();
new Thread(r).start();
new Thread(r).start();
new Thread(r).start();
}
public Set<String> doHttpRequest() throws InterruptedException {
Set<String> resultsToReturn = Sets.newHashSet();
resultsToReturn.add("B");
resultsToReturn.add("C");
resultsToReturn.add("D");
resultsToReturn.add("E");
resultsToReturn.add("F");
Thread.sleep(1000);
return resultsToReturn;
}
public void onMessageFromProvider(String code, String element) {
final Set<String> newSet = data.computeIfPresent(code, (k, v) -> {
v.add(element);
return v;
});
if (newSet == null) {
System.out.println("Ok, now I can do something different with: " + element);
}
}
}
I want to be able to execute the init code, get response from doHttpRequest
, and once we have it, merge those results with the streamed results by subscribeToDataFrom
, without losing any data.
Do I need any synchronised
keyword to guarantee that, or is my code thread-safe and there’s no way I’m losing any element coming from HTTP / Streaming?
Solution
There are some things to adress.
“synchronized”
Like you recognized: Using synchronized variants of certain maps or set classes will not get you away from the burden to define a proper “monitor”. Often these variants are not even neccessary to solve the problem so that the synchronization elements will only reduce performance without participating within the synchronisation neccessity. A monitor is an area of code execution that has some assertions to the behaviour of the code under concurrency situations. The simplest situation would be “restrict access to the monitor to only one thread”. So putting a synchronized-keyword “somewhere” will not solve the problem until your monitor is properly defined.
Responsibilities
You have to be clear about what are critical code fragments that should properly behave under concurrency. I do not want to say that I am THE reference but for me it took a while to interprete your code and identify what it is all about. Even (I would say especially) in concurrency we should be aware of clear responsibilities.
Busy waiting
Some other thing is “busy waiting” (while (true)). I reject those constructs. You should always provide a loop exit AND a mechanism to avoid busy waiting.
Usecase
As far as I understood your code you have a stream of elements from multiple arbitrary sources to print out or do whatever you want to do with it. It seems to me like a “Buffer” but currently I am not that sure about it.
Abstraction from element sources
Currently the elements of the stream come from 2 sources:
- A http request, returning a set of elements that are provvided asynchronously to the stream
- from multiple threads that provide message elements
My suggestion is to abstract from the sources where the elements come from. So the first responsibility will be a class that can asynchronously receive elements and output them in a sequence. Following algorithms now only need to depend on objects of this class:
public static class ElementJoiner {
private BlockingDeque<String> queue;
private String code;
public ElementJoiner(String code) {
this.queue = new LinkedBlockingDeque<>();
this.code = code;
}
public void registerElement(String element) {
try {
this.queue.put(element);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public void registerElements(Collection<String> elements) {
elements.forEach(element -> registerElement(element));
}
public String getElement() {
try {
return this.queue.pollFirst(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
The internal deque will have a special behaviour if no elements are in the queue: If no element is available it will block a certain amount of time (here 5 seconds). Either a new element was insert that can be returned OR it will return null.
Abstract from Thread “this” and other threads
After all the only difference between registering elements from other “Threads” (UUIDs) and registering the elements of the http request is that you perform the registration in “this” Thread (the main-Thread). You only delay registration for a second (Thread.sleep(1000)).
So the task is to abstract from where elements are received. Furthermore it doesn’t matter from which thread the elements come from. We have to tell one thread…
- … where to receive or how to produce elements (supplier)
- … where to register the received or produced elements (consumer)
Now it is easy to formulate the start and behaviour of a thread:
private void startAsynchronousElementSource(Consumer<Set<String>> consumer, Supplier<Set<String>> supplier) {
Runnable r = () -> {
while (true) {
consumer.accept(supplier.get());
}
};
new Thread(r).start();
}
You can see a high symmetry as we input a set of elements and pass these elements immediately to the consumer. The only thing we define here is what to do asynchronously (maybe starting the thread, but I ignore that for now).
Stream preparation
We have “ElementJoiner”-class that does pretty well. Now we want it to work together with a stream. A standard stream cannot exit as long as there is no indicator for exiting. If you have a Ranged-Stream you limit the stream through the boundaries defined. The stream we want to have has a special indicator so we have to define a so called Spliterator.
public class ElementSpliterator extends Spliterators.AbstractSpliterator<String> {
private Supplier<String> elementSupplier;
protected ElementSpliterator(Supplier<String> elementSupplier) {
super(Long.MAX_VALUE, Spliterator.SIZED);
this.elementSupplier = elementSupplier;
}
@Override
public boolean tryAdvance(Consumer<? super String> action) {
String element = elementSupplier.get();
if (element != null) {
action.accept(element);
return true;
} else {
return false;
}
}
}
The tryAdvance-method handles the case if the supplier returns null to end the stream.
The supplier in our case will be the “ElementJoiner” which getElement-method will return null if the timeout has passed.
Defining simulation data
Here I harmonized the return value to be a Set.
private Set<String> getElementsFromOtherSource() {
return Collections.singleton(UUID.randomUUID().toString());
}
public Set<String> getElementsFromHttpRequest(){
Set<String> resultsFromHttpRequest = new HashSet<>();
resultsFromHttpRequest.add("B");
resultsFromHttpRequest.add("C");
resultsFromHttpRequest.add("D");
resultsFromHttpRequest.add("E");
resultsFromHttpRequest.add("F");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return resultsFromHttpRequest;
}
Reformulating the init-method
I don’t think we need the “code”-attribute in this use case neither the data-HashMap.
The first thing we will do is instantiating the ElementJoiner:
ElementJoiner elementJoiner = new ElementJoiner(forCode);
Then we can instantiate the stream with the “elementJoiner” as an element supplier AND the ElementSpliterator with the handle to identify the stream end.
Stream<String> elementsStream = StreamSupport.stream(new ElementSpliterator(elementJoiner::getElement), true /* parallel */);
After that we can start to produce elements asynchronously that will be registered in the “elementJoiner”. You see that there is no difference in formulating the element source for the “other” and the “http request”.
// Other sources
Consumer<Set<String>> consumer = elements -> elementJoiner.registerElements(elements);
Supplier<Set<String>> otherSupplier = () -> getElementsFromOtherSource();
startAsynchronousElementSource(consumer, otherSupplier);
startAsynchronousElementSource(consumer, otherSupplier);
startAsynchronousElementSource(consumer, otherSupplier);
startAsynchronousElementSource(consumer, otherSupplier);
// http request source, while other source already supply elements
Supplier<Set<String>> httpRequestSupplier = () -> getElementsFromHttpRequest();
startAsynchronousElementSource(consumer, httpRequestSupplier);
The final statement produces the output on the console:
elementsStream.map(e -> e + "_MAPPED").forEach(System.out::println);
Where is the synchronizsation???
The work of synchronisation is done by the “LinkedBlockingDeque”. As we only delegate to ONE synchronized method of the “LinkedBlockingDeque” in “getElement” and “registerElement” we do not need our own synchronisation.