

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 创建和使用 Managed Service for Apache Flink 应用程序的示例
<a name="examples-collapsibles"></a>

本节提供了在Managed Service for Apache Flink 中创建和使用应用程序的示例。它们包括示例代码和 step-by-step说明，可帮助您为 Apache Flink 应用程序创建托管服务并测试结果。

在分析这些示例之前，我们建议您先查看以下内容：
+ [工作原理](how-it-works.md)
+ [教程：开始使用 Apache Flink 托管服务中的 DataStream API](getting-started.md)

**注意**  
这些示例假设您使用的是美国东部（弗吉尼亚州北部）区域（us-east-1）。如果您使用不同的区域，请相应地更新应用程序代码、命令和 IAM 角色。

**Topics**
+ [Managed Service for Apache Flink 的 Java 示例](examples-new-java.md)
+ [Managed Service for Apache Flink 的 Python 示例](examples-new-python.md)
+ [Managed Service for Apache Flink 的 Scala 示例](examples-new-scala.md)

# Managed Service for Apache Flink 的 Java 示例
<a name="examples-new-java"></a>

以下示例演示如何创建以 Java 编写的应用程序。



**注意**  
大多数示例都设计为在本地、开发计算机和您选择的 IDE 上运行，以及在 Amazon Managed Service for Apache Flink 上运行。它们演示可用于传递应用程序参数的机制，以及如何正确设置依赖项，以便在不做任何更改的情况下在两个环境中运行应用程序。

## 提高序列化性能，定义自定义 TypeInfo
<a name="improving-serialization-performance-java"></a>

此示例说明了如何在记录或状态对象 TypeInfo 上定义自定义，以防止序列化回效率较低的 Kryo 序列化。例如，当您的对象包含 `List` 或 `Map` 时，这是必需的操作。有关更多信息，请参阅 Apache Flink 文档中的[数据类型和序列化](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#data-types--serialization)。该示例还展示如何测试对象的序列化是否回退到效率较低的 Kryo 序列化。

代码示例：[CustomTypeInfo](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/Serialization/CustomTypeInfo)

## 开始使用 DataStream API
<a name="getting-started-datastream-java"></a>

此示例显示一个简单的应用程序，该应用程序使用 `DataStream` API 从 Kinesis 数据流中读取数据并写入另一个 Kinesis 数据流。该示例演示如何使用正确的依赖项设置文件，构建 uber-JAR，然后解析配置参数，这样您就可以在本地、IDE 和 Amazon Managed Service for Apache Flink 上运行应用程序。

代码示例：[GettingStarted](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted)

## 开始使用 Table API 和 SQL
<a name="getting-started-table-java"></a>

此示例显示使用 `Table` API 和 SQL 的简单应用程序。其中演示如何在同一 Java 应用程序中将 `DataStream` API 与 `Table` API 或 SQL 集成。它还演示如何使用 `DataGen` 连接器在 Flink 应用程序内部生成随机测试数据，而无需使用外部数据生成器。

完整示例：[GettingStartedTable](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStartedTable)

## 使用 S3Sink (API) DataStream
<a name="s3-sink-java"></a>

此示例演示如何使用 `DataStream` API 的 `FileSink` 将 JSON 文件写入 S3 存储桶。

代码示例：[S3Sink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/S3Sink)

## 使用 Kinesis 来源、标准或 EFO 使用者以及接收器 (API) DataStream
<a name="kinesis-EFO-sink-java"></a>

此示例演示如何使用标准使用者或 EFO 配置从 Kinesis 数据流使用的源，以及如何为 Kinesis 数据流设置接收器。

代码示例：[KinesisConnectors](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)

## 使用亚马逊 Data Firehose 接收器 (API) DataStream
<a name="firehose-sink-java"></a>

此示例说明如何将数据发送到 Amazon Data Firehose（以前称为 Kinesis Data Firehose）。

代码示例：[KinesisFirehoseSink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisFirehoseSink)

## 使用 Prometheus 接收器连接器
<a name="prometheus-sink-java"></a>

此示例演示如何使用 [Prometheus 接收器连接器](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/prometheus/)将时间序列数据写入 Prometheus。

代码示例：[PrometheusSink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/PrometheusSink)

## 使用窗口聚合 (API) DataStream
<a name="windowing-aggregations-java"></a>

此示例演示 `DataStream` API 中四种类型的窗口聚合。

1. 基于处理时间的滑动窗口

1. 基于事件时间的滑动窗口

1. 基于处理时间的滚动窗口

1. 基于事件时间的滚动窗口

代码示例：[Windowing](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/Windowing) 

## 使用自定义指标
<a name="custom-metrics-java"></a>

此示例说明如何将自定义指标添加到您的 Flink 应用程序并将其发送到 CloudWatch 指标。

代码示例：[CustomMetrics](https://github.com/dzikosc/amazon-managed-service-for-apache-flink-examples/tree/main/java/CustomMetrics)

## 使用 Kafka 配置提供程序在运行时获取 mTLS 的自定义密钥库和信任存储库
<a name="kafka-keystore-mTLS"></a>

此示例说明如何使用 Kafka 配置提供程序设置自定义密钥库和信任存储库，其中包含用于 Kafka 连接器的 mTLS 身份验证的证书。此技术允许您从 Amazon S3 加载所需的自定义证书以及应用程序启动 Amazon Secrets Manager 时的密钥。

代码示例：[kafka-mtls-keystore-ConfigProviders](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConfigProviders/Kafka-mTLS-Keystore-ConfigProviders)

## 使用 Kafka 配置提供程序在运行时获取用于 SASL/SCRAM 身份验证的密钥
<a name="kafka-secrets"></a>

此示例说明如何使用 Kafka 配置提供程序从 Amazon S3 获取证书 Amazon Secrets Manager 并从 Amazon S3 下载信任库，以便在 Kafka 连接器上设置 SASL/SCRAM 身份验证。此技术允许您从 Amazon S3 加载所需的自定义证书以及应用程序启动 Amazon Secrets Manager 时的密钥。

代码示例：[Kafka--SASL\$1SSL ConfigProviders](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders)

## 使用 Kafka 配置提供程序在运行时通过 Table API/SQL 获取 mTLS 的自定义密钥库和信任存储库
<a name="kafka-custom-keystore"></a>

此示例说明如何使用 Table API /SQL 中的 Kafka 配置提供程序来设置自定义密钥库和信任存储库，其中包含用于 Kafka 连接器的 mTLS 身份验证的证书。此技术允许您从 Amazon S3 加载所需的自定义证书以及应用程序启动 Amazon Secrets Manager 时的密钥。

代码示例：[kafka-mtls-keystore-SQL-ConfigProviders](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConfigProviders/Kafka-mTLS-Keystore-Sql-ConfigProviders)

## 使用侧输出来拆分流
<a name="side-output"></a>

此示例说明如何利用 Apache Flink 中的[侧输出](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/)在指定属性上拆分流。当尝试在流应用程序中实现死信队列（DLQ）的概念时，这种模式特别有用。

代码示例：[SideOutputs](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/SideOutputs)

## 使用 Async 调 I/O 用外部端点
<a name="async-i-o"></a>

此示例说明如何使用 [Apache Flink 异步 I/O](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/operators/asyncio/) 以非阻塞方式调用外部端点，并对可恢复的错误进行重试。

代码示例：[AsyncIO](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/AsyncIO)

# Managed Service for Apache Flink 的 Python 示例
<a name="examples-new-python"></a>

以下示例演示如何创建以 Python 编写的应用程序。

**注意**  
大多数示例都设计为在本地、开发计算机和您选择的 IDE 上运行，以及在 Amazon Managed Service for Apache Flink 上运行。它们演示可用于传递应用程序参数的简单机制，以及如何正确设置依赖项，以便在不做任何更改的情况下在两个环境中运行应用程序。

**项目依赖项**

大多数 PyFlink 示例都需要一个或多个依赖项作为 JAR 文件，例如 Flink 连接器。然后，在 Amazon Managed Service for Apache Flink 上部署时，必须将这些依赖项与应用程序打包在一起。

以下示例已经包含可让您在本地运行应用程序以进行开发和测试以及正确打包所需依赖项的工具。这个工具需要使用 Java JDK11 和 Apache Maven。有关具体说明，请参阅每个示例中包含的自述文件。

**示例**

## 开始使用 PyFlink
<a name="getting-started-pyflink"></a>

此示例演示了使用嵌入在 Python 代码中的 SQL 的 PyFlink 应用程序的基本结构。该项目还为任何包含 JAR 依赖关系（例如连接器）的 PyFlink 应用程序提供了一个框架。自述文件部分提供有关如何在本地运行 Python 应用程序进行开发的详细指导。该示例还展示了如何在您的 PyFlink应用程序中包含单个 JAR 依赖项，即本示例中的 Kinesis SQL 连接器。

代码示例：[GettingStarted](https://github.com/dzikosc/amazon-managed-service-for-apache-flink-examples/tree/main/python/GettingStarted)

## 添加 Python 依赖项
<a name="add-python-dependencies"></a>

此示例说明如何以最通用的方式将 Python 依赖项添加到您的 PyFlink 应用程序中。此方法适用于简单的依赖关系（例如 Boto3）或包含 C 库的复杂依赖项，例如。 PyArrow

代码示例：[PythonDependencies](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/PythonDependencies)

## 使用窗口聚合 (API) DataStream
<a name="windowing-aggregations-python"></a>

此示例演示采用 Python 应用程序中所嵌入 SQL 的四种类型窗口聚合。

1. 基于处理时间的滑动窗口

1. 基于事件时间的滑动窗口

1. 基于处理时间的滚动窗口

1. 基于事件时间的滚动窗口

代码示例：[Windowing](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/Windowing)

## 使用 S3 接收器
<a name="s3-sink-python"></a>

此示例说明如何使用 Python 应用程序中所嵌入 SQL 将输出作为 JSON 文件写入 Amazon S3。必须为 S3 接收器启用检查点功能，才能向 Amazon S3 写入和轮换文件。

代码示例：[S3Sink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/S3Sink)

## 使用用户定义的函数（UDF）
<a name="UDF-python"></a>

此示例演示如何定义用户定义函数，如何在 Python 中实现该函数，以及如何在 Python 应用程序中运行的 SQL 代码内使用该函数。

代码示例：[UDF](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/UDF)

## 使用 Amazon Data Firehose 接收器
<a name="Firehose-sink-python"></a>

此示例演示如何使用 SQL 将数据发送到 Amazon Data Firehose。

代码示例：[FirehoseSink](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/FirehoseSink)

# Managed Service for Apache Flink 的 Scala 示例
<a name="examples-new-scala"></a>

以下示例演示如何使用 Scala 和 Apache Flink 创建应用程序。



## 设置多步骤应用程序
<a name="getting-started-scala"></a>

此示例说明如何在 Scala 中设置 Flink 应用程序。其中演示如何配置 SBT 项目以包含依赖项并构建 uber-JAR。

代码示例：[GettingStarted](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/scala/GettingStarted)