TimeoutManager: check whether a task has been finished within a given time

Posted on

Problem

I want to make a helper class, which raises an event whenever something is not finished in a certain time.

Example: my program sends out multiple network messages concurrently. After a random delay, each message should receive a single reply.
My helper class should raise an event whenever no reply is received for a specific message.

The TimeoutManager is used like this:

//When sending out a message, tell the manager to start a timer
timeoutManager.Add("Message: ID=001")

//When receiving a reply, tell the manager to stop waiting for a timeout 
bool replyWasExpected = timeoutManager.MarkOldestItemAsFinished(x=>x.Contains("ID=001");

//When no reply is received in time, this event will be called.
timeoutManager.OnTimeout += (evt, args) =>{Console.WriteLine("TIMEOUT! NO REPLY!");};

I am looking at suggestions to solve this more elegantly, especially in regards to the data structures used.

The solution I implemented has some ugly hacks, detailed below. I am happy with the interface, but I dislike the code behind it. I am currently on net 4.0, so async/await is not available.

The whole code is below. The main points are:

TimeoutManager<T_Item> is a generic timeout-manager, and should deal with an arbitrary “item” or “work unit”. I used strings or ints in the examples, but in practise, I would use some sort of Message object for each outgoing message.

It uses the BlockingCollection itemsWaitingForTimeout to remember each item,
and a single dedicated thread timeoutLoop() to wait for the timeout for all items.

Hack 1: I add each new item twice to the BlockingCollection. This way I can still mark an item as finished, after the dedicated thread has already removed it from the collection, but before the timeout occurs.

Hack 2: The dedicated thread uses Thread.Sleep() to wait for the timeout.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace McatXml
{

    public class TimeoutManager<T_Item>
    {
        public class TimeoutEventArgs : EventArgs
        {
            public T_Item Item { get; private set; }
            public TimeoutEventArgs(T_Item item) { this.Item = item; }
        }
        /// <summary> called whenever an item is not finished before the timeout </summary>
        public event EventHandler<TimeoutEventArgs> OnTimeout;


        private readonly TimeSpan timeout = TimeSpan.FromSeconds(30);
        private BlockingCollection<ItemWithTimeout> itemsWaitingForTimeout = new BlockingCollection<ItemWithTimeout>();

        /// <summary> private wrapper, to decorate an item with a timeout </summary>
        private class ItemWithTimeout
        {
            internal readonly T_Item Item;
            internal readonly Stopwatch Watch;
            internal volatile bool FinishedWaiting;
            internal ItemWithTimeout(T_Item item)
            {
                this.Item = item;
                this.Watch = Stopwatch.StartNew();
                this.FinishedWaiting = false;
            }
        }

        public TimeoutManager(TimeSpan timeout)
        {
            this.timeout = timeout;
            OnTimeout += (sender, args) => { };
            Task loop = new Task(this.timeoutLoop, TaskCreationOptions.LongRunning);
            loop.Start();
        }

        public void Add(T_Item item)
        {
            var itemExt = new ItemWithTimeout(item);
            itemsWaitingForTimeout.Add(itemExt);
            itemsWaitingForTimeout.Add(itemExt); // this is an ugly hack!
        }

        /// <summary> mark all items as finished, that fit the given condition  </summary>
        public bool MarkAllAsFinished(Func<T_Item, bool> isMatch = null)
        {
            return markAsFinished(stopAfterFirstHit: false, reverseOrder: false, isMatch: isMatch);
        }
        /// <summary> mark the most recent item as finished, that fits the given condition  </summary>
        public bool MarkNewestAsFinished(Func<T_Item, bool> isMatch = null)
        {
            return markAsFinished(stopAfterFirstHit: true, reverseOrder: true, isMatch: isMatch);
        }
        /// <summary> mark the oldest item as finished, that fits the given condition  </summary>
        public bool MarkOldestAsFinished(Func<T_Item, bool> isMatch = null)
        {
            return markAsFinished(stopAfterFirstHit: true, reverseOrder: false, isMatch: isMatch);
        }

        /// <summary> mark items as finished, that fit the given condition  </summary>
        private bool markAsFinished(bool stopAfterFirstHit, bool reverseOrder, Func<T_Item, bool> isMatch = null)
        {
            // get a snapshot of all currently waiting items
            var items = this.itemsWaitingForTimeout.ToArray();

            bool success = false;

            // start with the oldest or newest item?
            int startIdx = reverseOrder ? items.Length - 1 : 0;
            int inc = reverseOrder ? -1 : 1;

            for (int i = startIdx; i < items.Length && i >= 0; i += inc)
            {
                var item = items[i];
                if (item.FinishedWaiting) continue; // the item is already marked as finished
                if (isMatch == null || isMatch(item.Item))
                {
                    lock (item)
                    {
                        if (item.FinishedWaiting) continue; // another thread marked this as finished while we were waiting for the lock
                        item.FinishedWaiting = true;
                    }
                    success = true; // we found an item and marked it as finished
                    if (stopAfterFirstHit) break; // should we look for further items?
                }
            }
            return success; // did we find an item and mark it as finished?
        }

        /// <summary> for all items that are not finished, check whether their time is up  </summary>
        private void timeoutLoop()
        {
            foreach (var item in itemsWaitingForTimeout.GetConsumingEnumerable())
            {
                if (item.FinishedWaiting) continue; // item has already been finished

                while (!item.FinishedWaiting && item.Watch.Elapsed < this.timeout)
                {
                    // wait until the timeout has passed or the item is finished
                    Thread.Sleep(
                        TimeSpan.FromMilliseconds(Math.Max(1,
                                                           this.timeout.TotalMilliseconds -
                                                           item.Watch.ElapsedMilliseconds)));
                }

                if (item.FinishedWaiting) continue; // item has been finished while we were waiting
                lock (item)
                {
                    if (item.FinishedWaiting) continue; // item has been finished while we ackquired the lock

                    item.FinishedWaiting = true;
                }
                // item has not been finished in time!
                OnTimeout(this, new TimeoutEventArgs(item.Item));
            }
        }
    }
    static class Program
    {
        static void Main()
        {

            TimeoutManager<int> test = new TimeoutManager<int>(TimeSpan.FromSeconds(5));
            test.OnTimeout += (sender, args) => { Console.WriteLine("Timeout: {0}", args.Item); };

            // start 100 new timers, in random order
            Parallel.ForEach(Enumerable.Range(0, 100).OrderBy(i => Guid.NewGuid()), (i) =>
                {
                    test.Add(i);
                });

            // mark 90 tasks as finished 
            Parallel.ForEach(Enumerable.Range(0 , 90).OrderBy(i => Guid.NewGuid()), (i) =>
                {
                    if (!test.MarkAllAsFinished(x => x == i))
                        Console.WriteLine("could not mark as finished: {0}", i);
                });

            // after 5 seconds, 10 timeout events should be executed.
            Console.ReadLine();



        }
    }
}

Is there a simpler way to store my items and mark them as finished besides the BlockingCollection with double entries?

Did I overcomplicate this too much? Will my coworkers curse me If I use this in production?

I like the fact that the OnTimeout events will not be raised all at once, if 1000 timeouts happen at the same time. Nevertheless I am unsure whether the dedicated thread is justified.

Solution

Yes, I think you’ve overcomplicated this a bit. The BlockingCollection is not a great fit here, a simple list will do fine. Something like this perhaps:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Caching;
using System.Threading;
using System.Threading.Tasks;

public class SimpleTimeoutManager<T> : IDisposable
{
    private readonly List<ListEntry> _queue = new List<ListEntry>();
    private readonly TimeSpan _timeout;
    private readonly TimeSpan _timerInterval;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly CancellationToken _cancellationToken;

    public SimpleTimeoutManager(TimeSpan? timeout = null, TimeSpan? timerInterval = null)
    {
        _timeout = timeout ?? TimeSpan.FromSeconds(1);
        _timerInterval = timerInterval ?? TimeSpan.FromMilliseconds(100);
        _cancellationTokenSource = new CancellationTokenSource();
        _cancellationToken = _cancellationTokenSource.Token;
        new Task(Timer, TaskCreationOptions.LongRunning).Start();
    }

    public event EventHandler<TimeoutEventArgs> OnTimeout = delegate { };

    public void Dispose()
    {
        _cancellationTokenSource.Cancel();
    }

    public void Add(T item)
    {
        lock (_queue)
        {
            _queue.Add(new ListEntry(item, _timeout));
        }
    }

    public bool MarkAllAsFinished(Func<T, bool> isMatch = null)
    {
        isMatch = isMatch ?? (item => true);
        lock (_queue)
        {
            return _queue.RemoveAll(entry => isMatch(entry.Item)) > 0;
        }
    }

    public bool MarkNewestAsFinished(Func<T, bool> isMatch = null)
    {
        return MarkFirstMatchAsFinished(_queue.FindLastIndex, isMatch);
    }

    public bool MarkOldestAsFinished(Func<T, bool> isMatch = null)
    {
        return MarkFirstMatchAsFinished(_queue.FindIndex, isMatch);
    }

    private bool MarkFirstMatchAsFinished(Func<Predicate<ListEntry>, int> indexOfEntry, Func<T, bool> isMatch = null)
    {
        isMatch = isMatch ?? (item => true);
        lock (_queue)
        {
            var index = indexOfEntry(entry => isMatch(entry.Item));
            bool found = index >= 0;
            if (found)
            {
                _queue.RemoveAt(index);
            }

            return found;
        }
    }

    private void Timer()
    {
        while (!_cancellationToken.IsCancellationRequested)
        {
            DateTime wakeupAt = _queue.FirstOrDefault()?.Expires ?? DateTime.Now.Add(_timerInterval);
            var delay = wakeupAt - DateTime.Now;
            Thread.Sleep(delay);

            lock (_queue)
            {
                var expiredEntry = _queue.TakeWhile(entry => entry.HasExpired()).FirstOrDefault();
                if (expiredEntry != null)
                {
                    _queue.RemoveAt(0);
                    OnTimeout.Invoke(this, new TimeoutEventArgs(expiredEntry.Item));
                }
            }
        }
    }

    public class TimeoutEventArgs : EventArgs
    {
        public readonly T Item;

        public TimeoutEventArgs(T item)
        {
            Item = item;
        }
    }

    private class ListEntry
    {
        public readonly T Item;
        public readonly DateTime Expires;

        public ListEntry(T item, TimeSpan timeout)
        {
            Item = item;
            Expires = DateTime.Now.Add(timeout);
        }

        public bool HasExpired()
        {
            return Expires < DateTime.Now;
        }
    }
}

Leave a Reply

Your email address will not be published. Required fields are marked *