Sentinel
大名鼎鼎的限流、容灾中间件,阿里的开源项目,经过双十一验证,兼容各类其他中间件,支持多语言。所以还有什么理由不拥抱它呢? https://github.com/alibaba/Sentinel
主要能力
- 流量限制
- 熔断降级
- 系统自适应保护
简单源码分析
滑动窗口
sentinel的数据统计采用的是滑动窗口的模式。数据统计基础的类是StatisticNode
该类包含三个关键统计指标 秒级别qps统计 分钟级qps统计 线程数统计
qps的统计使用到了滑动窗口的统计方案。已更简单,更典型的分钟级qps统计为例。 当创建一个StatisticNode
结构体时,会同时创建一个用于统计qps的对象
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
之后会将1分钟划分成60个窗口。
当资源被运行时,会找到当前时间对应的窗口(逻辑不算复杂),在该窗口内计数。
demo
public class BasicDemo {
public static void main(String[] args){
ContextUtil.enter("name1","app1");
Entry entry = null;
try{
entry = SphU.entry("test1", EntryType.IN);
System.out.println("hello world");
} catch (BlockException e) {
System.out.println("系统繁忙");
e.printStackTrace();
}finally {
if(entry!=null) {
entry.exit();
}
}
}
}
ContextUtil.enter(String name, String origin);
实际使用中,很多时候我们不会主动调用该方法。这种情况下 SphU.entry 会运行默认调用,后面我们将会看到。 name: 表示context的名称 origin: 通常表示表示调用来源,比如调用者的ip或者其他标识来源的字段.
在理解这个方法之前,我们需要先知道以下几个概念 Resource: sentinel的控制粒度被称为资源。比如上述代码就是一个资源。 Node: 资源的封装,用于存储资源的实时统计数据等。
protected static Context trueEnter(String name, String origin) {
Context context = contextHolder.get();
if (context == null) {
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
LOCK.lock();
try {
node = contextNameNodeMap.get(name);
if (node == null) {
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// Add entrance node.
Constants.ROOT.addChild(node);
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
}
} finally {
LOCK.unlock();
}
}
}
context = new Context(node, name);
context.setOrigin(origin);
contextHolder.set(context);
}
return context;
}
该方法呢,首先从当前线程的ThreadLocal中尝试获取上下文(context结构体). 如果没有获取到,那么从本地静态map(localCacheNameMap)中尝试获取当前资源节点(Node)。 如果依然没有获取到这个资源节点,那就创建一个 EntranceNode ,并且将其插入跟节点中。 最后创建一个上下文,放入threadlocal中。 PS: ROOT节点也是一个EntranceNode
public final static DefaultNode ROOT = new EntranceNode(new StringResourceWrapper(ROOT_ID, EntryType.IN),
new ClusterNode(ROOT_ID, ResourceTypeConstants.COMMON));
SphU.entry("test1", EntryType.IN)
该方法最终调用
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
......
if (context == null) {
// Using default context.
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
..........
}
首先从threadlocak中获取上下文(通过上面 ContextUtil.enter设置进去)。如果没有找到,则会执行 InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME) (最终还是调用 trueEnter ),插入一个默认的节点。
PS: 第一次调用的时候还会运行所有 InitFunc
类的init方法。
lookProcessChain(resourceWrapper);
ProcessorSlotChain:sentinel的核心设计模式就是责任链模式,该类就是责任链的入口。 该方法的核心作用就是获取一个 ProcessorSlotChain 。
当该资源第一次调用该方法时,就会调用 SlotChainProvider.newSlotChain();
创建一个新的ProcessorSlotChain,并且放入全局变量 chainMap 中。
默认使用DefaultSlotChainBuilder的build方法创建ProcessorSlotChain。
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// Note: the instances of ProcessorSlot should be different, since they are not stateless.
List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
创建一个默认的DefaultProcessorSlotChain,并且会在其中依次添加如下任务节点 PS: 节点的并非单例模式,每次调用build就会创建一个新的实例。
DefaultProcessorSlotChain.entry
DefaultProcessorSlotChain中维护了一个任务节点的链表,链表头(first)是一个AbstractLinkedProcessorSlot。当调用DefaultProcessorSlotChain.entry时,实际上就是调用上面加入的第一个slot NodeSelectorSlot,的entry方法。
NodeSelectorSlot
责任链的第一个节点。 该节点会给每一个context创建一个 DefaultNode。 首先不同的资源使用的是不同的NodeSelectorSlot实例。 对于相同的资源,但是context不同的情况,会给每个context创建一个 DefaultNode.
ClusterBuilderSlot
创建一个 ClusterNode 该node主要用来存储资源的运行时数据,比如rt ,线程数量,qps等。 如果设置了 origin,同时会创建一个 StatisticNode 用于在origin维度统计资源数据。
LogSlot
处理限流日志的节点。简单来说就是当发生限流时,将日志写入sentinel_block.log里面
StatisticSlot
sentinel的核心节点,用于进行数据统计。和logslot有些类似,他会先运行next节点的逻辑。处理完成之后才会运行统计的逻辑。
其中用到的统计方式就是上面所说的滑动窗口的方法。
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
......
} catch (BlockException e) {
.......
在顺利执行的情况下,StatisticSlot会做四件事情 一. 为当前资源的计数线程+1,通过qps+1 二. 为当前origin 计数线程+1,通过qps+1 三. 为全局节点计数线程+1,通过qps+1 四. 调用所有自定义流程运行回调的callback. (默认情况下,只有一个callback MetricEntryCallback,而该callback会调用所有自定义MetricExtension对象——用于做自定义数据统计——的onPass方法,默认情况下相当于啥也没干)。
在异常情况下也会执行类似操作,具体可以阅读源码。
AuthoritySlot
顾名思义,主要用来实现黑白名单的能力,当不满足条件时,会抛出 AuthorityException异常(继承与BlockException).
SystemSlot
用来实现系统限流 当出现系统平均rt,总qps,load,总线程数,cpu使用率,超过阈值时返回SystemBlockException错误。
FlowSlot
最常用的控制环节,通过qps等条件限制资源访问。
DegradeSlot
用于检测熔断降级条件的节点。