JavaScript Scripting

支持的管道类型:

  •  Data Collector

JavaScript脚本起源运行JavaScript脚本来创建Data Collector记录。JavaScript脚本起源支持Java版本8u40和更高版本以及ECMAScript版本5.1。源端运行在Nashorn JavaScript引擎上。

该脚本在管道运行期间运行。源端可以支持复杂的多线程脚本或简单的单线程脚本。该脚本可以对阶段中配置的脚本参数起作用。脚本的基本流程必须执行以下操作:

  • 如果支持多线程处理,则创建线程
  • 创建批次
  • 创建记录
  • 将记录添加到批次
  • 处理批次
  • 管道停止时停止

该脚本必须应对所有必需的处理,例如生成事件、发送错误以进行处理以及在用户停止管道或没有更多数据时停止。您可以从脚本中调用外部Java代码。

要处理重新启动,脚本必须维护偏移量以跟踪源端停止并应重新启动的位置。对于偏移量,脚本需要一个称为实体的键,其与一个唯一值关联。对于多线程处理,实体必须标识每个线程处理的数据分区。处理批次的方法为每个实体保存一个偏移值。

例如,假设您的脚本使用API​​读取URL格式为../<state>&page=<number>的数据,从而处理有关美国各州的数据。在脚本中,每个线程读取一个州的数据,直到完成该州为止。您可以将实体设置为州,并将偏移量设置为页号。

源端提供了广泛的示例代码,可用于开发脚本。

配置源端时,输入脚本和所需的输入,包括批处理大小和线程数,以及脚本中使用的所有脚本参数。

脚本对象

JavaScript Scripting源中的脚本可以使用以下对象:

record
包含要处理的字段和值的对象。record使用sdc.createRecord(<String record ID>)方法创建新对象 。对象可用的方法取决于源端配置。您可以将记录类型配置为本机对象或Data Collector记录。
有关更多信息,请参见访问记录详细信息。
batch
收集记录以一起处理的对象。batch使用sdc.createBatch() 方法创建新对象。该对象包括以下方法:

  • add(<record>) -将记录追加到批次中。
  • add(<record[]>) -将记录列表追加到批次中。
  • addError(<record>,<String message>)-将错误记录追加到批次中。附加的错误记录包含相关的错误消息。
  • addEvent(<event record>)-将事件追加到批次。在实现事件方法之前,请验证该阶段是否启用了事件生成。
  • size() -返回批次中的记录数。
  • process(<String entity>, <String offset>) -处理批次并提交命名实体的偏移量。
  • getSourceResponseRecords() -处理批次后,检索下游阶段返回的所有响应记录。
log
将消息写入log4j日志的对象。使用 sdc.log访问为该阶段配置的对象。该对象包括与日志文件中的级别相对应的方法:

  • info(<message template>, <arguments>...)
  • warn(<message template>, <arguments>...)
  • error(<message template>, <arguments>...)
  • trace(<message template>, <arguments>...)

消息模板可以包含位置变量,用大括号{}表示。在编写消息时,该方法用相应位置的参数替换每个变量——即,该方法用第一个参数替换第一次出现的{},依此类推。

sdc
包装器对象,用于访问用户脚本可用的常量、方法和对象。
sdc对象包含以下常量:

  • lastOffsets-包含每个实体最后保存的偏移量的字典。在脚本的开头使用,以读取与成功处理的批次关联的最后一个值。
    注意:在管道运行时,该常量不会更新。
  • batchSize-单个批次中创建的记录数。在“Performance”选项卡上配置“Batch Size”属性。
  • nThreads-要同时运行的线程数。在“Performance”选项卡上使用“Number of Threads”属性进行配置。
  • userParams -字典,其中包含脚本参数和在“Advanced”选项卡上配置的参数以及“Parameters in Script”属性。
sdc对象包含以下方法:

  • createBatch() -返回新批次。
  • createRecord(<String record ID>)-返回带有传递的ID的新记录。传递一个唯一标识记录的字符串,该字符串包含足够的信息以跟踪记录源。
  • isStopped() -返回一个布尔值,该值指示管道是否已停止。
  • isPreview() -返回一个布尔值,该值指示管道是否处于预览模式。
  • getFieldNull(<record>, <String field path>) -返回以下之一:
    • 如果该值不为null,则位于指定路径的字段的值
    • 为字段类型定义的空对象,例如 NULL_INTEGER或 NULL_STRING,如果值是null
    • NULL如果指定的路径上没有字段,则为未分配的空对象
  • createMap(<Boolean list-map>) -返回地图,用作记录中的字段。通过 true创建列表地图字段,或 false创建地图字段。
  • createEvent(<String type>, <Integer version>)-返回具有指定事件类型和版本的新事件记录。在实现事件方法之前,请验证该阶段是否启用了事件生成。

多线程处理

JavaScript Scripting源可以基于“Number of Threads”属性使用多个并发线程来处理数据。

要启用多线程处理,请编写脚本以创建配置的线程数。每个线程必须创建一个批次,然后通过调用该batch.process(<String entity>, <String offset>)方法将批次传递给可用的管道运行器 。管道运行器是无源管道实例 ——一种包括所有处理器、执行器和目标端的管道实例,并处理源端之后的所有管道处理逻辑。

每个管道运行程序一次处理一批,就像在单个线程上运行的管道一样。当数据流变慢时,管道运行器会闲置等待,直到需要它们为止,并定期生成一个空批。您可以配置“Runner Idle Time”管道属性来指定间隔或选择退出空批次生成。

多线程管道保留每个批处理中的记录顺序,就像单线程管道一样。但是由于批次是由不同的流水线处理程序处理的,因此无法确保将批处理写入目的地的顺序。

例如,假设您编写脚本以使用多个线程以最后修改的时间戳的顺序读取文件,并且将源配置为使用五个线程。启动管道时,源端节点将创建五个线程,而Data Collector会创建匹配数量的管道运行器。

源端为五个最早的文件分配一个线程。每个线程处理其分配的文件,创建批数据并将每批数据传递给管道运行器。

线程完成文件处理后,源将根据上次修改的时间戳将该线程分配给下一个文件,直到处理完所有文件为止。

有关多线程管道的更多信息,请参见《多线程管道概述》。

访问记录详细信息

默认情况下,您在脚本中使用脚本语言中的本地类型来访问记录。但是,对于本地类型,您无法轻松访问Data Collector记录的所有功能,例如字段属性。

要直接将脚本中的记录作为Data Collector记录进行访问,请将“Record Type”高级属性设置为Data Collector Records,以配置阶段使用Data Collector Java API处理脚本中的记录。

在脚本中,从Java包中引用所需的类com.streamsets.pipeline.api,然后使用适当的方法访问记录和字段。使用Data Collector Java API,脚本可以访问Data Collector记录的所有功能。有关完整说明,请参见GitHub中的Data Collector Java API

例如,在脚本中包括以下几行以使用Data Collector Java API执行以下操作:

  • 创建一个名为的字符串字段new,并将其值设置为 new-value
  • 更新名为的现有字段,以oldattr属性的值设置 为attr-value
from com.streamsets.pipeline.api import Field
...
record.sdcRecord.set('/new', Field.create(Field.Type.STRING, 'new-value'))
record.sdcRecord.get('/old').setAttribute('attr', 'attr-value')
...

类型处理

尽管JavaScript在处理数据时不使用类型信息,但是将数据传递到管道的其余部分需要数据类型。使用JavaScript Scripting源时,请注意以下类型信息

Data type of null values
您可以将空值与数据类型相关联。例如,如果脚本为Integer字段赋了空值,则该字段将具有空值的整数返回给管道。
在JavaScript代码中使用常量创建具有空值的特定数据类型的新字段。例如,可以通过将NULL_STRING类型常量赋给字段来创建具有空值的新String字段,如下所示:

records[i].value.new_field = sdc.NULL_STRING
Date fields
使用String数据类型创建一个新字段,以特定格式存储日期。例如,以下示例代码创建一个新的String字段,该字段使用以下格式存储当前日期 YYYY-MM-dd

 // Define a date object to record the current date
var date = new Date();
var curBatch = sdc.createBatch();

for(var i = 0; i < records.length; i++) {
 try {
   // Create a string field to store the current date with the specified format
   records[i].value.date = date.getFullYear()+ "-" + date.getMonth() + "-" + date.getDate();
   // Add record to the current batch
   curBatch.add(record);
 } catch (e) {
   // Send record to error
   curBatch.addError(records[i], e.toString());
 }
}

// Process the current batch
curBatch.process(entityName, offset.toString());
Values retain their original type
无论脚本是否修改值,值都将保留其原始类型。

事件生成

您可以使用JavaScript脚本编写源为事件流生成事件记录。当您希望阶段根据脚本逻辑生成事件记录时,请启用事件生成。

与任何记录一样,您可以将事件记录向下游传递到目标端以进行事件存储,也可以传递给可以配置为使用事件的任何执行器。有关事件和事件框架的更多信息,请参阅数据流触发器概述。

生成事件:

  1. “General”选项卡上,选择“Produce Events”属性。

    这样可以使用事件输出流。

  2. 在脚本中包括以下两个方法:
    • sdc.createEvent(<String type>, <Integer version>) -创建具有指定事件类型和版本号的事件记录。您可以创建新的事件类型或使用现有的事件类型。现有事件类型在其他事件生成阶段中记录。

      事件记录不包含任何记录字段。根据需要生成记录字段。

    • batch.toEvent(<record>) -用于将事件记录追加到批次并将事件传递到事件输出流。

事件记录

由JavaScript Scripting源生成的事件记录具有与事件相关的标准记录头属性:

记录标题属性 描述
sdc.event.type 事件类型,由sdc.createEvent方法指定。
sdc.event.version 事件版本,由sdc.createEvent 方法指定。
sdc.event.creation_timestamp 阶段创建事件的时间戳记。

记录头属性

JavaScript Scripting源中的脚本可以创建自定义记录头属性。 管道逻辑可以使用记录头属性来影响数据流。因此,您可以为特定目的创建自定义记录头属性。有关更多信息,请参见记录头属性。

所有记录都包含一组内部记录头属性,这些阶段在处理记录时会自动更新。错误记录还具有其自己的内部头属性集。您无法在脚本中更改内部头属性的值。

您可以使用以下记录头变量来处理头属性:

  • record.<header name> -用于返回标头属性的值。
  • record.attributes -用于返回自定义记录头属性的映射,或创建或更新特定记录头属性。
提示:使用数据预览可以查看记录中包含的记录头属性。

调用外部Java代码

您可以从JavaScript脚本源调用外部Java代码。只需安装外部Java库以使其可用于源端。然后,从为源端开发的脚本中调用外部Java代码。

有关安装其他驱动程序的信息,请参阅“安装外部库”。

注意: JavaScript脚本源端包含在Data Collector基本阶段库中。当安装用于JavaScript Scripting源的外部Java库时,必须使外部库可用于基本阶段库。

要调用外部Java代码,只需在脚本中获取对Java类型的引用,如下所示:

var <class name> = Java.type('<package>.<class name>');

例如,假设您安装了Bouncy Castle JAR文件以计算SHA-3(Secure Hash Algorithm 3)摘要。将以下语句添加到脚本中:

var DigestSHA3 = Java.type('org.bouncycastle.jcajce.provider.digest.SHA3.DigestSHA3');

有关更多信息,请参见以下StreamSets博客文章: Calling External Java Code from Script Evaluators

配置JavaScript Scripting源

配置JavaScript Scripting源,以运行JavaScript脚本来创建Data Collector记录。

  1. 在“Properties”面板的“General选项卡上,配置以下属性:
    一般属性 描述
    Name 阶段名。
    Description 可选说明。
    Produce Events 发生事件时生成事件记录。用于事件处理。
    On Record Error 该阶段的错误记录处理:

    • Discard-放弃记录。
    • Send to Error-将记录发送到管道以进行错误处理。
    • Stop Pipeling-停止管道。
  2. 在“Performance选项卡上,配置以下属性:
    性能属性 描述
    Batch Size 单个批次中要生成的记录数。

    脚本使用sdc.batchSize 常量访问此值并实现批处理。

    默认值是1000。

    Number of Threads 并行并行生成数据的线程数。

    脚本使用sdc.numThreads常量访问此值, 并实现多线程处理。

  3. 在“Script选项卡上,配置以下属性:
    脚本属性 描述
    User Script 在管道执行期间运行的脚本。

    提示:要切换全屏编辑,请在光标位于编辑器中时按F11或Esc(取决于操作系统)。
  4. 在“Advanced选项卡上,配置以下属性:
    高级属性 描述
    Record Type 脚本执行期间要使用的记录类型:

    • Data Collector Records-选择脚本何时使用Data Collector Java API方法访问记录。
    • Native Objects-选择脚本何时使用本机类型访问记录。

    默认值为“Native Objects”。

    Parameters in Script 脚本参数及其值。

    脚本使用sdc.userParams 字典访问值。