赛尔校园公共服务平台 Logo
平台使用
阿里云
百度云
移动云
智算服务
教育生态
登录 →
赛尔校园公共服务平台 Logo
平台使用 阿里云 百度云 移动云 智算服务 教育生态
登录
  1. 首页
  2. 阿里云
  3. 表格存储
  4. 实践教程
  5. 计算与分析
  6. 实时计算Flink
  7. 使用教程

使用教程

  • 实时计算Flink
  • 发布于 2025-04-22
  • 0 次阅读
文档编辑
文档编辑

本文为您介绍如何使用Flink计算表格存储(Tablestore)的数据,表格存储中的数据表或时序表均可作为实时计算Flink的源表或结果表进行使用。

前提条件

  • 已开通表格存储服务并创建实例。具体操作,请参见开通服务和创建实例。

  • 已创建源表和结果表,并为源表创建数据通道。具体操作,请参见数据表操作、时序表操作和创建数据通道。

  • 已开通Flink工作空间。具体操作,请参见开通实时计算Flink版。

    重要

    实时计算Flink必须与表格存储服务位于同一地域。实时计算Flink支持的地域,请参见地域列表。

  • 已获取AccessKey信息。

    重要

    出于安全考虑,强烈建议您通过RAM用户使用表格存储功能。具体操作,请参见创建RAM用户并授权。

实时计算作业开发流程

步骤一:创建作业

  1. 进入SQL作业创建页面。

    1. 登录实时计算控制台。

    2. 单击目标工作空间操作列下的控制台。

    3. 在左侧导航栏,单击数据开发 > ETL。

  2. 单击新建后,在新建作业草稿对话框,选择空白的流作业草稿,单击下一步。

    说明

    Flink也为您提供了丰富的代码模板和数据同步,每种代码模板都为您提供了具体的使用场景、代码示例和使用指导。您可以直接单击对应的模板快速地了解Flink产品功能和相关语法,实现您的业务逻辑,详情请参见代码模板和数据同步模板。

  3. 填写作业信息。

    作业参数

    说明

    示例

    文件名称

    作业的名称。

    说明

    作业名称在当前项目中必须保持唯一。

    flink-test

    存储位置

    指定该作业的代码文件所属的文件夹。

    您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    作业草稿

    引擎版本

    当前作业使用的Flink的引擎版本,引擎版本详情请参见功能发布记录和引擎版本介绍。

    vvr-8.0.10-flink-1.17

  4. 单击创建。

步骤二:编写SQL作业

说明

此处以将数据表中的数据同步至另一个数据表为例,为您介绍如何编写SQL作业。更多SQL示例,请参考SQL示例。

  1. 分别创建源表(数据表)和结果表(数据表)的临时表。

    详细配置信息,请参见附录1:Tablestore连接器。

    -- 创建源表(数据表)的临时表 tablestore_stream
    CREATE TEMPORARY TABLE tablestore_stream(
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR
    ) WITH (
        'connector' = 'ots', -- 源表的连接器类型。固定取值为ots。
        'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- 表格存储实例的VPC地址。
        'instanceName' = 'xxx', -- 表格存储的实例名称。
        'tableName' = 'flink_source_table', -- 表格存储的源表名称。
        'tunnelName' = 'flink_source_tunnel', -- 表格存储源表的数据通道名称。
        'accessId' = 'xxxxxxxxxxx', -- 阿里云账号或者RAM用户的AccessKey ID。
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- 阿里云账号或者RAM用户的AccessKey Secret。
        'ignoreDelete' = 'false' -- 是否忽略DELETE操作类型的实时数据:不忽略。
    );
    
    -- 创建结果表(数据表)的临时表 tablestore_sink
    CREATE TEMPORARY TABLE tablestore_sink(
       `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR,
        PRIMARY KEY (`order`,orderid) NOT ENFORCED -- 主键。
    ) WITH (
        'connector' = 'ots', -- 结果表的连接器类型。固定取值为ots。
        'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- 表格存储实例的VPC地址。
        'instanceName' = 'xxx', -- 表格存储的实例名称。
        'tableName' = 'flink_sink_table', -- 表格存储的结果表名称。
        'accessId' = 'xxxxxxxxxxx',  -- 阿里云账号或者RAM用户的AccessKey ID。
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- 阿里云账号或者RAM用户的AccessKey Secret。
        'valueColumns'='customerid,customername' --插入字段的列名。
    );
  2. 编写作业逻辑。

    将源表数据插入到结果表的代码示例如下:

    --将源表数据插入到结果表
    INSERT INTO tablestore_sink
    SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

步骤三:(可选)查看配置信息

在SQL编辑区域右侧页签,您可以查看或上传相关配置。

页签名称

配置说明

更多配置

  • 引擎版本:当前作业使用的Flink的引擎版本。

  • 附加依赖文件:作业中需要使用到的附加依赖,例如临时函数等。

    您可以下载VVR依赖,并在资源文件页签进行上传,然后选择附加依赖文件为上传的VVR依赖即可。具体操作,请参见附录2:配置VVR依赖。

代码结构

  • 数据流向图:您可以通过数据流向图快速查看数据的流向。

  • 树状结构图:您可以通过树状结构图快速查看数据的来源。

版本信息

您可以在此处查看作业版本信息,操作列下的功能详情请参见管理作业版本。

步骤四:(可选)进行深度检查

深度检查能够检查作业的SQL语义、网络连通性以及作业使用的表的元数据信息。同时,您可以单击结果区域的SQL优化,展开查看SQL风险问题提示以及对应的SQL优化建议。

  1. 在SQL编辑区域右上方,单击深度检查。

  2. 在深度检查对话框,单击确认。

步骤五:(可选)进行作业调试

您可以使用作业调试功能模拟作业运行、检查输出结果,验证SELECT或INSERT业务逻辑的正确性,提升开发效率,降低数据质量风险。

  1. 在SQL编辑区域右上方,单击调试。

  2. 在调试对话框,选择调试集群后,单击下一步。

    如果没有可用集群则需要创建新的Session集群,Session集群与SQL作业引擎版本需要保持一致并处于运行中。详情请参见创建Session集群。

  3. 配置调试数据。

    • 如果您使用线上数据,无需处理。

    • 如果您需要使用调试数据,需要先单击下载调试数据模板,填写调试数据后,上传调试数据。详情请参见作业调试。

  4. 确定好调试数据后,单击确定。

步骤六:进行作业部署

在SQL编辑区域右上方,单击部署,在部署新版本对话框,可根据需要填写或选中相关内容,单击确定。

说明

Session集群适用于非生产环境的开发测试环境,通过部署或调试作业提高作业JM(Job Manager)资源利用率和提高作业启动速度。但不推荐您将生产作业提交至Session集群中,可能会导致业务稳定性问题。

步骤七:启动并查看Flink计算结果

  1. 在左侧导航栏,单击运维中心 > 作业运维。

  2. 单击目标作业操作列中的启动。

    选择无状态启动后,单击启动。当作业状态转变为运行中时,代表作业运行正常。作业启动参数配置,详情请参见作业启动。

    说明
    • Flink中的每个TaskManager建议配置2CPU和4GB内存,此配置可以充分发挥每个TaskManager的计算能力。单个TaskManager能达到1万/s的写入速率。

    • 在source表分区数目足够多的情况下,建议Flink中并发配置在16以内,写入速率随并发线性增长。

  3. 在作业运维详情页面,查看Flink计算结果。

    1. 在运维中心 > 作业运维页面,单击目标作业名称。

    2. 在作业日志页签,单击运行Task Managers页签下Path,ID列的目标任务。

    3. 单击日志,在页面查看相关的日志信息。

  4. (可选)停止作业。

    如果您对作业进行了修改(例如更改代码、增删改WITH参数、更改作业版本等),且希望修改生效,则需要重新部署作业,然后停止再启动。另外,如果作业无法复用State,希望作业全新启动时,或者更新非动态生效的参数配置时,也需要停止后再启动作业。作业停止详情请参见作业停止。

附录

附录1:Tablestore连接器

实时计算Flink版内置了表格存储Tablestore连接器,用于Tablestore的数据读写与同步。

源表

DDL定义
数据表

数据表作为源表的DDL定义示例如下:

-- 创建源表(数据表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false'
);
时序表

时序表作为源表的DDL定义示例如下:

-- 创建源表(时序表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    _m_name STRING,
    _data_source STRING,
    _tags STRING,
    _time BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);

属性列支持读取待消费字段和Tunnel Service,以及返回数据中的OtsRecordType和OtsRecordTimestamp两个字段。字段说明请参见下表。

字段名

Flink映射名

描述

OtsRecordType

type

数据操作类型。

OtsRecordTimestamp

timestamp

数据操作时间,单位为微秒。

说明

全量读取数据时,取值为0。

WITH参数

参数

适用表

是否必填

描述

connector

通用参数

是

源表的连接器类型。固定取值为ots。

endPoint

通用参数

是

表格存储实例的服务地址,必须使用VPC地址。更多信息,请参见服务地址。

instanceName

通用参数

是

表格存储的实例名称。

tableName

通用参数

是

表格存储的源表名称。

tunnelName

通用参数

是

表格存储源表的通道名称。关于创建通道的具体操作,请参见创建数据通道。

accessId

通用参数

是

阿里云账号或者RAM用户的AccessKey(包括AccessKey ID和AccessKey Secret)。

重要

为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID和AccessKey Secret,详情请参见变量管理。

accessKey

通用参数

是

connectTimeout

通用参数

否

连接器连接Tablestore的超时时间,单位为毫秒。默认值为30000。

socketTimeout

通用参数

否

连接器连接Tablestore的Socket超时时间,单位为毫秒。默认值为30000。

ioThreadCount

通用参数

否

IO线程数量。默认值为4。

callbackThreadPoolSize

通用参数

否

回调线程池大小。默认值为4。

ignoreDelete

数据表

否

是否忽略DELETE操作类型的实时数据。默认值为false,表示不忽略DELETE操作类型的实时数据。

skipInvalidData

通用参数

否

是否忽略脏数据。默认值为false,表示不忽略脏数据。如果不忽略脏数据,则处理脏数据时会报错。

重要

仅实时计算引擎VVR 8.0.4及以上版本支持该参数。

retryStrategy

通用参数

否

重试策略。参数取值如下:

  • TIME(默认值):在超时时间retryTimeoutMs内持续进行重试。

  • COUNT:在最大重试次数retryCount内持续进行重试。

retryCount

通用参数

否

重试次数。当retryStrategy设置为COUNT时,可以设置重试次数。默认值为3。

retryTimeoutMs

通用参数

否

重试的超时时间,单位为毫秒。当retryStrategy设置为TIME时,可以设置重试的超时时间。默认值为180000。

streamOriginColumnMapping

通用参数

否

原始列名到真实列名的映射。

说明

原始列名与真实列名之间,请使用半角冒号(:)隔开;多组映射之间,请使用半角逗号(,)隔开。例如origin_col1:col1,origin_col2:col2。

outputSpecificRowType

通用参数

否

是否透传具体的RowType。参数取值如下:

  • false(默认值):不透传,所有数据RowType均为INSERT。

  • true:透传,将根据透传的类型相应设置为INSERT、DELETE或UPDATE_AFTER。

类型映射

Tablestore字段类型

Flink字段类型

INTEGER

BIGINT

STRING

STRING

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

BINARY

BINARY

结果表

DDL定义
数据表

数据表作为结果表的DDL定义示例如下:

-- 创建结果表(数据表)的临时表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
   `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername'
);
说明

Tablestore结果表必须定义主键(Primary Key)和至少一个属性列,输出数据以Update方式追加到Tablestore表。

时序表

时序模型结果表需要指定_m_name、_data_source、_tags、_time四个主键,其余配置与数据表的结果表配置相同。目前支持WITH参数,SINK表主键和Map格式主键三种方式指定时序表主键。三种方式_tags列的转换优先级为WITH参数方式的优先级最高,Map格式主键与SINK表主键方式次之。

使用WITH参数方式

使用WITH参数方式定义DDL的示例如下。

-- 创建结果表(时序表)的临时表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES',
    'timeseriesSchema' = '{"measurement":"_m_name", "datasource":"_data_source", "tag_a":"_tags", "tag_b":"_tags", "tag_c":"_tags", "tag_d":"_tags", "tag_e":"_tags", "tag_f":"_tags", "time":"_time"}'
);

-- 将源表数据插入到结果表
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from tablestore_stream;
使用Map格式主键方式

使用Map格式主键方式定义DDL的示例如下。

说明

Tablestore引入了Flink的Map类型,以便于生成时序模型中时序表的_tags列,Map类型可以支持列的改名、简单函数等映射操作。使用Map时必须保证其中的_tags主键声明位置在第三位。

-- 创建结果表(时序表)的临时表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tags Map<String, String>, 
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
)
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

-- 将源表数据插入到结果表
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    MAP[`tag_a`, `tag_b`, `tag_c`, `tag_d`, `tag_e`, `tag_f`] AS tags,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from timeseries_source;
使用SINK表主键方式

使用SINK表主键方式定义DDL的示例如下。主键定义中的第一位measurement为_m_name列,第二位datasource为_data_source列,最后一位time为time列,中间的多列为tag列。

-- 创建结果表(时序表)的临时表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
    PRIMARY KEY(measurement, datasource, tag_a, tag_b, tag_c, tag_d, tag_e, tag_f, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

-- 将源表数据插入到结果表
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from timeseries_source;
WITH参数

参数

适用表

是否必填

说明

connector

通用参数

是

结果表的连接器类型。固定取值为ots。

endPoint

通用参数

是

表格存储实例的服务地址,必须使用VPC地址。更多信息,请参见服务地址。

instanceName

通用参数

是

表格存储的实例名称。

tableName

通用参数

是

表格存储的时序表名称。

accessId

通用参数

是

阿里云账号或者RAM用户的AccessKey(包括AccessKey ID和AccessKey Secret)。

重要

为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID和AccessKey Secret,详情请参见变量管理。

accessKey

通用参数

是

valueColumns

数据表

是

插入字段的列名。多个字段以半角逗号(,)分割,例如ID,NAME。

storageType

通用参数

否

重要

当时序表作为结果表时,必须配置为TIMESERIES。

数据存储类型。取值范围如下:

  • WIDE_COLUMN(默认值):数据表

  • TIMESERIES:时序表

timeseriesSchema

时序表

否

重要

当时序表作为结果表时,如果使用WITH参数的方式指定时序表主键,则必须配置该参数。

需要指定为时序表主键的列。

  • 以JSON的key-value格式来指定时序表主键,例如{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}。

  • 配置的主键类型必须与时序表中主键类型一致。其中tags主键可以支持同时包含多列。

connectTimeout

通用参数

否

连接器连接Tablestore的超时时间,单位为毫秒。默认值为30000。

socketTimeout

通用参数

否

连接器连接Tablestore的Socket超时时间,单位为毫秒。默认值为30000。

ioThreadCount

通用参数

否

IO线程数量。默认值为4。

callbackThreadPoolSize

通用参数

否

回调线程池大小。默认值为4。

retryIntervalMs

通用参数

否

重试间隔时间,单位为毫秒。默认值为1000。

maxRetryTimes

通用参数

否

最大重试次数。默认值为10。

bufferSize

通用参数

否

流入多少条数据后开始输出。默认值为5000,表示输入的数据达到5000条就开始输出。

batchWriteTimeoutMs

通用参数

否

写入超时的时间,单位为毫秒。默认值为5000,表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。

batchSize

通用参数

否

一次批量写入的条数。默认值为100,最大值为200。

ignoreDelete

通用参数

否

是否忽略DELETE操作类型的实时数据。默认值为false,表示不忽略DELETE操作类型的实时数据。

重要

仅数据表作为源表时可以根据需要配置。

autoIncrementKey

数据表

否

当结果表中包含主键自增列时,通过该参数指定主键自增列的列名称。当结果表没有主键自增列时,请不要设置此参数。

重要

仅实时计算引擎VVR 8.0.4及以上版本支持该参数。

overwriteMode

通用参数

否

数据覆盖模式。参数取值如下:

  • PUT(默认值):以PUT方式将数据写入到Tablestore表。

  • UPDATE:以UPDATE方式写入到Tablestore表。

说明

动态列模式下只支持UPDATE模式。

defaultTimestampInMillisecond

通用参数

否

设定写入Tablestore数据的默认时间戳。如果不指定,则使用系统当前的毫秒时间戳。

dynamicColumnSink

通用参数

否

是否开启动态列模式。默认值为false,表示不开启动态列模式。

说明
  • 动态列模式适用于在表定义中无需指定列名,根据作业运行情况动态插入数据列的场景。建表语句中主键需要定义为前若干列,最后两列中前一列的值作为列名变量,且类型必须为String,后一列的值作为该列对应的值。

  • 开启动态列模式时,不支持主键自增列,且参数overwriteMode必须设置为UPDATE。

checkSinkTableMeta

通用参数

否

是否检查结果表元数据。默认值为true,表示检查Tablestore表的主键列和此处的建表语句中指定的主键是否一致。

enableRequestCompression

通用参数

否

数据写入过程中是否开启数据压缩。默认值为false,表示不开启数据压缩。

类型映射

Flink字段类型

Tablestore字段类型

BINARY

BINARY

VARBINARY

BINARY

CHAR

STRING

VARCHAR

STRING

TINYINT

INTEGER

SMALLINT

INTEGER

INTEGER

INTEGER

BIGINT

INTEGER

FLOAT

DOUBLE

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

SQL示例

同步源表数据到结果表
同步数据表数据到时序表

从源表(数据表)flink_source_table中读取数据,并将结果写入结果表(时序表)flink_sink_table。

SQL示例如下:

-- 创建源表(数据表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- 使用With参数方式创建结果表(时序表)的临时表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
     measurement STRING,
     datasource STRING,
     tag_a STRING,
     `time` BIGINT,
     binary_value BINARY,
     bool_value BOOLEAN,
     double_value DOUBLE,
     long_value BIGINT,
     string_value STRING,
     tag_b STRING,
     tag_c STRING,
     tag_d STRING,
     tag_e STRING,
     tag_f STRING,
     PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
 ) WITH (
     'connector' = 'ots',
     'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
     'instanceName' = 'xxx',
     'tableName' = 'flink_sink_table',
     'accessId' = 'xxxxxxxxxxx',
     'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
     'storageType' = 'TIMESERIES',
     'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}'
 );
 
--将源表数据插入到结果表
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from tablestore_stream;
同步时序表数据到数据表

从源表(时序表)flink_source_table中读取数据,并将结果写入结果表(数据表)flink_sink_table。

SQL示例如下:

-- 创建源表(时序表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    _m_name STRING,
    _data_source STRING,
    _tags STRING,
    _time BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);

-- 创建结果表(数据表)的临时表 print_table。
CREATE TEMPORARY TABLE tablestore_target(
    measurement STRING,
    datasource STRING,
    tags STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    PRIMARY KEY (measurement,datasource, tags, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='binary_value,bool_value,double_value,long_value,string_value'
);

--将源表数据插入到结果表
INSERT INTO tablestore_target
SELECT
    _m_name,
    _data_source,
    _tags,
    _time,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from tablestore_stream;
读取源表数据并打印到控制台

批量从源表flink_source_table中读取数据,您可以使用作业调试功能模拟作业运行,调试结果将显示在SQL编辑器下方。

SQL示例如下:

-- 创建源表(数据表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- 从源表读取数据
SELECT * FROM tablestore_stream LIMIT 100;
读取源表数据并打印到TaskManager日志

从源表flink_source_table中读取数据,并通过Print连接器将结果打印到TaskManager日志中。

SQL示例如下:

-- 创建源表(数据表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- 创建结果表的临时表 print_table。
CREATE TEMPORARY TABLE print_table(
   `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
  'connector' = 'print',   -- print连接器
  'logger' = 'true'        -- 控制台显示计算结果
);

-- 打印源表的字段
INSERT INTO print_table
SELECT `order`,orderid,customerid,customername from tablestore_stream;

附录2:配置VVR依赖

  1. 下载VVR依赖。

  2. 上传VVR依赖。

    1. 登录实时计算控制台。

    2. 单击目标工作空间操作列下的控制台。

    3. 在左侧导航栏,单击文件管理。

    4. 在资源文件页签,单击上传资源,选择要上传的VVR依赖的JAR包。

  3. 在目标作业的SQL编辑区域右侧页签,单击更多配置。在附加依赖文件项,选择目标VVR依赖的JAR包。

相关文章

使用教程 2025-04-22 14:36

本文为您介绍如何使用Flink计算表格存储(Tablestore)的数据,

目录
Copyright © 2025 your company All Rights Reserved. Powered by 赛尔网络.
京ICP备14022346号-15
gongan beian 京公网安备11010802041014号