http://www.cnblogs.com/jxlgetter/p/4395115.html
http://tutorials.jenkov.com/java-concurrency/blocking-queues.html
Use two conditions: fullCondition, emptyCondition.
http://tutorials.jenkov.com/java-concurrency/blocking-queues.html
We use two Reentrant Locks to replace the use of synchronized methods. With separate locks for put and take, a consumer and a producer can access the queue at the same time (if it is neither empty nor full). A reentrant lock provides the same basic behaviors as a Lock does by using synchronized methods and statements. Beyond that, it is owned by the thread last successfully locking and thus when the same thread invokes
Together with lock, we use Condition to replace the object monitor (wait and notifyAll). A Condition instance is intrinsically bound to a lock. Thus, we can use it to signal threads that are waiting for the associated lock. Even better, multiple condition instances can be associated with one single lock and each instance will have its own wait-thread-set, which means instead of waking up all threads waiting for a lock, we can wake up a predefined subset of such threads. Similar to
We use Atomic Integer for the count of elements in the queue to ensure that the count will be updated atomically.
One shortcoming of using synchronization is that it only allow one thread access the queue at the same time, either consumer or producer.)
Plus, we need to use
Notice that if the queue was empty before
We only need to emit such notifications in the above two cases since otherwise there cannot be any waiting threads.
http://tutorials.jenkov.com/java-concurrency/blocking-queues.html
Use two conditions: fullCondition, emptyCondition.
http://tutorials.jenkov.com/java-concurrency/blocking-queues.html
public class BlockingQueue {
private List queue = new LinkedList();
private int limit = 10;
public BlockingQueue(int limit){
this.limit = limit;
}
public synchronized void enqueue(Object item)
throws InterruptedException {
while(this.queue.size() == this.limit) {
wait();
}
/** This is what comes from the post, but if you notify here, say a 'dequeue()' is waiting, the size would remain at 0 doesn't it? So I would call notify at the end
if(this.queue.size() == 0) {
notifyAll();
}
**/
this.queue.add(item);
notifyAll();
}
public synchronized Object dequeue()
throws InterruptedException{
while(this.queue.size() == 0){
wait();
}
/** This is what comes from the post, but if you notify here, say a 'enqueue()' is waiting, the size would remain at the limit doesn't it? So I would call notify at the end
if(this.queue.size() == this.limit){
notifyAll();
}
**/
Object ret = this.queue.remove(0);
notifyAll();
return ret;
}
}
http://n00tc0d3r.blogspot.com/2013/08/implement-bounded-blocking-queue.htmlWe use two Reentrant Locks to replace the use of synchronized methods. With separate locks for put and take, a consumer and a producer can access the queue at the same time (if it is neither empty nor full). A reentrant lock provides the same basic behaviors as a Lock does by using synchronized methods and statements. Beyond that, it is owned by the thread last successfully locking and thus when the same thread invokes
lock()
again, it will return immediately without lock it again.Together with lock, we use Condition to replace the object monitor (wait and notifyAll). A Condition instance is intrinsically bound to a lock. Thus, we can use it to signal threads that are waiting for the associated lock. Even better, multiple condition instances can be associated with one single lock and each instance will have its own wait-thread-set, which means instead of waking up all threads waiting for a lock, we can wake up a predefined subset of such threads. Similar to
wait()
, Condition.await()
can atomically release the associated lock and suspend the current thread.We use Atomic Integer for the count of elements in the queue to ensure that the count will be updated atomically.
public class BoundedBlockingQueue<E> {
private final Queue<E> queue = new LinkedList<E>();
private final int capacity;
private final AtomicInteger count = new AtomicInteger(0);
private final ReentrantLock putLock = new ReentrantLock();
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
private final Condition notEmpty = takeLock.newCondition();
public BoundedBlockingQueue(int capacity) {
if (capacity <= 0) throw new InvalidArgumentException("The capacity of the queue must be > 0.");
this.capacity = capacity;
}
public int size() {
return count.get();
}
public void add(E e) throws RuntimeException {
if (e == null) throw new NullPointerException("Null element is not allowed.");
int oldCount = -1;
putLock.lock();
try {
// we use count as a wait condition although count isn't protected by a lock
// since at this point all other put threads are blocked, count can only
// decrease (via some take thread).
while (count.get() == capacity) notFull.await();
queue.add(e);
oldCount = count.getAndIncrement();
if (oldCount + 1 < capacity) {
notFull.signal(); // notify other producers for count change
}
} finally {
putLock.unlock();
}
// notify other waiting consumers
if (oldCount == 0) {
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
}
public E remove() throws NoSuchElementException {
E e;
int oldCount = -1;
takeLock.lock();
try {
while (count.get() == 0) notEmpty.await();
e = queue.remove();
oldCount = count.getAndDecrement();
if (oldCount > 1) { //??? ==1
notEmpty.signal(); // notify other consumers for count change
}
} finally {
takeLock.unlock();
}
// notify other waiting producers
if (oldCount == capacity) {
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
return e;
}
/* Retrieves, but does not remove, the head of this queue, or returns null if this queue is empty. */
public E peek() {
if (count.get() == 0) return null;
takeLock.lock();
try {
return queue.peek();
} finally {
takeLock.unlock();
}
}
}
One shortcoming of using synchronization is that it only allow one thread access the queue at the same time, either consumer or producer.)
Plus, we need to use
notifyAll
instead of notify
since there could be multiple waiting producers and consumers and notify
can wake up any thread which could be a producer or a consumer. This stackoverflow post gives a detailed example to explainnotify vs. notifyAll.Notice that if the queue was empty before
add
or full before remove
, we need to notify other waiting threads to unblock them.We only need to emit such notifications in the above two cases since otherwise there cannot be any waiting threads.
public class BoundedBlockingQueue<E> {
private final Queue<E> queue = new LinkedList<E>();
private final int capacity;
private final AtomicInteger count = new AtomicInteger(0);
public BoundedBlockingQueue(int capacity) {
if (capacity <= 0) throw new InvalidArgumentException("The capacity of the queue must be > 0.");
this.capacity = capacity;
}
public int size() {
return count.get();
}
public synchronized void add(E e) throws RuntimeException {
if (e == null) throw new NullPointerException("Null element is not allowed.");
int oldCount = -1;
while (count.get() == capacity) wait();
queue.add(e);
oldCount = count.getAndIncrement();
if (oldCount == 0) {
notifyAll(); // notify other waiting threads (could be producers or consumers)
}
}
public synchronized E remove() throws NoSuchElementException {
E e;
int oldCount = -1;
while (count.get() == 0) wait();
e = queue.remove();
oldCount = count.getAndDecrement();
if (oldCount == this.capacity) {
notifyAll(); // notify other waiting threads (could be producers or consumers)
}
return e;
}
/* Retrieves, but does not remove, the head of this queue, or returns null if this queue is empty. */
public E peek() {
if (count.get() == 0) return null;
synchronized(this) {
return queue.peek();
}
}
}
http://baozitraining.org/blog/design-and-implement-a-blocking-queue/
Condition “Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods. Conditions (also known as condition queues or condition variables) provide a means for one thread to suspend execution (to "wait") until notified by another thread that some state condition may now be true. Because access to this shared state information occurs in different threads, it must be protected, so a lock of some form is associated with the condition. The key property that waiting for a condition provides is that it atomically releases the associated lock and suspends the current thread, just like Object.wait. A Condition instance is intrinsically bound to a lock. To obtain a Condition instance for a particular Lock instance use its newCondition() method.”
We use two condition as two waiting queues where we put the suspended thread. One is notFull queue which contains all producer thread wait for the not full signal. notEmpty queue contains all consumer threads wait for the not empty signal. I am using another Lock to assure pushList can be finished atomically.
public class BoundedBlockingQueue<E> { private int capacity; private Queue<E> queue; private Lock lock = new ReentrantLock(); private Lock pushLock = new ReentrantLock(); private Condition notFull = this.lock.newCondition(); private Condition notEmpty = this.lock.newCondition(); // only initialize this queue once and throws Exception if the user is // trying to initialize it multiple t times. public void init(int capacity) throws Exception { this.lock.lock(); try{ if(this.queue == null){ this.queue = new LinkedList<>(); this.capacity = capacity; } else { throw new Exception(); } }finally{ this.lock.unlock(); } } // throws Exception if the queue is not initialized public void push(E obj) throws Exception { this.pushLock.lock(); this.lock.lock(); try{ while(this.capacity == this.queue.size()) this.notFull.wait(); this.queue.add(obj); this.notEmpty.notifyAll(); }finally{ this.lock.unlock(); this.pushLock.lock(); } } // throws Exception if the queue is not initialized public E pop() throws Exception { this.lock.lock(); try{ while(this.capacity==0) this.notEmpty.wait(); E result = this.queue.poll(); notFull.notifyAll(); return result; }finally{ this.lock.unlock(); } } // implement a atomic putList function which can put a list of object // atomically. By atomically i mean the objs in the list should next to each // other in the queue. The size of the list could be larger than the queue // capacity. // throws Exception if the queue is not initialized public void pushList(List<E> objs) throws Exception { this.pushLock.lock(); this.lock.lock(); try{ for(E obj : objs){ while(this.queue.size() == this.capacity) this.notFull.wait(); this.queue.add(obj); this.notEmpty.notifyAll(); } }finally{ this.lock.unlock(); this.pushLock.unlock(); } } }http://baptiste-wicht.com/posts/2010/09/java-concurrency-part-5-monitors-locks-and-conditions.html
- The two methods are protected with the lock to ensure mutual exclusion
- Then we use two conditions variables. One to wait for the buffer to be not empty and an other one to wait for the buffer to be not full.
- You can see that I have wrapped the await operation on a while loop. This is to avoid signal stealers problem that can occurs when using Signal & Continue
public class BoundedBuffer { private final String[] buffer; private final int capacity; private int front; private int rear; private int count; private final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); public BoundedBuffer(int capacity) { super(); this.capacity = capacity; buffer = new String[capacity]; } public void deposit(String data) throws InterruptedException { lock.lock(); try { while (count == capacity) { notFull.await(); } buffer[rear] = data; rear = (rear + 1) % capacity; count++; notEmpty.signal(); } finally { lock.unlock(); } } public String fetch() throws InterruptedException { lock.lock(); try { while (count == 0) { notEmpty.await(); } String result = buffer[front]; front = (front + 1) % capacity; count--; notFull.signal(); return result; } finally { lock.unlock(); } } }http://stackoverflow.com/questions/20110013/implement-your-own-blocking-queue-in-java