文档章节

并发任务之间的数据交换

markGao
 markGao
发布于 2014/05/06 09:32
字数 612
阅读 81
收藏 2
package com.packtpub.java7.concurrency.chapter3.recipe7.core;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

import com.packtpub.java7.concurrency.chapter3.recipe7.task.Consumer;
import com.packtpub.java7.concurrency.chapter3.recipe7.task.Producer;

/**
 * Main class of the example
 * 
 */
public class Main {

    /**
     * Main method of the example
     * 
     * @param args
     */
    public static void main(String[] args) {

        // Creates two buffers
        List<String> buffer1 = new ArrayList<>();
        List<String> buffer2 = new ArrayList<>();

        // Creates the exchanger
        Exchanger<List<String>> exchanger = new Exchanger<>();

        // Creates the producer
        Producer producer = new Producer(buffer1, exchanger);
        // Creates the consumer
        Consumer consumer = new Consumer(buffer2, exchanger);

        // Creates and starts the threads
        Thread threadProducer = new Thread(producer);
        Thread threadConsumer = new Thread(consumer);

        threadProducer.start();
        threadConsumer.start();

    }

}
package com.packtpub.java7.concurrency.chapter3.recipe7.task;

import java.util.List;
import java.util.concurrent.Exchanger;

/**
 * This class implements the producer
 * 
 */
public class Producer implements Runnable {

    /**
     * Buffer to save the events produced
     */
    private List<String> buffer;

    /**
     * Exchager to synchronize with the consumer
     */
    private final Exchanger<List<String>> exchanger;

    /**
     * Constructor of the class. Initializes its attributes
     * 
     * @param buffer
     *            Buffer to save the events produced
     * @param exchanger
     *            Exchanger to syncrhonize with the consumer
     */
    public Producer(List<String> buffer, Exchanger<List<String>> exchanger) {
        this.buffer = buffer;
        this.exchanger = exchanger;
    }

    /**
     * Main method of the producer. It produces 100 events. 10 cicles of 10
     * events. After produce 10 events, it uses the exchanger object to
     * synchronize with the consumer. The producer sends to the consumer the
     * buffer with ten events and receives from the consumer an empty buffer
     */
    @Override
    public void run() {
        int cycle = 1;

        for (int i = 0; i < 10; i++) {
            System.out.printf("Producer: Cycle %d\n", cycle);

            for (int j = 0; j < 10; j++) {
                String message = "Event " + ((i * 10) + j);
                System.out.printf("Producer: %s\n", message);
                buffer.add(message);
            }

            try {
                /*
                 * Change the data buffer with the consumer
                 */
                buffer = exchanger.exchange(buffer);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.printf("Producer: %d\n", buffer.size());

            cycle++;
        }

    }

}
package com.packtpub.java7.concurrency.chapter3.recipe7.task;

import java.util.List;
import java.util.concurrent.Exchanger;

/**
 * This class implements the consumer of the example
 * 
 */
public class Consumer implements Runnable {

    /**
     * Buffer to save the events produced
     */
    private List<String> buffer;

    /**
     * Exchager to synchronize with the consumer
     */
    private final Exchanger<List<String>> exchanger;

    /**
     * Constructor of the class. Initializes its attributes
     * 
     * @param buffer
     *            Buffer to save the events produced
     * @param exchanger
     *            Exchanger to syncrhonize with the consumer
     */
    public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) {
        this.buffer = buffer;
        this.exchanger = exchanger;
    }

    /**
     * Main method of the producer. It consumes all the events produced by the
     * Producer. After processes ten events, it uses the exchanger object to
     * synchronize with the producer. It sends to the producer an empty buffer
     * and receives a buffer with ten events
     */
    @Override
    public void run() {
        int cycle = 1;

        for (int i = 0; i < 10; i++) {
            System.out.printf("Consumer: Cycle %d\n", cycle);

            try {
                // Wait for the produced data and send the empty buffer to the
                // producer
                buffer = exchanger.exchange(buffer);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.printf("Consumer: %d\n", buffer.size());

            for (int j = 0; j < 10; j++) {
                String message = buffer.get(0);
                System.out.printf("Consumer: %s\n", message);
                buffer.remove(0);
            }

            cycle++;
        }

    }

}


© 著作权归作者所有

markGao
粉丝 15
博文 187
码字总数 91352
作品 0
宝山
程序员
私信 提问
数据交换工具DataX使用(1)

DataX是什么? DataX是一个在异构的数据库/文件系统之间高速交换数据的工具,实现了在任意的数据处理系统(RDBMS/Hdfs/Local filesystem)之间的数据交换,由淘宝数据平台部门完成。 DataX用来...

小小毛同学
06/29
0
0
JAVA线程14 - 新特性:同步工具

一、Semaphore 1. 简介 Semaphore实现信号量。 Semaphore可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制同时访问资源的线程个数。例如:实现一个文件允许的并发访...

小米米儿小
2014/03/05
105
0
java虚拟机并发编程3-4章-阅读笔记

第四章 Scalability and Thread Safety(可扩展性和线程安全) 4.1Managing Threads with ExecutorService(使用ExxtutorService管理线程) 每个ExecutorService代表一个线程池,其目的是将线程...

上官胡闹
2016/10/30
11
0
Java并发编程中级篇(七):并发任务间交换数据

Java API提供了一个同步辅助类Exchanger。它允许你在线程执行过程中在线程之间交换数据。它的机制是在线程中设置通步点,当两个线程都到达同步点之时,它们交换数据结构,因此第一个线程的数...

阿拉德大陆的魔法师
2016/11/28
40
0
从Exchager数据交换到基于trade-off的系统设计

可以使用JDK提供的Exchager类进行同步交换:进行数据交换的双方将互相等待对方,直到双方的数据都准备完毕,才进行交换。Exchager类很少用到,但理解数据交换的时机却十分重要,这是一个基于...

猴子007
2017/10/11
0
0

没有更多内容

加载失败,请刷新页面

加载更多

只需一步,在Spring Boot中统一Restful API返回值格式与统一处理异常

统一返回值 在前后端分离大行其道的今天,有一个统一的返回值格式不仅能使我们的接口看起来更漂亮,而且还可以使前端可以统一处理很多东西,避免很多问题的产生。 比较通用的返回值格式如下:...

晓月寒丶
昨天
59
0
区块链应用到供应链上的好处和实际案例

区块链可以解决供应链中的很多问题,例如记录以及追踪产品。那么使用区块链应用到各产品供应链上到底有什么好处?猎头悬赏平台解优人才网小编给大家做个简单的分享: 使用区块链的最突出的优...

猎头悬赏平台
昨天
28
0
全世界到底有多少软件开发人员?

埃文斯数据公司(Evans Data Corporation) 2019 最新的统计数据(原文)显示,2018 年全球共有 2300 万软件开发人员,预计到 2019 年底这个数字将达到 2640万,到 2023 年达到 2770万。 而来自...

红薯
昨天
65
0
Go 语言基础—— 通道(channel)

通过通信来共享内存(Java是通过共享内存来通信的) 定义 func service() string {time.Sleep(time.Millisecond * 50)return "Done"}func AsyncService() chan string {retCh := mak......

刘一草
昨天
58
0
Apache Flink 零基础入门(一):基础概念解析

Apache Flink 的定义、架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行快速...

Vincent-Duan
昨天
60
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部