Streaming on apache kafka topic has no output

I was following the tutorial of the apache Kafka website link.

The input topic is processed as stream and middle topics also generated but the final output topic is empty.

Below is the topology output:

Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
  --> KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
  --> KSTREAM-KEY-SELECT-0000000002
  <-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
  --> KSTREAM-FILTER-0000000006
  <-- KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FILTER-0000000006 (stores: [])
  --> KSTREAM-SINK-0000000005
  <-- KSTREAM-KEY-SELECT-0000000002
Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)
  <-- KSTREAM-FILTER-0000000006

Sub-topology: 1
Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])
  --> KSTREAM-AGGREGATE-0000000004
Processor: KSTREAM-AGGREGATE-0000000004 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000003])
  --> KTABLE-TOSTREAM-0000000008
  <-- KSTREAM-SOURCE-0000000007
Processor: KTABLE-TOSTREAM-0000000008 (stores: [])
  --> KSTREAM-SINK-0000000009
  <-- KSTREAM-AGGREGATE-0000000004
Sink: KSTREAM-SINK-0000000009 (topic: streams-wordcount-output)
  <-- KTABLE-TOSTREAM-0000000008

and the code:

final CountDownLatch latch = new CountDownLatch(10);

    new Thread(() -> {
        while(latch.getCount() > 0){
            latch.countDown();
            kafkaTopicService.sendMessage("This is a line with 7 words" );
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }).start();
    final Serde<String> stringSerde = Serdes.String();
    final Serde<Long> longSerde = Serdes.Long();

    final StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> textLines = builder.stream(
            KafkaTopicService.TOPIC_NAME,
            Consumed.with(stringSerde, stringSerde)
    );

    KTable<String, Long> wordCounts = textLines
            // Split each text line, by whitespace, into words.
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))

            // Group the text words as message keys
            .groupBy((key, value) -> value)

            // Count the occurrences of each word (message key).
            .count();

    wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));


    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    final Topology topology = builder.build();
    System.out.println(topology.describe());
    final KafkaStreams streams = new KafkaStreams(topology, props);


    try {
        streams.start();
        latch.await();
    } catch (Throwable e) {
    }

Answer

Actually, setting commit interval to 0 would send any change log data immediately to output topic:

    props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, String.valueOf(0));