将流数据加载到 Amazon OpenSearch 服务 - 亚马逊 OpenSearch 服务
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将流数据加载到 Amazon OpenSearch 服务

您可以使用 OpenSearch Ingestion 将流数据直接加载到您的亚马逊 OpenSearch 服务域中,无需使用第三方解决方案。要将数据发送到 OpenSearch Ingestion,您需要配置数据生成器,服务会自动将数据传送到您指定的域或集合。要开始使用 OpenSearch Ingestion,请参阅。教程:使用 Amazon Ingestion 将数据提取到集合中 OpenSearch

您仍然可以使用其他来源加载流数据,例如 Amazon Data Firehose 和 Amazon CloudWatch Logs,它们内置了对 OpenSearch 服务的支持。其他用户(如 Amazon S3、Amazon Kinesis Data Streams 和 Amazon DynamoDB)使用 Amazon Lambda 函数作为事件处理程序。Lambda 函数响应新数据的方式是处理数据并将其流式传输到域。

注意

Lambda 支持多种常用编程语言,并且在大多数 Amazon Web Services 区域中都可用。有关更多信息,请参阅 Amazon Lambda 开发人员指南中的 Lambda 入门Amazon Web Services 一般参考 中的 Amazon 服务端点

从 OpenSearch Ingestion 加载流数据

您可以使用 Amazon OpenSearch Ingestion 将数据加载到 OpenSearch 服务域中。您可以将数据生成器配置为将数据发送到 OpenSearch Ingestion,它会自动将数据传送到您指定的集合。您还可以将 OpenSearch Ingestion 配置为在交付数据之前对其进行转换。有关更多信息,请参阅Amazon OpenSearch Ingestion

从 Amazon S3 表中加载流数据

您可以使用 Lambda 将数据从 Amazon S3 发送到您的 OpenSearch 服务域。到达 S3 存储桶的新数据将触发事件通知到 Lambda,这将运行自定义代码以执行编制索引。

这种流式传输数据的方式极其灵活。可以为对象元数据编制索引,或者如果对象是纯文本,则对对象正文的部分元素进行解析和编制索引。此节包含一些简单的 Python 示例代码,这些代码使用正则表达式解析日志文件并为匹配项编制索引。

先决条件

继续操作之前,必须具有以下资源。

先决条件 描述
Amazon S3 存储桶 有关更多信息,请参阅 Amazon Simple Storage Service 用户指南中的创建您的第一个 S3 存储桶。存储桶必须与您的 OpenSearch 服务域位于同一区域。
OpenSearch 服务域 Lambda 函数处理数据之后数据的目的地。有关更多信息,请参阅 创建 OpenSearch 服务域

创建 Lambda 部署程序包

部署程序包为 ZIP 或 JAR 文件,其中包含代码及其依赖项。此节包括 Python 示例代码。对于其他编程语言,请参阅 Amazon Lambda 开发人员指南中的 Lambda 部署程序包

  1. 创建目录。在此示例中,我们使用名称 s3-to-opensearch

  2. 在名为 sample.py 的目录中创建一个文件:

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-s3-index' datatype = '_doc' url = host + '/' + index + '/' + datatype headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: line = line.decode("utf-8") ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    编辑 regionhost 的变量。

  3. 安装 pip(如果您尚未安装,则将依赖项安装到 package 目录:

    cd s3-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

    所有 Lambda 执行环境都已安装 Boto3,因此无需将其包含在部署程序包中。

  4. 打包应用程序代码和依赖项:

    cd package zip -r ../lambda.zip . cd .. zip -g lambda.zip sample.py

创建 Lambda 函数

创建部署程序包之后,可以创建 Lambda 函数。创建函数时,选择名称、运行时 (例如,Python 3.8) 和 IAM 角色。IAM 角色定义对函数的权限。有关详细说明,请参阅 Amazon Lambda 开发人员指南中的通过控制台创建 Lambda 函数

此示例假定使用的是控制台。选择 Python 3.9 以及具有 S3 读取权限和 OpenSearch 服务写入权限的角色,如以下屏幕截图所示:


                    Lambda 函数的示例配置

在创建此函数后,必须添加一个触发器。在此示例中,我们希望代码在日志文件到达 S3 存储桶中时执行:

  1. 选择添加触发器并选择 S3

  2. 选择存储桶。

  3. 对于 Event type (事件类型),选择 PUT

  4. 对于 Prefix (前缀),键入 logs/

  5. 对于后缀,键入 .log

  6. 确认递归调用警告,然后选择添加

最后,可以上传部署程序包:

  1. 选择上载自.zip 文件,然后按照提示上传部署程序包。

  2. 上载完成后,编辑 Runtime 设置并更改处理程序sample.handler。此设置告知 Lambda 在触发之后应执行的文件 (sample.py) 和方法 (handler)。

此时,您拥有一整套资源:一个用于存储日志文件的存储桶、一个在向存储桶中添加日志文件时运行的函数、执行解析和索引的代码,以及一个用于搜索和可视化的 OpenSearch 服务域。

测试 Lambda 函数

在创建此函数之后,可以通过将文件上传到 Amazon S3 存储桶来测试此函数。使用以下示例日志行创建一个名为 sample.log 的文件:

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

将文件上传到 S3 存储桶的 logs 文件夹。有关说明,请参阅 Amazon Simple Storage Service 用户指南中的将对象上传到存储桶

然后使用 OpenSearch 服务控制台或 OpenSearch 仪表板验证lambda-s3-index索引是否包含两个文档。还可以发出标准搜索请求:

GET https://domain-name/lambda-s3-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

从 Amazon Kinesis Data Streams 加载流数据

您可以将流数据从 Kinesis Data Streams 加载 OpenSearch 到服务。到达此数据流的新数据将向 Lambda 触发事件通知,这将运行自定义代码以执行索引编制。此节包括一些简单的 Python 示例代码。

先决条件

继续操作之前,必须具有以下资源。

先决条件 描述
Amazon Kinesis Data Stream Lambda 函数的事件源。要了解更多信息,请参阅 Kinesis Data Streams
OpenSearch 服务域 Lambda 函数处理数据之后数据的目的地。有关更多信息,请参阅 创建 OpenSearch 服务域
IAM 角色

此角色必须具有基本的 OpenSearch 服务、Kinesis 和 Lambda 权限,例如:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

角色必须拥有以下信任关系:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

要了解更多信息,请参阅 IAM 用户手册中的创建 IAM 角色

创建 Lambda 函数

按照创建 Lambda 部署程序包中的说明操作,但创建一个名为 kinesis-to-opensearch 的目录并对 sample.py 使用以下代码:

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-kine-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

编辑 regionhost 的变量。

安装 pip——如果您尚未安装,则使用以下命令安装依赖项:

cd kinesis-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

然后按照创建 Lambda 函数中的说明操作,但指定先决条件中的 IAM 角色和以下触发器设置:

  • Kinesis stream:您的 Kinesis stream

  • 批处理大小:100

  • 起始位置:时间范围

有关更多信息,请参阅 Amazon Kinesis Data Streams 开发人员指南中的什么是 Amazon Kinesis Data Streams?

此时,您已拥有一整套资源:Kinesis 数据流、在流接收新数据并索引该数据之后运行的函数,以及用于搜索和可视化的 OpenSearch 服务域。

测试 Lambda 函数

创建此函数后,可以通过使用 Amazon CLI将新记录添加到数据流来测试它:

aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region us-west-1

然后使用 OpenSearch 服务控制台或 OpenSearch 仪表板验证是否lambda-kine-index包含文档。还可使用以下请求:

GET https://domain-name/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "_doc", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

从 Amazon DynamoDB 表中加载流数据

您可以使用 Amazon Lambda 将数据从亚马逊 DynamoDB 发送到您的 OpenSearch 服务域。到达数据库表的新数据将触发事件通知到 Lambda,这将运行自定义代码以执行编制索引。

先决条件

继续操作之前,必须具有以下资源。

先决条件 描述
DynamoDB 表

此表包含源数据。有关更多信息,请参阅 Amazon DynamoDB 开发人员指南中的 DynamoDB Tables 中的基本操作

该表必须与您的 OpenSearch 服务域位于同一区域,并且必须将直播设置为 “新图像”。要了解更多信息,请参阅启用流

OpenSearch 服务域 Lambda 函数处理数据之后数据的目的地。有关更多信息,请参阅 创建 OpenSearch 服务域
IAM 角色

此角色必须具有基本的 OpenSearch 服务、DynamoDB 和 Lambda 执行权限,例如:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

角色必须拥有以下信任关系:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

要了解更多信息,请参阅 IAM 用户手册中的创建 IAM 角色

创建 Lambda 函数

按照创建 Lambda 部署程序包中的说明操作,但创建一个名为 ddb-to-opensearch 的目录并对 sample.py 使用以下代码:

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

编辑 regionhost 的变量。

安装 pip——如果您尚未安装,则使用以下命令安装依赖项:

cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

然后按照创建 Lambda 函数中的说明操作,但指定先决条件中的 IAM 角色和以下触发器设置:

  • :DynamoDB 表

  • 批处理大小:100

  • 起始位置:时间范围

要了解更多信息,请参阅 Amazon DynamoDB 开发人员指南使用 DynamoDB Streams 和 Lambda 处理新项目

此时,您已拥有一整套资源:用于存放源数据的 DynamoDB 表、表更改的 DynamoDB 流、在源数据更改并索引这些更改后运行的函数,以及用于搜索和可视化的服务域。 OpenSearch

测试 Lambda 函数

创建此函数后,可以通过使用 Amazon CLI将新项目添加到 DynamoDB 表来测试它:

aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

然后使用 OpenSearch 服务控制台或 OpenSearch 仪表板验证是否lambda-index包含文档。还可使用以下请求:

GET https://domain-name/lambda-index/_doc/00001 { "_index": "lambda-index", "_type": "_doc", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

从 Amazon Data Firehose 加载流数据

Firehose 支持将 OpenSearch 服务作为送货目的地。有关如何将流数据加载到 OpenSearch 服务的说明,请参阅《亚马逊数据 Fireh ose 开发者指南》中的创建 Kinesis Data Firehos e 传送流 OpenSearch 和为目的地选择服务

在将数据加载到 S OpenSearch ervice 之前,可能需要对数据执行转换。要了解有关使用 Lambda 函数执行此任务的更多信息,请参阅此同一指南中的 Amazon Kinesis Data Firehose 数据转换

在您配置传输流时,Firehose 具有 “一键式” IAM 角色,可为其提供向 OpenSearch 服务发送数据、在 Amazon S3 上备份数据以及使用 Lambda 转换数据所需的资源访问权限。由于手动创建此类角色的过程非常复杂,我们建议使用提供的角色。

正在加载来自亚马逊的流媒体数据 CloudWatch

您可以使用 CloudWatch CloudWatch 日志订阅将流数据从 Logs 加载到您的 OpenSearch 服务域。有关 Amazon CloudWatch 订阅的信息,请参阅通过订阅实时处理日志数据。有关配置信息,请参阅《亚马逊 CloudWatch开发者指南》中的将 CloudWatch 日志数据流式传输到亚马逊 OpenSearch 服务

从 Amazon IoT表中加载流数据

Amazon IoT 您可以使用规则发送数据。要了解更多信息,请参阅《Amazon IoT 开发者指南》中的OpenSearch操作。