开始使用 Amazon Kinesis Data Streams 流式摄取 - Amazon Redshift
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

开始使用 Amazon Kinesis Data Streams 流式摄取

Amazon Redshift 流式摄取的设置涉及创建映射到流数据来源的外部 Schema 以及创建引用外部 Schema 的实体化视图。Amazon Redshift 流式摄取支持将 Kinesis Data Streams 作为源。因此,在配置流式摄取之前,您必须有可用的 Kinesis Data Streams 源。如果还没有源,请按照 Kinesis 文档 Amazon Kinesis Data Streams 入门中的说明执行操作,或者按照通过 Amazon 管理控制台创建流中的说明在控制台上创建一个源。

Amazon Redshift 流式摄取使用实体化视图,该视图将在 REFRESH 运行时直接从流中更新。实体化视图映射到流数据来源。在实体化视图定义中,您可以对流数据执行筛选和聚合。流式摄取实体化视图(基本实体化视图)只能引用一个流,但是您可以创建额外的实体化视图,以与基本实体化视图和其他实体化视图或表连接使用。

注意

流式摄取和 Amazon Redshift Serverless – 本主题中的配置步骤同时适用于预调配的 Amazon Redshift 集群和 Amazon Redshift Serverless。有关更多信息,请参阅 流式摄取行为和数据类型

假设您有 Kinesis Data Streams 流可用,第一步是使用 CREATE EXTERNAL SCHEMA 在 Amazon Redshift 中定义一个 Schema,并引用某个 Kinesis Data Streams 资源。之后,要访问流中的数据,请在实体化视图中定义 STREAM。您可以用半结构化的 SUPER 格式存储流记录,或者定义一个会将数据转换为 Redshift 数据类型的 Schema。当您查询实体化视图时,返回的记录是流的时间点视图。

  1. 使用允许 Amazon Redshift 集群或 Amazon Redshift Serverless 工作组代入该角色的信任策略创建 IAM 角色。有关如何为 IAM 角色配置信任策略的信息,请参阅授权 Amazon Redshift 代表您访问其他 Amazon 服务。创建后的角色应具有以下 IAM 策略,提供与 Amazon Kinesis 数据流进行通信的权限。

    来自 Kinesis 数据流的未加密流 IAM 策略

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStreamSummary", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream" ], "Resource": "arn:aws:kinesis:*:0123456789:stream/*" }, { "Sid": "ListStream", "Effect": "Allow", "Action": [ "kinesis:ListStreams", "kinesis:ListShards" ], "Resource": "*" } ] }

    来自 Kinesis 数据流的加密流 IAM 策略

    { "Version": "2012-10-17", "Statement": [{ "Sid": "ReadStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStreamSummary", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream" ], "Resource": "arn:aws:kinesis:*:0123456789:stream/*" }, { "Sid": "DecryptStream", "Effect": "Allow", "Action": [ "kms:Decrypt" ], "Resource": "arn:aws:kms:us-east-1:0123456789:key/1234abcd-12ab-34cd-56ef-1234567890ab" }, { "Sid": "ListStream", "Effect": "Allow", "Action": [ "kinesis:ListStreams", "kinesis:ListShards" ], "Resource": "*" } ] }
  2. 检查您的 VPC,并确认您的 Amazon Redshift 集群或 Amazon Redshift Serverless 拥有使用 NAT 网关或互联网网关通过互联网到达 Kinesis Data Streams 端点的路由。如果您想让 Redshift 和 Kinesis Data Streams 之间的流量保持在 Amazon 网络内,可以考虑使用 Kinesis 接口 VPC 端点。有关更多信息,请参阅将 Amazon Kinesis Data Streams 与接口 VPC 端点结合使用

  3. 在 Amazon Redshift 中,创建外部 Schema 以将 Kinesis 中的数据映射到某个 Schema。

    CREATE EXTERNAL SCHEMA kds FROM KINESIS IAM_ROLE { default | 'iam-role-arn' };

    Kinesis Data Streams 的流式摄取不需要身份验证类型。它使用在 CREATE EXTERNAL SCHEMA 语句中定义的 IAM 角色来发出 Kinesis Data Streams 请求。

    可选:使用 REGION 关键字指定 Amazon Kinesis Data Streams 或 Amazon MSK 流所在的区域。

    CREATE EXTERNAL SCHEMA kds FROM KINESIS REGION 'us-west-2' IAM_ROLE { default | 'iam-role-arn' };

    在此示例中,区域指定了源流的位置。IAM_ROLE 就是一个示例。

  4. 创建一个实体化视图以使用流数据。使用如下语句,如果无法解析记录,则会导致错误。如果您不想跳过错误记录,请使用类似这样的命令。

    CREATE MATERIALIZED VIEW my_view AUTO REFRESH YES AS SELECT * FROM kds.my_stream_name;

    以下示例定义了采用 JSON 格式的源数据的实体化视图。该视图可验证传入数据的 JSON 格式是否正确。Kinesis 流名称区分大小写,可以包含大写字母和小写字母。要从具有大写名称的流中摄取,可以在数据库级别将配置 enable_case_sensitive_identifier 设置为 true。有关更多信息,请参阅名称和标识符enable_case_sensitive_identifier

    CREATE MATERIALIZED VIEW my_view AUTO REFRESH YES AS SELECT approximate_arrival_timestamp, partition_key, shard_id, sequence_number, refresh_time, JSON_PARSE(kinesis_data) as kinesis_data FROM kds.my_stream_name WHERE CAN_JSON_PARSE(kinesis_data);

    要开启自动刷新,请使用 AUTO REFRESH YES。默认行为是手动刷新。请注意,当您使用 CAN_JSON_PARSE 时,可能会跳过无法解析的记录。

    元数据列包括以下内容:

    元数据列 数据类型 描述
    approximate_arrival_timestamp 不带时区的时间戳 记录插入 Kinesis 流的大致时间
    partition_key varchar(256) Kinesis 用于将记录分配给分片的键
    shard_id char(20) 从中检索记录的流式内的分片的唯一标识符
    sequence_number varchar(128) Kinesis 分片中的记录的唯一标识符
    refresh_time 不带时区的时间戳 刷新开始的时间
    kinesis_data varbyte 来自 Kinesis 流式的记录

    请务必注意,如果您的实体化视图定义中有业务逻辑,那么在某些情况下,业务逻辑错误可能会导致流式摄取被阻止。这可能会导致您不得不删除实体化视图,然后重新创建。为避免这种情况,我们建议您尽可能简化逻辑,并在摄取数据后对数据进行大部分业务逻辑检查。

  5. 刷新视图,这会调用 Redshift 从流式中读取数据并将数据加载到实体化视图中。

    REFRESH MATERIALIZED VIEW my_view;
  6. 在实体化视图中查询数据。

    select * from my_view;