Export data streams to the Amazon Web Services Cloud (console) - Amazon IoT Greengrass

Amazon IoT Greengrass Version 1 entered the extended life phase on June 30, 2023. For more information, see the Amazon IoT Greengrass V1 maintenance policy. After this date, Amazon IoT Greengrass V1 won't release updates that provide features, enhancements, bug fixes, or security patches. Devices that run on Amazon IoT Greengrass V1 won't be disrupted and will continue to operate and to connect to the cloud. We strongly recommend that you migrate to Amazon IoT Greengrass Version 2, which adds significant new features and support for additional platforms.

Export data streams to the Amazon Web Services Cloud (console)

This tutorial shows you how to use the Amazon IoT console to configure and deploy an Amazon IoT Greengrass group with stream manager enabled. The group contains a user-defined Lambda function that writes to a stream in stream manager, which is then exported automatically to the Amazon Web Services Cloud.

Stream manager makes ingesting, processing, and exporting high-volume data streams more efficient and reliable. In this tutorial, you create a TransferStream Lambda function that consumes IoT data. The Lambda function uses the Amazon IoT Greengrass Core SDK to create a stream in stream manager and then read and write to it. Stream manager then exports the stream to Kinesis Data Streams. The following diagram shows this workflow.


Figure: Diagram of the stream management workflow.

The focus of this tutorial is to show how user-defined Lambda functions use the StreamManagerClient object in the Amazon IoT Greengrass Core SDK to interact with stream manager. For simplicity, the Python Lambda function that you create for this tutorial generates simulated device data.
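
The sample function in Step 1 creates its stream with StrategyOnFull.OverwriteOldestData, so the oldest data is dropped when the local stream reaches its size limit. The following toy model illustrates that idea in plain Python. It is only a sketch: ToyStream, its byte accounting, and its sequence numbering are hypothetical and are not how stream manager is implemented.

```python
from collections import deque


class ToyStream:
    """Toy in-memory model of a size-bounded stream (not the real stream manager)."""

    def __init__(self, max_size_bytes):
        self.max_size_bytes = max_size_bytes
        self._messages = deque()  # (sequence_number, payload) pairs, oldest first
        self._size = 0
        self._next_sequence_number = 0

    def append(self, payload):
        """Store a message, then evict the oldest messages while over the limit."""
        sequence_number = self._next_sequence_number
        self._next_sequence_number += 1
        self._messages.append((sequence_number, payload))
        self._size += len(payload)
        # Always keep the newest message, even if it alone exceeds the limit.
        while self._size > self.max_size_bytes and len(self._messages) > 1:
            _, evicted = self._messages.popleft()
            self._size -= len(evicted)
        return sequence_number

    def read(self):
        """Return the messages that are still retained, oldest first."""
        return list(self._messages)
```

With a 10-byte limit, appending three 4-byte messages leaves only the last two in the toy stream, while the sequence numbers keep increasing.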

Prerequisites

To complete this tutorial, you need:

  • A Greengrass group and a Greengrass core (v1.10 or later). For information about how to create a Greengrass group and core, see Getting started with Amazon IoT Greengrass. The Getting Started tutorial also includes steps for installing the Amazon IoT Greengrass Core software.

    Note

    Stream manager is not supported on OpenWrt distributions.

  • The Java 8 runtime (JDK 8) installed on the core device.

    • For Debian-based distributions (including Raspbian) or Ubuntu-based distributions, run the following command:

      sudo apt install openjdk-8-jdk
    • For Red Hat-based distributions (including Amazon Linux), run the following command:

      sudo yum install java-1.8.0-openjdk

      For more information, see How to download and install prebuilt OpenJDK packages in the OpenJDK documentation.

  • Amazon IoT Greengrass Core SDK for Python v1.5.0 or later. To use StreamManagerClient in the Amazon IoT Greengrass Core SDK for Python, you must:

    • Install Python 3.7 or later on the core device.

    • Include the SDK and its dependencies in your Lambda function deployment package. Instructions are provided in this tutorial.

    Tip

    You can also use StreamManagerClient with Java or Node.js. For example code, see the Amazon IoT Greengrass Core SDK for Java and Amazon IoT Greengrass Core SDK for Node.js on GitHub.

  • A destination stream named MyKinesisStream created in Amazon Kinesis Data Streams in the same Amazon Web Services Region as your Greengrass group. For more information, see Create a stream in the Amazon Kinesis Developer Guide.

    Note

    In this tutorial, stream manager exports data to Kinesis Data Streams, which results in charges to your Amazon Web Services account. For information about pricing, see Kinesis Data Streams pricing.

    To avoid incurring charges, you can run this tutorial without creating a Kinesis data stream. In this case, you check the logs to see that stream manager attempted to export the stream to Kinesis Data Streams.

  • An IAM policy added to the Greengrass group role that allows the kinesis:PutRecords action on the target data stream, as shown in the following example:

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "kinesis:PutRecords"
                ],
                "Resource": [
                    "arn:aws-cn:kinesis:region:account-id:stream/MyKinesisStream"
                ]
            }
        ]
    }
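
If you prefer to generate the policy document instead of editing the placeholders by hand, a small helper such as the following can fill in the Region and account ID. This is a minimal sketch: build_kinesis_export_policy is a hypothetical helper name, and the cn-north-1 Region and 123456789012 account ID are placeholder values, not values from this tutorial.

```python
import json


def build_kinesis_export_policy(region, account_id,
                                stream_name="MyKinesisStream",
                                partition="aws-cn"):
    """Return a policy document that allows kinesis:PutRecords on one stream."""
    resource = f"arn:{partition}:kinesis:{region}:{account_id}:stream/{stream_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["kinesis:PutRecords"],
                "Resource": [resource],
            }
        ],
    }


# Placeholder Region and account ID; replace them with your own values.
policy = build_kinesis_export_policy("cn-north-1", "123456789012")
policy_json = json.dumps(policy, indent=4)
```

The resulting policy_json string can be pasted into the IAM console when you add the policy to the Greengrass group role.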

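The tutorial contains the following high-level steps:

  1. Create a Lambda function deployment package
  2. Create a Lambda function
  3. Add a Lambda function to the Greengrass group
  4. Enable stream manager
  5. Configure local logging
  6. Deploy the Greengrass group
  7. Test the application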

The tutorial should take about 20 minutes to complete.

Step 1: Create a Lambda function deployment package

In this step, you create a Lambda function deployment package that contains Python function code and dependencies. You upload this package later when you create the Lambda function in Amazon Lambda. The Lambda function uses the Amazon IoT Greengrass Core SDK to create and interact with local streams.

Note

Your user-defined Lambda functions must use the Amazon IoT Greengrass Core SDK to interact with stream manager. For more information about requirements for the Greengrass stream manager, see Greengrass stream manager requirements.

  1. Download the Amazon IoT Greengrass Core SDK for Python v1.5.0 or later.

  2. Unzip the downloaded package to get the SDK. The SDK is the greengrasssdk folder.

  3. Install package dependencies to include with the SDK in your Lambda function deployment package.

    1. Navigate to the SDK directory that contains the requirements.txt file. This file lists the dependencies.

    2. Install the SDK dependencies. For example, run the following pip command to install them in the current directory:

      pip install --target . -r requirements.txt
  4. Save the following Python function code in a local file named transfer_stream.py.

    Tip

    For example code that uses Java or Node.js, see the Amazon IoT Greengrass Core SDK for Java and Amazon IoT Greengrass Core SDK for Node.js on GitHub.

    import asyncio
    import logging
    import random
    import time

    from greengrasssdk.stream_manager import (
        ExportDefinition,
        KinesisConfig,
        MessageStreamDefinition,
        ReadMessagesOptions,
        ResourceNotFoundException,
        StrategyOnFull,
        StreamManagerClient,
    )


    # This example creates a local stream named "SomeStream".
    # It starts writing data into that stream and then stream manager automatically exports
    # the data to a customer-created Kinesis data stream named "MyKinesisStream".
    # This example runs forever until the program is stopped.

    # The size of the local stream on disk will not exceed the default (which is 256 MB).
    # Any data appended after the stream reaches the size limit continues to be appended, and
    # stream manager deletes the oldest data until the total stream size is back under 256 MB.
    # The Kinesis data stream in the cloud has no such bound, so all the data from this script is
    # uploaded to Kinesis and you will be charged for that usage.


    def main(logger):
        try:
            stream_name = "SomeStream"
            kinesis_stream_name = "MyKinesisStream"

            # Create a client for the StreamManager
            client = StreamManagerClient()

            # Try deleting the stream (if it exists) so that we have a fresh start
            try:
                client.delete_message_stream(stream_name=stream_name)
            except ResourceNotFoundException:
                pass

            exports = ExportDefinition(
                kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)]
            )
            client.create_message_stream(
                MessageStreamDefinition(
                    name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports
                )
            )

            # Append two messages and print their sequence numbers
            logger.info(
                "Successfully appended message to stream with sequence number %d",
                client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")),
            )
            logger.info(
                "Successfully appended message to stream with sequence number %d",
                client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")),
            )

            # Try reading the two messages we just appended and print them out
            logger.info(
                "Successfully read 2 messages: %s",
                client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)),
            )

            logger.info("Now going to start writing random integers between 0 and 1000 to the stream")
            # Now start putting in random data between 0 and 1000 to emulate device sensor input
            while True:
                logger.debug("Appending new random integer to stream")
                client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big"))
                time.sleep(1)

        except asyncio.TimeoutError:
            logger.exception("Timed out while executing")
        except Exception:
            logger.exception("Exception while running")


    def function_handler(event, context):
        return


    logging.basicConfig(level=logging.INFO)
    # Start up this sample code
    main(logger=logging.getLogger())
  5. Zip the following items into a file named transfer_stream_python.zip. This is your Lambda function deployment package.

    • transfer_stream.py. App logic.

    • greengrasssdk. Required library for Python Greengrass Lambda functions that publish MQTT messages.

      Stream manager operations are available in version 1.5.0 or later of the Amazon IoT Greengrass Core SDK for Python.

    • The dependencies you installed for the Amazon IoT Greengrass Core SDK for Python (for example, the cbor2 directories).

    When you create the zip file, include only these items, not the containing folder.
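
One way to make sure the items sit at the root of the archive, rather than inside a containing folder, is to build the zip file with a short script. The following is a sketch under one assumption: the source directory holds only transfer_stream.py, the greengrasssdk folder, and the installed dependencies. The build_deployment_package helper is a hypothetical name.

```python
import zipfile
from pathlib import Path


def build_deployment_package(source_dir, zip_path):
    """Zip every file under source_dir with paths relative to source_dir."""
    source_dir = Path(source_dir)
    zip_path = Path(zip_path)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(source_dir.rglob("*")):
            # Skip directories, and skip the archive itself if it lives in source_dir.
            if path.is_file() and path.resolve() != zip_path.resolve():
                # Relative names keep the containing folder out of the archive.
                archive.write(path, path.relative_to(source_dir).as_posix())
    return zip_path
```

For example, build_deployment_package("package", "transfer_stream_python.zip") stores package/transfer_stream.py as transfer_stream.py in the archive.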

Step 2: Create a Lambda function

In this step, you use the Amazon Lambda console to create a Lambda function and configure it to use your deployment package. Then, you publish a function version and create an alias.

  1. First, create the Lambda function.

    1. In the Amazon Web Services Management Console, choose Services, and open the Amazon Lambda console.

    2. Choose Create function and then choose Author from scratch.

    3. In the Basic information section, use the following values:

      • For Function name, enter TransferStream.

      • For Runtime, choose Python 3.7.

      • For Permissions, keep the default setting. This creates an execution role that grants basic Lambda permissions. This role isn't used by Amazon IoT Greengrass.

    4. At the bottom of the page, choose Create function.

  2. Next, register the handler and upload your Lambda function deployment package.

    1. On the Code tab, under Code source, choose Upload from. From the dropdown, choose .zip file.

      
      Figure: The Upload from dropdown with .zip file highlighted.

    2. Choose Upload, and then choose your transfer_stream_python.zip deployment package. Then, choose Save.

    3. On the Code tab for the function, under Runtime settings, choose Edit, and then enter the following values.

      • For Runtime, choose Python 3.7.

      • For Handler, enter transfer_stream.function_handler

    4. Choose Save.

      Note

      The Test button on the Amazon Lambda console doesn't work with this function. The Amazon IoT Greengrass Core SDK doesn't contain modules that are required to run your Greengrass Lambda functions independently in the Amazon Lambda console. These modules (for example, greengrass_common) are supplied to the functions after they are deployed to your Greengrass core.

  3. Now, publish the first version of your Lambda function and create an alias for the version.

    Note

    Greengrass groups can reference a Lambda function by alias (recommended) or by version. Using an alias makes it easier to manage code updates because you don't have to change your subscription table or group definition when the function code is updated. Instead, you just point the alias to the new function version.

    1. From the Actions menu, choose Publish new version.

    2. For Version description, enter First version, and then choose Publish.

    3. On the TransferStream: 1 configuration page, from the Actions menu, choose Create alias.

    4. On the Create a new alias page, use the following values:

      • For Name, enter GG_TransferStream.

      • For Version, choose 1.

      Note

      Amazon IoT Greengrass doesn't support Lambda aliases for $LATEST versions.

    5. Choose Create.
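
The publish and alias steps above can also be scripted. The following sketch assumes that you pass in a boto3 Lambda client (for example, boto3.client("lambda")) and that its publish_version and create_alias operations behave as described in the boto3 documentation. The publish_and_alias helper is a hypothetical name, and this tutorial itself uses only the console.

```python
def publish_and_alias(lambda_client,
                      function_name="TransferStream",
                      alias_name="GG_TransferStream"):
    """Publish a new function version and point an alias at it.

    lambda_client is assumed to be a boto3 Lambda client.
    """
    version = lambda_client.publish_version(
        FunctionName=function_name,
        Description="First version",
    )["Version"]
    # The alias targets the published version number, never $LATEST.
    alias = lambda_client.create_alias(
        FunctionName=function_name,
        Name=alias_name,
        FunctionVersion=version,
    )
    return version, alias["AliasArn"]
```

Because the alias is created from the version number that publish_version returns, it never points to $LATEST, which Amazon IoT Greengrass doesn't support.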

Now you're ready to add the Lambda function to your Greengrass group.

Step 3: Add a Lambda function to the Greengrass group

In this step, you add the Lambda function to the group and then configure its lifecycle and environment variables. For more information, see Controlling execution of Greengrass Lambda functions by using group-specific configuration.

  1. In the Amazon IoT console navigation pane, under Manage, expand Greengrass devices, and then choose Groups (V1).

  2. Choose the target group.

  3. On the group configuration page, choose the Lambda functions tab.

  4. Under My Lambda functions, choose Add.

  5. On the Add Lambda function page, choose your TransferStream Lambda function.

  6. For the Lambda version, choose Alias:GG_TransferStream.

    Now, configure properties that determine the behavior of the Lambda function in the Greengrass group.

  7. In the Lambda function configuration section, make the following changes:

    • Set Memory limit to 32 MB.

    • For Pinned, choose True.

    Note

    A long-lived (or pinned) Lambda function starts automatically after Amazon IoT Greengrass starts and keeps running in its own container. This is in contrast to an on-demand Lambda function, which starts when invoked and stops when there are no tasks left to run. For more information, see Lifecycle configuration for Greengrass Lambda functions.

  8. Choose Add Lambda function.
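
For comparison, the same settings can be expressed as a function entry for a Greengrass V1 function definition. This sketch assumes the Function shape used by the Amazon IoT Greengrass V1 API, where MemorySize is given in KB. The build_function_entry helper, the Id value, and the placeholder alias ARN are hypothetical.

```python
def build_function_entry(alias_arn, memory_mb=32, pinned=True):
    """Build a function entry that mirrors the console settings in this step."""
    return {
        "Id": "TransferStream",  # hypothetical identifier for the entry
        "FunctionArn": alias_arn,
        "FunctionConfiguration": {
            "MemorySize": memory_mb * 1024,  # assumed unit: KB, so 32 MB -> 32768
            "Pinned": pinned,  # True makes the function long-lived
        },
    }


# Placeholder ARN; replace the Region and account ID with your own values.
entry = build_function_entry(
    "arn:aws-cn:lambda:cn-north-1:123456789012:function:TransferStream:GG_TransferStream"
)
```

The entry references the GG_TransferStream alias rather than a version number, which matches the recommendation in Step 2.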

Step 4: Enable stream manager

In this step, you make sure that stream manager is enabled.

  1. On the group configuration page, choose the Lambda functions tab.

  2. Under System Lambda functions, select Stream manager, and check the status. If disabled, choose Edit. Then, choose Enable and Save. You can use the default parameter settings for this tutorial. For more information, see Configure Amazon IoT Greengrass stream manager.

Note

When you use the console to enable stream manager and deploy the group, the memory size for stream manager is set to 4194304 KB (4 GB) by default. We recommend that you set the memory size to at least 128000 KB.
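
To put the two memory values in this note side by side, the following sketch converts them from KB, assuming binary units (1 MB = 1024 KB). The describe_kb helper is a hypothetical name.

```python
KB_PER_MB = 1024
KB_PER_GB = 1024 * 1024


def describe_kb(kilobytes):
    """Convert a size in KB to MB and GB, assuming binary units."""
    return {"MB": kilobytes / KB_PER_MB, "GB": kilobytes / KB_PER_GB}


console_default = describe_kb(4194304)    # default set by the console
recommended_minimum = describe_kb(128000)  # recommended lower bound
```

Under that assumption, the console default works out to 4 GB and the recommended minimum to 125 MB.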

Step 5: Configure local logging

In this step, you configure Amazon IoT Greengrass system components, user-defined Lambda functions, and connectors in the group to write logs to the file system of the core device. You can use logs to troubleshoot any issues you might encounter. For more information, see Monitoring with Amazon IoT Greengrass logs.

  1. Under Local logs configuration, check if local logging is configured.

  2. If logs aren't configured for Greengrass system components or user-defined Lambda functions, choose Edit.

  3. Choose User Lambda functions log level and Greengrass system log level.

  4. Keep the default values for logging level and disk space limit, and then choose Save.

Step 6: Deploy the Greengrass group

Deploy the group to the core device.

  1. Make sure that the Amazon IoT Greengrass core is running. Run the following commands in a terminal on your core device (for example, a Raspberry Pi), as needed.

    1. To check whether the daemon is running:

      ps aux | grep -E 'greengrass.*daemon'

      If the output contains a root entry for /greengrass/ggc/packages/ggc-version/bin/daemon, then the daemon is running.

      Note

      The version in the path depends on the Amazon IoT Greengrass Core software version that's installed on your core device.

    2. To start the daemon:

      cd /greengrass/ggc/core/
      sudo ./greengrassd start
  2. On the group configuration page, choose Deploy.

    1. In the Lambda functions tab, under the System Lambda functions section, select IP detector and choose Edit.

    2. In the Edit IP detector settings dialog box, select Automatically detect and override MQTT broker endpoints.

    3. Choose Save.

      This enables devices to automatically acquire connectivity information for the core, such as IP address, DNS, and port number. Automatic detection is recommended, but Amazon IoT Greengrass also supports manually specified endpoints. You're only prompted for the discovery method the first time that the group is deployed.

      Note

      If prompted, grant permission to create the Greengrass service role and associate it with your Amazon Web Services account in the current Amazon Web Services Region. This role allows Amazon IoT Greengrass to access your resources in Amazon services.

      The Deployments page shows the deployment timestamp, version ID, and status. When completed, the status displayed for the deployment should be Completed.

      For troubleshooting help, see Troubleshooting Amazon IoT Greengrass.
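
The daemon check in step 1 can be automated by scanning the output of ps aux for a root entry that matches the daemon path. The following is a minimal sketch: daemon_is_running and read_ps_output are hypothetical helper names, and the version segment of the path is matched loosely because it depends on the installed Amazon IoT Greengrass Core software version.

```python
import re
import subprocess

# Matches /greengrass/ggc/packages/<ggc-version>/bin/daemon for any version.
DAEMON_PATTERN = re.compile(r"/greengrass/ggc/packages/[^/\s]+/bin/daemon")


def daemon_is_running(ps_output):
    """Return True if a root-owned Greengrass daemon appears in ps aux output."""
    for line in ps_output.splitlines():
        fields = line.split()
        # The first column of ps aux output is the user that owns the process.
        if fields and fields[0] == "root" and DAEMON_PATTERN.search(line):
            return True
    return False


def read_ps_output():
    """Run ps aux on the core device and return its text output."""
    result = subprocess.run(["ps", "aux"], capture_output=True, text=True, check=True)
    return result.stdout
```

On the core device, daemon_is_running(read_ps_output()) returns True when the daemon entry is present.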

Step 7: Test the application

The TransferStream Lambda function generates simulated device data. It writes data to a stream that stream manager exports to the target Kinesis data stream.
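
If you read the exported records yourself, note that the sample function writes two UTF-8 text messages first and then 4-byte, big-endian, signed integers. The following sketch shows one way a consumer might decode both kinds of payload. The encode_reading and decode_payload helpers are hypothetical, and treating every 4-byte payload as an integer is a simplifying assumption that holds for this sample because its two text messages are longer than 4 bytes.

```python
def encode_reading(value):
    """Encode an integer the same way the sample function does."""
    return value.to_bytes(length=4, signed=True, byteorder="big")


def decode_payload(payload):
    """Decode a payload written by the sample function.

    Assumption: 4-byte payloads are simulated sensor readings, and anything
    else is one of the UTF-8 text messages appended at startup.
    """
    if len(payload) == 4:
        return int.from_bytes(payload, byteorder="big", signed=True)
    return payload.decode("utf-8")
```

A value encoded with encode_reading decodes back to the same integer, and the text messages decode back to the original strings.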

  1. In the Amazon Kinesis console, under Kinesis data streams, choose MyKinesisStream.

    Note

    If you ran the tutorial without a target Kinesis data stream, check the log file for stream manager (GGStreamManager). If the log contains an error message that includes "export stream MyKinesisStream doesn't exist", then the test is successful. This error means that the service tried to export to the stream, but the stream doesn't exist.

  2. On the MyKinesisStream page, choose Monitoring. If the test is successful, you should see data in the Put Records charts. Depending on your connection, it might take a minute before the data is displayed.

    Important

    When you're finished testing, delete the Kinesis data stream to avoid incurring more charges.

    Or, run the following commands to stop the Greengrass daemon. This prevents the core from sending messages until you're ready to continue testing.

    cd /greengrass/ggc/core/
    sudo ./greengrassd stop
  3. Remove the TransferStream Lambda function from the core.

    1. In the Amazon IoT console navigation pane, under Manage, expand Greengrass devices, and then choose Groups (V1).

    2. Under Greengrass groups, choose your group.

    3. On the Lambdas page, choose the ellipsis (…) for the TransferStream function, and then choose Remove function.

    4. From Actions, choose Deploy.

To view logging information or troubleshoot issues with streams, check the logs for the TransferStream and GGStreamManager functions. You must have root permissions to read Amazon IoT Greengrass logs on the file system.

  • TransferStream writes log entries to greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log.

  • GGStreamManager writes log entries to greengrass-root/ggc/var/log/system/GGStreamManager.log.
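
The two log locations above can be assembled from the Greengrass root directory, your Region, and your account ID. The following is a small sketch: log_paths is a hypothetical helper, and the /greengrass root, cn-north-1 Region, and 123456789012 account ID in the usage line are placeholder values.

```python
from pathlib import PurePosixPath


def log_paths(greengrass_root, region, account_id, function_name="TransferStream"):
    """Return the log file paths for the user function and for stream manager."""
    log_root = PurePosixPath(greengrass_root) / "ggc" / "var" / "log"
    return {
        function_name: str(log_root / "user" / region / account_id / f"{function_name}.log"),
        "GGStreamManager": str(log_root / "system" / "GGStreamManager.log"),
    }


# Placeholder values; replace them with your own root, Region, and account ID.
paths = log_paths("/greengrass", "cn-north-1", "123456789012")
```

Reading the files at these paths still requires root permissions on the core device.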

If you need more troubleshooting information, you can set the logging level for User Lambda logs to Debug logs and then deploy the group again.
