手写阻塞队列来实现消费者和生产者之间的联系。
首先,阻塞队列是一个队列,满足队列的基本数据结构,先进先出。其次,当队列满时,队列会阻塞插入元素的线程,直到队列不满;当队列空时,获取元素的线程会等待队列变为非空。
方法1:使用queue来实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.CyclicBarrier;
/**
 * A small bounded FIFO blocking queue of strings.
 *
 * <p>Elements are stored in a {@link LinkedList}; that list's intrinsic monitor
 * guards every access and is also the object producers and consumers wait on.
 * {@link #push(String)} blocks while the queue holds {@code MAXSIZE} elements and
 * {@link #pop()} blocks while it is empty.
 *
 * <p>Because both kinds of waiters share one monitor, state changes are announced
 * with {@code notifyAll()}: a single {@code notify()} could wake a thread of the
 * wrong kind (e.g. a producer waking another producer while the queue is still
 * full), leaving every thread waiting forever once several producers or consumers
 * are involved.
 */
public class MyBlockingQueue {

    /** Capacity of the queue; {@link #push(String)} blocks once this many elements are stored. */
    private static final int MAXSIZE = 20;

    /** Size at which the queue counts as empty; {@link #pop()} blocks at this size. */
    private static final int MINSIZE = 0;

    /** Backing storage, and the monitor all methods synchronize and wait on. */
    private final Queue<String> myQueue = new LinkedList<>();

    /**
     * Returns the number of elements currently stored.
     *
     * <p>The read is performed under the queue's monitor so it never races with a
     * concurrent {@code push}/{@code pop} mutating the underlying list.
     *
     * @return the current element count
     */
    public int getSize() {
        synchronized (myQueue) {
            return myQueue.size();
        }
    }

    /**
     * Appends an element to the tail, waiting while the queue is full.
     *
     * @param str the element to enqueue
     * @throws Exception in practice an {@link InterruptedException} when the calling
     *         thread is interrupted while waiting for free space; the broad
     *         declaration is kept for compatibility with existing callers
     */
    public void push(String str) throws Exception {
        synchronized (myQueue) {
            // Loop (not "if"): the condition must be re-checked after every wake-up.
            while (myQueue.size() >= MAXSIZE) {
                myQueue.wait();
            }
            myQueue.offer(str);
            System.out.println(Thread.currentThread().getName() + "放入元素" + str);
            // Wake all waiters; consumers re-check emptiness, producers re-check fullness.
            myQueue.notifyAll();
        }
    }

    /**
     * Removes and returns the head element, waiting while the queue is empty.
     *
     * @return the element that was at the head of the queue
     * @throws Exception in practice an {@link InterruptedException} when the calling
     *         thread is interrupted while waiting for an element; the broad
     *         declaration is kept for compatibility with existing callers
     */
    public String pop() throws Exception {
        synchronized (myQueue) {
            while (myQueue.size() <= MINSIZE) {
                myQueue.wait();
            }
            String result = myQueue.poll();
            System.out.println(Thread.currentThread().getName() + "取出了元素" + result);
            myQueue.notifyAll();
            return result;
        }
    }

    /**
     * Demo: one producer pushes 50 elements, one consumer pops 50 elements, and a
     * two-party {@link CyclicBarrier} prints a closing message once both are done.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MyBlockingQueue myBlockingQueue = new MyBlockingQueue();
        CyclicBarrier barrier = new CyclicBarrier(2, () -> {
            System.out.println("生产结束,下班了,消费者明天再来吧!");
        });

        new Thread(() -> {
            try {
                for (int i = 0; i < 50; i++) {
                    myBlockingQueue.push("——" + i);
                }
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "生产者").start();

        new Thread(() -> {
            try {
                for (int j = 0; j < 50; j++) {
                    myBlockingQueue.pop();
                }
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "消费者").start();
    }
}
|
方法2:使用数组——synchronized+wait+notify
定义阻塞队列接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| /**
 * Minimal queue contract shared by the hand-written blocking queues in this file.
 *
 * @param <E> the type of elements held in the queue
 */
public interface BlockingQueue<E> {
/**
 * Inserts the given element into the queue.
 *
 * <p>NOTE(review): the implementations visible in this file wait while the queue
 * is full rather than returning immediately; the interface itself does not state
 * when {@code false} is returned, so confirm against the implementation in use.
 *
 * @param e the element to insert
 * @return a boolean success indicator
 */
boolean offer(E e);
/**
 * Removes and returns an element from the queue.
 *
 * <p>NOTE(review): the implementations visible in this file wait while the queue
 * is empty; verify the behaviour of any other implementation.
 *
 * @return the element taken from the queue
 */
E take();
/**
 * Reports how many elements the queue currently holds.
 *
 * @return the element count
 */
int size(); }
|
基于synchronized+wait()+notify()实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
|
/**
 * Bounded FIFO blocking queue backed by a circular array and guarded by a single
 * intrinsic monitor ({@code synchronized} + {@code wait()} + {@code notifyAll()}).
 *
 * <p>All queue state ({@code items}, {@code takeIndex}, {@code putIndex},
 * {@code count}) is read and written only while holding {@code lock}. Using one
 * monitor for both the "not full" and the "not empty" condition keeps the
 * check-then-act sequences in {@link #offer(Object)} and {@link #take()} atomic;
 * splitting them across two monitors would let producers and consumers update
 * {@code count} concurrently and act on a stale capacity check.
 *
 * @param <E> the type of elements held in the queue
 */
@Slf4j
public class SyncronizedBlockingQueue<E> implements BlockingQueue<E> {

    /** The one monitor that guards every field below and on which all waiters block. */
    private final Object lock = new Object();

    /** Circular buffer holding the queued elements. */
    private final Object[] items;

    /** Index of the next element to hand out from {@link #take()}. */
    private int takeIndex;

    /** Index of the next free slot written by {@link #offer(Object)}. */
    private int putIndex;

    /** Number of elements currently stored. */
    private int count;

    /**
     * Creates a queue with the given fixed capacity.
     *
     * @param capacity maximum number of elements, must be positive
     * @throws IllegalArgumentException if {@code capacity <= 0}
     */
    public SyncronizedBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /**
     * Creates a queue with the given fixed capacity.
     *
     * @param capacity maximum number of elements, must be positive
     * @param fair accepted for signature compatibility only; intrinsic monitors
     *        provide no fairness policy, so the value is ignored
     * @throws IllegalArgumentException if {@code capacity <= 0}
     */
    public SyncronizedBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0) {
            // A zero-length buffer would make every offer() wait forever.
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.items = new Object[capacity];
    }

    /**
     * Inserts the element at the tail, waiting while the queue is full.
     *
     * @param obj the element to add, must not be {@code null}
     * @return {@code true} once the element has been stored; {@code false} if the
     *         calling thread was interrupted while waiting (the interrupt flag is
     *         restored and nothing is inserted)
     * @throws NullPointerException if {@code obj} is {@code null}
     */
    @Override
    public boolean offer(E obj) {
        if (obj == null) {
            throw new NullPointerException();
        }
        synchronized (lock) {
            try {
                while (count == items.length) {
                    log.info("队列已满,等待消费数据,size:{},items.length:{}", count, items.length);
                    lock.wait();
                }
            } catch (InterruptedException e) {
                // Do not fall through and overwrite a live slot; report the interrupt instead.
                Thread.currentThread().interrupt();
                return false;
            }
            items[putIndex] = obj;
            if (++putIndex == items.length) {
                putIndex = 0;
            }
            count++;
            // Producers and consumers share the monitor, so wake everyone and let each re-check.
            lock.notifyAll();
            return true;
        }
    }

    /**
     * Removes and returns the head element, waiting while the queue is empty.
     *
     * @return the head element, or {@code null} if the calling thread was
     *         interrupted while waiting (the interrupt flag is restored and
     *         nothing is removed)
     */
    @Override
    public E take() {
        synchronized (lock) {
            try {
                while (count == 0) {
                    log.info("队列为空,等待生产数据,size:{},items.length:{}", count, items.length);
                    lock.wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
            @SuppressWarnings("unchecked") // offer(E) is the only writer, so the slot holds an E
            E head = (E) items[takeIndex];
            items[takeIndex] = null; // drop the reference so the element can be collected
            if (++takeIndex == items.length) {
                takeIndex = 0;
            }
            count--;
            lock.notifyAll();
            return head;
        }
    }

    /**
     * Returns the number of elements currently stored, read under the same
     * monitor that guards all updates.
     *
     * @return the current element count
     */
    @Override
    public int size() {
        synchronized (lock) {
            return count;
        }
    }
}
|
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| /**
 * Demo driver: a producer offers the integers 1..100 (pausing 100 ms before each)
 * into a capacity-5 queue while a consumer takes 100 elements (pausing 200 ms
 * before each), logging every step.
 *
 * @param args unused
 */
public static void main(String[] args) {
    SyncronizedBlockingQueue<Object> queue = new SyncronizedBlockingQueue<>(5);

    Runnable produceTask = () -> {
        int produced = 0;
        while (produced < 100) {
            produced++;
            try {
                Thread.sleep(100);
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }
            queue.offer(produced);
            log.info("生产者生产了:{}", produced);
        }
    };

    Runnable consumeTask = () -> {
        int remaining = 100;
        while (remaining > 0) {
            remaining--;
            try {
                Thread.sleep(200);
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }
            Object taken = queue.take();
            log.info("消费者消费了:{}", taken);
        }
    };

    Thread producer = new Thread(produceTask, "producer");
    producer.start();

    Thread consumer = new Thread(consumeTask, "consumer");
    consumer.start();
}
|
方法3:Condition+lock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
|
/**
 * Bounded FIFO blocking queue backed by a circular array and guarded by one
 * {@link ReentrantLock} with two {@link Condition}s: {@code notFull} for waiting
 * producers and {@code notEmpty} for waiting consumers.
 *
 * @param <E> the type of elements held in the queue
 */
@Slf4j
public class ReentrantLockBlockingQueue<E> implements BlockingQueue<E> {

    /** Circular buffer holding the queued elements. */
    private final Object[] items;

    /** Index of the next element to hand out from {@link #take()}. */
    private int takeIndex;

    /** Index of the next free slot written by {@link #offer(Object)}. */
    private int putIndex;

    /** Number of elements currently stored. */
    private int count;

    /** Lock guarding all of the fields above. */
    private final ReentrantLock lock;

    /** Signalled after an insertion; consumers wait on it while the queue is empty. */
    private final Condition notEmpty;

    /** Signalled after a removal; producers wait on it while the queue is full. */
    private final Condition notFull;

    /**
     * Creates a queue with the given fixed capacity and a non-fair lock.
     *
     * @param capacity maximum number of elements, must be positive
     * @throws IllegalArgumentException if {@code capacity <= 0}
     */
    public ReentrantLockBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /**
     * Creates a queue with the given fixed capacity.
     *
     * @param capacity maximum number of elements, must be positive
     * @param fair passed to {@link ReentrantLock#ReentrantLock(boolean)} to select
     *        the lock's fairness policy
     * @throws IllegalArgumentException if {@code capacity <= 0}
     */
    public ReentrantLockBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0) {
            // A zero-length buffer would make every offer() wait forever.
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.lock = new ReentrantLock(fair);
        this.items = new Object[capacity];
        this.notEmpty = lock.newCondition();
        this.notFull = lock.newCondition();
    }

    /**
     * Inserts the element at the tail, waiting while the queue is full.
     *
     * @param obj the element to add, must not be {@code null}
     * @return {@code true} once the element has been stored; {@code false} if the
     *         calling thread was interrupted while waiting (the interrupt flag is
     *         restored and nothing is inserted)
     * @throws NullPointerException if {@code obj} is {@code null}
     */
    @Override
    public boolean offer(E obj) {
        if (obj == null) {
            throw new NullPointerException();
        }
        lock.lock();
        try {
            while (count == items.length) {
                log.info("队列已满,等待消费数据,size:{},tab.length:{}", count, items.length);
                notFull.await();
            }
            items[putIndex] = obj;
            if (++putIndex == items.length) {
                putIndex = 0;
            }
            count++;
            notEmpty.signal();
            return true;
        } catch (InterruptedException e) {
            // Nothing was inserted, so there is nothing to signal; just preserve the interrupt.
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head element, waiting while the queue is empty.
     *
     * @return the head element, or {@code null} if the calling thread was
     *         interrupted while waiting (the interrupt flag is restored and
     *         nothing is removed)
     */
    @Override
    public E take() {
        lock.lock();
        try {
            while (count == 0) {
                log.info("队列为空,等待生产数据,size:{},tab.length:{}", count, items.length);
                notEmpty.await();
            }
            @SuppressWarnings("unchecked") // offer(E) is the only writer, so the slot holds an E
            E head = (E) items[takeIndex];
            items[takeIndex] = null; // drop the reference so the element can be collected
            if (++takeIndex == items.length) {
                takeIndex = 0;
            }
            count--;
            notFull.signal();
            return head;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements currently stored, read while holding the lock.
     *
     * @return the current element count
     */
    @Override
    public int size() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}
|