Sentinel

原创
2020/10/29 10:44
阅读数 1.3W

Sentinel

大名鼎鼎的限流、容灾中间件,阿里的开源项目,经过双十一验证,兼容各类其他中间件,支持多语言。所以还有什么理由不拥抱它呢? https://github.com/alibaba/Sentinel

主要能力

  1. 流量限制
  2. 熔断降级
  3. 系统自适应保护

简单源码分析

滑动窗口

sentinel的数据统计采用的是滑动窗口的模式。数据统计基础的类是StatisticNode 该类包含三个关键统计指标 秒级别qps统计 分钟级qps统计 线程数统计

qps的统计使用到了滑动窗口的统计方案。已更简单,更典型的分钟级qps统计为例。 当创建一个StatisticNode 结构体时,会同时创建一个用于统计qps的对象

private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

之后会将1分钟划分成60个窗口。

当资源被运行时,会找到当前时间对应的窗口(逻辑不算复杂),在该窗口内计数。

demo

public class BasicDemo {

	public static void main(String[] args){
		
		ContextUtil.enter("name1","app1");

		Entry entry = null;
		try{
			entry = SphU.entry("test1", EntryType.IN);
			System.out.println("hello world");
		} catch (BlockException e) {
			System.out.println("系统繁忙");
			e.printStackTrace();
		}finally {
			if(entry!=null) {
				entry.exit();
			}
		}
	}
}

ContextUtil.enter(String name, String origin);

实际使用中,很多时候我们不会主动调用该方法。这种情况下 SphU.entry 会运行默认调用,后面我们将会看到。 name: 表示context的名称 origin: 通常表示表示调用来源,比如调用者的ip或者其他标识来源的字段.

在理解这个方法之前,我们需要先知道以下几个概念 Resource: sentinel的控制粒度被称为资源。比如上述代码就是一个资源。 Node: 资源的封装,用于存储资源的实时统计数据等。

protected static Context trueEnter(String name, String origin) {
        Context context = contextHolder.get();
        if (context == null) {
            Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
            DefaultNode node = localCacheNameMap.get(name);
            if (node == null) {
                if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                    setNullContext();
                    return NULL_CONTEXT;
                } else {
                    LOCK.lock();
                    try {
                        node = contextNameNodeMap.get(name);
                        if (node == null) {
                            if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                                setNullContext();
                                return NULL_CONTEXT;
                            } else {
                                node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                                // Add entrance node.
                                Constants.ROOT.addChild(node);

                                Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                                newMap.putAll(contextNameNodeMap);
                                newMap.put(name, node);
                                contextNameNodeMap = newMap;
                            }
                        }
                    } finally {
                        LOCK.unlock();
                    }
                }
            }
            context = new Context(node, name);
            context.setOrigin(origin);
            contextHolder.set(context);
        }

        return context;
    }

该方法呢,首先从当前线程的ThreadLocal中尝试获取上下文(context结构体). 如果没有获取到,那么从本地静态map(localCacheNameMap)中尝试获取当前资源节点(Node)。 如果依然没有获取到这个资源节点,那就创建一个 EntranceNode ,并且将其插入跟节点中。 最后创建一个上下文,放入threadlocal中。 PS: ROOT节点也是一个EntranceNode

public final static DefaultNode ROOT = new EntranceNode(new StringResourceWrapper(ROOT_ID, EntryType.IN),
        new ClusterNode(ROOT_ID, ResourceTypeConstants.COMMON));

SphU.entry("test1", EntryType.IN)

该方法最终调用

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        Context context = ContextUtil.getContext();
       ......

        if (context == null) {
            // Using default context.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }
		..........
    }

首先从threadlocak中获取上下文(通过上面 ContextUtil.enter设置进去)。如果没有找到,则会执行 InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME) (最终还是调用 trueEnter ),插入一个默认的节点。

PS: 第一次调用的时候还会运行所有 InitFunc 类的init方法。

lookProcessChain(resourceWrapper);

ProcessorSlotChain:sentinel的核心设计模式就是责任链模式,该类就是责任链的入口。 该方法的核心作用就是获取一个 ProcessorSlotChain 。

当该资源第一次调用该方法时,就会调用 SlotChainProvider.newSlotChain(); 创建一个新的ProcessorSlotChain,并且放入全局变量 chainMap 中。

默认使用DefaultSlotChainBuilder的build方法创建ProcessorSlotChain。

 public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();

        // Note: the instances of ProcessorSlot should be different, since they are not stateless.
        List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
        for (ProcessorSlot slot : sortedSlotList) {
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }

            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }

创建一个默认的DefaultProcessorSlotChain,并且会在其中依次添加如下任务节点 PS: 节点的并非单例模式,每次调用build就会创建一个新的实例。

DefaultProcessorSlotChain.entry

DefaultProcessorSlotChain中维护了一个任务节点的链表,链表头(first)是一个AbstractLinkedProcessorSlot。当调用DefaultProcessorSlotChain.entry时,实际上就是调用上面加入的第一个slot NodeSelectorSlot,的entry方法。

NodeSelectorSlot

责任链的第一个节点。 该节点会给每一个context创建一个 DefaultNode。 首先不同的资源使用的是不同的NodeSelectorSlot实例。 对于相同的资源,但是context不同的情况,会给每个context创建一个 DefaultNode.

ClusterBuilderSlot

创建一个 ClusterNode 该node主要用来存储资源的运行时数据,比如rt ,线程数量,qps等。 如果设置了 origin,同时会创建一个 StatisticNode 用于在origin维度统计资源数据。

LogSlot

处理限流日志的节点。简单来说就是当发生限流时,将日志写入sentinel_block.log里面

StatisticSlot

sentinel的核心节点,用于进行数据统计。和logslot有些类似,他会先运行next节点的逻辑。处理完成之后才会运行统计的逻辑。

其中用到的统计方式就是上面所说的滑动窗口的方法。

 @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Do some checking.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
            node.increaseThreadNum();
            node.addPassRequest(count);

            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            ......
        } catch (BlockException e) {
           .......

在顺利执行的情况下,StatisticSlot会做四件事情 一. 为当前资源的计数线程+1,通过qps+1 二. 为当前origin 计数线程+1,通过qps+1 三. 为全局节点计数线程+1,通过qps+1 四. 调用所有自定义流程运行回调的callback. (默认情况下,只有一个callback MetricEntryCallback,而该callback会调用所有自定义MetricExtension对象——用于做自定义数据统计——的onPass方法,默认情况下相当于啥也没干)。

在异常情况下也会执行类似操作,具体可以阅读源码。

AuthoritySlot

顾名思义,主要用来实现黑白名单的能力,当不满足条件时,会抛出 AuthorityException异常(继承与BlockException).

SystemSlot

用来实现系统限流 当出现系统平均rt,总qps,load,总线程数,cpu使用率,超过阈值时返回SystemBlockException错误。

FlowSlot

最常用的控制环节,通过qps等条件限制资源访问。

DegradeSlot

用于检测熔断降级条件的节点。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
2 收藏
1
分享
返回顶部
顶部