文档章节

Mina传输大数组,多路解码,粘包问题的处理

boonya
 boonya
发布于 2016/06/05 22:03
字数 1180
阅读 735
收藏 3

最近刚刚在做Java通信方面,初次接触mina,边根据网上查找的资料,结合自身的实际问题,作出了如下整理,希望能给类似问题的朋友帮助。

我的实际情况:

   1,传递的业务数据种类很多,这就决定了我们要用多路解码器,MINA的中文手册提供的是DemuxingProtocolCodecFactory;

  2,,有的数据长度达到8K,网上有资料说Mina在传输数据超过2K的情况下,会分片传输,因此要考虑如何来接收;

  3,若数据发送很快,或者网络状况不佳,很容易出现粘包的情况,这也是要解决的问题。

 

1)针对多路解码:

编码器:

   将编码器继承MessageEncoder<T>,T是你编码的对象的类,此中我是要编码Requstwork类;其中GetBytes()是我自己定义的将对象的数据组成字节数组的函数;

public class RequstNetworkEncoder implements MessageEncoder<RequstNetwork>{
    @Override
    public void encode(IoSession ioSession, RequstNetwork requstNetwork, ProtocolEncoderOutput out)
            throws Exception {
        if (requstNetwork != null) {
            byte[] bytes1 = GetBytes(requstNetwork);
            int capacity = bytes1.length;
            IoBuffer buffer = IoBuffer.allocate(capacity, false);
            buffer.setAutoExpand(true);
            buffer.put(bytes1);           
            buffer.flip();
            out.write(buffer);
        }
    }
}

对应的解码器:

public class RequstNetworkDecoder implements MessageDecoder {
    @Override
    public MessageDecoderResult decodable(IoSession ioSession, IoBuffer ioBuffer) {
        if(ioBuffer.remaining()<2){
            //还没有达到不同数据的标志位的地方
            return MessageDecoderResult.NEED_DATA;
        }
        else{
            ioBuffer.position(1);
            byte b=ioBuffer.get();
            if (b==(此处为区分不同数据的标志)){  
                return  MessageDecoderResult.OK;

            }
            else{
                return MessageDecoderResult.NOT_OK;
            }
        }
    }

    @Override
    public MessageDecoderResult decode(IoSession ioSession, IoBuffer in, ProtocolDecoderOutput out)
            throws Exception {
        RequstNetworkReply reply=new RequstNetworkReply();
       //自己解码的过程
        out.write(reply);
        return MessageDecoderResult.OK;
    }

    @Override
    public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {

    }
}

编解码工厂:

public class MyProtocolCodecFactory extends DemuxingProtocolCodecFactory {

    public MyProtocolCodecFactory(){
        super.addMessageEncoder(RequstNetwork.class,RequstNetworkEncoder.class);
        super.addMessageDecoder(RequstNetworkDecoder.class);

    }
}

针对大数组的传输和粘包,修改了一下网上的做法:

public class RequestPlanDecoder extends CumulativeProtocolDecoder {

    private final AttributeKey CONTEXT = new AttributeKey(getClass(),
            "context");

    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
            throws Exception {
      
        Context ctx =getContext(session);//获取session 的context

        long matchCount=ctx.getMatchLength();//目前已获取的数据
        long length=ctx.getLength();//数据总长度
        IoBuffer buffer=ctx.getBuffer();//数据存入buffer

        //第一次取数据
        if(length==0){
            length=in.getLong();
            //保存第一次获取的长度
            ctx.setLength(length);
            matchCount=in.remaining();
           ctx.setMatchLength(matchCount);
        }
        else{
            matchCount+=in.remaining();
            ctx.setMatchLength(matchCount);
        } 
        ctx.setMatchLength(matchCount); 
        if (in.hasRemaining()) {
           // 如果buff中还有数据 
           buffer.put(in);
           // 添加到保存数据的buffer中 
           if (matchCount >= length) { 
             ////自己解码的部分/////// 
              if(buffer.remaining() > 0) {
                 //如果读取一个完整包内容后还粘了包,就让父类再调用一次,进行下一次解析 
                 IoBuffer temp = IoBuffer.allocate(1024).setAutoExpand(true);
                 temp.put(buffer);
                 temp.flip();
                 in.sweep();
                 //清空数据 
                 in.put(temp);
              } 
             ctx.reset();//清空
             return true;  
           } else { 
             ctx.setBuffer(buffer); 
             return false; 
           } 
      } 
   return false; 
} 

//获取session的context 
public Context getContext(IoSession session) { 
    Context ctx = (Context) session.getAttribute(CONTEXT);
    if (ctx == null) { 
      ctx = new Context();
      session.setAttribute(CONTEXT, ctx);
    } 
    return ctx; 
} 
/** * 定义一个内部类,用来封转当前解码器中的一些公共数据,主要是用于大数据解析 **/
 private class Context {

   public IoBuffer buffer; 

   public long length = 0; 

   public long matchLength = 0; 

   public Context() { 
       buffer = IoBuffer.allocate(1024).setAutoExpand(true);
   }
   public void setBuffer(IoBuffer buffer) {
       this.buffer = buffer; 
   } 
   public void setLength(long length) {
       this.length = length; 
   }
   public void setMatchLength(long matchLength) { 
       this.matchLength = matchLength;
   } 
   public IoBuffer getBuffer() { 
       return buffer;
   }
   public long getLength() { 
       return length; 
   } 
   public long getMatchLength() {
       return matchLength;
    }
   public void reset(){ 
     this.buffer.clear();
     this.length=0; 
     this.matchLength=0; 
   }
 
  } 
} 

我想让传大数组的解码器能和其他解码器一起共用,通过查看官方的MINA API直到MessageDecoder就是继承了CumulativeProtocolDecoder,于是就做了如下结合:

public class RequestPlanDecode implements MessageDecoder  {
    private final AttributeKey CONTEXT = new AttributeKey(getClass(),
            "context");
    @Override
    public MessageDecoderResult decodable(IoSession ioSession, IoBuffer in) {
        if(in.remaining()<2){
            return MessageDecoderResult.NEED_DATA;
        }
        else{
            byte b1=in.get();
            byte b2=in.get();
            if(b2==<span style="font-family: Arial, Helvetica, sans-serif;">(此处为区分不同数据的标志)</span>){
                return MessageDecoderResult.OK;
            }
            else {
                return  MessageDecoderResult.NOT_OK;
            }
        }
    }

    @Override
    public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
            throws Exception {
        //=================结合CumulativeProtocolDecoder================//
        Context ctx =getContext(session);//获取session  的context

        long matchCount=ctx.getMatchLength();//目前已获取的数据
        long length=ctx.getLength();//数据总长度
        IoBuffer buffer=ctx.getBuffer();//数据存入buffer

        //第一次取数据
        if(length==0){
            length=in.getLong();
            //保存第一次获取的长度
            ctx.setLength(length);
            matchCount=in.remaining();
            ctx.setMatchLength(matchCount);
        }
        else{
            matchCount+=in.remaining();
            ctx.setMatchLength(matchCount);
        }
        if (in.hasRemaining()) {// 如果buff中还有数据
            buffer.put(in);// 添加到保存数据的buffer中
            if (matchCount >= length) {// 如果已经发送的数据的长度>=目标数据的长度,则进行解码
               ////自己解码的部分///////

                if(buffer.remaining() > 0) {////解决粘包
                    IoBuffer temp = IoBuffer.allocate(1024).setAutoExpand(true);
                    temp.put(buffer);
                    temp.flip();
                    in.sweep();
                    in.put(temp);
                }
                ctx.reset();
                return MessageDecoderResult.OK;

            } else {
                ctx.setBuffer(buffer);
                return MessageDecoderResult.NEED_DATA;
            }
        }
        return MessageDecoderResult.NEED_DATA;
    }

    @Override
    public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput)
            throws Exception {

    }
    /////////////////////////////////////结合CumulativeProtocolDecoder/////////////////////////////////////////////////
    //获取session的context
    public Context getContext(IoSession session) {
        Context ctx = (Context) session.getAttribute(CONTEXT);
        if (ctx == null) {
            ctx = new Context();
            session.setAttribute(CONTEXT, ctx);
        }
        return ctx;
    }
    /**
     * 定义一个内部类,用来封转当前解码器中的一些公共数据,主要是用于大数据解析
     *
     */
    private class Context {
        public IoBuffer buffer;
        public long length = 0;
        public long matchLength = 0;

        public Context() {
            buffer = IoBuffer.allocate(1024).setAutoExpand(true);
        }

        public void setBuffer(IoBuffer buffer) {
            this.buffer = buffer;
        }

        public void setLength(long length) {
            this.length = length;
        }
        public void setMatchLength(long matchLength) {
            this.matchLength = matchLength;
        }

        public IoBuffer getBuffer() {

            return buffer;
        }

        public long getLength() {
            return length;
        }

        public long getMatchLength() {
            return matchLength;
        }

        public  void reset(){
            this.buffer.clear();
            this.length=0;
            this.matchLength=0;
        }
    }

}

这样,我就可以愉快的多路解码啦!!!

本文转载自:http://blog.csdn.net/sangsa/article/details/49997279

共有 人打赏支持
boonya
粉丝 73
博文 214
码字总数 43922
作品 0
成都
高级程序员
通信(Netty、Mina2)【通信粘包的处理】、【数据协议】、【网络系统的安全性】

Netty、Mina2是非常优秀的javaNIO+ThreadPool线程池通信框架http://www.cnblogs.com/51cto/archive/2010/09/06/1819361.html提到通信就得面临两个问题,一是通信协议的选择,二是数据协议的定...

干死it
2014/06/24
0
0
Socket的半包,粘包与分包的问题

首先看两个概念: 短连接: 连接->传输数据->关闭连接 HTTP是无状态的,浏览器和服务器每进行一次HTTP操作,就建立一次连接,但任务结束就中断连接。 也可以这样说:短连接是指SOCKET连接后发...

ksfzhaohui
2012/12/14
0
0
如何使用Netty开发实现高性能的RPC服务器

RPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络,从远程计算机程序上请求服务,而不必了解底层网络技术的协议。说的再直白一点,就是客户端在不必知道调用细节的...

vshcxl
2017/10/20
0
0
读懂Netty服务端开发(附学习代码)

使用JDK NIO类库时开发NIO的异步服务端时,需要用到:多路复用器Selector,ServerSocketChannel,SocketChanel,ByteBuffer,SelectionKey等。如果用源生的JAVA NIO搭建服务端,无疑是十分复杂的...

@林文龙
06/14
0
0
使用Mina实现数据采集时出现的断包、半包的问题处理

1、之前写了一篇基于Mina实现的一个简单数据采集中间件 在数据采集的多次测试过程中发现有断包、半包的情况 如下: 上面的报文没结束(我们的协议都是以16结束) 上面的报文包头不正确(我们的协...

ytangdigl
2017/12/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

如何从平面设计转行到UI设计?

时代的变迁,科技的进步,工具的发展,薪资的差距,促使许多人转行的原因,但平面与界面两者之间有着哪些的差异呢?如果,想要转行又该具备哪些条件呢? 平面、界面设计之间的差异性 平面设计...

mo311
22分钟前
3
0
线程池整理

一般在生产环境中,我们都不会直接new一个Thread,然后再去start(),因为这么做会不断频繁的创建线程,销毁线程,过大的线程会耗尽CPU和内存资源,大量的垃圾回收,也会给GC带来压力,延长GC停顿时间...

算法之名
23分钟前
5
0
一句话说清分布式锁,进程锁,线程锁

转载自: https://www.cnblogs.com/intsmaze/p/6384105.html 在分布式集群中的开发中,线程锁往往不能支持全部场景的使用,必须引入新的分布式锁。 线程锁,进程锁,分布式锁 线程锁: 主要用...

nao
25分钟前
1
0
springboot2.x支持自定义JSP标签

JSP标签的tld必须放在webapp下的WEB-INF目录下 参考: https://stackoverflow.com/questions/44746757/using-custom-tag-files-in-jsp-with-spring-boot#...

EasyProgramming
27分钟前
1
0
pg数据库 upsert使用

原文地址 https://blog.csdn.net/rudygao/article/details/50498908 --创建测试数据表create table t (id int constraint idx_t_id primary key,name varchar(20) constraint cst_name no......

philonic
31分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部