开始使用 Amazon Kinesis Data Streams 串流摄取 - Amazon Redshift
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

开始使用 Amazon Kinesis Data Streams 串流摄取

Amazon Redshift 串流摄取的设置涉及创建映射到流数据源的外部 Schema 以及创建引用外部 Schema 的实体化视图。Amazon Redshift 串流摄取支持将 Kinesis Data Streams 作为源。因此,在配置串流摄取之前,您必须有可用的 Kinesis Data Streams 源。如果还没有源,请按照 Kinesis 文档 Amazon Kinesis Data Streams 入门中的说明执行操作,或者按照通过 Amazon 管理控制台创建流中的说明在控制台上创建一个源。

Amazon Redshift 串流摄取使用实体化视图,该视图将在 REFRESH 运行时直接从流中更新。实体化视图映射到流数据源。在实体化视图定义中,您可以对流数据执行筛选和聚合。串流摄取实体化视图(基本实体化视图)只能引用一个流,但是您可以创建额外的实体化视图,以与基本实体化视图和其他实体化视图或表连接使用。

注意

串流摄取和 Amazon Redshift Serverless – 本主题中的配置步骤同时适用于预调配的 Amazon Redshift 集群和 Amazon Redshift Serverless。有关更多信息,请参阅 串流摄取注意事项

假设您有 Kinesis Data Streams 流可用,第一步是使用 CREATE EXTERNAL SCHEMA 在 Amazon Redshift 中定义一个 Schema,并引用某个 Kinesis Data Streams 资源。之后,要访问流中的数据,请在实体化视图中定义 STREAM。您可以用半结构化的 SUPER 格式存储流记录,或者定义一个会将数据转换为 Redshift 数据类型的 Schema。当您查询实体化视图时,返回的记录是流的时间点视图。

  1. 使用允许 Amazon Redshift 集群或 Amazon Redshift Serverless 工作组代入该角色的信任策略创建 IAM 角色。有关如何为 IAM 角色配置信任策略的信息,请参阅授权 Amazon Redshift 代表您访问其他 Amazon 服务。创建后的角色应具有以下 IAM 策略,提供与 Amazon Kinesis 数据流进行通信的权限。

    来自 Kinesis 数据流的未加密流 IAM 策略

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStreamSummary", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream" ], "Resource": "arn:aws:kinesis:*:0123456789:stream/*" }, { "Sid": "ListStream", "Effect": "Allow", "Action": [ "kinesis:ListStreams", "kinesis:ListShards" ], "Resource": "*" } ] }

    来自 Kinesis 数据流的加密流 IAM 策略

    { "Version": "2012-10-17", "Statement": [{ "Sid": "ReadStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStreamSummary", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream" ], "Resource": "arn:aws:kinesis:*:0123456789:stream/*" }, { "Sid": "DecryptStream", "Effect": "Allow", "Action": [ "kms:Decrypt" ], "Resource": "arn:aws:kms:us-east-1:0123456789:key/1234abcd-12ab-34cd-56ef-1234567890ab" }, { "Sid": "ListStream", "Effect": "Allow", "Action": [ "kinesis:ListStreams", "kinesis:ListShards" ], "Resource": "*" } ] }
  2. 检查您的 VPC,并确认您的 Amazon Redshift 集群或 Amazon Redshift Serverless 拥有使用 NAT 网关或互联网网关通过互联网到达 Kinesis Data Streams 端点的路由。如果您想让 Redshift 和 Kinesis Data Streams 之间的流量保持在 Amazon 网络内,可以考虑使用 Kinesis 接口 VPC 端点。有关更多信息,请参阅将 Amazon Kinesis Data Streams 与接口 VPC 端点结合使用

  3. 在 Amazon Redshift 中,创建外部 Schema 以将 Kinesis 中的数据映射到某个 Schema。

    CREATE EXTERNAL SCHEMA kds FROM KINESIS IAM_ROLE { default | 'iam-role-arn' };

    Kinesis Data Streams 的串流摄取不需要身份验证类型。它使用在 CREATE EXTERNAL SCHEMA 语句中定义的 IAM 角色来发出 Kinesis Data Streams 请求。

    可选:使用 REGION 关键字指定 Amazon Kinesis Data Streams 或 Amazon MSK 流所在的区域。

    CREATE EXTERNAL SCHEMA kds FROM KINESIS REGION 'us-west-2' IAM_ROLE { default | 'iam-role-arn' };

    在此示例中,区域指定了源流的位置。IAM_ROLE 就是一个示例。

  4. 创建一个具体化视图以使用流数据。以下示例定义了一个包含 JSON 源数据的实体化视图。请注意,以下视图会验证数据是否为有效的 JSON 源。Kinesis 流名称区分大小写,可以包含大写字母和小写字母。要从具有大写名称的流中摄取,可以在数据库级别将配置 enable_case_sensitive_identifier 设置为 true。有关更多信息,请参阅名称和标识符enable_case_sensitive_identifier

    CREATE MATERIALIZED VIEW my_view AUTO REFRESH YES AS SELECT approximate_arrival_timestamp, JSON_PARSE(kinesis_data) as Data FROM kds.my_stream_name WHERE CAN_JSON_PARSE(kinesis_data);

    要开启自动刷新,请使用 AUTO REFRESH YES。默认行为是手动刷新。

    元数据列包括以下内容:

    元数据列 数据类型 描述
    approximate_arrival_timestamp 不带时区的时间戳 记录插入 Kinesis 流的大致时间
    partition_key varchar(256) Kinesis 用于将记录分配给分片的键
    shard_id char(20) 从中检索记录的串流内的分片的唯一标识符
    sequence_number varchar(128) Kinesis 分片中的记录的唯一标识符
    refresh_time 不带时区的时间戳 刷新开始的时间
    kinesis_data varbyte 来自 Kinesis 串流的记录

    请务必注意,如果您的实体化视图定义中有业务逻辑,那么在某些情况下,解析错误可能会导致串流摄取被阻止。这可能会导致您不得不删除实体化视图,然后重新创建它。为避免这种情况,我们建议您尽可能简化解析逻辑,并在摄取数据后对数据进行大部分业务逻辑检查。

  5. 刷新视图,这会调用 Redshift 从串流中读取数据并将数据加载到实体化视图中。

    REFRESH MATERIALIZED VIEW my_view;
  6. 在实体化视图中查询数据。

    select * from my_view;