Stream Selector

支持的管道类型:

  •  Data Collector

  •  Data Collector Edge

Stream Selector根据条件将数据传递到流。为要创建的每个数据流定义一个条件。Stream Selector使用默认流来传递与用户定义条件不匹配的记录。

定义条件时,Stream Selector将创建一个输出,该输出连接到管道的下游分支。

Stream Selector将记录传递到记录与条件匹配的所有流。例如,您创建以下条件:

Stream 1: ${record:value("/State")==CA}
Stream 2: ${record:value("/ID")==null}

当您运行管道时,来自加利福尼亚的记录将传递到流1,具有丢失ID的记录将传递到流2,来自加利福尼亚的具有丢失ID的任何记录将传递到两个流。

默认流

默认流捕获不符合用户定义条件的记录。使用默认流来管理不匹配的记录。

配置Stream Selector时,将每个流连接到管道中的一个分支。默认流是阶段中的最后一个流。

您可以为默认流配置分支,以执行其他处理或直接写入目标。如果不需要默认流中的记录,则可以将流连接到Trash目标端。

例如,下图显示了处理推文数据的Stream Selector的条件:

第一个条件评估已删除的记录。第二个条件评估标记为敏感的记录。第三个默认条件捕获所有剩余的记录。

下图显示了管道中的Stream Selector:

第一个流将已删除的tweets传递到“Deleted Tweets”目标端。第二个流将带有敏感语言的新推文传递给Field Masker,以掩盖不适当的文本。第三个流(默认流)将所有剩余的tweets直接传递到“New Tweets”目标端。

流的采样条件

条件定义了传递到关联流的数据。满足条件的所有记录都将传递到流中。使用表达语言来定义条件。

定义条件时,通常会基于记录中一个或多个字段中的值。

采样条件

以下条件是您可能适合使用的常见方案:

${str:contains(record:value("/line"), "ERROR")}
检查“Line”字段中文本为“ERROR”的数据。
如果该字段包含“ERROR”,则记录将传递到流。
${str:contains(str:toLower(record:value("/message")), "fatal")}
在“message”字段中将字符串值小写,然后检查该字段中的数据是否“fatal”。
如果字段包含“fatal”,则记录将传递到流。
${record:value("/priority") > 1}
如果Priority字段中的值大于1,则记录将传递到流。
${record:value("/AccountID")==null}
如果AccountID字段为null,则记录将传递到流。

配置Stream Selector

使用Stream Selector根据条件将数据路由到不同的流。

  1. 在“Properties”面板的“General选项卡上,配置以下属性:
    一般属性 描述
    Name 阶段名。
    Description 可选说明。
    Required Fields 必须包含用于将记录传递到阶段的记录的数据的字段。

    提示:您可能包括阶段使用的字段。

    根据为管道配置的错误处理,处理不包括所有必填字段的记录。

    Preconditions 必须评估为TRUE的条件才能使记录进入处理阶段。单击 添加以创建其他前提条件。

    根据为该阶段配置的错误处理,处理不满足所有前提条件的记录。

    On Record Error 该阶段的错误记录处理:

    • Discard-放弃记录。
    • To Error-将记录发送到管道以进行错误处理。
    • Stop Pipeline-停止管道。对集群管道无效。
  2. 在“Conditions选项卡上, 为要创建的每个条件单击“添加”图标。您可以使用简单或批量编辑模式来添加条件。
    每个新条件都会创建一个对应的输出位置。您无法编辑或删除默认流的条件。
  3. 输入每个输出位置的条件。

    (可选)单击Ctrl +空格键以帮助创建表达式。