
Amazon Managed Service for Apache Flink was previously known as Amazon Kinesis Data Analytics for Apache Flink.

Create and run a Managed Service for Apache Flink application

In this exercise, you create a Managed Service for Apache Flink application with an Amazon MSK topic as a source and an Amazon S3 bucket as a sink.

Create dependent resources

Before you create a Managed Service for Apache Flink application for this exercise, you create the following dependent resources:

  • A virtual private cloud (VPC) based on Amazon VPC and an Amazon MSK cluster

  • An Amazon S3 bucket to store the application's code and output (ka-app-code-<username>)

Create a VPC and an Amazon MSK cluster

To create a VPC and Amazon MSK cluster to access from your Managed Service for Apache Flink application, follow the Getting Started Using Amazon MSK tutorial.

When completing the tutorial, note the following:

  • Record the bootstrap server list for your cluster. You can get the list of bootstrap servers with the following command, replacing ClusterArn with the Amazon Resource Name (ARN) of your MSK cluster:

    aws kafka get-bootstrap-brokers --region us-west-2 --cluster-arn ClusterArn

    {
        ...
        "BootstrapBrokerStringTls": "b-2.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094,b-1.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094,b-3.awskafkatutorialcluste.t79r6y.c4.kafka.us-west-2.amazonaws.com:9094"
    }
  • When following the steps in the tutorials, be sure to use your selected Amazon Region in your code, commands, and console entries.

Create an Amazon S3 bucket

You can create the Amazon S3 bucket using the console. For instructions on creating this resource, see the following topic:

  • How Do I Create an S3 Bucket? in the Amazon Simple Storage Service User Guide. Give the Amazon S3 bucket a globally unique name by appending your login name, such as ka-app-code-<username>.
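If you prefer the AWS CLI to the console, the following is a minimal sketch of creating the bucket. The bucket name and the us-west-2 Region are this tutorial's examples; adjust both for your account, and remember that the name must be globally unique:

    aws s3 mb s3://ka-app-code-<username> --region us-west-2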

Other resources

When you create your application, Managed Service for Apache Flink creates the following Amazon CloudWatch resources if they don't already exist:

  • A log group called /AWS/KinesisAnalytics-java/MyApplication.

  • A log stream called kinesis-analytics-log-stream.

Write sample records to the input stream

In this section, you use a Python script to write sample records to the Amazon MSK topic for the application to process.

  1. Connect to the client instance you created in Step 4: Create a Client Machine of the Getting Started Using Amazon MSK tutorial.

  2. Install Python3, Pip, and the Kafka Python library:

    $ sudo yum install python37
    $ curl -O https://bootstrap.pypa.io/get-pip.py
    $ python3 get-pip.py --user
    $ pip install kafka-python
  3. Create a file named stock.py with the following contents. Replace the BROKERS value with the bootstrap broker list that you recorded previously.

    from kafka import KafkaProducer
    import json
    import random
    from datetime import datetime

    # Replace with the bootstrap broker list for your MSK cluster, for example:
    # BROKERS = "b-1.stocks.8e6izk.c12.kafka.us-east-1.amazonaws.com:9092,b-2.stocks.8e6izk.c12.kafka.us-east-1.amazonaws.com:9092"
    BROKERS = "localhost:9092"

    producer = KafkaProducer(
        bootstrap_servers=BROKERS,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        key_serializer=str.encode,
        retry_backoff_ms=500,
        request_timeout_ms=20000,
        security_protocol='PLAINTEXT')  # PLAINTEXT uses the cluster's 9092 listener

    def getReferrer():
        # Build a random stock-ticker record with the current timestamp.
        data = {}
        now = datetime.now()
        str_now = now.strftime("%Y-%m-%d %H:%M:%S")
        data['event_time'] = str_now
        data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
        price = random.random() * 100
        data['price'] = round(price, 2)
        return data

    while True:
        data = getReferrer()
        try:
            # The topic name must match the kafka-topic property that you
            # configure for the application later in this tutorial.
            future = producer.send("AWSKafkaTutorialTopic", value=data, key=data['ticker'])
            producer.flush()
            record_metadata = future.get(timeout=10)
            print("sent event to Kafka! topic {} partition {} offset {}".format(
                record_metadata.topic, record_metadata.partition, record_metadata.offset))
        except Exception as e:
            print(e)
  4. Later in the tutorial, you run the stock.py script to send data to the application.

    $ python3 stock.py

Download and examine the Apache Flink streaming Java code

The Java application code for this example is available from GitHub.

To download the Java application code
  1. Clone the remote repository using the following command:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  2. Navigate to the amazon-kinesis-data-analytics-examples/GettingStartedTable directory.

Note the following about the application code:

  • A Project Object Model (pom.xml) file contains information about the application's configuration and dependencies, including the Managed Service for Apache Flink libraries.

  • The StreamingJob.java file contains the main method that defines the application's functionality.

  • The application uses a FlinkKafkaConsumer to read from the Amazon MSK topic. The following snippet creates a FlinkKafkaConsumer object:

    final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProps);
  • Your application creates source and sink connectors to access external resources using StreamExecutionEnvironment and TableEnvironment objects.

  • The application creates source and sink connectors using dynamic application properties, so you can specify your application parameters (such as your S3 bucket) without recompiling the code.

    //read the parameters from the Managed Service for Apache Flink environment
    Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
    Properties flinkProperties = null;

    String kafkaTopic = parameter.get("kafka-topic", "AWSKafkaTutorialTopic");
    String brokers = parameter.get("brokers", "");
    String s3Path = parameter.get("s3Path", "");

    if (applicationProperties != null) {
        flinkProperties = applicationProperties.get("FlinkApplicationProperties");
    }

    if (flinkProperties != null) {
        kafkaTopic = flinkProperties.get("kafka-topic").toString();
        brokers = flinkProperties.get("brokers").toString();
        s3Path = flinkProperties.get("s3Path").toString();
    }

    For more information about runtime properties, see Runtime properties.

Note

When building your application, we strongly recommend that you create and run the Managed Service for Apache Flink application in the same Region as the Amazon MSK cluster, because the Flink Kafka connector is optimized for low-latency environments by default. If you need to consume from a cross-Region Kafka cluster, consider increasing the value of the receive.buffer.bytes configuration, for example to 2097152.

For more information, see Custom MSK configurations.

Compile the application code

In this section, you use Apache Maven to compile and package the Java application code. For information about installing Apache Maven and the Java Development Kit (JDK), see Prerequisites for completing the exercises.

To compile the application code
  1. To use your application code, you compile and package it into a JAR file. You can compile and package your code in one of two ways:

    • Use the command-line Maven tool. Create your JAR file by running the following command in the directory that contains the pom.xml file:

      mvn package -Dflink.version=1.18.1
    • Use your development environment. See your development environment documentation for details.

      Note

      The provided source code relies on libraries from Java 11.

    You can either upload your package as a JAR file, or you can compress your package and upload it as a ZIP file. If you create your application using the Amazon CLI, you specify your code content type (JAR or ZIP).

  2. If there are errors while compiling, verify that your JAVA_HOME environment variable is correctly set.
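A quick way to check the build toolchain from the command line is shown below (a sketch; the exact paths and versions depend on your installation). The mvn -version output reports the JDK that Maven is using, which should be the Java 11 JDK referenced by JAVA_HOME:

    $ echo $JAVA_HOME
    $ mvn -version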

If the application compiles successfully, the following file is created:

target/aws-kinesis-analytics-java-apps-1.0.jar

Upload the Apache Flink streaming Java code

In this section, you create an Amazon S3 bucket and upload your application code. If you already created the ka-app-code-<username> bucket in Create dependent resources, skip to step 7.

To upload the application code
  1. Open the Amazon S3 console at https://console.amazonaws.cn/s3/.

  2. Choose Create bucket.

  3. Enter ka-app-code-<username> in the Bucket name field. Add a suffix to the bucket name, such as your user name, to make it globally unique. Choose Next.

  4. In the Configure options step, keep the settings as they are, and choose Next.

  5. In the Set permissions step, keep the settings as they are, and choose Next.

  6. Choose Create bucket.

  7. In the Amazon S3 console, choose the ka-app-code-<username> bucket, and choose Upload.

  8. In the Select files step, choose Add files. Navigate to the aws-kinesis-analytics-java-apps-1.0.jar file that you created in the previous step. Choose Next.

  9. You don't need to change any of the settings for the object, so choose Upload.

Your application code is now stored in an Amazon S3 bucket where your application can access it.
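As an alternative to the console upload, you can copy the JAR with the AWS CLI (a sketch, assuming the bucket name used above and that you run the command from the directory containing the pom.xml file):

    aws s3 cp target/aws-kinesis-analytics-java-apps-1.0.jar s3://ka-app-code-<username>/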

Create and run the Managed Service for Apache Flink application

Follow these steps to create, configure, update, and run the application using the console.

Create the application

  1. Open the Managed Service for Apache Flink console at https://console.aws.amazon.com/flink

  2. On the Managed Service for Apache Flink dashboard, choose Create analytics application.

  3. On the Managed Service for Apache Flink - Create application page, provide the application details as follows:

    • For Application name, enter MyApplication.

    • For Description, enter My java test app.

    • For Runtime, choose Apache Flink.

    • Keep the version as Apache Flink version 1.18.1 (Recommended version).

  4. For Access permissions, choose Create / update IAM role kinesis-analytics-MyApplication-us-west-2.

  5. Choose Create application.

Note

When you create a Managed Service for Apache Flink application using the console, you have the option of having an IAM role and policy created for your application. Your application uses this role and policy to access its dependent resources. These IAM resources are named using your application name and Region as follows:

  • Policy: kinesis-analytics-service-MyApplication-us-west-2

  • Role: kinesis-analytics-MyApplication-us-west-2

Edit the IAM policy

Edit the IAM policy to add permissions to access the Amazon S3 bucket.

To edit the IAM policy to add S3 bucket permissions
  1. Open the IAM console at https://console.amazonaws.cn/iam/.

  2. Choose Policies. Choose the kinesis-analytics-service-MyApplication-us-west-2 policy that the console created for you in the previous section.

  3. On the Summary page, choose Edit policy. Choose the JSON tab.

  4. Add the S3 statement (the statement with "Sid": "S3") from the following policy example to your policy.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:Abort*", "s3:DeleteObject*", "s3:GetObject*", "s3:GetBucket*", "s3:List*", "s3:ListBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::ka-app-code-<username>", "arn:aws:s3:::ka-app-code-<username>/*" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] } ] }

Configure the application

Use the following procedure to configure the application.

To configure the application
  1. On the MyApplication page, choose Configure.

  2. On the Configure application page, provide the Code location:

    • For Amazon S3 bucket, enter ka-app-code-<username>.

    • For Path to Amazon S3 object, enter aws-kinesis-analytics-java-apps-1.0.jar.

  3. Under Access to application resources, for Access permissions, choose Create / update IAM role kinesis-analytics-MyApplication-us-west-2.

  4. Under Properties, choose Create group.

  5. Enter the following:

    Group ID                     Key                        Value
    FlinkApplicationProperties   kafka-topic                AWSKafkaTutorialTopic
    FlinkApplicationProperties   brokers                    Your Amazon MSK cluster's Bootstrap Brokers list
    FlinkApplicationProperties   s3Path                     ka-app-code-<username>
    FlinkApplicationProperties   security.protocol          SSL
    FlinkApplicationProperties   ssl.truststore.location    /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts
    FlinkApplicationProperties   ssl.truststore.password    changeit
  6. Under Monitoring, ensure that the Monitoring metrics level is set to Application.

  7. For CloudWatch logging, select the Enable check box.

  8. In the Virtual Private Cloud (VPC) section, choose VPC configuration based on Amazon MSK cluster. Choose AWSKafkaTutorialCluster.

  9. Choose Update.

Note

When you choose to enable Amazon CloudWatch logging, Managed Service for Apache Flink creates a log group and log stream for you. The names of these resources are as follows:

  • Log group: /aws/kinesis-analytics/MyApplication

  • Log stream: kinesis-analytics-log-stream
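Once logging is enabled, you can follow the application log from the command line (a sketch; the logs tail command requires AWS CLI version 2, and the log group name is the one listed above):

    aws logs tail /aws/kinesis-analytics/MyApplication --follow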

Run the application

Use the following procedure to run the application.

To run the application
  1. On the MyApplication page, choose Run. Confirm the action.

  2. When the application is running, refresh the page. The console shows the Application graph.

  3. From your Amazon EC2 client, run the Python script you created previously to write records to the Amazon MSK cluster for your application to process:

    $ python3 stock.py
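You can also confirm the application state from the AWS CLI (a sketch; kinesisanalyticsv2 is the control-plane API used by Managed Service for Apache Flink). The status changes from STARTING to RUNNING once the Flink job is up:

    aws kinesisanalyticsv2 describe-application \
        --application-name MyApplication \
        --query 'ApplicationDetail.ApplicationStatus'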

Stop the application

To stop the application, on the MyApplication page, choose Stop. Confirm the action.
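Stopping the application from the AWS CLI is also possible (a sketch, assuming the application name used in this tutorial):

    aws kinesisanalyticsv2 stop-application --application-name MyApplication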

Next step

Clean up Amazon resources