Cold streams and publishers: is the garbage collector fully aware of its true death?

    final Consumer<T> localConsumer = this::accept;
    instanceSubscriptorManager.baseSubscribe(
            isActive -> {
                if (isActive) publisher.add(localConsumer);
                else publisher.remove(localConsumer);
            }
    );

When instanceSubscriptorManager can no longer receive boolean values, the last thing it does is deliver false as isActive, which removes localConsumer from publisher.

How is localConsumer “aware” that instanceSubscriptorManager will never again receive isActive -> true in the future, especially since the class references itself?

In short, this class is held from both ends: this::accept is held by the publisher, which in turn is held by this.instanceSubscriptorManager, which receives the trues and falses.

This tells me that the JVM must build an index of references that doesn't look at things from an OOP perspective; instead, it treats all variables, local or instance, as independent nodes.

If the “thing” that delivers trues or falses to instanceSubscriptorManager gets GC’d, is that reason enough for the garbage collector to decide that the references captured by the lambda passed to baseSubscribe are no longer needed, freeing the publisher in the process?

Or do we still need to call unsubscribe() manually?

I also wonder whether there’s a limit to the “depth” of a collected branch: how many subscribers/consumers can be linked to one another before the GC can no longer detach them all when a node gets detached?

Answer

No. As you wrote it, publisher is stuck and never gets GCed.

The cheap fix is to ‘just call unsubscribe in a timely fashion’, but that will probably sound about as useful to you as telling a leg amputee to just ‘grow the leg back’.

You can have a system that will ‘auto unsubscribe’ an event listener at the appropriate time based on GC eligibility of objects.

Why can publisher not be GCed?

If you’re already clear on this and just needed it confirmed, scroll to the next section.

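The value of the publisher variable is copied into the lambda (this is called capturing). That’s why any local variables you use in lambdas must be either declared final or ‘effectively final’ (meaning: if you added final to the declaration, it would still compile; in that case the compiler treats it as final for you). Silently making copies of variables that can still change would be highly confusing. (If publisher is a field rather than a local variable, the lambda captures this instead, which reaches publisher just the same.)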

Thus the lambda here (everything from isActive -> to the closing brace) is, for GC purposes, encapsulated in an object, and a reference to whatever publisher is pointing at is part of that object: it has a field holding that value.

This object is then handed to the baseSubscribe method.
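To make “it has a field holding that value” concrete, here is a rough hand-desugared equivalent of the lambda from the question. Publisher, SubscriptorManager and ActiveToggle are hypothetical minimal stand-ins (the question doesn't show the real definitions), and the class the JVM actually generates for a lambda at runtime looks different; the point is only which references the object holds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the question's publisher: it strongly holds its consumers.
class Publisher<T> {
    final List<Consumer<T>> consumers = new ArrayList<>();
    void add(Consumer<T> c) { consumers.add(c); }
    void remove(Consumer<T> c) { consumers.remove(c); }
}

// Hypothetical stand-in for instanceSubscriptorManager: it strongly holds the callback.
class SubscriptorManager {
    private Consumer<Boolean> callback;
    void baseSubscribe(Consumer<Boolean> callback) { this.callback = callback; }
    void setActive(boolean active) { callback.accept(active); }
}

// Roughly what `isActive -> { ... }` amounts to: captured values become final fields.
final class ActiveToggle<T> implements Consumer<Boolean> {
    private final Publisher<T> publisher;    // copy of the captured `publisher` reference
    private final Consumer<T> localConsumer; // copy of the captured `localConsumer` reference

    ActiveToggle(Publisher<T> publisher, Consumer<T> localConsumer) {
        this.publisher = publisher;
        this.localConsumer = localConsumer;
    }

    @Override
    public void accept(Boolean isActive) {
        if (isActive) publisher.add(localConsumer);
        else publisher.remove(localConsumer);
    }
}
```

So manager → toggle object → publisher is a chain of ordinary strong references, and localConsumer (plus, through this::accept, the object that created it) hangs off the same chain.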

Assuming that:

  • instanceSubscriptorManager is pointing at an object that is not eligible for GC.
  • You can ‘get’ from that object to the handler via its fields (whether any code in the runtime actually does this is irrelevant), and from there to publisher.

Then, voilà: whatever publisher is pointing at will never be GCed. It’s ‘stuck’, because it is perennially reachable from instanceSubscriptorManager, which we just axiomatically stated isn’t going away. (However, if publisher is the only way to get to instanceSubscriptorManager, and instanceSubscriptorManager is the only way to get to publisher, there’s no problem here: Java’s GC has no trouble cleaning up orphaned cycles, unlike, say, Apple’s reference counting for Objective-C and, I think, Swift.)
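One rough way to observe both cases is to point a WeakReference at the publisher and ask the GC to run. The setup below is hypothetical (a Manager whose handler captures a Publisher that points back at the Manager). System.gc() is only a hint, so the “orphaned cycle gets collected” result is typical rather than guaranteed; the “still reachable” result is guaranteed, because a weak reference is never cleared while its referent is strongly reachable.

```java
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.function.Consumer;

// Hypothetical setup mirroring the question: manager -> handler -> publisher -> manager.
class ReachabilityDemo {
    static final class Manager { Consumer<Boolean> handler; }
    static final class Publisher { Manager owner; }

    // Builds the cycle and lets only a weak view of the publisher escape.
    private static WeakReference<Publisher> wire(Manager manager) {
        Publisher publisher = new Publisher();
        publisher.owner = manager; // closes the cycle
        manager.handler = isActive -> System.out.println(publisher + " active=" + isActive);
        return new WeakReference<>(publisher);
    }

    // Requests a GC (only a hint) a few times; true if `ref` was cleared.
    static boolean collectedAfterGc(WeakReference<?> ref, int attempts) {
        for (int i = 0; i < attempts && ref.get() != null; i++) {
            System.gc();
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return ref.get() == null;
    }

    // The manager stays strongly reachable, so the publisher can never be cleared: always false.
    static boolean publisherCollectedWhileManagerAlive() {
        Manager manager = new Manager();
        WeakReference<Publisher> ref = wire(manager);
        boolean collected = collectedAfterGc(ref, 5);
        Reference.reachabilityFence(manager); // keeps `manager` strongly reachable until here
        return collected;
    }

    // Nothing outside the manager/publisher cycle refers to it, so the whole cycle is garbage.
    // Typically true after System.gc(), but the JVM does not guarantee it.
    static boolean orphanedCycleCollected() {
        WeakReference<Publisher> ref = wire(new Manager());
        return collectedAfterGc(ref, 5);
    }
}
```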

Smart unsubscribe sounds awesome. What’s that about?

This (management of fleeting event handlers) is an intractably hard problem that just about every event system out there ‘solves’ by offering you a subscribe method, an unsubscribe method, and a sincere wish for good fortune and luck. You figure out how to manage it with those 2 primitives, that’s all you get.

Which, for the many application domains that want fleeting handlers (‘fleeting’ as in: you’d like them to become eligible for GC, and the app creates an unbounded number of them if it runs forever), sucks.

One obvious reason the unsubscribe part feels so cruddy is that most app domains simply don’t have fleeting handlers: the number of times .subscribe() is invoked over an app’s lifetime is a constant, and the handler objects you pass to the event source are intended to live forever.

But when that is not what you want, as appears to be the case here (i.e., you want fleeting handlers), it’s vastly more convenient to use the tools in the java.lang.ref package: WeakReference and ReferenceQueue.

See, you can subscribe to Java’s own GC system! You can wrap an object in a soft or weak reference (the difference? The javadoc explains; it gets a little complicated, and you probably want weak), then ask whether the object it points at has been GCed, and even ask to be notified once it is (by registering the reference with a ReferenceQueue).
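A minimal sketch of “subscribing to the GC” with nothing but java.lang.ref: register a WeakReference with a ReferenceQueue, and the GC enqueues the reference object some time after clearing it. GcWatcher and its method names are hypothetical; WeakReference(referent, queue), get() and ReferenceQueue.remove(long) are the real APIs.

```java
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;

// Hypothetical helper: observes one object without keeping it alive.
class GcWatcher<T> {
    private final ReferenceQueue<T> queue = new ReferenceQueue<>();
    private final WeakReference<T> ref;

    GcWatcher(T watched) {
        // Registering with `queue` makes the GC enqueue `ref` some time after clearing it.
        this.ref = new WeakReference<>(watched, queue);
    }

    // Non-blocking: true once the GC has cleared the reference.
    boolean isCollected() {
        return ref.get() == null;
    }

    // Blocking: waits up to timeoutMillis (0 means wait forever) for the GC to enqueue `ref`.
    // Returns false on timeout; true means the watched object has been cleared by the GC.
    boolean awaitCollection(long timeoutMillis) throws InterruptedException {
        return queue.remove(timeoutMillis) != null;
    }
}
```

An event source built on this idea would typically run one background thread that loops on queue.remove() and unsubscribes whichever handler belongs to the reference it gets back; that is the “ask to be notified” part.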

This gets to a different paradigm for subscribe/unsubscribe. Instead of:

  1. Subscribe to receive event notifications (and in passing, lock that object in as not GC-able, as it is reachable by the event source now), and
  2. When you no longer want that, call unsubscribe, which also relinquishes the reachability from the event source to the object, so it no longer impedes GC.
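For contrast, the classic two-primitive shape looks roughly like this. ClassicEventSource is a hypothetical minimal event source, not any particular library’s API: every subscribe() adds a strong reference from the source to the handler, and only running the matching unsubscribe action removes it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical minimal event source with the classic subscribe/unsubscribe pair.
class ClassicEventSource<E> {
    // Strong references: a subscribed handler (and everything it captures) stays reachable.
    private final List<Consumer<E>> handlers = new CopyOnWriteArrayList<>();

    // Returns an unsubscribe action; the caller must remember to run it.
    Runnable subscribe(Consumer<E> handler) {
        handlers.add(handler);
        return () -> handlers.remove(handler);
    }

    void fire(E event) {
        for (Consumer<E> h : handlers) h.accept(event);
    }

    int handlerCount() {
        return handlers.size();
    }
}
```

If the unsubscribe action is never run, the handler stays in the list for as long as the source lives, which is exactly the leak described in the question.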

you can do a much better job. Simply do:

  1. Subscribe to receive event notifications, but also pass along a ‘watch’ object (see below). The event source holds only a weak reference to the watch, and automatically unsubscribes the event handler once the watch is GCed. Because it knows about the watch only through the weak-reference indirection, the event source does not impede the watch’s GC eligibility. It’s up to the subscribing code to ensure the watch cannot be GCed until you want to stop receiving notifications. This system is also nebulous about exactly when you stop receiving events: Java does not instantly mark an unreachable object as eligible for GC, nor does it immediately enqueue the references tracking the watch on their reference queues. It’ll get around to it.
  2. … there is no step 2. Unsubscribing just happens, just when you’d want it to.

In particular, you have java.lang.ref.WeakReference, which adds a layer of indirection and breaks the reachability chain:

    class SomethingLongLived {
       // Not final: it is assigned in code(), not in a constructor.
       private Object field;

       void code() {
         field = new Object();
       }
    }

In the above, that object we make can’t be GCed until the instance of SomethingLongLived goes away, which for the purposes of these examples, it won’t. But now it can be:

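    import java.lang.ref.WeakReference;

    class SomethingLongLived {
       // Only the tiny WeakReference is held strongly; the Object it wraps is not.
       private WeakReference<Object> field;

       void code() {
         field = new WeakReference<>(new Object());
       }
    }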

The WeakReference object itself stays reachable, of course, but the general idea is that a WeakReference object is tiny, and presumably the object you’re interested in is either huge, or interesting to you solely because you want to track whether it is eligible for GC. You can ask a WeakReference for its ‘referent’, but get() returns null once the referent has been GCed or is in the final phases of collection (the GC clears all weak references to an object before its memory is reclaimed; after that, there is no way to get at that object, period).
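One practical consequence: call get() once and work with the local variable, because the referent can be cleared between two calls. The class and method names below are hypothetical; ref.clear() is used in the usage check only to simulate what the GC does.

```java
import java.lang.ref.WeakReference;

class WeakRefReading {
    // Reads the referent exactly once; the local `target` is a strong reference,
    // so the object cannot be cleared while this method is still using it.
    static String describe(WeakReference<StringBuilder> ref) {
        StringBuilder target = ref.get();
        if (target == null) {
            return "gone"; // already cleared by the GC (or by clear())
        }
        return "alive: " + target;
    }

    // Anti-pattern for comparison: the referent may be cleared between the check and the use.
    static String describeRacy(WeakReference<StringBuilder> ref) {
        if (ref.get() != null) {
            return "alive, length " + ref.get().length(); // may throw NullPointerException
        }
        return "gone";
    }
}
```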

Your event-firing code can be as simple as checking the associated weak reference first. If it returns null, do NOT send the event to the associated event handler; instead, remove the weakref+handler pair from the list of handlers.

Note that it can still be complicated: if the event handler itself has a way to reach the watch object, then neither ever goes away. The event source can reach the handler, the handler can reach the watch, and nothing ever gets GCed. Watch management is therefore sometimes a bit tricky.
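A sketch of that trap, using a hypothetical minimal event source (WatchedSource) that holds weak references to watches but strong references to handlers. If the handler lambda captures its own watch, the source itself keeps the watch strongly reachable, so it is never cleared while the source lives.

```java
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal watch-based event source: weak to each watch, strong to each handler.
class WatchedSource<E> {
    private record Entry<T>(WeakReference<Object> watch, Consumer<T> handler) { }

    private final List<Entry<E>> entries = new ArrayList<>();

    void subscribe(Object watch, Consumer<E> handler) {
        entries.add(new Entry<>(new WeakReference<>(watch), handler));
    }

    // Delivers to live subscriptions; drops those whose watch has been cleared.
    void fire(E event) {
        Iterator<Entry<E>> it = entries.iterator();
        while (it.hasNext()) {
            Entry<E> e = it.next();
            if (e.watch().get() == null) it.remove();
            else e.handler().accept(event);
        }
    }
}

class LeakDemo {
    // The trap: the handler captures its own watch, so source -> handler -> watch is a
    // strong path and the watch can never be cleared while the source is alive.
    static WeakReference<Object> subscribeLeaky(WatchedSource<String> source) {
        Object watch = new Object();
        source.subscribe(watch, event -> System.out.println(watch + " saw " + event));
        return new WeakReference<>(watch); // only a weak view escapes
    }

    // Requests a GC (only a hint) a few times; true if `ref` was cleared.
    static boolean collectedAfterGc(WeakReference<?> ref, int attempts) {
        for (int i = 0; i < attempts && ref.get() != null; i++) {
            System.gc();
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return ref.get() == null;
    }
}
```

The fix is for the handler not to reference the watch at all; then the only strong references to the watch are whatever the subscriber keeps.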

You can, if you want, simplify this system by making the event handler itself the watch: the event source then doesn’t prevent GC of any of the handlers subscribed to it, and keeping them alive becomes the callers’ responsibility. But in practice this doesn’t work out nicely, and you’re at significant risk of creating a chain that prevents GC of everything on it. Much better to have trivial watch objects (literally trivial: new Object(); its only purpose is to let the event source observe whether it’s been GCed), and to store a reference to the watch somewhere such that ‘if that place gets GCed, the watch goes with it, and the event handler is unsubscribed automatically as well’.

For what it’s worth, I am not aware of any event source system that works this way, which is slightly odd, as it seems much nicer than an unsubscribe method.

A basic implementation:

    import java.lang.ref.WeakReference;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Objects;

    public class WindowSystem {
      // This class represents an 'event source'.
      // You can subscribe to these.

      public interface MouseMovedHandler {
        void mouseAt(int x, int y);
      }

      // Weak reference to the watch, strong reference to the handler.
      private record HandlerContainer(WeakReference<Object> watch, MouseMovedHandler handler) {}

      private final List<HandlerContainer> handlers = new ArrayList<>();

      // NB: WARNING - this code is not threadsafe.
      // It needs to be - exercise left to the reader.
      public void subscribe(Object watch, MouseMovedHandler handler) {
        Objects.requireNonNull(watch, "watch");
        Objects.requireNonNull(handler, "handler");
        handlers.add(new HandlerContainer(new WeakReference<>(watch), handler));
      }

      // Called a lot by some separate thread managing the window,
      // each time the mouse is moved.
      void onMouseMoved(int x, int y) {
        fireMouseMoveEvent(x, y);
      }

      private void fireMouseMoveEvent(int x, int y) {
        var it = handlers.iterator();
        while (it.hasNext()) {
          var c = it.next();
          if (c.watch().get() == null) {
            it.remove(); // the watch was GCed: unsubscribe automatically
          } else {
            c.handler().mouseAt(x, y);
          }
        }
      }
    }

There’s a lot more to it. Removing entries from the middle of enormous ArrayLists isn’t efficient, and sometimes you want separate threads for firing events. Obviously, the above is going to explode because a single object is modified from 2 separate threads (the window loop, and whatever code calls .subscribe), so it needs either a suitable collection type from java.util.concurrent (also check out WeakHashMap, which is relevant to this kind of thing), or some synchronized or java.util.concurrent.locks.Lock based code.
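As one possible direction for the WeakHashMap hint, here is a sketch (the class name WeakKeyedEventSource is hypothetical) that keys handlers by their watch object, so the map itself drops an entry some time after its watch is collected and there is no scanning for dead weak references. WeakHashMap holds its keys weakly but its values strongly, so the earlier caveat applies verbatim: a handler must not reference its own watch, or the entry never goes away (the WeakHashMap javadoc warns about exactly this). WeakHashMap also compares keys with equals(), so identity-equal watches such as new Object() are the natural fit. Collections.synchronizedMap plus a snapshot taken under the lock is just one simple way to make it threadsafe.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.WeakHashMap;
import java.util.function.Consumer;

// Hypothetical event source: handlers are keyed weakly by their watch object.
class WeakKeyedEventSource<E> {
    // Keys (watches) are held weakly, values (handler lists) strongly.
    private final Map<Object, List<Consumer<E>>> byWatch =
        Collections.synchronizedMap(new WeakHashMap<>());

    void subscribe(Object watch, Consumer<E> handler) {
        Objects.requireNonNull(watch, "watch"); // a null key would never be collected
        synchronized (byWatch) {
            byWatch.computeIfAbsent(watch, w -> new ArrayList<>()).add(handler);
        }
    }

    void fire(E event) {
        List<Consumer<E>> snapshot = new ArrayList<>();
        synchronized (byWatch) { // iterating a synchronizedMap requires holding its lock
            for (List<Consumer<E>> hs : byWatch.values()) snapshot.addAll(hs);
        }
        // Handlers run outside the lock, so a slow handler doesn't block other threads.
        for (Consumer<E> h : snapshot) h.accept(event);
    }

    int liveWatchCount() {
        return byWatch.size(); // size() first expunges entries whose keys were collected
    }
}
```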

But, hopefully the idea is clear: That’s how you ‘auto unsubscribe’ things based on the GC system.