rk呓语

reposkeeper

Golang sync.Pool 源码解析

main

在 Golang 中,sync 包中的内容都是对于并发处理的一些工具的集合,每一个都很有用,在某些场景里,使用非常频繁。这篇文章主要说下,sync 包中的 Pool 这个组件。

在一些大批量的数据结构处理的过程中,我们会申请一些临时的存放变量,用完再释放掉。如果这个频率很高,数量很多的话就会造成GC标记的压力很大。导致应用消耗在GC上时间大幅变多,降低性能。如果有一个临时的变量队列(池),用完放到里面,供下次使用,那么在这种场景下,就会使得GC消耗的时间片大幅降低。优化程序的运行效率。

既然放在了 sync 包下面,那么对于并发的支持是一定的。sync.Pool 可以安全的在多协程的环境使用。

我们自己也可以实现一个这样的队列。但是实现起来有一些细节还是要考虑清楚,包括:

  1. 如何保证在并发的时候,不会将同一个对象分配给不同的协程?
  2. 如何保证使用锁的情况下,能够尽量减少锁的开销?
  3. ……

下面我们详细的来说一下


SYNC.POOL 的使用

首先,简单的来说下如何使用 sync.Pool。

在日常的使用过程中,一定要谨记一个概念,就是 sync.Pool 是用来存放  临时对象 的,不能存放像 DB连接,文件句柄等等一类持有状态和数据的对象。

原因是 sync.Pool 在使用的过程中,存放的对象可能在没有任何通知的情况下,被删除掉,如果存放的是某一个对象指针,那么指针指向的对象也可能被释放掉。这种设计机制决定了 sync.Pool 只能存放临时对象。

那么我们设计一个简单场景,来模拟一下 sync.Pool 的使用。

创建两个文件,分别为 demo.go 和 demo_test.go

package awesomeProject

import (
    "sync"
)

var p = sync.Pool{
    New: func() interface{}{
        return &SimpleHandler{}
    },
}

type SimpleHandler struct {
    Data []byte
}

func (s *SimpleHandler) Do() {
    s.Data = nil
}

func SimpleHandFunc(req string) {

    s := SimpleHandler{
        Data: []byte(req),
    }
    s.Do()
}

func SimpleHandFuncWithPool(req string) {

    s := p.Get().(*SimpleHandler)
    s.Data = []byte(req)
    s.Do()
    p.Put(s)
}
package awesomeProject

import "testing"

func TestSimpleHandFunc(t *testing.T){
    for i := 0; i < 500000000; i++ {
        SimpleHandFunc("req")
    }
}


func TestSimpleHandFuncWithPool(t *testing.T){
    for i := 0; i < 500000000; i++ {
        SimpleHandFuncWithPool("req")
    }
}

这段代码主要是创建了两种测试场景,用来模拟在大量请求的情况下,分别在使用和不使用 sync.Pool的测试中,展示耗时情况。

# go test -run TestSimpleHandFunc
PASS
ok      github.com/reposkeeper/awesomeProject   22.012s


# go test -run TestSimpleHandFuncWithPool
PASS
ok      github.com/reposkeeper/awesomeProject   18.679s

多次测试之后,大概使用 sync.Pool 会比不使用的时候,节省大概2-3s的时间。如果场景更加复杂,或者请求量更大的话,我相信对比还可以更加明显。

SYNC.POOL 的实现

对于这个组件来说,使用的场景和方式都非常固定和简单。不需要太复杂的介绍。但是其背后的设计逻辑,还是值得深入看看的。

pool-structure

为了解决并发的效率问题,sync.pool 给每个 P 都分配了一个 pool。这样可以大幅降低因为多个  P 抢占而需要处理的并发问题。所以在每个 sync.pool 的实例中,都有 N 个 local pool,每个 local pool 都是为了对应的 P 而产生的。这个 N,取的是 GOMAXPROCS 的值。

在每个 local pool 中,有两个地方来存放用户的对象,分别是 PRIVATE、SHARED。PRIVATE表示的是这个 local pool 私有的内容,只能存放一个。SHARED表示可以和其他 local pool 共享的内容,可以存放多个。另外还有一个 VICTIM POOL,它的结构和 local pool 的结构完全一样。它的存在是为了让 local pool 中未使用的对象能够多存在一个GC周期,防止出现冷启动的抖动问题。这个后面详细介绍。

下面看一下这块的源码实现。

type Pool struct {
    noCopy noCopy              // 防止 copy 实例的

    local     unsafe.Pointer   // local pool 的一个数组,长度固定,是 P 的数量
    localSize uintptr          // local pool 的长度

    victim     unsafe.Pointer  // 和 local pool 一样
    victimSize uintptr         // 和 local pool 一样

    New func() interface{}     // 可选,用来指定创建对象的函数
}

我们可以看到上述结构图中描述的几个部分,local pool 和 victim pool。

New 是一个可选参数,可以初始化一个创建对象的函数,这样当池子中没有对象的时候,可以直接创建一个返回,而不是返回 nil。

noCopy 是用来防止 copy 实例的。但是,Golang 目前无法做到在运行时做出禁止和检测,所以这个参数是给 go vet 在编译的时候做静态检查的。

get-process

取出对象步骤如下

  1. 首先 定位到  P 对应的 local pool 中,取 PRIVATE 存放的对象;
  2. 如果 #1 没有取到,则去取 SHARED 中存放的对象;
  3. 如果 #2 没有取到,则去其他的 local pool 的 SHARED 中偷一个对象回来;
  4. 如果 #3 没有偷到,则去 VICTIM 的 PRIVATE 中取;
  5. 如果 #4 没有取到,则去 VICTIM 的 SHARED 中取;
  6. 如果 #5 没有取到,则表示上述地方都没有,则返回nil,或新创建一个对象(取决于是否初始化 New 这个函数);

为什么要分这么多步骤呢?其实是因为解决几个比较麻烦的问题才会有这么多的情况。

  • PRIVATE 存放了 local pool 私有的对象,只能存放一个,只有自己可以访问。这是为了防止其他 P 把自己的对象偷完,造成重建的动作发生;
  • SHARED 是一个双向链表,存放了可以和其他 P 共享的对象,可以存放多个,这是为了防止其他 local pool 如果没有需要重新创建的问题;
  • VICTIM 是存放了在一次 GC 之后,local pool 原有的内容;这是为了防止因为GC出现的冷启动抖动问题,可以使未使用的对象,至少存在两个 GC 周期;

相对来说,放入对象就会比较简单了

  1. 找到当前对应的 local pool ;
  2. 如果 PRIVATE 为空,则放入 PRIVATE中,否则,放入 SHARED 中;

GET 对应源码是下面的部分(去除了部分不重要的内容):

func (p *Pool) Get() interface{} {
    // ......
    
    l, pid := p.pin()  // 将当前的 gorountine 锁在当前的 P 上,函数返回了 local pool 的指针和 P 的 id;
    x := l.private     // 检查 private
    l.private = nil
    if x == nil {
        x, _ = l.shared.popHead()    // 如果没有从 shared 中获取;
        if x == nil {
            x = p.getSlow(pid)       // 如果还没有,就尝试用比较慢的步骤来获取
        }
    }
    runtime_procUnpin()              //  unpin
    
    // .....
    
    if x == nil && p.New != nil {    // 如果哪哪都没有,而且 New 被初始化了,那就创建一个
        x = p.New()
    }
    return x                         // 如果 New 没有初始化,就返回 nil
}

func (p *Pool) getSlow(pid int) interface{} {
    
    size := atomic.LoadUintptr(&p.localSize) 
    locals := p.local                      
    for i := 0; i < int(size); i++ {  // 去其他 local pool 偷一个
        l := indexLocal(locals, (pid+i+1)%int(size))
        if x, _ := l.shared.popTail(); x != nil {  // 从尾部拿
            return x
        }
    }

    size = atomic.LoadUintptr(&p.victimSize)  // 从 victim pool 拿一个
    if uintptr(pid) >= size {
        return nil
    }
    locals = p.victim                         // private 中拿
    l := indexLocal(locals, pid)
    if x := l.private; x != nil {
        l.private = nil
        return x
    }
    for i := 0; i < int(size); i++ {          // 从 所有的池子中拿
        l := indexLocal(locals, (pid+i)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }

    atomic.StoreUintptr(&p.victimSize, 0)     // 如果没拿到,后面就不再做这一步了

    return nil
}

PUT 对应源码这样的:

func (p *Pool) Put(x interface{}) {
    if x == nil {
        return
    }
    
    // ......
    
    l, _ := p.pin()
    if l.private == nil {    // 尝试放到 private
        l.private = x
        x = nil
    }
    if x != nil {            // 放到 shared
        l.shared.pushHead(x)
    }
    runtime_procUnpin()
    
    // ......
}

那么,池子里的元素什么时候会被清理掉呢? sync.Pool 在GC中注册了一个函数

func poolCleanup() {
    
    for _, p := range oldPools {  // 丢掉 oldpool 中 victim pool 的内容
        p.victim = nil
        p.victimSize = 0
    }

    for _, p := range allPools {  // 把 local pool 放到 victim pool 中
        p.victim = p.local
        p.victimSize = p.localSize
        p.local = nil
        p.localSize = 0
    }

    oldPools, allPools = allPools, nil // 当前 pool 标记为 oldpool
}

通过这三步来做到定期清理池子中的内容。

为了实现 SHARED 中的队列,sync 包还实现了一个 单生产者,多消费者的双向队列,使用了 CAS 设计。后面再找一篇文章具体分解下。

SOLID 原则小记

上一篇

Golang sync.Map 源码解析

下一篇
评论
发表评论 说点什么
还没有评论
115
0