Elasticsearch

支持的管道类型:

  •  Data Collector

Elasticsearch目标端将数据写入到Elasticsearch集群,包括Elastic Cloud集群(以前称为Found集群)和Amazon Elasticsearch Service集群。目标使用Elasticsearch HTTP模块访问Bulk API,并将每条记录作为文档写入Elasticsearch。

配置Elasticsearch目标端时,将配置集群名称,HTTP URI和与文档相关的信息。

当Data Collector与Elasticsearch集群共享同一网络时,您可以输入一个或多个节点URI,并自动检测集群上的其他Elasticsearch节点。

Elasticsearch目标端可以使用在sdc.operation.type记录头属性中定义的CRUD操作来写入数据。您可以为没有头属性或值的记录定义默认操作。您还可以配置如何处理不受支持的操作的记录。 有关Data Collector变更数据处理以及启用CDC的源端的列表的信息,请参见处理变更数据。

您还可以根据需要添加高级Elasticsearch属性。

安全

为Elasticsearch集群启用安全性后,您必须指定身份验证方法:

Basic
对Amazon Elasticsearch Service以外的Elasticsearch集群使用基本身份验证。使用基本身份验证,源将通过Elasticsearch用户名和密码。
AWS Signature V4
对Amazon Elasticsearch Service中的Elasticsearch集群使用AWS Signature V4身份验证。源必须使用Amazon Web Services凭据签署HTTP请求。 有关详细信息,请参阅 Amazon Elasticsearch Service文档。使用以下方法之一来使用AWS凭证进行签名:

  IAM role
当Data Collector在Amazon EC2实例上运行时,您可以使用AWS管理控制台为EC2实例配置IAM角色。Data Collector使用IAM实例配置文件凭证自动连接到AWS。
要使用IAM角色,请不要配置“Access Key ID”和“Secret Access Key”属性
有关将IAM角色分配给EC2实例的更多信息,请参阅Amazon EC2文档。
  AWS access key pair
Data Collector未在Amazon EC2实例上运行或EC2实例不具有IAM角色时,您必须配置“Access Key ID”和“Secret Access Key”属性。
提示:为了保护敏感信息,例如访问密钥对,可以使用运行时资源或凭据存储。
提示:为了保护敏感信息,例如用户名和密码或访问密钥对,可以使用运行时资源或凭据存储。

时间基准和基于时间的索引

时间基准是Elasticsearch目标端将记录写入基于时间的索引所用的时间。当索引没有时间成分时,可以忽略时间基准属性。

您可以将处理时间或与数据关联的时间用作时间基准。

例如,假设您使用以下日期时间变量定义Index属性:

logs-${YYYY()}-${MM()}-${DD()}

如果使用处理时间作为时间基准,则目标将根据记录处理的时间将记录写入索引。如果使用与数据相关联的时间(例如事务时间戳记),那么目标将根据该时间戳记将记录写入索引。

您可以使用以下时间作为时间基准:

处理时间
使用处理时间作为时间基准时,目标端将根据处理时间和索引写入索引。要将处理时间用作时间基准,请使用以下表达式:

${time:now()}

这是默认的时间基准。

记录时间
当您使用与记录关联的时间作为时间基准时,您可以在记录中指定日期字段。目标端根据与记录关联的日期时间将数据写入索引。
要使用与记录关联的时间,请使用一个表达式,该表达式调用一个字段并解析为日期时间值,例如 ${record:value("/Timestamp")}

文件编号

在适当的时候,您可以指定一个定义文档ID的表达式。当您不指定表达式时,Elasticsearch会为每个文档生成ID。

在配置目标以执行创建,更新或删除操作时,必须定义文档ID。

例如,为了执行更新基于雇员ID字段使用标识文件,定义了写操作,更新并定义文档ID如下: ${record:value('/EmployeeID')}

您还可以为每个文档定义父ID,以在同一索引中的文档之间定义父/子关系。

定义CRUD操作

Elasticsearch目标端可以创建,更新,删除或索引数据。目标根据CRUD操作头属性或与操作相关的阶段属性中定义的CRUD操作写入记录。

您可以通过以下方式定义CRUD操作:

CRUD记录头属性
您可以在CRUD操作记录头属性中定义CRUD操作。目标端在sdc.operation.type记录头属性中寻找要使用的CRUD操作 。
该属性可以包含以下数值之一:

  • 1为CREATE,相当于INSERT
  • 2为DELETE
  • 3为UPDATE
  • 4为INDEX,相当于UPSERT
  • 8为使用doc_as_upsertUPDATE,等于MERGE
如果您的管道包括启用CRUD处理变更数据的源端,则目标端仅从源端生成的头属性sdc.operation.type中读取操作类型 。如果您的管道使用非CDC源,则可以使用Expression Evaluator或脚本处理器来定义记录头属性。有关Data Collector变更数据处理以及启用CDC的源端的列表的详细信息,请参阅处理变更数据。
操作阶段属性
您在目标端属性中定义一个默认操作。sdc.operation.type未设置记录头属性时,目标端使用默认操作 。
您还可以定义如何使用sdc.operation.type头属性中定义的不受支持的操作来处理记录 。目标端可以丢弃它们,将它们发送给错误,或使用默认操作。

配置Elasticsearch目标端

配置Elasticsearch目标端以将数据写入Elasticsearch集群。

  1. 在“Properties”面板的“General选项卡上,配置以下属性:
    一般属性 描述
    Name 阶段名。
    Description 可选说明。
    Required Fields 必须包含用于将记录传递到阶段的记录的数据的字段。

    提示:您可能包括阶段使用的字段。

    根据为管道配置的错误处理,处理不包括所有必填字段的记录。

    Preconditions 必须评估为TRUE的条件才能使记录进入处理阶段。单击 添加以创建其他前提条件。

    根据为该阶段配置的错误处理,处理不满足所有前提条件的记录。

    On Record Error 该阶段的错误记录处理:

    • Discard-放弃记录。
    • To Error-将记录发送到管道以进行错误处理。
    • Stop Pipeline-停止管道。对集群管道无效。
  2. Elasticsearch选项卡上,配置以下属性:
    Elasticsearch属性 描述
    Cluster HTTP URI 用于连接到集群的HTTP URI。使用以下格式:

    <host>:<port>
    Additional HTTP Params 您想要作为查询字符串参数发送到Elasticsearch的其他HTTP参数。输入Elasticsearch期望的确切参数名称和值。
    Detect Additional Nodes in Cluster 根据配置的集群URI检测集群中的其他节点。

    选择此属性等同于将client.transport.sniff Elasticsearch属性设置为true。

    仅在Data Collector与Elasticsearch集群共享同一网络时使用。不要用于Elastic Cloud或Docker集群。

    Use Security 指定是否在Elasticsearch集群上启用安全性。
    Time Basis 用于写入基于时间的索引的时间基准。使用以下表达式之一:

    • ${time:now()}-使用处理时间作为时间基准。处理时间是与运行管道的数据收集器相关的时间。
    • 该表达式调用一个字段并解析为日期时间值,例如${record:value(<date field path>)}。使用datetime结果作为时间基准。

    如果Index属性不包含datetime变量,则可以忽略此属性。

    默认值为${time:now()}

    Data Time Zone 目标系统的时区。用于解析基于时间的索引中的日期时间。
    Index 生成文档的索引。输入索引名称或计算结果为该索引名称的表达式。

    例如,如果输入customer作为索引,则目标将在customer索引中写入文档 。

    如果在表达式中使用datetime变量,请确保适当配置时间基准。有关日期时间变量的详细信息,请参见日期时间变量。

    Mapping 生成文档的映射类型。输入映射类型,计算结果为该映射类型的表达式或包含该映射类型的字段。

    例如,如果输入user作为映射类型,则目标端将使用user映射类型写入文档 。

    Document ID 该表达式的计算结果为生成的文档的ID。当您不指定ID时,Elasticsearch为每个文档创建一个ID。

    默认情况下,目标端允许Elasticsearch创建ID。

    Parent ID 生成的文档的可选父ID。输入父ID或计算结果为父ID的表达式。

    用于在同一索引中的文档之间建立父子关系。

    Routing 生成的文档的可选自定义路由值。输入路由值或计算结果为该路由值的表达式。

    Elasticsearch根据为文档定义的路由值将文档路由到索引中的特定分片。您可以为每个文档定义一个自定义值。如果您未定义自定义路由值,Elasticsearch将使用父ID(如果已定义)或文档ID作为路由值。

    Data Charset

    要处理的数据的字符编码。

    Default Operation 如果sdc.operation.type未设置记录头属性,则执行默认的CRUD操作。
    Unsupported Operation Handling sdc.operation.type不支持在记录头属性中定义的CRUD操作类型时采取的措施 :

    • Discard-放弃记录。
    • Send to Error-将记录发送到管道以进行错误处理。
    • Use Default Operation-使用默认操作将记录写入目标系统。
    Additional Properties 要包含在操作语句中的其他字段。以JSON格式指定。

    例如,您可以使用该 _retry_on_conflict字段指定在版本冲突时重试更新的次数。要指定三个重试,包括以下内容:

    "_retry_on_conflict" : 3

    有关更多信息,请参阅Elasticsearch文档

  3. 如果启用了安全性,请在“Security选项卡上配置以下属性:
    Security属性 描述
    Mode 使用的身份验证方法:

    • Basic-使用Elasticsearch用户名和密码进行身份验证。为Amazon Elasticsearch Service之外的Elasticsearch集群选择此选项。
    • AWS Signature V4-通过AWS进行身份验证。为Amazon Elasticsearch Service中的Elasticsearch集群选择此选项。
    Security Username/Password Elasticsearch用户名和密码。

    使用以下语法输入用户名和密码:

    <username>:<password>

    使用基本身份验证时可用。

    Region 托管Elasticsearch域的Amazon Web Services区域。

    使用AWS Signature V4身份验证时可用。

    Access Key ID AWS访问密钥ID。不将IAM角色与IAM实例配置文件凭据一起使用时是必需的。

    使用AWS Signature V4身份验证时可用。

    Secret Access Key AWS秘密访问密钥。不将IAM角色与IAM实例配置文件凭据一起使用时是必需的。

    使用AWS Signature V4身份验证时可用。

    SSL Truststore Path 信任库文件的位置。

    配置此属性等效于配置shield.ssl.truststore.path Elasticsearch属性。

    对于弹性云集群而言不是必需的。

    SSL Truststore Password 信任库文件的密码。

    配置此属性等效于配置shield.ssl.truststore.password Elasticsearch属性。

    对于Elastic Cloud集群而言不是必需的。