For new projects, we recommend that you use the new Managed Service for Apache Flink Studio over Kinesis Data Analytics for SQL Applications. Managed Service for Apache Flink Studio combines ease of use with advanced analytical capabilities, enabling you to build sophisticated stream processing applications in minutes.
Replacing Kinesis Data Firehose as a source with Kinesis Data Streams
In the following exercise, you will change your data flow to use Amazon Managed Service for Apache Flink Studio. This will also mean switching from Amazon Kinesis Data Firehose to Amazon Kinesis Data Streams.
First, we review a typical KDA-SQL architecture, then show how you can replace it using Amazon Managed Service for Apache Flink Studio and Amazon Kinesis Data Streams.
Alternatively, you can launch the Amazon CloudFormation template here.
Amazon Kinesis Data Analytics-SQL and Amazon Kinesis Data Firehose
Here is the Amazon Kinesis Data Analytics SQL architectural flow:
![](images/legacy-sql.png)
We first examine the setup of a legacy Amazon Kinesis Data Analytics-SQL and Amazon Kinesis Data Firehose.
The use case is a trading market where trading data, including stock ticker and price, streams from external sources to Amazon Kinesis systems.
Amazon Kinesis Data Analytics for SQL uses the input stream to run windowed queries, such as a tumbling window, to determine the trade volume and the min, max, and average trade price over a one-minute window for each stock ticker.
Amazon Kinesis Data Analytics-SQL is set up to ingest data from the Amazon Kinesis Data Firehose API. After processing, Amazon Kinesis Data Analytics-SQL sends the processed data to another Amazon Kinesis Data Firehose, which then saves the output in an Amazon S3 bucket.
In this case, you use Amazon Kinesis Data Generator, which allows you to send test data to your Amazon Kinesis Data Streams or Amazon Kinesis Data Firehose delivery streams.
To get started, follow the instructions here.
Once you run the Amazon CloudFormation template, the output section provides the Amazon Kinesis Data Generator URL. Log in to the portal using the Cognito user ID and password you set up here.
Following is a sample payload from Amazon Kinesis Data Generator. The data generator targets the input Amazon Kinesis Data Firehose delivery stream to stream the data continuously. The Amazon Kinesis SDK client can send data from other producers as well.
```
2023-02-17 09:28:07.763,"AAPL",503
2023-02-17 09:28:07.763,"AMZN",335
2023-02-17 09:28:07.763,"GOOGL",185
2023-02-17 09:28:07.763,"AAPL",1116
2023-02-17 09:28:07.763,"GOOGL",1582
```
The following JSON is used to generate a random series of trade time and date, stock ticker, and stock price:
```
{{date.now(YYYY-MM-DD HH:mm:ss.SSS)}}, "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}", {{random.number(2000)}}
```
Once you choose Send data, the generator will start sending mock data.
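If you want to generate the same shape of test data locally, for example to sanity-check a consumer before wiring up the generator, a minimal Python sketch of the template above might look like the following. The ticker list and price range mirror the template; the function name is illustrative.

```python
import random
from datetime import datetime, timezone

# Tickers and price range taken from the data generator template above.
TICKERS = ["AAPL", "AMZN", "MSFT", "META", "GOOGL"]

def make_trade_record(now=None):
    """Build one CSV record in the shape the generator emits:
    trade time, quoted ticker, integer price."""
    now = now or datetime.now(timezone.utc)
    trade_time = now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]  # millisecond precision
    ticker = random.choice(TICKERS)
    price = random.randint(0, 2000)  # mirrors random.number(2000)
    return f'{trade_time}, "{ticker}", {price}'
```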
External systems stream the data to Amazon Kinesis Data Firehose. Using Amazon Kinesis Data Analytics for SQL Applications, you can analyze streaming data using standard SQL. The service enables you to author and run SQL code against streaming sources to perform time-series analytics, feed real-time dashboards, and create real-time metrics. Amazon Kinesis Data Analytics for SQL Applications could create a destination stream from SQL queries on the input stream and send the destination stream to another Amazon Kinesis Data Firehose. The destination Amazon Kinesis Data Firehose could send the analytical data to Amazon S3 as the final state.
Amazon Kinesis Data Analytics-SQL legacy code is based on an extension of the SQL standard.

You use the following query in Amazon Kinesis Data Analytics-SQL. You first create a destination stream for the query output. Then you use PUMP, an Amazon Kinesis Data Analytics repository object (an extension of the SQL standard) that provides continuously running INSERT INTO stream SELECT ... FROM query functionality, enabling the results of a query to be continuously entered into a named stream.
```sql
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    EVENT_TIME TIMESTAMP,
    INGEST_TIME TIMESTAMP,
    TICKER VARCHAR(16),
    VOLUME BIGINT,
    AVG_PRICE DOUBLE,
    MIN_PRICE DOUBLE,
    MAX_PRICE DOUBLE);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
    INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM
        STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME,
        STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "STREAM_INGEST_TIME",
        "ticker",
        COUNT(*) AS VOLUME,
        AVG("tradePrice") AS AVG_PRICE,
        MIN("tradePrice") AS MIN_PRICE,
        MAX("tradePrice") AS MAX_PRICE
    FROM "SOURCE_SQL_STREAM_001"
    GROUP BY
        "ticker",
        STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND),
        STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND);
```
The preceding SQL uses two time windows: tradeTimestamp, which comes from the incoming stream payload, and ROWTIME. tradeTimestamp is also called event time or client-side time. It is often desirable to use this time in analytics because it is the time when an event occurred. However, many event sources, such as mobile phones and web clients, do not have reliable clocks, which can lead to inaccurate times. In addition, connectivity issues can lead to records appearing on a stream out of the order in which the events occurred.

In-application streams also include a special column called ROWTIME. It stores the timestamp at which Amazon Kinesis Data Analytics inserted a record into the first in-application stream after reading from the streaming source. This ROWTIME value is then maintained throughout your application.
The SQL determines the count of trades per ticker as volume, along with the min, max, and average price over a 60-second interval.
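As a plain-language check on what that query computes, here is a hedged Python sketch (function and field names are illustrative, timestamps simplified to epoch seconds) that buckets records into 60-second tumbling windows and derives the same volume, min, max, and average aggregates per ticker:

```python
from collections import defaultdict

def tumbling_aggregate(records, window_seconds=60):
    """Group (epoch_seconds, ticker, price) records into fixed windows and
    compute volume, min, max, and average price per ticker, mirroring the
    STEP(... BY INTERVAL '60' SECOND) grouping in the SQL above."""
    buckets = defaultdict(list)
    for ts, ticker, price in records:
        window_start = ts - (ts % window_seconds)  # align to the window boundary
        buckets[(window_start, ticker)].append(price)
    results = {}
    for key, prices in buckets.items():
        results[key] = {
            "VOLUME": len(prices),
            "MIN_PRICE": min(prices),
            "MAX_PRICE": max(prices),
            "AVG_PRICE": sum(prices) / len(prices),
        }
    return results
```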
Using each of these times in windowed queries that are time-based has advantages and disadvantages. Choose one or more of these times, and a strategy to deal with the relevant disadvantages based on your use case scenario.
A two-window strategy uses two time-based windows: ROWTIME and one of the other times, such as the event time.

- Use ROWTIME as the first window, which controls how frequently the query emits the results, as shown in the preceding example. It is not used as a logical time.
- Use one of the other times as the logical time that you want to associate with your analytics. This time represents when the event occurred. In the preceding example, the analytics goal is to group the records and return the count by ticker.
Amazon Managed Service for Apache Flink Studio
In the updated architecture, you replace Amazon Kinesis Data Firehose with Amazon Kinesis Data Streams. Amazon Kinesis Data Analytics for SQL Applications are replaced by Amazon Managed Service for Apache Flink Studio. Apache Flink code is run interactively within an Apache Zeppelin Notebook. Amazon Managed Service for Apache Flink Studio sends the aggregated trade data to an Amazon S3 bucket for storage. The steps are shown following:
Here is the Amazon Managed Service for Apache Flink Studio architectural flow:
![](images/kda-studio.png)
Create a Kinesis Data Stream
To create a data stream using the console:

- Sign in to the Amazon Web Services Management Console and open the Kinesis console at https://console.amazonaws.cn/kinesis.
- In the navigation bar, expand the Region selector and choose a Region.
- Choose Create data stream.
- On the Create Kinesis stream page, enter a name for your data stream and accept the default On-demand capacity mode. With the On-demand mode, you can then choose Create Kinesis stream to create your data stream. On the Kinesis streams page, your stream's Status is Creating while the stream is being created. When the stream is ready to use, the Status changes to Active.
- Choose the name of your stream. The Stream Details page displays a summary of your stream configuration, along with monitoring information.
In the Amazon Kinesis Data Generator, change the Stream/delivery stream to the new Amazon Kinesis Data Streams: TRADE_SOURCE_STREAM.
JSON and Payload will be the same as you used for Amazon Kinesis Data Analytics-SQL. Use the Amazon Kinesis Data Generator to produce some sample trading payload data and target the TRADE_SOURCE_STREAM Data Stream for this exercise:
{{date.now(YYYY-MM-DD HH:mm:ss.SSS)}}, "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}", {{random.number(2000)}}
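If you prefer to push the same payload from your own producer instead of the data generator, a boto3-based sketch might look like the following. The stream name and Region come from this exercise; the helper names are illustrative, and `put_record` is the standard Kinesis Data Streams API call.

```python
def format_payload(trade_time, ticker, price):
    """CSV payload in the same shape as the generator template above."""
    return f'{trade_time}, "{ticker}", {price}'

def send_trade(record, stream_name="TRADE_SOURCE_STREAM", region="us-east-1"):
    """Send one CSV record to the Kinesis data stream.

    boto3 is imported lazily so format_payload stays usable without
    AWS credentials or the boto3 package installed.
    """
    import boto3
    client = boto3.client("kinesis", region_name=region)
    # Using the ticker as the partition key keeps each symbol's trades
    # ordered within a shard.
    ticker = record.split(",")[1].strip().strip('"')
    return client.put_record(
        StreamName=stream_name,
        Data=record.encode("utf-8"),
        PartitionKey=ticker,
    )
```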
On the Amazon Web Services Management Console go to Managed Service for Apache Flink and then choose Create application.
On the left navigation pane, choose Studio notebooks and then choose Create studio notebook.
Enter a name for the studio notebook.
Under Amazon Glue database, provide an existing Amazon Glue database that will define the metadata for your sources and destinations. If you don't have an Amazon Glue database, choose Create and do the following:
In the Amazon Glue console, choose Databases under Data catalog from the left-hand menu.
Choose Create database.
In the Create database page, enter a name for the database. In the Location - optional section, choose Browse Amazon S3 and select the Amazon S3 bucket. If you don't have an Amazon S3 bucket already set up, you can skip this step and come back to it later.
(Optional). Enter a description for the database.
Choose Create database.
Choose Create notebook.
Once your notebook is created, choose Run.
Once the notebook has been successfully started, launch a Zeppelin notebook by choosing Open in Apache Zeppelin.
On the Zeppelin Notebook page, choose Create new note and name it MarketDataFeed.
The Flink SQL code is explained following, but first, this is what a Zeppelin notebook screen looks like:
Amazon Managed Service for Apache Flink Studio Code
Amazon Managed Service for Apache Flink Studio uses Zeppelin notebooks to run the code. For this example, the mapping is done in ssql code based on Apache Flink 1.13. The code in the Zeppelin notebook is shown below, one block at a time.
Before running any code in your Zeppelin notebook, Flink configuration commands must be run. If you need to change any configuration setting after running code (ssql, Python, or Scala), you will need to stop and restart your notebook. In this example, you will need to set checkpointing. Checkpointing is required so that you can stream data to a file in Amazon S3. This allows data streaming to Amazon S3 to be flushed to a file. The statement below sets the checkpointing interval to 5000 milliseconds.
```
%flink.conf

execution.checkpointing.interval 5000
```
%flink.conf indicates that this block contains configuration statements.
For more information about Flink configuration, including checkpointing, see Apache Flink Checkpointing.
The input table for the source Amazon Kinesis data stream is created with the Flink ssql code below. Note that the TRADE_TIME field stores the date/time created by the data generator.
```sql
%flink.ssql

DROP TABLE IF EXISTS TRADE_SOURCE_STREAM;

CREATE TABLE TRADE_SOURCE_STREAM (
    --`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
    TRADE_TIME TIMESTAMP(3),
    WATERMARK FOR TRADE_TIME AS TRADE_TIME - INTERVAL '5' SECOND,
    TICKER STRING,
    PRICE DOUBLE,
    STATUS STRING)
WITH (
    'connector' = 'kinesis',
    'stream' = 'TRADE_SOURCE_STREAM',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'csv');
```
You can view the input stream with this statement:
```sql
%flink.ssql(type=update)

-- testing the source stream
select * from TRADE_SOURCE_STREAM;
```
Before sending the aggregate data to Amazon S3, you can view it directly in Amazon Managed Service for Apache Flink Studio with a tumbling window select query. This aggregates the trading data into one-minute time windows. Note that the %flink.ssql statement must have a (type=update) designation:
```sql
%flink.ssql(type=update)

select
    TUMBLE_ROWTIME(TRADE_TIME, INTERVAL '1' MINUTE) as TRADE_WINDOW,
    TICKER,
    COUNT(*) as VOLUME,
    AVG(PRICE) as AVG_PRICE,
    MIN(PRICE) as MIN_PRICE,
    MAX(PRICE) as MAX_PRICE
FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```
You can then create a table for the destination in Amazon S3. You need to use a watermark. A watermark is a progress metric that indicates a point in time when you are confident that no more delayed events will arrive. The reason for the watermark is to account for late arrivals. The INTERVAL '5' SECOND allows trades to enter the Amazon Kinesis data stream 5 seconds late and still be included if they have a timestamp within the window.
For more information, see Generating Watermarks.
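To make the 5-second lateness concrete, the following Python sketch (illustrative names, simplified epoch-second timestamps) shows how a watermark trails the largest event time seen so far, and how a late trade is still accepted while the watermark has not yet passed the window end:

```python
def accept_for_window(event_ts, max_event_ts_seen, window_end, allowed_lateness=5.0):
    """Simplified watermark check.

    The watermark trails the largest event timestamp observed by the
    allowed lateness (here 5 seconds, matching INTERVAL '5' SECOND).
    A record is accepted if it belongs to the window and the window
    has not yet been closed by the watermark.
    """
    watermark = max_event_ts_seen - allowed_lateness
    window_closed = watermark >= window_end
    belongs_to_window = event_ts < window_end
    return belongs_to_window and not window_closed
```

For example, a trade stamped at second 55 that arrives after the stream has already seen second 63 still counts toward the window ending at second 60 (watermark 58 < 60); once an event at second 66 has been seen, the watermark reaches 61, passes the window end, and the window closes.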
```sql
%flink.ssql(type=update)

DROP TABLE IF EXISTS TRADE_DESTINATION_S3;

CREATE TABLE TRADE_DESTINATION_S3 (
    TRADE_WINDOW_START TIMESTAMP(3),
    WATERMARK FOR TRADE_WINDOW_START AS TRADE_WINDOW_START - INTERVAL '5' SECOND,
    TICKER STRING,
    VOLUME BIGINT,
    AVG_PRICE DOUBLE,
    MIN_PRICE DOUBLE,
    MAX_PRICE DOUBLE)
WITH (
    'connector' = 'filesystem',
    'path' = 's3://trade-destination/',
    'format' = 'csv');
```
This statement inserts the data into the TRADE_DESTINATION_S3 table. TUMBLE_ROWTIME is the timestamp of the inclusive upper bound of the tumbling window.
```sql
%flink.ssql(type=update)

insert into TRADE_DESTINATION_S3
select
    TUMBLE_ROWTIME(TRADE_TIME, INTERVAL '1' MINUTE),
    TICKER,
    COUNT(*) as VOLUME,
    AVG(PRICE) as AVG_PRICE,
    MIN(PRICE) as MIN_PRICE,
    MAX(PRICE) as MAX_PRICE
FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```
Let your statement run for 10 to 20 minutes to accumulate some data in Amazon S3. Then abort your statement.
This closes the file in Amazon S3 so that it is viewable.
Here is what the contents look like:
![](images/kda-studio-contents.png)
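Once files land in Amazon S3, you can sanity-check them locally. This hedged Python sketch parses CSV rows in the destination schema; the column names come from the TRADE_DESTINATION_S3 table definition above, and the function name is illustrative.

```python
import csv
import io

# Column order matches the TRADE_DESTINATION_S3 table definition.
COLUMNS = ["TRADE_WINDOW_START", "TICKER", "VOLUME",
           "AVG_PRICE", "MIN_PRICE", "MAX_PRICE"]

def parse_trade_rows(csv_text):
    """Parse destination CSV rows into dicts with typed aggregate fields."""
    rows = []
    for raw in csv.reader(io.StringIO(csv_text)):
        row = dict(zip(COLUMNS, raw))
        row["VOLUME"] = int(row["VOLUME"])
        for k in ("AVG_PRICE", "MIN_PRICE", "MAX_PRICE"):
            row[k] = float(row[k])
        rows.append(row)
    return rows
```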
You can use the Amazon CloudFormation template here.
Amazon CloudFormation will create the following resources in your Amazon account:
Amazon Kinesis Data Streams
Amazon Managed Service for Apache Flink Studio
Amazon Glue database
Amazon S3 bucket
IAM roles and policies for Amazon Managed Service for Apache Flink Studio to access appropriate resources
Import the notebook and change the Amazon S3 bucket name with the new Amazon S3 bucket created by Amazon CloudFormation.
![](images/kda-studio-cfn.png)
See more
Here are some additional resources you can use to learn more about using Managed Service for Apache Flink Studio: