Alibaba Sentinel 源码阅读(Part 1 执行流程)

原创
2018/10/20 22:45
阅读数 1K

准备

  1. 已经看过wiki 很多遍。
  2. 跑过测试用例和example,已经理解Sentinel的作用和大致的工作原理。
  3. 此博客基于版本 0.2.0,请自行准备源码。

目标

带着疑惑和问题阅读源码,理解未知的问题,和设计的优缺点。 这个项目的源码分析,直接从 SphU.entry(resource, EntryType); 入口方法往下走就可以了,比较直接。

工作原理

因为这张图很好的解释了工作原理,所以引用到这里,具体文档看wiki工作原理

重要概念

Resource : 需要保护的资源
Node : 存储资源的访问信息,资源的qps等信息
Slot : 独立功能的可插拔模块,处理具体的业务,比如限流处理等...。

Q1. Sentinel 怎么存储资源的访问信息?

Node的创建

先看看信息统计的栗子:

curl http://localhost:18990/tree?type=root

EntranceNode: machine-root(t:0 pq:0 bq:0 tq:0 rt:0 prq:0 1mp:0 1mb:0 1mt:0)
-EntranceNode: sentinel_default_context(t:0 pq:0 bq:0 tq:0 rt:0 prq:0 1mp:0 1mb:0 1mt:0)
-EntranceNode: book(t:0 pq:0 bq:0 tq:0 rt:0 prq:0 1mp:0 1mb:0 1mt:0)
--book(t:0 pq:0 bq:0 tq:0 rt:0 prq:0 1mp:0 1mb:0 1mt:0)
---/book/coke(t:0 pq:0 bq:0 tq:0 rt:0 prq:0 1mp:0 1mb:0 1mt:0)
-EntranceNode: coke(t:0 pq:0 bq:0 tq:0 rt:0 prq:0 1mp:0 1mb:0 1mt:0)
--coke(t:0 pq:0 bq:0 tq:0 rt:0 prq:0 1mp:0 1mb:0 1mt:0)
---/coke/exception(t:0 pq:0 bq:0 tq:0 rt:0 prq:0 1mp:0 1mb:0 1mt:0)
---/coke/coke(t:0 pq:0 bq:0 tq:0 rt:0 prq:0 1mp:0 1mb:0 1mt:0)

每个资源都有一个统计信息,对应存储到Node上,但是上一级的信息会是下一级信息的一个统计。 先看看Node的结构:

再来看看不同的node 功能和创建时机。


// Entrance node create in {@link ContextUtil} (对应上面的EntranceNode: book)
private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<String, DefaultNode>();
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// Default node create in {@link NodeSelectorSlot} (对应上面的 --book, --/book/coke),同时会把这些资源的调用路径,以树状结构存储起来。
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
node = Env.nodeBuilder.buildTreeNode(resourceWrapper, null);
// Build invocation tree
((DefaultNode)context.getLastNode()).addChild(node);

// Cluster node build in {@link ClusterBuilderSlot}(Cluter node的功能会单独讲)
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap
        = new HashMap<ResourceWrapper, ClusterNode>();
clusterNode = Env.nodeBuilder.buildClusterNode();
node.setClusterNode(clusterNode);

Node的信息收集

到上面为止,我们的node就创建好了,之后StatisticSlot 就会进行计数和统计了

// 各个规则的校验
fireEntry(context, resourceWrapper, node, count, args);
// 如果通过就统计相应的指标
 node.increaseThreadNum();
 node.addPassRequest();
 
if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest();
            }
 if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest();
            }

for (ProcessorSlotEntryCallback<DefaultNode> handler :StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }

Node中统计信息的使用

我们以FlowControl 为栗子, 先为 /book/coke 这个资源创建FlowControl(流量控制)的规则。

[{"controlBehavior":0,"count":1.0,"grade":1,"limitApp":"default","maxQueueingTimeMs":500,"resource":"/book/coke","strategy":0,"warmUpPeriodSec":10}]

规则校验时获取的信息. StatisticSlot

  // 获取qps信息
    @Override
    public long passQps() {
        return rollingCounterInSecond.pass() / IntervalProperty.INTERVAL;
    }

我们可以看到 统计信息都是从 StatisticSlot 中获取,而最终的数据都是存在 rollingCounterInMinuterollingCounterInMinute 中的,这也核心的数据结构和算法所在,我们再下一篇具体再看

private transient volatile Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

    /**
     * Holds statistics of the recent 120 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
     * meaning each bucket per second, in this way we can get accurate statistics of each second.
     */
    private transient Metric rollingCounterInMinute = new ArrayMetric(1000, 2 * 60);

Q2. Slot Chain 是怎么工作的?

通过上面Node 创建过程,可以看到Node 的创建也是由各个Slot完成,每个Slot完成不同的功能然后向后传递。

Slot Chain创建过程

构造chain 在类CtSph 中创建

// 创建slot chain
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

 /**
     * Get {@link ProcessorSlotChain} of the resource. new {@link ProcessorSlotChain} will
     * be created if the resource doesn't relate one.
     *
     * <p>Same resource({@link ResourceWrapper#equals(Object)}) will share the same
     * {@link ProcessorSlotChain} globally, no matter in witch {@link Context}.<p/>
     *
     * <p>
     * Note that total {@link ProcessorSlot} count must not exceed {@link Constants#MAX_SLOT_CHAIN_SIZE},
     * otherwise null will return.
     * </p>
     *
     * @param resourceWrapper target resource
     * @return {@link ProcessorSlotChain} of the resource
     */
    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Entry size limit.
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }
					// 创建SlotChain的方法
                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }

来看看default的实现

/**
 * Helper class to create {@link ProcessorSlotChain}.
 *
 * @author qinan.qn
 * @author leyou
 */
public class DefaultSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());

        return chain;
    }

}

同时提供了自定义 builder的SPI 功能,SPI机制单独一个章节我们再看

 private static void resolveSlotChainBuilder() {
        List<SlotChainBuilder> list = new ArrayList<SlotChainBuilder>();
        boolean hasOther = false;
		// LOADER 就是对应的SPI
        for (SlotChainBuilder builder : LOADER) {
            if (builder.getClass() != DefaultSlotChainBuilder.class) {
                hasOther = true;
                list.add(builder);
            }
        }
        if (hasOther) {
            builder = list.get(0);
        } else {
            // No custom builder, using default.
            builder = new DefaultSlotChainBuilder();
        }

        RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
            + builder.getClass().getCanonicalName());
    }

Slot Chain 的调用过程

Slot Chain 创建好了之后,那就从 firt开始 一个个往下执行就好了,这个流程,有接口直接定义好了,接口描述其实非常清晰,也就不再赘述。

public interface ProcessorSlot<T> {

    /**
     * Entrance of this slot.
     *
     * @param context         current {@link Context}
     * @param resourceWrapper current resource
     * @param param           Generics parameter, usually is a {@link com.alibaba.csp.sentinel.node.Node}
     * @param count           tokens needed
     * @param args            parameters of the original call
     * @throws Throwable blocked exception or unexpected error
     */
    void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args)
        throws Throwable;

    /**
     * Means finish of {@link #entry(Context, ResourceWrapper, Object, int, Object...)}.
     *
     * @param context         current {@link Context}
     * @param resourceWrapper current resource
     * @param obj             relevant object (e.g. Node)
     * @param count           tokens needed
     * @param args            parameters of the original call
     * @throws Throwable blocked exception or unexpected error
     */
    void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args)
        throws Throwable;

    /**
     * Exit of this slot.
     *
     * @param context         current {@link Context}
     * @param resourceWrapper current resource
     * @param count           tokens needed
     * @param args            parameters of the original call
     */
    void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);

    /**
     * Means finish of {@link #exit(Context, ResourceWrapper, int, Object...)}.
     *
     * @param context         current {@link Context}
     * @param resourceWrapper current resource
     * @param count           tokens needed
     * @param args            parameters of the original call
     */
    void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);

总结

根据工作原理,顺着entry 方法把Sentinel的大致流程过了一遍,可能是因为设计比较合理,所以源码阅读起来比较轻松。整体过程就是构建一个个slot,每个slot处理对应的职责,并将其连起来。资源以树状的形式组织,并将统计信息存储在对应的节点上。下一篇我们将看下具体信息的存储部分,也是重要的数据结构部分。

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部