# DAG Task Scheduling: Implementation and Optimization

2019/09/04 10:04

GitHub: https://github.com/smartxing/algorithm

1 Building the directed graph

DAG dag = new DAG();
System.out.println(dag);
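
The snippet above only shows the graph being created and printed. As a rough illustration of what such a graph might hold, here is a minimal sketch that assumes two adjacency maps, `inDegree` and `outDegree`, mapping each node to its predecessor set and successor set (the cycle check in the next section uses maps with these names). The class name `SimpleDag` and the bodies of `addNode`, `addEdge`, and `getSources` are assumptions made for illustration, not the repository's actual API.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical minimal graph container; not the repository's DAG class.
class SimpleDag {
    // node -> set of direct predecessors
    final Map<Object, Set<Object>> inDegree = new LinkedHashMap<>();
    // node -> set of direct successors
    final Map<Object, Set<Object>> outDegree = new LinkedHashMap<>();

    // Registers a node with empty predecessor and successor sets if it is new.
    void addNode(Object node) {
        inDegree.computeIfAbsent(node, k -> new LinkedHashSet<>());
        outDegree.computeIfAbsent(node, k -> new LinkedHashSet<>());
    }

    // Adds the directed edge from -> to, registering both endpoints.
    void addEdge(Object from, Object to) {
        addNode(from);
        addNode(to);
        outDegree.get(from).add(to);
        inDegree.get(to).add(from);
    }

    // Nodes without predecessors, i.e. the entry points of the graph.
    Set<Object> getSources() {
        Set<Object> sources = new LinkedHashSet<>();
        inDegree.forEach((node, preds) -> {
            if (preds.isEmpty()) {
                sources.add(node);
            }
        });
        return sources;
    }

    @Override
    public String toString() {
        return "SimpleDag" + outDegree;
    }
}
```

Storing predecessor and successor sets side by side costs some memory, but it makes both "who feeds this node" and "who depends on this node" a single map lookup, which the later steps rely on.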

2 Detecting cycles with a topological sort

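public boolean isCircularity() {
    Set<Object> set = inDegree.keySet();
    // In-degree table: node -> number of predecessors not yet removed
    Map<Object, AtomicInteger> inDegree = set.stream().collect(Collectors
        .toMap(k -> k, k -> new AtomicInteger(this.inDegree.get(k).size())));
    // Nodes whose in-degree is 0
    Set sources = getSources();
    // Assumption: the queue is seeded with the sources (this line is missing from the excerpt)
    LinkedList<Object> queue = new LinkedList<>(sources);
    while (!queue.isEmpty()) {
        Object o = queue.removeFirst();
        outDegree.get(o)
            .forEach(so -> {
                if (inDegree.get(so).decrementAndGet() == 0) {
                    // Assumption: a node whose in-degree drops to 0 is enqueued
                    queue.addLast(so);
                }
            });
    }
    // If any node still has a positive in-degree, the graph contains a cycle
    return inDegree.values().stream().filter(x -> x.intValue() > 0).count() > 0;
}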
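
The check above follows the idea of Kahn's algorithm: repeatedly remove nodes whose in-degree is 0, and report a cycle if some nodes can never be removed. The following standalone sketch shows the same idea on a plain successor map so it can be run in isolation. The names `CycleCheck` and `hasCycle` are hypothetical, and the sketch counts visited nodes instead of scanning the leftover in-degrees, which is an equivalent test.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating cycle detection with Kahn's algorithm.
class CycleCheck {
    // successors: node -> direct successors. Nodes that only appear as
    // targets are picked up automatically.
    static boolean hasCycle(Map<String, List<String>> successors) {
        // remaining in-degree per node
        Map<String, Integer> remaining = new HashMap<>();
        for (String node : successors.keySet()) {
            remaining.put(node, 0);
        }
        for (List<String> targets : successors.values()) {
            for (String t : targets) {
                remaining.merge(t, 1, Integer::sum);
            }
        }
        // start from all nodes with in-degree 0
        Deque<String> queue = new ArrayDeque<>();
        remaining.forEach((node, degree) -> {
            if (degree == 0) {
                queue.add(node);
            }
        });
        int visited = 0;
        while (!queue.isEmpty()) {
            String node = queue.removeFirst();
            visited++;
            for (String t : successors.getOrDefault(node, Collections.emptyList())) {
                // removing 'node' lowers the in-degree of each successor
                if (remaining.merge(t, -1, Integer::sum) == 0) {
                    queue.addLast(t);
                }
            }
        }
        // nodes on a cycle never reach in-degree 0, so they are never visited
        return visited < remaining.size();
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = new HashMap<>();
        graph.put("A", Arrays.asList("B"));
        graph.put("B", Arrays.asList("C"));
        graph.put("C", Collections.emptyList());
        System.out.println(hasCycle(graph)); // false: A -> B -> C is acyclic
        graph.put("C", Arrays.asList("A"));
        System.out.println(hasCycle(graph)); // true: C -> A closes a loop
    }
}
```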

3 Stage optimization

Example: a slightly more complex DAG:
/**
*  H
*    \
*      G
*        \
*     A -> B
*            \
*  C- D  -E  - F-> J
*
*
*
*    After optimization  ==>
*
*     (H,G)
*         \
*     A -> B
*            \
*  (C,D,E)  - (F,J)
*
*/

See the chain method for details. The key code is as follows:

sources.forEach(sourceNode -> {

    ArrayList<Object> maxStage = Lists.newArrayList();
    findMaxStage(sourceNode, maxStage);
    if (maxStage.size() > 1) { // there is a stage to merge
        Object o = maxStage.get(maxStage.size() - 1); // the last node of the stage
        reChain_(foutChain, finChain, maxStage, o);
    }
    if (maxStage.size() == 1) {
        // there is no stage to merge
        Set subNodes = outDegree.get(sourceNode);
    }
});
}
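
The excerpt does not show `findMaxStage` or `reChain_`, so the exact merge rule is not visible here. A rule that reproduces the diagram above is: a node is merged with its successor when it has exactly one successor and that successor has exactly one predecessor. The sketch below applies this assumed rule to the example graph. `StageMerge`, `edge`, `continuesChain`, and `stages` are hypothetical names, and the input is assumed to be acyclic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: collapse linear runs of nodes into stages.
class StageMerge {
    final Map<String, Set<String>> in = new LinkedHashMap<>();
    final Map<String, Set<String>> out = new LinkedHashMap<>();

    void edge(String from, String to) {
        for (String n : new String[] {from, to}) {
            in.computeIfAbsent(n, k -> new LinkedHashSet<>());
            out.computeIfAbsent(n, k -> new LinkedHashSet<>());
        }
        out.get(from).add(to);
        in.get(to).add(from);
    }

    // True when 'node' is absorbed into the chain ending at its only predecessor.
    boolean continuesChain(String node) {
        if (in.get(node).size() != 1) {
            return false;
        }
        String pred = in.get(node).iterator().next();
        return out.get(pred).size() == 1;
    }

    // Returns one list per stage; a list with several nodes is a merged chain.
    List<List<String>> stages() {
        List<List<String>> result = new ArrayList<>();
        for (String start : in.keySet()) {
            if (continuesChain(start)) {
                continue; // already part of a chain that starts earlier
            }
            List<String> chain = new ArrayList<>();
            chain.add(start);
            String cur = start;
            // extend while the link cur -> next is the only way out of cur and into next
            while (out.get(cur).size() == 1) {
                String next = out.get(cur).iterator().next();
                if (in.get(next).size() != 1) {
                    break;
                }
                chain.add(next);
                cur = next;
            }
            result.add(chain);
        }
        return result;
    }

    public static void main(String[] args) {
        StageMerge g = new StageMerge();
        g.edge("H", "G");
        g.edge("G", "B");
        g.edge("A", "B");
        g.edge("C", "D");
        g.edge("D", "E");
        g.edge("E", "F");
        g.edge("B", "F");
        g.edge("F", "J");
        System.out.println(g.stages()); // [[H, G], [B], [A], [C, D, E], [F, J]]
    }
}
```

B stays on its own because it has two predecessors (G and A), and F starts a new chain for the same reason (E and B), which matches the merged groups (H,G), (C,D,E), and (F,J) in the diagram.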

4 Testing DAG execution

Test program: see DAGExecTest
public static class Task implements Runnable {

}

@Override public void run() {
try {
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

@Override public String toString() {
}
}
2 Build the DAG
DAG dag = DAG.create();
Once built, the graph looks like this:
*   H
*    \
*      G
*        \
*     A -> B
*            \
*  C- D  -E  - F-> J

3 Split into stages
DAG chain = dag.chain();
After the call, the graph looks like this:
*     (H,G)
*         \
*     A -> B
*            \
*  (C,D,E)  - (F,J)

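4 Execute the DAG (DAGExecTest). The final output is printed as follows:

The chains c-d-e, h-g, and f-j have been merged by the optimization, so each chain runs inside a single thread, which avoids unnecessary context switches.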

i am running  my name is a  finish ThreadID: 10
i am running  my name is c  finish ThreadID: 11
i am running  my name is h  finish ThreadID: 12
i am running  my name is d  finish ThreadID: 11
i am running  my name is g  finish ThreadID: 12
i am running  my name is e  finish ThreadID: 11
-----------------------------------------------
i am running  my name is b  finish ThreadID: 14
-----------------------------------------------
i am running  my name is f  finish ThreadID: 11
i am running  my name is j  finish ThreadID: 11
stage finished:  task chain f-j
The key code for the test run is as follows:
chain.execute(col -> {
Set set = (Set) col;
List<CompletableFuture> completableFutures = Lists.newArrayList();
StringBuilder sb = new StringBuilder();
set.stream().forEach(x -> {
CompletableFuture<Void> future = CompletableFuture.runAsync((Task) x, executorService);
}
if (x instanceof List) {
CompletableFuture<Void> future = CompletableFuture.runAsync(()->
sb.append(
}
});
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();
System.out.println("stage finished: " + sb.toString());
System.out.println("-----------------------------------------------");
});
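
Because the excerpt above is truncated, here is a self-contained sketch of the same execution idea: within one stage, a single task is submitted on its own, a merged chain is submitted as one job that runs its tasks in order on one thread, and `CompletableFuture.allOf(...).join()` waits for the stage before the next one starts. `StageRunner` and `runStages` are hypothetical names, and representing a stage as a list whose items are either a `Runnable` or a `List` of `Runnable` is an assumption made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

// Hypothetical sketch of stage-by-stage execution with merged chains.
class StageRunner {
    // Runs the stages in order; items inside one stage run concurrently.
    // Each item is either a Runnable or a List of Runnables (a merged chain).
    static void runStages(List<List<Object>> stages, ExecutorService pool) {
        for (List<Object> stage : stages) {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (Object item : stage) {
                if (item instanceof Runnable) {
                    // a single task gets its own async job
                    futures.add(CompletableFuture.runAsync((Runnable) item, pool));
                } else if (item instanceof List) {
                    // a chain is one async job, so its tasks share one thread
                    List<?> chain = (List<?>) item;
                    futures.add(CompletableFuture.runAsync(() -> {
                        for (Object task : chain) {
                            ((Runnable) task).run();
                        }
                    }, pool));
                }
            }
            // block until every item of this stage has finished
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        }
    }
}
```

A caller would typically create the pool with `Executors.newFixedThreadPool(n)`, pass the stages in topological order, and call `shutdown()` on the pool afterwards. Running a chain as one job trades a little parallelism bookkeeping for fewer task hand-offs, which is the effect the thread IDs in the output above illustrate.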


