推广

基于DAG实现的任务编排框架&平台

iseeyu2年前 (2024-02-21)推广142

image

DAG 有向无环图

首先我们了解图这个数据结构,每个元素称为顶点 vertex,顶点之间的连线称为边 edge。像我们画的这种带箭头关系的称为有向图,箭头关系之间能形成一个环的成为有环图,反之称为无环图。显然运用在我们任务编排工作流上,最合适的是 DAG 有向无环图。

我们在代码里怎么存储图呢,有两种数据结构:邻接矩阵和邻接表。

下图表示一个有向图的邻接矩阵,例如 x->y 的边,只需将 Array[x][y]标识为 1 即可。

image

此外我们也可以使用邻接表来存储,这种存储方式较好地弥补了邻接矩阵浪费空间的缺点,但相对来说邻接矩阵能更快地判断连通性。

image

一般在代码实现上,我们会选择邻接矩阵,这样我们在判断两点之间是否有边更方便点。

一个任务编排框架

了解了 DAG 的基本知识后我们可以来简单实现一下。首先是存储结构,我们的 Dag 表示一整个图,Node 表示各个顶点,每个顶点有其 parents 和 children:

//Dag
public final class DefaultDag<T, R> implements Dag<T, R> {

    private Map<T, Node<T, R>> nodes = new HashMap<T, Node<T, R>>();
    ...
}

//Node
public final class Node<T, R> {
    /**
     * incoming dependencies for this node
     */
    private Set<Node<T, R>> parents = new LinkedHashSet<Node<T, R>>();
    /**
     * outgoing dependencies for this node
     */
    private Set<Node<T, R>> children = new LinkedHashSet<Node<T, R>>();
    ...
}

画两个顶点,以及为这两个顶点连边操作如下:

    public void addDependency(final T evalFirstNode, final T evalLaterNode) {
        Node<T, R> firstNode = createNode(evalFirstNode);
        Node<T, R> afterNode = createNode(evalLaterNode);

        addEdges(firstNode, afterNode);
    }

   private Node<T, R> createNode(final T value) {
        Node<T, R> node = new Node<T, R>(value);
        return node;
    }

    private void addEdges(final Node<T, R> firstNode, final Node<T, R> afterNode) {
        if (!firstNode.equals(afterNode)) {
            firstNode.getChildren().add(afterNode);
            afterNode.getParents().add(firstNode);
        }
    }

到现在我们其实已经把基础数据结构写好了,但我们作为一个任务编排框架最终是需要线程去执行的,我们把它和线程池一起给包装一下。

//任务编排线程池
public class DefaultDexecutor <T, R> {

    //执行线程,和2种重试线程
    private final ExecutorService<T, R> executionEngine;
    private final ExecutorService immediatelyRetryExecutor;
    private final ScheduledExecutorService scheduledRetryExecutor;
    //执行状态
    private final ExecutorState<T, R> state;
    ...
}
//执行状态
public class DefaultExecutorState<T, R> {
    //底层图数据结构
    private final Dag<T, R> graph;
    //已完成
    private final Collection<Node<T, R>> processedNodes;
    //未完成
    private final Collection<Node<T, R>> unProcessedNodes;
    //错误task
    private final Collection<ExecutionResult<T, R>> erroredTasks;
    //执行结果
    private final Collection<ExecutionResult<T, R>> executionResults;
}

可以看到我们的线程包括执行线程池,2 种重试线程池。我们使用 ExecutorState 来保存一些整个任务工作流执行过程中的一些状态记录,包括已完成和未完成的 task,每个 task 执行的结果等。同时它也依赖我们底层的图数据结构 DAG。

接下来我们要做的事其实很简单,就是 BFS 这整个 DAG 数据结构,然后提交到线程池中去执行就可以了,过程中注意一些节点状态的保持,结果的保存即可。

image

还是以上图为例,值得说的一点是在 Task D 这个点需要有一个并发等待的操作,即 Task D 需要依赖 Task B 和 Task C 执行结束后再往下执行。这里有很多办法,我选择了共享变量的方式来完成并发等待。遍历工作流中被递归的方法的伪代码如下:

private void doProcessNodes(final Set<Node<T, R>> nodes) {
        for (Node<T, R> node : nodes) {
        //共享变量 并发等待
        if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents())) {
            Task<T, R> task = newTask(node);
            this.executionEngine.submit(task);
            ...
            ExecutionResult<T, R> executionResult = this.executionEngine.processResult();
            if (executionResult.isSuccess()) {
                state.markProcessingDone(processedNode);
            }
            //继续执行孩子节点
            doExecute(processedNode.getChildren());
            ...
        }
    }
}

这样我们基本完成了这个任务编排框架的工作,现在我们可以如下来进行示例图中的任务编排以及执行:

DefaultExecutor<String, String> executor = newTaskExecutor();
executor.addDependency("A", "B");
executor.addDependency("B", "D");
executor.addDependency("C", "D");
executor.execute();

任务编排平台化

好了现在我们已经有一款任务编排框架了,但很多时候我们想要可视化、平台化,让使用者更加无脑。

框架与平台最大的区别在哪里?是可拖拽的可视化输入么?我觉得这个的复杂度更多在前端。而对于后端平台来讲,与框架最大的区别是数据的持久化。

对于 DAG 的顶点来说,我们需要将每个节点 Task 的信息给持久化到关系数据库中,包括 Task 的状态、输出结果等。而对于 DAG 的边来说,我们也得用数据库来存储各 Task 之间的方向关系。此外,在遍历执行 DAG 的整个过程中的中间状态数据,我们也得搬运到数据库中。

首先我们可以设计一个 workflow 表,来表示一个工作流。接着我们设计一个 task 表,来表示一个执行单元。task 表主要字段如下,这里主要是 task_parents 的设计,它是一个 string,存储 parents 的 taskId,多个由分隔符分隔。

task_id
workflow_id
task_name
task_status
result
task_parents

image

依赖是上图这个例子,对比框架来说,我们首先得将其存储到数据库中去,最终可能得到如下数据:

task_id  workflow_id  task_name  task_status  result  task_parents
  1          1           A           0                    -1
  2          1           B           0                    1
  3          1           C           0                    -1
  4          1           D           0                    2,3

可以看到,这样也能很好地存储 DAG 数据,和框架中代码的输入方式差别并不是很大。

接下来我们要做的是遍历执行整个 workflow,这边和框架的差别也不大。首先我们可以利用select * from task where workflow_id = 1 and task_parents = -1来获取初始化节点 Task A 和 Task C,将其提交到我们的线程池中。

接着对应框架代码中的doExecute(processedNode.getChildren());,我们使用select * from task where task_parents like %3%,就可以得到 Task C 的孩子节点 Task D,这里使用了模糊查询是因为我们的 task_parents 可能是由多个父亲的 taskId 与分隔号组合而成的字符串。查询到孩子节点后,继续提交到线程池即可。

别忘了我们在 Task D 这边还有一个并发等待的操作,对应框架代码中的if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents()))。这边我们只要判断select count(1) from task where task_id in (2,3) and status != 1的个数为 0 即可,即保证 parents task 全部成功。

另外值得注意的是 task 的重试。在框架中,失败 task 的重试可以是立即使用当前线程重试或者放到一个定时线程池中去重试。而在平台上,我们的重试基本上来自于用户在界面上的点击,即主线程。

至此,我们已经将任务编排框架的功能基本平台化了。作为一个任务编排平台,可拖拽编排的可视化输入、整个工作流状态的可视化展示、任务的可人工重试都是其优点。

扫描二维码推送至手机访问。

版权声明:本文由西安泽虎代运营发布,如需转载请注明出处。

转载请注明出处https://www.0291.com.cn/post/57148.html

相关文章

全渠道新零售模式或新零售营销解决方案的落实离不开以下 ...

全渠道新零售模式或新零售营销解决方案的落实离不开以下 ...

纯的时代已经过去,时代已经到来,未来零售将是智慧新零售,现在传统制造业都纷纷进入新零售。线上的企业走到线下,线下的企业走到线上,线上线下加上现代物流,才能实现真正的新零售。能做到无须排队、无须人工结账、无须人工售货、无收营业员才是未来新零售门店该有的样子。七件事集团-案例新...

蜂蜜类农产品的电商品牌软文营销推广要注意什么?

蜂蜜类农产品的电商品牌软文营销推广要注意什么?

简单的来说就是通过第三方或者是相关的评论,以采访或者是其他的一些方式来撰写文章,用事实来给文章造成一个新闻的效应,通过这种方式能达到宣传的目的和的效果,那么软文营销相对于传统的硬广告来说成本是比较低的,而且他们追求的是春风化雨或者是润物细无声的传播效果,相对来说在进行软文营...

林郑月娥:2021年是“激励人心的一年”社会增加正能量

林郑月娥:2021年是“激励人心的一年”社会增加正能量

新华社香港12月30日电(记者韦骅)“2021年可以说是激励人心的一年,社会多了很多正能量。”30日,在回顾香港特区政府2021年整体工作的时候,特区行政长官林郑月娥如是说。在媒体见面会上,林郑月娥回顾总结了2021年特区政府各项工作,并展望了2022年的工作。“2021年...

网络优化有哪些推广策略。

网络优化有哪些推广策略。

对于很多企业来说,网站SEO营销是当前社会中绝不可缺少的营销推广方式之一,如今市场上企业做一个网站都比较轻松,但要做好SEO关键词优化,这就有一定的难度了,而今需要站长们有一定的推广策略和技巧,那网络优化有哪些推广策略? 1、前期分析 正所谓\"万里长征第一步\"是最难的,网站建设也是...

2021年新手还能做网店吗,2021年开网店还来得及吗(2021淘宝C店还能做吗)

2021年新手还能做网店吗,2021年开网店还来得及吗(2021淘宝C店还能做吗)

2021年做网店还是可以的,虽然疫情影响了很多行业,但是,却让电商行业更加盛行了。网购人群又大幅度增加了,现在加入开网店当然是可行的,不过,能不能赚钱,还是要看个人的能力。...

沈亮数字能量学:绝命磁场详细解读

文/沈亮数字能量学,作者/沈亮,数字能量学高级分析师,易经数字风水调运师,专研数字能量学,善于手机号码测吉凶,选配吉祥开运手机号码。绝命磁场属金,一般称为绝命金,磁场数字:12-21,69-96,84-48,37-73优点:反应快 对事物敏感 心肠软 心善 直肠子解释:所以...

现在,非常期待与您的又一次邂逅

我们努力让每一部企业宣传片和抖音短视频成为商业大片