赛尔校园公共服务平台 Logo
平台使用
阿里云
百度云
移动云
智算服务
教育生态
登录 →
赛尔校园公共服务平台 Logo
平台使用 阿里云 百度云 移动云 智算服务 教育生态
登录
  1. 首页
  2. 阿里云
  3. 日志服务
  4. 开发参考
  5. 日志服务SDK
  6. Java SDK
  7. 消费管理
  8. 使用Java SDK管理消费组

使用Java SDK管理消费组

  • 消费管理
  • 发布于 2025-04-22
  • 0 次阅读
文档编辑
文档编辑

通过消费组(ConsumerGroup)消费日志数据有显著优点,您无需关注日志服务的实现细节和消费者之间的负载均衡、Failover等,只需关注业务逻辑。本文通过代码示例介绍如何创建、修改、查询、删除消费组等。

前提条件

  • 已开通日志服务。更多信息,请参见开通日志服务。

  • 已创建RAM用户并完成授权。具体操作,请参见创建RAM用户并完成授权。

  • 已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见在Linux、macOS和Windows系统配置环境变量。

    重要
    • 阿里云账号的AccessKey拥有所有API的访问权限,建议您使用RAM用户的AccessKey进行API访问或日常运维。

    • 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。

  • 已安装日志服务Java SDK。具体操作,请参见安装Java SDK。

  • 已创建Project、标准型Logstore并完成日志采集。具体操作,请参见创建项目Project、创建Logstore和数据采集概述。

  • 查询日志前,已配置索引。具体操作,请参见创建索引。

注意事项

本示例以华东1(杭州)的公网Endpoint为例,其公网Endpoint为https://cn-hangzhou.log.aliyuncs.com。如果您通过与Project同地域的其他阿里云产品访问日志服务,请使用内网Endpointhttps://cn-hangzhou-intranet.log.aliyuncs.com。关于日志服务支持的地域与Endpoint的对应关系,请参见服务入口。

创建消费组示例代码

以下代码用于创建名为ali-test-consumergroup的消费组。

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;

public class CreateConsumerGroup {
    public static void main(String[] args) throws LogException {
         // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
        String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // 输入Project名称。
        String projectName = "ali-test-project";
        // 输入Logstore名称。
        String logstoreName = "ali-test-logstore";
        // 设置日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
        String host = "https://cn-hangzhou.log.aliyuncs.com";

        // 创建日志服务Client。
        Client client = new Client(host, accessId, accessKey);

        try {
            // 设置消费组名称。
            String consumerGroupName = "ali-test-consumergroup2";
            System.out.println("ready to create consumergroup");

            ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 300, true);

            client.CreateConsumerGroup(projectName, logstoreName, consumerGroup);

            System.out.println(String.format("create consumergroup %s success", consumerGroupName));

        } catch (LogException e) {
            System.out.println("LogException e :" + e.toString());
            System.out.println("error code :" + e.GetErrorCode());
            System.out.println("error message :" + e.GetErrorMessage());
            throw e;
        }
    }
}

预期结果如下:

ready to create consumergroup
create consumergroup ali-test-consumergroup success

修改消费组示例代码

以下代码用于修改名为ali-test-consumergroup的消费组信息。

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.exception.LogException;

public class UpdateConsumerGroup {
    public static void main(String[] args) throws LogException {
         // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
        String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // 输入Project名称。
        String projectName = "ali-test-project";
        // 输入Logstore名称。
        String logstoreName = "ali-test-logstore";
        // 设置日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
        String host = "https://cn-hangzhou.log.aliyuncs.com";

        // 创建日志服务Client。
        Client client = new Client(host, accessId, accessKey);

        try {
            String consumerGroupName = "ali-test-consumergroup";
            System.out.println("ready to update consumergroup");

            // 修改消费组超时时间为350秒。
            client.UpdateConsumerGroup(projectName, logstoreName, consumerGroupName, false, 350);

            System.out.println(String.format("update consumergroup %s success", consumerGroupName));

        } catch (LogException e) {
            System.out.println("LogException e :" + e.toString());
            System.out.println("error code :" + e.GetErrorCode());
            System.out.println("error message :" + e.GetErrorMessage());
            throw e;
        }
    }
}

预期结果如下:

ready to update consumergroup
update consumergroup ali-test-consumergroup success

查询所有消费组示例代码

以下代码用于查询指定Logstore的所有消费组。

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.ListConsumerGroupResponse;

public class ListConsumerGroup {
    public static void main(String[] args) throws LogException {
         // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
        String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // 输入Project名称。
        String projectName = "ali-test-project";
        // 输入Logstore名称。
        String logstoreName = "ali-test-logstore";
        // 设置日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
        String host = "https://cn-hangzhou.log.aliyuncs.com";

        // 创建日志服务Client。
        Client client = new Client(host, accessId, accessKey);

        try {
            System.out.println("ready to list consumergroup");

            // 查询指定Logstore的所有消费组。
            ListConsumerGroupResponse response = client.ListConsumerGroup(projectName,logstoreName);

            for(ConsumerGroup consumerGroup : response.GetConsumerGroups()){
                System.out.println("ConsumerName is : " + consumerGroup.getConsumerGroupName());
            }

            System.out.println(String.format("list consumergroup from %s success",projectName));

        } catch (LogException e) {
            System.out.println("LogException e :" + e.toString());
            System.out.println("error code :" + e.GetErrorCode());
            System.out.println("error message :" + e.GetErrorMessage());
            throw e;
        }
    }
}

预期结果如下:

ready to list consumergroup
ConsumerName is : ali-test-consumergroup2
ConsumerName is : ali-test-consumergroup
list consumergroup from ali-test-project success

删除消费组示例代码

以下代码用于删除目标Project下的消费组。

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.exception.LogException;

public class DeleteConsumerGroup {
    public static void main(String[] args) throws LogException {
         // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
        String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // 输入Project名称。
        String projectName = "ali-test-project";
        // 输入Logstore名称。
        String logstoreName = "ali-test-logstore";
        // 设置日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
        String host = "https://cn-hangzhou.log.aliyuncs.com";

        // 创建日志服务Client。
        Client client = new Client(host, accessId, accessKey);

        try {
            String consumerGroupName = "ali-test-consumergroup";
            System.out.println("ready to delete consumergroup");

            // 删除一个指定的消费组。
            client.DeleteConsumerGroup(projectName,logstoreName,consumerGroupName);

            System.out.println(String.format("delete consumergroup %s success",consumerGroupName));

        } catch (LogException e) {
            System.out.println("LogException e :" + e.toString());
            System.out.println("error code :" + e.GetErrorCode());
            System.out.println("error message :" + e.GetErrorMessage());
            throw e;
        }
    }
}

预期结果如下:

ready to delete consumergroup
delete consumergroup ali-test-consumergroup success

获取消费组Checkpoint示例代码

以下代码用于获取指定消费组的Checkpoint。

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.GetCheckPointResponse;

public class GetCheckPoint {
    public static void main(String[] args) throws LogException {
         // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
        String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // 输入Project名称。
        String projectName = "ali-test-project";
        // 输入Logstore名称。
        String logstoreName = "ali-test-logstore";
        // 设置日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
        String host = "https://cn-hangzhou.log.aliyuncs.com";

        // 创建日志服务Client。
        Client client = new Client(host, accessId, accessKey);

        try {
            String consumerGroupName = "consumerGroupX";
            System.out.println("ready to get consumergroup checkpoint");

            // 获取指定消费组中Shard的CheckPoint。
            GetCheckPointResponse response1 = client.getCheckpoint(projectName,logstoreName,consumerGroupName,0);
            GetCheckPointResponse response2 = client.getCheckpoint(projectName,logstoreName,consumerGroupName,1);
            System.out.println("The checkpoint of Shard 0 is : " + response1.getCheckpoint());
            System.out.println("The checkpoint of Shard 1 is : " + response2.getCheckpoint());

            System.out.println(String.format("get consumergroup %s checkpoint success",consumerGroupName));

        } catch (LogException e) {
            System.out.println("LogException e :" + e.toString());
            System.out.println("error code :" + e.GetErrorCode());
            System.out.println("error message :" + e.GetErrorMessage());
            throw e;
        }
    }
}

预期结果如下:

ready to get consumergroup checkpoint
The checkpoint of Shard 0 is : ConsumerGroupShardCheckPoint [shard=0, checkPoint=MTY2NzgxMDc0Nzk5MDk5MzAyMg==, updateTime=1668750821709044, consumer=consumer_1]
The checkpoint of Shard 1 is : ConsumerGroupShardCheckPoint [shard=1, checkPoint=MTY2NzgxMDc0Nzk5MTk0NTU0NQ==, updateTime=1668750828790425, consumer=consumer_1]
get consumergroup consumerGroupX checkpoint success

更新指定消费组Checkpoint示例代码

以下代码用于更新指定消费组的Checkpoint。

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.ListConsumerGroupResponse;

public class ConsumerGroupUpdateCheckpoint {
    public static void main(String[] args) throws LogException {
         // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
        String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // 输入Project名称。
        String projectName = "ali-test-project";
        // 输入Logstore名称。
        String logstoreName = "ali-test-logstore";
        // 设置日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
        String host = "https://cn-hangzhou.log.aliyuncs.com";

        // 创建日志服务Client。
        Client client = new Client(host, accessId, accessKey);

        try {
            String consumerGroupName = "consumerGroupX";
            System.out.println("ready to update checkpoint");

            // 查询指定Logstore的所有消费组。
            ListConsumerGroupResponse response = client.ListConsumerGroup(projectName, logstoreName);

            for(ConsumerGroup consumerGroup : response.GetConsumerGroups()){
                System.out.println("ConsumerName is : " + consumerGroup.getConsumerGroupName());
                System.out.println("Consumer order is : " + consumerGroup.isInOrder());
            }

            // 更新Shard 0的Checkpoint。您可以通过GetCursor接口获取对应时间的Cursor。
            client.UpdateCheckPoint(projectName, logstoreName, consumerGroupName, 0, "MTY2NzgxMDc0Nzk5MTAwNjQ3Mg==");
            System.out.println(String.format("update checkpoint of %s success", consumerGroupName));

        } catch (LogException e) {
            System.out.println("LogException e :" + e.toString());
            System.out.println("error code :" + e.GetErrorCode());
            System.out.println("error message :" + e.GetErrorMessage());
            throw e;
        }
    }
}

预期结果如下:

ready to update checkpoint
ConsumerName is : consumerGroupX
Consumer order is : false
ConsumerName is : ali-test-consumergroup2
Consumer order is : true
ConsumerName is : ali-test-consumergroup
Consumer order is : false
update consumergroup checkpoint is:consumerGroupX
update checkpoint of consumerGroupX success

相关文档

  • 在调用API接口过程中,若服务端返回结果中包含错误信息,则表示调用API接口失败。您可以参考API错误码对照表查找对应的解决方法。更多信息,请参见API错误处理对照表。

  • 阿里云OpenAPI开发者门户提供调试、SDK、示例和配套文档。通过OpenAPI,您无需手动封装请求和签名操作,就可以快速对日志服务API进行调试。更多信息,请参见OpenAPI开发者门户。

  • 为满足越来越多的自动化日志服务配置需求,日志服务提供命令行工具CLI(Command Line Interface)。更多信息,请参见日志服务命令行工具CLI。

  • 关于消费组API接口说明,请参见如下:

    • CreateConsumerGroup - 创建消费组

    • DeleteConsumerGroup - 删除消费组

    • UpdateConsumerGroup - 更新消费者组

    • ListConsumerGroup - 查询消费组

    • GetCheckPoint - 获取指定消费组的消费点

  • 更多示例代码,请参见Aliyun Log Java SDK on GitHub。

相关文章

使用Java SDK管理消费组 2025-04-22 10:35

通过消费组(ConsumerGroup)消费日志数据有显著优点,您无需关注日志服务的实现细节和消费者之间的负载均衡、Failover等,只需关注业务逻辑。本文通过代码示例介绍如何创建、修改、查询、删除消费组等。 前提条件

通过API消费 2025-04-22 10:35

日志服务提供多语言SDK,而且都支持日志服务消费接口。本文介绍普通消费日志的SDK示例及控制台的消费预览功能。 前提条件

通过消费组消费数据 2025-04-22 10:35

当您使用第三方软件、多语言应用、云产品、流式计算框架等通过SDK实时消费日志服务的数据时,SDK消费无法满足日志服务的实现细节及消费者之间的负载均衡、故障转移(Failover)等,您可以通过消费组(ConsumerGroup)消费日志,消费组(ConsumerGroup)消费的实时性较强,通常为秒

目录
Copyright © 2025 your company All Rights Reserved. Powered by 赛尔网络.
京ICP备14022346号-15
gongan beian 京公网安备11010802041014号