赛尔校园公共服务平台 Logo
平台使用
阿里云
百度云
移动云
智算服务
教育生态
登录 →
赛尔校园公共服务平台 Logo
平台使用 阿里云 百度云 移动云 智算服务 教育生态
登录
  1. 首页
  2. 阿里云
  3. 分布式云容器平台ACK One
  4. 操作指南
  5. 分布式工作流Argo集群
  6. 事件驱动
  7. 通过向OSS上传文件触发工作流

通过向OSS上传文件触发工作流

  • 事件驱动
  • 发布于 2025-04-18
  • 0 次阅读
文档编辑
文档编辑

本文介绍如何集成阿里云对象存储OSS与阿里云轻量消息队列(原 MNS),通过将数据上传文件至对象存储OSS中,自动触发工作流运行文件,并生成结果。

前提条件

  • 已开通以下服务与功能。

    • 开启事件驱动功能。具体操作,请参见开启事件驱动功能。

    • 开通轻量消息队列(原 MNS)。该功能会涉及轻量消息队列(原 MNS)相关计费,具体收费情况,请参见计费说明。

    • 开通对象存储OSS。该功能会涉及OSS相关计费,具体收费情况,请参见计费概述。

  • 已创建工作流集群并下载阿里云Argo CLI。

    • 创建工作流集群,请参见创建工作流集群。

    • 下载并安装阿里云Argo CLI,请参见阿里云Argo CLI。

    • 创建资源入口:连接工作流集群后,在本地终端操作。具体操作,请参见获取集群KubeConfig并通过kubectl工具连接集群。

步骤一:配置OSS Bucket事件通知

  1. 登录OSS管理控制台。

  2. 单击Bucket 列表,创建OSS Bucket或者选择已有Bucket。如需创建,请参见创建存储空间。

  3. 在文件管理页面,选择数据处理 > 事件通知,单击创建规则,完成相关参数配置后,单击确定。如需自定义规则,请参见通过事件通知实时处理OSS文件变动。

    配置项

    示例

    规则名称

    upload-complete

    事件类型

    PutObject, PostObject

    资源描述

    选择前后缀,并设置后缀为complete,即上传以.complete结尾的文件,触发事件后运行工作流。

    接收终端

    选择队列,名称设置为oss-event-queue。

    规则创建后,在轻量消息队列(原 MNS)中会自动创建一个主题与之对应。

  4. 登录轻量消息队列(原 MNS)控制台。

  5. 单击队列列表,创建队列oss-event-queue,创建方法,请参见创建队列。创建完成后,在队列详情页面的接入点区域获取Endpoint。

    重要

    创建队列的名称需要与步骤3中配置的接收终端的队列名称保持一致。

步骤二:创建Event Bus

Event Bus可以被命名空间中的事件驱动工作流共享。如果已经创建,请执行步骤三:创建Event Source。

方式一:使用NATS

  1. 创建event-bus.yaml文件。Event Bus示例代码如下所示:

    apiVersion: argoproj.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      nats:
        native:
          replicas: 3
          auth: token
  2. 执行以下命令,创建EventBus。

    kubectl apply -f event-bus.yaml
    说明

    命令执行成功后,会在default命名空间下创建Event Bus Pod。后续操作需在同一命名空间下。

  3. 执行以下命令,查看Event Bus Pod是否正常启动。

    kubectl get pod

方式二:使用轻量消息队列(原 MNS)

  1. 登录轻量消息队列(原 MNS)控制台。

  2. 在主题列表页面创建主题argoeventbus,并在主题详情页面的接入点区域获取Endpoint。

  3. 使用RAM管理员登录RAM控制台。

  4. 创建RAM用户,授权AliyunMNSFullAccess,并获取RAM用户的AK和SK。

  5. 执行以下命令,创建Secret用于存储AK和SK。

    kubectl create secret generic mns-secret\
      --from-literal=accesskey=*** \
      --from-literal=secretkey=***
  6. 创建event-bus-mns.yaml文件,EventBus示例代码如下所示:

    • topic:需替换为2中创建的轻量消息队列(原 MNS)中的主题名称。

    • endpoint:需替换为2中获取的Endpoint。

    apiVersion: argoproj.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      alimns:
        accessKey:
          key: accesskey
          name: mns-secret
        secretKey:
          key: secretkey
          name: mns-secret
        topic: argoeventbus  # 对应轻量消息队列(原 MNS)中的主题名称。
        endpoint: http://165***368.mns.<region>.aliyuncs.com
  7. 执行以下命令,创建event-bus-mns.yaml。

    kubectl apply -f event-bus.yaml
说明
  • 使用轻量消息队列(原 MNS)方式创建Event Bus时,不会创建Pod。

  • 如需使用Trigger功能,请使用NATS方式创建EventBus。目前轻量消息队列(原 MNS)方式不支持开源Argo Event的Sensor Trigger。

步骤三:创建Event Source

  1. 使用RAM管理员登录RAM控制台。

  2. 创建RAM用户,为其授予AliyunMNSFullAccess权限,并获取RAM用户的AK和SK。具体操作,请参见创建RAM用户、为RAM用户授权、创建AccessKey和查看RAM用户的AccessKey信息。

  3. 执行以下命令,创建Secret用于存储AK和SK。

    kubectl create secret generic mns-secret\
     --from-literal=accesskey=*** \
     --from-literal=secretkey=***
  4. 创建event-source.yaml文件,Event Source示例代码如下所示:

    • queue:需替换为步骤5中创建的轻量消息队列(原 MNS)名称。

    • endpoint:需替换为步骤5中获取的Endpoint。

    apiVersion: argoproj.io/v1alpha1
    kind: EventSource
    metadata:
      name: ali-mns
    spec:
      mns:
        example:
          jsonBody: true
          accessKey:
            key: accesskey
            name: mns-secret
          secretKey:
            key: secretkey
            name: mns-secret
          queue: oss-event-queue # 步骤一中创建的轻量消息队列(原 MNS)名称。
          waitTimeSeconds: 20
          endpoint: http://165***368.mns.<region>.aliyuncs.com # 步骤一中创建的轻量消息队列(原 MNS)接入点。
  5. 执行以下命令,创建Event Source。

    kubectl apply -f event-source.yaml
  6. 执行以下命令,查看Event Source Pod是否正常启动。

    kubectl get pod

步骤四:创建Event Sensor

  1. 创建event-sensor.yaml文件,在Event Sensor中嵌入待执行的工作流定义。Event Sensor示例代码如下所示:

    展开查看示例代码

    apiVersion: argoproj.io/v1alpha1
    kind: Sensor
    metadata:
      name: process-oss-file
    spec:
      template:
        serviceAccountName: default
      dependencies:
        - name: dep1
          eventSourceName: ali-mns
          eventName: example
      triggers:
        - template:
            name: process-oss-file-workflow
            k8s:
              operation: create
              source:
                resource:
                  apiVersion: argoproj.io/v1alpha1
                  kind: Workflow
                  metadata:
                    generateName: process-oss-file-
                    namespaces: default
                  spec:
                    entrypoint: process-oss-file
                    volumes:
                    - name: workdir
                      persistentVolumeClaim:
                        claimName: pvc-oss
                    arguments:
                      parameters:
                      - name: message
                        # this is the value that should be overridden
                        value: event message
                    templates:
                    - name: process-oss-file
                      steps:
                      - - name: parse-event-body
                          template: parse-event-body
                      - - name: process-file
                          template: process-file
                          arguments:
                            parameters:
                            - name: file-name
                              value: "{{steps.parse-event-body.outputs.parameters.file-name}}"
                    - name: parse-event-body
                      container:
                        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/jq-alpine
                        command: [sh,-c]
                        args:
                        - echo "Event body:";
                          echo {{workflow.parameters.message}} | base64 -d;
                          TriggerFileName=$(echo {{workflow.parameters.message}} | base64 -d | jq .events[0].oss.object.key | cut -c2- | rev | cut -c2- |rev);
                          echo "" && echo "TriggerFileName from event is $TriggerFileName";
                          Tmp=${TriggerFileName%%.complete} && DataFileName=${Tmp##*/};
                          echo "DataFileName after cutting .complete is $DataFileName, and pass file name to next step";
                          echo $DataFileName > /tmp/file-name.txt
                      outputs:
                        parameters:
                        - name: file-name
                          valueFrom:
                            path: /tmp/file-name.txt
                    - name: process-file
                      inputs:
                        parameters:
                          - name: file-name
                      container:
                        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/jq-alpine
                        imagePullPolicy: Always
                        command: [sh,-c]
                        args:
                        - echo "Show data-file:" && echo "";
                          ls -l /mnt/vol/{{inputs.parameters.file-name}};
                          echo "Content of data file:" && echo "";
                          cat /mnt/vol/{{inputs.parameters.file-name}} ;
                          echo "" && echo "Finished" ;
                        volumeMounts:
                        - name: workdir
                          mountPath: /mnt/vol
              parameters:
                - src:
                    dependencyName: dep1
                    dataKey: body
                  dest: spec.arguments.parameters.0.value
  2. 执行以下命令,创建Event Sensor。

    kubectl apply -f event-sensor.yaml
  3. 执行以下命令,查看Event Sensor Pod是否正常启动。

    kubectl get pod
说明

使用轻量消息队列(原 MNS)方式创建Eventbus时,在Event Sensor创建完成后,会自动创建一个轻量消息队列(原 MNS)与之对应,队列命名格式为:ackone-argowf-<namespace>-<sensor-name>-<sensor-uid>。

步骤五:验证向OSS上传文件触发工作流

  1. 登录OSS管理控制台。

  2. 向步骤一:配置OSS Bucket事件通知中的OSS Bucket中上传以下2个文件(该文件需自备),触发工作流运行。

    • datafile:数据文件,文本格式,内容自定义。

    • datafile.complete:trigger文件,可以是空文件。

  3. 执行以下命令,在工作流集群中查看工作流运行情况。

    argo list

    预期输出如下:

    NAME STATUS AGE DURATION PRIORITY
    process-oss-file-kmb4k Running 13s 13s 0
  4. 执行以下命令,获取工作流日志,查看消息内容。

    argo logs process-oss-file-kmb4k
    重要
    • 该命令中的工作流名称必须和上一步骤中返回的工作流名称一致,ali-mns-workflow-5prz7仅为示例值,请您修改为实际环境中的返回值。

    • 消息内容使用Base64编码。

    预期输出如下:

    image.png

步骤六:清除Event相关资源

  1. 依次执行以下命令,清除Event相关资源。

    kubectl delete sensor process-oss-file
    kubectl delete eventsource ali-mns
    kubectl delete eventbus default
  2. 执行以下命令查看Pod,确认所有资源已清除。

    kubectl get pod

相关文章

开启事件驱动功能 2025-04-18 18:08

工作流集群支持事件驱动功能,可通过监控事件触发工作流自动运行,您可以使用该功能构建事件驱动的自动化系统。事件驱动支持各种事件源,包括阿里云对象存储OSS、阿里云轻量消息队列(原 MNS)、Git代码仓库,EventBrige等。 背景信息 <

通过轻量消息队列(原 MNS)触发工作流 2025-04-18 18:08

工作流集群支持集成阿里云轻量消息队列(原 MNS),利用轻量消息队列(原 MNS)作为中介接入丰富的事件源,利用事件驱动触发工作流运行。当有新的事件触发(如OSS事件触发、EventBridge事件触发)时,轻量消息队列(原 MNS)

通过向OSS上传文件触发工作流 2025-04-18 18:08

本文介绍如何集成阿里云对象存储OSS与阿里云轻量消息队列(原 MNS),通过将数据上传文件至对象存储OSS中,自动触发工作流运行文件,并生成结果。 前提条件 已开通以下服务与功能。

目录
Copyright © 2025 your company All Rights Reserved. Powered by 赛尔网络.
京ICP备14022346号-15
gongan beian 京公网安备11010802041014号