将流数据加载到 Amazon Elasticsearch Service - Amazon Elasticsearch Service
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将流数据加载到 Amazon Elasticsearch Service

可以从许多不同的源将流数据加载到 Amazon Elasticsearch Service 域。有些资源(如 Amazon Kinesis Data Firehose 和 Amazon CloudWatch Logs)具有针对 Amazon ES 的内置支持。其他资源(如 Amazon S3、Amazon Kinesis Data Streams 和 Amazon DynamoDB)使用 AWS Lambda 函数作为事件处理程序。Lambda 函数响应新数据的方式是处理数据并将其流式传输到域。

注意

Lambda 支持多种常用编程语言,并且在大多数 AWS 区域中都可用。有关更多信息,请参阅 https://docs.amazonaws.cn/lambda/latest/dg/lambda-app.html 中的AWS Lambda Developer Guide构建 Lambda 函数https://docs.amazonaws.cn/general/latest/gr/rande.html#lambda_region 中的 AWS General ReferenceAWS Lambda 区域

从 Amazon S3 将流数据加载到 Amazon ES

可以使用 Lambda 从 Amazon S3 将数据发送到 Amazon ES 域。到达 S3 存储桶的新数据将触发事件通知到 Lambda,这将运行自定义代码以执行编制索引。

这种流式传输数据的方式极其灵活。可以为对象元数据编制索引,或者如果对象是纯文本,则对对象正文的部分元素进行解析和编制索引。此节包含一些简单的 Python 示例代码,这些代码使用正则表达式解析日志文件并为匹配项编制索引。

提示

有关 Node.js 中更可靠的代码,请参阅 上的 amazon-elasticsearch-lambda-samplesGitHub。 某些 Lambda 蓝图还包含有用的解析示例。

Prerequisites

继续操作之前,必须具有以下资源。

先决条件 描述
Amazon S3 存储桶 有关更多信息,请参阅 https://docs.amazonaws.cn/AmazonS3/latest/gsg/CreatingABucket.html 中的Amazon Simple Storage Service 入门指南创建存储桶。存储桶必须与 Amazon ES 域位于同一个区域。
Amazon ES 域 Lambda 函数处理数据之后数据的目的地。有关更多信息,请参阅创建 Amazon ES 域

创建 Lambda 部署程序包

部署程序包为 ZIP 或 JAR 文件,其中包含代码及其依赖项。此节包括 Python 示例代码。对于其他编程语言,请参阅 https://docs.aws.amazon.com/lambda/latest/dg/deployment-package-v2.html 中的AWS Lambda Developer Guide创建部署程序包

  1. 创建目录。在此示例中,我们使用名称 s3-to-es

  2. 在名为 sample.py 的目录中创建一个文件:

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, including https:// index = 'lambda-s3-index' type = 'lambda-type' url = host + '/' + index + '/' + type headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    编辑 regionhost 的变量。

  3. 安装依赖项:

    cd s3-to-es pip install requests -t . pip install requests_aws4auth -t .

    所有 Lambda 执行环境都已安装 Boto3,因此无需将其包含在部署程序包中。

    提示

    如果您使用 macOS,这些命令可能无法正常工作。解决方法是将名为 setup.cfg 的文件添加到 s3-to-es 目录:

    [install] prefix=
  4. 打包应用程序代码和依赖项:

    zip -r lambda.zip *

创建 Lambda 函数

创建部署程序包之后,可以创建 Lambda 函数。创建函数时,选择名称、运行时 (例如,Python 2.7) 和 IAM 角色。IAM 角色定义对函数的权限。有关详细说明,请参阅 https://docs.amazonaws.cn/lambda/latest/dg/get-started-create-function.html 中的AWS Lambda Developer Guide创建示例 Lambda 函数

此示例假定使用的是控制台。选择 Python 2.7 和具有 S3 读取权限和 Amazon ES 写入权限的角色,如以下屏幕截图中所示。


                    函数的示例配置Lambda

在创建此函数后,必须添加一个触发器。在本示例中,我们希望代码在日志文件到达 S3 存储桶中时运行:

  1. 选择 S3。

  2. 选择存储桶。

  3. 对于 Event type (事件类型),选择 PUT

  4. 对于 Prefix (前缀),键入 logs/

  5. 对于 Filter pattern (筛选条件模式),键入 .log

  6. 选择 Enable trigger (启用触发器)

  7. 选择 Add

最后,可以上传部署程序包:

  1. 对于 Handler (处理程序),键入 sample.handler。 此设置将告知 Lambda 文件 (sample.py) 和方法 (handler) 应在触发之后运行。

  2. 对于 Code entry type (代码输入种类),选择 Upload a .ZIP file (上传 .ZIP 文件),然后按照提示上传部署程序包。

  3. 选择 Save

此时,您具有一整套资源:存储日志文件的存储桶、日志文件添加到存储桶时运行的函数、执行解析和编制索引的代码以及搜索和可视化的 Amazon ES 域。

测试 Lambda 函数

在创建此函数之后,可以通过将文件上传到 Amazon S3 存储桶来测试此函数。使用以下示例日志行创建一个名为 sample.log 的文件:

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

将文件上传到 S3 存储桶的 logs 文件夹。有关说明,请参阅 https://docs.amazonaws.cn/AmazonS3/latest/gsg/PuttingAnObjectInABucket.html 中的Amazon Simple Storage Service 入门指南向存储桶添加对象

然后使用 Amazon ES 控制台或 Kibana 验证 lambda-s3-index 索引是否包含两个文档。还可以发出标准搜索请求:

GET https://es-domain/lambda-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "lambda-type", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "lambda-type", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

从 Amazon Kinesis Data Streams 将流数据加载到 Amazon ES

您可以从 Kinesis Data Streams 将流数据加载到 Amazon ES。到达此数据流的新数据将向 Lambda 触发事件通知,这将运行自定义代码以执行索引编制。此节包括一些简单的 Python 示例代码。有关 Node.js 中更可靠的代码,请参阅 上的 amazon-elasticsearch-lambda-samplesGitHub。

Prerequisites

继续操作之前,必须具有以下资源。

先决条件 描述
Amazon Kinesis 数据流 Lambda 函数的事件源。要了解更多信息,请参阅 Kinesis 数据流
Amazon ES 域 Lambda 函数处理数据之后数据的目的地。有关更多信息,请参阅创建 Amazon ES 域
IAM 角色

此角色必须具有基本的 Amazon ES、Kinesis 和 Lambda 权限,如以下内容:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

角色必须拥有以下信任关系:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

要了解更多信息,请参阅 https://docs.amazonaws.cn/IAM/latest/UserGuide/id_roles_create.html 中的IAM 用户指南创建 IAM 角色

创建 Lambda 函数

按照创建 Lambda 部署程序包中的说明操作,但创建一个名为 kinesis-to-es 的目录并对 sample.py 使用以下代码:

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, including https:// index = 'lambda-kine-index' type = 'lambda-kine-type' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

编辑 regionhost 的变量。

使用以下命令安装依赖项:

cd kinesis-to-es pip install requests -t . pip install requests_aws4auth -t .

然后按照创建 Lambda 函数中的说明操作,但指定Prerequisites中的 IAM 角色和以下触发器设置:

  • Kinesis 流:您的 Kinesis 流

  • 批处理大小:100

  • 起始位置:时间范围

要了解更多信息,请参阅 https://docs.amazonaws.cn/streams/latest/dev/working-with-kinesis.html 中的Amazon Kinesis Data Streams 开发人员指南使用 Amazon Kinesis 数据流

此时,您具有一整套资源:Kinesis 数据流、在流收到新数据并为该数据编制索引后运行的函数、用于搜索和可视化的 Amazon ES 域。

测试 Lambda 函数

创建此函数后,可以通过使用 AWS CLI 将新记录添加到数据流来测试它:

aws kinesis put-record --stream-name es-test --data "My test data." --partition-key partitionKey1 --region us-west-1

然后使用 Amazon ES 控制台或 Kibana 验证 lambda-kine-index 是否包含文档。还可使用以下请求:

GET https://es-domain/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "lambda-kine-type", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

从 Amazon DynamoDB 将流数据加载到 Amazon ES

可以使用 AWS Lambda 从 Amazon DynamoDB 将数据发送到 Amazon ES 域。到达数据库表的新数据将触发事件通知到 Lambda,这将运行自定义代码以执行编制索引。

Prerequisites

继续操作之前,必须具有以下资源。

先决条件 描述
DynamoDB Table

此表包含源数据。有关更多信息,请参阅 https://docs.amazonaws.cn/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html 中的Amazon DynamoDB 开发人员指南表的基本操作

此表必须与 Amazon ES 域驻留在同一个区域并且流设置为新映像。要了解更多信息,请参阅启用流

Amazon ES 域 Lambda 函数处理数据之后数据的目的地。有关更多信息,请参阅创建 Amazon ES 域
IAM 角色

此角色必须具有基本的 Amazon ES、DynamoDB 和 Lambda 执行权限,如以下内容:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

角色必须拥有以下信任关系:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

要了解更多信息,请参阅 https://docs.amazonaws.cn/IAM/latest/UserGuide/id_roles_create.html 中的IAM 用户指南创建 IAM 角色

创建 Lambda 函数

按照创建 Lambda 部署程序包中的说明操作,但创建一个名为 ddb-to-es 的目录并对 sample.py 使用以下代码:

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, with https:// index = 'lambda-index' type = 'lambda-type' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the Elasticsearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

编辑 regionhost 的变量。

使用以下命令安装依赖项:

cd ddb-to-es pip install requests -t . pip install requests_aws4auth -t .

然后按照创建 Lambda 函数中的说明操作,但指定Prerequisites中的 IAM 角色和以下触发器设置:

  • :您的 DynamoDB 表

  • 批处理大小:100

  • 起始位置:时间范围

要了解更多信息,请参阅 中的DynamoDB处理 表中的新项目。Amazon DynamoDB 开发人员指南

此时,您具有一整套资源:源数据的 DynamoDB 表、表更改的 DynamoDB 流、在源数据更改并为这些更改编制索引后运行的函数以及用于搜索和可视化的 Amazon ES 域。

测试 Lambda 函数

创建此函数后,可以通过使用 AWS CLI 将新项目添加到 DynamoDB 表来测试它:

aws dynamodb put-item --table-name es-test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

然后使用 Amazon ES 控制台或 Kibana 验证 lambda-index 是否包含文档。还可使用以下请求:

GET https://es-domain/lambda-index/lambda-type/00001 { "_index": "lambda-index", "_type": "lambda-type", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

从 Amazon Kinesis Data Firehose 将流数据加载到 Amazon ES

Kinesis Data Firehose 支持 Amazon ES 作为传输目标。有关如何将流数据加载到 Amazon ES 的说明,请参阅 https://docs.amazonaws.cn/firehose/latest/dev/basic-create.html中的创建 Kinesis 数据 Firehose 传输流和Amazon Kinesis Data Firehose 开发人员指南选择 Amazon ES 作为目标

在将数据加载到 Amazon ES 之前,可能需要对数据执行转换。要了解有关使用 Lambda 函数执行此任务的更多信息,请参阅此同一指南中的数据转换

配置传输流时,Kinesis Data Firehose 具有“一键式”IAM 角色,该角色将为传输流提供将数据发送到 Amazon ES、在 Amazon S3 上备份数据以及使用 Lambda 转换数据所需的资源访问权限。由于手动创建此类角色的过程非常复杂,我们建议使用提供的角色。

从 Amazon CloudWatch 将流数据加载到 Amazon ES

您可以使用 CloudWatch Logs 订阅从 CloudWatch Logs 将流数据加载到您的 Amazon ES 域。有关 Amazon CloudWatch 订阅的信息,请参阅使用订阅实时处理日志数据。有关配置信息,请参阅 开发人员指南CloudWatch 中的 Logs 数据流式传输到 Amazon Elasticsearch ServiceAmazon CloudWatch。

将 AWS IoT 中的数据加载到 Amazon ES

您可以使用AWS IoT规则 发送数据。要了解更多信息,请参阅 https://docs.amazonaws.cn/iot/latest/developerguide/elasticsearch-rule.html 中的 AWS IoT 开发人员指南Amazon ES 操作