rk呓语

reposkeeper

Golang sync.Mutex 源码解析

Mutex 是一个较为常用的结构;不管是用户代码,还是 Golang 的官方包,都有不少地方用到了这个组件。

Mutex 的实现非常的精炼,去掉注释实际的代码,大概只有不到 150 行。但是,代码中体现的逻辑和对各种情况的考虑是非常的严谨的。个人觉得是官方库中特别值得仔细阅读的一个组件。

两种模式

首先,在进入源码之前,要对整个 Mutex 的设计初衷和设计思路要有总体的概念。

在程序中,锁会被多个 goroutine 来争抢,抢不到的则进入阻塞状态,等待抢到锁之后,再进行后面的工作。那么这样一来,抢不到锁的会排队等待锁释放之后再继续抢。如果等待锁的 goroutine 太多,就会出现一个问题,排队在末尾的 goroutine 可能要等待很长时间才能拿到锁,比如图中的编号8的 goroutine。

sync.mutex

那么在很多锁的实现中,并不完全使用公平锁。而是加入了一定的抢占能力。在 Golang 的 Mutex 实现中,表现为两种模式:NormalStarving 。在不同的模式下,锁会以不同的算法来交给对应的 Goroutine,以保证最好的效率,同时避免尾部延迟问题。

  • 在 Normal 模式时,Mutex 更偏向于将锁给当前正在CPU上的 Goroutine,即刚抢锁的 Goroutine。这样可以避免这个 Goroutine 等待,避免不必要的上下文切换;

  • 在 Starving 模式时,Mutex 不会允许 Goroutine 争抢,而是按照排队的顺序将锁给队列中的 Goroutine。

源码分析

知道了总体的设计原则之后 ,就可以通过代码来了解设计的具体实现了。

首先 ,Mutex 的定义非常简单,仅有两个字段

type Mutex struct {
    state int32
    sema  uint32
}

state 用来标记锁的状态,sema用做锁的信号量。在 Mutex 的方法中,state被划分出来多个位来标识不同的状态,如下。

state

低三位分别用来表示 StarvingWokenLocked 三种状态,剩下的高位用来当做等待锁的 Goroutine 计数器:

  1. Starving:表示当前锁在饥饿状态,此时不应该再有抢占动作,而是按顺序把锁交给队列里等待的 Goroutine;
  2. Woken:表示从休眠中唤醒,一个 Goroutine 第二次尝试抢占锁时,会将此标志位置位;
  3. Locked:表示当前锁是否被锁住;
  4. Wait Counter:等待计数器;如果抢占不成功,则加1;

低三位的几个状态并不是互斥关系,而是可以共存的,我们在后面详述。

Lock 实现

func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}

Lock 时,分为两种情况。第一种,直接使用 CAS 来设置 state 为锁定状态,假设当前的状态为 “未锁定、未唤醒、非饥饿、无等待”。当一个 Mutex 并不繁忙时,这种方法基本可以直接拿到锁。如果很遗憾没拿到,那么就要进入 lockSlow,顾名思义,一种较慢的抢锁方式,也就是第二种情况。如果拿到的话 ,其实就是把 Locked 置位,表示锁已经被锁定。

第二种情况相对比较复杂,在这里,会使用各种方法来提高获取锁的效率,同时也会将刚才说的剩下的 几个状态位使用起来 。

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
  for {
    // .....
  }
  
}

首先,lockSlow 中定义了很多变量。因为函数是在每个 Goroutine 中执行的,所以,变量只在当前 Goroutine 中生效,这个原则对理解后面很有帮助。

  • waitStartTime 用来记录当前 Goroutine 在 信号量队列等待的开始时间,是纳秒时间戳;
  • starving 用来记录当前 Goroutine 是否是饥饿的;
  • awoke 用来记录当前 Goroutine 是否是被从队列唤醒的;
  • iter 是用来在自旋中计数的;
  • old 相当于 state 的一个快照;

了解完定义的变量之后,下面就是一个大的 dead loop,在这个循环里,做了很多事情:

首先了解一个概念:自旋。自旋是空占CPU等待的一个操作,可以避免 Goroutine 被调度,失去对CPU的占有。使用得当可以避免被调度时多出的很多上下文切换的开销,可以提高效率。

第一部分:当 Mutex 是锁定状态,不在饥饿模式下,并且可以自旋时,做下面的操作:

  1. 如果队列有等待的 Goroutine,且锁并没有被唤醒,自己也不是被唤醒的 Goroutine(自己是新到的抢锁 Goroutine),那么将锁置位唤醒,这样就可以不用再唤醒其他 Goroutine 了;
  2. 自旋 4次;
for {
  // 锁定 && 不是饥饿模式 && 可以自旋
  if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {

    // 自己不是唤醒 && 锁不是唤醒状态 && 队列不为空 && 将锁置位唤醒成功
    if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
      awoke = true  // 把自己设置为已唤醒
    }
    runtime_doSpin()  // 自旋
    iter++  // 这里就是为了控制自旋的次数,runtime_doSpin 规定了只能自旋4次
    old = m.state // 重新获取状态
    continue
  }
  // ……
}

第二部分:如果自旋没有等到锁或者锁已经进入了饥饿状态或者锁已经释放,那么做下面的操作:

  1. 当锁已经释放,尝试加锁,加锁成功则退出;
  2. 当锁在加锁,且已处于饥饿模式,等待队列+1;
  3. 当锁在加锁,且自己是饥饿状态,则尝试将锁变为饥饿模式;
  4. 当自己在唤醒状态时,去除锁的唤醒状态;
  5. 当 2 或 3 任意一个执行成功,则将自己放入信号量等待队列中;
    1. 如果 waitStartTime 已设置,则将自己放入队头;
    2. 如果没有设置,则放入队尾,并将其设置为当前时间戳;
  6. 进入信号量等待
for {
  // %第一部分 ……%
  new := old
  // 如果不是 饥饿模式,则尝试加锁
  if old&mutexStarving == 0 {
    new |= mutexLocked
  }
  
  // 如果是饥饿模式或加锁状态,则在队列里+1
  if old&(mutexLocked|mutexStarving) != 0 {
    new += 1 << mutexWaiterShift
  }
  
  // 如果自己是饥饿状态 && 锁在加锁状态,则将锁变为饥饿模式
  if starving && old&mutexLocked != 0 {
    new |= mutexStarving
  }
  
  // 如果自己是被唤醒的,则取消锁的唤醒标记
  if awoke {
    // The goroutine has been woken from sleep,
    // so we need to reset the flag in either case.
    if new&mutexWoken == 0 {
      throw("sync: inconsistent mutex state")
    }
    new &^= mutexWoken
  }
  
  // 尝试给锁置位,如果不成功则重新获取锁的状态,循环中再走一次
  if atomic.CompareAndSwapInt32(&m.state, old, new) {
    if old&(mutexLocked|mutexStarving) == 0 {
      break // 加锁成功,溜了
    }
    
    // 如果 waitStartTime 不为零,那么说明之前等待过,则放在队头,否则放在队尾
    queueLifo := waitStartTime != 0
    // 如果 waitStartTime 为零,那么设置开始等待时间
    if waitStartTime == 0 {
      waitStartTime = runtime_nanotime()
    }
    
    // 进入信号量等待
    runtime_SemacquireMutex(&m.sema, queueLifo, 1)
   
    // % 第三部分…… %
  } else {
    old = m.state // 重新获取状态
  }
  
}

第三部分:从信号量等待队列中醒来,如果锁进入了饥饿状态,那代表不需要抢锁,锁是直接给了自己的;如果没有,那么就需要重新开始抢。

for {
  // % 第一部分…… %
  if atomic.CompareAndSwapInt32(&m.state, old, new) {
  // % 第二部分…… %
    
    // 确认自己是不是已经等饿了,当等待时间超过 1ms,那就饿了
    starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    
    // 重新获取状态
    old = m.state
    
    // 如果已经处于饥饿状态,那么唤醒就不需要抢锁,是Mutex直接给我们的
    if old&mutexStarving != 0 {
      
      // 此时,低三位的状态一定是 100
      // 等待计数至少为1(就是自己)
      if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
        throw("sync: inconsistent mutex state")
      }
      
      delta := int32(mutexLocked - 1<<mutexWaiterShift)
      
      // 如果自己不是饥饿状态 或者 队列里只剩下自己
      if !starving || old>>mutexWaiterShift == 1 {
        delta -= mutexStarving // 退出饥饿状态
      }
     
      atomic.AddInt32(&m.state, delta)
      break
    }
    
    // 如果不是饥饿状态,那就把自己设为唤醒状态,然后重新开始抢锁
    awoke = true
    iter = 0
    }else {
    old = m.state // 重新获取状态
  }
}

UnLock 实现

首先,尝试直接将 lock 位置为零,操作之后,如果state不等于零(有队列、饥饿状态、唤醒状态),那么就进入 slowLock。

在 slowLock ,首先看一下,是不是因为对已经解锁的Mutex进行了二次解锁,如果是这样,就直接抛出错误

if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }

随后,检查锁是否处于饥饿状态,如果是,那么就直接交给下一个信号量排队的 Goroutine;

如果,锁不在饥饿状态,那么就做下面的操作:

下面的操作放在一个 dead loop 中,直到成功为止;

  1. 如果队列中没有等待的Goroutine,则返回
  2. 如果锁被设置为唤醒,则表示已经有被唤醒的 Goroutine,则不需要再唤醒其他 Goroutine 了,返回
  3. 如果锁被设置为锁定,那么表示其他 Goroutine 已经获取了锁,则返回
  4. 如果锁被设置为饥饿,那么表示不需要再唤醒 Goroutine,则返回
  5. 上述条件判断完之后,则代表本次一定要唤醒一个 Goroutine,这时会将锁的唤醒位置位,并唤醒一个 Goroutine,
for {
            // 如果没有等待的 Goroutine 或者标志位任何一个被置位,就直接返回
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            
        // 标记为唤醒,并且唤醒一个 Goroutine
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }

后记

Golang 的 Mutex 实现,在加入了 饥饿模式之后,出现过严重的性能问题,在 Github 有详细的讨论 这里 。在 go1.14 中,修复了这个问题,增加了 “饥饿模式” 下,被唤醒的 Goroutine 可以被立即调度。

Golang Context 源码解析

上一篇

工具汇总

下一篇
评论
发表评论 说点什么
146
1