开始使用 Amazon Managed Streaming for Apache Kafka 串流摄取 - Amazon Redshift
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

开始使用 Amazon Managed Streaming for Apache Kafka 串流摄取

Amazon Redshift 串流摄取的目的是简化将串流数据直接从串流服务摄取到 Amazon Redshift 或 Amazon Redshift Serverless 的过程。这适用于 Amazon MSK 和 Amazon MSK Serverless 以及 Kinesis。使用 Amazon Redshift 串流摄取时,在将串流数据摄取到 Redshift 之前,无需在 Amazon S3 中暂存 Kinesis Data Streams 流或 Amazon MSK 主题。

在技术层面上,来自 Amazon Kinesis Data Streams 和 Amazon Managed Streaming for Apache Kafka 的串流摄取以低延迟、高速度的方式将串流或主题数据摄取到 Amazon Redshift 实体化视图中。设置完成后,使用实体化视图刷新,可以接收大量数据。

通过执行以下步骤,为 Amazon MSK 设置 Amazon Redshift 串流摄取:

  1. 创建映射到串流数据来源的外部 Schema。

  2. 创建引用外部 Schema 的实体化视图。

在配置 Amazon Redshift 串流摄取之前,您必须有可用的 Amazon MSK 源。如果您没有源,请按照开始使用 Amazon MSK 中的说明进行操作。

注意

串流摄取和 Amazon Redshift Serverless – 本主题中的配置步骤同时适用于预调配的 Amazon Redshift 集群和 Amazon Redshift Serverless。有关更多信息,请参阅 串流摄取注意事项

设置 IAM 并从 Kafka 执行串流摄取

假设您有可用的 Amazon MSK 集群,第一步是使用 CREATE EXTERNAL SCHEMA 在 Redshift 中定义一个架构并引用 Kafka 主题作为数据来源。之后,要访问主题中的数据,请在实体化视图中定义 STREAM。您可以用半结构化 SUPER 格式存储来自主题的记录,或者定义一个会将数据转换为 Amazon Redshift 数据类型的 Schema。当您查询实体化视图时,返回的记录是主题的时间点视图。

  1. 使用允许 Amazon Redshift 集群或 Amazon Redshift Serverless 代入 IAM 角色的信任策略创建该角色。有关如何为 IAM 角色配置信任策略的信息,请参阅授权 Amazon Redshift 代表您访问其他 Amazon 服务。创建角色后,它应具有以下 IAM 策略,从而提供与 Amazon MSK 集群进行通信的权限。如果您使用 Amazon MSK,则所需的策略取决于集群上使用的身份验证方法。有关 Amazon MSK 中可用的身份验证方法,请参阅 Apache Kafka API 的身份验证和授权

    使用未经身份验证的访问权限时的 Amazon MSK IAM 策略:

    { "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "kafka:GetBootstrapBrokers" ], "Resource": "*" } ] }

    使用 IAM 身份验证时的 Amazon MSK IAM 策略:

    { "Version": "2012-10-17", "Statement": [ { "Sid": "MSKIAMpolicy", "Effect": "Allow", "Action": [ "kafka-cluster:ReadData", "kafka-cluster:DescribeTopic", "kafka-cluster:Connect" ], "Resource": [ "arn:aws:kafka:*:0123456789:cluster/*/*", "arn:aws:kafka:*:0123456789:topic/*/*/*" ] }, { "Sid": "MSKPolicy", "Effect": "Allow", "Action": [ "kafka:GetBootstrapBrokers" ], "Resource": "*" } ] }
  2. 检查您的 VPC,并确认您的 Amazon Redshift 集群或 Amazon Redshift Serverless 拥有通往 Amazon MSK 集群的路由。Amazon MSK 集群的入站安全组规则应允许 Amazon Redshift 集群或 Amazon Redshift Serverless 工作组的安全组。如果您使用 Amazon MSK,则您指定的端口取决于集群上使用的身份验证方法。有关更多信息,请参阅端口信息从 Amazon 内但在 VPC 外部访问

    请注意,对于串流摄取,不支持通过 mTLS 进行客户端身份验证。有关更多信息,请参阅限制

    下表显示了为从 Amazon MSK 进行串流摄取所要设置的免费配置选项:

    Amazon Redshift 配置 Amazon MSK 配置 要在 Redshift 和 Amazon MSK 之间打开的端口
    AUTHENTICATION NONE TLS 传输已禁用 9092
    AUTHENTICATION NONE TLS 传输已启用 9094
    AUTHENTICATION IAM IAM 9098/9198

    Amazon Redshift 身份验证是在 CREATE EXTERNAL SCHEMA 语句中设置的。

    如果 Amazon MSK 集群启用了相互传输层安全性协议 (mTLS) 身份验证,则将 Amazon Redshift 配置为使用 AUTHENTICATION NONE 会指示它使用端口 9094 进行未经身份验证的访问。但是,由于 mTLS 身份验证正在使用该端口,因此这一过程将失败。因此,当您使用 mTLS 时,我们建议您切换到 AUTHENTICATION IAM。

  3. 在 Amazon Redshift 集群或 Amazon Redshift Serverless 工作组中启用增强型 VPC 路由。有关更多信息,请参阅启用增强型 VPC 路由

    注意

    为了检索 Amazon MSK 引导程序代理 URL,Amazon Redshift 使用附加的 IAM 角色提供的权限进行 GetBootstrapBrokers API 调用。请注意,为了使此请求在启用增强型 VPC 路由时获得成功,您的 Amazon Redshift 预调配集群或 Amazon Redshift Serverless 工作组的子网必须具有 NAT 网关或互联网网关。您的网络 ACL 和上述子网的安全组出站规则还必须允许访问 Amazon MSK API 服务端点。有关更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 端点和限额

  4. 在 Amazon Redshift 中,创建一个外部 Schema 以映射到 Amazon MSK 集群。

    CREATE EXTERNAL SCHEMA MySchema FROM MSK IAM_ROLE { default | 'iam-role-arn' } AUTHENTICATION { none | iam } CLUSTER_ARN 'msk-cluster-arn';

    FROM 子句中,Amazon MSK 表示模式映射来自托管式 Kafka 服务的数据。

    当您创建外部模式时,Amazon MSK 的串流摄取提供以下身份验证类型:

    • – 指定没有身份验证步骤。

    • iam – 指定 IAM 身份验证。选择此选项时,请确保 IAM 角色具有 IAM 身份验证的权限。

    串流摄取不支持其他 Amazon MSK 身份验证方法,例如 TLS 身份验证或用户名和密码。

    CLUSTER_ARN 指定要从中进行流式传输的 Amazon MSK 集群。

  5. 创建一个实体化视图以使用来自主题的数据。以下示例定义了一个包含 JSON 源数据的实体化视图。请注意,以下视图会确认数据是有效 JSON 和 utf8。Kafka 主题名称区分大小写,可以包含大写字母和小写字母。要从具有大写名称的主题中摄取,可以在数据库级别将配置 enable_case_sensitive_identifier 设置为 true。有关更多信息,请参阅名称和标识符enable_case_sensitive_identifier

    CREATE MATERIALIZED VIEW MyView AUTO REFRESH YES AS SELECT kafka_partition, kafka_offset, kafka_timestamp_type, kafka_timestamp, kafka_key, JSON_PARSE(kafka_value) as Data, kafka_headers FROM MySchema."mytopic" WHERE CAN_JSON_PARSE(kafka_value);

    要开启自动刷新,请使用 AUTO REFRESH YES。默认行为是手动刷新。

    元数据列包括以下内容:

    元数据列 数据类型 描述
    kafka_partition bigint 来自 Kafka 主题的记录的分区 ID
    kafka_offset bigint Kafka 主题中给定分区的记录的偏移
    kafka_timestamp_type char(1)

    Kafka 记录中使用的时间戳类型:

    • C – 客户端的记录创建时间 (CREATE_TIME)

    • L – Kafka 服务器端的记录追加时间 (LOG_APPEND_TIME)

    • U – 记录创建时间不可用 (NO_TIMESTAMP_TYPE)

    kafka_timestamp 不带时区的时间戳 记录的时间戳值
    kafka_key varbyte Kafka 记录的键
    kafka_value varbyte 从 Kafka 收到的记录
    kafka_headers super 从 Kafka 收到的记录的标头
    refresh_time 不带时区的时间戳 刷新开始的时间

    请务必注意,如果您的实体化视图定义中有业务逻辑,那么在某些情况下,解析错误可能会导致串流摄取被阻止。这可能会导致您不得不删除实体化视图,然后重新创建它。为避免这种情况,我们建议您尽可能简化解析逻辑,并在摄取数据后对数据进行大部分业务逻辑检查。以下示例显示了如何使用 CAN_JSON_PARSE 函数 以防出现错误并更成功地摄取数据。

  6. 刷新视图,这会调用 Amazon Redshift 从主题中读取数据并将数据加载到实体化视图中。

    REFRESH MATERIALIZED VIEW MyView;
  7. 在实体化视图中查询数据。

    select * from MyView;

    REFRESH 运行时,直接从主题更新实体化视图。您创建映射到 Kafka 主题数据来源的实体化视图。在实体化视图定义中,您可以对数据执行筛选和聚合。串流摄取实体化视图(基本实体化视图)只能引用一个 Kafka 主题,但是您可以创建额外的实体化视图,以与基本实体化视图和其他实体化视图或表连接使用。

有关串流摄取限制的更多信息,请参阅 限制