sync.RWMutex 源码剖析

  1. 读写锁逻辑
  2. 读写锁实现

读写锁是 Golang sync 包中提供的另一种锁。

Mutex 因为每一次都要等待其他人释放锁之后才能对竞争资源进行操作,不管是读还是写。但我们在读的时候其实并不需要这么严格的加锁。只要保证当时没有其他协程在写即可。

所以,一个分开读写的锁就变得非常必要。

读写锁的原理,其实很简单,就是对“读”和“写”两种操作分别进行加锁,但是“写锁”的操作优先级更加高。

读写锁逻辑

Golang 的 RWMutex 遵循了两个重要的逻辑:

  1. 如果当前有协程在读,那么,写锁必须等待所有读协程结束之后,才能进行实际写操作;
  2. 如果当前有协程在写,那么,读锁必须等待写完成之后,才能进行读操作;

读锁和写锁通过信号量进行协调,还有两个计数器用来记录读锁的使用数量。

读写锁实现

所有的代码都去掉了 “竞态检查”相关的内容。

type RWMutex struct {
    w           Mutex  // 写锁直接使用 Mutex 来实现,以保证写的唯一性
    writerSem   uint32 // 写的信号量,当有读锁被使用时,写就等待在这个信号量上,等待所有的读锁释放
    readerSem   uint32 // 读的信号量,当写锁锁定,有读请求时,会等待在此信号量上,写锁释放时,会释放信号量
    readerCount int32  // 正在读的计数器
    readerWait  int32  // 等待读的计数器
}

上面的结构就是读写锁的结构体,里面有两个计数器,两个信号量,一个Mutex,通过这几个变量来控制整个读写锁的运作。

读写锁的读加锁写加锁读释放写释放 四个操作相互都有关联,很难单独来讲。但是我们可以将其每一个步骤拆解开来,慢慢解说。

读加锁

func (rw *RWMutex) RLock() {
  // 给 readerCount 加 1,表示新增一个读
  // 如果加完之后的结果是负数,表示已经有写锁被锁定了
  // 那么就等在 readerSem 的信号量上,如果写锁释放,则信号量则也会被释放
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // 等待信号量
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
  // 如果不需要等待信号量,那么到此就认为读锁已经加锁完毕
}

上面是读加锁的源码。在 RWMutex 中,在 readerCount 中添加一个数量,就代表读锁加锁完成。

如果在给 readerCount 添加1之后,变成了负数,那么证明有写锁已经上锁了。此时,只能等在信号量上。等待写锁释放。

写加锁

func (rw *RWMutex) Lock() {

    // 直接现在 Mutex 加锁,这样保证其他的写不会同时加锁。
    rw.w.Lock()
  
    // 给 readerCount 加上一个很大的负数 rwmutexMaxReaders,此时 readerCount 已经变成了负数。
  // 然后,再将原来的内容恢复,赋值给变量 r。
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    
  // 如果 r 不等于 0,就是此时还有读在进行
  // 那么,就将 r 加到 readerWait 上面,表示需要等待的读数量
  // 然后,等待在 writerSem 的信号量上
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
}

上面是写加锁的过程。这里面用到了 Mutex 和 2个计数器,还有 1个信号量。

在 Mutex 拿到锁之后,第一件事就是给readerCount增加一个很大的负数,此时,readerCount 变成了负数。还记得 读加锁 中的内容吗?在那里,如果 readerCount 是一个负数的话,那么,读就要等待在 readerSem 中,不能直接拿到读锁。

读释放锁

func (rw *RWMutex) RUnlock() {
  // 读释放锁,先给 readerCount 减去1
  // 如果还是负数,那么就认为有写锁在等待
  // 此时,要进入慢路径的解锁方式
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        // Outlined slow-path to allow the fast-path to be inlined
        rw.rUnlockSlow(r)
    }
}

func (rw *RWMutex) rUnlockSlow(r int32) {
  
  // 当没有加锁,直接解锁时,会进入此分支
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        race.Enable()
        throw("sync: RUnlock of unlocked RWMutex")
    }
  
    // 给 readerWait 减去 1
  // 如果自己是最后一个读的话,那么就释放 writerSem
  // 让写锁可以继续操作
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
    // 释放写的信号量
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

上面是读锁释放时的步骤。之前在写加锁时,会把 readerCount 中的数量,放到 readerWait 中。所以,当检查到写锁加锁后,就要在两个计数中都要减去 1。

当检查到 readerWait == 0,也就是自己是最后一个读时,就将写锁等待的信号量 writerSem 释放掉。

写锁释放

func (rw *RWMutex) Unlock() {
    
  // 将 readerCount 恢复为正数
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)

  // 如果 readerCount 的值大于 最大的读数量,是异常情况,未加锁直接解锁
    if r >= rwmutexMaxReaders {
        race.Enable()
        throw("sync: Unlock of unlocked RWMutex")
    }
  
    // 循环,按顺序释放等待的读锁信号量
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    
  // 释放写锁
    rw.w.Unlock()
}

到此,读写锁的操作细节都已经介绍完了。在我看来,整体的设计虽然比较简单,但是非常精巧。几个计数器的使用也非常的有新意义。可以学习,用在实际的开发过程中。


署名-非商业性使用-相同方式共享 4.0