Amazon Managed Service for Apache Flink was previously known as Amazon Kinesis Data Analytics for Apache Flink.

Adding streaming data sources to Managed Service for Apache Flink

Apache Flink provides connectors for reading from files, sockets, collections, and custom sources. In your application code, you use an Apache Flink source to receive data from a stream. This section describes the sources that are available for Amazon services.

Kinesis data streams

The FlinkKinesisConsumer source provides streaming data to your application from an Amazon Kinesis data stream.

Creating a FlinkKinesisConsumer

The following code example demonstrates creating a FlinkKinesisConsumer:

// Configure the consumer: the AWS Region and where in the stream to start reading.
Properties inputProperties = new Properties();
inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// Deserialize each record as a string and add the consumer as a source.
DataStream<String> input = env.addSource(
    new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

For more information about using a FlinkKinesisConsumer, see Download and examine the Apache Flink streaming Java code.

Creating a FlinkKinesisConsumer that uses an EFO consumer

The FlinkKinesisConsumer now supports Enhanced Fan-Out (EFO).

If a Kinesis consumer uses EFO, the Kinesis Data Streams service gives it its own dedicated bandwidth, rather than having the consumer share the fixed bandwidth of the stream with the other consumers reading from the stream.
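The difference between shared and dedicated bandwidth can be illustrated with a little arithmetic. The sketch below assumes a read limit of 2 MB/s per shard, which is the figure Kinesis Data Streams documents at the time of writing, and it assumes that shared consumers split that limit evenly at best. The class and method names are hypothetical and are not part of any AWS or Flink API.

```java
public class FanOutBandwidth {
    // Assumption: documented per-shard read limit, in MB/s.
    public static final double SHARD_READ_MB_PER_SEC = 2.0;

    // Shared throughput: all consumers compete for one per-shard limit,
    // so an even split is the best case for each of them.
    public static double sharedPerConsumer(int consumers) {
        requirePositive(consumers);
        return SHARD_READ_MB_PER_SEC / consumers;
    }

    // Enhanced fan-out: each registered consumer gets its own per-shard limit,
    // independent of how many other consumers read from the stream.
    public static double efoPerConsumer(int consumers) {
        requirePositive(consumers);
        return SHARD_READ_MB_PER_SEC;
    }

    private static void requirePositive(int consumers) {
        if (consumers < 1) {
            throw new IllegalArgumentException("consumers must be >= 1");
        }
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 4; n++) {
            System.out.printf(
                "%d consumer(s): shared=%.2f MB/s each, EFO=%.2f MB/s each%n",
                n, sharedPerConsumer(n), efoPerConsumer(n));
        }
    }
}
```

Under these assumptions, the per-consumer share of a shard shrinks as consumers are added in shared mode, while it stays constant with EFO.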

For more information about using EFO with the Kinesis consumer, see FLIP-128: Enhanced Fan Out for Amazon Kinesis Consumers.

You enable the EFO consumer by setting the following parameters on the Kinesis consumer:

  • RECORD_PUBLISHER_TYPE: Set this parameter to EFO so that your application uses an EFO consumer to read from the Kinesis data stream.

  • EFO_CONSUMER_NAME: Set this parameter to a string value that is unique among the consumers of this stream. Reusing a consumer name on the same Kinesis data stream terminates the previous consumer that was using that name.

To configure a FlinkKinesisConsumer to use EFO, add the following parameters to the consumer:

consumerConfig.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO");
consumerConfig.putIfAbsent(EFO_CONSUMER_NAME, "basic-efo-flink-app");
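Because the snippet above uses putIfAbsent, any value already present in the configuration takes precedence over the EFO defaults. The following JDK-only sketch illustrates that behavior without requiring the Flink connector on the classpath. The two key strings are assumptions meant to mirror the values of the connector's RECORD_PUBLISHER_TYPE and EFO_CONSUMER_NAME constants. In a real application, reference the constants themselves. The class and method names are hypothetical.

```java
import java.util.Properties;

public class EfoConsumerConfig {
    // Assumption: these mirror the string values of the connector's
    // ConsumerConfigConstants; prefer the real constants in application code.
    static final String RECORD_PUBLISHER_TYPE = "flink.stream.recordpublisher";
    static final String EFO_CONSUMER_NAME = "flink.stream.efo.consumername";

    // Returns a copy of base with EFO settings filled in only where base
    // does not already define them.
    public static Properties withEfoDefaults(Properties base, String consumerName) {
        Properties config = new Properties();
        config.putAll(base);
        config.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO");
        config.putIfAbsent(EFO_CONSUMER_NAME, consumerName);
        return config;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("aws.region", "us-east-1"); // example Region, an assumption
        Properties config = withEfoDefaults(base, "basic-efo-flink-app");
        config.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

If the application must always use a specific consumer name regardless of any existing configuration, setProperty would be the more direct choice than putIfAbsent.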

For an example of a Managed Service for Apache Flink application that uses an EFO consumer, see EFO Consumer.

Amazon MSK

The KafkaSource source provides streaming data to your application from an Amazon MSK topic.

Creating a KafkaSource

The following code example demonstrates creating a KafkaSource:

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

For more information about using a KafkaSource, see MSK Replication.