
Migrate the Spark Kinesis connector to SDK 2.x for Amazon EMR 7.0

The Amazon SDK provides a rich set of APIs and libraries for interacting with Amazon cloud computing services, such as managing credentials or connecting to the S3 and Kinesis services. The Spark Kinesis connector is used to consume data from Kinesis Data Streams, and the received data is transformed and processed in Spark's execution engine. Currently, this connector is built on top of 1.x of the Amazon SDK and the Kinesis Client Library (KCL).

As part of the Amazon SDK 2.x migration, the Spark Kinesis connector was also updated to work with SDK 2.x. In the Amazon EMR 7.0 release, Spark contains the SDK 2.x upgrade, which is not yet available in the community version of Apache Spark. If you use the Spark Kinesis connector from a release earlier than 7.0, you must migrate your application code to run on SDK 2.x before you can migrate to Amazon EMR 7.0.

Migration guide

This section describes the steps to migrate your applications to the upgraded Spark Kinesis connector. It includes guidance for migrating to the Kinesis Client Library (KCL) 2.x, to the Amazon credential providers, and to the Amazon service clients in SDK 2.x. For reference, it also includes a sample WordCount program that uses the Kinesis connector.

Migrating KCL from 1.x to 2.x

  • Metrics level and dimensions in KinesisInputDStream

    When you instantiate a KinesisInputDStream, you can control the metrics level and dimensions for the stream. The following example shows how you might customize these parameters with KCL 1.x:

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
    import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

    val kinesisStream = KinesisInputDStream.builder
      .streamingContext(ssc)
      .streamName(streamName)
      .endpointUrl(endpointUrl)
      .regionName(regionName)
      .initialPosition(new Latest())
      .checkpointAppName(appName)
      .checkpointInterval(kinesisCheckpointInterval)
      .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
      .metricsLevel(MetricsLevel.DETAILED)
      .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
      .build()

    In KCL 2.x, these configuration settings have different package names. To migrate to 2.x:

    1. Change the import statement com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel to software.amazon.kinesis.metrics.MetricsLevel, and com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration to software.amazon.kinesis.metrics.MetricsUtil:

      // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
      import software.amazon.kinesis.metrics.MetricsLevel
      // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
      import software.amazon.kinesis.metrics.MetricsUtil
    2. Replace the line metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) with metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)).

    The following is an updated version of the KinesisInputDStream with customized metrics level and metrics dimensions:

    import software.amazon.kinesis.metrics.MetricsLevel
    import software.amazon.kinesis.metrics.MetricsUtil

    val kinesisStream = KinesisInputDStream.builder
      .streamingContext(ssc)
      .streamName(streamName)
      .endpointUrl(endpointUrl)
      .regionName(regionName)
      .initialPosition(new Latest())
      .checkpointAppName(appName)
      .checkpointInterval(kinesisCheckpointInterval)
      .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
      .metricsLevel(MetricsLevel.DETAILED)
      .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
      .build()
  • Message handler function in KinesisInputDStream

    When instantiating a KinesisInputDStream, you can also provide a "message handler function" that takes a Kinesis record and returns a generic object T, in case you want to use other data contained in the record, such as the partition key (a sketch of such a handler follows the updated 2.x example below).

    In KCL 1.x, the message handler function signature is Record => T, where Record is com.amazonaws.services.kinesis.model.Record. In KCL 2.x, the handler signature changes to KinesisClientRecord => T, where KinesisClientRecord is software.amazon.kinesis.retrieval.KinesisClientRecord.

    The following is an example of providing a message handler in KCL 1.x:

    import com.amazonaws.services.kinesis.model.Record

    def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5

    val stream = KinesisInputDStream.builder
      .streamingContext(ssc)
      .streamName(streamName)
      .endpointUrl(endpointUrl)
      .regionName(regionName)
      .initialPosition(new Latest())
      .checkpointAppName(appName)
      .checkpointInterval(Seconds(10))
      .storageLevel(StorageLevel.MEMORY_ONLY)
      .buildWithMessageHandler(addFive)

    To migrate the message handler:

    1. Change the import statement from com.amazonaws.services.kinesis.model.Record to software.amazon.kinesis.retrieval.KinesisClientRecord.

      // import com.amazonaws.services.kinesis.model.Record
      import software.amazon.kinesis.retrieval.KinesisClientRecord
    2. Update the method signature of the message handler.

      // def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
      def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5

    The following is an updated example of providing a message handler in KCL 2.x:

    import software.amazon.kinesis.retrieval.KinesisClientRecord

    def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5

    val stream = KinesisInputDStream.builder
      .streamingContext(ssc)
      .streamName(streamName)
      .endpointUrl(endpointUrl)
      .regionName(regionName)
      .initialPosition(new Latest())
      .checkpointAppName(appName)
      .checkpointInterval(Seconds(10))
      .storageLevel(StorageLevel.MEMORY_ONLY)
      .buildWithMessageHandler(addFive)
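
    As noted above, the handler can return more than the raw payload. The following is a minimal sketch, not taken from the original documentation, of a KCL 2.x handler that pairs each record's partition key with its decoded payload; it assumes the same ssc, streamName, and related values as the examples above:

    import java.nio.charset.StandardCharsets
    import software.amazon.kinesis.retrieval.KinesisClientRecord

    // Copy the payload out of the record's ByteBuffer and pair it with the
    // partition key; KinesisClientRecord exposes partitionKey() and data().
    def keyedHandler = (r: KinesisClientRecord) => {
      val bytes = new Array[Byte](r.data().remaining())
      r.data().get(bytes)
      (r.partitionKey(), new String(bytes, StandardCharsets.UTF_8))
    }

    val keyedStream = KinesisInputDStream.builder
      .streamingContext(ssc)
      .streamName(streamName)
      .endpointUrl(endpointUrl)
      .regionName(regionName)
      .initialPosition(new Latest())
      .checkpointAppName(appName)
      .checkpointInterval(Seconds(10))
      .storageLevel(StorageLevel.MEMORY_ONLY)
      .buildWithMessageHandler(keyedHandler)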

    For more information about migrating from KCL 1.x to 2.x, see Migrating consumers from KCL 1.x to KCL 2.x.

Migrating Amazon credential providers from Amazon SDK 1.x to 2.x

Credential providers are used to obtain the Amazon credentials for interacting with Amazon services. There are several interface and class changes related to credential providers in SDK 2.x, which can be found here. The Spark Kinesis connector defines an interface (org.apache.spark.streaming.kinesis.SparkAWSCredentials) and implementation classes that return 1.x versions of Amazon credential providers. These credential providers are needed when initializing Kinesis clients. For example, if you use the SparkAWSCredentials.provider method in your application, you need to update your code to consume the 2.x version of Amazon credential providers; an STS-based sketch follows the migration steps below.

The following is an example of using a credential provider in Amazon SDK 1.x:

import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import com.amazonaws.auth.AWSCredentialsProvider

val basicSparkCredentials = SparkAWSCredentials.builder
  .basicCredentials("accessKey", "secretKey")
  .build()

val credentialProvider = basicSparkCredentials.provider
assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
To migrate to SDK 2.x:
  1. Change the import statement from com.amazonaws.auth.AWSCredentialsProvider to software.amazon.awssdk.auth.credentials.AwsCredentialsProvider.

    // import com.amazonaws.auth.AWSCredentialsProvider
    import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
  2. Update the rest of the code that uses this class.

    import org.apache.spark.streaming.kinesis.SparkAWSCredentials
    import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider

    val basicSparkCredentials = SparkAWSCredentials.builder
      .basicCredentials("accessKey", "secretKey")
      .build()

    val credentialProvider = basicSparkCredentials.provider
    assert(credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")
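
If your application assumes an IAM role, the same builder pattern applies. The following is a minimal sketch, not from the original page, that uses the connector's stsCredentials builder option; the role ARN and session name are placeholder values:

import org.apache.spark.streaming.kinesis.SparkAWSCredentials
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider

// Build credentials that assume an IAM role through STS; the ARN and
// session name below are placeholders.
val stsSparkCredentials = SparkAWSCredentials.builder
  .stsCredentials("arn:aws:iam::123456789012:role/demo-role", "demo-session")
  .build()

// After the SDK 2.x upgrade, provider returns the 2.x interface.
val stsProvider = stsSparkCredentials.provider
assert(stsProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")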

Migrating Amazon service clients from Amazon SDK 1.x to 2.x

Amazon service clients have different package names in 2.x (software.amazon.awssdk), whereas SDK 1.x uses com.amazonaws. For more information about the client changes, see here. If you use these service clients in your code, you need to migrate the clients accordingly; a builder-configuration sketch follows the steps below.

The following is an example of creating a client in SDK 1.x:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB

// Create the client with the default configuration:
AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
// Or instantiate the client directly:
AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
To migrate to 2.x:
  1. Change the import statements for the service clients. Taking the DynamoDB client as an example, you need to change com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient and com.amazonaws.services.dynamodbv2.document.DynamoDB to software.amazon.awssdk.services.dynamodb.DynamoDbClient.

    // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
    // import com.amazonaws.services.dynamodbv2.document.DynamoDB
    import software.amazon.awssdk.services.dynamodb.DynamoDbClient
  2. Update the code that initializes the clients.

    // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
    // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();

    // Create the client with the default configuration:
    DynamoDbClient ddbClient = DynamoDbClient.create();
    // Or configure the client through the builder:
    DynamoDbClient ddbClient = DynamoDbClient.builder().build();

    For more information about migrating the Amazon SDK from 1.x to 2.x, see What's different between the Amazon SDK for Java 1.x and 2.x.
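
In SDK 2.x, per-client settings such as the Region and credentials provider are supplied on the builder rather than set on the constructed client. The following is a minimal sketch; the Region and credential chain are example choices, not values from the original page:

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbClient

// Explicit configuration through the builder; Region.US_WEST_2 and the
// default credential chain are example choices.
val configuredDdbClient = DynamoDbClient.builder()
  .region(Region.US_WEST_2)
  .credentialsProvider(DefaultCredentialsProvider.create())
  .build()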

Code example for streaming applications

import java.net.URI

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil}

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream

object KinesisWordCountASLSDKV2 {
  def main(args: Array[String]): Unit = {

    val appName = "demo-app"
    val streamName = "demo-kinesis-test"
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val regionName = "us-west-2"

    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentialsProvider = DefaultCredentialsProvider.create
    require(credentialsProvider.resolveCredentials() != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")

    val kinesisClient = KinesisClient.builder()
      .credentialsProvider(credentialsProvider)
      .region(Region.US_WEST_2)
      .endpointOverride(URI.create(endpointUrl))
      .httpClientBuilder(ApacheHttpClient.builder())
      .build()

    val describeStreamRequest = DescribeStreamRequest.builder()
      .streamName(streamName)
      .build()

    val numShards = kinesisClient.describeStream(describeStreamRequest)
      .streamDescription
      .shards
      .size

    // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are less receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards

    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)

    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval

    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2")
    val ssc = new StreamingContext(sparkConfig, batchInterval)

    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(streamName)
        .endpointUrl(endpointUrl)
        .regionName(regionName)
        .initialPosition(new Latest())
        .checkpointAppName(appName)
        .checkpointInterval(kinesisCheckpointInterval)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .metricsLevel(MetricsLevel.DETAILED)
        .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME))
        .build()
    }

    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)

    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))

    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

    // Print the first 10 wordCounts
    wordCounts.print()

    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}

Considerations when using the upgraded Spark Kinesis connector

  • If your application uses the kinesis-producer-library with a JDK version lower than 11, you may run into exceptions such as java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter. This happens because EMR 7.0 comes with JDK 17 by default, and the J2EE modules have been removed from the standard libraries since Java 11. This can be fixed by adding the following dependency to your pom file; replace the library version with a version you see fit. An sbt equivalent is sketched after the snippet below.

    <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>${jaxb-api.version}</version>
    </dependency>
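
    If your project builds with sbt rather than Maven, the same coordinates can be declared as follows; this is a sketch, and jaxbApiVersion is a placeholder for whatever version you choose:

    // sbt equivalent of the Maven dependency above.
    val jaxbApiVersion = "2.3.1" // example version; replace as appropriate
    libraryDependencies += "javax.xml.bind" % "jaxb-api" % jaxbApiVersion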
  • After an EMR cluster is created, the Spark Kinesis connector jar can be found under the path /usr/lib/spark/connector/lib/.