Kinesis Producer

支持的管道类型:

  •  Data Collector

  •  Data Collector Edge

Kinesis Producer目标端将数据写入Amazon Kinesis Streams。当在微服务管道中使用时,目标端还可以向微服务源发送响应。

要将数据写入Amazon Kinesis Firehose传输系统,请使用Kinesis Firehose目标端。要将数据写入Amazon S3,请使用Amazon S3目标端。

配置Kinesis Producer时,您可以为Kinesis集群指定Amazon Web Services连接信息以及要使用的数据格式。当您希望目标端将响应发送到微服务管道中的微服务源时,可以指定要发送的响应的类型。

Kinesis Producer使用以下Kinesis要求写入数据:

  • 批量写入最多500条消息和4.5 MB。
  • 最多允许50 kb/消息。较大的消息将发送到阶段以进行错误处理。

AWS凭据

Data Collector将数据写入Kinesis Producer目标时,它必须将凭证传递给Amazon Web Services。

使用以下方法之一来传递AWS凭证:

IAM角色
当Data Collector 在Amazon EC2实例上运行时,您可以使用AWS管理控制台为EC2实例配置IAM角色。Data Collector使用IAM实例配置文件凭证自动连接到AWS。
要使用IAM角色,请不要配置“Access Key ID”和“Secret Access Key”属性
有关将IAM角色分配给EC2实例的更多信息,请参阅Amazon EC2文档。
AWS访问密钥对
Data Collector未在Amazon EC2实例上运行或EC2实例不具有IAM角色时,您必须配置访问密钥ID秘密访问密钥 属性。
提示:为了保护敏感信息,例如访问密钥对,可以使用运行时资源或凭据存储。
提示:为了保护敏感信息,例如访问密钥对,可以使用运行时资源或凭据存储。

发送微服务响应

当您在微服务管道中使用目标端时,Kinesis Producer目标端可以将响应发送到微服务源。

发送对微服务源的响应时,目标可以发送以下其中一项:

  • 所有成功写入的记录。
  • 来自目标系统的响应 – 有关可能的响应的信息,请参阅目标系统的文档。

数据格式

Kinesis Producer目标端根据您选择的数据格式将数据写入Kinesis。

在Data Collector Edge管道中, 目标端仅支持Binary,JSON,SDC Record和Text数据格式。

Kinesis Producer目标端处理数据格式如下:

Avro
目标端根据Avro模式写入记录。您可以使用以下方法之一来指定Avro模式定义的位置:
  • In Pipeline Configuration – 使用您在阶段配置中提供的模式。
  • In Record Header – 使用avroSchema记录头属性中包含的模式。
  • Confluent Schema Registry – 从Confluent Schema Registry检索模式。Confluent Schema Registry是Avro模式的分布式存储层。您可以配置目标以通过模式ID或主题在Confluent Schema Registry中查找模式。

    如果在阶段或记录头属性中使用Avro模式,则可以选择配置目标以向Confluent Schema Registry注册Avro模式。

目标端在每个文件中都包含模式定义。
您可以使用Avro支持的压缩编解码器压缩数据。使用Avro压缩时,请避免在目标端使用其他压缩属性。
Binary
该阶段将二进制数据写入记录中的单个字段。
Delimited
目标端将记录写为分隔数据。使用此数据格式时,根字段必须是list或list-map。

您可以使用以下分隔格式类型:

  • Default CSV-包含逗号分隔值的文件。忽略文件中的空行。
  • RFC4180 CSV-严格遵循RFC4180准则的逗号分隔文件。
  • MS Excel CSV -Microsoft Excel逗号分隔文件。
  • MySQL CSV -MySQL逗号分隔文件。
  • Tab-Separated Values -包含制表符分隔的值的文件。
  • PostgreSQL CSV -PostgreSQL逗号分隔文件。
  • PostgreSQL文本 -PostgreSQL文本文件。
  • Custom -使用用户定义的分隔符,转义符和引号字符的文件。
  • Multi Character Delimited-使用多个用户定义的字符分隔字段和行以及单个用户定义的转义和引号字符的文件。
JSON
目标端将记录写为JSON数据。您可以使用以下格式之一:

  • Array-每个文件都包含一个数组。在数组中,每个元素都是每个记录的JSON表示形式。
  • Multiple objects-每个文件都包含多个JSON对象。每个对象都是一条记录的JSON表示形式。
Protobuf
在每个文件中写入一批消息。
在描述符文件中使用用户定义的消息类型和消息类型的定义来在文件中生成消息。
有关生成描述符文件的信息,请参阅Protobuf数据格式先决条件。
SDC Record
目标端以SDC记录数据格式写入记录。
Text
目标端将数据从单个文本字段写入目标系统。配置阶段时,请选择要使用的字段。
您可以配置字符以用作记录分隔符。默认情况下,目标使用UNIX样式的行尾(\n)分隔记录。
当记录不包含选定的文本字段时,目标端可以将缺少的字段报告为错误或忽略该丢失的字段。默认情况下,目标端报告错误。
当配置为忽略缺少的文本字段时,目标端可以丢弃该记录或写入记录分隔符以为该记录创建一个空行。默认情况下,目标端丢弃记录。

配置Kinesis Producer目标端

配置Kinesis Producer目标端以将数据写入Amazon Kinesis Streams。

  1. 在“Properties”面板的“General选项卡上,配置以下属性:
    一般属性 描述
    Name 阶段名。
    Description 可选说明。
    Required Fields 必须包含用于将记录传递到阶段的记录的数据的字段。

    提示:您可能包括阶段使用的字段。

    根据为管道配置的错误处理,处理不包括所有必填字段的记录。

    Preconditions 必须评估为TRUE的条件才能使记录进入处理阶段。单击 添加以创建其他前提条件。

    根据为该阶段配置的错误处理,处理不满足所有前提条件的记录。

    On Record Error 该阶段的错误记录处理:

    • Discard-放弃记录。
    • To Error-将记录发送到管道以进行错误处理。
    • Stop Pipeline-停止管道。对集群管道无效。
  2. 在“Kinesis”选项卡上,配置以下属性:
    Kinesis属性 描述
    Access Key ID

    AWS访问密钥ID。

    不将IAM角色与IAM实例配置文件凭据一起使用时是必需的。

    Secret Access Key

    AWS秘密访问密钥。

    不将IAM角色与IAM实例配置文件凭据一起使用时是必需的。

    Region 托管Kinesis集群的Amazon Web Services地区。
    Endpoint 当您为区域选择“Other”时要连接的端点。输入端点名称。
    Stream Name Kinesis流名称。
    Partitioning Strategy 将数据写入Kinesis分片的策略:

    • Random-生成随机分区密钥。
    • Expression-使用表达式的结果作为分区键。

    Partition Expression 用于生成用于将数据传递到不同分片的分区键的表达式。

    用于表达式分区策略。

    Kinesis Producer Configuration 其他Kinesis属性。

    添加配置属性时,输入确切的属性名称和值。Kinesis Producer不验证属性名称或值。

    Preserve Record Order 选择以保留记录顺序。启用此选项可能会降低管道性能。
  3. 在“Data Format选项卡上,配置以下属性:
    数据格式属性 描述
    Data Format 要写入的数据格式。使用以下数据格式之一:

    • Avro
    • Binary
    • Delimited
    • JSON
    • Protobuf
    • SDC Record
    • Text

    在Data Collector Edge管道中, 目标端仅支持Binary,JSON,SDC Record和Text数据格式。

  4. 对于Avro数据,在“Data Format选项卡上,配置以下属性:
    Avro属性 描述
    Avro Schema Location 写入数据时要使用的Avro模式定义的位置:

    • In Pipeline Configuration-使用您在阶段配置中提供的模式。
    • In Record Header-在avroSchema 记录头属性中使用模式 。仅在为所有记录定义avroSchema属性时使用。
    • Confluent Schema Registry-从Confluent Schema Registry检索模式。
    Avro Schema 用于写入数据的Avro模式定义。

    您可以选择使用该runtime:loadResource 函数来加载存储在运行时资源文件中的模式定义。

    Register Schema 向Confluent Schema Registry注册新的Avro模式。
    Schema Registry URLs 汇合的模式注册表URL,用于查找模式或注册新模式。要添加URL,请单击添加,然后以以下格式输入URL:

    http://<host name>:<port number>
    Basic Auth User Info 使用基本身份验证时连接到Confluent Schema Registry所需的用户信息。

    schema.registry.basic.auth.user.info使用以下格式从Schema Registry中的设置中输入密钥和机密 :

    <key>:<secret>
    提示: 要保护敏感信息(例如用户名和密码),可以使用运行时资源或凭据存储。
    Look Up Schema By 在Confluent Schema Registry中查找模式的方法:

    • Subject-查找指定的Avro模式主题。
    • Schema ID-查找指定的Avro模式ID。
    Schema Subject Avro模式可以在Confluent Schema Registry中进行查找或注册。

    如果要查找的指定主题具有多个模式版本,则目标对该主题使用最新的模式版本。要使用旧版本,请找到相应的模式ID,然后将“Look Up Schema By 属性设置为“Schema ID”。

    Schema ID 在Confluent Schema Registry中查找的Avro模式ID。
    Include Schema 在每个文件中包含模式。

    注意:省略模式定义可以提高性能,但是需要适当的模式管理,以避免丢失与数据关联的模式的跟踪。
    Avro Compression Codec 要使用的Avro压缩类型。

    使用Avro压缩时,请勿在目标中启用其他可用压缩。

  5. 对于二进制数据,在“Data Format选项卡上,配置以下属性:
    二进制属性 描述
    Binary Field Path 包含二进制数据的字段。
  6. 对于分隔数据,在“Data Format选项卡上,配置以下属性:
    分隔属性 描述
    Delimiter Format 分隔数据的格式:

    • Default CSV-包含逗号分隔值的文件。忽略文件中的空行。
    • RFC4180 CSV-严格遵循RFC4180准则的逗号分隔文件。
    • MS Excel CSV -Microsoft Excel逗号分隔文件。
    • MySQL CSV -MySQL逗号分隔文件。
    • Tab-Separated Values -包含制表符分隔的值的文件。
    • PostgreSQL CSV -PostgreSQL逗号分隔文件。
    • PostgreSQL Text -PostgreSQL文本文件。
    • Custom -使用用户定义的分隔符,转义符和引号字符的文件。
    • Multi Character Delimited-使用多个用户定义的字符分隔字段和行以及单个用户定义的转义和引号字符的文件。
    Header Line 指示是否创建标题行。
    Replace New Line Characters 用配置的字符串替换换行符。

    在将数据写为单行文本时推荐使用。

    New Line Character Replacement 用于替换每个换行符的字符串。例如,输入一个空格以将每个换行符替换为一个空格。

    留空以删除新行字符。

    Delimiter Character 自定义分隔符格式的分隔符。选择一个可用选项,或使用“Other”输入自定义字符。

    您可以输入使用格式\ U A的Unicode控制符NNNN,其中N是数字0-9或字母AF十六进制数字。例如,输入\ u0000以将空字符用作分隔符,或输入\ u2028以将行分隔符用作分隔符。

    默认为竖线字符(|)。

    Escape Character 自定义分隔符格式的转义符。选择一个可用选项,或使用“Other”输入自定义字符。

    默认为反斜杠字符(\)。

    Quote Character 自定义分隔符格式的引号字符。选择一个可用选项,或使用“Other”输入自定义字符。

    默认为引号字符(””)。

    Charset 写入数据时使用的字符集。
  7. 对于JSON数据,在“Data Format”选项卡上,配置以下属性:
    JSON属性 描述
    JSON Content 写入JSON数据的方法:

    • JSON Array of Objects-每个文件都包含一个数组。在数组中,每个元素都是每个记录的JSON表示形式。
    • Multiple JSON Objects-每个文件包含多个JSON对象。每个对象都是记录的JSON表示形式。
    Charset 写入数据时使用的字符集。
  8. 对于protobuf数据,在“Data Format选项卡上,配置以下属性:
    Protobuf属性 描述
    Protobuf Descriptor File 要使用的描述符文件(.desc)。描述符文件必须位于Data Collector资源目录中$SDC_RESOURCES

    有关环境变量的更多信息,请参阅《 Data Collector环境配置》。有关生成描述符文件的信息,请参阅Protobuf数据格式先决条件。

    Message Type 写入数据时使用的消息类型的全限定名称。

    使用以下格式: <package name>.<message type>

    使用在描述符文件中定义的消息类型。

  9. 对于文本数据,在“Data Format选项卡上,配置以下属性:
    文本属性 描述
    Text Field Path 包含要写入的文本数据的字段。所有数据必须合并到指定字段中。
    Record Separator 用于分隔记录的字符。使用任何有效的Java字符串文字。例如,当写入Windows时,您可能会\r\n用来分隔记录。

    默认情况下,目标使用 \n

    On Missing Field 当记录不包含文本字段时,确定目标是将丢失的字段报告为错误还是忽略该丢失的字段。
    Insert Record Separator if No Text 当配置为忽略缺少的文本字段时,插入配置的记录分隔符字符串以创建一个空行。

    如果未选择,则丢弃没有文本字段的记录。

    Charset 写入数据时使用的字符集。
  10. 在微服务管道中使用目标时,在“Response选项卡上,配置以下属性。在非微服务管道中,这些属性将被忽略。
    响应属性 描述
    Send Response to Origin 启用发送对微服务源的响应。
    Response Type 发送到微服务源的响应:

    • 成功写入的记录。
    • 来自目标系统的响应。