Amazon EMR
Amazon EMR 版本指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 Amazon AWS 入门

用于在 DynamoDB 中导出、导入和查询数据的 Hive 命令示例

以下示例使用 Hive 命令执行将数据导出到 Amazon S3 或 HDFS、将数据导入到 DynamoDB、联接表、查询表等操作。

对 Hive 表执行的操作将引用 DynamoDB 中存储的数据。Hive 命令受到 DynamoDB 表预配置的吞吐量设置约束,并且检索的数据包括 DynamoDB 处理 Hive 操作请求时写入到 DynamoDB 表的数据。如果数据检索过程需要很长一段时间,则自 Hive 命令开始执行以来,Hive 命令返回的某些数据可能已在 DynamoDB 中更新。

Hive 命令 DROP TABLECREATE TABLE 仅对 Hive 中的本地表进行操作,而不会在 DynamoDB 中创建或删除表。如果 Hive 查询引用 DynamoDB 中的表,在您运行查询之前,该表必须已存在。有关在 DynamoDB 中创建和删除表的更多信息,请参阅 Amazon DynamoDB Developer Guide 中的在 DynamoDB 中使用表

注意

当您将 Hive 表映射到 Amazon S3 中的位置时,请勿将其映射到存储桶的根路径 s3://mybucket (因为这会在 Hive 将数据写入到 Amazon S3 时导致错误)。而是应将表映射到存储桶的子路径 s3://mybucket/mypath。

从 DynamoDB 中导出数据

可以使用 Hive 从 DynamoDB 中导出数据。

将 DynamoDB 表导出到 Amazon S3 存储桶

  • 创建一个引用 DynamoDB 中存储的数据的 Hive 表。然后,您可以调用 INSERT OVERWRITE 命令将数据写入到外部目录。在以下示例中,s3://bucketname/path/subpath/ 是 Amazon S3 中的有效路径。调整 CREATE 命令中的列和数据类型以匹配 DynamoDB 中的值。可以使用此命令在 Amazon S3 中创建 DynamoDB 数据的档案。

    CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); INSERT OVERWRITE DIRECTORY 's3://bucketname/path/subpath/' SELECT * FROM hiveTableName;

使用格式设置将 DynamoDB 表导出到 Amazon S3 存储桶

  • 创建引用 Amazon S3 中的位置的外部表。此表在下面显示为 s3_export。在调用 CREATE 期间,为此表指定行格式设置。然后,当您使用 INSERT OVERWRITE 将数据从 DynamoDB 导出到 s3_export 时,数据将以指定的格式写出。在以下示例中,数据以逗号分隔值 (CSV) 的格式写出。

    CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 's3://bucketname/path/subpath/'; INSERT OVERWRITE TABLE s3_export SELECT * FROM hiveTableName;

在不指定列映射的情况下将 DynamoDB 表导出到 Amazon S3 存储桶

  • 创建一个引用 DynamoDB 中存储的数据的 Hive 表。此例与前面的示例类似,只是不指定列映射。该表必须正好具有类型为 map<string, string> 的一个列。如果随后在 Amazon S3 中创建 EXTERNAL 表,则可以调用 INSERT OVERWRITE 命令将数据从 DynamoDB 写入到 Amazon S3 中。可以使用此命令在 Amazon S3 中创建 DynamoDB 数据的档案。由于没有列映射,因此您无法查询以此方式导出的表。在 Hive 0.8.1.5 或更高版本 (在 Amazon EMR AMI 2.2x 及其更高版本上受支持) 中导出数据而不指定列映射。

    CREATE EXTERNAL TABLE hiveTableName (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1"); CREATE EXTERNAL TABLE s3TableName (item map<string, string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION 's3://bucketname/path/subpath/'; INSERT OVERWRITE TABLE s3TableName SELECT * FROM hiveTableName;

使用数据压缩将 DynamoDB 表导出到 Amazon S3 存储桶

  • Hive 提供多个可以在 Hive 会话期间设置的压缩编解码器。这样做会导致导出的数据以指定的格式进行压缩。以下示例使用 Lempel-Ziv-Oberhumer (LZO) 算法压缩导出的文件。

    SET hive.exec.compress.output=true; SET io.seqfile.compression.type=BLOCK; SET mapred.output.compression.codec = com.hadoop.compression.lzo.LzopCodec; CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); CREATE EXTERNAL TABLE lzo_compression_table (line STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION 's3://bucketname/path/subpath/'; INSERT OVERWRITE TABLE lzo_compression_table SELECT * FROM hiveTableName;

    可用的压缩编解码器包括:

    • org.apache.hadoop.io.compress.GzipCodec

    • org.apache.hadoop.io.compress.DefaultCodec

    • com.hadoop.compression.lzo.LzoCodec

    • com.hadoop.compression.lzo.LzopCodec

    • org.apache.hadoop.io.compress.BZip2Codec

    • org.apache.hadoop.io.compress.SnappyCodec

将 DynamoDB 表导出到 HDFS

  • 使用以下 Hive 命令,其中 hdfs:///directoryName 是有效的 HDFS 路径,而 hiveTableName 为 Hive 中引用 DynamoDB 的表。此导出操作比将 DynamoDB 表导出到 Amazon S3 速度快,因为将数据导出到 Amazon S3 时,Hive 0.7.1.1 将 HDFS 用作中间步骤。以下示例还显示了如何将 dynamodb.throughput.read.percent 设置为 1.0 以提高读取请求速率。

    CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); SET dynamodb.throughput.read.percent=1.0; INSERT OVERWRITE DIRECTORY 'hdfs:///directoryName' SELECT * FROM hiveTableName;

    您还可以按上面所示的用于导出到 Amazon S3 的方法,使用格式设置和压缩将数据导出到 HDFS。为此,只需将上面示例中的 Amazon S3 目录替换为 HDFS 目录。

在 Hive 中读取不可打印的 UTF-8 字符数据

  • 创建表时,您可以使用 STORED AS SEQUENCEFILE 子句在 Hive 中读取和写入不可打印的 UTF-8 字符数据。SequenceFile 是 Hadoop 二进制文件格式;您需要使用 Hadoop 来读取此文件。以下示例显示了如何将数据从 DynamoDB 导出到 Amazon S3 中。您可以使用此功能处理不可打印的 UTF-8 编码字符。

    CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>) STORED AS SEQUENCEFILE LOCATION 's3://bucketname/path/subpath/'; INSERT OVERWRITE TABLE s3_export SELECT * FROM hiveTableName;

将数据导入到 DynamoDB 中

使用 Hive 将数据写入到 DynamoDB 中时,应确保写入容量单位数大于cluster中的映射器数。例如,在 m1.xlarge EC2 实例上运行的cluster在每个实例上生成 8 个映射器。对于具有 10 个实例的cluster,这意味着生成 80 个映射器。如果写入容量单位数不大于cluster中的映射器数,则 Hive 写入操作可能会占用所有写入吞吐量,或者尝试占用超过预配置值的吞吐量。有关每种 EC2 实例类型生成的映射器数的更多信息,请参阅配置 Hadoop

Hadoop 中的映射器数由输入的拆分数控制。如果拆分数过小,写入命令可能无法占用所有可用的写入吞吐量。

如果具有相同键的项目在目标 DynamoDB 表中存在,则将覆盖该项目。如果目标 DynamoDB 表中不存在具有该键的项目,则将插入该项目。

将表从 Amazon S3 导入到 DynamoDB 中

  • 可以使用 Amazon EMR (Amazon EMR) 和 Hive 将数据从 Amazon S3 写入到 DynamoDB 中。

    CREATE EXTERNAL TABLE s3_import(a_col string, b_col bigint, c_col array<string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 's3://bucketname/path/subpath/'; CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); INSERT OVERWRITE TABLE hiveTableName SELECT * FROM s3_import;

在不指定列映射时将表从 Amazon S3 存储桶导入到 DynamoDB 中

  • 创建一个引用 Amazon S3 中存储的数据的 EXTERNAL 表,该数据是以前从 DynamoDB 中导出的。在导入之前,请确保该表存在于 DynamoDB 中,并且该表与以前导出的 DynamoDB 表具有相同的键架构。此外,该表还必须正好具有类型为 map<string, string> 的一个列。如果您随后创建一个链接到 DynamoDB 的 Hive 表,则可以调用 INSERT OVERWRITE 命令将数据从 Amazon S3 写入到 DynamoDB 中。由于没有列映射,因此您无法查询以此方式导入的表。在 Hive 0.8.1.5 或更高版本 (在 Amazon EMR AMI 2.2.3 及其更高版本上受支持) 中可以在不指定列映射时导入数据。

    CREATE EXTERNAL TABLE s3TableName (item map<string, string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION 's3://bucketname/path/subpath/'; CREATE EXTERNAL TABLE hiveTableName (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1"); INSERT OVERWRITE TABLE hiveTableName SELECT * FROM s3TableName;

将表从 HDFS 导入到 DynamoDB 中

  • 可以使用 Amazon EMR 和 Hive 将数据从 HDFS 写入到 DynamoDB 中。

    CREATE EXTERNAL TABLE hdfs_import(a_col string, b_col bigint, c_col array<string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'hdfs:///directoryName'; CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); INSERT OVERWRITE TABLE hiveTableName SELECT * FROM hdfs_import;

查询 DynamoDB 中的数据

以下示例显示了您可以使用 Amazon EMR 查询 DynamoDB 中存储的数据的各种方式。

查找映射列的最大值 (max)

  • 使用如下 Hive 命令。在第一个命令中,CREATE 语句创建了一个引用 DynamoDB 中存储的数据的 Hive 表。然后,SELECT 语句使用该表查询 DynamoDB 中存储的数据。以下示例查找给定客户提交的最大订单。

    CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Purchases", "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items"); SELECT max(total_cost) from hive_purchases where customerId = 717;

使用 GROUP BY 子句聚合数据

  • 可以使用 GROUP BY 子句收集多条记录中的数据。此子句通常与聚合函数 (如 sum、count、min 或 max) 一起使用。以下示例返回提交了三个以上订单的客户的最大订单列表。

    CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Purchases", "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items"); SELECT customerId, max(total_cost) from hive_purchases GROUP BY customerId HAVING count(*) > 3;

联接两个 DynamoDB 表

  • 以下示例将两个 Hive 表映射到 DynamoDB 中存储的数据。然后,它对这两个表调用联接。联接在cluster上计算并返回。联接不在 DynamoDB 中进行。此示例返回提交了两个以上订单的客户及其购买物的列表。

    CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Purchases", "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items"); CREATE EXTERNAL TABLE hive_customers(customerId bigint, customerName string, customerAddress array<String>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Customers", "dynamodb.column.mapping" = "customerId:CustomerId,customerName:Name,customerAddress:Address"); Select c.customerId, c.customerName, count(*) as count from hive_customers c JOIN hive_purchases p ON c.customerId=p.customerId GROUP BY c.customerId, c.customerName HAVING count > 2;

联接来自不同源的两个表

  • 在以下示例中,Customer_S3 是加载了 Amazon S3 中存储的 CSV 文件的 Hive 表,而 hive_purchases 是引用了 DynamoDB 中的数据的表。以下示例将 Amazon S3 中以 CSV 文件格式存储的客户数据与 DynamoDB 中存储的订单数据联接在一起,以返回一组数据,这些数据表示客户名称中包含“Miller”的客户提交的订单。

    CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Purchases", "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items"); CREATE EXTERNAL TABLE Customer_S3(customerId bigint, customerName string, customerAddress array<String>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 's3://bucketname/path/subpath/'; Select c.customerId, c.customerName, c.customerAddress from Customer_S3 c JOIN hive_purchases p ON c.customerid=p.customerid where c.customerName like '%Miller%';

注意

在上述示例中,为了提高清晰性和完整性,在每个示例中均包括了 CREATE TABLE 语句。针对给定 Hive 表运行多个查询或执行导出操作时,只需在 Hive 会话的开始创建表一次即可。