本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将流数据加载到 Amazon OpenSearch 服务
您可以使用 OpenSearch Ingestion 将流数据
您仍然可以使用其他来源加载流数据,例如 Amazon Data Firehose 和 Amazon CloudWatch Logs,它们内置了对 OpenSearch 服务的支持。其他用户(如 Amazon S3、Amazon Kinesis Data Streams 和 Amazon DynamoDB)使用 Amazon Lambda 函数作为事件处理程序。Lambda 函数响应新数据的方式是处理数据并将其流式传输到域。
注意
Lambda 支持多种常用编程语言,并且在大多数 Amazon Web Services 区域中都可用。有关更多信息,请参阅 Amazon Lambda 开发人员指南中的 Lambda 入门和 Amazon Web Services 一般参考 中的 Amazon 服务端点。
主题
从 OpenSearch Ingestion 加载流数据
您可以使用 Amazon OpenSearch Ingestion 将数据加载到 OpenSearch 服务域中。您可以将数据生成器配置为将数据发送到 OpenSearch Ingestion,它会自动将数据传送到您指定的集合。您还可以将 OpenSearch Ingestion 配置为在交付数据之前对其进行转换。有关更多信息,请参阅Amazon OpenSearch Ingestion。
从 Amazon S3 表中加载流数据
您可以使用 Lambda 将数据从 Amazon S3 发送到您的 OpenSearch 服务域。到达 S3 存储桶的新数据将触发事件通知到 Lambda,这将运行自定义代码以执行编制索引。
这种流式传输数据的方式极其灵活。可以为对象元数据编制索引
先决条件
继续操作之前,必须具有以下资源。
先决条件 | 描述 |
---|---|
Amazon S3 存储桶 | 有关更多信息,请参阅 Amazon Simple Storage Service 用户指南中的创建您的第一个 S3 存储桶。存储桶必须与您的 OpenSearch 服务域位于同一区域。 |
OpenSearch 服务域 | Lambda 函数处理数据之后数据的目的地。有关更多信息,请参阅 创建 OpenSearch 服务域。 |
创建 Lambda 部署程序包
部署程序包为 ZIP 或 JAR 文件,其中包含代码及其依赖项。此节包括 Python 示例代码。对于其他编程语言,请参阅 Amazon Lambda 开发人员指南中的 Lambda 部署程序包。
-
创建目录。在此示例中,我们使用名称
s3-to-opensearch
。 -
在名为
sample.py
的目录中创建一个文件:import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-s3-index' datatype = '_doc' url = host + '/' + index + '/' + datatype headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: line = line.decode("utf-8") ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)
编辑
region
和host
的变量。 -
安装 pip
(如果您尚未安装,则将依赖项安装到 package
目录:cd s3-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth
所有 Lambda 执行环境都已安装 Boto3
,因此无需将其包含在部署程序包中。 -
打包应用程序代码和依赖项:
cd package zip -r ../lambda.zip . cd .. zip -g lambda.zip sample.py
创建 Lambda 函数
创建部署程序包之后,可以创建 Lambda 函数。创建函数时,选择名称、运行时 (例如,Python 3.8) 和 IAM 角色。IAM 角色定义对函数的权限。有关详细说明,请参阅 Amazon Lambda 开发人员指南中的通过控制台创建 Lambda 函数。
此示例假定使用的是控制台。选择 Python 3.9 以及具有 S3 读取权限和 OpenSearch 服务写入权限的角色,如以下屏幕截图所示:
在创建此函数后,必须添加一个触发器。在此示例中,我们希望代码在日志文件到达 S3 存储桶中时执行:
-
选择添加触发器并选择 S3。
-
选择存储桶。
-
对于 Event type (事件类型),选择 PUT。
-
对于 Prefix (前缀),键入
logs/
。 -
对于后缀,键入
.log
。 -
确认递归调用警告,然后选择添加。
最后,可以上传部署程序包:
-
选择上载自和 .zip 文件,然后按照提示上传部署程序包。
-
上载完成后,编辑 Runtime 设置并更改处理程序为
sample.handler
。此设置告知 Lambda 在触发之后应执行的文件 (sample.py
) 和方法 (handler
)。
此时,您拥有一整套资源:一个用于存储日志文件的存储桶、一个在向存储桶中添加日志文件时运行的函数、执行解析和索引的代码,以及一个用于搜索和可视化的 OpenSearch 服务域。
测试 Lambda 函数
在创建此函数之后,可以通过将文件上传到 Amazon S3 存储桶来测试此函数。使用以下示例日志行创建一个名为 sample.log
的文件:
12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"
将文件上传到 S3 存储桶的 logs
文件夹。有关说明,请参阅 Amazon Simple Storage Service 用户指南中的将对象上传到存储桶。
然后使用 OpenSearch 服务控制台或 OpenSearch 仪表板验证lambda-s3-index
索引是否包含两个文档。还可以发出标准搜索请求:
GET https://domain-name
/lambda-s3-index/_search?pretty
{
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "lambda-s3-index",
"_type" : "_doc",
"_id" : "vTYXaWIBJWV_TTkEuSDg",
"_score" : 1.0,
"_source" : {
"ip" : "12.345.678.91",
"message" : "GET /some-file.jpg",
"timestamp" : "10/Oct/2000:14:56:14 -0700"
}
},
{
"_index" : "lambda-s3-index",
"_type" : "_doc",
"_id" : "vjYmaWIBJWV_TTkEuCAB",
"_score" : 1.0,
"_source" : {
"ip" : "12.345.678.90",
"message" : "PUT /some-file.jpg",
"timestamp" : "10/Oct/2000:13:55:36 -0700"
}
}
]
}
}
从 Amazon Kinesis Data Streams 加载流数据
您可以将流数据从 Kinesis Data Streams 加载 OpenSearch 到服务。到达此数据流的新数据将向 Lambda 触发事件通知,这将运行自定义代码以执行索引编制。此节包括一些简单的 Python 示例代码。
先决条件
继续操作之前,必须具有以下资源。
先决条件 | 描述 |
---|---|
Amazon Kinesis Data Stream | Lambda 函数的事件源。要了解更多信息,请参阅 Kinesis Data Streams。 |
OpenSearch 服务域 | Lambda 函数处理数据之后数据的目的地。有关更多信息,请参阅 创建 OpenSearch 服务域 |
IAM 角色 |
此角色必须具有基本的 OpenSearch 服务、Kinesis 和 Lambda 权限,例如:
角色必须拥有以下信任关系:
要了解更多信息,请参阅 IAM 用户手册中的创建 IAM 角色。 |
创建 Lambda 函数
按照创建 Lambda 部署程序包中的说明操作,但创建一个名为 kinesis-to-opensearch
的目录并对 sample.py
使用以下代码:
import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-kine-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'
编辑 region
和 host
的变量。
安装 pip
cd kinesis-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth
然后按照创建 Lambda 函数中的说明操作,但指定先决条件中的 IAM 角色和以下触发器设置:
-
Kinesis stream:您的 Kinesis stream
-
批处理大小:100
-
起始位置:时间范围
有关更多信息,请参阅 Amazon Kinesis Data Streams 开发人员指南中的什么是 Amazon Kinesis Data Streams?。
此时,您已拥有一整套资源:Kinesis 数据流、在流接收新数据并索引该数据之后运行的函数,以及用于搜索和可视化的 OpenSearch 服务域。
测试 Lambda 函数
创建此函数后,可以通过使用 Amazon CLI将新记录添加到数据流来测试它:
aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region
us-west-1
然后使用 OpenSearch 服务控制台或 OpenSearch 仪表板验证是否lambda-kine-index
包含文档。还可使用以下请求:
GET https://domain-name
/lambda-kine-index/_search
{
"hits" : [
{
"_index": "lambda-kine-index",
"_type": "_doc",
"_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042",
"_score": 1,
"_source": {
"timestamp": 1523648740.051,
"message": "My test data.",
"id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042"
}
}
]
}
从 Amazon DynamoDB 表中加载流数据
您可以使用 Amazon Lambda 将数据从亚马逊 DynamoDB 发送到您的 OpenSearch 服务域。到达数据库表的新数据将触发事件通知到 Lambda,这将运行自定义代码以执行编制索引。
先决条件
继续操作之前,必须具有以下资源。
先决条件 | 描述 |
---|---|
DynamoDB 表 | 此表包含源数据。有关更多信息,请参阅 Amazon DynamoDB 开发人员指南中的 DynamoDB Tables 中的基本操作。 该表必须与您的 OpenSearch 服务域位于同一区域,并且必须将直播设置为 “新图像”。要了解更多信息,请参阅启用流。 |
OpenSearch 服务域 | Lambda 函数处理数据之后数据的目的地。有关更多信息,请参阅 创建 OpenSearch 服务域。 |
IAM 角色 | 此角色必须具有基本的 OpenSearch 服务、DynamoDB 和 Lambda 执行权限,例如:
角色必须拥有以下信任关系:
要了解更多信息,请参阅 IAM 用户手册中的创建 IAM 角色。 |
创建 Lambda 函数
按照创建 Lambda 部署程序包中的说明操作,但创建一个名为 ddb-to-opensearch
的目录并对 sample.py
使用以下代码:
import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'
编辑 region
和 host
的变量。
安装 pip
cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth
然后按照创建 Lambda 函数中的说明操作,但指定先决条件中的 IAM 角色和以下触发器设置:
-
表:DynamoDB 表
-
批处理大小:100
-
起始位置:时间范围
要了解更多信息,请参阅 Amazon DynamoDB 开发人员指南中使用 DynamoDB Streams 和 Lambda 处理新项目。
此时,您已拥有一整套资源:用于存放源数据的 DynamoDB 表、表更改的 DynamoDB 流、在源数据更改并索引这些更改后运行的函数,以及用于搜索和可视化的服务域。 OpenSearch
测试 Lambda 函数
创建此函数后,可以通过使用 Amazon CLI将新项目添加到 DynamoDB 表来测试它:
aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region
us-west-1
然后使用 OpenSearch 服务控制台或 OpenSearch 仪表板验证是否lambda-index
包含文档。还可使用以下请求:
GET https://domain-name
/lambda-index/_doc/00001
{
"_index": "lambda-index",
"_type": "_doc",
"_id": "00001",
"_version": 1,
"found": true,
"_source": {
"director": {
"S": "Kevin Costner"
},
"id": {
"S": "00001"
},
"title": {
"S": "The Postman"
}
}
}
从 Amazon Data Firehose 加载流数据
Firehose 支持将 OpenSearch 服务作为送货目的地。有关如何将流数据加载到 OpenSearch 服务的说明,请参阅《亚马逊数据 Fireh ose 开发者指南》中的创建 Kinesis Data Firehos e 传送流 OpenSearch 和为目的地选择服务。
在将数据加载到 S OpenSearch ervice 之前,可能需要对数据执行转换。要了解有关使用 Lambda 函数执行此任务的更多信息,请参阅此同一指南中的 Amazon Kinesis Data Firehose 数据转换。
在您配置传输流时,Firehose 具有 “一键式” IAM 角色,可为其提供向 OpenSearch 服务发送数据、在 Amazon S3 上备份数据以及使用 Lambda 转换数据所需的资源访问权限。由于手动创建此类角色的过程非常复杂,我们建议使用提供的角色。
正在加载来自亚马逊的流媒体数据 CloudWatch
您可以使用 CloudWatch CloudWatch 日志订阅将流数据从 Logs 加载到您的 OpenSearch 服务域。有关 Amazon CloudWatch 订阅的信息,请参阅通过订阅实时处理日志数据。有关配置信息,请参阅《亚马逊 CloudWatch开发者指南》中的将 CloudWatch 日志数据流式传输到亚马逊 OpenSearch 服务。
从 Amazon IoT表中加载流数据
Amazon IoT 您可以使用规则发送数据。要了解更多信息,请参阅《Amazon IoT 开发者指南》中的OpenSearch操作。