本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
表 API 连接器
在 Apache Flink 编程模型中,连接器是应用程序用于从外部源读取或写入数据的组件,例如其他Amazon服务。
使用 Apache Flink 表 API,您可以使用以下类型的连接器:
表 API 来源
您可以从数据流创建表源。以下代码根据亚马逊 MSK 主题创建一个表:
//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);
有关表源的更多信息,请参阅。表和连接器
表 API 接收器
要将表数据写入汇,请在 SQL 中创建接收器,然后在StreamTableEnvironment
对象。
以下代码示例演示了如何将表数据写入 Amazon S3 接收器:
final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");
您可以使用format
参数来控制 Kinesis Data Analytics 用什么格式将输出写入接收器。有关格式的信息,请参阅。格式
有关表接收器的更多信息,请参阅。表和连接器
用户定义的源和汇
你可以使用现有的 Apache Kafka 连接器向其他人发送数据或从其他人之间发送Amazon服务,例如 Amazon MSK 和 Amazon S3。要与其他数据源和目标进行交互,您可以定义自己的源和汇。有关更多信息,请参阅 。用户定义的源和汇