前言
在Java日常开发中很多同学很少或者几乎没有使用过wait notify notifyAll,但对这三个方法名都不会感到陌生,因为他们是Object类的3个本地方法。从名字可以看出他们是跟线程相关的。
作用介绍
wait:线程自动释放其占有的对象锁,并等待notify。
notify:唤醒一个正在wait当前对象锁的线程,并让它拿到对象锁。
notifyAll:唤醒所有正在wait前对象锁的线程。
值得注意的是当前线程必须要拿到这个对象的锁才能使用wait方法否则就会报错
1
| Exception in thread "main" java.lang.IllegalMonitorStateException
|
这也可以理解,作为Object类的本地方法,可以使用synchronized的机制保证了同一时间最多只能有1个线程能拿到对象锁。
代码演示
下面我们用 wait 和 notifyAll 来实现一个简单的发布订阅场景。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| static final LinkedList<String> list = new LinkedList<>();
public static void main(String[] args) throws InterruptedException { List<String> subscriberList = Arrays.asList("小明", "小虎", "小胖"); for (String name : subscriberList) { Thread t1 = new Thread(() -> subscriber(name)); t1.start(); }
List<String> publisherList = Arrays.asList("EDG夺冠了", "周董出新专辑了", "觉得这篇文章写得好"); for (String s : publisherList) { TimeUnit.SECONDS.sleep(3); publisher(s); } }
@SneakyThrows public static void subscriber(String name) { System.out.println(name + "订阅了通知"); synchronized (list) { while (true) { list.wait(); System.out.printf("%s收到消息[%s] %n", name, list.getLast()); } } }
public static void publisher(String message) { synchronized (list) { list.add(message); list.notifyAll(); } }
|
@SneakyThrows 需要引入Lombok
运行后输出如下
1 2 3 4 5 6 7 8 9 10 11 12
| 小明订阅了通知 小胖订阅了通知 小虎订阅了通知 小虎收到消息[EDG夺冠了] 小胖收到消息[EDG夺冠了] 小明收到消息[EDG夺冠了] 小明收到消息[周董出新专辑了] 小胖收到消息[周董出新专辑了] 小虎收到消息[周董出新专辑了] 小虎收到消息[觉得这篇文章写得好] 小胖收到消息[觉得这篇文章写得好] 小明收到消息[觉得这篇文章写得好]
|
可以看到每次收到消息的订阅者顺序是不同的,可以猜测到 notifyAll 唤醒所有线程后存在着竞争,具体唤醒哪一个线程由JVM来控制。
如果有N个wait在等待,使用notify会唤醒哪个线程呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| List<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) { Thread thread = new Thread(() -> { synchronized (list) { try { list.wait(); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } }); thread.start(); }
ScheduledExecutorService service = Executors.newScheduledThreadPool(1); service.schedule(() -> { synchronized (list) { list.notify(); } }, 3, TimeUnit.SECONDS);
|
输出结果为 Thread-0
总结
今天介绍了 wait notify notifyAll 也用代码演示了一个发布订阅的场景。代码写的比较简单,每个消费者只能消费最新的一条数据。如果存在消费失败,但又要保证每条数据都能被消费掉,代码又应该如何修改?
这里想到了RocketMQ 的设计,每个消费组都维护一个自己的消费标识,消费组中的消费者是竞争关系。
如果有写的不对的地方,欢迎指正。