Problem
I am trying to implement a shutdown method in a queue implementation. I took the code from the Java BlockingQueue source and am trying to check my shutdown method. Will the following code be thread-safe and an acceptable implementation?
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BlockingQueue<E> {
    /* Node encapsulating the object that needs to be stored */
    static class Node<E>{
        /* Make sure all the threads read / write are done on updated variable */
        volatile E element;
        Node<E> next;
        Node(E value) { element = value;}
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger(0);
    private final AtomicInteger shutdown = new AtomicInteger(0);
    /* Need lock to make the thread re-entrant */
    private final ReentrantLock putLock = new ReentrantLock(true);
    private final ReentrantLock getLock = new ReentrantLock();
    private final Condition queueNotEmpty = getLock.newCondition();
    private final Condition queueNotFull = putLock.newCondition();
    /* Make sure updates to head and tail variables are atomic
     * May be redundant as I have already enclosed the updated to head and tail
     * using locks.
     */
    private AtomicReference<Node<E>> head, tail;

    public BlockingQueue()
    {
        this.capacity = Integer.MAX_VALUE;
    }

    public BlockingQueue(int capacity){
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.capacity = capacity;
        head = tail = new AtomicReference<BlockingQueue.Node<E>>(null);
    }

    public void add(E e) throws InterruptedException, Exception{
        if (e == null) throw new NullPointerException();
        /* We need to wait if there is not enough space on the queue
         *
         */
        final ReentrantLock lock = this.putLock;
        final AtomicInteger count = this.count;
        int c =-1;
        lock.lockInterruptibly();
        try {
            try {
                while(count.get() == capacity)
                    queueNotFull.await();
                if(shutdown.get() == 1)
                    throw new Exception();
            } catch (InterruptedException exception) {
                queueNotFull.signal();
                throw exception;
            }
            // Add the element to the queue
            Node<E> elem = new Node<E>(e);
            Node<E> prevTail = tail.getAndSet(elem);
            prevTail.next = elem;
            c = count.incrementAndGet();
            if (c + 1 < capacity)
                queueNotFull.signal();
        } finally {
            lock.unlock();
        }
        if(c == 0)
            signalNotEmpty();
    }

    /**
     * Signals a waiting get. Called only from put/offer (which do not
     * otherwise ordinarily lock getLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.getLock;
        takeLock.lock();
        try {
            queueNotEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    public void shutDown()
    {
        /* Get the locks */
        putLock.lock();
        getLock.lock();
        try{
            head.getAndSet(null);
            tail.getAndSet(null);
            shutdown.getAndSet(1);
            //Signal all put and get threads
            queueNotFull.signalAll();
            queueNotEmpty.signalAll();
        }finally {
            putLock.unlock();
            getLock.unlock();
        }
    }
}
Is there a better way to implement this method?
Solution
My immediate thought is that this is an unacceptable implementation. In particular, you seem to be trying to shut down the threads accessing the queue by signalling conditions, which I think confuses things badly.
See Concurrent Programming in Java, Second Edition by Doug Lea — especially Chapter 3 (“State Dependence”).
Consider this use case: you have a service with multiple publishers (Runnables) and consumers (also Runnables); publishers offer items to the queue, consumers take items from it. There’s a UI with a big red SHUTDOWN button on it. When the user hits that button, what does the user want?
You seem to be implementing the idea that the user wants to shut down the queue. But I put it to you that it’s more accurate to say that the user wants to shut down the service itself. The service maintains its own shutdown state, which the runnables can access. Invoking service shutdown then does two things: (1) it sets the shutdown flag, so that the producers and consumers can see the change, and (2) it interrupts the producers and consumers, so that they look at the shutdown flag.
So the usual idiom would be to run the publishers and consumers in an ExecutorService, and after the shutdown flag has been set, ExecutorService.shutdownNow() would be invoked to interrupt the running threads. The threads that are parked on your conditions (queueNotFull.await()) throw InterruptedException, and the handlers for these exceptions can then check the shutdown flag and do the right thing.
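A minimal sketch of that idiom follows. The class QueueService and its method names are hypothetical and not part of the question; it uses the JDK's LinkedBlockingQueue rather than the custom queue, and it only shows consumers.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical service: the shutdown state lives here, not in the queue.
class QueueService {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(16);
    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    private volatile boolean shutdown = false;

    void start() {
        workers.execute(this::consume);
        workers.execute(this::consume);
    }

    private void consume() {
        while (!shutdown) {
            try {
                String item = queue.take(); // parks until an item arrives or the thread is interrupted
                handle(item);
            } catch (InterruptedException e) {
                // Woken by shutdownNow(): loop around and let the flag decide.
                // If the flag is not set, this sketch simply keeps consuming.
            }
        }
    }

    private void handle(String item) {
        // application-specific work would go here
    }

    // Rejects new items once shutdown has been requested.
    boolean offer(String item) {
        return !shutdown && queue.offer(item);
    }

    // Returns true if all workers finished within the timeout.
    boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
        shutdown = true;        // (1) publish the state change
        workers.shutdownNow();  // (2) interrupt the parked workers
        return workers.awaitTermination(timeout, unit);
    }
}
```

The queue itself needs no shutdown method here: the workers leave their loop because the service told them to, and the interrupt only serves to get them to look at the flag.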
Condition.signalAll doesn’t interrupt() the threads; it just gets them contending for the lock again. When they get the lock, they simply return from await, which means they re-check the while condition and then put themselves back to sleep (Condition.await) because it is still true.
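The difference can be seen in a small sketch. GuardedWait is a hypothetical class whose wait loop has the same shape as the one in add(); the guard never changes, and the wakeups counter exists only to make the behaviour observable.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: a waiter whose guard condition stays true.
class GuardedWait {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private boolean full = true;  // never changes in this demo
    final AtomicInteger wakeups = new AtomicInteger();

    // Same shape as the wait loop in add().
    void awaitSpace() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (full) {
                notFull.await();            // returns after signalAll() ...
                wakeups.incrementAndGet();  // ... but the guard is still true, so we loop
            }
        } finally {
            lock.unlock();
        }
    }

    void signalWaiters() {
        lock.lock();
        try {
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

A thread blocked in awaitSpace() wakes up when signalWaiters() is called, increments the counter, and goes straight back to waiting. Only Thread.interrupt() makes await() throw InterruptedException and end the call.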
In short, Locks and Conditions are for co-ordination; if you are trying to implement cancellation, you should be using interrupts.
Is there a better way to implement this method?
Probably, yes, but what constitutes better depends on your actual goal. VoiceOfUnreason has a point in considering that “shutting down” is perhaps not so much something that happens to a queue as something that happens to the services using that queue. There’s already an answer covering that, so I’ll focus on two other things: considering alternatives, and reviewing the actual given code.
(I’m also going to state the obvious: this class is not complete. It allows adding elements, but not retrieving them. This limits the usefulness of a code review.)
Alternative: Decorator
Consider creating a new class, CloseableBlockingQueue, that handles the closing logic, and otherwise delegates to a provided BlockingQueue implementation (as per decorating).
- ✓ allows you to add the closeable feature as an extra layer without having to muck in other classes.
- ✓ can be used with any BlockingQueue implementation.
- ✗ may be less efficient in terms of runtime performance. Probably negligible.
Unless you have very compelling reasons to do as you currently are, I strongly recommend that you take the decorator approach. Correctness under concurrency is difficult to get right and even more difficult to test.
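One possible shape for such a decorator is sketched below. It rests on several assumptions that are not from the question: only put and take are wrapped, blocking calls are replaced by timed offer/poll calls so the closed flag is re-checked periodically, the 50 ms interval is arbitrary, and elements already in the queue can still be taken after close() (unlike the original shutDown(), which discards them).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of a decorator; the class name comes from the text above, the rest is hypothetical.
final class CloseableBlockingQueue<E> implements AutoCloseable {
    private static final long POLL_MILLIS = 50; // arbitrary re-check interval
    private final BlockingQueue<E> delegate;
    private volatile boolean closed = false;

    CloseableBlockingQueue(BlockingQueue<E> delegate) {
        this.delegate = delegate;
    }

    // Blocks until there is space, or fails once the queue has been closed.
    void put(E e) throws InterruptedException {
        do {
            if (closed) throw new IllegalStateException("queue is closed");
        } while (!delegate.offer(e, POLL_MILLIS, TimeUnit.MILLISECONDS));
    }

    // Blocks until an element is available; fails once the queue is closed and empty.
    E take() throws InterruptedException {
        while (true) {
            E e = delegate.poll(POLL_MILLIS, TimeUnit.MILLISECONDS);
            if (e != null) return e;
            if (closed) throw new IllegalStateException("queue is closed");
        }
    }

    @Override
    public void close() {
        closed = true;
    }
}
```

All of the concurrency-sensitive work stays inside the wrapped BlockingQueue; the decorator adds one volatile flag. The trade-off is a small window in which a put that started before close() can still succeed shortly after it.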
Code Review
throws Exception + throw new Exception(): A more specific exception should be thrown(1), probably IllegalStateException("shutdown"). Train yourself to learn some of the available exception types (or to make your own); half the effort in making useful, usable classes is being informative when something goes wrong.
head = tail = new AtomicReference<BlockingQueue.Node<E>>(null); is likely a bug: both head and tail now point to the same AtomicReference. This means that head.set(...) affects tail. Separate these.
In fact, I’m pretty sure they don’t even have to be atomic references at all. From the code we have, all access happens under an (exclusive) lock. The same can be said of shutdown; it could be a simple boolean. (Again, this is judging solely from the code up for review.)
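To illustrate the aliasing of head and tail mentioned above, here is a small sketch using a hypothetical AliasDemo class with String values in place of Node<E>:

```java
import java.util.concurrent.atomic.AtomicReference;

class AliasDemo {
    // Chained assignment: both variables refer to the same AtomicReference object.
    static String sharedReference() {
        AtomicReference<String> head, tail;
        head = tail = new AtomicReference<>();
        head.set("first");
        return tail.get(); // the write through head is visible through tail
    }

    // Two separate objects: writing through head leaves tail untouched.
    static String separateReferences() {
        AtomicReference<String> head = new AtomicReference<>();
        AtomicReference<String> tail = new AtomicReference<>();
        head.set("first");
        return tail.get();
    }
}
```

sharedReference() returns "first", while separateReferences() returns null.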
I would add that the linear locking in shutDown() may be dangerous, and should be a layered try-finally. The original code in the clear() method of LinkedBlockingQueue takes the same approach, and I’d dare say it sets a bad example, but I admit the odds of it going wrong are vanishingly small.
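A minimal sketch of the layered form, assuming a hypothetical TwoLockShutdown class that stands in for the queue. In the linear form, a failure in the second lock() call, or in the first unlock() call inside the finally block, would leave one lock held; nesting pairs each lock with its own finally.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the queue; only the locking structure matters here.
class TwoLockShutdown {
    final ReentrantLock putLock = new ReentrantLock();
    final ReentrantLock getLock = new ReentrantLock();
    private boolean shutdown; // guarded by both locks when written

    void shutDown() {
        putLock.lock();
        try {
            getLock.lock();
            try {
                shutdown = true; // state changes and signalAll() calls would go here
            } finally {
                getLock.unlock(); // released even if the body throws
            }
        } finally {
            putLock.unlock(); // released even if acquiring getLock failed
        }
    }

    boolean isShutDown() {
        putLock.lock();
        try {
            return shutdown;
        } finally {
            putLock.unlock();
        }
    }
}
```

The locks are released in the reverse order of acquisition, which falls out of the nesting naturally.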
Rather than using a shutDown() method, consider implementing AutoCloseable, which has a close() method with similar semantics.
count is not set to 0 in shutDown(). This means that, if the queue is full and threads are waiting when it is shut down, these threads will wait ‘forever’ (i.e. until interrupted somehow).
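One way to release those waiters, sketched below, is to make the shutdown flag part of the wait guard, so that signalAll() in shutDown() actually lets them leave the loop. This is an alternative to resetting count, not something taken from the question; ShutdownAwareSlots is a hypothetical single-lock simplification that tracks only the slot count.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical simplification: one lock, a slot counter, and a shutdown flag.
class ShutdownAwareSlots {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final int capacity;
    private int count;        // guarded by lock
    private boolean shutdown; // guarded by lock

    ShutdownAwareSlots(int capacity) {
        this.capacity = capacity;
    }

    // Waits for a free slot; fails fast once shutDown() has been called.
    void acquireSlot() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == capacity && !shutdown) {
                notFull.await();
            }
            if (shutdown) throw new IllegalStateException("shutdown");
            count++;
        } finally {
            lock.unlock();
        }
    }

    void shutDown() {
        lock.lock();
        try {
            shutdown = true;
            notFull.signalAll(); // waiters re-check the guard and now see the flag
        } finally {
            lock.unlock();
        }
    }
}
```

Because the flag is read and written under the same lock as the guard, a plain boolean is sufficient here.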
(1) Ever start a car and have a pip on the dashboard that’s suddenly on or blinking, and it has no icon or text or any indication about what it is, and you stare at it for a few seconds trying to figure it out before you grumble and have to find the 800-page manual and flip through it, and then find out the light means “parking brake is on”? That’s throw new Exception().