文档章节

java高并发设计(十四)-- netty通信之客户端

wangshuaixin
 wangshuaixin
发布于 2017/03/21 20:54
字数 1493
阅读 989
收藏 2

接着上面的服务端继续讲解客户端的开发

1,客户端的核心代码实现

public class RpcClientLoader extends RpcServerBase {

    //单例模式的设计
    private volatile static RpcClientLoader rpcClientLoader;

    //执行的线程池操作
    private static ListeningExecutorService threadPoolExecuter = MoreExecutors.listeningDecorator(RpcExecuter.getExecuter(10, 50, -1));

    private EventLoopGroup work = new NioEventLoopGroup(PROCESS);

    //发送消息的句柄
    private PushMessageHandler pushMessageHandler;

    //单例操作涉及到数据安全采用锁机制
    private ReentrantLock lock = new ReentrantLock();
    private Condition connect = lock.newCondition();
    private Condition push = lock.newCondition();

    /**
     * 单例构造函数
     */
    private RpcClientLoader() {
    }

    /**
     * 单例提供对外的访问变量
     * @return
     */
    public static RpcClientLoader getInstance() {
        return RpcClientLoaderHelp.rpcClientLoader;
    }

    /**
     * 内部类构造安全的单例模式
     */
    private static class RpcClientLoaderHelp {
        private static RpcClientLoader rpcClientLoader = new RpcClientLoader();
    }

    /**
     * 加载对应的ip和协议
     * @param inetAddress
     */
    public synchronized void init(String inetAddress) {
        String[] addrs = inetAddress.split(FinalStaticUtil.DELIMITER);
        if (addrs.length == 3) {
            //设置当前的通信协议
            setRpcSerializeProtocol(ProtocolSelecter.getProtocol(addrs[0]));
            String host = addrs[1];
            int port = Integer.valueOf(addrs[2]);

            //netty链接的时候需要该对象
            InetSocketAddress serviceAddr = new InetSocketAddress(host, port);

            //采用google提供的guava框架中的线程池操作类
            //PushInitTask类是主要设计netty客户端连接和相关netty句柄的操作实现类
            ListenableFuture<Boolean> future = threadPoolExecuter.submit(new PushInitTask(work, serviceAddr, rpcSerializeProtocol));

            //对连接结果进行控制,guava框架中的内容
            Futures.addCallback(future, new FutureCallback<Boolean>() {
                //连接成功后的处理
                //成功的核心处理就是把发送消息的句柄传递回来,供以后客户端发送消息使用
                @Override
                public void onSuccess(Boolean result) {
                    try {
                        lock.lock();

                        if (null == pushMessageHandler) {
                            push.await();
                        }

                        if (result && null != pushMessageHandler) {
                            connect.signalAll();
                        }

                    } catch (Exception e) {

                    } finally {
                        lock.unlock();
                    }
                }
                //连接失败后的处理
                @Override
                public void onFailure(Throwable t) {
                    // TODO

                }
            }, threadPoolExecuter);
        }
    }

    /**
     * 在netty的执行过程中设置相应的服务器状态
     * @param channelState
     */
    public void setChannelState(ChannelState channelState) {
        state = channelState;
    }

    /**
     *
     */
    public synchronized void close() {
        pushMessageHandler.close();
        threadPoolExecuter.shutdown();
        work.shutdownGracefully();
        //设置服务器状态
        state = ChannelState.CLOSED;
    }

    /**
     * 对外提供netty发送消息的句柄的访问,保证数据安全,并且保证该句柄必须存在
     * @return
     * @throws InterruptedException
     */
    public PushMessageHandler getPushMessageHandler() throws InterruptedException {
        try {
            lock.lock();
            //如果发送消息的句柄还没有设置则还没有连接上服务端
            if (null == pushMessageHandler) {
                connect.await();
            }
            return pushMessageHandler;
        } finally {
            lock.unlock();
        }
    }

    /**
     * 设计netty的客户端消息发送句柄,该方法主要提供给netty链接成功后的操作
     * @param pushMessageHandler
     */
    public void setPushMessageHandler(PushMessageHandler pushMessageHandler) {
        try {
            lock.lock();
            this.pushMessageHandler = pushMessageHandler;
            //该方法是在client连接到server后的操作,所以是连接成功,通知可以发送消息了
            push.signalAll();
        } finally {
            lock.unlock();
        }

    }
}

2,netty客户端的实现

public class PushInitTask implements Callable<Boolean> {

    private EventLoopGroup eventLoopGroup;
    private InetSocketAddress address;
    private RpcSerializeProtocol protocol;

    public PushInitTask (EventLoopGroup eventLoopGroup, InetSocketAddress address, RpcSerializeProtocol protocol) {
        this.eventLoopGroup = eventLoopGroup;
        this.address = address;
        this.protocol = protocol;
    }

    /**
     * 下面是netty的标准实现
     * 采用长连接的机制
     * @return
     * @throws Exception
     */
    @Override
    public Boolean call() throws Exception {
        //执行对netty的客户端操作
        Bootstrap bootstrap = new Bootstrap();

        //下面的参数需要在后期的升级开发中进行配置的功能升级
        bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000);

        //设置通道流,根据序列化的机制采用不同的管道流
        bootstrap.handler(new PushChannelHander(protocol));

        //链接服务端
        ChannelFuture future = null;
        try {
            future = bootstrap.connect(address);
        } catch (Exception e) {
            return false;
        }

        //设置服务器为正常状态
        RpcClientLoader.getInstance().setChannelState(ChannelState.ALIVE);

        //将发送消息的句柄设置到相应的类中,给系统提示可以进行消息发送
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                //连接成功后的消息发送句柄
                RpcClientLoader.getInstance().setPushMessageHandler(
                        channelFuture.channel().pipeline().get(PushMessageHandler.class));
            }
        });
        return true;
    }
}

3,netty客户端的句柄通道,客户端发送消息及接受反馈的操作主要体现在下面的配置

public class PushChannelHander extends ChannelInitializer<SocketChannel> {

    private RpcSerializeProtocol protocol;

    public PushChannelHander (RpcSerializeProtocol protocol) {
        this.protocol = protocol;
    }

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //获得netty的通道,并对该通道设置相应的句柄
        ChannelPipeline pipeline = socketChannel.pipeline();
        //设置句柄的具体实现方法
        PushPipelineFrame.initPipeline(pipeline, protocol);
    }
}
public class PushPipelineFrame {

    /**
     * 设置相应的编码解码操作
     *
     * @param pipeline
     * @param protocol
     */
    public static void initPipeline(ChannelPipeline pipeline, RpcSerializeProtocol protocol) {
        switch (protocol) {
            case HESSIONSERIALIZE:
                HessionMessageCodec codec = new HessionMessageCodec();
                pipeline.addLast(new HessionMessageDecoder(codec));
                pipeline.addLast(new HessionMessageEncoder(codec));
                pipeline.addLast(new PushMessageHandler());
                break;
            case KRYOSERIALIZE:
                break;
        }
    }
}

4,消息的实际操作

public class PushMessageHandler extends ChannelInboundHandlerAdapter {

    private ConcurrentHashMap<String, PushCallback> callbacks = new ConcurrentHashMap<String, PushCallback>();

    //执行的数据通道,需要由通道进行数据的传输
    private volatile Channel channel;


    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        this.channel = ctx.channel();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /**
     * 服务器端返回的数据结果
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcResponse rpcResponse = (RpcResponse) msg;
        String requestId = rpcResponse.getRequestId();
        if (callbacks.containsKey(requestId)) {
            PushCallback callback = callbacks.get(requestId);
            //将结果放置在客户端的回调函数中
            callback.doEnd(rpcResponse);
            callbacks.remove(requestId);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //super.exceptionCaught(ctx, cause);
        ctx.close();
    }

    /**
     * 客户端执行数据发送的真是操作,可以通过netty来实现
     * @param rpcRequest
     * @return
     */
    public PushCallback pushMessage(RpcRequest rpcRequest) {
        PushCallback callback = new PushCallback(rpcRequest);
        callbacks.put(rpcRequest.getRequestId(), callback);
        //数据写入通道并刷新到服务端
        channel.writeAndFlush(rpcRequest);
        return callback;
    }

    /**
     * 客户端关闭通道的处理
     * 仅仅在客户端程序停止时调用
     */
    public void close() {
        channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

}

根据netty的特点,客户端的整体流程如下:

1,用户根据服务器的特点启动辅助类,保证服务是可以正常运行,并且调用netty的创建

2,用户线程创建Bootstrap,通过api设置netty客户端的相关参数

3,创建客户端的Reactor的线程组,该线程组主要处理客户端连接,io读写的NioEventLoopGroup,可以指定线程的数量,默认是cpu内核的2倍

4,通过Bootstrap的channelFactory和用户指定的channel类型,用作客户端处理的NioSocketChannel,区别于服务端的NioServerSocketChannel

5,创建ChannelHandlerPipeline,用户调度和执行网络事件,就是我们增加的handler功能

6,异步发起tcp连接,判断是否连接成功,如果注册成功,则通过继续用户数据的接受和消息的发送,如果没有连接成功,则等待链接结果

7,注册对应的网络监听状态到多路复用器,并且轮询个channel,处理连接的结果

8,连接成功,设置future结果,发送连接成功事件,出发channelpipeline执行

9,有channelpipeline调度执行系统和用户的channelhandler,执行自定义的业务逻辑

© 著作权归作者所有

wangshuaixin

wangshuaixin

粉丝 20
博文 52
码字总数 61196
作品 1
海淀
架构师
私信 提问
Qzone 微信 Java高级——dubbo源码分析之远程通信 netty

Java高级——dubbo源码分析之远程通信 netty dubbo 底层通信选择了 netty 这个 nio 框架做为默认的网络通信框架并且通过自定义协议进行通信。dubbo 支持以下网络通信框架: Netty(默认) Min...

Java架构师那些事
2018/08/29
0
0
Netty系列之Netty高性能之道

背景 1.1. 惊人的性能数据 最近一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨节点远程服务调用。相比于传统基于J...

宇智波带土
2014/06/17
622
1
Java异步NIO框架Netty实现高性能高并发

背景 1.1. 惊人的性能数据 最近一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨节点远程服务调用。相比于传统基于J...

java知识分子
2018/09/28
43
0
深入了解 Java-Netty高性能高并发理解

一丶 Netty基础入门 Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,...

架构师springboot
2018/10/31
628
0
如何使用Netty开发实现高性能的RPC服务器

RPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络,从远程计算机程序上请求服务,而不必了解底层网络技术的协议。说的再直白一点,就是客户端在不必知道调用细节的...

vshcxl
2017/10/20
232
0

没有更多内容

加载失败,请刷新页面

加载更多

一起来学Java8(三)——方法引用

在一起来学Java8(一)——函数式编程中有一个简单的函数式编程的例子: import java.util.function.Consumer;class Person { public static void sayHello(String name) { S...

猿敲月下码
22分钟前
11
0
读书笔记:深入理解ES6(十一)

第十一章 Promise与异步编程   Promise可以实现其他语言中类似Future和Deferred一样的功能,是另一种异步编程的选择,它既可以像事件和回调函数一样指定稍后执行的代码,也可以明确指示代码...

张森ZS
45分钟前
19
0
面试官,Java8 JVM内存结构变了,永久代到元空间

在文章《JVM之内存结构详解》中我们描述了Java7以前的JVM内存结构,但在Java8和以后版本中JVM的内存结构慢慢发生了变化。作为面试官如果你还不知道,那么面试过程中是不是有些露怯?作为面试...

程序新视界
53分钟前
28
0
Elasticsearch 实战(一) - 简介

官腔 Elasticsearch,分布式,高性能,高可用,可伸缩的搜索和分析系统 基本等于没说,咱们慢慢看 1 概述 百度:我们比如说想找寻任何的信息的时候,就会上百度去搜索一下,比如说找一部自己喜...

JavaEdge
58分钟前
23
0
【jQuery基础学习】11 jQuery性能简单优化

本文转载于:专业的前端网站➦【jQuery基础学习】11 jQuery性能简单优化 关于性能优化 合适的选择器 $("#id")会直接调用底层方法,所以这是最快的。如果这样不能直接找到,也可以用find方法继...

前端老手
今天
18
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部