How do I spin up an embedded Kafka cluster during my test?

To test how our Spring Boot application behaves when the Kafka cluster is not yet up, I would like to spin up an embedded Kafka cluster in a JUnit test some time after the application has started. How could I approach this? As I understand it, spring-kafka-test’s @EmbeddedKafka starts the cluster before the application context of a SpringBootTest is created. Is there any way to configure that timing?

Answer

When defined as a bean (via @EmbeddedKafka) or as a JUnit condition (again via @EmbeddedKafka – when there is no test Spring ApplicationContext), the broker is started in afterPropertiesSet().

You should be able to create the broker manually and call afterPropertiesSet() whenever you are ready.
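The "whenever you are ready" part could be sketched roughly as follows. This is only an illustration using plain JDK classes: DeferredStart and startAfter are hypothetical names, and the Runnable stands in for the real start call (for example broker::afterPropertiesSet on a broker you constructed yourself).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DeferredStart {

    /**
     * Runs the given start action after a delay and returns a future that
     * completes once the action has finished (or failed).
     * The action is an assumption: in a real test it would start the broker.
     */
    static CompletableFuture<Void> startAfter(long delayMillis, Runnable startAction) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<Void> started = new CompletableFuture<>();
        scheduler.schedule(() -> {
            try {
                startAction.run();
                started.complete(null);
            }
            catch (RuntimeException ex) {
                // Surface start-up failures to whoever waits on the future
                started.completeExceptionally(ex);
            }
            finally {
                // One-shot scheduler: release its thread after the task ran
                scheduler.shutdown();
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
        return started;
    }

    public static void main(String[] args) {
        AtomicBoolean brokerUp = new AtomicBoolean(false); // stand-in for a real broker
        CompletableFuture<Void> started = startAfter(500, () -> brokerUp.set(true));
        // ... assert here how the application behaves while the broker is down ...
        started.join(); // block until the delayed start has happened
        System.out.println("broker up: " + brokerUp.get());
    }
}
```

In an actual test the action would start an EmbeddedKafkaBroker that was built without the final afterPropertiesSet() call, and the assertions about the "broker not yet up" behaviour would go between scheduling and join().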

Here is the code from the JUnit 5 EmbeddedKafkaCondition:

@SuppressWarnings("unchecked")
private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) {
    EmbeddedKafkaBroker broker;
    int[] ports = setupPorts(embedded);
    broker = new EmbeddedKafkaBroker(embedded.count(), embedded.controlledShutdown(),
                    embedded.partitions(), embedded.topics())
            .zkPort(embedded.zookeeperPort())
            .kafkaPorts(ports)
            .zkConnectionTimeout(embedded.zkConnectionTimeout())
            .zkSessionTimeout(embedded.zkSessionTimeout());
    Properties properties = new Properties();

    for (String pair : embedded.brokerProperties()) {
        if (!StringUtils.hasText(pair)) {
            continue;
        }
        try {
            properties.load(new StringReader(pair));
        }
        catch (Exception ex) {
            throw new IllegalStateException("Failed to load broker property from [" + pair + "]",
                    ex);
        }
    }
    if (StringUtils.hasText(embedded.brokerPropertiesLocation())) {
        Resource propertiesResource = new PathMatchingResourcePatternResolver()
                .getResource(embedded.brokerPropertiesLocation());
        if (!propertiesResource.exists()) {
            throw new IllegalStateException(
                    "Failed to load broker properties from [" + propertiesResource
                            + "]: resource does not exist.");
        }
        try (InputStream in = propertiesResource.getInputStream()) {
            Properties p = new Properties();
            p.load(in);
            p.forEach(properties::putIfAbsent);
        }
        catch (IOException ex) {
            throw new IllegalStateException(
                    "Failed to load broker properties from [" + propertiesResource + "]", ex);
        }
    }
    broker.brokerProperties((Map<String, String>) (Map<?, ?>) properties);
    if (StringUtils.hasText(embedded.bootstrapServersProperty())) {
        broker.brokerListProperty(embedded.bootstrapServersProperty());
    }
    broker.afterPropertiesSet();
    return broker;
}
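If you copy this method into your own test, it may help to see in isolation how the broker properties are assembled. The following is a small JDK-only sketch of that part; BrokerPropertiesDemo, parsePairs and mergeDefaults are hypothetical names, and the property values are just examples.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class BrokerPropertiesDemo {

    /** Parses "key=value" strings one by one, skipping blank entries. */
    static Properties parsePairs(String... pairs) {
        Properties properties = new Properties();
        for (String pair : pairs) {
            if (pair == null || pair.trim().isEmpty()) {
                continue;
            }
            try {
                properties.load(new StringReader(pair));
            }
            catch (IOException ex) {
                throw new IllegalStateException("Failed to load broker property from [" + pair + "]", ex);
            }
        }
        return properties;
    }

    /** Adds entries from the second set only for keys that are not already present. */
    static Properties mergeDefaults(Properties explicit, Properties fromFile) {
        fromFile.forEach(explicit::putIfAbsent);
        return explicit;
    }

    public static void main(String[] args) {
        Properties inline = parsePairs("listeners=PLAINTEXT://localhost:9092", "",
                "auto.create.topics.enable=false");
        Properties fromFile = parsePairs("auto.create.topics.enable=true", "num.partitions=3");
        Properties merged = mergeDefaults(inline, fromFile);
        System.out.println(merged.getProperty("listeners"));
        System.out.println(merged.getProperty("auto.create.topics.enable"));
        System.out.println(merged.getProperty("num.partitions"));
    }
}
```

Because of putIfAbsent, a value given inline wins over the same key loaded from the properties location, which matches the order used in createBroker above.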