本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将流数据源添加到 Apache Flink 的 Kinesis Data Analytics
Apache Flink 提供连接器以从文件、套接字、集合和自定义源中读取。在应用程序代码中,您可以使用 Apache Flink 源
Kinesis Data Streams
这些区域有:FlinkKinesisConsumer
源从 Amazon Kinesis 数据流中向应用程序提供流数据。
创建FlinkKinesisConsumer
以下代码示例说明了如何创建 FlinkKinesisConsumer
:
Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); DataStream<string> input = env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
有关使用 FlinkKinesisConsumer
的更多信息,请参阅下载并检查 Apache Flink 流式处理 Java 代码。
创建FlinkKinesisConsumer
使用 EFO 消费者
这些区域有:FlinkKinesis现在使用者支持增强型扇出功能(EFO)
如果 Kinesis 消费者使用 EFO,Kinesis Data Streams 服务将为其提供自己的专用带宽,而不是让消费者与其他消费者共享直播的固定带宽。
有关将 EFO 与 Kinesis 使用者一起使用的更多信息,请参阅翻转 128:的增强型扇出功能AmazonKinesis 使用者
您可以通过在 Kinesis 使用器上设置以下参数来启用 EFO 使用者:
记录_PUBLISHER _ 类型:将该参数设置为EFO让您的应用程序使用 EFO 使用者访问 Kinesis 数据流数据。
EFO_消费ER_NAME:将此参数设置为在此流的使用者中唯一的字符串值。在同一 Kinesis Data Stream 中重复使用消费者姓名将导致以前使用该名称的消费者被终止。
配置FlinkKinesisConsumer
要使用 EFO,请将以下参数添加到使用者:
consumerConfig.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO"); consumerConfig.putIfAbsent(EFO_CONSUMER_NAME, "basic-efo-flink-app");
有关使用 EFO 消费者的 Kinesis Data Analytics 应用程序的示例,请参阅EFO 消费者.
Amazon MSK
这些区域有:FlinkKafkaConsumer
源从 Amazon MSK 主题中向应用程序提供流数据。
创建FlinkKafkaConsumer
以下代码示例说明了如何创建 FlinkKafkaConsumer
:
Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty("bootstrap.servers", "
Cluster Bootstrap Broker String
"); inputProperties.setProperty("security.protocol", "SSL"); inputProperties.setProperty("ssl.truststore.location", "/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts"); inputProperties.setProperty("ssl.truststore.password", "changeit"); DataStream<string> input = env.addSource(new FlinkKafkaConsumer<>("MyMSKTopic", new SimpleStringSchema(), inputProperties));
有关使用 FlinkKafkaConsumer
的更多信息,请参阅MSK 复制。