文档章节

Java可扩展IO

士别三日
 士别三日
发布于 06/13 14:41
字数 3270
阅读 17
收藏 0

1.Socket基础

本文的主题是利用java.nio实现可扩展IO。这里的IO主要是指基于TCP连接的网络IO。TCP协议是传输层协议,基于TCP协议的Socket是对它的实现(也有基于UDP的Socket)。Socket是操作系统层面的实现,Java API中的Socket只是封装了操作系统中的实现公开给Java开发者。TCP协议是基于数据流的,而Socket在操作系统中就是以文件的形式存在的,但是这文件是在内存中而不是在硬盘中,所以基于TCP的Socket本质上就是一个文件流。基于TCP协议的Socket分为两种,一种是监听Socket(在Java中就是ServerSocket),一种是已连接Socket(在Java中就是Socket)。

监听Socket要调用bind函数绑定IP和端口,接下来就可以调用listen函数进行监听。在内核中,为每个监听Socket维护两个队列。一个是已经建立了连接的队列,这时候连接三次握手已经完毕,处于established状态;一个是还没有完全建立连接的队列,这个时候三次握手还没完成,处于syn_rcvd的状态。最后监听Socket调用accept函数,从已建立连接的队列从取出一个,交给已连接Socket处理。

在服务端等待的时候,客户端可以通过connect函数发起连接。先在参数中指明要连接的IP地址和端口号,然后开始发起三次握手。内核会给客户端分配一个临时的端口。一旦握手成功,服务端的监听Socket就会调用accept函数返回另一个已连接Socket。下图是基于TCP协议的Socket函数调用过程:

2.PPC/TPC模式

对于一个web服务,通常处理结构都遵循下面的结构:

  • Read request 读取请求数据
  • Decode request 对请求数据解码(比如把HTTP请求文本转成Java对象)
  • Process service 处理逻辑 
  • Encode response 对响应编码(比如把Java对象转成HTTP响应文本)
  • Send response 发送请求

传统的模型是每来一个请求,服务器就开辟一个进程/线程单独处理,这就叫做PPC模式(Process Per Connection)和TPC模式(Thread Per Connection)。在Java中一般是用线程实现,所以下文都只讨论线程。下图是该模型的示意图:

该模式的优点是实现简单,不用担心并发带来的活跃性风险,因为所有任务都是并行处理,没有共享状态数据。该模式也不用考虑阻塞的问题。该模式对于单个请求来说处理最快。但是当请求量很大的时候,系统就会开启很多线程,最终会耗尽CPU资源。该模式下最多支持几百个并发请求。适合客户端不多的系统,比如企业内部系统、数据库和其他中间件。

因为实现比较简单,本文不对PPC/TPC模式做实现。

3.Reactor模式

对于并发量很大的情况下,就一定要开始考虑线程复用了,用一个线程处理多个任务。但是read操作是阻塞的——必须要等到客户端发送了数据过来,有数据可读的时候,才能开始read操作。当请求和线程一对一的情况下(PPC/TPC模式),这没有问题。但是当一个线程要处理多个请求的时候,第二个请求要等到第一个请求完全处理完成以后才能被接收处理,请求量大的情况这显然是不能接受的。理想的目标是,当请求数据一过来就被接收线程接收,然后交给处理线程处理,而不能被其它请求阻塞。要达到这种目标,就要采用IO多路复用。

什么是IO多路复用呢?说通俗一点,其实就是多个Socket的状态集中在一起被监控。其中一种办法就是用一个线程把多个Socket监控起来,轮询它们的状态,当有数据可读时,就把Socket交给处理线程处理,但是这种方式很消耗CPU,不可取。使用java.io就只能用这种办法,老版的Tomcat就是这么实现的。

IO多路复用的另一种办法就是利用阻塞对象,多个Socket共用一个阻塞对象。监控线程只需要在阻塞对象上等待,当某个Socket有新的状态时,操作系统就会通知监控线程从阻塞状态返回,把这个Socket交给接收或处理线程处理。实现阻塞对象的方法有select、epoll、kqueue等方式。java.nio采用的就是这种办法。这种办法有一种专门的学名,叫做Reactor模式。

Reactor模式最重要的三个核心组件是Reactor、接收器和处理器。Reactor负责监听和分配事件,接收器负责接收事件,处理器负责处理事件。Reactor可以只有一个,也可以有多个。一般会有一个线程池处理处理器包含的任务。

3.1 用java.nio实现单Reactor模式

单Reactor的架构示意图如下:

下面是单Reactor实现:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 单Reactor模型
 * @author xiaoyilin
 *
 */
public class SingleReactor implements Runnable {
	
	private final Selector selector;
	private final ServerSocketChannel serverSocket;
	private ExecutorService pool = Executors.newFixedThreadPool(10);
	
	public SingleReactor(int port) throws IOException {
		selector = Selector.open();
		serverSocket = ServerSocketChannel.open();
		serverSocket.socket().bind(new InetSocketAddress(port));
		serverSocket.configureBlocking(false);
		serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(selector,serverSocket));
	}

	
	public void run() {
		try {
			while (!Thread.interrupted()) {
				
				selector.select();
				Set<SelectionKey> selected = selector.selectedKeys();
				
				for(SelectionKey key:selected) {
					dispatch(key);
				}
				selected.clear();
				
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * dispatch,分配接收器任务和处理器任务
	 * @param key
	 */
	private void dispatch(SelectionKey key) {
		Runnable r = (Runnable) key.attachment();
		if(r!=null) {
			if(r instanceof Acceptor)  {
				r.run();
			} else if(r instanceof Handler){

				if(key.interestOps()==SelectionKey.OP_WRITE) {
					key.cancel();
				} else if(key.interestOps()==SelectionKey.OP_READ) {
					key.interestOps(SelectionKey.OP_WRITE);
				}
				pool.submit(r);

			}
		}
	}
	
	

}

/**
 * 接收器,负责接收消息,接收到后,生成一个处理器
 * @author xiaoyilin
 *
 */
class Acceptor implements Runnable {
	
	private final Selector selector;
	private final ServerSocketChannel serverSocket;
	
	public Acceptor(Selector selector,ServerSocketChannel serverSocket) {
		this.selector = selector;
		this.serverSocket = serverSocket;
	}

	
	public void run() {
		try {
			SocketChannel socket = serverSocket.accept();
			if(socket!=null) {
				Handler handler = new Handler(socket);
				socket.register(selector, SelectionKey.OP_READ, handler);
				// 让还没返回结果的select()或select(long)方法,马上返回结果
				selector.wakeup();
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
		
	}
	
}

/**
 * 处理器,执行 (读入-处理(解码-执行业务逻辑-编码)-写出) 操作
 * @author xiaoyilin
 *
 */
final class Handler implements Runnable {
	private final SocketChannel socket;
	private ByteBuffer input = ByteBuffer.allocate(10000);
	private ByteBuffer output = ByteBuffer.allocate(10000);
	private static final int READING = 0;
	private static final int SENDING = 1;
	private int state = READING;
	
	public Handler(SocketChannel socket) throws IOException {
		this.socket = socket;
		socket.configureBlocking(false);
	}
	
	public synchronized void run() {
		try {
			if(state == READING) {
				this.read();
			} else if(state == SENDING) {
				this.send();
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
		
	}
	
	/**
	 * read 读入数据
	 * @throws IOException
	 */
	private void read() {
		
		try {
			input.clear();
			socket.read(input);
			
			process();//耗时可能比较长
				
		    state = SENDING;

		} catch (IOException e) {
			System.out.println("close a socket");
			try {
				socket.close();
			} catch (IOException e1) {
				System.out.println("close a socket fail");
			}
		} 
		
	}
	
	/**
	 * send 把结果写出
	 * @throws IOException
	 */
	private void send() throws IOException {
		
		if(socket.isOpen()) {
			while(output.hasRemaining()) {
				socket.write(output);
			}
			
			socket.close();
		}
		
	}
	
	/**
	 * process 处理数据,这里把读到的数据放到输出缓存
	 * @throws IOException 
	 */
	private void process() throws IOException {
		
		output = ByteBuffer.wrap(input.array());
		
		System.out.println(new String(input.array()));
	}
	
}

java.nio中的阻塞对象就是Selector。Selector内部有三中key的集合:key,selected-key,cancelled-key。这些集合不是线程安全的,所以Selector内部对它们进行了同步。所以对Selector的任何操作都应该在一个线程内处理好,这是编码的时候首先要注意的地方。

再来谈谈key。SelectionKey本质上是记录Socket的状态。拿上面的例子说,key感兴趣的事件先标记为read,等到处理完数据后再改为write。其实就是告诉Selector某个Socket的状态变更了,在条件成熟的情况下,可以执行下一步了。

然后看看处理器。处理器包含了多个操作,每个操作都由不同线程处理并且这些线程共用状态数据,所以要做好同步工作。

用下面这段测试代码,让SingleReactor运行起来:

public static void main(String[] args) throws IOException {
	new Thread(new SingleReactor(8888)).start();
}

然后在浏览器地址栏上输入:http://127.0.0.1:8888/test 。打印出如下结果:

GET /test HTTP/1.1
Host: 127.0.0.1:8888
Connection: keep-alive
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.79 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9

你会发现这其实就是HTTP请求文本。如果你在process方法里面把HTTP请求文本解析成Servlet相关对象,交给Servlet处理,得到结果后给客户端编码成HTTP响应文本,其实你就写成了一个简易的HTTP服务器!

3.2 用java.nio实现多Reactor模式

我们再来研究研究多Reactor实现,下图是多Reactor的架构示意图:

下面是多Reactor的实现(和上图有一定的出入),让一个Reactor专门监听和分配接收事件,让多个Reactor监听和分配处理事件。因为对阻塞对象Selector的操作必须在单线程内完成,如果多几个Reactor,就可以提高系统的监控和分发效率。如果监控和分发存在性能瓶颈,就要考虑使用多Reactor模式。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 多Reactor模型
 * @author xiaoyilin
 *
 */
public class MutiReactor implements Runnable {
	
	private final Selector selector;
	private final ServerSocketChannel serverSocket;
	
	public MutiReactor(int port) throws IOException {
		selector = Selector.open();
		serverSocket = ServerSocketChannel.open();
		serverSocket.socket().bind(new InetSocketAddress(port));
		serverSocket.configureBlocking(false);
		serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(serverSocket));
	}

	
	public void run() {
		try {
			while (!Thread.interrupted()) {
				
				selector.select();
				Set<SelectionKey> selected = selector.selectedKeys();
				
				for(SelectionKey key:selected) {
					dispatch(key);
				}
				selected.clear();
				
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * dispatch,分配接收器任务和处理器任务
	 * @param key
	 */
	private void dispatch(SelectionKey key) {
		Runnable r = (Runnable) key.attachment();
		if(r!=null) {
			r.run();
		}
	}
	
	

}

/**
 * 接收器,负责接收消息,接收到后,生成一个处理器
 * @author xiaoyilin
 *
 */
class Acceptor implements Runnable {
	
	private final int size = 10;
	private final ServerSocketChannel serverSocket;
	private final SubReactor[] subReactors = new SubReactor[size];
	
	private int next = 0;
	
	public Acceptor(ServerSocketChannel serverSocket) throws IOException {
		this.serverSocket = serverSocket;
		for(int i=0;i<size;i++) {
			subReactors[i] = new SubReactor();
			new Thread(subReactors[i]).start();
		}
	}

	
	public void run() {
		try {
			SocketChannel socket = serverSocket.accept();
			if(socket!=null) {
				subReactors[next].addSocket(socket);
				if(next==9) {
					next = 0;
				} else {
					next++;
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
		
	}
	
}

class SubReactor implements Runnable {
	
	private final Selector selector;
	private Set<SocketChannel> sockets = new HashSet<SocketChannel>();
	private ExecutorService pool = Executors.newFixedThreadPool(10);
	
	public SubReactor() throws IOException {
		selector = Selector.open();
	}
	
	public synchronized void addSocket(SocketChannel socket) {
		sockets.add(socket);
		// 让还没返回结果的select()或select(long)方法,马上返回结果
	    selector.wakeup();
	}

	public void run() {
		try {
			while (!Thread.interrupted()) {
				
				this.handleSocket();
				
				selector.select();
				Set<SelectionKey> selected = selector.selectedKeys();
				
				for(SelectionKey key:selected) {
					dispatch(key);
				}
				selected.clear();
				
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * dispatch,分配接收器任务和处理器任务
	 * @param key
	 */
	private void dispatch(SelectionKey key) {
		Handler h = (Handler) key.attachment();
		if(h!=null) {
			if(key.interestOps()==SelectionKey.OP_WRITE) {
				key.cancel();
			} else if(key.interestOps()==SelectionKey.OP_READ) {
				key.interestOps(SelectionKey.OP_WRITE);
			}
			pool.submit(h);
		}
	}
	
	private synchronized void handleSocket() throws IOException {
		
		for(SocketChannel socket:sockets) {
			Handler handler = new Handler(socket);
			socket.register(selector, SelectionKey.OP_READ, handler);
		}
			
		sockets.clear();	
	}
	
}

/**
 * 处理器,执行 (读入-处理(解码-执行业务逻辑-编码)-写出) 操作
 * @author xiaoyilin
 *
 */
final class Handler implements Runnable {
	private final SocketChannel socket;
	private ByteBuffer input = ByteBuffer.allocate(10000);
	private ByteBuffer output = ByteBuffer.allocate(10000);
	private static final int READING = 0;
	private static final int SENDING = 1;
	private int state = READING;
	
	public Handler(SocketChannel socket) throws IOException {
		this.socket = socket;
		socket.configureBlocking(false);
	}
	
	public synchronized void run() {
		try {
			if(state == READING) {
				this.read();
			} else if(state == SENDING) {
				this.send();
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
		
	}
	
	/**
	 * read 读入数据
	 * @throws IOException
	 */
	private void read() {
		
		try {
			input.clear();
			socket.read(input);
			
			process();//耗时可能比较长
				
		    state = SENDING;

		} catch (IOException e) {
			System.out.println("close a socket");
			try {
				socket.close();
			} catch (IOException e1) {
				System.out.println("close a socket fail");
			}
		} 
		
	}
	
	/**
	 * send 把结果写出
	 * @throws IOException
	 */
	private void send() throws IOException {
		
		if(socket.isOpen()) {
			while(output.hasRemaining()) {
				socket.write(output);
			}
			
			socket.close();
		}
		
	}
	
	/**
	 * process 处理数据,这里把读到的数据放到输出缓存
	 * @throws IOException 
	 */
	private void process() throws IOException {
		
		output = ByteBuffer.wrap(input.array());
		
		System.out.println(new String(input.array()));
	}
	
}

Reactor的实现可以有很多变种,不同的语言有不同实现方式,相同语言也可以利用它们的特性进行很多优化,上面两个实现只是两个简单的Demo。java.nio偏重于基础性API,和终端应用需求之间有鸿沟。直接使用java.nio设计IO系统的时候,一定要充分理解IO系统的设计方法(比如选什么模型等等),以及java.nio对它的支持方式。

4.衡量系统性能的指标

  • 响应时间(RT)
  • 并发数(Concurrency)
  • 吞吐量(TPS) = 并发数/平均响应时间

在并发量没有超出系统承受范围以内的情况下,对于单个请求,PPC/TPC模式的响应时间理论上是最短,其次是单Reactor模式,最后才是多Reactor模式。但是对于系统能承受的最大并发数,排名就要倒过来。所以选择什么样的方案,完全取决于系统的具体需求,如果并发数在几百个以内,PPC/TPC模式的吞吐量反而比Reactor模式的吞吐量大。

5.Java可扩展IO

Java的 IO API经历了 IO,NIO,NIO.2三个阶段的发展,对可扩展IO系统的支持越来越好,本文不做展开,反正这些API都是一些基础性API,直接使用作用都不大。只有结合并发系统设计、IO多路复用等思想,才能设计出可扩展的IO系统。IO系统开发者可以根据不同的需求,选择合适的模型,然后用这些基础API去实现。这就是应用开发者很少有机会用到 IO API的原因。

© 著作权归作者所有

共有 人打赏支持
下一篇: Java线程
士别三日

士别三日

粉丝 38
博文 30
码字总数 43081
作品 0
深圳
程序员
私信 提问
Java FileInputStream

一、序言 IO操作,才程序中比较普遍,JAVA 中提出了IO/NIO 的概念,也一直在说NIO 比IO快,一直不知道原因,就想memcache 和ehcache 比较优劣一样,这些东西得自己看看如何实现的,才 知道区...

pczhangtl
2014/08/03
0
0
DirectByteBuffer更快吗?

ByteBuffer.allocateDirect vs ByteBuffer.allocate 操作系统的IO机制 操作系统在内存区域上执行IO操作,这些内存区域是连续的字节。毫无疑问只有字节缓冲区才有资格参与IO操作的。同样操作系...

智深
2012/12/04
0
0
分别使用Java IO、NIO、Netty实现的一个Echo Server示例

分别使用Java IO、Java NIO、Netty来实现一个简单的EchoServer(即原样返回客户端的输入信息)。 Java IO int port = 9000;ServerSocket ss = new ServerSocket(port);while (true) {final S...

zgw06629
2015/05/24
0
0
java NIO:IO与NIO的区别

一、概念 NIO即New IO,这个库是在JDK1.4中才引入的。NIO和IO有相同的作用和目的,但实现方式不同,NIO主要用到的是块,所以NIO的效率要比IO高很多。在Java API中提供了两套NIO,一套是针对标...

盼望明天
09/11
0
0
有效选择七个关于Java的JSON开源类库

April 4, 2014 By Constantin Marian Alin 翻译:无若 (英语原文:http://www.developer.com/lang/jscript/top-7-open-source-json-binding-providers-available-today.html) 简介 JSON是J......

无若
2014/04/19
0
1

没有更多内容

加载失败,请刷新页面

加载更多

Spring应用学习——AOP

1. AOP 1. AOP:即面向切面编程,采用横向抽取机制,取代了传统的继承体系的重复代码问题,如下图所示,性能监控、日志记录等代码围绕业务逻辑代码,而这部分代码是一个高度重复的代码,也就...

江左煤郎
今天
4
0
eclipse的版本

Eclipse各版本代号一览表 Eclipse的设计思想是:一切皆插件。Eclipse核心很小,其它所有功能都以插件的形式附加于Eclipse核心之上。 Eclipse基本内核包括:图形API(SWT/Jface),Java开发环...

mdoo
今天
3
0
SpringBoot源码:启动过程分析(一)

本文主要分析 SpringBoot 的启动过程。 SpringBoot的版本为:2.1.0 release,最新版本。 一.时序图 还是老套路,先把分析过程的时序图摆出来:时序图-SpringBoot2.10启动分析 二.源码分析 首...

Jacktanger
今天
4
0
小白带你认识netty(二)之netty服务端启动(上)

上一章 中的标准netty启动代码中,ServerBootstrap到底是如何启动的呢?这一章我们来瞅下。 server.group(bossGroup, workGroup);server.channel(NioServerSocketChannel.class).optio...

天空小小
今天
3
0
聊聊storm trident batch的分流与聚合

序 本文主要研究一下storm trident batch的分流与聚合 实例 TridentTopology topology = new TridentTopology(); topology.newStream("spout1", spout) .p......

go4it
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部