文档章节

ConcurrentMultiMap 实现

lost_o0
 lost_o0
发布于 2015/03/02 10:29
字数 890
阅读 54
收藏 0
点赞 0
评论 0

Index,akka用来存储对应dispatcher和actor的,也是Akka实现的`ConcurrentMultiMap<K,V>`

`key`类型自定义,`value`是由`ConcurrentSkipListSet`构成的,内部其实就对`ConcurrentMultiMap<K, ConcurrentSkipListSet<V>>`的再封装

而Index的`mapSize`参数对应`ConcurrentHashMap`的 `initialCapacity`, 语义都变味了!

读取时会copy(asScala)一份,并返回

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.util

import annotation.tailrec

import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap }
import java.util.Comparator
import scala.collection.JavaConverters.{ asScalaIteratorConverter, collectionAsScalaIterableConverter }
import scala.collection.mutable

/**
 * An implementation of a ConcurrentMultiMap
 * Adds/remove is serialized over the specified key
 * Reads are fully concurrent <-- el-cheapo
 */
class Index[K, V](val mapSize: Int, val valueComparator: Comparator[V]) {//valueComparator

  def this(mapSize: Int, cmp: (V, V) ⇒ Int) = this(mapSize, new Comparator[V] {
    def compare(a: V, b: V): Int = cmp(a, b)
  })
  
  //java集合
  private val container = new ConcurrentHashMap[K, ConcurrentSkipListSet[V]](mapSize)
  private val emptySet = new ConcurrentSkipListSet[V]//线程安全集合

  /**
   * Associates the value of type V with the key of type K
   * @return true if the value didn't exist for the key previously, and false otherwise
   */
  def put(key: K, value: V): Boolean = {
    //Tailrecursive spin-locking put
    @tailrec
    def spinPut(k: K, v: V): Boolean = {//
      var retry = false
      var added = false
      val set = container get k

      if (set ne null) {
        set.synchronized {//删除的时候有synchronized,但是上一句ne null判定时没有被锁定,
          if (set.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry//当为empty的时候,说明它已经不在container中,正在被删除中...
          else { //Else add the value to the set and signal that retry is not needed
            added = set add v
            retry = false
          }
        }
      } else {//
        val newSet = new ConcurrentSkipListSet[V](valueComparator)
        newSet add v

        // Parry for two simultaneous putIfAbsent(id,newSet)
        val oldSet = container.putIfAbsent(k, newSet)//插入的时候,通过putIfAbsent,防止覆盖
        if (oldSet ne null) {
          oldSet.synchronized {
            if (oldSet.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry
            else { //Else try to add the value to the set and signal that retry is not needed
              added = oldSet add v
              retry = false
            }
          }
        } else added = true
      }

      if (retry) spinPut(k, v)
      else added
    }

    spinPut(key, value)
  }

  /**
   * @return Some(value) for the first matching value where the supplied function returns true for the given key,
   * if no matches it returns None
   */
  def findValue(key: K)(f: (V) ⇒ Boolean): Option[V] =
    container get key match {
      case null ⇒ None
      case set  ⇒ set.iterator.asScala find f
    }

  /**
   * Returns an Iterator of V containing the values for the supplied key, or an empty iterator if the key doesn't exist
   */
  def valueIterator(key: K): scala.Iterator[V] = {
    container.get(key) match {
      case null ⇒ Iterator.empty
      case some ⇒ some.iterator.asScala
    }
  }

  /**
   * Applies the supplied function to all keys and their values
   */
  def foreach(fun: (K, V) ⇒ Unit): Unit =
    container.entrySet.iterator.asScala foreach { e ⇒ e.getValue.iterator.asScala.foreach(fun(e.getKey, _)) }

  /**
   * Returns the union of all value sets.
   */
  def values: Set[V] = {
    val builder = Set.newBuilder[V]
    for {
      values ← container.values.iterator.asScala
      v ← values.iterator.asScala
    } builder += v
    builder.result()
  }

  /**
   * Returns the key set.
   */
  def keys: Iterable[K] = container.keySet.asScala

  /**
   * Disassociates the value of type V from the key of type K
   * @return true if the value was disassociated from the key and false if it wasn't previously associated with the key
   */
  def remove(key: K, value: V): Boolean = {
    val set = container get key

    if (set ne null) {
      set.synchronized {
        if (set.remove(value)) { //If we can remove the value
          if (set.isEmpty) //and the set becomes empty
            container.remove(key, emptySet) //We try to remove the key if it's mapped to an empty set

          true //Remove succeeded
        } else false //Remove failed
      }
    } else false //Remove failed
  }

  /**
   * Disassociates all the values for the specified key
   * @return None if the key wasn't associated at all, or Some(scala.Iterable[V]) if it was associated
   */
  def remove(key: K): Option[Iterable[V]] = {
    val set = container get key

    if (set ne null) {
      set.synchronized {
        container.remove(key, set)
        val ret = collectionAsScalaIterableConverter(set.clone()).asScala // Make copy since we need to clear the original
        set.clear() // Clear the original set to signal to any pending writers that there was a conflict
        Some(ret)
      }
    } else None //Remove failed
  }

  /**
   * Removes the specified value from all keys
   */
  def removeValue(value: V): Unit = {
    val i = container.entrySet().iterator()
    while (i.hasNext) {
      val e = i.next()
      val set = e.getValue()

      if (set ne null) {
        set.synchronized {
          if (set.remove(value)) { //If we can remove the value
            if (set.isEmpty) //and the set becomes empty
              container.remove(e.getKey, emptySet) //We try to remove the key if it's mapped to an empty set //类似 compareAndSwap
          }
        }
      }
    }
  }

  /**
   * @return true if the underlying containers is empty, may report false negatives when the last remove is underway
   */
  def isEmpty: Boolean = container.isEmpty

  /**
   *  Removes all keys and all values
   */
  def clear(): Unit = {
    val i = container.entrySet().iterator()
    while (i.hasNext) {
      val e = i.next()
      val set = e.getValue()
      if (set ne null) { set.synchronized { set.clear(); container.remove(e.getKey, emptySet) } }
    }
  }
}

/**
 * An implementation of a ConcurrentMultiMap
 * Adds/remove is serialized over the specified key
 * Reads are fully concurrent <-- el-cheapo
 */
class ConcurrentMultiMap[K, V](mapSize: Int, valueComparator: Comparator[V]) extends Index[K, V](mapSize, valueComparator)


© 著作权归作者所有

共有 人打赏支持
lost_o0
粉丝 10
博文 37
码字总数 31139
作品 0
烟台

暂无文章

看看 LinkedList Java 9

终于迎来了 LinkedList 类,实现的接口就有点多了 Serializable, Cloneable, Iterable<E>, Collection<E>, Deque<E>, List<E>, Queue<E>。LinkedList是一个实现了List接口和Deque接口的双端链......

woshixin
18分钟前
0
0
算法 - 冒泡排序 C++

大家好,我是ChungZH。今天我给大家讲一下最基础的排序算法:冒泡排序(BubbleSort)。 冒泡排序算法的原理如下: 比较相邻的元素。如果第一个比第二个大(可以相反),就交换他们两个。 对每...

ChungZH
21分钟前
0
0
jquery ajax request payload和fromData请求方式

请求头的不同 fromData var data = { name : 'yiifaa'};// 提交数据$.ajax('app/', { method:'POST', // 将数据编码为表单模式 contentType:'application/x-ww...

lsy999
23分钟前
0
0
阿里P7架构师,带你点亮程序员蜕变之路

前言: Java是现阶段中国互联网公司中,覆盖度最广的研发语言。 掌握了Java技术体系,不管在成熟的大公司,快速发展的公司,还是创业阶段的公司,都能有立足之地。 有不少朋友问,成为Java架...

Java大蜗牛
25分钟前
1
0
Ecstore 在没有后台管理界面(维护)的情况如何更新表的字段

window 系统: 切换到:app\base 目录下: C:\Users\qimh>d: D:\>cd D:\WWW\huaqh\app\base 执行:D:\WWW\huaqh\app\base>cmd update linux 系统: 1># cd /alidata/www.novoeshop.com/app/......

qimh
29分钟前
0
0
设计模式-策略模式

策略模式 解释 对工厂模式的再次封装,使用参数控制上下文信息(将工厂返回的实例赋值给context field) 不会返回bean实例,只是设置对应的条件 调用context的方法(调用field的方法) 用户只...

郭里奥
32分钟前
0
0
python使用有序字典

python自带的collections包中有很多有用的数据结构可供使用,其中有个叫OrderedDict类,它可以在使用的时候记录元素插入顺序,在遍历使用的时候就可以按照原顺序遍历。 a = {"a":1,"b"...

芝麻糖人
今天
0
0
RestTemplate HttpMessageConverter

RestTemplate 微信接口 text/plain HttpMessageConverter

微小宝
今天
0
0
mysql视图/存储过程/函数/事件/触发器

--语法参考:https://dev.mysql.com/doc/ (当前用的是5.6) https://dev.mysql.com/doc/refman/5.6/en/sql-syntax-data-manipulation.html --视图 CREATE VIEW test.v AS SELECT * FROM t;......

坦途abc
今天
0
0
MySQL参数优化案例

环境介绍 硬件配置 cpu核心数 内存大小 磁盘空间 16核 256G 3T 软件环境 操作系统版本 mysql版本 表数目 单表行数 centos-7.4 mysql-5.7.22 128张表 2kw行 优化层级与指导思想 优化层级 MySQ...

小致dad
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部