sync.Cond 源码剖析

  1. 带唯一 KEY 的锁
  2. 源码剖析
    1. 不允许拷贝的实现
    2. notifyList 的实现
    3. Cond 源码实现

Cond 是用作实现一个共享条件变量,一般会配合一个锁来使用,比如:Mutex 或者 RWMutex。当变量改变时,所有等待在共享变量上的协程被唤醒,然后争夺锁,并对共享变量进行操作,没有抢到锁的,则继续等待。

带唯一 KEY 的锁

比如,我们可以使用 Cond 来实现一个带唯一 KEY 的锁。

package utils

import "sync"

// Can be locked by unique ID
type KeyMutex struct {
    c *sync.Cond
    l sync.Locker
    s map[interface{}]struct{}
}

// Create new KeyMutex
func New() *KeyMutex {
    l := sync.Mutex{}
    return &KeyMutex{c: sync.NewCond(&l), l: &l, s: make(map[interface{}]struct{})}
}

func (km *KeyMutex) locked(key interface{}) (ok bool) { _, ok = km.s[key]; return }

// Lock KeyMutex by unique ID
func (km *KeyMutex) Lock(key interface{}) {
    km.l.Lock()
    defer km.l.Unlock()
    for km.locked(key) {
        km.c.Wait()
    }
    km.s[key] = struct{}{}
    return
}

// Unlock KeyMutex by unique ID
func (km *KeyMutex) Unlock(key interface{})  {
    km.l.Lock()
    defer km.l.Unlock()
    delete(km.s, key)
    km.c.Broadcast()
}

上面的代码唯一需要注意的就是在 Lock 函数中,使用了 for 循环来检查条件是否满足,这个是有原因的。Cond 本身只能支持对一个条件的变化进行等待和通知。而我们使用一个 Cond 实例,却管理了 N 个 Key 。所以,当我们在 Unlock 中调用 Broadcast 通知等待的协程时,其实,我们通知了所有 Key 的等待协程,而满足条件的只有一个 Key。这个时候,剩余的协程就要依靠 for 循环来重新进入等待序列中,等待它所属的 Key 被释放。

源码剖析

type Cond struct {
  // Cond 不允许被拷贝后使用
  // 此字段用来做静态检查
    noCopy noCopy

    // 配合使用的锁,锁持有时,才能查看或更改条件
  // 这个是一个 Interface,并不局限于使用 Mutex 或者 RWMutex
  // 但实际应用中,一般使用上述两个实现
    L Locker

  // 一个在 runtime 中实现的通知列表
  // 主要用来做协程的等待和通知
    notify  notifyList
  
  // Cond 不允许被拷贝后使用
  // 此变量用来做 runtime 检查是否被拷贝
    checker copyChecker
}

不允许拷贝的实现

在我们之前分享的源码剖析中,也有不允许拷贝的字段。但是大都只有 noCopy 字段,却没有 copyChecker

这是因为 noCopy 字段是用来做静态检查的。也就是说,如果你在实际使用中 copy 了一个带 noCopy 的组件,虽然有有隐患,但在运行时并不会阻止你这么做。这个字段只能在使用 go vet 时,才会做检查。

copyChecker 不同,它会在运行时进行检查,是否被 Copy 了,相当于更加高等级的 noCopy 检查。

但实现也很简单,如下:

// copyChecker 其实就是一个指针,它实际存储的是自己的指针地址
// 通过对比存储的指针地址和实际自己的指针地址来检查是不是
type copyChecker uintptr

func (c *copyChecker) check() {
  // 这一个 if 的作用是:
  // 如果当前实例的地址和存储的地址不一样
  // 那么假定存储的地址为 0(初始化值),然后使用 cas 将现在的实例地址替换进去
  // 如果没替换成功,而且当前实例地址和存储地址还是不一样,那么就认为是被 copy 过
    if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
        !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
        uintptr(*c) != uintptr(unsafe.Pointer(c)) {
        panic("sync.Cond is copied")
    }
}

至于为什么有的组件不使用 runtime 检查,可能是出于性能的考虑。毕竟需要在每一个操作之前都执行以下 check 操作。对于标准库来说,能省则省。

notifyList 的实现

notifyList 是一个独立的实现,通过4个方法来控制操作。

因为涉及的代码比较多,为了让我们主题不跑偏,不再详细介绍,只简单说一下流程

type notifyList struct {
    wait uint32
    notify uint32

    lock mutex
    head *sudog
    tail *sudog
}

// 下面的4个方法实际的实现在 runtime/sema.go 中

// 给 notifyList 中添加一个需要被通知的数量,返回一个 ticket 数字,用来后面等待时使用
// 给 notifyList.wait 中增加了计数,并按照 wait-1 作为 ticket 数字
func runtime_notifyListAdd(l *notifyList) uint32

// 调用这个必须先调用 上面的方法,拿到 ticket 数字,也就是这里的 t
// 这里会比对是否已经被通知过了,如果通知过了直接返回
// 如果没有,则会进入等待的链表中
// 并将自己的 goroutine 睡眠掉
func runtime_notifyListWait(l *notifyList, t uint32)

// 首先对比 wait 和 notify ,如果一样,说明没有新的等待的协程,返回
// 将 wait 赋值给 notify,清空链表指针
// 按照链表的顺序,依次唤醒协程
func runtime_notifyListNotifyAll(l *notifyList)

// 首先对比 wait 和 notify ,如果一样,说明没有新的等待的协程,返回
// 拿到 notidy 中的最后一个最为 ticket,notify +1
// 在链表中找到 ticket 对应的 goroutine 并唤醒
// 从链表中踢出 goroutine
func runtime_notifyListNotifyOne(l *notifyList)

上面的几个操作完成了 goroutine 的挂起和唤醒操作。是 runtime 帮助完成的。这样在 cond 中就不需要关心这一块的逻辑了。

实际上,这一段对于 goroutine 的挂起和唤醒操作在 channel 等一系列的源码中都是大同小异的操作,相差不大。

Cond 源码实现

当我们看完了上面两个小节的代码之后,Cond 已经不剩什么东西了,我们来看一下

// Wait 用来等待条件变化
// Wait 会自动释放 c.L,然后挂起当前的 goroutine; // 这里释放锁是要允许其他 goroutine 获取锁来操作条件变量
// Wait 被唤醒,并在返回之前,会重新将 Lock 锁上; // 对于调用这来说,对锁的使用行为一致
// Wait 没有被 Boardcast 和 Singal 唤醒之前,不会返回;
// 
func (c *Cond) Wait() {
  // runtime 检查 cond 实例是否被 copy 过
    c.checker.check()
  
  // 向 notify list 增加计数
    t := runtime_notifyListAdd(&c.notify)
  
  // 释放锁
  // 此时,可以允许其他 goroutine 发起通知
    c.L.Unlock()
  
  // 将 goroutine 加入等待链表
    runtime_notifyListWait(&c.notify, t)
  
  // 重新加锁
    c.L.Lock()
}

// Signal 唤醒一个等待的 goroutine,如果有的话
func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

// 唤醒所有正在等待的 goroutine
func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

至此,Cond 的源码剖析就结束了。

Cond 难以理解的地方在于 对于锁的使用时在多个地方同时使用的。也就造成了容易出现死锁,和难以理解的情况。

每个 goroutine 在操作或者检查条件变量的时候,都要获取锁。而在 Wait 函数中等待的时候又会将其释放掉,唤醒时再重新拿到。这样对于锁的分布操作,很容易造成死锁的情况。即便没有造成,如果没有深入理解,也会对阅读代码造成很大的困难。

这个是 Cond 的相对较难理解的地方。


署名-非商业性使用-相同方式共享 4.0