Wait given (reset-able) time for subscribers to subscribe

In reactor there is a way of waiting for a specific number of subscribers and only then the publisher should start emitting.

I would like to implement a slightly different scenario:

  • subscriber subscribes
  • publisher awaits another subscription
  • subscription comes in less than expected time t
  • timer resets and publisher continue to wait for another subscriber
  • no subscription in a given time t
  • publisher starts to emmit

If the cycle of awaits lasts for too long (longer than time t2) than publisher starts emitting regardless.

How to do it?

Answer

There’s no baked-in operator to do that but you should be able to emulate that with a scheduled task and an AtomicInteger, using the standard publish()connect() cycle:

@Test
public void deferredConnect() throws InterruptedException {
    ConnectableFlux<Integer> connectableFlux = Flux.range(1, 10)
        .publish();
    AtomicInteger subCount = new AtomicInteger();

    Flux<Integer> deferredConnect = connectableFlux
        .doOnSubscribe(sub -> {
            int current = subCount.incrementAndGet();
            Schedulers.parallel().schedule(() -> {
                if (subCount.compareAndSet(current, -1)) {
                    connectableFlux.connect();
                }
            }, 1, TimeUnit.SECONDS);
        });

    deferredConnect.subscribe(v -> System.out.println("1: " + v));
    Thread.sleep(500);
    deferredConnect.subscribe(v -> System.out.println("2: " + v));
    Thread.sleep(400);
    deferredConnect.subscribe(v -> System.out.println("3: " + v));
    Thread.sleep(200);
    assertThat(subCount).hasNonNegativeValue();
    Thread.sleep(800);
    assertThat(subCount).hasNegativeValue();
}

Leave a Reply

Your email address will not be published. Required fields are marked *