文档章节

Netty实现代理模式的通信

秋风醉了
 秋风醉了
发布于 2014/06/24 16:23
字数 2374
阅读 5091
收藏 21

该demo是从netty提供的demo修改并添加注释

按我的理解简单的说一下netty中代理通信的编程模型。

代理就是通过一个中间服务器去目标服务器请求所要的信息,然后通过代理服务器返回信息给客户端。

在netty中,netty客户端请求代理服务器,连接到代理服务器时->inbound,代理服务器马上去连接目标服务器->outbound,当连接目标服务器成功时,这是开始读取客户端给代理服务器的数据,然后使用outbound写入目标服务器,目标服务器返回数据,代理服务器读取,并使用inbound把数据返回客户端。

 

客户端程序

package proxy4;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:32
 * To change this template use File | Settings | File Templates.
 */
public class Client {

    public void connect(String host, int port) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                            new ObjectEncoder(),
                            new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                            new ClientHandler()
                    );
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception {
        new Client().connect("127.0.0.1", 12359);
    }
}

 

package proxy4;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:33
 * To change this template use File | Settings | File Templates.
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println("Client to the proxy server, said: I was a client, give my regards to the target server!");
        ChannelFuture f = ctx.writeAndFlush("I'm client, give my regards to the target server!");
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                System.out.println("===========write message success==========");
            }
        });
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String mess = (String) msg;
        System.out.println("Receiving a message from the proxy server to:" + mess);
        ctx.close();
    }
}

 

目标服务器端程序

package proxy4;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:33
 * To change this template use File | Settings | File Templates.
 */
public class Target {

    private String targetHost;
    private int targetPort;

    public Target(String targetHost, int targetPort) {
        this.targetHost = targetHost;
        this.targetPort = targetPort;
    }

    public void run() throws Exception {
        System.err.println("Target host:" + targetHost + " targetPort:" + targetPort);

        // Configure the bootstrap.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            // 注册handler
                            ch.pipeline().addLast(
                                    new ObjectEncoder(),
                                    new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                                    new TargetHandler()
                            );
                        }
                    })
                    .bind(targetPort).sync().channel().closeFuture().sync();
            //监听本地的一个端口,当有客户端请求时,然后向目标服务器发送请求,获取消息,然后发送给客户端
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new Target("127.0.0.1", 12358).run();
    }
}

 

package proxy4;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:34
 * To change this template use File | Settings | File Templates.
 */
public class TargetHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(">>>>>>>>ACTIVE>>>>>>>>");
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("proxy said:" + msg.toString());
        // 向代理服务器发送消息
        System.out.println("After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.");
        String response = "I was the target server, I thank you for the client.";
        ctx.write(response);
        ctx.flush();
        ctx.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(">>>>>>>>IN-ACTIVE>>>>>>>>");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        closeOnFlush(ctx.channel());
    }

    /**
     * Closes the specified channel after all queued write requests are flushed.
     */
    static void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}

 

代理服务器端程序

package proxy4;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:33
 * To change this template use File | Settings | File Templates.
 */
public class Proxy {

    private final int localPort;
    private final String remoteHost;
    private final int remotePort;

    /**
     * @param localPort
     * @param remoteHost
     * @param remotePort
     */
    public Proxy(int localPort, String remoteHost, int remotePort) {
        this.localPort = localPort;
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    public void run() throws Exception {
        System.err.println(
                "Proxying *:" + localPort + " to " +
                        remoteHost + ':' + remotePort + " ...");

        // Configure the bootstrap.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            // 注册handler
                            ch.pipeline().addLast(
                                    new ObjectEncoder(),
                                    new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                                    new ProxyFrontendHandler(remoteHost, remotePort)
                            );
                        }
                    }).childOption(ChannelOption.AUTO_READ, false);

            ChannelFuture f = b.bind(localPort).sync();

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new Proxy(12359, "127.0.0.1", 12358).run();
    }
}

 

package proxy4;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:33
 * To change this template use File | Settings | File Templates.
 */
public class ProxyFrontendHandler extends ChannelInboundHandlerAdapter {

    private final String remoteHost;
    private final int remotePort;

    //代理服务器和目标服务器之间的通道(从代理服务器出去所以是outbound过境)
    private volatile Channel outboundChannel;


    /**
     * remoteHost和remotePort表示目标服务器
     *
     * @param remoteHost
     * @param remotePort
     */
    public ProxyFrontendHandler(String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    /**
     * 当客户端和代理服务器建立通道连接时,调用此方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        /**
         * 客户端和代理服务器的连接通道
         * 入境的通道
         */
        final Channel inboundChannel = ctx.channel();

        // Start the connection attempt.
        Bootstrap b = new Bootstrap();
        b.group(inboundChannel.eventLoop())
                .channel(ctx.channel().getClass())
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(
                                new ObjectEncoder(),
                                new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                                new ProxyBackendHandler(inboundChannel)
                        );
                    }
                });

        /**
         * 连接目标服务器
         * ChannelFuture
         * Netty中的IO操作是异步的,
         * 包括bind、write、connect等操作会简单的返回一个ChannelFuture,调用者并不能立刻获得结果。
         * 当future对象刚刚创建时,处于非完成状态。可以通过isDone()方法来判断当前操作是否完成。通过isSuccess()判断已完成的当前操作是否成功,getCause()来获取已完成的当前操作失败的原因,isCancelled()来判断已完成的当前操作是否被取消。
         * 调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。
         */
        ChannelFuture f = b.connect(remoteHost, remotePort);
        /**
         * 获得代理服务器和目标服务器之间的连接通道
         */
        outboundChannel = f.channel();

        /**
         * ChannelFutureListener
         * 监听ChannelFuture的状态
         * 注册监听函数来执行完成后的操作
         */
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    // connection complete start to read first data
                    inboundChannel.read();
                } else {
                    // Close the connection if the connection attempt has failed.
                    inboundChannel.close();
                }
            }
        });
    }


    /**
     * 在这里接收客户端的消息
     * 在客户端和代理服务器建立连接时,也获得了代理服务器和目标服务器的通道outbound,
     * 通过outbound写入消息到目标服务器
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("from client message:" + msg.toString());

        System.out.println("After receiving the proxy server to the client's message to the target server, said: I am a proxy server, a client asked me to say hello to you.");
        String proxyToServer = "I am a proxy server, the client asked me to say hello to you.";
        if (outboundChannel.isActive()) {
            outboundChannel.writeAndFlush(proxyToServer).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        // was able to flush out data, start to read the next chunk
                        ctx.channel().read();
                    } else {
                        future.channel().close();
                    }
                }
            });
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (outboundChannel != null) {
            closeOnFlush(outboundChannel);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        closeOnFlush(ctx.channel());
    }

    /**
     * Closes the specified channel after all queued write requests are flushed.
     */
    static void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}

 

package proxy4;

import io.netty.channel.*;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:33
 * To change this template use File | Settings | File Templates.
 */
public class ProxyBackendHandler extends ChannelInboundHandlerAdapter {

    private final Channel inboundChannel;

    public ProxyBackendHandler(Channel inboundChannel) {
        this.inboundChannel = inboundChannel;
    }

    //当和目标服务器的通道连接建立时
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(">>>>>>>>>>ACTIVE>>>>>>>>>>");
    }


    /**
     * msg是从目标服务器返回的消息
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("Target server returns data:" + msg.toString());
        /**
         * 接收目标服务器发送来的数据并打印
         * 然后把数据写入代理服务器和客户端的通道里
         */
        //通过inboundChannel向客户端写入数据
        System.out.println("After the proxy server receives a response to the target server to the client to say: I am a proxy server, the target server to replace him, I say to you, thank you. ");
        String resDataToClient = "I am a proxy server, the target server to replace him, I say to you, thank you.";
        inboundChannel.writeAndFlush(resDataToClient).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    inboundChannel.close();
                } else {
                    future.channel().close();
                }
            }
        });
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(">>>>>>>>>>IN-ACTIVE>>>>>>>>>>");
        ProxyFrontendHandler.closeOnFlush(inboundChannel);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ProxyFrontendHandler.closeOnFlush(ctx.channel());
    }
}

 

运行结果:

目标服务器

Target host:127.0.0.1 targetPort:12358

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

>>>>>>>>ACTIVE>>>>>>>>

proxy said:I am a proxy server, the client asked me to say hello to you.

After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.

>>>>>>>>IN-ACTIVE>>>>>>>>

>>>>>>>>ACTIVE>>>>>>>>

proxy said:I am a proxy server, the client asked me to say hello to you.

After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.

>>>>>>>>IN-ACTIVE>>>>>>>>

>>>>>>>>ACTIVE>>>>>>>>

proxy said:I am a proxy server, the client asked me to say hello to you.

After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.

>>>>>>>>IN-ACTIVE>>>>>>>>

>>>>>>>>ACTIVE>>>>>>>>

proxy said:I am a proxy server, the client asked me to say hello to you.

After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.

>>>>>>>>IN-ACTIVE>>>>>>>>

>>>>>>>>ACTIVE>>>>>>>>

proxy said:I am a proxy server, the client asked me to say hello to you.

After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.

>>>>>>>>IN-ACTIVE>>>>>>>>

代理服务器

Proxying *:12359 to 127.0.0.1:12358 ...

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

>>>>>>>>>>ACTIVE>>>>>>>>>>

from client message:I'm client, give my regards to the target server!

After receiving the proxy server to the client's message to the target server, said: I am a proxy server, a client asked me to say hello to you.

Target server returns data:I was the target server, I thank you for the client.

After the proxy server receives a response to the target server to the client to say: I am a proxy server, the target server to replace him, I say to you, thank you. 

>>>>>>>>>>IN-ACTIVE>>>>>>>>>>

客户端

Client to the proxy server, said: I was a client, give my regards to the target server!

===========write message success==========

Receiving a message from the proxy server to:I am a proxy server, the target server to replace him, I say to you, thank you.

 

Process finished with exit code 0

==========END==========

© 著作权归作者所有

共有 人打赏支持
秋风醉了
粉丝 241
博文 566
码字总数 417505
作品 0
朝阳
程序员
私信 提问
加载中

评论(6)

正在修行的W_强
正在修行的W_强
写的不错,代码注解很清晰,感谢分享
秋风醉了
秋风醉了

引用来自“Gelopa”的评论

这是 官网版...

引用来自“鑫鑫哥哥呀”的评论

嗯 对着官网的demo 做的~~

引用来自“Gelopa”的评论

但是这个 没有让客户端 得到target的响应信息..你自己实习了吗
现在不好使吗~~什么实习?
金贞花
金贞花

引用来自“Gelopa”的评论

这是 官网版...

引用来自“鑫鑫哥哥呀”的评论

嗯 对着官网的demo 做的~~
但是这个 没有让客户端 得到target的响应信息..你自己实习了吗
秋风醉了
秋风醉了

引用来自“Gelopa”的评论

这是 官网版...
嗯 对着官网的demo 做的~~
金贞花
金贞花
这是 官网版...
金贞花
金贞花
不错啊。为什么 没人来看看呢
JavaScript常用设计模式

设计模式 设计模式是一种在长时间的经验与错误中总结出来可服用的解决方案。 设计模式主要分为3类: 创建型设计模式:专注于处理对象的创建 Constructor构造器模式,Factory工厂模式,Singl...

a独家记忆
2018/07/13
0
0
《JavaScript设计模式与开发实践》原则篇(2)—— 最少知识原则

最少知识原则(LKP)说的是一个软件实体应当尽可能少地与其他实体发生相互作用。这 里的软件实体是一个广义的概念,不仅包括对象,还包括系统、类、模块、函数、变量等。 单一职责原则指导我们...

嗨呀豆豆呢
2018/12/30
0
0
JavaScript设计模式入坑

JavaScript设计模式入坑 介绍 设计模式编写易于维护的代码。 设计模式的开创者是一位土木工程师。Σ( ° △ °|||)︴,写代码就是盖房子。 模式 模式一种可以复用的解决方案。解决软件设计中...

小小小8021
2018/10/18
0
0
【设计模式笔记】(十六)- 代理模式

一、简述 代理模式(Proxy Pattern),为其他对象提供一个代理,并由代理对象控制原有对象的引用;也称为委托模式。 其实代理模式无论是在日常开发还是设计模式中,基本随处可见,中介者模式中...

MrTrying
2018/06/24
0
0
Java游戏服务器通用基础框架

[作品和服务描述] 此框架为本人从事7年游戏后端开发的技术积累,为通用性服务器开发框架。整个框架基于自行编写的事件通知机制(原型基于观察者模式+命令模式)。每个业务对应一个Command,通...

银杏果果
2016/10/09
3
0

没有更多内容

加载失败,请刷新页面

加载更多

苏宁金融红包系统大促海量流量背后的技术支撑

发红包是目前各大互联网公司最常用的营销手段之一,它形式多样,内容丰富。2016 年底苏宁金融开启了红包系统及相关系统的项目开发。 本文将对苏宁金融红包系统的架构部署方式、演变过程、技术...

架构师springboot
1分钟前
0
0
Linux恢复误删除的文件或者目录

2017 年 2 月 1 日 GitLab 数据库被误删引起了广大争议. linux不像windows有个回收站,使用rm -rf *基本上文件是找不回来的。 那么问题来了: 对于linux下误删的文件,我们是否真的无法通过软...

Goopand
2分钟前
0
0
从NeurIPS 2018看AI发展路线!

摘要: 从NeurIPS 2018看AI发展路线! 去年9月份的时候,我发表过一份技术报告,阐述了我认为人工智能最重要的挑战,大概有以下四个方面: ·可伸缩性(Scalability)计算或存储的成本不与神...

阿里云官方博客
3分钟前
0
0
快速入门:selenium自动化测试+ubuntu系统+php语言+firefox/chrome浏览器

前言 selenium可用于界面UI自动化测试,因此也可用于来做一些自动化方面的事情。下面简单总结概括一下,对于一位新手,学习和使用selenium的基本过程。 本文只要针对:selenium自动化测试+ub...

暗夜在火星
5分钟前
0
0
List集合知识总结

转载: 一:集合的概念 集合:保存数量不确定的数据,以及保存具有映射关系的数据的容器,简单的理解就是用于存储数量不等的多个对象的容器。 集合和数组不一样,数组元素既可以是基本类型的值...

小橙子的曼曼
5分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部