使用 DataStream API 在 Managed Service for Apache Flink 中使用运算符转换数据 - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink(Amazon MSF)之前称为 Amazon Kinesis Data Analytics for Apache Flink。

使用 DataStream API 在 Managed Service for Apache Flink 中使用运算符转换数据

要在中转换传入数据,您可以使用 Apache Flink 运算符。Apache Flink 运算符将一个或多个数据流转换为新的数据流。新数据流包含来自原始数据流的修改的数据。Apache Flink 提供超过 25 个预构建的流处理运算符。有关更多信息,请参阅 pache Flink 文档中的运算符

本主题包含下列部分:

使用转换运算符

以下是对 JSON 数据流的某个字段进行简单文本转换的示例。

该代码创建转换的数据流。新数据流具有与原始流相同的数据,并在 Company 字段内容后面附加“TICKER”字符串。

DataStream<ObjectNode> output = input.map( new MapFunction<ObjectNode, ObjectNode>() { @Override public ObjectNode map(ObjectNode value) throws Exception { return value.put("TICKER", value.get("TICKER").asText() + " Company"); } } );

使用聚合操作符

以下是一个聚合运算符示例。该代码创建聚合的数据流。该运算符创建一个 5 秒的滚动窗口,并返回窗口中具有相同 TICKER 值的记录的 PRICE 值之和。

DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText()) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce((node1, node2) -> { double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble(); node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal)); return node1; });

有关更多代码示例,请参阅 创建和使用 Managed Service for Apache Flink 应用程序的示例