文档章节

java高并发设计(十三)-- netty通信之服务端

wangshuaixin
 wangshuaixin
发布于 2017/03/20 21:41
字数 1522
阅读 137
收藏 3

项目的整体功能目录如下:

服务端的核心代码实现如下:

1,抽象父类,供服务端和客户端公共使用

public abstract class RpcServerBase {

    //当前系统的可用cpu线程
    public static final int PROCESS = Runtime.getRuntime().availableProcessors() * 2;

    //服务状态一般只是做状态改变的标示,使用volatile最合适确保安全
    protected volatile ChannelState state = ChannelState.UNINIT;

    //系统默认的序列化
    protected RpcSerializeProtocol rpcSerializeProtocol = RpcSerializeProtocol.HESSIONSERIALIZE;

    public void setRpcSerializeProtocol(RpcSerializeProtocol protocol) {
        this.rpcSerializeProtocol = protocol;
    }
}

2,服务端启动程序的核心代码

public abstract class RpcServerStarter extends RpcServerBase {

    private String inetAddress;

    private String[] addrs;

    //链接的服务端通道
    private ChannelFuture future = null;

    //根据设置的url来实现不同的序列化和服务器暴露端口
    public RpcServerStarter(String inetAddress) {
        this.inetAddress = inetAddress;
        setRpcSerializeProtocol(inetAddress);
    }

    private void setRpcSerializeProtocol(String inetAddress) {
        this.addrs = inetAddress.split(FinalStaticUtil.DELIMITER);
        setRpcSerializeProtocol(ProtocolSelecter.getProtocol(addrs[0]));
    }


    /**
     * netty的server端启动的核心实现
     * 下面是netty的服务端的标准实现
     *
     */
    public synchronized void startServer() {
        //

        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup work = new NioEventLoopGroup(PROCESS, new RpcThreadFactory(FinalStaticUtil.RPC_NETTY_FACTORY));

        try {
            //netty的实现
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new PullChannelHandler(rpcSerializeProtocol, getRpcBeanMap()));

            //服务状态为初始化
            state = ChannelState.INIT;

            //地址的参数控制
            if (addrs.length == 3) {
                String host = addrs[1];
                int port = Integer.valueOf(addrs[2]);
                //绑定服务器
                future = bootstrap.bind(host, port).sync();

                //服务状态未启动
                state = ChannelState.ALIVE;

                future.channel().closeFuture().sync();
            }

        } catch (InterruptedException e) {
            state = ChannelState.UNALIVE;
            e.printStackTrace();
        } finally {
            work.shutdownGracefully();
            boss.shutdownGracefully();

            state = ChannelState.CLOSED;
        }
    }

    /**
     * 该类是供子类来实现的,获得rpc服务调用的实际实现类
     * key是该类对应实现接口的全路径
     * value是该实现类的初始化的bean
     * @return
     */
    public abstract ConcurrentHashMap<String, Object> getRpcBeanMap();
}

3,channelHandler的核心实现,用户自定义的所有功能和业务处理都在此

public class PullChannelHandler extends ChannelInitializer<SocketChannel> {

    private RpcSerializeProtocol protocol;
    private ConcurrentHashMap<String, Object> rpcBeanMap;

    public PullChannelHandler(RpcSerializeProtocol protocol, ConcurrentHashMap<String, Object> rpcBeanMap) {
        this.protocol = protocol;
        this.rpcBeanMap = rpcBeanMap;
    }

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        PullPipelineFrame frame = new PullPipelineFrame();
        //设置通道句柄的核心操作方法
        frame.initPipeline(protocol, socketChannel.pipeline(), rpcBeanMap);
    }
}
public class PullPipelineFrame {

    public PullPipelineFrame () {

    }

    /**
     *
     * @param protocol
     * @param pipeline
     * @param rpcBeanMap
     */
    public void initPipeline(RpcSerializeProtocol protocol, ChannelPipeline pipeline, ConcurrentHashMap<String, Object> rpcBeanMap) {
        switch (protocol) {
            case HESSIONSERIALIZE:
                HessionMessageCodec codec = new HessionMessageCodec();
                pipeline.addLast(new HessionMessageDecoder(codec));
                pipeline.addLast(new HessionMessageEncoder(codec));
                pipeline.addLast(new PullMessageHander(rpcBeanMap));
                break;
            case KRYOSERIALIZE:
                break;
        }
    }
}

4,自定义的消息发送及消息接受处理的rpc结果

public class PullMessageHander extends ChannelInboundHandlerAdapter {

    private ConcurrentHashMap<String, Object> rpcBeanMap;

    public PullMessageHander (ConcurrentHashMap<String, Object> rpcBeanMap) {
        this.rpcBeanMap = rpcBeanMap;
    }

    /**
     * 读取客户端传递的数据
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcRequest rpcRequest = (RpcRequest) msg;

        //初始化返回结果
        RpcResponse rpcResponse = new RpcResponse();

        //将数据设计成线程的形式,保证操作的并发处理
        PullMessageHandleTask handleTask = new PullMessageHandleTask(rpcRequest, rpcResponse, rpcBeanMap);

        //处理客户端传递的数据,并且将处理结果返回给客户端
        PullExecuter.submit(handleTask, rpcRequest, rpcResponse, ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //super.exceptionCaught(ctx, cause);
        ctx.close();
    }
}
public class PullMessageHandleTask implements Callable<Boolean> {

    private RpcRequest rpcRequest;
    private RpcResponse rpcResponse;
    private ConcurrentHashMap<String, Object> rpcBeanMap;

    public PullMessageHandleTask (RpcRequest rpcRequest, RpcResponse rpcResponse, ConcurrentHashMap<String, Object> rpcBeanMap) {
        this.rpcRequest = rpcRequest;
        this.rpcResponse = rpcResponse;
        this.rpcBeanMap = rpcBeanMap;
    }


    /**
     * 线程的最终执行方法,实际操作方法
     * @return
     * @throws Exception
     */
    @Override
    public Boolean call() throws Exception {
        rpcResponse.setRequestId(rpcRequest.getRequestId());
        try {
            //根据参数采用反射机制实现方法调用
            Object result = invoke(rpcRequest);
            rpcResponse.setResultVal(result);
            rpcResponse.setStatus(Status.SUCCESS);
            return true;
        } catch (Exception e) {
            rpcResponse.setMessage(e.getMessage());
            rpcResponse.setStatus(Status.FAILE);
            return false;
        }
    }

    /**
     * 反射的实际操作,现在预留真实的实现类的结果
     * @param rpcRequest
     * @return
     * @throws Exception
     */
    private Object invoke(RpcRequest rpcRequest) throws Exception {
        //客户端传递过来的接口名称
        String inter = rpcRequest.getClassName();
        if (!rpcBeanMap.containsKey(inter)) {
            throw new RpcBeanException(inter);
        }
        //实现从初始化集合中获得该接口的实现类的实体
        Object obj = rpcBeanMap.get(inter);
        String methodName = rpcRequest.getMethodName();
        Object[] params = rpcRequest.getParametes();
        return MethodUtils.invokeExactMethod(obj, methodName, params, rpcRequest.getParameterTypes());
    }
}
public class PullExecuter {

    //定义线程池的处理框架
    private static ListeningExecutorService threadPool;

    public static void submit(PullMessageHandleTask handleTask, RpcRequest rpcRequest,
                              RpcResponse rpcResponse, ChannelHandlerContext ctx) {
        //线程池的初始化操作
        if (null == threadPool) {
            synchronized (PullExecuter.class) {
                if (null == threadPool) {
                    threadPool = MoreExecutors.listeningDecorator(RpcExecuter.getExecuter(10, 50, -1));
                }
            }
        }

        //采用guava的多线程处理框架
        ListenableFuture<Boolean> future = threadPool.submit(handleTask);
        Futures.addCallback(future, new FutureCallback<Boolean>() {
            //操作成功后的动作
            @Override
            public void onSuccess(Boolean result) {
                //处理成功后将结果反写给客户端
                ctx.writeAndFlush(rpcResponse).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        // TODO log
                    }
                });
            }

            @Override
            public void onFailure(Throwable t) {

            }
        }, threadPool);
    }

}

下面对服务端的核心实现做说明:

1,创建ServerBootstrap实例,该类是netty的服务端启动辅助类,提供了一系列的设置服务端启动相关的参数,netty进行了封装和抽象,避免和底层nio的api进行交互,降低开发难度。

2,设置并绑定Reactor线程池,netty的reactor线程池是EventLoopGroup,它实际上是EventLoop的数组,服务端的两个线程池,一个是主要接受客户端的请求,一个是处理io操作,如果该线程池只有一个,那么它既是接受客户端请求的同时要处理io操作。

3,设定channel,作为nio的服务端,我们要创建ServerSocketChannel,netty对原生nio的封装,对应的实现就是我们使用的NioServerSocketChannel

4,然后就是根据实际情况设定相应的tcp参数,Option,作为服务端,主要是设置tcp的backlog参数,backlog指定了内核为此套接口排队的最大连接数,内核维护这两个队列:未链接队列和已连接队列。backog的值是两个队列的和,netty默认的是100

5,链路建立的时候创建并初始化ChannelPipeline,该类并不是nio服务端必须的,它本质上是一个负责处理网络事件的职责链,负责管理和处理channelHandler

6,初始化channelpipeline完成后,添加并设置channelHandler,该handler中我们就是最常用并且最核心的功能handler,编码解码,消息处理句柄等一系列句柄链。我们编写的内容主要在这里。

7,然后就是绑定并监听端口,开始准备接受客户端的请求。

8,selector轮询,由reactor线程NioEventLoop负责调度和执行Selector轮询操作,选择准备就绪的Channel集合

9,轮询到准备就绪的channel之后,就由reactor线程nioeventloop执行channelpipeline的相关方法,最终调度并执行channelhandler

10,执行netty的channelhandler和用户自定义的channelhandler,channelpipeline根据网络事件的类型,调度并执行channelhandler。收发消息的核心handler。

© 著作权归作者所有

共有 人打赏支持
wangshuaixin

wangshuaixin

粉丝 14
博文 31
码字总数 29115
作品 1
海淀
技术主管
私信 提问
Qzone 微信 Java高级——dubbo源码分析之远程通信 netty

Java高级——dubbo源码分析之远程通信 netty dubbo 底层通信选择了 netty 这个 nio 框架做为默认的网络通信框架并且通过自定义协议进行通信。dubbo 支持以下网络通信框架: Netty(默认) Min...

Java架构师那些事
2018/08/29
0
0
Netty系列之Netty高性能之道

背景 1.1. 惊人的性能数据 最近一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨节点远程服务调用。相比于传统基于J...

宇智波带土
2014/06/17
0
1
Java 200+ 面试题补充② Netty 模块

让我们每天都能看到自己的进步。老王带你打造最全的 Java 面试清单,认真把一件事做到最好。 本文是前文《Java 最常见的 200+ 面试题》的第二个补充模块,第一模块为:《Java 200+ 面试题补充...

王磊的博客
03/08
0
0
Java异步NIO框架Netty实现高性能高并发

背景 1.1. 惊人的性能数据 最近一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨节点远程服务调用。相比于传统基于J...

java知识分子
2018/09/28
0
0
架构师必备技能之Netty 高并发 UTS 项目实战

一、Netty Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。 也就是说,Netty 是一个基...

A尚学堂Nancy老师
2018/08/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

rabbitmq安装教程

RabbitMQ有Windows与Linux版本的,这里先写Windows版本的安装。 以前安装软件总是在百度上找某某安装教程,结果能按照教程安装好的软件真的不多。想起先前以为大牛说的一句话,去官网按照官网...

em_aaron
今天
6
0
Android 贝塞尔曲线实践——波浪式运动

一、波浪效果如下 贝塞尔曲线自定义波浪效果的案例很多,同样方法也很简单,大多数和本案例一样使用二次贝塞尔曲线实现,同样还有一种是PathMeasure的方式,这里我们后续补充,先来看贝塞尔曲...

IamOkay
今天
3
0
Nmap之防火墙/IDS逃逸

选项 解释 -f 报文分段 --mtu 指定偏移大小 -D IP欺骗 -sI 原地址欺骗 --source-port 源端口欺骗 --data-length 指定发包长度 --randomize-hosts 目标主机随机排序 --spoof-mac Mac地址欺骗 ...

Frost729
今天
2
0
带你搭一个SpringBoot+SpringData JPA的环境

不知道大家对SpringBoot和Spring Data JPA了解多少,如果你已经学过Spring和Hibernate的话,那么SpringBoot和SpringData JPA可以分分钟上手的。 其实我在学完SpringBoot和SpringData JPA了之...

java菜分享
今天
7
0
Chocolatey 在Window搭建一个开发环境

在看了(利用 Chocolatey 快速在 Windows 下搭建一个开发环境)后,准备从零开始 一、准备工作 1、用管理员权限启动:powershell,执行错误请参考(PowerShell因为在此系统中禁止执行脚本的解...

近在咫尺远在天涯
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部