Kafka Streams 与 MSK 快速代理和 MSK Serverless 结合使用
Kafka Streams 支持无状态和有状态转换。有状态转换(例如计数、聚合或联接)使用运算符,将其状态存储在内部 Kafka 主题中。此外,一些无状态转换(例如 GroupBy 或重分区)则将其结果存储在内部 Kafka 主题中。默认情况下,Kafka Streams 根据相应的运算符对这些内部主题进行命名。如果这些主题不存在,Kafka Streams 则创建内部 Kafka 主题。为创建内部主题,Kafka Streams 对 segment.bytes 配置进行硬编码,并将其设置为 50 MB。使用快速代理和 MSK Serverless 配置的 MSK 可保护某些主题配置,包括主题创建期间的 segment.size。因此,采用有状态转换的 Kafka Streams 应用程序无法使用 MSK 快速代理或 MSK Serverless 创建内部主题。
要在 MSK 快速代理或 MSK Serverless 上运行这样的 Kafka Streams 应用程序,您必须自行创建内部主题。为此,需先识别需要主题的 Kafka Streams 运算符并为其命名。然后,创建相应的内部 Kafka 主题。
注意
-
最好在 Kafka Streams 中手动命名运算符,尤其是依赖于内部主题的运算符。有关命名运算符的信息,请参阅 Kafka Streams 文档中的 Kafka Streams DSL 应用程序中的命名运算符
。 -
有状态转换的内部主题名称取决于 Kafka Streams 应用程序的
application.id以及有状态运算符application.id-statefuloperator_name的名称。
使用 MSK 快速代理或 MSK Serverless 创建 Kafka Streams 应用程序
如果 Kafka Streams 应用程序将其 application.id 设置为 msk-streams-processing,则可以使用 MSK 快速代理或 MSK Serverless 创建 Kafka Streams 应用程序。为此,请使用 count() 运算符,它需要一个具有该名称的内部主题。例如 msk-streams-processing-count-store。
要创建 Kafka Streams 应用程序,请执行以下操作:
识别并命名运算符
-
使用 Kafka Streams 文档中的有状态转换
来识别有状态处理器。 有状态处理器的一些示例包括
count、aggregate或join。 -
确定为重新分区创建主题的处理器。
以下示例包含一项需要状态的
count()运算。var stream = paragraphStream .groupByKey() .count() .toStream(); -
要命名主题,请为每个有状态处理器添加名称。根据处理器类型,由不同的命名类完成命名。例如,
count()运算是一种聚合操作。因此,它需要Materialized类。有关进行有状态操作的命名类的信息,请参阅 Kafka Streams 文档中的结论
。 在以下示例中,使用
Materialized类将count()运算符的名称设置为count-store。var stream = paragraphStream .groupByKey() .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-store") // descriptive name for the store .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long())) .toStream();
创建内部主题
Kafka Streams 将 application.id 作为内部主题名称的前缀,其中 application.id 由用户定义。例如 application.id-internal_topic_name。内部主题是普通的 Kafka 主题,您可以使用 Kafka API 中的 创建 Apache Kafka 主题 或 AdminClient 提供的信息创建主题。
根据使用案例,可使用 Kafka Streams 的默认清理和保留策略,也可以自定义其值。可在 cleanup.policy 和 retention.ms 中定义这些值。
在以下示例中,使用 AdminClient API 创建主题并将 application.id 设置为 msk-streams-processing。
try (AdminClient client = AdminClient.create(configs.kafkaProps())) { Collection<NewTopic> topics = new HashSet<>(); topics.add(new NewTopic("msk-streams-processing-count-store", 3, (short) 3)); client.createTopics(topics); }
在集群上创建主题后,Kafka Streams 应用程序可使用 msk-streams-processing-count-store 主题进行 count() 运算。
(可选)检查主题名称
可使用拓扑描述器来描述流的拓扑结构,并查看内部主题的名称。以下示例演示如何运行拓扑描述器。
final StreamsBuilder builder = new StreamsBuilder(); Topology topology = builder.build(); System.out.println(topology.describe());
以下输出显示了上述示例的流拓扑结构。
Topology Description: Topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic]) --> KSTREAM-AGGREGATE-0000000001 Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store]) --> KTABLE-TOSTREAM-0000000002 <-- KSTREAM-SOURCE-0000000000 Processor: KTABLE-TOSTREAM-0000000002 (stores: []) --> KSTREAM-SINK-0000000003 <-- KSTREAM-AGGREGATE-0000000001 Sink: KSTREAM-SINK-0000000003 (topic: output_topic) <-- KTABLE-TOSTREAM-0000000002
有关如何使用拓扑描述器的信息,请参阅 Kafka Streams 文档中的 Kafka Streams DSL 应用程序中的命名运算符
命名运算符示例
本部分提供了命名运算符的一些示例。
GroupByKey() 的命名运算符示例
groupByKey() -> groupByKey(Grouped.as("kafka-stream-groupby"))
常规 count() 命名运算符的示例
normal count() -> .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("kafka-streams-window") // descriptive name for the store .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long()))
窗口式 count() 的命名运算符示例
windowed count() -> .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("kafka-streams-window") // descriptive name for the store .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long()))
窗口式 suppressed() 的命名运算符示例
windowed suppressed() -> Suppressed<Windowed> suppressed = Suppressed .untilWindowCloses(Suppressed.BufferConfig.unbounded()) .withName("kafka-suppressed"); .suppress(suppressed)