文档章节

java 利用Future异步获取多线程任务结果

 细节探索者
发布于 01/11 08:39
字数 909
阅读 19
收藏 8
点赞 0
评论 0

简述

Future接口是Java标准API的一部分,在java.util.concurrent包中。Future接口是Java线程Future模式的实现,可以来进行异步计算。

有了Future就可以进行三段式的编程了,1.启动多线程任务2.处理其他事3.收集多线程任务结果。从而实现了非阻塞的任务调用。在途中遇到一个问题,那就是虽然能异步获取结果,但是Future的结果需要通过isdone来判断是否有结果,或者使用get()函数来阻塞式获取执行结果。这样就不能实时跟踪其他线程的结果状态了,所以直接使用get还是要慎用,最好配合isdone来使用。

这里有一种更好的方式来实现对任意一个线程运行完成后的结果都能及时获取的办法:使用CompletionService,它内部添加了阻塞队列,从而获取future中的值,然后根据返回值做对应的处理。一般future使用和CompletionService使用的两个测试案例如下:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 多线程执行,异步获取结果
 * 
 * @author i-clarechen
 *
 */
public class AsyncThread {

    public static void main(String[] args) {
        AsyncThread t = new AsyncThread();
        List<Future<String>> futureList = new ArrayList<Future<String>>();
        t.generate(3, futureList);
        t.doOtherThings();
        t.getResult(futureList);
    }

    /**
     * 生成指定数量的线程,都放入future数组
     * 
     * @param threadNum
     * @param fList
     */
    public void generate(int threadNum, List<Future<String>> fList) {
        ExecutorService service = Executors.newFixedThreadPool(threadNum);
        for (int i = 0; i < threadNum; i++) {
            //添加到线程池中,有返回值
            Future<String> f = service.submit(getJob(i));
            fList.add(f);
        }
         //关闭执行服务对象
        service.shutdown();
    }

    /**
     * other things
     */
    public void doOtherThings() {
        try {
            for (int i = 0; i < 3; i++) {
                System.out.println("do thing no:" + i);
                Thread.sleep(1000 * (new Random().nextInt(10)));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 从future中获取线程结果,打印结果
     * 
     * @param fList
     */
    public void getResult(List<Future<String>> fList) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        service.execute(getCollectJob(fList));
        service.shutdown();
    }

    /**
     * 生成指定序号的线程对象
     * 
     * @param i
     * @return
     */
    public Callable<String> getJob(final int i) {
        final int time = new Random().nextInt(10);
        return new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000 * time);
                return "thread-" + i;
            }
        };
    }

    /**
     * 生成结果收集线程对象
     * 
     * @param fList
     * @return
     */
    public Runnable getCollectJob(final List<Future<String>> fList) {
        return new Runnable() {
            public void run() {
                for (Future<String> future : fList) {
                    try {
                        while (true) {
                            if (future.isDone() && !future.isCancelled()) {
                                System.out.println("Future:" + future
                                        + ",Result:" + future.get());
                                break;
                            } else {
                                Thread.sleep(1000);
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        };
    }

}

运行结果打印和future放入列表时的顺序一致,为0,1,2:

do thing no:0
do thing no:1
do thing no:2
Future:java.util.concurrent.FutureTask@68e1ca74,Result:thread-0
Future:java.util.concurrent.FutureTask@3fb2bb77,Result:thread-1
Future:java.util.concurrent.FutureTask@6f31a24c,Result:thread-2

下面是先执行完的线程先处理的方案:

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;


public class testCallable {
    public static void main(String[] args) {
        try {
            completionServiceCount();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

  
    /**
     * 使用completionService收集callable结果
     * @throws ExecutionException 
     * @throws InterruptedException 
     */
    public static void completionServiceCount() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
                executorService);
        int threadNum = 5;
        for (int i = 0; i < threadNum; i++) {
            completionService.submit(getTask(i));
        }
        int sum = 0;
        int temp = 0;
        for(int i=0;i<threadNum;i++){
            temp = completionService.take().get();
            sum += temp;
            System.out.print(temp + "\t");
        }
        System.out.println("CompletionService all is : " + sum);
        executorService.shutdown();
    }

    public static Callable<Integer> getTask(final int no) {
        final Random rand = new Random();
        Callable<Integer> task = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int time = rand.nextInt(100)*100;
                System.out.println("thead:"+no+" time is:"+time);
                Thread.sleep(time);
                return no;
            }
        };
        return task;
    }
}

运行结果为最先结束的线程结果先被处理:

thead:0 time is:4200
thead:1 time is:6900
thead:2 time is:2900
thead:3 time is:9000
thead:4 time is:7100
   0    1    4    3    CompletionService all is : 10

介绍

submit和execute都是 ExecutorService 的方法,都是添加线程到线程池中。

区别 submit 有返回值 返回future , execute没有

submit 返回值 future 用处 可以执行cancle方法,取消执行 可以通过get()方法,判断是否执行成功 ==null表示执行成功

© 著作权归作者所有

共有 人打赏支持
粉丝 19
博文 232
码字总数 325352
作品 0
嘉兴
程序员
JAVA多线程和并发基础面试问答

原文链接 译文连接 作者:Pankaj 译者:郑旭东 校对:方腾飞 多线程和并发问题是Java技术面试中面试官比较喜欢问的问题之一。在这里,从面试的角度列出了大部分重要的问题,但是你仍然应该牢...

雷神雨石
2014/07/19
0
2
JAVA多线程和并发基础面试问答

多线程和并发问题是Java技术面试中面试官比较喜欢问的问题之一。在这里,从面试的角度列出了大部分重要的问题,但是你仍然应该牢固的掌握Java多线程基础知识来对应日后碰到的问题。(校对注:...

LCZ777
2014/05/26
0
0
JAVA多线程和并发基础面试问答

Java多线程面试问题 1. 进程和线程之间有什么不同? 一个进程是一个独立(self contained)的运行环境,它可以被看作一个程序或者一个应用。而线程是在进程中执行的一个任务。Java运行环境是一...

清风傲剑
2014/12/06
0
0
JAVA多线程和并发基础面试问答

Java多线程面试问题 1. 进程和线程之间有什么不同? 一个进程是一个独立(self contained)的运行环境,它可以被看作一个程序或者一个应用。而线程是在进程中执行的一个任务。Java运行环境是一...

hanzhankang
2014/01/20
0
0
java Future 接口介绍

在Java中,如果需要设定代码执行的最长时间,即超时,可以用Java线程池ExecutorService类配合Future接口来实现。 Future接口是Java标准API的一部分,在java.util.concurrent包中。Future接口...

程序袁_绪龙
2014/11/25
0
0
Java 并发之 Future 接口

简介 Future 是 Java 5 JUC 包中的一个接口,主要提供了三类功能: 任务结果的获取 这个功能由 get 方法提供,它有两种形式的重载。get 方法本身使用起来很简单,需要注意的是它所抛出的异常...

编走编想
2013/11/13
0
3
java多线程编程之Future/FutureTask和Callable

有这样一种场景,用多线程发送数据到某个服务器,需要知道各个线程是否都发送成功,等所有线程都发送完成才能继续下一轮计算和发送。如果用传统的多线程方式,就需要启动多个线程,然后在每个...

上品好礼生活馆
2014/09/11
0
0
Java编程的逻辑 -- 并发章 -- 线程的基本协作机制

线程的基本协作 线程的基本协作示例 总结 线程的基本协作 多线程间除了竞争访问同一资源外,也经常需要相互协作的去执行一些任务。而对于协作的基本机制用的最多的无疑是wait/notify。 协作的...

HikariCP
06/22
0
0
JAVA多线程实现的三种方式和带返回值实现

1、JAVA多线程实现的四种方式 继承Thread类 实现Runnable接口 实现Callable接口通过FutureTask包装器来创建Thread线程 使用ExecutorService、Callable、Future实现有返回结果的多线程 2、继承...

职业搬砖20年
06/14
0
0
多线程编程读书笔记之线程中断的本质

Java试图提供过抢占式限制中断,但问题多多,例如已被废弃的Thread.stop、Thread.suspend和 Thread.resume等。另一方面,出于Java应用代码的健壮性的考虑,降低了编程门槛,减少不清楚底层机...

刘学炜
2012/07/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

编程语言对比分析:Python与Java和JavaScript(图)

编程语言对比分析:Python与Java和JavaScript(图): 凭什么说“Python 太慢,Java 太笨拙,我讨厌 JavaScript”?[图] 编程语言生而为何? 我们人类从原始社会就是用语言表达自己,互相沟通...

原创小博客
4分钟前
0
0
Akka构建Reactive应用《one》

看到这Akka的官网,描述使用java或者scala构建响应式,并发和分布式应用更加简单,听着很高级的样子,下面的小字写着消息驱动,但是在quickstart里面又写容错事件驱动,就是这么钻牛角尖。 ...

woshixin
16分钟前
0
0
ffmpeg源码分析 (四)

io_open 承接上一篇,对于avformat_open_input的分析还差其中非常重要的一步,就是io_open,该函数用于打开FFmpeg的输入输出文件。 在init_input中有这么一句 if ((ret = s->io_open(s, &s-...

街角的小丑
17分钟前
0
0
String,StringBuffer ,StringBuilder的区别

不同点 一、基类不同 StringBuffer、StringBuilder 都继承自AbStractStringBuilder,String 直接继承自 Object 2、底层容器“不同” 虽然底层都是字符数组,但是String的是final修饰的不可变...

不开心的时候不要学习
32分钟前
0
0
nodejs 文件操作

写文件code // 加载文件模块var fs = require("fs");var content = 'Hello World, 你好世界!';//params 文件名,内容,编码,回调fs.writeFile('./hello.txt',content,'utf8',function (er......

yanhl
35分钟前
0
0
SpringBoot mybits 查询为0条数据 但是在Navicat 中可以查询到数据

1.页面请求: 数据库查询: 2018-07-16 17:56:25.054 DEBUG 17312 --- [nio-9010-exec-3] c.s.h.m.C.selectSelective : ==> Preparing: select id, card_number, customer_id, customer_nam......

kuchawyz
45分钟前
0
0
译:Self-Modifying cod 和cacheflush

date: 2014-11-26 09:53 翻译自: http://community.arm.com/groups/processors/blog/2010/02/17/caches-and-self-modifying-code Cache处在CPU核心与内存存储器之间,它给我们的感觉是,它具......

我叫半桶水
47分钟前
0
0
Artificial Intelligence Yourself

TensorFlow是谷歌基于DistBelief进行研发的第二代人工智能学习系统,其命名来源于本身的运行原理。Tensor(张量)意味着N维数组,Flow(流)意味着基于数据流图的计算,TensorFlow为张量从流...

孟飞阳
今天
0
0
press.one个人数字签名

这是我在press.one的数字签名 https://press.one/p/address/v?s=9d3d5b7ce019af357ab994775549e8f047a5b17fc9893364652fc67e4b95443b38ccb24c6655e0d252dd0154369eb9b7717c4ccf4e1835ca3596......

NateHuang
今天
1
0
Oracle 中的 SQL 分页查询原理和方法详解

本文分析并介绍 Oracle 中的分页查找的方法。 Oracle 中的表,除了我们建表时设计的各个字段,其实还有两个字段(此处只介绍2个),分别是 ROWID(行标示符)和 ROWNUM(行号),即使我们使用...

举个_栗子
今天
4
2

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部