Getting started with Amazon Managed Service for Apache Flink (Table API) - Managed Service for Apache Flink
Services or capabilities described in Amazon Web Services documentation might vary by Region. To see the differences applicable to the China Regions, see Getting Started with Amazon Web Services in China (PDF).

Amazon Managed Service for Apache Flink was previously known as Amazon Kinesis Data Analytics for Apache Flink.

Getting started with Amazon Managed Service for Apache Flink (Table API)

This section introduces you to the fundamental concepts of Managed Service for Apache Flink and implementing an application in Java using the Table API and SQL. It demonstrates how to switch between different APIs within the same application, and it describes the available options for creating and testing your applications. It also provides instructions for installing the necessary tools to complete the tutorials in this guide and to create your first application.

Components of the Managed Service for Apache Flink application


Managed Service for Apache Flink supports all Apache Flink APIs and potentially all JVM languages. Depending on the API you choose, the structure of the application and the implementation is slightly different. This tutorial covers the implementation of applications using the Table API and SQL, and the integration with the DataStream API, implemented in Java.

To process data, your Managed Service for Apache Flink application uses a Java application that processes input and produces output using the Apache Flink runtime.

A typical Apache Flink application has the following components:

  • Runtime properties: You can use runtime properties to pass configuration parameters to your application without modifying and republishing the code.

  • Sources: The application consumes data from one or more sources. A source uses a connector to read data from and external system, such as a Kinesis data stream or an Amazon MSK topic. For development or testing, you can also have sources random[ly generate test data. For more information, see Adding streaming data sources to Managed Service for Apache Flink. With SQL or Table API, sources are defined as source tables.

  • Transformations: The application processes data through one or more transformations that can filter, enrich, or aggregate data. When using SQL or Table API, transformations are defined as queries over tables or views.

  • Sinks: The application sends data to external systems through sinks. A sink uses a connector to send data to an external system, such as a Kinesis data stream, an Amazon MSK topic, an Amazon S3 bucket, or a relational database. You can also use a special connector to print the output for development purposes only. When using SQL or Table API, sinks are defined as sink tables where you will insert results. For more information, see Writing data using sinks in Managed Service for Apache Flink.

Your application requires some external dependencies, such as Flink connectors your application uses, or potentially a Java library. To run in Amazon Managed Service for Apache Flink, you must package the application along with dependencies in a fat-JAR and upload it to an Amazon S3 bucket. You then create a Managed Service for Apache Flink application. You pass the code package location, along with other runtime configuration parameters. This tutorial demonstrates how to use Apache Maven to package the application and how to run the application locally in the IDE of your choice.


Before starting this tutorial, complete the first two steps of the Getting started with Amazon Managed Service for Apache Flink (DataStream API):

To get started, see Create an Application.

Run your application locally

You can run and debug your Flink application locally in your IDE.


Before you continue, verify that the input and output streams are available. See Create two Amazon Kinesis data streams. Also, verify that you have permission to read and write from both streams. See Authenticate your Amazon session.

Setting up the local development environment requires Java 11 JDK, Apache Maven, and an IDE for Java development. Verify you meet the required prerequisites. See Fulfill the prerequisites for completing the exercises.

Import the Java project into your IDE

To start working on the application in your IDE, you must import it as a Java project.

The repository you cloned contains multiple examples. Each example is a separate project. For this tutorial, import the content in the ./jave/GettingStartedTable subdirectory into your IDE .

Insert the code as an existing Java project using Maven.


The exact process to import a new Java project varies depending on the IDE you are using.

Modify the local application configuration

When running locally, the application uses the configuration in the application_properties.json file in the resources folder of the project under ./src/main/resources. For this tutorial application, the configuration parameters are the name of the bucket and the path where the data will be written.

Edit the configuration and modify the name of the Amazon S3 bucket to match the bucket that you created at the beginning of this tutorial.

[ { "PropertyGroupId": "bucket", "PropertyMap": { "name": "<bucket-name>", "path": "output" } } ]

The configuration property name must contain only the bucket name, for example my-bucket-name. Don't include any prefix such as s3:// or a trailing slash.

If you modify the path, omit any leading or trailing slashes.

Set up your IDE run configuration

You can run and debug the Flink application from your IDE directly by running the main class, as you would run any Java application. Before running the application, you must set up the Run configuration. The setup depends on the IDE that you are using. For example, see Run/debug configurations in the IntelliJ IDEA documentation. In particular, you must set up the following:

  1. Add the provided dependencies to the classpath. This is required to make sure that the dependencies with provided scope are passed to the application when running locally. Without this set up, the application displays a class not found error immediately.

  2. Pass the Amazon credentials to access the Kinesis streams to the application. The fastest way is to use Amazon Toolkit for IntelliJ IDEA. Using this IDE plugin in the Run configuration, you can select a specific Amazon profile. Amazon authentication happens using this profile. You don't need to pass Amazon credentials directly.

  3. Verify that the IDE runs the application using JDK 11.

Run the application in your IDE

After you set up the Run configuration for the BasicTableJob, you can run or debug it like a regular Java application.


You can't run the fat-jar generated by Maven directly with java -jar ... from the command line. This jar does not contain the Flink core dependencies required to run the application standalone.

When the application starts successfully, it logs some information about the standalone minicluster and the initialization of the connectors. This is followed by a number of INFO and some WARN logs that Flink normally emits when the application starts.

21:28:34,982 INFO [] - Loading application properties from 'flink-application-properties-dev.json' 21:28:35,149 INFO [] - s3Path is ExampleBucket/my-output-bucket ...

After the initialization is complete, the application doesn't emit any further log entries. While data is flowing, no log is emitted.

To verify if the application is correctly processing data, you can inspect the content of the output bucket, as described in the following section.


Not emitting logs about flowing data is the normal behavior for a Flink application. Emitting logs on every record might be convenient for debugging, but can add considerable overhead when running in production.

Observe application writing data to S3 bucket

This example application generates random data internally and writes this data to the destination S3 bucket you configured. Unless you modified the default configuration path, the data will be written to the output path followed by data and hour partitioning, in the format ./output/<yyyy-MM-dd>/<HH>.

The FileSystem sink connector creates new files on the Flink checkpoint. When running locally, the application runs a checkpoint every 5 seconds (5,000 milliseconds), as specified in the code.

if (env instanceof LocalStreamEnvironment) { env.enableCheckpointing(5000); }

To browse the S3 bucket and observe the file written by the application

Stop your application running locally

Stop the application running in your IDE. The IDE usually provides a "stop" option. The exact location and method depends on the IDE.

Compile and package your application code

In this section, you use Apache Maven to compile the Java code and package it into a JAR file. You can compile and package your code using the Maven command line tool or your IDE.

To compile and package using the Maven command line

Move to the directory that contains the Jave GettingStarted project and run the following command:

$ mvn package

To compile and package using your IDE

Run mvn package from your IDE Maven integration.

In both cases, the JAR file target/amazon-msf-java-table-app-1.0.jar is created.


Running a build project from your IDE might not create the JAR file.

Upload the application code JAR file

In this section, you upload the JAR file you created in the previous section to the Amazon S3 bucket you created at the beginning of this tutorial. If you have done it yet, complete Create an Amazon S3 bucket.

To upload the application code
  1. Open the Amazon S3 console at

  2. Choose the bucket you previously created for the application code.

  3. Choose Upload field.

  4. Choose Add files.

  5. Navigate to the JAR file generated in the previous section: target/amazon-msf-java-table-app-1.0.jar.

  6. Choose Upload without changing any other settings.


    Make sure that you select the correct JAR file in <repo-dir>/java/GettingStarted/target/amazon/msf-java-table-app-1.0.jar.

    The target directory also contains other JAR files that you don't need to upload.

Create and configure the Managed Service for Apache Flink application

You can create and configure a Managed Service for Apache Flink application using either the console or the Amazon CLI. For this tutorial, you will use the console.


When you create the application using the console, your Amazon Identity and Access Management (IAM) and Amazon CloudWatch Logs resources are created for you. When you create the application using the Amazon CLI, you must create these resources separately.

Create the application

  1. Open the Managed Service for Apache Flink console at

  2. Verify that the correct Region is selected: US East (N. Virginia) us-east-1.

  3. On the right menu, choose Apache Flink applications and then choose Create streaming application. Alternatively, choose Create streaming application in the Get started section of the initial page.

  4. On the Create streaming application page, complete the following:

    • For Choose a method to set up the stream processing application, choose Create from scratch.

    • For Apache Flink configuration, Application Flink version, choose Apache Flink 1.19.

    • In the Application configuration section, complete the following:

      • For Application name, enter MyApplication.

      • For Description, enter My Java Table API test app.

      • For Access to application resources, choose Create / update IAM role kinesis-analytics-MyApplication-us-east-1 with required policies.

    • In Template for application settings, complete the following:

      • For Templates, choose Develoment.

  5. Choose Create streaming application.


When you create a Managed Service for Apache Flink application using the console, you have the option of having an IAM role and policy created for your application. Your application uses this role and policy to access its dependent resources. These IAM resources are named using your application name and Region as follows:

  • Policy: kinesis-analytics-service-MyApplication-us-east-1

  • Role: kinesisanalytics-MyApplication-us-east-1

Edit the IAM policy

Edit the IAM policy to add permissions to access the Amazon S3 bucket.

To edit the IAM policy to add S3 bucket permissions
  1. Open the IAM console at

  2. Choose Policies. Choose the kinesis-analytics-service-MyApplication-us-east-1 policy that the console created for you in the previous section.

  3. Choose Edit and then choose the JSON tab.

  4. Add the highlighted section of the following policy example to the policy. Replace the sample account ID (012345678901) with your account ID and <bucket-name> with the name of the S3 bucket that you created.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "WriteOutputBucket", "Effect": "Allow", "Action": "s3:*", Resource": [ "arn:aws:s3:::my-bucket" ] } ] }
  5. Choose Next and then choose Save changes.

Configure the application

Edit the application to set the application code artifact.

To configure the application
  1. On the MyApplication page, choose Configure.

  2. In the Aplication code location section, choose Configure.

    • For Amazon S3 bucket, select the bucket you previously created for the application code. Choose Browse and select the correct bucket, and then choose Choose. Don't click on the bucket name.

    • For Path to Amazon S3 object, enter amazon-msf-java-table-app-1.0.jar.

  3. For Access permissions, choose Create / update IAM role kinesis-analytics-MyApplication-us-east-1.

  4. In the Runtime properties section, add the following properties.

  5. Choose Add new item and add each of the following parameters:

    Group ID Key Value
    bucket name your-bucket-name
    bucket path output
  6. Don't modify any other setting.

  7. Choose Save changes.


When you choose to enable Amazon CloudWatch logging, Managed Service for Apache Flink creates a log group and log stream for you. The names of these resources are as follows:

  • Log group: /aws/kinesis-analytics/MyApplication

  • Log stream: kinesis-analytics-log-stream

Run the application

The application is now configured and ready to run.

To run the application
  1. Return to the console page in Amazon Managed Service for Apache Flink and choose MyApplication.

  2. Choose Run to start the application.

  3. On the Application restore configuration, choose Run with latest snapshot.

  4. Choose Run.

  5. The Status in Application details transitions from Ready to Starting and then to Running after the application has started.

When the application is in Running status, you can open the Flink dashboard.

To open the dashboard and view the job
  1. Choose Open Apache Flink dashbard. The dashboard opens in a new page.

  2. In the Running Jobs list, choose the single job you can see.


    If you set the runtime properties or edited the IAM policies incorrectly, the application status might change to Running, but the Flink dashboard shows the job continuously restarting. This is a common failure scenario when the application is misconfigured or lacks the permissions to access the external resources.

    When this happens, check the Exceptions tab in the Flink dashboard to investigate the cause of the problem.

Observe the metrics of the running application

On the MyApplication page, in the Amazon CloudWatch metrics section, you can see some of the fundamental metrics from the running application.

To view the metrics
  1. Next to the Refresh button, select 10 seconds from the dropdown list.

  2. When the application is running and healthy, you can see the uptime metric continuously increasing.

  3. The fullrestarts metric should be zero. If it is increasing, the configuration might have issues. Review the Exceptions tab on the Flink dashboard to investigate the issue.

  4. The Number of failed checkpoints metric should be zero in a healthy application.


    This dashboard displays a fixed set of metrics with a granularity of 5 minutes. You can create a custom application dashboard with any metrics in the CloudWatch dashboard.

Observe application writing data to the destination bucket

You can now observe the application running in Amazon Managed Service for Apache Flink writing files to Amazon S3.

To observe the files, follow the same process you used to check the files being written when the application was running locally. See Observe application writing data to S3 bucket.

Remember that the application writes new files on the Flink checkpoint. When running on Amazon Managed Service for Apache Flink, checkpoints are enabled by default and run every 60 seconds. The application creates new files approximately every 1 minute.

Stop the application

To stop the applicatio, go to the console page of the Managed Service for Apache Flink application named MyApplication.

To stop the application
  1. From the Action dropdown list, choose Stop.

  2. The Status in Application details transitions from Running to Stopping, and then to Ready when the application is completely stopped.


    Don't forget to also stop sending data to the input stream from the Python script or the Kinesis Data Generator.

Next step

Clean up Amazon resources