文档章节

sync.Pool实现原理

魂祭心
 魂祭心
发布于 08/10 19:04
字数 1546
阅读 353
收藏 6

sync.Pool实现原理

对象的创建和销毁会消耗一定的系统资源(内存,gc等),过多的创建销毁对象会带来内存不稳定与更长的gc停顿,因为go的gc不存在分代,因而更加不擅长处理这种问题。因而go早早就推出Pool包用于缓解这种情况。Pool用于核心的功能就是Put和Get。当我们需要一个对象的时候通过Get获取一个,创建的对象也可以Put放进池子里,通过这种方式可以反复利用现有对象,这样gc就不用高频的促发内存gc了。

结构

    type Pool struct {
        noCopy noCopy

        local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
        localSize uintptr        // size of the local array

        // New optionally specifies a function to generate
        // a value when Get would otherwise return nil.
        // It may not be changed concurrently with calls to Get.
        New func() interface{}
    }

创建时候指定New方法用于创建默认对象,local,localSize会在随后用到的时候生成. local是一个poolLocalInternal的切片指针。

    type poolLocalInternal struct {
        private interface{}   // Can be used only by the respective P.
        shared  []interface{} // Can be used by any P.
        Mutex                 // Protects shared.
    }

当不同的p调用Pool时,每个p都会在local上分配这样一个poolLocal,索引值就是p的id。 private存放的对象只能由创建的p读写,shared则会在多个p之间共享。

PUT

    // Put adds x to the pool.
    func (p *Pool) Put(x interface{}) {
        if x == nil {
            return
        }
        if race.Enabled {
            if fastrand()%4 == 0 {
                // Randomly drop x on floor.
                return
            }
            race.ReleaseMerge(poolRaceAddr(x))
            race.Disable()
        }
        l := p.pin()
        if l.private == nil {
            l.private = x
            x = nil
        }
        runtime_procUnpin()
        if x != nil {
            l.Lock()
            l.shared = append(l.shared, x)
            l.Unlock()
        }
        if race.Enabled {
            race.Enable()
        }
    }

Put先要通过pin函数获取当前Pool对应的pid位置上的localPool,然后检查private是否存在,存在则设置到private上,如果不存在就追加到shared尾部。


func (p *Pool) pin() *poolLocal {
	pid := runtime_procPin()
	// In pinSlow we store to localSize and then to local, here we load in opposite order.
	// Since we've disabled preemption, GC cannot happen in between.
	// Thus here we must observe local at least as large localSize.
	// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
	s := atomic.LoadUintptr(&p.localSize) // load-acquire
	l := p.local                          // load-consume
	if uintptr(pid) < s {                 // 这句话的意思是如果当前pool的localPool切片尚未创建,尚未创建这句话肯定是false的
		return indexLocal(l, pid)
	}
	return p.pinSlow()
}

pin函数先通过自旋加锁(可以避免p自身发生并发),在检查本地local切片的size,size大于当前pid则使用pid去本地local切片上索引到localpool对象,否则就要走pinSlow对象创建本地localPool切片了.

func (p *Pool) pinSlow() *poolLocal {
	// Retry under the mutex.
	// Can not lock the mutex while pinned.
	runtime_procUnpin()
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		return indexLocal(l, pid)
	}
	if p.local == nil {
		allPools = append(allPools, p)
	}
	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	atomic.StoreUintptr(&p.localSize, uintptr(size))         // store-release
	return &local[pid]
}

pinShow先要取消自旋锁,因为后面的lock内部也会尝试自旋锁,下面可能会操作allpool因而这里需要使用互斥锁allPoolsMu,然后又加上自旋锁,(这里注释说不会发生poolCleanup,但是查看代码gcstart只是查看了当前m的lock状态,然而避免不了其他m触发的gc,尚存疑),这里会再次尝试之前的操作,因为可能在unpin,pin之间有并发产生了poolocal,确认本地local切片是空的才会生成一个新的pool。后面是创建Pool上的localPool切片,runtime.GOMAXPROCS这里的作用是返回p的数量,用于确定pool的localpool的数量.

GET

    func (p *Pool) Get() interface{} {
        if race.Enabled {
            race.Disable()
        }
        l := p.pin()
        x := l.private
        l.private = nil
        runtime_procUnpin()
        if x == nil {
            l.Lock()
            last := len(l.shared) - 1
            if last >= 0 {
                x = l.shared[last]
                l.shared = l.shared[:last]
            }
            l.Unlock()
            if x == nil {
                x = p.getSlow()
            }
        }
        if race.Enabled {
            race.Enable()
            if x != nil {
                race.Acquire(poolRaceAddr(x))
            }
        }
        if x == nil && p.New != nil {
            x = p.New()
        }
        return x
    }

GET 先调用pin获取本地local,这个具体流程和上面一样了,如果当前private存在返回private上面的对象,如果不存在就从shared查找,存在返回尾部对象,反之就要从其他的p的localPool里面偷了。

    func (p *Pool) getSlow() (x interface{}) {
        // See the comment in pin regarding ordering of the loads.
        size := atomic.LoadUintptr(&p.localSize) // load-acquire
        local := p.local                         // load-consume
        // Try to steal one element from other procs.
        pid := runtime_procPin()
        runtime_procUnpin()
        for i := 0; i < int(size); i++ {
            l := indexLocal(local, (pid+i+1)%int(size))
            l.Lock()
            last := len(l.shared) - 1
            if last >= 0 {
                x = l.shared[last]
                l.shared = l.shared[:last]
                l.Unlock()
                break
            }
            l.Unlock()
        }
        return x
    }

首先就要获取当前size,用于轮询p的local,这里的查询顺序不是从0开始,而是是从当前p的位置往后查一圈。查到依次检查每个p的shared上是否存在对象,如果存在就获取末尾的值。 如果所有p的poollocal都是空的,那么初始化的New函数就起作用了,调用这个New函数创建一个新的对象出来。

清理

func poolCleanup() {
	// This function is called with the world stopped, at the beginning of a garbage collection.
	// It must not allocate and probably should not call any runtime functions.
	// Defensively zero out everything, 2 reasons:
	// 1. To prevent false retention of whole Pools.
	// 2. If GC happens while a goroutine works with l.shared in Put/Get,
	//    it will retain whole Pool. So next cycle memory consumption would be doubled.
	for i, p := range allPools {
		allPools[i] = nil
		for i := 0; i < int(p.localSize); i++ {
			l := indexLocal(p.local, i)
			l.private = nil
			for j := range l.shared {
				l.shared[j] = nil
			}
			l.shared = nil
		}
		p.local = nil
		p.localSize = 0
	}
	allPools = []*Pool{}
}

pool对象的清理是在每次gc之前清理,通过runtime_registerPoolCleanup函数注册一个上面的poolCleanup对象,内部会把这个函数设置到clearpool函数上面,然后每次gc之前会调用clearPool来取消所有pool的引用,重置所有的Pool。代码很简单就是轮询一边设置nil,然后取消所有poollocal,pool引用。方法简单粗暴。由于clearPool是在STW中调用的,如果Pool存在大量对象会拉长STW的时间,在已经有提案来修复这个问题了(CL 166961.)[https://go-review.googlesource.com/c/go/+/166961/]

© 著作权归作者所有

魂祭心
粉丝 13
博文 53
码字总数 88077
作品 0
浦东
后端工程师
私信 提问
请问sync.Pool有什么缺点?

1.12及之前版本的sync.Pool有三个问题: 每次GC都回收所有对象,如果缓存对象数量太大,会导致STW1阶段的耗时增加。 每次GC都回收所有对象,导致缓存对象命中率下降,New方法的执行造成额外的...

木白的技术私厨
08/07
0
0
同步之sync.Pool临时对象池

同步之sync.Pool临时对象池 当多个goroutine都需要创建同一个对象的时候,如果goroutine过多,可能导致对象的创建数目剧增。 而对象又是占用内存的,进而导致的就是内存回收的GC压力徒增。造...

秋风醉了
2016/08/05
569
0
golang sync.Pool试用说明及注意事项

Go tip 是 Go 语言的实验分支,包含了很多尚在讨论,但很有可能会加入 stable 分支的特性。“Go tip 在做什么”(原文地址:What's happening in Go tip)分析总结了 Go 语言尚在开发中的一些...

wkh
2014/06/20
3.6K
0
[转载]golang sync.Pool

Go 1.3 的sync包中加入一个新特性:Pool。 官方文档可以看这里http://golang.org/pkg/sync/#Pool 这个类设计的目的是用来保存和复用临时对象,以减少内存分配,降低CG压力。 Get返回Pool中的...

大蓝妹
2014/10/29
147
0
深度 | 从Go高性能日志库zap看如何实现高性能Go组件

导语:zap是uber开源的Go高性能日志库。本文作者深入分析了zap的架构设计和具体实现,揭示了zap高效的原因。并且对如何构建高性能Go语言库给出自己的建议。 作者简介:李子昂,美图公司架构平...

高可用架构
2018/08/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周日乱弹 —— 我,小小编辑,食人族酋长

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @宇辰OSC :分享娃娃的单曲《飘洋过海来看你》: #今日歌曲推荐# 《飘洋过海来看你》- 娃娃 手机党少年们想听歌,请使劲儿戳(这里) @宇辰OSC...

小小编辑
今天
521
10
MongoDB系列-- SpringBoot 中对 MongoDB 的 基本操作

SpringBoot 中对 MongoDB 的 基本操作 Database 库的创建 首先 在MongoDB 操作客户端 Robo 3T 中 创建数据库: 增加用户User: 创建 Collections 集合(类似mysql 中的 表): 后面我们大部分都...

TcWong
今天
27
0
spring cloud

一、从面试题入手 1.1、什么事微服务 1.2、微服务之间如何独立通讯的 1.3、springCloud和Dubbo有哪些区别 1.通信机制:DUbbo基于RPC远程过程调用;微服务cloud基于http restFUL API 1.4、spr...

榴莲黑芝麻糊
今天
13
0
Executor线程池原理与源码解读

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。 线程实现方式 Thread、Runnable、Callable //实现Runnable接口的...

小强的进阶之路
昨天
40
0
maven 环境隔离

解决问题 即 在 resource 文件夹下面 ,新增对应的资源配置文件夹,对应 开发,测试,生产的不同的配置内容 <resources> <resource> <directory>src/main/resources.${deplo......

之渊
昨天
47
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部