Merge n sorted iterators


Problem

Over the weekend I was curious about efficiently merging multiple sorted iterators together, so that the merged output is itself in sorted order.
This is quite like a challenge on HackerRank:

You’re given the pointer to the head nodes of two sorted linked lists. The data in both lists will be sorted in ascending order. Change the next pointers to obtain a single, merged linked list which also has data in ascending order. Either head pointer given may be null meaning that the corresponding list is empty.

I may have cheated a bit by printing the items rather than returning them, but I don’t mind that much for the HackerRank challenge.

I managed to do this in about O(3l) space and O(l(2 + n)) time, so O(l) space and O(ln) time, where l is the number of lists and n is the total amount of data.

import operator
import functools

def from_iterable(fn):
    @functools.wraps(fn)
    def inner(*args):
        return fn(args)
    inner.from_iterable = fn
    return inner


@from_iterable
def merge(sources):
    sources = {
        id_: iter(source)
        for id_, source in enumerate(sources)
    }
    values = {}
    for id_, source in list(sources.items()):
        try:
            values[id_] = next(source)
        except StopIteration:
            del sources[id_]

    by_value = operator.itemgetter(1)
    while sources:
        id_, minimum = min(values.items(), key=by_value)
        try:
            values[id_] = next(sources[id_])
        except StopIteration:
            del values[id_], sources[id_]
        yield minimum


def iter_nodes(node):
    while node is not None:
        yield node.data
        node = node.next


def MergeLists(headA, headB):
    vals = merge.from_iterable(iter_nodes(l) for l in (headA, headB))
    print(' '.join(str(i) for i in vals), end='')

A bit overkill for the HackerRank challenge. But that wasn’t the main reason for doing it.
Any and all reviews are welcome.

Solution

  1. There are no docstrings. What do these functions do? What arguments do they take? What do they return?

  2. Decorators are normally named after the effect they have on the decorated function. For example @functools.lru_cache adds an LRU cache to a function; @unittest.skip causes a test case to be skipped, and so on.

    The effect of @from_iterable is that it converts the arguments into a tuple and passes that tuple as a single argument to the original function. I would not have guessed this functionality from the name; in fact I would have guessed the reverse (that it would convert a single iterable argument to individual arguments).
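
    To make the two resulting calling conventions concrete, this is how the decorated function ends up being called (an illustration of the posted code, not a recommended API):

    list(merge([1, 4], [2, 5], [3]))                    # wrapper: separate positional arguments
    list(merge.from_iterable([[1, 4], [2, 5], [3]]))    # original function: a single iterable of iterables
    # both produce [1, 2, 3, 4, 5]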

  3. It was established in comments that the intention of this decorator is to transform a function into a pair of functions like chain and chain.from_iterable (where the original function becomes available under the latter name, and the new function under the former name).

    The trouble with this is that chain was a mistake. If we only had chain.from_iterable, then we would not need chain, because we could just make a tuple: we’d write chain.from_iterable((arg1, arg2, ...)) instead of chain(arg1, arg2, ...). This is a trivial change to calling convention (just two parentheses) that does not need another function to support it. (For example, it would be ridiculous to have sorted and sorted.from_iterable.)
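
    For example, with itertools.chain the two spellings differ only by one extra pair of parentheses:

    from itertools import chain

    list(chain('ab', 'cd'))                    # ['a', 'b', 'c', 'd']
    list(chain.from_iterable(('ab', 'cd')))    # ['a', 'b', 'c', 'd'], same result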

    The Python developers were constrained by backwards compatibility to leave chain alone, and so the only thing they could do was add another function. I don’t think they intended it to be a pattern to be followed (it’s certainly not used anywhere else in the standard library). To this day, Python programmers looking for a “flatten” function are surprised to find that it already exists in the standard library under an obscure name.

    Note that the code in the post does not actually use merge, it only uses merge.from_iterable. So the obvious thing to do would be to write merge so that it takes an iterable, and avoid the need for the @from_iterable decorator and its associated confusion.

  4. Since headA and headB are not used independently of each other, you could change the calling convention for MergeLists so that it took an arbitrary number of linked lists, like this:

    def merge_linked_lists(*lists):
        "Merge multiple sorted linked lists."
        return merge(map(iter_nodes, lists))
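
    As a quick illustration of the variadic calling convention, here is a hypothetical test harness; it assumes iter_nodes from the post and the iterable-taking merge from §7 below are in scope, and Node is a minimal stand-in for HackerRank’s node type (anything with data and next attributes will do):

    from collections import namedtuple

    # Hypothetical node type for testing; HackerRank supplies its own.
    Node = namedtuple('Node', 'data next')

    def linked_list(values):
        "Build a linked list from a sequence (testing helper only)."
        head = None
        for value in reversed(values):
            head = Node(value, head)
        return head

    odds = linked_list([1, 3, 5])
    evens = linked_list([2, 4, 6])
    print(list(merge_linked_lists(odds, evens)))    # [1, 2, 3, 4, 5, 6]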
    
  5. The expression:

    str(i) for i in vals
    

    can be written:

    map(str, vals)
    

    Or if you prefer the comprehension, then consider improving the name of the loop variable. The name i is conventionally used for an index, but here we are not iterating over indexes, but over values, so v or value would be clearer.
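
    For instance, the final line of MergeLists could then read something like:

    print(' '.join(map(str, vals)), end='')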

  6. The code in merge does not use the built-in function id, so there is no need to respell the variable as id_ in order to avoid shadowing the built-in.

  7. The function merge maintains two data structures. The dictionary sources maps a source id to an iterator over the corresponding source, and the dictionary values maps a source id to the value at the front of the corresponding source. The code would be simplified if the two data structures were combined into one, like this:

    def merge(iterables):
        "Merge sorted iterables."
        entries = {} # Map from id to [front value, id, iterator].
        for id, it in enumerate(map(iter, iterables)):
            try:
                entries[id] = [next(it), id, it]
            except StopIteration:
                pass
        by_value = operator.itemgetter(1)
        while entries:
            id, entry = min(entries.items(), key=by_value)
            value, _, it = entry
            yield value
            try:
                entry[0] = next(it)
            except StopIteration:
                del entries[id]
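
    A couple of quick checks showing how this version behaves:

    list(merge([[1, 4, 7], [2, 5, 8], [3, 6]]))    # [1, 2, 3, 4, 5, 6, 7, 8]
    list(merge([]))                                # []
    list(merge(['ace', 'bd']))                     # ['a', 'b', 'c', 'd', 'e']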
    
  8. The runtime bottleneck is the call to min to find the entry with the smallest value. This takes time proportional to the number of remaining iterables, that is, O(l).

    We can improve this to O(log l) if we keep the entries in a heap, using the heapq module:

    from heapq import heapify, heappop, heapreplace
    
    def merge(iterables):
        "Merge sorted iterables."
        entries = []          # Heap of [front value, id, iterator].
        for id, it in enumerate(map(iter, iterables)):
            try:
                entries.append([next(it), id, it])
            except StopIteration:
                pass
        heapify(entries)
        while entries:
            value, _, it = entry = entries[0]
            yield value
            try:
                entry[0] = next(it)
                heapreplace(entries, entry)
            except StopIteration:
                heappop(entries)
    

    This improves the overall runtime to O((n + l) log l): heapify costs O(l), and each of the n values yielded performs one O(log l) heap operation (a heapreplace or a heappop).

  9. This is already built into Python as heapq.merge. It’s worth taking a look at the source code — the code in §8 above is a simplified version of this.
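
    For reference, heapq.merge takes its sorted inputs as separate positional arguments, and since Python 3.5 it also accepts key and reverse keyword arguments:

    import heapq

    list(heapq.merge([1, 4, 7], [2, 5, 8], [3, 6]))          # [1, 2, 3, 4, 5, 6, 7, 8]
    list(heapq.merge([7, 4, 1], [8, 5, 2], reverse=True))    # [8, 7, 5, 4, 2, 1]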
