exchange
exchange
这些年了1990 发表于2年前
exchange
  • 发表于 2年前
  • 阅读 46
  • 收藏 0
  • 点赞 1
  • 评论 0

腾讯云 技术升级10大核心产品年终让利>>>   

摘要: Exchanger提供了 一个同步点 , 在这个同步点,两个线程可以交换数据 。每个线程通过exchange()方法的入口提供数据给另外的线程,并接收其它线程提供的数据,并返回。
import java.util.concurrent.locks.*
    private V item;    
   
    private int arrivalCount;   
    private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException {
        lock.lock();        try {
            V other;            // If arrival count already at two, we must wait for
            // a previous pair to finish and reset the count;
            while (arrivalCount == 2) {                if (!timed)
                    taken.await();                else if (nanos > 0) 
                    nanos = taken.awaitNanos(nanos);                else 
                    throw new TimeoutException();
            }            int count = ++arrivalCount;            // If item is already waiting, replace it and signal other thread
            if (count == 2) { 
                other = item;
                item = x;
                taken.signal();                return other;
            }            // Otherwise, set item and wait for another thread to
            // replace it and signal us.

            item = x;
            InterruptedException interrupted = null;            try { 
                while (arrivalCount != 2) {                    if (!timed)
                        taken.await();                    else if (nanos > 0) 
                        nanos = taken.awaitNanos(nanos);                    else 
                        break; // timed out
                }
            } catch (InterruptedException ie) {
                interrupted = ie;
            }            // Get and reset item and count after the wait.
            // (We need to do this even if wait was aborted.)
            other = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0; 
            taken.signal();            
            // If the other thread replaced item, then we must
            // continue even if cancelled.
            if (count == 2) {                if (interrupted != null)
                    Thread.currentThread().interrupt();                return other;
            }            // If no one is waiting for us, we can back out
            if (interrupted != null) 
                throw interrupted;            else  // must be timeout
                throw new TimeoutException();
        } finally {
            lock.unlock();
        }
    }  
    public Exchanger() {
    }   
    public V exchange(V x) throws InterruptedException {        try {            return doExchange(x, false, 0);
        } catch (TimeoutException cannotHappen) { 
            throw new Error(cannotHappen);
        }
    }  
   
    public V exchange(V x, long timeout, TimeUnit unit) 
        throws InterruptedException, TimeoutException {        return doExchange(x, true, unit.toNanos(timeout));
    }

}
  • 以上为源码:

  • 此类提供对外的操作是同步的;

  • 用于成对出现的线程之间交换数据;

  • 可以视作双向的同步队列;

  • 可应用于基因算法、流水线设计等场景。

     从官方的javadoc可以知道,当一个线程到达exchange调用点时,如果它的伙伴线程此前已经调用了此方法,那么它的伙伴会被调度唤醒并与之进行对象交换,然后各自返回。如果它的伙伴还没到达交换点,那么当前线程将会被挂起,直至伙伴线程到达——完成交换正常返回;或者当前线程被中断——抛出中断异常;又或者是等候超时——抛出超时异常

一个简单的例子

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

/**
 * @Title: ExchangerTest
 * @Description: Test class for Exchanger
 * @Company: CSAIR
 * @Author: lixuanbin
 * @Creation: 2014年12月14日
 * @Version:1.0
 */
public class ExchangerTest {
    protected static final Logger log = Logger.getLogger(ExchangerTest.class);
    private static volatile boolean isDone = false;

    static class ExchangerProducer implements Runnable {
        private Exchanger<Integer> exchanger;
        private static int data = 1;
        ExchangerProducer(Exchanger<Integer> exchanger) {
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            while (!Thread.interrupted() && !isDone) {
                for (int i = 1; i <= 3; i++) {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        data = i;
                        System.out.println("producer before: " + data);
                        data = exchanger.exchange(data);
                        System.out.println("producer after: " + data);
                    } catch (InterruptedException e) {
                        log.error(e, e);
                    }
                }
                isDone = true;
            }
        }
    }

    static class ExchangerConsumer implements Runnable {
        private Exchanger<Integer> exchanger;
        private static int data = 0;
        ExchangerConsumer(Exchanger<Integer> exchanger) {
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            while (!Thread.interrupted() && !isDone) {
                data = 0;
                System.out.println("consumer before : " + data);
                try {
                    TimeUnit.SECONDS.sleep(1);
                    data = exchanger.exchange(data);
                } catch (InterruptedException e) {
                    log.error(e, e);
                }
                System.out.println("consumer after : " + data);
            }
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        Exchanger<Integer> exchanger = new Exchanger<Integer>();
        ExchangerProducer producer = new ExchangerProducer(exchanger);
        ExchangerConsumer consumer = new ExchangerConsumer(exchanger);
        exec.execute(producer);
        exec.execute(consumer);
        exec.shutdown();
        try {
            exec.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error(e, e);
        }
    }
}


标签: exchange
共有 人打赏支持
粉丝 9
博文 45
码字总数 8451
×
这些年了1990
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: