文档章节

exchange

这些年了1990
 这些年了1990
发布于 2016/04/18 18:09
字数 702
阅读 50
收藏 0
import java.util.concurrent.locks.*
    private V item;    
   
    private int arrivalCount;   
    private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException {
        lock.lock();        try {
            V other;            // If arrival count already at two, we must wait for
            // a previous pair to finish and reset the count;
            while (arrivalCount == 2) {                if (!timed)
                    taken.await();                else if (nanos > 0) 
                    nanos = taken.awaitNanos(nanos);                else 
                    throw new TimeoutException();
            }            int count = ++arrivalCount;            // If item is already waiting, replace it and signal other thread
            if (count == 2) { 
                other = item;
                item = x;
                taken.signal();                return other;
            }            // Otherwise, set item and wait for another thread to
            // replace it and signal us.

            item = x;
            InterruptedException interrupted = null;            try { 
                while (arrivalCount != 2) {                    if (!timed)
                        taken.await();                    else if (nanos > 0) 
                        nanos = taken.awaitNanos(nanos);                    else 
                        break; // timed out
                }
            } catch (InterruptedException ie) {
                interrupted = ie;
            }            // Get and reset item and count after the wait.
            // (We need to do this even if wait was aborted.)
            other = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0; 
            taken.signal();            
            // If the other thread replaced item, then we must
            // continue even if cancelled.
            if (count == 2) {                if (interrupted != null)
                    Thread.currentThread().interrupt();                return other;
            }            // If no one is waiting for us, we can back out
            if (interrupted != null) 
                throw interrupted;            else  // must be timeout
                throw new TimeoutException();
        } finally {
            lock.unlock();
        }
    }  
    public Exchanger() {
    }   
    public V exchange(V x) throws InterruptedException {        try {            return doExchange(x, false, 0);
        } catch (TimeoutException cannotHappen) { 
            throw new Error(cannotHappen);
        }
    }  
   
    public V exchange(V x, long timeout, TimeUnit unit) 
        throws InterruptedException, TimeoutException {        return doExchange(x, true, unit.toNanos(timeout));
    }

}
  • 以上为源码:

  • 此类提供对外的操作是同步的;

  • 用于成对出现的线程之间交换数据;

  • 可以视作双向的同步队列;

  • 可应用于基因算法、流水线设计等场景。

     从官方的javadoc可以知道,当一个线程到达exchange调用点时,如果它的伙伴线程此前已经调用了此方法,那么它的伙伴会被调度唤醒并与之进行对象交换,然后各自返回。如果它的伙伴还没到达交换点,那么当前线程将会被挂起,直至伙伴线程到达——完成交换正常返回;或者当前线程被中断——抛出中断异常;又或者是等候超时——抛出超时异常

一个简单的例子

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

/**
 * @Title: ExchangerTest
 * @Description: Test class for Exchanger
 * @Company: CSAIR
 * @Author: lixuanbin
 * @Creation: 2014年12月14日
 * @Version:1.0
 */
public class ExchangerTest {
    protected static final Logger log = Logger.getLogger(ExchangerTest.class);
    private static volatile boolean isDone = false;

    static class ExchangerProducer implements Runnable {
        private Exchanger<Integer> exchanger;
        private static int data = 1;
        ExchangerProducer(Exchanger<Integer> exchanger) {
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            while (!Thread.interrupted() && !isDone) {
                for (int i = 1; i <= 3; i++) {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        data = i;
                        System.out.println("producer before: " + data);
                        data = exchanger.exchange(data);
                        System.out.println("producer after: " + data);
                    } catch (InterruptedException e) {
                        log.error(e, e);
                    }
                }
                isDone = true;
            }
        }
    }

    static class ExchangerConsumer implements Runnable {
        private Exchanger<Integer> exchanger;
        private static int data = 0;
        ExchangerConsumer(Exchanger<Integer> exchanger) {
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            while (!Thread.interrupted() && !isDone) {
                data = 0;
                System.out.println("consumer before : " + data);
                try {
                    TimeUnit.SECONDS.sleep(1);
                    data = exchanger.exchange(data);
                } catch (InterruptedException e) {
                    log.error(e, e);
                }
                System.out.println("consumer after : " + data);
            }
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        Exchanger<Integer> exchanger = new Exchanger<Integer>();
        ExchangerProducer producer = new ExchangerProducer(exchanger);
        ExchangerConsumer consumer = new ExchangerConsumer(exchanger);
        exec.execute(producer);
        exec.execute(consumer);
        exec.shutdown();
        try {
            exec.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error(e, e);
        }
    }
}


本文转载自:http://www.cnblogs.com/davidwang456/p/4179488.html

共有 人打赏支持
上一篇: scala yield
这些年了1990
粉丝 10
博文 51
码字总数 11621
作品 0
徐汇
程序员
私信 提问
Exchange 2007迁移Exchange 2010应该注意的13件事

1. Exchange 2007可以支持升级到Exchange 2010,但需要提前将Exchange 2007所有服务器环境升级至 SP2或以上版本。 2. Exchange 2007如果更新至SP2或以上版本,则建议按照以下顺序进行各角色的...

taotie_ksl
2018/06/26
0
0
exchange 2013 邮箱服务器主要服务功能概览

1.Microsoft Exchange Active Directory Topology/MSExchangeADTopology/ADTopologyService.exe 找到Active Directory域控制器和全局编录服务器,并向Exchange Server服务提供Active Direct......

烟台小崔
2018/08/24
0
0
Exchange 2016 系统要求

摘要:在安装 Exchange 2016 之前,您的环境所需要的内容。 安装 Exchange 2016 之前,建议您查看本主题中的内容,以确保您的网络、硬件、软件、客户端和其他元素满足 Exchange 2016 的要求。...

lianggj
2018/06/26
0
0
升级邮件系统后,如何删除企业里最后一台旧的Exchange 2000/2003

升级邮件系统后,如何删除企业里最后一台旧的Exchange 2000/2003 最近的项目,查到相关资料放在这里备忘 如何将最后一个旧版 Exchange Server 从组织中删除 已发表 2008年3月20日 19:35 作者 ...

技术小大人
2017/11/16
0
0
Exchange 2003 迁移至 Exchange 2010 完全攻略(一)

前言 现在Exchange Server 2003迁移到全新的Exchange Server 2010,是很多企业都关注的一件大事,由原先的架构能否顺利的迁移到全新的Exchange 2010架构是很多ITPro同仁所关注最多的。由于企...

日久不生情
2017/11/22
0
0

没有更多内容

加载失败,请刷新页面

加载更多

远程获得的有趣的linux命令

使用这些工具从远程了解天气、阅读资料等。 我们即将结束为期 24 天的 Linux 命令行玩具日历。希望你有一直在看,如果没有,请回到开始,从头看过来。你会发现 Linux 终端有很多游戏、消遣和...

Linux就该这么学
9分钟前
0
0
Apollo配置详细步骤(Windows环境)

一. 准备工作 1.下载 apollo 安装包 下载链接:http://activemq.apache.org/apollo/download.html 2.下载 java JDK 安装包 ( apollo 依赖 java 环境) 下载链接:http://www.oracle.com/techn......

morpheusWB
31分钟前
0
0
聊聊flink的AsyncWaitOperator

序 本文主要研究一下flink的AsyncWaitOperator AsyncWaitOperator flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ......

go4it
56分钟前
1
0
Java并发编程基础(四)

ThreadGroup 在主线程创建得线程,如果没有给他指定线程组,那么创建的线程,默认和主线程同一个线程组。线程组可以底下可以是线程,也可以实线程组。 构建线程组的方法: private ThreadGr...

chendom
今天
2
0
Scala学习(一)

学习Spark之前需要学习Scala。 参考学习的书籍:快学Scala

柠檬果过
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部