java高并发设计(十三)-- netty通信之服务端

原创
2017/03/20 21:41
阅读数 485

项目的整体功能目录如下:

服务端的核心代码实现如下:

1,抽象父类,供服务端和客户端公共使用

public abstract class RpcServerBase {

    // Default worker-thread count: TWICE the number of CPU cores visible to the JVM.
    public static final int PROCESS = Runtime.getRuntime().availableProcessors() * 2;

    // Channel lifecycle flag. Only simple state transitions are performed on it,
    // so volatile is sufficient to guarantee cross-thread visibility.
    protected volatile ChannelState state = ChannelState.UNINIT;

    // Wire serialization protocol; defaults to Hessian, overridable via the setter.
    protected RpcSerializeProtocol rpcSerializeProtocol = RpcSerializeProtocol.HESSIONSERIALIZE;

    public void setRpcSerializeProtocol(RpcSerializeProtocol protocol) {
        this.rpcSerializeProtocol = protocol;
    }
}

2,服务端启动程序的核心代码

public abstract class RpcServerStarter extends RpcServerBase {

    // Raw address specification, e.g. "protocol<delimiter>host<delimiter>port".
    private String inetAddress;

    // inetAddress split on FinalStaticUtil.DELIMITER: [0]=protocol, [1]=host, [2]=port.
    private String[] addrs;

    // Future of the bound server channel; assigned once bind() succeeds.
    private ChannelFuture future = null;

    /**
     * Parses the address specification to select both the serialization
     * protocol and the host/port the server will expose.
     *
     * @param inetAddress delimiter-separated spec: protocol, host, port
     */
    public RpcServerStarter(String inetAddress) {
        this.inetAddress = inetAddress;
        setRpcSerializeProtocol(inetAddress);
    }

    // Splits the spec and resolves the protocol token (addrs[0]) into an enum value.
    private void setRpcSerializeProtocol(String inetAddress) {
        this.addrs = inetAddress.split(FinalStaticUtil.DELIMITER);
        setRpcSerializeProtocol(ProtocolSelecter.getProtocol(addrs[0]));
    }


    /**
     * Standard Netty server bootstrap: the boss group accepts connections,
     * the worker group handles I/O. Blocks the calling thread until the
     * server channel is closed, then shuts both groups down gracefully.
     *
     * @throws IllegalArgumentException if the address spec does not contain
     *         exactly protocol, host and port
     */
    public synchronized void startServer() {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup work = new NioEventLoopGroup(PROCESS, new RpcThreadFactory(FinalStaticUtil.RPC_NETTY_FACTORY));

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new PullChannelHandler(rpcSerializeProtocol, getRpcBeanMap()));

            state = ChannelState.INIT;

            // Fail fast on a malformed address spec instead of silently never
            // binding (the original fell through and shut the groups down).
            if (addrs.length != 3) {
                throw new IllegalArgumentException("invalid rpc address spec: " + inetAddress);
            }
            String host = addrs[1];
            int port = Integer.parseInt(addrs[2]);
            // Bind the server and wait for the bind to complete.
            future = bootstrap.bind(host, port).sync();

            // Server is now up and accepting connections.
            state = ChannelState.ALIVE;

            // Block here until the server channel is closed.
            future.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            state = ChannelState.UNALIVE;
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            work.shutdownGracefully();
            boss.shutdownGracefully();

            state = ChannelState.CLOSED;
        }
    }

    /**
     * Implemented by subclasses to supply the beans backing RPC calls.
     * Key: fully-qualified name of the service interface.
     * Value: the initialized implementation bean for that interface.
     *
     * @return map of interface name to implementation instance
     */
    public abstract ConcurrentHashMap<String, Object> getRpcBeanMap();
}

3,channelHandler的核心实现,用户自定义的所有功能和业务处理都在此

public class PullChannelHandler extends ChannelInitializer<SocketChannel> {

    // Serialization protocol chosen at server start.
    private RpcSerializeProtocol protocol;
    // Service interface name -> implementation bean, passed through to the message handler.
    private ConcurrentHashMap<String, Object> rpcBeanMap;

    public PullChannelHandler(RpcSerializeProtocol protocol, ConcurrentHashMap<String, Object> rpcBeanMap) {
        this.protocol = protocol;
        this.rpcBeanMap = rpcBeanMap;
    }

    /**
     * Called once per accepted connection; delegates pipeline assembly
     * (codec + business handler) to PullPipelineFrame.
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        new PullPipelineFrame().initPipeline(protocol, socketChannel.pipeline(), rpcBeanMap);
    }
}
public class PullPipelineFrame {

    /**
     * Populates the channel pipeline for the selected serialization protocol:
     * decoder, encoder, then the business handler that dispatches RPC calls.
     *
     * @param protocol   wire serialization protocol
     * @param pipeline   pipeline of the freshly accepted channel
     * @param rpcBeanMap service interface name -> implementation bean
     */
    public void initPipeline(RpcSerializeProtocol protocol, ChannelPipeline pipeline, ConcurrentHashMap<String, Object> rpcBeanMap) {
        if (protocol == RpcSerializeProtocol.HESSIONSERIALIZE) {
            // Decoder and encoder share a single codec instance.
            HessionMessageCodec sharedCodec = new HessionMessageCodec();
            pipeline.addLast(new HessionMessageDecoder(sharedCodec))
                    .addLast(new HessionMessageEncoder(sharedCodec))
                    .addLast(new PullMessageHander(rpcBeanMap));
        }
        // KRYOSERIALIZE: intentionally not wired up yet (placeholder, as in the original).
    }
}

4,自定义的消息发送及消息接收处理的rpc结果

public class PullMessageHander extends ChannelInboundHandlerAdapter {

    // Service interface name -> implementation bean, used to dispatch RPC calls.
    private ConcurrentHashMap<String, Object> rpcBeanMap;

    public PullMessageHander (ConcurrentHashMap<String, Object> rpcBeanMap) {
        this.rpcBeanMap = rpcBeanMap;
    }

    /**
     * Handles a decoded RpcRequest from the client: wraps it in a task and
     * submits it to the business thread pool so the Netty I/O thread is
     * never blocked by reflection-based invocation.
     *
     * @param ctx channel context used to write the response back
     * @param msg decoded message; expected to be an RpcRequest
     * @throws Exception on handler failure (propagated to exceptionCaught)
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcRequest rpcRequest = (RpcRequest) msg;

        // Response shell; filled in by the task, then written back by PullExecuter.
        RpcResponse rpcResponse = new RpcResponse();

        // Run the invocation as a task so requests are processed concurrently.
        PullMessageHandleTask handleTask = new PullMessageHandleTask(rpcRequest, rpcResponse, rpcBeanMap);

        // Process the request and write the result back to the client asynchronously.
        PullExecuter.submit(handleTask, rpcRequest, rpcResponse, ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Log the cause before closing instead of dropping it silently
        // (the original swallowed it entirely).
        cause.printStackTrace();
        ctx.close();
    }
}
public class PullMessageHandleTask implements Callable<Boolean> {

    private RpcRequest rpcRequest;
    private RpcResponse rpcResponse;
    // Service interface name -> implementation bean.
    private ConcurrentHashMap<String, Object> rpcBeanMap;

    public PullMessageHandleTask (RpcRequest rpcRequest, RpcResponse rpcResponse, ConcurrentHashMap<String, Object> rpcBeanMap) {
        this.rpcRequest = rpcRequest;
        this.rpcResponse = rpcResponse;
        this.rpcBeanMap = rpcBeanMap;
    }


    /**
     * Executes the RPC invocation and records the outcome in rpcResponse.
     *
     * @return true if the invocation succeeded, false otherwise
     * @throws Exception never in practice; invocation failures are captured
     *         in the response instead
     */
    @Override
    public Boolean call() throws Exception {
        rpcResponse.setRequestId(rpcRequest.getRequestId());
        try {
            // Dispatch to the target bean via reflection.
            Object result = invoke(rpcRequest);
            rpcResponse.setResultVal(result);
            rpcResponse.setStatus(Status.SUCCESS);
            return true;
        } catch (Exception e) {
            // getMessage() may be null (e.g. bare NullPointerException);
            // fall back to toString() so the client always gets a diagnostic.
            String message = e.getMessage();
            rpcResponse.setMessage(message != null ? message : e.toString());
            rpcResponse.setStatus(Status.FAILE);
            return false;
        }
    }

    /**
     * Resolves the target bean and invokes the requested method reflectively.
     *
     * @param rpcRequest request carrying class name, method name and arguments
     * @return the value returned by the target method
     * @throws RpcBeanException if no bean is registered for the interface
     * @throws Exception if the reflective invocation fails
     */
    private Object invoke(RpcRequest rpcRequest) throws Exception {
        // Fully-qualified interface name sent by the client.
        String inter = rpcRequest.getClassName();
        // Single lookup instead of containsKey+get: avoids the check-then-act
        // race on a map that may be mutated concurrently.
        Object obj = rpcBeanMap.get(inter);
        if (obj == null) {
            throw new RpcBeanException(inter);
        }
        String methodName = rpcRequest.getMethodName();
        Object[] params = rpcRequest.getParametes();
        return MethodUtils.invokeExactMethod(obj, methodName, params, rpcRequest.getParameterTypes());
    }
}
public class PullExecuter {

    // Lazily-initialized shared thread pool. MUST be volatile: without it the
    // double-checked locking below is broken — a thread could observe a
    // non-null but not fully published executor.
    private static volatile ListeningExecutorService threadPool;

    /**
     * Submits the RPC invocation task to the business pool and, on success,
     * writes the populated response back to the client.
     *
     * @param handleTask  task that performs the reflective invocation
     * @param rpcRequest  decoded client request (kept for symmetry/logging)
     * @param rpcResponse response object the task fills in
     * @param ctx         channel context used to flush the response
     */
    public static void submit(PullMessageHandleTask handleTask, RpcRequest rpcRequest,
                              RpcResponse rpcResponse, ChannelHandlerContext ctx) {
        // Double-checked lazy init of the pool (safe now that the field is volatile).
        if (null == threadPool) {
            synchronized (PullExecuter.class) {
                if (null == threadPool) {
                    threadPool = MoreExecutors.listeningDecorator(RpcExecuter.getExecuter(10, 50, -1));
                }
            }
        }

        // Guava listenable future: callback fires when the task finishes.
        ListenableFuture<Boolean> future = threadPool.submit(handleTask);
        Futures.addCallback(future, new FutureCallback<Boolean>() {
            @Override
            public void onSuccess(Boolean result) {
                // Flush the (success or FAILE) response back to the client.
                ctx.writeAndFlush(rpcResponse).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        // TODO log
                    }
                });
            }

            @Override
            public void onFailure(Throwable t) {
                // Executor-level failure (task rejection etc.): log it instead of
                // swallowing silently. NOTE(review): the client still gets no
                // response in this path — consider writing an error response.
                t.printStackTrace();
            }
        }, threadPool);
    }

}

下面对服务端的核心实现做说明:

1,创建ServerBootstrap实例,该类是netty的服务端启动辅助类,提供了一系列的设置服务端启动相关的参数,netty进行了封装和抽象,避免和底层nio的api进行交互,降低开发难度。

2,设置并绑定Reactor线程池,netty的reactor线程池是EventLoopGroup,它实际上是EventLoop的数组,服务端的两个线程池,一个是主要接受客户端的请求,一个是处理io操作,如果该线程池只有一个,那么它既是接受客户端请求的同时要处理io操作。

3,设定channel,作为nio的服务端,我们要创建ServerSocketChannel,netty对原生nio的封装,对应的实现就是我们使用的NioServerSocketChannel

4,然后就是根据实际情况设定相应的tcp参数,Option,作为服务端,主要是设置tcp的backlog参数,backlog指定了内核为此套接口排队的最大连接数,内核维护两个队列:未连接队列和已连接队列。backlog的值是两个队列的和,netty默认的是100

5,链路建立的时候创建并初始化ChannelPipeline,该类并不是nio服务端必须的,它本质上是一个负责处理网络事件的职责链,负责管理和处理channelHandler

6,初始化channelpipeline完成后,添加并设置channelHandler,该handler中我们就是最常用并且最核心的功能handler,编码解码,消息处理句柄等一系列句柄链。我们编写的内容主要在这里。

7,然后就是绑定并监听端口,开始准备接受客户端的请求。

8,selector轮询,由reactor线程NioEventLoop负责调度和执行Selector轮询操作,选择准备就绪的Channel集合

9,轮询到准备就绪的channel之后,就由reactor线程nioeventloop执行channelpipeline的相关方法,最终调度并执行channelhandler

10,执行netty的channelhandler和用户自定义的channelhandler,channelpipeline根据网络事件的类型,调度并执行channelhandler。收发消息的核心handler。

展开阅读全文
打赏
1
3 收藏
分享
加载中
更多评论
打赏
0 评论
3 收藏
1
分享
返回顶部
顶部