赛尔校园公共服务平台 Logo
平台使用
阿里云
百度云
移动云
智算服务
教育生态
登录 →
赛尔校园公共服务平台 Logo
平台使用 阿里云 百度云 移动云 智算服务 教育生态
登录
  1. 首页
  2. 阿里云
  3. 日志服务
  4. 开发参考
  5. 日志服务SDK
  6. Java SDK
  7. 日志管理
  8. 使用Aliyun Log Java Producer写入日志数据

使用Aliyun Log Java Producer写入日志数据

  • 日志管理
  • 发布于 2025-04-22
  • 0 次阅读
文档编辑
文档编辑

如果您在使用Flink、Spark、Storm等大数据计算引擎时,需要将日志进行压缩、批量上传日志到日志服务、减少网络传输资源的占用,API或者SDK往往无法满足大数据场景对数据写入能力的要求,您可以使用Aliyun Log Java Producer,便捷高效地将数据上传到日志服务。

前提条件

您已完成以下操作:

  • 开通日志服务。

  • 初始化日志服务Python SDK。

  • 已安装日志服务Java SDK。具体操作,请参见安装Java SDK。

什么是Aliyun Log Java Producer

Aliyun Log Java Producer是为运行在大数据、高并发场景下的Java应用量身打造的高性能类库。相对于原始的API或SDK,使用该类库写日志数据能为您带来诸多优势,包括高性能、计算与I/O逻辑分离、资源可控制等。Aliyun LOG Java Producer使用阿里云日志服务提供的顺序写入功能来保证日志的上传顺序。

Aliyun Log Java Producer实现原理请参见Aliyun LOG Java Producer。

日志服务提供基于Aliyun Log Java Producer的样例应用程序,便于您快速上手。更多信息,请参见Aliyun Log Producer Sample Application。

工作流程

image

特点

  • 线程安全:Producer接口暴露的所有方法都是线程安全的。

  • 异步发送:调用Producer的发送接口通常能够立即返回响应。Producer内部会缓存并合并待发送数据,然后批量发送以提高吞吐量。

  • 自动重试:Producer会根据配置的最大重试次数和重试退避时间进行重试。

  • 行为追溯:通过Callback或Future能获取当前数据是否发送成功的信息,也可以获得该数据每次被尝试发送的信息,有利于问题追溯和行为决策。

  • 上下文还原:同一个Producer实例产生的日志在同一上下文中,在服务端可以查看某条日志前后相关的日志。

  • 优雅关闭:保证close方法退出时,Producer缓存的所有数据都能被处理,同时您也能得到相应的通知。

应用场景

producer对比原始的API或SDK的优势如下:

  • 高性能

    在海量数据、资源有限的前提下,写入端要达到目标吞吐量需要实现复杂的控制逻辑,包括多线程、缓存策略、批量发送等,另外还要充分考虑失败重试的场景。Producer实现了上述功能,在为您带来性能优势的同时简化了程序开发步骤。

  • 异步非阻塞

    在可用内存充足的前提下,Producer会对发往日志库的数据进行缓存,因此调用send方法时能够立即返回响应且不会阻塞,可达到计算与I/O逻辑分离的目的。随后,您可以通过返回的Future对象或传入的Callback获得数据发送的结果。

  • 资源可控制

    可以通过参数控制Producer用于缓存待发送数据的内存大小,同时还可以配置用于执行数据发送任务的线程数量。这样可避免Producer无限制地消耗资源,且可以让您根据实际情况平衡资源消耗和写入吞吐量。

  • 定位问题简单

    如果日志数据发送失败,Producer除了返回状态码,还会返回一个String类型的异常信息,用于描述失败的原因和详细信息。例如,如果发送失败是因为网络连接超时,则返回的异常信息可能是“连接超时”;如果发送失败是因为服务器无响应,则返回的异常信息可能是“服务器无响应”。

使用限制

  • aliyun-log-producer底层调用PutLogs接口上传日志,每次可以写入的原始日志大小存在限制。更多信息,请参见数据读写。

  • 日志服务的基础资源,包括创建Project个数、Logstore个数、Shard个数、LogtailConfig个数、机器组个数、单个LogItem大小、LogItem(Key)长度和LogItem(Value)长度等均存在限制。更多信息,请参见基础资源。

  • 代码首次运行后,请在日志服务控制台开启日志库索引,等待一分钟后,进行查询。

  • 在控制台进行日志查询时,当单个字段值长度超过最大长度时,超出部分被截断,不参与分析。更多信息,请参考创建索引。

费用说明

使用SDK产生的费用和使用控制台产生的费用一致。更多信息,请参见计费概述。

步骤一:安装Aliyun Log Java Producer

在Maven工程中使用日志服务Aliyun Log Java Producer,只需在pom.xml中加入相应依赖。Maven项目管理工具会自动下载相关JAR包。例如,在<dependencies>中加入如下内容:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log-producer</artifactId>
    <version>0.3.22</version>
</dependency>

添加更新完后,如果提示Producer依赖的版本冲突,在<dependencies>中加入如下内容:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.114</version>
  <classifier>jar-with-dependencies</classifier>
</dependency>

步骤二:配置ProducerConfig

ProducerConfig用于配置发送策略,您可以根据不同的业务场景为参数指定不同的值,各参数含义如下表所示:

Config producerConfig = new ProducerConfig();
producerConfig.setTotalSizeInBytes(104857600);

参数

类型

描述

totalSizeInBytes

整型

单个Producer实例能缓存的日志大小上限,默认为 100MB。

maxBlockMs

整型

如果Producer可用空间不足,调用者在send方法上的最大阻塞时间,默认为60秒。

如果超过这个时间后所需空间仍无法得到满足,send方法会抛出TimeoutException。

如果将该值设为0,当所需空间无法得到满足时,send 方法会立即抛出 TimeoutException。

如果您希望send方法一直阻塞直到所需空间得到满足,可将该值设为负数。

ioThreadCount

整型

执行日志发送任务的线程池大小,默认为可用处理器个数。

batchSizeThresholdInBytes

整型

当一个ProducerBatch中缓存的日志大小大于等于 batchSizeThresholdInBytes 时,该batch将被发送,默认为512KB,最大可设置成 5MB。

batchCountThreshold

整型

当一个ProducerBatch中缓存的日志条数大于等于 batchCountThreshold时,该batch将被发送,默认4096,最大可设置成40960。

lingerMs

整型

一个ProducerBatch从创建到可发送的逗留时间,默认为2秒,最小可设置成100毫秒。

retries

整型

如果某个ProducerBatch首次发送失败,能够对其重试的次数,默认为10次。

如果retries小于等于 0,该ProducerBatch首次发送失败后将直接进入失败队列。

maxReservedAttempts

整型

每个ProducerBatch每次被尝试发送都对应着一个Attempt,此参数用来控制返回给用户的attempt个数,默认只保留最近的11次attempt信息。

该参数越大能让您追溯更多的信息,但同时也会消耗更多的内存。

baseRetryBackoffMs

整型

首次重试的退避时间,默认为100毫秒。

Producer采样指数退避算法,第N次重试的计划等待时间为 baseRetryBackoffMs * 2^(N-1)。

maxRetryBackoffMs

整型

重试的最大退避时间,默认为50秒。

adjustShardHash

布尔

如果调用send方法时指定了 shardHash,该参数用于控制是否需要对其进行调整,默认为true。

buckets

整型

当且仅当adjustShardHash为true时,该参数才生效。此时,producer会自动将shardHash重新分组,分组数量为buckets。

如果两条数据的shardHash不同,它们是无法合并到一起发送的,会降低producer吞吐量。将shardHash重新分组后,能让数据有更多地机会被批量发送。

该参数的取值范围是 [1, 256],且必须是2的整数次幂,默认为64。

步骤三:创建Producer

Producer 支持用户配置AK或STS token。如果使用STS token,需要定期创建新的ProjectConfig然后将其添加到ProjectConfigs里。

LogProducer是接口Producer的实现类,它接收唯一的参数producerConfig。当您准备好producerConfig后,可以按照下列方式创建producer实例。

Producer producer = new LogProducer(producerConfig);

创建producer的同时会创建一系列线程,这是一个相对昂贵的操作,因此建议一个应用共用一个producer实例。一个producer实例包含的线程如下表所示,其中N为该producer实例在当前进程中的编号,从 0 开始。另外,LogProducer提供的所有方法都是线程安全的,可以在多线程环境下安全执行。

线程名格式

数量

描述

aliyun-log-producer-<N>-mover

1

负责将满足发送条件的batch投递到发送线程池里。

aliyun-log-producer-<N>-io-thread

ioThreadCount

IOThreadPool中真正用于执行数据发送任务的线程。

aliyun-log-producer-<N>-success-batch-handler

1

用于处理发送成功的batch。

aliyun-log-producer-<N>-failure-batch-handler

1

用于处理发送失败的batch。

步骤四:配置日志项目

ProjectConfig包含目标Project的服务入口信息以及表征调用者身份的访问凭证。每个日志项目对应一个ProjectConfig对象。

可以按照如下方式创建实例。

ProjectConfig project1 = new ProjectConfig("your-project-1", "cn-hangzhou.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
ProjectConfig project2 = new ProjectConfig("your-project-2", "cn-shanghai.log.aliyuncs.com", "accessKeyId", "accessKeySecret");
producer.putProject(project1);
producer.putProject(project2);

步骤五:发送数据

创建Future或Callback

在使用Aliyun Log Java Producer发送日志数据时,需要指定一个回调函数来处理发送过程中的各种情况。当日志数据发送成功时,回调函数会被调用,并返回一个发送结果;当日志数据发送失败时,回调函数也会被调用,并传入一个异常对象。

说明

如果获取结果后,应用的处理逻辑比较简单且不会造成producer阻塞,建议直接使用callback。否则,建议使用ListenableFuture,在单独的线程(池)中执行后续业务

方法的各个参数含义如下:

参数

描述

project

待发送数据的目标 project。

logstore

待发送数据的目标 logStore。

logTem

待发送数据。

completed

Java提供的一个原子类型,用来确保所有日志发送完成(成功或者失败)。

发送数据

Producer接口提供多种发送方法,方法的各个参数含义如下。

参数

描述

是否必选

project

目标Project。

是

logStore

目标LogStore。

是

logItem

要发送的日志/日志列表。

是

topic

日志主题

否

说明

如果留空或没有指定,该字段将被赋予""。

source

发送源。

否

说明

如果留空或没有指定,该字段将被赋予producer所在宿主机的 IP。

shardHash

可为发送的日志设置自定义哈希,服务端将根据此哈希选择对应的日志库Shard分片写入日志。

否

说明

如果留空或没有指定,数据将被随机写入目标LogStore的某个shard中。

callback

可设置一个回调函数。该回调函数将在日志被成功发送或者重试多次失败后被丢弃时调用。

否

常见异常

异常

说明

TimeoutException

当Producer缓存的日志大小超过设定的内存上限时,且阻塞maxBlockMs毫秒后仍未获取到足够内存时,将抛出TimeoutException。

maxBlockMs 为-1时,阻塞没有时间上限,将永远不会抛出 TimeoutException。

IllegalStateException

当Producer已经处于关闭状态(调用过close方法)时,再调用send 方法,会抛出IllegalStateException。

步骤六:获取发送数据

由于producer提供的所有发送方法都是异步的,需要通过返回的future或者传入的callback获取发送结果。

Future

Send 方法会返回一个ListenableFuture,它除了可以像普通future那样通过调用get方法阻塞获得发送结果外,还允许你注册回调方法(回调方法会在完成 future 设置后被调用)。以下代码片段展示了ListenableFuture的使用方法,用户需要为该future注册一个FutureCallback并将其投递到应用提供的线程池EXECUTOR_SERVICE中执行,完整样例请参见SampleProducerWithFuture.java。

import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class SampleProducerWithCallback {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);

    private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws InterruptedException {
        final String project = "example-project";
        final String logstore = "example-logstore";
        String endpoint = "example-endpoint";
        // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
        String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        ProducerConfig producerConfig = new ProducerConfig();
        final Producer producer = new LogProducer(producerConfig);
        producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));

        int nTask = 100;
        // The number of logs that have finished (either successfully send, or failed).
        final AtomicLong completed = new AtomicLong(0);
        final CountDownLatch latch = new CountDownLatch(nTask);

        for (int i = 0; i < nTask; ++i) {
            threadPool.submit(
                    new Runnable() {
                        @Override
                        public void run() {
       //The maximum size of a LogItem (key) is 128 bytes.  The maximum size of a LogItem (value) is 1 MB.               
                            LogItem logItem = new LogItem();
                            logItem.PushBack("key1", "foo");
                            logItem.PushBack("key2", "bar");
                            try {
                                producer.send(
                                        project,
                                        logstore,
                                        "your-topic",
                                        "your-source",
                                        logItem,
                                        new SampleCallback(project, logstore, logItem, completed));
                            } catch (InterruptedException e) {
                                LOGGER.warn("The current thread has been interrupted during send logs.");
                            } catch (Exception e) {
                                LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
                            } finally {
                                latch.countDown();
                            }
                        }
                    });
        }

        // 只有进程退出的时候,才需要考虑如下的逻辑。
        latch.await();
        threadPool.shutdown();
        try {
            producer.close();
        } catch (InterruptedException e) {
            LOGGER.warn("The current thread has been interrupted from close.");
        } catch (ProducerException e) {
            LOGGER.info("Failed to close producer, e=", e);
        }

        LOGGER.info("All log complete, completed={}", completed.get());
    }

    private static final class SampleCallback implements Callback {
        private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
        private final String project;
        private final String logStore;
        private final LogItem logItem;
        private final AtomicLong completed;

        SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
            this.project = project;
            this.logStore = logStore;
            this.logItem = logItem;
            this.completed = completed;
        }

        @Override
        public void onCompletion(Result result) {
            try {
                if (result.isSuccessful()) {
                    LOGGER.info("Send log successfully.");
                } else {
                    LOGGER.error(
                            "Failed to send log, project={}, logStore={}, logItem={}, result={}",
                            project,
                            logStore,
                            logItem.ToJsonString(),
                            result);
                }
            } finally {
                completed.getAndIncrement();
            }
        }
    }
}

Callback

Callback由producer内部线程负责执行,并且只有在执行完毕后数据“占用”的空间才会释放。为了不阻塞producer造成整体吞吐量的下降,要避免在callback里执行耗时的操作。另外,在callback中调用send方法进行重试也是不建议的,您可以在ListenableFuture的callback中进行重试。完整样例请参见SampleProducerWithCallback.java。

import com.aliyun.openservices.aliyun.log.producer.Callback;
import com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.aliyun.openservices.aliyun.log.producer.Producer;
import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.aliyun.openservices.aliyun.log.producer.Result;
import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.aliyun.openservices.log.common.LogItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class SampleProducerWithCallback {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);

    private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws InterruptedException {
        final String project = "example-project";
        final String logstore = "example-logstore";
        String endpoint = "example-endpoint";
        // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
        String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        ProducerConfig producerConfig = new ProducerConfig();
        final Producer producer = new LogProducer(producerConfig);
        producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));

        int nTask = 100;
        // The number of logs that have finished (either successfully send, or failed).
        final AtomicLong completed = new AtomicLong(0);
        final CountDownLatch latch = new CountDownLatch(nTask);

        for (int i = 0; i < nTask; ++i) {
            threadPool.submit(
                    new Runnable() {
                        @Override
                        public void run() {
       //The maximum size of a LogItem (key) is 128 bytes.  The maximum size of a LogItem (value) is 1 MB.               
                            LogItem logItem = new LogItem();
                            logItem.PushBack("key1", "foo");
                            logItem.PushBack("key2", "bar");
                            try {
                                producer.send(
                                        project,
                                        logstore,
                                        "your-topic",
                                        "your-source",
                                        logItem,
                                        new SampleCallback(project, logstore, logItem, completed));
                            } catch (InterruptedException e) {
                                LOGGER.warn("The current thread has been interrupted during send logs.");
                            } catch (Exception e) {
                                LOGGER.error("Failed to send log, logItem={}, e=", logItem, e);
                            } finally {
                                latch.countDown();
                            }
                        }
                    });
        }

        // 只有进程退出的时候,才需要考虑如下的逻辑。
        latch.await();
        threadPool.shutdown();
        try {
            producer.close();
        } catch (InterruptedException e) {
            LOGGER.warn("The current thread has been interrupted from close.");
        } catch (ProducerException e) {
            LOGGER.info("Failed to close producer, e=", e);
        }

        LOGGER.info("All log complete, completed={}", completed.get());
    }

    private static final class SampleCallback implements Callback {
        private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
        private final String project;
        private final String logStore;
        private final LogItem logItem;
        private final AtomicLong completed;

        SampleCallback(String project, String logStore, LogItem logItem, AtomicLong completed) {
            this.project = project;
            this.logStore = logStore;
            this.logItem = logItem;
            this.completed = completed;
        }

        @Override
        public void onCompletion(Result result) {
            try {
                if (result.isSuccessful()) {
                    LOGGER.info("Send log successfully.");
                } else {
                    LOGGER.error(
                            "Failed to send log, project={}, logStore={}, logItem={}, result={}",
                            project,
                            logStore,
                            logItem.ToJsonString(),
                            result);
                }
            } finally {
                completed.getAndIncrement();
            }
        }
    }
}

步骤七:关闭Producer

当您已经没有数据需要发送或者当前进程准备退出时,需要关闭Producer,目的是让Producer中缓存的数据全部被处理。目前,Producer提供安全关闭和有限关闭两种模式。

安全关闭

在大多数情况下,建议您使用安全关闭。安全关闭对应的方法是close(),它会等到Producer中缓存的数据全部被处理、线程全部停止、注册的callback全部执行,返回future全部被设置后才会返回。

虽然要等到数据全部处理完成,但Producer被关闭后,缓存的batch会被立刻处理且不会被重试。因此,如果callback不被阻塞,close方法往往能在很短的时间内返回。

有限关闭

如果您的callback在执行过程中有可能阻塞,但您又希望close方法能在短时间内返回,可以使用有限关闭。有限关闭对应的方法是close(long timeoutMs),如果超过指定的timeoutMs后Producer仍未完全关闭,它会抛出IllegalStateException异常,这意味着缓存的数据可能还没来得及处理就被丢弃,用户注册的Callback也可能不会被执行。

常见问题

写入数据次数是否存在限制?

  • 日志服务读写数据的次数和大小均存在限制。更多信息,请参见数据读写。

  • 日志服务的基础资源,包括创建Project个数、Logstore个数、Shard个数、LogtailConfig个数、机器组个数、单个LogItem大小、LogItem(Key)长度和LogItem(Value)长度等均存在限制。更多信息,请参见基础资源。

为什么数据没有写入日志服务?

如果您发现数据没有写入日志服务,可通过如下步骤诊断问题。

  1. 检查您项目中引入的aliyun-log-producer、aliyun-log、protobuf-java Jar包的版本是否和文档中安装部分列出的Jar包版本一致,如果不一致请进行升级。

  2. Producer接口的send方法异步发送数据,无法及时获取返回的值。请通过Callback接口或返回的Future对象获取数据发送失败的原因。

  3. 如果您发现并没有回调Callback接口的onCompletion方法,请检查在您的程序退出之前是否有调用producer.close()方法。因为数据发送是由后台线程异步完成的,为了防止缓存在内存里的少量数据丢失,请务必在程序退出之前调用producer.close()方法。

  4. Producer接口会把运行过程中的关键行为通过日志框架slf4j进行输出,您可以在程序中配置好相应的日志实现框架并打开DEBUG级别的日志。重点检查是否输出ERROR级别的日志。

  5. 如果通过上述步骤仍然没有解决,请提工单。

相关文档

  • 在调用API接口过程中,若服务端返回结果中包含错误信息,则表示调用API接口失败。您可以参考API错误码对照表查找对应的解决方法。更多信息,请参见API错误处理对照表。

  • 日志服务除自研的SDK外,还支持公共的阿里云SDK,关于阿里云SDK的使用方式,请参见日志服务_SDK中心-阿里云OpenAPI开发者门户。

  • 为满足越来越多的自动化日志服务配置需求,日志服务提供命令行工具CLI(Command Line Interface)。更多信息,请参见日志服务命令行工具CLI。

  • 更多示例代码,请参见Aliyun Log Java SDK on GitHub。

相关文章

使用Java SDK管理索引 2025-04-22 10:35

索引是一种倒排的数据存储结构,由关键词和指向实际数据的逻辑指针组成,用于快速根据关键词定位到具体数据行,类似于数据的目录。您只有配置索引后,才能进行查询和分析操作。本文通过代码示例介绍如何创建、修改、查询、删除索引。 前提条件

使用Java SDK写入日志 2025-04-22 10:35

如果想将应用程序的运行日志、操作系统日志、用户日志等上传到日志服务,您可以使用日志服务Java SDK提供的PutLogs方法。本文主要介绍使用日志服务Java SDK将日志写入到日志服务的操作步骤。 前提条件

使用Aliyun Log Java Producer写入日志数据 2025-04-22 10:35

如果您在使用Flink、Spark、Storm等大数据计算引擎时,需要将日志进行压缩、批量上传日志到日志服务、减少网络传输资源的占用,API或者SDK往往无法满足大数据场景对数据写入能力的要求,您可以使用Aliyun Log Java Producer,便捷高效地将数据上传到日志服务。

使用GetLogs接口查询日志 2025-04-22 10:35

完成日志采集后,您可以调用GetLogs接口查询采集到的日志。本文介绍GetLogs接口示例。 前提条件 已完成日

使用Java SDK管理快速查询 2025-04-22 10:35

当您需要频繁查看某一查询和分析结果时,可以将对应查询和分析语句另存为快速查询。本文通过代码示例介绍如何创建、修改、查询、删除快速查询等。 前提条件

目录
Copyright © 2025 your company All Rights Reserved. Powered by 赛尔网络.
京ICP备14022346号-15
gongan beian 京公网安备11010802041014号