Example: Writing to an Amazon S3 Bucket
In this exercise, you create an Amazon Kinesis Data Analytics for Apache Flink application that has a Kinesis data stream as a source and an Amazon S3 bucket as a sink. Using the sink, you can verify the output of the application in the Amazon S3 console.
To set up required prerequisites for this exercise, first complete the Getting Started (DataStream API) exercise.
This topic contains the following sections:
- Create Dependent Resources
- Write Sample Records to the Input Stream
- Download and Examine the Application Code
- Modify the Application Code
- Compile the Application Code
- Upload the Apache Flink Streaming Java Code
- Create and Run the Kinesis Data Analytics Application
- Verify the Application Output
- Optional: Customize the Source and Sink
- Clean Up Amazon Resources
Create Dependent Resources
Before you create an Amazon Kinesis Data Analytics for Apache Flink application for this exercise, you create the following dependent resources:
- A Kinesis data stream (ExampleInputStream).
- An Amazon S3 bucket to store the application's code and output (ka-app-code-<username>).

Note: Kinesis Data Analytics for Apache Flink cannot write data to Amazon S3 with server-side encryption enabled on Kinesis Data Analytics.
You can create the Kinesis stream and Amazon S3 bucket using the console. For instructions for creating these resources, see the following topics:
- Creating and Updating Data Streams in the Amazon Kinesis Data Streams Developer Guide. Name your data stream ExampleInputStream.
- How Do I Create an S3 Bucket? in the Amazon Simple Storage Service User Guide. Give the Amazon S3 bucket a globally unique name by appending your login name, such as ka-app-code-<username>. Create two folders (code and data) in the Amazon S3 bucket.
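If you prefer to create these resources programmatically instead of through the console, the following is a minimal sketch using the AWS SDK for Java v2. The class name is hypothetical, and you must replace the bucket-name placeholder with a globally unique name before running it:

    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.kinesis.KinesisClient;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.BucketLocationConstraint;

    public class CreateDependentResources {
        public static void main(String[] args) {
            Region region = Region.US_WEST_2;

            try (KinesisClient kinesis = KinesisClient.builder().region(region).build();
                 S3Client s3 = S3Client.builder().region(region).build()) {

                // Create the input stream with a single shard.
                kinesis.createStream(req -> req.streamName("ExampleInputStream").shardCount(1));

                // Create the application bucket. The code and data folders are just key prefixes,
                // so you can create them from the console or let them appear when objects are written.
                s3.createBucket(req -> req
                    .bucket("ka-app-code-<username>")
                    .createBucketConfiguration(cfg -> cfg.locationConstraint(BucketLocationConstraint.US_WEST_2)));
            }
        }
    }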
The application creates the following CloudWatch resources if they don't already exist:
- A log group called /aws/kinesis-analytics-java/MyApplication.
- A log stream called kinesis-analytics-log-stream.
Write Sample Records to the Input Stream
In this section, you use a Python script to write sample records to the stream for the application to process.
This section requires the Amazon SDK for Python (Boto).
- Create a file named stock.py with the following contents:

    import datetime
    import json
    import random

    import boto3

    STREAM_NAME = "ExampleInputStream"


    def get_data():
        return {
            'event_time': datetime.datetime.now().isoformat(),
            'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
            'price': round(random.random() * 100, 2)}


    def generate(stream_name, kinesis_client):
        while True:
            data = get_data()
            print(data)
            kinesis_client.put_record(
                StreamName=stream_name,
                Data=json.dumps(data),
                PartitionKey="partitionkey")


    if __name__ == '__main__':
        generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
- Run the stock.py script:

    $ python stock.py

  Keep the script running while completing the rest of the tutorial.
Download and Examine the Application Code
The Java application code for this example is available from GitHub. To download the application code, do the following:
- Install the Git client if you haven't already. For more information, see Installing Git.
- Clone the remote repository with the following command:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples

- Navigate to the amazon-kinesis-data-analytics-java-examples/S3Sink directory.
The application code is located in the S3StreamingSinkJob.java file. Note the following about the application code:
- The application uses a Kinesis source to read from the source stream. The following snippet creates the Kinesis source:

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
        new SimpleStringSchema(), inputProperties));
- You need to add the following import statement:

    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
- The application uses an Apache Flink S3 sink to write to Amazon S3. The sink reads messages in a tumbling window, encodes messages into S3 bucket objects, and sends the encoded objects to the S3 sink. The following code encodes objects for sending to Amazon S3:

    input.map(value -> { // Parse the JSON
            JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
            return new Tuple2<>(jsonNode.get("ticker").toString(), 1);
        }).returns(Types.TUPLE(Types.STRING, Types.INT))
        .keyBy(v -> v.f0) // Logically partition the stream for each word
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
        .sum(1) // Count the appearances by ticker per partition
        .map(value -> value.f0 + " count: " + value.f1.toString() + "\n")
        .addSink(createS3SinkFromStaticConfig());
- The application uses a Flink StreamingFileSink object to write to Amazon S3. For more information about the StreamingFileSink, see StreamingFileSink in the Apache Flink documentation.
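The repository file is not reproduced in full here, but the following is a minimal sketch of how the inputProperties object and the createS3SinkFromStaticConfig helper referenced above might look. The createSourceFromStaticConfig name and the property values are illustrative assumptions rather than a copy of the repository code, and s3SinkPath is the constant described in the next section:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

    // Sketch of the source helper: reads UTF-8 strings from the input stream in us-west-2.
    private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, "us-west-2");
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
        return env.addSource(new FlinkKinesisConsumer<>("ExampleInputStream",
            new SimpleStringSchema(), inputProperties));
    }

    // Sketch of the sink helper: a row-format StreamingFileSink that writes UTF-8 strings to s3SinkPath.
    private static StreamingFileSink<String> createS3SinkFromStaticConfig() {
        return StreamingFileSink
            .forRowFormat(new Path(s3SinkPath), new SimpleStringEncoder<String>("UTF-8"))
            .build();
    }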
Modify the Application Code
In this section, you modify the application code to write output to your Amazon S3 bucket.
Update the following line with your user name to specify the application's output location:
    private static final String s3SinkPath = "s3a://ka-app-code-<username>/data";
Compile the Application Code
To compile the application, do the following:
- Install Java and Maven if you haven't already. For more information, see Prerequisites in the Getting Started (DataStream API) tutorial.
- Compile the application with the following command:

    mvn package -Dflink.version=1.15.3

  Compiling the application creates the application JAR file (target/aws-kinesis-analytics-java-apps-1.0.jar).
The provided source code relies on libraries from Java 11.
Upload the Apache Flink Streaming Java Code
In this section, you upload your application code to the Amazon S3 bucket you created in the Create Dependent Resources section.
- In the Amazon S3 console, choose the ka-app-code-<username> bucket, navigate to the code folder, and choose Upload.
- In the Select files step, choose Add files. Navigate to the aws-kinesis-analytics-java-apps-1.0.jar file that you created in the previous step.
- You don't need to change any of the settings for the object, so choose Upload.
Your application code is now stored in an Amazon S3 bucket where your application can access it.
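If you prefer to upload the JAR programmatically instead of through the console, a sketch using the AWS SDK for Java v2 might look like the following. The class name and the local path to the JAR are assumptions based on the build output named above:

    import java.nio.file.Paths;

    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.s3.S3Client;

    public class UploadApplicationJar {
        public static void main(String[] args) {
            try (S3Client s3 = S3Client.builder().region(Region.US_WEST_2).build()) {
                // Upload the compiled JAR into the code folder of the application bucket.
                s3.putObject(
                    req -> req.bucket("ka-app-code-<username>")
                              .key("code/aws-kinesis-analytics-java-apps-1.0.jar"),
                    Paths.get("target/aws-kinesis-analytics-java-apps-1.0.jar"));
            }
        }
    }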
Create and Run the Kinesis Data Analytics Application
Follow these steps to create, configure, update, and run the application using the console.
Create the Application
- Open the Kinesis Data Analytics console at https://console.amazonaws.cn/kinesisanalytics.
- On the Kinesis Data Analytics dashboard, choose Create analytics application.
- On the Kinesis Analytics - Create application page, provide the application details as follows:
  - For Application name, enter MyApplication.
  - For Runtime, choose Apache Flink. Leave the version pulldown as Apache Flink version 1.15.2 (Recommended version).
  - For Access permissions, choose Create / update IAM role kinesis-analytics-MyApplication-us-west-2.
- Choose Create application.
Note: When you create an Amazon Kinesis Data Analytics for Apache Flink application using the console, you have the option of having an IAM role and policy created for your application. Your application uses this role and policy to access its dependent resources. These IAM resources are named using your application name and Region as follows:
- Policy: kinesis-analytics-service-MyApplication-us-west-2
- Role: kinesis-analytics-MyApplication-us-west-2
Edit the IAM Policy
Edit the IAM policy to add permissions to access the Amazon S3 bucket and the Kinesis data stream.
- Open the IAM console at https://console.amazonaws.cn/iam/.
- Choose Policies. Choose the kinesis-analytics-service-MyApplication-us-west-2 policy that the console created for you in the previous section.
- On the Summary page, choose Edit policy. Choose the JSON tab.
- Add the highlighted section of the following policy example to the policy. Replace the sample account IDs (012345678901) with your account ID. Replace <username> with your user name.

    {
        "Sid": "S3",
        "Effect": "Allow",
        "Action": [
            "s3:Abort*",
            "s3:DeleteObject*",
            "s3:GetObject*",
            "s3:GetBucket*",
            "s3:List*",
            "s3:ListBucket",
            "s3:PutObject"
        ],
        "Resource": [
            "arn:aws:s3:::ka-app-code-<username>",
            "arn:aws:s3:::ka-app-code-<username>/*"
        ]
    },
    {
        "Sid": "ListCloudwatchLogGroups",
        "Effect": "Allow",
        "Action": [
            "logs:DescribeLogGroups"
        ],
        "Resource": [
            "arn:aws:logs:region:account-id:log-group:*"
        ]
    },
    {
        "Sid": "ListCloudwatchLogStreams",
        "Effect": "Allow",
        "Action": [
            "logs:DescribeLogStreams"
        ],
        "Resource": [
            "arn:aws:logs:region:account-id:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
        ]
    },
    {
        "Sid": "PutCloudwatchLogs",
        "Effect": "Allow",
        "Action": [
            "logs:PutLogEvents"
        ],
        "Resource": [
            "arn:aws:logs:region:account-id:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
        ]
    },
    {
        "Sid": "ReadInputStream",
        "Effect": "Allow",
        "Action": "kinesis:*",
        "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
    }
Configure the Application
- On the MyApplication page, choose Configure.
- On the Configure application page, provide the Code location:
  - For Amazon S3 bucket, enter ka-app-code-<username>.
  - For Path to Amazon S3 object, enter code/aws-kinesis-analytics-java-apps-1.0.jar.
- Under Access to application resources, for Access permissions, choose Create / update IAM role kinesis-analytics-MyApplication-us-west-2.
- Under Monitoring, ensure that the Monitoring metrics level is set to Application.
- For CloudWatch logging, select the Enable check box.
- Choose Update.
When you choose to enable CloudWatch logging, Kinesis Data Analytics creates a log group and log stream for you. The names of these resources are as follows:
- Log group: /aws/kinesis-analytics/MyApplication
- Log stream: kinesis-analytics-log-stream

This log stream is used to monitor the application. This is not the same log stream that the application uses to send results.
Run the Application
- On the MyApplication page, choose Run. Leave the Run without snapshot option selected, and confirm the action.
- When the application is running, refresh the page. The console shows the Application graph.
Verify the Application Output
In the Amazon S3 console, open the data folder in your S3 bucket.
After a few minutes, objects containing aggregated data from the application will appear.
Note: Aggregation is enabled by default in Flink. To disable it, use the following setting:

    'sink.producer.aggregation-enabled' = 'false'
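Based on the mapping code shown earlier, each object contains one line per ticker with its count for that window. The tickers and counts below are illustrative only; your values will differ:

    "AAPL" count: 13
    "AMZN" count: 9
    "TBV" count: 11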
Optional: Customize the Source and Sink
In this section, you customize settings on the source and sink objects.
After changing the code sections described in the sections following, do the following to reload the application code:
- Repeat the steps in the Compile the Application Code section to compile the updated application code.
- Repeat the steps in the Upload the Apache Flink Streaming Java Code section to upload the updated application code.
- On the application's page in the console, choose Configure and then choose Update to reload the updated application code into your application.
This section contains the following sections:
- Configure Data Partitioning
- Configure Read Frequency
- Configure Write Buffering
Configure Data Partitioning
In this section, you configure the names of the folders that the streaming file sink creates in the S3 bucket. You do this by adding a bucket assigner to the streaming file sink.
To customize the folder names created in the S3 bucket, do the following:
- Add the following import statements to the beginning of the S3StreamingSinkJob.java file:

    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

- Update the createS3SinkFromStaticConfig() method in the code to look like the following:

    private static StreamingFileSink<String> createS3SinkFromStaticConfig() {
        final StreamingFileSink<String> sink = StreamingFileSink
            .forRowFormat(new Path(s3SinkPath), new SimpleStringEncoder<String>("UTF-8"))
            .withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd--HH"))
            .withRollingPolicy(DefaultRollingPolicy.create().build())
            .build();
        return sink;
    }
The preceding code example uses the DateTimeBucketAssigner with a custom date format to create folders in the S3 bucket. The DateTimeBucketAssigner uses the current system time to create bucket names. If you want to create a custom bucket assigner to further customize the created folder names, you can create a class that implements BucketAssigner and implement its getBucketId method. A custom implementation of BucketAssigner can use the Context parameter to obtain more information about a record in order to determine its destination folder.
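The following is a minimal sketch of such a custom bucket assigner. The class name TickerBucketAssigner is hypothetical, and it assumes the sink receives the "TICKER count: N" strings produced by the mapping code shown earlier:

    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

    // Sketch only: writes each record into a folder named after its ticker symbol.
    public class TickerBucketAssigner implements BucketAssigner<String, String> {

        @Override
        public String getBucketId(String element, Context context) {
            // Records look like "\"AMZN\" count: 3", so take the first token and strip its quotes.
            return element.split(" ")[0].replace("\"", "");
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            // Bucket IDs are plain strings, so the built-in string serializer is sufficient.
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

To use it, you would pass new TickerBucketAssigner() to withBucketAssigner in createS3SinkFromStaticConfig instead of the DateTimeBucketAssigner.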
Configure Read Frequency
In this section, you configure the frequency of reads on the source stream.
The Kinesis Streams consumer reads from the source stream five times per second by default. This frequency will cause issues if there is more than one client reading from the stream, or if the application needs to retry reading a record. You can avoid these issues by setting the read frequency of the consumer.
To set the read frequency of the Kinesis consumer, you set the SHARD_GETRECORDS_INTERVAL_MILLIS setting.

The following code example sets the SHARD_GETRECORDS_INTERVAL_MILLIS setting to one second:

    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");
Configure Write Buffering
In this section, you configure the write frequency and other settings of the sink.
By default, the application writes to the destination bucket every minute. You can change this interval and other settings by configuring the DefaultRollingPolicy object.
The Apache Flink streaming file sink writes to its output bucket every time the application creates a checkpoint. The application creates a checkpoint every minute by default. To increase the write interval of the S3 sink, you must also increase the checkpoint interval.
To configure the DefaultRollingPolicy object, do the following:
- Increase the application's CheckpointInterval setting. The following input for the UpdateApplication action sets the checkpoint interval to 10 minutes:

    {
       "ApplicationConfigurationUpdate": {
          "FlinkApplicationConfigurationUpdate": {
             "CheckpointConfigurationUpdate": {
                "ConfigurationTypeUpdate" : "CUSTOM",
                "CheckpointIntervalUpdate": 600000
             }
          }
       },
       "ApplicationName": "MyApplication",
       "CurrentApplicationVersionId": 5
    }

  To use the preceding code, specify the current application version. You can retrieve the application version by using the ListApplications action.

- Add the following import statement to the beginning of the S3StreamingSinkJob.java file:

    import java.util.concurrent.TimeUnit;

- Update the createS3SinkFromStaticConfig method in the S3StreamingSinkJob.java file to look like the following:

    private static StreamingFileSink<String> createS3SinkFromStaticConfig() {
        final StreamingFileSink<String> sink = StreamingFileSink
            .forRowFormat(new Path(s3SinkPath), new SimpleStringEncoder<String>("UTF-8"))
            .withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd--HH"))
            .withRollingPolicy(
                DefaultRollingPolicy.create()
                    .withRolloverInterval(TimeUnit.MINUTES.toMillis(8))
                    .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                    .withMaxPartSize(1024 * 1024 * 1024)
                    .build())
            .build();
        return sink;
    }

  The preceding code example sets the frequency of writes to the Amazon S3 bucket to 8 minutes.
For more information about configuring the Apache Flink streaming file sink, see Row-encoded Formats in the Apache Flink documentation.
Clean Up Amazon Resources
This section includes procedures for cleaning up Amazon resources that you created in the Amazon S3 tutorial.
This topic contains the following sections:
- Delete Your Kinesis Data Analytics Application
- Delete Your Kinesis Data Stream
- Delete Your Amazon S3 Objects and Bucket
- Delete Your IAM Resources
- Delete Your CloudWatch Resources
Delete Your Kinesis Data Analytics Application
- Open the Kinesis Data Analytics console at https://console.amazonaws.cn/kinesisanalytics.
- In the Kinesis Data Analytics panel, choose MyApplication.
- On the application's page, choose Delete and then confirm the deletion.
Delete Your Kinesis Data Stream
- Open the Kinesis console at https://console.amazonaws.cn/kinesis.
- In the Kinesis Data Streams panel, choose ExampleInputStream.
- On the ExampleInputStream page, choose Delete Kinesis Stream and then confirm the deletion.
Delete Your Amazon S3 Objects and Bucket
- Open the Amazon S3 console at https://console.amazonaws.cn/s3/.
- Choose the ka-app-code-<username> bucket.
- Choose Delete and then enter the bucket name to confirm deletion.
Delete Your IAM Resources
- Open the IAM console at https://console.amazonaws.cn/iam/.
- On the navigation bar, choose Policies.
- In the filter control, enter kinesis.
- Choose the kinesis-analytics-service-MyApplication-<your-region> policy.
- Choose Policy Actions and then choose Delete.
- On the navigation bar, choose Roles.
- Choose the kinesis-analytics-MyApplication-<your-region> role.
- Choose Delete role and then confirm the deletion.
Delete Your CloudWatch Resources
- Open the CloudWatch console at https://console.amazonaws.cn/cloudwatch/.
- On the navigation bar, choose Logs.
- Choose the /aws/kinesis-analytics/MyApplication log group.
- Choose Delete Log Group and then confirm the deletion.
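If you prefer to remove the data stream and log group programmatically, a sketch with the AWS SDK for Java v2 might look like the following. The class name is hypothetical, and the application, S3 bucket, and IAM resources still need to be deleted as described above:

    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
    import software.amazon.awssdk.services.kinesis.KinesisClient;

    public class CleanUpResources {
        public static void main(String[] args) {
            Region region = Region.US_WEST_2;

            try (KinesisClient kinesis = KinesisClient.builder().region(region).build();
                 CloudWatchLogsClient logs = CloudWatchLogsClient.builder().region(region).build()) {

                // Delete the input stream created for this example.
                kinesis.deleteStream(req -> req.streamName("ExampleInputStream"));

                // Delete the application's CloudWatch log group and its log streams.
                logs.deleteLogGroup(req -> req.logGroupName("/aws/kinesis-analytics/MyApplication"));
            }
        }
    }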