长文详解:DUBBO源码使用了哪些设计模式

原创
2021/05/25 19:53
阅读数 3.9K

欢迎大家关注公众号「JAVA前线」查看更多精彩分享文章,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时欢迎大家加我个人微信「java_front」一起交流学习

0 文章概述

DUBBO作为RPC领域优秀开源的框架在业界十分流行,本文我们阅读其源码并对其使用到的设计模式进行分析。需要说明的是本文所说的设计模式更加广义,不仅包括标准意义上23种设计模式,还有一些常见经过检验的代码模式例如双重检查锁模式、多线程保护性暂停模式等等。

1 模板方法

模板方法模式定义一个操作中的算法骨架,一般使用抽象类定义算法骨架。抽象类同时定义一些抽象方法,这些抽象方法延迟到子类实现,这样子类不仅遵守了算法骨架约定,也实现了自己的算法。既保证了规约也兼顾灵活性。这就是用抽象构建框架,用实现扩展细节。

DUBBO源码中有一个非常重要的核心概念Invoker,我们可以理解为执行器或者说一个可执行对象,能够根据方法的名称、参数得到相应执行结果,这个特性体现了代理模式我们后面章节再说,本章节我们先分析其中的模板方法模式。

public abstract class AbstractInvoker<Timplements Invoker<T{

    @Override
    public Result invoke(Invocation inv) throws RpcException {
        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);
        if (attachment != null && attachment.size() > 0) {
            invocation.addAttachmentsIfAbsent(attachment);
        }
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            invocation.addAttachments(contextAttachments);
        }
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        try {
            return doInvoke(invocation);
        } catch (InvocationTargetException e) {
            Throwable te = e.getTargetException();
            if (te == null) {
                return new RpcResult(e);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                }
                return new RpcResult(te);
            }
        } catch (RpcException e) {
            if (e.isBiz()) {
                return new RpcResult(e);
            } else {
                throw e;
            }
        } catch (Throwable e) {
            return new RpcResult(e);
        }
    }

    protected abstract Result doInvoke(Invocation invocation) throws Throwable;
}

AbstractInvoker作为抽象父类定义了invoke方法这个方法骨架,并且定义了doInvoke抽象方法供子类扩展,例如子类InjvmInvoker、DubboInvoker各自实现了doInvoke方法。

InjvmInvoker是本地引用,调用时直接从本地暴露生产者容器获取生产者Exporter对象即可。

class InjvmInvoker<Textends AbstractInvoker<T{

    @Override
    public Result doInvoke(Invocation invocation) throws Throwable {
        Exporter<?> exporter = InjvmProtocol.getExporter(exporterMap, getUrl());
        if (exporter == null) {
            throw new RpcException("Service [" + key + "] not found.");
        }
        RpcContext.getContext().setRemoteAddress(Constants.LOCALHOST_VALUE, 0);
        return exporter.getInvoker().invoke(invocation);
    }
}

DubboInvoker相对复杂一些,需要考虑同步异步调用方式,配置优先级,远程通信等等。

public class DubboInvoker<Textends AbstractInvoker<T{

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);

            // 超时时间方法级别配置优先级最高
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
                RpcContext.getContext().setFuture(futureAdapter);
                Result result;
                if (isAsyncFuture) {
                    result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                } else {
                    result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                }
                return result;
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}

 

2 动态代理

代理模式核心是为一个目标对象提供一个代理,以控制对这个对象的访问,我们可以通过代理对象访问目标对象,这样可以增强目标对象功能。

代理模式分为静态代理与动态代理,动态代理又分为JDK代理和Cglib代理,JDK代理只能代理实现类接口的目标对象,但是Cglib没有这种要求。

 

2.1 JDK动态代理

动态代理本质是通过生成字节码的方式将代理对象织入目标对象,本文以JDK动态代理为例。

第一步定义业务方法,即被代理的目标对象:

public interface StudentJDKService {
    public void addStudent(String name);
    public void updateStudent(String name);
}

public class StudentJDKServiceImpl implements StudentJDKService {

    @Override
    public void addStudent(String name) {
        System.out.println("add student=" + name);
    }

    @Override
    public void updateStudent(String name) {
        System.out.println("update student=" + name);
    }
}

第二步定义一个事务代理对象:

public class TransactionInvocationHandler implements InvocationHandler {

    private Object target;

    public TransactionInvocationHandler(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        System.out.println("------前置通知------");
        Object rs = method.invoke(target, args);
        System.out.println("------后置通知------");
        return rs;
    }
}

第三步定义代理工厂:

public class ProxyFactory {

    public Object getProxy(Object target, InvocationHandler handler) {
        ClassLoader loader = this.getClass().getClassLoader();
        Class<?>[] interfaces = target.getClass().getInterfaces();
        Object proxy = Proxy.newProxyInstance(loader, interfaces, handler);
        return proxy;
    }
}

第四步进行测试:

public class ZTest {

    public static void main(String[] args) throws Exception {
        testSimple();
    }

    public static void testSimple() {
        StudentJDKService target = new StudentJDKServiceImpl();
        TransactionInvocationHandler handler = new TransactionInvocationHandler(target);
        ProxyFactory proxyFactory = new ProxyFactory();
        Object proxy = proxyFactory.getProxy(target, handler);
        StudentJDKService studentService = (StudentJDKService) proxy;
        studentService.addStudent("JAVA前线");
    }
}

ProxyGenerator.generateProxyClass是生成字节码文件核心方法,我们看一看动态字节码到底如何定义:

public class ZTest {

    public static void main(String[] args) throws Exception {
        createProxyClassFile();
    }

    public static void createProxyClassFile() {
        String name = "Student$Proxy";
        byte[] data = ProxyGenerator.generateProxyClass(name, new Class[] { StudentJDKService.class });
        FileOutputStream out = null;
        try {
            String fileName = "c:/test/" + name + ".class";
            File file = new File(fileName);
            out = new FileOutputStream(file);
            out.write(data);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            if (null != out) {
                try {
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

最终生成字节码文件如下,我们看到代理对象被织入了目标对象:

import com.xpz.dubbo.simple.jdk.StudentJDKService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;

public final class Student$Proxy extends Proxy implements StudentJDKService {
    private static Method m1;

    private static Method m2;

    private static Method m4;

    private static Method m3;

    private static Method m0;

    public Student$Proxy(InvocationHandler paramInvocationHandler) {
        super(paramInvocationHandler);
    }

    public final boolean equals(Object paramObject) {
        try {
            return ((Boolean)this.h.invoke(this, m1, new Object[] { paramObject })).booleanValue();
        } catch (Error | RuntimeException error) {
            throw null;
        } catch (Throwable throwable) {
            throw new UndeclaredThrowableException(throwable);
        }
    }

    public final String toString() {
        try {
            return (String)this.h.invoke(this, m2, null);
        } catch (Error | RuntimeException error) {
            throw null;
        } catch (Throwable throwable) {
            throw new UndeclaredThrowableException(throwable);
        }
    }

    public final void updateStudent(String paramString) {
        try {
            this.h.invoke(this, m4, new Object[] { paramString });
            return;
        } catch (Error | RuntimeException error) {
            throw null;
        } catch (Throwable throwable) {
            throw new UndeclaredThrowableException(throwable);
        }
    }

    public final void addStudent(String paramString) {
        try {
            this.h.invoke(this, m3, new Object[] { paramString });
            return;
        } catch (Error | RuntimeException error) {
            throw null;
        } catch (Throwable throwable) {
            throw new UndeclaredThrowableException(throwable);
        }
    }

    public final int hashCode() {
        try {
            return ((Integer)this.h.invoke(this, m0, null)).intValue();
        } catch (Error | RuntimeException error) {
            throw null;
        } catch (Throwable throwable) {
            throw new UndeclaredThrowableException(throwable);
        }
    }

    static {
        try {
            m1 = Class.forName("java.lang.Object").getMethod("equals"new Class[] { Class.forName("java.lang.Object") });
            m2 = Class.forName("java.lang.Object").getMethod("toString"new Class[0]);
            m4 = Class.forName("com.xpz.dubbo.simple.jdk.StudentJDKService").getMethod("updateStudent"new Class[] { Class.forName("java.lang.String") });
            m3 = Class.forName("com.xpz.dubbo.simple.jdk.StudentJDKService").getMethod("addStudent"new Class[] { Class.forName("java.lang.String") });
            m0 = Class.forName("java.lang.Object").getMethod("hashCode"new Class[0]);
            return;
        } catch (NoSuchMethodException noSuchMethodException) {
            throw new NoSuchMethodError(noSuchMethodException.getMessage());
        } catch (ClassNotFoundException classNotFoundException) {
            throw new NoClassDefFoundError(classNotFoundException.getMessage());
        }
    }
}

 

2.2 DUBBO源码应用

那么在DUBBO源码中动态代理是如何体现的呢?我们知道消费者在消费方法时实际上执行的代理方法,这是消费者在refer时生成的代理方法。

代理工厂AbstractProxyFactory有两个子类:

JdkProxyFactory
JavassistProxyFactory

通过下面源码我们可以分析得到,DUBBO通过InvokerInvocationHandler对象代理了invoker对象:

public class JdkProxyFactory extends AbstractProxyFactory {

    @Override
    public <T> getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
    }
}

public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    public <T> getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
}

InvokerInvocationHandler将参数信息封装至RpcInvocation进行传递:

public class InvokerInvocationHandler implements InvocationHandler {
    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class{
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[JAVA前线], attachments={}]
        RpcInvocation rpcInvocation = createInvocation(method, args);
        return invoker.invoke(rpcInvocation).recreate();
    }

    private RpcInvocation createInvocation(Method method, Object[] args) {
        RpcInvocation invocation = new RpcInvocation(method, args);
        if (RpcUtils.hasFutureReturnType(method)) {
            invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true");
            invocation.setAttachment(Constants.ASYNC_KEY, "true");
        }
        return invocation;
    }
}

 

3 策略模式

在1995年出版的《设计模式:可复用面向对象软件的基础》给出了策略模式定义:

Define a family of algorithms, encapsulate each one, and make them interchangeable. Strategy lets the algorithm vary independently from clients that use it

定义一系列算法,封装每一个算法,并使它们可以互换。策略模式可以使算法的变化独立于使用它们的客户端代码。

在设计模式原则中有一条开闭原则:对扩展开放,对修改关闭,我认为这是设计模式中最重要设计原则原因如下:

(1) 当需求变化时应该通过扩展而不是通过修改已有代码来实现变化,这样就保证代码的稳定性,避免牵一发而动全身

(2) 扩展也不是随意扩展,因为事先定义了算法,扩展也是根据算法扩展,体现了用抽象构建框架,用实现扩展细节

(3) 标准意义的二十三种设计模式说到底最终都是在遵循开闭原则

 

3.1 策略模式实例

假设我们现在需要解析一段文本,这段文本有可能是HTML也有可能是TEXT,如果不使用策略模式应该怎么写呢?

public enum DocTypeEnum {
    HTML(1"HTML"),
    TEXT(2"TEXT");

    private int value;
    private String description;

    private DocTypeEnum(int value, String description) {
        this.value = value;
        this.description = description;
    }
    
    public int value() {  
        return value;  
    }    
}

public class ParserManager {

    public void parse(Integer docType, String content) {
        // 文本类型是HTML
        if(docType == DocTypeEnum.HTML.getValue()) {
            // 解析逻辑
        }
        // 文本类型是TEXT
        else if(docType == DocTypeEnum.TEXT.getValue()) {
            // 解析逻辑
        }
    }
}

这种写法功能上没有问题,但是当本文类型越来越多时,那么parse方法将会越来越冗余和复杂,if else代码块也会越来越多,所以我们要使用策略模式。

第一步定义业务类型和业务实体:

public enum DocTypeEnum {
    HTML(1"HTML"),
    TEXT(2"TEXT");

    private int value;
    private String description;

    private DocTypeEnum(int value, String description) {
        this.value = value;
        this.description = description;
    }

    public int value() {
        return value;
    }
}

public class BaseModel {
    // 公共字段
}

public class HtmlContentModel extends BaseModel {
    // HTML自定义字段
}

public class TextContentModel extends BaseModel {
    // TEXT自定义字段
}

第二步定义策略:

public interface Strategy<T extends BaseModel{
    public T parse(String sourceContent);
}

@Service
public class HtmlStrategy implements Strategy {

    @Override
    public HtmlContentModel parse(String sourceContent) {
        return new HtmlContentModel("html");
    }
}

@Service
public class TextStrategy implements Strategy {

    @Override
    public TextContentModel parse(String sourceContent) {
        return new TextContentModel("text");
    }
}

第三步定义策略工厂:

@Service
public class StrategyFactory implements InitializingBean {
    
    private Map<Integer,Strategy> strategyMap = new HashMap<>();  
    @Resource
    private Strategy<HtmlContentModel> htmlStrategy ;
    @Resource
    private Strategy<TextContentModel> textStrategy ;

    @Override
   public void afterPropertiesSet() throws Exception  {
       strategyMap.put(RechargeTypeEnum.HTML.value(), htmlStrategy);   
       strategyMap.put(RechargeTypeEnum.TEXT.value(),textStrategy);
   }

   public Strategy getStrategy(int type) {
       return strategyMap.get(type);
   }
} 

第四步定义策略执行器:

@Service
public class StrategyExecutor<T extends BaseModel{

    @Resource
    private StrategyFactory<T> strategyFactory;

    public T parse(String sourceContent, Integer type) {
        Strategy strategy = StrategyFactory.getStrategy(type);
        return strategy.parse(sourceContent);
    }
}

第五步执行测试用例:

public class Test {

    @Resource
    private StrategyExecutor  executor;

    @Test
    public void test() {
        // 解析HTML
        HtmlContentModel content1 = (HtmlContentModel) executor.parse("<a>测试内容</a>",  DocTypeEnum.HTML.value());
        System.out.println(content1);

        // 解析TEXT
        TextContentModel content2 = (TextContentModel)executor.calRecharge("测试内容",  DocTypeEnum.TEXT.value());
        System.out.println(content2);
    }
}

如果新增文本类型我们再扩展新策略即可。我们再回顾策略模式定义会有更深的体会:定义一系列算法,封装每一个算法,并使它们可以互换。策略模式可以使算法的变化独立于使用它们的客户端代码。

 

3.2 DUBBO源码应用

在上述实例中我们将策略存储在map容器,我们思考一下还有没有其它地方可以存储策略?答案是配置文件。下面就要介绍SPI机制,我认为这个机制在广义上实现了策略模式。

SPI(Service Provider Interface)是一种服务发现机制,本质是将接口实现类的全限定名配置在文件中,并由服务加载器读取配置文件加载实现类,这样可以在运行时动态为接口替换实现类,我们通过SPI机制可以为程序提供拓展功能。

 

3.2.1 JDK SPI

我们首先分析JDK自身SPI机制,定义一个数据驱动接口并提供两个驱动实现,最后通过serviceLoader加载驱动。

(1) 新建DataBaseDriver工程并定义接口

public interface DataBaseDriver {
    String connect(String hostIp);
}

(2) 打包这个工程为JAR

<dependency>
  <groupId>com.javafont.spi</groupId>
  <artifactId>DataBaseDriver</artifactId>
  <version>1.0.0-SNAPSHOT</version>
</dependency>

(3) 新建MySQLDriver工程引用上述依赖并实现DataBaseDriver接口

import com.javafont.database.driver.DataBaseDriver;

public class MySQLDataBaseDriver implements DataBaseDriver {
    @Override
    public String connect(String hostIp) {
        return "MySQL DataBase Driver connect";
    }
}

(4) 在MySQLDriver项目新建文件

src/main/resources/META-INF/services/com.javafont.database.driver.DataBaseDriver

(5) 在上述文件新增如下内容

com.javafont.database.mysql.driver.MySQLDataBaseDriver

(6) 按照上述相同步骤创建工程OracleDriver

(7) 打包上述两个项目

<dependency>
  <groupId>com.javafont.spi</groupId>
  <artifactId>MySQLDriver</artifactId>
  <version>1.0.0-SNAPSHOT</version>
</dependency>

<dependency>
  <groupId>com.javafont.spi</groupId>
  <artifactId>OracleDriver</artifactId>
  <version>1.0.0-SNAPSHOT</version>
</dependency>

(8) 新建测试项目引入上述MySQLDriver、OracleDriver

public class DataBaseConnector {
    public static void main(String[] args) {
        ServiceLoader<DataBaseDriver> serviceLoader = ServiceLoader.load(DataBaseDriver.class);
        Iterator<DataBaseDriver> iterator = serviceLoader.iterator();
        while (iterator.hasNext()) {
            DataBaseDriver driver = iterator.next();
            System.out.println(driver.connect("localhost"));
        }
    }
}

// 输出结果
// MySQL DataBase Driver connect
// Oracle DataBase Driver connect

我们并没有指定使用哪个驱动连接数据库,而是通过ServiceLoader方式加载所有实现了DataBaseDriver接口的实现类。假设我们只需要使用MySQL数据库驱动那么直接引入相应依赖即可。

 

3.2.2 DUBBO SPI

我们发现JDK SPI机制还是有一些不完善之处:例如通过ServiceLoader会加载所有实现了某个接口的实现类,但是不能通过一个key去指定获取哪一个实现类,但是DUBBO自己实现的SPI机制解决了这个问题。

例如Protocol接口有如下实现类:

org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol

我们现在将这些类配置信息在配置文件,配置文件在如下目录:

META-INF/services/
META-INF/dubbo/
META-INF/dubbo/internal/

配置方式和JDK SPI方式配置不一样,每个实现类都有key与之对应:

dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol

使用时通过扩展点方式加载实现类:

public class ReferenceConfig<Textends AbstractReferenceConfig {

    private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

    private T createProxy(Map<String, String> map) {
        if (isJvmRefer) {
            URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        }
    }
}

getAdaptiveExtension()是加载自适应扩展点,javassist会为自适应扩展点生成动态代码:

public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {

    public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg1 == null)
            throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}

extension对象就是根据url中protocol属性等于injvm最终加载InjvmProtocol对象,动态获取到了我们制定的业务对象,所以我认为SPI体现了策略模式。

 

4 装饰器模式

装饰器模式可以动态将责任附加到对象上,在不改变原始类接口情况下,对原始类功能进行增强,并且支持多个装饰器的嵌套使用。实现装饰器模式需要以下组件:

(1) Component(抽象构件)

核心业务抽象:可以使用接口或者抽象类

(2) ConcreteComponent(具体构件)

实现核心业务:最终执行的业务代码

(3) Decorator(抽象装饰器)

抽象装饰器类:实现Component并且组合一个Component对象

(4) ConcreteDecorator(具体装饰器)

具体装饰内容:装饰核心业务代码

4.1 装饰器实例

有一名足球运动员要去踢球,我们用球鞋和球袜为他装饰一番,这样可以使其战力值增加,我们使用装饰器模式实现这个实例。

(1) Component

/**
 * 抽象构件(可以用接口替代)
 */
public abstract class Component {

    /**
     * 踢足球(业务核心方法)
     */
    public abstract void playFootBall();
}

(2) ConcreteComponent

/**
 * 具体构件
 */
public class ConcreteComponent extends Component {

    @Override
    public void playFootBall() {
        System.out.println("球员踢球");
    }
}

(3) Decorator

/**
 * 抽象装饰器
 */
public abstract class Decorator extends Component {
    private Component component = null;

    public Decorator(Component component) {
        this.component = component;
    }

    @Override
    public void playFootBall() {
        this.component.playFootBall();
    }
}

(4) ConcreteDecorator

/**
 * 球袜装饰器
 */
public class ConcreteDecoratorA extends Decorator {

    public ConcreteDecoratorA(Component component) {
        super(component);
    }

    /**
     * 定义球袜装饰逻辑
     */
    private void decorateMethod() {
        System.out.println("换上球袜战力值增加");
    }

    /**
     * 重写父类方法
     */
    @Override
    public void playFootBall() {
        this.decorateMethod();
        super.playFootBall();
    }
}

/**
 * 球鞋装饰器
 */
public class ConcreteDecoratorB extends Decorator {

    public ConcreteDecoratorB(Component component) {
        super(component);
    }

    /**
     * 定义球鞋装饰逻辑
     */
    private void decorateMethod() {
        System.out.println("换上球鞋战力值增加");
    }

    /**
     * 重写父类方法
     */
    @Override
    public void playFootBall() {
        this.decorateMethod();
        super.playFootBall();
    }
}

(5) 运行测试

public class TestDecoratorDemo {
    public static void main(String[] args) {
        Component component = new ConcreteComponent();
        component = new ConcreteDecoratorA(component);
        component = new ConcreteDecoratorB(component);
        component.playFootBall();
    }
}

// 换上球鞋战力值增加
// 换上球袜战力值增加
// 球员踢球

 

4.2 DUBBO源码应用

DUBBO是通过SPI机制实现装饰器模式,我们以Protocol接口进行分析,首先分析装饰器类,抽象装饰器核心要点是实现了Component并且组合一个Component对象。

public class ProtocolFilterWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }
}

public class ProtocolListenerWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolListenerWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }
}

在配置文件中配置装饰器:

filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper

通过SPI机制加载扩展点时会使用装饰器装饰具体构件:

public class ReferenceConfig<Textends AbstractReferenceConfig {

    private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

    private T createProxy(Map<String, String> map) {
        if (isJvmRefer) {
            URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        }
    }
}

最终生成refprotocol为如下对象:

ProtocolFilterWrapper(ProtocolListenerWrapper(InjvmProtocol))

 

5 责任链模式

责任链模式将请求发送和接收解耦,让多个接收对象都有机会处理这个请求。这些接收对象串成一条链路并沿着这条链路传递这个请求,直到链路上某个接收对象能够处理它。我们介绍责任链模式两种应用场景和四种代码实现方式,最后介绍了DUBBO如何应用责任链构建过滤器链路。

 

5.1 应用场景:命中立即中断

实现一个关键词过滤功能。系统设置三个关键词过滤器,输入内容命中任何一个过滤器规则就返回校验不通过,链路立即中断无需继续进行。

(1) 实现方式一

public interface ContentFilter {
    public boolean filter(String content);
}

public class AaaContentFilter implements ContentFilter {
    private final static String KEY_CONTENT = "aaa";

    @Override
    public boolean filter(String content) {
        boolean isValid = Boolean.FALSE;
        if (StringUtils.isEmpty(content)) {
            return isValid;
        }
        isValid = !content.contains(KEY_CONTENT);
        return isValid;
    }
}

public class BbbContentFilter implements ContentFilter {
    private final static String KEY_CONTENT = "bbb";

    @Override
    public boolean filter(String content) {
        boolean isValid = Boolean.FALSE;
        if (StringUtils.isEmpty(content)) {
            return isValid;
        }
        isValid = !content.contains(KEY_CONTENT);
        return isValid;
    }
}

public class CccContentFilter implements ContentFilter {
    private final static String KEY_CONTENT = "ccc";

    @Override
    public boolean filter(String content) {
        boolean isValid = Boolean.FALSE;
        if (StringUtils.isEmpty(content)) {
            return isValid;
        }
        isValid = !content.contains(KEY_CONTENT);
        return isValid;
    }
}

具体过滤器已经完成,我们下面构造过滤器责任链路:

@Service
public class ContentFilterChain {
    private List<ContentFilter> filters = new ArrayList<ContentFilter>();

    @PostConstruct
    public void init() {
        ContentFilter aaaContentFilter = new AaaContentFilter();
        ContentFilter bbbContentFilter = new BbbContentFilter();
        ContentFilter cccContentFilter = new CccContentFilter();
        filters.add(aaaContentFilter);
        filters.add(bbbContentFilter);
        filters.add(cccContentFilter);
    }

    public void addFilter(ContentFilter filter) {
        filters.add(filter);
    }

    public boolean filter(String content) {
        if (CollectionUtils.isEmpty(filters)) {
            throw new RuntimeException("ContentFilterChain is empty");
        }
        for (ContentFilter filter : filters) {
            boolean isValid = filter.filter(content);
            if (!isValid) {
                System.out.println("校验不通过");
                return isValid;
            }
        }
        return Boolean.TRUE;
    }
}

public class Test {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" });
        ContentFilterChain chain = (ContentFilterChain) context.getBean("contentFilterChain");
        System.out.println(context);
        boolean result1 = chain.filter("ccc");
        boolean result2 = chain.filter("ddd");
        System.out.println("校验结果1=" + result1);
        System.out.println("校验结果2=" + result2);
    }
}

 

(2) 实现方式二

public abstract class FilterHandler {

    /** 下一个节点 **/
    protected FilterHandler successor = null;

    public void setSuccessor(FilterHandler successor) {
        this.successor = successor;
    }

    public final boolean filter(String content) {
        /** 执行自身方法 **/
        boolean isValid = doFilter(content);
        if (!isValid) {
            System.out.println("校验不通过");
            return isValid;
        }
        /** 执行下一个节点链路 **/
        if (successor != null && this != successor) {
            isValid = successor.filter(content);
        }
        return isValid;
    }
    /** 每个节点过滤方法 **/
    protected abstract boolean doFilter(String content);
}

public class AaaContentFilterHandler extends FilterHandler {
    private final static String KEY_CONTENT = "aaa";

    @Override
    protected boolean doFilter(String content) {
        boolean isValid = Boolean.FALSE;
        if (StringUtils.isEmpty(content)) {
            return isValid;
        }
        isValid = !content.contains(KEY_CONTENT);
        return isValid;
    }
}

// 省略其它过滤器代码

具体过滤器已经完成,我们下面构造过滤器责任链路:

@Service
public class FilterHandlerChain {
    private FilterHandler head = null;
    private FilterHandler tail = null;

    @PostConstruct
    public void init() {
        FilterHandler aaaHandler = new AaaContentFilterHandler();
        FilterHandler bbbHandler = new BbbContentFilterHandler();
        FilterHandler cccHandler = new CccContentFilterHandler();
        addHandler(aaaHandler);
        addHandler(bbbHandler);
        addHandler(cccHandler);
    }

    public void addHandler(FilterHandler handler) {
        if (head == null) {
            head = tail = handler;
        }
        /** 设置当前tail继任者 **/
        tail.setSuccessor(handler);

        /** 指针重新指向tail **/
        tail = handler;
    }

    public boolean filter(String content) {
        if (null == head) {
            throw new RuntimeException("FilterHandlerChain is empty");
        }
        /** head发起调用 **/
        return head.filter(content);
    }
}

public class Test {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" });
        FilterHandlerChain chain = (FilterHandlerChain) context.getBean("filterHandlerChain");
        System.out.println(context);
        boolean result1 = chain.filter("ccc");
        boolean result2 = chain.filter("ddd");
        System.out.println("校验结果1=" + result1);
        System.out.println("校验结果2=" + result2);
    }
}

 

5.2 应用场景:全链路执行

我们实现一个考题生成功能。在线考试系统根据不同年级生成不同考题。系统设置三个考题生成器,每个生成器都会执行,根据学生年级决定是否生成考题,无需生成则执行下一个生成器。

(1) 实现方式一

public interface QuestionGenerator {
    public Question generateQuestion(String gradeInfo);
}

public class AaaQuestionGenerator implements QuestionGenerator {

    @Override
    public Question generateQuestion(String gradeInfo) {
        if (!gradeInfo.equals("一年级")) {
            return null;
        }
        Question question = new Question();
        question.setId("aaa");
        question.setScore(10);
        return question;
    }
}

// 省略其它生成器代码

具体生成器已经编写完成,我们下面构造生成器责任链路:

@Service
public class QuestionChain {
    private List<QuestionGenerator> generators = new ArrayList<QuestionGenerator>();

    @PostConstruct
    public void init() {
        QuestionGenerator aaaQuestionGenerator = new AaaQuestionGenerator();
        QuestionGenerator bbbQuestionGenerator = new BbbQuestionGenerator();
        QuestionGenerator cccQuestionGenerator = new CccQuestionGenerator();
        generators.add(aaaQuestionGenerator);
        generators.add(bbbQuestionGenerator);
        generators.add(cccQuestionGenerator);
    }

    public List<Question> generate(String gradeInfo) {
        if (CollectionUtils.isEmpty(generators)) {
            throw new RuntimeException("QuestionChain is empty");
        }
        List<Question> questions = new ArrayList<Question>();
        for (QuestionGenerator generator : generators) {
            Question question = generator.generateQuestion(gradeInfo);
            if (null == question) {
                continue;
            }
            questions.add(question);
        }
        return questions;
    }
}

public class Test {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" });
        System.out.println(context);
        QuestionChain chain = (QuestionChain) context.getBean("questionChain");
        List<Question> questions = chain.generate("一年级");
        System.out.println(questions);
    }
}

 

(2) 实现方式二

public abstract class GenerateHandler {

    /** 下一个节点 **/
    protected GenerateHandler successor = null;

    public void setSuccessor(GenerateHandler successor) {
        this.successor = successor;
    }

    public final List<Question> generate(String gradeInfo) {
        List<Question> result = new ArrayList<Question>();

        /** 执行自身方法 **/
        Question question = doGenerate(gradeInfo);
        if (null != question) {
            result.add(question);
        }

        /** 执行下一个节点链路 **/
        if (successor != null && this != successor) {
            List<Question> successorQuestions = successor.generate(gradeInfo);
            if (null != successorQuestions) {
                result.addAll(successorQuestions);
            }
        }
        return result;
    }
    /** 每个节点生成方法 **/
    protected abstract Question doGenerate(String gradeInfo);
}

public class AaaGenerateHandler extends GenerateHandler {

    @Override
    protected Question doGenerate(String gradeInfo) {
        if (!gradeInfo.equals("一年级")) {
            return null;
        }
        Question question = new Question();
        question.setId("aaa");
        question.setScore(10);
        return question;
    }
}

// 省略其它生成器代码

具体生成器已经完成,我们下面构造生成器责任链路:

@Service
public class GenerateChain {
    private GenerateHandler head = null;
    private GenerateHandler tail = null;

    @PostConstruct
    public void init() {
        GenerateHandler aaaHandler = new AaaGenerateHandler();
        GenerateHandler bbbHandler = new BbbGenerateHandler();
        GenerateHandler cccHandler = new CccGenerateHandler();
        addHandler(aaaHandler);
        addHandler(bbbHandler);
        addHandler(cccHandler);
    }

    public void addHandler(GenerateHandler handler) {
        if (head == null) {
            head = tail = handler;
        }
        /** 设置当前tail继任者 **/
        tail.setSuccessor(handler);

        /** 指针重新指向tail **/
        tail = handler;
    }

    public List<Question> generate(String gradeInfo) {
        if (null == head) {
            throw new RuntimeException("GenerateChain is empty");
        }
        /** head发起调用 **/
        return head.generate(gradeInfo);
    }
}

public class Test {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" });
        GenerateChain chain = (GenerateChain) context.getBean("generateChain");
        System.out.println(context);
        List<Question> result = chain.generate("一年级");
        System.out.println(result);
    }
}

 

5.3 DUBBO源码应用

生产者和消费者最终执行对象都是过滤器链路最后一个节点,整个链路包含多个过滤器进行业务处理。我们看看生产者和消费者最终生成的过滤器链路。

生产者过滤器链路
EchoFilter > ClassloaderFilter > GenericFilter > ContextFilter > TraceFilter > TimeoutFilter > MonitorFilter > ExceptionFilter > AbstractProxyInvoker

消费者过滤器链路
ConsumerContextFilter > FutureFilter > MonitorFilter > DubboInvoker

ProtocolFilterWrapper作为链路生成核心通过匿名类方式构建过滤器链路,我们以消费者构建过滤器链路为例:

public class ProtocolFilterWrapper implements Protocol {
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {

        // invoker = DubboInvoker
        Invoker<T> last = invoker;

        // 查询符合条件过滤器列表
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), keygroup);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;

                // 构造一个简化Invoker
                last = new Invoker<T>() {
                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        // 构造过滤器链路
                        Result result = filter.invoke(next, invocation);
                        if (result instanceof AsyncRpcResult) {
                            AsyncRpcResult asyncResult = (AsyncRpcResult) result;
                            asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
                            return asyncResult;
                        } else {
                            return filter.onResponse(result, invoker, invocation);
                        }
                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }

    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // RegistryProtocol不构造过滤器链路
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        Invoker<T> invoker = protocol.refer(type, url);
        return buildInvokerChain(invoker, Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }
}

 

6 保护性暂停模式

在多线程编程实践中我们肯定会面临线程间数据交互的问题。在处理这类问题时需要使用一些设计模式,从而保证程序的正确性和健壮性。

保护性暂停设计模式就是解决多线程间数据交互问题的一种模式。本文先从基础案例介绍保护性暂停基本概念和实践,再由浅入深,最终分析DUBBO源码中保护性暂停设计模式使用场景。

 

6.1 保护性暂停实例

我们设想这样一种场景:线程A生产数据,线程B读取数据这个数据。

但是有一种情况:线程B准备读取数据时,此时线程A还没有生产出数据。

在这种情况下线程B不能一直空转,也不能立即退出,线程B要等到生产数据完成并拿到数据之后才退出。

那么在数据没有生产出这段时间,线程B需要执行一种等待机制,这样可以达到对系统保护目的,这就是保护性暂停。

保护性暂停有多种实现方式,本文我们用synchronized/wait/notify的方式实现。

class Resource {
    private MyData data;
    private Object lock = new Object();

    public MyData getData(int timeOut) {
        synchronized (lock) {
            // 运行时长
            long timePassed = 0;
            // 开始时间
            long begin = System.currentTimeMillis();
            // 如果结果为空
            while (data == null) {
                try {
                    // 如果运行时长大于超时时间退出循环
                    if (timePassed > timeOut) {
                        break;
                    }
                    // 如果运行时长小于超时时间表示虚假唤醒 -> 只需再等待时间差值
                    long waitTime = timeOut - timePassed;

                    // 等待时间差值
                    lock.wait(waitTime);

                    // 结果不为空直接返回
                    if (data != null) {
                        break;
                    }
                    // 被唤醒后计算运行时长
                    timePassed = System.currentTimeMillis() - begin;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (data == null) {
                throw new RuntimeException("超时未获取到结果");
            }
            return data;
        }
    }

    public void sendData(MyData data) {
        synchronized (lock) {
            this.data = data;
            lock.notifyAll();
        }
    }
}

/**
 * 保护性暂停实例
 */
public class ProtectDesignTest {

    public static void main(String[] args) {
        Resource resource = new Resource();
        new Thread(() -> {
            try {
                MyData data = new MyData("hello");
                System.out.println(Thread.currentThread().getName() + "生产数据=" + data);
                // 模拟发送耗时
                TimeUnit.SECONDS.sleep(3);
                resource.sendData(data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t1").start();

        new Thread(() -> {
            MyData data = resource.getData(1000);
            System.out.println(Thread.currentThread().getName() + "接收到数据=" + data);
        }, "t2").start();
    }
}

 

6.2 加一个编号

现在再来设想一个场景:现在有三个生产数据的线程1、2、3,三个获取数据的线程4、5、6,我们希望每个获取数据线程都只拿到其中一个生产线程的数据,不能多拿也不能少拿。

这里引入一个Futures模型,这个模型为每个资源进行编号并存储在容器中,例如线程1生产的数据被拿走则从容器中删除,一直到容器为空结束。

@Getter
@Setter
public class MyNewData implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final AtomicLong ID = new AtomicLong(0);
    private Long id;
    private String message;

    public MyNewData(String message) {
        this.id = newId();
        this.message = message;
    }

    /**
     * 自增到最大值会回到最小值(负值可以作为识别ID)
     */
    private static long newId() {
        return ID.getAndIncrement();
    }

    public Long getId() {
        return this.id;
    }
}

class MyResource {
    private MyNewData data;
    private Object lock = new Object();

    public MyNewData getData(int timeOut) {
        synchronized (lock) {
            long timePassed = 0;
            long begin = System.currentTimeMillis();
            while (data == null) {
                try {
                    if (timePassed > timeOut) {
                        break;
                    }
                    long waitTime = timeOut - timePassed;
                    lock.wait(waitTime);
                    if (data != null) {
                        break;
                    }
                    timePassed = System.currentTimeMillis() - begin;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (data == null) {
                throw new RuntimeException("超时未获取到结果");
            }
            return data;
        }
    }

    public void sendData(MyNewData data) {
        synchronized (lock) {
            this.data = data;
            lock.notifyAll();
        }
    }
}

class MyFutures {
    private static final Map<Long, MyResource> FUTURES = new ConcurrentHashMap<>();

    public static MyResource newResource(MyNewData data) {
        final MyResource future = new MyResource();
        FUTURES.put(data.getId(), future);
        return future;
    }

    public static MyResource getResource(Long id) {
        return FUTURES.remove(id);
    }

    public static Set<Long> getIds() {
        return FUTURES.keySet();
    }
}


/**
 * 保护性暂停实例
 */
public class ProtectDesignTest {

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 3; i++) {
            final int index = i;
            new Thread(() -> {
                try {
                    MyNewData data = new MyNewData("hello_" + index);
                    MyResource resource = MyFutures.newResource(data);
                    // 模拟发送耗时
                    TimeUnit.SECONDS.sleep(1);
                    resource.sendData(data);
                    System.out.println("生产数据data=" + data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }).start();
        }
        TimeUnit.SECONDS.sleep(1);

        for (Long i : MyFutures.getIds()) {
            final long index = i;
            new Thread(() -> {
                MyResource resource = MyFutures.getResource(index);
                int timeOut = 3000;
                System.out.println("接收数据data=" + resource.getData(timeOut));
            }).start();
        }
    }
}

 

6.3 DUBBO源码应用

我们顺着这一个链路跟踪代码:消费者发送请求 > 提供者接收请求并执行,并且将运行结果发送给消费者 > 消费者接收结果。

(1) 消费者发送请求

消费者发送的数据包含请求ID,并且将关系维护进FUTURES容器

final class HeaderExchangeChannel implements ExchangeChannel {

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null"Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
}

class DefaultFuture implements ResponseFuture {

    // FUTURES容器
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();

    private DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        // 请求ID
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
}

(2) 提供者接收请求并执行,并且将运行结果发送给消费者

public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        // response与请求ID对应
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) {
            Object data = req.getData();
            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable) data);
            } else {
                msg = data.toString();
            }
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);
            channel.send(res);
            return;
        }
        // message = RpcInvocation包含方法名、参数名、参数值等
        Object msg = req.getData();
        try {

            // DubboProtocol.reply执行实际业务方法
            CompletableFuture<Object> future = handler.reply(channel, msg);

            // 如果请求已经完成则发送结果
            if (future.isDone()) {
                res.setStatus(Response.OK);
                res.setResult(future.get());
                channel.send(res);
                return;
            }
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
            channel.send(res);
        }
    }
}

(3) 消费者接收结果

以下DUBBO源码很好体现了保护性暂停这个设计模式,说明参看注释

class DefaultFuture implements ResponseFuture {
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();

    public static void received(Channel channel, Response response) {
        try {
            // 取出对应的请求对象
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                            + ", response " + response
                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                               + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }


    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {

                    // 放弃锁并使当前线程阻塞,直到发出信号中断它或者达到超时时间
                    done.await(timeout, TimeUnit.MILLISECONDS);

                    // 阻塞结束后再判断是否完成
                    if (isDone()) {
                        break;
                    }

                    // 阻塞结束后判断是否超时
                    if(System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // response对象仍然为空则抛出超时异常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

    private void doReceived(Response res) {
        lock.lock();
        try {
            // 接收到服务器响应赋值response
            response = res;
            if (done != null) {
                // 唤醒get方法中处于等待的代码块
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
}

 

7 双重检查锁模式

单例设计模式可以保证在整个应用某个类只能存在一个对象实例,并且这个类只提供一个取得其对象实例方法,通常这个对象创建和销毁比较消耗资源,例如数据库连接对象等等。我们分析一个双重检查锁实现的单例模式实例。

public class MyDCLConnection {
    private static volatile MyDCLConnection myConnection = null;

    private MyDCLConnection() {
        System.out.println(Thread.currentThread().getName() + " -> init connection");
    }

    public static MyDCLConnection getConnection() {
        if (null == myConnection) {
            synchronized (MyDCLConnection.class{
                if (null == myConnection) {
                    myConnection = new MyDCLConnection();
                }
            }
        }
        return myConnection;
    }
}

在DUBBO服务本地暴露时使用了双重检查锁模式判断exporter是否已经存在避免重复创建:

public class RegistryProtocol implements Protocol {

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl);
                    final Exporter<T> strongExporter = (Exporter<T>) protocol.export(invokerDelegete);
                    exporter = new ExporterChangeableWrapper<T>(strongExporter, originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }
}

 

8 文章总结

本文我们结合DUBBO源码分析了模板方法模式、动态代理模式、策略模式、装饰器模式、责任链模式、保护性暂停模式、双重检查锁模式,我认为在阅读源码时要学习其中优秀的设计模式和代码实例,这样有助于提高代码水平,希望本文对大家有所帮助。

欢迎大家关注公众号「JAVA前线」查看更多精彩分享文章,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时欢迎大家加我个人微信「java_front」一起交流学习

展开阅读全文
加载中

作者的其它热门文章

打赏
0
8 收藏
分享
打赏
1 评论
8 收藏
0
分享
返回顶部
顶部