Ways to replace an Rx.NET Subject that represents state

I am currently working on fixing a bug in the following method, which polls the stateChecker condition for as long as the state is null, until it becomes true (or false because of the timeout):

private static void WaitWithSubject(
   Func<bool> stateChecker,
   TimeSpan timeout,
   TimeSpan stepTime, 
   string errorMessage,
   ILifetimeInfo lifetimeInfo)
{
   (bool? IsOk, string Message) state = (IsOk: null, Message: string.Empty);
   var waitCancellation = (int)stepTime.TotalMilliseconds;
   using (var stateSubject = new Subject<(bool? IsOk, string Message)>())
   {
      using (Observable.Timer(timeout).Subscribe(it => stateSubject.OnNext((IsOk: false, Message: errorMessage))))
      using (Observable.Timer(TimeSpan.Zero, stepTime).
         Subscribe(it =>
         {
            if (stateChecker())
               stateSubject.OnNext((IsOk: true, Message: string.Empty));
         }))
      {
         using (stateSubject.Subscribe(it => state = it))
         {
            while (state.IsOk == null)
               lifetimeInfo.Canceler.ThrowIfCancellationRequested(waitCancellation);
            if (state.IsOk != true)
               throw new TimeoutException(state.Message);
            stateSubject.OnCompleted();
         }
      }
   }
}

This method occasionally throws an ObjectDisposedException at the following point in the code, when OnNext is called:

if ( stateChecker() )
    stateSubject.OnNext( ( IsOk: true, Message: string.Empty ) );

Is there a way to avoid using Subject entirely in this case, in favor of something like Observable.Interval or Observable.Create?

Answer

It seems to me that this is what you’re trying to do:

private static void WaitWithSubject(Func<bool> stateChecker, TimeSpan timeout, TimeSpan stepTime, string errorMessage, ILifetimeInfo lifetimeInfo) =>
    Observable
        .Amb(
            Observable
                .Timer(timeout)
                .SelectMany(_ => Observable.Throw<Unit>(new TimeoutException(errorMessage))),
            Observable
                .Timer(TimeSpan.Zero, stepTime)
                .Where(_ => stateChecker())
                .Select(_ => Unit.Default))
        .Take(1)
        .Wait();

The key here is the Amb operator, which subscribes to both sequences and forwards only the one that is first to produce a value or an error. The Take(1) ensures that the observable completes as soon as a value is produced.
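To make the two outcomes concrete, here is a small sketch of how the Amb version behaves from the caller's side. It assumes the System.Reactive package is referenced; the WaitFor name and the AmbWaitDemo class are made up for the demo, and the ILifetimeInfo parameter is left out because its type is not shown in the question.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;

public static class AmbWaitDemo
{
    // Same Amb pipeline as in the answer, without the ILifetimeInfo parameter
    // (hypothetical simplification for this demo).
    public static void WaitFor(Func<bool> stateChecker, TimeSpan timeout, TimeSpan stepTime, string errorMessage) =>
        Observable
            .Amb(
                Observable
                    .Timer(timeout)
                    .SelectMany(_ => Observable.Throw<Unit>(new TimeoutException(errorMessage))),
                Observable
                    .Timer(TimeSpan.Zero, stepTime)
                    .Where(_ => stateChecker())
                    .Select(_ => Unit.Default))
            .Take(1)
            .Wait();

    public static void Main()
    {
        // The condition turns true on the third poll, long before the timeout,
        // so the polling branch wins and WaitFor returns normally.
        var polls = 0;
        WaitFor(() => Interlocked.Increment(ref polls) >= 3,
                TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(50), "never ready");
        Console.WriteLine($"ready after {polls} polls");

        // The condition never turns true, so the timeout branch wins
        // and Wait() rethrows its TimeoutException.
        try
        {
            WaitFor(() => false, TimeSpan.FromMilliseconds(200), TimeSpan.FromMilliseconds(50), "timed out");
        }
        catch (TimeoutException ex)
        {
            Console.WriteLine($"failed: {ex.Message}");
        }
    }
}
```

Because no Subject is created or disposed here, there is nothing left for a late timer tick to call OnNext on.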

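You can add the following line just before the Wait() to stop waiting when a CancellationToken (ct here) is cancelled. Note that TakeUntil then completes the sequence without a value, so Wait() throws an InvalidOperationException for the empty sequence: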

.TakeUntil(Observable.Create<Unit>(o => ct.Register(() => o.OnNext(Unit.Default))))

After a bit of back and forth with Theodor, I’ve come up with this version, which is possibly the cleanest I can think of:

private static void WaitWithSubject(Func<bool> stateChecker, TimeSpan timeout, TimeSpan stepTime, string errorMessage, ILifetimeInfo lifetimeInfo)
{
    var good =
        Observable
            .Timer(TimeSpan.Zero, stepTime)
            .Where(_ => stateChecker())
            .Take(1);

    var fail =
        Observable
            .Timer(timeout)
            .SelectMany(_ => Observable.Throw<long>(new TimeoutException(errorMessage)));
    
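    // Take(1) on the merged sequence: Merge only completes when both inputs do,
    // so without it 'fail' would still throw at the timeout after a success.
    good.Merge(fail).Take(1).RunAsync(lifetimeInfo.Canceler).Wait();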
}
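As a rough illustration of the cancellation path, the sketch below assumes that lifetimeInfo.Canceler is (or can be converted to) a plain CancellationToken, which the question does not confirm. The MergeWaitDemo class and the WaitFor name are made up for the example. It applies Take(1) to the merged sequence so that the first success ends the wait and unsubscribes from the timeout branch.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class MergeWaitDemo
{
    // Assumption: the caller has a CancellationToken instead of ILifetimeInfo.
    public static void WaitFor(Func<bool> stateChecker, TimeSpan timeout, TimeSpan stepTime,
                               string errorMessage, CancellationToken token)
    {
        var good =
            Observable
                .Timer(TimeSpan.Zero, stepTime)
                .Where(_ => stateChecker());

        var fail =
            Observable
                .Timer(timeout)
                .SelectMany(_ => Observable.Throw<long>(new TimeoutException(errorMessage)));

        // The first success completes the sequence; a timeout surfaces as an error;
        // cancelling the token makes RunAsync fail with OperationCanceledException.
        good.Merge(fail).Take(1).RunAsync(token).Wait();
    }

    public static void Main()
    {
        // Cancel after 100 ms, well before the 5 s timeout.
        using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
        try
        {
            WaitFor(() => false, TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(20), "timed out", cts.Token);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("cancelled before the timeout");
        }
    }
}
```

Unlike the TakeUntil approach, RunAsync reports cancellation as an OperationCanceledException, which callers can tell apart from a timeout.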
