赛尔校园公共服务平台 Logo
平台使用
阿里云
百度云
移动云
智算服务
教育生态
登录 →
赛尔校园公共服务平台 Logo
平台使用 阿里云 百度云 移动云 智算服务 教育生态
登录
  1. 首页
  2. 阿里云
  3. 对象存储
  4. 实践教程
  5. 数据湖
  6. 阿里云生态
  7. 使用Flume同步EMR Kafka集群的数据至OSS-HDFS服务

使用Flume同步EMR Kafka集群的数据至OSS-HDFS服务

  • 阿里云生态
  • 发布于 2025-04-21
  • 0 次阅读
文档编辑
文档编辑

本文为您介绍如何使用Flume同步EMR Kafka集群的数据至阿里云OSS-HDFS服务。

前提条件

  • 已开通并授权访问OSS-HDFS服务。具体操作,请参见开通并授权访问OSS-HDFS服务。

  • 已创建DataLake集群,并选择了Flume服务。具体操作,请参见创建集群。

  • 已创建DataFlow集群,并选择了Kafka服务。具体操作,请参见创建集群。

操作步骤

  1. 配置Flume。

    1. 进入Flume的配置页面。

      1. 登录E-MapReduce控制台。

      2. 在顶部菜单栏处,根据实际情况选择地域和资源组。

      3. 在EMR on ECS页面,单击目标集群操作列的集群服务。

      4. 在集群服务页签,单击FLUME服务区域的配置。

    2. 设置JVM最大可用内存(Xmx)。

      Flume向OSS-HDFS写入数据时需要占用较大的JVM内存,建议增加Flume Agent的Xmx。具体步骤如下:

      1. 单击flume-env.sh页签。

        本文采用了全局配置方式。如果您希望按照节点配置,可以在FLUME服务配置页面的下拉列表中选择独立节点配置。

      2. 修改JAVA_OPTS的参数值。

        例如,JVM最大可用内存设置为1 GB,则参数值修改为-Xmx1g。

      3. 单击保存。

    3. 修改flume-conf.properties配置。

      1. 单击flume-conf.properties页签。

        本文采用了全局配置方式。如果您希望按照节点配置,可以在FLUME服务配置页面的下拉列表中选择独立节点配置。

      2. 在flume-conf.properties右侧,输入以下配置项。

        说明

        以下示例中的default-agent的值需与FLUME服务配置页面的agent_name参数值保持一致。

        default-agent.sources = source1
        default-agent.sinks = k1
        default-agent.channels = c1
        
        default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
        default-agent.sources.source1.channels = c1
        default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
        default-agent.sources.source1.kafka.topics = flume-test
        default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
        
        default-agent.sinks.k1.type = hdfs
        default-agent.sinks.k1.hdfs.path = oss://<examplebucket>.<exampleregion>.oss-dls.aliyuncs.com/<exampledir>
        default-agent.sinks.k1.hdfs.fileType=DataStream
        
        # Use a channel which buffers events in memory
        default-agent.channels.c1.type = memory
        default-agent.channels.c1.capacity = 100
        default-agent.channels.c1.transactionCapacity = 100
        
        # Bind the source and sink to the channel
        default-agent.sources.source1.channels = c1
        default-agent.sinks.k1.channel = c1

        参数

        描述

        default-agent.sources.source1.kafka.bootstrap.servers

        Kafka集群Broker的Host和端口号。

        default-agent.sinks.k1.hdfs.path

        OSS-HDFS的路径。填写格式为oss://<examplebucket>.<exampleregion>.oss-dls.aliyuncs.com/<exampledir>。示例值为oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result。

        各参数说明如下:

        • <examplebucket>:填写已开启OSS-HDFS服务的Bucket名称。

        • <exampleregion>:填写Bucket所在的地域ID。

        • <exampledir>:填写OSS-HDFS服务的目录名称。

        default-agent.channels.c1.capacity

        通道中存储的最大事件数。请根据实际环境修改该参数值。

        default-agent.channels.c1.transactionCapacity

        每个事务通道将从源接收或提供给接收器的最大事件数。请根据实际环境修改该参数值。

      3. 单击保存。

  2. 测试数据同步情况。

    1. 通过SSH方式连接DataFlow集群,详情请参见登录集群。
    2. 创建名称为flume-test的Topic。
      kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
    3. 生成测试数据。

      kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092

      例如,输入abc并回车。

      在oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result路径下会以当前时间的时间戳(毫秒)为后缀生成格式为FlumeData.xxxx的文件。

相关文章

结合SLS分析OSS-HDFS服务热点访问数据 2025-04-21 17:13

为了有效地管理和分析OSS-HDFS服务的审计日志,您需要将审计日志导入日志服务SLS,然后通过SLS内置的查询分析能力对收集的审计日志进行分析,获取OSS-HDFS服务不同时间段内的访问量变化,分析频繁访问的数据、检测异常访问行为等信息。本教程用于演示如何通过SLS分析OSS-HDFS服务热点访问

通过XIHE SQL或者Spark SQL访问OSS数据 2025-04-21 17:13

在EMR Hive或Spark中访问OSS-HDFS 2025-04-21 17:13

EMR-3.42及后续版本或EMR-5.8.0及后续版本的集群,支持OSS-HDFS(JindoFS服务)作为数据存储,提供缓存加速服务和Ranger鉴权功能,使得在Hive或Spark等大数据ETL场景将获得更好的性能和HDFS平迁能力。本文为您介绍E-MapReduce(简称EMR)Hive或S

实时计算Flink读写OSS或者OSS-HDFS 2025-04-21 17:13

阿里云实时计算Flink支持通过连接器读写OSS以及OSS-HDFS数据。通过配置OSS或者OSS-HDFS连接器的输入属性,实时计算Flink会自动从指定的路径读取数据,并将其作为实时计算Flink的输入流,然后将计算结果按照指定格式写入到OSS或者OSS-HDFS的指定路径。 前提条件 已开通F

EMR Flink写入OSS-HDFS服务 2025-04-21 17:13

可恢复性写入功能支持将数据以EXACTLY_ONCE语义写入存储介质。本文介绍Flink如何通过EMR集群的方式可恢复性写入OSS-HDFS服务。 前提条件

使用Flume同步EMR Kafka集群的数据至OSS-HDFS服务 2025-04-21 17:13

本文为您介绍如何使用Flume同步EMR Kafka集群的数据至阿里云OSS-HDFS服务。 前提条件 <

目录
Copyright © 2025 your company All Rights Reserved. Powered by 赛尔网络.
京ICP备14022346号-15
gongan beian 京公网安备11010802041014号