文档章节

Netty实现代理模式的通信

秋风醉了
 秋风醉了
发布于 2014/06/24 16:23
字数 2374
阅读 5048
收藏 21

该demo是从netty提供的demo修改并添加注释

按我的理解简单的说一下netty中代理通信的编程模型。

代理就是通过一个中间服务器去目标服务器请求所要的信息,然后通过代理服务器返回信息给客户端。

在netty中,netty客户端请求代理服务器,连接到代理服务器时->inbound,代理服务器马上去连接目标服务器->outbound,当连接目标服务器成功时,这是开始读取客户端给代理服务器的数据,然后使用outbound写入目标服务器,目标服务器返回数据,代理服务器读取,并使用inbound把数据返回客户端。

 

客户端程序

package proxy4;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:32
 * To change this template use File | Settings | File Templates.
 */
public class Client {

    public void connect(String host, int port) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                            new ObjectEncoder(),
                            new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                            new ClientHandler()
                    );
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception {
        new Client().connect("127.0.0.1", 12359);
    }
}

 

package proxy4;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:33
 * To change this template use File | Settings | File Templates.
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println("Client to the proxy server, said: I was a client, give my regards to the target server!");
        ChannelFuture f = ctx.writeAndFlush("I'm client, give my regards to the target server!");
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                System.out.println("===========write message success==========");
            }
        });
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String mess = (String) msg;
        System.out.println("Receiving a message from the proxy server to:" + mess);
        ctx.close();
    }
}

 

目标服务器端程序

package proxy4;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:33
 * To change this template use File | Settings | File Templates.
 */
public class Target {

    private String targetHost;
    private int targetPort;

    public Target(String targetHost, int targetPort) {
        this.targetHost = targetHost;
        this.targetPort = targetPort;
    }

    public void run() throws Exception {
        System.err.println("Target host:" + targetHost + " targetPort:" + targetPort);

        // Configure the bootstrap.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            // 注册handler
                            ch.pipeline().addLast(
                                    new ObjectEncoder(),
                                    new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                                    new TargetHandler()
                            );
                        }
                    })
                    .bind(targetPort).sync().channel().closeFuture().sync();
            //监听本地的一个端口,当有客户端请求时,然后向目标服务器发送请求,获取消息,然后发送给客户端
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new Target("127.0.0.1", 12358).run();
    }
}

 

package proxy4;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:34
 * To change this template use File | Settings | File Templates.
 */
public class TargetHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(">>>>>>>>ACTIVE>>>>>>>>");
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("proxy said:" + msg.toString());
        // 向代理服务器发送消息
        System.out.println("After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.");
        String response = "I was the target server, I thank you for the client.";
        ctx.write(response);
        ctx.flush();
        ctx.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(">>>>>>>>IN-ACTIVE>>>>>>>>");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        closeOnFlush(ctx.channel());
    }

    /**
     * Closes the specified channel after all queued write requests are flushed.
     */
    static void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}

 

代理服务器端程序

package proxy4;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:33
 * To change this template use File | Settings | File Templates.
 */
public class Proxy {

    private final int localPort;
    private final String remoteHost;
    private final int remotePort;

    /**
     * @param localPort
     * @param remoteHost
     * @param remotePort
     */
    public Proxy(int localPort, String remoteHost, int remotePort) {
        this.localPort = localPort;
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    public void run() throws Exception {
        System.err.println(
                "Proxying *:" + localPort + " to " +
                        remoteHost + ':' + remotePort + " ...");

        // Configure the bootstrap.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            // 注册handler
                            ch.pipeline().addLast(
                                    new ObjectEncoder(),
                                    new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                                    new ProxyFrontendHandler(remoteHost, remotePort)
                            );
                        }
                    }).childOption(ChannelOption.AUTO_READ, false);

            ChannelFuture f = b.bind(localPort).sync();

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new Proxy(12359, "127.0.0.1", 12358).run();
    }
}

 

package proxy4;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:33
 * To change this template use File | Settings | File Templates.
 */
public class ProxyFrontendHandler extends ChannelInboundHandlerAdapter {

    private final String remoteHost;
    private final int remotePort;

    //代理服务器和目标服务器之间的通道(从代理服务器出去所以是outbound过境)
    private volatile Channel outboundChannel;


    /**
     * remoteHost和remotePort表示目标服务器
     *
     * @param remoteHost
     * @param remotePort
     */
    public ProxyFrontendHandler(String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    /**
     * 当客户端和代理服务器建立通道连接时,调用此方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        /**
         * 客户端和代理服务器的连接通道
         * 入境的通道
         */
        final Channel inboundChannel = ctx.channel();

        // Start the connection attempt.
        Bootstrap b = new Bootstrap();
        b.group(inboundChannel.eventLoop())
                .channel(ctx.channel().getClass())
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(
                                new ObjectEncoder(),
                                new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                                new ProxyBackendHandler(inboundChannel)
                        );
                    }
                });

        /**
         * 连接目标服务器
         * ChannelFuture
         * Netty中的IO操作是异步的,
         * 包括bind、write、connect等操作会简单的返回一个ChannelFuture,调用者并不能立刻获得结果。
         * 当future对象刚刚创建时,处于非完成状态。可以通过isDone()方法来判断当前操作是否完成。通过isSuccess()判断已完成的当前操作是否成功,getCause()来获取已完成的当前操作失败的原因,isCancelled()来判断已完成的当前操作是否被取消。
         * 调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。
         */
        ChannelFuture f = b.connect(remoteHost, remotePort);
        /**
         * 获得代理服务器和目标服务器之间的连接通道
         */
        outboundChannel = f.channel();

        /**
         * ChannelFutureListener
         * 监听ChannelFuture的状态
         * 注册监听函数来执行完成后的操作
         */
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    // connection complete start to read first data
                    inboundChannel.read();
                } else {
                    // Close the connection if the connection attempt has failed.
                    inboundChannel.close();
                }
            }
        });
    }


    /**
     * 在这里接收客户端的消息
     * 在客户端和代理服务器建立连接时,也获得了代理服务器和目标服务器的通道outbound,
     * 通过outbound写入消息到目标服务器
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("from client message:" + msg.toString());

        System.out.println("After receiving the proxy server to the client's message to the target server, said: I am a proxy server, a client asked me to say hello to you.");
        String proxyToServer = "I am a proxy server, the client asked me to say hello to you.";
        if (outboundChannel.isActive()) {
            outboundChannel.writeAndFlush(proxyToServer).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        // was able to flush out data, start to read the next chunk
                        ctx.channel().read();
                    } else {
                        future.channel().close();
                    }
                }
            });
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (outboundChannel != null) {
            closeOnFlush(outboundChannel);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        closeOnFlush(ctx.channel());
    }

    /**
     * Closes the specified channel after all queued write requests are flushed.
     */
    static void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}

 

package proxy4;

import io.netty.channel.*;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:33
 * To change this template use File | Settings | File Templates.
 */
public class ProxyBackendHandler extends ChannelInboundHandlerAdapter {

    private final Channel inboundChannel;

    public ProxyBackendHandler(Channel inboundChannel) {
        this.inboundChannel = inboundChannel;
    }

    //当和目标服务器的通道连接建立时
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(">>>>>>>>>>ACTIVE>>>>>>>>>>");
    }


    /**
     * msg是从目标服务器返回的消息
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("Target server returns data:" + msg.toString());
        /**
         * 接收目标服务器发送来的数据并打印
         * 然后把数据写入代理服务器和客户端的通道里
         */
        //通过inboundChannel向客户端写入数据
        System.out.println("After the proxy server receives a response to the target server to the client to say: I am a proxy server, the target server to replace him, I say to you, thank you. ");
        String resDataToClient = "I am a proxy server, the target server to replace him, I say to you, thank you.";
        inboundChannel.writeAndFlush(resDataToClient).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    inboundChannel.close();
                } else {
                    future.channel().close();
                }
            }
        });
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(">>>>>>>>>>IN-ACTIVE>>>>>>>>>>");
        ProxyFrontendHandler.closeOnFlush(inboundChannel);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ProxyFrontendHandler.closeOnFlush(ctx.channel());
    }
}

 

运行结果:

目标服务器

Target host:127.0.0.1 targetPort:12358

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

>>>>>>>>ACTIVE>>>>>>>>

proxy said:I am a proxy server, the client asked me to say hello to you.

After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.

>>>>>>>>IN-ACTIVE>>>>>>>>

>>>>>>>>ACTIVE>>>>>>>>

proxy said:I am a proxy server, the client asked me to say hello to you.

After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.

>>>>>>>>IN-ACTIVE>>>>>>>>

>>>>>>>>ACTIVE>>>>>>>>

proxy said:I am a proxy server, the client asked me to say hello to you.

After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.

>>>>>>>>IN-ACTIVE>>>>>>>>

>>>>>>>>ACTIVE>>>>>>>>

proxy said:I am a proxy server, the client asked me to say hello to you.

After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.

>>>>>>>>IN-ACTIVE>>>>>>>>

>>>>>>>>ACTIVE>>>>>>>>

proxy said:I am a proxy server, the client asked me to say hello to you.

After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.

>>>>>>>>IN-ACTIVE>>>>>>>>

代理服务器

Proxying *:12359 to 127.0.0.1:12358 ...

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

>>>>>>>>>>ACTIVE>>>>>>>>>>

from client message:I'm client, give my regards to the target server!

After receiving the proxy server to the client's message to the target server, said: I am a proxy server, a client asked me to say hello to you.

Target server returns data:I was the target server, I thank you for the client.

After the proxy server receives a response to the target server to the client to say: I am a proxy server, the target server to replace him, I say to you, thank you. 

>>>>>>>>>>IN-ACTIVE>>>>>>>>>>

客户端

Client to the proxy server, said: I was a client, give my regards to the target server!

===========write message success==========

Receiving a message from the proxy server to:I am a proxy server, the target server to replace him, I say to you, thank you.

 

Process finished with exit code 0

==========END==========

© 著作权归作者所有

共有 人打赏支持
秋风醉了
粉丝 239
博文 573
码字总数 417084
作品 0
朝阳
程序员
私信 提问
加载中

评论(6)

正在修行的W_强
正在修行的W_强
写的不错,代码注解很清晰,感谢分享
秋风醉了
秋风醉了

引用来自“Gelopa”的评论

这是 官网版...

引用来自“鑫鑫哥哥呀”的评论

嗯 对着官网的demo 做的~~

引用来自“Gelopa”的评论

但是这个 没有让客户端 得到target的响应信息..你自己实习了吗
现在不好使吗~~什么实习?
金贞花
金贞花

引用来自“Gelopa”的评论

这是 官网版...

引用来自“鑫鑫哥哥呀”的评论

嗯 对着官网的demo 做的~~
但是这个 没有让客户端 得到target的响应信息..你自己实习了吗
秋风醉了
秋风醉了

引用来自“Gelopa”的评论

这是 官网版...
嗯 对着官网的demo 做的~~
金贞花
金贞花
这是 官网版...
金贞花
金贞花
不错啊。为什么 没人来看看呢
JavaScript常用设计模式

设计模式 设计模式是一种在长时间的经验与错误中总结出来可服用的解决方案。 设计模式主要分为3类: 创建型设计模式:专注于处理对象的创建 Constructor构造器模式,Factory工厂模式,Singl...

a独家记忆
07/13
0
0
JavaScript设计模式入坑

JavaScript设计模式入坑 介绍 设计模式编写易于维护的代码。 设计模式的开创者是一位土木工程师。Σ( ° △ °|||)︴,写代码就是盖房子。 模式 模式一种可以复用的解决方案。解决软件设计中...

小小小8021
10/18
0
0
37行代码构建无状态组件通信工具-让恼人的Vuex和Redux滚蛋吧!

状态管理的现状 很多前端开发者认为,和是用来解决组件间状态通信问题的,所以大部分人仅仅是用于达到状态共享的目的。但是通常是用于解决工程性问题的,用于分离业务与视图,让结构更加清晰...

YaHuiLiang(Ryou)
11/05
0
0
【设计模式】——命令模式

【情景展示】 在网络出现之前,我们的日常生活中,人与人的交际,一般使用语言沟通,然而社会在进步,随着城市化进程的加快,我们的生活发生了巨大的变化,如今的社会,人与人之间的交流方式...

u013043341
02/02
0
0
你需要了解的23种JavaScript设计模式

为什么要学习设计模式? 在许多访谈中,你可能会遇到很多面向对象编程中的接口,抽象类,代理和以及其他与设计模式相关的问题。 一旦了解了设计模式,它会让你轻松应对任何访谈,并可以在你的...

java高级架构牛人
06/02
0
0

没有更多内容

加载失败,请刷新页面

加载更多

配置防盗链、访问控制Directory、访问控制FilesMatch

11月19日任务 11.25 配置防盗链 11.26 访问控制Directory 11.27 访问控制FilesMatch 通过限制referer来实现防盗链的功能 配置文件增加如下内容 <Directory /data/wwwroot/www.123.com> SetE...

zgxlinux
13分钟前
2
0
docker搭建redis环境

docker 搭建redis单机,主从,哨兵,集群环境: https://github.com/grace-233/docker-config-file/tree/master/redis...

grace_233
21分钟前
2
0
机器学习:从入门到晋级

摘要: 什么是机器学习,为什么学习机器学习,如何学习机器学习,这篇文章都告诉给你。 目前,人工智能(AI)非常热门,许多人都想一窥究竟。如果你对人工智能有所了解,但对机器学习(Machi...

阿里云官方博客
26分钟前
3
0
FFT数据为空

项目地址:https://github.com/zhangjianli/FlutterMusicPlugin

whoisliang
32分钟前
1
0
python中.whl文件下载,pandas

安装pandas一直报错,最后找到了这个网址,成功安装,分享一下给大家,下载的时候需要注意下载对应的python版本和电脑系统的whl文件 https://www.lfd.uci.edu/~gohlke/pythonlibs/ 这上面有很...

上官夏洛特
46分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部