dubbo 缓存的使用和实现解析

原创
2018/04/04 16:38
阅读数 3.9K

dubbo缓存主要实现,对方法调用结果的缓存。
在服务消费方和提供方都可以配置使用缓存。
以消费方为例,可以配置全局缓存策略,这样所有服务引用都启动缓存
<dubbo:consumer cache="lru"/>
可以对某个服务引用配置缓存策略
<dubbo:reference id="demoService"   interface="demo.dubbo.api.DemoService" cache="lru"  >
也支持对单个方法启用缓存策略
<dubbo:reference id="demoService"    interface="demo.dubbo.api.DemoService" >
   <dubbo:method name="sayHello" cache="lru"> </dubbo:method>
</dubbo:reference>
服务方配置方法一样。

下面分析具体的实现过程

dubbo的缓存是通过过滤器实现的
通过 这篇博文 对注解Activate的认识,还有缓存的使用配置cache
这里找到了对应的Filter实现CacheFilter

//Activate指明服务方和消费方都可以启用缓存
@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY)
public class CacheFilter implements Filter {

    private CacheFactory cacheFactory;

    public void setCacheFactory(CacheFactory cacheFactory) {
        this.cacheFactory = cacheFactory;
    }

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
          
        if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.CACHE_KEY))) {
           //invoker.getUrl().addParameter(Constants.METHOD_KEY, invocation.getMethodName()) 
	   //作为缓存对象的key 可知不同的服务提供者,每个方法都会单独分配一个缓存对象
	    Cache cache = cacheFactory.getCache(invoker.getUrl().addParameter(Constants.METHOD_KEY, invocation.getMethodName()));
            if (cache != null) {
	        //方法的参数作为key
                String key = StringUtils.toArgumentString(invocation.getArguments());
                if (cache != null && key != null) {
                    Object value = cache.get(key);
                    if (value != null) {
		        //缓存命中,直接返回,也就是说,
			//这里要注意,如果有多个过滤器,cache后面的过滤器不会执行
                        return new RpcResult(value);
                    }
                    Result result = invoker.invoke(invocation);
                    if (!result.hasException()) {
                        cache.put(key, result.getValue());
                    }
                    return result;
                }
            }
        }
        return invoker.invoke(invocation);
    }

}

关于cacheFactory的实现,这里看下由dubbo动态生成的CacheFactory$Adaptive源码

public class CacheFactory$Adaptive implements com.alibaba.dubbo.cache.CacheFactory {
    public com.alibaba.dubbo.cache.Cache getCache(com.alibaba.dubbo.common.URL arg0) {
        if (arg0 == null) 
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg0;
        //默认是 lru 缓存策略
        String extName = url.getParameter("cache", "lru");
        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.cache.CacheFactory) name from url(" + url.toString() + ") use keys([cache])");
        com.alibaba.dubbo.cache.CacheFactory extension = (com.alibaba.dubbo.cache.CacheFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.cache.CacheFactory.class).getExtension(extName);
        return extension.getCache(arg0);
    }
}

duobb提供了CacheFactory的具体有三种实现,类图如下

AbstractCacheFactory抽象父类中定义了缓存对象的获取方法getCache

public abstract class AbstractCacheFactory implements CacheFactory {

    private final ConcurrentMap<String, Cache> caches = new ConcurrentHashMap<String, Cache>();

    public Cache getCache(URL url) {
        String key = url.toFullString();
        Cache cache = caches.get(key);
        if (cache == null) {
            caches.put(key, createCache(url));
            cache = caches.get(key);
        }
        return cache;
    }
    //具体缓存实现在子类中,这也是dubbo一贯的设计模式,公共方法提到抽象类中
    protected abstract Cache createCache(URL url);

}
//LruCacheFactory子类,返回LruCache对象,实现LRU策略缓存
public class LruCacheFactory extends AbstractCacheFactory {

    protected Cache createCache(URL url) {
        return new LruCache(url);
    }

}
//JCacheFactory子类,返回LruCache对象,可与符合JSR107规范的缓存桥接
public class JCacheFactory extends AbstractCacheFactory {

    protected Cache createCache(URL url) {
        return new JCache(url);
    }

}
//ThreadLocalCacheFactory子类,返回LruCache对象,ThreadLocal利用当前线程缓存
public class ThreadLocalCacheFactory extends AbstractCacheFactory {

    protected Cache createCache(URL url) {
        return new ThreadLocalCache(url);
    }

}

上面提到的三种Cache对象类都实现了com.alibaba.dubbo.cache接口,类图如下,

Cache接口很简单

public interface Cache {
    //put value到缓存
    void put(Object key, Object value);
    //通过key 获取value
    Object get(Object key);

}

三种Cache类都实现也值得学习
LruCache类

public class LruCache implements Cache {

    private final Map<Object, Object> store;

    public LruCache(URL url) {
        //从配置中,获取cache大小
        final int max = url.getParameter("cache.size", 1000);
        //内部是LRUCache对象
        this.store = new LRUCache<Object, Object>(max);
    }

    public void put(Object key, Object value) {
        store.put(key, value);
    }

    public Object get(Object key) {
        return store.get(key);
    }

}

分析LRUCache源码,可以看到LRUCache继承了LinkedHashMap,而LinkedHashMap是个双向链表,它是个天然的lru数据结构
只要定义LinkedHashMap是有序的,比如LRUCache的构造函数的定义

  public LRUCache(int maxCapacity) {
        //默认有序的链表,初始数组申请空间大小16,负载因子0.75(触发扩展的填充度临界值)
        super(16, DEFAULT_LOAD_FACTOR, true);
        this.maxCapacity = maxCapacity;
    }

并重写LinkedHashMap的removeEldestEntry方法

    @Override
    //定义换出缓存对象的条,这里是大小超过最大容量
    protected boolean removeEldestEntry(java.util.Map.Entry<K, V> eldest) {
        return size() > maxCapacity;
    }

这样就可以完成一个LRU缓存容器的创建,具体实现,可读写LinkedHashMap源码

ThreadLocalCache类实现

public class ThreadLocalCache implements Cache {
    //通过ThreadLocal把store 绑定到当前线程
    private final ThreadLocal<Map<Object, Object>> store;

    public ThreadLocalCache(URL url) {
        this.store = new ThreadLocal<Map<Object, Object>>() {
            @Override
            protected Map<Object, Object> initialValue() {
                return new HashMap<Object, Object>();
            }
        };
    }

    public void put(Object key, Object value) {
        store.get().put(key, value);
    }

    public Object get(Object key) {
        return store.get().get(key);
    }

}

JCache类实现

public class JCache implements com.alibaba.dubbo.cache.Cache {

    private final Cache<Object, Object> store;

    public JCache(URL url) {
        String method = url.getParameter(Constants.METHOD_KEY, "");
        //每个服务提供者每个方法,分配一个cache对象
        String key = url.getAddress() + "." + url.getServiceKey() + "." + method;
        // jcache 为SPI实现的全限定类名
        String type = url.getParameter("jcache");
        
        //通过CachingProvider 等jsr107规范相关接口 操作,这样,就能通过购spi 机制桥接各种缓存实现了
        CachingProvider provider = type == null || type.length() == 0 ? Caching.getCachingProvider() : Caching.getCachingProvider(type);
        CacheManager cacheManager = provider.getCacheManager();
        Cache<Object, Object> cache = cacheManager.getCache(key);
        if (cache == null) {
            try {
                //configure the cache
                MutableConfiguration config =
                        new MutableConfiguration<Object, Object>()
                                .setTypes(Object.class, Object.class)
                                 //通过cache.write.expire 设置失效时间,默认为1分钟,但不知道cache.write.expire在哪设置的??
                                .setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, url.getMethodParameter(method, "cache.write.expire", 60 * 1000))))
                                .setStoreByValue(false)
                                .setManagementEnabled(true)
                                .setStatisticsEnabled(true);
                cache = cacheManager.createCache(key, config);
            } catch (CacheException e) {
                // 初始化cache 的并发情况
                cache = cacheManager.getCache(key);
            }
        }

        this.store = cache;
    }

    public void put(Object key, Object value) {
        store.put(key, value);
    }

    public Object get(Object key) {
        return store.get(key);
    }

}

我的博客即将搬运同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=15h2lq45eu17e

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部