Jython Evaluator

支持的管道类型:

  •  Data Collector

Jython Evaluator处理器使用自定义的Jython代码来处理数据。使用Jython Evaluator处理器将自定义Jython代码合并到管道中。该处理器支持Jython 2.7.x版本

您可以为处理器开发以下脚本:

  • 初始化脚本-可选的初始化脚本,用于设置任何必需的资源或连接。管道启动时,初始化脚本将运行一次。
  • 主处理脚本-处理数据的主脚本。根据配置的处理模式,为每个记录或每批数据运行主脚本。
  • 销毁脚本-可选的销毁脚本,用于关闭处理器打开任何资源或连接当管道停止时,销毁脚本将运行一次。

当您在管道中使用Jython Evaluator处理器时,Data Collector会将一批数据传递给处理器,然后将数据转换为脚本友好的数据结构以进行处理。

您可以从脚本中调用外部Java代码。Jython Evaluator处理器提供了广泛的示例代码,可用于开发脚本。

在配置处理器时,您可以指定处理模式,输入脚本,并指定访问记录的方法以及脚本中使用的所有脚本参数。

处理模式

您可以选择Jython Evaluator用于处理主脚本的处理模式。您可以在每种处理模式下使用相同的脚本。但是,在以批处理模式运行之前,应在主脚本中包括错误处理。

Jython Evaluator为主脚本提供以下处理模式:

Record by Record
处理器为每个记录调用脚本。处理器将记录作为映射传递到脚本,并分别处理每个记录。
该脚本不需要错误处理逻辑。错误记录将传递给处理器以进行错误处理。处理器根据“On Record Error”属性处理错误记录。
使用此模式可以避免在代码中包含错误处理逻辑。由于此模式为每个记录调用脚本,因此管道性能将受到负面影响。
Batch by Batch
处理器为每个批次调用脚本。处理器将批次作为列表传递到脚本,并一次处理该批次。
在脚本中包括错误处理逻辑。没有错误处理逻辑,单个错误记录会将整个批次发送到处理器以进行错误处理。处理器根据“On Record Error”属性处理错误记录。
使用此模式可以通过一次处理一批数据来提高性能。

Jython脚本对象

您可以根据脚本类型在Jython Evaluator处理器中使用不同的脚本对象:

脚本类型 有效的脚本对象
Init 您可以在初始化脚本中使用以下脚本对象:

  • state
  • log
  • sdc
  • sdcFunctions(不建议使用)
Main 您可以在主脚本中使用以下脚本对象:

  • records
  • state
  • log
  • output
  • error
  • sdc
  • sdcFunctions(不建议使用)
Destroy 您可以在销毁脚本中使用以下脚本对象:

  • state
  • log
  • sdc
  • sdcFunctions(不建议使用)
注意:您不需要使用脚本对象从Jython Evaluator处理器调用管道运行时参数。您可以在任何处理器脚本中简单地使用以下语法:${<parameter name>}

每种脚本类型中的脚本对象均相同:

records
要处理的记录的集合。根据您使用的处理模式,记录对象包含不同的元素:

  • Record by Record-记录数组包含一个记录元素。记录包含单个value元素。该 value元素包含记录中的数据。
  • Batch by Batch-记录数组包括批次中的所有记录。
映射和列表记录字段分别映射到Jython dictionaries和lists。
保留数据类型以保留未修改的值。对于修改的值,脚本引擎可能会更改原始类型。
state
一个对象,用于在init,main和destroy脚本的调用之间存储信息。状态是包含键/值对集合的映射对象。您可以使用该state对象来缓存数据,例如查找、计数器或与外部系统的连接。
state对象的功能非常类似于成员变量:

  • 该信息是暂时的,并且在管道停止或重新启动时会丢失。
  • state对象仅可用于定义它的处理器阶段的实例。如果管道以集群模式执行,则该state对象不会在节点之间共享。
state对象的相同实例可用于所有三个脚本。例如,您可以使用初始化脚本打开与数据库的连接,然后将对该连接的引用存储在state对象中。在主脚本中,您可以使用state对象访问打开的连接。然后,在销毁脚本中,您可以使用state 对象关闭连接。
警告:state对象最适合用于固定或静态数据集。在处理每条记录或批次时添加缓存会迅速消耗分配给Data Collector的内存,并导致out-of-memory异常。
log
将消息写入log4j日志的对象。使用 sdc.log访问为阶段配置的对象。该对象包括与日志文件中的级别相对应的方法:

  • info(<message template>, <arguments>...)
  • warn(<message template>, <arguments>...)
  • error(<message template>, <arguments>...)
  • trace(<message template>, <arguments>...)

消息模板可以包含位置变量,用大括号{}表示。在编写消息时,该方法用相应位置的参数替换每个变量——即,该方法用第一个参数替换第一次出现的{},依此类推。

output
将记录写入输出批处理的对象。使用 sdc.output访问为阶段配置的对象。该对象包括write(<record>) 方法。
error
传递错误记录以进行错误处理的对象。使用 sdc.error访问为阶段配置的对象。该对象包括write(<record>, <String message>)方法。
sdc
一个包装对象,用于访问脚本可用的常量,方法和对象。
sdc对象包含以下常量:

  • userParams – 包含脚本参数和在“Advanced”选项卡上配置的参数以及“Parameters in Script”属性的字典。
sdc对象包含以下方法:

  • createRecord(<String record ID>) – 返回带有传递的ID的新记录。传递一个唯一标识记录的字符串,该字符串包含足够的信息以跟踪记录来源。
  • isPreview() – 返回一个布尔值,该值指示管道是否处于预览模式。
  • getFieldNull(<record>, <String field path>) – 返回以下之一:
    • 如果该值不为null,则位于指定路径的字段的值
    • 如果值是null,为字段类型定义的空对象,例如 NULL_INTEGER或 NULL_STRING
    • 如果指定的路径上没有字段,则为未分配的空对象NULL
  • createMap(<Boolean list-map>) – 返回一个映射,用作记录中的一个字段。传 true以创建列表映射字段,或传 false创建映射字段。
  • createEvent(<String type>, <Integer version>) – 返回具有指定事件类型和版本的新事件记录。在实现事件方法之前,请验证该阶段是否启用了事件生成。
  • toEvent(<event record>) – 将事件记录发送到事件输出流。在实现事件方法之前,请验证该阶段是否启用了事件生成。
  • pipelineParameters() – 返回为管道定义的所有运行时参数的映射。
sdcFunctions
运行评估或修改数据功能的对象。

重要说明:该 sdcFunctions对象现已弃用,并将在以后的版本中删除。要评估和修改数据,请使用sdc对象中的方法。

sdcFunctions对象包括以下方法:

  • getFieldNull(<record>, <String field path>) -返回以下之一:
    • 如果该值不为null,则位于指定路径的字段的值
    • 如果值是null,为字段类型定义的空对象,例如 NULL_INTEGER或 NULL_STRING
    • 如果指定的路径上没有字段,则为未分配的空对象NULL
  • createRecord(<String record ID>)– 返回带有传递的ID的新记录。传递一个唯一标识记录的字符串,该字符串包含足够的信息以跟踪记录来源。
  • createMap(<Boolean list-map>) – 返回一个映射,用作记录中的一个字段。传 true以创建列表映射字段,或传 false创建映射字段。
  • createEvent(<String type>, <Integer version>) – 返回具有指定事件类型和版本的新事件记录。在实现事件方法之前,请验证该阶段是否启用了事件生成。
  • toEvent(<event record>) – 将事件记录发送到事件输出流。在实现事件方法之前,请验证该阶段是否启用了事件生成。
  • isPreview() -返回一个布尔值,该值指示管道是否处于预览模式。

访问记录详细信息

默认情况下,您使用脚本语言中的本地类型来访问脚本中的记录。但是,对于本地类型,您无法轻松访问Data Collector记录的所有功能,例如字段属性。

要直接将脚本中的记录作为Data Collector记录进行访问,请将“Record Type”高级属性设置为Data Collector Records,以配置阶段以使用Data Collector Java API处理脚本中的记录。

在脚本中,从Java包中引用所需的类com.streamsets.pipeline.api,然后使用适当的方法访问记录和字段。使用Data Collector Java API,脚本可以访问Data Collector 记录的所有功能。有关完整说明,请参见GitHub中的Data Collector Java API

例如,在脚本中包括以下几行以使用Data Collector Java API执行以下操作:

  • 创建一个名为的字符串字段new,并将其值设置为 new-value
  • 更新名为的现有字段,以oldattr属性的值设置 为attr-value
from com.streamsets.pipeline.api import Field
...
record.sdcRecord.set('/new', Field.create(Field.Type.STRING, 'new-value'))
record.sdcRecord.get('/old').setAttribute('attr', 'attr-value')
...

处理列表映射数据

在处理列表映射数据的脚本中,将数据视为映射。

列表映射是一种Data Collector数据类型,它允许您使用标准记录功能来处理分隔数据。当源端读取分隔数据时,默认情况下会生成列表映射字段。

Jython Evaluator可以读取和传递列表映射数据。但是要处理列表映射字段中的数据,请将该字段视为脚本中的映射。

类型处理

尽管Jython在处理数据时不使用类型信息,但将数据传递到管道的其余部分仍需要数据类型。使用 Jython Evaluator处理器时,请注意以下类型信息

Data type of null values
您可以将空值与数据类型相关联。例如,如果脚本为Integer字段分配了空值,则该字段将作为具有空值的整数返回给管道。
在Jython代码中使用常量来创建具有空值的特定数据类型的新字段。例如,可以通过将NULL_STRING类型常量分配给字段来创建具有空值的新String字段,如下所示:

record.value['new_field'] = sdc.NULL_STRING
Date fields
使用String数据类型创建一个新字段,以特定格式存储日期。例如,以下示例代码创建一个新的String字段,该字段使用以下格式存储当前日期 YYYY-MM-dd
# Define a date object to record the current date
import datetime as dt
newDate = dt.datetime.utcnow().strftime("%Y-%m-%d")

for record in records:
  try:
    # Create a string field to store the current date with the specified format
    record.value["date"] = newDate
    
    # Write record to the output
    sdc.output.write(record)

  except Exception as e:
    # Send record to error
    sdc.error.write(record, str(e))
Data type of modified values
阶段未修改的值将保留其原始类型。
修改后的数值数据将成为Double,其他类型的数据将保留其原始类型。

事件生成

您可以使用Jython Evaluator处理器为事件流生成事件记录。当您希望阶段根据脚本逻辑生成事件记录时,请启用事件生成。

与任何记录一样,您可以将事件记录向下游传递到目标端以进行事件存储,也可以传递给可以配置为使用事件的任何执行器。有关事件和事件框架的更多信息,请参阅数据流触发器概述。

生成事件:

  1. 常规选项卡上,选择生产事件属性。

    这样可以使用事件输出流。

  2. 在脚本中包括以下两个方法:
    • sdc.createEvent(<String type>, <Integer version>) -创建具有指定事件类型和版本号的事件记录。您可以创建新的事件类型或使用现有的事件类型。现有事件类型在其他事件生成阶段中记录。

      事件记录不包含任何记录字段。根据需要生成记录字段。

    • sdc.toEvent(<record>) -用于将事件传递到事件输出流。

使用记录头属性

您可以使用Jython Evaluator处理器读取,更新或创建记录头属性。

创建或更新头属性时,请使用映射。如果存在头属性,则脚本将更新该值。如果不存在,脚本将创建属性并将其设置为指定的值。

所有记录都包含一组内部记录头属性,这些阶段在处理记录时会自动更新。错误记录还具有其自己的内部头属性集。您无法在脚本中更改内部头属性的值。

某些阶段会生成自定义记录头属性,这些属性将以特定方式使用。例如,Oracle CDC Client源在记录头属性中指定记录的操作类型。事件生成阶段为事件记录创建一组事件头属性。 有关更多信息,请参见记录头属性。

您可以使用以下记录头变量来处理头属性:

  • record.<header name> -用于返回标头属性的值。
  • record.attributes -用于返回自定义记录头属性的映射,或创建或更新特定记录头属性。
提示:使用数据预览可以查看记录中包含的记录头属性。

查看记录头属性

您可以使用数据预览来查看与管道中任何给定点上的记录关联的记录头属性。若要查看记录头属性,请启用“Show Record/Field Header data”预览属性。

例如,下图显示了由Directory源在数据预览中生成的记录。

“Record Header”列表在管道的此点显示记录中的只读内部属性集。您可以使用record.<header name>变量返回这些属性的值。

“values”下的头属性是由Directory源创建的属性。您可以使用 record.attributes变量返回或修改这些属性。使用record.attributes变量创建头属性时,在数据预览期间它将显示在此位置。

访问整个文件格式记录

在处理整个文件数据格式的管道中,可以使用Jython Evaluator读取整个文件数据。

通过使用getInputStream() API创建输入流,处理器可以访问整个文件记录中的fileref字段。例如,您可以使用处理器读取fileref字段中的文件数据,然后使用该数据创建新记录。处理器可以访问fileref字段,但不能修改该字段中的信息。

使用以下行来创建然后读取输入流:

input_stream = record.value['fileRef'].getInputStream()
input_stream.read()

处理器读取输入流后,在代码中包含以下行以关闭输入流:

input_stream.close()

调用外部Java代码

您可以从Jython Evaluator处理器调用外部Java代码。只需安装外部Java库即可使其对处理器可用。然后,从为处理器开发的Jython脚本中调用外部Java代码。

有关安装其他驱动程序的信息,请参阅“安装外部库”。

要从Jython脚本调用外部Java代码,请在脚本中添加导入语句:

from <package> import <class name>

例如,假设您安装了Bouncy Castle JAR文件以计算SHA-3(Secure Hash Algorithm 3)摘要。将以下语句添加到脚本中:

from org.bouncycastle.jcajce.provider.digest.SHA3 import DigestSHA3

有关更多信息,请参见以下StreamSets博客文章:Calling External Java Code from Script Evaluators

配置Jython Evaluator

配置Jython Evaluator处理器以在管道中使用自定义Jython代码。

  1. 在“Properties”面板的“General选项卡上,配置以下属性:
    一般属性 描述
    Name 阶段名。
    Description 可选说明。
    Required Fields 必须包含用于将记录传递到阶段的记录的数据的字段。

    提示:您可能包括阶段使用的字段。

    根据为管道配置的错误处理,处理不包括所有必填字段的记录。

    Preconditions 必须评估为TRUE的条件才能使记录进入处理阶段。单击 添加以创建其他前提条件。

    根据为该阶段配置的错误处理,处理不满足所有前提条件的记录。

    On Record Error 该阶段的错误记录处理:

    • Discard-放弃记录。
    • To Error-将记录发送到管道以进行错误处理。
    • Stop Pipeline-停止管道。对集群管道无效。
  2. 在“Jython”选项卡上,配置以下属性:
    Jython Evaluator属性 描述
    Record Processing Mode 确定Jython Evaluator如何处理数据:

    • Record by Record-单独处理记录。执行错误处理。
    • Batch by Batch-批量处理记录。在脚本中需要错误处理代码。

    默认为Batch by Batch。

    Init Script 要使用的可选初始化脚本。

    用于设置任何必需的连接或资源。管道启动时运行一次。

    Script 要使用的主要处理脚本。

    根据配置的处理模式,为每个记录或每批数据运行。

    Destroy Script 要使用的可选销毁脚本。

    用于关闭使用的所有连接或资源。管道停止时运行一次。

  3. 在“Advanced选项卡上,配置以下属性:
    高级属性 描述
    Record Type 脚本执行期间要使用的记录类型:

    • Data Collector Records-选择脚本何时使用Data Collector Java API方法访问记录。
    • Native Objects-选择脚本何时使用本机类型访问记录。

    默认值为“Native Objects”。

    Parameters in Script 脚本参数及其值。

    脚本使用sdc.userParams 字典访问值。