Why Publish().RefCount() doesn’t work well with Retry()

When you’re designing a library with a large number of methods, one of the hard problems is making sure that all of the methods work together in a consistent way. If done correctly, then you can make your library a joy to use. However, if done badly, then your users will spend a frustrating number of hours debugging what’s going on…

That is exactly what happened to me recently while using Reactive Extensions: I spent a frustrating number of hours debugging what was going on.

It all started with a performance problem.

We quickly identified what was causing the performance problem, and so we added a performance optimisation — Publish().RefCount() — in order to fix it. And that’s when our code broke.

We were lucky — our tests caught the fact that the performance optimisation had broken our code — and so we set about trying to get the best of both worlds: working code that was also fast.

This blog post goes into the detail of precisely what happened and how we fixed it.

A failing test

Let’s look at a test that demonstrates the problem. The test below creates an observable, uses Retry() to add error handling, and then asserts that the resulting sequence is 0 followed by 1.

[Test]
public void Test([Values(false, true)] bool withPublishAndRefCount)
{
    IObservable<int> original = CreateObservable(withPublishAndRefCount);

    var withRetry = original.Retry(2);

    Assert.That(withRetry.ToList().Wait(), Is.EqualTo(new[] { 0, 1 }));
}

The test is executed twice. The difference between the two executions is the value of the parameter withPublishAndRefCount.

Creating the observable

The method below shows how that parameter is used. When it’s set to false, Publish().RefCount() isn’t used, and the test passes. However, when it’s set to true, CreateObservable wraps the observable using Publish().RefCount() and, just as it did for us, this causes the test to fail.

private IObservable<int> CreateObservable(bool withPublishAndRefCount)
{
    var observable = Observable.Defer(ObservableFactory);
    if (withPublishAndRefCount)
    {
        return observable.Publish().RefCount();
    }
    else
    {
        return observable;
    }
}

Note that the return type of CreateObservable is IObservable<int>. At this point, it’s worth mentioning the Liskov Substitution Principle, which essentially says that swapping one implementation of IObservable<int> for another shouldn’t cause the test to fail. But that’s precisely what’s happening! When we use Publish().RefCount(), we get a different implementation of IObservable<int> and the test fails.

The observable factory

The method below shows the observable factory. The first subscription results in OnNext(0) followed by OnError(e). When Retry() sees this error, it hides the error from everything downstream and re-subscribes. The second subscription results in OnNext(1) followed by OnCompleted().

private int nextSubscription = 0;
private IObservable<int> ObservableFactory()
{
    var thisSubscription = nextSubscription;
    nextSubscription++;
    switch (thisSubscription)
    {
        case 0:
            var e = new ApplicationException();
            return Observable.Return(thisSubscription)
                             .Concat(Observable.Throw<int>(e));
        default:
            return Observable.Return(thisSubscription);
    }
}

The root cause

So why does wrapping the observable using Publish().RefCount() cause the test to fail?

The test fails because Retry() isn’t able to recover from the error. When it sees the error and re-subscribes, it sees precisely the same error again. Retry(2) allows at most two subscription attempts in total, so here it gives up after the second failure and the error reaches the test. If we hadn’t specified a maximum number of attempts, Retry() would spin forever (well, until the test times out)! Essentially, the error has been cached by Publish().RefCount(), so subscriptions are no longer independent.

So what’s caching this error?

Well, Publish().RefCount() is shorthand for Multicast(new Subject<T>()).RefCount().

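And it is Subject<T> that caches the error: once a Subject<T> has received OnError, it stays terminated, and it immediately hands that same error to every later subscriber.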
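The caching can be seen in isolation with a bare Subject<T>, with no Publish(), RefCount() or Retry() involved. The following is a minimal sketch, assuming the System.Reactive package is referenced; the class name SubjectErrorDemo and the "boom" message are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class SubjectErrorDemo
{
    // Returns what a late subscriber observes after the subject has already failed.
    // (SubjectErrorDemo is a hypothetical name, not part of Rx.)
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        // Nobody is subscribed yet, so this value is simply dropped.
        subject.OnNext(0);
        // The subject is now terminated and remembers the exception.
        subject.OnError(new ApplicationException("boom"));

        // Subscribing after termination: the stored error is delivered immediately.
        subject.Subscribe(
            value => log.Add("OnNext(" + value + ")"),
            error => log.Add("OnError(" + error.Message + ")"),
            () => log.Add("OnCompleted"));

        return log;
    }
}
```

The late subscriber never sees the value 0; the only notification it receives is the stored error. This is the same situation Retry() ends up in when it re-subscribes through Publish().RefCount().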

The fix

Now that we know precisely what’s happening, we can work around it.

What we need to do is write a replacement for Subject<T> that keeps subscriptions independent, and then simply replace all instances of Publish().RefCount() with Multicast(new IndependentSubscriptionsSubject<T>()).RefCount().

And here’s the code for IndependentSubscriptionsSubject<T>:

public class IndependentSubscriptionsSubject<T> : ISubject<T>
{
    private ISubject<T> _innerSubject = new Subject<T>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _innerSubject.Subscribe(observer);
    }

    public void OnNext(T value)
    {
        _innerSubject.OnNext(value);
    }

    public void OnCompleted()
    {
        _innerSubject.OnCompleted();
    }

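    public void OnError(Exception error)
    {
        // Swap in a fresh subject before forwarding the error, so that anything
        // that re-subscribes in response to the error (such as Retry()) is
        // attached to a subject that has not terminated.
        var erroringInnerSubject = _innerSubject;
        _innerSubject = new Subject<T>();
        erroringInnerSubject.OnError(error);
    }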
}
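Since the replacement has to be made at every call site, it may be convenient to wrap it in an extension method. The following is only a sketch: the names IndependentSubscriptionsExtensions and PublishIndependentRefCount are hypothetical, and it assumes that the IndependentSubscriptionsSubject<T> class above is in the same project and that the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;

public static class IndependentSubscriptionsExtensions
{
    // Hypothetical helper: a stand-in for Publish().RefCount() that multicasts
    // through IndependentSubscriptionsSubject<T> (defined earlier in this post)
    // instead of a plain Subject<T>.
    public static IObservable<T> PublishIndependentRefCount<T>(this IObservable<T> source)
    {
        return source.Multicast(new IndependentSubscriptionsSubject<T>()).RefCount();
    }
}
```

With that in place, the call in CreateObservable would read observable.PublishIndependentRefCount() instead of observable.Publish().RefCount().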

Conclusion

We’ve seen how Publish().RefCount() caches errors, which breaks Retry(), and we’ve also seen how to work around it by writing a replacement for Subject<T> that doesn’t cache errors.
