Know it & why

知其然也知其所以然

0%

非阻塞的条件等待

条件变量一般场景

条件变量作为并发原语之一, 最常用的场景就是多个操作受到同一个条件约束时进行同步等待, 只有当条件满足时才能继续执行.

该场景的一般执行流程:

  1. 执行操作前先检查条件变量是否符合预期.
  2. 如果符合预期, 则开始进行相关操作并更新条件变量.
  3. 如果不符合预期, 则等待其它操作更新条件变量后, 跳转至步骤 1 再次进行条件变量检查.

阻塞式的条件等待

在介绍非阻塞的条件等待前, 先看看如何在 Java 中使用 Object.wait()Object.notifyAll() 实现阻塞式的条件等待需求.

以实现 Semaphore 为例, 下文展示如何使用 Object.wait()Object.notifyAll() 实现 Semaphore.

接口设计

1
2
3
4
5
6
7
public interface BlockingSemaphore {

void acquire() throws InterruptedException;

void release();

}

阻塞实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class BlockingSemaphoreImpl implements BlockingSemaphore {

private final Object lock = new Object();
private int availablePermits;

public BlockingSemaphoreImpl(int permits) {
this.availablePermits = permits;
}

@Override
public void acquire() throws InterruptedException {
synchronized (lock) {
while (availablePermits <= 0) {
// 当条件变量不满足时, 阻塞线程进行等待.
lock.wait();
}

availablePermits -= 1;
}
}

@Override
public void release() {
synchronized (lock) {
availablePermits += 1;
// 条件变量更新后, 通知所有等待者再次进行条件检查.
lock.notifyAll();
}
}

}

非阻塞的条件等待

上文展示了使用 Object.wait()Object.notifyAll() 实现阻塞式的 Semaphore. 但是如果在使用了 Kotlin 协程的挂起函数中使用阻塞式的 API 是不合适的, 会导致执行的线程被阻塞从而导致线程池的使用率降低. 挂起函数中应使用挂起替代阻塞, 这也是为什么需要非阻塞的条件等待的原因.

在实现条件等待时, 最关键的就是要实现条件变量不满足预期时的挂起等待和条件变量变更时的唤醒机制, Flow 就能实现该该需求. 下文将继续以 Semaphore 为例, 演示使用 MutableStateFlow 替代 Object.wait()Object.notifyAll() 实现一个非阻塞式的版本.

该版本的 Semaphore 仅用于演示非阻塞的条件等待应该如何实现, 如在生产项目中应使用官方给出的 Semaphore 实现.

接口设计

1
2
3
4
5
6
7
interface NonBlockingSemaphore {

suspend fun acquire()

suspend fun release()

}

非阻塞实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class NonBlockingSemaphoreImpl(permits: Int) : NonBlockingSemaphore {

private val mutex = Mutex()
private val state = MutableStateFlow(permits)

override suspend fun acquire() {
while (true) {
val notExpectValue = mutex.withLock {
if (state.value <= 0) {
// 条件不满足, 返回非预期的变量值.
return@withLock state.value
}

state.value = state.value - 1
return
}

// 挂起函数直到条件变量值发生变化.
state.first { it != notExpectValue }
}
}

override suspend fun release() {
mutex.withLock {
// 更新条件变量的值, 利用 Flow 的机制通知到所有挂起中的等待着.
state.value = state.value + 1
}
}

}