Amazon Kinesis Data Streams
开发人员指南
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。请点击 Amazon AWS 入门,可查看中国地区的具体差异

使用 Kinesis 代理写入 Amazon Kinesis Data Streams

Kinesis 代理是独立的 Java 软件应用程序,可提供更轻松的方式来收集数据并将数据发送到 Kinesis Data Streams。此代理持续监控一组文件,并将新数据发送到您的流。代理处理文件轮换、检查点操作并在失败时重试。它以可靠及时的简单方法提供您的所有数据。它还会发出 Amazon CloudWatch 指标,以帮助您更好地监控流处理过程并排除故障。

默认情况下,会基于换行符 ('\n') 分析每个文件中的记录。但是,也可以将代理配置为分析多行记录(请参阅代理配置设置)。

您可以在基于 Linux 的服务器环境(如 Web 服务器、日志服务器和数据库服务器)上安装此代理。在安装代理后,通过指定要监控的日志文件和 名称来配置代理。在配置好代理之后,代理将持续从这些文件中收集数据并以可靠的方式将数据发送到流。

先决条件

  • 您的操作系统必须是 Amazon Linux AMI 版本 2015.09 或更高版本,或者 Red Hat Enterprise Linux 版本 7 或更高版本。

  • 如果您使用 Amazon EC2 运行代理,请启动您的 EC2 实例。

  • 使用以下方法之一来管理 AWS 凭证:

    • 当您启动您的 EC2 实例时指定该 IAM 角色。

    • 当您配置代理时指定 AWS 凭证(参见 awsAccessKeyIdawsSecretAccessKey)。

    • 编辑 /etc/sysconfig/aws-kinesis-agent 以指定您的区域和 AWS 访问密钥。

    • 如果您的 EC2 实例位于其他 AWS 账户中,请创建一个 IAM 角色来提供对 Kinesis Data Streams 服务的访问,并在配置代理时指定该角色 (请参阅 assumeRoleARNassumeRoleExternalId)。请使用上述方法之一指定其他账户中有权担任该角色的用户的 AWS 凭证。

  • 您指定的 IAM 角色或 AWS 凭证必须有权执行 Kinesis Data Streams PutRecords 操作,以便让代理将数据发送到您的流。如果您为代理启用 CloudWatch 监控,则还需要具有执行 CloudWatch PutMetricData 操作的权限。有关更多信息,请参阅使用 IAM 控制对 Amazon Kinesis Data Streams 资源的访问, 使用 Amazon CloudWatch 监控 Kinesis Data Streams 代理运行状况CloudWatch 访问控制

下载并安装代理

首先,连接到您的实例。想要了解更多信息,请参阅 Amazon EC2 用户指南(适用于 Linux 实例) 中的连接到您的实例。如果在连接时出现问题,请参阅 Amazon EC2 用户指南(适用于 Linux 实例) 中的排除连接实例的故障

使用 Amazon Linux AMI 设置代理

使用以下命令下载和安装代理:

sudo yum install –y aws-kinesis-agent

使用 Red Hat Enterprise Linux 设置代理

使用以下命令下载和安装代理:

sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn1.noarch.rpm

使用 GitHub 设置代理

  1. awlabs/amazon-kinesis-agent 下载代理。

  2. 导航到下载目录并运行以下命令来安装代理:

    sudo ./setup --install

配置并启动代理

配置并启动代理

  1. 打开并编辑配置文件(如果使用默认文件访问权限,则以超级用户的身份来执行):/etc/aws-kinesis/agent.json

    在此配置文件中,指定代理从中收集数据的文件 ("filePattern") 以及代理将数据发送到的流的名称 ("kinesisStream")。请注意,文件名是一种模式,并且代理能够识别文件轮换。每秒内您轮换使用文件或创建新文件的次数不能超过一次。代理使用文件创建时间戳来确定要跟踪并送入您的流中的文件;如果每秒创建新文件或轮换使用文件的次数超过一次,代理将无法正确区分这些文件。

    { "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "yourkinesisstream" } ] }
  2. 手动启动代理:

    sudo service aws-kinesis-agent start
  3. (可选) 将代理配置为在系统启动时启动:

    sudo chkconfig aws-kinesis-agent on

现在,代理作为系统服务在后台运行。它会持续监控指定的文件,并将数据发送到指定的流。代理活动记录在 /var/log/aws-kinesis-agent/aws-kinesis-agent.log 中。

代理配置设置

代理支持两个必需的配置设置,即 filePatternkinesisStream,以及可用于其他功能的可选配置设置。您可以在 /etc/aws-kinesis/agent.json 中指定必需配置和可选配置。

只要您更改配置文件,就必须使用以下命令停止再启动代理:

sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start

或者,您也可以使用以下命令:

sudo service aws-kinesis-agent restart

一般的设置配置如下。

配置设置 描述
assumeRoleARN

由该用户担任的角色的 ARN。有关更多信息,请参阅 IAM 用户指南 中的使用 IAM 角色委派跨 AWS 账户的访问权限

assumeRoleExternalId

确定谁可以担任该角色的可选标识符。有关更多信息,请参阅 IAM 用户指南 中的如何使用外部 ID

awsAccessKeyId

覆盖默认凭证的 AWS 访问密钥 ID。此设置优先于所有其他凭证提供程序。

awsSecretAccessKey

覆盖默认凭证的 AWS 密钥。此设置优先于所有其他凭证提供程序。

cloudwatch.emitMetrics

设置为 true 时将使代理能够向 CloudWatch 发送指标。

默认值:True

cloudwatch.endpoint

CloudWatch 区域终端节点。

默认值:monitoring.us-east-1.amazonaws.com

kinesis.endpoint

Kinesis Data Streams 区域终端节点。

默认值:kinesis.us-east-1.amazonaws.com

流配置设置如下。

配置设置 描述
dataProcessingOptions

在将每个被分析的记录发送到流之前应用于这些记录的处理选项的列表。这些处理选项按指定的顺序执行。有关更多信息,请参阅 使用代理预处理数据

kinesisStream

[必需] 流名称。

filePattern

[必需] 必须由代理监控的文件的 Glob。与此模式匹配的任何文件都会自动由代理挑选出来并进行监控。对于与此模式匹配的所有文件,必须向 aws-kinesis-agent-user 授予读取权限。对于包含这些文件的目录,必须向 aws-kinesis-agent-user 授予读取和执行权限。

initialPosition

开始解析文件的初始位置。有效值为 START_OF_FILEEND_OF_FILE

默认值:END_OF_FILE

maxBufferAgeMillis

代理在将数据发送到流之前缓冲数据的最长时间(以毫秒计)。

值范围:1000 到 900000(1 秒到 15 分钟)

默认值:60000(1 分钟)

maxBufferSizeBytes

代理在将数据发送到流之前缓冲的数据的最大大小(以字节计)。

值范围:1 到 4194304 (4 MB)

默认值:4194304 (4 MB)

maxBufferSizeRecords

代理在将数据发送到流之前缓冲数据的最大记录数。

值范围:1 到 500

默认值:500

minTimeBetweenFilePollsMillis

代理轮询和分析受监控文件中是否有新数据的时间间隔(以毫秒计)。

值范围:1 或更大值

默认值:100

multiLineStartPattern

用于标识记录开始的模式。记录由与模式匹配的行以及与模式不匹配的任何以下行组成。有效值为正则表达式。默认情况下,日志文件中的每一个新行都被解析为一条记录。

partitionKeyOption

生成分区键的方法。有效值为 RANDOM(随机生成的整数)和 DETERMINISTIC(基于数据计算出来的哈希值)。

默认值:RANDOM

skipHeaderLines

代理从受监控文件开头跳过分析的行数。

值范围:0 或更大值

默认值:0(零)

truncatedRecordTerminator

代理在记录大小超过 Kinesis Data Streams 记录大小限制时用来截断所分析记录的字符串。(1000 KB)

默认值:'\n'(换行符)

监控多个文件目录并写入多个流

通过指定多个流程配置设置,您可以配置代理以监控多个文件目录并将数据发送到多个流。在以下配置示例中,代理监控两个文件目录并将数据分别发送到一个 Kinesis 流和一个 Kinesis Firehose 交付流。请注意,您可以为 Kinesis Data Streams 和 Kinesis Firehose 指定不同的终端节点,这样,Kinesis 流和 Kinesis Firehose 交付流就不必位于同一区域。

{ "cloudwatch.emitMetrics": true, "kinesis.endpoint": "https://your/kinesis/endpoint", "firehose.endpoint": "https://your/firehose/endpoint", "flows": [ { "filePattern": "/tmp/app1.log*", "kinesisStream": "yourkinesisstream" }, { "filePattern": "/tmp/app2.log*", "deliveryStream": "yourfirehosedeliverystream" } ] }

有关将代理与 Kinesis Firehose 一起使用的更详细的信息,请参阅 使用 Kinesis 代理写入 Amazon Kinesis Data Firehose

使用代理预处理数据

代理可以预处理从受监控文件分析的记录,然后再将这些记录发送到流。通过将 dataProcessingOptions 配置设置添加到您的文件流可以启用此功能。可以添加一个或多个处理选项,这些选项将按指定的顺序执行。

代理支持下面列出的处理选项。由于此代理是开源的,您可以进一步开发和扩展其处理选项。您可以从 Kinesis 代理 下载此代理。

处理选项

SINGLELINE

通过移除换行符和首尾的空格,将多行记录转换为单行记录。

{ "optionName": "SINGLELINE" }
CSVTOJSON

将记录从分隔符分隔的格式转换为 JSON 格式。

{ "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", ... ], "delimiter": "yourdelimiter" }
customFieldNames

[必需] 在每个 JSON 键值对中用作键的字段名称。例如,如果您指定 ["f1", "f2"],则记录“v1, v2”将转换为 {"f1":"v1","f2":"v2"}

delimiter

在记录中用作分隔符的字符串。默认为逗号 (,)。

LOGTOJSON

将记录从日志格式转换为 JSON 格式。支持的日志格式有 Apache 通用日志Apache 组合日志Apache 错误日志RFC3164 Syslog

{ "optionName": "LOGTOJSON", "logFormat": "logformat", "matchPattern": "yourregexpattern", "customFieldNames": [ "field1", "field2", ] }
logFormat

[必需] 日志条目格式。以下是可能的值:

  • COMMONAPACHELOG - Apache 通用日志格式。默认情况下每个日志条目都为以下模式:“%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}”。

  • COMBINEDAPACHELOG - Apache 组合日志格式。默认情况下每个日志条目都为以下模式:“%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}”。

  • APACHEERRORLOG - Apache 错误日志格式。默认情况下每个日志条目都为以下模式:“[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}”。

  • SYSLOG - RFC3164 Syslog 格式。默认情况下每个日志条目都为以下模式:“%{timestamp} %{hostname} %{program}[%{processid}]: %{message}”。

matchPattern

用于从日志条目中提取值的正则表达式模式。如果您的日志条目不属于其中一种预定义日志格式,则将使用此设置。如果使用此设置,您还必须指定 customFieldNames

customFieldNames

在每个 JSON 键值对中用作键的自定义字段名称。您可以使用此设置定义从 matchPattern 中提取的值的字段名称,或覆盖预定义日志格式的默认字段名称。

例 :LOGTOJSON 配置

下面是一个转换为 JSON 格式的 Apache 通用日志条目的 LOGTOJSON 配置示例:

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }

转换前:

64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291

转换后:

{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}

例 :包含自定义字段的 LOGTOJSON 配置

下面是 LOGTOJSON 配置的另一个示例:

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }

使用此配置设置时,上一个示例中的同一个 Apache 通用日志条目将如下转换为 JSON 格式:

{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}

例 :转换 Apache 通用日志条目

以下流配置将一个 Apache 通用日志条目转换为 JSON 格式的单行记录:

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }

例 :转换多行记录

以下流配置分析第一行以“[SEQUENCE=”开头的多行记录。每个记录先转换为一个单行记录。然后,将基于制表分隔符从记录中提取值。提取的值映射到指定的 customFieldNames 值,从而形成 JSON 格式的单行记录。

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "multiLineStartPattern": "\\[SEQUENCE=", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", "field3" ], "delimiter": "\\t" } ] } ] }

例 :具有匹配模式的 LOGTOJSON 配置

下面是一个转换为 JSON 格式的 Apache 通用日志条目的 LOGTOJSON 配置示例,其中省略了最后一个字段 (bytes):

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }

转换前:

123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200

转换后:

{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}

代理 CLI 命令

系统启动时自动启动代理:

sudo chkconfig aws-kinesis-agent on

检查代理的状态:

sudo service aws-kinesis-agent status

停止代理:

sudo service aws-kinesis-agent stop

从此位置读取代理的日志文件:

/var/log/aws-kinesis-agent/aws-kinesis-agent.log

卸载代理:

sudo yum remove aws-kinesis-agent