MQTT Publisher

支持的管道类型:

  •  Data Collector

  •  Data Collector Edge

MQTT Publisher目标端将消息发布到MQTT broker上的主题。该目标端充当发布消息的MQTT客户端,将每个记录写为一条消息。

配置目标时,可以指定连接到MQTT broker所需的信息。当MQTT broker需要用户名和密码时,必须定义连接凭证。您还可以配置SSL/TLS属性,包括默认的传输协议和密码套件。

您在目标端将消息传递到的MQTT broker上指定主题。

您还可以配置服务质量级别和目标端用来启用可靠消息传递的持久性机制。

边缘管道先决条件

在Data Collector Edge管道中, MQTT阶段需要使用中间MQTT broker。

例如,边缘发送管道使用MQTT Publisher目标写入MQTT broker。MQTT broker临时存储数据,直到Data Collector接收管道中的MQTT Subscriber源端读取数据为止。

主题

MQTT Publisher目标端将消息写入MQTT代理上的单个主题。订阅该主题的任何MQTT客户端都会收到消息。主题是代理用来过滤每个已连接客户端的消息的字符串。

配置目标端时,请定义主题名称。您可以在一个主题中包括多个主题级别。例如,以下主题具有三个主题级别:

sales/US/NorthernRegion

您可以使用StreamSets表达式语言来定义主题名称。您不能在MQTT Publisher目标端使用的主题名称中使用MQTT通配符。

有关更多信息,请参见HiveMQ documentation on MQTT topics

数据格式

MQTT Publisher目标端根据您选择的数据格式将消息写入MQTT代理。

MQTT Publisher目标端处理数据格式如下:

Binary
该阶段将二进制数据写入记录中的单个字段。
JSON
目标端将记录写为JSON数据。您可以使用以下格式之一:

  • Array-每个文件都包含一个数组。在数组中,每个元素都是每个记录的JSON表示形式。
  • Multiple objects-每个文件都包含多个JSON对象。每个对象都是记录的JSON表示形式。
SDC Record
目标端以SDC记录数据格式写入记录。
Text
目标端将数据从单个文本字段写入目标系统。配置阶段时,请选择要使用的字段。
您可以配置字符以用作记录分隔符。默认情况下,目标使用UNIX样式的行尾(\n)分隔记录。
当记录不包含选定的文本字段时,目标端可以将缺少的字段报告为错误或忽略该丢失的字段。默认情况下,目标端报告错误。
当配置为忽略缺少的文本字段时,目标端可以丢弃该记录或写入记录分隔符以为该记录创建一个空行。默认情况下,目标端丢弃记录。

配置MQTT发布者目标

配置MQTT Publisher目标端,以将消息写入MQTT代理。

在Data Collector Edge管道中, MQTT Publisher目标端需要一个中间MQTT broker。

  1. 在“Properties”面板的“General选项卡上,配置以下属性:
    一般属性 描述
    Name 阶段名。
    Description 可选说明。
    Required Fields 必须包含用于将记录传递到阶段的记录的数据的字段。

    提示:您可能包括阶段使用的字段。

    根据为管道配置的错误处理,处理不包括所有必填字段的记录。

    Preconditions 必须评估为TRUE的条件才能使记录进入处理阶段。单击 添加以创建其他前提条件。

    根据为该阶段配置的错误处理,处理不满足所有前提条件的记录。

    On Record Error 该阶段的错误记录处理:

    • Discard-放弃记录。
    • To Error-将记录发送到管道以进行错误处理。
    • Stop Pipeline-停止管道。对集群管道无效。
  2. 在“MQTT”选项卡上,配置以下属性:
    MQTT属性 描述
    Broker URL MQTT broker URL。输入以下格式:

    <tcp | ssl>://<hostname>:<port>

    使用ssl与代理进行安全连接。

    例如:

    tcp://localhost:1883
    Client ID MQTT客户端ID。在连接到同一代理的所有客户端上,该ID必须唯一。

    您可以定义一个计算结果为客户端ID的表达式。例如,输入以下表达式以使用唯一的管道ID作为客户端ID:

    ${pipeline:id()}

    如果管道包含多个MQTT阶段,并且您想将唯一的管道ID用作两个阶段的客户机ID,请在客户机ID之前加上以下字符串:

    sub-${pipeline:id()} and pub-${pipeline:id()} 

    否则,所有阶段将使用相同的客户端ID。这可能会导致出现问题,例如消息消失。

    Topic 要发布到的主题。使用简单或批量编辑模式,单击添加图标以阅读其他主题。
    Quality of Service 确定用于保证消息传递的服务质量级别:

    • At Most Once (0)
    • At Least Once (1)
    • Exactly Once (2)

    有关更多信息,请参阅有关服务质量级别的HiveMQ文档。

    Client Persistence Mechanism 确定服务质量级别至少一次或恰好一次时,目标端用来保证消息传递的持久性机制。选择以下选项之一:

    • Memory-将消息存储在Data Collector计算机的内存中,直到完成消息传递为止。
    • File-将消息存储在Data Collector计算机上的本地文件中,直到完成消息传递为止。

    当服务质量级别最多为一次时不使用。

    有关更多信息,请参阅有关客户端持久性的HiveMQ文档。

    Client Persistence Data Directory 配置文件持久性时,Data Collector计算机上的本地目录,目标端将文件中的消息临时存储在该目录中。

    启动Data Collector的用户必须具有对该目录的读写权限。

    Keep Alive Interval (secs) 允许与MQTT broker的连接保持空闲状态的最长时间(以秒为单位)。在此时间段内目标端未发布任何消息后,连接将关闭。目标端必须重新连接到MQTT代理。

    默认值为60秒。

    Use Credentials 在“Credentials”选项卡上启用输入凭据。当MQTT broker要求用户名和密码时使用。
    Retain the Message 确定在没有订阅任何MQTT客户端来侦听主题时,MQTT broker是否保留目标端最后发布的消息。

    选中后,MQTT broker将保留目标端发布的最后一条消息。先前发布的所有消息都将丢失。清除后,目标端发布的所有消息都会丢失。

    有关MQTT保留消息的更多信息,请参见http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages。

  3. 在“Credentials选项卡上,输入启用身份验证后要使用的MQTT凭据。
    提示: 要保护敏感信息(例如用户名和密码),可以使用运行时资源或凭据存储。
  4. 要使用SSL/TLS,请在“TLS”选项卡上配置以下属性:

    在Data Collector Edge管道中,仅“Use TLS和“Truststore File属性有效。启用TLS后,为使用PEM格式的信任库文件输入绝对路径。在Data Collector Edge管道中,MQTT Publisher目标端始终使用默认协议和密码套件。它忽略所有其他TLS属性。

    TLS属性 描述
    Use TLS 启用TLS的使用。
    Truststore File 信任库文件的路径。输入文件的绝对路径或相对于Data Collector资源目录$SDC_RESOURCES的路径。

    有关环境变量的更多信息,请参阅《Data Collector环境配置》。

    默认情况下,不使用任何信任库。

    在Data Collector Edge管道中,输入使用PEM格式的文​​件的绝对路径。

    Truststore Type 要使用的信任库的类型。使用以下类型之一:

    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    默认值为Java Keystore File (JKS)。

    Truststore Password 信任库文件的密码。密码是可选的,但建议使用。

    提示:要保护敏感信息(例如密码),可以使用运行时资源或凭据存储。
    Truststore Trust Algorithm 用于管理信任库的算法。

    默认值为SunX509

    Use Default Protocols 确定要使用的传输层安全性(TLS)协议。默认协议是TLSv1.2。要使用其他协议,请清除此选项。
    Transport Protocols 要使用的TLS协议。要使用默认TLSv1.2以外的协议,请单击“添加”图标并输入协议名称。您可以使用简单或批量编辑模式来添加协议。

    注意:较旧的协议不如TLSv1.2安全。
    Use Default Cipher Suites 对SSL/TLS握手使用默认的密码套件。要使用其他密码套件,请清除此选项。
    Cipher Suites 要使用的密码套件。要使用不属于默认密码集的密码套件,请单击“添加”图标并输入密码套件的名称。您可以使用简单或批量编辑模式来添加密码套件。

    输入要使用的其他密码套件的Java Secure Socket Extension (JSSE)名称。

  5. 在“Data Format选项卡上,配置以下属性:
    数据格式属性 描述
    Data Format 消息的数据格式。使用以下数据格式之一:

    • Binary
    • JSON
    • SDC Record
    • Text
  6. 对于二进制数据,在“Data Format选项卡上,配置以下属性:
    二进制属性 描述
    Binary Field Path 包含二进制数据的字段。
  7. 对于JSON数据,在数据格式选项卡上,配置以下属性:
    JSON属性 描述
    JSON Content 写入JSON数据的方法:

    • JSON Array of Objects-每个文件都包含一个数组。在数组中,每个元素都是每个记录的JSON表示形式。
    • Multiple JSON Objects-每个文件包含多个JSON对象。每个对象都是记录的JSON表示形式。
    Charset 写入数据时使用的字符集。
  8. 对于文本数据,在“Data Format选项卡上,配置以下属性:
    文本属性 描述
    Text Field Path 包含要写入的文本数据的字段。所有数据必须合并到指定字段中。
    Record Separator 用于分隔记录的字符。使用任何有效的Java字符串文字。例如,当写入Windows时,您可能会\r\n用来分隔记录。

    默认情况下,目标使用 \n

    On Missing Field 当记录不包含文本字段时,确定目标端是将丢失的字段报告为错误还是忽略该丢失的字段。
    Insert Record Separator if No Text 当配置为忽略缺少的文本字段时,插入配置的记录分隔符字符串以创建一个空行。

    如果未选择,则丢弃没有文本字段的记录。

    Charset 写入数据时使用的字符集。