In Reactor there is a way to wait for a specific number of subscribers before the publisher starts emitting.
I would like to implement a slightly different scenario:
- a subscriber subscribes
- the publisher waits for another subscription
- a subscription arrives in less than the expected time t
- the timer resets and the publisher continues to wait for another subscriber
- no subscription arrives within the given time t
- the publisher starts to emit
If the cycle of waiting lasts too long (longer than time t2), then the publisher starts emitting regardless.
How can I do this?
Answer
There's no baked-in operator for that, but you should be able to emulate it with a scheduled task and an AtomicInteger, using the standard publish()/connect() cycle:
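    import static org.assertj.core.api.Assertions.assertThat;

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.junit.Test; // JUnit 4 assumed; use org.junit.jupiter.api.Test for JUnit 5
    import reactor.core.publisher.ConnectableFlux;
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;

    @Test
    public void deferredConnect() throws InterruptedException {
        ConnectableFlux<Integer> connectableFlux = Flux.range(1, 10)
                                                       .publish();
        // Counts subscribers; set to -1 once the source has been connected.
        AtomicInteger subCount = new AtomicInteger();

        Flux<Integer> deferredConnect = connectableFlux
                .doOnSubscribe(sub -> {
                    int current = subCount.incrementAndGet();
                    // Each subscription starts its own 1s timer. The timer only
                    // connects if no later subscription changed the count.
                    Schedulers.parallel().schedule(() -> {
                        if (subCount.compareAndSet(current, -1)) {
                            connectableFlux.connect();
                        }
                    }, 1, TimeUnit.SECONDS);
                });

        deferredConnect.subscribe(v -> System.out.println("1: " + v));
        Thread.sleep(500);
        deferredConnect.subscribe(v -> System.out.println("2: " + v));
        Thread.sleep(400);
        deferredConnect.subscribe(v -> System.out.println("3: " + v));
        Thread.sleep(200);

        // About 1.1s in: the first timer has fired but saw a stale count.
        assertThat(subCount).hasNonNegativeValue();
        // The last timer is due about 1s after the third subscription;
        // sleep past that point so the check does not race with it.
        Thread.sleep(1000);
        assertThat(subCount).hasNegativeValue();
    }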
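The snippet above covers the resetting timer t, but not the upper bound t2 from the question. One possible way to add it is sketched below with plain java.util.concurrent, so it can be read and tested without Reactor on the classpath. QuietPeriodGate and its onSubscriber() and hasFired() methods are hypothetical names invented for this illustration, not part of Reactor. The idea is that the first subscriber starts a one-off deadline timer (t2), every subscriber restarts the quiet-period timer (t), and whichever timer fires first runs the action exactly once.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not a Reactor class): runs an action once, either after
 * a quiet period with no new subscribers (t) or after a hard deadline counted
 * from the first subscriber (t2), whichever comes first.
 */
final class QuietPeriodGate {
    private final ScheduledExecutorService timer;
    private final long quietNanos;    // "t" in the question
    private final long maxWaitNanos;  // "t2" in the question
    private final Runnable action;    // e.g. connectableFlux::connect

    // All mutable state below is guarded by "this".
    private ScheduledFuture<?> quietTask;
    private ScheduledFuture<?> deadlineTask;
    private boolean fired;

    QuietPeriodGate(ScheduledExecutorService timer, long quiet, long maxWait,
                    TimeUnit unit, Runnable action) {
        this.timer = timer;
        this.quietNanos = unit.toNanos(quiet);
        this.maxWaitNanos = unit.toNanos(maxWait);
        this.action = action;
    }

    /** Call once for every new subscriber, for example from doOnSubscribe. */
    synchronized void onSubscriber() {
        if (fired) {
            return; // already connected, nothing left to schedule
        }
        if (deadlineTask == null) {
            // First subscriber: start the t2 deadline, which is never reset.
            deadlineTask = timer.schedule(this::fire, maxWaitNanos, TimeUnit.NANOSECONDS);
        }
        if (quietTask != null) {
            quietTask.cancel(false); // a new subscriber resets the quiet period
        }
        quietTask = timer.schedule(this::fire, quietNanos, TimeUnit.NANOSECONDS);
    }

    private void fire() {
        synchronized (this) {
            if (fired) {
                return; // the other timer got here first
            }
            fired = true;
            if (quietTask != null) {
                quietTask.cancel(false);
            }
            if (deadlineTask != null) {
                deadlineTask.cancel(false);
            }
        }
        action.run(); // run outside the lock
    }

    synchronized boolean hasFired() {
        return fired;
    }
}
```

Assuming the same publish()/connect() setup as in the answer, this would be wired up by passing connectableFlux::connect as the action and calling gate.onSubscriber() inside doOnSubscribe. The caller owns the ScheduledExecutorService and is responsible for shutting it down.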