文档章节

Java NIO_Socket

秋风醉了
 秋风醉了
发布于 2014/05/02 17:13
字数 1224
阅读 236
收藏 1
Java NIO_Socket

SocketChannel

Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道。可以通过以下2种方式创建SocketChannel:

  • 打开一个SocketChannel并连接到互联网上的某台服务器。

  • 一个新连接到达ServerSocketChannel时,会创建一个SocketChannel。

SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));

ServerSocketChannel

Java NIO中的 ServerSocketChannel 是一个可以监听新进来的TCP连接的通道, 就像标准IO中的ServerSocket一样。

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(9999));

while(true){
    SocketChannel socketChannel =
            serverSocketChannel.accept();

    //do something with socketChannel...
}

如以下代码,

MyClient.java

package com.lyx.nio.socket;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MyClient {

    private final static Logger logger = Logger.getLogger(MyClient.class.getName());

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 100; i++) {
            final int idx = i;
            new Thread(new MyRunnable(idx)).start();
        }
    }

    private static final class MyRunnable implements Runnable {

        private final int idx;

        private MyRunnable(int idx) {
            this.idx = idx;
        }

        public void run() {
            //SocketChannel是一个连接到TCP网络套接字的通道
            SocketChannel socketChannel = null;
            try {
                //创建一个SocketChannel
                socketChannel = SocketChannel.open();
                //InetSocketAddress此类实现IP套接字地址(IP 地址 + 端口号)
                SocketAddress socketAddress = new InetSocketAddress("localhost", 10000);
                socketChannel.connect(socketAddress);

                MyRequestObject myRequestObject = new MyRequestObject("request_" + idx, "request_" + idx);
                logger.log(Level.INFO, myRequestObject.toString());
                sendData(socketChannel, myRequestObject);

                MyResponseObject myResponseObject = receiveData(socketChannel);
                logger.log(Level.INFO, myResponseObject.toString());
            } catch (Exception ex) {
                logger.log(Level.SEVERE, null, ex);
            } finally {
                try {
                    socketChannel.close();
                } catch (Exception ex) {
                }
            }
        }

        private void sendData(SocketChannel socketChannel, MyRequestObject myRequestObject) throws IOException {
            byte[] bytes = SerializableUtil.toBytes(myRequestObject);
            ByteBuffer buffer = ByteBuffer.wrap(bytes);
            socketChannel.write(buffer);
            socketChannel.socket().shutdownOutput();
        }

        private MyResponseObject receiveData(SocketChannel socketChannel) throws IOException {
            MyResponseObject myResponseObject = null;
            ByteArrayOutputStream baos = new ByteArrayOutputStream();

            try {
                //字节缓冲区ByteBuffer
                //是一个直接字节缓冲区
                ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
                byte[] bytes;
                int count = 0;
                while ((count = socketChannel.read(buffer)) >= 0) {
                    /**
                     * flip()方法
                     * flip方法将Buffer从写模式切换到读模式。调用flip()方法会将position设回0,并将limit设置成之前position的值。
                     * 换句话说,position现在用于标记读的位置,limit表示之前写进了多少个byte、char等 —— 现在能读取多少个byte、char等。
                     */
                    buffer.flip();
                    bytes = new byte[count];
                    //使用get()方法从Buffer中读取数据
                    buffer.get(bytes);
                    baos.write(bytes);
                    buffer.clear();
                }
                bytes = baos.toByteArray();
                //反序列化
                Object obj = SerializableUtil.toObject(bytes);
                myResponseObject = (MyResponseObject) obj;
                socketChannel.socket().shutdownInput();
            } finally {
                try {
                    baos.close();
                } catch (Exception ex) {
                }
            }
            return myResponseObject;
        }
    }
}

MyServer.java

package com.lyx.nio.socket;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MyServer {

    private final static Logger logger = Logger.getLogger(MyServer.class.getName());

    public static void main(String[] args) {
        Selector selector = null;
        ServerSocketChannel serverSocketChannel = null;

        try {
            //打开一个selector
            selector = Selector.open();

            // Create a new server socket and set to non blocking mode
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);

            // Bind the server socket to the local host and port
            serverSocketChannel.socket().setReuseAddress(true);
            serverSocketChannel.socket().bind(new InetSocketAddress(10000));

            // Register accepts on the server socket with the selector. This
            // step tells the selector that the socket wants to be put on the
            // ready list when accept operations occur, so allowing multiplexed
            // non-blocking I/O to take place.
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            // Here's where everything happens. The select method will
            // return when any operations registered above have occurred, the
            // thread has been interrupted, etc.
            while (selector.select() > 0) {
                // Someone is ready for I/O, get the ready keys
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();

                // Walk through the ready keys collection and process date requests.
                while (it.hasNext()) {
                    SelectionKey readyKey = it.next();
                    it.remove();

                    // The key indexes into the selector so you
                    // can retrieve the socket that's ready for I/O
                    execute((ServerSocketChannel) readyKey.channel());
                }
            }
        } catch (ClosedChannelException ex) {
            logger.log(Level.SEVERE, null, ex);
        } catch (IOException ex) {
            logger.log(Level.SEVERE, null, ex);
        } finally {
            try {
                selector.close();
            } catch (Exception ex) {
            }
            try {
                serverSocketChannel.close();
            } catch (Exception ex) {
            }
        }
    }

    private static void execute(ServerSocketChannel serverSocketChannel) throws IOException {
        SocketChannel socketChannel = null;
        try {
            socketChannel = serverSocketChannel.accept();
            MyRequestObject myRequestObject = receiveData(socketChannel);
            logger.log(Level.INFO, myRequestObject.toString());

            MyResponseObject myResponseObject = new MyResponseObject(
                    "response for " + myRequestObject.getName(),
                    "response for " + myRequestObject.getValue());
            sendData(socketChannel, myResponseObject);
            logger.log(Level.INFO, myResponseObject.toString());
        } finally {
            try {
                socketChannel.close();
            } catch (Exception ex) {
            }
        }
    }

    private static MyRequestObject receiveData(SocketChannel socketChannel) throws IOException {
        MyRequestObject myRequestObject = null;
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        try {
            byte[] bytes;
            int size = 0;
            /**
             * 从SocketChannel中读取数据
             * socketChannel.read(buffer)
             * 首先,分配一个Buffer。从SocketChannel读取到的数据将会放到这个Buffer中。
             * 然后,调用SocketChannel.read()。该方法将数据从SocketChannel 读到Buffer中。
             * read()方法返回的int值表示读了多少字节进Buffer里。如果返回的是-1,表示已经读到了流的末尾(连接关闭了)。
             */
            while ((size = socketChannel.read(buffer)) >= 0) {
                buffer.flip();
                bytes = new byte[size];
                buffer.get(bytes);
                baos.write(bytes);
                buffer.clear();
            }
            bytes = baos.toByteArray();
            Object obj = SerializableUtil.toObject(bytes);
            myRequestObject = (MyRequestObject) obj;
        } finally {
            try {
                baos.close();
            } catch (Exception ex) {
            }
        }
        return myRequestObject;
    }

    private static void sendData(SocketChannel socketChannel, MyResponseObject myResponseObject) throws IOException {
        byte[] bytes = SerializableUtil.toBytes(myResponseObject);
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        /**
         * 写入 SocketChannel
         * 写数据到SocketChannel用的是SocketChannel.write()方法,
         * 该方法以一个Buffer作为参数
         */
        socketChannel.write(buffer);
    }
}

辅助类

MyRequestObject.java

package com.lyx.nio.socket;

import java.io.Serializable;

public class MyRequestObject implements Serializable {

    private static final long serialVersionUID = 1L;

    private String name;

    private String value;

    private byte[] bytes;

    public MyRequestObject(String name, String value) {
        this.name = name;
        this.value = value;
        this.bytes = new byte[1024];
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    @Override
    public String toString() {
        StringBuffer sb = new StringBuffer();
        sb.append("Request [name: " + name + ", value: " + value + ", bytes: " + bytes.length + "]");
        return sb.toString();
    }
}

MyResponseObject.java

package com.lyx.nio.socket;

import java.io.Serializable;

public class MyResponseObject implements Serializable {

    private static final long serialVersionUID = 1L;

    private String name;

    private String value;

    private byte[] bytes;

    public MyResponseObject(String name, String value) {
        this.name = name;
        this.value = value;
        this.bytes = new byte[1024];
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    @Override
    public String toString() {
        StringBuffer sb = new StringBuffer();
        sb.append("Response [name: " + name + ", value: " + value + ", bytes: " + bytes.length + "]");
        return sb.toString();
    }
}

SerializableUtil.java

package com.lyx.nio.socket;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class SerializableUtil {

    public static byte[] toBytes(Object object) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = null;
        try {
            oos = new ObjectOutputStream(baos);
            oos.writeObject(object);
            byte[] bytes = baos.toByteArray();
            return bytes;
        } catch (IOException ex) {
            throw new RuntimeException(ex.getMessage(), ex);
        } finally {
            try {
                oos.close();
            } catch (Exception e) {
            }
        }
    }

    public static Object toObject(byte[] bytes) {
        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
        ObjectInputStream ois = null;
        try {
            ois = new ObjectInputStream(bais);
            Object object = ois.readObject();
            return object;
        } catch (IOException ex) {
            throw new RuntimeException(ex.getMessage(), ex);
        } catch (ClassNotFoundException ex) {
            throw new RuntimeException(ex.getMessage(), ex);
        } finally {
            try {
                ois.close();
            } catch (Exception e) {
            }
        }
    }
}

==========END==========

© 著作权归作者所有

秋风醉了
粉丝 252
博文 532
码字总数 405690
作品 0
朝阳
程序员
私信 提问
最近仔细研究了一下Java的NIO以及线程并发,搞清了点思路,特作笔记如下(NIO篇)

[转]http://www.cnblogs.com/feidao/archive/2005/07/15/193788.html 因为前段时间的项目需要写一些高性能服务器,结果写出来的结果是五花八门,我们要求使用NIO编写异步服务器,但是竟然有人...

风林火山
2010/12/26
2.4K
2
Java NIO 和 IO的区别

Java NIO 和 IO的区别 Java IO 中,ServerSocket 负责绑定 IP 地址,启动监听端口;Socket 负责发起连接操作,连接成功后,双方通过输入和输出流进行同步阻塞通信。采用 BIO 通信模型的 Serv...

秋风醉了
2014/11/01
353
0
Qzone 微信 Java高级——dubbo源码分析之远程通信 netty

Java高级——dubbo源码分析之远程通信 netty dubbo 底层通信选择了 netty 这个 nio 框架做为默认的网络通信框架并且通过自定义协议进行通信。dubbo 支持以下网络通信框架: Netty(默认) Min...

Java架构师那些事
2018/08/29
0
0
smart-ioc 首版发布:为 Android 打造的国产 NIO 通信框架

项目背景 在几年前作者便开始NIO的学习与研究,并在码云上提交了第一个作品smart-socket(NIO版)。本来期望将其打造成异步非阻塞的通信框架,如同netty一样,却最终效果并不理想。恰逢Java ...

三刀蜀黍
2018/05/28
2.8K
13
SpringBoot 启动参数设置环境变量、JVM参数、tomcat远程调试

->博客已准备转移到博客园<- java命令的模版:java [-options] -jar jarfile [args...] 先贴一下我的简单的启动命令: java -Xms128m -Xmx256m -Xdebug -Xrunjdwp:server=y,transport=dt_soc......

Sgmder
2018/01/24
14.5K
2

没有更多内容

加载失败,请刷新页面

加载更多

可见性有序性,Happens-before来搞定

写在前面 上一篇文章并发 Bug 之源有三,请睁大眼睛看清它们 谈到了可见性/原子性/有序性三个问题,这些问题通常违背我们的直觉和思考模式,也就导致了很多并发 Bug 为了解决 CPU,内存,IO ...

tan日拱一兵
41分钟前
3
0
网络七层模型与TCP/UDP

为了使全球范围内不同的计算机厂家能够相互之间能够比较协调的进行通信,这个时候就有必要建立一种全球范围内的通用协议,以规范各个厂家之间的通信接口,这就是网络七层模型的由来。本文首先...

爱宝贝丶
44分钟前
7
0
Jenkins World 贡献者峰会及专家答疑展位

本文首发于:Jenkins 中文社区 原文链接 作者:Marky Jackson 译者:shunw Jenkins World 贡献者峰会及专家答疑展位 本文为 Jenkins World 贡献者峰会活动期间的记录 Jenkins 15周岁啦!Jen...

Jenkins中文社区
今天
10
0
杂谈:面向微服务的体系结构评审中需要问的三个问题

面向微服务的体系结构如今风靡全球。这是因为更快的部署节奏和更低的成本是面向微服务的体系结构的基本承诺。 然而,对于大多数试水的公司来说,开发活动更多的是将现有的单块应用程序转换为...

liululee
今天
8
0
OSChina 周二乱弹 —— 我等饭呢,你是不是来错食堂了?

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @ 自行车丢了:给主编推荐首歌 《クリスマスの夜》- 岡村孝子 手机党少年们想听歌,请使劲儿戳(这里) @烽火燎原 :国庆快来,我需要长假! ...

小小编辑
今天
1K
14

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部