文档章节

exchange

这些年了1990
 这些年了1990
发布于 2016/04/18 18:09
字数 702
阅读 50
收藏 0
import java.util.concurrent.locks.*
    private V item;    
   
    private int arrivalCount;   
    private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException {
        lock.lock();        try {
            V other;            // If arrival count already at two, we must wait for
            // a previous pair to finish and reset the count;
            while (arrivalCount == 2) {                if (!timed)
                    taken.await();                else if (nanos > 0) 
                    nanos = taken.awaitNanos(nanos);                else 
                    throw new TimeoutException();
            }            int count = ++arrivalCount;            // If item is already waiting, replace it and signal other thread
            if (count == 2) { 
                other = item;
                item = x;
                taken.signal();                return other;
            }            // Otherwise, set item and wait for another thread to
            // replace it and signal us.

            item = x;
            InterruptedException interrupted = null;            try { 
                while (arrivalCount != 2) {                    if (!timed)
                        taken.await();                    else if (nanos > 0) 
                        nanos = taken.awaitNanos(nanos);                    else 
                        break; // timed out
                }
            } catch (InterruptedException ie) {
                interrupted = ie;
            }            // Get and reset item and count after the wait.
            // (We need to do this even if wait was aborted.)
            other = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0; 
            taken.signal();            
            // If the other thread replaced item, then we must
            // continue even if cancelled.
            if (count == 2) {                if (interrupted != null)
                    Thread.currentThread().interrupt();                return other;
            }            // If no one is waiting for us, we can back out
            if (interrupted != null) 
                throw interrupted;            else  // must be timeout
                throw new TimeoutException();
        } finally {
            lock.unlock();
        }
    }  
    public Exchanger() {
    }   
    public V exchange(V x) throws InterruptedException {        try {            return doExchange(x, false, 0);
        } catch (TimeoutException cannotHappen) { 
            throw new Error(cannotHappen);
        }
    }  
   
    public V exchange(V x, long timeout, TimeUnit unit) 
        throws InterruptedException, TimeoutException {        return doExchange(x, true, unit.toNanos(timeout));
    }

}
  • 以上为源码:

  • 此类提供对外的操作是同步的;

  • 用于成对出现的线程之间交换数据;

  • 可以视作双向的同步队列;

  • 可应用于基因算法、流水线设计等场景。

     从官方的javadoc可以知道,当一个线程到达exchange调用点时,如果它的伙伴线程此前已经调用了此方法,那么它的伙伴会被调度唤醒并与之进行对象交换,然后各自返回。如果它的伙伴还没到达交换点,那么当前线程将会被挂起,直至伙伴线程到达——完成交换正常返回;或者当前线程被中断——抛出中断异常;又或者是等候超时——抛出超时异常

一个简单的例子

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

/**
 * @Title: ExchangerTest
 * @Description: Test class for Exchanger
 * @Company: CSAIR
 * @Author: lixuanbin
 * @Creation: 2014年12月14日
 * @Version:1.0
 */
public class ExchangerTest {
    protected static final Logger log = Logger.getLogger(ExchangerTest.class);
    private static volatile boolean isDone = false;

    static class ExchangerProducer implements Runnable {
        private Exchanger<Integer> exchanger;
        private static int data = 1;
        ExchangerProducer(Exchanger<Integer> exchanger) {
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            while (!Thread.interrupted() && !isDone) {
                for (int i = 1; i <= 3; i++) {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        data = i;
                        System.out.println("producer before: " + data);
                        data = exchanger.exchange(data);
                        System.out.println("producer after: " + data);
                    } catch (InterruptedException e) {
                        log.error(e, e);
                    }
                }
                isDone = true;
            }
        }
    }

    static class ExchangerConsumer implements Runnable {
        private Exchanger<Integer> exchanger;
        private static int data = 0;
        ExchangerConsumer(Exchanger<Integer> exchanger) {
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            while (!Thread.interrupted() && !isDone) {
                data = 0;
                System.out.println("consumer before : " + data);
                try {
                    TimeUnit.SECONDS.sleep(1);
                    data = exchanger.exchange(data);
                } catch (InterruptedException e) {
                    log.error(e, e);
                }
                System.out.println("consumer after : " + data);
            }
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        Exchanger<Integer> exchanger = new Exchanger<Integer>();
        ExchangerProducer producer = new ExchangerProducer(exchanger);
        ExchangerConsumer consumer = new ExchangerConsumer(exchanger);
        exec.execute(producer);
        exec.execute(consumer);
        exec.shutdown();
        try {
            exec.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error(e, e);
        }
    }
}


本文转载自:http://www.cnblogs.com/davidwang456/p/4179488.html

共有 人打赏支持
上一篇: scala yield
这些年了1990
粉丝 10
博文 51
码字总数 11621
作品 0
徐汇
程序员
私信 提问
Exchange 2007迁移Exchange 2010应该注意的13件事

1. Exchange 2007可以支持升级到Exchange 2010,但需要提前将Exchange 2007所有服务器环境升级至 SP2或以上版本。 2. Exchange 2007如果更新至SP2或以上版本,则建议按照以下顺序进行各角色的...

taotie_ksl
06/26
0
0
exchange 2013 邮箱服务器主要服务功能概览

1.Microsoft Exchange Active Directory Topology/MSExchangeADTopology/ADTopologyService.exe 找到Active Directory域控制器和全局编录服务器,并向Exchange Server服务提供Active Direct......

烟台小崔
08/24
0
0
Exchange 2016 系统要求

摘要:在安装 Exchange 2016 之前,您的环境所需要的内容。 安装 Exchange 2016 之前,建议您查看本主题中的内容,以确保您的网络、硬件、软件、客户端和其他元素满足 Exchange 2016 的要求。...

lianggj
06/26
0
0
升级邮件系统后,如何删除企业里最后一台旧的Exchange 2000/2003

升级邮件系统后,如何删除企业里最后一台旧的Exchange 2000/2003 最近的项目,查到相关资料放在这里备忘 如何将最后一个旧版 Exchange Server 从组织中删除 已发表 2008年3月20日 19:35 作者 ...

技术小大人
2017/11/16
0
0
Exchange 2010 SP3正式发布提供下载

在去年年末,微软发布了全平台的2013版服务器系统,包括Lync、Exchange、SharePoint、Office Web Apps、Project Server以及客户端套件Office 2013。但是我们在评估Exchange 2013的时候会发现...

技术小胖子
2017/11/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

通过Docker容器连接代理Wormhole

Wormhole 是一个能识别命名空间的由 Socket 激活的隧道代理。可以让你安全的连接在不同物理机器上的 Docker 容器。可以用来完成一些有趣的功能,例如连接运行在容器本机的服务或者在连接后创...

Linux就该这么学
18分钟前
1
0
从架构到平台, POWER 9处理器最全解读

本文根据IBM中国芯片设计部门高级经理尹文,在「智东西公开课」的超级公开课IBM专场《POWER 9-认知时代的驱动力》 上的系统讲解整理而来。 本次讲解中,尹文老师从内核微架构、总线互连、异构...

Mr_zebra
21分钟前
1
0
openjdk和oraclejdk有什么区别吗?

1.授权协议的不同:OpenJDK采用GPL V2协议放出,而SUN JDK则采用JRL放出。两者协议虽然都是开放源代码的,但是在使用上的不同在于GPL V2允许在商业上使用,而JRL只允许个人研究使用。 2.Open...

吴伟祥
22分钟前
2
0
c++基类析构函数要声明为virtual的原因

更深层的原因不知道,不过标准规定,如果不声明为virtual,那么将会导致未定义行为。个人测试结果表明,如果不声明为virtual,那么派生类的析构函数将不会得到调用

安非他命
28分钟前
1
0
CentOS 7下protobuf的源码编译安装

protobuf的github地址:https://github.com/google/protobuf支持多种语言,有多个语言的版本,本文采用的是在CentOS 7下编译源码进行安装。 github上有详细的安装说明:https://github.com/...

xtof
35分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部