文档章节

groupcache源码解读

go-skyblue
 go-skyblue
发布于 2016/04/11 07:38
字数 1251
阅读 356
收藏 0

Simple usage

gropucache的官方网站是 https://github.com/golang/groupcache

consistenthash模块

一致性hash算法,通常是用在查找一个合适的下载节点时,使负载更平均,但是对于同样的请求始终返回一样的结果。

type Map struct {
    hash     Hash
    replicas int
    keys     []int // Sorted
    hashMap  map[int]string
}

Map结构中replicas的含义是增加虚拟桶,使数据分布更加均匀。

// 创建Map结构
func New(replicas int, fn Hash) *Map
// 添加新的Key
func (m *Map) Add(keys ...string)
// 根据hash(key)获取value
func (m *Map) Get(key string) string
// 判断Map是否为空
func (m *Map) IsEmpty() bool
// 竟然没有提供Remove方法 O_O

用法简单例子

package main

import (
	"fmt"

	"github.com/golang/groupcache/consistenthash"
)

func main() {
	c := consistenthash.New(70, nil)
	c.Add("A", "B", "C", "D", "E")
	for _, key := range []string{"what", "nice", "what", "nice", "good", "yes!"} {
		fmt.Printf("%s -> %s\n", key, c.Get(key))
	}
}

// Expect output
// -------------
// what -> C
// nice -> A
// what -> C
// nice -> A
// good -> D
// yes! -> E

singleflight 模块

type Group struct {
}

// 当多个相同的key请求的时候,函数只被调用一次
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) 

使用例子

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/golang/groupcache/singleflight"
)

func NewDelayReturn(dur time.Duration, n int) func() (interface{}, error) {
	return func() (interface{}, error) {
		time.Sleep(dur)
		return n, nil
	}
}

func main() {
	g := singleflight.Group{}
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		ret, err := g.Do("key", NewDelayReturn(time.Second*1, 1))
		if err != nil {
			panic(err)
		}
		fmt.Printf("key-1 get %v\n", ret)
		wg.Done()
	}()
	go func() {
		time.Sleep(100 * time.Millisecond) // make sure this is call is later
		ret, err := g.Do("key", NewDelayReturn(time.Second*2, 2))
		if err != nil {
			panic(err)
		}
		fmt.Printf("key-2 get %v\n", ret)
		wg.Done()
	}()
	wg.Wait()
}

执行结果(耗时: 1.019s)

key-2 get 1
key-1 get 1

lru 模块

最初是用在内存管理上的一个算法,根据历史的请求数,分析出最热门的数据,并保存下来。

type Cache struct {
    // MaxEntries is the maximum number of cache entries before
    // an item is evicted. Zero means no limit.
    MaxEntries int

    // OnEvicted optionally specificies a callback function to be
    // executed when an entry is purged from the cache.
    OnEvicted func(key Key, value interface{})
    // contains filtered or unexported fields
}

func New(maxEntries int) *Cache
func (c *Cache) Add(key Key, value interface{})
func (c *Cache) Get(key Key) (value interface{}, ok bool)
func (c *Cache) Len() int
func (c *Cache) Remove(key Key)
func (c *Cache) RemoveOldest()

用法举例

package main

import (
        "fmt"

        "github.com/golang/groupcache/lru"
)

func main() {
        cache := lru.New(2)
        cache.Add("x", "x0")
        cache.Add("y", "y0")
        yval, ok := cache.Get("y")
        if ok {
                fmt.Printf("y is %v\n", yval)
        }
        cache.Add("z", "z0")

        fmt.Printf("cache length is %d\n", cache.Len())
        _, ok = cache.Get("x")
        if !ok {
                fmt.Printf("x key was weeded out\n")
        }
}
// Expect output
//--------------
// y is y0
// cache length is 2
// x key was weeded out

HTTPPool 模块

type HTTPPool struct {
    // 可选,为每次的请求封装的Context参数
    Context func(*http.Request) groupcache.Context

    // 可选,不懂这个干啥的
    // 注释说:请求的时候用的就是这个
    Transport func(groupcache.Context) http.RoundTripper
}

// self 必须是一个合法的URL指向当前的服务器,比如 "http://10.0.0.1:8000"
// 这个函数默会注册一个路由
// http.handle("/_groupcache/", poolInstance) 
// 该路由主要用户节点间获取数据的功能
// 另外该函数不能重复调用,否则会panic
func NewHTTPPool(self string) *HTTPPool

// 更新节点列表
// 用了consistenthash
// 奇怪的时候,只有节点添加的函数,并没有删除的
func (p *HTTPPool) Set(peers ...string)

// 用一致性hash算法选择一个节点
func (p *HTTPPool) PickPeer(key string) (groupcache.ProtoGetter, bool)

// 用于处理通过HTTP传递过来的grpc请求
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request)

其他 (待补充)

本来想在官网上找个例子,可以

package main

// A SizeReaderAt is a ReaderAt with a Size method.
//
// An io.SectionReader implements SizeReaderAt.
type SizeReaderAt interface {
    Size() int64
    io.ReaderAt
}

// NewMultiReaderAt is like io.MultiReader but produces a ReaderAt
// (and Size), instead of just a reader.
func NewMultiReaderAt(parts ...SizeReaderAt) SizeReaderAt {
    m := &multi{
        parts: make([]offsetAndSource, 0, len(parts)),
    }
    var off int64
    for _, p := range parts {
        m.parts = append(m.parts, offsetAndSource{off, p})
        off += p.Size()
    }
    m.size = off
    return m
}

// NewChunkAlignedReaderAt returns a ReaderAt wrapper that is backed
// by a ReaderAt r of size totalSize where the wrapper guarantees that
// all ReadAt calls are aligned to chunkSize boundaries and of size
// chunkSize (except for the final chunk, which may be shorter).
//
// A chunk-aligned reader is good for caching, letting upper layers have
// any access pattern, but guarantees that the wrapped ReaderAt sees
// only nicely-cacheable access patterns & sizes.
func NewChunkAlignedReaderAt(r SizeReaderAt, chunkSize int) SizeReaderAt {
    // ...
}

func part(s string) SizeReaderAt {
    return io.NewSectionReader(strings.NewReader(s), 0, int64(len(s)))
}

func handler(w http.ResponseWriter, r *http.Request) {
    sra := NewMultiReaderAt(
        part("Hello, "), part(" world! "),
        part("You requested "+r.URL.Path+"\n"),
    )
    rs := io.NewSectionReader(sra, 0, sra.Size())
    http.ServeContent(w, r, "foo.txt", modTime, rs)
}


func main(){
    me := "http://10.0.0.1"
    peers := groupcache.NewHTTPPool(me)

    // Whenever peers change:
    peers.Set("http://10.0.0.1", "http://10.0.0.2", "http://10.0.0.3")
        
    var thumbNails = groupcache.NewGroup("thumbnail", 64<<20, groupcache.GetterFunc(
        func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
            fileName := key
            dest.SetBytes(generateThumbnail(fileName))
            return nil
        }))

    var data []byte
    err := thumbNails.Get(ctx, "big-file.jpg",
        groupcache.AllocatingByteSliceSink(&data))
    // ...
    http.ServeContent(w, r, "big-file-thumb.jpg", modTime, bytes.NewReader(data))


            
}

groupcache 使用例子

package main

import (
        "errors"
        "flag"
        "log"
        "net/http"
        "strconv"
        "strings"

        "github.com/golang/groupcache"
)

var localStorage map[string]string

func init() {
        localStorage = make(map[string]string)
        localStorage["hello"] = "world"
        localStorage["info"] = "This is an example"
}

func main() {
        port := flag.Int("port", 4100, "Listen port")
        flag.Parse()

        // Name have to starts with http://
        self := "http://localhost:" + strconv.Itoa(*port)
        pool := groupcache.NewHTTPPool(self)
        pool.Set(self, "http://localhost:4101")

        var helloworld = groupcache.NewGroup("helloworld", 10<<20, groupcache.GetterFunc(
                func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
                        log.Printf("groupcache get key: %v", key)
                        value, exists := localStorage[key]
                        if !exists {
                                dest.SetString(key + " NotExist")
                                return errors.New(key + " NotExist")
                        } else {
                                dest.SetString(value)
                                return nil
                        }
                }))

        http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
                key := strings.TrimPrefix(r.RequestURI, "/")
                log.Printf("Request(%v) key(%v)", r.RemoteAddr, key)
                if key == "" {
                        http.Error(w, "Bad Request", http.StatusBadRequest)
                        return
                }
                var data []byte
                err := helloworld.Get(nil, key, groupcache.AllocatingByteSliceSink(&data))
                if err != nil {
                        http.Error(w, err.Error(), http.StatusBadRequest)
                        return
                }
                log.Printf("cache data: %v", data)
                w.Write(data)
                log.Println("Gets: ", helloworld.Stats.Gets.String())
                log.Println("CacheHits: ", helloworld.Stats.CacheHits.String())
                log.Println("Loads: ", helloworld.Stats.Loads.String())
                log.Println("LocalLoads: ", helloworld.Stats.LocalLoads.String())
                log.Println("PeerErrors: ", helloworld.Stats.PeerErrors.String())
                log.Println("PeerLoads: ", helloworld.Stats.PeerLoads.String())
        })
        http.ListenAndServe(":"+strconv.Itoa(*port), nil)
}

参考文章

  1. http://talks.golang.org/2013/oscon-dl.slide#1
  2. http://xuchongfeng.github.io/2016/02/21/GroupCache%E6%BA%90%E7%A0%81%E9%98%85%E8%AF%BB/

© 著作权归作者所有

go-skyblue

go-skyblue

粉丝 90
博文 50
码字总数 31921
作品 5
杭州
程序员
私信 提问
groupcache常用框架以及源码解读

1、groupcache介绍: http://www.csdn.net/article/2013-07-30/2816399-groupcache-readme-go 2、groupcache常用框架: 一般常用以上的框架去使用groupcache,此框架以及框架示例代码可通过h...

shawn chen
2015/01/12
590
0
Go 语言 4 周岁生日

Go语言4岁了,生日快乐!Go语言是由Google开发和开源的编程语言,2007年,谷歌工程师Rob Pike, Ken Thompson和Robert Griesemer开始设计一门全新的语言,这是它的最初原型。 在Go语言的官方博...

oschina
2013/11/11
4K
25
consistent hash(一致性哈希算法)

一、产生背景 今天咱不去长篇大论特别详细地讲解consistent hash,我争取用最轻松的方式告诉你consistent hash算法是什么,如果需要深入,Google一下~。 举个栗子吧: 比如有 N 个 cache 服务...

CloudGeek
2018/08/05
0
0
简约的CDN--minicdn

MiniCDN 一般来说会推荐采用 qiniu 或者 upyun,又或者是 amazon 之类大公司的 cdn 服务,不过当需要一些自己实现的场景,比如企业内部软件的加速,就需要一个私有的 CDN 了。 极简内容分发系...

go-skyblue
2015/07/14
1K
1
帮助 Medium 阅读时间达到 2600 年的技术栈

背景 Medium 是一个网络。这是一个分享有价值的故事和想法的地方,在这里人们可以保持想法不断的前行,大家已经在那里花费了 14 亿分钟(换句话说是 2600 年)的阅读时间。 每月的独立用户超...

oschina
2015/11/09
4.5K
10

没有更多内容

加载失败,请刷新页面

加载更多

Docker下使用disconf:细说demo开发

Docker下的disconf实战全文链接 《Docker搭建disconf环境,三部曲之一:极速搭建disconf》; 《Docker搭建disconf环境,三部曲之二:本地快速构建disconf镜像》; 《Docker搭建disconf环境,...

程序员欣宸
30分钟前
6
0
centos7配置nfs共享存储服务

nfs 是一种网络文件系统,需要依赖rpc进行过程调度 注意nfs只验证id,验证用户名,并且只能在类unix os上进行文件共享服务,由于它的脆弱的验证机制,所以不适宜在internet上工作,在内网使用...

老孟的Linux私房菜
33分钟前
8
0
【F5小常识】F5的 Web 应用防火墙 (WAF)有什么优势?

     现如今传统防火墙已无法满足企业安全需求,网络攻击大多发生在应用层和网络层故障,且呈上升趋势,传统的防火墙存在着很大的不足之处,包括无法检测加密的Web流量、无法扩展深度检测...

梅丽莎好
44分钟前
4
0
整合到 Mockito 2

为了能够持续改进 Mockito 和在未来提升测试体验,我们希望你能够升级到 Mockito 2.10!Mockito 按照语义化版本(semantic versioning)的方式对版本进行编排,并且只在主版本升级的时候包含...

honeymoose
44分钟前
4
0
spring boot actuator

actuator 是监控系统健康的工具,引入 spring-boot-starter-actuator会暴露一些endpoint. 可通过如下配置来配置这些endpoint的基本配置: 可通过http:${url}:28081/management/actuator/*来访...

ZH-JSON
51分钟前
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部