Pulsar Producer

支持的管道类型:

  •  Data Collector

Pulsar Producer目标端将数据写入Apache Pulsar集群中的主题。Pulsar Producer目标端连接到主题,并将消息发布到Pulsar broker进行处理。

当配置Pulsar Producer目标端时,您将定义URL以连接到Pulsar。您还定义了将消息发布到的主题。如果在配置的主题名称中包含表达式,则可以配置目标端以将消息发布到单个主题或多个主题。

您可以配置目标端以使用Pulsar安全功能。您还可以根据需要配置高级属性,例如在发布消息时使用的分区或压缩类型,或者目标端是异步还是同步发布消息。

有关Pulsar主题和生产者的更多信息,请参阅Apache Pulsar文档。

启用安全性

如果Pulsar集群使用安全功能,则必须将Pulsar Producer目标端配置为使用相同的安全功能连接到Pulsar。

Pulsar集群可以使用以下安全功能:

TLS transport encryption
配置TLS传输加密后,Pulsar集群将使用TLS加密Pulsar服务器与客户端之间的所有流量。Pulsar服务器使用密钥和证书,客户端使用该密钥和证书来验证服务器的身份。
Mutual TLS authentication
当配置为TLS传输加密时,可以将Pulsar集群配置为使用相互TLS身份验证。通过相互身份验证,客户端还使用服务器用来验证客户端身份的密钥和证书。
  1. 在阶段的“Pulsar”选项卡上,将“Pulsar URL”属性设置为代理服务的安全URL。

    URL使用以下格式:

    pulsar+ssl://<host name>:<broker service TLS port>/

    例如:

    pulsar+ssl://pulsar.us-west.example.com:6651/
  2. 在阶段的“Security选项卡上,选择“Enable TLS
  3. 将包含对Pulsar集群证书签名的证书颁发机构(CA)的PEM文件存储在Data Collector资源目录$SDC_RESOURCES中。
    有关为Pulsar集群创建证书的信息,请参阅Pulsar文档
  4. 在阶段的“Security选项卡上,在“CA Certificate PEM 属性中输入CA证书PEM文件的名称。
  5. 如果还为Pulsar集群配置了相互TLS身份验证,请在阶段的“Security选项卡上选择“Enable Mutual Authentication” 。
  6. 创建客户端证书和客户端私钥PEM文件以供该阶段使用。
    有关为Pulsar创建客户端证书的信息,请参阅Pulsar文档
  7. 将为该阶段创建的客户端证书和客户端私钥PEM文件存储在Data Collector资源目录$SDC_RESOURCES中。
  8. 在阶段的“Security选项卡上,在“Client Certificate PEM和“Client Key PEM属性中输入客户端文件的名称。

数据格式

Pulsar Producer目标端根据您选择的数据格式将数据写入Pulsar。您可以使用以下数据格式:

Binary
该阶段将二进制数据写入记录中的单个字段。
Delimited
目标端将记录写为分隔数据。使用此数据格式时,根字段必须是list或list-map。

您可以使用以下分隔格式类型:

  • Default CSV-包含逗号分隔值的文件。忽略文件中的空行。
  • RFC4180 CSV-严格遵循RFC4180准则的逗号分隔文件。
  • MS Excel CSV -Microsoft Excel逗号分隔文件。
  • MySQL CSV -MySQL逗号分隔文件。
  • Tab-Separated Values -包含制表符分隔的值的文件。
  • PostgreSQL CSV -PostgreSQL逗号分隔文件。
  • PostgreSQL文本 -PostgreSQL文本文件。
  • Custom -使用用户定义的分隔符,转义符和引号字符的文件。
  • Multi Character Delimited-使用多个用户定义的字符分隔字段和行以及单个用户定义的转义和引号字符的文件。
JSON
目标端将记录写为JSON数据。您可以使用以下格式之一:

  • Array-每个文件都包含一个数组。在数组中,每个元素都是每个记录的JSON表示形式。
  • Multiple objects-每个文件都包含多个JSON对象。每个对象都是一条记录的JSON表示形式。
Protobuf
在一条消息中写入一条记录。在描述符文件中使用用户定义的消息类型和消息类型的定义来生成消息。
有关生成描述符文件的信息,请参阅Protobuf数据格式先决条件。
SDC Record
目标端以SDC记录数据格式写入记录。
Text
目标端将数据从单个文本字段写入目标系统。配置阶段时,请选择要使用的字段。
您可以配置字符以用作记录分隔符。默认情况下,目标使用UNIX样式的行尾(\n)分隔记录。
当记录不包含选定的文本字段时,目标端可以将缺少的字段报告为错误或忽略该丢失的字段。默认情况下,目标端报告错误。
当配置为忽略缺少的文本字段时,目标端可以丢弃该记录或写入记录分隔符以为该记录创建一个空行。默认情况下,目标端丢弃记录。
XML
目标端为每个记录创建一个有效的XML文档。目标端要求记录具有一个包含其余记录数据的单个根字段。有关如何完成此操作的详细信息和建议,请参阅记录结构要求。
目标端可以包括缩进以产生人类可读的文档。它还可以验证所生成的XML是否符合指定的模式定义。具有无效模式的记录将根据为目标端配置的错误处理进行处理。

配置Pulsar Producer

配置Pulsar Producer目标端以将数据写入Pulsar主题。

  1. 在“Properties”面板的“General选项卡上,配置以下属性:
    一般属性 描述
    Name 阶段名。
    Description 可选说明。
    Stage Library 您要使用的库版本。
    Required Fields 必须包含用于将记录传递到阶段的记录的数据的字段。

    提示:您可能包括阶段使用的字段。

    根据为管道配置的错误处理,处理不包括所有必填字段的记录。

    Preconditions 必须评估为TRUE的条件才能使记录进入处理阶段。单击添加以创建其他前提条件。

    根据为该阶段配置的错误处理,处理不满足所有前提条件的记录。

    On Record Error 该阶段的错误记录处理:

    • Discard-放弃记录。
    • To Error-将记录发送到管道以进行错误处理。
    • Stop Pipeline-停止管道。
  2. 在“Pulsar”选项卡上,配置以下属性:
    Pulsar属性 描述
    Pulsar URL Pulsar Web服务或代理服务的URL。

    如果未为Pulsar集群启用TLS,请以以下格式输入Web服务或代理服务URL:

    • Web服务URL- http://<host name>:<web service port>。例如: http://pulsar.us-west.example.com:8080
    • Broker服务URL- pulsar://<host name>:<broker service port>。例如: pulsar://pulsar.us-west.example.com:6650

    如果为Pulsar集群启用了TLS,请以以下格式输入安全代理服务URL:

    pulsar+ssl://<host name>:<broker service TLS port>

    例如: pulsar+ssl://pulsar.us-west.example.com:6651

    Topic 要向其发布消息的主题的名称。输入以下格式的主题名称:

    {persistent|non-persistent}://<tenant>/<namespace>/<topic name>

    例如,发布到my-tenant租户的my-namespace命名空间下的一个名为 my-sdc-topic持久的话题,输入以下内容作为主题名称:

    persistent://my-tenant/my-namespace/my-sdc-topic

    如果仅输入主题名称,则Pulsar使用默认persistent://public/default/ 位置。例如,要发布到名称空间中属于public租户 的永久主题default,只需输入主题名称,如下所示:

    my-sdc-topic

    如果指定的主题不存在,则Pulsar在管道启动时创建主题。

    您可以使用表达式来定义主题名称。例如,如果my-topic记录中的字段包含主题名称,请输入以下内容作为主题名称:

    persistent://my-tenant/my-namespace/${record:value("/my-topic")}
    Keep Alive Interval (ms) 允许与Pulsar的连接保持空闲状态的毫秒数。在此时间段内目标未发布任何消息后,连接将关闭。目标端必须重新连接到Pulsar。

    默认值为30,000毫秒。

    Operation Timeout (ms) 在将操作标记为失败之前,允许Pulsar Producer创建操作完成的毫秒数。

    默认值为30,000毫秒。

  3. 要启用安全性,请单击“Security选项卡并配置以下属性:
    安全属性 描述
    Enable TLS 使阶段能够通过TLS加密安全地连接到Pulsar。
    Enable Mutual Authentication 使阶段可以使用相互TLS身份验证来安全地连接到Pulsar。
    CA Certificate PEM PEM文件的路径,该文件包含对Pulsar集群证书进行签名的证书颁发机构(CA)。

    输入文件的绝对路径或相对于Data Collector资源目录$SDC_RESOURCES的路径。

    Client Certificate PEM 如果启用了相互身份验证,则是包含为Data Collector创建的客户端证书的PEM文件的路径。

    输入文件的绝对路径或相对于Data Collector资源目录$SDC_RESOURCES的路径。

    Client Key PEM 如果启用了相互身份验证,则是包含为Data Collector创建的客户端专用密钥的PEM文件的路径。

    输入文件的绝对路径或相对于Data Collector资源目录$SDC_RESOURCES的路径。

  4. 在“Advanced选项卡上,可以选择配置高级属性。

    这些属性的默认值在大多数情况下都应该起作用:

    高级属性 描述
    Partition Type 将消息发布到主题时要使用的分区类型:

    • Single
    • Round Robin

    默认为Single。

    Hashing Scheme 选择向哪个分区写入消息时使用的哈希方案。
    Message Key 消息密钥,用于计算分区的哈希值。输入密钥或输入计算结果为该密钥的表达式。
    Compression Type 适用于已发布消息的压缩类型:

    • None
    • LZ4
    • ZLIB

    默认为None。

    Async Send 使目标端能够异步发布消息。清除以同步发布消息。

    有关可用的发送模式的更多信息,请参阅Apache Pulsar文档。

    默认启用。

    Max Pending Messages 异步发送消息时,是队列中等待Pulsar代理确认的最大消息数。

    默认值为1,000。

    Enable Batching 异步发送消息时,启用在单个请求中发送一批消息。清除以在每个请求中发送一条消息。

    默认启用。

    Max Batch Size (messages) 异步发送消息并启用批处理时,要包含在批处理中的最大消息数。

    默认值为2,000。

    Batch Max Publish Latency (ms) 异步发送消息并启用批处理功能时,发送下一批批处理之前要等待的最大毫秒数。

    默认值为1000毫秒。

    Pulsar Configuration Properties

    要使用的其他Pulsar配置属性。使用简单或批量编辑模式,单击添加图标以添加属性。定义Pulsar属性名称和值。

    使用Pulsar期望的属性名称和值。

  5. 在“Data Format选项卡上,配置以下属性:
    数据格式属性 描述
    Data Format 要读取的数据类型。使用以下选项之一:

    • Binary
    • Delimited
    • JSON
    • Protobuf
    • SDC Record
    • Text
    • XML
  6. 对于二进制数据,在“Data Format选项卡上,配置以下属性:
    二进制属性 描述
    Binary Field Path 包含二进制数据的字段。
  7. 对于分隔数据,在“Data Format选项卡上,配置以下属性:
    分隔属性 描述
    Delimiter Format 分隔数据的格式:

    • Default CSV-包含逗号分隔值的文件。忽略文件中的空行。
    • RFC4180 CSV-严格遵循RFC4180准则的逗号分隔文件。
    • MS Excel CSV -Microsoft Excel逗号分隔文件。
    • MySQL CSV -MySQL逗号分隔文件。
    • Tab-Separated Values -包含制表符分隔的值的文件。
    • PostgreSQL CSV -PostgreSQL逗号分隔文件。
    • PostgreSQL Text -PostgreSQL文本文件。
    • Custom -使用用户定义的分隔符,转义符和引号字符的文件。
    • Multi Character Delimited-使用多个用户定义的字符分隔字段和行以及单个用户定义的转义和引号字符的文件。
    Header Line 指示是否创建标题行。
    Replace New Line Characters 用配置的字符串替换换行符。

    在将数据写为单行文本时推荐使用。

    New Line Character Replacement 用于替换每个换行符的字符串。例如,输入一个空格以将每个换行符替换为一个空格。

    留空以删除新行字符。

    Delimiter Character 自定义分隔符格式的分隔符。选择一个可用选项,或使用“Other”输入自定义字符。

    您可以输入使用格式\ U A的Unicode控制符NNNN,其中N是数字0-9或字母AF十六进制数字。例如,输入\ u0000以将空字符用作分隔符,或输入\ u2028以将行分隔符用作分隔符。

    默认为竖线字符(|)。

    Escape Character 自定义分隔符格式的转义符。选择一个可用选项,或使用“Other”输入自定义字符。

    默认为反斜杠字符(\)。

    Quote Character 自定义分隔符格式的引号字符。选择一个可用选项,或使用“Other”输入自定义字符。

    默认为引号字符(””)。

    Charset 写入数据时使用的字符集。
  8. 对于JSON数据,在“Data Format”选项卡上,配置以下属性:
    JSON属性 描述
    JSON Content 写入JSON数据的方法:

    • JSON Array of Objects-每个文件都包含一个数组。在数组中,每个元素都是每个记录的JSON表示形式。
    • Multiple JSON Objects-每个文件包含多个JSON对象。每个对象都是记录的JSON表示形式。
    Charset 写入数据时使用的字符集。
  9. 对于protobuf数据,在“Data Format选项卡上,配置以下属性:
    Protobuf属性 描述
    Protobuf Descriptor File 要使用的描述符文件(.desc)。描述符文件必须位于Data Collector资源目录中$SDC_RESOURCES

    有关环境变量的更多信息,请参阅《 Data Collector环境配置》。有关生成描述符文件的信息,请参阅Protobuf数据格式先决条件。

    Message Type 写入数据时使用的消息类型的全限定名称。

    使用以下格式: <package name>.<message type>

    使用在描述符文件中定义的消息类型。

  10. 对于文本数据,在“Data Format选项卡上,配置以下属性:
    文本属性 描述
    Text Field Path 包含要写入的文本数据的字段。所有数据必须合并到指定字段中。
    Record Separator 用于分隔记录的字符。使用任何有效的Java字符串文字。例如,当写入Windows时,您可能会\r\n用来分隔记录。

    默认情况下,目标使用 \n

    On Missing Field 当记录不包含文本字段时,确定目标是将丢失的字段报告为错误还是忽略该丢失的字段。
    Insert Record Separator if No Text 当配置为忽略缺少的文本字段时,插入配置的记录分隔符字符串以创建一个空行。

    如果未选择,则丢弃没有文本字段的记录。

    Charset 写入数据时使用的字符集。
  11. 对于XML数据,在“Data Format选项卡上,配置以下属性:
    XML属性 描述
    Pretty Format 添加缩进以使生成的XML文档更易于阅读。相应地增加记录大小。
    Validate Schema 验证生成的XML是否符合指定的模式定义。具有无效模式的记录将根据为目标端配置的错误处理进行处理。

    要点:无论是否验证XML模式,目标端都需要特定格式的记录。更多信息,请参见记录结构要求。
    XML Schema 用于验证记录的XML模式。