表 API 连接器 - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

表 API 连接器

在 Apache Flink 编程模型中,连接器是应用程序用来从外部源(例如其他Amazon服务)读取或写入数据的组件。

使用 Apache Flink Table API,您可以使用以下类型的连接器:

  • 表 API 来源:您可以使用表 API 源连接器通过 API 调TableEnvironment用或 SQL 查询在中创建表。

  • 表 API 接收器:您可以使用 SQL 命令将表数据写入外部来源,例如 Amazon MSK 主题或 Amazon S3 存储桶。

表 API 来源

您可以从数据流创建表源。以下代码根据Amazon MSK 主题创建表:

//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);

有关表源的更多信息,请参阅 Apache Flink 文档中的表和连接器

表 API 接收器

要将表数据写入接收器,可以在 SQL 中创建接收器,然后在对象上运行基于 SQL 的StreamTableEnvironment接收器。

以下代码示例演示了如何将表数据写入 Amazon S3 接收器:

final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");

您可以使用format参数来控制 Managed Service for Apache Flink 使用何种格式将输出写入接收器。有关格式的信息,请参阅 Apache Flink 文档中的格式

有关表源的更多信息,请参阅 Apache Flink 文档 中的表和连接器

用户定义的源和接收器

您可以使用现有的 Apache Kafka 连接器向其他Amazon服务(例如Amazon MSK 和Amazon S3)发送数据。为了与其他数据源和目标进行交互,您可以定义自己的源和接收器。有关更多信息,请参阅 Apache Flink 文档中的用户定义源和接收器