本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Apache Flink 在 Kinesis Data Analytics Data Analytics 中使用操作符以使用DataStreamAPI
要在 Apache Flink 使用 Kinesis Data Analytics Data Analytics for Apache Flink 应用程序,您可以使用 Apache Flink操作者. Apache Flink 操作符将一个或多个数据流转换为新的数据流。新数据流包含来自原始数据流的修改的数据。Apache Flink 提供超过 25 个预构建的流处理操作符。有关更多信息,请参阅 。运算符
转换操作符
以下是对 JSON 数据流的某个字段进行简单文本转换的示例。
该代码创建转换的数据流。新数据流具有与原始流相同的数据,并在 TICKER
字段内容后面附加“ Company
”字符串。
DataStream<ObjectNode> output = input.map( new MapFunction<ObjectNode, ObjectNode>() { @Override public ObjectNode map(ObjectNode value) throws Exception { return value.put("TICKER", value.get("TICKER").asText() + " Company"); } } );
聚合操作符
以下是一个聚合操作符示例。该代码创建聚合的数据流。该操作符创建一个 5 秒的滚动窗口,并返回窗口中具有相同 TICKER
值的记录的 PRICE
值之和。
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText()) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce((node1, node2) -> { double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble(); node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal)); return node1; });
有关使用操作符的完整代码示例,请参阅入门 (DataStreamAPI)。入门应用程序的源代码可在开始使用