使用表格API连接器 - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用表格API连接器

在 Apache Flink 编程模型中,连接器是应用程序用来从外部源(例如其他 Amazon 服务)读取或写入数据的组件。

使用 Apache Flink TableAPI,您可以使用以下类型的连接器:

  • 表格API来源:您可以使用表API源连接器通过API调TableEnvironment用或SQL查询在中创建表。

  • 桌上API水槽:您可以使用SQL命令将表数据写入外部源,例如亚马逊MSK主题或 Amazon S3 存储桶。

表格API来源

您可以从数据流创建表源。以下代码根据 Amazon MSK 主题创建表:

//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);

有关表源的更多信息,请参阅 Apache Flink 文档中的表和SQL连接器

桌上API水槽

要将表数据写入接收器,请在接收器中创建接收器SQL,然后在StreamTableEnvironment对象上运行SQL基于接收器的接收器。

以下代码示例演示了如何将表数据写入 Amazon S3 接收器:

final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");

您可以使用format参数来控制 Managed Service for Apache Flink 使用何种格式将输出写入接收器。有关格式的信息,请参阅 Apache Flink 文档中支持的连接器

用户定义的源和汇

您可以使用现有的 Apache Kafka 连接器向其他服务(例如亚马逊MSK和亚马逊 S3)发送数据或从其他 Amazon 服务发送数据。为了与其他数据源和目标进行交互,您可以定义自己的源和接收器。有关更多信息,请参阅 Apache Flink 文档中的用户定义源和接收器