Create and Run a Kinesis Data Analytics for Apache Flink Application - Amazon Kinesis Data Analytics
Services or capabilities described in Amazon Web Services documentation might vary by Region. To see the differences applicable to the China Regions, see Getting Started with Amazon Web Services in China (PDF).

Create and Run a Kinesis Data Analytics for Apache Flink Application

In this exercise, you create a Kinesis Data Analytics application with an Amazon MSK topic as a source and an Amazon S3 bucket as a sink.

Create Dependent Resources

Before you create an Amazon Kinesis Data Analytics for Apache Flink for this exercise, you create the following dependent resources:

  • A virtual private cloud (VPC) based on Amazon VPC and an Amazon MSK cluster

  • An Amazon S3 bucket to store the application's code and output (ka-app-code-<username>)

Create a VPC and an Amazon MSK Cluster

To create a VPC and Amazon MSK cluster to access from your Kinesis Data Analytics application, follow the Getting Started Using Amazon MSK tutorial.

When completing the tutorial, note the following:

  • Record the bootstrap server list for your cluster. You can get the list of bootstrap servers with the following command, replacing ClusterArn with the Amazon Resource Name (ARN) of your MSK cluster:

    aws kafka get-bootstrap-brokers --region us-west-2 --cluster-arn ClusterArn {... "BootstrapBrokerStringTls": ",," }
  • When following the steps in the tutorials, be sure to use your selected Amazon Region in your code, commands, and console entries.

Create an Amazon S3 Bucket

You can create the Amazon S3 bucket using the console. For instructions for creating this resource, see the following topics:

  • How Do I Create an S3 Bucket? in the Amazon Simple Storage Service User Guide. Give the Amazon S3 bucket a globally unique name by appending your login name, such as ka-app-code-<username>.

Other Resources

When you create your application, Kinesis Data Analytics creates the following Amazon CloudWatch resources if they don't already exist:

  • A log group called /aws/kinesis-analytics-java/MyApplication.

  • A log stream called kinesis-analytics-log-stream.

Write Sample Records to the Input Stream

In this section, you use a Python script to write sample records to the Amazon MSK topic for the application to process.

  1. Connect to the client instance you created in Step 4: Create a Client Machine of the Getting Started Using Amazon MSK tutorial.

  2. Install Python3, Pip, and the Kafka Python library:

    $ sudo yum install python37 $ curl -O $ python3 --user $ pip install kafka-python
  3. Create a file named with the following contents. Replace the BROKERS value with your bootstrap broker list you recorded previously.

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time':, 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  4. Later in the tutorial, you run the script to send data to the application.

    $ python3

Download and Examine the Apache Flink Streaming Java Code

The Java application code for this example is available from GitHub.

To download the Java application code
  1. Clone the remote repository using the following command:

    git clone
  2. Navigate to the amazon-kinesis-data-analytics-java-examples/GettingStartedTable directory.

Note the following about the application code:

  • A Project Object Model (pom.xml) file contains information about the application's configuration and dependencies, including the Kinesis Data Analytics libraries.

  • The file contains the main method that defines the application's functionality.

  • The application uses a FlinkKafkaConsumer to read from the Amazon MSK topic. The following snippet creates a FlinkKafkaConsumer object:

    final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProps);
  • Your application creates source and sink connectors to access external resources using StreamExecutionEnvironment and TableEnvironment objects.

  • The application creates source and sink connectors using dynamic application properties, so you can specify your application parameters (such as your S3 bucket) without recompiling the code.

    //read the parameters from the Kinesis Analytics environment Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); Properties flinkProperties = null; String kafkaTopic = parameter.get("kafka-topic", "AWSKafkaTutorialTopic"); String brokers = parameter.get("brokers", ""); String s3Path = parameter.get("s3Path", ""); if (applicationProperties != null) { flinkProperties = applicationProperties.get("FlinkApplicationProperties"); } if (flinkProperties != null) { kafkaTopic = flinkProperties.get("kafka-topic").toString(); brokers = flinkProperties.get("brokers").toString(); s3Path = flinkProperties.get("s3Path").toString(); }

    For more information about runtime properties, see Runtime Properties.

Compile the Application Code

In this section, you use the Apache Maven compiler to create the Java code for the application. For information about installing Apache Maven and the Java Development Kit (JDK), see Prerequisites for Completing the Exercises.

To compile the application code
  1. To use your application code, you compile and package it into a JAR file. You can compile and package your code in one of two ways:

    • Use the command-line Maven tool. Create your JAR file by running the following command in the directory that contains the pom.xml file:

      mvn package -Dflink.version=1.15.3
    • Use your development environment. See your development environment documentation for details.


      The provided source code relies on libraries from Java 11.

    You can either upload your package as a JAR file, or you can compress your package and upload it as a ZIP file. If you create your application using the Amazon CLI, you specify your code content type (JAR or ZIP).

  2. If there are errors while compiling, verify that your JAVA_HOME environment variable is correctly set.

If the application compiles successfully, the following file is created:


Upload the Apache Flink Streaming Java Code

In this section, you create an Amazon S3 bucket and upload your application code.

To upload the application code
  1. Open the Amazon S3 console at

  2. Choose Create bucket.

  3. Enter ka-app-code-<username> in the Bucket name field. Add a suffix to the bucket name, such as your user name, to make it globally unique. Choose Next.

  4. In the Configure options step, keep the settings as they are, and choose Next.

  5. In the Set permissions step, keep the settings as they are, and choose Next.

  6. Choose Create bucket.

  7. In the Amazon S3 console, choose the ka-app-code-<username> bucket, and choose Upload.

  8. In the Select files step, choose Add files. Navigate to the aws-kinesis-analytics-java-apps-1.0.jar file that you created in the previous step. Choose Next.

  9. You don't need to change any of the settings for the object, so choose Upload.

Your application code is now stored in an Amazon S3 bucket where your application can access it.

Create and Run the Kinesis Data Analytics Application

Follow these steps to create, configure, update, and run the application using the console.

Create the Application

  1. Open the Kinesis Data Analytics console at

  2. On the Kinesis Data Analytics dashboard, choose Create analytics application.

  3. On the Kinesis Analytics - Create application page, provide the application details as follows:

    • For Application name, enter MyApplication.

    • For Description, enter My java test app.

    • For Runtime, choose Apache Flink.

    • Leave the version as Apache Flink version 1.15.2 (Recommended version).

  4. For Access permissions, choose Create / update IAM role kinesis-analytics-MyApplication-us-west-2.

  5. Choose Create application.


When you create a Kinesis Data Analytics application using the console, you have the option of having an IAM role and policy created for your application. Your application uses this role and policy to access its dependent resources. These IAM resources are named using your application name and Region as follows:

  • Policy: kinesis-analytics-service-MyApplication-us-west-2

  • Role: kinesis-analytics-MyApplication-us-west-2

Edit the IAM Policy

Edit the IAM policy to add permissions to access the Amazon S3 bucket.

To edit the IAM policy to add S3 bucket permissions
  1. Open the IAM console at

  2. Choose Policies. Choose the kinesis-analytics-service-MyApplication-us-west-2 policy that the console created for you in the previous section.

  3. On the Summary page, choose Edit policy. Choose the JSON tab.

  4. Add the highlighted section of the following policy example to the policy.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:Abort*", "s3:DeleteObject*", "s3:GetObject*", "s3:GetBucket*", "s3:List*", "s3:ListBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::ka-app-code-<username>", "arn:aws:s3:::ka-app-code-<username>/*" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] } ] }

Configure the Application

Use the following procedure to configure the application.

To configure the application
  1. On the MyApplication page, choose Configure.

  2. On the Configure application page, provide the Code location:

    • For Amazon S3 bucket, enter ka-app-code-<username>.

    • For Path to Amazon S3 object, enter aws-kinesis-analytics-java-apps-1.0.jar.

  3. Under Access to application resources, for Access permissions, choose Create / update IAM role kinesis-analytics-MyApplication-us-west-2.

  4. Under Properties, choose Create group.

  5. Enter the following:

    Group ID Key Value
    FlinkApplicationProperties kafka-topic AWSKafkaTutorialTopic
    FlinkApplicationProperties brokers Your Amazon MSK cluster's Bootstrap Brokers list
    FlinkApplicationProperties s3Path ka-app-code-<username>
    FlinkApplicationProperties security.protocol SSL
    FlinkApplicationProperties ssl.truststore.location /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts
    FlinkApplicationProperties ssl.truststore.password changeit
  6. Under Monitoring, ensure that the Monitoring metrics level is set to Application.

  7. For CloudWatch logging, select the Enable check box.

  8. In the Virtual Private Cloud (VPC) section, choose VPC configuration based on Amazon MSK cluster. Choose AWSKafkaTutorialCluster.

  9. Choose Update.


When you choose to enable Amazon CloudWatch logging, Kinesis Data Analytics creates a log group and log stream for you. The names of these resources are as follows:

  • Log group: /aws/kinesis-analytics/MyApplication

  • Log stream: kinesis-analytics-log-stream

Run the Application

Use the following procedure to run the application.

To run the application
  1. On the MyApplication page, choose Run. Confirm the action.

  2. When the application is running, refresh the page. The console shows the Application graph.

  3. From your Amazon EC2 client, run the Python script you created previously to write records to the Amazon MSK cluster for your application to process:

    $ python3

Stop the Application

To stop the application, on the MyApplication page, choose Stop. Confirm the action.

Next Step

Clean Up Amazon Resources