ConcurrentSkipListMap跳表原理解析

原创
2019/11/29 23:57
阅读数 5.2K

我们首先来看一下ConcurrentSkipListMap的继承结构图。

内部结构如下(图片来源于网络),这里面Node其实就是HeadIndex中的level1,level2,level3中的一个个绿点。

ConcurrentSkipListMap的整体数据结构是一种多层链表结构,在Java中,我们都知道链表有LinkedList,但LinkedList是一个双向链表.由插入时候的以下代码就可以看出来。

void linkBefore(E e, Node<E> succ) {
    // assert succ != null;
    final Node<E> pred = succ.prev;
    final Node<E> newNode = new Node<>(pred, e, succ);
    succ.prev = newNode;
    if (pred == null)
        first = newNode;
    else
        pred.next = newNode;
    size++;
    modCount++;
}

而ConcurrentSkipListMap中的各层链表为单向链表,并且key值有序排列。这里有其Node节点的代码可以看出,它只有一个next的后续节点。

static final class Node<K,V> {
    final K key;
    volatile Object value;
    volatile Node<K,V> next;

以及它的两个构造器

Node(K key, Object value, Node<K,V> next) {
    this.key = key;
    this.value = value;
    this.next = next;
}
Node(Node<K,V> next) {
    this.key = null;
    this.value = this;
    this.next = next;
}

我们先来看一下,当我们new一个ConcurrentSkipListMap的时候会发生什么。

final Comparator<? super K> comparator; //函数式接口——比较器
public ConcurrentSkipListMap() {
    this.comparator = null;
    initialize();
}

private transient volatile HeadIndex<K,V> head; //HeadIndex继承于Index,这里为原始头索引(当整个数据结构中还没有分层,没有链表的时候)

private static final Object BASE_HEADER = new Object();
private void initialize() {
    keySet = null;
    entrySet = null;
    values = null;
    descendingMap = null;
    //这里head会被初始化,他的属性中,Node的Key为null,value为Object对象,下一个节点为null;
    //head的分层索引down为null,链表的后续索引right为null,层级level为第一层。
    head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
                              null, null, 1);
}
static final class HeadIndex<K,V> extends Index<K,V> {
    final int level; //层数
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}
static class Index<K,V> {
    final Node<K,V> node; //节点
    final Index<K,V> down; //分层索引,分层索引跟上层索引的Node的key,value相同,next不同
    volatile Index<K,V> right; //链表后续索引

索引的构造器

Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
    this.node = node;
    this.down = down;
    this.right = right;
}

做为一个Map,我们来看一下它的put方法。

public V put(K key, V value) {
    //写入时不允许写入null值
    if (value == null)
        throw new NullPointerException();
    return doPut(key, value, false);
}
private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z;             // added node
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        //在最底层链表中获取目标的前置节点
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            //如果该前置节点的后续节点不为null
            if (n != null) {
                Object v; int c;
                //获取后续节点的后续节点
                Node<K,V> f = n.next;
                //如果n不为前置节点的后续节点,重新进入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循环
                //因为可能有其他线程已经插入了其他节点在b的后续节点
                if (n != b.next)               // inconsistent read
                    break;
                //如果后续节点n的值为null,删除该节点n,用n的后续节点顶替n,重新进入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循环
                if ((v = n.value) == null) {   // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                //如果b为null或者v等于自己的value,说明b已经被其他线程删除了
                //重新进入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循环
                if (b.value == null || v == n) // b is deleted
                    break;
                //如果我们查找的key大于后续节点的key,向后续节点推进
                if ((c = cpr(cmp, key, n.key)) > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                //如果我们查找的key等于后续节点的key
                if (c == 0) {
                    //通过无锁竞争,将value替换后续节点的值,并返回后续节点的原值
                    //竞争失败的,重新进入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循环
                    //这里我们可以看到修改已存在的key的时候,是不会进行层数变化的
                    if (onlyIfAbsent || n.casValue(v, value)) {
                        @SuppressWarnings("unchecked") V vv = (V)v;
                        return vv;
                    }
                    break; // restart if lost race to replace value
                }
                // else c < 0; fall through
            }
            //如果后续节点n为null,初始化一个节点,放入我们要存储的key,value,该节点的后续节点为n
            z = new Node<K,V>(key, value, n);
            //通过无锁竞争,将该新节点替换掉b的后续节点n,此时只有一个线程可以竞争成功,并替换
            //竞争失败的线程,重新进入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循环
            if (!b.casNext(n, z))
                break;         // restart if lost race to append to b
            //竞争成功的,退出'for (;;)'循环
            break outer;
        }
    }
    //到此时表示已经节点已经put成功了,但对于跳表来说,来要根据随机数的值来表示是否向上增加层数与上层节点
    //获取一个伪随机的种子
    int rnd = ThreadLocalRandom.nextSecondarySeed();
    //如果该种子的二进制与10000000000000000000000000000001进行与运算为0
    //即该种子的二进制最高位与最末尾必须为0,其他位无所谓
    //如果该种子的二进制最高位与最末位不为0,不增加新节点的层数
    if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
        //定义层level,从1开始
        int level = 1, max;
        //判断该种子值的二进制从第二位开始向左有多少个连续的1,层数加多少个1
        //这里由于是随机值,所以层数level是不确定的
        while (((rnd >>>= 1) & 1) != 0)
            ++level;
        Index<K,V> idx = null;
        //获取头索引head
        HeadIndex<K,V> h = head;
        //如果level小于等于头索引的层数
        if (level <= (max = h.level)) {
            //根据层数level不断创建新增节点的下层索引
            //注意此时只是新增了新节点的索引,并没有关联到跳表的真实体中
            for (int i = 1; i <= level; ++i)
                idx = new Index<K,V>(z, idx, null);
        }
        //如果层数level大于头索引的层数
        else { // try to grow by one level
            //将层数level变更为头索引的层数加1
            level = max + 1; // hold in array and later pick the one to use
            //创建一个数量为level+1的索引数组
            @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                (Index<K,V>[])new Index<?,?>[level+1];
            //根据层数level不断创建新增节点的下层索引,并放入数组中
            //此时只是新增了新节点的索引,并没有关联到跳表的真实体中
            for (int i = 1; i <= level; ++i)
                idxs[i] = idx = new Index<K,V>(z, idx, null);
            for (;;) {
                //获取头索引
                h = head;
                //获取头索引的层数
                int oldLevel = h.level;
                //如果level小于等于oldLevel,说明已经有其他线程修改了头索引的层数,退出循环
                if (level <= oldLevel) // lost race to add level
                    break;
                //定义一个新的头索引,取值h
                HeadIndex<K,V> newh = h;
                //获取头索引的节点
                Node<K,V> oldbase = h.node;
                //该段代码的意思其实只是新增一层新链表,这一层新链表以原头索引为下层索引,新增节点索引为链表后续索引,所以这
                //一层新链表只有头索引和新增节点的索引两个索引,但由于有多个线程的参与,该循环体可能会不断执行
                for (int j = oldLevel+1; j <= level; ++j)
                    newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                //通过无锁竞争,使用newh来替代h(此时h不等于newh,只是跳表的头索引位置被替换掉了)
                //竞争成功的线程,将h变更为newh,idx变更为oldlevel层的新增节点索引,并退出循环'for (;;)'
                //因为此时新链表层已经确定,仅为头索引和新节点索引,而之前所有的层的链表均要插入新节点索引,所以会作此变更
                //竞争失败的重新进入循环'for (;;)'
                if (casHead(h, newh)) {
                    h = newh;
                    idx = idxs[level = oldLevel];
                    break;
                }
            }
        }
        // find insertion points and splice in
        //获取level,该level为原头节点的层数,即原跳表的层数,不包括新层
        splice: for (int insertionLevel = level;;) {
            //获取头索引的层数
            int j = h.level;
            for (Index<K,V> q = h, r = q.right, t = idx;;) {
                //如果头索引为null或者新增节点索引为null,退出'for (int insertionLevel = level;;)'
                //此处表示有其他线程删除了头索引或者新增节点的索引
                if (q == null || t == null)
                    break splice;
                //如果头索引的链表后续索引存在,如果是新层则为新节点索引,如果是老层,则为原索引
                if (r != null) {
                    //获取r的节点
                    Node<K,V> n = r.node;
                    // compare before deletion check avoids needing recheck
                    //获取我们插入的key和n的key的比较值
                    int c = cpr(cmp, key, n.key);
                    //删除空值索引,具体解释见doGet中
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                    //如果我们插入的key大于n的key,继续向后续推进
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }
                //如果j为跳表原层数      
                if (j == insertionLevel) {
                    //此时q为我们需要在第j层插入新增节点的前置索引
                    //将新节点索引通过无锁竞争插入q与r之间,竞争失败的会重新进入'for (Index<K,V> q = h, r = q.right, t = idx;;)'循环
                    if (!q.link(r, t))
                        break; // restart
                    //如果新增节点的值为null,表示该节点已经被其他线程删除,结束循环'for (int insertionLevel = level;;)'
                    if (t.node.value == null) {
                        findNode(key);
                        break splice;
                    }
                    //插入层逐层自减,当为最底层时退出循环'for (int insertionLevel = level;;)'
                    if (--insertionLevel == 0)
                        break splice;
                }
                //其他节点随着插入节点的层数下移而下移
                if (--j >= insertionLevel && j < level)
                    t = t.down;
                q = q.down;
                r = q.right;
            }
        }
    }
    return null;
}
boolean casValue(Object cmp, Object val) {
    //无锁竞争,如果cmp等于valueOffset,将val替换cmp
    return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
}
private boolean casHead(HeadIndex<K,V> cmp, HeadIndex<K,V> val) {
    return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
    //获取调用索引对象的节点
    Node<K,V> n = node;
    //将新索引的链表后续索引(newSucc)设为老索引(succ)
    newSucc.right = succ;
    //如果调用索引对象的值不为null,通过无锁竞争,将新索引替换老索引
    return n.value != null && casRight(succ, newSucc);
}
private Node<K,V> findNode(Object key) {
    if (key == null)
        throw new NullPointerException(); // don't postpone errors
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        //找到最底层目标节点的前置节点
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            //如果该前置节点的链表后续节点为null,退出'for (;;)'循环
            if (n == null)
                break outer;
            //获取后续节点的后续节点
            Node<K,V> f = n.next;
            //如果n不为前置节点的后续节点,表示已经有其他线程删除了该节点,重新进入‘for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)’循环
            if (n != b.next)                // inconsistent read
                break;
            //如果后续节点的值为null,无锁竞争,删除该节点(将后续节点替代该节点)
            //重新进入‘for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)’循环
            if ((v = n.value) == null) {    // n is deleted
                n.helpDelete(b, f);
                break;
            }
            //如果前置节点已被其他线程删除,重新进入‘for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)’循环
            if (b.value == null || v == n)  // b is deleted
                break;
            //如果插入的key与后续节点的key相等,返回后续节点
            if ((c = cpr(cmp, key, n.key)) == 0)
                return n;
            //如果插入的节点key小于后续节点key,结束循环‘for (;;)’
            if (c < 0)
                break outer;
            //如果插入的节点key大于后续节点key,向后推进
            b = n;
            n = f;
        }
    }
    return null;
}

关于层数是如何来增加的,这个就依靠于ThreadLocalRandom.nextSecondarySeed()这个随机数来决定,当该随机数的二进制最高位与最末位不为0的时候,我们put进该Map的数据只会在最底层链表中,不会在高层链表中构建节点。当该随机数的二进制最高位与最末位都为0的时候,且该随机数从二进制第二位开始向左有多少个1,就代表会在多少层高层链表中构建节点,当然超过原跳表的最高层只会增加一层。

用一个网上随机的例子来说明,虽然这个例子中的算法并不是以上ConcurrentSkipListMap的真实算法。但是可以帮助理解

跳跃表的初试状态如下图,表中没有一个元素:

如果我们要插入元素2,首先是在底部插入元素2,如下图:

然后我们抛硬币,结果是正面,那么我们要将2插入到L2层,如下图: 

继续抛硬币,结果是反面,那么元素2的插入操作就停止了,插入后的表结构就是上图所示。接下来,我们插入元素33,跟元素2的插入一样,现在L1层插入33,如下图:

然后抛硬币,结果是反面,那么元素33的插入操作就结束了,插入后的表结构就是上图所示。接下来,我们插入元素55,首先在L1插入55,插入后如下图:

然后抛硬币,结果是正面,那么L2层需要插入55,如下图:

继续抛硬币,结果又是正面,那么L3层需要插入55,如下图:

以此类推,我们插入剩余的元素。当然因为规模小,结果很可能不是一个理想的跳跃表。但是如果元素个数n的规模很大,学过概率论的同学都知道,最终的表结构肯定非常接近于理想跳跃表。

然后我们来看一下它的get方法。

public V get(Object key) {
    return doGet(key);
}
private V doGet(Object key) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        //在最底层链表中获取目标的前置节点
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            //如果找到的前置节点没有后续节点,直接跳出循环'for (;;)',然后返回null
            if (n == null)
                break outer;
            //如果后续节点不为null,获取后续节点到后续节点
            Node<K,V> f = n.next;
            //如果n已经不为前置节点到后续节点了,重新进入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循环
            if (n != b.next)                // inconsistent read
                break;
            //如果后续节点n的值为null
            if ((v = n.value) == null) {    // n is deleted
                //通过无锁竞争删除该节点n,因为只允许有一个线程可以删除成功重新进入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循环
                n.helpDelete(b, f);
                break;
            }
            //如果前置节点的值为null或者后续节点的值为null重新进入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循环
            if (b.value == null || v == n)  // b is deleted
                break;
            //如果查找的键与后续节点的键相同,返回后续节点的值
            if ((c = cpr(cmp, key, n.key)) == 0) {
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
            //如果查找的键小于后续节点的键,直接跳出循环'for (;;)',然后返回null
            if (c < 0)
                break outer;
            //如果查找的键大于后续节点的键,继续向链表的后端推进
            b = n;
            n = f;
        }
    }
    return null;
}

以下这段findPredecessor方法整体的意思为:从最上层的头索引开始向右查找(链表的后续索引),如果后续索引的节点的Key大于我们要查找的Key.则头索引移到下层链表,在下层链表查找,以此反复,一直查找到没有下层的分层索引为止,返回该索引的节点。如果后续索引的节点的Key小于我们要查找的Key,则在该层链表中向后查找。由于查找的Key永远小于索引节点的Key,所以只能找到目标的前置索引节点。其中会有空值索引的存在,这里是通过CAS来进行处理的。这里需要注意的是我们要找的值最终都是在最底层链表中找到的,它不会在高层和中层链表中去找最终值(就算高层、中层链表中有这个值)。

比方说我们要在这个图中找55,它是不会直接在L3中直接找到的,而是经过箭头的方向在L1找55的前置节点。

private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
    if (key == null)
        throw new NullPointerException(); // don't postpone errors
    for (;;) {
        //获取原始头索引以及该头索引的链表后续索引,当ConcurrentSkipListMap刚初始化的时候,r为null
        //但是我们查找肯定不会在空的跳表中查找,所以我们认定头索引的后续索引r不为null.
        for (Index<K,V> q = head, r = q.right, d;;) {
            //如果链表后续索引不为null
            if (r != null) {
                //获取链表后续索引的节点
                Node<K,V> n = r.node;
                K k = n.key;
                //如果该节点的值value为null
                if (n.value == null) {
                    //删除空值索引,即把r的后续索引顶替掉r
                    //如果无锁竞争失败,即多个线程都在删空索引,只有一个线程能删成功,从头开始进入循环'for (Index<K,V> q = head, r = q.right, d;;)'
                    if (!q.unlink(r))
                        break;           // restart
                    //无锁竞争成功,空值索引被成功删除,重新获取r值,并跳过后续代码,进入'for (Index<K,V> q = head, r = q.right, d;;)'下一轮循环
                    //这里我们需要注意,这里的q和r并不代表哪个固定的索引,他们都是不断在具体的索引中不断变化的
                    r = q.right;         // reread r
                    continue;
                }
                //如果该节点的值value不为null,且查找的key值大于该节点的key值
                //向后续索引推进
                if (cpr(cmp, key, k) > 0) {
                    q = r;
                    r = r.right;
                    continue;
                }
            }
            //如果q的分层索引为null,返回q的节点
            if ((d = q.down) == null)
                return q.node;
            //如果q的分层索引不为null,将q赋值为自身的分层索引
            q = d;
            //r赋值为分层索引的链表后续索引
            r = d.right;
        }
    }
}
final boolean unlink(Index<K,V> succ) {
    //前索引的节点的value是否为null,以及用后索引替换前索引是否成功
    return node.value != null && casRight(succ, succ.right);
}
final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
    //无锁竞争,如果前索引cmp等于rightOffset的时候,使用后索引来替换前索引,并返回true;否则返回false
    return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
}
static final int cpr(Comparator c, Object x, Object y) {
    //比较x,y是否相等,如果返回0则相等,为正数说明x大于y
    return (c != null) ? c.compare(x, y) : ((Comparable)x).compareTo(y);
}

关于比较器,可以参考本人博客Java函数式编程整理

void helpDelete(Node<K,V> b, Node<K,V> f) {
    /*
     * 如果本节点的后续节点为f,且本身为b的后续节点
     */
    if (f == next && this == b.next) {
        //如果后续节点为null或者后续节点值不为后续节点
        if (f == null || f.value != f) // not already marked
            //通过无锁竞争生成一个key为null,value和next都相同的后续节点(value和next也可能为null)
            casNext(f, new Node<K,V>(f));
        else
            //如果后续节点不为空,通过无锁竞争,将f的后续节点替换掉本身节点,即删除本身节点。
            b.casNext(this, f.next);
    }
}
boolean casNext(Node<K,V> cmp, Node<K,V> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

当然我们最后依然要讨论它的并发性能,我们可以看到整个put,get中都没有锁的参与,完全通过双for循环(第一层基本类似于for(;;)的无限循环,第二层加入了各种CAS无锁竞争,竞争失败的会重新进入第二层循环)来保证线程安全。我们都知道纯无锁的性能其实是要远远高过锁性能的,所以在高并发的读写操作的时候,其性能是要好过ConcurrentHashMap,因为ConcurrentHashMap有数组单节点中有节点锁,具体的性能测试可以参考Fork/Join框架原理和使用探秘

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部