Google Pub/Sub Publisher

支持的管道类型:

  •  Data Collector

Google Pub/Sub Publisher目标端将消息发布到Google Pub/Sub主题。您可以使用其他目标端写入Google BigQuery,Google Bigtable和Google Cloud Storage。

配置目标端时,您可以定义要向其写入消息的Google Pub/Sub主题ID。您还定义了用于连接到Google Pub/Sub的项目和凭据提供程序。目标端可以从Google应用程序默认凭据或Google Cloud服务帐户凭据文件检索凭据。

默认情况下,Google Pub/Sub Publisher目标端会批量写入邮件。使用高级属性,可以配置触发写入新批处理或禁用批处理以分别写入消息的条件。您还可以配置目标端在读取消息时比写入消息时快采取的操作。

Google Pub/Sub消息包含有效载荷和描述有效载荷内容的可选用户定义属性。当记录包含记录头属性时,Google Pub/Sub Publisher目标端将消息标题中包含记录头属性。目标在消息属性中不包括内部记录头属性。

有关记录头属性的更多信息,请参见Record Header Attributes。

凭据

当Google Pub/Sub Publisher目标端将消息发布到Google Pub/Sub主题时,它必须将凭据传递给Google Pub/Sub。配置目标端以从Google应用程序默认凭据或Google Cloud服务帐户凭据文件中检索凭据。

默认凭据提供程序

配置为使用Google应用程序默认凭据时,目标端将检查GOOGLE_APPLICATION_CREDENTIALS环境变量中定义的凭据文件 。如果环境变量不存在,并且Data Collector在Google Cloud Platform(GCP)中的虚拟机(VM)上运行,则目标端使用与虚拟机实例关联的内置服务帐户。

有关默认凭据的更多信息,请参阅Google Developer文档中的Google Application默认凭据。

完成以下步骤以在环境变量中定义凭据文件:

  1. 使用Google Cloud Platform Console或 gcloud命令行工具创建Google服务帐户,并使您的应用程序使用该帐户进行API访问。

    例如,要使用命令行工具,请运行以下命令:

    gcloud iam service-accounts create my-account
    gcloud iam service-accounts keys create key.json --iam-account=my-account@my-project.iam.gserviceaccount.com
  2. 将生成的凭据文件存储在Data Collector计算机上。
  3. GOOGLE_APPLICATION_CREDENTIALS 环境变量添加到适当的文件,并将其指向凭据文件。

    使用安装类型所需的方法来修改环境变量。

    如下设置环境变量:

    export GOOGLE_APPLICATION_CREDENTIALS="/var/lib/sdc-resources/keyfile.json"
  4. 重新启动Data Collector以启用更改。
  5. 在该阶段的“Credentials选项卡上, 为凭据提供者选择“Default Credentials Provider”。

服务帐户凭据(JSON)

配置为使用Google Cloud服务帐户凭据文件时,目标端将检查目标端属性中定义的文件。

完成以下步骤以使用服务帐户凭据文件:

  1. 生成JSON格式的服务帐户凭据文件。

    使用Google Cloud Platform Console或gcloud命令行工具来生成和下载凭据文件。有关更多信息,请参阅Google Cloud Platform文档中的生成服务帐户凭据。

  2. 将生成的凭证文件存储在Data Collector计算机上。

    最佳做法是将文件存储在 Data Collector资源目录$SDC_RESOURCES中 。

  3. 在该阶段的“Credentials选项卡上,为凭证提供者选择“Service Account Credentials File,然后输入凭证文件的路径。

数据格式

Google Pub/Sub Publisher根据您选择的数据格式将数据写入Google Pub/Sub。您可以使用以下数据格式:

Avro
目标根据Avro模式写入记录。您可以使用以下方法之一来指定Avro模式定义的位置:
  • In Pipeline Configuration – 使用您在阶段配置中提供的模式。
  • In Record Header – 使用avroSchema记录头属性中包含的模式。
  • Confluent Schema Registry – 从Confluent Schema Registry检索模式。Confluent Schema Registry是Avro模式的分布式存储层。您可以配置目标以通过模式ID或主题在Confluent Schema Registry中查找模式。

    如果在阶段或记录头属性中使用Avro模式,则可以选择配置目标以向Confluent Schema Registry注册Avro模式。

目标端在每个文件中都包含模式定义。
您可以使用Avro支持的压缩编解码器压缩数据。使用Avro压缩时,请避免在目标端使用其他压缩属性。
Binary
该阶段将二进制数据写入记录中的单个字段。
Delimited
目标端将记录写为分隔数据。使用此数据格式时,根字段必须是list或list-map。

您可以使用以下分隔格式类型:

  • Default CSV-包含逗号分隔值的文件。忽略文件中的空行。
  • RFC4180 CSV-严格遵循RFC4180准则的逗号分隔文件。
  • MS Excel CSV -Microsoft Excel逗号分隔文件。
  • MySQL CSV -MySQL逗号分隔文件。
  • Tab-Separated Values -包含制表符分隔的值的文件。
  • PostgreSQL CSV -PostgreSQL逗号分隔文件。
  • PostgreSQL文本 -PostgreSQL文本文件。
  • Custom -使用用户定义的分隔符,转义符和引号字符的文件。
  • Multi Character Delimited-使用多个用户定义的字符分隔字段和行以及单个用户定义的转义和引号字符的文件。
JSON
目标端将记录写为JSON数据。您可以使用以下格式之一:

  • Array-每个文件都包含一个数组。在数组中,每个元素都是每个记录的JSON表示形式。
  • Multiple objects-每个文件都包含多个JSON对象。每个对象都是一条记录的JSON表示形式。
Protobuf
在每个文件中写入一批消息。
在描述符文件中使用用户定义的消息类型和消息类型的定义来在文件中生成消息。
有关生成描述符文件的信息,请参阅Protobuf数据格式先决条件。
SDC Record
目标端以SDC记录数据格式写入记录。
Text
目标端将数据从单个文本字段写入目标系统。配置阶段时,请选择要使用的字段。
您可以配置字符以用作记录分隔符。默认情况下,目标使用UNIX样式的行尾(\n)分隔记录。
当记录不包含选定的文本字段时,目标端可以将缺少的字段报告为错误或忽略该丢失的字段。默认情况下,目标端报告错误。
当配置为忽略缺少的文本字段时,目标端可以丢弃该记录或写入记录分隔符以为该记录创建一个空行。默认情况下,目标端丢弃记录。
XML
目标端为每个记录创建一个有效的XML文档。目标端要求记录具有一个包含其余记录数据的单个根字段。有关如何完成此操作的详细信息和建议,请参阅记录结构要求。

目标端可以包括缩进以产生人类可读的文档。它还可以验证所生成的XML是否符合指定的模式定义。具有无效模式的记录将根据为目标端配置的错误处理进行处理。

配置Google Pub/Sub Publisher目标端

配置Google Pub/Sub Publisher目标端,以将消息写入Google Pub/Sub主题。

  1. 在“Properties”面板的“General选项卡上,配置以下属性:
    一般属性 描述
    Name 阶段名。
    Description 可选说明。
    Required Fields 必须包含用于将记录传递到阶段的记录的数据的字段。

    提示:您可能包括阶段使用的字段。

    根据为管道配置的错误处理,处理不包括所有必填字段的记录。

    Preconditions 必须评估为TRUE的条件才能使记录进入处理阶段。单击 添加以创建其他前提条件。

    根据为该阶段配置的错误处理,处理不满足所有前提条件的记录。

    On Record Error 该阶段的错误记录处理:

    • Discard-放弃记录。
    • To Error-将记录发送到管道以进行错误处理。
    • Stop Pipeline-停止管道。对集群管道无效。
  2. 在“Pub/Sub选项卡上,配置以下属性:
    发布/订阅属性 描述
    Topic ID Google Pub/Sub主题ID,用于向其中写入消息。
  3. 在“Credentials选项卡上,配置以下属性:
    凭据属性 描述
    Project ID 要连接的Google Pub/Sub项目ID。
    Credentials Provider 用于连接到Google Pub/Sub的凭据提供程序:

    • 默认凭证提供者
    • 服务帐户凭证文件(JSON)
    Credentials File Path (JSON) 使用Google Cloud服务帐户凭据文件时,该路径是目标用来连接到Google Pub/Sub的文件的路径。凭证文件必须是JSON文件。

    输入相对于Data Collector资源目录$SDC_RESOURCES的路径,或输入绝对路径。

  4. 在“Advanced选项卡上,配置以下属性:
    高级属性 描述
    Request Bytes Threshold 触发批量发送消息的累积消息大小。以字节为单位指定。

    默认值为1000。

    Messages Count Threshold 触发批量发送消息的累积消息数。

    默认值为100。

    Default Delay Threshold (ms) 自从第一条消息触发以批处理形式发送消息以来经过的时间。以毫秒为单位指定。

    默认值为1。

    Batch Enabled 选择此选项可让目标端分批发送消息。禁用后,目标端将忽略阈值属性单独写入每个消息。
    Max Outstanding Message Count 在采取措施控制消息流之前,目标端存储在内存中的未处理消息数。当目标端读取消息的速度比写入消息的速度快时,您可能希望控制消息的流向。

    设置为0永远不会基于消息计数来控制流。若要在使用批处理时控制消息流,请将其设置为大于消息计数阈值的数字。

    Max Outstanding Request Bytes 在采取措施控制消息流之前,目标端存储在内存中的未处理字节数。

    设置为0永远不会根据消息大小控制流。若要在使用批处理时控制消息流,请将其设置为大于请求字节阈值的数字。

    Limit Exceeded Behavior 未处理的邮件数或大小超过指定的限制时采取的措施。选择以下选项之一:

    • Throw Exception-触发管道错误处理。
    • Block-停止处理新消息,直到已成功写入存储的消息。
    • Ignore-丢弃新消息,直到已成功写入存储的消息。
  5. 在“Data Format选项卡上,配置以下属性:
    数据格式属性 描述
    Data Format 要写入的数据格式。使用以下选项之一:

    • Avro
    • Binary
    • Delimited
    • JSON
    • Protobuf
    • SDC Record
    • Text
    • XML
  6. 对于Avro数据,在“Data Format选项卡上,配置以下属性:
    Avro属性 描述
    Avro Schema Location 写入数据时要使用的Avro模式定义的位置:

    • In Pipeline Configuration-使用您在阶段配置中提供的模式。
    • In Record Header-在avroSchema 记录头属性中使用模式 。仅在为所有记录定义avroSchema属性时使用。
    • Confluent Schema Registry-从Confluent Schema Registry检索模式。
    Avro Schema 用于写入数据的Avro模式定义。

    您可以选择使用该runtime:loadResource 函数来加载存储在运行时资源文件中的模式定义。

    Register Schema 向Confluent Schema Registry注册新的Avro模式。
    Schema Registry URLs 汇合的模式注册表URL,用于查找模式或注册新模式。要添加URL,请单击添加,然后以以下格式输入URL:

    http://<host name>:<port number>
    Basic Auth User Info 使用基本身份验证时连接到Confluent Schema Registry所需的用户信息。

    schema.registry.basic.auth.user.info使用以下格式从Schema Registry中的设置中输入密钥和机密 :

    <key>:<secret>
    提示: 要保护敏感信息(例如用户名和密码),可以使用运行时资源或凭据存储。
    Look Up Schema By 在Confluent Schema Registry中查找模式的方法:

    • Subject-查找指定的Avro模式主题。
    • Schema ID-查找指定的Avro模式ID。
    Schema Subject Avro模式可以在Confluent Schema Registry中进行查找或注册。

    如果要查找的指定主题具有多个模式版本,则目标对该主题使用最新的模式版本。要使用旧版本,请找到相应的模式ID,然后将“Look Up Schema By 属性设置为“Schema ID”。

    Schema ID 在Confluent Schema Registry中查找的Avro模式ID。
    Include Schema 在每个文件中包含模式。

    注意:省略模式定义可以提高性能,但是需要适当的模式管理,以避免丢失与数据关联的模式的跟踪。
    Avro Compression Codec 要使用的Avro压缩类型。

    使用Avro压缩时,请勿在目标中启用其他可用压缩。

  7. 对于二进制数据,在“Data Format选项卡上,配置以下属性:
    二进制属性 描述
    Binary Field Path 包含二进制数据的字段。
  8. 对于分隔数据,在“Data Format选项卡上,配置以下属性:
    分隔属性 描述
    Delimiter Format 分隔数据的格式:

    • Default CSV-包含逗号分隔值的文件。忽略文件中的空行。
    • RFC4180 CSV-严格遵循RFC4180准则的逗号分隔文件。
    • MS Excel CSV -Microsoft Excel逗号分隔文件。
    • MySQL CSV -MySQL逗号分隔文件。
    • Tab-Separated Values -包含制表符分隔的值的文件。
    • PostgreSQL CSV -PostgreSQL逗号分隔文件。
    • PostgreSQL Text -PostgreSQL文本文件。
    • Custom -使用用户定义的分隔符,转义符和引号字符的文件。
    • Multi Character Delimited-使用多个用户定义的字符分隔字段和行以及单个用户定义的转义和引号字符的文件。
    Header Line 指示是否创建标题行。
    Replace New Line Characters 用配置的字符串替换换行符。

    在将数据写为单行文本时推荐使用。

    New Line Character Replacement 用于替换每个换行符的字符串。例如,输入一个空格以将每个换行符替换为一个空格。

    留空以删除新行字符。

    Delimiter Character 自定义分隔符格式的分隔符。选择一个可用选项,或使用“Other”输入自定义字符。

    您可以输入使用格式\ U A的Unicode控制符NNNN,其中N是数字0-9或字母AF十六进制数字。例如,输入\ u0000以将空字符用作分隔符,或输入\ u2028以将行分隔符用作分隔符。

    默认为竖线字符(|)。

    Escape Character 自定义分隔符格式的转义符。选择一个可用选项,或使用“Other”输入自定义字符。

    默认为反斜杠字符(\)。

    Quote Character 自定义分隔符格式的引号字符。选择一个可用选项,或使用“Other”输入自定义字符。

    默认为引号字符(””)。

    Charset 写入数据时使用的字符集。
  9. 对于JSON数据,在“Data Format”选项卡上,配置以下属性:
    JSON属性 描述
    JSON Content 写入JSON数据的方法:

    • JSON Array of Objects-每个文件都包含一个数组。在数组中,每个元素都是每个记录的JSON表示形式。
    • Multiple JSON Objects-每个文件包含多个JSON对象。每个对象都是记录的JSON表示形式。
    Charset 写入数据时使用的字符集。
  10. 对于protobuf数据,在“Data Format选项卡上,配置以下属性:
    Protobuf属性 描述
    Protobuf Descriptor File 要使用的描述符文件(.desc)。描述符文件必须位于Data Collector资源目录中$SDC_RESOURCES

    有关环境变量的更多信息,请参阅《 Data Collector环境配置》。有关生成描述符文件的信息,请参阅Protobuf数据格式先决条件。

    Message Type 写入数据时使用的消息类型的全限定名称。

    使用以下格式: <package name>.<message type>

    使用在描述符文件中定义的消息类型。

  11. 对于文本数据,在“Data Format选项卡上,配置以下属性:
    文本属性 描述
    Text Field Path 包含要写入的文本数据的字段。所有数据必须合并到指定字段中。
    Record Separator 用于分隔记录的字符。使用任何有效的Java字符串文字。例如,当写入Windows时,您可能会\r\n用来分隔记录。

    默认情况下,目标使用 \n

    On Missing Field 当记录不包含文本字段时,确定目标是将丢失的字段报告为错误还是忽略该丢失的字段。
    Insert Record Separator if No Text 当配置为忽略缺少的文本字段时,插入配置的记录分隔符字符串以创建一个空行。

    如果未选择,则丢弃没有文本字段的记录。

    Charset 写入数据时使用的字符集。
  12. 对于XML数据,在“Data Format选项卡上,配置以下属性:
    XML属性 描述
    Pretty Format 添加缩进以使生成的XML文档更易于阅读。相应地增加记录大小。
    Validate Schema 验证生成的XML是否符合指定的模式定义。具有无效模式的记录将根据为目标端配置的错误处理进行处理。

    要点:无论是否验证XML模式,目标端都需要特定格式的记录。更多信息,请参见记录结构要求。
    XML Schema 用于验证记录的XML模式。