Working with threads – shared access and graceful cancellation

Posted on

Problem

I’ve created this program to show how to stop threads in a controlled manner.

Important points in the program:

  • Starting and waiting for threads
  • Exception handling
  • Stopping threads in a controlled manner via the CancellationToken.
  • Overall simplicity and correctness.

The scenario

3 tasks with access to the same List that keep working on the list for 10 seconds and then stop. The 3rd task throws an exception.

The output

A is working. Job number: 0
B is working. Job number: 0
C throwing after 1000ms
A is working. Job number: 1
A is working. Job number: 2
B is working. Job number: 1
A is working. Job number: 3
B is working. Job number: 2
A is working. Job number: 4
Oh no. System.AggregateException thrown: One or more errors occurred.
Exceptions:
        C throwing after 1000ms
Elapsed 10228ms
8,3,4,2,7,0,0,2

The program

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace TwoThreadsFor10Secs
{
    class GoForTenSeconds
    {
        private object _lock = new object();
        private List<int> _list = new List<int>();
        private Random _rnd = new Random();

        public List<int> List {
            get {
                lock (_lock)
                {
                    return _list.ToList();
                }
            }
        }

        private async Task DoWorkAsync(string taskId, CancellationToken cancelToken, int sleepInMs)
        {
            int i = 0;
            while (true)
            {
                if (cancelToken.IsCancellationRequested) return;
                lock (_lock)
                {
                   Console.WriteLine($"{taskId} is working. Job number: {i++}");
                    _list.Add(_rnd.Next(0, 10));
                }
                await Task.Delay(sleepInMs, cancelToken);
            }
        }

        private async Task ThrowAfterAsync(string taskId, CancellationToken cancelToken, int afterMs )
        {
            await Task.Delay(afterMs, cancelToken);
            var msg = $"{taskId} throwing after {afterMs}ms";
            Console.WriteLine(msg);
            throw new ApplicationException(msg);
        }

        public async Task<List<int>> GoForNSecondsAsync( int howLongSecs)
        {
            using (var cancelSource = new CancellationTokenSource())
            {
                Task[] tasks = null;
                try
                {
                    var cancelToken = cancelSource.Token;
                    tasks = new[] {  Task.Run( async () => await DoWorkAsync("A",cancelToken,2000)),
                              Task.Run(async () => await DoWorkAsync("B", cancelToken,4000)),
                              Task.Run(async () => await ThrowAfterAsync("C", cancelToken, 1000))
                           };
                    await Task.Delay(howLongSecs * 1000);
                    cancelSource.Cancel();
                    await Task.WhenAll(tasks);

                }
                catch (Exception)
                {
                    foreach (var task in tasks)
                    {
                        var taskEx = task.Exception;
                        if (taskEx != null)
                        {
                            Console.WriteLine($"Oh no. {taskEx.GetType()} thrown: {taskEx.Message}");

                            if (taskEx is AggregateException)
                            {
                                Console.WriteLine($"Exceptions:");
                                var aggregateEx = taskEx as AggregateException;
                                foreach (var individualEx in aggregateEx.InnerExceptions)
                                {
                                    Console.WriteLine("t"+individualEx.Message);
                                }
                            }
                        }
                    }
                }
                return _list;

            }
        }
    }
    class Program
    {
        static void Main(string[] args)
        {
            Stopwatch sw = new Stopwatch();
            var list = Task.Run(async () =>
            {
                sw.Start();
                return await new GoForTenSeconds().GoForNSecondsAsync(10);

            }).Result;

            var elapsedMs = sw.ElapsedMilliseconds;
            Console.WriteLine($"Elapsed {elapsedMs}ms");
            Console.WriteLine(string.Join(",", list));
            Console.ReadLine();
        }
    }
}

Solution

  • If you want to determine if something is of type X and if that’s the case do something with it’s value working as a X type variable you just need to safely cast it, instead of using the is operator combined with the as operator:
if (taskEx is AggregateException)
{
    Console.WriteLine($"Exceptions:");
    var aggregateEx = taskEx as AggregateException;
    foreach (var individualEx in aggregateEx.InnerExceptions)
    {
        Console.WriteLine("t" + individualEx.Message);
    }
}

Can be rewritten like this :

var ex = taskEx as AggregateException;
if (ex != null)
{
    Console.WriteLine($"Exceptions:");
    var aggregateEx = ex;
    foreach (var individualEx in aggregateEx.InnerExceptions)
    {
        Console.WriteLine("t" + individualEx.Message);
    }
}

However none of that is necessary since the Task.Exception is already of type AggregateException. Having that in mind your foreach can look like this :

foreach (var task in tasks)
{
    AggregateException taskEx = task.Exception;
    if (taskEx != null)
    {
        Console.WriteLine($"Oh no. {taskEx.GetType()} thrown: {taskEx.Message}");
        Console.WriteLine("Exceptions:");
        foreach (var individualEx in taskEx.InnerExceptions)
        {
            Console.WriteLine("t" + individualEx.Message);
        }
    }
}
  • The cost of simply entering a try/catch block is negligible. The cost of THROWING an exception isn’t, that’s why this is supposed to be an exceptional behaviour.

  • You don’t need to start a separate thread just to run an async operation your Task[] tasks can be simplified :

From this :

tasks = new[]
{
    Task.Run(async () => await DoWorkAsync("A", cancelToken, 2000)),
    Task.Run(async () => await DoWorkAsync("B", cancelToken, 4000)),
    Task.Run(async () => await ThrowAfterAsync("C", cancelToken, 1000))
};

To this :

tasks = new[]
{
    DoWorkAsync("A", cancelToken, 2000),
    DoWorkAsync("B", cancelToken, 4000),
    ThrowAfterAsync("C", cancelToken, 1000)
};

Leave a Reply

Your email address will not be published. Required fields are marked *