NiFi custom Kafka processor works only for a limited time

This is the code for my custom Kafka processor, which simply consumes from a Kafka topic and produces some data:

ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
records.forEach(record -> {
    FlowFile flowFile = session.create();
    if (flowFile == null) {
        return;
    }
    try {
        byte[] outputBytes = (record == null) ? EMPTY_JSON_OBJECT
                : genericData.toString(record.value()).getBytes(StandardCharsets.UTF_8);
        flowFile = session.write(flowFile, rawOut -> {
            rawOut.write(outputBytes);
            consumer.commitSync();
        });
    } catch (ProcessException pe) {
        getLogger().error("Failed to deserialize {}", new Object[]{flowFile, pe});
        session.transfer(flowFile, REL_FAILURE);
        return;
    }
    flowFile = session.putAttribute(flowFile, "topic", record.topic());
    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    getLogger().info("flowFile id " + flowFile.getId());
    session.transfer(flowFile, REL_SUCCESS);
});

This code takes a batch of around 500 Kafka messages and produces a FlowFile for each one. What I need is to put it inside a while loop that does the same thing over and over. When I do that, though, nothing comes out of the processor, even though the info log shows the FlowFile ids incrementing, so the FlowFiles do seem to be created. This happens only with an infinite while loop; with a bounded for loop the processor works fine. I suspect there is something about NiFi's internal flow handling that I am not aware of.
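The symptom above (FlowFile ids incrementing in the log while nothing reaches the outgoing queue) is consistent with how a NiFi session stages transfers until it is committed. The toy model below illustrates that one behavior. `ToySession` and `SessionVisibilityDemo` are hypothetical classes written only for this sketch; they are not NiFi's `ProcessSession` API and ignore everything except the staging of transfers.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: ToySession is NOT NiFi's ProcessSession. It imitates a single
// property of the real session: transferred items are staged and only become
// visible to the downstream queue when commit() is called.
class ToySession {
    private final List<String> staged = new ArrayList<>();
    private final List<String> downstreamQueue;

    ToySession(List<String> downstreamQueue) {
        this.downstreamQueue = downstreamQueue;
    }

    void transfer(String flowFile) {
        staged.add(flowFile); // staged only, not visible downstream yet
    }

    void commit() {
        downstreamQueue.addAll(staged); // staged items become visible together
        staged.clear();
    }
}

class SessionVisibilityDemo {
    // Runs 'batches' iterations of a polling loop, each transferring 'batchSize'
    // items, and returns what the downstream queue holds while onTrigger is still
    // looping, i.e. before any commit that would follow onTrigger returning.
    static List<String> downstreamWhileLooping(int batches, int batchSize, boolean commitEachBatch) {
        List<String> downstream = new ArrayList<>();
        ToySession session = new ToySession(downstream);
        int id = 0;
        for (int b = 0; b < batches; b++) {
            for (int i = 0; i < batchSize; i++) {
                session.transfer("flowFile-" + id++); // ids keep incrementing, as in the log
            }
            if (commitEachBatch) {
                session.commit();
            }
        }
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println(downstreamWhileLooping(3, 500, false).size()); // prints 0
        System.out.println(downstreamWhileLooping(3, 500, true).size());  // prints 1500
    }
}
```

With `commitEachBatch` set to false, 1,500 items are transferred but the downstream queue stays empty for as long as the loop runs, which mirrors the infinite-loop case. A bounded loop only appears to work because `onTrigger` eventually returns and the session gets committed afterwards.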

Answer

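The problem was that I wasn't committing the session manually. For a processor that extends AbstractProcessor, the framework commits the session only after onTrigger returns, which never happens with an infinite while loop: the FlowFiles were created and transferred (hence the incrementing ids in the log) but never released to the outgoing queue. The (admittedly contrived) solution ended up looking something like this: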

while (true) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
    records.forEach(record -> {
        FlowFile flowFile = session.create();
        try {
            // A record with a null value (e.g. a tombstone) becomes an empty JSON object
            byte[] outputBytes = (record.value() == null) ? EMPTY_JSON_OBJECT
                    : genericData.toString(record.value()).getBytes(StandardCharsets.UTF_8);
            flowFile = session.write(flowFile, rawOut -> rawOut.write(outputBytes));
        } catch (ProcessException pe) {
            getLogger().error("Failed to deserialize {}", new Object[]{flowFile, pe});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        flowFile = session.putAttribute(flowFile, "topic", record.topic());
        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
        getLogger().info("flowFile id " + flowFile.getId());
        session.transfer(flowFile, REL_SUCCESS);
    });
    // onTrigger never returns, so commit the session explicitly after each batch;
    // otherwise the transferred FlowFiles are never released downstream.
    session.commit();
    // Commit Kafka offsets only once the FlowFiles are committed in NiFi, so a failure
    // between the two commits causes redelivery rather than lost messages.
    consumer.commitSync();
}
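Where the Kafka offset commit happens relative to `session.commit()` also matters. Calling `consumer.commitSync()` inside the `session.write` callback, as the question's code does, tells Kafka the records are done before NiFi has committed the FlowFiles; if the processor or JVM fails in between, those records are skipped on restart. The toy model below compares the two orders under a simulated crash. All names (`CommitOrderDemo`, `State`, `run`, `crashAndRestart`) are hypothetical: plain lists and an int stand in for the topic, the consumer group's committed offset, and NiFi's committed FlowFiles. It is a sketch of at-least-once delivery, not the Kafka or NiFi API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical names, not the Kafka or NiFi API): the topic is a list
// of messages, the consumer group's committed offset is an int, and "delivered"
// holds FlowFiles whose NiFi session has been committed.
class CommitOrderDemo {
    static final class State {
        final List<String> delivered = new ArrayList<>();
        int committedOffset = 0;
    }

    // Consumes from the committed offset in batches until the topic is drained.
    // When batch number 'crashAt' is reached, the run stops between the two commits,
    // simulating a crash. offsetsFirst=true mimics committing Kafka offsets before
    // the NiFi session; false commits the session first, then the offsets.
    static State run(List<String> topic, int batchSize, boolean offsetsFirst, int crashAt, State state) {
        int batch = 0;
        while (state.committedOffset < topic.size()) {
            int from = state.committedOffset;
            int to = Math.min(from + batchSize, topic.size());
            List<String> staged = new ArrayList<>(topic.subList(from, to)); // "poll" + transfer
            boolean crash = (batch == crashAt);
            if (offsetsFirst) {
                state.committedOffset = to;     // Kafka offsets committed first
                if (crash) return state;        // staged FlowFiles never committed: lost
                state.delivered.addAll(staged); // NiFi session commit
            } else {
                state.delivered.addAll(staged); // NiFi session commit first
                if (crash) return state;        // offsets not advanced: batch is re-read
                state.committedOffset = to;     // then Kafka offsets
            }
            batch++;
        }
        return state;
    }

    // Crash during the second batch, then restart from the committed offset.
    static State crashAndRestart(List<String> topic, int batchSize, boolean offsetsFirst) {
        State afterCrash = run(topic, batchSize, offsetsFirst, 1, new State());
        return run(topic, batchSize, offsetsFirst, -1, afterCrash);
    }

    public static void main(String[] args) {
        List<String> topic = List.of("m0", "m1", "m2", "m3", "m4", "m5");
        System.out.println(crashAndRestart(topic, 2, true).delivered);  // [m0, m1, m4, m5]
        System.out.println(crashAndRestart(topic, 2, false).delivered); // [m0, m1, m2, m3, m2, m3, m4, m5]
    }
}
```

Committing offsets first loses `m2` and `m3`; committing the NiFi session first redelivers them as duplicates instead, which is usually the safer failure mode. Separately, a `while (true)` loop inside `onTrigger` never notices that the processor has been stopped; the more conventional NiFi approach is to handle one poll per `onTrigger` call and let the framework's scheduler call it again.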
