MongoDB

支持的管道类型:

  •  Data Collector

MongoDB源从MongoDB读取数据。Data Collector为每个MongoDB文档生成一条记录。要从MongoDB Oplog中读取更改数据捕获信息,请使用MongoDB Oplog源。

MongoDB源从有上限和无上限的集合中读取。配置MongoDB时,您将定义连接信息,例如连接字符串和MongoDB凭据。您还可以配置偏移量字段,收集类型和初始偏移量。这些属性确定来源如何查询数据库。

当管道停止时,MongoDB源会记录它在何处停止读取。当管道再次启动时,默认情况下源端将从上次保存的偏移开始继续处理。您可以重置源端以处理所有请求的文件。

您可以选择配置高级选项,这些选项用于确定源端如何连接到MongoDB,包括为源端启用SSL/TLS。

源端可以为事件流生成事件。有关数据流触发器和事件框架的更多信息,请参见数据流触发器概述。

注意: StreamSets已使用MongoDB 4.0测试了此阶段。

凭据

根据MongoDB服务器使用的身份验证,将源端配置为不使用身份验证、用户名/密码身份验证或LDAP身份验证。使用用户名/密码身份验证时,还可以使用委托身份验证。

默认情况下,源端不使用身份验证。

要使用用户名/密码或LDAP身份验证,请通过以下方式之一输入所需的凭据:

MongoDB选项卡中的连接字符串
在MongoDB选项卡的连接字符串中输入凭据。
要输入用于用户名/密码身份验证的凭据,请在主机名之前输入用户名和密码。使用以下格式:

mongodb://username:password@host[:port][/[database][?options]]
要输入用于LDAP身份验证的凭据,请在主机名之前输入用户名和密码,并将authMechanism选项设置为PLAIN。使用以下格式:

mongodb://username:password@host[:port][/[database]?authMechanism=PLAIN
Credentials选项卡
在“Credentials”选项卡中选择“Username/Password”或“LDAP authentication”类型。然后输入身份验证类型的用户名和密码。

如果您同时在连接字符串和“Credentials”选项卡中输入凭据,则“Credentials”选项卡优先。

偏移字段和初始偏移

MongoDB使用offset字段跟踪要读取的数据。默认情况下,MongoDB源使用_id字段作为偏移量字段。

您可以使用嵌套的偏移量字段,例如o._id。或者,您可以使用任何对象ID,日期或字符串字段作为偏移量字段。不保证使用默认_id字段以外的任何字段的结果。

使用日期或对象ID字段时,请指定要用作初始偏移量的时间戳。对象ID字段包含一个嵌入的时间戳记,该时间戳记可用于确定起点在集合中的何处开始读取。当您为日期或对象ID字段定义初始偏移量时,请使用以下格式:

YYYY-MM-DD HH:mm:ss

使用字符串字段时,请指定要用作初始偏移量的初始字符串。

注意:如果在管道运行然后停止后更改源端的偏移字段类型,则必须重置源端,然后才能再次运行管道。

读取偏好

您可以配置MongoDB源使用的读取偏好。

读取偏好确定源如何从MongoDB副本集的不同成员读取数据。

您可以使用以下MongoDB读取偏好:

  • Primary-要求从主成员读取。
  • Primary Preferred-首选从主成员读取,但允许从辅助成员读取。
  • Secondary-要求从辅助成员读取。
  • Secondary Preferred-首选从辅助成员读取,但在必要时允许从主成员读取。
  • Nearest-从具有最小网络延迟的成员读取。

默认情况下,源使用“Secondary Preferred”以避免对主成员进行不必要的请求。

事件生成

当MongoDB源完成对所有可用数据的处理,并且超过已配置的批处理等待时间时,它可以生成事件。

MongoDB源事件可以任何逻辑方式使用。例如:

  • 当源端完成处理可用数据时,使用Pipeline Finisher执行程序停止管道并将管道转换为Finished状态。

    重新启动由Pipeline Finisher执行程序停止的管道时,源端将从上次保存的偏移开始继续处理,除非您重置源端。

    有关示例,请参见案例研究:停止管道。

  • 具有用于存储事件信息的目的地。

    有关示例,请参见案例研究:事件存储。

有关数据流触发器和事件框架的更多信息,请参见数据流触发器概述。

事件记录

MongoDB源生成的事件记录具有以下与事件相关的记录头属性。记录头属性存储为字符串值:

记录头属性 描述
sdc.event.type 事件类型。使用以下事件类型:

  • no-more-data-原点完成对所有可用对象的处理并且经过了“最大批处理等待时间”配置的秒数后生成。
sdc.event.version 整数,指示事件记录类型的版本。
sdc.event.creation_timestamp 阶段创建事件的时间戳记。

MongoDB源可以生成以下事件记录:

no-more-data
当MongoDB源完成对所有可用记录的处理并且经过为“Max Batch Wait Time”配置的秒数,而未显示任何新对象被处理时,MongoDB源将生成无数据事件记录。
由源生成的无数据事件记录将sdc.event.type设置为无数据,并包含以下字段:

事件记录字段 描述
record-count 自管道启动或自上一次创建no-more-data事件以来成功生成的记录数。
error-count 自管道启动或自上一次创建no-more-data事件以来生成的错误记录数。

BSON时间戳

处理来自MongoDB 2.6版和更高版本的数据时,MongoDB原始支持MongoDB BSON Timestamp数据类型。

MongoDB BSON时间戳是一种MongoDB数据类型,其中包括时间戳和顺序,如下所示:

<BSON Timestamp field name>:Timestamp(<timestamp>, <ordinal>)

MongoDB源将BSON时间戳转换为地图,如下所示:

<BSON Timestamp field name>{MAP}:
    Timestamp{DATETIME}:<UTC timestamp>
    Ordinal{INTEGER}:<integer ordinal>

例如,事务BSON时间戳记为 (1485449409, 1),将转换为以下“Transaction”map字段:

"Transaction":{
    "Timestamp":Jan 26, 2016 14:50:09PM
    "Ordinal":1
}

启用S​​SL/TLS

您可以启用MongoDB源以使用SSL/TLS连接到MongoDB。

  1. 在该阶段的“Advanced选项卡中,选择“SSL Enabled属性。
  2. 如果MongoDB证书由私有CA签名或不受默认Java信任库信任,请创建自定义信任库文件或修改默认Java信任库文件的副本以将CA添加到该文件中。然后,将数据收集器配置为使用修改后的信任库文件。

    默认情况下,Data Collector使用$JAVA_HOME/jre/lib/security/cacerts的Java信任库文件 。如果您的证书是由默认Java信任库文件中包含的CA签名的,则无需创建信任库文件,可以跳过此步骤。

    在这些步骤中,我们展示了如何修改默认的信任库文件,以将其他CA添加到受信任的CA列表中。如果您希望创建自定义信任库文件,请参阅keytool文档。

    注意:如果已经将Data Collector配置为使用自定义信任库文件来启用HTTPS或到LDAP服务器的安全连接,则只需将此附加CA添加到相同的修改后的信任库文件中即可。
    1. 使用以下命令来设置JAVA_HOME环境变量:
      export JAVA_HOME=<Java home directory>
    2. 使用以下命令来设置SDC_CONF环境变量:
      export SDC_CONF=<Data Collector configuration directory>

      例如,对于RPM安装,请使用:

      export SDC_CONF=/etc/sdc
    3. 使用以下命令将默认的Java truststore文件复制到Data Collector 配置目录:
      cp "${JAVA_HOME}/jre/lib/security/cacerts" "${SDC_CONF}/truststore.jks"
    4. 使用以下keytool命令将CA证书导入到信任库文件中:
      keytool -import -file <MongoDB certificate> -trustcacerts -noprompt -alias <MongoDB alias> -storepass <password> -keystore "${SDC_CONF}/truststore.jks"

      例如:

      keytool -import -file  myMongoDB.pem -trustcacerts -noprompt -alias MyMongoDB -storepass changeit -keystore "${SDC_CONF}/truststore.jks"
    5. 在SDC_JAVA_OPTS环境变量中定义以下选项:
      • javax.net.ssl.trustStore– Data Collector计算机上信任库文件的路径。
      • javax.net.ssl.trustStorePassword -信任库密码。

      使用安装类型所需的方法来修改环境变量。

      例如,如下定义选项:

      export SDC_JAVA_OPTS="${SDC_JAVA_OPTS} -Djavax.net.ssl.trustStore=/etc/sdc/truststore.jks -Djavax.net.ssl.trustStorePassword=mypassword -Xmx1024m -Xms1024m -server -XX:-OmitStackTraceInFastThrow"

      或者,为避免在导出命令中保存密码,请将密码保存在文本文件中,然后按如下所示定义truststore password选项: -Djavax.net.ssl.trustStorePassword = $(cat passwordfile.txt)

      然后,确保密码文件仅可由执行导出命令的用户读取。

    6. 重新启动Data Collector 以启用更改。

配置MongoDB源

配置MongoDB源以从MongoDB读取数据。

  1. 在“Properties”面板的“General选项卡上,配置以下属性:
    一般属性 描述
    Name 阶段名。
    Description 可选说明。
    Produce Events 发生事件时生成事件记录。用于 事件处理。
    On Record Error 该阶段的错误记录处理:

    • Discard-放弃记录。
    • To Error-将记录发送到管道以进行错误处理。
    • Stop Pipeline-停止管道。
  2. 在“MongoDB”选项卡上,配置以下属性:
    MongoDB属性 描述
    Connection String

    MongoDB实例的连接字符串。使用以下格式:

    mongodb://host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]

    连接到集群时,输入其他节点信息以确保连接。

    如果MongoDB服务器使用用户名/密码或LDAP身份验证,可以在连接字符串中的凭据,如“凭据”中描述的。

    Enable Single Mode 选择以连接到单个MongoDB服务器或节点。如果在连接字符串中定义了多个节点,则该阶段仅连接到第一个节点。

    请谨慎使用此选项。如果阶段无法连接或连接失败,则管道将停止。

    Database MongoDB数据库的名称。
    Collection 要使用的MongoDB集合的名称。
    Capped Collection 该集合已设置上限。清除此选项以读取未加盖的集合。
    Initial Offset 用于开始读取的初始偏移量。当使用日期或对象ID字段作为偏移字段时,请输入具有以下格式的时间戳:YYYY-MM-DD hh:mm:ss

    使用字符串字段时,输入要使用的字符串。

    默认值为:2015-01-01 00:00:00

    Offset Field Type 偏移字段的数据类型:

    • ObjectId-用于对象ID字段。
    • Data-用于日期字段。
    • String-用于字符串字段。

    默认值为ObjectId。

    Offset Field 用于跟踪读取的字段。默认值为_id字段。

    您可以使用嵌套的偏移量字段,例如o._id。您还可以使用任何对象ID,日期或字符串字段。除_id字段外,不保证任何结果。

    Batch Size (records) 批次中允许的最大记录数。
    Max Batch Wait Time 在发送空批次之前,源端将等待填充批次的时间。
    Read Preference 确定源端如何从MongoDB副本集的不同成员读取数据。
  3. 要与MongoDB连接字符串分开输入凭据,请单击“Credentials选项卡并配置以下属性:
    证书 描述
    Authentication Type MongoDB服务器使用的身份验证:Username/Password或LDAP。
    Username MongoDB或LDAP用户名。
    Password MongoDB或LDAP密码。

    提示: 要保护敏感信息(例如用户名和密码),可以使用 运行时资源或凭据存储。
    Authentication Source 可选的备用数据库名称,用于执行委托的身份验证。

    可用于“Username/Password”选项。

  4. (可选)单击“Advanced选项卡以配置源如何连接到MongoDB。

    这些属性的默认值在大多数情况下都应该起作用:

    高级属性 描述
    Connections Per Host 每个主机的最大连接数。

    默认值为100。

    Min Connections Per Host 每个主机的最小连接数。

    默认值为0。

    Connection Timeout 等待连接的最长时间(以毫秒为单位)。

    默认值为10,000。

    Max Connection Idle Time 池化连接可以保持空闲状态的最长时间(以毫秒为单位)。当池化连接超过空闲时间时,连接将关闭。使用0退出此属性。

    默认值为0。

    Max Connection Lifetime 池化连接可以活动的最长时间(以毫秒为单位)。当池化连接超过生存期时,该连接将关闭。使用0退出此属性。

    默认值为0。

    Max Wait Time 线程可以等待连接可用的最长时间(以毫秒为单位)。使用0退出此属性。使用负值无限期等待。

    默认值为120,000。

    Server Selection Timeout Data Collector在引发异常之前等待服务器选择的最长时间(以毫秒为单位)。如果使用0,则在没有可用服务器的情况下立即引发异常。使用负值无限期等待。

    默认值为30,000。

    Threads Allowed to Block for Connection Multiplier 乘数,确定可以等待池中的连接可用的最大线程数。此数字乘以“Connections Per Host”值确定最大线程数。

    默认值为5。

    Heartbeat Frequency Data Collector尝试确定集群中每个服务器的当前状态的频率(以毫秒为单位)。

    默认值为10,000。

    Min Heartbeat Frequency 最小心跳频率(以毫秒为单位)。在检查每个服务器的状态之前,Data Collector至少要等待这么长时间。

    默认值为500。

    Heartbeat Connection Timeout 等待用于群集心跳的连接的最长时间(以毫秒为单位)。

    默认值为20,000。

    Heartbeat Socket Timeout 用于集群心跳的连接的套接字超时的最长时间(以毫秒为单位)。

    默认值为20,000。

    Local Threshold 本地阈值(以毫秒为单位)。将请求发送到其ping时间小于或等于具有最快ping时间加上本地阈值的服务器的服务器。

    默认值为15。

    Required Replica Set Name 用于集群的必需副本集名称。
    Cursor Finalizer Enabled 指定是否启用游标终结器。
    Socket Keep Alive 指定是否启用套接字保持活动状态。
    Socket Timeout 套接字超时的最长时间(以毫秒为单位)。使用0退出此属性。

    默认值为0。

    SSL Enabled Data Collector和MongoDB 之间启用SSL/TLS 。

    如果MongoDB证书是由私有CA签名的,或者不受默认Java信任库信任的,则还必须在SDC_JAVA_OPTS环境变量中定义信任库文件和密码,如启用S​​SL/TLS中所述。

    SSL Invalid Host Name Allowed 指定在SSL/TLS证书中是否允许使用无效的主机名。