Avro案例研究

假设您有一个Data Collector管道,该管道将Avro日志数据写入Kafka。管道中的File Tail源处理来自多个不同Web服务的数据,并使用“tag”头属性标记每个记录,该头属性标识生成数据的服务。

现在,您想要一个新的管道将数据传递到HDFS,可以在其中存储和查看数据,并且您希望将数据写入基于生成数据的Web服务的表中。请注意,您也可以将数据写入MapR FS——步骤几乎与本案例研究相同,只是使用了另一个目标端。

为此,添加并配置Kafka Consumer以将数据读入管道,然后将其连接到Hive Metadata处理器。处理器评估记录结构,并生成描述任何所需的Hive Metadata更改的元数据记录。使用标签头属性和其他用户定义的表达式,Hive Metadata处理器可以确定要用于目标目录的数据库,表和分区,并将该信息与Avro模式一起写入记录头,如需要时包括文件滚动指示符。

您将Hive Metadata处理器元数据输出流连接到Hive Metastore目标端。目标端从Hive Metadata处理器接收到元数据记录后,将根据需要创建或更新Hive表。

您将Hive Metadata处理器数据输出流连接到Hadoop FS目标端,并将其配置为使用记录头中的信息。然后,目标端使用记录头中的目标目录和Avro模式将每个记录移至要到达的位置,并在需要时滚动文件。

现在让我们仔细看看…

Hive Metadata处理器

您设置了Kafka Consumer,并将其连接到Hive Metadata处理器。在配置处理器时,除了基本的连接详细信息之外,还需要考虑一些其他事项:

  1. 记录应写入哪个数据库?

    Hadoop FS会进行写操作,但是处理器需要知道记录应该去哪里。

    让我们写到Hive default数据库。为此,您可以将数据库属性保留为空。

  2. 记录应写入哪些表?
    向Kafka提供数据的管道使用“tag”头属性来指示原始Web服务。要使用tag属性写入表,请对表名使用以下表达式:

    ${record:attribute('tag')}
  3. 您要使用什么分区(如果有)?
    让我们使用日期时间变量为分区值表达式创建每日分区,如下所示:

    ${YYYY()}-${MM()}-${DD()}
  4. 您如何配置小数字段的精度和小数位数?

    尽管来自Web服务的数据不包含您知道的小数数据,但是为了防止新的小数数据生成错误记录,请配置小数字段表达式。

    默认表达式用于JDBC Query Consumer或JDBC Multitable Consumer生成的数据。您可以将它们替换为其他表达式或常量。

  5. 正在处理哪种类型的数据?

    在“Data Format”选项卡上,选择“Avro”数据格式。

此时,您的管道如下所示:

使用此配置,Hadoop FS目标端将根据处理时间将每个记录写入tag属性中列出的Hive表和每日分区。

Hive Metastore目标端

现在要处理元数据记录——并在Hive中自动创建和更新表——您需要Hive Metastore目标端。

将目标端连接到处理器的第二个输出流并配置目标端。配置此目标端很容易——只需配置Hive连接信息,并可以选择配置一些高级选项。

目标端以与处理器相同的方式连接到Hive,因此您可以重用该连接信息:

当针对Hive的漂移同步解决方案处理Avro数据时,默认情况下,目标端在表创建查询中包括Stored As Avro子句。您可以在“Advanced”选项卡上进行更改并配置其他高级属性。通常,您可以将默认值用于高级属性,所以让我们开始吧。

现在,Hive Metastore目标端的美妙之处在于:当目标端获取一个元数据记录,假设需要一个新表用于新的Web服务时,它将创建具有所有必要列的表,以便您可以将记录(该元数据记录)写到表中。

并且,如果要写到表的记录的结构发生了变化(例如添加了几个新字段),那么目标端将更新表,以便可以将记录写入表中。

这涵盖了元数据,但是数据呢?

数据处理目标端

要使用记录头属性将数据写入Hive,可以使用Hadoop FS或MapR FS目标端。我们将使用Hadoop FS目标端。

要将数据写入HDFS,请将Hadoop FS目标端连接到Hive Metadata处理器的数据输出流。

配置目标端时,可以配置目标端以使用记录头中的目录,而不是配置目录模板。当目标端在记录头中看到roll属性时,配置目标端以滚动文件;在配置Avro属性时,指示模式在记录头中。

目标端的“Output Files”选项卡可能看起来像这样:

“Data Format”选项卡如下所示:

通过此配置,目标端将使用记录头属性中的信息将数据写入HDFS。它将使用avroSchema头属性中的Avro模式将每个记录写入targetDirectory头属性中的目录。当在记录头中发现roll属性时,它将滚动文件。

请注意,目标位置还可以使用“Max Records in File”,“Max Files Size”和“Idle Timeout”来确定何时滚动文件。

另外,如果要压缩Avro文件,请使用“Data Formats”选项卡上的“Avro Compression Codec”属性,而不是“Output Files”选项卡上的常规压缩选项。

处理Avro数据

现在,当您启动管道时会发生什么?

设置此管道是根据“tag”属性中的表名将数据写入不同的表,该属性已添加到管道前段的记录头。

假设表名是“weblog”和“service”。对于每个以“weblog”作为标签属性的记录,Hive Metadata处理器按以下方式评估记录中的字段:

  • 如果这些字段与现有的Hive表匹配,则仅将必要的信息写入targetDirectory和avroSchema阶段属性中,然后Hadoop FS将记录写入Weblog表中。
  • 如果记录包含新字段,则处理器将生成元数据记录,Hive Metastore目标端将其用于更新Weblog表以包括新列。它还将信息写入阶段属性,以便Hadoop FS可以将记录写入更新的Weblog表。
  • 如果记录中缺少字段,则处理器仅将信息写入阶段属性,而Hadoop FS将记录写入HDFS,并使用缺少字段的空值。
  • 如果字段已重命名,则处理器会将其视为新字段,从而生成元数据记录,Hive Metastore目标端将使用该记录来更新Weblog表。当Hadoop FS写入记录时,数据将被写入新字段,而空值将被写入旧字段。
  • 如果现有字段的数据类型发生更改,则处理器会将记录视为错误记录。

对于带有“service”标签的每个记录,处理器执行相同的操作。

注意:如果记录包含新标签值,则Hive Metadata处理器会生成元数据记录,Hive Metastore目标端将使用该元数据记录来创建新表。Hadoop FS将记录写入新表。因此,如果启动新的Web服务,则无需触摸此管道即可让它处理新的数据集。