案例研究:将数据从关系型源端卸载到Hadoop

假设您要将数据从一组数据库表中批量加载到Hive,基本上是替换旧的Apache Sqoop实现。在处理新数据之前,您要删除以前的表。并且您想在管道停止触发其他应用程序的后续操作时创建一个通知文件,例如_SUCCESS文件以启动MapReduce作业。

任务分解如下:

批量处理

要执行批处理,所有处理完成后管道将自动停止,您可以使用创建no-more-data事件的源,然后将该事件传递给Pipeline Finisher执行器。我们将快速完成此过程,但是对于以Pipeline Finisher为中心的案例研究,请参阅案例研究:停止管道。

为了处理数据库数据,我们可以使用JDBC Multitable Consumer——它生成no-more-data事件,并可以产生多个线程以提高吞吐量。有关生成no-more-data事件的来源列表,请参阅Pipeline Finisher文档中的Related Event Generation Stages。

在处理新数据之前删除现有数据
要在管道开始处理数据之前执行任务,请使用管道开始事件。因此,例如,如果要在处理开始之前运行shell命令以执行一组任务,则可以使用Shell执行器。
要truncate Hive表,我们将使用Hive Query执行器。
管道停止时创建通知文件
在所有处理完成之后,在管道完全停止之前,使用管道停止事件来执行任务。要创建一个空的成功文件,我们将使用HDFS File Metadata executor。
现在,让我们逐步进行:

  1. 首先创建您要使用的管道。

    我们在以下简单管道中使用JDBC Multitable Consumer,但是您的管道可以根据需要复杂。

  2. 要设置批处理,请通过在“General选项卡上选择“Produce Events属性来启用源中的事件生成。然后,将事件输出流连接到Pipeline Finisher执行器。

    现在,当源端完成对所有数据的处理后,它会将no-more-data事件传递给Pipeline Finisher。并且,在完成所有管道任务之后,执行器将停止管道。

    注意: JDBC Multitable Consumer源仅生成no-more-data事件,因此您不需要使用Stream Selector或执行器前提条件来管理其他事件类型。但是,如果要使用的源端生成其他事件类型,则应确保仅将no-more-data事件路由到Pipeline Finisher。有关详细信息,请参阅停止管道案例研究。
  3. 若要在处理开始之前清空Hive表,请配置管道以将管道启动事件传递给Hive Query执行器。

    为此,在“General选项卡上,配置“Start Event属性,并选择Hive Query执行器,如下所示:

    请注意,现在将显示“Start Event – Hive Query选项卡。这是因为管道启动和停止事件的执行器未显示在管道画布中——您将选定的执行器配置为管道属性的一部分。

    还要注意,您可以将每种类型的管道事件传递给一个执行者或另一种管道以进行更复杂的处理。有关管道事件的更多信息,请参见管道事件生成。

  4. 要配置执行器,请单击“Start Event – Hive Query”选项卡。

    您可以根据需要配置连接属性,然后指定要使用的查询。在这种情况下,您可以使用以下查询,填写表名:

    TRUNCATE TABLE IF EXISTS <table name>

    另外,选择“Stop on Query Failure”。这样可以确保在执行器无法完成truncate查询时,管道停止并避免执行任何处理。这些属性应如下所示:

    使用此配置,当您启动管道时,Hive Query执行器会在数据处理开始之前清空指定的表。当清空成功完成时,管道开始处理。

  5. 现在,要在所有处理完成后生成成功文件,请使用“Stop Event”属性执行类似的步骤。

    配置管道,以将管道停止事件传递给HDFS文件元数据执行器,如下所示:

  6. 然后,在“Stop Event – HDFS File Metadata选项卡上,指定连接信息并配置执行器以在具有指定名称的必需目录中创建成功文件。

    使用这些配置后,启动管道时,Hive Query执行器将清空查询中指定的表,然后开始管道处理。当JDBC Multitable Consumer完成所有可用数据的处理后,它将no-more-data事件传递给Pipeline Finisher执行器。

    Pipeline Finisher执行器允许管道停止事件触发HDFS File Metadata执行器创建空文件,然后使管道平稳停止。批处理作业完成!