
For new projects, we recommend that you use the new Managed Service for Apache Flink Studio over Kinesis Data Analytics for SQL Applications. Managed Service for Apache Flink Studio combines ease of use with advanced analytical capabilities, enabling you to build sophisticated stream processing applications in minutes.

Replacing Kinesis Data Firehose as a source with Kinesis Data Streams

See Converting-KDASQL-KDAStudio/ for a full tutorial.

In the following exercise, you will change your data flow to use Amazon Managed Service for Apache Flink Studio. This will also mean switching from Amazon Kinesis Data Firehose to Amazon Kinesis Data Streams.

First, we present a typical KDA-SQL architecture, and then show how to replace it using Amazon Managed Service for Apache Flink Studio and Amazon Kinesis Data Streams. Alternatively, you can launch the Amazon CloudFormation template here.

Amazon Kinesis Data Analytics-SQL and Amazon Kinesis Data Firehose

Here is the Amazon Kinesis Data Analytics SQL architectural flow:

We first examine the setup of a legacy Amazon Kinesis Data Analytics-SQL application with Amazon Kinesis Data Firehose. The use case is a trading market where trading data, including stock ticker and price, streams from external sources into Amazon Kinesis. Amazon Kinesis Data Analytics for SQL uses the input stream to run windowed queries, such as a tumbling window, to determine the trade volume and the minimum, maximum, and average trade price over a one-minute window for each stock ticker.

Amazon Kinesis Data Analytics-SQL is set up to ingest data from the Amazon Kinesis Data Firehose API. After processing, Amazon Kinesis Data Analytics-SQL sends the processed data to another Amazon Kinesis Data Firehose, which then saves the output in an Amazon S3 bucket.

In this case, you use the Amazon Kinesis Data Generator. The Amazon Kinesis Data Generator allows you to send test data to your Amazon Kinesis data streams or Amazon Kinesis Data Firehose delivery streams. To get started, follow the instructions here. Use the Amazon CloudFormation template here in place of the one provided in the instructions.

Once you run the Amazon CloudFormation template, the output section provides the Amazon Kinesis Data Generator URL. Log in to the portal using the Amazon Cognito user ID and password you set up here. Select the Region and the target stream name. For the current state, choose the Amazon Kinesis Data Firehose delivery stream. For the new state, choose the Amazon Kinesis Data Streams name. You can create multiple templates, depending on your requirements, and test a template using the Test template button before sending it to the target stream.

Following is a sample payload from the Amazon Kinesis Data Generator. The data generator targets the input Amazon Kinesis Data Firehose delivery stream to stream the data continuously. Clients using the Amazon Kinesis SDK can also send data from other producers.

2023-02-17 09:28:07.763,"AAPL",503
2023-02-17 09:28:07.763, "AMZN",335
2023-02-17 09:28:07.763, "GOOGL",185
2023-02-17 09:28:07.763, "AAPL",1116
2023-02-17 09:28:07.763, "GOOGL",1582

The following JSON is used to generate a random series of trade time and date, stock ticker, and stock price:

{{date.now(YYYY-MM-DD HH:mm:ss.SSS)}}, "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}", {{random.number(2000)}}

Once you choose Send data, the generator will start sending mock data.

External systems stream the data to Amazon Kinesis Data Firehose. Using Amazon Kinesis Data Analytics for SQL Applications, you can analyze streaming data with standard SQL. The service enables you to author and run SQL code against streaming sources to perform time-series analytics, feed real-time dashboards, and create real-time metrics. Amazon Kinesis Data Analytics for SQL Applications can create a destination stream from SQL queries on the input stream and send the destination stream to another Amazon Kinesis Data Firehose. The destination Amazon Kinesis Data Firehose can then deliver the analytical data to Amazon S3 as the final state.

Legacy Amazon Kinesis Data Analytics-SQL code is based on an extension of the SQL standard.

You use the following query in Amazon Kinesis Data Analytics-SQL. First, you create a destination stream for the query output. Then you use a PUMP, an Amazon Kinesis Data Analytics repository object (an extension of the SQL standard) that provides continuously running INSERT INTO stream SELECT ... FROM query functionality, so that the results of a query are continuously inserted into a named stream.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    EVENT_TIME TIMESTAMP,
    INGEST_TIME TIMESTAMP,
    TICKER VARCHAR(16),
    VOLUME BIGINT,
    AVG_PRICE DOUBLE,
    MIN_PRICE DOUBLE,
    MAX_PRICE DOUBLE);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
    INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM
        STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME,
        STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "STREAM_INGEST_TIME",
        "ticker",
        COUNT(*) AS VOLUME,
        AVG("tradePrice") AS AVG_PRICE,
        MIN("tradePrice") AS MIN_PRICE,
        MAX("tradePrice") AS MAX_PRICE
    FROM "SOURCE_SQL_STREAM_001"
    GROUP BY "ticker",
             STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND),
             STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND);

The preceding SQL uses two time windows: tradeTimestamp, which comes from the incoming stream payload, and ROWTIME. tradeTimestamp is also called event time or client-side time. It is often desirable to use this time in analytics because it is the time when an event occurred. However, many event sources, such as mobile phones and web clients, do not have reliable clocks, which can lead to inaccurate times. In addition, connectivity issues can cause records to appear on a stream in a different order from the order in which the events occurred.

In-application streams also include a special column called ROWTIME, which stores the timestamp at which Amazon Kinesis Data Analytics inserted the record into the first in-application stream after reading it from the streaming source. This ROWTIME value is then maintained throughout your application.

The SQL computes the record count per ticker (as volume) and the minimum, maximum, and average price over a 60-second interval.

Using each of these times in time-based windowed queries has advantages and disadvantages. Choose one or more of these times, and a strategy to deal with the relevant disadvantages, based on your use case.

A two-window strategy uses two time-based windows: ROWTIME and one of the other times, such as the event time.

  • Use ROWTIME as the first window, which controls how frequently the query emits the results, as shown in the following example. It is not used as a logical time.

  • Use one of the other times as the logical time that you want to associate with your analytics. This time represents when the event occurred. In the following example, the analytics goal is to group the records and return the count by ticker.
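
Following is a minimal sketch of such a two-window query (not part of the original application code), using the same SOURCE_SQL_STREAM_001 input and tradeTimestamp event-time column as the earlier query. The output stream name TICKER_COUNT_STREAM and pump name COUNT_PUMP are illustrative:

CREATE OR REPLACE STREAM "TICKER_COUNT_STREAM" (TICKER VARCHAR(16), EVENT_TIME TIMESTAMP, TICKER_COUNT BIGINT);

CREATE OR REPLACE PUMP "COUNT_PUMP" AS
    INSERT INTO "TICKER_COUNT_STREAM"
    SELECT STREAM
        "ticker",
        -- logical (event) time window taken from the payload
        STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME,
        COUNT(*) AS TICKER_COUNT
    FROM "SOURCE_SQL_STREAM_001"
    GROUP BY
        -- ROWTIME window controls how often results are emitted
        STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND),
        -- event-time window is the logical time the records are grouped on
        STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND),
        "ticker";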

Amazon Managed Service for Apache Flink Studio 

In the updated architecture, you replace Amazon Kinesis Data Firehose with Amazon Kinesis Data Streams. Amazon Kinesis Data Analytics for SQL Applications is replaced by Amazon Managed Service for Apache Flink Studio. Apache Flink code is run interactively within an Apache Zeppelin notebook. Amazon Managed Service for Apache Flink Studio sends the aggregated trade data to an Amazon S3 bucket for storage. The steps are shown following:

Here is the Amazon Managed Service for Apache Flink Studio architectural flow:

Create a Kinesis Data Stream

To create a data stream using the console
  1. Sign in to the Amazon Web Services Management Console and open the Kinesis console at https://console.amazonaws.cn/kinesis.

  2. In the navigation bar, expand the Region selector and choose a Region.

  3. Choose Create data stream.

  4. On the Create Kinesis stream page, enter a name for your data stream and accept the default On-demand capacity mode.

    With the On-demand mode, you can then choose Create Kinesis stream to create your data stream.

    On the Kinesis streams page, your stream's Status is Creating while the stream is being created. When the stream is ready to use, the Status changes to Active.

  5. Choose the name of your stream. The Stream Details page displays a summary of your stream configuration, along with monitoring information.

  6. In the Amazon Kinesis Data Generator, change the Stream/delivery stream to the new Amazon Kinesis data stream: TRADE_SOURCE_STREAM.

    The JSON template and payload are the same as you used for Amazon Kinesis Data Analytics-SQL. Use the Amazon Kinesis Data Generator to produce some sample trading payload data and target the TRADE_SOURCE_STREAM data stream for this exercise:

    {{date.now(YYYY-MM-DD HH:mm:ss.SSS)}}, "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}", {{random.number(2000)}}
  7. On the Amazon Web Services Management Console go to Managed Service for Apache Flink and then choose Create application.

  8. On the left navigation pane, choose Studio notebooks and then choose Create studio notebook.

  9. Enter a name for the studio notebook.

  10. Under Amazon Glue database, provide an existing Amazon Glue database that will define the metadata for your sources and destinations. If you don’t have an Amazon Glue database, choose Create and do the following:

    1. In the Amazon Glue console, choose Databases under Data catalog from the left-hand menu.

    2. Choose Create database.

    3. On the Create database page, enter a name for the database. In the Location - optional section, choose Browse Amazon S3 and select the Amazon S3 bucket. If you don't have an Amazon S3 bucket already set up, you can skip this step and come back to it later.

    4. (Optional) Enter a description for the database.

    5. Choose Create database.

  11. Choose Create notebook.

  12. Once your notebook is created, choose Run.

  13. Once the notebook has started successfully, launch a Zeppelin notebook by choosing Open in Apache Zeppelin.

  14. On the Zeppelin Notebook page, choose Create new note and name it MarketDataFeed.

The Flink SQL code is explained in the following section. Each window within the Zeppelin notebook is a separate code block, and the blocks can be run one at a time.

Amazon Managed Service for Apache Flink Studio Code

Amazon Managed Service for Apache Flink Studio uses Zeppelin notebooks to run the code. For this example, the mapping is done with ssql code based on Apache Flink 1.13. The code in the Zeppelin notebook is shown below, one block at a time.

Before running any code in your Zeppelin notebook, you must run the Flink configuration commands. If you need to change any configuration setting after running code (ssql, Python, or Scala), you must stop and restart your notebook. In this example, you need to set checkpointing. Checkpointing is required so that data streamed to Amazon S3 can be flushed to a file. The following statement sets the checkpointing interval to 5000 milliseconds.

%flink.conf
execution.checkpointing.interval 5000

%flink.conf indicates that this block contains configuration statements. For more information about Flink configuration, including checkpointing, see Apache Flink Checkpointing.

The input table for the source Amazon Kinesis Data Streams is created with the Flink ssql code below. Note that the TRADE_TIME field stores the date/time created by the data generator.

%flink.ssql

DROP TABLE IF EXISTS TRADE_SOURCE_STREAM;

CREATE TABLE TRADE_SOURCE_STREAM (
    --`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
    TRADE_TIME TIMESTAMP(3),
    WATERMARK FOR TRADE_TIME as TRADE_TIME - INTERVAL '5' SECOND,
    TICKER STRING,
    PRICE DOUBLE,
    STATUS STRING)
WITH (
    'connector' = 'kinesis',
    'stream' = 'TRADE_SOURCE_STREAM',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'csv');

You can view the input stream with this statement:

%flink.ssql(type=update)

-- testing the source stream
select * from TRADE_SOURCE_STREAM;

Before sending the aggregated data to Amazon S3, you can view it directly in Amazon Managed Service for Apache Flink Studio with a tumbling window select query. This aggregates the trading data in one-minute time windows. Note that the %flink.ssql statement must have a (type=update) designation:

%flink.ssql(type=update)

select
    TUMBLE_ROWTIME(TRADE_TIME, INTERVAL '1' MINUTE) as TRADE_WINDOW,
    TICKER,
    COUNT(*) as VOLUME,
    AVG(PRICE) as AVG_PRICE,
    MIN(PRICE) as MIN_PRICE,
    MAX(PRICE) as MAX_PRICE
FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;

You can then create a table for the destination in Amazon S3. You need to use a watermark. A watermark is a progress metric that indicates a point in time when you are confident that no more delayed events will arrive. The reason for the watermark is to account for late arrivals. The INTERVAL '5' SECOND allows trades to enter the Amazon Kinesis data stream up to 5 seconds late and still be included if they have a timestamp within the window. For more information, see Generating Watermarks.

%flink.ssql(type=update)

DROP TABLE IF EXISTS TRADE_DESTINATION_S3;

CREATE TABLE TRADE_DESTINATION_S3 (
    TRADE_WINDOW_START TIMESTAMP(3),
    WATERMARK FOR TRADE_WINDOW_START as TRADE_WINDOW_START - INTERVAL '5' SECOND,
    TICKER STRING,
    VOLUME BIGINT,
    AVG_PRICE DOUBLE,
    MIN_PRICE DOUBLE,
    MAX_PRICE DOUBLE)
WITH (
    'connector' = 'filesystem',
    'path' = 's3://trade-destination/',
    'format' = 'csv');

This statement inserts the data into the TRADE_DESTINATION_S3 table. TUMBLE_ROWTIME is the timestamp of the inclusive upper bound of the tumbling window.

%flink.ssql(type=update)

insert into TRADE_DESTINATION_S3
select
    TUMBLE_ROWTIME(TRADE_TIME, INTERVAL '1' MINUTE),
    TICKER,
    COUNT(*) as VOLUME,
    AVG(PRICE) as AVG_PRICE,
    MIN(PRICE) as MIN_PRICE,
    MAX(PRICE) as MAX_PRICE
FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;

Let your statement run for 10 to 20 minutes to accumulate some data in Amazon S3. Then abort your statement.

This closes the file in Amazon S3 so that it is viewable.

Here is what the contents look like: each output file contains CSV rows with the window timestamp, ticker, volume, and the average, minimum, and maximum prices.
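
For illustration only, a file might resemble the following rows (the values are hypothetical, not actual output):

2023-02-17 09:36:59.999,AAPL,23,893.4,54.0,1997.0
2023-02-17 09:36:59.999,GOOGL,19,1020.6,12.0,1986.0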

You can use the Amazon CloudFormation template to create the infrastructure.

Amazon CloudFormation will create the following resources in your Amazon account:

  • Amazon Kinesis Data Streams

  • Amazon Managed Service for Apache Flink Studio

  • Amazon Glue database

  • Amazon S3 bucket

  • IAM roles and policies for Amazon Managed Service for Apache Flink Studio to access appropriate resources

Import the notebook and change the Amazon S3 bucket name to the name of the new Amazon S3 bucket created by Amazon CloudFormation, as shown in the sketch following.
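
For example, if you reuse the sink table from earlier, the bucket name appears in the 'path' option of the TRADE_DESTINATION_S3 table definition. Following is a minimal sketch of the change; the bucket name below is a placeholder, so replace it with the bucket that Amazon CloudFormation created:

%flink.ssql(type=update)

DROP TABLE IF EXISTS TRADE_DESTINATION_S3;

CREATE TABLE TRADE_DESTINATION_S3 (
    TRADE_WINDOW_START TIMESTAMP(3),
    WATERMARK FOR TRADE_WINDOW_START as TRADE_WINDOW_START - INTERVAL '5' SECOND,
    TICKER STRING,
    VOLUME BIGINT,
    AVG_PRICE DOUBLE,
    MIN_PRICE DOUBLE,
    MAX_PRICE DOUBLE)
-- 'path' below is a placeholder; point it at the Amazon S3 bucket created by Amazon CloudFormation
WITH (
    'connector' = 'filesystem',
    'path' = 's3://<your-cloudformation-bucket>/',
    'format' = 'csv');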

See more

Here are some additional resources you can use to learn more about using Managed Service for Apache Flink Studio: