Create and run a Managed Service for Apache Flink application - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink was previously known as Amazon Kinesis Data Analytics for Apache Flink.

Create and run a Managed Service for Apache Flink application

In this exercise, you create a Managed Service for Apache Flink application that generates synthetic data with the DataGen connector and writes the results to an Amazon S3 bucket.

Create dependent resources

Before you create a Managed Service for Apache Flink application for this exercise, you create the following dependent resources:

  • An Amazon S3 bucket to store the application's code and to write the application output.

    Note

    This tutorial assumes that you are deploying your application in the us-east-1 Region. If you use another Region, you must adapt all steps accordingly.

Create an Amazon S3 bucket

You can create the Amazon S3 bucket using the console. For instructions for creating this resource, see the following topics:

  • How Do I Create an S3 Bucket? in the Amazon Simple Storage Service User Guide. Give the Amazon S3 bucket a globally unique name by appending your login name.

    Note

    Make sure that you create the bucket in the Region you use for this tutorial. The default for the tutorial is us-east-1.

Other resources

When you create your application, Managed Service for Apache Flink creates the following Amazon CloudWatch resources if they don't already exist:

  • A log group called /AWS/KinesisAnalytics-java/<my-application>.

  • A log stream called kinesis-analytics-log-stream.

Set up your local development environment

For development and debugging, you can run the Apache Flink application on your machine, directly from your IDE of choice. Any Apache Flink dependencies are handled as normal Java dependencies using Maven.

Note

On your development machine, you must have Java JDK 11, Maven, and Git installed. We recommend that you use a development environment such as Eclipse Java Neon or IntelliJ IDEA. To verify that you meet all prerequisites, see Fulfill the prerequisites for completing the exercises. You do not need to install an Apache Flink cluster on your machine.

Authenticate your Amazon session

The application writes its results to an Amazon S3 bucket. When running locally, you must have a valid Amazon authenticated session with permissions to write to that bucket. Use the following steps to authenticate your session:

  1. If you don't have the Amazon CLI and a named profile with valid credentials configured, see Step 2: Set up the Amazon Command Line Interface (Amazon CLI).

  2. If your IDE has a plugin to integrate with Amazon, you can use it to pass the credentials to the application running in the IDE. For more information, see Amazon Toolkit for IntelliJ IDEA and Amazon Toolkit for Eclipse.

Download and examine the Apache Flink streaming Java code

The application code for this example is available from GitHub.

To download the Java application code
  1. Clone the remote repository using the following command:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. Navigate to the ./java/GettingStartedTable directory.

Review application components

The application is entirely implemented in the com.amazonaws.services.msf.BasicTableJob class. The main() method defines sources, transformations, and sinks. The execution is initiated by an execution statement at the end of this method.

Note

For an optimal developer experience, the application is designed to run without any code changes both on Amazon Managed Service for Apache Flink and locally, for development in your IDE.

  • To read the runtime configuration so that it works both when running in Amazon Managed Service for Apache Flink and in your IDE, the application automatically detects whether it's running standalone locally in the IDE. In that case, the application loads the runtime configuration differently:

    1. When the application detects that it's running in standalone mode in your IDE, it loads the runtime configuration from the application_properties.json file included in the resources folder of the project.

    2. When the application runs in Amazon Managed Service for Apache Flink, the default behavior loads the application configuration from the runtime properties you will define in the Amazon Managed Service for Apache Flink application. See Create and configure the Managed Service for Apache Flink application.

      private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
          if (env instanceof LocalStreamEnvironment) {
              LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
              return KinesisAnalyticsRuntime.getApplicationProperties(
                      BasicTableJob.class.getClassLoader()
                              .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
          } else {
              LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
              return KinesisAnalyticsRuntime.getApplicationProperties();
          }
      }
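
      The map returned by this method is keyed by property group ID, with each group exposed as a java.util.Properties object. As an illustration only, assuming a hypothetical property group named "bucket" with a key "name", the Amazon S3 path used later by the sink could be looked up as in the following sketch; the actual group and key names are those defined in application_properties.json and in the runtime properties of the deployed application.

      // Illustrative sketch only: "bucket" and "name" are hypothetical identifiers.
      // Use the property group and key names defined in application_properties.json.
      Map<String, Properties> applicationParameters = loadApplicationProperties(env);
      Properties bucketProperties = applicationParameters.get("bucket");
      String s3Path = bucketProperties.getProperty("name");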
  • The main() method defines the application data flow and runs it.

    • Initializes the default streaming environments. In this example, we show how to create both the StreamExecutionEnvironment to use with the DataStream API, and the StreamTableEnvironment to use with SQL and the Table API. The two environment objects are two separate references to the same runtime environment, to use different APIs.

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
    • Load the application configuration parameters. This will automatically load them from the correct place, depending on where the application is running:

      Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    • The application uses the FileSystem sink connector to write results to Amazon S3. The connector writes output files when Flink completes a checkpoint, so you must enable checkpointing for files to be written to the destination. When the application runs in Amazon Managed Service for Apache Flink, the application configuration controls checkpointing and enables it by default. Conversely, when running locally, checkpointing is disabled by default. The application detects that it is running locally and configures checkpointing every 5,000 ms.

      if (env instanceof LocalStreamEnvironment) {
          env.enableCheckpointing(5000);
      }
    • This application does not receive data from an actual external source. It generates random data to process through the DataGen connector. This connector is available for the DataStream API, SQL, and the Table API. To demonstrate the integration between APIs, the application uses the DataStream API version because it provides more flexibility. Each record is generated by a generator function, in this case StockPriceGeneratorFunction, where you can put custom logic.

      DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>(
              new StockPriceGeneratorFunction(),
              Long.MAX_VALUE,
              RateLimiterStrategy.perSecond(recordPerSecond),
              TypeInformation.of(StockPrice.class));
    • In the DataStream API, records can have custom classes. Classes must follow specific rules so that Flink can use them as records. For more information, see Supported Data Types. In this example, the StockPrice class is a POJO.
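
      The StockPrice class itself is not listed in this section. As a point of reference only, a minimal POJO with the fields used by the query later in this topic (eventTime, ticker, price) might look like the following sketch; the actual class in the repository may declare additional fields or different types.

      // Illustrative sketch of a Flink-compatible POJO; the actual StockPrice class
      // in the repository may differ. Flink POJO rules: public class, public
      // no-argument constructor, and public fields or standard getters and setters.
      import java.sql.Timestamp;

      public class StockPrice {
          private Timestamp eventTime;   // assumed timestamp type, matching TIMESTAMP(3) in the sink table
          private String ticker;
          private double price;

          public StockPrice() {}

          public Timestamp getEventTime() { return eventTime; }
          public void setEventTime(Timestamp eventTime) { this.eventTime = eventTime; }

          public String getTicker() { return ticker; }
          public void setTicker(String ticker) { this.ticker = ticker; }

          public double getPrice() { return price; }
          public void setPrice(double price) { this.price = price; }
      }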

    • The source is then attached to the execution environment, generating a DataStream of StockPrice. This application doesn't use event-time semantics and doesn't generate a watermark. The DataGenerator source runs with a parallelism of 1, independent of the parallelism of the rest of the application.

      DataStream<StockPrice> stockPrices = env.fromSource(
              source, WatermarkStrategy.noWatermarks(), "data-generator"
      ).setParallelism(1);
    • What follows in the data processing flow is defined using the Table API and SQL. To do so, we convert the DataStream of StockPrices into a table. The schema of the table is automatically inferred from the StockPrice class.

      Table stockPricesTable = tableEnv.fromDataStream(stockPrices);
    • The following snippet of code shows how to define a view and a query using the programmatic Table API:

      Table filteredStockPricesTable = stockPricesTable
              .select(
                      $("eventTime").as("event_time"),
                      $("ticker"),
                      $("price"),
                      dateFormat($("eventTime"), "yyyy-MM-dd").as("dt"),
                      dateFormat($("eventTime"), "HH").as("hr"))
              .where($("price").isGreater(50));

      tableEnv.createTemporaryView("filtered_stock_prices", filteredStockPricesTable);
    • A sink table is defined to write the results to an Amazon S3 bucket as JSON files. To illustrate the difference from defining a view programmatically with the Table API, the sink table is defined using SQL.

      tableEnv.executeSql("CREATE TABLE s3_sink (" +
              "eventTime TIMESTAMP(3)," +
              "ticker STRING," +
              "price DOUBLE," +
              "dt STRING," +
              "hr STRING" +
              ") PARTITIONED BY ( dt, hr ) WITH (" +
              "'connector' = 'filesystem'," +
              "'format' = 'json'," +
              "'path' = 's3a://" + s3Path + "'" +
              ")");
    • The last step is an executeInsert() that inserts the filtered stock prices view into the sink table. This method initiates the execution of the data flow we have defined so far.

      filteredStockPricesTable.executeInsert("s3_sink");

Use the pom.xml file

The pom.xml file defines all dependencies required by the application and sets up the Maven Shade plugin to build the fat-jar that contains all dependencies required by Flink.

  • Some dependencies have provided scope. These dependencies are automatically available when the application runs in Amazon Managed Service for Apache Flink. They are required to compile the application, or to run the application locally in your IDE. For more information, see Run your application locally. Make sure that you are using the same Flink version as the runtime you will use in Amazon Managed Service for Apache Flink. To use the Table API and SQL, you must include the flink-table-planner-loader and flink-table-runtime dependencies, both with provided scope.

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
  • You must add additional Apache Flink dependencies to the pom with the default scope. For example, the DataGen connector, the FileSystem SQL connector, and the JSON format.

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-datagen</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-files</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
  • To write to Amazon S3 when running locally, the S3 Hadoop File System is also included with provided scope.

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-s3-fs-hadoop</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
  • The Maven Java Compiler plugin makes sure that the code is compiled against Java 11, the JDK version currently supported by Apache Flink.
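
    The plugin configuration itself is not shown above. As a sketch only, a typical maven-compiler-plugin setup targeting Java 11 might look like the following; the pom.xml in the repository may use Maven properties or a different configuration to achieve the same effect.

    <!-- Illustrative sketch only; the repository's pom.xml may configure this differently -->
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
            <source>11</source>
            <target>11</target>
        </configuration>
    </plugin>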

  • The Maven Shade plugin packages the fat-jar, excluding some libraries that are provided by the runtime. It also specifies two transformers: ServicesResourceTransformer and ManifestResourceTransformer. The latter configures the class containing the main method to start the application. If you rename the main class, don't forget to update this transformer.

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        ...
        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>com.amazonaws.services.msf.BasicTableJob</mainClass>
        </transformer>
        ...
    </plugin>