[LinkedIn Interview] Implement the take() and put() of a blocking queue


http://www.cnblogs.com/jxlgetter/p/4395115.html
http://tutorials.jenkov.com/java-concurrency/blocking-queues.html
Use two conditions: fullCondition, emptyCondition.
import java.util.LinkedList;
import java.util.List;

public class BlockingQueue {

  private List<Object> queue = new LinkedList<>();
  private int  limit = 10;

  public BlockingQueue(int limit){
    this.limit = limit;
  }


  public synchronized void enqueue(Object item)
  throws InterruptedException  {
    while(this.queue.size() == this.limit) {
      wait();
    }
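    /* The original post notifies here, before the add, and only when the queue
       is empty:
           if (this.queue.size() == 0) { notifyAll(); }
       That is still correct: a woken dequeue() cannot re-acquire the monitor
       until this synchronized method returns, and by then the item has been
       added. This version notifies after the add because it is easier to read. */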
    this.queue.add(item);
    notifyAll();
  }


  public synchronized Object dequeue()
  throws InterruptedException{
    while(this.queue.size() == 0){
      wait();
    }
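    /* The original post notifies here, before the remove, and only when the
       queue is full:
           if (this.queue.size() == this.limit) { notifyAll(); }
       That is still correct: a woken enqueue() cannot re-acquire the monitor
       until this synchronized method returns, and by then the item has been
       removed. This version notifies after the remove because it is easier to read. */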

    Object ret = this.queue.remove(0);
    notifyAll();
    return ret;

  }

}
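A quick way to check that the wait()/notifyAll() version above behaves as expected is to run it with several producers and consumers. The sketch below is self-contained, so it carries its own compact generic copy of the queue. The names `SimpleBlockingQueue` and `SimpleBlockingQueueDemo`, as well as the thread and item counts, are made up for illustration.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical generic copy of the synchronized queue above.
class SimpleBlockingQueue<T> {
  private final List<T> queue = new LinkedList<>();
  private final int limit;

  SimpleBlockingQueue(int limit) { this.limit = limit; }

  public synchronized void enqueue(T item) throws InterruptedException {
    while (queue.size() == limit) {
      wait();                 // full: wait for a dequeue
    }
    queue.add(item);
    notifyAll();              // wake any waiting consumers
  }

  public synchronized T dequeue() throws InterruptedException {
    while (queue.isEmpty()) {
      wait();                 // empty: wait for an enqueue
    }
    T item = queue.remove(0);
    notifyAll();              // wake any waiting producers
    return item;
  }
}

class SimpleBlockingQueueDemo {
  // Starts `producers` producer threads and as many consumer threads, each
  // handling `perThread` items, and returns the sum of everything consumed.
  static long run(int producers, int perThread, int capacity) {
    SimpleBlockingQueue<Integer> q = new SimpleBlockingQueue<>(capacity);
    AtomicLong sum = new AtomicLong();
    Thread[] threads = new Thread[producers * 2];
    for (int p = 0; p < producers; p++) {
      threads[2 * p] = new Thread(() -> {
        try {
          for (int i = 1; i <= perThread; i++) q.enqueue(i);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      threads[2 * p + 1] = new Thread(() -> {
        try {
          for (int i = 0; i < perThread; i++) sum.addAndGet(q.dequeue());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    for (Thread t : threads) t.start();
    try {
      for (Thread t : threads) t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return sum.get();
  }

  public static void main(String[] args) {
    // 4 producers each enqueue 1..1000, so the total is 4 * 500500 = 2002000.
    System.out.println(run(4, 1000, 2));
  }
}
```

With a capacity of 2, producers and consumers block each other constantly, so the run exercises both wait() loops. If a notification were ever lost, the program would hang instead of printing the sum.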
http://n00tc0d3r.blogspot.com/2013/08/implement-bounded-blocking-queue.html
We use two ReentrantLocks in place of synchronized methods. With separate locks for put and take, a consumer and a producer can access the queue at the same time (as long as it is neither empty nor full). A ReentrantLock provides the same basic mutual-exclusion behavior as the implicit monitor lock used by synchronized methods and statements. Beyond that, it is owned by the thread that last locked it successfully, so when the owning thread invokes lock() again, the call returns immediately and only increments the hold count.

Together with the lock, we use a Condition to replace the object monitor methods (wait and notifyAll). A Condition instance is intrinsically bound to a lock, so we can use it to signal threads that are waiting on that condition. Even better, multiple Condition instances can be associated with a single lock, each with its own wait set, which means that instead of waking up every thread waiting on the lock, we can wake up only a chosen subset. Like wait(), Condition.await() atomically releases the associated lock and suspends the current thread.

We use an AtomicInteger for the count of elements in the queue so that the count is updated atomically even though producers and consumers hold different locks.
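import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBlockingQueue<E> {
   // Caveat: java.util.LinkedList is not thread-safe, and putLock and takeLock
   // do not exclude each other, so add() and remove() can touch it concurrently.
   private final Queue<E> queue = new LinkedList<E>();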
   private final int capacity;  
   private final AtomicInteger count = new AtomicInteger(0);  
   
   private final ReentrantLock putLock = new ReentrantLock();  
   private final ReentrantLock takeLock = new ReentrantLock();  

   private final Condition notFull = putLock.newCondition();
   private final Condition notEmpty = takeLock.newCondition();

   public BoundedBlockingQueue(int capacity) {  
     if (capacity <= 0)  throw new IllegalArgumentException("The capacity of the queue must be > 0.");
     this.capacity = capacity;  
   }  
   
   public int size() {  
     return count.get();  
   }  
   
   public void add(E e) throws InterruptedException {
     if (e == null) throw new NullPointerException("Null element is not allowed.");  
   
     int oldCount = -1;
     putLock.lock();  
     try {  
       // we use count as a wait condition although count isn't protected by a lock
       // since at this point all other put threads are blocked, count can only
       // decrease (via some take thread).
       while (count.get() == capacity) notFull.await();  
   
       queue.add(e);  
       oldCount = count.getAndIncrement();  
       if (oldCount + 1 < capacity) {
         notFull.signal(); // notify other producers for count change 
       }
     } finally {  
       putLock.unlock();  
     }  

     // notify other waiting consumers
     if (oldCount == 0) {
       takeLock.lock();
       try {
         notEmpty.signal();
       } finally {
         takeLock.unlock();
       }
     }
   }  
   
   public E remove() throws InterruptedException {
     E e;  
   
     int oldCount = -1;
     takeLock.lock();  
     try {  
       while (count.get() == 0) notEmpty.await();  
   
       e = queue.remove();  
       oldCount = count.getAndDecrement();  
       if (oldCount > 1) { // at least one element is still left after this removal
         notEmpty.signal(); // notify other consumers for count change 
       }
     } finally {  
       takeLock.unlock();  
     }  

     // notify other waiting producers
     if (oldCount == capacity) {
       putLock.lock();
       try {
         notFull.signal();
       } finally {
         putLock.unlock();
       }
     }

     return e;
   } 
 
   /* Retrieves, but does not remove, the head of this queue, or returns null if this queue is empty. */
   public E peek() {  
     if (count.get() == 0) return null;  
   
     takeLock.lock();  
     try {  
       return queue.peek();  
     } finally {  
       takeLock.unlock();  
     }  
   }  
 }  
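One caveat about the two-lock version above: java.util.LinkedList is not thread-safe, and with separate locks a producer in add() and a consumer in remove() can modify the same list at the same time. java.util.concurrent.LinkedBlockingQueue, which uses the same putLock/takeLock idea, sidesteps this by keeping its own singly linked nodes behind a dummy head, so producers only touch the tail and consumers only touch the head. The following is a minimal sketch of that idea, not the JDK's code; the class name `TwoLockQueue` and its `Node` type are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical two-lock bounded queue; a sketch, not the JDK implementation.
class TwoLockQueue<E> {
  private static final class Node<E> {
    E item;
    Node<E> next;
    Node(E item) { this.item = item; }
  }

  private final int capacity;
  private final AtomicInteger count = new AtomicInteger();
  private Node<E> head = new Node<>(null); // dummy node; guarded by takeLock
  private Node<E> tail = head;             // guarded by putLock

  private final ReentrantLock putLock = new ReentrantLock();
  private final Condition notFull = putLock.newCondition();
  private final ReentrantLock takeLock = new ReentrantLock();
  private final Condition notEmpty = takeLock.newCondition();

  TwoLockQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException("capacity must be > 0");
    this.capacity = capacity;
  }

  public int size() { return count.get(); }

  public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int oldCount;
    putLock.lock();
    try {
      while (count.get() == capacity) notFull.await();
      Node<E> node = new Node<>(e);
      tail.next = node;          // producers only ever write at the tail
      tail = node;
      oldCount = count.getAndIncrement();
      if (oldCount + 1 < capacity) notFull.signal(); // room left for another producer
    } finally {
      putLock.unlock();
    }
    if (oldCount == 0) signal(takeLock, notEmpty);   // queue was empty: wake a consumer
  }

  public E take() throws InterruptedException {
    E item;
    int oldCount;
    takeLock.lock();
    try {
      while (count.get() == 0) notEmpty.await();
      Node<E> first = head.next; // consumers only ever advance the head
      head = first;              // the removed node becomes the new dummy
      item = first.item;
      first.item = null;
      oldCount = count.getAndDecrement();
      if (oldCount > 1) notEmpty.signal();           // items left for another consumer
    } finally {
      takeLock.unlock();
    }
    if (oldCount == capacity) signal(putLock, notFull); // queue was full: wake a producer
    return item;
  }

  // A condition may only be signalled while holding its lock.
  private static void signal(ReentrantLock lock, Condition condition) {
    lock.lock();
    try {
      condition.signal();
    } finally {
      lock.unlock();
    }
  }
}
```

The AtomicInteger does double duty here: besides counting, its volatile read and write make a node linked by a producer visible to a consumer that later observes a non-zero count.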


One shortcoming of using synchronized methods is that only one thread, either a consumer or a producer, can access the queue at a time.

Plus, we need to use notifyAll instead of notify, since there could be multiple waiting producers and consumers, and notify wakes up an arbitrary waiting thread, which could be either a producer or a consumer. This Stack Overflow post gives a detailed example explaining notify vs. notifyAll.

Notice that if the queue was empty before add or full before remove, we need to notify other waiting threads to unblock them.
We only need to emit such notifications in the above two cases since otherwise there cannot be any waiting threads.
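import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedBlockingQueue<E> {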
   private final Queue<E> queue = new LinkedList<E>();  
   private final int capacity;  
   private final AtomicInteger count = new AtomicInteger(0);  

   public BoundedBlockingQueue(int capacity) {  
     if (capacity <= 0)  throw new IllegalArgumentException("The capacity of the queue must be > 0.");
     this.capacity = capacity;  
   }  
   
   public int size() {  
     return count.get();  
   }  
   
   public synchronized void add(E e) throws InterruptedException {
     if (e == null) throw new NullPointerException("Null element is not allowed.");  
   
     int oldCount = -1;
     while (count.get() == capacity) wait();  
   
     queue.add(e);  
     oldCount = count.getAndIncrement();  
     if (oldCount == 0) {
       notifyAll(); // notify other waiting threads (could be producers or consumers)  
     }
   }  
   
   public synchronized E remove() throws InterruptedException {
     E e;  
   
     int oldCount = -1;
     while (count.get() == 0) wait();  
  
     e = queue.remove();  
     oldCount = count.getAndDecrement();  
     if (oldCount == this.capacity) {
       notifyAll(); // notify other waiting threads (could be producers or consumers)  
     }
     return e;
   } 
 
   /* Retrieves, but does not remove, the head of this queue, or returns null if this queue is empty. */
   public E peek() {  
     if (count.get() == 0) return null;  
     synchronized(this) {
       return queue.peek();  
     }
   }  
 }  
http://baozitraining.org/blog/design-and-implement-a-blocking-queue/
Condition “Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods. Conditions (also known as condition queues or condition variables) provide a means for one thread to suspend execution (to "wait") until notified by another thread that some state condition may now be true. Because access to this shared state information occurs in different threads, it must be protected, so a lock of some form is associated with the condition. The key property that waiting for a condition provides is that it atomically releases the associated lock and suspends the current thread, just like Object.wait. A Condition instance is intrinsically bound to a lock. To obtain a Condition instance for a particular Lock instance use its newCondition() method.”
We use two conditions as two waiting queues that hold the suspended threads. The notFull queue contains all producer threads waiting for the not-full signal, and the notEmpty queue contains all consumer threads waiting for the not-empty signal. I am using another Lock to ensure that pushList completes atomically.
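import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBlockingQueue<E> {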

 private int capacity;
 private Queue<E> queue;
 private Lock lock = new ReentrantLock();
 private Lock pushLock = new ReentrantLock();
 private Condition notFull = this.lock.newCondition();
 private Condition notEmpty = this.lock.newCondition();
    
 // Initializes this queue only once and throws an Exception if the user
 // tries to initialize it multiple times.
 public void init(int capacity) throws Exception {
     this.lock.lock();
     try{
         if(this.queue == null){
             this.queue = new LinkedList<>();
             this.capacity = capacity;
         } else {
             throw new Exception();
         }
     }finally{
         this.lock.unlock();
     }
 }

 // throws Exception if the queue is not initialized
 public void push(E obj) throws Exception {
     this.pushLock.lock();
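     this.lock.lock();
     try{
         while(this.capacity == this.queue.size())
             this.notFull.await();      // Condition.await() releases this.lock while waiting
         this.queue.add(obj);
         this.notEmpty.signalAll();     // wake the waiting consumers
     }finally{
         this.lock.unlock();
         this.pushLock.unlock();        // release in reverse order of acquisition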
     }
 }

 // throws Exception if the queue is not initialized
 public E pop() throws Exception {
     this.lock.lock();
     try{
         while(this.queue.isEmpty())
             this.notEmpty.await();     // wait until a producer adds an element
         E result = this.queue.poll();
         this.notFull.signalAll();      // wake the waiting producers
         return result;
     }finally{
         this.lock.unlock();
     }
 }

 // Implements an atomic pushList function that puts a list of objects into
 // the queue atomically. By "atomically" I mean that the objects in the list
 // should end up next to each other in the queue. The size of the list may be
 // larger than the queue capacity.
 // throws Exception if the queue is not initialized
 public void pushList(List<E> objs) throws Exception {
     this.pushLock.lock();
     this.lock.lock();
     try{
         for(E obj : objs){
             while(this.queue.size() == this.capacity)
                 this.notFull.await();  // pushLock stays held, so no other producer can interleave
             this.queue.add(obj);
             this.notEmpty.signalAll();
         }
     }finally{
         this.lock.unlock();
         this.pushLock.unlock();
     }
 }
}
http://baptiste-wicht.com/posts/2010/09/java-concurrency-part-5-monitors-locks-and-conditions.html
  1. Both methods are protected by the lock to ensure mutual exclusion.
  2. We then use two condition variables: one to wait for the buffer to be not empty and another to wait for the buffer to be not full.
  3. You can see that I have wrapped the await operation in a while loop. This avoids the signal-stealer problem that can occur when using Signal & Continue.
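import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer {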
    private final String[] buffer;
    private final int capacity;

    private int front;
    private int rear;
    private int count;

    private final Lock lock = new ReentrantLock();

    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BoundedBuffer(int capacity) {
        super();

        this.capacity = capacity;

        buffer = new String[capacity];
    }

    public void deposit(String data) throws InterruptedException {
        lock.lock();

        try {
            while (count == capacity) {
                notFull.await();
            }

            buffer[rear] = data;
            rear = (rear + 1) % capacity;
            count++;

            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public String fetch() throws InterruptedException {
        lock.lock();

        try {
            while (count == 0) {
                notEmpty.await();
            }

            String result = buffer[front];
            front = (front + 1) % capacity;
            count--;

            notFull.signal();

            return result;
        } finally {
            lock.unlock();
        }
    }
}
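For comparison, the JDK's java.util.concurrent.ArrayBlockingQueue is built on the same design as the buffer above: a fixed array used as a circular buffer, one ReentrantLock, and two conditions. A short usage sketch follows; the class name `ArrayBlockingQueueDemo`, the helper `roundTrip`, the capacity, and the strings are arbitrary choices for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ArrayBlockingQueueDemo {
  // Passes the given items through a capacity-2 queue from a producer thread
  // to the calling thread and returns them in the order they were taken.
  static List<String> roundTrip(List<String> items) {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
    Thread producer = new Thread(() -> {
      try {
        for (String s : items) queue.put(s);   // blocks while the queue is full
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    List<String> taken = new ArrayList<>();
    try {
      for (int i = 0; i < items.size(); i++) {
        taken.add(queue.take());               // blocks while the queue is empty
      }
      producer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return taken;
  }

  public static void main(String[] args) {
    // prints [a, b, c, d, e]
    System.out.println(roundTrip(Arrays.asList("a", "b", "c", "d", "e")));
  }
}
```

Here put() and take() play the roles of deposit() and fetch() above. With a single producer and a single consumer, the items come out in insertion order.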
http://stackoverflow.com/questions/20110013/implement-your-own-blocking-queue-in-java
