1.简介

sync.Cond 是基于互斥锁/读写锁实现的条件变量,用来协调想要访问共享资源的那些 Goroutine。当共享资源的状态发生变化的时候,sync.Cond 可以用来通知等待条件发生而阻塞的 Goroutine。

sync.Cond 基于互斥锁/读写锁,那它和互斥锁的区别是什么呢?

互斥锁 sync.Mutex 通常用来保护共享的临界资源,条件变量 sync.Cond 用来协调想要访问共享资源的 Goroutine。当共享资源的状态发生变化时,sync.Cond 可以用来通知被阻塞的 Goroutine。

2.使用场景

sync.Cond 经常用在多个 Goroutine 等待,一个 Goroutine 通知(事件发生)的场景。如果是一个通知,一个等待,使用互斥锁或 channel 就能搞定了。

我们想象一个非常简单的场景:

有一个协程在异步地接收数据,剩下的多个协程必须等待这个协程接收完数据,才能读取到正确的数据。在这种情况下,如果单纯使用 chan 或互斥锁,那么只能有一个协程可以等待,并读取到数据,没办法通知其他的协程也读取数据。

这个时候,就需要有个全局的变量来标志第一个协程数据是否接受完毕,剩下的协程,反复检查该变量的值,直到满足要求。或者创建多个 channel,每个协程阻塞在一个 channel 上,由接收数据的协程在数据接收完毕后,逐个通知。总之,需要额外的复杂度来完成这件事。

Go 语言在标准库 sync 中内置一个 sync.Cond 用来解决这类问题。

3.实现原理

sync.Cond 内部维护了一个等待队列,队列中存放的是所有在等待这个 sync.Cond 的 Go 程,即保存了一个通知列表。sync.Cond 可以用来唤醒一个或所有因等待条件变量而阻塞的 Go 程,以此来实现多个 Go 程间的同步。

sync.Cond 的定义如下:

// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {
    noCopy noCopy

    // L is held while observing or changing the condition
    L Locker

    notify  notifyList
    checker copyChecker
}

每个 Cond 实例都会关联一个锁 L(互斥锁 Mutex,或读写锁 RWMutex),当修改条件或者调用 Wait 方法时,必须加锁。

sync.Cond 的四个成员函数定义如下:

// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
    return &Cond{L: l}
}

NewCond 创建 Cond 实例时,需要关联一个锁。

// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
func (c *Cond) Wait() {
    c.checker.check()
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

Wait 用于阻塞调用者,等待通知。

调用 Wait 会自动释放锁 c.L,因为 Wait() 会将主调加入条件变量的通知列表,需要修改条件变量,所以主调在调用 Wait() 前需要对条件变量进行上锁,主调加入条件变量的通知列表后再解锁。

执行runtime_notifyListWait会挂起调用者所在的 Goroutine。如果其他协程调用了 Signal 或 Broadcast 唤醒了该协程,那么 Wait 方法在结束阻塞时,会重新给 c.L 加锁,并且继续执行 Wait 后面的代码。

主调对条件的检查,使用了 for !condition() 而非 if,是因为当前协程被唤醒时,条件不一定符合要求,需要再次 Wait 等待下次被唤醒。为了保险起,使用 for 能够确保被唤醒后条件一定符合后,再执行后续的代码。

// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

Signal 只唤醒任意 1 个等待条件变量 c 的 goroutine,无需锁保护。Broadcast 唤醒所有等待条件变量 c 的 goroutine,无需锁保护。

4.使用示例

我们实现一个简单的例子,三个协程调用 Wait() 等待,另一个协程调用 Broadcast() 唤醒所有等待的协程。

var done = false

func read(name string, c *sync.Cond) {
    c.L.Lock()
    for !done {
        c.Wait()
    }
    fmt.Println(name, "starts reading")
    c.L.Unlock()
}

func write(name string, c *sync.Cond) {
    fmt.Println(name, "starts writing")
    time.Sleep(time.Second)
    done = true
    fmt.Println(name, "wakes all")
    c.Broadcast()
}

func main() {
    cond := sync.NewCond(&sync.Mutex{})

    go read("reader1", cond)
    go read("reader2", cond)
    go read("reader3", cond)
    write("writer", cond)

    time.Sleep(time.Second * 3)
}
  • done 即多个 Goroutine 阻塞等待的条件。
  • read() 调用 Wait() 等待通知,直到 done 为 true。
  • write() 接收数据,接收完成后,将 done 置为 true,调用 Broadcast() 通知所有等待的协程。
  • write() 中的暂停了 1s,一方面是模拟耗时,另一方面是确保前面的 3 个 read 协程都执行到 Wait(),处于等待状态。main 函数最后暂停了 3s,确保所有操作执行完毕。

运行结果如下,注意 reader 的打印顺序是随机的。

$ go run main.go
writer starts writing
writer wakes all
reader3 starts reading
reader1 starts reading
reader2 starts reading

更多关于 sync.Cond 的讨论可参考 How to correctly use sync.Cond? - StackOverflow

5.注意事项

  • sync.Cond 不能被复制

sync.Cond 不能被复制的原因,并不是因为其内部嵌套了 Locker。因为 NewCond 时传入的 Mutex/RWMutex 指针,对于 Mutex 指针复制是没有问题的。

主要原因是 sync.Cond 内部是维护着一个 Goroutine 通知队列 notifyList。如果这个队列被复制的话,那么就在并发场景下导致不同 Goroutine 之间操作的 notifyList.wait、notifyList.notify 并不是同一个,这会导致出现有些 Goroutine 会一直阻塞。

  • 唤醒顺序

从等待队列中按照顺序唤醒,先进入等待队列,先被唤醒。

  • 调用 Wait() 前要加锁

调用 Wait() 函数前,需要先获得条件变量的成员锁,原因是需要互斥地变更条件变量的等待队列。在 Wait() 返回前,会重新上锁。重新上锁的原因是主调在 Wait 后会进行解锁操作,避免重复解锁引发 panic。

  • sync.Cond 和 channel 的区别?

实际上,我们可以使用无缓冲 channel 充当条件变量实现 Go 程同步。通过 close(ch) 表示广播通知,其他的 Goroutine 使用 for select 结构来接收通知就行了。

还是以上面给出的同步示例,这里换作 channel 来实现。

var done = false
var ch = make(chan struct{})

func read(name string) {
    for !done {
        select {
        case <-ch:
        }
    }
    fmt.Println(name, "starts reading")
}

func write(name string) {
    fmt.Println(name, "starts writing")
    time.Sleep(time.Second)
    done = true
    fmt.Println(name, "wakes all")
    close(ch)
}

func main() {
    go read("reader1")
    go read("reader2")
    go read("reader3")
    write("writer")

    time.Sleep(time.Second * 3)
}

运行输出如下,注意 reader 的打印顺序是随机的。

$ go run main.go
writer starts writing
writer wakes all
reader1 starts reading
reader3 starts reading
reader2 starts reading

既然 channel 可以实现,看起来代码更加简洁,那么 sync.Cond 的存在还有必要吗?

实际上 sync.Cond 与 Channel 是有区别的,channel 定位于通信,用于一发一收的场景,sync.Cond 定位于同步,用于一发多收的场景。虽然 channel 可以通过 close 操作来达到一发多收的效果,但是 closed 的 channel 已无法继续使用,而 sync.Cond 依旧可以继续使用。这可能就是“全能”与“专精”的区别。

powered by Gitbook该文章修订时间: 2024-08-04 07:27:05

results matching ""

    No results matching ""