将流数据源添加到 Apache Flink 的 Kinesis Data Analytics - Amazon Kinesis Data Analytics
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将流数据源添加到 Apache Flink 的 Kinesis Data Analytics

Apache Flink 提供连接器以从文件、套接字、集合和自定义源中读取。在应用程序代码中,您可以使用 Apache Flink 源以从流中接收数据。本节介绍了可用于 Amazon 服务的源。

Kinesis Data Streams

这些区域有:FlinkKinesisConsumer源从 Amazon Kinesis 数据流中向应用程序提供流数据。

创建FlinkKinesisConsumer

以下代码示例说明了如何创建 FlinkKinesisConsumer

Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); DataStream<string> input = env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

有关使用 FlinkKinesisConsumer 的更多信息,请参阅下载并检查 Apache Flink 流式处理 Java 代码

创建FlinkKinesisConsumer使用 EFO 消费者

这些区域有:FlinkKinesis现在使用者支持增强型扇出功能(EFO).

如果 Kinesis 消费者使用 EFO,Kinesis Data Streams 服务将为其提供自己的专用带宽,而不是让消费者与其他消费者共享直播的固定带宽。

有关将 EFO 与 Kinesis 使用者一起使用的更多信息,请参阅翻转 128:的增强型扇出功能AmazonKinesis 使用者.

您可以通过在 Kinesis 使用器上设置以下参数来启用 EFO 使用者:

  • 记录_PUBLISHER _ 类型:将该参数设置为EFO让您的应用程序使用 EFO 使用者访问 Kinesis 数据流数据。

  • EFO_消费ER_NAME:将此参数设置为在此流的使用者中唯一的字符串值。在同一 Kinesis Data Stream 中重复使用消费者姓名将导致以前使用该名称的消费者被终止。

配置FlinkKinesisConsumer要使用 EFO,请将以下参数添加到使用者:

consumerConfig.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO"); consumerConfig.putIfAbsent(EFO_CONSUMER_NAME, "basic-efo-flink-app");

有关使用 EFO 消费者的 Kinesis Data Analytics 应用程序的示例,请参阅EFO 消费者.

Amazon MSK

这些区域有:FlinkKafkaConsumer源从 Amazon MSK 主题中向应用程序提供流数据。

创建FlinkKafkaConsumer

以下代码示例说明了如何创建 FlinkKafkaConsumer

Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty("bootstrap.servers", "Cluster Bootstrap Broker String"); inputProperties.setProperty("security.protocol", "SSL"); inputProperties.setProperty("ssl.truststore.location", "/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts"); inputProperties.setProperty("ssl.truststore.password", "changeit"); DataStream<string> input = env.addSource(new FlinkKafkaConsumer<>("MyMSKTopic", new SimpleStringSchema(), inputProperties));

有关使用 FlinkKafkaConsumer 的更多信息,请参阅MSK 复制