使用 Amazon EMR Serverless 连接到 DynamoDB - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon EMR Serverless 连接到 DynamoDB

在本教程中,您要将美国地名委员会的数据子集上传到 Amazon S3 存储桶,然后使用 Amazon EMR Serverless 上的 Hive 或 Spark 将数据复制到 Amazon DynamoDB 表,以便进行查询。

步骤 1:将数据上传到 Amazon S3 存储桶

要创建 Amazon 存储桶,请按照《Amazon Simple Storage Service 控制台用户指南》创建存储桶的说明操作。将对 amzn-s3-demo-bucket 的引用替换为新建存储桶的名称。现在,您的 EMR Serverless 应用程序已准备好运行作业。

  1. 使用以下命令下载示例数据存档 features.zip

    wget https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/features.zip
  2. 从存档中提取 features.txt 文件,查看文件的前几行:

    unzip features.zip head features.txt

    结果应类似以下内容。

    1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794 875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7 1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10 26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681 1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605 1181348|Minnow Run|Stream|PA|40.0820178|-79.3800349|1558 1288759|Hunting Creek|Stream|TN|36.343969|-83.8029682|1024 533060|Big Charles Bayou|Bay|LA|29.6046517|-91.9828654|0 829689|Greenwood Creek|Stream|NE|41.596086|-103.0499296|3671 541692|Button Willow Island|Island|LA|31.9579389|-93.0648847|98

    这里每行中的字段表示唯一标识符、名称、自然特征类型、州、纬度(度)、经度(度)和高度(英尺)。

  3. 将数据上传到 Amazon S3

    aws s3 cp features.txt s3://amzn-s3-demo-bucket/features/

步骤 2:创建 Hive 表

使用 Apache Spark 或 Hive 创建一个新的 Hive 表,其中包含 Amazon S3 中上传的数据。

Spark

要使用 Spark 创建 Hive 表,请运行以下命令。

import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate() sparkSession.sql("CREATE TABLE hive_features \ (feature_id BIGINT, \ feature_name STRING, \ feature_class STRING, \ state_alpha STRING, \ prim_lat_dec DOUBLE, \ prim_long_dec DOUBLE, \ elev_in_ft BIGINT) \ ROW FORMAT DELIMITED \ FIELDS TERMINATED BY '|' \ LINES TERMINATED BY '\n' \ LOCATION 's3://amzn-s3-demo-bucket/features';")

现在,您有一个 Hive 表,其中包含 features.txt 文件中的数据。要验证数据是否在表中,请运行 Spark SQL 查询,如以下示例所示。

sparkSession.sql( "SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;")
Hive

要使用 Hive 创建 Hive 表,请运行以下命令。

CREATE TABLE hive_features (feature_id BIGINT, feature_name STRING , feature_class STRING , state_alpha STRING, prim_lat_dec DOUBLE , prim_long_dec DOUBLE , elev_in_ft BIGINT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' LOCATION 's3://amzn-s3-demo-bucket/features';

现在,您有一个 Hive 表,其中包含 features.txt 文件中的数据。要验证数据是否在表中,请运行 HiveQL 查询,如以下示例所示。

SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;

步骤 3:将数据复制到 DynamoDB

使用 Spark 或 Hive 将数据复制到新的 DynamoDB 表。

Spark

要将数据从您在上一步中创建的 Hive 表复制到 DynamoDB,请按照将数据复制到 DynamoDB 中的步骤 1-3 操作。这将创建一个名为 Features 的新 DynamoDB 表。然后,您可以直接从文本文件中读取数据,并将其复制到 DynamoDB 表中,如下例所示。

import com.amazonaws.services.dynamodbv2.model.AttributeValue import org.apache.hadoop.dynamodb.DynamoDBItemWritable import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object EmrServerlessDynamoDbTest { def main(args: Array[String]): Unit = { jobConf.set("dynamodb.input.tableName", "Features") jobConf.set("dynamodb.output.tableName", "Features") jobConf.set("dynamodb.region", "region") jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") val rdd = sc.textFile("s3://amzn-s3-demo-bucket/ddb-connector/") .map(row => { val line = row.split("\\|") val item = new DynamoDBItemWritable() val elevInFt = if (line.length > 6) { new AttributeValue().withN(line(6)) } else { new AttributeValue().withNULL(true) } item.setItem(Map( "feature_id" -> new AttributeValue().withN(line(0)), "feature_name" -> new AttributeValue(line(1)), "feature_class" -> new AttributeValue(line(2)), "state_alpha" -> new AttributeValue(line(3)), "prim_lat_dec" -> new AttributeValue().withN(line(4)), "prim_long_dec" -> new AttributeValue().withN(line(5)), "elev_in_ft" -> elevInFt) .asJava) (new Text(""), item) }) rdd.saveAsHadoopDataset(jobConf) } }
Hive

要将数据从您在上一步中创建的 Hive 表复制到 DynamoDB,请按照将数据复制到 DynamoDB 中的说明操作。

步骤 4:从 DynamoDB 查询数据

使用 Spark 或 Hive 查询 DynamoDB 表。

Spark

要查询您在上一步中创建的 DynamoDB 表中的数据,您可以使用 Spark SQL 或 Spark API。 MapReduce

例 使用 Spark SQL 查询 DynamoDB 表

以下 Spark SQL 查询将按字母顺序返回所有特征类型的列表。

val dataFrame = sparkSession.sql("SELECT DISTINCT feature_class \ FROM ddb_features \ ORDER BY feature_class;")

以下 Spark SQL 查询将返回以字母 M 开头的所有数据湖列表。

val dataFrame = sparkSession.sql("SELECT feature_name, state_alpha \ FROM ddb_features \ WHERE feature_class = 'Lake' \ AND feature_name LIKE 'M%' \ ORDER BY feature_name;")

以下 Spark SQL 查询将返回所有州的列表,其中至少有三个特征高于一英里。

val dataFrame = sparkSession.dql("SELECT state_alpha, feature_class, COUNT(*) \ FROM ddb_features \ WHERE elev_in_ft > 5280 \ GROUP by state_alpha, feature_class \ HAVING COUNT(*) >= 3 \ ORDER BY state_alpha, feature_class;")
例 — 使用 Spark API 查询你的 DynamoDB 表 MapReduce

以下 MapReduce 查询按字母顺序返回所有要素类型的列表。

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .map(pair => pair._2.get("feature_class").getS) .distinct() .sortBy(value => value) .toDF("feature_class")

以下 MapReduce 查询返回以字母 M 开头的所有湖泊的列表。

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .filter(pair => "Lake".equals(pair._2.get("feature_class").getS)) .filter(pair => pair._2.get("feature_name").getS.startsWith("M")) .map(pair => (pair._2.get("feature_name").getS, pair._2.get("state_alpha").getS)) .sortBy(_._1) .toDF("feature_name", "state_alpha")

以下 MapReduce 查询返回包含至少三个高于一英里的要素的所有州的列表。

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => pair._2.getItem) .filter(pair => pair.get("elev_in_ft").getN != null) .filter(pair => Integer.parseInt(pair.get("elev_in_ft").getN) > 5280) .groupBy(pair => (pair.get("state_alpha").getS, pair.get("feature_class").getS)) .filter(pair => pair._2.size >= 3) .map(pair => (pair._1._1, pair._1._2, pair._2.size)) .sortBy(pair => (pair._1, pair._2)) .toDF("state_alpha", "feature_class", "count")
Hive

要从您在上一步中创建的 DynamoDB 表查询数据,请按照查询 DynamoDB 表中的数据中的说明操作。