1.简介
sync.Cond 是基于互斥锁/读写锁实现的条件变量,用来协调想要访问共享资源的那些 Goroutine。当共享资源的状态发生变化的时候,sync.Cond 可以用来通知等待条件发生而阻塞的 Goroutine。
sync.Cond 基于互斥锁/读写锁,那它和互斥锁的区别是什么呢?
互斥锁 sync.Mutex 通常用来保护共享的临界资源,条件变量 sync.Cond 用来协调想要访问共享资源的 Goroutine。当共享资源的状态发生变化时,sync.Cond 可以用来通知被阻塞的 Goroutine。
2.使用场景
sync.Cond 经常用在多个 Goroutine 等待,一个 Goroutine 通知(事件发生)的场景。如果是一个通知,一个等待,使用互斥锁或 channel 就能搞定了。
我们想象一个非常简单的场景:
有一个协程在异步地接收数据,剩下的多个协程必须等待这个协程接收完数据,才能读取到正确的数据。在这种情况下,如果单纯使用 chan 或互斥锁,那么只能有一个协程可以等待,并读取到数据,没办法通知其他的协程也读取数据。
这个时候,就需要有个全局的变量来标志第一个协程数据是否接受完毕,剩下的协程,反复检查该变量的值,直到满足要求。或者创建多个 channel,每个协程阻塞在一个 channel 上,由接收数据的协程在数据接收完毕后,逐个通知。总之,需要额外的复杂度来完成这件事。
Go 语言在标准库 sync 中内置一个 sync.Cond 用来解决这类问题。
3.实现原理
sync.Cond 内部维护了一个等待队列,队列中存放的是所有在等待这个 sync.Cond 的 Go 程,即保存了一个通知列表。sync.Cond 可以用来唤醒一个或所有因等待条件变量而阻塞的 Go 程,以此来实现多个 Go 程间的同步。
sync.Cond 的定义如下:
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
每个 Cond 实例都会关联一个锁 L(互斥锁 Mutex,或读写锁 RWMutex),当修改条件或者调用 Wait 方法时,必须加锁。
sync.Cond 的四个成员函数定义如下:
// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
NewCond 创建 Cond 实例时,需要关联一个锁。
// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
//
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
Wait 用于阻塞调用者,等待通知。
调用 Wait 会自动释放锁 c.L,因为 Wait() 会将主调加入条件变量的通知列表,需要修改条件变量,所以主调在调用 Wait() 前需要对条件变量进行上锁,主调加入条件变量的通知列表后再解锁。
执行runtime_notifyListWait
会挂起调用者所在的 Goroutine。如果其他协程调用了 Signal 或 Broadcast 唤醒了该协程,那么 Wait 方法在结束阻塞时,会重新给 c.L 加锁,并且继续执行 Wait 后面的代码。
主调对条件的检查,使用了 for !condition() 而非 if,是因为当前协程被唤醒时,条件不一定符合要求,需要再次 Wait 等待下次被唤醒。为了保险起,使用 for 能够确保被唤醒后条件一定符合后,再执行后续的代码。
// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
Signal 只唤醒任意 1 个等待条件变量 c 的 goroutine,无需锁保护。Broadcast 唤醒所有等待条件变量 c 的 goroutine,无需锁保护。
4.使用示例
我们实现一个简单的例子,三个协程调用 Wait() 等待,另一个协程调用 Broadcast() 唤醒所有等待的协程。
var done = false
func read(name string, c *sync.Cond) {
c.L.Lock()
for !done {
c.Wait()
}
fmt.Println(name, "starts reading")
c.L.Unlock()
}
func write(name string, c *sync.Cond) {
fmt.Println(name, "starts writing")
time.Sleep(time.Second)
done = true
fmt.Println(name, "wakes all")
c.Broadcast()
}
func main() {
cond := sync.NewCond(&sync.Mutex{})
go read("reader1", cond)
go read("reader2", cond)
go read("reader3", cond)
write("writer", cond)
time.Sleep(time.Second * 3)
}
- done 即多个 Goroutine 阻塞等待的条件。
- read() 调用 Wait() 等待通知,直到 done 为 true。
- write() 接收数据,接收完成后,将 done 置为 true,调用 Broadcast() 通知所有等待的协程。
- write() 中的暂停了 1s,一方面是模拟耗时,另一方面是确保前面的 3 个 read 协程都执行到 Wait(),处于等待状态。main 函数最后暂停了 3s,确保所有操作执行完毕。
运行结果如下,注意 reader 的打印顺序是随机的。
$ go run main.go
writer starts writing
writer wakes all
reader3 starts reading
reader1 starts reading
reader2 starts reading
更多关于 sync.Cond 的讨论可参考 How to correctly use sync.Cond? - StackOverflow。
5.注意事项
- sync.Cond 不能被复制
sync.Cond 不能被复制的原因,并不是因为其内部嵌套了 Locker。因为 NewCond 时传入的 Mutex/RWMutex 指针,对于 Mutex 指针复制是没有问题的。
主要原因是 sync.Cond 内部是维护着一个 Goroutine 通知队列 notifyList。如果这个队列被复制的话,那么就在并发场景下导致不同 Goroutine 之间操作的 notifyList.wait、notifyList.notify 并不是同一个,这会导致出现有些 Goroutine 会一直阻塞。
- 唤醒顺序
从等待队列中按照顺序唤醒,先进入等待队列,先被唤醒。
- 调用 Wait() 前要加锁
调用 Wait() 函数前,需要先获得条件变量的成员锁,原因是需要互斥地变更条件变量的等待队列。在 Wait() 返回前,会重新上锁。重新上锁的原因是主调在 Wait 后会进行解锁操作,避免重复解锁引发 panic。
- sync.Cond 和 channel 的区别?
实际上,我们可以使用无缓冲 channel 充当条件变量实现 Go 程同步。通过 close(ch) 表示广播通知,其他的 Goroutine 使用 for select 结构来接收通知就行了。
还是以上面给出的同步示例,这里换作 channel 来实现。
var done = false
var ch = make(chan struct{})
func read(name string) {
for !done {
select {
case <-ch:
}
}
fmt.Println(name, "starts reading")
}
func write(name string) {
fmt.Println(name, "starts writing")
time.Sleep(time.Second)
done = true
fmt.Println(name, "wakes all")
close(ch)
}
func main() {
go read("reader1")
go read("reader2")
go read("reader3")
write("writer")
time.Sleep(time.Second * 3)
}
运行输出如下,注意 reader 的打印顺序是随机的。
$ go run main.go
writer starts writing
writer wakes all
reader1 starts reading
reader3 starts reading
reader2 starts reading
既然 channel 可以实现,看起来代码更加简洁,那么 sync.Cond 的存在还有必要吗?
实际上 sync.Cond 与 Channel 是有区别的,channel 定位于通信,用于一发一收的场景,sync.Cond 定位于同步,用于一发多收的场景。虽然 channel 可以通过 close 操作来达到一发多收的效果,但是 closed 的 channel 已无法继续使用,而 sync.Cond 依旧可以继续使用。这可能就是“全能”与“专精”的区别。