Python priority queue class wrapper

Posted on


I’m trying to incorporate the advice found in in order to make a priority queue implementation (see corresponding section) in a class priorityQ. Not going to reinvent the wheel so i use the the python’s heapq implementation

class priorityQ():
    import heapq
    import itertools

    def __init__(self,mylist):

        self._entry_finder = {}               # mapping of tasks to entries
        self._counter = itertools.count()     # unique sequence count
        self.REMOVED = '<removed-task>'      # placeholder for a removed task

        if mylist:
   = []
            for element in mylist:
                priority, count, task = element[0], next(self._counter), element[1]
                entry = [priority,count,task]
                self._entry_finder[task] = entry
   = []

    def add_task(self,task,priority):
        if task in self._entry_finder:
        count = next(self._counter)
        entry = [priority,count,task]
        self._entry_finder[task] = entry
        heapq.heappush(, entry)

    def remove_task(self,task):
        'Mark an existing task as REMOVED.  Raise KeyError if not found.'
        entry = self._entry_finder.pop(task)
        entry[-1] = self.REMOVED

    def pop_task(self):
            priority, count, task = heapq.heappop(
            if task is not self.REMOVED:
                del self._entry_finder[task]
                return task
        raise KeyError('pop from an empty priority queue')

The code as it is seems to solve the implementation challenges listed in 8.5.2 in the link I gave. I just wonder if this is a clean implementation of such a candidate class. Is it better to just implement with the procedural style suggested in the manual and incorporate it in whatever project i’m working on or is it a better practice to make use of a class like the above (or a more refined version of that).

PS: I know it’s slightly off-topic for this stack exchange site, but i wonder if there is a way around the side effect of having a bunch of deleted-entries residing in the queue after doing a set of priority updates in existing tasks. (Something different than copying/cleaning the entire queue every fixed period of time)


1. Review

  1. The code in the post does not work:

    >>> q = PriorityQueue(())
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "", line 8, in __init__
        self._counter = itertools.count()     # unique sequence count
    NameError: name 'itertools' is not defined

    The problem is that itertools was imported in class scope (not at top level) so you need self.itertools (or to move the imports to the top level).

  2. The class has no docstring. How am I supposed to use this class?

  3. The __init__, add_task and pop_task methods have no docstrings. What arguments do I pass? What do they return?

  4. The objects in the queue are described as “tasks” but priority queues can be used for all sorts of different things — for example, the A* search algorithm uses a priority queue containing unexpanded search nodes. I would use a more generic name like “item” or “element”, and change add_task to add, remove_task to remove and pop_task to pop.

  5. The Python style guide (PEP8) recommends that

    Class names should normally use the CapWords convention.

    So I would call the class PriorityQueue. You’re not obliged to follow this guide, but it will make it easier for you to collaborate with other Python programmers if you do.

  6. If a class has no superclasses, then you can omit the parentheses from the class declaration.

  7. The constructor has a required argument mylist. It would be better if this argument was optional, and if omitted then the constructed queue is empty. This would match the behaviour of other class constructors like queue.PriorityQueue() and collections.deque() which construct empty queues if given no argument.

  8. There is a test if mylist: but this is unnecessary because if mylist is an empty list, then both branches have the same behaviour (setting to the empty list). So you might as well omit the test.

  9. Once you omitted the test on mylist, it doesn’t have to be a list any more – any iterable would do. So I would name this argument iterable.

  10. It is perverse that the argument to __init__ must be a list of pairs (priority, task) but when calling add_task you have to supply the arguments (task, priority). This seems likely to be the cause of errors.

  11. The need for _counter is slightly tricky (it’s necessary to break ties in case the items cannot be compared for order), so a comment would be a good idea.

  12. self.REMOVED is always the same, so it could be a member of the class. Also, it is only needed for use internally by the class so PEP8 recommends using a name starting with an underscore.

  13. Similarly, ought to have a name starting with an underscore.

  14. Instead of using element[0] and element[1] here:

    for element in mylist:
        priority, count, task = element[0], next(self._counter), element[1]
        entry = [priority,count,task]

    use tuple unpacking:

    for priority, item in iterable:
        entry = [priority, next(self._counter), item]

    (But see the next item.)

  15. The code in __init__ duplicates the code in add_task. So it would be simpler if the former called the latter:

    for priority, item in iterable:
        self.add(item, priority)
  16. The docstring for remove_task is written from the implementer’s point of view: it says, “Mark an existing item as REMOVED”. But the user does not need to know how removal is implemented: that’s an internal detail of the class which the documented API should hide.

  17. In pop_task you unpack all the elements of the popped entry:

    priority, count, item = heapq.heappop(self._data)

    but priority and count are not used. It is conventional to use the variable name _ for values that go unused.

2. Revised code

import heapq
import itertools

class PriorityQueue:
    """Collection of items with priorities, such that items can be
    efficiently retrieved in order of their priority, and removed. The
    items must be hashable.

    _REMOVED = object()         # placeholder for a removed entry

    def __init__(self, iterable=()):
        """Construct a priority queue from the iterable, whose elements are
        pairs (item, priority) where the items are hashable and the
        priorities are orderable.

        self._entry_finder = {} # mapping of items to entries

        # Iterable generating unique sequence numbers that are used to
        # break ties in case the items are not orderable.
        self._counter = itertools.count()

        self._data = []
        for item, priority in iterable:
            self.add(item, priority)

    def add(self, item, priority):
        """Add item to the queue with the given priority. If item is already
        present in the queue then its priority is updated.

        if item in self._entry_finder:
        entry = [priority, next(self._counter), item]
        self._entry_finder[item] = entry
        heapq.heappush(self._data, entry)

    def remove(self, item):
        """Remove item from the queue. Raise KeyError if not found."""
        entry = self._entry_finder.pop(item)
        entry[-1] = self._REMOVED

    def pop(self):
        """Remove the item with the lowest priority from the queue and return
        it. Raise KeyError if the queue is empty.

        while self._data:
            _, _, item = heapq.heappop(self._data)
            if item is not self._REMOVED:
                del self._entry_finder[item]
                return item
        raise KeyError('pop from an empty priority queue')

Leave a Reply

Your email address will not be published. Required fields are marked *