Java线程间通信与信号量

原创
2017/12/05 20:48
阅读数 33

1. 信号量Semaphore

先说说Semaphore,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。一般用于<code>控制并发线程数,及线程间互斥</code>。另外重入锁 ReentrantLock 也可以实现该功能,但实现上要复杂些。 功能就类似厕所有5个坑,假如有10个人要上厕所,那么同时只能有多少个人去上厕所呢?同时只能有5个人能够占用,当5个人中 的任何一个人让开后,其中等待的另外5个人中又有一个人可以占用了。另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会。 单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。

例子:

/**
 * @Description:
 * [@param](https://my.oschina.net/u/2303379) [@param](https://my.oschina.net/u/2303379) args
 * [@return](https://my.oschina.net/u/556800) void 返回类型
 */
public static void main(String[] args) {
	// 线程池
	ExecutorService exec = Executors.newCachedThreadPool();
	// 只能5个线程同时访问
	final Semaphore semp = new Semaphore(5);
	// 模拟20个客户端访问
	for (int index = 0; index < 20; index++) {
		final int NO = index;
		Runnable run = new Runnable() {
			public void run() {
				try {
					// 获取许可
					semp.acquire();
					System.out.println("获得Accessing: " + NO);
					Thread.sleep((long) (Math.random() * 10000));
					// 访问完后,释放
					semp.release();
					System.out.println("剩余可用信号-----------------"
							+ semp.availablePermits());
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		};
		exec.execute(run);
	}
	// 退出线程池
	exec.shutdown();
}

输出结果(可以想想为什么会这样输出):

获得Accessing: 1
获得Accessing: 5
获得Accessing: 2
获得Accessing: 3
获得Accessing: 0
剩余可用信号-----------------1
获得Accessing: 4
剩余可用信号-----------------1
获得Accessing: 9
剩余可用信号-----------------1
获得Accessing: 8
剩余可用信号-----------------1
获得Accessing: 6
剩余可用信号-----------------1
获得Accessing: 10
剩余可用信号-----------------1
获得Accessing: 11
剩余可用信号-----------------1
获得Accessing: 12
剩余可用信号-----------------1
获得Accessing: 13
剩余可用信号-----------------1
获得Accessing: 7
剩余可用信号-----------------1
获得Accessing: 15
剩余可用信号-----------------1
获得Accessing: 16
剩余可用信号-----------------1
获得Accessing: 17
剩余可用信号-----------------1
获得Accessing: 14
剩余可用信号-----------------1
获得Accessing: 18
剩余可用信号-----------------1
获得Accessing: 19
剩余可用信号-----------------1
剩余可用信号-----------------2
剩余可用信号-----------------3
剩余可用信号-----------------4
剩余可用信号-----------------5

2. 使用PIPE作为线程间通信桥梁

Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。一进一出。先作为初步了解怎么使用。 值得注意的是该类在java.nio.channels下,说明该类属于nio方式的数据通信方式,那就使用Buffer来缓冲数据。

Pipe原理的图示: Pipe原理图

  • Pipe就是个空管子,这个空管子一头可以从管子里往外读,一头可以往管子里写
  • 操作流程:
  • 1.首先要有一个对象往这个空管子里面写。写到哪里呢?这个空管子是有一点空间的,就在这个管子里。 写的时候就是写到管子本身包含的这段空间里的。这段空间大小是1024个字节。
  • 2.然后另一个对象才能将这个装满了的管子里的内容读出来。

上代码

package com.jx.test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class testPipe {

	/**
	 * @Description:
	 * @param @param args
	 * @return void 返回类型
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException {
		// 创建一个管道
		Pipe pipe = Pipe.open();
		final Pipe.SinkChannel psic = pipe.sink();// 要向管道写数据,需要访问sink通道
		final Pipe.SourceChannel psoc = pipe.source();// 从读取管道的数据,需要访问source通道

		Thread tPwriter = new Thread() {

			public void run() {
				try {
					System.out.println("send.....");
					// 创建一个线程,利用管道的写入口Pipe.SinkChannel类型的psic往管道里写入指定ByteBuffer的内容
					int res = psic.write(ByteBuffer
							.wrap("Hello,Pipe!测试通讯.....".getBytes("utf-16BE")));
					System.out.println("send size:" + res);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		};

		Thread tPreader = new Thread() {
			public void run() {
				int bbufferSize = 1024 * 2;
				ByteBuffer bbuffer = ByteBuffer.allocate(bbufferSize);
				try {
					System.out.println("recive.....");
					// 创建一个线程,利用管道的读入口Pipe.SourceChannel类型的psoc将管道里内容读到指定的ByteBuffer中					
					int res = psoc.read(bbuffer);//数据未
					 System.out.println("recive size:"+res+" Content:" + ByteBufferToString(bbuffer));
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		};

		tPwriter.start();
		tPreader.start();
	}

	/**
	 *ByteBuffer--> String的转换函数
	 */
	public static String ByteBufferToString(ByteBuffer content) {
		if (content == null || content.limit() <= 0
				|| (content.limit() == content.remaining())) {
			System.out.println("不存在或内容为空!");
			return null;
		}
		int contentSize = content.limit() - content.remaining();
		StringBuffer resultStr = new StringBuffer();
		for (int i = 0; i < contentSize; i += 2) {
			resultStr.append(content.getChar(i));
		}
		return resultStr.toString();
	}

}
展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部