Is there a way to loop an Observable and use the previous iteration to influence the next?

I’m brand spankin’ new to rxjs and would like to use it to build a video downloader. The intention is to run it 24/7 and automatically record an occasional livestream for later watching. Here is what I have so far.

import { BehaviorSubject, from, defer, of } from "rxjs";
import { delay, mergeMap, repeat, tap } from "rxjs/operators";

const downloader = url => {
  const defaultDelay = 1000;
  const maxDelay = 10000;
  const delayTime = new BehaviorSubject(defaultDelay);

  /*
   * Simulated download output.
   * 
   * @return {String|Number} potentialOutput 
   *         A {Number} 1 means "FAILURE, stream is offline."
   *         A {String} means "SUCCESS, video was downloaded."
   *         1 is the most likely value returned
   * 
   * greets https://stackoverflow.com/a/8877271/1004931
   */
  function randomWithProbability() {
    var potentialOutput = [1, 1, 1, 1, 1, "/tmp/video.mp4"];
    var idx = Math.floor(Math.random() * potentialOutput.length);
    return potentialOutput[idx];
  }

  /**
   * Simulated download. Returns a promise which resolves after 1 second.
   */
  const download = url => {
    let downloadP = new Promise((resolve, reject) => {
      setTimeout(() => {
        resolve(randomWithProbability());
      }, 1000);
    });
    return from(downloadP);
  };

  /**
   * Conditionally adjust the delay in between download attempts.
   *   - If the video downloaded successfully, reset the timer to its default.
   *     (in case the stream went down by error, we want to record again ASAP.)
   *   - If the video stream was offline, increase the delay until our next download attempt.
   *     (we don't want to be rude and flood the server)
   */
  const adjustTimer = (ytdlOutput) => {
    if (typeof ytdlOutput === 'string') {
      delayTime.next(defaultDelay); // video stream exited successfully, so reset in case the stream starts again
    } else {
      let adjustedTime = (delayTime.getValue() * 2 > maxDelay) ? maxDelay : delayTime.getValue() * 2;
      delayTime.next(adjustedTime); // video stream exited abnormally, likely due to being offline. wait longer until next attempt
    }
  };

  /**
   * The Observable.
   *   1. Start with the URL of the video stream
   *   2. delay by the time defined in delayTime
   *   3. download, merging the download observable with the parent observable.
   *   4. adjust the delayTime based on download output.
   *   5. repeat the process indefinitely.
   */
  const stream = of(url)
    .pipe(
      delay(delayTime.getValue()),
      mergeMap(download),
      tap(res => {
        adjustTimer(res);
      }),
      repeat()
    )
    
  stream.subscribe(val => {
    console.log(
      `download result:${val}, delayTime:${delayTime.getValue()}`
    );
  });
};

downloader("https://example.com/files/video.mp4");

(Stackblitz)

The problem I’m having is that the new value of the BehaviorSubject delayTime is not being picked up on every iteration of my loop. delayTime itself is getting updated, as shown by the delayTime.getValue() call in the subscriber’s callback, but the changes aren’t having an effect in the memory(?) of the observable/subscriber(?).

Instead, I’m seeing that delayTime in the scope(?) of the observable is staying the same, as it was when it was first subscribed to. In the observable’s world, there is no update to the BehaviorSubject’s value, as I want there to be.

And this is where I’m stuck. How can I refactor my code to have a delay timer which changes over time and affects the delay until the next download attempt?

Answer

Ignore rxjs for a moment, and look at this code pretending you don’t know what any of these functions mean:

  const stream = of(url)
    .pipe(
      delay(delayTime.getValue()),
      mergeMap(download),
      tap(res => {
        adjustTimer(res);
      }),
      repeat()
    )

An anonymized, simple version would be

someFunc(delayTime.getValue())

The problem here is that delayTime.getValue() is evaluated immediately, before someFunc is even called, so someFunc only ever receives the resulting number. The same is true for your code above: the evaluation happens once, when the stream variable is created, not on every “iteration” (better word: emission).
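
The same effect can be reproduced without rxjs at all. The following is a minimal sketch in plain JavaScript; currentDelay, makeEager and makeLazy are hypothetical names used only for illustration and are not part of the original code.

```javascript
// Hypothetical mutable state standing in for delayTime.
let currentDelay = 1000;

// Eager: the argument is evaluated once, at call time, and only the number is kept.
const makeEager = (ms) => () => ms;

// Lazy: a function is passed instead, and it is called each time the value is needed.
const makeLazy = (getMs) => () => getMs();

const eager = makeEager(currentDelay);     // captures the number 1000
const lazy = makeLazy(() => currentDelay); // captures the lookup, not the value

currentDelay = 2000;

console.log(eager()); // 1000
console.log(lazy());  // 2000
```

Passing a function rather than a value is what allows the current delay to be read at the moment it is needed.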


The delay operator works only with a fixed delay. For your purpose you want to use delayWhen, which is evaluated for each emission:

delayWhen(() => timer(delayTime.getValue()))

Notice, however, that we need to return a notifier observable rather than the desired delay in ms. Here timer is imported from "rxjs" and delayWhen from "rxjs/operators".


As a final note, accessing getValue is a red flag for not using observables correctly. That’s also why we don’t actually use the arguments provided to the callback in delayWhen. Your code could do with refactoring to make it properly reactive, but that is beyond the scope here.
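
One possible first step toward such a refactoring is to express the back-off rule as a pure function of the previous delay and the last download result, so that no getValue() call is needed to compute it. This is only a sketch under that assumption: nextDelay and its parameter names are hypothetical, and the constants are copied from the question. A function of this shape could then be handed to an operator that carries state from one emission to the next (rxjs has scan and expand for that), which is not shown here.

```javascript
const DEFAULT_DELAY = 1000; // ms, value taken from the question
const MAX_DELAY = 10000;    // ms, value taken from the question

// Hypothetical pure state transition: given the previous delay and the
// download result, return the delay to use before the next attempt.
function nextDelay(previousDelay, result) {
  // A string result means the video was downloaded, so reset to the default.
  if (typeof result === "string") return DEFAULT_DELAY;
  // Otherwise the stream was offline: double the delay, capped at MAX_DELAY.
  return Math.min(previousDelay * 2, MAX_DELAY);
}

// Trace the delays for a simulated run of download results.
const results = [1, 1, 1, 1, "/tmp/video.mp4", 1];
const delays = [];
let delayMs = DEFAULT_DELAY;
for (const result of results) {
  delayMs = nextDelay(delayMs, result);
  delays.push(delayMs);
}

console.log(delays); // [ 2000, 4000, 8000, 10000, 1000, 2000 ]
```

Because the function has no side effects, it can be checked in isolation before it is wired into any stream.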
