Loading streaming data into Amazon OpenSearch Service
You can use OpenSearch Ingestion to load streaming data directly into your Amazon OpenSearch Service domain.
You can also use other sources to load streaming data. Some, such as Amazon Data Firehose and Amazon CloudWatch Logs, have built-in support for OpenSearch Service. Others, like Amazon S3, Amazon Kinesis Data Streams, and Amazon DynamoDB, use Amazon Lambda functions as event handlers. The Lambda functions respond to new data by processing it and streaming it to your domain.
Note
Lambda supports several popular programming languages and is available in most Amazon Web Services Regions. For more information, see Getting started with Lambda in the Amazon Lambda Developer Guide and Amazon service endpoints in the Amazon Web Services General Reference.
Topics
- Loading streaming data from OpenSearch Ingestion
- Loading streaming data from Amazon S3
- Loading streaming data from Amazon Kinesis Data Streams
- Loading streaming data from Amazon DynamoDB
- Loading streaming data from Amazon Data Firehose
- Loading streaming data from Amazon CloudWatch
- Loading streaming data from Amazon IoT
Loading streaming data from OpenSearch Ingestion
You can use Amazon OpenSearch Ingestion to load data into an OpenSearch Service domain. You configure your data producers to send data to OpenSearch Ingestion, and it automatically delivers the data to the domain or collection that you specify. You can also configure OpenSearch Ingestion to transform your data before delivering it. For more information, see Amazon OpenSearch Ingestion.
Loading streaming data from Amazon S3
You can use Lambda to send data to your OpenSearch Service domain from Amazon S3. New data that arrives in an S3 bucket triggers an event notification to Lambda, which then runs your custom code to perform the indexing.
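This method of streaming data is extremely flexible. You can index object metadata or, if the object is plaintext, parse and index some elements of the object body, as the sample code in this section does with a log file.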
Prerequisites
Before proceeding, you must have the following resources.
Prerequisite | Description |
---|---|
Amazon S3 bucket | For more information, see Create your first S3 bucket in the Amazon Simple Storage Service User Guide. The bucket must reside in the same Region as your OpenSearch Service domain. |
OpenSearch Service domain | The destination for data after your Lambda function processes it. For more information, see Creating OpenSearch Service domains. |
Create the Lambda deployment package
Deployment packages are ZIP or JAR files that contain your code and its dependencies. This section includes Python sample code. For other programming languages, see Lambda deployment packages in the Amazon Lambda Developer Guide.
- Create a directory. In this sample, we use the name s3-to-opensearch.
- Create a file within the directory named sample.py:

      import boto3
      import re
      import requests
      from requests_aws4auth import AWS4Auth

      region = '' # e.g. us-west-1
      service = 'es'
      credentials = boto3.Session().get_credentials()
      awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

      host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com
      index = 'lambda-s3-index'
      datatype = '_doc'
      url = host + '/' + index + '/' + datatype

      headers = { "Content-Type": "application/json" }

      s3 = boto3.client('s3')

      # Regular expressions used to parse some simple log lines
      # (raw strings, so Python does not treat \d as a string escape)
      ip_pattern = re.compile(r'(\d+\.\d+\.\d+\.\d+)')
      time_pattern = re.compile(r'\[(\d+/\w\w\w/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]')
      message_pattern = re.compile(r'"(.+)"')

      # Lambda execution starts here
      def handler(event, context):
          for record in event['Records']:

              # Get the bucket name and key for the new file
              bucket = record['s3']['bucket']['name']
              key = record['s3']['object']['key']

              # Get, read, and split the file into lines
              obj = s3.get_object(Bucket=bucket, Key=key)
              body = obj['Body'].read()
              lines = body.splitlines()

              # Match the regular expressions to each line and index the JSON
              for line in lines:
                  line = line.decode("utf-8")
                  ip = ip_pattern.search(line).group(1)
                  timestamp = time_pattern.search(line).group(1)
                  message = message_pattern.search(line).group(1)

                  document = { "ip": ip, "timestamp": timestamp, "message": message }
                  r = requests.post(url, auth=awsauth, json=document, headers=headers)

  Edit the variables for region and host.
- Install pip if you haven't already, then install the dependencies to a new package directory:

      cd s3-to-opensearch
      pip install --target ./package requests
      pip install --target ./package requests_aws4auth

  All Lambda execution environments have Boto3 installed, so you don't need to include it in your deployment package.
- Package the application code and dependencies:

      cd package
      zip -r ../lambda.zip .
      cd ..
      zip -g lambda.zip sample.py
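Before packaging, it can help to check the parsing logic on its own. The following is a minimal sketch, not part of the original sample: it reuses the three regular expressions from sample.py and wraps them in a hypothetical helper named parse_line that returns None instead of raising an error when a line doesn't match.

```python
import re

# Same patterns as sample.py, written as raw strings
IP_PATTERN = re.compile(r'(\d+\.\d+\.\d+\.\d+)')
TIME_PATTERN = re.compile(r'\[(\d+/\w\w\w/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]')
MESSAGE_PATTERN = re.compile(r'"(.+)"')


def parse_line(line):
    """Return a document dict for one log line, or None if a field is missing."""
    ip = IP_PATTERN.search(line)
    timestamp = TIME_PATTERN.search(line)
    message = MESSAGE_PATTERN.search(line)
    if not (ip and timestamp and message):
        return None
    return {
        "ip": ip.group(1),
        "timestamp": timestamp.group(1),
        "message": message.group(1),
    }


if __name__ == "__main__":
    print(parse_line('12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg"'))
    print(parse_line("not a log line"))
```

A handler built on a helper like this could skip lines that return None rather than fail the whole invocation on the first malformed line.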
Create the Lambda function
After you create the deployment package, you can create the Lambda function. When you create a function, choose a name, runtime (for example, Python 3.9), and IAM role. The IAM role defines the permissions for your function. For detailed instructions, see Create a Lambda function with the console in the Amazon Lambda Developer Guide.
This example assumes you're using the console. Choose Python 3.9 and a role that has S3 read permissions and OpenSearch Service write permissions.
After you create the function, you must add a trigger. For this example, we want the code to run whenever a log file arrives in an S3 bucket:
- Choose Add trigger and select S3.
- Choose your bucket.
- For Event type, choose PUT.
- For Prefix, type logs/.
- For Suffix, type .log.
- Acknowledge the recursive invocation warning and choose Add.
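The prefix and suffix settings above decide which object keys invoke the function. The sketch below illustrates that matching in plain Python. The helper names decoded_key and matches_filter are hypothetical, and the code assumes the usual S3 notification behavior in which object keys arrive URL-encoded (for example, a space appears as a plus sign), so the key is decoded before use.

```python
from urllib.parse import unquote_plus


def decoded_key(record):
    """Return the object key from one S3 event record, URL-decoded."""
    return unquote_plus(record["s3"]["object"]["key"])


def matches_filter(key, prefix="logs/", suffix=".log"):
    """Mimic the trigger's prefix and suffix filter for a decoded key."""
    return key.startswith(prefix) and key.endswith(suffix)


if __name__ == "__main__":
    # Hypothetical, trimmed-down event record for illustration
    record = {"s3": {"bucket": {"name": "my-bucket"},
                     "object": {"key": "logs/my+app.log"}}}
    key = decoded_key(record)
    print(key, matches_filter(key))
```

If your log file names can contain spaces or special characters, passing the decoded key to get_object avoids "key not found" errors in the handler.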
Finally, you can upload your deployment package:
- Choose Upload from and .zip file, then follow the prompts to upload your deployment package.
- After the upload finishes, edit the Runtime settings and change the Handler to sample.handler. This setting tells Lambda the file (sample.py) and method (handler) that it should run after a trigger.
At this point, you have a complete set of resources: a bucket for log files, a function that runs whenever a log file is added to the bucket, code that performs the parsing and indexing, and an OpenSearch Service domain for searching and visualization.
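The sample function sends one HTTP request per log line, which is simple but can be slow for large files. One possible alternative, sketched here under assumptions, is to collect the parsed documents and send them in a single request to the _bulk API. The helper name build_bulk_body is hypothetical; it only builds the newline-delimited JSON body and makes no network calls.

```python
import json


def build_bulk_body(index, documents):
    """Build a newline-delimited JSON body for the _bulk API."""
    if not documents:
        return ""
    lines = []
    for document in documents:
        # Action line, followed by the document itself
        lines.append(json.dumps({"index": {"_index": index}}))
        lines.append(json.dumps(document))
    # The bulk body must end with a newline
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    docs = [
        {"ip": "12.345.678.90", "message": "PUT /some-file.jpg"},
        {"ip": "12.345.678.91", "message": "GET /some-file.jpg"},
    ]
    print(build_bulk_body("lambda-s3-index", docs))
```

In the handler, the resulting string could be posted to host + '/_bulk' with the same awsauth object, passing it as data rather than json and setting the Content-Type header to application/x-ndjson.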
Testing the Lambda Function
After you create the function, you can test it by uploading a file to the Amazon S3 bucket. Create a file named sample.log using the following sample log lines:

    12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg"
    12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

Upload the file to the logs folder of your S3 bucket. For instructions, see Upload an object to your bucket in the Amazon Simple Storage Service User Guide.
Then use the OpenSearch Service console or OpenSearch Dashboards to verify that the lambda-s3-index index contains two documents. You can also make a standard search request:

    GET https://domain-name/lambda-s3-index/_search?pretty
    {
      "hits" : {
        "total" : 2,
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "lambda-s3-index",
            "_type" : "_doc",
            "_id" : "vTYXaWIBJWV_TTkEuSDg",
            "_score" : 1.0,
            "_source" : {
              "ip" : "12.345.678.91",
              "message" : "GET /some-file.jpg",
              "timestamp" : "10/Oct/2000:14:56:14 -0700"
            }
          },
          {
            "_index" : "lambda-s3-index",
            "_type" : "_doc",
            "_id" : "vjYmaWIBJWV_TTkEuCAB",
            "_score" : 1.0,
            "_source" : {
              "ip" : "12.345.678.90",
              "message" : "PUT /some-file.jpg",
              "timestamp" : "10/Oct/2000:13:55:36 -0700"
            }
          }
        ]
      }
    }
Loading streaming data from Amazon Kinesis Data Streams
You can load streaming data from Kinesis Data Streams to OpenSearch Service. New data that arrives in the data stream triggers an event notification to Lambda, which then runs your custom code to perform the indexing. This section includes some unsophisticated Python sample code.
Prerequisites
Before proceeding, you must have the following resources.
Prerequisite | Description |
---|---|
Amazon Kinesis Data Stream | The event source for your Lambda function. To learn more, see Kinesis Data Streams. |
OpenSearch Service domain | The destination for data after your Lambda function processes it. For more information, see Creating OpenSearch Service domains. |
IAM role | This role must have basic OpenSearch Service, Kinesis, and Lambda permissions, and a trust relationship that allows Lambda to assume the role. To learn more, see Creating IAM roles in the IAM User Guide. |
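The permissions and trust relationship described for the IAM role can be sketched as two policy documents. The following Python sketch builds both as dictionaries and prints them as JSON. The action list, the wildcard resource, and the lambda.amazonaws.com service principal are assumptions for illustration rather than values taken from this guide; narrow them to your own stream and domain before use.

```python
import json

# Assumed read actions for a Kinesis event source
KINESIS_ACTIONS = [
    "kinesis:GetShardIterator",
    "kinesis:GetRecords",
    "kinesis:DescribeStream",
    "kinesis:ListStreams",
]

# Assumed actions for writing to the domain and to CloudWatch Logs
BASE_ACTIONS = [
    "es:ESHttpPost",
    "es:ESHttpPut",
    "logs:CreateLogGroup",
    "logs:CreateLogStream",
    "logs:PutLogEvents",
]


def build_permissions_policy(source_actions):
    """Combine the base actions with the event source's read actions."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": BASE_ACTIONS + list(source_actions),
            "Resource": "*",  # assumption: replace with specific ARNs
        }],
    }


def build_trust_policy(service_principal="lambda.amazonaws.com"):
    """Allow the given service (assumed to be Lambda) to assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": service_principal},
            "Action": "sts:AssumeRole",
        }],
    }


if __name__ == "__main__":
    print(json.dumps(build_permissions_policy(KINESIS_ACTIONS), indent=2))
    print(json.dumps(build_trust_policy(), indent=2))
```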
Create the Lambda function
Follow the instructions in Create the Lambda deployment package, but create a directory named kinesis-to-opensearch and use the following code for sample.py:

    import base64
    import boto3
    import json
    import requests
    from requests_aws4auth import AWS4Auth

    region = '' # e.g. us-west-1
    service = 'es'
    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

    host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com
    index = 'lambda-kine-index'
    datatype = '_doc'
    url = host + '/' + index + '/' + datatype + '/'

    headers = { "Content-Type": "application/json" }

    def handler(event, context):
        count = 0
        for record in event['Records']:
            id = record['eventID']
            timestamp = record['kinesis']['approximateArrivalTimestamp']

            # Kinesis data is base64-encoded, so decode it to text here
            # (raw bytes are not JSON-serializable)
            message = base64.b64decode(record['kinesis']['data']).decode('utf-8')

            # Create the JSON document
            document = { "id": id, "timestamp": timestamp, "message": message }

            # Index the document
            r = requests.put(url + id, auth=awsauth, json=document, headers=headers)
            count += 1
        return 'Processed ' + str(count) + ' items.'

Edit the variables for region and host.
Install pip if you haven't already, then use the following commands to install your dependencies:

    cd kinesis-to-opensearch
    pip install --target ./package requests
    pip install --target ./package requests_aws4auth
Then follow the instructions in Create the Lambda function, but specify the IAM role from Prerequisites and the following settings for the trigger:
- Kinesis stream: your Kinesis stream
- Batch size: 100
- Starting position: Trim horizon

To learn more, see What is Amazon Kinesis Data Streams? in the Amazon Kinesis Data Streams Developer Guide.
At this point, you have a complete set of resources: a Kinesis data stream, a function that runs after the stream receives new data and indexes that data, and an OpenSearch Service domain for searching and visualization.
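Because Lambda delivers each Kinesis payload base64-encoded, it can be useful to check the decoding step locally before deploying. The following is a minimal sketch with a hypothetical helper named record_to_document. It uses the same record fields as the sample handler and decodes the payload to text so that the document can be serialized as JSON; the sample event at the bottom is made up for illustration.

```python
import base64
import json


def record_to_document(record):
    """Turn one Kinesis event record into a JSON-serializable document."""
    payload = base64.b64decode(record["kinesis"]["data"])
    return {
        "id": record["eventID"],
        "timestamp": record["kinesis"]["approximateArrivalTimestamp"],
        # errors="replace" keeps non-UTF-8 payloads from raising
        "message": payload.decode("utf-8", errors="replace"),
    }


if __name__ == "__main__":
    # Hypothetical record shaped like the ones the handler reads
    sample_record = {
        "eventID": "shardId-000000000000:1",
        "kinesis": {
            "approximateArrivalTimestamp": 1523648740.051,
            "data": base64.b64encode(b"My test data.").decode("ascii"),
        },
    }
    print(json.dumps(record_to_document(sample_record)))
```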
Test the Lambda Function
After you create the function, you can test it by adding a new record to the data stream using the Amazon CLI:

    aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region us-west-1

With version 2 of the CLI, which expects binary parameters to be base64-encoded by default, add --cli-binary-format raw-in-base64-out to pass the data as plain text.
Then use the OpenSearch Service console or OpenSearch Dashboards to verify that lambda-kine-index contains a document. You can also use the following request:

    GET https://domain-name/lambda-kine-index/_search
    {
      "hits" : [
        {
          "_index": "lambda-kine-index",
          "_type": "_doc",
          "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042",
          "_score": 1,
          "_source": {
            "timestamp": 1523648740.051,
            "message": "My test data.",
            "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042"
          }
        }
      ]
    }
Loading streaming data from Amazon DynamoDB
You can use Amazon Lambda to send data to your OpenSearch Service domain from Amazon DynamoDB. New data that arrives in the database table triggers an event notification to Lambda, which then runs your custom code to perform the indexing.
Prerequisites
Before proceeding, you must have the following resources.
Prerequisite | Description |
---|---|
DynamoDB table | The table contains your source data. For more information, see Basic Operations on DynamoDB Tables in the Amazon DynamoDB Developer Guide. The table must reside in the same Region as your OpenSearch Service domain and have a stream set to New image. To learn more, see Enabling a Stream. |
OpenSearch Service domain | The destination for data after your Lambda function processes it. For more information, see Creating OpenSearch Service domains. |
IAM role | This role must have basic OpenSearch Service, DynamoDB, and Lambda execution permissions, and a trust relationship that allows Lambda to assume the role. To learn more, see Creating IAM roles in the IAM User Guide. |
Create the Lambda function
Follow the instructions in Create the Lambda deployment package, but create a directory named ddb-to-opensearch and use the following code for sample.py:

    import boto3
    import requests
    from requests_aws4auth import AWS4Auth

    region = '' # e.g. us-east-1
    service = 'es'
    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

    host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com
    index = 'lambda-index'
    datatype = '_doc'
    url = host + '/' + index + '/' + datatype + '/'

    headers = { "Content-Type": "application/json" }

    def handler(event, context):
        count = 0
        for record in event['Records']:
            # Get the primary key for use as the OpenSearch ID
            id = record['dynamodb']['Keys']['id']['S']

            if record['eventName'] == 'REMOVE':
                r = requests.delete(url + id, auth=awsauth)
            else:
                document = record['dynamodb']['NewImage']
                r = requests.put(url + id, auth=awsauth, json=document, headers=headers)
            count += 1
        return str(count) + ' records processed.'

Edit the variables for region and host.
Install pip if you haven't already, then use the following commands to install your dependencies:

    cd ddb-to-opensearch
    pip install --target ./package requests
    pip install --target ./package requests_aws4auth
Then follow the instructions in Create the Lambda function, but specify the IAM role from Prerequisites and the following settings for the trigger:
- Table: your DynamoDB table
- Batch size: 100
- Starting position: Trim horizon

To learn more, see Process New Items with DynamoDB Streams and Lambda in the Amazon DynamoDB Developer Guide.
At this point, you have a complete set of resources: a DynamoDB table for your source data, a DynamoDB stream of changes to the table, a function that runs after your source data changes and indexes those changes, and an OpenSearch Service domain for searching and visualization.
Test the Lambda function
After you create the function, you can test it by adding a new item to the DynamoDB table using the Amazon CLI:

    aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

Then use the OpenSearch Service console or OpenSearch Dashboards to verify that lambda-index contains a document. You can also use the following request:

    GET https://domain-name/lambda-index/_doc/00001
    {
      "_index": "lambda-index",
      "_type": "_doc",
      "_id": "00001",
      "_version": 1,
      "found": true,
      "_source": {
        "director": {
          "S": "Kevin Costner"
        },
        "id": {
          "S": "00001"
        },
        "title": {
          "S": "The Postman"
        }
      }
    }
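As the response shows, the sample function indexes the stream image as is, so every field keeps its DynamoDB type wrapper (for example, "S" for strings). If you would rather index plain values, one option is to flatten the image before sending it. The sketch below is a hypothetical, self-contained helper that handles the S, N, BOOL, NULL, L, M, and SS types and raises an error for anything else, such as binary attributes.

```python
def flatten_attribute(value):
    """Convert one DynamoDB-typed attribute value into a plain Python value."""
    # Each typed value is a single-entry dict such as {"S": "text"}
    (type_tag, inner), = value.items()
    if type_tag == "S":
        return inner
    if type_tag == "N":
        # Numbers arrive as strings; keep integers exact
        return int(inner) if inner.lstrip("-").isdigit() else float(inner)
    if type_tag == "BOOL":
        return inner
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [flatten_attribute(item) for item in inner]
    if type_tag == "M":
        return flatten_image(inner)
    if type_tag == "SS":
        return list(inner)
    raise ValueError("Unsupported DynamoDB type: " + type_tag)


def flatten_image(image):
    """Flatten a whole NewImage mapping into a plain dict."""
    return {name: flatten_attribute(value) for name, value in image.items()}


if __name__ == "__main__":
    image = {
        "director": {"S": "Kevin Costner"},
        "id": {"S": "00001"},
        "title": {"S": "The Postman"},
    }
    print(flatten_image(image))
```

In the handler, this would mean passing flatten_image(record['dynamodb']['NewImage']) as the document instead of the raw image.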
Loading streaming data from Amazon Data Firehose
Firehose supports OpenSearch Service as a delivery destination. For instructions about how to load streaming data into OpenSearch Service, see Creating a Kinesis Data Firehose Delivery Stream and Choose OpenSearch Service for Your Destination in the Amazon Data Firehose Developer Guide.
Before you load data into OpenSearch Service, you might need to perform transforms on the data. To learn more about using Lambda functions to perform this task, see Amazon Kinesis Data Firehose Data Transformation in the same guide.
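As a rough illustration of such a transform, the following sketch shows the general shape of a Firehose transformation function: it receives a batch of base64-encoded records and returns each one with its original recordId, a result status, and re-encoded data. The "processed" field it adds is a made-up enrichment for illustration, and records that aren't JSON objects are returned as failed rather than dropped.

```python
import base64
import json


def handler(event, context):
    """Add a field to each JSON record in a Firehose transformation batch."""
    output = []
    for record in event["records"]:
        try:
            payload = json.loads(base64.b64decode(record["data"]))
            payload["processed"] = True  # hypothetical enrichment
            data = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")
            output.append({"recordId": record["recordId"], "result": "Ok", "data": data})
        except (ValueError, TypeError):
            # Hand the original data back, marked as failed
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}
```

Every incoming recordId should appear exactly once in the response, which is why failed records are returned instead of skipped.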
As you configure a delivery stream, Firehose features a "one-click" IAM role that gives it the resource access it needs to send data to OpenSearch Service, back up data on Amazon S3, and transform data using Lambda. Because of the complexity involved in creating such a role manually, we recommend using the provided role.
Loading streaming data from Amazon CloudWatch
You can load streaming data from CloudWatch Logs to your OpenSearch Service domain by using a CloudWatch Logs subscription. For information about Amazon CloudWatch subscriptions, see Real-time processing of log data with subscriptions.
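If a subscription delivers log data to a Lambda function, the function receives the log events as a base64-encoded, gzip-compressed JSON payload. The following sketch, which assumes that event shape and uses a hypothetical helper name, shows how such a payload could be decoded before indexing. The sample payload is made up for illustration.

```python
import base64
import gzip
import json


def decode_subscription_event(event):
    """Return the decoded CloudWatch Logs payload from a Lambda event."""
    compressed = base64.b64decode(event["awslogs"]["data"])
    return json.loads(gzip.decompress(compressed))


if __name__ == "__main__":
    # Hypothetical payload, encoded the same way the function expects
    payload = {
        "logGroup": "example-group",
        "logStream": "example-stream",
        "logEvents": [{"id": "1", "timestamp": 1523648740051, "message": "hello"}],
    }
    encoded = base64.b64encode(gzip.compress(json.dumps(payload).encode("utf-8")))
    event = {"awslogs": {"data": encoded.decode("ascii")}}
    for log_event in decode_subscription_event(event)["logEvents"]:
        print(log_event["message"])
```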
Loading streaming data from Amazon IoT
You can send data from Amazon IoT using rules. To learn more, see the OpenSearch action in the Amazon IoT Developer Guide.