Redis Consumer

支持的管道类型:

  •  Data Collector

Redis Consumer源从Redis读取消息。

配置Redis Consumer时,您将定义Redis URI和要读取的通道。您可以指定特定频道或提供模式以订阅所有匹配的频道。您还可以配置连接超时。

通道与格式

您可以指定通道和格式来定义Redis Consumer来源处理的数据。

您可以列出单个通道和glob样式的格式。定义模式时,源端将从名称与该格式匹配的所有通道中读取。例如,* log*格式订阅通道名称中带有“ log”的任何通道。

为避免处理重复的消息,请勿多次订阅同一频道。Redis允许同一客户端多次读取消息。

数据格式

Redis Consumer源基于数据格式对数据进行不同的处理。源可以处理以下类型的数据:

Avro
为每个Avro记录生成一条记录。每个小数字段都包含 precisionscale 字段属性。
该阶段在avroSchema 记录头属性中包括Avro模式 。您可以使用以下方法之一来指定Avro模式定义的位置:

  • Message/Data Includes Schema -在文件中使用模式。
  • In Pipeline Configuration -使用您在阶段配置属性中提供的模式。
  • Confluent Schema Registry-从Confluent Schema Registry检索模式。Confluent Schema Registry是Avro模式的分布式存储层。您可以配置阶段以通过阶段配置中指定的模式ID或主题在Confluent Schema Registry中查找模式。
在阶段配置中使用模式或从Confluent Schema Registry检索模式会覆盖文件中可能包含的任何模式,并可以提高性能。
Binary
生成一条记录,在记录的根部有一个单字节数组字段。
当数据超过用户定义的最大数据大小时,源端将无法处理数据。因为未创建记录,所以源端无法将记录传递到管道以将其写为错误记录。相反,源端会产生阶段错误。
Delimited
为每个分隔的行生成一条记录。您可以使用以下分隔格式类型:

  • Default CSV-包含逗号分隔值的文件。忽略文件中的空行。
  • RFC4180 CSV-严格遵循RFC4180准则的逗号分隔文件。
  • MS Excel CSV -Microsoft Excel逗号分隔文件。
  • MySQL CSV -MySQL逗号分隔文件。
  • Tab-Separated Values -包含制表符分隔的值的文件。
  • PostgreSQL CSV -PostgreSQL逗号分隔文件。
  • PostgreSQL文本 -PostgreSQL文本文件。
  • Custom -使用用户定义的分隔符,转义符和引号字符的文件。
  • Multi Character Delimited-使用多个用户定义的字符分隔字段和行以及单个用户定义的转义和引号字符的文件。
您可以将列表或列表映射根字段类型用于分隔数据,并可以选择在标题行中包括字段名称(如有)。有关根字段类型的更多信息,请参见分隔数据根字段类型。
使用标题行时,可以启用对额外列的记录处理。额外列使用自定义的前缀和顺序递增的顺序整数,如命名 _extra_1, _extra_2。当您禁止额外列时,包含额外列的记录将发送到错误。
您也可以将字符串常量替换为空值。
当一条记录超过为该阶段定义的最大记录长度时,该阶段将根据为该阶段配置的错误处理来处理对象。
JSON
为每个JSON对象生成一条记录。您可以处理包含多个JSON对象或单个JSON数组的JSON文件。
当对象超过为源端定义的最大对象长度时,源端会根据为阶段配置的错误处理来处理对象。
Log
为每个日志行生成一条记录。
当一行超过用户定义的最大行时,源端将截断更长的行。
您可以将处理后的日志行作为字段包含在记录中。如果日志行被截断,并且您在记录中请求日志行,则源端包括被截断的行。
您可以定义要读取的日志格式或类型。
Protobuf
为每个protobuf消息生成一条记录。
Protobuf消息必须与指定的消息类型匹配,并在描述符文件中进行描述。
当记录数据超过1 MB时,源端将无法继续处理文件中的数据。源端根据文件错误处理属性处理文件,并继续读取下一个文件。
有关生成描述符文件的信息,请参阅Protobuf数据格式先决条件。
SDC Record
为每条记录生成一条记录。用于处理由Data Collector管道使用SDC记录数据格式生成的记录。
对于错误记录,源端提供从原始管道中的源端读取的原始记录,以及可用于更正记录的错误信息。
处理错误记录时,源端希望原始管道生成的错误文件名和内容。
Text
根据自定义分隔符为每行文本或每段文本生成一条记录。
当一条线或一段超出为源端定义的最大线长时,源端会截断它。源端添加了一个名为Truncated的布尔字段,以指示该行是否被截断。
有关使用自定义分隔符处理文本的更多信息,请参见使用自定义分隔符的文本数据格式。
XML
根据用户定义的分隔符元素生成记录。在根元素下直接使用XML元素或定义简化的XPath表达式。如果未定义分隔符元素,则源端会将XML文件视为单个记录。
默认情况下,生成的记录包括XML属性和名称空间声明作为记录中的字段。您可以配置阶段以将它们包括在记录中作为字段属性。
您可以在字段属性中包含每个解析的XML元素和XML属性的XPath信息。这还将每个名称空间放在xmlns记录头属性中。
注意: 只有在目标中使用SDC RPC数据格式时,字段属性和记录头属性才会自动写入目标系统。有关使用字段属性和记录头属性以及如何将它们包括在记录中的更多信息,请参见字段属性和记录头属性。
当记录超过用户定义的最大记录长度时,源端将跳过该记录并继续处理下一条记录。它将跳过的记录发送到管道以进行错误处理。
使用XML数据格式来处理有效的XML文档。有关XML处理的更多信息,请参见阅读和处理XML数据。
提示: 如果要处理无效的XML文档,则可以尝试将文本数据格式与自定义分隔符一起使用。有关更多信息,请参见 使用自定义分隔符处理XML数据。

配置Redis Consumer源

配置Redis Consumer源以从Redis读取消息。

  1. 在“Properties”面板的“General选项卡上,配置以下属性:
    一般属性 描述
    Name 阶段名。
    Description 可选说明。
    On Record Error 该阶段的错误记录处理:

    • Discard-放弃记录。
    • To Error-将记录发送到管道以进行错误处理。
    • Stop Pipeline-停止管道。
  2. 在“Redis”选项卡上,配置以下属性:
    Redis属性 描述
    URI Redis服务器的URI。使用以下格式:

    redis://<host name>:<port number>/<database>

    如果服务器使用默认数据库,则可以省略数据库。

    您可以选择包括密码以登录到Redis服务器。例如:

    redis://:<password>@<host name>:<port number>/<database>
    Channels 要使用的通道名称。使用简单或批量编辑模式,单击 添加图标以从其他通道读取。
    Pattern 使用glob样式的图案。源端订阅的频道名称与格式匹配。使用简单或批量编辑模式,单击 添加图标以读取其他格式。
    Connection Timeout 是时候等待与Redis的连接了。
    Batch Wait Time (ms) 发送部分或空批次之前要等待的毫秒数。
    Max Batch Size (records) 一次处理的最大记录数。接受的值最高为Data Collector的最大批处理大小。

    默认值是1000。

  3. 在“Data Format选项卡上,配置以下属性:
    数据格式属性 描述
    Data Format 要读取的数据类型。使用以下数据格式之一:

    • Avro
    • Binary
    • Delimited
    • JSON
    • Log
    • Protobuf
    • SDC Record
    • Text
    • XML
  4. 对于Avro数据,在“Data Format选项卡上,配置以下属性:
    Avro属性 描述
    Avro Schema Location 处理数据时要使用的Avro模式定义的位置:

    • Message/Data Includes Schema-在文件中使用模式。
    • In Pipeline Configuration-使用阶段配置中提供的模式。
    • Confluent Schema Registry-从Confluent Schema Registry检索模式。

    在阶段配置中或在Confluent Schema Registry中使用模式可以提高性能。

    Avro Schema 用于处理数据的Avro模式定义。覆盖与数据关联的任何现有模式定义。

    您可以选择使用该 runtime:loadResource函数来加载存储在运行时资源文件中的模式定义。

    Schema Registry URLs 用于查找模式的Confluent Schema Registry URL。要添加URL,请单击添加,然后以以下格式输入URL:

    http://<host name>:<port number>
    Basic Auth User Info 使用基本身份验证时连接到Confluent Schema Registry所需的用户信息。

    schema.registry.basic.auth.user.info使用以下格式从Schema Registry中的设置中输入密钥和机密 :

    <key>:<secret>
    提示: 要保护敏感信息(例如用户名和密码),可以使用运行时资源或凭据存储。
    Lookup Schema By 在Confluent Schema Registry中查找模式的方法:

    • Subject-查找指定的Avro模式主题。
    • Schema ID-查找指定的Avro schema ID。

    覆盖与数据关联的任何现有模式定义。

    Schema Subject Avro模式需要在Confluent Schema Registry中查找。

    如果指定的主题具有多个模式版本,则源使用该主题的最新模式版本。要使用旧版本,请找到相应的模式ID,然后将“Look Up Schema By属性设置为“Schema ID”。

    Schema ID 在Confluent Schema Registry中查找的Avro模式ID。
  5. 对于二进制数据,请在“Data Format选项卡上并配置以下属性:
    二进制属性 描述
    Max Data Size (bytes) 消息中的最大字节数。较大的消息无法处理或写入错误。
  6. 对于分隔数据,在“Data Format选项卡上,配置以下属性:
    分隔属性 描述
    Compression Format 文件的压缩格式:

    • None-仅处理未压缩的文件。
    • Compressed File-处理受支持的压缩格式压缩的文件。
    • Archive-处理通过支持的存档格式存档的文件。
    • Compressed Archive-处理通过支持的存档和压缩格式存档和压缩的文件。
    File Name Pattern within Compressed Directory 对于归档文件和压缩归档文件,使用文件名模式表示要在压缩目录中处理的文件。您可以使用UNIX样式的通配符,例如星号或问号。例如,*.json。

    默认值为*,它处理所有文件。

    Delimiter Format Type 分隔符格式类型。使用以下选项之一:

    • Default CSV-包含逗号分隔值的文件。忽略文件中的空行。
    • RFC4180 CSV-严格遵循RFC4180准则的逗号分隔文件。
    • MS Excel CSV -Microsoft Excel逗号分隔文件。
    • MySQL CSV -MySQL逗号分隔文件。
    • Tab-Separated Values -包含制表符分隔的值的文件。
    • PostgreSQL CSV -PostgreSQL逗号分隔文件。
    • PostgreSQL Text -PostgreSQL文本文件。
    • Custom -使用用户定义的分隔符,转义符和引号字符的文件。
    • Multi Character Delimited-使用多个用户定义的字符分隔字段和行以及单个用户定义的转义和引号字符的文件。
    Header Line 指示文件是否包含标题行以及是否使用标题行。
    Allow Extra Columns 使用标题行处理数据时,允许处理的记录列数超过标题行中的列数。
    Extra Column Prefix 用于任何其他列的前缀。额外的列使用前缀和顺序递增的整数来命名,如下所示: <prefix><integer>

    例如,_extra_1。默认值为 _extra_

    Max Record Length (chars) 记录的最大长度(以字符为单位)。较长的记录不会被读取。

    此属性可以受数据收集器解析器缓冲区大小的限制。有关更多信息,请参见最大记录大小。

    Delimiter Character 自定义分隔符格式的分隔符。选择一个可用选项,或使用“其他”输入自定义字符。

    您可以输入使用格式为Unicode控制符\uNNNN,其中N是数字0-9或字母AF十六进制数字。例如,输入 \u0000以使用空字符作为分隔符,或 \u2028使用行分隔符作为分隔符。

    默认为竖线字符(|)。

    Multi Character Field Delimiter 用于分隔多字符分隔符格式的字段的字符。

    默认值为两个竖线字符(||)。

    Multi Character Line Delimiter 以多字符分隔符格式分隔行或记录的字符。

    默认值为换行符(\ n)。

    Escape Character 自定义字符或多字符分隔符格式的转义字符。
    Quote Character 自定义或多字符分隔符格式的引号字符。
    Enable Comments 自定义分隔符格式允许注释的数据被忽略。
    Comment Marker 为自定义分隔符格式启用注释时,标记注释的字符。
    Ignore Empty Lines 对于自定义分隔符格式,允许忽略空行。
    Root Field Type 要使用的根字段类型:

    • List-Map-生成数据索引列表。使您能够使用标准功能来处理数据。用于新管道。
    • List-生成带有索引列表的记录,该列表带有标头和值的映射。需要使用分隔数据功能来处理数据。仅用于维护在1.1.0之前创建的管道。
    Lines to Skip 读取数据前要跳过的行数。
    Parse NULLs 将指定的字符串常量替换为空值。
    NULL Constant 代表空值的字符串常量。
    Charset 要处理的文件的字符编码。
    Ignore Control Characters 除去制表符,换行符和回车符以外的所有ASCII控制字符。
  7. 对于JSON数据,在“Data Format选项卡上,配置以下属性:
    JSON属性 描述
    Compression Format 文件的压缩格式:

    • None-仅处理未压缩的文件。
    • Compressed File-处理受支持的压缩格式压缩的文件。
    • Archive-处理通过支持的存档格式存档的文件。
    • Compressed Archive-处理通过支持的存档和压缩格式存档和压缩的文件。
    File Name Pattern within Compressed Directory 对于归档文件和压缩归档文件,使用文件名模式表示要在压缩目录中处理的文件。您可以使用UNIX样式的通配符,例如星号或问号。例如,*.json。

    默认值为*,它处理所有文件。

    JSON Content JSON内容的类型。使用以下选项之一:

    • Array of Objects
    • Multiple Objects
    Maximum Object Length (chars) JSON对象中的最大字符数。

    较长的对象将转移到管道以进行错误处理。

    此属性可以受Data Collector解析器缓冲区大小的限制。有关更多信息,请参见最大记录大小。

    Charset 要处理的文件的字符编码。
    Ignore Control Characters 除去制表符,换行符和回车符以外的所有ASCII控制字符。
  8. 对于日志数据,在“Data Format选项卡上,配置以下属性:
    日志属性 描述
    Log Format 日志文件的格式。使用以下选项之一:

    • 通用日志格式
    • 合并日志格式
    • Apache错误日志格式
    • Apache访问日志自定义格式
    • 正则表达式
    • Grok模式
    • Log4j
    • 通用事件格式(CEF)
    • 日志事件扩展格式(LEEF)
    Max Line Length 日志行的最大长度。源端将截断较长的行。

    此属性可以受数据收集器解析器缓冲区大小的限制。有关更多信息,请参见最大记录大小。

    Retain Original Line 确定如何处理原始日志行。选择将原始日志行作为字段包含在结果记录中。

    默认情况下,原始行被丢弃。

    Charset 要处理的文件的字符编码。
    Ignore Control Characters 除去制表符,换行符和回车符以外的所有ASCII控制字符。
    • 当选择“Apache访问日志自定义格式”时,请使用Apache日志格式字符串定义“Custom Log Format
    • 选择“正则表达式”时,输入描述日志格式的正则表达式,然后将要包括的字段映射到每个正则表达式组。
    • 选择Grok Pattern时,可以使用 Grok Pattern Definition字段定义自定义grok模式。您可以在每行上定义一个模式。

      在 Grok Pattern字段中,输入用于解析日志的模式。您可以使用预定义的grok模式,也可以使用Grok Pattern Definition中定义的模式创建自定义grok模式 。

      有关定义grok模式和支持的grok模式的更多信息,请参见定义Grok模式。

    • 选择Log4j时,定义以下属性:
      Log4j属性 描述
      On Parse Error 确定如何处理无法解析的信息:

      • Skip and Log Error-跳过读取行并记录阶段错误。
      • Skip, No Error-跳过读取行并且不记录错误。
      • Include as Stack Trace-包含无法解析为先前读取的日志行的堆栈跟踪的信息。该信息将添加到最后一个有效日志行的消息字段中。
      Use Custom Log Format 允许您定义自定义日志格式。
      Custom Log4J Format 使用log4j变量定义自定义日志格式。
  9. 对于protobuf数据,在“Data Format选项卡上,配置以下属性:
    Protobuf属性 描述
    Compression Format 文件的压缩格式:

    • None-仅处理未压缩的文件。
    • Compressed File-处理受支持的压缩格式压缩的文件。
    • Archive-处理通过支持的存档格式存档的文件。
    • Compressed Archive-处理通过支持的存档和压缩格式存档和压缩的文件。
    File Name Pattern within Compressed Directory 对于归档文件和压缩归档文件,使用文件名模式表示要在压缩目录中处理的文件。您可以使用UNIX样式的通配符,例如星号或问号。例如,*.json。

    默认值为*,它处理所有文件。

    Protobuf Descriptor File 要使用的描述符文件(.desc)。描述符文件必须位于Data Collector资源目录中$SDC_RESOURCES

    有关环境变量的更多信息,请参阅《 Data Collector环境配置》。有关生成描述符文件的信息,请参阅Protobuf数据格式先决条件。

    Message Type 读取数据时使用的消息类型的全限定名称。

    使用以下格式: <package name>.<message type>

    使用在描述符文件中定义的消息类型。

    Delimited Messages 指示一个文件是否可能包含多个protobuf消息。
  10. 对于文本数据,在“Data Format选项卡上,配置以下属性:
    文字属性 描述
    Compression Format 文件的压缩格式:

    • None-仅处理未压缩的文件。
    • Compressed File-处理受支持的压缩格式压缩的文件。
    • Archive-处理通过支持的存档格式存档的文件。
    • Compressed Archive-处理通过支持的存档和压缩格式存档和压缩的文件。
    File Name Pattern within Compressed Directory 对于归档文件和压缩归档文件,使用文件名模式表示要在压缩目录中处理的文件。您可以使用UNIX样式的通配符,例如星号或问号。例如,*.json。

    默认值为*,它处理所有文件。

    Max Line Length 一行中允许的最大字符数。较长的行被截断。

    在记录中添加一个布尔字段,以指示该记录是否被截断。字段名称为“Truncated”。

    此属性可以受数据收集器解析器缓冲区大小的限制。有关更多信息,请参见最大记录大小。

    Use Custom Delimiter 使用自定义分隔符来定义记录而不是换行符。
    Custom Delimiter 用于定义记录的一个或多个字符。
    Include Custom Delimiter 在记录中包括分隔符。
    Charset 要处理的文件的字符编码。
    Ignore Control Characters 除去制表符,换行符和回车符以外的所有ASCII控制字符。
  11. 对于XML数据,在“Data Format选项卡上,配置以下属性:
    XML属性 描述
    Compression Format 文件的压缩格式:

    • None-仅处理未压缩的文件。
    • Compressed File-处理受支持的压缩格式压缩的文件。
    • Archive-处理通过支持的存档格式存档的文件。
    • Compressed Archive-处理通过支持的存档和压缩格式存档和压缩的文件。
    File Name Pattern within Compressed Directory 对于归档文件和压缩归档文件,使用文件名模式表示要在压缩目录中处理的文件。您可以使用UNIX样式的通配符,例如星号或问号。例如,*.json。

    默认值为*,它处理所有文件。

    Delimiter Element

    用于生成记录的分隔符。省略分隔符会将整个XML文档视为一条记录。使用以下之一:

    • 在根元素下方的XML元素。

      使用不带尖括号(<>)的XML元素名称。例如,用msg代替<msg>。

    • 一个简化的XPath表达式,指定要使用的数据。

      使用简化的XPath表达式访问XML文档中更深的数据或需要更复杂访问方法的数据。

      有关有效语法的更多信息,请参见简化的XPath语法。

    Include Field XPaths 在字段属性中包括每个解析的XML元素的XPath和XML属性。还包括xmlns记录头属性中的每个名称空间。

    如果未选中,则此信息不包含在记录中。默认情况下,未选择该属性。

    注意: 只有在目标中使用SDC RPC数据格式时,字段属性和记录头属性才会自动写入目标系统。有关使用字段属性和记录标题属性以及如何将它们包括在记录中的更多信息,请参见字段属性和记录标题属性。
    Namespaces 解析XML文档时使用的命名空间前缀和URI。当所使用的XML元素包含名称空间前缀或XPath表达式包含名称空间时,定义名称空间。

    有关将名称空间与XML元素一起使用的信息,请参见将XML元素与名称空间一起使用。

    有关将名称空间与XPath表达式一起使用的信息,请参阅将XPath表达式与名称空间一起使用。

    使用简单或批量编辑模式,单击添加图标以添加其他名称空间。

    Output Field Attributes 在记录中包括XML属性和名称空间声明作为字段属性。如果未选择,则XML属性和名称空间声明作为字段包含在记录中。

    注意: 只有在目标中使用SDC RPC数据格式时,字段属性才会自动包含在写入目标系统的记录中。有关使用字段属性的更多信息,请参见字段属性。

    默认情况下,未选择该属性。

    Max Record Length (chars)

    记录中的最大字符数。较长的记录将转移到管道以进行错误处理。

    此属性可以受Data Collector解析器缓冲区大小的限制。有关更多信息,请参见最大记录大小。

    Charset 要处理的文件的字符编码。
    Ignore Control Characters 除去制表符,换行符和回车符以外的所有ASCII控制字符。