Create and Run a Kinesis Data Analytics for Python Application
In this exercise, you create a Kinesis Data Analytics for Python application with Kinesis data streams as the application source and sink.
This section contains the following steps.
Create Dependent Resources
Before you create a Kinesis Data Analytics for Apache Flink application for this exercise, create the following dependent resources:
- Two Kinesis data streams for input and output.
- An Amazon S3 bucket to store the application's code and output (ka-app-code-<username>).
Create Two Kinesis Streams
Before you create a Kinesis Data Analytics application for this exercise, create two Kinesis data streams (ExampleInputStream and ExampleOutputStream). Your application uses these streams for the application source and destination streams.
You can create these streams using either the Amazon Kinesis console or the following Amazon CLI command. For console instructions, see Creating and Updating Data Streams in the Amazon Kinesis Data Streams Developer Guide.
To create the data streams (Amazon CLI)
- To create the first stream (ExampleInputStream), use the following Amazon Kinesis create-stream Amazon CLI command:

  $ aws kinesis create-stream \
  --stream-name ExampleInputStream \
  --shard-count 1 \
  --region us-west-2 \
  --profile adminuser
- To create the second stream that the application uses to write output, run the same command, changing the stream name to ExampleOutputStream:

  $ aws kinesis create-stream \
  --stream-name ExampleOutputStream \
  --shard-count 1 \
  --region us-west-2 \
  --profile adminuser
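If you prefer to script this step with the SDK rather than the CLI, the following is a minimal boto3 sketch that creates both streams and waits for each to become active. Credential and profile handling is left to your environment, unlike the --profile flag in the commands above.

  import boto3

  # Create both tutorial streams and block until each one is ACTIVE.
  kinesis = boto3.client("kinesis", region_name="us-west-2")
  for name in ("ExampleInputStream", "ExampleOutputStream"):
      kinesis.create_stream(StreamName=name, ShardCount=1)
      kinesis.get_waiter("stream_exists").wait(StreamName=name)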
Create an Amazon S3 Bucket
You can create the Amazon S3 bucket using the console. For instructions for creating this resource, see the following topics:
- How Do I Create an S3 Bucket? in the Amazon Simple Storage Service User Guide. Give the Amazon S3 bucket a globally unique name by appending your login name, such as ka-app-code-<username>.
Other Resources
When you create your application, Kinesis Data Analytics creates the following Amazon CloudWatch resources if they don't already exist:
- A log group called /aws/kinesis-analytics-java/MyApplication.
- A log stream called kinesis-analytics-log-stream.
Write Sample Records to the Input Stream
In this section, you use a Python script to write sample records to the stream for the application to process.
This section requires the Amazon SDK for Python (Boto).
The Python script in this section uses the Amazon CLI. You must configure your Amazon CLI to use your account credentials and default region. To configure your Amazon CLI, enter the following:
aws configure
- Create a file named stock.py with the following contents:

  import datetime
  import json
  import random

  import boto3

  STREAM_NAME = "ExampleInputStream"


  def get_data():
      return {
          'event_time': datetime.datetime.now().isoformat(),
          'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
          'price': round(random.random() * 100, 2)}


  def generate(stream_name, kinesis_client):
      while True:
          data = get_data()
          print(data)
          kinesis_client.put_record(
              StreamName=stream_name,
              Data=json.dumps(data),
              PartitionKey="partitionkey")


  if __name__ == '__main__':
      generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
- Run the stock.py script:

  $ python stock.py
Keep the script running while completing the rest of the tutorial.
Create and Examine the Apache Flink Streaming Python Code
The Python application code for this example is available from GitHub. To download the application code, do the following:
- Install the Git client if you haven't already. For more information, see Installing Git.
- Clone the remote repository with the following command:

  git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples

- Navigate to the amazon-kinesis-data-analytics-java-examples/python/GettingStarted directory.
The application code is located in the getting-started.py file. Note the following about the application code:
The application uses a Kinesis table source to read from the source stream. The following snippet calls the create_table function to create the Kinesis table source:

  table_env.execute_sql(
      create_table(input_table_name, input_stream, input_region)
  )
The create_table function uses a SQL command to create a table that is backed by the streaming source:

  def create_table(table_name, stream_name, region, stream_initpos=None):
      init_pos = "\n'scan.stream.initpos' = '{0}',".format(stream_initpos) if stream_initpos is not None else ''

      return """ CREATE TABLE {0} (
                  ticker VARCHAR(6),
                  price DOUBLE,
                  event_time TIMESTAMP(3),
                  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
                )
                PARTITIONED BY (ticker)
                WITH (
                  'connector' = 'kinesis',
                  'stream' = '{1}',
                  'aws.region' = '{2}',{3}
                  'format' = 'json',
                  'json.timestamp-format.standard' = 'ISO-8601'
                ) """.format(table_name, stream_name, region, init_pos)
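To see what the templating produces, you can print the rendered DDL as a quick sanity check. The table name below is illustrative; the stream and region match this tutorial:

  # Print the DDL rendered for the input table; with stream_initpos left as
  # None, the 'scan.stream.initpos' option is omitted from the WITH clause.
  print(create_table("input_table", "ExampleInputStream", "us-west-2"))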
The application creates two tables, then writes the contents of one table to the other.
  # 2. Creates a source table from a Kinesis Data Stream
  table_env.execute_sql(
      create_table(input_table_name, input_stream, input_region)
  )

  # 3. Creates a sink table writing to a Kinesis Data Stream
  table_env.execute_sql(
      create_table(output_table_name, output_stream, output_region, stream_initpos)
  )

  # 4. Inserts the source table data into the sink table
  table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                                       .format(output_table_name, input_table_name))
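For orientation, here is a minimal sketch of how these steps fit together in a standalone PyFlink program. It reuses the create_table function above; the hard-coded table names, stream names, and region are stand-ins for values that the actual sample reads from its runtime properties:

  from pyflink.table import EnvironmentSettings, TableEnvironment

  # Streaming-mode Table API environment
  table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

  input_table_name = "input_table"
  output_table_name = "output_table"

  # Source and sink tables backed by the two Kinesis streams
  table_env.execute_sql(create_table(input_table_name, "ExampleInputStream", "us-west-2"))
  table_env.execute_sql(create_table(output_table_name, "ExampleOutputStream", "us-west-2"))

  # Continuously copy rows from the source table to the sink table
  table_result = table_env.execute_sql(
      "INSERT INTO {0} SELECT * FROM {1}".format(output_table_name, input_table_name))

  # When running locally (outside Kinesis Data Analytics), wait on the job
  table_result.wait()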
The application uses the Apache Flink Kinesis connector, from the flink-sql-connector-kinesis-1.15.2.jar file. When using third-party Python packages (such as boto3), add them to the GettingStarted folder where getting-started.py is located. You do not need to add any additional configuration in Apache Flink or Kinesis Data Analytics. An example can be found at How to use boto3 within pyFlink.
Upload the Apache Flink Streaming Python Code
In this section, you create an Amazon S3 bucket and upload your application code.
To upload the application code using the console:
- Use your preferred compression application to compress the getting-started.py and flink-sql-connector-kinesis-1.15.2.jar (https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kinesis/1.15.2) files. Name the archive myapp.zip. If you include the outer folder in your archive, you must include this in the path with the code in your configuration file(s): GettingStarted/getting-started.py.
- Open the Amazon S3 console at https://console.amazonaws.cn/s3/.
. -
Choose Create bucket.
-
Enter
ka-app-code-
in the Bucket name field. Add a suffix to the bucket name, such as your user name, to make it globally unique. Choose Next.<username>
-
In the Configure options step, keep the settings as they are, and choose Next.
-
In the Set permissions step, keep the settings as they are, and choose Next.
-
Choose Create bucket.
-
In the Amazon S3 console, choose the ka-app-code-
<username>
bucket, and choose Upload. -
In the Select files step, choose Add files. Navigate to the
myapp.zip
file that you created in the previous step. Choose Next. -
You don't need to change any of the settings for the object, so choose Upload.
To upload the application code using the Amazon CLI:
Note: Do not use the compress features in Finder (macOS) or Windows Explorer (Windows) to create the myapp.zip archive. This may result in invalid application code.
- Use your preferred compression application to compress the getting-started.py and flink-sql-connector-kinesis-1.15.2.jar (https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kinesis/1.15.2) files. Name the archive myapp.zip. If you include the outer folder in your archive, you must include this in the path with the code in your configuration file(s): GettingStarted/getting-started.py.
- Run the following command:
  $ aws s3 --region aws region cp myapp.zip s3://ka-app-code-<username>
Your application code is now stored in an Amazon S3 bucket where your application can access it.
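If you prefer to script the packaging and upload, the following is a minimal sketch that builds myapp.zip with Python's standard zipfile module (avoiding the Finder and Windows Explorer pitfall noted above) and uploads it with boto3. The bucket name is a placeholder; replace it with yours:

  import zipfile

  import boto3

  # Build myapp.zip with both files at the archive root (no outer folder),
  # matching the paths used later in the application configuration.
  with zipfile.ZipFile("myapp.zip", "w", zipfile.ZIP_DEFLATED) as zf:
      zf.write("getting-started.py")
      zf.write("flink-sql-connector-kinesis-1.15.2.jar")

  # Upload the archive to the application code bucket.
  s3 = boto3.client("s3", region_name="us-west-2")
  s3.upload_file("myapp.zip", "ka-app-code-<username>", "myapp.zip")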
Create and Run the Kinesis Data Analytics Application
Follow these steps to create, configure, update, and run the application using the console.
Create the Application
- Open the Kinesis Data Analytics console at https://console.amazonaws.cn/kinesisanalytics.
- On the Kinesis Data Analytics dashboard, choose Create analytics application.
- On the Kinesis Analytics - Create application page, provide the application details as follows:
  - For Application name, enter MyApplication.
  - For Description, enter My java test app.
  - For Runtime, choose Apache Flink.
  - Leave the version as Apache Flink version 1.15.2 (Recommended version).
- For Access permissions, choose Create / update IAM role kinesis-analytics-MyApplication-us-west-2.
- Choose Create application.
When you create a Kinesis Data Analytics application using the console, you have the option of having an IAM role and policy created for your application. Your application uses this role and policy to access its dependent resources. These IAM resources are named using your application name and Region as follows:
- Policy: kinesis-analytics-service-MyApplication-us-west-2
- Role: kinesis-analytics-MyApplication-us-west-2
Configure the Application
Use the following procedure to configure the application.
To configure the application
- On the MyApplication page, choose Configure.
- On the Configure application page, provide the Code location:
  - For Amazon S3 bucket, enter ka-app-code-<username>.
  - For Path to Amazon S3 object, enter myapp.zip.
- Under Access to application resources, for Access permissions, choose Create / update IAM role kinesis-analytics-MyApplication-us-west-2.
- Under Properties, choose Add group.
- Enter the following:

  Group ID            Key                   Value
  consumer.config.0   input.stream.name     ExampleInputStream
  consumer.config.0   aws.region            us-west-2
  consumer.config.0   scan.stream.initpos   LATEST
- Choose Save.
- Under Properties, choose Add group again.
- Enter the following:

  Group ID            Key                  Value
  producer.config.0   output.stream.name   ExampleOutputStream
  producer.config.0   aws.region           us-west-2
  producer.config.0   shard.count          1
- Under Properties, choose Add group again. For Group ID, enter kinesis.analytics.flink.run.options. This special property group tells your application where to find its code resources. For more information, see Specifying your Code Files. Enter the following (a sketch after this procedure shows how the application reads these property groups at runtime):

  Group ID                              Key       Value
  kinesis.analytics.flink.run.options   python    getting-started.py
  kinesis.analytics.flink.run.options   jarfile   flink-sql-connector-kinesis-1.15.2.jar
- Under Monitoring, ensure that the Monitoring metrics level is set to Application.
- For CloudWatch logging, choose the Enable check box.
- Choose Update.
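For context, the property groups you entered above are delivered to your application at runtime. The following sketch shows one way a Python Flink application can read them; the file path and helper functions are assumptions modeled on the GitHub sample, not part of the console procedure:

  import json
  import os

  # Kinesis Data Analytics exposes runtime properties to Python applications
  # in a JSON file (path assumed here, following the GitHub sample).
  APPLICATION_PROPERTIES_PATH = "/etc/flink/application_properties.json"


  def get_application_properties():
      # Return the list of property groups, or an empty list when run locally.
      if os.path.isfile(APPLICATION_PROPERTIES_PATH):
          with open(APPLICATION_PROPERTIES_PATH, "r") as f:
              return json.load(f)
      return []


  def property_map(props, group_id):
      # Find one group's key/value map by its Group ID.
      for group in props:
          if group["PropertyGroupId"] == group_id:
              return group["PropertyMap"]
      return {}


  props = get_application_properties()
  consumer_props = property_map(props, "consumer.config.0")
  input_stream = consumer_props.get("input.stream.name")  # ExampleInputStream
  input_region = consumer_props.get("aws.region")         # us-west-2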
When you choose to enable Amazon CloudWatch logging, Kinesis Data Analytics creates a log group and log stream for you. The names of these resources are as follows:
- Log group: /aws/kinesis-analytics/MyApplication
- Log stream: kinesis-analytics-log-stream
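If you want to check the application logs from a script rather than the console, here is a minimal boto3 sketch using the resource names above:

  import boto3

  # Fetch recent events from the application's CloudWatch log stream.
  logs = boto3.client("logs", region_name="us-west-2")
  response = logs.get_log_events(
      logGroupName="/aws/kinesis-analytics/MyApplication",
      logStreamName="kinesis-analytics-log-stream",
      limit=20)
  for event in response["events"]:
      print(event["message"])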
Edit the IAM Policy
Edit the IAM policy to add permissions to access the Amazon S3 bucket.
To edit the IAM policy to add S3 bucket permissions
- Open the IAM console at https://console.amazonaws.cn/iam/.
- Choose Policies. Choose the kinesis-analytics-service-MyApplication-us-west-2 policy that the console created for you in the previous section.
- On the Summary page, choose Edit policy. Choose the JSON tab.
Add the highlighted section of the following policy example to the policy. Replace the sample account IDs (
012345678901
) with your account ID.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-
username
/myapp.zip" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" }
Run the Application
To view the Flink job graph, run the application, open the Apache Flink dashboard, and choose the desired Flink job.
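To confirm that the application is copying records, you can read from the output stream with a script such as the following minimal sketch. It assumes the single-shard stream created earlier and default credentials:

  import boto3

  # Read a batch of records from the output stream's single shard.
  kinesis = boto3.client("kinesis", region_name="us-west-2")
  shard_id = kinesis.describe_stream(StreamName="ExampleOutputStream")[
      "StreamDescription"]["Shards"][0]["ShardId"]
  iterator = kinesis.get_shard_iterator(
      StreamName="ExampleOutputStream",
      ShardId=shard_id,
      ShardIteratorType="TRIM_HORIZON")["ShardIterator"]
  records = kinesis.get_records(ShardIterator=iterator, Limit=10)["Records"]
  for record in records:
      print(record["Data"])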
Stop the Application
To stop the application, on the MyApplication page, choose Stop. Confirm the action.