赛尔校园公共服务平台 Logo
平台使用
阿里云
百度云
移动云
智算服务
教育生态
登录 →
赛尔校园公共服务平台 Logo
平台使用 阿里云 百度云 移动云 智算服务 教育生态
登录
  1. 首页
  2. 阿里云
  3. 日志服务
  4. 开发参考
  5. 日志服务SDK
  6. Python SDK
  7. 使用Python SDK通过消费组消费日志

使用Python SDK通过消费组消费日志

  • Python SDK
  • 发布于 2025-04-22
  • 0 次阅读
文档编辑
文档编辑

通过消费组(ConsumerGroup)消费日志有显著优点,您无需关注日志服务的实现细节和消费者之间的负载均衡、Failover等,只需要专注于业务逻辑即可。本文通过Python代码介绍如何使用消费组消费日志。

工作流程

image

一个Logstore中包含多个Shard,通过消费组消费数据就是将Shard分配给一个消费组下面的消费者,分配方式遵循以下原则。

  • 在一个消费组中,一个Shard只会分配到一个消费者。

  • 在一个消费组中,一个消费者可以被分配多个Shard。

新的消费者加入消费组后,这个消费组下面的Shard从属关系会调整,以实现消费的负载均衡,但是仍遵循上述分配原则。

说明

通过消费组消费,程序发生故障时,会默认保存Checkpoint。在程序故障恢复时,能够从断点处继续消费,从而保证数据不会被重复消费。

前提条件

  • 已开通日志服务。更多信息,请参见开通日志服务。

  • 已创建RAM用户并完成授权。具体操作,请参见创建RAM用户并完成授权。

  • 已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见在Linux、macOS和Windows系统配置环境变量。

    重要
    • 阿里云账号的AccessKey拥有所有API的访问权限,建议您使用RAM用户的AccessKey进行API访问或日常运维。

    • 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。

  • 已安装日志服务Python SDK。具体操作,请参见安装Python SDK。

  • 已创建Project。具体操作,请参见创建项目Project。

注意事项

本示例以华东1(杭州)的公网Endpoint为例,其公网Endpoint为https://cn-hangzhou.log.aliyuncs.com。如果您通过与Project同地域的其他阿里云产品访问日志服务,请使用内网Endpointhttps://cn-hangzhou-intranet.log.aliyuncs.com。关于日志服务支持的地域与Endpoint的对应关系,请参见服务入口。

代码示例

以下代码用于创建Logstore、为Logstore写入日志、创建消费组并消费日志。

import os
import time

from aliyun.log.consumer import *
from aliyun.log import *
from threading import RLock


class SampleConsumer(ConsumerProcessorBase):
    shard_id = -1
    last_check_time = 0
    log_results = []
    lock = RLock()

    def initialize(self, shard):
        self.shard_id = shard

    def process(self, log_groups, check_point_tracker):
        for log_group in log_groups.LogGroups:
            items = []
            for log in log_group.Logs:
                item = dict()
                item['time'] = log.Time
                for content in log.Contents:
                    item[content.Key] = content.Value
                items.append(item)
            log_items = dict()
            log_items['topic'] = log_group.Topic
            log_items['source'] = log_group.Source
            log_items['logs'] = items

            with SampleConsumer.lock:
                SampleConsumer.log_results.append(log_items)
                print(log_items)

        current_time = time.time()
        if current_time - self.last_check_time > 3:
            try:
                self.last_check_time = current_time
                check_point_tracker.save_check_point(True)
            except Exception:
                import traceback
                traceback.print_exc()
        else:
            try:
                check_point_tracker.save_check_point(False)
            except Exception:
                import traceback
                traceback.print_exc()

        # None means succesful process
        # if need to roll-back to previous checkpoint,return check_point_tracker.get_check_point()
        return None

    def shutdown(self, check_point_tracker):
        try:
            check_point_tracker.save_check_point(True)
        except Exception:
            import traceback
            traceback.print_exc()


test_item_count = 20

# 为Logstore写入日志。
def _prepare_data(client, project, logstore):
    topic = 'python-ide-test'
    source = ''

    for i in range(0, test_item_count):
        logitemList = []  # LogItem list

        contents = [
            ('user', 'magic_user_' + str(i)),
            ('avg', 'magic_age_' + str(i))
        ]
        logItem = LogItem()
        logItem.set_time(int(time.time()))
        logItem.set_contents(contents)

        logitemList.append(logItem)
        # 将日志写入Logstore。
        request = PutLogsRequest(project, logstore, topic, source, logitemList)

        response = client.put_logs(request)
        print("successfully put logs in logstore")


def sleep_until(seconds, exit_condition=None, expect_error=False):
    if not exit_condition:
        time.sleep(seconds)
        return

    s = time.time()
    while time.time() - s < seconds:
        try:
            if exit_condition():
                break
        except Exception:
            if expect_error:
                continue
        time.sleep(1)


def sample_consumer_group():
    # 日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
    endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com')

    # 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
    accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
    accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')

    # Project名称。本示例中,SDK不会创建Project,您需要在运行代码前创建该Project。
    project = os.environ.get('ALIYUN_LOG_SAMPLE_PROJECT', 'ali-test-project-python')

    # Logstore名称。本示例中,SDK会自动创建该Logstore,您无需提前创建该Logstore。
    logstore = 'ali-test-logstore'

    # 消费组名称。您无需提前创建,SDK会自动创建该消费组。
    consumer_group = 'consumer-group-1'
    consumer_name1 = "consumer-group-1-A"
    consumer_name2 = "consumer-group-1-B"
    token = ""

    if not logstore:
        logstore = 'consumer_group_test_' + str(time.time()).replace('.', '_')

    assert endpoint and accessKeyId and accessKey and project, ValueError("endpoint/access_id/key and "
                                                                          "project cannot be empty")

    # 创建Logstore。
    client = LogClient(endpoint, accessKeyId, accessKey, token)
    ret = client.create_logstore(project, logstore, 2, 4)
    print("successfully create logstore")

    time.sleep(60)
    SampleConsumer.log_results = []

    try:
        # 为Logstore写入日志。
        _prepare_data(client, project, logstore)

        # 在消费组中创建2个消费者消费数据。
        option1 = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group,
                               consumer_name1, cursor_position=CursorPosition.BEGIN_CURSOR, heartbeat_interval=6,
                               data_fetch_interval=1)
        option2 = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group,
                               consumer_name2, cursor_position=CursorPosition.BEGIN_CURSOR, heartbeat_interval=6,
                               data_fetch_interval=1)

        print("*** start to consume data...")
        client_worker1 = ConsumerWorker(SampleConsumer, consumer_option=option1)
        client_worker1.start()
        client_worker2 = ConsumerWorker(SampleConsumer, consumer_option=option2)
        client_worker2.start()

        sleep_until(300, lambda: len(SampleConsumer.log_results) >= test_item_count)

        print("*** consumer group status ***")
        ret = client.list_consumer_group(project, logstore)
        print("successfully list consumergroup")

        for c in ret.get_consumer_groups():
            ret = client.get_check_point_fixed(project, logstore, c.get_consumer_group_name())
            print("successfully get checkpoint fixed")

        print("*** stopping workers")
        client_worker1.shutdown()
        client_worker2.shutdown()

    finally:
        # clean-up
        # ret = client.delete_logstore(project, logstore)
        ret = client.list_logstore(project, logstore)
        print("successfully list logstore")

    # validate
    ret = str(SampleConsumer.log_results)
    print("*** get content:")
    print(ret)

    assert 'magic_user_0' in ret and 'magic_age_0' in ret \
           and 'magic_user_' + str(test_item_count-1) in ret \
           and 'magic_age_' + str(test_item_count-1) in ret


if __name__ == '__main__':
    sample_consumer_group()

预期结果如下:

successfully create logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
......
*** start to consume data...
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123452, 'user': 'magic_user_0', 'avg': 'magic_age_0'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_1', 'avg': 'magic_age_1'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_3', 'avg': 'magic_age_3'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_5', 'avg': 'magic_age_5'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_6', 'avg': 'magic_age_6'}]}
......
*** consumer group status ***
successfully list consumergroup
successfully get checkpoint fixed
*** stopping workers
successfully list logstore
*** get content:
[{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123452, 'user': 'magic_user_0', 'avg': 'magic_age_0'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_1', 'avg': 'magic_age_1'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_3', 'avg': 'magic_age_3'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_5', 'avg': 'magic_age_5'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_6', 'avg': 'magic_age_6'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_8', 'avg': 'magic_age_8'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_9', 'avg': 'magic_age_9'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_2', 'avg': 'magic_age_2'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_11', 'avg': 'magic_age_11'}]}, ......}]

相关文档

  • 阿里云OpenAPI开发者门户提供调试、SDK、示例和配套文档。通过OpenAPI,您无需手动封装请求和签名操作,就可以快速对日志服务API进行调试。更多信息,请参见OpenAPI开发者门户。

  • 为满足越来越多的自动化日志服务配置需求,日志服务提供命令行工具CLI(Command Line Interface)。更多信息,请参见日志服务命令行工具CLI。

  • 更多示例代码,请参见Aliyun Log Python SDK on GitHub。

相关文章

Python SDK概述 2025-04-22 10:35

日志服务Python SDK封装了日志服务的大部分API接口。您可以通过日志服务Python SDK方便地调用日志服务的大部分API接口。 版本说明

安装日志服务Python SDK 2025-04-22 10:35

本文介绍安装Python SDK的方法。安装日志服务Python SDK后,即可在编写代码时调用日志服务SDK的接口。本文介绍如何安装日志服务Python SDK。 前提条件

初始化日志服务Python SDK 2025-04-22 10:35

LogClient是日志服务的客户端,它为调用者提供了一系列的方法,可以用来创建Project和Logstore、写入日志、读取日志等。使用Python SDK发起请求,您需要初始化一个LogClient实例,并根据需要修改默认配置项。 前提条件 安装日志服务Python SDK。

Python SDK快速入门 2025-04-22 10:35

本文介绍如何快速使用日志服务Python SDK完成常见操作,包括创建项目(Project)、创建日志库(Logstore)、写入日志和查询日志等。 前提条件

通过Python SDK使用SQL独享版 2025-04-22 10:35

本文介绍通过Python SDK使用SQL独享版的代码示例。 前提条件 您已完成以下操作:

通过Python SDK管理告警 2025-04-22 10:35

本文介绍通过Python SDK使用告警的代码示例。 前提条件 您已完成以下操作:

目录
Copyright © 2025 your company All Rights Reserved. Powered by 赛尔网络.
京ICP备14022346号-15
gongan beian 京公网安备11010802041014号