并发编程基础五--线程通信
并发编程基础五--线程通信
Zhao-Qian 发表于5年前
并发编程基础五--线程通信
  • 发表于 5年前
  • 阅读 250
  • 收藏 2
  • 点赞 0
  • 评论 1

腾讯云 学生专属云服务套餐 10元起购>>>   

当线程在系统内运行时,线程的调度具有一定的透明性,程序通常无法准确控制线程的轮换执行,但我们可以通过一些机制来包装线程的协调运行.

1.传统的线程通信

Object类有三个特殊的方法,分别是 wait(),notify(),notifyAll()这3个方法,他们不属于Thread,但和Thread却是息息相关.他们3个必须是由同步监视器对象来调用.

  1. 对于使用synchronized修饰的同步方法,因为该类的默认实例(this)就是同步监视器,所以可以在同步方法总直接调用3个方法.
  2. 对于使用synchronized修饰的同步代码块,同步监视器是synchronized后括号里的对象,就要用这个对象来调用.
  • wait():导致当前线程等待,直接其他线程用notify或者notifyAll唤醒.wait有三种1.无时间的,会一直等待.2.两种带时间的.
  • notify():唤醒在此同步监视器上等待的某个单个线程.选择是随意性的.
  • notifyAll():唤醒所有在此同步监视器上等待的线程.

看示例:

package org.credo.thread.control.synchronizedTest;

/**
 * 本代码是Account类提供存钱,取钱2个方法,因为这2个方法可能需要并发修改Account类的balance字段.
 * 所以都是用了synchronized来修饰方法.还是用了wait和notifyall来控制线程的协作.
 */
public class Account {
	private String accountNo;
	private double balance;

	// 没人存钱就为false,能save. 有就是true.
	private boolean haveMoney = false;

	public Account() {

	}

	public Account(String accountNo, double balance) {
		this.accountNo = accountNo;
		this.balance = balance;
	}

	public synchronized void saveMoney(double saveMoney) {
		try {
			if (!haveMoney) {
				// 执行存钱操作
				System.out.println(Thread.currentThread().getName() + "存钱:" + saveMoney);
				balance = +saveMoney;
				System.out.println("账户余额为:" + balance);
				haveMoney = true;
				// 唤醒其他线程
				this.notifyAll();
			} else {
				// 当前线程等待
				this.wait();
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public synchronized void getMoney(double getMoney, int i) {
		try {
			System.out.println("i" + i);
			if (haveMoney) {
				// 执行取钱操作
				System.out.println(Thread.currentThread().getName() + "取钱:" + getMoney);
				balance = -getMoney;
				System.out.println("账户余额为:" + balance);
				haveMoney = false;
			} else {
				this.wait();
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public String getAccountNo() {
		return accountNo;
	}

	public void setAccountNo(String accountNo) {
		this.accountNo = accountNo;
	}

	public boolean isHaveMoney() {
		return haveMoney;
	}

	public void setHaveMoney(boolean haveMoney) {
		this.haveMoney = haveMoney;
	}

	public double getBalance() {
		return balance;
	}

	@Override
	public int hashCode() {
		return this.accountNo.hashCode();
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (obj != null && obj.getClass() == Account.class) {
			Account acc = (Account) obj;
			return acc.getAccountNo().equals(accountNo);
		}
		return false;
	}
}
============================
package org.credo.thread.control.synchronizedTest;

/**
 * 取钱的线程操作类.
 */
public class GetMoneyThread extends Thread{
	private Account account;
	private double getMoney;
	
	public GetMoneyThread(String name,Account account,double getmoney){
		super.setName(name);
		this.account=account;
		this.getMoney=getmoney;
	}
	
	@Override
	public void run() {
		for(int i=0;i<15;i++){
			account.getMoney(getMoney,i);
		}
	}
	
	public Account getAccount() {
		return account;
	}
	public void setAccount(Account account) {
		this.account = account;
	}
	public double getGetMoney() {
		return getMoney;
	}
	public void setGetMoney(double getMoney) {
		this.getMoney = getMoney;
	}
}
=============================================
package org.credo.thread.control.synchronizedTest;

/**
 *存钱的线程操作.
 */
public class SaveMoneyThread implements Runnable{

	@Override
	public void run() {
		for(int i=0;i<15;i++){
			account.saveMoney(saveMoney);
		}
	}

	private Account account;
	private double saveMoney;
	
	public SaveMoneyThread(Account account,double saveMoney){
		this.account=account;
		this.saveMoney=saveMoney;
	}

	public Account getAccount() {
		return account;
	}
	public void setAccount(Account account) {
		this.account = account;
	}
	
	public double getSaveMoney() {
		return saveMoney;
	}
	
	public void setSaveMoney(double saveMoney) {
		this.saveMoney = saveMoney;
	}
}
========================
package org.credo.thread.control.synchronizedTest;

public class MainTest {

	public static void main(String[] args) {
		Account acc=new Account("zhaoqian", 0);
		new GetMoneyThread("取钱人", acc, 800).start();
		
		new Thread(new SaveMoneyThread(acc, 800), "存钱人1").start();
		new Thread(new SaveMoneyThread(acc, 800), "存钱人2").start();
		new Thread(new SaveMoneyThread(acc, 800), "存钱人3").start();
	}
}

//主程序可以启动任意多个存取线程.可以试验看出所有的取钱线程必须等存款线程存钱后才能继续执行.
//存钱的线程也必须等取出后才能继续执行.

运行上面代码后可以看到存取线程的交替执行.最后发现程序阻塞了,但不是死锁.

2.使用Condition控制线程通信

如果程序不使用synchronized关键字来保证同步,而直接用lock.那么系统中不存在隐式的同步监视器,就不能使用wait,notify,notifyAll方法进行线程通信了.

但使用Lock对象来保证同步时,Java提供了一个Condition类来保持协调,使用Condition可以让那些已经得到Lock对象但无法继续执行的线程释放Lock对象,Condition对象也可以唤醒其他等待的线程.

这情况下,Lock替代了同步方法或同步代码块,Condition替代了同步监视器的功能.

Condition实例绑定在Lock对象上,要获得需要调用Lock对象的newCondition方法.有3个方法:

  • await():类似于隐式同步监视器上的wait方法.该await方法有更多的重载方法.查阅API可知.
  • signal():任意唤醒一个Lock对象上等待的单个线程.只有当前线程放弃对该Lock对象的锁定后(使用await()方法),才可以自行被唤醒的线程.
  • signal():不解释.

以下的代码和上面的4个class文件一直,除了下面的Account类.代码如下:

package org.credo.thread.Condition;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 本代码是Account类提供存钱,取钱2个方法,因为这2个方法可能需要并发修改Account类的balance字段.
 * 所以都是用了lock来充当同步监视器.需要用Condition对象来暂停唤醒指定线程.
 */
public class Account {
	private String accountNo;
	private double balance;

	// 没人存钱就为false,能save. 有就是true.
	private boolean haveMoney = false;

	public Account() {

	}

	public Account(String accountNo, double balance) {
		this.accountNo = accountNo;
		this.balance = balance;
	}

	//显示定义lock对象
	private final Lock lock=new ReentrantLock();
	//获取指定Lock对象对应的Condition
	private final Condition condition=lock.newCondition();
	
	public void saveMoney(double saveMoney) {
		lock.lock();
		try {
			if (!haveMoney) {
				// 执行存钱操作
				System.out.println(Thread.currentThread().getName() + "存钱:" + saveMoney);
				balance = +saveMoney;
				System.out.println("账户余额为:" + balance);
				haveMoney = true;
				// 唤醒其他线程
				condition.signalAll();
			} else {
				// 当前线程等待
				condition.await();
			}
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			lock.unlock();
		}
	}

	public void getMoney(double getMoney, int i) {
		lock.lock();
		try {
			System.out.println("i" + i);
			if (haveMoney) {
				// 执行取钱操作
				System.out.println(Thread.currentThread().getName() + "取钱:" + getMoney);
				balance = -getMoney;
				System.out.println("账户余额为:" + balance);
				haveMoney = false;
				// 唤醒其他线程
				condition.signalAll();
			} else {
				// 当前线程等待
				condition.await();
			}
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			lock.unlock();
		}
	}

	public String getAccountNo() {
		return accountNo;
	}

	public void setAccountNo(String accountNo) {
		this.accountNo = accountNo;
	}

	public boolean isHaveMoney() {
		return haveMoney;
	}

	public void setHaveMoney(boolean haveMoney) {
		this.haveMoney = haveMoney;
	}

	public double getBalance() {
		return balance;
	}

	@Override
	public int hashCode() {
		return this.accountNo.hashCode();
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (obj != null && obj.getClass() == Account.class) {
			Account acc = (Account) obj;
			return acc.getAccountNo().equals(accountNo);
		}
		return false;
	}
}

结果和上面传统的那个一样.

3.使用阻塞队列(BlockingQueue)控制线程通信

小例子:

package org.credo.thread.BlockingQueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class SmallTest {

	public static void main(String[] args) throws InterruptedException {
		BlockingQueue<String> bq=new ArrayBlockingQueue<>(2);
		System.out.println("1");
		bq.put("java");
		System.out.println("2");
		bq.put("C++");
		System.out.println("3");
		bq.put("C#");
		System.out.println("4");
	}

}

BlockingQueue提供2个阻塞的方法:

  • 1.put(E e):把E放入到BlockingQueue队尾中,如果队列元素已满,就阻塞该线程.
  • 2.take():从头部取出元素,如果队列为空,就阻塞该线程.

BlockingQueue继承了Queue接口,自然可以使用Queue的方法.分为3组:

  • 在队列尾部插入元素.包括add(E e),offer(E e),put(E e),队列满的时候,分别是抛异常,返回false,阻塞队列.
  • 在队列头部删除并返回删除的元素,包括remove(),poll().take(),队列已空的时候,分别是抛异常,返回false,阻塞队列.
  • 在队列头部取出但不删除元素,包括element(),peek方法.队列已空的时候,分别是抛异常,返回false,

BlockingQueue包含5个实现类(javaSE 7):

  • ArrayBlockingQueue:基于数组实现的BlockingQueue.
  • LinkedBlockingQueue:基于链表实现的BlockingQueue.
  • PriorityBlockingQueue:查API去.不是标准的注射队列.取元素的时候不是取出在队列中存在时间最长的,而是队列中最小的元素.
  • SynchronousQueue:同步队列,对该队列的存取操作必须交替进行.
  • DelayQueue:特殊的BlockingQueue,底层基于PriorityBlockingQueue实现.具体API去.

BigTest:

package org.credo.thread.BlockingQueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BigTest {

	public static void main(String[] args) {
		//创建一个容量为1的BlockingQueue
		BlockingQueue<String> bq=new ArrayBlockingQueue<String>(1);
		//启动三个生产者线程
		new Producer(bq).start();
		new Producer(bq).start();
		new Producer(bq).start();
		//启动一个消费者线程
		new Consumer(bq).start();
	}
}
//内部类生产者
class Producer extends Thread{
	private BlockingQueue<String> bq;
	public Producer(BlockingQueue<String> bq){
		this.bq=bq;
	}
	public void run(){
		String[] strArr=new String[]{"Java","C++","C#"};
		for(int i=0;i<9999;i++){
			System.out.println(getName()+"生产");
			try {
				Thread.sleep(200);
				//尝试放入元素,如果队列已经满了,则线程堵塞
				bq.put(strArr[i%3]);
			} catch (Exception e) {
				e.printStackTrace();
			}
			System.out.println(getName()+"生产完成:"+bq);
		}
	}
}
//内部类消费者
class Consumer extends Thread{
	private BlockingQueue<String> bq;
	public Consumer(BlockingQueue<String> bq){
		this.bq=bq;
	}
	public void run(){
		while(true){
			System.out.println(getName()+"消费!");
			try {
				Thread.sleep(200);
				bq.take();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println(getName()+"消费完成:"+bq);
		}
	}
}

上面启动3个生产的线程往BlockingQueue放元素,启动一个去取.因为BlockingQueue为1,所以无法连续放入.必须等取出一个,3个生产者之一才能放进去一个.可以运行看看.



共有 人打赏支持
粉丝 309
博文 156
码字总数 238058
评论 (1)
×
Zhao-Qian
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: