赛尔校园公共服务平台 Logo
平台使用
阿里云
百度云
移动云
智算服务
教育生态
登录 →
赛尔校园公共服务平台 Logo
平台使用 阿里云 百度云 移动云 智算服务 教育生态
登录
  1. 首页
  2. 阿里云
  3. 表格存储
  4. 实践教程
  5. 数据同步迁移
  6. 数据导入
  7. 同步Kafka数据
  8. 配置说明

配置说明

  • 同步Kafka数据
  • 发布于 2025-04-22
  • 0 次阅读
文档编辑
文档编辑

启动Tablestore Sink Connector时,您需要通过键值映射向Kafka Connect进程传递参数。通过本文您可以结合配置示例和配置参数说明了解Tablestore Sink Connector的相关配置。

配置示例

当从Kafka同步数据到数据表或者时序表时配置项不同,且不同工作模式下相应配置文件的示例不同。此处以同步数据到数据表中为例介绍配置示例。同步数据到时序表的配置示例中需要增加时序相关配置项。

standalone模式配置示例

如果使用的是standalone模式,您需要通过.properties格式文件进行配置。配置示例如下:

# 设置连接器名称。
name=tablestore-sink
# 指定连接器类。
connector.class=TableStoreSinkConnector
# 设置最大任务数。
tasks.max=1
# 指定导出数据的Kafka的Topic列表。
topics=test
# 以下为Tablestore连接参数的配置。
# Tablestore实例的Endpoint。 
tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
# 填写AccessKey ID和AccessKey Secret。
tablestore.access.key.id=xxx
tablestore.access.key.secret=xxx
# Tablestore实例名称。
tablestore.instance.name=xxx

# 以下为数据映射相关的配置。
# 指定Kafka Record的解析器。
# 默认的DefaulteEventParser已支持Struct和Map类型,您也可以使用自定义的EventParser。
event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser

# 定义目标表名称的格式字符串,字符串中可包含<topic>作为原始Topic的占位符。
# topics.assign.tables配置的优先级更高,如果配置了topics.assign.tables,则忽略table.name.format的配置。
# 例如当设置table.name.format为kafka_<topic>时,如果kafka中主题名称为test,则将映射到Tablestore的表名为kafka_test。
table.name.format=<topic>
# 指定Topic与目标表的映射关系,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之间的分隔符为英文冒号(:),不同映射之间分隔符为英文逗号(,)。
# 如果缺省,则采取table.name.format的配置。
# topics.assign.tables=test:test_kafka

# 指定主键模式,可选值包括kafka、record_key和record_value,默认值为kafka。
# kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作为数据表的主键。
# record_key表示以Record Key中的字段作为数据表的主键。
# record_value表示以Record Value中的字段作为数据表的主键。
primarykey.mode=kafka

# 定义导入数据表的主键列名和数据类型。
# 属性名格式为tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。
# 其中<tablename>为数据表名称的占位符。
# 当主键模式为kafka时,无需配置该属性,默认主键列名为{"topic_partition","offset"},默认主键列数据类型为{string, integer}。
# 当主键模式为record_key或record_value时,必须配置以下两个属性。
# tablestore.test.primarykey.name=A,B
# tablestore.test.primarykey.type=string,integer

# 定义属性列白名单,用于过滤Record Value中的字段获取所需属性列。
# 默认值为空,使用Record Value中的所有字段作为数据表的属性列。
# 属性名格式为tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。
# 其中<tablename>为数据表名称的占位符。
# tablestore.test.columns.whitelist.name=A,B
# tablestore.test.columns.whitelist.type=string,integer

# 以下为写入Tablestore相关的配置。
# 指定写入模式,可选值包括put和update,默认值为put。
# put表示覆盖写。
# update表示更新写。
insert.mode=put
# 是否需要保序,默认值为true。如果关闭保序模式,则有助于提升写入效率。
insert.order.enable=true
# 是否自动创建目标表,默认值为false。
auto.create=false

# 指定删除模式,可选值包括none、row、column和row_and_column,默认值为none。
# none表示不允许进行任何删除。
# row表示允许删除行。
# column表示允许删除属性列。
# row_and_column表示允许删除行和属性列。
delete.mode=none

# 写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须为2的指数。
buffer.size=1024
# 写入数据表时的回调线程数,默认值为核数+1。
# max.thread.count=
# 写入数据表时的最大请求并发数,默认值为10。
max.concurrency=10
# 写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。
bucket.count=3
# 写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。
flush.Interval=10000

# 以下为脏数据处理相关配置。
# 在解析Kafka Record或者写入数据表时可能发生错误,您可以可通过以下配置进行处理。
# 指定容错能力,可选值包括none和all,默认值为none。
# none表示任何错误都将导致Sink Task立即失败。
# all表示跳过产生错误的Record,并记录该Record。
runtime.error.tolerance=none
# 指定脏数据记录模式,可选值包括ignore、kafka和tablestore,默认值为ignore。
# ignore表示忽略所有错误。
# kafka表示将产生错误的Record和错误信息存储在Kafka的另一个Topic中。
# tablestore表示将产生错误的Record和错误信息存储在Tablestore另一张数据表中。
runtime.error.mode=ignore

# 当脏数据记录模式为kafka时,需要配置Kafka集群地址和Topic。
# runtime.error.bootstrap.servers=localhost:9092
# runtime.error.topic.name=errors

# 当脏数据记录模式为tablestore时,需要配置Tablestore中数据表名称。
# runtime.error.table.name=errors

distributed模式配置示例

如果使用的是distributed模式,您需要通过JSON格式文件进行配置。配置示例如下:

{
  "name": "tablestore-sink",
  "config": {
    // 指定连接器类。
    "connector.class":"TableStoreSinkConnector",
    // 设置最大任务数。
    "tasks.max":"3",
    // 指定导出数据的Kafka的Topic列表。
    "topics":"test",
    // 以下为Tablestore连接参数的配置。
    // Tablestore实例的Endpoint。
    "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com",
    // 填写AccessKey ID和AccessKey Secret。
    "tablestore.access.key.id":"xxx",
    "tablestore.access.key.secret":"xxx",
    // Tablestore实例名称。
    "tablestore.instance.name":"xxx",

    // 以下为数据映射相关的配置。
    // 指定Kafka Record的解析器。
    // 默认的DefaulteEventParser已支持Struct和Map类型,您也可以使用自定义的EventParser。
    "event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser",

    // 定义目标表名称的格式字符串,字符串中可包含<topic>作为原始Topic的占位符。
    // topics.assign.tables配置的优先级更高。如果配置了topics.assign.tables,则忽略table.name.format的配置。
    // 例如当设置table.name.format为kafka_<topic>时,如果kafka中主题名称为test,则将映射到Tablestore的表名为kafka_test。
    "table.name.format":"<topic>",
    // 指定Topic与目标表的映射关系,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之间的分隔符为英文冒号(:),不同映射之间分隔符为英文逗号(,)。
    // 如果缺省,则采取table.name.format的配置。
    // "topics.assign.tables":"test:test_kafka",

    // 指定主键模式,可选值包括kafka、record_key和record_value,默认值为kafka。
    // kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作为数据表的主键。
    // record_key表示以Record Key中的字段作为数据表的主键。
    // record_value表示以Record Value中的字段作为数据表的主键。
    "primarykey.mode":"kafka",

    // 定义导入数据表的主键列名和数据类型。
    // 属性名格式为tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。
    // 其中<tablename>为数据表名称的占位符。
    // 当主键模式为kafka时,无需配置该属性,默认主键列名为{"topic_partition","offset"},默认主键列数据类型为{string, integer}。
    // 当主键模式为record_key或record_value时,必须配置以下两个属性。
    // "tablestore.test.primarykey.name":"A,B",
    // "tablestore.test.primarykey.type":"string,integer",

    // 定义属性列白名单,用于过滤Record Value中的字段获取所需属性列。
    // 默认值为空,使用Record Value中的所有字段作为数据表的属性列。
    // 属性名格式为tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。
    // 其中<tablename>为数据表名称的占位符。
    // "tablestore.test.columns.whitelist.name":"A,B",
    // "tablestore.test.columns.whitelist.type":"string,integer",

    // 以下为写入Tablestore相关的配置。
    // 指定写入模式,可选值包括put和update,默认值为put。
    // put表示覆盖写。
    // update表示更新写。
    "insert.mode":"put",
    // 是否需要保序,默认值为true。如果关闭保序模式,则有助于提升写入效率。
    "insert.order.enable":"true",
    // 是否自动创建目标表,默认值为false。
    "auto.create":"false",

    // 指定删除模式,可选值包括none、row、column和row_and_column,默认值为none。
    // none表示不允许进行任何删除。
    // row表示允许删除行。
    // column表示允许删除属性列。
    // row_and_column表示允许删除行和属性列。
    "delete.mode":"none",

    // 写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须为2的指数。
    "buffer.size":"1024",
    // 写入数据表时的回调线程数,默认值为核数+1。
    // "max.thread.count":
    // 写入数据表时的最大请求并发数,默认值为10。
    "max.concurrency":"10",
    // 写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。
    "bucket.count":"3",
    // 写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。
    "flush.Interval":"10000",

    // 以下为脏数据处理相关配置。
    // 在解析Kafka Record或者写入数据表时可能发生错误,您可以通过以下配置进行处理。
    // 指定容错能力,可选值包括none和all,默认值为none。
    // none表示任何错误都将导致Sink Task立即失败。
    // all表示跳过产生错误的Record,并记录该Record。
    "runtime.error.tolerance":"none",
    // 指定脏数据记录模式,可选值包括ignore、kafka和tablestore,默认值为ignore。
    // ignore表示忽略所有错误。
    // kafka表示将产生错误的Record和错误信息存储在Kafka的另一个Topic中。
    // tablestore表示将产生错误的Record和错误信息存储在Tablestore另一张数据表中。
    "runtime.error.mode":"ignore"

    // 当脏数据记录模式为kafka时,需要配置Kafka集群地址和Topic。
    // "runtime.error.bootstrap.servers":"localhost:9092",
    // "runtime.error.topic.name":"errors",

    // 当脏数据记录模式为tablestore时,需要配置Tablestore中数据表名称。
    // "runtime.error.table.name":"errors",
  }

配置项说明

配置文件中的配置项说明请参见下表。其中时序相关配置项只有同步数据到时序表时才需要配置。

Kafka Connect常见配置

配置项

类型

是否必选

示例值

描述

name

string

是

tablestore-sink

连接器(Connector)名称。连接器名称必须唯一。

connector.class

class

是

TableStoreSinkConnector

连接器的Java类。

如果您要使用该连接器,请在connector.class配置项中指定Connector类的名称,支持配置为Connector类的全名com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector或别名TableStoreSinkConnector,例如connector.class=com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector。

tasks.max

integer

是

3

连接器支持创建的最大任务数。

如果连接器无法达到此并行度级别,则可能会创建较少的任务。

key.converter

string

否

org.apache.kafka.connect.json.JsonConverter

覆盖worker设置的默认key转换器。

value.converter

string

否

org.apache.kafka.connect.json.JsonConverter

覆盖worker设置的默认value转换器。

topics

list

是

test

连接器输入的Kafka Topic列表,多个Topic之间以半角逗号(,)分隔。

您必须为连接器设置topics来控制连接器输入的Topic。

连接器Connection配置

配置项

类型

是否必选

示例值

描述

tablestore.endpoint

string

是

https://xxx.xxx.ots.aliyuncs.com

Tablestore实例的服务地址。

tablestore.mode

string

是

timeseries

根据数据同步到的表类型选择模式。取值范围如下:

  • normal(默认):同步数据到表格存储的数据表。

  • timeseries:同步数据到表格存储的时序表。

tablestore.access.key.id

string

是

LTAn********************

登录账号的AccessKey ID和AccessKey Secret,获取方式请参见创建AccessKey。

tablestore.access.key.secret

string

是

zbnK**************************

tablestore.auth.mode

string

是

aksk

设置认证方式。取值范围如下:

  • aksk:使用阿里云账号或者RAM用户的AccessKey ID和Access Secret进行认证。请使用此认证方式。

  • sts(默认):使用STS临时访问凭证进行认证。对接云Kafka时使用。

tablestore.instance.name

string

是

myotstest

Tablestore实例的名称。

连接器Data Mapping配置

配置项

类型

是否必选

示例值

描述

event.parse.class

class

是

DefaultEventParser

消息解析器的Java类,默认值为DefaultEventParser。解析器用于从Kafka Record中解析出数据表的主键列和属性列。

重要

Tablestore对列值大小有限制。string类型和binary类型的主键列值限制均为1 KB,属性列列值限制均为2 MB。更多信息,请参见使用限制。

如果数据类型转换后列值超出对应限制,则将该Kafka Record作为脏数据处理。

如果使用默认的DefaultEventParser解析器,Kafka Record的Key或Value必须为Kafka Connect的Struct或Map类型。

Struct中选择的字段必须为支持的数据类型,字段会根据数据类型映射表转换为Tablestore数据类型写入数据表;Map中的值类型也必须为支持的数据类型,支持的数据类型与Struct相同,最终会被转换为binary类型写入数据表。

如果Kafka Record为不兼容的数据格式,则您可以通过实现com.aliyun.tablestore.kafka.connect.parsers.EventParser定义的接口来自定义解析器。

table.name.format

string

否

kafka_<topic>

目标数据表名称的格式字符串,默认值为<topic>。字符串中可包含<topic>作为原始Topic的占位符。例如当设置table.name.format为kafka_<topic>时,如果Kafka中Topic名为test,则映射到Tablestore的表名为kafka_test。

此配置项的优先级低于topics.assign.tables配置项,如果配置了topics.assign.tables,则会忽略table.name.format的配置。

topics.assign.tables

list

是

test:destTable

指定topic与Tablestore表之间的映射关系,格式为<topic_1>:<tablename_1>,<topic_2>:<tablename_2>。多个映射关系之间以半角逗号(,)分隔,例如test:destTable表示将Topic名为test的消息记录写入数据表destTable中。

此配置项的优先级高于table.name.format配置项,如果配置了topics.assign.tables,则会忽略table.name.format的配置。

primarykey.mode

string

否

kafka

数据表的主键模式。取值范围如下:

  • kafka:以<connect_topic>_<connect_partition>(Kafka主题和分区,用下划线"_"分隔)和<connect_offset>(该消息记录在分区中的偏移量)作为数据表的主键。

  • record_key:以Record Key中的字段(Struct类型)或者键(Map类型)作为数据表的主键。

  • record_value:以Record Value中的字段(Struct类型)或者键(Map类型)作为数据表的主键。

请配合tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type使用。此配置项不区分大小写。

tablestore.<tablename>.primarykey.name

list

否

A,B

数据表的主键列名,其中<tablename>为数据表名称的占位符,包含1~4个主键列,以半角逗号(,)分隔。

主键模式不同时,主键列名的配置不同。

  • 当设置主键模式为kafka时,以topic_partition,offset作为数据表主键列名称。在该主键模式下,您可以不配置此主键列名。如果配置了主键列名,则不会覆盖默认主键列名。

  • 当设置主键模式为record_key时,从Record Key中提取与配置的主键列名相同的字段(Struct类型)或者键(Map类型)作为数据表的主键。在该主键模式下主键列名必须配置。

  • 当设置主键模式为record_value时,从Record Value中提取与配置的主键列名相同的字段(Struct类型)或者键(Map类型)作为数据表的主键。在该主键模式下主键列名必须配置。

Tablestore数据表的主键列是有顺序的,此属性的配置应注意主键列名顺序,例如PRIMARY KEY(A、B、C)与PRIMARY KEY(A、C、B)是不同的两个主键结构。

tablestore.<tablename>.primarykey.type

list

否

string, integer

数据表的主键列数据类型,其中<tablename>为数据表名称的占位符,包含1~4个主键列,以半角逗号(,)分隔,顺序必须与tablestore.<tablename>.primarykey.name相对应。此属性配置不区分大小写。数据类型的可选值包括integer、string、binary和auto_increment。

主键数据类型的配置与主键模式相关。

  • 当主键模式为kafka时,以string, integer作为数据表主键数据类型。

    在该主键模式下,您可以不配置此主键列数据类型。如果配置了主键列数据类型,则不会覆盖默认主键列数据类型。

  • 当主键模式为record_key或record_value时,指定相应主键列的数据类型。在该主键模式下主键列数据类型必须配置。

    如果指定的数据类型与Kafka Schema中定义的数据类型发生冲突,则会产生解析错误,您可以配置Runtime Error相关属性来应对解析错误。

    当配置此配置项为auto_increment时,表示主键自增列,此时Kafka Record中可以缺省该字段,写入数据表时会自动插入自增列。

tablestore.<tablename>.columns.whitelist.name

list

否

A,B

数据表的属性列白名单中属性列名称,其中<tablename>为数据表名称的占位符,以半角逗号(,)分隔。

如果配置为空,则使用Record Value中的所有字段(Struct类型)或者键(Map类型)作为数据表的属性列,否则用于过滤得到所需属性列。

tablestore.<tablename>.columns.whitelist.type

list

否

string, integer

数据表的属性列白名单中属性列数据类型,其中<tablename>为数据表名称的占位符,以半角逗号(,)分隔,顺序必须与tablestore.<tablename>.columns.whitelist.name相对应。此属性配置不区分大小写。数据类型的可选值包括integer、string、binary、boolean和double。

连接器Write配置

配置项

类型

是否必选

示例值

描述

insert.mode

string

否

put

写入模式。取值范围如下:

  • put(默认):对应Tablestore的PutRow操作,即新写入一行数据会覆盖原数据。

  • update:对应Tablestore的UpdateRow操作,即更新一行数据,可以增加一行中的属性列,或更新已存在的属性列的值。

此属性配置不区分大小写。

insert.order.enable

boolean

否

true

写入数据表时是否需要保持顺序。取值范围如下:

  • true(默认):写入时保持Kafka消息记录的顺序。

  • false:写入顺序无保证,但写入效率会提升。

auto.create

boolean

否

false

是否需要自动创建目标表,支持自动创建数据表或时序表。取值范围如下:

  • true:自动创建目标表。

  • false(默认):不自动创建目标表。

delete.mode

string

否

none

删除模式,仅当同步数据到数据表且主键模式为record_key时才有效。取值范围如下:

  • none(默认):不允许进行任何删除。

  • row:允许删除行。当Record Value为空时会删除行。

  • column:允许删除属性列。当Record Value中字段值(Struct类型)或者键值(Map类型)为空时会删除属性列。

  • row_and_column:允许删除行和属性列。

此属性配置不区分大小写。

删除操作与insert.mode的配置相关。更多信息,请参见附录:删除语义。

buffer.size

integer

否

1024

写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须是2的指数。

max.thread.count

integer

否

3

写入数据表时的回调线程数,默认值为CPU核数+1。

max.concurrency

integer

否

10

写入数据表时的最大请求并发数,默认值为10。

bucket.count

integer

否

3

写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。

flush.Interval

integer

否

10000

写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。

连接器Runtime Error配置

配置项

类型

是否必选

示例值

描述

runtime.error.tolerance

string

否

none

解析Kafka Record或者写入表时产生错误的处理策略。取值范围如下:

  • none(默认):任何错误都将导致Sink Task立即失败。

  • all:跳过产生错误的Record,并记录该Record。

此属性配置不区分大小写。

runtime.error.mode

string

否

ignore

解析Kafka Record或者写入表时产生错误,对错误的Record的处理策略。取值范围如下:

  • ignore(默认):忽略所有错误。

  • kafka:将产生错误的Record和错误信息存储在Kafka的另一个Topic中,此时需要配置runtime.error.bootstrap.servers和runtime.error.topic.name。记录运行错误的Kafka Record的Key和Value与原Record一致,Header中增加ErrorInfo字段来记录运行错误信息。

  • tablestore:将产生错误的Record和错误信息存储在Tablestore另一张数据表中,此时需要配置runtime.error.table.name。记录运行错误的数据表主键列为topic_partition(string类型), offset(integer类型),并且属性列为key(bytes类型)、value(bytes类型)和error_info(string类型)。

kafka模式下需要对Kafka Record的Header、Key和Value进行序列化转换,tablestore模式下需要对Kafka Record的Key和Value进行序列化转换,此处默认使用org.apache.kafka.connect.json.JsonConverter,并且配置schemas.enable为true,您可以通过JsonConverter反序列化得到原始数据。关于Converter的更多信息,请参见Kafka Converter。

runtime.error.bootstrap.servers

string

否

localhost:9092

用于记录运行错误的Kafka集群地址。

runtime.error.topic.name

string

否

errors

用于记录运行错误的Kafka Topic名称。

runtime.error.table.name

string

否

errors

用于记录运行错误的Tablestore表名称。

时序相关配置

配置项

类型

是否必选

示例值

描述

tablestore.timeseries.<tablename>.measurement

string

是

mName

将JSON中的key值为指定值对应的value值作为_m_name字段写入对应时序表中。

如果设置此配置项为<topic>,则将Kafka记录的topic作为_m_name字段写入时序表中。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改,例如时序表名称为test,则配置项名称为tablestore.timeseries.test.measurement。

tablestore.timeseries.<tablename>.dataSource

string

是

ds

将JSON中的key值为ds对应的value值作为_data_source字段写入对应时序表中。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.<tablename>.tags

list

是

region,level

将JSON中key值为region和level所对应的value值作为tags字段写入对应时序表中。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.<tablename>.time

string

是

timestamp

将JSON中key值为timestamp对应的value值作为_time字段写入对应时序表中。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.<tablename>.time.unit

string

是

MILLISECONDS

tablestore.timeseries.<tablename>.time值的时间戳单位。取值范围为SECONDS、MILLISECONDS、MICROSECONDS、NANOSECONDS。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.<tablename>.field.name

list

否

cpu,io

将JSON中key值为cpu和io的键值对作为_field_name以及_field_name的值写入对应时序表。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.<tablename>.field.type

string

否

double,integer

tablestore.timeseries.<tablename>.field.name中字段对应的数据类型。取值范围为double、integer、string、binary、boolean。多个数据类型之间用半角逗号(,)分隔。

配置项名称中<tablename>为时序表名称的占位符,请根据实际情况修改。

tablestore.timeseries.mapAll

boolean

否

false

将输入JSON中的非主键字段和时间字段都作为field存储到时序表中。

当配置项取值为false时,tablestore.timeseries.<tablename>.field.name和tablestore.timeseries.<tablename>.field.type必填。

tablestore.timeseries.toLowerCase

boolean

否

true

将field中的key(输入数据中非主键或者时间的key,或配置在tablestore.timeseries.<tablename>.field.name中的key)转换为小写写入时序表。

tablestore.timeseries.rowsPerBatch

integer

否

50

写入tablestore时,一次请求支持写入的最大行数。最大值为200,默认值为200。

附录:Kafka和Tablestore数据类型映射

Kafka和Tablestore数据类型映射关系请参见下表。

Kafka Schema Type

Tablestore数据类型

STRING

STRING

INT8、INT16、INT32、INT64

INTEGER

FLOAT32、FLOAT64

DOUBLE

BOOLEAN

BOOLEAN

BYTES

BINARY

附录:删除语义

说明

只有同步数据到数据表时才支持此功能。

当同步数据到数据表且Kafka消息记录的value中存在空值时,根据写入模式(insert.mode)和删除模式(delete.mode)的不同设置,数据写入到表格存储数据表的处理方式不同,详细说明请参见下表。

insert.mode

put

update

delete.mode

none

row

column

row_and_column

none

row

column

row_and_column

value为空值

覆盖写

删行

覆盖写

删行

脏数据

删行

脏数据

删行

value所有字段值均为空值

覆盖写

覆盖写

覆盖写

覆盖写

脏数据

脏数据

删列

删列

value部分字段值为空值

覆盖写

覆盖写

覆盖写

覆盖写

忽略空值

忽略空值

删列

删列

相关文章

同步Kafka数据到数据表 2025-04-22 14:37

Tablestore Sink Connector会根据订阅的主题轮询地从Kafka中拉取消息,并对消息记录进行解析,然后将数据批量导入到Tablestore的数据表。 前提条件

同步Kafka数据到时序表 2025-04-22 14:37

您可以使用kafka-connect-tablestore包将Kafka中数据写入Tablestore的时序表中。本文主要介绍了如何配置Kafka写入时序数据。 前提条件

配置说明 2025-04-22 14:37

启动Tablestore Sink Connector时,您需要通过键值映射向Kafka Connect进程传递参数。通过本文您可以结合配置示例和配置参数说明了解Tablestore Sink Connector的相关配置。 配置示例

错误处理 2025-04-22 14:37

在将Kafka数据导入到表格存储的过程中可能产生错误,如果您不希望导致Sink Task立即失败,您可以配置错误处理策略。本文介绍Kafka Connect Error和Tablestore Sink Task Error两种错误类型的处理方法。 Kafka Connect Error

目录
Copyright © 2025 your company All Rights Reserved. Powered by 赛尔网络.
京ICP备14022346号-15
gongan beian 京公网安备11010802041014号