How to prevent duplicate values in a shared queue

A producer thread queries a data store and puts objects into a queue. Each consumer thread then pulls an object off the shared queue and makes a very long call to an external service. When the call returns, the consumer marks the object as completed.

My problem is that I basically have to wait until the queue is empty before the producer can add to it again, or else I risk sending duplicates through.

[edit] Someone asked a good question over IRC and I figured I would add the answer here. The question was, “Why do your producers produce duplicates?” The answer is basically that the producer produces duplicates because we don’t track a “sending” state of each object, only “sent” or “unsent”.

Is there a way that I can check for duplicates in the queue?

Answer

It seems to me like it’s not really a problem to have duplicate objects in the queue; you just want to make sure you only do the processing once per object.

EDIT: I originally suggested using a set or OrderedDict to keep track of the objects, but Python has a perfect solution: functools.lru_cache

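Use @functools.lru_cache as a decorator on your worker function, and it will manage a cache for you. You can set a maximum size, and the cache will not grow beyond it. An ordinary set that you don’t manage could grow very large over time and slow down your workers. One caveat: the cache itself is thread-safe, but if a second thread calls the function with the same argument before the first call has finished and been cached, the function can run twice. With very long calls, lru_cache alone therefore does not rule out two consumers processing the same object at the same time.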
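
As a rough illustration of the decorator approach, here is a minimal single-threaded sketch. It assumes each object can be identified by a hashable ID; the names process, call_external_service and calls are hypothetical stand-ins, not anything from the question.

```python
import functools

calls = []  # records which IDs actually reached the service (illustration only)

def call_external_service(obj_id):
    # Hypothetical stand-in for the long-running external call.
    calls.append(obj_id)
    return f"done:{obj_id}"

@functools.lru_cache(maxsize=1024)
def process(obj_id):
    # Runs only on a cache miss; a repeated ID returns the cached result.
    return call_external_service(obj_id)

for obj_id in [1, 2, 1, 3, 2]:
    process(obj_id)

print(calls)  # [1, 2, 3]
```

The argument has to be hashable, and an ID that has been evicted from the cache will be processed again if it shows up later, so maxsize should be chosen with that in mind.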

If you are using multiple worker processes instead of threads, you would need a solution that works across processes. Instead of a set or an lru_cache you could use a shared dict where the key is the unique ID value you use to detect duplicates, and the value is a timestamp for when the object went into the dict; then from time to time you could delete the really old entries in the dict. Here’s a StackOverflow answer about shared dict objects:
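
One way the shared-dict idea might look, as a sketch only: it assumes a multiprocessing.Manager dict guarded by a Manager lock, and the helper names claim, purge_old and worker are hypothetical.

```python
import multiprocessing as mp
import time

def claim(seen, lock, obj_id):
    """Return True if this caller is the first to record obj_id."""
    with lock:  # makes the check-then-insert a single step across processes
        if obj_id in seen:
            return False
        seen[obj_id] = time.time()  # timestamp of when the ID went in
        return True

def purge_old(seen, lock, max_age_seconds):
    """Delete entries that were recorded more than max_age_seconds ago."""
    cutoff = time.time() - max_age_seconds
    with lock:
        for obj_id, stamp in list(seen.items()):
            if stamp < cutoff:
                del seen[obj_id]

def worker(seen, lock, obj_ids, results):
    for obj_id in obj_ids:
        if claim(seen, lock, obj_id):
            results.append(obj_id)  # the long external call would go here

if __name__ == "__main__":
    with mp.Manager() as manager:
        seen = manager.dict()
        lock = manager.Lock()
        results = manager.list()
        procs = [
            mp.Process(target=worker, args=(seen, lock, [1, 2, 3, 2], results))
            for _ in range(2)
        ]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(sorted(results))
```

The lock matters here: without it, two processes could both see that an ID is missing and both go on to process it. purge_old is meant to be called from time to time, for example by the producer.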

multiprocessing: How do I share a dict among multiple processes?

And the rest of my original answer follows:

If so, I suggest you have the consumer thread(s) use a set to keep track of objects that have been seen. If an object is not in the set, add it and process it; if it is in the set, ignore it as a duplicate.
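
A minimal sketch of the set-based check with two consumer threads follows. The lock, the None sentinel, and the names first_time, consumer and processed are assumptions added for illustration; the objects are represented by plain integer IDs.

```python
import queue
import threading

seen = set()
seen_lock = threading.Lock()
processed = []  # stand-in for the side effects of the long external call

def first_time(obj_id):
    """Return True only for the first caller that presents obj_id."""
    with seen_lock:  # check and add must happen as one step
        if obj_id in seen:
            return False
        seen.add(obj_id)
        return True

def consumer(q):
    while True:
        obj_id = q.get()
        if obj_id is None:  # sentinel: no more work
            break
        if first_time(obj_id):
            processed.append(obj_id)  # the long external call would go here

q = queue.Queue()
for obj_id in [1, 2, 2, 3, 1]:
    q.put(obj_id)

threads = [threading.Thread(target=consumer, args=(q,)) for _ in range(2)]
for _ in threads:
    q.put(None)  # one sentinel per consumer
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(processed))  # [1, 2, 3]
```

Because the ID is added to the set before the long call starts, a duplicate that arrives while the first copy is still being processed is also ignored.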

If this will be a long-running system, instead of a set, use an OrderedDict to track seen objects. Then from time to time clean out the oldest entries in the OrderedDict.
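
The OrderedDict variant could be sketched like this. SeenTracker and max_entries are hypothetical names, and the sketch trims by entry count on every insert rather than on a timer. It is not thread-safe on its own, so with several consumer threads the call to first_time would need to be wrapped in a lock.

```python
from collections import OrderedDict

class SeenTracker:
    """Remembers the most recent max_entries IDs, dropping the oldest."""

    def __init__(self, max_entries):
        self.max_entries = max_entries
        self._seen = OrderedDict()

    def first_time(self, obj_id):
        if obj_id in self._seen:
            return False
        self._seen[obj_id] = True
        # Clean out the oldest entries once the limit is exceeded.
        while len(self._seen) > self.max_entries:
            self._seen.popitem(last=False)  # removes the oldest insertion
        return True

tracker = SeenTracker(max_entries=3)
results = [tracker.first_time(i) for i in [1, 2, 1, 3, 4, 1]]
print(results)  # [True, True, False, True, True, True]
```

The last result is True because ID 1 had already been cleaned out when ID 4 arrived, which shows the trade-off: the limit needs to be large enough that a duplicate cannot outlive its entry.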
