Example: Creating a Sliding Window in Scala
Starting from version 1.15, Flink is Scala free. Applications can now use the Java API from any Scala version. Flink still uses Scala internally in a few key components, but doesn't expose Scala into the user code classloader. Because of that, users need to add Scala dependencies to their JAR archives.
For more information about the Scala changes in Flink 1.15, see Scala Free in One Fifteen.
In this exercise, you create a simple streaming application that uses Scala 3.2.0 and Flink's Java DataStream API. The application reads data from a Kinesis stream, aggregates it using sliding windows, and writes the results to an output Kinesis stream.
To set up required prerequisites for this exercise, first complete the Getting Started (Scala) exercise.
Download and Examine the Application Code
The Scala application code for this example is available from GitHub. To download the application code, do the following:
1. Install the Git client if you haven't already. For more information, see Installing Git.
2. Clone the remote repository with the following command:

   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples

3. Navigate to the amazon-kinesis-data-analytics-java-examples/scala/SlidingWindow directory.
Note the following about the application code:
- A build.sbt file contains information about the application's configuration and dependencies, including the Kinesis Data Analytics libraries.
- The BasicStreamingJob.scala file contains the main method that defines the application's functionality.
- The application uses a Kinesis source to read from the source stream. The following snippet creates the Kinesis source:
private def createSource: FlinkKinesisConsumer[String] = {
  // Read the runtime property group that configures the consumer
  val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties
  val inputProperties = applicationProperties.get("ConsumerConfigProperties")

  // Create a consumer that reads records from the configured input stream as strings
  new FlinkKinesisConsumer[String](
    inputProperties.getProperty(streamNameKey, defaultInputStreamName),
    new SimpleStringSchema,
    inputProperties)
}
The application also uses a Kinesis sink to write to the result stream. The following snippet creates the Kinesis sink:
private def createSink: KinesisStreamsSink[String] = {
  // Read the runtime property group that configures the producer
  val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties
  val outputProperties = applicationProperties.get("ProducerConfigProperties")

  KinesisStreamsSink.builder[String]
    .setKinesisClientProperties(outputProperties)
    .setSerializationSchema(new SimpleStringSchema)
    .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName))
    // Distribute records across shards by hashing each record
    .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode))
    .build
}
The application uses the window operator to find the minimum price for each stock symbol over a 10-second window that slides by 5 seconds. The following code creates the operator and sends the aggregated data to a new Kinesis Data Streams sink:
environment.addSource(createSource)
  .map { value =>
    // Parse the JSON record and extract the ticker symbol and price
    val jsonNode = jsonParser.readValue(value, classOf[JsonNode])
    new Tuple2[String, Double](jsonNode.get("ticker").toString, jsonNode.get("price").asDouble)
  }
  .returns(Types.TUPLE(Types.STRING, Types.DOUBLE))
  .keyBy(v => v.f0) // Logically partition the stream for each ticker
  .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  .min(1) // Calculate minimum price per ticker over the window
  .map { value => value.f0 + String.format(",%.2f", value.f1) + "\n" }
  .sinkTo(createSink)
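For orientation, the surrounding main method looks roughly like the following sketch. The job name string is an assumption; see BasicStreamingJob.scala in the repository for the actual code.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

def main(args: Array[String]): Unit = {
  // Obtain the execution environment that hosts the job
  val environment = StreamExecutionEnvironment.getExecutionEnvironment
  // ... build the pipeline shown above here ...
  environment.execute("sliding-window-scala") // assumed job name
}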
The application creates source and sink connectors to access external resources using a StreamExecutionEnvironment object. It configures the connectors with runtime application properties that are read when the application starts. For more information about runtime properties, see Runtime Properties.
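As a minimal sketch of how such a property lookup works (the "stream.name" key and fallback value below are taken from the CLI example later in this topic, and should be treated as assumptions):

import java.util.Properties
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime

// Fetch every property group configured for the application
val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties

// Look up one group, then read a key with a hard-coded fallback
val inputProperties: Properties = applicationProperties.get("ConsumerConfigProperties")
val inputStreamName = inputProperties.getProperty("stream.name", "ExampleInputStream")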
Compile and upload the application code
In this section, you compile and upload your application code to an Amazon S3 bucket.
Compile the Application Code
To use your application code, compile and package it into a JAR file. You can compile and package your code with SBT:

sbt assembly
If the application compiles successfully, the following file is created:
target/scala-3.2.0/sliding-window-scala-1.0.jar
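If you're setting up a similar project from scratch, the build definition looks roughly like the following sketch. The library versions and the assembly JAR name shown here are assumptions; the build.sbt in the cloned repository is authoritative.

// build.sbt (sketch; artifact versions are assumptions)
scalaVersion := "3.2.0"

libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-streaming-java" % "1.15.2",
  // FlinkKinesisConsumer lives in flink-connector-kinesis;
  // KinesisStreamsSink lives in flink-connector-aws-kinesis-streams
  "org.apache.flink" % "flink-connector-kinesis" % "1.15.2",
  "org.apache.flink" % "flink-connector-aws-kinesis-streams" % "1.15.2",
  "com.amazonaws" % "aws-kinesisanalytics-runtime" % "1.2.0"
)

assembly / assemblyJarName := "sliding-window-scala-1.0.jar"

The sbt assembly command also requires the sbt-assembly plugin, typically enabled in project/plugins.sbt:

// project/plugins.sbt (sketch; version is an assumption)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")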
Upload the Apache Flink Streaming Scala Code
In this section, you create an Amazon S3 bucket and upload your application code.
1. Open the Amazon S3 console at https://console.aws.amazon.com/s3/.
2. Choose Create bucket.
3. Enter ka-app-code-<username> in the Bucket name field. Add a suffix to the bucket name, such as your user name, to make it globally unique. Choose Next.
4. In Configure options, keep the settings as they are, and choose Next.
5. In Set permissions, keep the settings as they are, and choose Next.
6. Choose Create bucket.
7. Choose the ka-app-code-<username> bucket, and then choose Upload.
8. In the Select files step, choose Add files. Navigate to the sliding-window-scala-1.0.jar file that you created in the previous step.
9. You don't need to change any of the settings for the object, so choose Upload.
Your application code is now stored in an Amazon S3 bucket where your application can access it.
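Alternatively, if you have the Amazon CLI configured, you can upload the JAR from the command line (replace <username> with your bucket suffix):

aws s3 cp target/scala-3.2.0/sliding-window-scala-1.0.jar s3://ka-app-code-<username>/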
Create and run the Application (console)
Follow these steps to create, configure, update, and run the application using the console.
Create the Application
1. Open the Kinesis Data Analytics console at https://console.amazonaws.cn/kinesisanalytics.
2. On the Kinesis Data Analytics dashboard, choose Create analytics application.
3. On the Kinesis Analytics - Create application page, provide the application details as follows:
   - For Application name, enter MyApplication.
   - For Description, enter My Scala test app.
   - For Runtime, choose Apache Flink.
   - Leave the version as Apache Flink version 1.15.2 (Recommended version).
4. For Access permissions, choose Create / update IAM role kinesis-analytics-MyApplication-us-west-2.
5. Choose Create application.
When you create a Kinesis Data Analytics application using the console, you have the option of having an IAM role and policy created for your application. Your application uses this role and policy to access its dependent resources. These IAM resources are named using your application name and Region as follows:
- Policy: kinesis-analytics-service-MyApplication-us-west-2
- Role: kinesis-analytics-MyApplication-us-west-2
Configure the Application
Use the following procedure to configure the application.
To configure the application
1. On the MyApplication page, choose Configure.
2. On the Configure application page, provide the Code location:
   - For Amazon S3 bucket, enter ka-app-code-<username>.
   - For Path to Amazon S3 object, enter sliding-window-scala-1.0.jar.
3. Under Access to application resources, for Access permissions, choose Create / update IAM role kinesis-analytics-MyApplication-us-west-2.
4. Under Properties, choose Add group.
5. Enter the following:
Group ID                   Key                    Value
ConsumerConfigProperties   input.stream.name      ExampleInputStream
ConsumerConfigProperties   aws.region             us-west-2
ConsumerConfigProperties   flink.stream.initpos   LATEST
6. Choose Save.
7. Under Properties, choose Add group again.
8. Enter the following:
Group ID                   Key                  Value
ProducerConfigProperties   output.stream.name   ExampleOutputStream
ProducerConfigProperties   aws.region           us-west-2
9. Under Monitoring, ensure that the Monitoring metrics level is set to Application.
10. For CloudWatch logging, choose the Enable check box.
11. Choose Update.
When you choose to enable Amazon CloudWatch logging, Kinesis Data Analytics creates a log group and log stream for you. The names of these resources are as follows:
- Log group: /aws/kinesis-analytics/MyApplication
- Log stream: kinesis-analytics-log-stream
Edit the IAM Policy
Edit the IAM policy to add permissions to access the Amazon S3 bucket.
To edit the IAM policy to add S3 bucket permissions
1. Open the IAM console at https://console.amazonaws.cn/iam/.
2. Choose Policies. Choose the kinesis-analytics-service-MyApplication-us-west-2 policy that the console created for you in the previous section.
3. On the Summary page, choose Edit policy. Choose the JSON tab.
4. Add the following policy example to the policy. Replace the sample account IDs (012345678901) with your account ID.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadCode",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": [
                "arn:aws:s3:::ka-app-code-username/sliding-window-scala-1.0.jar"
            ]
        },
        {
            "Sid": "DescribeLogGroups",
            "Effect": "Allow",
            "Action": [
                "logs:DescribeLogGroups"
            ],
            "Resource": [
                "arn:aws:logs:us-west-2:012345678901:log-group:*"
            ]
        },
        {
            "Sid": "DescribeLogStreams",
            "Effect": "Allow",
            "Action": [
                "logs:DescribeLogStreams"
            ],
            "Resource": [
                "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
            ]
        },
        {
            "Sid": "PutLogEvents",
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
            ]
        },
        {
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
        },
        {
            "Sid": "WriteOutputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
        }
    ]
}
Run the Application
To view the Flink job graph, run the application, open the Apache Flink dashboard, and choose the desired Flink job.
Stop the Application
To stop the application, on the MyApplication page, choose Stop. Confirm the action.
Create and run the application (CLI)
In this section, you use the Amazon Command Line Interface to create and run the Kinesis Data Analytics application. Use the kinesisanalyticsv2 Amazon CLI command to create and interact with Kinesis Data Analytics applications.
Create a permissions policy
You must create a permissions policy and role for your application. If you do not create these IAM resources, your application cannot access its data and log streams.
First, you create a permissions policy with two statements: one that grants permissions for the read action on the source stream, and another that grants permissions for write actions on the sink stream. You then attach the policy to an IAM role (which you create in the next section). Thus, when Kinesis Data Analytics assumes the role, the service has the necessary permissions to read from the source stream and write to the sink stream.
Use the following code to create the KAReadSourceStreamWriteSinkStream permissions policy. Replace username with the user name that you used to create the Amazon S3 bucket to store the application code. Replace the account ID in the Amazon Resource Names (ARNs) (012345678901) with your account ID.
{ "ApplicationName": "sliding_window", "ApplicationDescription": "Scala sliding window application", "RuntimeEnvironment": "FLINK-1_15", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-
username
", "FileKey": "sliding-window-scala-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }, "CloudWatchLoggingOptions": [ { "LogStreamARN": "arn:aws:logs:us-west-2:012345678901
:log-group:MyApplication:log-stream:kinesis-analytics-log-stream" } ] }
For step-by-step instructions to create a permissions policy, see Tutorial: Create and Attach Your First Customer Managed Policy in the IAM User Guide.
Create an IAM role
In this section, you create an IAM role that the Kinesis Data Analytics application can assume to read a source stream and write to the sink stream.
Kinesis Data Analytics cannot access your stream without permissions. You grant these permissions via an IAM role. Each IAM role has two policies attached. The trust policy grants Kinesis Data Analytics permission to assume the role, and the permissions policy determines what Kinesis Data Analytics can do after assuming the role.
You attach the permissions policy that you created in the preceding section to this role.
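For reference, the trust policy on this role looks like the following. This is a sketch of what the console creates for you in the steps below; the service principal shown is the standard one for Kinesis Data Analytics.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "kinesisanalytics.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}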
To create an IAM role
1. Open the IAM console at https://console.amazonaws.cn/iam/.
2. In the navigation pane, choose Roles and then Create Role.
3. Under Select type of trusted identity, choose Amazon Service.
4. Under Choose the service that will use this role, choose Kinesis.
5. Under Select your use case, choose Kinesis Analytics.
6. Choose Next: Permissions.
7. On the Attach permissions policies page, choose Next: Review. You attach permissions policies after you create the role.
8. On the Create role page, enter KA-stream-rw-role for the Role name. Choose Create role.

   Now you have created a new IAM role called KA-stream-rw-role. Next, you update the trust and permissions policies for the role.
9. Attach the permissions policy to the role.
Note: For this exercise, Kinesis Data Analytics assumes this role for both reading data from a Kinesis data stream (source) and writing output to another Kinesis data stream. So you attach the policy that you created in the previous step, Create a permissions policy.
   a. On the Summary page, choose the Permissions tab.
   b. Choose Attach Policies.
   c. In the search box, enter KAReadSourceStreamWriteSinkStream (the policy that you created in the previous section).
   d. Choose the KAReadSourceStreamWriteSinkStream policy, and choose Attach policy.
You have now created the service execution role that your application uses to access resources. Make a note of the ARN of the new role.
For step-by-step instructions for creating a role, see Creating an IAM Role (Console) in the IAM User Guide.
Create the application
Save the following JSON code to a file named create_request.json. Replace the sample role ARN with the ARN for the role that you created previously. Replace the bucket ARN suffix (username) with the suffix that you chose in the previous section. Replace the sample account ID (012345678901) in the service execution role with your account ID.
{ "ApplicationName": "sliding_window", "ApplicationDescription": "Scala sliding_window application", "RuntimeEnvironment": "FLINK-1_15", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-
username
", "FileKey": "sliding-window-scala-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleInputStream", "flink.stream.initpos" : "LATEST" } }, { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2", "stream.name" : "ExampleOutputStream" } } ] } }, "CloudWatchLoggingOptions": [ { "LogStreamARN": "arn:aws:logs:us-west-2:012345678901
:log-group:MyApplication:log-stream:kinesis-analytics-log-stream" } ] }
Execute the CreateApplication action with the following request to create the application:
aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
The application is now created. You start the application in the next step.
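To optionally confirm the application's status before starting it, you can call the DescribeApplication action (the application name matches the create request above):

aws kinesisanalyticsv2 describe-application --application-name sliding_window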
Start the Application
In this section, you use the StartApplication action to start the application.
To start the application
1. Save the following JSON code to a file named start_request.json.

{
    "ApplicationName": "sliding_window",
    "RunConfiguration": {
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
        }
    }
}
2. Execute the StartApplication action with the preceding request to start the application:

aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
The application is now running. You can check the Kinesis Data Analytics metrics on the Amazon CloudWatch console to verify that the application is working.
Stop the Application
In this section, you use the StopApplication action to stop the application.
To stop the application
1. Save the following JSON code to a file named stop_request.json.

{
    "ApplicationName": "sliding_window"
}
2. Execute the StopApplication action with the preceding request to stop the application:

aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
The application is now stopped.
Add a CloudWatch Logging Option
You can use the Amazon CLI to add an Amazon CloudWatch log stream to your application. For information about using CloudWatch Logs with your application, see Setting Up Application Logging.
Update Environment Properties
In this section, you use the UpdateApplication action to change the environment properties for the application without recompiling the application code. In this example, you change the Region of the source and destination streams.
To update environment properties for the application
1. Save the following JSON code to a file named update_properties_request.json.

{
    "ApplicationName": "sliding_window",
    "CurrentApplicationVersionId": 1,
    "ApplicationConfigurationUpdate": {
        "EnvironmentPropertyUpdates": {
            "PropertyGroups": [
                {
                    "PropertyGroupId": "ConsumerConfigProperties",
                    "PropertyMap": {
                        "aws.region": "us-west-2",
                        "stream.name": "ExampleInputStream",
                        "flink.stream.initpos": "LATEST"
                    }
                },
                {
                    "PropertyGroupId": "ProducerConfigProperties",
                    "PropertyMap": {
                        "aws.region": "us-west-2",
                        "stream.name": "ExampleOutputStream"
                    }
                }
            ]
        }
    }
}
2. Execute the UpdateApplication action with the preceding request to update environment properties:

aws kinesisanalyticsv2 update-application --cli-input-json file://update_properties_request.json
Update the application code
When you need to update your application code with a new version of your code package, you use the UpdateApplication CLI action.
To load a new version of the application code with the same file name, you must specify the new object version. For more information about using Amazon S3 object versions, see Enabling or Disabling Versioning.
To use the Amazon CLI, delete your previous code package from your Amazon S3 bucket, upload the new version, and call UpdateApplication, specifying the same Amazon S3 bucket and object name, and the new object version. The application will restart with the new code package.
The following sample request for the UpdateApplication action reloads the application code and restarts the application. Update the CurrentApplicationVersionId to the current application version. You can check the current application version using the ListApplications or DescribeApplication actions. Update the bucket name suffix (<username>) with the suffix that you chose in the Create Dependent Resources section.
{ "ApplicationName": "sliding_window", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::ka-app-code-
username
", "FileKeyUpdate": "-1.0.jar", "ObjectVersionUpdate": "SAMPLEUehYngP87ex1nzYIGYgfhypvDU" } } } } }
Clean Up Amazon Resources
This section includes procedures for cleaning up Amazon resources created in the Sliding Window tutorial.
Delete Your Kinesis Data Analytics Application
1. Open the Kinesis Data Analytics console at https://console.amazonaws.cn/kinesisanalytics.
2. In the Kinesis Data Analytics panel, choose MyApplication.
3. On the application's page, choose Delete and then confirm the deletion.
Delete Your Kinesis Data Streams
1. Open the Kinesis console at https://console.amazonaws.cn/kinesis.
2. In the Kinesis Data Streams panel, choose ExampleInputStream.
3. On the ExampleInputStream page, choose Delete Kinesis Stream and then confirm the deletion.
4. On the Kinesis streams page, choose the ExampleOutputStream, choose Actions, choose Delete, and then confirm the deletion.
Delete Your Amazon S3 Object and Bucket
1. Open the Amazon S3 console at https://console.amazonaws.cn/s3/.
2. Choose the ka-app-code-<username> bucket.
3. Choose Delete and then enter the bucket name to confirm deletion.
Delete Your IAM Resources
1. Open the IAM console at https://console.amazonaws.cn/iam/.
2. In the navigation bar, choose Policies.
3. In the filter control, enter kinesis.
4. Choose the kinesis-analytics-service-MyApplication-<your-region> policy.
5. Choose Policy Actions and then choose Delete.
6. In the navigation bar, choose Roles.
7. Choose the kinesis-analytics-MyApplication-<your-region> role.
8. Choose Delete role and then confirm the deletion.
Delete Your CloudWatch Resources
1. Open the CloudWatch console at https://console.amazonaws.cn/cloudwatch/.
2. In the navigation bar, choose Logs.
3. Choose the /aws/kinesis-analytics/MyApplication log group.
4. Choose Delete Log Group and then confirm the deletion.