使用参数动态抽取数据
案例说明
数据抽取任务需要随着时间的变化动态调整抽取参数,如抽取时间、抽取批次等。本案例将介绍如何使用参数动态抽取数据。
数据准备
本案例中“从数据库中读取数据”小节将使用建表脚本etl_test_input.sql,etl_test_output.sql以及数据文件etl_test_input.txt;
本案例中“向数据库中插入数据”小节将使用productlist_LUX_200908.txt和建表脚本create_products.sql;
本案例中“创建参数查询”、“基于时间戳的CDC”和“基于快照的CDC”三个小节将利用 Sakila 数据库中的 custmor 表和 cdc_time 表,Sakila 是 MySQL 中的一个示例数据库(sample database),提供了一个标准的方案,可用于自学,写书,教程,文章以及示例等。详细介绍请参考 Sakila数据集介绍 。关于Sakila数据库,读者亦可参考相关的官方文档。 sakila-db.zip
从数据库中读取数据
在进行本小节前,需要提前在数据库建好对应的表。将计算【start_date】与【end_date】字段之间所间隔的天数,并根据所隔天数来设定字段【performance】的值。
通过表输入组件从数据库中读取数据后,使用字段选择组件可以保留所需字段。然后在“计算器”组件中新建【diff_date】字段用于计算日期间隔。计算好间隔天数,在通过“数值范围”组件设置【performance】字段的值,其中【performance】字段为“数值范围”组件产生的新的输出字段。
具体操作如下:
- 新建转换,拖拽表输入组件至画布,双击组件,在弹出的对话框中选择数据库连接,单击获取SQL查询语句,在对应的文本框中即可获得对应表的查询。当然,也可以在文本框中编写自定义SQL查询语句。配置如下图所示:
2. 配置好表输入组件后,选中该组件,右击并选中预 览查看数据。结果如下图所示:
3. 将字段选择组件拖至画布,并建立从表输入到字段选择的连接。双击字段选择,点击右键“获取选择字段”,配置如下图所示:
4. 拖拽计算器组件至画布,并建立从字段选择到计算器之间的连接(在弹出的提示框中选择主输出步骤),右键“插入”,双击填写新字段信息,计算器组件配置如下图所示:
5. 拖拽数值范围组件至画布,建立从计算器到数值范围之间的连接,数值范围配置如下图所示:
6. 拖拽表输出组件至画布,建立从数值范围到表输出之间的连接,表输出配置如下图所示:
7. 完成所有步骤后,整个转换视图如下图所示:
8. 点击画布左上角
按钮运行转换,并在弹出的提示框中点击启动按钮,即可运行整个转换。结果如下图所示:
运行转换前etl_test_output表:
运行转换后etl_test_output表:
运行结果:
向数据库中插入数据
本小节使用 “插入/更新” 组件将文本文件“productist_LUX_200908.txt”中的内容更新到数据库中。在更新数据之前,要确保数据库中的表存在,具体操作如下:
- 在输入中拖入 CSV文件输入 组件,配置并预览数据,结果如下图所示:
2. 在 转换 中拖入 增加常量 组件并连接上一步骤,连接时若出现提示框,选择主输出步骤,双击该组件,点击新增字段,配置如下图所示:
- 从 输出 中拖入 插入/更新 组件并连接至上一步骤,双击 “插入/更新”,选择查询字段的右键菜单获取字段,选择更新字段的右键菜单获取字段,从下拉列表中修改字段名,配置如下图所示:
4. 从 应用 中拖入 写日志 组件并连接上一步骤,连接时若出现提示框,选择错误输出步骤,完整的转换视图如下图所示:
5. 右击该组件并选择定义错误处理,在弹出的提示框中添入如下图所示内容:
- 运行转换之前,可以先检查要更新表的数据,在运行转换之后再比较表的变化。运行转换结果如下图所示:
创建参数查询
本小节将介绍如何使用参数化查询,使用表输入组件动态抽取数据。目前,有两种参数化的查询方法:使用变量替换和使用参数,具体操作如下:
使用参数步骤
- 新建转换,将自定义常量数据组件拖至画布,以此来添加一些数据。双击组件,在元数据中先定义is_active,created两列并选择好数据类型,如下图所示:
在数据中插入一条数据
自定义常量数据用来给表输入步骤提供一个或多个参数,这些参数是来替换表输入步骤的SQL语句里的问号。
- 将表输入组件拖至画布,双击组件,在弹出的对话框中选择数据库连接,单击 获取SQL查询语句,在对应的文本框中即可获得对应表的查询。当然,也可以在文本框中编写自定义SQL查询语句。配置如下图所示:
- 将空操作(什么也不做)组件拖至画布,并按顺序连接。若提示选择步骤,依然选择主输出步骤。完整转换如下图所示:
- 运行转换,结果如下图所示:
- 选中 空操作(什么也不做) 组件,右击并选中预览查看数据。结果如下图所示:
注:例子中的自定义常量数据步骤只用来演示,在实际使用中,一般使用其他步骤替换这个步骤。
使用变量步骤
变量需要在使用变量的转换前先设置变量,设置变量的转换往往是作业里的第一个转换。
- 新建转换,名称为“设置变量”,并将自定义常量数据组件拖至画布,以此来添加一些数据。双击组件,在元数据中先定义is_active,created两列并选择好数据类型,并在数据中插入一条数据,如下图所示:
- 将设置变量组件拖至画布,双击组件,选择字段名称并填入相应变量名,设置变量活动类型,具体如下图所示:
在后面的表输入步骤里可以使用这些变量,查询里的变量名会被变量的值替换,使用变量的表输入步骤。
- 将两个组件按顺序连接,如下图所示:
- 保存转换后,再新建转换,命名为“使用变量的表输入”,并将表输入组件拖至画布,双击组件,选择数据库连接,单击 获取SQL查询语句,在对应的文本框中即可获得对应表的查询。当然,也可以在文本框中编写自定义SQL查询语句。配置如下图所示:
- 保存转换后,再新建作业,将START拖至画布,并将“Pipeline”两次拖至画布中,并按顺序连接,如下图所示:
- 分别双击设置两个Pipeline,第一个转换是“设置变量”,第二个转换是“使用变量作为表输入 步骤的参数”,具体设置如下图所示:
- 运行转换,结果如下图所示: