用 ForkJoin 写一个并发执行任务的工具类 BatchTaskRunner

原创
2020/12/31 09:50
阅读数 7.4K

实际编程中经常需要并发执行多个任务,并等待这些任务运行结束返回结果。

所以用 Java 的 ForkJoin 简单撸了一个工具类:

package com.gitee.search.utils;

import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.function.Consumer;

/**
 * Batch task action
 * @author Winter Lau<javayou@gmail.com>
 */
public final class BatchTaskRunner extends RecursiveAction {

    protected int threshold = 5; //每个线程处理的任务数
    protected List taskList;
    Consumer<List> action;

    /**
     * @param taskList      任务列表
     * @param threshold     每个线程处理的任务数
     */
    private BatchTaskRunner(List taskList, int threshold, Consumer action) {
        this.taskList = taskList;
        this.threshold = threshold;
        this.action = action;
    }

    /**
     * 多线程批量执行任务
     * @param taskList
     * @param threshold
     * @param action
     */
    public static <T> void execute(List<T> taskList, int threshold, Consumer<List<T>> action) {
        new BatchTaskRunner(taskList, threshold, action).invoke();
    }

    @Override
    protected void compute() {
        if (taskList.size() <= threshold) {
            this.action.accept(taskList);
        }
        else {
            this.splitFromMiddle(taskList);
        }
    }

    /**
     * 任务中分
     * @param list
     */
    private void splitFromMiddle(List list) {
        int middle = (int)Math.ceil(list.size() / 2.0);
        List leftList = list.subList(0, middle);
        List RightList = list.subList(middle, list.size());
        BatchTaskRunner left = newInstance(leftList);
        BatchTaskRunner right = newInstance(RightList);
        ForkJoinTask.invokeAll(left, right);
    }

    private BatchTaskRunner newInstance(List taskList) {
        return new BatchTaskRunner(taskList, threshold, action);
    }

}

使用方法:

List<Integer> allTasks = Arrays.asList(1,2,3,4,5);
int taskPerThread = 1;
BatchTaskRunner.execute(allTasks, taskPerThread, tasks -> {
    System.out.printf("[%s]: %s\n", Thread.currentThread().getName(), tasks);
}); 

这里假设有5个任务(allTasks),其中 taskPerThread 是指定每个线程处理的任务数。

而 { ... } 内就是任务的处理逻辑。

超简单。

展开阅读全文
打赏
3
12 收藏
分享
加载中
打赏
0 评论
12 收藏
3
分享
返回顶部
顶部