文档章节

并发编程之Callable和Future接口、FutureTask类

小菜鸡1
 小菜鸡1
发布于 2016/08/11 22:02
字数 2375
阅读 59
收藏 4
点赞 0
评论 0

    Callable接口代表一段可以调用并返回结果的代码;Future接口表示异步任务,是还没有完成的任务给出的未来结果。所以说Callable用于产生结果,Future用于获取结果。

    Java 5在concurrency包中引入了java.util.concurrent.Callable 接口,它和Runnable接口很相似,但它可以返回一个对象或者抛出一个异常。其中Runnable可以提交给Thread来包装下,直接启动一个线程来执行,而Callable则一般都是提交给ExecuteService来执行。Executor就是Runnable和Callable的调度容器,Future就是对于具体的调度任务的执行结果进行查看,最为关键的是Future可以检查对应的任务是否已经完成,也可以阻塞在get方法上一直等待任务返回结果。Runnable和Callable的差别就是Runnable是没有结果可以返回的,并且Runnable无法抛出返回结果的异常,就算是通过Future也看不到任务调度的结果的。

    Callable接口使用泛型去定义它的返回类型。Executors类提供了一些有用的方法在线程池中执行Callable内的任务。由于 Callable任务是并行的(并行就是整体看上去是并行的,其实在某个时间点只有一个线程在执行),我们必须等待它返回的结果。

    java.util.concurrent.Future对象为我们解决了这个问题。在线程池提交Callable任务后返回了一个Future对象,使用它可以知道Callable任务的状态和得到Callable返回的执行结果。Future提供了get()方法让我们可以等待Callable结束并 获取它的执行结果。

Callable接口的源码如下:

public interface Callable<V> {
    V call() throws Exception; // 计算结果
}

Future接口的源码如下:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);// 试图取消对此任务的执行

    boolean isCancelled();// 如果在任务正常完成前将其取消,则返回 true

    boolean isDone();// 如果任务已完成,则返回 true

    V get() throws InterruptedException, ExecutionException;// 如有必要,等待计算完成,然后获取其结果

    // 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Future用于表示异步计算的结果。它的实现类是FutureTask。

如果不想分支线程阻塞主线程,又想取得分支线程的执行结果,就用FutureTask
FutureTask实现了Runnable和Future接口,这个接口的定义如下:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

可以看到这个接口实现了Runnable和Future接口,接口中的具体实现由FutureTask来实现。这个类的两个构造方法如下 :

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    sync = new Sync(callable);
}

public FutureTask(Runnable runnable, V result) {
    sync = new Sync(Executors.callable(runnable, result));
}

如上提供了两个构造函数,一个以Callable为参数,另外一个以Runnable为参数。这些类之间的关联对于任务建模的办法非常灵活,允许你基于 FutureTask的Runnable特性(因为它实现了Runnable接口),把任务写成Callable,然后封装进一个由执行者调度并在必要时 可以取消的FutureTask。

FutureTask可以由执行者调度,这一点很关键。它对外提供的方法基本上就是Future和Runnable接口的组合:get()、cancel、isDone()、isCancelled()和run(),而run()方法通常都是由执行者调用,我们基本上不需要直接调用它。

下面来看一个FutureTask的例子,如下:

package test1;
import java.util.concurrent.*;

class MyCallable implements Callable<String> {
    private long waitTime;

    public MyCallable(int timeInMillis) {
        this.waitTime = timeInMillis;
    }

    @Override
    public String call() throws Exception {
        Thread.sleep(waitTime);
        //return the thread name executing this callable task
        return Thread.currentThread().getName();
    }
}

public class FutureTaskExample {
    public static void main(String[] args) {
        // 要执行的任务
        MyCallable callable1 = new MyCallable(1000);
        MyCallable callable2 = new MyCallable(2000);
        // Callable写的任务封装到一个由执行者调度的FutureTask对象
        FutureTask<String> futureTask1 = new FutureTask<String>(callable1);
        FutureTask<String> futureTask2 = new FutureTask<String>(callable2);
        // 创建线程池并返回ExecutorService实例
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.execute(futureTask1);  // 执行任务
        executor.execute(futureTask2);

        while (true) {
            try {
                //两个任务都完成
                if (futureTask1.isDone() && futureTask2.isDone()) {
                    System.out.println("Done");
                    // 关闭线程池和服务
                    executor.shutdown();
                    return;
                }
                //任务1没有完成,会等待,直到任务完成
                if (!futureTask1.isDone()) {
                    System.out.println("FutureTask1 output=" + futureTask1.get());
                }

                System.out.println("Waiting for FutureTask2 to complete");
                String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);
                if (s != null) {
                    System.out.println("FutureTask2 output=" + s);
                }
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                //do nothing
            }
        }
    }
}

运行如上程序后,可以看到一段时间内没有输出,因为get()方法等待任务执行完成然后才输出内容. 

输出结果如下:

FutureTask1 output=pool-1-thread-1

Waiting for FutureTask2 to complete

Waiting for FutureTask2 to complete

Waiting for FutureTask2 to complete

Waiting for FutureTask2 to complete

Waiting for FutureTask2 to complete

FutureTask2 output=pool-1-thread-2

Done

Callable和Future接口示例程序:该程序是计算一个公司的年销售水泥的总 数目,每一行代表一个客户,每一列代表一个客户在每个月内的购买数量,将每一个客户(每一行)看做一个小任务。每一个任务计算之后放入Future中,等 待所有的计算完毕后,调用get(是一个阻塞方法,等待所有任务执行完毕)方法得到结果并计算总和。

package test1;

import java.text.DateFormatSymbols;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AnnualSalesCalc {

    private static int NUMBER_OF_CUSTOMERS = 100;
    private static int NUMBER_OF_MONTHS = 12;
    private static int salesMatrix[][];

    private static class Summer implements Callable<Integer> {
        private int companyID;

        public Summer(int companyID) {
            this.companyID = companyID;
        }

        @Override
        public Integer call() {
            int sum = 0;
            for (int col = 0; col < NUMBER_OF_MONTHS; col++) {
                sum += salesMatrix[companyID][col];
            }
            System.out.printf("Totaling for client 1%02d completed%n",
                    companyID);
            return sum;
        }
    }

    private static void generateMatrix() {
        salesMatrix = new int[NUMBER_OF_CUSTOMERS][NUMBER_OF_MONTHS];
        for (int i = 0; i < NUMBER_OF_CUSTOMERS; i++) {
            for (int j = 0; j < NUMBER_OF_MONTHS; j++) {
                salesMatrix[i][j] = (int) (Math.random() * 100);
            }
        }
    }

    private static void printMatrix() {
        System.out.print("\t\t");
        String[] monthDisplayNames = (new DateFormatSymbols()).getShortMonths();
        for (String strName : monthDisplayNames) {
            System.out.printf("%12s", strName);
        }
        System.out.println();
        for (int i = 0; i < monthDisplayNames.length - 1; i++) {
            System.out.print("=======");
        }
        System.out.println("====");
        for (int i = 0; i < NUMBER_OF_CUSTOMERS; i++) {
            System.out.printf("Client ID:1%02d%2s", i, "|");
            for (int j = 0; j < NUMBER_OF_MONTHS; j++) {
                System.out.printf("%6d", salesMatrix[i][j]);
            }
            System.out.println();
        }
        System.out.println();
    }

    public static void main(String[] args) throws 
            InterruptedException, ExecutionException {
        generateMatrix();
        printMatrix();
        ExecutorService executor = Executors.newFixedThreadPool(10);
        Set<Future<Integer>> set = new HashSet<Future<Integer>>();
        for (int row = 0; row < NUMBER_OF_CUSTOMERS; row++) {
            Callable<Integer> callable = new Summer(row);
            Future<Integer> future = executor.submit(callable);
            set.add(future);
        }
        int sum = 0;
        for (Future<Integer> future : set) {
            sum += future.get();
        }
        System.out.printf("%nThe annual turnover (bags): %s%n%n", sum);
        executor.shutdown();
    }
}

部分结果:

                  一月          二月          三月          四月          五月          六月          七月          八月          九月          十月         十一月         十二月            

========================================================================================

Client ID:100 |    82    19    30    27    90    33    64    32    20    40    60    36

Client ID:101 |    19    99    14    26    87    86    26    22    51     2    75    57

Client ID:102 |    41    86    32    68    91    52     0    38    77    13    53     7

......

Client ID:197 |    68    93    72    72     8    68    10     6    90    11    81    78

Client ID:198 |    80    86    88    87    17    87    47    62    88    62    76    47

Client ID:199 |    10    38    74    36    75    17    31     4    48    92    43    59

Totaling for client 101 completed

Totaling for client 103 completed

Totaling for client 102 completed

......

Totaling for client 104 completed

Totaling for client 130 completed

Totaling for client 120 completed

The annual turnover (bags): 60275

FutureTask示例程序:演示股票交易程序,一个懒惰线程随时可以取消提交的任务,如果订单已经完成取消失败,如果任务正在执行且可以中断则取消该线程剩余的处理流程,如果订单已提交并且在分配给线程执行之前被取消,则订单会取消成功。

package test1;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class StocksOrderProcessor {
    public static final int MAX_NUMBER_OF_ORDERS = 1000;
    private static final ExecutorService executor = Executors
            .newFixedThreadPool(100);
    private static List<Future<Integer>> ordersToProcess = new ArrayList<Future<Integer>>();

    private static class OrderExecutor implements Callable<Integer> {
        private int id = 0;
        private int count = 0;

        public OrderExecutor(int id) {
            this.id = id;
        }

        @Override
        public Integer call() throws Exception {
            while (count < 50) {
                count++;
                Thread.sleep(new Random(System.currentTimeMillis() % 100)
                        .nextInt(10));
            }
            System.out.println("Successfully executed order: " + id);
            return id;
        }
    }

    private static void submitOrder(int id) {
        Callable<Integer> callable = new OrderExecutor(id);
        ordersToProcess.add(executor.submit(callable));
    }

    public static void main(String[] args) {
        System.out.printf("Submitting %d trades%n", MAX_NUMBER_OF_ORDERS);
        for (int i = 0; i < MAX_NUMBER_OF_ORDERS; i++) {
            submitOrder(i);
        }
        new Thread(new EvilThread(ordersToProcess)).start();
        System.out.println("Cancelling a few orders at random");
        try {
            // 为了让所有任务都可以完成,让执行器等待30秒钟
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Checking status before shutdown");
        int count = 0;
        for (Future<Integer> future : ordersToProcess) {
            if (future.isCancelled()) {
                count++;
            }
        }
        System.out.printf("%d trades cancelled%n", count);
        // shutdown方法并不意味着之前提交的任务被立即取消,而是发起任务顺序停止以便之前提交的任务
        // 可以被执行,但是他保证不再接受新的任务。ExecuteExistingDelayedTaskAfterShutdownPolicy
        // 被设置为false意味着现有未完成的任务会被取消,反之,不会被取消。
        // ContinueExistingPeriodicTasksAfterShutdownPolicy
        // 被设置为true,那么已有的周期性任务会被取消。
        executor.shutdown();
    }
}

class EvilThread implements Runnable {
    private List<Future<Integer>> ordersToProcess;

    public EvilThread(List<Future<Integer>> futures) {
        this.ordersToProcess = futures;
    }

    @Override
    public void run() {
        Random myNextKill = new Random(System.currentTimeMillis() % 100);
        for (int i = 0; i < 100; i++) {
            int index = myNextKill
                    .nextInt(StocksOrderProcessor.MAX_NUMBER_OF_ORDERS);
            boolean cancel = ordersToProcess.get(index).cancel(true);
            if (cancel == true) {
                System.out.println("Cancel Order Succeed: " + index);
            } else {
                System.out.println("Cancel Order Failed: " + index);
            }
            try {
                Thread.sleep(myNextKill.nextInt(100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

部分执行结果:

Submitting 1000 trades

Successfully executed order: 64

Cancelling a few orders at random

Cancel Order Succeed: 857

Cancel Order Succeed: 402

Cancel Order Succeed: 262

Cancel Order Succeed: 72

Successfully executed order: 62

Successfully executed order: 102

Cancel Order Succeed: 327

......

Cancel Order Failed: 539

本文转载自:

共有 人打赏支持
小菜鸡1
粉丝 10
博文 59
码字总数 16851
作品 0
深圳
程序员
Executor框架结构与主要成员(一)

本文分两部分来介绍Executor:Executor的结构和Executor框架包含的成员组件 1、Executor框架的结构 Executor主要由3大部分组成。 1.1、任务。包含被执行任务需要实现的接口:Runnable接口或C...

Dreyer
2016/05/07
133
0
Java多线程、并发杂记

多线程涉及的类可以分为以下几类: 可执行对象:最基本的多线程 执行器:简化多线程编程 工具类 容器 并发控制 一、可执行对象: 1、Runnable: 执行单位:Thread 创建线程的两种方式(来自于...

rathan0
2016/02/17
60
0
java Callable & Future & FutureTask

实现Runnable接口的线程类与一个缺陷,就是在任务执行完之后无法取得任务的返回值。 如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦 ...

Key_Stone
2016/09/16
20
0
Java并发编程 -- Executor 框架介绍

前面详细通过源码解释了ThreadPoolExecutor类的运行原理,本篇文章来说一下Executor的框架组成。 Java的线程既是工作单元也是执行单元,从JDK5开始,把工作单元与执行机制分离开来,工作单元...

GordonNemo
03/13
0
0
Thread,Runnable,Callable. 多线程

编写多线程程序是为了实现多任务的并发执行,从而能够更好地与用户交互。一般有三种方法,Thread,Runnable,Callable.   Runnable和Callable的区别是,   (1)Callable规定的方法是call(...

千惊万喜
2016/06/30
12
0
JAVA多线程高并发学习笔记(三)——Callable、Future和FutureTask

为什么要是用Callable和Future Runnable的局限性 Executor采用Runnable作为基本的表达形式,虽然Runnable的run方法能够写入日志,写入文件,写入数据库等操作,但是它不能返回一个值,或者抛...

诸葛西门
2017/09/19
0
0
Callable+FutureTask多线程方案

1.Callable余FutureTask简介  先说一下java.lang.Runnable吧,它是一个接口,在它里面只声明了一个run()方法:  由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。 ...

IamOkay
2015/12/14
640
0
Callable,Runnable比较及用法以及创建线程的4种方法

编写多线程程序是为了实现多任务的并发执行,从而能够更好地与用户交互。一般有三种方法,Thread,Runnable,Callable. Runnable和Callable的区别是, (1)Callable规定的方法是call(),Runnable...

geek_loser
2017/10/25
0
0
java多线程实现方法

编写多线程程序是为了实现多任务的并发执行,从而能够更好地与用户交互。一般有三种方法,Thread,Runnable,Callable. Runnable和Callable的区别是, (1)Callable规定的方法是call(),Runnable...

张欢19933
2016/02/23
31
0
java并发callable,runnable,future和futureTask

java多线程 java实现多线程有两种方式:一个是直接继承Thread类,一种是实现Runnable接口。但这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。于是后面又增加了Callable和Fut...

张欢19933
2016/02/23
57
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

公众号推荐

阿里技术 书籍:《不止代码》

courtzjl
1分钟前
0
0
关于改进工作效率

1.给不同的业务线建立需求群,所有的数据需求都在群里面提。 2.对于特别难搞定的事情,到对应的技术哪去做,有问题随时沟通。 3.定期给工作总结形成方法论。 4.学习新的技术,尝试用新的方法...

Avner
8分钟前
0
0
关于thinkphp 框架开启路径重写,无法获取Authorization Header

今天遇到在thinkphp框架中获取不到header头里边的 Authorization ,后来在.htaccess里面加多一项解决,记录下: <IfModule mod_rewrite.c> Options +FollowSymlinks -Multiviews Rewrite......

殘留回憶
12分钟前
0
0
centos 使用yum安装nginx后如何添加模块 10

centos 使用yum安装nginx后如何添加模块 10 centos6.2版本,使用yum来安装了nginx,但是最近需要重新添加模块,所以就傻了,询问下有人知道怎么重新添加模块吗? PS:俺是新手,需要高手救助...

linjin200
15分钟前
0
0
dubbo 资料

dubbo资料网站: https://www.cnblogs.com/a8457013/p/7818925.html

zaolonglei
16分钟前
0
0
大型网站,你是如何架构的?

大型网站,你是如何架构的?

微小宝
18分钟前
0
0
javaScript选框的全选与取消

<div> <input type="button" value="全选" onclick="quan()"> <input type="button" value="取消" onclick="cancel()"> <input type="button" value="反选" onclick="reverse()"> <table> <t......

南桥北木
19分钟前
0
0
七牛云宫静:基于容器和大数据平台的持续交付平台

7 月 6 日上午,在 ArchSummit 2018 深圳站 | 全球架构师峰会上,七牛云工程效率部技术专家宫静分享了《基于容器和大数据平台的持续交付平台》为题的演讲。本文是对演讲内容的整理。
 
 本...

七牛云
26分钟前
1
0
Linux系统下如何查看某个命令的安装位置

1.which + 命令 会出现这个命令的路径,如果不是软链接的话,那么这就是此软件的安装路径;如果是软连接的话,那么进入下一步 2.进入上面的路径下,输入:ls -al 要查照的命令 会出现它的真是...

xiaomin0322
26分钟前
1
0
微信小程序富文本图片处理二

一、将富文本中图片的相对链接地址修改成绝对链接地址 //替换图片链接 data.content = data.content.replace(/<img [^>]*src=['"]([^'"]+)[^>]*>/gi, function (match......

tianma3798
30分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部