文档章节

Netty实现代理模式的通信

秋风醉了
 秋风醉了
发布于 2014/06/24 16:23
字数 2374
阅读 4949
收藏 21

该demo是从netty提供的demo修改并添加注释

按我的理解简单的说一下netty中代理通信的编程模型。

代理就是通过一个中间服务器去目标服务器请求所要的信息,然后通过代理服务器返回信息给客户端。

在netty中,netty客户端请求代理服务器,连接到代理服务器时->inbound,代理服务器马上去连接目标服务器->outbound,当连接目标服务器成功时,这是开始读取客户端给代理服务器的数据,然后使用outbound写入目标服务器,目标服务器返回数据,代理服务器读取,并使用inbound把数据返回客户端。

 

客户端程序

package proxy4;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:32
 * To change this template use File | Settings | File Templates.
 */
public class Client {

    public void connect(String host, int port) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                            new ObjectEncoder(),
                            new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                            new ClientHandler()
                    );
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception {
        new Client().connect("127.0.0.1", 12359);
    }
}

 

package proxy4;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:33
 * To change this template use File | Settings | File Templates.
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println("Client to the proxy server, said: I was a client, give my regards to the target server!");
        ChannelFuture f = ctx.writeAndFlush("I'm client, give my regards to the target server!");
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                System.out.println("===========write message success==========");
            }
        });
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String mess = (String) msg;
        System.out.println("Receiving a message from the proxy server to:" + mess);
        ctx.close();
    }
}

 

目标服务器端程序

package proxy4;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:33
 * To change this template use File | Settings | File Templates.
 */
public class Target {

    private String targetHost;
    private int targetPort;

    public Target(String targetHost, int targetPort) {
        this.targetHost = targetHost;
        this.targetPort = targetPort;
    }

    public void run() throws Exception {
        System.err.println("Target host:" + targetHost + " targetPort:" + targetPort);

        // Configure the bootstrap.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            // 注册handler
                            ch.pipeline().addLast(
                                    new ObjectEncoder(),
                                    new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                                    new TargetHandler()
                            );
                        }
                    })
                    .bind(targetPort).sync().channel().closeFuture().sync();
            //监听本地的一个端口,当有客户端请求时,然后向目标服务器发送请求,获取消息,然后发送给客户端
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new Target("127.0.0.1", 12358).run();
    }
}

 

package proxy4;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:34
 * To change this template use File | Settings | File Templates.
 */
public class TargetHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(">>>>>>>>ACTIVE>>>>>>>>");
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("proxy said:" + msg.toString());
        // 向代理服务器发送消息
        System.out.println("After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.");
        String response = "I was the target server, I thank you for the client.";
        ctx.write(response);
        ctx.flush();
        ctx.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(">>>>>>>>IN-ACTIVE>>>>>>>>");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        closeOnFlush(ctx.channel());
    }

    /**
     * Closes the specified channel after all queued write requests are flushed.
     */
    static void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}

 

代理服务器端程序

package proxy4;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:33
 * To change this template use File | Settings | File Templates.
 */
public class Proxy {

    private final int localPort;
    private final String remoteHost;
    private final int remotePort;

    /**
     * @param localPort
     * @param remoteHost
     * @param remotePort
     */
    public Proxy(int localPort, String remoteHost, int remotePort) {
        this.localPort = localPort;
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    public void run() throws Exception {
        System.err.println(
                "Proxying *:" + localPort + " to " +
                        remoteHost + ':' + remotePort + " ...");

        // Configure the bootstrap.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            // 注册handler
                            ch.pipeline().addLast(
                                    new ObjectEncoder(),
                                    new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                                    new ProxyFrontendHandler(remoteHost, remotePort)
                            );
                        }
                    }).childOption(ChannelOption.AUTO_READ, false);

            ChannelFuture f = b.bind(localPort).sync();

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new Proxy(12359, "127.0.0.1", 12358).run();
    }
}

 

package proxy4;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:33
 * To change this template use File | Settings | File Templates.
 */
public class ProxyFrontendHandler extends ChannelInboundHandlerAdapter {

    private final String remoteHost;
    private final int remotePort;

    //代理服务器和目标服务器之间的通道(从代理服务器出去所以是outbound过境)
    private volatile Channel outboundChannel;


    /**
     * remoteHost和remotePort表示目标服务器
     *
     * @param remoteHost
     * @param remotePort
     */
    public ProxyFrontendHandler(String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    /**
     * 当客户端和代理服务器建立通道连接时,调用此方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        /**
         * 客户端和代理服务器的连接通道
         * 入境的通道
         */
        final Channel inboundChannel = ctx.channel();

        // Start the connection attempt.
        Bootstrap b = new Bootstrap();
        b.group(inboundChannel.eventLoop())
                .channel(ctx.channel().getClass())
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(
                                new ObjectEncoder(),
                                new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                                new ProxyBackendHandler(inboundChannel)
                        );
                    }
                });

        /**
         * 连接目标服务器
         * ChannelFuture
         * Netty中的IO操作是异步的,
         * 包括bind、write、connect等操作会简单的返回一个ChannelFuture,调用者并不能立刻获得结果。
         * 当future对象刚刚创建时,处于非完成状态。可以通过isDone()方法来判断当前操作是否完成。通过isSuccess()判断已完成的当前操作是否成功,getCause()来获取已完成的当前操作失败的原因,isCancelled()来判断已完成的当前操作是否被取消。
         * 调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。
         */
        ChannelFuture f = b.connect(remoteHost, remotePort);
        /**
         * 获得代理服务器和目标服务器之间的连接通道
         */
        outboundChannel = f.channel();

        /**
         * ChannelFutureListener
         * 监听ChannelFuture的状态
         * 注册监听函数来执行完成后的操作
         */
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    // connection complete start to read first data
                    inboundChannel.read();
                } else {
                    // Close the connection if the connection attempt has failed.
                    inboundChannel.close();
                }
            }
        });
    }


    /**
     * 在这里接收客户端的消息
     * 在客户端和代理服务器建立连接时,也获得了代理服务器和目标服务器的通道outbound,
     * 通过outbound写入消息到目标服务器
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("from client message:" + msg.toString());

        System.out.println("After receiving the proxy server to the client's message to the target server, said: I am a proxy server, a client asked me to say hello to you.");
        String proxyToServer = "I am a proxy server, the client asked me to say hello to you.";
        if (outboundChannel.isActive()) {
            outboundChannel.writeAndFlush(proxyToServer).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        // was able to flush out data, start to read the next chunk
                        ctx.channel().read();
                    } else {
                        future.channel().close();
                    }
                }
            });
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (outboundChannel != null) {
            closeOnFlush(outboundChannel);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        closeOnFlush(ctx.channel());
    }

    /**
     * Closes the specified channel after all queued write requests are flushed.
     */
    static void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}

 

package proxy4;

import io.netty.channel.*;

/**
 * Created with IntelliJ IDEA.
 * User: ASUS
 * Date: 14-6-24
 * Time: 下午3:33
 * To change this template use File | Settings | File Templates.
 */
public class ProxyBackendHandler extends ChannelInboundHandlerAdapter {

    private final Channel inboundChannel;

    public ProxyBackendHandler(Channel inboundChannel) {
        this.inboundChannel = inboundChannel;
    }

    //当和目标服务器的通道连接建立时
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(">>>>>>>>>>ACTIVE>>>>>>>>>>");
    }


    /**
     * msg是从目标服务器返回的消息
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("Target server returns data:" + msg.toString());
        /**
         * 接收目标服务器发送来的数据并打印
         * 然后把数据写入代理服务器和客户端的通道里
         */
        //通过inboundChannel向客户端写入数据
        System.out.println("After the proxy server receives a response to the target server to the client to say: I am a proxy server, the target server to replace him, I say to you, thank you. ");
        String resDataToClient = "I am a proxy server, the target server to replace him, I say to you, thank you.";
        inboundChannel.writeAndFlush(resDataToClient).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    inboundChannel.close();
                } else {
                    future.channel().close();
                }
            }
        });
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(">>>>>>>>>>IN-ACTIVE>>>>>>>>>>");
        ProxyFrontendHandler.closeOnFlush(inboundChannel);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ProxyFrontendHandler.closeOnFlush(ctx.channel());
    }
}

 

运行结果:

目标服务器

Target host:127.0.0.1 targetPort:12358

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

>>>>>>>>ACTIVE>>>>>>>>

proxy said:I am a proxy server, the client asked me to say hello to you.

After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.

>>>>>>>>IN-ACTIVE>>>>>>>>

>>>>>>>>ACTIVE>>>>>>>>

proxy said:I am a proxy server, the client asked me to say hello to you.

After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.

>>>>>>>>IN-ACTIVE>>>>>>>>

>>>>>>>>ACTIVE>>>>>>>>

proxy said:I am a proxy server, the client asked me to say hello to you.

After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.

>>>>>>>>IN-ACTIVE>>>>>>>>

>>>>>>>>ACTIVE>>>>>>>>

proxy said:I am a proxy server, the client asked me to say hello to you.

After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.

>>>>>>>>IN-ACTIVE>>>>>>>>

>>>>>>>>ACTIVE>>>>>>>>

proxy said:I am a proxy server, the client asked me to say hello to you.

After the target server to the proxy server receives a message to the proxy server, said: I am the target server, I thank you on behalf of the client.

>>>>>>>>IN-ACTIVE>>>>>>>>

代理服务器

Proxying *:12359 to 127.0.0.1:12358 ...

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

>>>>>>>>>>ACTIVE>>>>>>>>>>

from client message:I'm client, give my regards to the target server!

After receiving the proxy server to the client's message to the target server, said: I am a proxy server, a client asked me to say hello to you.

Target server returns data:I was the target server, I thank you for the client.

After the proxy server receives a response to the target server to the client to say: I am a proxy server, the target server to replace him, I say to you, thank you. 

>>>>>>>>>>IN-ACTIVE>>>>>>>>>>

客户端

Client to the proxy server, said: I was a client, give my regards to the target server!

===========write message success==========

Receiving a message from the proxy server to:I am a proxy server, the target server to replace him, I say to you, thank you.

 

Process finished with exit code 0

==========END==========

© 著作权归作者所有

共有 人打赏支持
秋风醉了
粉丝 236
博文 577
码字总数 418437
作品 0
朝阳
程序员
加载中

评论(6)

正在修行的W_强
正在修行的W_强
写的不错,代码注解很清晰,感谢分享
秋风醉了
秋风醉了

引用来自“Gelopa”的评论

这是 官网版...

引用来自“鑫鑫哥哥呀”的评论

嗯 对着官网的demo 做的~~

引用来自“Gelopa”的评论

但是这个 没有让客户端 得到target的响应信息..你自己实习了吗
现在不好使吗~~什么实习?
金贞花
金贞花

引用来自“Gelopa”的评论

这是 官网版...

引用来自“鑫鑫哥哥呀”的评论

嗯 对着官网的demo 做的~~
但是这个 没有让客户端 得到target的响应信息..你自己实习了吗
秋风醉了
秋风醉了

引用来自“Gelopa”的评论

这是 官网版...
嗯 对着官网的demo 做的~~
金贞花
金贞花
这是 官网版...
金贞花
金贞花
不错啊。为什么 没人来看看呢
JavaScript常用设计模式

设计模式 设计模式是一种在长时间的经验与错误中总结出来可服用的解决方案。 设计模式主要分为3类: 创建型设计模式:专注于处理对象的创建 Constructor构造器模式,Factory工厂模式,Singl...

a独家记忆
07/13
0
0
设计模式: Java中的工厂设计模式

原文链接 https://github.com/shellhub/blog/issues/22 前言 工厂设计模式(Factory Design Pattern)属于创建模式之一,工厂设计模式在JDK,Spring,Stuts被广泛使用 当一个类或者接口有多个子类...

shellhub
08/22
0
0
Tomcat 系统架构与设计模式_ 设计模式分析

门面设计模式 门面设计模式在 Tomcat 中有多处使用,在 Request 和 Response 对象封装中、Standard Wrapper 到 ServletConfig 封装中、ApplicationContext 到 ServletContext 封装中等都用到...

lvzjane
2014/11/03
0
0
你需要了解的23种JavaScript设计模式

为什么要学习设计模式? 在许多访谈中,你可能会遇到很多面向对象编程中的接口,抽象类,代理和以及其他与设计模式相关的问题。 一旦了解了设计模式,它会让你轻松应对任何访谈,并可以在你的...

java高级架构牛人
06/02
0
0
ListView 数据与UI更新机制之观察者模式

之前有一篇文章专门介绍观察者模式知识,当时通过EventBus来进行分析。近日在读《Android源码设计模式解析与实战》,看到书中介绍ListView中使用到观察者模式。为了加深对观察者模式的理解,...

Tifkingsly
07/13
0
0

没有更多内容

加载失败,请刷新页面

加载更多

df和du命令 磁盘分区

9月25日任务 4.1 df命令 4.2 du命令 4.3/4.4 磁盘分区 df 命令 查看系统磁盘空间使用情况 常用参数 -h (human-readble) # 人性化的为空间大小转换单位显示[root@centos7 ~]# df -h文件系...

robertt15
17分钟前
1
0
【a&sAI创新】华为CloudIVS轻量云:借力打力,四两拨千斤

[摘要] 1、测评表现就目前来说,AI的门槛还是比较高的,业内所熟知的算法企业,为了追求算法的极致,比如人脸识别公司,仍在不断扩大研发...   1、测评表现   就目前来说,AI的门槛还是比...

yizhichao
34分钟前
2
0
安卓安装APK报错error: INSTALL_FAILED_UPDATE_INCOMPATIBLE

记录一个 DELETE_FAILED_INTERNAL_ERROR Error while Installing APK问题 之前遇到这个问题 方案1 将data/data/目录下该应用的包名的目录删除掉,如:adb shell rm -rf data/data/com.demo.h...

切切歆语
34分钟前
1
0
用网关zuul时,熔断hytrix里面的坑

1,zuul 默认的隔离级别是信号量,默认最大隔离信号量是100 信号量隔离和线程池隔离的区别如下: https://my.oschina.net/u/867417/blog/2120713 默认设置: 2,zuul里隔离是按服务隔离的,也...

爱吃大肉包
35分钟前
0
0
使用SSM+Solr优雅的实现电商项目中的搜索功能

在学习了Redis&Spring-Data-Redis入门和Solr&Spring-Data-Solr入门后,接下来就该是项目实战了。这次我们用Vue.JS和ElementUI写前端页面,优雅的整合SSM-Shiro-Redis-Solr框架。 手摸手教你优...

TyCoding
35分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部