
For new projects, we recommend that you use the new Managed Service for Apache Flink Studio over Kinesis Data Analytics for SQL Applications. Managed Service for Apache Flink Studio combines ease of use with advanced analytical capabilities, enabling you to build sophisticated stream processing applications in minutes.

Leveraging user-defined functions (UDFs)

The purpose of this pattern is to demonstrate how to leverage UDFs in Kinesis Data Analytics Studio Zeppelin notebooks to process data in a Kinesis stream. Managed Service for Apache Flink Studio uses Apache Flink to provide advanced analytical capabilities, including exactly-once processing semantics, event-time windows, extensibility using user-defined functions and custom integrations, imperative language support, durable application state, horizontal scaling, support for multiple data sources, extensible integrations, and more. These capabilities are critical for ensuring the accuracy, completeness, consistency, and reliability of data stream processing, and are not available with Amazon Kinesis Data Analytics for SQL.

In this sample application, you use a UDF in a KDA-Studio Zeppelin notebook to anonymize data in a Kinesis stream. For more information about Studio notebooks, see Using a Studio notebook with Kinesis Data Analytics for Apache Flink.

Figure: Lambda functions used for pre/post-processing of data in KDA-SQL applications

Figure: User-defined functions for pre/post-processing of data using KDA-Studio Zeppelin notebooks

User-defined functions (UDFs)

To reuse common business logic across operators, it can be useful to reference a user-defined function to transform your data stream. This can be done either within the Managed Service for Apache Flink Studio notebook, or as an externally referenced application JAR file. Utilizing user-defined functions can simplify the transformations or data enrichments that you might perform over streaming data.

In your notebook, you will reference a simple Java application JAR that anonymizes personal phone numbers. You can also write Python or Scala UDFs for use within the notebook. We chose a Java application JAR to highlight the functionality of importing an application JAR into a Pyflink notebook.
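
For illustration, the following is a minimal sketch of what such a UDF might look like when written as an Apache Flink ScalarFunction. The package and class names match those referenced later in the notebook, but the masking logic shown here is an assumption for illustration only; the implementation in the sample application JAR may differ.

    package com.mycompany.app;

    import org.apache.flink.table.functions.ScalarFunction;

    // Hypothetical sketch of the MaskPhoneNumber UDF; the actual logic in
    // the sample JAR may differ. The first argument selects the masking
    // mode, and the second is the phone number to anonymize.
    public class MaskPhoneNumber extends ScalarFunction {
        public String eval(String mode, String phone) {
            if (phone == null) {
                return null;
            }
            // Mask every digit except the last two, regardless of separators;
            // for example, 206-555-0100 becomes XXX-XXX-XX00.
            return phone.replaceAll("\\d(?=(?:\\D*\\d){2})", "X");
        }
    }

Packaging a class like this into a JAR and making the JAR available to the notebook is what allows the %flink paragraph later in this guide to import the class and register it with stenv.registerFunction.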

Environment setup

To follow this guide and interact with your streaming data, you will use an Amazon CloudFormation script to launch the following resources:

  • Source and target Kinesis Data Streams

  • Glue Database

  • IAM role

  • Managed Service for Apache Flink Studio Application

  • Lambda function to start the Managed Service for Apache Flink Studio application

  • Lambda role to execute the above Lambda function

  • Custom resource to invoke the Lambda function

Download the Amazon CloudFormation template here.

Create the Amazon CloudFormation stack
  1. Go to the Amazon Web Services Management Console and choose CloudFormation under the list of services.

  2. On the CloudFormation page, choose Stacks, and then choose Create stack with new resources (standard).

  3. On the Create stack page, choose Upload a template file, select the kda-flink-udf.yml file that you downloaded previously, and then choose Next.

  4. Give the stack a name, such as kinesis-UDF, so that it is easy to remember, and update the input parameters, such as input-stream, if you want a different name. Choose Next.

  5. On the Configure stack options page, add Tags if you wish and then choose Next.

  6. On the Review page, check the boxes allowing for the creation of IAM resources and then choose Submit.

The Amazon CloudFormation stack may take 10 to 15 minutes to launch depending on the Region you are launching in. Once you see CREATE_COMPLETE status for the entire stack, you are ready to continue.

Working with Managed Service for Apache Flink Studio notebook

Studio notebooks for Kinesis Data Analytics allow you to interactively query data streams in real time, and easily build and run stream processing applications using standard SQL, Python, and Scala. With a few clicks in the Amazon Web Services Management Console, you can launch a serverless notebook to query data streams and get results in seconds.

A notebook is a web-based development environment. With notebooks, you get a simple interactive development experience combined with the advanced data stream processing capabilities provided by Apache Flink. Studio notebooks use Apache Zeppelin as the notebook environment and Apache Flink as the stream processing engine, seamlessly combining these technologies to make advanced analytics on data streams accessible to developers of all skill sets.

Apache Zeppelin provides your Studio notebooks with a complete suite of analytics tools, including the following:

  • Data Visualization

  • Exporting data to files

  • Controlling the output format for easier analysis

Using the notebook
  1. Go to the Amazon Web Services Management Console and choose Amazon Kinesis under the list of services.

  2. On the left-hand navigation pane, choose Analytics applications and then choose Studio notebooks.

  3. Verify that the KinesisDataAnalyticsStudio notebook is running.

  4. Choose the notebook and then choose Open in Apache Zeppelin.

  5. Download the Data Producer Zeppelin notebook file, which you will use to load sample data into the Kinesis stream.

  6. Import the Data Producer Zeppelin notebook. Make sure to modify the input STREAM_NAME and REGION in the notebook code. The input stream name can be found in the Amazon CloudFormation stack output.

  7. Execute the Data Producer notebook by choosing the Run this paragraph button to insert sample data into the input Kinesis data stream.

  8. While the sample data loads, download the MaskPhoneNumber-Interactive notebook, which reads the input data, anonymizes phone numbers from the input stream, and stores the anonymized data in the output stream.

  9. Import the MaskPhoneNumber-Interactive Zeppelin notebook.

  10. Execute each paragraph in the notebook.

    1. In paragraph 1, you import the user-defined function that anonymizes phone numbers.

      %flink(parallelism=1)
      import com.mycompany.app.MaskPhoneNumber
      stenv.registerFunction("MaskPhoneNumber", new MaskPhoneNumber())
    2. In the next paragraph, you create an in-memory table to read the input stream data. Make sure that the stream name and Amazon Web Services Region are correct.

      %flink.ssql(type=update)
      DROP TABLE IF EXISTS customer_reviews;
      CREATE TABLE customer_reviews (
          customer_id VARCHAR,
          product VARCHAR,
          review VARCHAR,
          phone VARCHAR
      )
      WITH (
          'connector' = 'kinesis',
          'stream' = 'KinesisUDFSampleInputStream',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json');
    3. Check if data is loaded into the in-memory table.

      %flink.ssql(type=update)
      select * from customer_reviews
    4. Invoke the user-defined function to anonymize the phone numbers.

      %flink.ssql(type=update)
      select customer_id, product, review,
          MaskPhoneNumber('mask_phone', phone) as phoneNumber
      from customer_reviews
    5. Now that the phone numbers are masked, create a view with the masked numbers.

      %flink.ssql(type=update)
      DROP VIEW IF EXISTS sentiments_view;
      CREATE VIEW sentiments_view AS
      select customer_id, product, review,
          MaskPhoneNumber('mask_phone', phone) as phoneNumber
      from customer_reviews
    6. Verify the data.

      %flink.ssql(type=update)
      select * from sentiments_view
    7. Create an in-memory table for the output Kinesis stream. Make sure that the stream name and Amazon Web Services Region are correct.

      %flink.ssql(type=update)
      DROP TABLE IF EXISTS customer_reviews_stream_table;
      CREATE TABLE customer_reviews_stream_table (
          customer_id VARCHAR,
          product VARCHAR,
          review VARCHAR,
          phoneNumber VARCHAR
      )
      WITH (
          'connector' = 'kinesis',
          'stream' = 'KinesisUDFSampleOutputStream',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'TRIM_HORIZON',
          'format' = 'json');
    8. Insert the updated records into the target Kinesis stream.

      %flink.ssql(type=update)
      INSERT INTO customer_reviews_stream_table
      SELECT customer_id, product, review, phoneNumber
      FROM sentiments_view
    9. View and verify data from the target Kinesis stream. (If you want to verify the records outside the notebook, see the SDK sketch after this list.)

      %flink.ssql(type=update)
      select * from customer_reviews_stream_table
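
As an optional check outside Zeppelin, you can read a batch of records directly from the output stream. The following is a minimal sketch using the AWS SDK for Java 2.x; it assumes default credentials, the us-east-1 Region used in the table definitions above, and a single-shard output stream. The class name ReadOutputStream is hypothetical.

    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.kinesis.KinesisClient;
    import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
    import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
    import software.amazon.awssdk.services.kinesis.model.Record;
    import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

    public class ReadOutputStream {
        public static void main(String[] args) {
            try (KinesisClient kinesis = KinesisClient.builder()
                    .region(Region.US_EAST_1)
                    .build()) {
                // Assumes a single shard; call ListShards first if the
                // stream has more than one.
                String shardIterator = kinesis.getShardIterator(
                        GetShardIteratorRequest.builder()
                                .streamName("KinesisUDFSampleOutputStream")
                                .shardId("shardId-000000000000")
                                .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
                                .build())
                        .shardIterator();

                // Read one batch of records and print each JSON payload.
                for (Record record : kinesis.getRecords(
                        GetRecordsRequest.builder()
                                .shardIterator(shardIterator)
                                .limit(10)
                                .build())
                        .records()) {
                    System.out.println(record.data().asUtf8String());
                }
            }
        }
    }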

Promoting a notebook as an application

Now that you have tested your notebook code interactively, you will deploy the code as a streaming application with durable state. You will first need to modify the application configuration to specify a location for your code in Amazon S3.

  1. On the Amazon Web Services Management Console, choose your notebook and in Deploy as application configuration - optional, choose Edit.

  2. Under Destination for code in Amazon S3, choose the Amazon S3 bucket that was created by the Amazon CloudFormation scripts. The process may take a few minutes.

  3. You won't be able to promote the note as is; if you try, you will get an error because Select statements are not supported. To avoid this issue, download the MaskPhoneNumber-Streaming Zeppelin notebook.

  4. Import the MaskPhoneNumber-Streaming Zeppelin notebook.

  5. Open the note and choose Actions for KinesisDataAnalyticsStudio.

  6. Choose Build MaskPhoneNumber-Streaming and export to S3. Make sure to rename the application, and do not include special characters in the name.

  7. Choose Build and Export. Setting up the streaming application will take a few minutes.

  8. Once the build is complete, choose Deploy using Amazon console.

  9. On the next page, review settings and make sure to choose the correct IAM role. Next, choose Create streaming application.

  10. After a few minutes, you will see a message that the streaming application was created successfully.

For more information on deploying applications with durable state and limits, see Deploying as an application with durable state.

Cleanup

Optionally, you can now delete the Amazon CloudFormation stack. This removes all of the services that you set up previously.