Java并发控制之ReentrantLock Condition的使用

生产者-消费者(producer-consumer)问题,也称作有界缓冲区(bounded-buffer)问题,两个进程共享一个公共的固定大小的缓冲区。

其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者,用于从缓冲区中取出消息。
问题出现在当缓冲区已经满了,而此时生产者还想向其中放入一个新的数据项的情形,其解决方法是让生产者此时进行休眠,等待消费者从缓冲区中取走了一个或者多个数据后再去唤醒它。
同样地,当缓冲区已经空了,而消费者还想去取消息,此时也可以让消费者进行休眠,等待生产者放入一个或者多个数据时再唤醒它。

ConditionObject 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供 waitsignal 方法。
其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。

Condition 中,用 await() 替换 wait(),用 signal() 替换 notify(),用signalAll() 替换 notifyAll(),传统线程的通信方式,Condition 都可以实现,这里注意,Condition 是被绑定到 Lock 上的,要创建一个 LockCondition 必须用 newCondition() 方法。

例1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class BoundedBuffer {
final Lock lock = new ReentrantLock();//锁对象
final Condition notFull = lock.newCondition();//写线程条件
final Condition notEmpty = lock.newCondition();//读线程条件

final Object[] items = new Object[100];//缓存队列
int putptr/*写索引*/, takeptr/*读索引*/, count/*队列中存在的数据个数*/;

public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)//如果队列满了
notFull.await();//阻塞写线程
items[putptr] = x;//赋值
if (++putptr == items.length) putptr = 0;//如果写索引写到队列的最后一个位置了,那么置为0
++count;//个数++
notEmpty.signal();//唤醒读线程
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)//如果队列为空
notEmpty.await();//阻塞读线程
Object x = items[takeptr];//取值
if (++takeptr == items.length) takeptr = 0;//如果读索引读到队列的最后一个位置了,那么置为0
--count;//个数--
notFull.signal();//唤醒写线程
return x;
} finally {
lock.unlock();
}
}
}

这是一个处于多线程工作环境下的缓存区,缓存区提供了两个方法,puttakeput 是存数据,take 是取数据,内部有个缓存队列。

这个缓存区类实现的功能:有多个线程往里面存数据和从里面取数据,其缓存队列(先进先出后进后出)能缓存的最大数值是100,多个线程间是互斥的,当缓存队列中存储的值达到100时,将写线程阻塞,并唤醒读线程,当缓存队列中存储的值为0时,将读线程阻塞,并唤醒写线程,这也是 ArrayBlockingQueue 的内部实现。

下面分析一下代码的执行过程:

  1. 一个写线程执行,调用 put 方法;

  2. 判断 count 是否为100,显然没有100;

  3. 继续执行,存入值;

  4. 判断当前写入的索引位置++后,是否和100相等,相等将写入索引值变为0,并将count+1;

  5. 仅唤醒读线程阻塞队列中的一个;

  6. 一个读线程执行,调用take方法;

  7. ……

  8. 仅唤醒写线程阻塞队列中的一个。

这就是多个 Condition 的强大之处,假设缓存队列中已经存满,那么阻塞的肯定是写线程,唤醒的肯定是读线程,相反,阻塞的肯定是读线程,唤醒的肯定是写线程,那么假设只有一个Condition会有什么效果呢,缓存队列中已经存满,这个Lock不知道唤醒的是读线程还是写线程了,如果唤醒的是读线程,皆大欢喜,如果唤醒的是写线程,那么线程刚被唤醒,又被阻塞了,这时又去唤醒,这样就浪费了很多时间。

例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {
static class NumberWrapper {
public int value = 1;
}

public static void main(String[] args) {
//初始化可重入锁
final Lock lock = new ReentrantLock();

//第一个条件当屏幕上输出到3
final Condition reachThreeCondition = lock.newCondition();
//第二个条件当屏幕上输出到6
final Condition reachSixCondition = lock.newCondition();

//NumberWrapper只是为了封装一个数字,一边可以将数字对象共享,并可以设置为final
//注意这里不要用Integer, Integer 是不可变对象
final NumberWrapper num = new NumberWrapper();
//初始化A线程
Thread threadA = new Thread(new Runnable() {
@Override
public void run() {
//需要先获得锁
lock.lock();
try {
System.out.println("threadA start write");
//A线程先输出前3个数
while (num.value <= 3) {
System.out.println(num.value);
num.value++;
}
//输出到3时要signal,告诉B线程可以开始了
reachThreeCondition.signal();
} finally {
lock.unlock();
}
lock.lock();
try {
//等待输出6的条件
reachSixCondition.await();
System.out.println("threadA start rewrite");
//输出剩余数字
while (num.value <= 9) {
System.out.println(num.value);
num.value++;
}

} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

});


Thread threadB = new Thread(new Runnable() {
@Override
public void run() {
try {
lock.lock();

while (num.value <= 3) {
//等待3输出完毕的信号
reachThreeCondition.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
try {
lock.lock();
//已经收到信号,开始输出4,5,6
System.out.println("threadB start write");
while (num.value <= 6) {
System.out.println(num.value);
num.value++;
}
//4,5,6输出完毕,告诉A线程6输出完了
reachSixCondition.signal();
} finally {
lock.unlock();
}
}

});

//启动两个线程
threadA.start();
threadB.start();
}
}

结果如下:

1
2
3
4
5
6
threadA start write
2
threadB start write
5
threadA start rewrite
8

基本思路就是首先要A线程先写1,2,3,这时候B线程应该等待 reachThredCondition 信号,而当A线程写完3之后就通过signal告诉B线程“我写到3了,该你了”,

这时候A线程要等 reachSixCondition 信号,同时B线程得到通知,开始写4,5,6,写完4,5,6之后B线程通知A线程 reachSixCondition 条件成立了,这时候A线程就开始写剩下的7,8,9了。

例3

Java官方提供的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {
public static void main(String[] args) {
final BoundedBuffer boundedBuffer = new BoundedBuffer();

Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("t1 run");
for (int i=0;i<20;i++) {
try {
System.out.println("putting..");
boundedBuffer.put(Integer.valueOf(i));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}) ;

Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
for (int i=0;i<20;i++) {
try {
Object val = boundedBuffer.take();
System.out.println(val);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}) ;

t1.start();
t2.start();
}

/**
* BoundedBuffer 是一个定长100的集合,当集合中没有元素时,take方法需要等待,直到有元素时才返回元素
* 当其中的元素数达到最大值时,要等待直到元素被take之后才执行put的操作
* @author yukaizhao
*
*/
static class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];
int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {
System .out.println("put wait lock");
lock.lock();
System.out.println("put get lock");
try {
while (count == items.length) {
System.out.println("buffer full, please wait");
notFull.await();
}

items[putptr] = x;
if (++putptr == items.length)
putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
System.out.println("take wait lock");
lock.lock();
System.out.println("take get lock");
try {
while (count == 0) {
System.out.println("no elements, please wait");
notEmpty.await();
}
Object x = items[takeptr];
if (++takeptr == items.length)
takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
}

结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124

t1 run
putting..
put wait lock
take wait lock
put get lock
putting..
put wait lock
take get lock
0
take wait lock
take get lock
no elements, please wait
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
1
take wait lock
put wait lock
take get lock
2
put get lock
take wait lock
take get lock
3
take wait lock
putting..
put wait lock
take get lock
4
take wait lock
put get lock
putting..
put wait lock
take get lock
5
take wait lock
put get lock
putting..
put wait lock
take get lock
6
take wait lock
put get lock
putting..
put wait lock
take get lock
7
take wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
take get lock
8
take wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
take get lock
9
take wait lock
put get lock
putting..
put wait lock
take get lock
put get lock
putting..
put wait lock
10
take wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
take get lock
11
take wait lock
take get lock
12
take wait lock
take get lock
13
take wait lock
take get lock
14
take wait lock
take get lock
15
take wait lock
take get lock
no elements, please wait
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
putting..
put wait lock
put get lock
16
take wait lock
take get lock
17
take wait lock
take get lock
18
take wait lock
take get lock
19

这个示例中 BoundedBuffer 是一个固定长度的集合,

这个在其 put 操作时,如果发现长度已经达到最大长度,那么要等待notFull信号才能继续 put,如果得到 notFull 信号会像集合中添加元素,并且 put 操作会发出 notEmpty 的信号,

而在其 take 方法中如果发现集合长度为空,那么会等待 notEmpty 的信号,接受到 notEmpty 信号才能继续 take,同时如果拿到一个元素,那么会发出 notFull 的信号。

如果采用 Object 类中的 wait(),notify(),notifyAll() 实现该缓冲区,当向缓冲区写入数据之后需要唤醒”读线程”时,不可能通过 notify()notifyAll() 明确的指定唤醒”读线程”,而只能通过 notifyAll 唤醒所有线程(但是 notifyAll 无法区分唤醒的线程是读线程,还是写线程)。 但是,通过 Condition,就能明确的指定唤醒读线程。

Condition 原理分析

ConditionObject 是同步器 AbstractQueuedSynchronizer 的内部类,因为 Condition 的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个 Condition 对象都包含着一个队列,该队列是 Condition 对象实现等待/通知功能的关键。下面将分析Condition 的实现,主要包括:等待队列、等待和通知。

等待队列

等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在 Condition 对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会构造成节点加入等待队列并进入等待状态,在 unlock() 方法后释放锁。
一个 Condition 包含一个等待队列,Condition 拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用 Condition.await() 方法,将会以当前线程构造节点,并将节点从尾部加入等待队列,等待队列的基本结构如下图所示:

如图所示,Condition 拥有首尾节点的引用,而新增节点只需要将原有的尾节点 nextWaiter 指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用CAS保证,原因在于调用 await() 方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的 Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列,其对应关系如下图所示:

等待

调用 Conditionawait() 方法,会使当前线程进入等待队列,同时线程状态变为等待状态,在调用 unlock() 方法后释放锁。当从 await() 方法返回时,当前线程一定获取了Condition 相关联的锁。如果从队列(同步队列和等待队列)的角度看 await() 方法,当调用 await() 方法时,相当于同步队列的首节点(获取了锁的节点)移动到 Condition 的等待队列中。调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用 Condition.signal() 方法唤醒,而是对等待线程进行中断,则会抛出 InterruptedException :

通知

调用 Conditionsignal() 方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。调用该方法的前置条件是当前线程必须获取了锁,接着获取等待队列的首节点,将其移动到同步队列。节点从等待队列移动到同步队列的过程如下图所示:

signal() 方法只是将 Condition 等待队列头结点移出队列,此时该线程节点还是阻塞的,同时将该节点的线程重新包装加入同步队列,当调用 unlock() 方法时,会唤醒同步队列的第二个节点,假如这个新节点是处于第二个位置,那么它将会被唤醒,否则,继续阻塞。

坚持原创技术分享,您的支持将鼓励我继续创作!