覆盖主要内容
版本: 6.1.0

使用参数动态抽取数据

案例说明

数据抽取任务需要随着时间的变化动态调整抽取参数,如抽取时间、抽取批次等。本案例将介绍如何使用参数动态抽取数据。

数据准备

本案例中“从数据库中读取数据”小节将使用建表脚本etl_test_input.sqletl_test_output.sql以及数据文件etl_test_input.txt

本案例中“向数据库中插入数据”小节将使用productlist_LUX_200908.txt和建表脚本create_products.sql

本案例中“创建参数查询”、“基于时间戳的CDC”和“基于快照的CDC”三个小节将利用 Sakila 数据库中的 custmor 表和 cdc_time 表,Sakila 是 MySQL 中的一个示例数据库(sample database),提供了一个标准的方案,可用于自学,写书,教程,文章以及示例等。详细介绍请参考 Sakila数据集介绍 。关于Sakila数据库,读者亦可参考相关的官方文档sakila-db.zip

从数据库中读取数据

在进行本小节前,需要提前在数据库建好对应的表。将计算【start_date】与【end_date】字段之间所间隔的天数,并根据所隔天数来设定字段【performance】的值。

通过表输入组件从数据库中读取数据后,使用字段选择组件可以保留所需字段。然后在“计算器”组件中新建【diff_date】字段用于计算日期间隔。计算好间隔天数,在通过“数值范围”组件设置【performance】字段的值,其中【performance】字段为“数值范围”组件产生的新的输出字段。

具体操作如下:

  1. 新建转换,拖拽表输入组件至画布,双击组件,在弹出的对话框中选择数据库连接,单击获取SQL查询语句,在对应的文本框中即可获得对应表的查询。当然,也可以在文本框中编写自定义SQL查询语句。配置如下图所示:

  2. 配置好表输入组件后,选中该组件,右击并选中预览查看数据。结果如下图所示:

  3. 将字段选择组件拖至画布,并建立从表输入字段选择的连接。双击字段选择,点击右键“获取选择字段”,配置如下图所示:

  4. 拖拽计算器组件至画布,并建立从字段选择计算器之间的连接(在弹出的提示框中选择主输出步骤),右键“插入”,双击填写新字段信息,计算器组件配置如下图所示:

  5. 拖拽数值范围组件至画布,建立从计算器数值范围之间的连接,数值范围配置如下图所示:

  6. 拖拽表输出组件至画布,建立从数值范围表输出之间的连接,表输出配置如下图所示:

  7. 完成所有步骤后,整个转换视图如下图所示:

  8. 点击画布左上角按钮运行转换,并在弹出的提示框中点击启动按钮,即可运行整个转换。结果如下图所示:

运行转换前etl_test_output表:

  运行转换后etl_test_output表:

  运行结果:

向数据库中插入数据

本小节使用 “插入/更新” 组件将文本文件“productist_LUX_200908.txt”中的内容更新到数据库中。在更新数据之前,要确保数据库中的表存在,具体操作如下:

  1. 输入中拖入 CSV文件输入 组件,配置并预览数据,结果如下图所示:

    2. 在 转换 中拖入 增加常量 组件并连接上一步骤,连接时若出现提示框,选择主输出步骤,双击该组件,点击新增字段,配置如下图所示:

 

  1. 输出 中拖入 插入/更新 组件并连接至上一步骤,双击 “插入/更新”,选择查询字段的右键菜单获取字段,选择更新字段的右键菜单获取字段,从下拉列表中修改字段名,配置如下图所示:

  4. 从 应用 中拖入 写日志 组件并连接上一步骤,连接时若出现提示框,选择错误输出步骤,完整的转换视图如下图所示:

  5. 右击该组件并选择定义错误处理,在弹出的提示框中添入如下图所示内容:

  1. 运行转换之前,可以先检查要更新表的数据,在运行转换之后再比较表的变化。运行转换结果如下图所示:

创建参数查询

本小节将介绍如何使用参数化查询,使用表输入组件动态抽取数据。目前,有两种参数化的查询方法:使用变量替换和使用参数,具体操作如下:

使用参数步骤

  1. 新建转换,将自定义常量数据组件拖至画布,以此来添加一些数据。双击组件,在元数据中先定义is_active,created两列并选择好数据类型,如下图所示:

在数据中插入一条数据

自定义常量数据用来给表输入步骤提供一个或多个参数,这些参数是来替换表输入步骤的SQL语句里的问号。

  1. 表输入组件拖至画布,双击组件,在弹出的对话框中选择数据库连接,单击 获取SQL查询语句,在对应的文本框中即可获得对应表的查询。当然,也可以在文本框中编写自定义SQL查询语句。配置如下图所示:

  1. 空操作(什么也不做)组件拖至画布,并按顺序连接。若提示选择步骤,依然选择主输出步骤。完整转换如下图所示:

  1. 运行转换,结果如下图所示:

  1. 选中 空操作(什么也不做) 组件,右击并选中预览查看数据。结果如下图所示:

注:例子中的自定义常量数据步骤只用来演示,在实际使用中,一般使用其他步骤替换这个步骤。

使用变量步骤

变量需要在使用变量的转换前先设置变量,设置变量的转换往往是作业里的第一个转换。

  1. 新建转换,名称为“设置变量”,并将自定义常量数据组件拖至画布,以此来添加一些数据。双击组件,在元数据中先定义is_active,created两列并选择好数据类型,并在数据中插入一条数据,如下图所示:

  1. 设置变量组件拖至画布,双击组件,选择字段名称并填入相应变量名,设置变量活动类型,具体如下图所示:

在后面的表输入步骤里可以使用这些变量,查询里的变量名会被变量的值替换,使用变量的表输入步骤。

  1. 将两个组件按顺序连接,如下图所示:

  1. 保存转换后,再新建转换,命名为“使用变量的表输入”,并将表输入组件拖至画布,双击组件,选择数据库连接,单击 获取SQL查询语句,在对应的文本框中即可获得对应表的查询。当然,也可以在文本框中编写自定义SQL查询语句。配置如下图所示:

  1. 保存转换后,再新建作业,将START拖至画布,并将“Pipeline”两次拖至画布中,并按顺序连接,如下图所示:

  1. 分别双击设置两个Pipeline,第一个转换是“设置变量”,第二个转换是“使用变量作为表输入步骤的参数”,具体设置如下图所示:

  1. 运行转换,结果如下图所示: