文档章节

Netty实现自定义简单的编解码器(二)

秋风醉了
 秋风醉了
发布于 2014/06/23 02:12
字数 1837
阅读 8399
收藏 12

Netty实现自定义简单的编解码器(二)

关于编解码器请参见:http://my.oschina.net/xinxingegeya/blog/282878

主要思路:

 * 实现了一个IntegerEncoder编码器和一个IntegerDecoder解码器

 * 服务器端发送一个数字,此时客户端解码器会编码为一个字符串,发送到服务器端

 * 服务器先用解码器解码字符串为整形,打印出来。然后服务器端通过服务器端编码器编码后向客户端发送数字

 * 客户端通过客户端解码器解码后接收数据,打印出来


编码器

package codec2.encoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;

import java.util.List;

public class IntegerEncoder extends MessageToMessageEncoder<Integer> {


    /**
     * encode Integer messages to String messages
     *
     * @param ctx
     * @param msg
     * @param out
     * @throws Exception
     */
    @Override
    public void encode(ChannelHandlerContext ctx, Integer msg,
                       List<Object> out) throws Exception {
        System.err.println("IntegerEncoder encode msg is " + msg);

        //把int转换为字节数组
        byte[] result = new byte[4];
        result[0] = (byte) (msg.intValue() >>> 24);//取最高8位放到0下标
        result[1] = (byte) (msg.intValue() >>> 16);//取次高8为放到1下标
        result[2] = (byte) (msg.intValue() >>> 8); //取次低8位放到2下标
        result[3] = (byte) (msg.intValue());      //取最低8位放到3下标

        ByteBuf encoded = ctx.alloc().buffer(Integer.BYTES);
        encoded.writeBytes(result);
        out.add(encoded);
    }
}


解码器

package codec2.decoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

import java.util.List;

public class IntegerDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg,
                          List<Object> out) throws Exception {
        System.err.println("IntegerDecoder decode msg is " + msg);
        out.add(msg.readInt());
    }
}


服务器端程序

package codec2;

import codec2.decoder.IntegerDecoder;
import codec2.encoder.IntegerEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 主要思路:
 * 实现了一个IntegerEncoder编码器和一个IntegerDecoder解码器
 * 服务器端发送一个数字,此时客户端解码器会编码为一个字符串,发送到服务器端
 * 服务器先用解码器解码字符串为整形,打印出来。然后服务器端通过服务器端编码器编码后向客户端发送数字
 * 客户端通过客户端解码器解码后接收数据,打印出来
 */
public class HelloServer {
    public void start(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // 注册handler
                            ch.pipeline().addLast(
                                    new IntegerEncoder(),
                                    new IntegerDecoder(),
                                    new HelloServerInHandler());
                        }
                    });

            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        HelloServer server = new HelloServer();
        server.start(12345);
    }
}
package codec2;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// 该handler是InboundHandler类型
public class HelloServerInHandler extends ChannelInboundHandlerAdapter {
    @Override
    public boolean isSharable() {
        System.out.println("==============handler-sharable==============");
        return super.isSharable();
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("==============channel-register==============");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("==============channel-unregister==============");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("==============channel-active==============");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("==============channel-inactive==============");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("==============channel-read==============");
        System.out.println("the msg type is " + msg.getClass().getName());
        Integer integer = (Integer) msg;
        System.out.println("服务器端接收到的客户端的数字是" + integer);

        System.out.println("服务器向客户端写入整型数字2001");
        ctx.writeAndFlush(new Integer(2001));
        ctx.close();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("==============channel-read-complete==============");
        ctx.flush();
    }
}


客户端程序

package codec2;

import codec2.decoder.IntegerDecoder;
import codec2.encoder.IntegerEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * 主要思路:
 * 实现了一个IntegerToStringEncoder编码器和一个StringToIntegerDecoder解码器
 * 服务器端发送一个数字2000,此时客户端解码器会编码为一个字符串,发送到服务器端
 * 服务器先用解码器解码字符串为整形,打印出来。然后服务器端通过服务器端编码器编码后向客户端发送数字
 * 客户端通过客户端解码器解码后接收数据,打印出来
 */
public class HelloClient {

    public void connect(String host, int port) throws Exception {

        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.AUTO_READ, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                            new IntegerEncoder(),
                            new IntegerDecoder(),
                            new HelloClientIntHandler());
                }
            });
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        HelloClient client = new HelloClient();
        client.connect("192.168.0.102", 12345);
    }
}
package codec2;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;


//InboundHandler类型
public class HelloClientIntHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("==============channel--register==============");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("==============channel--unregistered==============");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("==============channel--inactive==============");
    }

    // 连接成功后,向server发送消息
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("==============channel--active==============");
        System.out.println("向服务器端写入2000数字");
        ctx.writeAndFlush(new Integer(2000));
    }

    // 接收server端的消息,并打印出来
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("==============channel--read==============");
        System.out.println("the msg type is " + msg.getClass().getName());

        Integer result = (Integer) msg;
        System.out.println("接收到服务器数据,该数字是" + result);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}


运行结果:

服务器端

==============handler-sharable==============

==============channel-register==============

==============channel-active==============

IntegerDecoder decode msg is SimpleLeakAwareByteBuf(UnpooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 1024))

IntegerEncoder encode msg is 2001

==============channel-read==============

the msg type is java.lang.Integer

服务器端接收到的客户端的数字是2000

服务器向客户端写入整型数字2001

==============channel-read-complete==============

==============channel-inactive==============

==============channel-unregister==============

客户端

==============channel--register==============

IntegerEncoder encode msg is 2000

==============channel--active==============

向服务器端写入2000数字

==============channel--read==============

the msg type is java.lang.Integer

接收到服务器数据,该数字是2001

==============channel--inactive==============

==============channel--unregistered==============

IntegerDecoder decode msg is UnpooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 1024)


Process finished with exit code 0


后记:

我想到netty中有两个现成的编解码器是StringEncoder和StringDecoder,看代码:

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.handler.codec.string;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;

import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.List;

/**
 * Encodes the requested {@link String} into a {@link ByteBuf}.
 * A typical setup for a text-based line protocol in a TCP/IP socket would be:
 * <pre>
 * {@link ChannelPipeline} pipeline = ...;
 *
 * // Decoders
 * pipeline.addLast("frameDecoder", new {@link LineBasedFrameDecoder}(80));
 * pipeline.addLast("stringDecoder", new {@link StringDecoder}(CharsetUtil.UTF_8));
 *
 * // Encoder
 * pipeline.addLast("stringEncoder", new {@link StringEncoder}(CharsetUtil.UTF_8));
 * </pre>
 * and then you can use a {@link String} instead of a {@link ByteBuf}
 * as a message:
 * <pre>
 * void channelRead({@link ChannelHandlerContext} ctx, {@link String} msg) {
 *     ch.write("Did you say '" + msg + "'?\n");
 * }
 * </pre>
 */
@Sharable
public class StringEncoder extends MessageToMessageEncoder<CharSequence> {

    // TODO Use CharsetEncoder instead.
    private final Charset charset;

    /**
     * Creates a new instance with the current system character set.
     */
    public StringEncoder() {
        this(Charset.defaultCharset());
    }

    /**
     * Creates a new instance with the specified character set.
     */
    public StringEncoder(Charset charset) {
        if (charset == null) {
            throw new NullPointerException("charset");
        }
        this.charset = charset;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
        if (msg.length() == 0) {
            return;
        }

        out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
    }
}
/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.handler.codec.string;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.MessageToMessageDecoder;

import java.nio.charset.Charset;
import java.util.List;

/**
 * Decodes a received {@link ByteBuf} into a {@link String}.  Please
 * note that this decoder must be used with a proper {@link ByteToMessageDecoder}
 * such as {@link DelimiterBasedFrameDecoder} or {@link LineBasedFrameDecoder}
 * if you are using a stream-based transport such as TCP/IP.  A typical setup for a
 * text-based line protocol in a TCP/IP socket would be:
 * <pre>
 * {@link ChannelPipeline} pipeline = ...;
 *
 * // Decoders
 * pipeline.addLast("frameDecoder", new {@link LineBasedFrameDecoder}(80));
 * pipeline.addLast("stringDecoder", new {@link StringDecoder}(CharsetUtil.UTF_8));
 *
 * // Encoder
 * pipeline.addLast("stringEncoder", new {@link StringEncoder}(CharsetUtil.UTF_8));
 * </pre>
 * and then you can use a {@link String} instead of a {@link ByteBuf}
 * as a message:
 * <pre>
 * void channelRead({@link ChannelHandlerContext} ctx, {@link String} msg) {
 *     ch.write("Did you say '" + msg + "'?\n");
 * }
 * </pre>
 */
@Sharable
public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {

    // TODO Use CharsetDecoder instead.
    private final Charset charset;

    /**
     * Creates a new instance with the current system character set.
     */
    public StringDecoder() {
        this(Charset.defaultCharset());
    }

    /**
     * Creates a new instance with the specified character set.
     */
    public StringDecoder(Charset charset) {
        if (charset == null) {
            throw new NullPointerException("charset");
        }
        this.charset = charset;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        out.add(msg.toString(charset));
    }
}

你看吧,传输String还得有编解码器。

所以就根据这对编解码器实现了上述编解码器。

===========END===========

© 著作权归作者所有

共有 人打赏支持
秋风醉了
粉丝 232
博文 574
码字总数 405033
作品 0
朝阳
程序员
Netty 系列六(编解码器).

一、概念 网络传输的单位是字节,如何将应用程序的数据转换为字节,以及将字节转换为应用程序的数据,就要说到到我们该篇介绍的编码器和解码器。 将应用程序的数据转换为网络格式,以及将网络...

JMCui
昨天
0
0
《Netty In Action》第七章 编解码器Codec

本章介绍 Codec,编解码器 Decoder,解码器 Encoder,编码器 Netty提供了编解码器框架,使得编写自定义的编解码器很容易,并且也很容易重用和封装。本章讨论Netty的编解码器框架以及使用。 ...

残刃O
02/05
5
0
第七章:编解码器Codec

本章介绍 Codec,编解码器 Decoder,解码器 Encoder,编码器 7.1 编解码器Codec 编写一个网络应用程序需要实现某种编解码器,编解码器的作用就是讲原始字节数据与自定义的消息对象进行互转。...

李矮矮
2016/09/26
28
0
第十四章:实现自定义的编码解码器

14.1 编解码器的范围 我们将只实现Memcached协议的一个子集,这足够我们进行添加、检索、删除对象;在Memcached中是通过执行SET,GET,DELETE命令来实现的。Memcached支持很多其他的命令,但我...

李矮矮
2016/09/27
14
0
Netty in Action ——— The codec framework

本文是Netty文集中“Netty in action”系列的文章。主要是对Norman Maurer and Marvin Allen Wolfthal 的 《Netty in action》一书简要翻译,同时对重要点加上一些自己补充和扩展。 本章含盖...

tomas家的小拨浪鼓
2017/12/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Zookeeper总结

Zookeeper的部分概念 什么是zookeeeper? Zookeeper是一个分布式服务的协调中心 zookeeper节点的角色类型? Leader(领导者)、Follower(跟随者)、Observer(观察者) Leader 负责更新系统...

DemonsI
18分钟前
1
0
Redis学习笔记

常用命令 从Docker进入Redis的命令 sudo docker exec -it redis /bin/bash

OSC_fly
19分钟前
0
0
SqlServer查询某个日期的数据

select * from View_ZJMONITORINGCORROSION where ENTERDATE > CONVERT(datetime,DATEADD(day,1,'2017/12/28 14:53:07'))...

笑丶笑
20分钟前
0
0
常用编码规范

Standard characters https://ascii.cl/

yeahlife
22分钟前
0
0
flannel实战

docker swarm mode的出现是个里程碑,官方原生的编排调度看起来都成雏形了,但是swarm mode和容器外部系统的对接、网络性能始终不尽人意,swarm mode下各种开源周边不能使用,感觉swarm mod...

China_OS
24分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部