Parquet案例研究

假设您有要写入Hive中Parquet表的数据库数据。您想根据源端的国家将数据写入不同的Parquet表。您预计不会有很多模式更改,但是希望它在发生时自动进行处理。

为此,您将从JDBC Query Consumer开始将数据读入管道。您将源端连接到Hive Metadata处理器,并配置用于定义每条记录写入Parquet表的相应数据库、表和分区的表达式。Hive Metadata处理器使用此信息来评估记录并生成记录头属性,数据处理目标端将其用于写入数据。它还使用该信息来生成元数据记录,Hive Metastore目标端将根据该记录来创建和更新表。

您将Hive Metadata处理器数据输出流连接到Hadoop FS目标端,并将其配置为使用记录头中的信息。然后,目标端使用记录头中的目标目录和模式信息写入每个记录,并在模式更改时滚动文件。然后,您将目标端配置为生成事件,以便每次关闭文件时都生成事件。

您将Hive Metadata处理器元数据输出流连接到Hive Metastore目标端。目标端从Hive Metadata处理器接收到元数据记录后,将根据需要创建或更新Parquet表。

最后,将MapReduce执行程序连接到Hadoop FS目标端的事件流,并将执行程序配置为使用阶段中可用的Convert Avro to Parquet作业。因此,每次执行程序从Hadoop FS目标端接收到事件时,执行程序都会处理关闭的Avro文件并将其转换为Parquet,并将其写入更新的Parquet表中。

现在让我们仔细看看…

JDBC Query Consumer

配置源端时,可以像配置任何普通管道一样配置它。指定要使用的连接字符串,要使用的查询和偏移列以及查询间隔。如果需要所有现有数据,请省略初始偏移量。使用默认的增量模式可以避免在源端运行下一个查询时重新查询整个表。

使用源端处理小数数据时,请确保源端创建JDBC记录头属性。创建记录头属性时,源端包括记录头属性中每个小数字段的精度和小数位数。这使Hive Metadata处理器可以轻松确定小数数据的原始精度和小数位数。

您也可以在Hive Metadata处理器中输入常量,以用于记录中所有小数字段的精度和小数位数,但可以使用JDBC记录头属性来使用特定于字段的值。默认情况下,源端将创建头属性。

这是源端的JDBC记录头属性属性:

Hive Metadata处理器

将JDBC Query Consumer源连接到Hive Metadata处理器。在配置处理器时,除了基本的连接详细信息之外,还需要考虑一些其他事项:

  1. 记录应写入哪个数据库?

    Hadoop FS会进行写操作,但是处理器需要知道记录应该去哪里。让我们写到Hive default数据库。为此,您可以将数据库属性保留为空。

  2. 记录应写入哪些表?
    您可以通过对表名属性进行硬编码来将所有数据写入单个表。但是,由于您希望根据源端的Country将数据写入不同的表,因此,我们使用表达式从“Country”字段中提取表名称,如下所示:

    ${record:value('/Country')}
  3. 您要使用哪个分区?
    让我们使用表达式中的datetime变量为每日分区创建一个dt分区列,如下所示:

    ${YYYY()}-${MM()}-${DD()}
  4. 您想如何配置小数字段的精度和小数位表达式?
    由于您具有JDBC Query Consumer生成记录头属性,因此可以在处理器中使用默认表达式:

    ${record:attribute(str:concat(str:concat('jdbc.', field:field()), '.scale'))}
    ${record:attribute(str:concat(str:concat('jdbc.', field:field()), '.precision'))}

    通过这些表达式,处理器使用JDBC Query Consumer为记录中的每个小数字段编写的记录头属性的精度和小数位数。

  5. 正在处理哪种类型的数据?

    在“Data Format”选项卡上,选择Parquet数据格式。

此时,管道如下所示:

在处理记录时,Hive Metadata处理器使用配置详细信息来评估记录。它使用表的记录中列出的国家以及该分区记录的处理时间,为每个记录生成targetDirectory头属性。

当记录包含模式更改时,处理器会将新模式写入avroSchema头属性,并将roll头属性添加到记录。它还会为Hive Metastore目标端生成一个元数据记录。这些操作的组合使Hive Metastore目标端可以根据需要更新Parquet表,并且使Hadoop FS目标端可以将具有模式漂移的文件写入更新后的表。

请记住,对于Parquet数据,处理器会将.avro添加到为每个记录生成的目标目录中。这样,数据处理目标端就可以将Avro文件写入到Hive忽略的临时目录中。

结果,目标端将文件写入以下目录:<generated directory>/.avro

Hive Metastore目标端

现在要处理元数据记录——并在Hive中自动创建和更新Parquet表——您需要Hive Metastore目标端。

将目标端连接到处理器的第二个输出流并配置目标端。配置此目标端很容易——只需配置Hive连接信息,并可以选择配置一些高级选项。

目标端以与处理器相同的方式连接到Hive,因此您可以重用该连接信息。“Advanced”选项卡包括一些仅适用于Avro数据的属性,以及“Max Cache Size”属性以限制Hive Metastore使用的缓存大小。默认情况下,缓存大小是无限的,所以让它保持这种状态。

现在,Hive Metastore目标端的魅力在于:当目标端获得元数据记录,表明您需要为新国家创建新表时,它将创建一个包含所有必要列的新Parquet表,以便您可以编写记录(触发该元数据记录)到表中。

并且,如果要转到表的记录的结构发生了变化(例如添加了几个新字段),那么目标端将更新表,以便可以将记录写入表中。 目标端在生成表时使用Stored as Parquet子句,因此不需要为每次更改都生成新的模式。

这是管道当前的外观:

这涵盖了元数据,但是数据呢?

数据处理目标端

要使用记录头属性将数据写入Hive,可以使用Hadoop FS或MapR FS目标端。我们将使用Hadoop FS目标端。

要将Avro文件写入HDFS,请将Hadoop FS目标端连接到Hive Metadata处理器的数据输出流。

首先,在“General”选项卡上,启用目标端以生成事件,如下所示:

现在,目标端每次关闭输出文件时都会生成一个事件。如Hadoop FS文档的“事件记录”部分所述,事件记录包括关闭文件的文件路径和文件名。MapReduce执行器将使用此信息将Avro文件转换为Parquet。

配置目标端时,可以配置目标端以使用记录头中的目录,而不是配置目录模板。当目标端在记录头中看到roll属性时,配置目标端以滚动文件;在配置Avro属性时,指示该模式在记录头中。

目标端的“Output Files”选项卡可能看起来像这样:

通过此配置,目标端将使用记录头属性中的信息将数据写入HDFS。它将使用avroSchema头属性中的Avro模式将每个记录写入targetDirectory头属性中的目录。当它在记录头中发现roll属性时,或者达到目标端中配置的其他文件关闭限制时,它将关闭文件。并且每次关闭文件时都会生成一个事件。

提示:在将Avro文件转换为Parquet之前,数据无法用于Hive。如果要快速转换数据,请配置一个或多个文件关闭属性以确保文件定期滚动:“Max Records in File”,“Max File Size”或“Idle Timeout”。

MapReduce执行器

要转换由Hadoop FS目标端生成的Avro文件,请使用MapReduce执行器中的Convert Avro to Parquet作业。与所有执行器一样,MapReduce执行器在被事件触发时执行任务。在这种情况下,它将是由Hadoop FS目标端生成的文件关闭事件。

将Hadoop FS事件输出流连接到MapReduce执行程序。除了所需的配置详细信息之外,选择“Convert Avro to Parquet”作业类型,然后配置以下Avro转换详细信息:

  • 输入Avro文件-使用此属性的默认表达式。默认情况下,执行器使用事件记录的filepath字段中指定的目录和文件名。文件将位于.avro目录中,但此信息将在事件记录中正确记录。
  • 保留Avro输入文件-如果要保留原始Avro文件,请选择此选项。默认情况下,执行器将原始文件成功转换为Parquet后将其删除。
  • 输出目录-要将Parquet文件写到期望数据的原始目录中,而不是.avro目录中,请使用以下表达式:
    ${file:parentPath(file:parentPath(record:value('/filepath')))}

    该 file:parentPath函数返回不带最终分隔符的文件路径。因此,此表达式从文件路径中删除/.avro/<filename>。

    例如,如果原始文件路径为:/sales/countries/nz/.avro/sdc-file,则file:parentPath 返回以下输出路径:/sales/countries/nz。

需要时,您可以在Avro到Parquet选项卡上配置其他Parquet属性,例如要使用的压缩编码解码器或页面大小。

这是管道和MapReduce执行程序配置:

有关数据流触发器和事件框架的更多信息,请参见数据流触发器概述。

处理Parquet数据

管道运行时,将发生以下操作:

  • Hive Metadata处理器使用记录中的国家为targetDirectory头属性创建输出目录,来评估每个记录。

  • 当记录包含模式更改时,处理器会将新模式写入avroSchema头属性,并将roll头属性添加到记录。它还会为Hive Metastore目标端生成一个元数据记录。这些操作的组合使Hive Metastore目标端可以根据需要更新Parquet表,并且使Hadoop FS目标端可以将具有模式漂移的文件写入更新后的表。

  • 当Hive Metastore目标端接收到元数据记录时,它将相应地更新Hive Metastore,从而创建或更新Parquet表。
  • Hadoop FS目标端根据targetDirectory头中的目录将记录写入文件,并根据roll头属性和在此阶段配置的任何其他文件关闭属性关闭文件。
  • Hadoop FS目标端关闭文件时,它将事件记录发送到MapReduce执行器,触发执行程序启动“Convert Avro to Parquet”作业。MapReduce执行器不监视作业。
  • 作业完成后,Parquet数据可用于Hive。