本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
示例:使用 Python 向Amazon S3 发送流媒体数据
在本练习中,您将创建一个 Python Kinesis Data Analytics 应用程序,该应用程序将数据传输到亚马逊简单存储服务接收器。
注意
要为本练习设置所需的先决条件,请先完成入门指南 (Python)练习。
本主题包含下列部分:
创建相关资源
在为本练习创建 Kinesis Data Analytics 应用程序之前,您需要创建以下依赖资源:
一个 Kinesis 数据流 (
ExampleInputStream
)用于存储应用程序代码和输出的 Amazon S3 存储桶 (
ka-app-code-
)<username>
注意
在 Kinesis Data Analytics 上启用服务器端加密后,适用于 Apache Flink 的 Kinesis Data Analytics 无法将数据写入Amazon S3。
您可以使用控制台创建 Kinesis 直播桶和Amazon S3 存储桶和Amazon S3 存储桶 有关创建这些资源的说明,请参阅以下主题:
在 Amazon Kinesis Data Streams 开发人员指南中@@ 创建和更新数据流。将数据流命名为
ExampleInputStream
。如何创建 S3 存储桶? 在 Amazon Service 用户指南中。通过附加您的登录名,为 Amazon S3 存储桶指定一个全球唯一的名称,例如
ka-app-code-
。<username>
将示例记录写入输入流
在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。
注意
此部分需要 Amazon SDK for Python (Boto)
注意
本节中的 Python 脚本使用Amazon CLI. 您必须将您的配置Amazon CLI为使用您的账户凭证和默认区域。若要配置您的Amazon CLI,请输入以下内容:
aws configure
-
使用以下内容创建名为
stock.py
的文件:import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
-
运行
stock.py
脚本:$ python stock.py
在完成本教程的其余部分时,请将脚本保持运行状态。
下载并检查应用程序代码
此示例的 Python 应用程序代码可从以下网址获得 GitHub。要下载应用程序代码,请执行以下操作:
如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git
。 使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples
导航到
amazon-kinesis-data-analytics-java-examples/python/S3Sink
目录。
应用程序代码位于 streaming-file-sink.py
文件中。请注意有关应用程序代码的以下信息:
该应用程序使用 Kinesis 表源从源数据流中读取数据。以下代码段调用该
create_source_table
函数来创建 Kinesis 表源:table_env.execute_sql( create_source_table(input_table_name, input_stream, input_region, stream_initpos) )
该
create_source_table
函数使用 SQL 命令创建由流式传输源支持的表import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
此应用程序使用
filesystem
连接器将记录发送到 Amazon S3 存储桶:def create_sink_table(table_name, bucket_name): return """ CREATE TABLE {0} ( ticker VARCHAR(6), price DOUBLE, event_time VARCHAR(64) ) PARTITIONED BY (ticker) WITH ( 'connector'='filesystem', 'path'='s3a://{1}/', 'format'='csv', 'sink.partition-commit.policy.kind'='success-file', 'sink.partition-commit.delay' = '1 min' ) """.format(table_name, bucket_name)
该应用程序使用 flink-sql-connector-kinesis-1.15.2.jar
文件中的 Kinesis Flink 连接器。
压缩并上传 Apache Flink 直播 Python 代码
在本部分中,您将应用程序代码上传到您在该创建相关资源部分创建的 Amazon S3 存储桶。
使用您首选的压缩应用程序压缩
streaming-file-sink.py
和 flink-sql-connector-kinesis-1.15.2.jar文件。命名档案 myapp.zip
。-
在 Amazon S3 控制台中,选择 ka-app-code-
<username>存储桶,然后选择上传。
-
在选择文件步骤中,选择添加文件。导航到您在上一步中创建的
myapp.zip
文件。 您无需更改该对象的任何设置,因此,请选择 Upload (上传)。
您的应用程序代码现在存储在 Amazon S3 桶中,您的应用程序可以在其中访问。
创建并运行 Kinesis Data Analytics 应用程序
按照以下步骤,使用控制台创建、配置、更新和运行应用程序。
创建 应用程序
打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics
。 -
在 Kinesis Data Analytics 仪表板上,选择创建分析应用程序。
-
在 Kinesis Analytics – 创建应用程序页面上,提供应用程序详细信息,如下所示:
-
对于 Application name (应用程序名称),输入
MyApplication
。 -
对于 Runtime (运行时),请选择 Apache Flink。
注意
Kinesis Data Analytics 使用 Apache Flink 版本 1.15.2。
将版本下拉列表保留为 Apache Flink 版本 1.15.2(推荐版本)。
-
-
对于访问权限,请选择 Create / update IAM role (创建/更新 IAM 角色)
kinesis-analytics-MyApplication-us-west-2
。 -
选择创建应用程序。
注意
当您使用控制台创建 Kinesis Data Analytics 应用程序时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源使用您的应用程序名称和区域命名,如下所示:
-
策略:
kinesis-analytics-service-
MyApplication
-us-west-2
-
角色:
kinesis-analytics-
MyApplication
-us-west-2
配置应用程序
-
在MyApplication页面上,选择配置。
-
在 Configure application (配置应用程序) 页面上,提供 Code location (代码位置):
-
对于 Amazon S3 存储桶,输入
ka-app-code-
。<username>
-
在 Amazon S3 对象的路径中,输入
myapp.zip
。
-
-
在 Access to application resources (对应用程序的访问权限) 下,对于 Access permissions (访问权限),选择 Create / update IAM role (创建/更新 IAM 角色)
kinesis-analytics-MyApplication-us-west-2
。 -
在 “属性” 下,选择 “添加群组”。
-
输入以下应用程序属性和值:
组 ID 键 值 consumer.config.0
input.stream.name
ExampleInputStream
consumer.config.0
aws.region
us-west-2
consumer.config.0
scan.stream.initpos
LATEST
选择保存。
在 “属性” 下,再次选择 “添加群组”。对于群组 ID,输入
kinesis.analytics.flink.run.options
。这个特殊的属性组告诉你的应用程序在哪里可以找到它的代码资源。有关更多信息,请参阅指定您的代码文件:输入以下应用程序属性和值:
组 ID 键 值 kinesis.analytics.flink.run.options
python
streaming-file-sink.py
kinesis.analytics.flink.run.options
jarfile
S3Sink/lib/flink-sql-connector-kinesis-1.15.2.jar
在 “属性” 下,再次选择 “添加群组”。对于群组 ID,输入
sink.config.0
。这个特殊的属性组告诉你的应用程序在哪里可以找到它的代码资源。有关更多信息,请参阅指定您的代码文件:输入以下应用程序属性和值:(将存储
桶名称
替换为您的 Amazon S3 存储桶的实际名称。)组 ID 键 值 sink.config.0
output.bucket.name
bucket-name
-
在 Monitoring (监控) 下,确保 Monitoring metrics level (监控指标级别) 设置为 Application (应用程序)。
-
要进行CloudWatch 记录,请选中 “启用” 复选框。
-
选择更新。
注意
当您选择启用 CloudWatch 日志记录时,Kinesis Data Analytics 会为您创建日志组和日志流。这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication
-
日志流:
kinesis-analytics-log-stream
该日志流用于监控应用程序。这与应用程序用于发送结果的日志流不同。
编辑 IAM 策略
编辑 IAM 策略以添加访问 Kinesis 数据流的权限。
通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/
。 -
选择策略。选择控制台在上一部分中为您创建的
kinesis-analytics-service-MyApplication-us-west-2
策略。 -
在 Summary (摘要) 页面上,选择 Edit policy (编辑策略)。请选择 JSON 选项卡。
-
将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (
012345678901
) 替换为您的账户 ID。{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:
012345678901
:log-group:*", "arn:aws:s3:::ka-app-code-<username>
/myapp.zip" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteObjects", "Effect": "Allow", "Action": [ "s3:Abort*", "s3:DeleteObject*", "s3:GetObject*", "s3:GetBucket*", "s3:List*", "s3:ListBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::ka-app-code-<username>", "arn:aws:s3:::ka-app-code-<username>/*" ] }
运行应用程序
可以通过运行应用程序、打开 Apache Flink 仪表板并选择所需的 Flink 作业来查看 Flink 任务图。
您可以在 CloudWatch 控制台上查看 Kinesis Data Analytics 指标,以验证应用程序是否正常运行。
清理 Amazon 资源
本节包含清理在滑动窗口教程中创建的 Amazon 资源的过程。
本主题包含下列部分:
删除 Kinesis Data Analytics
打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics
。 在 Kinesis Data Analytics 面板中,选择MyApplication。
在应用程序的页面中,选择 Delete (删除),然后确认删除。
删除 Kinesis Data Streams
打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis
。 在 Kinesis Data Streams 面板中,选择ExampleInputStream。
在ExampleInputStream页面中,选择 “删除 Kinesis Stream”,然后确认删除。
删除您的Amazon S3 对象和存储桶
通过以下网址打开 Simple Storage Service(Amazon S3)控制台:https://console.aws.amazon.com/s3/
。 选择 ka-app-code-
存储桶。 <username>
选择 Delete (删除),然后输入存储桶名称以确认删除。
删除您的 IAM 资源
通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/
。 在导航栏中,选择策略。
在筛选条件控件中,输入 kinesis。
选择 kinesis-analytics-service-MyApplication-
<your-region>策略。
选择 Policy Actions (策略操作),然后选择 Delete (删除)。
在导航栏中,选择 Roles(角色)。
选择 k inesis-analyticsMyApplication-
<your-region>角色。
选择 Delete role (删除角色),然后确认删除。
删除您的 CloudWatch 资源
通过 https://console.aws.amazon.com/cloudwatch/
打开 CloudWatch 主机。 在导航栏中,选择 Logs (日志)。
选择 /aws/kinesis-analytics/MyApplication 日志组。
选择 Delete Log Group (删除日志组),然后确认删除。