将流式数据加载到 Amazon OpenSearch Service - Amazon Opensearch Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

将流式数据加载到 Amazon OpenSearch Service

可以从许多不同的源将流数据上载到 Amazon OpenSearch Service 阈。有些资源(如 Amazon Kinesis Data Firehose 和 Amazon CloudWatch Logs)具有针对 OpenSearch Service 的内置支持。其他用户(如 Amazon S3、Amazon Kinesis Data Streams 和 Amazon DynamoDB)使用 Amazon Lambda 函数作为事件处理程序。Lambda 函数响应新数据的方式是处理数据并将其流式传输到域。

注意

Lambda 支持多种常用编程语言,并且在大多数 Amazon Web Services 区域 中都可用。有关更多信息,请参阅 Amazon Lambda 开发人员指南中的 Lambda 入门Amazon一般参考Amazon 服务端点

从 Amazon S3 表中加载流数据

您可以使用 Lambda 从 Amazon S3 将数据发送到您的 OpenSearch Service 域。到达 S3 存储桶的新数据将触发事件通知到 Lambda,这将运行自定义代码以执行编制索引。

这种流式传输数据的方式极其灵活。可以为对象元数据编制索引,或者如果对象是纯文本,则对对象正文的部分元素进行解析和编制索引。此节包含一些简单的 Python 示例代码,这些代码使用正则表达式解析日志文件并为匹配项编制索引。

先决条件

继续操作之前,必须具有以下资源。

先决条件 描述
Amazon S3 存储桶 有关更多信息,请参阅 Amazon Simple Storage Service 用户指南中的创建您的第一个 S3 存储桶。存储桶必须与 OpenSearch Service 域位于同一个区域。
OpenSearch Service 域 Lambda 函数处理数据之后数据的目的地。有关更多信息,请参阅 创建 OpenSearch Service 域

创建 Lambda 部署程序包

部署程序包为 ZIP 或 JAR 文件,其中包含代码及其依赖项。此节包括 Python 示例代码。对于其他编程语言,请参阅 Amazon Lambda 开发人员指南中的 Lambda 部署程序包

  1. 创建目录。在此示例中,我们使用名称 s3-to-opensearch

  2. 在名为 sample.py 的目录中创建一个文件:

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-s3-index' type = '_doc' url = host + '/' + index + '/' + type headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: line = line.decode("utf-8") ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    编辑 regionhost 的变量。

  3. 安装 pip(如果您尚未安装,则将依赖项安装到 package 目录:

    cd s3-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

    所有 Lambda 执行环境都已安装 Boto3,因此无需将其包含在部署程序包中。

  4. 打包应用程序代码和依赖项:

    cd package zip -r ../lambda.zip . cd .. zip -g lambda.zip sample.py

创建 Lambda 函数

创建部署程序包之后,可以创建 Lambda 函数。创建函数时,选择名称、运行时 (例如,Python 3.8) 和 IAM 角色。IAM 角色定义对函数的权限。有关详细说明,请参阅 Amazon Lambda 开发人员指南中的通过控制台创建 Lambda 函数

此示例假定使用的是控制台。选择 Python 3.9 具有 S3 读取权限和 OpenSearch Service 写入权限的角色,如以下屏幕截图中所示:


                    Lambda 函数的示例配置

在创建此函数后,必须添加一个触发器。在此示例中,我们希望代码在日志文件到达 S3 存储桶中时执行:

  1. 选择添加触发器并选择 S3

  2. 选择存储桶。

  3. 对于 Event type (事件类型),选择 PUT

  4. 对于 Prefix (前缀),键入 logs/

  5. 对于后缀,键入 .log

  6. 确认递归调用警告,然后选择添加

最后,可以上传部署程序包:

  1. 选择上载自.zip 文件,然后按照提示上传部署程序包。

  2. 上载完成后,编辑 Runtime 设置并更改处理程序sample.handler。此设置告知 Lambda 在触发之后应执行的文件 (sample.py) 和方法 (handler)。

此时,您具有一整套资源:存储日志文件的存储桶、日志文件添加到存储桶时执行的函数、执行解析和编制索引的代码以及搜索和可视化的 OpenSearch Service 域。

测试 Lambda 函数

在创建此函数之后,可以通过将文件上传到 Amazon S3 存储桶来测试此函数。使用以下示例日志行创建一个名为 sample.log 的文件:

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

将文件上传到 S3 存储桶的 logs 文件夹。有关说明,请参阅 Amazon Simple Storage Service 用户指南中的将对象上传到存储桶

然后使用 OpenSearch Service 控制台或 OpenSearch 控制面板以验证 lambda-s3-index 索引是否包含两个文档。还可以发出标准搜索请求:

GET https://domain-name/lambda-s3-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

从 Amazon Kinesis Data Streams 加载流数据

您可以从 Kinesis Data Streams 将流数据加载到 OpenSearch Service。到达此数据流的新数据将向 Lambda 触发事件通知,这将运行自定义代码以执行索引编制。此节包括一些简单的 Python 示例代码。

先决条件

继续操作之前,必须具有以下资源。

先决条件 描述
Amazon Kinesis Data Stream Lambda 函数的事件源。要了解更多信息,请参阅 Kinesis Data Streams
OpenSearch Service 域 Lambda 函数处理数据之后数据的目的地。有关更多信息,请参阅 创建 OpenSearch Service 域
IAM 角色

此角色必须具有基本的 OpenSearch Service、Kinesis 和 Lambda 权限,如以下内容:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

角色必须拥有以下信任关系:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

要了解更多信息,请参阅 IAM 用户手册中的创建 IAM 角色

创建 Lambda 函数

按照创建 Lambda 部署程序包中的说明操作,但创建一个名为 kinesis-to-opensearch 的目录并对 sample.py 使用以下代码:

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-kine-index' type = '_doc' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

编辑 regionhost 的变量。

安装 pip——如果您尚未安装,则使用以下命令安装依赖项:

cd kinesis-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

然后按照创建 Lambda 函数中的说明操作,但指定先决条件中的 IAM 角色和以下触发器设置:

  • Kinesis stream:您的 Kinesis stream

  • 批处理大小:100

  • 起始位置:时间范围

有关更多信息,请参阅 Amazon Kinesis Data Streams 开发人员指南中的什么是 Amazon Kinesis Data Streams?

此时,您具有一整套资源:Kinesis 数据流、在流收到新数据并为该数据编制索引后执行的函数、用于搜索和可视化的 OpenSearch Service 域。

测试 Lambda 函数

创建此函数后,可以通过使用 Amazon CLI 将新记录添加到数据流来测试它:

aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region us-west-1

然后使用 OpenSearch Service 控制台或 OpenSearch 控制面板以验证 lambda-kine-index 是否包含一个文档。还可使用以下请求:

GET https://domain-name/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "_doc", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

从 Amazon DynamoDB 表中加载流数据

您可以使用 Amazon Lambda 从 Amazon DynamoDB 将数据发送到您的 OpenSearch Service 域。到达数据库表的新数据将触发事件通知到 Lambda,这将运行自定义代码以执行编制索引。

先决条件

继续操作之前,必须具有以下资源。

先决条件 描述
DynamoDB 表

此表包含源数据。有关更多信息,请参阅 Amazon DynamoDB 开发人员指南中的 DynamoDB Tables 中的基本操作

此表必须与 OpenSearch Service 域驻留在同一个区域并且流设置为新映像。要了解更多信息,请参阅启用流

OpenSearch Service 域 Lambda 函数处理数据之后数据的目的地。有关更多信息,请参阅 创建 OpenSearch Service 域
IAM 角色

此角色必须具有基本的 OpenSearch Service、DynamoDB 和 Lambda 执行权限,如以下内容:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

角色必须拥有以下信任关系:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

要了解更多信息,请参阅 IAM 用户手册中的创建 IAM 角色

创建 Lambda 函数

按照创建 Lambda 部署程序包中的说明操作,但创建一个名为 ddb-to-opensearch 的目录并对 sample.py 使用以下代码:

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' type = '_doc' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

编辑 regionhost 的变量。

安装 pip——如果您尚未安装,则使用以下命令安装依赖项:

cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

然后按照创建 Lambda 函数中的说明操作,但指定先决条件中的 IAM 角色和以下触发器设置:

  • :DynamoDB 表

  • 批处理大小:100

  • 起始位置:时间范围

要了解更多信息,请参阅 Amazon DynamoDB 开发人员指南使用 DynamoDB Streams 和 Lambda 处理新项目

此时,您具有一整套资源:源数据的 DynamoDB 表、表更改的 DynamoDB 流、在源数据更改并为这些更改编制索引之后执行的函数以及用于搜索和可视化的 OpenSearch Service 域。

测试 Lambda 函数

创建此函数后,可以通过使用 Amazon CLI 将新项目添加到 DynamoDB 表来测试它:

aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

然后使用 OpenSearch Service 控制台或 OpenSearch 控制面板以验证 lambda-index 是否包含一个文档。还可使用以下请求:

GET https://domain-name/lambda-index/_doc/00001 { "_index": "lambda-index", "_type": "_doc", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

从 Amazon Kinesis Data Firehose 加载流数据

Kinesis Data Firehose 支持 OpenSearch Service 作为交付目标。有关如何将流数据加载到 OpenSearch Service 的说明,请参阅 Amazon Kinesis Data Firehose 开发人员指南中的创建 Kinesis Data Firehose 传输流选择 OpenSearch Service 作为目标

在将数据加载到 OpenSearch Service 之前,可能需要对数据执行转换。要了解有关使用 Lambda 函数执行此任务的更多信息,请参阅此同一指南中的 Amazon Kinesis Data Firehose 数据转换

当您配置交付流时,Kinesis Data Firehose 管具有“一键式”IAM 角色,该角色为其提供向 OpenSearch Service 发送数据、Amazon S3 上备份数据以及使用 Lambda 转换数据所需的资源访问。由于手动创建此类角色的过程非常复杂,我们建议使用提供的角色。

从 Amazon CloudWatch 中加载流数据

您可以使用 CloudWatch Logs 订阅从 CloudWatch Logs 将流数据加载到您的 OpenSearch Service 域。有关 Amazon CloudWatch 订阅的信息,请参阅用订阅实时处理日志数据。有关配置信息,请参阅 Amazon CloudWatch 开发人员指南中的将CloudWatch Logs 数据流式传输到 Amazon OpenSearch Service

从 Amazon IoT 表中加载流数据

您可以使用Amazon IoT规则 发送数据。如需了解更多信息,请参阅 Amazon IoT 开发人员指南中的 OpenSearch 操作。