I was following the tutorial on the Apache Kafka website (link).
The input topic is consumed as a stream and the intermediate topics are created, but the final output topic is empty.
Below is the topology output:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
      --> KSTREAM-FILTER-0000000006
      <-- KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FILTER-0000000006 (stores: [])
      --> KSTREAM-SINK-0000000005
      <-- KSTREAM-KEY-SELECT-0000000002
    Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)
      <-- KSTREAM-FILTER-0000000006

   Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])
      --> KSTREAM-AGGREGATE-0000000004
    Processor: KSTREAM-AGGREGATE-0000000004 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000003])
      --> KTABLE-TOSTREAM-0000000008
      <-- KSTREAM-SOURCE-0000000007
    Processor: KTABLE-TOSTREAM-0000000008 (stores: [])
      --> KSTREAM-SINK-0000000009
      <-- KSTREAM-AGGREGATE-0000000004
    Sink: KSTREAM-SINK-0000000009 (topic: streams-wordcount-output)
      <-- KTABLE-TOSTREAM-0000000008
and the code:
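// Imports needed by this snippet
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

// Producer thread: sends one test line per second, ten lines in total.
final CountDownLatch latch = new CountDownLatch(10);
new Thread(() -> {
    while (latch.getCount() > 0) {
        latch.countDown();
        kafkaTopicService.sendMessage("This is a line with 7 words");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}).start();

final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(
        KafkaTopicService.TOPIC_NAME,
        Consumed.with(stringSerde, stringSerde)
);

KTable<String, Long> wordCounts = textLines
        // Split each text line into words on runs of non-word characters.
        // Note the escaped backslash: "\W+" does not compile in Java.
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        // Group the text words as message keys
        .groupBy((key, value) -> value)
        // Count the occurrences of each word (message key).
        .count();

wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

final Topology topology = builder.build();
System.out.println(topology.describe());

final KafkaStreams streams = new KafkaStreams(topology, props);
try {
    streams.start();
    latch.await();
} catch (Throwable e) {
    // Print the failure instead of silently swallowing it.
    e.printStackTrace();
}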
Answer
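The output topic most likely looks empty because Kafka Streams buffers the results of the count() aggregation in its record cache and only forwards them downstream when the cache is flushed. That happens on commit (every 30 seconds by default) or when the cache fills up, so a short test run can end before anything reaches the output topic. Setting the commit interval to 0 sends every changelog update to the output topic immediately: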
props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, String.valueOf(0));
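
For reference, here is a minimal sketch of how the low-latency settings could be collected in one place. The class name `LowLatencyStreamsConfig` and the helper `withImmediateFlush` are hypothetical, and the sketch uses the plain string keys (`commit.interval.ms`, and `cache.max.bytes.buffering` for the record cache size) instead of the `StreamsConfig` constants so that it compiles without Kafka on the classpath. Disabling the cache as well is an assumption on my part that it is acceptable for a test setup; newer Kafka releases may prefer a differently named cache setting.

```java
import java.util.Properties;

public class LowLatencyStreamsConfig {

    // String values behind StreamsConfig.COMMIT_INTERVAL_MS_CONFIG and
    // StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, written out as literals
    // so this sketch has no Kafka dependency.
    static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
    static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";

    // Hypothetical helper: returns a copy of the given properties with the
    // commit interval and the record cache size both set to 0.
    static Properties withImmediateFlush(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(COMMIT_INTERVAL_MS, "0");
        props.setProperty(CACHE_MAX_BYTES_BUFFERING, "0");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "streams");
        base.setProperty("bootstrap.servers", "localhost:9092");

        // Print the resulting configuration, one key=value pair per line.
        Properties props = withImmediateFlush(base);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting `Properties` object can be passed to `new KafkaStreams(topology, props)` as in the question. These values trade throughput for latency, so they are better suited to local experiments than to production.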