Snowflake

支持的管道类型:

  •  Data Collector

Snowflake目标端将数据写入Snowflake数据库中的一个或多个表。您可以使用Snowflake目标端写入任何可访问的Snowflake数据库,包括托管在Amazon S3、Microsoft Azure和私有Snowflake安装的数据库。

Snowflake目标端将CSV文件转储到Amazon S3或Microsoft Azure中的内部Snowflake阶段或外部阶段。然后,目标端将命令发送到Snowflake以处理已暂存的文件。可以使用JDBC Producer目标端直接写入Snowflake,但出于性能原因不建议使用。

您可以使用Snowflake目标端写入新数据或变更数据捕获(CDC)到Snowflake。处理新数据时,目标端可以使用COPY命令或Snowpipe将数据加载到Snowflake。处理CDC数据时,目标端使用MERGE命令。

Snowflake目标端根据匹配的名称将数据从记录字段写入表列。当新字段和表引用出现在记录中时,目标端可以通过在Snowflake中创建新的列和表来补偿数据漂移。

在配置Snowflake目标端时,您可以指定Snowflake区域,帐户和连接信息以及用于写入Snowflake的连接数。您还可以根据需要定义其他Snowflake连接属性。

您可以配置要使用的Snowflake仓库,数据库,模式和表,还可以选择启用属性来处理数据漂移。您可以指定加载方法属性和阶段详细信息,还可以选择定义Amazon S3或Microsoft Azure的高级属性。

您可以配置行的根字段,以及要从记录中排除的任何第一级字段。您还可以配置目标端,以使用指定的默认值替换缺少的字段或包含无效数据类型的字段,并以指定的字符替换字符串字段中的换行符。您可以指定引用模式,定义引号和转义符,并配置目标端以修剪空格。

处理CDC数据时,可以为每个表指定主键列,也可以使目标端查询Snowflake获得该信息。

在使用Snowflake目标端之前,您必须安装Snowflake阶段库并完成其他先决任务。Snowflake阶段库是一个企业版阶段库,仅供开发用途免费。有关购买用于生产的阶段库的信息,请联系StreamSets。

注意:使用Snowflake目标时,StreamSets建议将Snowflake仓库配置为在收到新查询时自动恢复。

样例用例

以下是使用Snowflake目标端的几种常见方案:

复制数据库

假设您要复制写入Oracle数据库模式中五个表的数据。您想将现有数据和后续CDC数据都写入Snowflake。

为此,您创建了两个管道,一个用于加载现有数据,另一个用于处理后续数据,如下所示:

  • 第一个管道以复制数据 – 第一个管道使用多线程JDBC Multitable Consumer源从您要复制的表中读取。要利用Snowflake的批量加载功能,您可以将源端配置为使用非常大的批次大小——每批介于20,000至50,000条记录之间。您可以将线程数设置为五个,以同时读取所有五个表中的数据,并将连接池的大小增加到五个,以允许同时写入Snowflake中的五个表。

    在管道中,可以使用所需数量的处理器来处理数据。然后,您将Snowflake目标端配置为将数据加载到Snowflake。

    如果希望将数据写入Snowflake后立即可用,则可以使用默认的COPY命令加载方法。但是,由于可以忍受一点延迟,因此可以使用更快、更低消耗的Snowpipe来加载数据。使用Snowpipe需要在Snowflake中执行一些必备步骤。

    初始加载完成后,您将停止第一个管道并启动第二个管道以处理传入的CDC数据。

  • 第二条管道以处理CDC数据 – 在第二条管道中,您使用Oracle CDC Client源和Snowflake目标端。您也可以将此源端配置为使用非常大的批次大小,每批次介于20,000至50,000条记录之间。

    在目标中,选择“Use CDC”属性在写入Snowflake时执行CRUD操作。这将导致目标端使用MERGE命令将数据加载到Snowflake中。您可以在记录中指定一个字段,其中包含写入到Snowflake时要使用的表名,并为每个表定义键列,或配置目标端以查询Snowflake以获取键列。

    为了提高性能,您还增加了连接池的大小。有关更多信息,请参见性能优化。

从Hadoop卸载
假设您有一个要移入Snowflake的Hadoop数据湖。在这种情况下,您仅需要一个管道,其中包括多线程Hadoop FS Standalone源,所需的所有处理器以及Snowflake目标端。
在Snowflake目标端中,如果可以在写入数据后在数据可用性方面留一点延迟,则可以使用Snowpipe。但是Snowpipe只写已经存在的表。要允许目标端创建新表来补偿漂移数据,请使用默认的COPY命令加载数据。如前所述,您可以使用大的批次,多个线程和多个连接来优化管道性能。
由于数据湖中的数据可能不如典型的数据库数据那么结构化,因此请在目标端配置以下数据漂移和高级数据属性以平滑过渡:

  • 启用“Data Drift”属性,以允许在出现新字段时在表中创建新列。
  • 启用“Table Auto Create”属性以根据需要创建新表。
  • 为避免生成不必要的错误记录:
    • 使用“Ignore Missing Fields”属性将丢失的数据替换为默认值。
    • 使用“Ignore Fields with Invalid Types”属性将无效数据类型的数据替换为默认值。
    • 定义要用于每种Snowflake数据类型的默认值。
    • 用指定的字符替换字符串字段中的换行符。

先决条件

在配置Snowflake目标端之前,请完成以下先决条件:

  1. 安装Snowflake阶段库。
  2. 创建一个内部或外部Snowflake阶段。

    如果要在Snowflake内部用户界面上暂存数据,则可以跳过此步骤。

  3. 要使用Snowpipe,还需要完成Snowpipe的先决条件。

安装Snowflake阶段库

您必须先安装Snowflake阶段库,然后才能使用Snowflake目标端。Snowflake阶段库包括目标端用来访问Snowflake的Snowflake JDBC驱动程序。

Snowflake阶段库是一个企业版阶段库,仅供开发用途免费。有关购买用于生产的阶段库的信息,请联系StreamSets。

您可以使用Package Manager来对tarball安装的Data Collector安装企业版阶段库,也可以将其作为tarball,RPM或Cloudera Manager安装的Data Collector的定制阶段库。

支持的版本

下表列出了与特定的Data Collector版本一起使用的Snowflake Enterprise阶段库的版本:

Data Collector Version Supported Stage Library Version
Data Collector 3.8.x and later Snowflake Enterprise Library 1.0.1, 1.0.2, 1.1.0, 1.2.0, or 1.3.0
Data Collector 3.7.x Snowflake Enterprise Library 1.0.1

使用软件包管理器安装

您可以使用Package Manager在tarball安装的Data Collector中安装Snowflake阶段库。

  1. 单击包管理器图标:
  2. 在导航面板中,单击Enterprise Stage Libraries
  3. 选择Snowflake Enterprise Library,然后单击 Install图标:
  4. 阅读StreamSets 订阅服务条款。如果您同意,请选中复选框,然后单击“Install
    Data Collector将安装所选的阶段库。
  5. 重新启动Data Collector

作为自定义阶段库安装

您可以在tarball,RPM或Cloudera Manager安装的Data Collector中将Snowflake Enterprise阶段库安装为自定义阶段库。

  1. 要下载阶段库,请转到StreamSets下载企业连接器页面。
    该网页显示按发布日期组织的企业版阶段库,并在页面顶部显示最新版本。
  2. 单击您想要下载的企业版阶段库名称和版本。
  3. 在“Download Enterprise Connectors表单中,输入您的姓名和联系信息。
  4. 阅读StreamSets订阅服务条款。如果您同意,请接受服务条款,然后单击“Submit
    阶段库下载。
  5. 将企业版阶段库安装和管理为自定义阶段库。
    有关更多信息,请参见Custom Stage Libraries。

创建一个Snowflake阶段

在管道中使用目标端之前,必须创建一个Snowflake内部或外部阶段。

Snowflake目标端将CSV文件转储到Amazon S3或Microsoft Azure中的内部Snowflake阶段或外部阶段。然后,目标端将命令发送到Snowflake以处理已暂存的文件。

要使用外部阶段,请使用托管您的Snowflake仓库的云服务提供商来创建外部阶段。

根据需要创建以下Snowflake阶段之一:

Snowflake内部阶段
您可以在Snowflake内部用户阶段或命名阶段中暂存数据。不要使用内部表阶段。
默认情况下,为每个用户创建用户阶段。有关如何创建命名阶段的步骤,请参见Snowflake SQL命令参考文档中的CREATE STAGE。
您可以对用户阶段和命名阶段都使用默认的Snowflake配置。
有关Snowflake阶段的更多信息,请参见Snowflake文档。
Amazon S3外部阶段
要在Amazon S3外部阶段中暂存数据,请在托管Snowflake虚拟仓库的同一S3区域中的存储桶中创建Snowflake外部阶段。例如,如果您的Snowflake仓库在AWS US West内,则在AWS US West区域的存储桶中创建Snowflake外部平台。
创建Snowflake外部阶段时,可以指定一个URL,该URL定义阶段的名称和位置。在网址中使用斜杠以确保Snowflake加载所有暂存的数据。您还可以在阶段名称中包含前缀,以指示外部阶段用于Data Collector。

例如,以下URL在s3://mybucket/中创建一个名为sdc-externalstage的外部阶段,并将所有阶段数据加载到Snowflake:

s3://mybucket/sdc-externalstage/
您可以使用Snowflake Web界面或SQL创建S3阶段。有关更多信息,请参见Snowflake文档中的创建S3阶段。
Microsoft Azure外部阶段
要在Microsoft Azure外部阶段中暂存数据,请完成以下任务:

  1. 为要使用的Microsoft Azure Blob存储容器配置Snowflake身份验证。

    您可以使用SAS令牌或Azure帐户名称和密钥进行身份验证。有关配置SAS令牌身份验证的信息,请参阅Snowflake文档中的配置Azure容器以加载数据。

  2. 在容器中创建一个Snowflake外部阶段。

    创建Snowflake外部阶段时,可以指定一个URL,该URL定义阶段的名称和位置。在网址中使用斜杠以确保Snowflake加载所有暂存的数据。您还可以在阶段名称中包含前缀,以指示外部阶段用于Data Collector。

    例如,以下URL在azure://myaccount.blob.core.windows.net/mycontainer/load/中创建一个名为sdc-externalstage的外部阶段,并将所有阶段数据加载到Snowflake:

    azure://myaccount.blob.core.windows.net/mycontainer/load/sdc-externalstage/

    您可以使用Snowflake Web界面或SQL创建一个Azure阶段。有关更多信息,请参见Snowflake文档中的创建Azure阶段。

AWS凭证

当Snowflake目标端在Amazon S3上暂存数据时,它必须将凭证传递给Amazon Web Services。

使用以下方法之一来传递AWS凭证:

IAM角色
当Data Collector在Amazon EC2实例上运行时,您可以使用AWS管理控制台为EC2实例配置IAM角色。Data Collector使用IAM实例配置文件凭证自动连接到AWS。
若要使用IAM角色,在目标端中选择“Use IAM Role属性。
有关将IAM角色分配给EC2实例的更多信息,请参阅Amazon EC2文档。
AWS访问密钥对

当Snowflake目标端使用AWS访问密钥对访问AWS时,您必须在目标端中指定访问密钥ID秘密访问密钥属性。

提示:为了保护敏感信息,例如访问密钥对,可以使用运行时资源或凭据存储。
注意:要在Amazon S3上暂存数据,您使用的角色或访问密钥对必须具有写入Amazon S3所需的权限,包括s3:GetBucketLocation和s3:PutObject。

COPY先决条件

处理新数据时,可以配置目标端以使用COPY命令将数据加载到Snowflake表。

使用COPY命令加载数据需要具有以下一组访问特权之一的角色:

  • 使用内部Snowflake阶段时所需的特权:
    Object Type Privilege
    Internal Snowflake stage READ, WRITE
    Table SELECT, INSERT
  • 使用外部阶段时所需的特权:
    Object Type Privilege
    External stage USAGE
    Table SELECT, INSERT

如有必要,请在Snowflake中创建一个自定义角色,然后为该角色授予所需的访问权限。然后在Snowflake中,将自定义角色分配为目标中配置的Snowflake用户帐户的默认角色,或者将自定义角色定义为目标的其他Snowflake连接属性。

有关为目标端定义角色的更多信息,请参见定义角色。

Snowpipe先决条件

在处理新数据时,可以使用Snowpipe连续提取引擎Snowpipe将数据加载到Snowflake表中。您不能使用Snowpipe处理CDC数据。

使用Snowpipe之前,请完成以下先决条件:

  1. 在Snowflake中,为Snowpipe创建一个管道以用于加载数据。

    有关创建管道的更多信息,请参见Snowflake文档

  2. 在Snowflake中,生成私钥PEM和公钥PEM。

    有关密钥对身份验证的详细信息,请参阅Snowflake文档。您无需按照步骤5中所述生成JSON Web令牌(JWT)。

  3. 在Snowflake中,将公用密钥分配给在阶段中配置的Snowflake用户帐户。

    您可以使用Snowflake控制台或ALTER USER命令。

  4. (可选)要保护私钥PEM和密码,请使用运行时资源或凭据存储。
  5. 配置目标端时,请输入私钥PEM和密码以及公钥PEM。或者,使用运行时资源或凭据存储功能来引用信息。

MERGE先决条件

处理CDC数据时,可以配置目标端以使用MERGE命令将数据加载到Snowflake表。

使用MERGE命令加载数据需要具有以下一组访问特权之一的角色:

  • 使用内部Snowflake阶段时所需的特权:
    Object Type Privilege
    Internal Snowflake stage READ, WRITE
    Table SELECT, INSERT, UPDATE, DELETE
  • 使用外部阶段时所需的特权:
    Object Type Privilege
    External stage USAGE
    Table SELECT, INSERT, UPDATE, DELETE

如有必要,请在Snowflake中创建一个自定义角色,然后为该角色授予所需的访问权限。然后在Snowflake中,将自定义角色分配为目标端中配置的Snowflake用户帐户的默认角色,或者将自定义角色定义为目标的其他Snowflake连接属性。

有关为目标定义角色的更多信息,请参见定义角色。

加载方式

Snowflake目标端可以使用以下方法将数据加载到Snowflake:

COPY命令以获取新数据
COPY命令是默认的加载方法,它将对Snowflake执行批量同步加载,将所有记录视为INSERTS。使用此方法将新数据写入Snowflake表。
COPY命令提供对写入数据的实时访问。但是,这确实会产生Snowflake仓库使用费,目前这笔费用会四舍五入到小时。使用推荐的准则来优化性能和成本效益。
由于COPY命令是默认的加载方法,因此您无需配置Snowflake目标端即可使用此命令。
Snowpipe获取新数据
Snowpipe是Snowflake连续提取服务,它对Snowflake执行异步加载,将所有记录视为INSERTS。使用此方法将新数据写入Snowflake表。需要时,您可以配置目标端以使用自定义Snowflake端点。
Snowpipe提供了稍微延迟的数据访问权限,通常不到一分钟。Snowpipe仅对用于执行写入操作的资源收取Snowflake费用。
使用Snowpipe之前,请执行必备步骤。另外,请使用推荐的准则来优化性能和成本效益。
要使用Snowpipe加载新数据,请在目标的Snowflake选项卡上启用Use Snowpipe属性。然后,在“Snowpipe”选项卡上配置属性。
CDC数据的MERGE命令
像COPY命令一样,MERGE命令对Snowflake执行批量同步加载。但是,不是将所有记录都视为INSERT,而是适当地插入,更新和删除记录。使用此方法通过CRUD操作将变更数据捕获(CDC)数据写入Snowflake表。
就像COPY命令一样,MERGE命令提供对写入数据的实时访问。而且,它会产生Snowflake仓库的使用费,目前这笔费用被舍入为小时。
使用推荐的准则来优化性能和成本效益。
重要:为了保持数据的原始顺序,在处理CDC数据时不要使用多线程或集群执行模式。
要使用MERGE命令加载CDC数据,请在目标的“Data”选项卡上选择“CDC Data”属性。然后,输入“Snowflake”列以用作键列。

有关Snowpipe或COPY或MERGE命令的更多信息,请参见Snowflake文档。

定义角色

Snowflake目标需要一个Snowflake角色,该角色授予使用配置的load方法加载数据所需的所有特权。每个加载方法都需要一组不同的特权。

在配置目标之前,请确保您已授予Snowflake角色所需的特权,如前提条件中所述。

如果使用所需的特权创建自定义角色,请通过以下方式之一定义目标使用的角色:

将自定义角色分配为用户的默认角色
在Snowflake中,将自定义角色分配为目标中配置的Snowflake用户帐户的默认角色。Snowflake用户帐户与一个默认角色相关联。
用自定义角色替代用户的默认角色
Data Collector中,配置Snowflake目标端时,将自定义角色的名称指定为附加的Snowflake连接属性。在附加连接属性中指定的自定义角色将覆盖分配给Snowflake用户帐户的默认角色。
例如,您可能在Snowflake中为特定数据源定义了自定义角色,然后仅在使用Snowflake目标端加载数据时才想指定这些自定义角色。
在“Snowflake Connection Info”选项卡上,添加其他连接属性,然后按如下所示定义自定义角色:

  • Name-输入role
  • Value-输入自定义角色的名称。例如,您可以输入copy_load

性能优化

使用Snowflake目标端时,请使用以下技巧来优化性能和成本效益:

增加批次大小
最大批次大小由管道中的源端决定,通常默认值为1,000条记录。要利用Snowflake的批量加载功能,请将管道源端中的最大批次大小增加到20,000-50,000条记录。确保根据需要增加Data Collector的Java堆大小。
重要:强烈建议增加批次大小。使用默认的批处理大小可能会很慢且成本很高。
使用多个线程
使用Snowpipe或COPY命令写入Snowflake时,在管道中包含多线程源时,可以使用多个线程来提高性能。当Data Collector资源允许时,使用多个线程可以并发处理多批数据。
与增加批次大小一样,在使用多个线程时,应确保适当地设置了Data Collector Java堆大小。
注意:不要使用多个线程通过MERGE命令将CDC数据写入Snowflake。使用多个线程处理数据时,不会保留数据的原始顺序。
启用与Snowflake的其他连接
使用COPY或MERGE命令写入多个Snowflake表时,请增加Snowflake目标端与Snowflake的连接数。每个其他连接都允许目标端同时写入另一个表。
例如,当仅通过一个连接写入10个表时,目标端一次只能写入一个表。通过5个连接,目标端可以一次写入5个表。10个连接允许同时写入所有10个表。

默认情况下,目标端对标准单线程管道使用一个连接。在多线程管道中,目标端与管道使用的线程数匹配。也就是说,将多线程源配置为使用最多3个线程时,默认情况下,Snowflake目标使用3个连接来写入Snowflake,每个线程一个。

请注意,连接数是针对整个管道的,而不是针对每个线程的。因此,当使用多个线程写入多个表时,您还可以通过分配其他连接来提高性能。例如,当使用3个线程写入3个表时,可以将连接数增加到9,以实现最大吞吐量。

使用“Connection Pool Size”属性可以指定Snowflake目标端可使用的最大连接数。使用COPY或MERGE命令写入Snowflake时,请使用此属性。使用Snowpipe时,增加连接数不会提高性能。

行生成

将记录写到表时,默认情况下,Snowflake目标端包含结果行中的所有记录字段。目标端使用根字段 /作为结果行的基础。

您可以配置“Row Field”属性,以指定记录中的映射或列表映射字段作为该行的基础。结果记录仅包含来自指定映射或列表映射字段的数据,而排除所有其他记录数据。当您要写入Snowflake的数据存在于记录中的单个映射或列表映射字段中时,请使用此功能。

如果要使用根字段,但不想在结果行中包括所有字段,则可以将目标端配置为忽略所有指定的第一级字段。

Snowflake目标端会将指定根字段内的所有映射或列表映射字段转换为Snowflake Variant数据类型。Snowflake目标端完全支持Variant数据类型。

默认情况下,缺少字段或字段中数据类型无效的记录将被视为错误记录。您可以配置目标端,以用用户定义的默认值替换缺少的字段和无效类型的数据。然后,指定用于每种数据类型的默认值。您还可以配置目标端,以使用替换字符替换字符串字段中的换行符。

写入多个表

您可以使用Snowflake目标端写入Snowflake模式中的多个表。要写入多个表,请在记录中指定一个字段,该字段指定要写入的表。

例如,假设您有以公司部门(例如Operations, Sales, 和Marketing)命名的Snowflake表。此外,正在处理的记录具有一个带匹配值的dept字段。您可以通过“Table”属性中输入下面的表达式配置Snowflake目标端写入记录到不同的表:${record:value('/dept')}

使用COPY或MERGE命令加载数据时,可以将Snowflake目标端配置为在指定字段中出现新值时自动创建表。例如,如果该dept字段突然包括 Engineering部门,则目标端可以在Snowflake中为新数据创建一个新的Engineering表。有关更多信息,请参见创建数据漂移的列和表。

使用命令写入多个Snowflake表时,您可能还会增加目标端用于写入的连接数。有关更多信息,请参见性能优化。

创建用于数据漂移的列和表

您可以将Snowflake目标端配置为自动补偿列或表要求的变化,也称为数据漂移。

启用数据漂移后,当新字段出现在记录中时,Snowflake目标将在Snowflake表中创建新列。例如,如果一条记录突然包含一个新Address2字段,则目标将Address2在目标表中创建一个新列。

默认情况下,目标端根据新字段中的数据创建新列,例如为十进制数据创建一个Double列。但是,您可以配置目标端以将所有新列创建为Varchar。

启用数据漂移后,您还可以配置目标以根据需要创建新表。例如,假设目标端根据Region字段中的区域名称将数据写入表中。当记录中出现新SW-3区域时,目标端将SW-3在Snowflake中创建一个新表并将该记录写入新表中。

您可以使用此功能在空的Snowflake数据库模式中创建所有必要的表。

注意:由于Snowflake的限制,使用Snowpipe将数据加载到Snowflake时,目标端无法创建表。仅当使用COPY或MERGE命令加载数据时,目标端才能创建表。

要启用自动创建新列的功能,请在“Snowflake”选项卡上选择“Data Drift Enabled”属性。然后,要启用新表的创建,请选择“Table Auto Create”属性。

生成的数据类型

在创建新表或在现有表中创建新列时,Snowflake目标端使用字段名称生成新列名称。

您可以配置目标端以将所有新列创建为Varchar。但是,默认情况下,Snowflake目标端创建的列如下:

Record Field Data Type Snowflake Column Data Type
Byte Array Binary
Char Char
String Varchar
Byte, Integer, Long, Short Number
Decimal, Double, Float Double
Boolean Boolean
Date Date
Datetime Timestampntz
Time Time
Zoned Datetime Timestamptz
Map, List-Map Variant

Snowflake目标端完全支持Variant数据类型。

定义CRUD操作

将目标端配置为处理CDC数据时,Snowflake目标端可以插入,更新或删除数据。处理CDC数据时,目标端使用MERGE命令将数据写入Snowflake。

写入数据时,Snowflake目标端使用sdc.operation.type记录头属性中指定的CRUD操作。目标端根据以下数值执行操作:

  • 1为INSERT
  • 2为DELETE
  • 3为UPDATE

如果您的管道包括启用CRUD的数据源以处理变更数据,则目标端仅从源端生成的sdc.operation.type头属性中读取操作类型 。如果您的管道使用非CDC源端,则可以使用Expression Evaluator或脚本处理器来定义记录头属性。有关Data Collector变更数据处理以及启用CDC的源端的列表的详细信息 ,请参阅处理变更数据。

配置Snowflake目标端

配置Snowflake目标端以将数据写入Snowflake表。在管道中使用目标之前,请完成所需的先决条件。

  1. 在“Properties”面板的“General选项卡上,配置以下属性:
    一般属性 描述
    Name 阶段名。
    Description 可选说明。
    Required Fields 必须包含用于将记录传递到阶段的记录的数据的字段。

    提示:您可能包括阶段使用的字段。

    根据为管道配置的错误处理,处理不包括所有必填字段的记录。

    Preconditions 必须评估为TRUE的条件才能使记录进入处理阶段。单击 添加以创建其他前提条件。

    根据为该阶段配置的错误处理,处理不满足所有前提条件的记录。

    On Record Error 该阶段的错误记录处理:

    • Discard-放弃记录。
    • To Error-将记录发送到管道以进行错误处理。
    • Stop Pipeline-停止管道。
  2. 在“Snowflake Connection Info选项卡上,配置以下属性:
    Snowflake连接属性 描述
    Snowflake Region Snowflake仓库所在的区域。选择以下区域之一:

    • AWS US West
    • AWS US East
    • AWS EU (Dublin)
    • AWS EU (Frankfurt)
    • AWS Asia Pacific (Sydney)
    • Microsoft Azure East US 2
    • Microsoft Azure West Europe
    • Other-用于指定上面未列出的Snowflake区域。
    • Custom JDBC URL-用于指定Virtual Private Snowflake。
    Custom Snowflake Region 自定义Snowflake区域。在将Other用作Snowflake区域时可用。
    Virtual Private Snowflake URL

    使用虚拟私有Snowflake安装时要使用的自定义JDBC URL。

    Account Snowflake帐户名称。
    User Snowflake用户名。
    Password Snowflake密码。
    Connection Pool Size 目标端用来写入Snowflake的最大连接数。默认值为0,以确保目标端使用与管道使用的线程相同数量的连接。

    使用COPY或MERGE命令写入多个表时,增加此属性可以提高性能。

    Connection Properties 要使用的其他Snowflake连接属性。

    要添加属性,请单击“添加”并定义属性名称和值。使用Snowflake期望的属性名称和值。

  3. 在“Snowflake选项卡上,配置以下属性:
    Snowflake 描述
    Warehouse Snowflake仓库。
    Database Snowflake数据库。
    Schema Snowflake模式。
    Table 要写入的Snowflake表。要写入单个表,请输入表名称。要写入多个表,请输入一个表达式,其结果为包含表名的记录中的字段。

    例如: ${record:value('/table')}

    或者,要基于jdbc.table由JDBC Multitable Consumer起源定义的记录头属性中的表名写表 ,可以使用以下表达式: ${record:attribute('jdbc.tables')}

    Use Snowpipe 启用使用Snowpipe写入Snowflake。仅在处理新数据时使用。您不能使用Snowpipe来加载CDC数据或自动创建表时。

    启用S​​nowpipe之前,请执行必要的先决条件。有关Snowpipe和其他加载方法的更多信息,请参见加载方法。有关优化管道性能的信息,请参见性能优化。

    Upper Case Schema & Field Names 将所有模式,表和字段名称转换为所有大写字母。启用后,目标端创建的任何新表或字段也将使用所有大写字母。
    Data Drift Enabled 当新字段出现在记录中时,使目标端能够在Snowflake表中创建新列。
    Table Auto Create 在需要时启用自动创建表的功能。启用“Data Drift”属性且禁用“Use Snowpipe”属性时可用。

    由于Snowpipe只能写入新表,因此使用Snowpipe加载数据时,目标不会创建新表。

    如果未显示此属性,请清除“Use Snowpipe”属性。

    Create New Columns as Varchar 使目标端能够将所有新列创建为Varchar。默认情况下,目标端根据字段中的数据类型创建新列。
  4. 使用Snowpipe加载数据时,在Snowpipe选项卡上,配置以下属性:
    Snowpipe属性 描述
    Pipe 使用Snowpipe将数据加载到Snowflake时使用的管道。

    仅在处理新数据时使用Snowpipe。在配置这些Snowpipe属性之前,请确保完成所有Snowpipe必备任务。

    Private Key PEM 私钥PEM。在Snowflake中生成,作为Snowpipe先决条件任务的一部分。

    为了保护敏感信息,可以使用运行时资源或凭据存储。

    Private Key Password 私钥密码。在Snowflake中生成,作为Snowpipe先决条件任务的一部分。
    Public Key PEM 公钥PEM。在Snowflake中生成,作为 Snowpipe先决条件任务的一部分。
    Use Custom Snowpipe Endpoint 启用使用自定义Snowpipe端点。
    Custom Snowpipe Protocol 自定义Snowpipe端点的协议:

    • HTTP
    • HTTPS
    Custom Snowpipe Host 自定义Snowpipe端点的主机名。
    Custom Snowpipe Port 自定义Snowpipe端点的端口号。
  5. 在“Staging选项卡上,配置以下属性:
    阶段属性 描述
    Stage Location Snowflake阶段的位置:

    • Amazon S3
    • Azure Blob Storage
    • Snowflake Internal Stage

    此属性配置确定在此选项卡和“登台高级”选项卡上显示的属性。

    Stage Database Snowflake阶段的可选数据库。当阶段位于与Snowflake表不同的数据库中时,请配置此属性。

    如果未定义,目标将使用在“Snowflake”选项卡上为Snowflake表定义的数据库。

    Stage Schema Snowflake阶段的可选模式。当阶段位于与Snowflake表不同的模式中时,请配置此属性。

    如果未定义,那么目标将使用在“Snowflake”选项卡上为Snowflake表定义的模式。

    Snowflake Stage Name 用于暂存数据的Snowflake暂存器的名称。

    除非使用Snowflake内部用户阶段,否则您将此阶段创建为Snowflake前提任务的一部分。

    要使用Snowflake内部用户界面,请输入波浪号(~)。

    Purge Stage File After Ingesting 将阶段文件的数据写入Snowflake后,将其删除。使用Snowpipe写入Snowflake时请勿使用。
    Use IAM Role 启用使用IAM角色写入Amazon S3上的外部阶段。仅当Data Collector在Amazon EC2实例上运行时使用。
    AWS Access Key ID

    AWS访问密钥ID。

    不使用IAM角色写入Amazon S3上的外部阶段时需要。

    AWS Secret Key ID

    AWS秘密访问密钥。

    不使用IAM角色写入Amazon S3上的外部阶段时需要。

    S3 Stage File Name Prefix 外部阶段名称的可选前缀。
    S3 Compressed File 在将文件写入S3之前启用压缩功能。保持此选项为最佳性能。
    Azure Authentication 用于连接到Azure的身份验证类型:

    • Account Name and Key
    • SAS Token
    Azure Account Name Azure帐户名称。
    Azure Account Key Azure帐户密钥。

    仅用于帐户名和密钥验证。

    为了保护敏感信息, 可以使用运行时资源或凭据存储。

    Azure SAS Token Azure SAS令牌。

    仅用于SAS令牌认证。

    为了保护敏感信息, 可以使用运行时资源或凭据存储。

    Azure Stage File Name Prefix 外部阶段名称的可选前缀。
    Azure Compressed File 启用压缩文件,然后再将其写入Azure。保持此选项为最佳性能。
  6. 使用Snowflake外部阶段时,在“Staging Advanced选项卡上,配置以下属性。
    此选项卡根据外部阶段的位置显示不同的属性。

    在Amazon S3中使用外部平台时,您可以配置以下属性:

    Amazon S3高级属性 描述
    S3 Connection Timeout 关闭连接之前等待响应的秒数。

    默认值为10秒。

    S3 Socket Timeout 等待查询响应的秒数。
    S3 Max Error Retry 重试请求的最大次数。
    S3 Uploading Threads 并行上传的线程池的大小。在写入多个分区并在多个部分中写入大型对象时使用。

    当写入多个分区时,将此属性设置为要写入的分区数可以提高性能。

    有关此属性和以下属性的更多信息,请参阅Amazon S3 TransferManager文档。

    S3 Minimum Upload Part Size (MB) 分段上传的最小分段大小(以字节为单位)。
    S3 Multipart Upload Threshold (MB) 目标端使用分段上传的最小批处理大小(以字节为单位)。
    S3 Proxy Enabled 指定是否使用代理进行连接。
    S3 Proxy Host 代理主机。
    S3 Proxy Port 代理端口。
    S3 Proxy Authentication Enabled 指示使用代理身份验证。
    S3 Proxy User S3代理用户。
    S3 Proxy Password S3代理密码。
    S3 Encryption Amazon S3用于管理加密密钥的选项:

    • None
    • SSE-S3 – 使用Amazon S3托管密钥。
    • SSE-KMS – 使用Amazon Web Services KMS管理的密钥。

    默认为None。

    S3 Encryption KMS ID AWS KMS主加密密钥的Amazon资源名称(ARN)。使用以下格式:

    <arn>:<aws>:<kms>:<region>:<acct ID>:<key>/<key ID>

    仅用于SSE-KMS加密。

    S3 Encryption Context 用于加密上下文的键值对。单击添加以添加键值对。

    仅用于SSE-KMS加密。

    在Azure中使用外部阶段时,可以配置以下属性:

    Azure高级属性 描述
    Use Custom Blob Service URL 启用使用自定义Azure Blob存储URL。
    Custom Blob Service URL 自定义Azure Blob存储URL。通常使用以下格式:

    https://<Azure Account>.blob.core.windows.net
    Azure Encryption 此时启用使用Azure默认加密。
  7. 在“Data选项卡上,配置以下属性:
    数据属性 描述
    Row Field Map或list-map字段用作生成的row的基础。默认值为/,其中包括结果行中的所有记录字段。
    Column Fields to Ignore 写入目标端时要忽略的字段列表。您可以输入以逗号分隔的第一级字段列表,以将其忽略。
    Null Value 用于表示空值的字符。

    默认 \N值为,Snowflake的空值字符。

    CDC Data 启用执行CRUD操作并使用MERGE命令写入Snowflake表的功能。选择以处理CDC数据。

    使用Snowpipe写入数据时无法使用。

    重要:为了保持数据的原始顺序,在处理CDC数据时不要使用多线程或群集执行模式。

    有关MERGE命令和其他加载方法的更多信息,请参见加载方法。有关优化管道性能的信息,请参见性能优化。

    Get Primary Key Information from Snowflake 查询Snowflake以获取每个表的主键列。仅在Snowflake表中定义了主键列时使用。

    知道键列后,手动输入它们比查询Snowflake更有效。

    Table Key Columns 每个Snowflake表使用的键列。单击添加图标以添加其他表。

    单击“Key Column字段中的“添加”图标可为表添加其他键列。

  8. 在“Data Advanced选项卡上,配置以下属性:
    数据高级属性 描述
    Snowflake File Format 允许使用自定义Snowflake CSV文件格式。除非StreamSets客户支持建议,否则不应使用。
    Ignore Missing Fields 允许将缺少字段的记录写入Snowflake表。将指定的默认值用于缺少字段的数据类型。

    如果未启用,则缺少字段的记录将被视为错误记录。

    Ignore Fields with Invalid Types 允许将包含无效类型数据的字段替换为该数据类型的指定默认值。

    如果未启用,则具有无效类型数据的记录将被视为错误记录。

    Boolean Default 当替换缺少的布尔字段或带无效数据的布尔字段时使用的默认值。

    默认值为\ N,代表Snowflake中的空值。

    Char Default 当替换缺少的Char字段或带无效数据的Char字段时使用的默认值。

    默认值为\ N,代表Snowflake中的空值。

    Number Default 当替换缺少的Number字段或带无效数据的Number字段时使用的默认值。

    默认值为\ N,代表Snowflake中的空值。

    Double Default 当替换缺少的Double字段或带无效数据的Double字段时使用的默认值。

    默认值为\ N,代表Snowflake中的空值。

    Date Default 当替换缺少的Date字段或带无效数据的Date字段时使用的默认值。

    默认值为\ N,代表Snowflake中的空值。

    Timestampntz Default 当替换缺少的Timestampntz字段或带无效数据的Timestampntz字段时使用的默认值。

    默认值为\ N,代表Snowflake中的空值。

    Timestamptz Default 当替换缺少的Timestamptz字段或带无效数据的Timestamptz字段时使用的默认值。

    默认值为\ N,代表Snowflake中的空值。

    Time Default 当替换缺少的Time字段或带无效数据的Time字段时使用的默认值。

    默认值为\ N,代表Snowflake中的空值。

    Varchar Default 当替换缺少的Varchar字段或带无效数据的Varchar字段时使用的默认值。

    默认值为\ N,代表Snowflake中的空值。

    Binary Default 当替换缺少的Binary字段或带无效数据的Binary字段时使用的默认值。

    默认值为\ N,代表Snowflake中的空值。

    Variant Default 当替换缺少的Variant字段或带无效数据的Variant字段时使用的默认值。

    默认值为\ N,代表Snowflake中的空值。

    Replace Newlines 用指定的替换字符替换字符串字段中的换行符。
    New Line Replacement Character 用于替换换行符的字符。
    Column Separator 用作列分隔符的字符。
    Quoting Mode 处理数据中特殊字符的模式,例如列分隔符和换行符:

    • Quoted-用指定的引号将每个字段中的数据括起来。

      以下示例使用星号将数据括在字段中:

      *string data, more string data*
    • Escaped-在特殊字符前加上指定的转义字符。

      下面的示例使用反引号对字段中的逗号列分隔符进行转义:

      string data`, more string data
    Quote Character 包含字段数据的字符。

    使用报价模式时可用。

    Escape Character 字段数据中特殊字符之前的字符。

    使用退出模式时可用。

    Trim Spaces 修剪字段数据中的前导和尾随空格。