MongoDB Oplog

支持的管道类型:

  •  Data Collector

MongoDB Oplog源从MongoDB Oplog读取条目。

MongoDB将有关数据库更改的信息存储在称为Oplog的本地限定集合中。Oplog包含有关数据更改以及数据库更改的信息。MongoDB Oplog源可以读取任何写入Oplog的操作。

使用MongoDB Oplog源捕获数据或数据库中的更改。要处理写入标准有上限或无上限集合的MongoDB数据,请使用MongoDB源。

MongoDB Oplog源的记录头属性中包括CRUD操作类型,因此生成的记录可以由启用CRUD的目标轻松处理。有关Data Collector变更数据处理的概述以及启用CRUD的目标的列表,请参阅处理变更数据。

配置MongoDB Oplog源时,将配置连接信息,例如连接字符串和MongoDB凭据。您还定义了一个可选的时间戳记和顺序号,以指定从何处开始读取,要处理的操作以及读取首选项。

您可以选择配置高级选项,这些选项可以确定源如何连接到MongoDB,包括启用SSL/TLS。

注意: StreamSets已使用MongoDB 4.0测试了此阶段。

凭据

根据MongoDB服务器使用的身份验证,将源端配置为不使用身份验证、用户名/密码身份验证或LDAP身份验证。使用用户名/密码身份验证时,还可以使用委托身份验证。

默认情况下,源端不使用身份验证。

要使用用户名/密码或LDAP身份验证,请通过以下方式之一输入所需的凭据:

MongoDB选项卡中的连接字符串
在MongoDB选项卡的连接字符串中输入凭据。
要输入用于用户名/密码身份验证的凭据,请在主机名之前输入用户名和密码。使用以下格式:

mongodb://username:password@host[:port][/[database][?options]]
要输入用于LDAP身份验证的凭据,请在主机名之前输入用户名和密码,并将authMechanism选项设置为PLAIN。使用以下格式:

mongodb://username:password@host[:port][/[database]?authMechanism=PLAIN
Credentials选项卡
在“Credentials”选项卡中选择“Username/Password”或“LDAP authentication”类型。然后输入身份验证类型的用户名和密码。

如果您同时在连接字符串和“Credentials”选项卡中输入凭据,则“Credentials”选项卡优先。

Oplog时间戳和顺序

当您启动管道时,默认情况下,MongoDB Oplog起点从Oplog的开头开始读取。您可以配置时间戳记和顺序来指定要在哪里开始处理。

MongoDB Oplog条目包含一个名为“ts”的时间戳字段,该字段由时间戳和顺序组成,如下所示:

"ts": Timestamp(<timestamp>, <ordinal>)

时间戳格式是自Unix时代以来的秒数,例如1412180887。

序数(ordinal)是一个整数计数器,用于区分同一秒内出现的条目。

您可以使用时间戳和序数来指定从Oplog开始读取的位置。使用时间戳时,还必须定义一个序数。

有关Oplog时间戳字段的更多信息,请参阅MongoDB文档。

读取偏好

您可以配置MongoDB Oplog源使用的读取偏好。

读取偏好确定源如何从MongoDB副本集的不同成员读取数据。

您可以使用以下MongoDB读取偏好:

  • Primary-要求从主成员读取。
  • Primary Preferred-首选从主成员读取,但允许从辅助成员读取。
  • Secondary-要求从辅助成员读取。
  • Secondary Preferred-首选从辅助成员读取,但在必要时允许从主成员读取。
  • Nearest-从具有最小网络延迟的成员读取。

默认情况下,源使用“Secondary Preferred”以避免对主成员进行不必要的请求。

生成的记录

MongoDB Oplog源基于MongoDB Oplog的数据生成记录,并添加CRUD和CDC相关的记录头属性。

Oplog记录的结构是唯一的,因此在必要时,您可以在管道中使用某些处理器来转换记录结构。

例如,对于插入记录,记录数据驻留在名为“o”的映射字段中。但是对于更新记录,_id字段是o2 map字段的一部分。要合并记录数据,可以使用Field Flattener来展平地图字段,并使用Field Remover来删除所有不必要的字段。

有关Oplog记录结构的更多信息,请参阅MongoDB文档。以下站点也是不错的资源:https : //www.compose.com/articles/the-mongodb-oplog-and-node-js/。

CRUD操作和CDC头属性

MongoDB Oplog源在sdc.operation.type记录头属性中包括CRUD操作类型。

如果您在诸如JDBC Producer或Elasticsearch之类的管道中使用启用了CRUD的目标端,则目标端在写入目标系统时可以使用操作类型。必要时,可以使用Expression Evaluator或脚本处理器来处理sdc.operation.type头属性中的值 。有关Data Collector变更数据处理的概述以及启用CRUD的目标的列表,请参阅处理变更数据。

MongoDB Oplog起源在sdc.operation.type记录标头属性中使用以下值表示操作类型:

  • 1为INSERT
  • 2为DELETE
  • 3为UPDATE
  • 5,用于不支持的操作,例如CMD,NOOP或DB,它们是可用的MongoDB操作类型,但不适用于记录数据。

当信息相关时,MongoDB Oplog源还可以包括以下头属性:

  • op – CRUD操作使用以下值:
    • i 为 INSERT
    • u 为 UPDATE
    • d 为 DELETE
  • ns – 命名空间,使用以下格式: <database>:<tablename>

启用S​​SL/TLS

您可以启用MongoDB Oplog源以使用SSL/TLS连接到MongoDB。

  1. 在该阶段的“Advanced选项卡中,选择“SSL Enabled属性。
  2. 如果MongoDB证书由私有CA签名或不受默认Java信任库信任,请创建自定义信任库文件或修改默认Java信任库文件的副本以将CA添加到该文件中。然后,将数据收集器配置为使用修改后的信任库文件。

    默认情况下,Data Collector使用$JAVA_HOME/jre/lib/security/cacerts的Java信任库文件 。如果您的证书是由默认Java信任库文件中包含的CA签名的,则无需创建信任库文件,可以跳过此步骤。

    在这些步骤中,我们展示了如何修改默认的信任库文件,以将其他CA添加到受信任的CA列表中。如果您希望创建自定义信任库文件,请参阅keytool文档。

    注意:如果已经将Data Collector配置为使用自定义信任库文件来启用HTTPS或到LDAP服务器的安全连接,则只需将此附加CA添加到相同的修改后的信任库文件中即可。
    1. 使用以下命令来设置JAVA_HOME环境变量:
      export JAVA_HOME=<Java home directory>
    2. 使用以下命令来设置SDC_CONF环境变量:
      export SDC_CONF=<Data Collector configuration directory>

      例如,对于RPM安装,请使用:

      export SDC_CONF=/etc/sdc
    3. 使用以下命令将默认的Java truststore文件复制到Data Collector 配置目录:
      cp "${JAVA_HOME}/jre/lib/security/cacerts" "${SDC_CONF}/truststore.jks"
    4. 使用以下keytool命令将CA证书导入到信任库文件中:
      keytool -import -file <MongoDB certificate> -trustcacerts -noprompt -alias <MongoDB alias> -storepass <password> -keystore "${SDC_CONF}/truststore.jks"

      例如:

      keytool -import -file  myMongoDB.pem -trustcacerts -noprompt -alias MyMongoDB -storepass changeit -keystore "${SDC_CONF}/truststore.jks"
    5. 在SDC_JAVA_OPTS环境变量中定义以下选项:
      • javax.net.ssl.trustStore– Data Collector计算机上信任库文件的路径。
      • javax.net.ssl.trustStorePassword -信任库密码。

      使用安装类型所需的方法来修改环境变量。

      例如,如下定义选项:

      export SDC_JAVA_OPTS="${SDC_JAVA_OPTS} -Djavax.net.ssl.trustStore=/etc/sdc/truststore.jks -Djavax.net.ssl.trustStorePassword=mypassword -Xmx1024m -Xms1024m -server -XX:-OmitStackTraceInFastThrow"

      或者,为避免在导出命令中保存密码,请将密码保存在文本文件中,然后按如下所示定义truststore password选项: -Djavax.net.ssl.trustStorePassword = $(cat passwordfile.txt)

      然后,确保密码文件仅可由执行导出命令的用户读取。

    6. 重新启动Data Collector 以启用更改。

配置MongoDB Oplog源

配置MongoDB Oplog源以从MongoDB Oplog读取数据。

  1. 在“Properties”面板的“General选项卡上,配置以下属性:
    一般属性 描述
    Name 阶段名。
    Description 可选说明。
    On Record Error 该阶段的错误记录处理:

    • Discard-放弃记录。
    • To Error-将记录发送到管道以进行错误处理。
    • Stop Pipeline-停止管道。
  2. 在“MongoDB”选项卡上,
  3. 配置以下属性:
    MongoDB属性 描述
    Connection String

    MongoDB实例的连接字符串。使用以下格式:

    mongodb://host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]

    连接到集群时,输入其他节点信息以确保连接。

    如果MongoDB服务器使用用户名/密码或LDAP身份验证,可以在连接字符串中的凭据,如“凭据”中描述的。

    Enable Single Mode 选择以连接到单个MongoDB服务器或节点。如果在连接字符串中定义了多个节点,则该阶段仅连接到第一个节点。

    请谨慎使用此选项。如果阶段无法连接或连接失败,则管道将停止。

    Database MongoDB数据库的名称。
    Collection 要使用的MongoDB集合的名称。
    Capped Collection 该集合已设置上限。清除此选项以读取未加盖的集合。
    Initial Offset 用于开始读取的初始偏移量。当使用日期或对象ID字段作为偏移字段时,请输入具有以下格式的时间戳:YYYY-MM-DD hh:mm:ss

    使用字符串字段时,输入要使用的字符串。

    默认值为:2015-01-01 00:00:00

    Offset Field Type 偏移字段的数据类型:

    • ObjectId-用于对象ID字段。
    • Data-用于日期字段。
    • String-用于字符串字段。

    默认值为ObjectId。

    Offset Field  用于跟踪读取的字段。默认值为_id字段。

    您可以使用嵌套的偏移量字段,例如o._id。您还可以使用任何对象ID,日期或字符串字段。除_id字段外,不保证任何结果。

    Batch Size (records) 批次中允许的最大记录数。
    Max Batch Wait Time  在发送空批次之前,源端将等待填充批次的时间。
    Read Preference  确定源端如何从MongoDB副本集的不同成员读取数据。
  4. 要与MongoDB连接字符串分开输入凭据,请单击“Credentials选项卡并配置以下属性:
    证书 描述
    Authentication Type MongoDB服务器使用的身份验证:Username/Password或LDAP。
    Username MongoDB或LDAP用户名。
    Password MongoDB或LDAP密码。

    提示: 要保护敏感信息(例如用户名和密码),可以使用 运行时资源或凭据存储。
    Authentication Source 可选的备用数据库名称,用于执行委托的身份验证。

    可用于“Username/Password”选项。

  5. (可选)单击“Advanced选项卡以配置源如何连接到MongoDB。

    这些属性的默认值在大多数情况下都应该起作用:

    高级属性 描述
    Connections Per Host 每个主机的最大连接数。

    默认值为100。

    Min Connections Per Host 每个主机的最小连接数。

    默认值为0。

    Connection Timeout 等待连接的最长时间(以毫秒为单位)。

    默认值为10,000。

    Max Connection Idle Time 池化连接可以保持空闲状态的最长时间(以毫秒为单位)。当池化连接超过空闲时间时,连接将关闭。使用0退出此属性。

    默认值为0。

    Max Connection Lifetime 池化连接可以活动的最长时间(以毫秒为单位)。当池化连接超过生存期时,该连接将关闭。使用0退出此属性。

    默认值为0。

    Max Wait Time 线程可以等待连接可用的最长时间(以毫秒为单位)。使用0退出此属性。使用负值无限期等待。

    默认值为120,000。

    Server Selection Timeout Data Collector在引发异常之前等待服务器选择的最长时间(以毫秒为单位)。如果使用0,则在没有可用服务器的情况下立即引发异常。使用负值无限期等待。

    默认值为30,000。

    Threads Allowed to Block for Connection Multiplier 乘数,确定可以等待池中的连接可用的最大线程数。此数字乘以“Connections Per Host”值确定最大线程数。

    默认值为5。

    Heartbeat Frequency Data Collector尝试确定集群中每个服务器的当前状态的频率(以毫秒为单位)。

    默认值为10,000。

    Min Heartbeat Frequency 最小心跳频率(以毫秒为单位)。在检查每个服务器的状态之前,Data Collector至少要等待这么长时间。

    默认值为500。

    Heartbeat Connection Timeout 等待用于群集心跳的连接的最长时间(以毫秒为单位)。

    默认值为20,000。

    Heartbeat Socket Timeout 用于集群心跳的连接的套接字超时的最长时间(以毫秒为单位)。

    默认值为20,000。

    Local Threshold 本地阈值(以毫秒为单位)。将请求发送到其ping时间小于或等于具有最快ping时间加上本地阈值的服务器的服务器。

    默认值为15。

    Required Replica Set Name 用于集群的必需副本集名称。
    Cursor Finalizer Enabled 指定是否启用游标终结器。
    Socket Keep Alive 指定是否启用套接字保持活动状态。
    Socket Timeout 套接字超时的最长时间(以毫秒为单位)。使用0退出此属性。

    默认值为0。

    SSL Enabled Data Collector和MongoDB 之间启用SSL/TLS 。

    如果MongoDB证书是由私有CA签名的,或者不受默认Java信任库信任的,则还必须在SDC_JAVA_OPTS环境变量中定义信任库文件和密码,如启用S​​SL/TLS中所述。

    SSL Invalid Host Name Allowed 指定在SSL/TLS证书中是否允许使用无效的主机名。