基础教程

基础教程创建了一个管道,该管道从目录中读取文件,在两个分支中处理数据,然后将所有数据写入文件系统。您将使用数据预览来帮助配置管道,并创建数据警报并运行管道。

以下是构建和运行基本管道的高层次步骤:

  1. 配置管道属性,主要是错误处理。
  2. 添加Directory源端以表示要处理的数据。
  3. 预览源数据以确定管道所需的字段级详细信息。
  4. 使用Stream Selector将信用卡交易路由到主要分支,将现金交易路由到次要分支。我们将定义一个必填字段,以丢弃没有付款类型的记录。
  5. 配置Jython Evaluator以执行自定义处理,该处理将根据信用卡号确定信用卡类型。
  6. 添加一个Field Masker来屏蔽信用卡号。使用必填字段可丢弃没有信用卡号的记录。
  7. 将两个分支都连接到Local FS目标端。
  8. 在次要分支中,使用Expression Evaluator将字段添加到现金记录中以匹配信用卡记录。使用数据预览来验证要添加的字段。
  9. 如果有太多信用卡付款缺少信用卡号,请添加数据规则以发出警报。
  10. 启动管道并监视结果。

创建管道并定义管道属性

配置管道时,需要确定如何处理错误记录。您可以丢弃它们,或者——更有效率地——将它们写入文件,另一个管道或Kafka。

将错误记录写到这些位置之一,作为无需停止管道即可处理错误记录的便捷方法。

本教程将记录写到本地文件,但是如果您想将错误记录写到Kafka,也可以。

  1. 如果尚未登录,请登录到Data Collector
  2. Home或Getting Started页面中,点击Create New Pipeline。
    提示:要转到主页,请单击“Home”图标。
  3. 在“New Pipeline窗口中,输入管道标题和可选描述,然后选择在Data Collector上运行管道。
  4. 点击“Save”
    一个空的画布显示:

    请注意一些有用的区域:

    名称 描述
    1 管道创建帮助栏 表示缺少源端,并提供了可供选择的源端列表。

    如果未显示“Pipeline Creation Help Bar”,则可能已将其禁用。要启用它,请在Data Collector窗口的右上角,单击Help > Settings。清除“Hide Pipeline Creation Help Bar选项。

    2 问题图标 单击以显示通过隐式验证发现的管道问题列表。
    3 阶段库面板 默认情况下显示可用源端的列表。
    4 阶段菜单 更改在阶段库中显示的阶段。默认显示所有阶段。
    5 属性面板/预览面板/监视器面板 配置管道时,“Properties”面板显示管道或选定阶段的属性。您可以调整,最小化和最大化面板。

    预览数据时,“Preview”面板显示所选阶段或阶段组的输入和输出数据。

    监视正在运行的管道时,“Monitor”面板将显示实时指标和统计信息。

    注意:某些图标和选项可能不会显示在UI中。显示的项目基于您正在执行的任务和分配给您的用户帐户的角色。
    阶段库图标 切换阶段库面板的显示。
  5. 在“Properties”面板中,单击“Error Records选项卡。对于“Error Records属性,选择“Write to File
    这会将错误记录写入文件,因此您无需停止管道即可处理错误记录。
  6. 单击“Error Records – Write to File选项卡,然后配置以下属性。

    对未列出的属性使用默认值:

    写入文件属性 描述
    Directory 错误记录文件的目录。输入您为教程设置的目录。我们建议:

    /<base directory>/tutorial/error
    注意:为防止验证错误,该目录必须已经存在。
    Files Prefix 这定义了错误记录文件的前缀。

    默认情况下,文件以“SDC”为前缀,并带有一个返回Data Collector ID的表达式,但这远远超出了此处的需要。

    删除默认值,然后输入以下前缀: err_

    Max File Size 对于本教程,请将文件大小减小到更易于管理的大小,例如5或1 MB。

现在我们将开始构建管道…

配置源端

源端代表管道的传入数据。配置源端时,可以定义如何连接到源系统,要处理的数据类型以及特定于源端的其他属性。

Data Collector提供了广泛的源端。我们将使用Directory源端来处理您下载的示例CSV文件。
  1. 要将阶段添加到画布,请从“Pipeline Creation Help Bar”中,单击Select Origin > Directory。或者,在阶段库面板中,单击Directory源:
    源端显示在画布中,“Properties”面板显示阶段的属性。
  2. 在“Properties”面板中,单击“Files选项卡,然后配置以下属性。

    对未列出的属性使用默认值:

    目录属性
    Files Directory 您保存示例文件的目录。输入绝对路径。

    我们建议:/<base directory>/tutorial/origin

    File Name Pattern Directory源仅处理目录中与文件名模式匹配的文件。

    教程样本文件名为nyc_taxi_data.csv。由于该文件是目录中的唯一文件,因此可以使用一些通用名称,例如星号通配符(*)或*.csv。

    如果您不想处理的目录中还有其他.csv文件,则可能更具体,例如: nyc_taxi*.csv

    或者,如果要处理带有其他城市前缀的文件,则可以使用*taxi*.csv

    Read Order 当目录包含多个文件时,这确定了读取顺序。您可以根据上次修改的时间戳或文件名进行阅读。因为它更简单,所以我们使用Last Modified Timestamp
  3. 单击“Data Formats”选项卡,然后配置以下属性。

    对未列出的属性使用默认值:

    分隔属性 描述
    Data Format 在样本文件中的数据分隔,所以选择Delimited。
    Delimiter Format Type 由于示例文件是标准CSV文件,因此请使用默认值:Default CSV (ignores empty lines)
    Header Line 该示例文件包含一个头部,因此选择 With Header Line
    Root Field Type 此属性确定Data Collector如何处理分隔数据。使用默认的 List-Map

    这使您可以使用标准功能来处理分隔数据。对于List根字段类型,您需要使用分隔数据功能。

这是目前阶段和管道的样子:

请注意页面上的错误图标。当您将鼠标悬停在“Directory”错误图标上或单击“Issues”图标时,将显示一条验证消息,指出该源端具有开放的流——这意味着它尚未连接到任何对象。接下来,我们会处理。

预览数据

为了更加熟悉数据集并收集一些重要的管道配置细节,让我们预览源数据。

以下是我们配置管道所需的一些关键细节:

  • 包含付款信息的字段 – 我们将使用它来在Stream Selector中路由数据。
  • 包含信用卡号的字段 – 我们将使用它来屏蔽Field Masker中的数据。

当您访问字段中的数据时,您可以为该字段指定字段路径。字段路径取决于记录的复杂性:/<fieldname>对于简单记录和<path to field>/<fieldname>更复杂的记录。

由于我们使用的是List-Map根字段类型,因此可以使用 /<fieldname>

要开始数据预览,必须连接所有阶段并定义所有必需的属性——尽管不一定正确。由于已配置了源端并且是唯一的阶段,因此管道应准备好按原样进行预览。

  1. 在管道画布上方,单击“Preview图标: 
    如果未启用该图标,请确保已完成本教程的所有先前步骤。如果列表中显示多个问题,请更正Validation_0011以外的任何问题。

    Validation_0011仅表明Directory源尚未连接任何东西,并且不会阻止数据预览。

  2. 在“Preview Configuration对话框中,配置以下属性。

    对未列出的属性使用默认值:

    数据预览属性 描述
    Preview Source 使用默认的“Configured Source 来使用样本源数据。
    Write to Destinations and Executors 默认情况下,未选择此属性。通常,应清除此属性,以避免将数据写入目标系统或触发目标系统中的任务。
    Show Field Type 默认情况下,此属性处于选中状态。保持选中状态以查看记录中字段的数据类型。
    Remember the Configuration 选择以在每次运行数据预览时使用这些属性。

    选择此选项时,UI将进入数据预览,而不会再次显示此对话框。

  3. 点击“Run Preview”
    预览面板在列表视图中显示目录的10条输出记录的列表。如果深入一点,您会看到每个记录显示字段名称和值的顺序列表以及每个字段的数据类型。

    因为数据是从文件中读取的,所以所有字段都是字符串:

    请注意,在“Preview”面板中,预览记录显示在“Records选项卡上:。要查看或更改属性,请使用“Stage Configuration选项卡:。要更改预览属性,请使用“Preview Configuration选项卡:

  4. 要查看数据的完整视图,请单击“Table View图标: 
    “Preview”面板显示数据的前几列。

  5. 要查看所有列,请单击全部显示
    如果向右滚动,则可以验证 credit_card字段包含前三个记录的信用卡号。当您滚动回到 payment_type字段时,请注意,这对应于付款类型为“CRD”的记录。
  6. 现在我们有了所需的信息,单击“Close Preview图标:
现在我们知道以下内容:

  • 付款类型信息在payment_type字段中,而信用卡信息在credit_card字段中。
  • 要在表达式中使用这些字段,我们将使用它们的字段路径: /payment_type和 /credit_card
  • 要路由使用信用卡付款的记录,我们将查找付款类型为“CRD”的记录。

使用Stream Selector路由数据

要将数据路由到不同的流以进行处理,我们使用Stream Selector处理器。

Stream Selector根据用户定义的条件将数据路由到不同的流。用户定义条件未捕获的任何数据都将路由到默认流。

我们会将信用卡交易路由到信用卡流中进行处理。所有其他交易将进入默认流。

我们还将定义一个必填字段,以删除没有付款类型的记录。定义必填字段时,记录必须包含指定字段的数据才能进入阶段。在必填字段中不包含数据的记录将发送到管道以进行错误处理。如果您将管道配置为写入文件,那么错误记录就在那里。

为了表示字段中的数据,我们使用该 record:value函数。这将返回与该字段关联的字段值。

要捕获使用信用卡付款的记录,请使用以下条件:

${record:value('/payment_type') == 'CRD'}

请注意,我们将表达式用美元符号和大括号括起来。您可以在字符串周围使用单引号或双引号。有关表达式语言的更多信息,请参见表达式语言。

  1. 在管道创建帮助栏中,单击Select Processor to Connect > Stream Selector。或者,在“Stage Library”面板中,选择“Stream Selector处理器(),然后将Directory源连接到该处理器。
  2. 在“General选项卡上,单击“Required Fields文本框。
    将显示可用字段的列表,因为您已经执行了数据预览。它还显示管道何时有效用于数据预览。
  3. 要丢弃没有付款类型信息的记录,请选择以下字段:/payment_type
    如果未出现列表,则可以手动输入字段路径:/payment_type。
  4. 要配置流选择器条件,请单击“Conditions选项卡。
    显示默认流的条件。默认流表示未由其他条件捕获的任何记录。
  5. 单击添加图标:
    条件文本框将显示在“Property”面板中,并且相应的输出位置将出现在画布上的阶段上。
  6. 以下条件捕获使用信用卡付款的记录。您可以复制和粘贴表达式,但是尝试键入它以查看表达式完成功能如何帮助您选择函数并确保有效语法。
    ${record:value('/payment_type') == 'CRD'}
    符合此条件的所有记录将传递到第一个输出流。默认情况下将捕获所有其他记录,并通过第二个输出流。
流选择器应如下所示:

使用Jython进行信用卡分类

接下来,我们将评估信用卡号以确定信用卡类型。您可以使用Expression Evaluator进行相同的计算,但是使用简短的脚本,Jython Evaluator更加容易。

您可以将自定义脚本与JavaScript Evaluator和Jython Evaluator一起使用,以执行使用其他Data Collector处理器不容易执行的处理。使用脚本处理list-map数据时,脚本必须将数据视为map。

我们提供的Jython脚本会创建一个附加字段credit_card_type,并通过评估信用卡号的前几位数来生成信用卡类型。如果记录的信用卡付款类型没有相应的信用卡号,则脚本将返回错误消息。

  1. Jython Evaluator处理器添加到画布。

    如果舞台库中未列出Jython Evaluator处理器,则需要首先安装Jython阶段库。默认情况下,完整的Data Collector安装包括Jython阶段库。核心安装不包括Jython阶段库。

    1. Data Collector右上方工具栏中,单击“Package Manager 图标:
    2. 在“Package Manager”搜索字段中,键入“jy”,以便程序包管理器显示Jython阶段库:
    3. 选择该库,单击“Install图标(),然后单击“Install以确认您的选择。
      Data Collector  将安装外部库并显示一条消息,提示您重新启动Data Collector
    4. 单击重新启动Data Collector
    5. 重新启动Data Collector之后,在画布上打开教程管道,然后将Jython Evaluator处理器添加到画布。
  2. 将流选择器的第一个输出位置连接到Jython Evaluator。
    这会将通过信用卡支付的记录路由到Jython Evaluator。
  3. 选择Jython Evaluator后,在“Properties”面板中,单击“Jython”选项卡。
  4. 使用默认的“Batch by Batch”处理模式来处理数据,而不是逐条记录。
  5. 在“Script文本框中,查看注释中的信息,然后将其删除。粘贴以下脚本:
    try: 
      for record in records:
        cc = record.value['credit_card']
        if cc == '':
          error.write(record, "Payment type was CRD, but credit card was null")
          continue
    
        cc_type = ''
        if cc.startswith('4'):
          cc_type = 'Visa'
        elif cc.startswith(('51','52','53','54','55')):
          cc_type = 'MasterCard'
        elif cc.startswith(('34','37')):
          cc_type = 'AMEX'
        elif cc.startswith(('300','301','302','303','304','305','36','38')):
          cc_type = 'Diners Club'
        elif cc.startswith(('6011','65')):
          cc_type = 'Discover'
        elif cc.startswith(('2131','1800','35')):
          cc_type = 'JCB'
        else:
          cc_type = 'Other'
    
        record.value['credit_card_type'] = cc_type
    
        output.write(record)
    except Exception as e:
      error.write(record, e.message)
    注意:缩进不正确会导致Jython验证错误。为了获得最佳结果,请从联机帮助中复制脚本。从PDF复制脚本可能导致缩进不正确。

    要启动上下文相关的帮助,请单击“Properties”面板中的“Help图标。然后在目录中,向下滚动以找到Data Collector Tutorial chapter > Basic Tutorial > Use Jython for Card Typing。

在Jython评估程序中,脚本应如下所示:

屏蔽信用卡号

现在,通过使用Field Masker屏蔽信用卡号来防止敏感信息到达内部数据库。

Field Masker提供固定长度和可变长度的掩码,以掩码字段中的所有数据。要显示数据中的指定位置,可以使用自定义掩码。要显示数据中的一组位置,可以使用正则表达式掩码定义数据的结构,然后显示一个或多个组。

对于信用卡号,我们将使用以下正则表达式来屏蔽除最后四位数字以外的所有数字:

(.*)([0-9]{4})

正则表达式定义了两组,因此我们可以揭示第二组。

  1. 在画布上添加Field Masker处理器,然后将Jython Evaluator连接到画布。
  2. 在“Properties”面板中,单击“Mask选项卡。
  3. 单击要屏蔽的字段字段。滚动浏览字段列表,然后选择代表信用卡数据的字段: /credit_card
    当管道对数据预览有效时,将显示字段列表。如果未显示该列表,则可以手动输入字段路径。
  4. 要将正则表达式用作掩码并显示信用卡号的后4位,请按以下方式配置其余属性:
    字段屏蔽属性 配置
    Mask Type Regular Expression
    Regular Expression (.*)([0-9]{4})
    Groups to Show 2
这是Field Masker在管道中的外观:

写到目标端

Data Collector可以将数据写入到多个目标端。Local FS目标端将写入本地文件系统中的文件。

配置Local FS目标端时,将定义目录模板。这将确定所创建的输出目录的命名约定。

  1. Local FS目标端添加到画布,然后将Field Masker连接到画布。
  2. 单击输出文件选项卡,然后配置以下属性。

    对未列出的属性使用默认值:

    Local FS属性 配置
    Files Prefix 定义输出文件名的前缀。

    默认情况下,文件以“SDC”为前缀,并带有一个返回DataCollector ID的表达式,但这远远超出了此处的需要。

    让我们简化并使用“out_”代替。

    Directory Template 默认情况下,目录模板包含datetime变量,以为输出文件创建目录结构。这旨在用于写入大量数据。

    由于我们只需要处理示例文件,因此不需要datetime变量。继续并删除默认值,然后输入要在其中写入文件的目录。

    我们建议:/<base directory>/tutorial/destination

    Max File Size (MB) 对于本教程,让我们将文件大小减小到可管理的范围,例如5或1。
  3. 单击数据格式选项卡,然后配置以下属性。

    对未列出的属性使用默认值:

    分隔属性 配置
    Data Format Delimited
    Header Line With Header Line
这就完成了主分支:

现在,我们将返回到Stream Selector并完成辅助分支。

用Expression Evaluator添加一个对应的字段

Jython Evaluator脚本在信用支付分支中添加了一个新字段。为确保所有记录具有相同的结构,我们将使用Expression Evaluator将相同的字段添加到非信用分支。

这样可以确保所有记录在写入目标端时都具有相同的格式。

为此,让我们使用数据预览来验证Jython Evaluator如何将信用卡类型添加到记录中。

  1. 单击“Preview”图标。
  2. 在管道中,单击Jython Evaluator以查看该处理器的输出。
  3. 展开第一个输出记录,向下滚动,然后注意以绿色突出显示的新字段:/credit_card_type
  4. 单击“Close Preview”
  5. 将一个Expression Evaluator处理器添加到画布,并将Stream Selector的第二个默认流 与其连接。
  6. 单击“Expressions”选项卡。
  7. 配置以下“Field Expression”属性:
    输出字段 表达式
    /credit_card_type n/a
    这将创建一个credit_card_type字段,指示该信息不适用。
    由于我们在表达式中使用“n/a”作为常量,因此我们无需在表达式中使用美元符号和方括号。但是,如果要使用它们,可以将它们定义为${‘credit_card_type’}和${‘N/A’}。
  8. 将Expression Evaluator链接到Local ​​FS目标端。
    这会将数据从该分支流传输到目标端,从而合并两个分支的数据:

创建数据规则和警报

现在,在运行基本管道之前,让我们添加一个数据规则和警报。数据规则是用户定义的规则,用于检查在两个阶段之间移动的数据。它们是查找异常值和异常数据的有力方法。

数据规则和警报需要详细了解通过管道的数据。有关更常规的管道监视信息,可以使用度量标准规则和警报。

Jython Evaluator中的脚本为没有信用卡号的信用卡交易创建错误记录。我们可以创建一个数据规则并发出警报,以便在记录计数达到指定阈值时通知我们。

我们将使用带有record:value()函数的表达式来识别信用卡号字段何时/credit_card为空。该函数返回指定字段中的数据。

  1. 在Stream Selector和Jython Evaluator之间,选择链接或“Data Inspection”图标:
    “Data Rules选项卡显示在“Preview”面板中。
  2. 点击“Add”。
  3. 在“Data Rule对话框中,配置以下属性。

    对未列出的属性使用默认值:

    数据规则属性 描述
    Label Missing Card Numbers
    Condition ${record:value(“/credit_card”) == “”}
    Sampling Percentage 35
    Alert Text At least 10 missing credit card numbers!
    Threshold Value 10

    这将创建一个警报,该警报在找到10条在信用卡支付流中不包含信用卡号的记录后关闭。

    注意:对于较大的数据集,较小的采样百分比和较高的阈值可能是合适的,但是出于教程的目的,我们将使用这些数字。
  4. 点击“Save”
    规则显示在数据规则列表中。并且“Data Inspection”图标变暗,表明已为流配置了数据规则。
  5. 要启用数据规则和警报,请单击“Active
    注意,“Data Inspection”图标将变为深灰色,以表明数据规则在流上处于活动状态。

运行基础管道

现在基础管道已完成,您可以通过单击开始图标来启动它 

UI进入“Monitor”模式,并在“Monitor”面板中显示摘要统计信息。在管道运行的某个时刻,数据警报触发,并且触发的数据警报的位置变为红色:

单击数据警报图标以查看数据警报通知。然后,关闭通知并浏览“Monitor”面板中的可用信息。

请注意,当您选择画布的未使用部分时,“Monitor”面板将显示整个管道的监视信息。选择一个阶段时,它将显示该阶段的信息。

Jython Evaluator显示40条错误记录。单击错误记录号以查看缓存的错误记录和相关错误消息的列表。

您也可以选择红色的“Data Inspection”图标以查看有关数据警报的信息并查看与数据警报关联的错误记录。

要继续配置扩展教程,请停止管道。

如果愿意,您可以使用数据预览来逐步完成管道,以检查每个阶段如何处理数据。但是,如果您卡在那里,可以在扩展教程中与我们一起进行。