赛尔校园公共服务平台 Logo
平台使用
阿里云
百度云
移动云
智算服务
教育生态
登录 →
赛尔校园公共服务平台 Logo
平台使用 阿里云 百度云 移动云 智算服务 教育生态
登录
  1. 首页
  2. 阿里云
  3. 分布式云容器平台ACK One
  4. 实践教程
  5. 分布式工作流Argo集群最佳实践
  6. 使用Argo Workflow编排动态DAG Fan-outFan-in任务

使用Argo Workflow编排动态DAG Fan-outFan-in任务

  • 分布式工作流Argo集群最佳实践
  • 发布于 2025-04-18
  • 1 次阅读
文档编辑
文档编辑

在工作流编排过程中,为了加快大任务处理的速度,可以使用Fan-out Fan-in任务编排,将大任务分解成小任务,然后并行运行小任务,最后聚合结果。分布式工作流Argo集群(简称工作流集群)支持动态DAG方式编排Fan-out Fan-in任务,可按需调度云上算力、利用云上弹性可调用数万核CPU资源,减少运行时间,运行结束后能够及时回收资源节省成本。本文为您介绍如何使用工作流集群的Argo Workflow编排动态DAG Fan-out Fan-in任务。

背景信息

Fan-out Fan-in

image

Fan-out和Fan-in常用于构建高效的并发处理流程,通过拆分(Fan-out)和聚合(Fan-in)操作,能够充分利用多核、多机资源,实现大规模数据的高效处理。

如上图所示,工作流编排过程中,可以使用DAG(有向无环图)编排Fan-out Fan-in任务。子任务的拆分方式分为有静态(静态DAG)和动态(动态DAG)。

  • 静态DAG:拆分的子任务分类是固定的。例如:在数据收集场景中,同时收集数据库1和数据库2中的数据,最后聚合结果。

  • 动态DAG:拆分的子任务分类是动态的,取决于前一个任务的输出结果。

    如上图所示,在数据处理场景中,任务A可以扫描待处理的数据集,为每个子数据集(例如:一个子目录)启动子任务Bn处理,当所有子任务Bn运行结束后,在子任务C中聚合结果。具体启动多少个子任务B取决于任务A的输出结果,根据实际的业务场景,可以在任务A中自定义子任务的拆分规则。

ACK One分布式工作流Argo集群

在实际业务场景中,为了提升大任务的执行效率,往往需要将一个大任务拆分成数千个子任务,为了保证这数千个子任务的同时运行,需要调度数万核的CPU资源。叠加多任务需要竞争资源,一般IDC离线任务难以满足需求。

例如:自动驾驶仿真任务,修改算法后进行回归测试,需要对所有驾驶场景进行仿真,每个驾驶场景为一个子任务运行,研发团队为了提高迭代速度,会要求所有子场景测试并行执行。

基于以上业务场景,您可以使用ACK One分布式工作流Argo集群编排工作流,工作流集群支持托管Argo Workflow,提供完善的售后技术支持,可通过动态DAG方式编排Fan-out Fan-in任务,支持弹调度云上算力,调度数万核CPU资源支撑大规模子任务的并行运行。任务运行结束时能够及时回收资源节省成本,一般可用于数据处理、机器学习、仿真计算、CI/CD等业务。

Argo Workflow是开源CNCF毕业项目,聚焦云原生领域下的工作流编排,使用Kubernetes CRD编排离线任务和DAG工作流,并使用Kubernetes Pod在集群中调度运行。更多信息,请参见Argo Workflow。

使用Argo Workflow编排Fan-out Fan-in任务

任务背景

本文以如下任务为例,为您介绍具体操作流程。

构建一个一个动态DAG Fan-out Fan-in工作流,读取阿里云OSS对象存储中的一个大日志文件,将其拆分为多个小文件(split),启动多个子任务分别计算每个小文件中的关键词数量(count),最后聚合结果(merge)。

操作流程

  1. 创建分布式工作流Argo集群。

  2. 挂载阿里云OSS存储卷,以便工作流可以像操作本地文件一样操作OSS上的文件。

    具体操作,请参见使用存储卷。

  3. 使用以下YAML创建一个工作流。

    具体操作,请参见创建工作流。

    展开查看YAML示例

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: dynamic-dag-map-reduce-
    spec:
      entrypoint: main
      # claim a OSS PVC, workflow can read/write file in OSS through PVC. 
      volumes:
        - name: workdir
          persistentVolumeClaim:
            claimName: pvc-oss
      # how many tasks to split, default is 5.
      arguments:
        parameters:
          - name: numParts
            value: "5"
      templates:
        - name: main
          # DAG definition.
          dag:
            tasks:
              # split log files to several small files, based on numParts.
              - name: split
                template: split
                arguments:
                  parameters:
                    - name: numParts
                      value: "{{workflow.parameters.numParts}}"
              # multiple map task to count words in each small file.
              - name: map
                template: map
                arguments:
                  parameters:
                    - name: partId
                      value: '{{item}}'
                depends: "split"
                # run as a loop, partId from split task json outputs.
                withParam: '{{tasks.split.outputs.result}}'
              - name: reduce
                template: reduce
                arguments:
                  parameters:
                    - name: numParts
                      value: "{{workflow.parameters.numParts}}"
                depends: "map"
        # The `split` task split the big log file to several small files. Each file has a unique ID (partId).
        # Finally, it dumps a list of partId to stdout as output parameters
        - name: split
          inputs:
            parameters:
              - name: numParts
          container:
            image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
            command: [python]
            args: ["split.py"]
            env:
            - name: NUM_PARTS
              value: "{{inputs.parameters.numParts}}"
            volumeMounts:
            - name: workdir
              mountPath: /mnt/vol
        # One `map` per partID is started. Finds its own "part file" and processes it.
        - name: map
          inputs:
            parameters:
              - name: partId
          container:
            image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
            command: [python]
            args: ["count.py"]
            env:
            - name: PART_ID
              value: "{{inputs.parameters.partId}}"
            volumeMounts:
            - name: workdir
              mountPath: /mnt/vol
        # The `reduce` task takes the "results directory" and returns a single result.
        - name: reduce
          inputs:
            parameters:
              - name: numParts
          container:
            image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
            command: [python]
            args: ["merge.py"]
            env:
            - name: NUM_PARTS
              value: "{{inputs.parameters.numParts}}"
            volumeMounts:
            - name: workdir
              mountPath: /mnt/vol
          outputs:
            artifacts:
              - name: result
                path: /mnt/vol/result.json
  4. 使用动态DAG方式实现Fan-out Fan-in编排任务。

    1. 大文件拆分为多个split子任务后,会在标准输出中输出一个JSON字符串,包含子任务要处理的partId,例如:

      ["0", "1", "2", "3", "4"]
    2. map任务使用withParam引用split任务的输出,并解析JSON字符串获得所有{{item}},并使用每个{{item}}作为输入参数启动多个map任务。

                - name: map
                  template: map
                  arguments:
                    parameters:
                      - name: partId
                        value: '{{item}}'
                  depends: "split"
                  withParam: '{{tasks.split.outputs.result}}'

    更多定义方式,请参见开源Argo Workflow。

  5. 工作流运行后,您可以在ACK One工作流集群控制台查看任务DAG流程与运行结果。

    image

  6. 阿里云OSS文件列表中,log-count-data.txt为输入日志文件,split-output,cout-output为中间结果目录,result.json为最终结果文件。

    image

示例源代码参考

argo-workflow-examples

相关参考

  • 了解阿里云分布式工作流Argo集群的详细功能,请参见ACK One概述。

  • 了解Argo Workflow的具体内容,请参见开源Argo Workflow。

  • 如果您对于ACK One有任何反馈或疑问,请加入钉群(钉群号:35688562)联系我们。

相关文章

迁移Batch批量计算到分布式工作流Argo集群 2025-04-18 18:06

批处理作业(Batch)通常用于数据处理、仿真计算、科学计算等领域,往往需要大规模的计算资源。分布式工作流Argo集群基于开源Argo Workflows项目开发,完全符合开源工作流标准。通过工作流集群,您可以轻松编排工作流,每个工作流步骤使用容器运行,可以在短时间内轻松运行大规模机器学习、仿真计算

使用Argo Workflow编排动态DAG Fan-outFan-in任务 2025-04-18 18:06

在工作流编排过程中,为了加快大任务处理的速度,可以使用Fan-out Fan-in任务编排,将大任务分解成小任务,然后并行运行小任务,最后聚合结果。分布式工作流Argo集群(简称工作流集群)支持动态DAG方式编排Fan-out Fan-in任务,可按需调度云上算力、利用云上弹性可调用数万核CPU资源

使用Argo Workflows编排基因计算工作流 2025-04-18 18:06

在基因计算这一高度复杂且数据密集型的领域,科研人员和生物信息分析师面临着严峻挑战,这不仅体现在数据量的爆炸性增长上,还在于需要高效、准确地整合和分析数据,以揭示生命的奥秘。为应对这些挑战,工作流自动化编排成为关键技术,其中,Argo Workflows以其容器化、灵活性和易用性脱颖而出,成为串联基因

使用Argo Workflows安全高效管理文件 2025-04-18 18:06

工作流集群是一个全托管的Argo服务,专注于高效安全的文件管理,并提供了一些增强功能。它在批处理、数据处理和持续集成等场景中比标准的Argo Workflows更具优势。本文将介绍工作流集群如何实现高效安全的文件管理。 复杂工作流编排的存储难题 Argo Workflows<

使用Python SDK构建大规模Argo Workflows 2025-04-18 18:06

Argo Workflows是一个强大的工作流管理工具,广泛应用于定时任务、机器学习和ETL数据处理等场景,但是使用YAML定义工作流程可能会增加学习难度。Hera Python SDK提供了一种简洁易用的替代方案,Hera允许用户以Python代码构建工作流,支持复杂任务,易于测试,并与Pytho

基于EventBridge的事件驱动CI Pipeline 2025-04-18 18:06

基于事件总线EventBridge和分布式工作流Argo Workflows,可以构建高效、快速、低成本的事件驱动自动化CI Pipeline,大幅简化和加速应用交付过程。本文介绍如何构建基于事件驱动的自动化CI Pipeline流程。 前提条件

目录
Copyright © 2025 your company All Rights Reserved. Powered by 赛尔网络.
京ICP备14022346号-15
gongan beian 京公网安备11010802041014号