

 从补丁 198 开始，Amazon Redshift 将不再支持创建新的 Python UDF。现有的 Python UDF 将继续正常运行至 2026 年 6 月 30 日。有关更多信息，请参阅[博客文章](https://www.amazonaws.cn/blogs/big-data/amazon-redshift-python-user-defined-functions-will-reach-end-of-support-after-june-30-2026/)。

# 从 Apache Kafka 源进行流式摄取入门
<a name="materialized-view-streaming-ingestion-getting-started-MSK"></a>

本主题介绍如何通过实体化视图使用来自 Amazon MSK、Apache Kafka 或 Confluent Cloud 的流数据。

 Amazon Redshift 流式摄取的目的是简化将流式数据直接从流式服务摄取到 Amazon Redshift 或 Amazon Redshift Serverless 的过程。这适用于 Amazon MSK Provisioned 和 Amazon MSK Serverless 以及开源 Apache Kafka 和 Confluent Cloud。使用 Amazon Redshift 流式摄取时，在将流数据摄取到 Redshift 之前，无需在 Amazon S3 中暂存 Apache Kafka 主题。

 在技术层面上，流式摄取以低延迟、高速度的方式，将流或主题数据摄取到 Amazon Redshift 实体化视图中。设置完成后，使用实体化视图刷新，可以接收大量数据。

在配置 Amazon Redshift 流式摄取之前，您必须有可用的 Apache Kafka 源。如果没有源，请按照以下说明创建一个源：
+ **Amazon MSK** - [Getting Started Using Amazon MSK](https://docs.amazonaws.cn/msk/latest/developerguide/getting-started.html)
+ **Apache Kafka** - [Apache Kafka Quickstart](https://kafka.apache.org/quickstart)
+ **Confluent Cloud** - [Quick Start for Confluent Cloud](https://docs.confluent.io/cloud/current/get-started/index.html)

## 设置从 Kafka 进行流式摄取
<a name="materialized-view-streaming-ingestion-getting-started-MSK-setup"></a>

使用以下步骤设置从 Amazon MSK 或非 Amazon 托管的 Apache Kafka 源（Apache Kafka 和 Confluent Cloud）流式摄取到 Amazon Redshift 的过程。

**Topics**
+ [设置身份验证](#materialized-view-streaming-ingestion-getting-started-MSK-setup-auth)
+ [设置 VPC](#materialized-view-streaming-ingestion-getting-started-MSK-Setup-VPC)
+ [创建实体化视图](#materialized-view-streaming-ingestion-getting-started-MSK-setup-materialized-view)

### 设置身份验证
<a name="materialized-view-streaming-ingestion-getting-started-MSK-setup-auth"></a>

本节介绍如何设置身份验证来支持 Amazon Redshift 应用程序访问 Amazon MSK 源。

创建应用程序的角色后，请附加以下策略之一，以支持访问 Amazon MSK、Apache Kafka 或 Confluent Cloud 集群。对于 mTLS 身份验证，可以将 Amazon Redshift 使用的证书存储在 ACM 或 Secrets Manager 中，因此，您必须选择与证书存储位置相匹配的策略。

请注意，当您使用直接流式摄取，将任何支持的 Apache Kafka 流式来源摄取到 Amazon Redshift 中时，身份验证或传输中数据不支持自签名证书。这包括 Amazon MSK、Apache Kafka 和 Confluent Cloud。请考虑使用由 Amazon Certificate Manager 或任何其他公开可信证书颁发机构生成的证书。

只有 Kafka 版本 2.7.1 或更高版本才支持使用 MSK 进行 Amazon Redshift IAM 身份验证。

**AUTHENTICATION IAM（仅限 Amazon MSK）：**

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "MSKIAMpolicy",
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:Connect"
            ],
            "Resource": [
                "arn:aws:kafka:*:111122223333:cluster/MyTestCluster/*",
                "arn:aws:kafka:*:111122223333:topic/MyTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:*:111122223333:group/MyTestCluster/*"
            ]
        }
    ]
}
```

------

**AUTHENTICATION MTLS：使用存储在 Amazon Certificate Manager 中的证书**

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "MSKmTLSACMpolicy",
            "Effect": "Allow",
            "Action": [
                "acm:ExportCertificate" 
            ],
            "Resource": [
                "arn:aws:acm:us-east-1:444455556666:certificate/certificate_ID"
            ]
        }
    ]
}
```

------

**AUTHENTICATION MTLS：使用存储在 Amazon Secrets Manager 中的证书**

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "MSKmTLSSecretsManagerpolicy",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue" 
            ],
            "Resource": [
                "arn:aws:secretsmanager:us-east-1:444455556666:secret:secret_ID"
            ]
        }
    ]
}
```

------

------
#### [ Amazon MSK ]

如果您使用 AUTHENTICATION NONE 连接到 Amazon MSK 源，则不需要任何 IAM 角色。但是，如果使用 AUTHENTICATION IAM 或 MTLS 向 Amazon MSK 集群进行身份验证，则 Amazon Redshift 集群或 Amazon Redshift Serverless 命名空间必须附加具有适当权限的 IAM 角色。使用允许 Amazon Redshift 集群或 Amazon Redshift Serverless 命名空间代入 IAM 角色的信任策略创建该角色。创建角色后，添加以下权限之一以支持 IAM 或 MTLS。对于 mTLS 身份验证，Amazon Redshift 使用的证书可以存储在 Amazon Certificate Manager 或 Amazon Secrets Manager 中，因此必须选择与证书存储位置相匹配的策略。将角色附加到 Amazon Redshift 预置集群或 Redshift Serverless 命名空间。有关如何为 IAM 角色配置信任策略的信息，请参阅[授权 Amazon Redshift 代表您访问其他 Amazon 服务](https://docs.amazonaws.cn/redshift/latest/mgmt/authorizing-redshift-service.html)。

下表显示了为从 Amazon MSK 进行流式摄取所要设置的免费配置选项：


| Amazon Redshift 配置 | Amazon MSK 配置 | 要在 Redshift 和 Amazon MSK 之间打开的端口 | 
| --- | --- | --- | 
|  AUTHENTICATION NONE  |  TLS 传输已禁用  | 9092 | 
|  AUTHENTICATION NONE  |  TLS 传输已启用  | 9094 | 
|  AUTHENTICATION IAM  |  IAM  | 9098/9198 | 
|  AUTHENTICATION MTLS  |  TLS 传输已启用  | 9094 | 

Amazon Redshift 身份验证是在 CREATE EXTERNAL SCHEMA 语句中设置的。

**注意**  
如果 Amazon MSK 集群启用了相互传输层安全性协议 (mTLS) 身份验证，则将 Amazon Redshift 配置为使用 AUTHENTICATION NONE 会指示它使用端口 9094 进行未经身份验证的访问。但是，由于 mTLS 身份验证正在使用该端口，因此这一过程将失败。因此，我们建议您在使用 mTLS 时切换到 AUTHENTICATION mtls。

------
#### [ Apache Kafka or Confluent Cloud ]

对于 Apache Kafka 和 Confluent Cloud，Amazon Redshift 支持以下连接协议：
+ 连接到 Apache Kafka 时，可以使用 mTLS 或具有 TLS 的纯文本传输进行身份验证。
+ 连接到 Confluent Cloud 时，只能使用 mTLS 进行身份验证。

Amazon Redshift 支持使用以下加密协议来连接到 Apache Kafka 或 Confluent Cloud：

**Apache Kafka 和 Confluent Cloud 支持的身份验证方法**


| Amazon Redshift | Kafka 安全协议 | Apache Kafka 支持 | Confluent Cloud 支持 | 
| --- | --- | --- | --- | 
| AUTHENTICATION NONE | PLAINTEXT | 否 | 否 | 
| AUTHENTICATION NONE | SSL | 是 | 否 | 
| AUTHENTICATION IAM | SASL\$1SSL | 否 | 否 | 
| AUTHENTICATION MTLS | SSL | 是（带证书） | 是（带证书） | 

请注意，Amazon Redshift 不支持 SASL/SCRAM 或 SASL/PLAINTEXT。

------

### 设置 VPC
<a name="materialized-view-streaming-ingestion-getting-started-MSK-Setup-VPC"></a>

创建身份验证资源后，检查您的 VPC，并确认 Amazon Redshift 集群或 Amazon Redshift Serverless 工作组拥有通往 Apache Kafka 源的路由。

**注意**  
对于 Amazon MSK，Amazon MSK 集群的入站安全组规则应支持 Amazon Redshift 集群或 Redshift Serverless 工作组的安全组。指定的端口取决于 Amazon MSK 集群上配置的身份验证方法。有关更多信息，请参阅[端口信息](https://docs.amazonaws.cn/msk/latest/developerguide/port-info.html)和[从 Amazon 内但在 VPC 外部访问](https://docs.amazonaws.cn/msk/latest/developerguide/aws-access.html)。

下一步，在 Amazon Redshift 集群或 Amazon Redshift Serverless 工作组中启用增强型 VPC 路由。有关更多信息，请参阅[启用增强型 VPC 路由](https://docs.amazonaws.cn/redshift/latest/mgmt/enhanced-vpc-enabling-cluster.html)。

### 创建实体化视图
<a name="materialized-view-streaming-ingestion-getting-started-MSK-setup-materialized-view"></a>

在本节中，您将设置实体化视图，Amazon Redshift 使用该视图来访问 Apache Kafka 流数据。

假设您有可用的 Apache Kafka 集群，第一步是使用 `CREATE EXTERNAL SCHEMA` 在 Redshift 中定义一个架构，并引用该集群作为数据来源。之后，要访问主题中的数据，请在实体化视图中定义 `STREAM`。您可以使用默认的 Amazon Redshift VARBYTE 数据类型存储主题中的记录，也可以定义架构来将数据转换为半结构化 `SUPER` 格式。当您查询实体化视图时，返回的记录是主题的时间点视图。

1. 在 Amazon Redshift 中，创建一个外部架构来映射到 Apacke Kafka 集群。该语法如下所示：

   ```
   CREATE EXTERNAL SCHEMA MySchema
   FROM KAFKA
   [ IAM_ROLE [ default | 'iam-role-arn' ] ]
   AUTHENTICATION [ none | iam | mtls ]
   {AUTHENTICATION_ARN 'acm-certificate-arn' |  SECRET_ARN 'asm-secret-arn'};
   ```

   在 `FROM` 子句中，`KAFKA` 表示架构映射来自 Apache Kafka 源的数据。

    `AUTHENTICATION` 表示用于流式摄取的身份验证类型。有三种类型：
   + **无** – 指定不需要身份验证。这相当于 MSK 上的未经身份验证访问。这与 Apache Kafka 中的 SSL 身份验证相对应。Confluent Cloud 不支持这种身份验证方法。
   + **iam** – 指定 IAM 身份验证。您只能在 Amazon MSK 中使用 IAM 身份验证。选择此选项时，请确保 IAM 角色具有 IAM 身份验证的权限。有关设置所需 IAM 策略的更多信息，请参阅 [设置从 Kafka 进行流式摄取](#materialized-view-streaming-ingestion-getting-started-MSK-setup)。
   + **mtls** – 指定双向传输层安全通过促进客户端和服务器之间的身份验证来提供安全通信。在这种情况下，客户端是 Redshift，服务器是 Apache Kafka。有关使用 mTLS 配置流式摄取的更多信息，请参阅 [使用 mTLS 对来自 Apache Kafka 源的 Redshift 流式摄取进行身份验证](materialized-view-streaming-ingestion-mtls.md)。

   请注意，流式摄取不支持使用用户名和密码的 Amazon MSK 身份验证。

   `AUTHENTICATION_ARN` 参数指定用于建立加密连接的 ACM 双向传输层安全（mTLS）证书的 ARN。

   `SECRET_ARN` 参数指定 Amazon Secrets Manager 密钥的 ARN，其中包含 Amazon Redshift 用于 mTLS 的证书。

   以下示例展示了如何在创建外部架构时为 Amazon MSK 集群设置代理 URI：

   **使用 IAM 身份验证：**

   ```
   CREATE EXTERNAL SCHEMA my_schema
   FROM KAFKA
   IAM_ROLE 'arn:aws:iam::012345678901:role/my_role'
   AUTHENTICATION IAM
   URI 'b-1.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098,b-2.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098'
   ```

   **不使用身份验证：**

   ```
   CREATE EXTERNAL SCHEMA my_schema
   FROM KAFKA 
   AUTHENTICATION none
   URI 'b-1.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9092,b-2.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9092'
   ```

   **使用 mTLS：**

   ```
   CREATE EXTERNAL SCHEMA my_schema
   FROM KAFKA
   IAM_ROLE 'arn:aws:iam::012345678901:role/my_role'
   AUTHENTICATION MTLS
   URI 'b-1.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9094,b- 2.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9094'
   {AUTHENTICATION_ARN 'acm-certificate-arn' |  SECRET_ARN 'asm-secret-arn'}
   ```

   有关创建外部架构的更多信息，请参阅 [CREATE EXTERNAL SCHEMA](https://docs.amazonaws.cn/redshift/latest/dg/r_CREATE_EXTERNAL_SCHEMA.html)。

1. 创建一个实体化视图以使用来自主题的数据。使用 SQL 命令，如以下示例。

   ```
   CREATE MATERIALIZED VIEW MyView AUTO REFRESH YES AS
   SELECT *
   FROM MySchema."mytopic";
   ```

   Kafka 主题名称区分大小写，可以包含大写字母和小写字母。要从名称大写的主题摄取内容，可以在会话或数据库级别将配置 `enable_case_sensitive_identifier` 设置为 `true`。有关更多信息，请参阅[名称和标识符](https://docs.amazonaws.cn/redshift/latest/dg/r_names.html)和 [enable\$1case\$1sensitive\$1identifier](https://docs.amazonaws.cn/redshift/latest/dg/r_enable_case_sensitive_identifier.html)。

   要开启自动刷新，请使用 `AUTO REFRESH YES`。默认行为是手动刷新。

1. 元数据列包括以下内容：    
[\[See the AWS documentation website for more details\]](http://docs.amazonaws.cn/redshift/latest/dg/materialized-view-streaming-ingestion-getting-started-MSK.html)

   需要注意的是，如果实体化视图定义中的业务逻辑导致业务逻辑错误，在某些情况下可能会导致流式摄取中的摄取失败。这可能会导致您不得不删除实体化视图，然后重新创建。为避免这种情况，我们建议您尽可能简化业务逻辑，并在摄取数据后对数据运行额外的逻辑。

1. 刷新视图，这会调用 Amazon Redshift 从主题中读取数据并将数据加载到实体化视图中。

   ```
   REFRESH MATERIALIZED VIEW MyView;
   ```

1. 在实体化视图中查询数据。

   ```
   select * from MyView;
   ```

   当 `REFRESH` 运行时，直接从主题更新实体化视图。您创建映射到 Kafka 主题数据来源的实体化视图。在实体化视图定义中，您可以对数据执行筛选和聚合。流式摄取实体化视图（基本实体化视图）只能引用一个 Kafka 主题，但是您可以创建额外的实体化视图，以与基本实体化视图和其他实体化视图或表连接使用。

有关流式摄取限制的更多信息，请参阅 [流式摄取行为和数据类型](materialized-view-streaming-ingestion.md#materialized-view-streaming-ingestion-limitations)。