I’m new to Kafka and trying to implement a simple producer that sends data to a topic. If the topic doesn’t exist, I want to handle the situation as an exception.
    private Producer<UUID, Object> producer = createProducer();

    private static Producer createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "mybootstrapserveraddress");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "ADAPTER");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000);
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
        return new KafkaProducer<>(props);
    }

    public void send(Event event, String topic) {
        try {
            UUID key = UUID.randomUUID();
            producer.send(new ProducerRecord<>(topic, key, event), (rm, ex) -> {
                if (ex != null) {
                    log.warn("Error sending message with key {}\n{}", new Object[]{key, ex.getMessage()});
                } else {
                    log.info("Partition for key-value {} is {}", new Object[]{key, rm.partition()});
                }
            });
        } catch (Exception e) {
            log.error("Failed to send message ", e);
        } finally {
            producer.flush();
        }
    }
However, if the topic doesn’t exist, the producer keeps polling for the topic’s metadata. The timeout and retry settings from ProducerConfig seem to be ignored.
    [kafka-producer-network-thread | ADAPTER] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=ADAPTER] Error while fetching metadata with correlation id 6 : {my-example-topic2=UNKNOWN_TOPIC_OR_PARTITION}
    [kafka-producer-network-thread | ADAPTER] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=ADAPTER] Error while fetching metadata with correlation id 7 : {my-example-topic2=UNKNOWN_TOPIC_OR_PARTITION}
    [kafka-producer-network-thread | ADAPTER] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=ADAPTER] Error while fetching metadata with correlation id 8 : {my-example-topic2=UNKNOWN_TOPIC_OR_PARTITION}
I do not want to check whether the topic exists through AdminClient. The Kafka documentation at https://kafka.apache.org/documentation/#producerconfigs was of no help.
Is there a way to fix the issue?
Answer
When the topic doesn’t exist, the producer keeps retrying the metadata fetch for 60 seconds by default and then fails with a timeout exception.
The producer config parameter related to that is max.block.ms (default 60000).
As far as I know, the only ways to get feedback earlier are to reduce this timeout or to use the AdminClient (which is something you don’t want to do).
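To illustrate reducing the timeout, here is a minimal sketch that builds producer properties with a shorter max.block.ms. The class name FailFastProducerConfig, the localhost:9092 address, and the 5000 ms value are assumptions for illustration only and are not part of the original question. It uses plain string keys so that it compiles without the Kafka client library.

```java
import java.util.Properties;

// Hypothetical helper (not from the original post): builds producer
// settings with a shortened max.block.ms so that a send to a missing
// topic gives up sooner than the 60000 ms default.
class FailFastProducerConfig {

    static Properties build(String bootstrapServers, long maxBlockMs) {
        if (maxBlockMs < 0) {
            throw new IllegalArgumentException("maxBlockMs must not be negative");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("client.id", "ADAPTER");
        // Upper bound on how long send() may block, including the wait
        // for topic metadata.
        props.setProperty("max.block.ms", Long.toString(maxBlockMs));
        return props;
    }

    public static void main(String[] args) {
        // Assumed values: a local broker address and a 5 second limit.
        Properties props = build("localhost:9092", 5000L);
        System.out.println("max.block.ms = " + props.getProperty("max.block.ms"));
        // With kafka-clients on the classpath, add the key and value
        // serializer settings from the question and pass props to
        // new KafkaProducer<>(props).
    }
}
```

With a setting like this, a send to a missing topic should fail with a timeout exception after roughly the configured time instead of after 60 seconds. As far as I can tell, the error is reported through the send callback (the ex argument in the question’s lambda) and the returned Future rather than thrown by send() itself, so the existing callback is the place to handle it.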