Loops in RxJS Observable don’t stop after calling complete or error

When calling observer.complete() or observer.error() an observer stops sending data and is considered done. However, If there is a for loop in the observer, the loop will continue running even after observer.complete() is called. Can someone explain to me this behaviour? I expect that the loop would be cut short. The current behaviour means that an interval or a while loop will forever run in an Observable unless I unsubscribe in the code.

In the below snippet I added a console.log to see if the log will be called after observer.complete().

    const testObservable = new Observable( observer => {
    for (let count = 0; count < 11; count++){
      observer.next(count);
      if (count > 5) {
        observer.complete()
        console.log("test")
      }
      if (count > 7) {
        observer.error(Error("This is an error"))
      }
    }}
  );

let firstObservable = testObservable.subscribe(
    next => {console.log(next)},
    error => { alert(error.message)},
    () => {console.log("complete")}
  )

Answer

It is a responsibility of the create function to properly destroy resources. In your case there could be simply return statement after observer.complete().

But in general if needed, the create function should return the teardown function (TeardownLogic), that is called when the observable is finalised.

new Observable( observer => {
  // observable logic, e.g. ajax request
  return () => {
    // teardown function, e.g. cancel ajax request
  };
});

Observable just ignore future calls of next(...), complete() and error(...) once the observable is finalized. It means that complete() or error(...) function was called internally or the observable was unsubscribed externally.

new Observable( observer => {
  observer.next('a');
  observer.complete();
  // next calls will be ignored
  observer.next('b');
});
const observable = new Observable( observer => {
  setTimeout(
    () => observer.next('a'), // this will be ignored due to unsubscribe
    10
  );
});
const subscription = observable.subscribe(...);
subscription.unsubscribe();