在 Golang 中,sync 包中的内容都是对于并发处理的一些工具的集合,每一个都很有用,在某些场景里,使用非常频繁。这篇文章主要说下,sync 包中的 Pool 这个组件。
在一些大批量的数据结构处理的过程中,我们会申请一些临时的存放变量,用完再释放掉。如果这个频率很高,数量很多的话就会造成GC标记的压力很大。导致应用消耗在GC上时间大幅变多,降低性能。如果有一个临时的变量队列(池),用完放到里面,供下次使用,那么在这种场景下,就会使得GC消耗的时间片大幅降低。优化程序的运行效率。
既然放在了 sync 包下面,那么对于并发的支持是一定的。sync.Pool 可以安全的在多协程的环境使用。
我们自己也可以实现一个这样的队列。但是实现起来有一些细节还是要考虑清楚,包括:
- 如何保证在并发的时候,不会将同一个对象分配给不同的协程?
- 如何保证使用锁的情况下,能够尽量减少锁的开销?
- ……
下面我们详细的来说一下
SYNC.POOL 的使用
首先,简单的来说下如何使用 sync.Pool。
在日常的使用过程中,一定要谨记一个概念,就是 sync.Pool 是用来存放 临时对象 的,不能存放像 DB连接,文件句柄等等一类持有状态和数据的对象。
原因是 sync.Pool 在使用的过程中,存放的对象可能在没有任何通知的情况下,被删除掉,如果存放的是某一个对象指针,那么指针指向的对象也可能被释放掉。这种设计机制决定了 sync.Pool 只能存放临时对象。
那么我们设计一个简单场景,来模拟一下 sync.Pool 的使用。
创建两个文件,分别为 demo.go 和 demo_test.go
package awesomeProject
import (
"sync"
)
var p = sync.Pool{
New: func() interface{}{
return &SimpleHandler{}
},
}
type SimpleHandler struct {
Data []byte
}
func (s *SimpleHandler) Do() {
s.Data = nil
}
func SimpleHandFunc(req string) {
s := SimpleHandler{
Data: []byte(req),
}
s.Do()
}
func SimpleHandFuncWithPool(req string) {
s := p.Get().(*SimpleHandler)
s.Data = []byte(req)
s.Do()
p.Put(s)
}
package awesomeProject
import "testing"
func TestSimpleHandFunc(t *testing.T){
for i := 0; i < 500000000; i++ {
SimpleHandFunc("req")
}
}
func TestSimpleHandFuncWithPool(t *testing.T){
for i := 0; i < 500000000; i++ {
SimpleHandFuncWithPool("req")
}
}
这段代码主要是创建了两种测试场景,用来模拟在大量请求的情况下,分别在使用和不使用 sync.Pool的测试中,展示耗时情况。
# go test -run TestSimpleHandFunc
PASS
ok github.com/reposkeeper/awesomeProject 22.012s
# go test -run TestSimpleHandFuncWithPool
PASS
ok github.com/reposkeeper/awesomeProject 18.679s
多次测试之后,大概使用 sync.Pool 会比不使用的时候,节省大概2-3s的时间。如果场景更加复杂,或者请求量更大的话,我相信对比还可以更加明显。
SYNC.POOL 的实现
对于这个组件来说,使用的场景和方式都非常固定和简单。不需要太复杂的介绍。但是其背后的设计逻辑,还是值得深入看看的。
为了解决并发的效率问题,sync.pool 给每个 P 都分配了一个 pool。这样可以大幅降低因为多个 P 抢占而需要处理的并发问题。所以在每个 sync.pool 的实例中,都有 N 个 local pool,每个 local pool 都是为了对应的 P 而产生的。这个 N,取的是 GOMAXPROCS 的值。
在每个 local pool 中,有两个地方来存放用户的对象,分别是 PRIVATE、SHARED。PRIVATE表示的是这个 local pool 私有的内容,只能存放一个。SHARED表示可以和其他 local pool 共享的内容,可以存放多个。另外还有一个 VICTIM POOL,它的结构和 local pool 的结构完全一样。它的存在是为了让 local pool 中未使用的对象能够多存在一个GC周期,防止出现冷启动的抖动问题。这个后面详细介绍。
下面看一下这块的源码实现。
type Pool struct {
noCopy noCopy // 防止 copy 实例的
local unsafe.Pointer // local pool 的一个数组,长度固定,是 P 的数量
localSize uintptr // local pool 的长度
victim unsafe.Pointer // 和 local pool 一样
victimSize uintptr // 和 local pool 一样
New func() interface{} // 可选,用来指定创建对象的函数
}
我们可以看到上述结构图中描述的几个部分,local pool 和 victim pool。
New 是一个可选参数,可以初始化一个创建对象的函数,这样当池子中没有对象的时候,可以直接创建一个返回,而不是返回 nil。
noCopy 是用来防止 copy 实例的。但是,Golang 目前无法做到在运行时做出禁止和检测,所以这个参数是给 go vet 在编译的时候做静态检查的。
取出对象步骤如下:
- 首先 定位到 P 对应的 local pool 中,取 PRIVATE 存放的对象;
- 如果 #1 没有取到,则去取 SHARED 中存放的对象;
- 如果 #2 没有取到,则去其他的 local pool 的 SHARED 中偷一个对象回来;
- 如果 #3 没有偷到,则去 VICTIM 的 PRIVATE 中取;
- 如果 #4 没有取到,则去 VICTIM 的 SHARED 中取;
- 如果 #5 没有取到,则表示上述地方都没有,则返回nil,或新创建一个对象(取决于是否初始化 New 这个函数);
为什么要分这么多步骤呢?其实是因为解决几个比较麻烦的问题才会有这么多的情况。
- PRIVATE 存放了 local pool 私有的对象,只能存放一个,只有自己可以访问。这是为了防止其他 P 把自己的对象偷完,造成重建的动作发生;
- SHARED 是一个双向链表,存放了可以和其他 P 共享的对象,可以存放多个,这是为了防止其他 local pool 如果没有需要重新创建的问题;
- VICTIM 是存放了在一次 GC 之后,local pool 原有的内容;这是为了防止因为GC出现的冷启动抖动问题,可以使未使用的对象,至少存在两个 GC 周期;
相对来说,放入对象就会比较简单了:
- 找到当前对应的 local pool ;
- 如果 PRIVATE 为空,则放入 PRIVATE中,否则,放入 SHARED 中;
GET 对应源码是下面的部分(去除了部分不重要的内容):
func (p *Pool) Get() interface{} {
// ......
l, pid := p.pin() // 将当前的 gorountine 锁在当前的 P 上,函数返回了 local pool 的指针和 P 的 id;
x := l.private // 检查 private
l.private = nil
if x == nil {
x, _ = l.shared.popHead() // 如果没有从 shared 中获取;
if x == nil {
x = p.getSlow(pid) // 如果还没有,就尝试用比较慢的步骤来获取
}
}
runtime_procUnpin() // unpin
// .....
if x == nil && p.New != nil { // 如果哪哪都没有,而且 New 被初始化了,那就创建一个
x = p.New()
}
return x // 如果 New 没有初始化,就返回 nil
}
func (p *Pool) getSlow(pid int) interface{} {
size := atomic.LoadUintptr(&p.localSize)
locals := p.local
for i := 0; i < int(size); i++ { // 去其他 local pool 偷一个
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil { // 从尾部拿
return x
}
}
size = atomic.LoadUintptr(&p.victimSize) // 从 victim pool 拿一个
if uintptr(pid) >= size {
return nil
}
locals = p.victim // private 中拿
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ { // 从 所有的池子中拿
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
atomic.StoreUintptr(&p.victimSize, 0) // 如果没拿到,后面就不再做这一步了
return nil
}
PUT 对应源码这样的:
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
// ......
l, _ := p.pin()
if l.private == nil { // 尝试放到 private
l.private = x
x = nil
}
if x != nil { // 放到 shared
l.shared.pushHead(x)
}
runtime_procUnpin()
// ......
}
那么,池子里的元素什么时候会被清理掉呢? sync.Pool 在GC中注册了一个函数
func poolCleanup() {
for _, p := range oldPools { // 丢掉 oldpool 中 victim pool 的内容
p.victim = nil
p.victimSize = 0
}
for _, p := range allPools { // 把 local pool 放到 victim pool 中
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
oldPools, allPools = allPools, nil // 当前 pool 标记为 oldpool
}
通过这三步来做到定期清理池子中的内容。
为了实现 SHARED 中的队列,sync 包还实现了一个 单生产者,多消费者的双向队列,使用了 CAS 设计。后面再找一篇文章具体分解下。
署名-非商业性使用-相同方式共享 4.0