Java中的 wait notify notifyAll

前言

在Java日常开发中很多同学很少或者几乎没有使用过wait notify notifyAll,但对这三个方法名都不会感到陌生,因为他们是Object类的3个本地方法。从名字可以看出他们是跟线程相关的。

作用介绍

wait:线程自动释放其占有的对象锁,并等待notify。
notify:唤醒一个正在wait当前对象锁的线程,并让它拿到对象锁。
notifyAll:唤醒所有正在wait前对象锁的线程。

值得注意的是当前线程必须要拿到这个对象的锁才能使用wait方法否则就会报错

1
Exception in thread "main" java.lang.IllegalMonitorStateException

这也可以理解,作为Object类的本地方法,可以使用synchronized的机制保证了同一时间最多只能有1个线程能拿到对象锁。

代码演示

下面我们用 wait 和 notifyAll 来实现一个简单的发布订阅场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 定义一个全局的list保存消息
static final LinkedList<String> list = new LinkedList<>();

public static void main(String[] args) throws InterruptedException {
// 三个订阅者 订阅同一个主题
List<String> subscriberList = Arrays.asList("小明", "小虎", "小胖");
for (String name : subscriberList) {
Thread t1 = new Thread(() -> subscriber(name));
t1.start();
}

// 每隔3秒 发布一条消息
List<String> publisherList = Arrays.asList("EDG夺冠了", "周董出新专辑了", "觉得这篇文章写得好");
for (String s : publisherList) {
TimeUnit.SECONDS.sleep(3);
publisher(s);
}
}

// 订阅者
@SneakyThrows
public static void subscriber(String name) {
System.out.println(name + "订阅了通知");
synchronized (list) {
while (true) {
list.wait();
System.out.printf("%s收到消息[%s] %n", name, list.getLast());
}
}
}

// 发布者(主题)
public static void publisher(String message) {
synchronized (list) {
list.add(message);
list.notifyAll();
}
}

@SneakyThrows 需要引入Lombok

运行后输出如下

1
2
3
4
5
6
7
8
9
10
11
12
小明订阅了通知
小胖订阅了通知
小虎订阅了通知
小虎收到消息[EDG夺冠了]
小胖收到消息[EDG夺冠了]
小明收到消息[EDG夺冠了]
小明收到消息[周董出新专辑了]
小胖收到消息[周董出新专辑了]
小虎收到消息[周董出新专辑了]
小虎收到消息[觉得这篇文章写得好]
小胖收到消息[觉得这篇文章写得好]
小明收到消息[觉得这篇文章写得好]

可以看到每次收到消息的订阅者顺序是不同的,可以猜测到 notifyAll 唤醒所有线程后存在着竞争,具体唤醒哪一个线程由JVM来控制。

如果有N个wait在等待,使用notify会唤醒哪个线程呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
List<String> list = new ArrayList<>();
// 开启10个 wait 等待
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
synchronized (list) {
try {
list.wait();
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
}

// 开启一个线程 延迟3秒后执行 notify() 保证前面10个线程都进入wait
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
service.schedule(() -> {
synchronized (list) {
list.notify();
}
}, 3, TimeUnit.SECONDS);

输出结果为 Thread-0

总结

今天介绍了 wait notify notifyAll 也用代码演示了一个发布订阅的场景。代码写的比较简单,每个消费者只能消费最新的一条数据。如果存在消费失败,但又要保证每条数据都能被消费掉,代码又应该如何修改?

这里想到了RocketMQ 的设计,每个消费组都维护一个自己的消费标识,消费组中的消费者是竞争关系。

如果有写的不对的地方,欢迎指正。