手写阻塞队列

手写阻塞队列来实现消费者和生产者之间的联系

首先,阻塞队列是一个队列,满足队列的基本数据结构,先进先出。其次,当队列满时,队列会阻塞插入元素的线程,直到队列不满;当队列空时,获取元素的线程会等待队列变为非空。

方法1:使用queue来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CyclicBarrier;

public class MyBlockingQueue {

//队列
private final Queue<String> myQueue = new LinkedList<>();

//最大长度
private static final int MAXSIZE = 20;
private static final int MINSIZE = 0;

//获取队列长度
public int getSize(){
return myQueue.size();
}

//生产者
public void push(String str) throws Exception {
//拿到对象锁
synchronized (myQueue){

//如果队列满了,则阻塞
while(getSize() == MAXSIZE){
myQueue.wait();
}

myQueue.offer(str);
System.out.println(Thread.currentThread().getName() + "放入元素" + str);
//唤醒消费者线程,消费者和生产者自己去竞争锁
myQueue.notify();
}
}

//消费者
public String pop() throws Exception {
synchronized (myQueue){
String result = null;

//队列为空则阻塞
while(getSize() == MINSIZE){
myQueue.wait();
}
//先进先出
result = myQueue.poll();

System.out.println(Thread.currentThread().getName()+"取出了元素" + result);
//唤醒生产者线程,消费者和生产者自己去竞争锁
myQueue.notify();

return result;
}
}

public static void main(String args[]){

MyBlockingQueue myBlockingQueue = new MyBlockingQueue();

//两个线程,都执行完成了打印
CyclicBarrier barrier = new CyclicBarrier(2, ()->{
System.out.println("生产结束,下班了,消费者明天再来吧!");
});

//生产者线程
new Thread(()->{
//50个辛勤的生产者循环向队列中添加元素
try {
for(int i = 0; i < 50; i++){
myBlockingQueue.push("——" + i );
}
//生产完了
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
},"生产者").start();

//消费者线程
new Thread(()->{
//50个白拿的消费者疯狂向队列中获取元素
try {
for(int j = 0; j < 50; j++){
myBlockingQueue.pop();
}
//消费完了
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
},"消费者").start();

}
}


方法2,使用数组——sychronized+wait+notify

定义阻塞队列接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface BlockingQueue<E> {
/**
* 往队列尾部添加一个元素,当队列满时阻塞当前线程
* @param e
*/
boolean offer(E e);

/**
* 从队列首部取走一个元素,当队列为空时阻塞当前线程
* @return
*/
E take();

// 返回队列中元素的数量
int size();
}

基于Synchronized+wait()+notify()实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
/**
* 同步锁syncronized实现的阻塞队列
*/
@Slf4j
public class SyncronizedBlockingQueue<E> implements BlockingQueue<E> {
/**
* 读条件,队列为空时,用于阻塞读线程,唤醒写线程
*/
private final Object notEmpty;
/**
* 写条件,队列已满时,用于阻塞写线程,唤醒读线程
*/
private final Object notFull;
/**
* 队列容器
*/
private Object[] items;
/**
* 出队索引
*/
private int takeIndex;
/**
* 入队索引
*/
private int putIndex;
/**
* 元素数量
*/
private int count;



/**
* 容器初始化,默认为非公平锁
*
* @param capacity
*/
public SyncronizedBlockingQueue(int capacity) {
this(capacity, false);
}

/**
* 容器初始化,可设置重入锁的类型
*
* @param capacity
*/
public SyncronizedBlockingQueue(int capacity, boolean fair) {
//容器大小小于等于0,抛出空指针
if (capacity <= 0) {
new NullPointerException();
}
//初始化容器
this.items = new Object[capacity];
//非空时的条件(读)
notEmpty = new Object();
//非满时的条件(写)
notFull = new Object();
}


/**
* 往队列尾部添加一个元素,当队列满时阻塞当前线程
*
* @param obj
* @return
*/
@Override
public boolean offer(E obj) {
//入队元素为空,抛出异常
if (obj == null) {
throw new NullPointerException();
}

//1.队列已满
synchronized (notFull) {
try {
while (count == items.length) {
log.info("队列已满,等待消费数据,size:{},items.length:{}", count, items.length);
//阻塞写线程
notFull.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

//2.队列未满
synchronized (notEmpty) {
//将当前元素当前入队索引处,即尾部
items[putIndex] = obj;
//写入元素后入队索引等于 容器大小,表明容器已满,将入队索引重置为0,从头开始(++putIndex:先加1在使用)
if (++putIndex == items.length) {
putIndex = 0;
}
//元素数量+1(count++:先使用再加1)
count++;
//唤醒读线程
notEmpty.notify();
return true;
}
}

/**
* 从队列首部取走一个元素,当队列为空时阻塞当前线程
*
* @return
*/
@Override
public E take() {
//1.队列为空
synchronized (notEmpty) {
try {
while (count == 0) {
log.info("队列为空,等待生产数据,size:{},items.length:{}", count, items.length);
//阻塞读线程
notEmpty.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

//2.队列不为空
synchronized (notFull) {
//获取当前出队索引处的元素
Object obj = items[takeIndex];
//如果到了最后一个,将出队索引重置为0,则从头开始(++takeIndex:先加1在使用)
if (++takeIndex == items.length) {
takeIndex = 0;
}
//元素数量-1(count--:先使用再减1)
count--;
//唤醒写线程
notFull.notify();
//返回元素
return (E) obj;
}
}

/**
* 队列大小
* @return
*/
@Override
public synchronized int size() {
//获取锁
return this.count;
}
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void main(String[] args) {
SyncronizedBlockingQueue<Object> blockingQueue = new SyncronizedBlockingQueue<>(5);
new Thread(() -> {
for (int i = 1; i <= 100; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.offer(i);
log.info("生产者生产了:{}", i);
}
}, "producer").start();

new Thread(() -> {
for (int i = 1; i <= 100; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object take = blockingQueue.take();
log.info("消费者消费了:{}", take);
}
}, "consumer").start();

}

方法3:Condition+lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/**
* 可重入锁ReentrantLock实现的阻塞队列
*/
@Slf4j
public class ReentrantLockBlockingQueue<E> implements BlockingQueue<E> {
/**
* 队列容器
*/
private Object[] items;
/**
* 出队索引
*/
private int takeIndex;
/**
* 入队索引
*/
private int putIndex;
/**
* 元素数量计算器:用于判定容器边界
*/
private int count;
/**
* 可重入锁
*/
private ReentrantLock lock = new ReentrantLock();
/**
* 读条件,队列为空时,用于阻塞读线程,唤醒写线程
*/
private Condition notEmpty;
/**
* 写条件,队列已满时,用于阻塞写线程,唤醒读线程
*/
private Condition notFull;


/**
* 容器初始化,默认为非公平锁
* @param capacity
*/
public ReentrantLockBlockingQueue(int capacity) {
this(capacity,false);
}

/**
* 容器初始化,可设置重入锁的类型
* @param capacity
*/
public ReentrantLockBlockingQueue(int capacity, boolean fair) {
//容器大小小于等于0,抛出空指针
if (capacity <= 0) {
new NullPointerException();
}
//初始化可重入锁
lock = new ReentrantLock(fair);
//初始化容器
this.items = new Object[capacity];
//非空时的条件(读)
notEmpty = lock.newCondition();
//非满时的条件(写)
notFull = lock.newCondition();
}



/**
* 往队列尾部添加一个元素,当队列满时阻塞当前线程
*
* @param obj
* @return
*/
@Override
public boolean offer(E obj) {
//入队元素为空,抛出异常
if (obj == null) {
throw new NullPointerException();
}

//获取锁
lock.lock();
try {
//1.队列已满
while (count == items.length) {
log.info("队列已满,等待消费数据,size:{},tab.length:{}", count, items.length);
//阻塞写线程
notFull.await();
}

//2.队列未满
//将当前元素当前入队索引处,即尾部
items[putIndex] = obj;
//写入元素后入队索引等于 容器大小,表明容器已满,将入队索引重置为0,从头开始(++putIndex:先加1再使用)
if (++putIndex == items.length) {
putIndex = 0;
}
//元素数量+1(count++:先使用再加1)
count++;
//唤醒读线程
notEmpty.signal();
return true;
} catch (Exception e) {
//唤醒读线程
notEmpty.signal();
} finally {
//释放锁
lock.unlock();
}
return false;
}

/**
* 从队列首部取走一个元素,当队列为空时阻塞当前线程
*
* @return
*/
@Override
public E take() {
//获取锁
lock.lock();
try {
//1.队列为空
while (count == 0) {
log.info("队列为空,等待生产数据,size:{},tab.length:{}", count, items.length);
//阻塞读线程
notEmpty.await();
}

//2.队列不为空
//获取当前出队索引处的元素
Object obj = items[takeIndex];
//如果到了最后一个,将出队索引重置为0,则从头开始(++takeIndex:先加1在使用)
if (++takeIndex == items.length) {
takeIndex = 0;
}
//元素数量-1(count--:先减1再使用)
count--;
//唤醒所有写线程
notFull.signal();
//返回元素
return (E) obj;
} catch (Exception e) {
//唤醒写线程
notFull.signal();
} finally {
//释放锁
lock.unlock();
}
return null;
}

/**
* 队列大小
* @return
*/
@Override
public int size() {
//获取锁
lock.lock();
try {
return this.count;
} finally {
lock.unlock();
}
}
}


手写阻塞队列
http://example.com/2023/04/12/手写阻塞队列/
作者
zlw
发布于
2023年4月12日
许可协议