处理变更的数据

某些阶段使您可以轻松地处理管道中的变更捕获数据(CDC)或事务数据。

启用CDC的源端可以读取变更捕获数据。一些专门读取变更捕获数据,其他一些可以配置为读取它。当读取变更数据时,它们确定与数据关联的CRUD操作,并在sdc.operation.type记录头属性中包含CRUD操作,例如insert, update, upsert或delete。

启用CRUD的处理器和目标端在写入记录时可以在头属性sdc.operation.type中使用CRUD操作类型,从而使外部系统能够执行适当的操作。

在管道中使用启用CDC的源端和启用CRUD的阶段,可以轻松地将变更数据从一个系统写入另一个系统。您还可以使用启用CDC的源端来写入非CRUD目标端,使用非CDC源端来写入启用CRUD的阶段。有关其工作原理的信息,请参见用例。

CRUD操作头属性

当读取变更数据时,启用CDC的源端将在所有记录中包括sdc.operation.type记录头属性。

启用CRUD的处理器和目标端在写入记录时可以在头属性sdc.operation.type中使用CRUD操作类型,从而使外部系统能够执行适当的操作。

sdc.operation.type记录头属性使用以下整数表示CRUD操作:

  • 1用于INSERT记录
  • 2用于DELETE记录
  • 3用于UPDATE记录
  • 4用于UPSERT记录
  • 5用于不受支持的操作或代码
  • 6用于UNDELETE记录
  • 7用于REPLACE记录
  • 8用于MERGE记录
注意:根据来源系统支持的操作,某些源端仅使用操作的子集。同样,目标端仅识别目标系统支持的操作子集。有关支持的操作的详细信息,请参阅源端和目标端文档。

较早的实现

在早期版本中,某些启用CDC的源端使用不同的记录头属性,但现在它们都包含sdc.operation.type记录头属性。保留所有较早的CRUD头属性以实现向后兼容。

同样,启用CRUD的目标端可以在其他头属性中查找CRUD操作类型,现在可以先查找sdc.operation.type记录头属性,然后再查找替代属性。保留备用头属性功能是为了向后兼容。

启用CDC的阶段

启用CDC的阶段在sdc.operation.type记录头属性中提供CRUD操作类型。一些源端提供备用和附加的头属性。

以下阶段提供CRUD记录头属性:

启用CDC的阶段 CRUD记录头属性
用于Microsoft SQL Server的JDBC Query Consumer 在sdc.operation.type记录头属性中包括CRUD操作类型。

有关更多信息,请参见CRUD记录头属性。

MapR DB CDC 在sdc.operation.type记录头属性中包括CRUD操作类型。

在记录头属性中包括其他CDC信息。

有关更多信息,请参见CRUD操作和CDC头属性。

MongoDB Oplog 在sdc.operation.type记录头属性中包括CRUD操作类型。

可以在记录头属性中包含其他CDC信息,例如op和ns属性。

更多信息,请参见生成的记录。

MySQL Binary Log 在sdc.operation.type记录头属性中包括CRUD操作类型。

在记录字段中包括其他CDC信息。

更多信息,请参见生成的记录。

Oracle CDC客户端 在以下两个标头中均包括CRUD操作类型:

  • sdc.operation.type
  • oracle.cdc.operation

有关更多信息,请参见CRUD操作头属性。

在带有oracle.cdc前缀的记录头属性中包含其他CDC信息,例如oracle.cdc.table。

PostgreSQL CDC Client 在记录中包括CRUD操作类型。

在带有postgres.cdc前缀的记录头属性中包括其他CDC信息,例如postgres.cdc.lsn。

销售队伍 在sdc.operation.type记录头属性中包括CRUD操作类型。

有关更多信息,请参见CRUD操作头属性。

SQL Parser 在以下两个头中均包括CRUD操作类型:

  • sdc.operation.type
  • oracle.cdc.operation

更多信息,请参见生成的记录。

SQL Server CDC Client 在sdc.operation.type记录头属性中包括CRUD操作类型。

在名为jdbc.<CDC column name>的头属性中包含CDC信息。有关更多信息,请参见记录头属性。

SQL Server Change Tracking 在sdc.operation.type记录头属性中包括CRUD操作类型。

在jdbc.SYS_CHANGE头属性中包含来自更改表的其他信息。有关更多信息,请参见记录头属性。

启用CRUD的阶段

以下阶段识别存储在记录头属性中的CRUD操作,并可以基于这些值执行写操作。某些阶段还提供与CRUD相关的属性。

启用CRUD的阶段 支持的操作 阶段处理
JDBC Tee处理器
  • INSERT
  • UPDATE
  • DELETE
根据以下条件确定要使用的操作:

    • sdc.operation.type 记录头属性
    • 阶段中的“Default Operation and Unsupported Operation Handling”属性

包括“Change Log”属性,该属性使您能够根据您使用的启用CDC的源端处理记录。有关更多信息,请参见定义CRUD操作。

Couchbase目标端
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
根据以下条件确定要使用的操作:

    • sdc.operation.type 记录头属性
    • 该阶段中的“Default Write Operation and Unsupported Operation Handling”属性

有关更多信息,请参见定义CRUD操作。

Elasticsearch目标端
  • CREATE (INSERT)
  • UPDATE
  • INDEX (UPSERT)
  • DELETE
  • UPDATE with doc_as_upsert (MERGE)
根据以下条件确定要使用的操作:

    • sdc.operation.type 记录头属性
    • 阶段中的“Default Operation and Unsupported Operation Handling”属性

有关更多信息,请参见定义CRUD操作。

GPSS Producer目标端
  • INSERT
  • UPDATE
  • MERGE
根据以下条件确定要使用的操作:

    • sdc.operation.type 记录头属性
    • 阶段中的“Default Operation and Unsupported Operation Handling”属性

有关更多信息,请参见定义CRUD操作。

JDBC Producer目标端
  • INSERT
  • UPDATE
  • DELETE
根据以下条件确定要使用的操作:

    • sdc.operation.type 记录头属性
    • 阶段中的“Default Operation and Unsupported Operation Handling”属性

包括“Change Log”属性,该属性使您能够根据您使用的启用CDC的源端处理记录。有关更多信息,请参见定义CRUD操作。

Kudu目标端
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
根据以下条件确定要使用的操作:

    • sdc.operation.type 记录头属性
    • 阶段中的“Default Operation and Unsupported Operation Handling”属性

有关更多信息,请参见定义CRUD操作。

MapR DB JSON目标端
  • INSERT
  • UPDATE
  • DELETE
根据以下条件确定要使用的操作:

    • sdc.operation.type 记录头属性
    • 阶段中的“Insert API”和“Set API”属性

有关更多信息,请参见写入MapR DB JSON。

MongoDB目标端
  • INSERT
  • UPDATE
  • REPLACE
  • DELETE
根据以下条件确定要使用的操作:

    • sdc.operation.type 记录头属性
    • 阶段中的“Upsert”属性

有关更多信息,请参见定义CRUD操作。

Redis目标端
  • INSERT
  • UPDATE
  • REPLACE
  • DELETE
根据以下条件确定要使用的操作:

  • sdc.operation.type 记录头属性

有关更多信息,请参见定义CRUD操作。

Salesforce目标端
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
  • UNDELETE
根据以下条件确定要使用的操作:

    • sdc.operation.type 记录头属性
    • 阶段中的“Default Operation and Unsupported Operation Handling”属性

有关更多信息,请参见定义CRUD操作。

Snowflake目标端
  • INSERT
  • UPDATE
  • DELETE
根据以下条件确定要使用的操作:

  • sdc.operation.type 记录头属性

有关更多信息,请参见定义CRUD操作。

处理记录

变更日志可以提供不同格式的记录数据。JDBC Tee处理器和JDBC Producer可以解码大多数变更日志格式,以基于原始变更日志生成记录数据。当使用其他启用CRUD的目标端时,可能需要向管道中添加其他处理以变更记录的格式。

例如,由JDBC Query Consumer创建的Microsoft SQL CDC记录除记录数据外,还包含记录中的CDC字段。您可以使用Field Remover从记录中删除所有不必要的字段。

相反,由MySQL Binary Log源读取的MySQL Server二进制日志在New Data map字段中提供新数据或更新数据,并在Changed Data映射字段中提供更改或删除的数据。您可能要使用Field Flattener处理器使用所需的数据来展平地图字段,并使用Field Remover删除所有不必要的字段。

有关生成记录的格式的详细信息,请参阅启用CDC的源端文档。

用例

您可以在管道中一起或单独使用启用CDC的源端和启用CRUD的目标端。以下是一些典型的用例:

启用CDC的源端与启用CRUD的目标端
您可以使用启用CDC的源端和启用CRUD的目标端来轻松处理已变更记录并将其写入目标系统。

例如,假设您要将CDC数据从Microsoft SQL Server写入Kudu。为此,您可以使用启用CDC的SQL Server CDC Client源从Microsoft SQL Server更改捕获表中读取数据。源将CRUD操作类型放置在sdc.operation.type头属性中,在这种情况下:1表示INSERT,2表示DELETE,3表示UPDATE。

您配置管道以写入启用了CRUD的Kudu目标端。在Kudu目标端中,可以为sdc.operation.type属性中未设置任何值的任何记录指定默认操作,并且可以为无效值配置错误处理。您将默认值设置为INSERT,并将目标端配置为使用此默认值表示无效值。在sdc.operation.type属性中,Kudu目标支持1表示INSERT,2表示DELETE,3表示UPDATE,4表示UPSERT。

当您运行管道时,SQL Server CDC Client源将为每条记录确定CRUD操作类型,并将其写入sdc.operation.type记录头属性。Kudu目标端使用sdc.operation.type属性中的操作来通知Kudu目标端系统如何处理每个记录。sdc.operation.type属性中具有未声明值的任何记录,例如由管道创建的记录,都将被视为INSERT记录。任何具有无效值的记录都使用相同的默认行为。

启用CDC的源端到非CRUD目标端

如果需要将变更数据写入没有启用CRUD的目标端的目标系统,则可以使用Expression Evaluator或脚本处理器将CRUD操作信息从sdc.operation.type头属性移动到字段,因此该信息为保留在记录中。

例如,假设您要从Oracle LogMiner重做日志中读取并将记录与记录字段中的所有CDC信息一起写入Hive表。为此,您将使用Oracle CDC Client源读取重做日志,然后添加一个Expression Evaluator来将sdc.operation.type头属性中的CRUD信息提取到记录中。Oracle CDC客户端将其他CDC信息(例如表名和scn)写入oracle.cdc头属性中,因此您也可以使用表达式将该信息提取到记录中。

非CDC源端到CRUD目标端
从非CDC源端读取数据时,可以使用Expression Evaluator或脚本处理器来定义sdc.operation.type头属性。
例如,假设您要从事务型数据库表中读取数据,并使维表与更改保持同步。您将使用JDBC Query Consumer来读取源表,并使用JDBC Lookup处理器来检查维度表中每个记录的主键值。然后,根据查找处理器的输出,您知道表中是否有匹配的记录。使用Expression Evaluator,设置sdc.operation.type记录头属性——3表示更新具有匹配记录的记录,而1表示插入新记录。
当您将记录传递到JDBC Producer目标端时,目标使用sdc.operation.type头属性中的操作来确定如何将记录写入维表。