使用 Kinesis 代理写入亚马逊 Data Firehose - Amazon Data Firehose
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

亚马逊 Data Firehose 以前被称为亚马逊 Kinesis Data Firehose

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Kinesis 代理写入亚马逊 Data Firehose

Amazon Kinesis 代理是一款独立的 Java 软件应用程序,可作为参考实现,展示如何收集数据并将其发送到 Firehose。代理会持续监控一组文件,并将新数据发送到您的 Firehose 传输流。该代理显示了如何处理文件轮换、检查点操作和失败时重试。它展示了如何以可靠、及时和简单的方式交付数据。它还显示了如何发布 CloudWatch 指标以更好地监控流媒体过程并对其进行故障排除。要了解更多信息,请访问 awslabs/ amazon-kinesis-agent

默认情况下,会基于换行符 ('\n') 分析每个文件中的记录。但是,也可以将代理配置为分析多行记录(请参阅 代理配置设置)。

您可以在基于 Linux 的服务器环境(如 Web 服务器、日志服务器和数据库服务器)上安装此代理。安装代理后,通过指定要监控的文件和数据的 Firehose 流来对其进行配置。配置代理后,它会持久地从文件中收集数据,并可靠地将其发送到 Firehose 数据流。

先决条件

凭证

使用以下方法之一管理您的 Amazon 凭证:

  • 创建自定义凭证提供程序。有关更多信息,请参阅 自定义凭证提供程序

  • 当您启动您的 EC2 实例时指定该 IAM 角色。

  • 在配置代理时,指定 Amazon 凭证(请参阅代理配置设置下面的配置表中的 awsAccessKeyIdawsSecretAccessKey 条目)。

  • 编辑 /etc/sysconfig/aws-kinesis-agent 以指定您的 Amazon 区域和 Amazon 访问密钥。

  • 如果您的 EC2 实例位于不同的Amazon账户中,请创建一个 IAM 角色以提供对 Amazon Data Firehose 服务的访问权限。在配置代理时指定该角色(参见 Assume Learn 和 IdassumeRoleExternal)。使用上述方法之一,来指定其他账户中有权限担任该角色的用户的 Amazon 凭证。

自定义凭证提供程序

您可以创建自定义凭证提供程序,并在以下配置设置中为 Kinesis 代理提供其类名和 jar 路径:userDefinedCredentialsProvider.classnameuserDefinedCredentialsProvider.location。有关这两个配置设置的说明,请参阅代理配置设置

要创建自定义凭证提供程序,请定义一个实现 AmazonCredentialsProvider 接口的类,如下例所示。

import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; public class YourClassName implements AWSCredentialsProvider { public YourClassName() { } public AWSCredentials getCredentials() { return new BasicAWSCredentials("key1", "key2"); } public void refresh() { } }

您的类必须有一个不带参数的构造函数。

Amazon 会定期调用刷新方法以获取更新的凭证。如果希望凭证提供程序在其整个生命周期内提供不同的凭证,请在此方法中包含用于刷新凭证的代码。或者,如果您需要提供静态(不更改)凭证的凭证提供程序,则可以将此方法保留为空。

下载并安装代理

首先,连接到您的实例。有关更多信息,请参阅适用于 Linux 实例的 Amazon EC2 用户指南中的连接到您的实例。如果您在连接时遇到问题,请参阅《适用于 Linux 实例的 Amazon EC2 用户指南》中的实例连接问题排查

接下来,请使用以下方法之一安装代理。

  • 要从 Amazon Linux 存储库设置代理

    此方法仅适用于 Amazon Linux 实例。使用以下命令:

    sudo yum install –y aws-kinesis-agent

    代理 2.0.0 或更高版本安装在操作系统为 Amazon Linux 2(AL2)的计算机上。此代理版本需要安装 Java 1.8 或更高版本。如果尚未安装所需的 Java 版本,代理安装程序将会安装。有关 Amazon Linux 2 的更多信息,请参阅 https://aws.amazon.com/amazon-linux-2/

  • 从 Amazon S3 存储库设置代理

    此方法从公开可用的存储库安装代理,因此适用于 Red Hat Enterprise Linux 以及 Amazon Linux 2 实例。使用以下命令下载并安装最新版本的代理 2.x.x:

    sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm

    要安装特定版本的代理,请在命令中指定版本号。例如,以下命令将安装代理 2.0.1。

    sudo yum install –y https://streaming-data-agent.s3.amazonaws.com/aws-kinesis-agent-2.0.1-1.amzn1.noarch.rpm

    如果您使用的是 Java 1.7,但不想升级,则可以下载与 Java 1.7 兼容的代理 1.x.x。例如,要下载代理 1.1.6,可使用以下命令:

    sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-1.1.6-1.amzn1.noarch.rpm

    可以使用以下命令下载最新的代理 1.x.x:

    sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn1.noarch.rpm
  • 从 GitHub repo 中设置代理

    1. 首先,确保您已安装所需的 Java 版本,具体取决于代理版本。

    2. awslabs amazon-kinesis-agent GitHub /存储库下载代理。

    3. 导航到下载目录并运行以下命令来安装代理:

      sudo ./setup --install
  • 在 Docker 容器中设置代理

    Kinesis 代理可以在容器中运行,也可以通过 amazonlinux 容器库运行。使用以下 Dockerfile,然后运行 docker build

    FROM amazonlinux RUN yum install -y aws-kinesis-agent which findutils COPY agent.json /etc/aws-kinesis/agent.json CMD ["start-aws-kinesis-agent"]

配置并启动代理

配置并启动代理
  1. 打开并编辑配置文件(如果使用默认文件访问权限,则以超级用户的身份来执行):/etc/aws-kinesis/agent.json

    在此配置文件中,指定代理从中收集数据的文件 ("filePattern") 以及代理向其发送数据的 Firehose 流的名称 ("deliveryStream")。文件名是一种模式,并且代理能够识别文件轮换。每秒内您轮换使用文件或创建新文件的次数不能超过一次。代理使用文件创建时间戳来确定要跟踪哪些文件并将其追踪到您的 Firehose 直播中。如果每秒创建新文件或轮换使用文件的次数超过一次,代理将无法正确区分这些文件。

    { "flows": [ { "filePattern": "/tmp/app.log*", "deliveryStream": "yourdeliverystream" } ] }

    默认 Amazon 区域是 us-east-1。如果使用的是其他区域,请将 firehose.endpoint 设置添加到配置文件,为区域指定终端节点。有关更多信息,请参阅 代理配置设置

  2. 手动启动代理:

    sudo service aws-kinesis-agent start
  3. (可选)将代理配置为在系统启动时启动:

    sudo chkconfig aws-kinesis-agent on

现在,代理作为系统服务在后台运行。它会持续监视指定的文件并将数据发送到指定的 Firehose 流。代理活动记录在 /var/log/aws-kinesis-agent/aws-kinesis-agent.log 中。

代理配置设置

代理支持两个必需的配置设置,即 filePatterndeliveryStream,以及可用于其他功能的可选配置设置。您可以在 /etc/aws-kinesis/agent.json 中指定必需配置设置和可选配置设置。

只要您更改配置文件,就必须使用以下命令停止再启动代理:

sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start

或者,您也可以使用以下命令:

sudo service aws-kinesis-agent restart

一般的设置配置如下。

配置设置 描述
assumeRoleARN

用户应承担的角色的 Amazon 资源名称 (ARN)。有关更多信息,请参阅《IAM 用户指南》中的使用 IAM 角色跨 Amazon 账户委派访问权限

assumeRoleExternalId

确定谁可以担任该角色的可选标识符。有关更多信息,请参阅《IAM 用户指南》中的如何使用外部 ID

awsAccessKeyId

覆盖默认凭证的 Amazon 访问密钥 ID。此设置优先于所有其他凭证提供程序。

awsSecretAccessKey

覆盖默认凭证的 Amazon 密钥。此设置优先于所有其他凭证提供程序。

cloudwatch.emitMetrics

CloudWatch 如果已设置,则允许代理向其发送指标 (true)。

默认值:true

cloudwatch.endpoint

的区域终端节点 CloudWatch。

默认值:monitoring.us-east-1.amazonaws.com

firehose.endpoint

亚马逊 Data Firehose 的区域终端节点。

默认值:firehose.us-east-1.amazonaws.com

sts.endpoint

Amazon Security Token Service 的区域端点。

默认值:https://sts.amazonaws.com

userDefinedCredentialsProvider.classname 如果定义自定义凭证提供程序,请使用此设置提供其完全限定类名。不要在类名末尾包含 .class
userDefinedCredentialsProvider.location 如果定义自定义凭证提供程序,请使用此设置指定包含自定义凭证提供程序的 jar 的绝对路径。代理还在以下位置查找 jar 文件:/usr/share/aws-kinesis-agent/lib/

流配置设置如下。

配置设置 描述
aggregatedRecordSizeBytes

要使代理聚合记录,然后通过一次操作将其放入 Firehose 流,请指定此设置。将其设置为你希望聚合记录在代理将其放入 Firehose 直播之前的大小。

默认值:0(不聚合)

dataProcessingOptions

在每条已解析的记录发送到 Firehose 流之前,应用于该记录的处理选项列表。这些处理选项按指定的顺序执行。有关更多信息,请参阅 使用代理预处理数据

deliveryStream

[必填] Firehose 直播的名称。

filePattern

[必需] 需要由代理监控的文件的 Glob。与此模式匹配的任何文件都会自动由代理挑选出来并进行监控。对于匹配此模式的所有文件,请向 aws-kinesis-agent-user 授予读取权限。对于包含这些文件的目录,请向 aws-kinesis-agent-user 授予读取和执行权限。

重要

代理将选择与此模式匹配的任何文件。为了确保代理不会选择意外的记录,请仔细选择此模式。

initialPosition

开始解析文件的初始位置。有效值为 START_OF_FILEEND_OF_FILE

默认值:END_OF_FILE

maxBufferAgeMillis

代理在将数据发送到 Firehose 流之前缓冲数据的最长时间,以毫秒为单位。

值范围:1,000-900,000(1 秒到 15 分钟)

默认值:60,000(1 分钟)

maxBufferSizeBytes

代理在将数据发送到 Firehose 流之前对其进行缓冲的最大大小,以字节为单位。

值范围:1-4,194,304(4MB)

默认值:4194304 (4 MB)

maxBufferSizeRecords

代理在将数据发送到 Firehose 流之前对其进行缓冲的最大记录数。

值范围:1-500

默认值:500

minTimeBetweenFilePollsMillis

代理轮询和分析受监控文件中是否有新数据的时间间隔(以毫秒计)。

值范围:1 或更大值

默认值:100

multiLineStartPattern

用于标识记录开始的模式。记录由与模式匹配的行以及与模式不匹配的任何以下行组成。有效值为正则表达式。默认情况下,日志文件中的每一个新行都被解析为一条记录。

skipHeaderLines

代理从受监控文件开头跳过分析的行数。

值范围:0 或更大值

默认值:0(零)

truncatedRecordTerminator

当记录大小超过 Amazon Data Firehose 记录大小限制时,代理用来截断已解析记录的字符串。(1000 KB)

默认值:'\n'(换行符)

监控多个文件目录并写入多个流

通过指定多个流程配置设置,您可以配置代理以监控多个文件目录并将数据发送到多个流。在以下配置示例中,代理监控两个文件目录,并将数据分别发送到 Kinesis 数据流和 Firehose 流。您可以为 Kinesis Data Streams 和 Amazon Data Firehose 指定不同的终端节点,这样您的数据流和 Firehose 流就不必位于同一个区域。

{ "cloudwatch.emitMetrics": true, "kinesis.endpoint": "https://your/kinesis/endpoint", "firehose.endpoint": "https://your/firehose/endpoint", "flows": [ { "filePattern": "/tmp/app1.log*", "kinesisStream": "yourkinesisstream" }, { "filePattern": "/tmp/app2.log*", "deliveryStream": "yourfirehosedeliverystream" } ] }

有关在 Amazon Kinesis Data Streams 中使用代理的更多详细信息,请参阅使用 Kinesis 代理写入 Amazon Kinesis Data Streams

使用代理预处理数据

代理可以预处理从受监控文件中解析的记录,然后再将其发送到您的 Firehose 直播中。通过将 dataProcessingOptions 配置设置添加到您的文件流可以启用此功能。可以添加一个或多个处理选项,这些选项将按指定的顺序执行。

代理支持以下处理选项。由于代理是开源的,您可以进一步开发和扩展其处理选项。您可以从 Kinesis 代理下载代理。

处理选项
SINGLELINE

通过移除换行符和首尾的空格,将多行记录转换为单行记录。

{ "optionName": "SINGLELINE" }
CSVTOJSON

将记录从分隔符分隔的格式转换为 JSON 格式。

{ "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", ... ], "delimiter": "yourdelimiter" }
customFieldNames

[必需] 在每个 JSON 键值对中用作键的字段名称。例如,如果您指定 ["f1", "f2"],则记录“v1, v2”将转换为 {"f1":"v1","f2":"v2"}

delimiter

在记录中用作分隔符的字符串。默认值为逗号(,)。

LOGTOJSON

将记录从日志格式转换为 JSON 格式。支持的日志格式为 Apache Common LogApache Combined LogApache Error LogRFC3164 Syslog

{ "optionName": "LOGTOJSON", "logFormat": "logformat", "matchPattern": "yourregexpattern", "customFieldNames": [ "field1", "field2", ] }
logFormat

[必需] 日志条目格式。以下是可能的值:

  • COMMONAPACHELOG – Apache 通用日志格式。默认情况下每个日志条目都为以下模式:“%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}”。

  • COMBINEDAPACHELOG – Apache 组合日志格式。默认情况下每个日志条目都为以下模式:“%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}”。

  • APACHEERRORLOG – Apache 错误日志格式。默认情况下每个日志条目都为以下模式:“[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}”。

  • SYSLOG – RFC3164 系统日志格式。默认情况下每个日志条目都为以下模式:“%{timestamp} %{hostname} %{program}[%{processid}]: %{message}”。

matchPattern

覆盖指定的日志格式的默认模式。如果日志条目使用自定义格式,则可以使用该设置提取日志条目中的值。如果指定 matchPattern,还必须指定 customFieldNames

customFieldNames

在每个 JSON 键值对中用作键的自定义字段名称。您可以使用此设置定义从 matchPattern 中提取的值的字段名称,或覆盖预定义日志格式的默认字段名称。

例 :LOGTOJSON 配置

下面是一个转换为 JSON 格式的 Apache 通用日志条目的 LOGTOJSON 配置示例:

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }

转换前:

64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291

转换后:

{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
例 :包含自定义字段的 LOGTOJSON 配置

下面是 LOGTOJSON 配置的另一个示例:

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }

使用此配置设置时,上一个示例中的同一个 Apache 通用日志条目将如下转换为 JSON 格式:

{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
例 :转换 Apache 通用日志条目

以下流配置将一个 Apache 通用日志条目转换为 JSON 格式的单行记录:

{ "flows": [ { "filePattern": "/tmp/app.log*", "deliveryStream": "my-delivery-stream", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }
例 :转换多行记录

以下流配置分析第一行以“[SEQUENCE=”开头的多行记录。每个记录先转换为一个单行记录。然后,将基于制表分隔符从记录中提取值。提取的值映射到指定的 customFieldNames 值,从而形成 JSON 格式的单行记录。

{ "flows": [ { "filePattern": "/tmp/app.log*", "deliveryStream": "my-delivery-stream", "multiLineStartPattern": "\\[SEQUENCE=", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", "field3" ], "delimiter": "\\t" } ] } ] }
例 :具有匹配模式的 LOGTOJSON 配置

下面是一个转换为 JSON 格式的 Apache 通用日志条目的 LOGTOJSON 配置示例,其中省略了最后一个字段 (bytes):

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }

转换前:

123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200

转换后:

{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}

代理 CLI 命令

系统启动时自动启动代理:

sudo chkconfig aws-kinesis-agent on

检查代理的状态:

sudo service aws-kinesis-agent status

停止代理:

sudo service aws-kinesis-agent stop

从此位置读取代理的日志文件:

/var/log/aws-kinesis-agent/aws-kinesis-agent.log

卸载代理:

sudo yum remove aws-kinesis-agent

常见问题解答

有没有适用于 Windows 的 Kinesis 代理?

适用于 Windows 的 Kinesis 代理与适用于 Linux 平台的 Kinesis 代理是不同的软件。

为什么 Kinesis 代理速度变慢且/或 RecordSendErrors 增加?

这通常是由于 Kinesis 节流造成的。查看 Kinesis Data Streams 的WriteProvisionedThroughputExceeded指标或 Firehose 直播ThrottledRecords的指标。这些指标从 0 开始的任何增加都表示需要增加流限制。有关更多信息,请参阅 Kinesis 数据流限制和 Firehose 数据流

排除节流后,查看 Kinesis 代理是否配置为跟踪大量小文件。Kinesis 代理跟踪新文件时会有延迟,因此 Kinesis 代理应跟踪少量大文件。尝试将您的日志文件合并为大文件。

为什么我会收到 java.lang.OutOfMemoryError 异常?

Kinesis 代理没有足够的内存来处理当前的工作负载。尝试增加 /usr/bin/start-aws-kinesis-agent 中的 JAVA_START_HEAPJAVA_MAX_HEAP,并重新启动代理。

为什么我会收到 IllegalStateException : connection pool shut down 异常?

Kinesis 代理没有足够的连接来处理当前的工作负载。尝试在 /etc/aws-kinesis/agent.json 的常规代理配置设置中增加 maxConnectionsmaxSendingThreads。这些字段的默认值是可用运行时系统处理器的 12 倍。有关高级代理配置设置的更多信息,请参见 AgentConfiguration.java

如何调试 Kinesis 代理的其他问题?

可在 /etc/aws-kinesis/log4j.xml 中启用 DEBUG 级别日志。

我应该如何配置 Kinesis 代理?

maxBufferSizeBytes 越小,Kinesis 代理发送数据的频率就越高。这可能是好事,因为这减少了记录的传输时间,但也增加了每秒对 Kinesis 的请求。

为什么 Kinesis 代理会发送重复记录?

这是由于文件跟踪中的错误配置造成的。确保每个 fileFlow’s filePattern 只匹配一个文件。如果正在使用的 logrotate 模式处于 copytruncate 模式,也可能发生这种情况。尝试将模式更改为默认模式或创建模式以避免重复。有关处理重复记录的更多信息,请参阅处理重复记录