Fuzzy list sorting with a heuristic approach to generate a schedule for COVID-19 tests in a hospital

Problem

I recently came across a problem scheduling Coronavirus testing at a hospital; the testing capacity needed to be allocated to:

  • High risk wards (combining many factors).
  • Ones which hadn’t been tested recently.

This is a really tricky scheduling problem: as well as the complexity of combining many properties of a ward to understand its risk factor, there is a knock-on effect, because the position of a ward in the schedule dictates how likely it is to come up again soon.

Coming back into the programming realm, I wanted to take some kind of weighted average of different factors to compare wards for “priority”, and overload __gt__ to allow comparison using the native comparison operators. The problem is that I can’t directly compare the priority of two wards to sort the list and create a schedule: ward A and ward B may have exactly the same properties (size, risk factor, etc.), but if ward B was tested more recently then it has a lower priority.

What I understood was that I can’t compare wards, but I can compare different schedules. That is, I can compare timelines to see which is more optimal, and then try and sort a random list in a way that guides it towards a more optimal sorting. That’s what I mean by “sorting with a heuristic approach” – I use a cost function evaluated over the whole list and optimise this. I hope that’s reasonably clear.
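To make the “compare schedules, not wards” idea concrete, here is a minimal sketch. The Ward fields, the one-ward-per-day assumption, the quadratic waiting penalty and the name schedule_score are all invented for illustration; they are not the real hospital model.

```python
from typing import NamedTuple, Sequence


class Ward(NamedTuple):
    name: str
    risk: float           # hypothetical combined risk factor (higher = riskier)
    days_since_test: int  # days since the ward was last tested


def schedule_score(schedule: Sequence[Ward]) -> float:
    """Score a whole schedule; higher is better.

    Assumes one ward is tested per day, so the ward at position p waits
    p more days. The penalty risk * total_wait ** 2 is an arbitrary choice.
    """
    return -sum(
        ward.risk * (ward.days_since_test + position) ** 2
        for position, ward in enumerate(schedule)
    )


a = Ward("A", risk=3.0, days_since_test=2)
b = Ward("B", risk=3.0, days_since_test=5)  # same risk, tested less recently

# Neither ward has a priority of its own; only whole orderings are compared.
print(schedule_score([a, b]), schedule_score([b, a]))  # -120.0 -102.0
```

Under these made-up assumptions the ordering that tests B first scores higher, even though A and B have the same risk.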

How can I sort a list using a cost function? I have this base class:

from __future__ import annotations
import numpy as np
from typing import Sequence, Callable, Tuple, Optional
import pprint
import string
class SequenceItemBase:
    """Class that wraps a value and the list that contains it
    and overrides normal value comparison with a heuristic for guiding swaps in the list
    """

    def __init__(
        self,
        parent: Sequence[SequenceItemBase],
        heuristic: Callable[[Sequence[SequenceItemBase], Tuple[int, int]]],
    ):
        self.parent = parent
        self._heuristic = heuristic
    def __gt__(self, other):
        "An item should be placed higher in the list if doing so would increase the value of the heuristic"
        # store a copy of the current list state so we can "imagine" what effect
        # swapping self and other would have on the heuristic
        after_change = self.parent.copy()
        self_index = self.parent.index(self)
        other_index = self.parent.index(other)

        swap_indecies = sorted((self_index, other_index))

        after_change[self_index], after_change[other_index] = after_change[other_index], after_change[self_index]

        # whether the swap improved our heuristic
        positive_delta_h = self._heuristic(
            after_change, swap_indecies
        ) > self._heuristic(self.parent, swap_indecies)

        # if self greater than other, then 1 of 2 things happens:
        #     when self is earlier in the list, the swap will happen because we are going ascending
        #     vice-versa when self is later in the list
        # so, if the swap is encouraged by our heuristic, then we must mark self as greater than other
        #     only when it is earlier in the list
        # and when it is later in the list, then only when our heuristic discourages swapping places
        return (self_index < other_index and positive_delta_h) or (
            self_index > other_index and not positive_delta_h
        )

I’ve added a few explanatory comments, but essentially it overrides the comparison operator, which is called at every step of the sorting process, and replaces it with one that looks at the current state of the list and imagines swapping the two items being compared to see what effect that would have on the heuristic. If swapping would be good, __gt__ returns whatever it has to in order to say “the later thing should be earlier in the schedule”.

So, when asked “Is A greater than B”, instead of something like:

Is the value of A > the value of B

it says:

If I swapped A and B around, would that make the list have a better sorting? If so, then yes, A is greater/less than B 🙂

There is a bit of special casing because we don’t know whether self or other will be earlier in the list.
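One detail that is easy to miss: the selection_sort function used in this post compares items with <, yet the base class only defines __gt__. Python handles this by falling back to the reflected operation, so a < b ends up calling b.__gt__(a). The small OnlyGt class below is a hypothetical stand-in, used only to show that dispatch.

```python
class OnlyGt:
    """Toy class that defines __gt__ but not __lt__."""

    def __init__(self, value):
        self.value = value
        self.calls = []  # records the values this instance was compared against

    def __gt__(self, other):
        self.calls.append(other.value)
        return self.value > other.value


a, b = OnlyGt(1), OnlyGt(2)

# There is no __lt__, so Python evaluates the reflected call b.__gt__(a).
result = a < b

print(result, a.calls, b.calls)  # True [] [1]
```

In other words, inside __gt__ the roles are reversed relative to how the comparison is written at the call site: self is the right-hand operand of <.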


This base class can be inherited from to define a sortable class that provides any data the cost function might need. For example, this one just wraps a value that the heuristic function can access.

class ValueItem(SequenceItemBase):
    def __init__(self, value, parent=None, heuristic=None):
        self.value = value
        super().__init__(parent, heuristic)
    def __repr__(self):
        return str(self.value)

def prefer_sequences_in_ascending_order_heuristic(
    intermediate_state: Sequence[ValueItem],
    swap_indecies: Optional[Tuple[int, int]] = None,
):
    "This heuristic will return a larger number when the list is sorted in ascending order"
    return sum(index * item.value for index, item in enumerate(intermediate_state))

Here the heuristic is equivalent to just doing ascending order. You can see this here:

random_list_of_nums = []
source_nums = np.random.randint(1, 100, 100)
heuristic = prefer_sequences_in_ascending_order_heuristic

# wrap the random numbers in classes that all hold a reference to the containing list
# so that they can be sorted using the heuristic
for i in source_nums:
    random_list_of_nums.append(ValueItem(i, random_list_of_nums, heuristic))
before = random_list_of_nums.copy()
perfect = [ValueItem(value, None) for value in sorted(source_nums)]

print(f"{heuristic(before)/heuristic(perfect):0.0%}", before)

selection_sort(random_list_of_nums)
after = random_list_of_nums

print(f"{heuristic(after)/heuristic(perfect):0.0%}", after)

The list is sorted perfectly by value and the heuristic is maximised.
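As a sanity check on that claim, a brute-force sketch over a small list of distinct values confirms that sum(index * value) is largest for the ascending arrangement. The helper name score and the sample values are assumptions for illustration only.

```python
from itertools import permutations


def score(values):
    """Same shape as the heuristic above: sum of index * value."""
    return sum(index * value for index, value in enumerate(values))


values = [3, 1, 4, 2, 5]

# Try every ordering and keep the one with the highest score.
best = max(permutations(values), key=score)

print(list(best), score(best))  # [1, 2, 3, 4, 5] 40
```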


For a more applicable problem, there is a classic objective in scheduling called “minimize average tardiness”, meaning: “for some tasks, each with a duration and a due date, what ordering minimises the average lateness/tardiness?”

class TardinessItem(SequenceItemBase):
    def __init__(self, duration, due_date, parent=None, heuristic=None):
        self.duration = duration
        self._due_date = due_date
        super().__init__(parent, heuristic)
    def tardiness(self, start_date):
        return max(0, start_date + self.duration - self._due_date)
    def __repr__(self):
        return f"{self.name}: duration {self.duration} day{'s' if self.duration > 1 else ''} - due in {self._due_date}"

def tardiness_values(sequence: Sequence[TardinessItem]):
    running_date_total = 0
    for item in sequence:
        yield item.tardiness(running_date_total)
        running_date_total += item.duration

def minimising_average_tardiness_heuristic(
    intermediate_state: Sequence[TardinessItem],
    swap_indecies: Optional[Tuple[int, int]] = None,
):
    #negative so that maximising this heuristic will minimise total tardiness
    return sum(-tardiness for tardiness in tardiness_values(intermediate_state))

Example:

timeline = []
# source_nums = list(zip(np.random.randint(1,10,10),np.random.randint(20,40,10)))
source_nums = zip([2, 7, 3, 8, 4, 4, 6, 8, 5], [5, 10, 15, 22, 23, 24, 25, 30, 33])

heuristic = minimising_average_tardiness_heuristic

for i, (duration, date) in enumerate(source_nums):
    timeline.append(
        TardinessItem(duration, date, timeline, minimising_average_tardiness_heuristic)
    )
    timeline[-1].name = string.ascii_uppercase[i]
pprint.pprint(timeline)
print(
    f"Average Tardiness: {np.average(list(tardiness_values(timeline)))}, Heuristic: {heuristic(timeline)}"
)

for _ in range(10):
    selection_sort(timeline)
after = timeline

pprint.pprint(after)

print(
    f"Average Tardiness: {np.average(list(tardiness_values(timeline)))}, Heuristic: {heuristic(timeline)}"
)

prints

[A: duration 2 days - due in 5,
 B: duration 7 days - due in 10,
 C: duration 3 days - due in 15,
 D: duration 8 days - due in 22,
 E: duration 4 days - due in 23,
 F: duration 4 days - due in 24,
 G: duration 6 days - due in 25,
 H: duration 8 days - due in 30,
 I: duration 5 days - due in 33]
Average Tardiness: 4.444444444444445, Heuristic: -40

[A: duration 2 days - due in 5,
 B: duration 7 days - due in 10,
 C: duration 3 days - due in 15,
 D: duration 8 days - due in 22,
 E: duration 4 days - due in 23,
 F: duration 4 days - due in 24,
 I: duration 5 days - due in 33,
 G: duration 6 days - due in 25,
 H: duration 8 days - due in 30]
Average Tardiness: 4.0, Heuristic: -36

which is the same output as the modified due date (MDD) rule gives; MDD is another heuristic way to approach minimum tardiness scheduling.
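The two totals printed above can be checked by hand with a standalone sketch. The function name total_tardiness and the dictionary layout are assumptions for this check; the durations and due dates are the ones from the example.

```python
from typing import Iterable, Tuple


def total_tardiness(tasks: Iterable[Tuple[int, int]]) -> int:
    """Sum max(0, completion_time - due_date) over (duration, due_date) pairs."""
    clock = 0
    total = 0
    for duration, due_date in tasks:
        clock += duration  # this task finishes at `clock`
        total += max(0, clock - due_date)
    return total


durations = [2, 7, 3, 8, 4, 4, 6, 8, 5]
due_dates = [5, 10, 15, 22, 23, 24, 25, 30, 33]
tasks = dict(zip("ABCDEFGHI", zip(durations, due_dates)))

before = [tasks[name] for name in "ABCDEFGHI"]
after = [tasks[name] for name in "ABCDEFIGH"]  # I moved ahead of G and H

print(total_tardiness(before), total_tardiness(after))  # 40 36
```

Dividing by the nine tasks gives the averages shown: 40 / 9 ≈ 4.44 and 36 / 9 = 4.0.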


Example sort algorithm

This approach is designed to be used with an in-place sort, because parent effectively holds a live view of the intermediate steps when sorting. At the moment selection_sort is used because it reflects the idea of swapping elements as a measure of progress, but I’m open to suggestions…

def selection_sort(nums):
    # This value of i corresponds to how many values were sorted
    for i in range(len(nums)):
        # We assume that the first item of the unsorted segment is the smallest
        lowest_value_index = i
        # This loop iterates over the unsorted items
        for j in range(i + 1, len(nums)):
            if nums[j] < nums[lowest_value_index]:
                lowest_value_index = j
        # Swap values of the lowest unsorted element with the first unsorted
        # element
        nums[i], nums[lowest_value_index] = nums[lowest_value_index], nums[i]

I originally planned to use Python’s built-in list.sort (which uses Timsort rather than quicksort), which is why I overloaded the native operator. However, that method doesn’t update the list in place at every step, so the cost function wasn’t changing value.
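A related point, which may or may not be exactly what was observed here: in CPython, list.sort gives no live view of the list at all. The documentation describes this as an implementation detail, and the list simply appears empty while the sort is running. The sketch below (names are illustrative) records the visible length from inside a key function.

```python
observed_lengths = []
data = [3, 1, 2]


def spy_key(item):
    # Look at the list being sorted from inside the sort itself.
    observed_lengths.append(len(data))
    return item


data.sort(key=spy_key)

# On CPython the list looks empty during the sort, then reappears sorted.
print(observed_lengths, data)
```

So any comparison or key function that inspects the containing list cannot rely on list.sort, which is one more reason to use a hand-written in-place sort for this approach.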

Solution

First, a spelling nitpick: swap_indecies -> swap_indices

Also, you have inconsistent type hinting on your methods: this one is complete –

def __init__(
    self,
    parent: Sequence[SequenceItemBase],
    heuristic: Callable[[Sequence[SequenceItemBase], Tuple[int, int]]],
):

but these are not:

def __gt__(self, other):

def __init__(self, value, parent=None, heuristic=None):

def __init__(self, duration, due_date, parent=None, heuristic=None):

The latter suggests that your original hints are incorrect and should be wrapped in Optional.

So, if I understand correctly, you’ve designed a class SequenceItemBase such that if there are two instances of SequenceItemBase, a and b, a < b might evaluate to True in the context of one list of SequenceItemBase instances, but the same expression might evaluate to False in the context of a different list of SequenceItemBase instances, despite none of the attributes of either a or b having changed in between the two tests.

My advice would be not to do that. Defining custom comparison dunder methods in python is common, and can often make object-oriented code much more readable, but I think outcomes like this would be very unexpected to other people reading your code. I think it would be a reasonable expectation for other readers of your code to think that if a < b evaluates as True in one context, then a < b will always evaluate as True unless the attributes of either a or b had changed. Rather than having any __lt__ or __gt__ methods at all, I would instead use a classmethod for sorting these sequences.

I also find the way you’re combining an object-oriented approach with elements of functional programming quite confusing. You define custom TardinessItem and ValueItem classes, which is an object-oriented approach. But despite the fact that you’re essentially developing a custom method of sorting instances of these classes, the main sort function is a function in the global namespace, not the class namespace. The way you’re passing in functions to objects for them to use in their comparison dunder methods also confuses me, given that the functions seem to be specific to a certain class. If a function is specific to a certain class, it should probably be defined in the class namespace.

Here’s a significant refactoring of your code which makes it more object-oriented and (in my opinion) easier to understand. Rather than have external functions passed into class instances, I’ve instead taken the approach of redefining SequenceItemBase as an abstract base class (renamed SequenceItem below) with one abstractmethod, get_sequence_score, fulfilling a similar purpose to your heuristic function. Subclasses of SequenceItem will fail to be instantiated at runtime if they don’t provide a concrete implementation of this abstractmethod. This strategy of isolating the specific parts of the algorithm that need to be modified by subclasses for more specific use-cases is known as the “template method pattern”.
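As a stripped-down illustration of that pattern, separate from the full refactoring, consider the sketch below. The class names Scored, Incomplete and SumScored and the best_of method are hypothetical; only get_sequence_score mirrors a name used in this answer.

```python
from abc import ABC, abstractmethod


class Scored(ABC):
    @classmethod
    @abstractmethod
    def get_sequence_score(cls, sequence):
        """Subclasses decide how a sequence is scored."""

    @classmethod
    def best_of(cls, candidates):
        # The "template": fixed logic that defers scoring to the subclass.
        return max(candidates, key=cls.get_sequence_score)


class Incomplete(Scored):
    pass  # does not implement get_sequence_score


class SumScored(Scored):
    @classmethod
    def get_sequence_score(cls, sequence):
        return sum(sequence)


try:
    Incomplete()
    instantiation_failed = False
except TypeError:
    instantiation_failed = True  # abstract method still missing

print(instantiation_failed, SumScored.best_of([[1, 2], [5], [0, 1]]))  # True [5]
```

Note that the check happens at instantiation time, not when the incomplete subclass is defined.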

Another major change I’ve made (along with lots of smaller changes) is to define your TardinessItem class as a dataclass. This makes sense, I think, given that the class is essentially a wrapper around structured data, and it saves us having to write an __init__ function that would be solely boilerplate code.
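For readers unfamiliar with dataclasses, this is roughly what the decorator generates. The Task class here is a hypothetical, simplified stand-in and not the TardinessItem defined in the refactoring.

```python
from dataclasses import dataclass


@dataclass
class Task:
    name: str
    duration: int
    due_date: int


# __init__, __repr__ and __eq__ are all generated from the annotations.
task = Task("A", 2, 5)

print(task)                     # Task(name='A', duration=2, due_date=5)
print(task == Task("A", 2, 5))  # True
```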

from __future__ import annotations

from abc import ABCMeta, abstractmethod
from typing import TypeVar, Type, List, Any, Protocol, Sequence
from dataclasses import dataclass
from pprint import pprint


S = TypeVar('S', bound='SequenceItem')


class SupportsComparisons(Protocol):
    """Return type for `SequenceItem.get_sequence_score`.
    Not used at runtime; solely for helping out static type checkers.
    """

    def __lt__(self, other: Any) -> bool: ...
    def __gt__(self, other: Any) -> bool: ...
    def __eq__(self, other: Any) -> bool: ...


class SequenceItem(metaclass=ABCMeta):
    __slots__ = ()
    
    @classmethod
    @abstractmethod
    def get_sequence_score(cls: Type[S], cls_sequence: Sequence[S]) -> SupportsComparisons:
        """Return a "sequence score" for a given sequence of `SequenceItem`s.

        If the swapping of two elements in the sequence results in a higher "sequence score",
        the two elements will be swapped when the sequence is sorted.

        This is an abstract method that must be defined in any concrete subclass of `SequenceItem`.
        """
    
    @classmethod
    def swapping_improves_order(
        cls: Type[S],
        list_before_change: List[S],
        item_a: S,
        item_a_index: int,
        item_b: S,
        item_b_index: int
    ) -> bool:
        """Determine whether swapping two items in a list would improve the order according to a given metric."""
        
        # store a copy of the current list state so we can "imagine" what effect...
        # ...swapping the two items would have on the "sequence score"
        list_after_change = list_before_change.copy()
        list_after_change[item_a_index], list_after_change[item_b_index] = item_b, item_a

        # whether the swap improved our "sequence score"
        positive_delta_h = cls.get_sequence_score(list_after_change) > cls.get_sequence_score(list_before_change)

        return (item_a_index < item_b_index) != positive_delta_h
    
    @classmethod
    def sort_sequence_in_place(cls: Type[S], cls_list: List[S]) -> None:
        """Sort a list of class instances in-place.

        The items in the list are sorted in such a way
        so as to maximise the list's "score" according to a given metric.

        The sorting algorithm used is the "selection_sort" algorithm.
        """ 
        
        for i, item_i in enumerate(cls_list):
            # We assume that the first item of the unsorted segment is the smallest
            lowest_value_index = i
            next_index = i + 1
            
            # This loop iterates over the unsorted items
            for j, item_j in enumerate(cls_list[next_index:], start=next_index):
                if cls.swapping_improves_order(cls_list, item_j, j, cls_list[lowest_value_index], lowest_value_index):
                    lowest_value_index = j
                    
            # Swap values of the lowest unsorted element with the first unsorted element
            cls_list[i], cls_list[lowest_value_index] = cls_list[lowest_value_index], item_i


T = TypeVar('T', bound='TardinessItem')


@dataclass
class TardinessItem(SequenceItem):
    __slots__ = 'name', 'duration', '_due_date'
    
    name: str
    duration: int
    _due_date: int

    def format_duration(self) -> str:
        """Helper method for __repr__"""
        
        duration = self.duration
        return f'duration {duration} day{"s" if duration > 1 else ""}'

    def __repr__(self) -> str:
        return f'{self.name}: {self.format_duration()} - due in {self._due_date}'

    def tardiness(self, start_date: int) -> int:
        """Determine how tardy this scheduled event would be if it started on a given date."""
        return max(0, start_date + self.duration - self._due_date)

    @classmethod
    def total_tardiness_of_sequence(cls: Type[T], cls_sequence: Sequence[T]) -> int:
        """Determine the total tardiness of a given sequence of `TardinessItem`s."""
        
        total_tardiness, running_date_total = 0, 0

        for item in cls_sequence:
            total_tardiness += item.tardiness(running_date_total)
            running_date_total += item.duration

        return total_tardiness

    @classmethod
    def get_sequence_score(cls: Type[T], cls_sequence: Sequence[T]) -> int:
        """Determine the "sequence score" of a given sequence of `TardinessItem`s.

        The greater the total tardiness of a sequence, the lower its sequence score.
        """
        
        return ( - cls.total_tardiness_of_sequence(cls_sequence))
    

def run_example(
    name_inputs: Sequence[str],
    duration_inputs: Sequence[int],
    date_inputs: Sequence[int]
) -> None:
    
    timeline = [
        TardinessItem(name, duration, date)
        for name, duration, date in zip(name_inputs, duration_inputs, date_inputs)
    ]

    initial_total_tardiness = TardinessItem.total_tardiness_of_sequence(timeline)
    initial_average_tardiness = initial_total_tardiness / len(timeline)
    initial_sequence_score = ( - initial_total_tardiness)
    print('### RESULTS PRIOR TO SORTING ###\n')
    pprint(timeline)
    print(f"\nAverage Tardiness: {initial_average_tardiness}, Heuristic: {initial_sequence_score}")

    print()
    TardinessItem.sort_sequence_in_place(timeline)

    total_tardiness_after = TardinessItem.total_tardiness_of_sequence(timeline)
    average_tardiness_after = total_tardiness_after / len(timeline)
    sequence_score_after = ( - total_tardiness_after)
    print('### RESULTS AFTER SORTING ###\n')
    pprint(timeline)
    print(f"\nAverage Tardiness: {average_tardiness_after}, Heuristic: {sequence_score_after}")


if __name__ == '__main__':
    from string import ascii_uppercase
    duration_inputs = [2, 7, 3, 8, 4, 4, 6, 8, 5]
    date_inputs = [5, 10, 15, 22, 23, 24, 25, 30, 33]
    run_example(ascii_uppercase, duration_inputs, date_inputs)

Output:

### RESULTS PRIOR TO SORTING ###

[A: duration 2 days - due in 5,
 B: duration 7 days - due in 10,
 C: duration 3 days - due in 15,
 D: duration 8 days - due in 22,
 E: duration 4 days - due in 23,
 F: duration 4 days - due in 24,
 G: duration 6 days - due in 25,
 H: duration 8 days - due in 30,
 I: duration 5 days - due in 33]

Average Tardiness: 4.444444444444445, Heuristic: -40

### RESULTS AFTER SORTING ###

[A: duration 2 days - due in 5,
 B: duration 7 days - due in 10,
 C: duration 3 days - due in 15,
 D: duration 8 days - due in 22,
 E: duration 4 days - due in 23,
 F: duration 4 days - due in 24,
 I: duration 5 days - due in 33,
 G: duration 6 days - due in 25,
 H: duration 8 days - due in 30]

Average Tardiness: 4.0, Heuristic: -36
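Because a swap-guided sort is a local search, it may be worth checking the result against an exhaustive search when the input is this small. The sketch below is only a suggestion for such a check: it tries all 9! orderings, which takes a moment, and makes no claim about whether 36 is in fact the optimum. The names total_tardiness, best_order and sorted_order are assumptions.

```python
from itertools import permutations

DURATIONS = [2, 7, 3, 8, 4, 4, 6, 8, 5]
DUE_DATES = [5, 10, 15, 22, 23, 24, 25, 30, 33]


def total_tardiness(order):
    """Total tardiness when tasks are run in the given order of indices."""
    clock = 0
    total = 0
    for index in order:
        clock += DURATIONS[index]
        total += max(0, clock - DUE_DATES[index])
    return total


# Exhaustive search: only feasible because there are just nine tasks.
best_order = min(permutations(range(len(DURATIONS))), key=total_tardiness)
best_total = total_tardiness(best_order)

# The ordering produced by the sort above: A B C D E F I G H.
sorted_order = (0, 1, 2, 3, 4, 5, 8, 6, 7)

print(best_total, total_tardiness(sorted_order))
```

If the two printed numbers match, the heuristic sort found an optimal schedule for this input; if the first is smaller, the gap shows how much the local search left on the table.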
