面试官问DUBBO不能降级哪类异常,我们聊了二十分钟

原创
2021/05/27 12:07
阅读数 7.2K

欢迎大家关注公众号「JAVA前线」查看更多精彩分享文章,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时欢迎大家加我个人微信「java_front」一起交流

1 服务雪崩

在分析服务降级之前,我们首先谈一谈什么是服务雪崩。现在我们假设存在A、B、C、D四个系统,系统间存在如下调用链路:

面试官问DUBBO不能降级哪类异常,我们聊了二十分钟

 

在正常情况下系统之间调用快速且正常,系统运行平稳。但是此时用户访问系统A的流量激增,这些流量在瞬间透传到B、C、D三个系统。B、C系统服务器节点较多抗住了这些流量,但是D系统服务器节点较少,没有抗住这些流量,导致D系统的资源逐渐耗尽,只能提供慢服务,最终结果是响应用户时延很长。

面试官问DUBBO不能降级哪类异常,我们聊了二十分钟

 

此时用户发现响应很慢,以为是自己网络不好会反复重试,那么成倍的流量会打到系统中,导致上游系统资源也逐渐耗尽了,整个访问链路都最终都不可用。

面试官问DUBBO不能降级哪类异常,我们聊了二十分钟

 

以上介绍了服务雪崩场景,我们发现在链路中一个节点出现问题,导致整个链路最终都不可用了,这是不可以接受的。

 

2 非线性

我们再从另一个概念来理解服务雪崩:非线性。这个概念在我们生活中无处不在。

你要赶早上8点钟的火车,如果6:30出发可以在7:00到达车站,于是你得到一个结论:只要30分钟就可以到达车站。

你早上想睡晚一点预计7:10出发,想着7:40可以到达车站。但是最可能的结果是你将错过这趟火车。因为正好遇上早高峰,堵车导致你至少需要花费1个小时才能到达车站。

一个小雪球的重量是100克,打雪仗时你被砸中100次,这对你不会造成任何影响。

但是如果你被10公斤的雪球砸中1次,这可能会对你造成严重的伤害。

这就是非线性。事物不是简单叠加关系,当达到某个临界值时会造成一种完全截然不同的结果。

我们来分析一个互联网的秒杀场景。假设你设计的秒杀系统当每秒30个人访问时,响应时间是10毫秒。即从用户点击按钮至得到结果这个过程,只花费了10毫秒。这个时间的流逝基本上察觉不到,性能是不错的。你感觉很好继续设计:

每秒30个访问量响应时间10毫秒

每秒300个访问量响应时间100毫秒

每秒3000个访问量响应时间1000毫秒

如果你按照这个思路去做系统设计,将会发生重大的错误。因为当每秒3000个访问量发生时,系统的响应时间可能不是1000毫秒,而可能直接导致系统崩溃,无法再处理任何的请求。最常见的场景就是当缓存系统失效时,导致的系统雪崩:

(1) 当耗时低的缓存层出现故障时,流量直接打在了耗时高的数据库层,用户的等待时长就会增加

(2) 等待时长的增加导致用户更加频繁去访问,更多的流量会打在数据库层

(3) 这导致用户的等待时长进一步增加,再次导致更频繁的访问

(4) 当访问量达到一个极限值时,造成系统崩溃,无法再处理任何请求

流量和响应时间绝不是简单的叠加关系,当到达某个临界值时,技术系统将直接崩溃。

3 服务雪崩应对方案

保证系统的稳定性和高可用性,我们需要采取一些高可用策略,目的是构建一个稳定的高可用工程系统,我们一般采用如下方案。

 

3.1 冗余 + 自动故障转移

最基本的冗余策略就是主从模式。原理是准备两台机器,部署了同一份代码,在功能层面是相同的,都可以对外提供相同的服务。

一台机器启动提供服务,这就是主服务器。另一台机器启动在一旁待命,不提供服务,随时监听主服务器的状态,这就是从服务器。当发现主服务器出现故障时,从服务器立刻替换主服务器,继续为用户提供服务。

自动故障转移策略是指当主系统发生异常时,应该可以自动探测到异常,并自动切换为备用系统。不应该只依靠人工去切换成,否则故障处理时间会显著增加。

3.2 降级策略

所谓降级策略,就是当系统遇到无法承受的压力时,选择暂时关闭一些非关键的功能,或者延时提供一些功能,把此刻所有的资源都提供给现在最关键的服务。

在秒杀场景中下订单就是最核心最关键的功能。当系统压力将要到达临界值时,可以暂时先关闭一些非核心功能如查询功能。

当秒杀活动结束后,再将暂时关闭的功能开启。这样既保证了秒杀活动的顺利进行,也保护了系统没有崩溃。

还有一种降级策略,当系统依赖的下游服务出现错误,甚至已经完全不可用了,那么此时就不能再调用这个下游服务了,否则可能导致雪崩。所以直接返回兜底方案,把下游服务直接降级。

这里比较两个概念:服务降级与服务熔断,因为这两个概念比较相似。我认为服务熔断是服务降级的一个方法,而服务降级还有很多其它方法,例如开关降级、流量降级等等。

 

3.3 延时策略

用户下订单成功后就需要进行支付。假设秒杀系统下订单每秒访问量是3000,我们来思考一个问题,有没有必要将每秒3000次访问量的压力传递给支付服务器?

答案是没有必要。因为用户秒杀成功后可以稍晚付款,比如可以跳转到一个支付页面,提示用户只要在10分钟内支付完成即可。

这样每秒3000次访问量就被分摊至几分钟,有效保护了系统。技术架构还可以使用消息队列做缓冲,让支付服务按照自己的能力去处理业务。

 

3.4 隔离策略

物理隔离:应用分别部署在不同物理机、不同机房,资源不会互相影响。

线程隔离:不同类型的请求进行分类,交给不同的线程池处理,当一类请求出现高耗时和异常,不影响另一类请求访问。

 

4 服务降级

本文我们重点结合Dubbo框架谈一谈服务降级。现在我们有服务提供者提供如下服务:

public interface HelloService {
    public String sayHello(String name) throws Exception;
}

public class HelloServiceImpl implements HelloService {
    public String sayHello(String name) throws Exception {
        String result = "hello[" + name + "]";
        return result;
    }
}

配置文件声明服务接口:

<dubbo:service interface="com.java.front.demo.provider.HelloService" ref="helloService" />

 

4.1 降级策略配置

Dubbo框架是自带服务降级策略的,提供了三种常用的降级策略,我们看一看如何进行配置。

(1) 强制降级策略

<dubbo:reference id="helloService" mock="force:return 1" interface="com.java.front.demo.provider.HelloService" /

 

(2) 异常降级策略

<dubbo:reference id="helloService" mock="throw com.java.front.BizException" interface="com.java.front.dubbo.demo.provider.HelloService" />

 

(3) 自定义降级策略

package com.java.front.dubbo.demo.consumer;
import com.java.front.demo.provider.HelloService;

public class HelloServiceMock implements HelloService {

    @Override
    public String sayHello(String name) throws Exception {
        return "mock";
    }
}

配置指定自定义降级策略:

<dubbo:reference id="helloService" mock="com.java.front.dubbo.demo.consumer.HelloServiceMock" interface="com.java.front.demo.provider.HelloService" />

 

4.2 源码分析

public class MockClusterInvoker<Timplements Invoker<T{

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        // 检查是否有mock属性
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();

        // 没有mock属性直接执行消费逻辑
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {

            // 服务消费默认执行FailoverClusterInvoker
            result = this.invoker.invoke(invocation);
        }

        // 不执行消费逻辑直接返回
        else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
            }
            // 直接执行mock逻辑
            result = doMockInvoke(invocation, null);
        } else {
            try {
                // 服务消费默认执行FailoverClusterInvoker
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                }
                if (logger.isWarnEnabled()) {
                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                }
                // 服务消费失败执行mock逻辑
                result = doMockInvoke(invocation, e);
            }
        }
        return result;
    }
}


public class MockInvoker<Timplements Invoker<T{

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        String mock = getUrl().getParameter(invocation.getMethodName() + "." + Constants.MOCK_KEY);
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(this);
        }
        if (StringUtils.isBlank(mock)) {
            mock = getUrl().getParameter(Constants.MOCK_KEY);
        }

        if (StringUtils.isBlank(mock)) {
            throw new RpcException(new IllegalAccessException("mock can not be null. url :" + url));
        }
        mock = normalizeMock(URL.decode(mock));

        // <mock="force:return 1">直接包装返回结果
        if (mock.startsWith(Constants.RETURN_PREFIX)) {
            mock = mock.substring(Constants.RETURN_PREFIX.length()).trim();
            try {
                Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
                Object value = parseMockValue(mock, returnTypes);
                return new RpcResult(value);
            } catch (Exception ew) {
                throw new RpcException("mock return invoke error. method :" + invocation.getMethodName() + ", mock:" + mock + ", url: " + url, ew);
            }
        }

        // <mock="throw">抛出异常
        else if (mock.startsWith(Constants.THROW_PREFIX)) {
            mock = mock.substring(Constants.THROW_PREFIX.length()).trim();
            if (StringUtils.isBlank(mock)) {
                throw new RpcException("mocked exception for service degradation.");
            } else {
                // 获取自定义异常
                Throwable t = getThrowable(mock);
                throw new RpcException(RpcException.BIZ_EXCEPTION, t);
            }
        }

        // <mock="com.java.front.HelloServiceMock">自定义mock策略
        else {
            try {
                Invoker<T> invoker = getInvoker(mock);
                return invoker.invoke(invocation);
            } catch (Throwable t) {
                throw new RpcException("Failed to create mock implementation class " + mock, t);
            }
        }
    }
}

 

5 产生疑问

通过上述源码我们知道,如果在mock属性中配置force,那么不会执行真正的业务逻辑,而是只执行mock逻辑,这一部分比较容易理解:

// 不执行消费逻辑直接返回
else if (value.startsWith("force")) {
    if (logger.isWarnEnabled()) {
        logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
    }
    // 直接执行mock逻辑
    result = doMockInvoke(invocation, null);
}

但是如果是其它mock配置则首先执行业务代码,如果业务代码发生异常了再执行mock逻辑:

try {
    // 服务消费默认执行FailoverClusterInvoker
    result = this.invoker.invoke(invocation);
} catch (RpcException e) {
    if (e.isBiz()) {
        throw e;
    }
    if (logger.isWarnEnabled()) {
        logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
    }
    // 服务消费失败执行mock逻辑
    result = doMockInvoke(invocation, e);
}

这段代码捕获了RpcException异常,那么问题来了RpcException是什么类型的异常?我们使用自定义降级策略进行实验,消费者代码如下:

package com.java.front.dubbo.demo.consumer;
import com.java.front.demo.provider.HelloService;

public class HelloServiceMock implements HelloService {

    @Override
    public String sayHello(String name) throws Exception {
        return "mock";
    }
}

配置指定自定义策略并设置服务超时为2秒:

<dubbo:reference id="helloService" mock="com.java.front.dubbo.demo.consumer.HelloServiceMock" interface="com.java.front.demo.provider.HelloService" timeOut="2000" />

消费者测试代码如下:

public static void testMock() {
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer1.xml" });
    context.start();
    HelloService helloServiceMock = (HelloService) context.getBean("helloService");
    String result = helloServiceMock.sayHello("JAVA前线");
    System.out.println("消费者收到结果=" + result);
}

 

5.1 超时异常

5.1.1 代码实例

我们在生产者业务代码造成5秒的阻塞,模拟一个慢服务:

public class HelloServiceImpl implements HelloService {

    public String sayHello(String name) throws Exception {
        String result = "hello[" + name + "]";
        // 模拟耗时操作5秒
        Thread.sleep(5000L);
        return result;
    }
}

消费者执行返回mock结果,说明超时异常属于RpcException异常,可以被降级策略捕获:

消费者收到结果=mock

 

5.1.2 源码分析

要分析超时异常为什么可以被降级策略捕获,我们从以下两个类分析。DefaultFuture.get方法采用了经典多线程保护性暂停模式,并且实现了异步转同步的效果,如果发生超时异常则抛出TimeoutException异常:

public class DefaultFuture implements ResponseFuture {

    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        // response对象为空
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                // 进行循环
                while (!isDone()) {

                    // 放弃锁并使当前线程阻塞,直到发出信号或中断它或者达到超时时间
                    done.await(timeout, TimeUnit.MILLISECONDS);

                    // 阻塞结束后再判断是否完成
                    if (isDone()) {
                        break;
                    }
                    // 阻塞结束后判断超过超时时间
                    if(System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // response对象仍然为空则抛出超时异常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }
}

DubboInvoker调用了DefaultFuture.get方法,如果捕获到上述TimeoutException则会抛出RpcException:

public class DubboInvoker<Textends AbstractInvoker<T{

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        try {
            // request方法发起远程调用 -> get异步转同步并进行超时验证
            RpcContext.getContext().setFuture(null);
            Result result = (Result) currentClient.request(inv, timeout).get();
            return result;
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}

源码分析到这里已经很清楚了,RpcException正是服务降级策略可以捕获的异常,所以超时异常是可以被降级的。

 

5.2 业务异常

本文我们把非超时异常统称为业务异常,例如生产者业务执行时发生运行时异常可以归为业务异常,下面我们进行试验。

5.2.1 代码实例

生产者执行过程中抛出运行时异常:

public class HelloServiceImpl implements HelloService {

    public String sayHello(String name) throws Exception {
        throw new RuntimeException("BizException")
    }
}

消费者调用直接抛出异常:

java.lang.RuntimeException: BizException
  at com.java.front.dubbo.demo.provider.HelloServiceImpl.sayHello(HelloServiceImpl.java:35)
  at org.apache.dubbo.common.bytecode.Wrapper1.invokeMethod(Wrapper1.java)
  at org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory$1.doInvoke(JavassistProxyFactory.java:56)
  at org.apache.dubbo.rpc.proxy.AbstractProxyInvoker.invoke(AbstractProxyInvoker.java:85)

 

5.2.2 源码分析

我们发现服务降级对业务异常没有生效,需要分析原因,我认为从以下两点进行分析:

(1) 消费者接收到什么消息

public class DefaultFuture implements ResponseFuture {
    public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                            + ", response " + response
                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                               + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
}

response用来接收服务端发送的消息,我们看到异常信息存放在Response的exception属性:

Response [id=0, version=null, status=20event=false, error=null, result=RpcResult [result=null, exception=java.lang.RuntimeException: BizException]]

 

(2) 异常在哪里被抛出

我们知道消费者对象是一个代理对象,首先会执行到InvokerInvocationHandler:

public class InvokerInvocationHandler implements InvocationHandler {
    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class{
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }

        // RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[JAVA前线], attachments={}]
        RpcInvocation rpcInvocation = createInvocation(method, args);

        // 消费者Invoker -> MockClusterInvoker(FailoverClusterInvoker(RegistryDirectory(invokers)))
        Result result = invoker.invoke(rpcInvocation);

        // 结果包含异常信息则抛出异常 -> 例如异常结果对象RpcResult [result=null, exception=java.lang.RuntimeException: sayHelloError1 error]
        return result.recreate();
    }
}

RpcResult.recreate方法会处理异常,如果发现异常对象不为空则抛出异常:

public class RpcResult extends AbstractResult {

    @Override
    public Object recreate() throws Throwable {
        if (exception != null) {
            try {
                Class clazz = exception.getClass();
                while (!clazz.getName().equals(Throwable.class.getName())) {
                    clazz = clazz.getSuperclass();
                }
                Field stackTraceField = clazz.getDeclaredField("stackTrace");
                stackTraceField.setAccessible(true);
                Object stackTrace = stackTraceField.get(exception);
                if (stackTrace == null) {
                    exception.setStackTrace(new StackTraceElement[0]);
                }
            } catch (Exception e) {
            }
            throw exception;
        }
        return result;
    }
}

 

5.2.3 业务异常如何降级

通过上述实例我们知道Dubbo自带的服务降级策略只能降级超时异常,而不能降级业务异常。

那么业务异常应该如何降级呢?我们可以整合Dubbo、Hystrix进行业务异常熔断,相关配置也并不复杂,大家可以网上查阅相关资料。

 

6 文章总结

本文我们首先介绍了服务雪崩这个场景,并且从非线性角度再次理解了服务雪崩。随后我们总结了服务雪崩应对方案,其中服务降级是应对服务雪崩的重要方法之一。我们针对超时异常和业务异常两种场,结合源码深入分析了Dubbo服务降级的使用场景,希望本文对大家有所帮助。

欢迎大家关注公众号「JAVA前线」查看更多精彩分享文章,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时欢迎大家加我个人微信「java_front」一起交流

展开阅读全文
加载中

作者的其它热门文章

打赏
1
2 收藏
分享
打赏
0 评论
2 收藏
1
分享
返回顶部
顶部