文档章节

使用ConcurrentHashMap实现高效缓存框架

爱宝贝丶
 爱宝贝丶
发布于 2017/05/13 15:56
字数 2521
阅读 1246
收藏 59

      在项目中我们有的时候需要使用某种形式的缓存,使用缓存能够重用之前的计算结果,降低系统延迟,提高吞吐量,但是其却会消耗更多的内存。就像许多重复发明的轮子一样,缓存框架的设计看上去简单,但系统的性能将会由缓存框架的伸缩性决定。如下是一段使用HashMap实现的缓存框架:

public interface Computable<A, V> {
  V compute(A arg) throws InterruptedException;
}
import java.math.BigInteger;

public class ExpensiveFunction implements Computable<String, BigInteger> {
  @Override
  public BigInteger compute(String arg) throws InterruptedException {
    return new BigInteger(arg);
  }
}
import java.util.HashMap;
import java.util.Map;

public class Memorizer1<A, V> implements Computable<A, V> {
  private final Map<A, V> cache = new HashMap<>();
  private final Computable<A, V> c;

  public Memorizer1(Computable<A, V> c) {
    this.c = c;
  }

  @Override
  public synchronized V compute(A arg) throws InterruptedException {
    V result = cache.get(arg);
    if (null == result) {
      result = c.compute(arg);
      cache.put(arg, result);
    }

    return result;
  }
}

      上述代码中,Computable接口定义的是一类用于执行某种类型计算的策略族。ExpensiveFunction实现了Computable接口,该类在概念上是通过传入的参数arg,经过一系列复杂计算而得到结果,这里为了方便起见,只是返回了一个BigInteger对象。Memorizer1类也实现了Computable接口,这里实际上用到了装饰者模式,在构造Memorizer1类时需要传入一个Computable类型对象进来,如ExpensiveFunction,当需要使用ExpensiveFunction类来进行复杂计算时,可以通过Memorizer1类来对其进行装饰,转而调用Memorizer1的compute方法。而在Memorizer1内部,其使用了一个HashMap来对真正的Computable对象(如ExpensiveFunction)的结果进行了缓存,如果传入的参数arg能够在cache中找到结果,那么直接返回,否则调用实际的Computable::compute方法进行计算,通过这种方式达到提高系统新能的目的。

      上述Memorizer1虽然能够实现对计算结果的缓存,但是由于HashMap不是线程安全的,其使用synchronized将整个compute方法包裹起来,当并发量较高时,就会出现多个线程同时竞争执行compute方法的情况,系统的性能可能比不使用缓存更低,此时如何提高Memorizer1的包裹内容的伸缩性将决定了系统的性能。

      观察这段程序会发现,Memorizer1中有两个全局属性,一个是用于缓存的cache,一个是用于计算的c。这里c对象只表示了一组通过传入参数计算结果的策略,其是一个无状态对象(所谓的无状态对象是指其内部没有被多个线程公用的属性);而主要的cache对象才是所有线程需要竞争的对象,因而这里使用锁的时候其实只需要锁住HashMap一个类型的对象即可。在java中对于HashMap,可以使用ConcurrentHashMap来解决多线程竞争map对象的问题,如下是使用ConcurrentHashMap改写Memorizer1的形式:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Memorizer2<A, V> implements Computable<A, V> {
  private final Map<A, V> cache = new ConcurrentHashMap<>();
  private final Computable<A, V> c;

  public Memorizer2(Computable<A, V> c) {
    this.c = c;
  }

  @Override
  public V compute(A arg) throws InterruptedException {
    V result = cache.get(arg);
    if (null == result) {
      result = c.compute(arg);
      cache.put(arg, result);
    }

    return result;
  }
}

      上述Memorizer2很好的改善了Memorizer1中存在的不足,并且多线程情况下也能够很好的运行,但是其仍存在一些不足,通过分析Memorizer2::compute方法可以发现,使用cache的位置主要有两个:首先获取cache中键为arg的值,如果不存在则调用Computable::compute方法来计算结果,并将结果保存到cache中。其实在调用cache::get和cache::put方法之间进行了一些计算,当一个线程执行了cache::get方法之后得到了空的result,其会执行高额的Computable::compute方法,此时该线程还未执行到cache::put方法,而另一个线程也调用了cache::get方法,其也将得到一个空的result,这样这两个线程还是有可能执行两次Computable::compute方法,并且由于Computable::compute方法耗费较高,因而重复计算的概率还是比较大的。

      在java类库中提供了另外一个类FutureTask,该类有一个get方法,如果有结果,那么其会立即返回结果,如果还未运行完成,那么其会阻塞,直到得到运行结果为止。这里我们可以通过FutureTask改写上述缓存框架,如下是改写后的代码:

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class Memorizer3<A, V> implements Computable<A, V> {
  private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
  private final Computable<A, V> c;

  public Memorizer3(Computable<A, V> c) {
    this.c = c;
  }

  @Override
  public V compute(final A arg) throws InterruptedException {
    Future<V> f = cache.get(arg);
    if (null == f) {
      Callable<V> eval = () -> {
          return c.compute(arg);
      };

      FutureTask<V> ft = new FutureTask<V>(eval);
      f = ft;
      cache.put(arg, f);
      ft.run();
    }

    try {
      return f.get();
    } catch (ExecutionException e) {
      throw launderThrowable(e.getCause());
    }
  }

  private InterruptedException launderThrowable(Throwable cause) {
    if (cause instanceof InterruptedException) {
      return (InterruptedException) cause;
    } else {
      return new InterruptedException(cause.getMessage());
    }
  }
}

      这里cache中存储的键还是计算需要的参数,而值则是一个个FutureTask对象,真正的计算放到了一个Callable对象中,在调用FutureTask::run方法时,是实际执行了Computable::compute方法。当一个线程传入一个参数之后,Memorizer3首先在cache中找是否已经有计算结果,如果没有,那么会创建一个FutureTask对象存入到cache中,然后调用FutureTask::run方法计算结果,最后调用FutureTask::get方法返回结果,如果run方法还未计算出结果,那么最后的get方法将会被阻塞,直到产生结果。
      Memorizer3对于缓存的实现几乎是完美的,但是其还是存在重复计算的缺陷,即当一个线程在cache中未找到结果,其创建了FutureTask对象并将其放入cache中,然后执行run方法计算结果的同时,另一个线程也传入同样的参数想要获取计算结果,其也发现cache中没有缓存结果,那么其也会创建一个新的FutureTask对象,并调用FutureTask::run方法计算结果。但是相对于Memorizer2,其出现重复计算的概率已经大大减少了。这里造成这个问题的原因还是在于Map的复合操作“若没有则添加”并不是原子的。在ConcurrentHashMap中提供了一个方法V putIfAbsent(K key, V value),其是一个原子的若不存在则添加方法,并且其会返回当前Map中已经存在的value值,若没有则返回null,如下是通过该方法解决上述重复计算问题的代码:

import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class Memorizer<A, V> implements Computable<A, V> {
  private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<>();
  private final Computable<A, V> c;

  public Memorizer(Computable<A, V> c) {
    this.c = c;
  }

  @Override
  public V compute(final A arg) throws InterruptedException {
    while (true) {
      Future<V> f = cache.get(arg);
      if (null == f) {
        Callable<V> eval = () -> {
          return c.compute(arg);
        };

        FutureTask<V> ft = new FutureTask<V>(eval);
        f = cache.putIfAbsent(arg, ft);
        if (null == f) {
          f = ft;
          ft.run();
        }
      }

      try {
        return f.get();
      } catch (CancellationException e) {
        cache.remove(arg, f);
      } catch (ExecutionException e) {
        throw launderThrowable(e.getCause());
      }
    }
  }

  private InterruptedException launderThrowable(Throwable cause) {
    if (cause instanceof InterruptedException) {
      return (InterruptedException) cause;
    }

    return new InterruptedException(cause.getMessage());
  }
}

      这里通过调用ConcurrentHashMap::putIfAbsent方法,假设两个线程传入了同样的参数,并且都创建了一个FutureTask对象,一个线程获得了cache的执行权限执行了cache::putIfAbsent()方法,并且返回了一个null到局部变量f中,此时另一个线程也会调用cache::putIfAbsent()方法,由于第一个线程已经将相关键值对存入到cache中了,那么第二个线程将获得第一个线程创建的FutureTask对象,并且将其替换给当前线程中的局部变量f,并且其判断f不为null,那么其会调用f::get()方法,而此时第一个线程正在执行FutureTask::run方法,如果其已经计算完成,那么其会返回结果给第一个线程,而第二个线程是直接执行的FutureTask::get方法,如果第一个线程执行完成,那么第二个线程将直接获取结果返回,如果第一个线程没有执行完成,那么第二个线程将等待第一个线程执行完成后再返回结果。

      这里对compute方法使用while循环的目的是,当某个线程在执行结果的时候,其余的线程需要等待该线程执行完成,如果其余的线程由于某些原因等待被打断,那么通过while循环其会继续进入等待状态,从而得到执行结果。而对于某个执行计算结果的线程而言,如果其计算过程被取消或失败,那么对于缓存而言这就造成了缓存污染,因为存入的是FutureTask对象,而不是真正的结果,如果计算的线程被取消,那么实际上FutureTask::get方法将一直无法获取到值,但是cache中对应键的值也不是null的,这就是缓存污染的根源。上述代码中通过调用FutureTask::get方法时捕获CacellationException来捕捉执行计算的线程计算被取消或失败的情况,当出现这种情况的时候就从cache中将相应的FutureTask对象给移除掉,并且通过while循环也可以是后来进入的线程再次执行run方法从而得到计算结果。

      上述的Memorizer基本上是一个完美的缓存类,但是对于缓存而言,其数据如果存在过期问题,那么将需要另外进行设计,从而实现高性能吞吐的目的,当然,如果只是针对一些复杂的计算,只要传入的值不变,其结果永远不会发生变化,那么使用该框架还是非常合适的。

© 著作权归作者所有

爱宝贝丶

爱宝贝丶

粉丝 292
博文 118
码字总数 408287
作品 0
武汉
程序员
私信 提问
加载中

评论(5)

h
huihrt

引用来自“huihrt”的评论

貌似是并发编程实践上的吧,不过还是谢谢总结

引用来自“爱宝贝丶”的评论

嗯,是书上的内容,感觉这个位置写得非常好,所以发表出来了,一来做个总结加深印象,二来做个笔记,方便后续查阅:bowtie:
确实,这一段写得非常好,当时我也想总结来着😄
爱宝贝丶
爱宝贝丶

引用来自“huihrt”的评论

貌似是并发编程实践上的吧,不过还是谢谢总结
嗯,是书上的内容,感觉这个位置写得非常好,所以发表出来了,一来做个总结加深印象,二来做个笔记,方便后续查阅:bowtie:
h
huihrt
貌似是并发编程实践上的吧,不过还是谢谢总结
爱宝贝丶
爱宝贝丶

引用来自“whaon”的评论

不错
谢谢!
whaon
whaon
不错
AutoLoadCache 4.4 发布,完善使用 Map 本地缓存

AutoLoadCache 是一个高效的缓存管理解决方案,而且实现了自动加载(或叫预加载)和“拿来主义”机制,能非常巧妙地解决系统的性能及并发问题。源码:github 从4.0版本开始支持AOP的扩展,并...

qiujiayu
2016/05/11
2.3K
3
Retrofit 风格的 RxCache及其多种缓存替换算法

RxCache 是一个支持 Java 和 Android 的 Local Cache 。 之前的文章《给 Java 和 Android 构建一个简单的响应式Local Cache》、《RxCache 整合 Android 的持久层框架 greenDAO、Room》曾详细...

fengzhizi715
2018/11/01
0
0
分布式缓存系列之guava cache

guava是google的一个开源java框架,其github地址是 https://github.com/google/guava。guava工程包含了若干被Google的 Java项目广泛依赖的核心库,例如:集合 [collections] 、缓存 [caching...

浮云骑士LIN
2018/07/22
0
0
高效的缓存管理解决方案 - AutoLoadCache

现在使用的缓存技术很多,比如Redis、 Memcache 、 EhCache等,甚至还有使用ConcurrentHashMap 或 HashTable 来实现缓存。但在缓存的使用上,每个人都有自己的实现方式,大部分是直接与业务代...

qiujiayu
2015/12/02
0
10
《深入分布式缓存》之“自己动手写缓存”

版权声明:本文为半吊子子全栈工匠(wirelesscom,同公众号)原创文章,未经允许不得转载。 https://blog.csdn.net/wirelesscom/article/details/79277272 目前市面上已经有很多开源的缓存框架...

abel_cao
01/17
0
0

没有更多内容

加载失败,请刷新页面

加载更多

IT兄弟连 Java语法教程 Java语言的跨平台特性

什么是平台 Java是可以跨平台的编程语言,那么首先我们需要知道什么是平台,通常我们把CPU与操作系统的整体称为平台。 CPU大家都知道,是计算机的大脑,它既负责思维运算,又负责计算机中各种...

老码农的一亩三分地
3分钟前
0
0
http传值问题

这两天遇到一个问题 ,与一个渠道联调接口,http请求,展示ptf 的需求,服务方以一个二进制的方式返回。 当时我们在一开始开发的时候,我们按照读取文件的方式处理,本地存一个ptf 的方式 ,...

鬼才王
12分钟前
0
0
【面试】如果你这样回答“什么是线程安全”,面试官都会对你刮目相看

不是线程的安全 面试官问:“什么是线程安全”,如果你不能很好的回答,那就请往下看吧。 论语中有句话叫“学而优则仕”,相信很多人都觉得是“学习好了可以做官”。然而,这样理解却是错的。...

中关村的老男孩
12分钟前
4
0
5.01- Druid数据源配置

1、配置项 配置 缺省值 说明 name 无 配置这个属性的意义在于,如果存在多个数据源,监控的时候 可以通过名字来区分开来。如果没有配置,将会生成一个名字, 格式是:"DataSource-" + Syste...

静以修身2025
16分钟前
0
0
itop4412开发板-Linux内核的编译

本篇文章基于itop4412开发板 5.3.2.1源码目录 Linux 内核源码在光盘“06_源码_uboot 和 kernel”目录下,如下图所示。 5.3.2.2 编译器 内核的编译器和 uboot 的编译器一样,参考“5.3.1.2 编...

书白
21分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部