How to process multiple concurrent background tasks in C#

Problem

I am trying to work out the best way to approach a job that relies on multiple long-running tasks taking place.

My use case is that I want to have multiple events running for set periods of time, where I have websocket connections to each event for that period.

My thought was to keep a concurrent list of all active events. When a new event pops into the list, it spawns a new thread to handle the event; when the event pops off the list, that thread is closed.

Is this a good way to go about it? I am trying to set up a proof of concept where, for now, all I do is log the event ID to the console. It kind of works, but I haven’t worked out a way to remove the thread yet.

I would really appreciate any advice anyone can give.

public class EventProcessingService : IHostedService, IDisposable
{
    private readonly ILogger<EventProcessingService> _logger;
    private readonly ICacheService _cacheService;
    private const int MaxThreads = 10;
    private static readonly CountdownEvent cde = new CountdownEvent(MaxThreads);

    public static readonly BlockingCollection<int> eventIds = new BlockingCollection<int>();

    ConcurrentBag<int> EventIdsProcessing = new ConcurrentBag<int>();

    private Timer _timer = null!;

    public EventProcessingService(ILogger<EventProcessingService> logger, ICacheService cacheService)
    {
        _logger = logger;
        _cacheService = cacheService;

        for (int i = 0; i < MaxThreads; i++)
        {
            Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
        }
    }

    public Task StartAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Timed Hosted Service running.");

        _timer = new Timer(DoWork, null, TimeSpan.Zero,
            TimeSpan.FromSeconds(5));

        return Task.CompletedTask;
    }

    private void DoWork(object? state)
    {
        var ids = _cacheService.GetCachedEventIds();

        foreach (var id in ids)
        {
            if (!EventIdsProcessing.Contains(id))
            {
                EventIdsProcessing.Add(id);
                eventIds.Add(id);
            }
        }

        cde.Wait();
    }

    private async Task Process()
    {
        foreach (var id in eventIds.GetConsumingEnumerable())
        {
            cde.Signal();
            while (true)
            {
                Console.WriteLine(id);
                await Task.Delay(1000);
            }
        }
    }

    public Task StopAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Timed Hosted Service is stopping.");

        _timer?.Change(Timeout.Infinite, 0);

        return Task.CompletedTask;
    }

    public void Dispose()
    {
        _timer?.Dispose();
    }
}

Solution

private members

  • I know naming is hard, and MSDN has a lot of bad examples, but cde is not a very good name
    • Try to capture what it limits, for example
      • ConcurrentProcessThrottler
      • ConcurrentProcessLimiter
      • etc.
  • The same applies to _timer: try to capture why you introduced it
  • Please try to follow a consistent naming pattern
    • Inconsistent: EventIdsProcessing, _cacheService, cde, etc.
    • Either use underscore prefix for all your private members or do not prefix them
  • I know it is a POC, but I would suggest receiving maxThreads as a constructor parameter rather than using a hard-coded const
    • Tasks are not threads, so a much better name would be
      • MaxDegreeOfParallelism
      • ThresholdForMaxConcurrency
      • etc.

public member

  • Please try to use Pascal casing for public members (eventIds)
    • It is unclear why it should be public

EventProcessingService constructor

  • Try to express your intent by using the discard operator
    • If you just want to fire off a new Task and you don’t care about the Task itself, then make this intent explicit:
_ = Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
  • Here StartNew returns a Task<Task>, so you need to call Unwrap to get a flattened Task
    • Please prefer Task.Run over StartNew, since the latter can be dangerous (for example, it does not unwrap async delegates and it schedules on TaskScheduler.Current by default)
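
The difference between the nested and the flattened task can be seen in a small console sketch. It is only an illustration under assumptions: ProcessAsync is a hypothetical stand-in for the Process method from the question, and the 100 ms delay is arbitrary.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for the asynchronous Process method in the question.
async Task ProcessAsync()
{
    await Task.Delay(100);
}

Func<Task> work = ProcessAsync;

// StartNew does not understand async delegates: it returns a Task<Task>.
Task<Task> nested = Task.Factory.StartNew(work, TaskCreationOptions.LongRunning);

// The outer task finishes as soon as ProcessAsync reaches its first await,
// so awaiting it says nothing about the inner work being done.
Task inner = await nested;
Console.WriteLine($"Inner work done after awaiting outer task: {inner.IsCompleted}");
await inner;

// Unwrap flattens Task<Task> into a Task that tracks the inner work.
Task flattened = Task.Factory.StartNew(work, TaskCreationOptions.LongRunning).Unwrap();
await flattened;

// Task.Run has an overload for Func<Task> and unwraps automatically.
Task viaRun = Task.Run(work);
await viaRun;

Console.WriteLine($"Flattened: {flattened.IsCompleted}, Task.Run: {viaRun.IsCompleted}");
```

The first line printed will normally report that the inner work is not yet done, which is the pitfall described above. Note also that with an async delegate the dedicated thread requested by LongRunning is only used until the first await, so the option buys very little here.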

Process

  • Using GetConsumingEnumerable works fine as long as the producer side calls CompleteAdding to signal that it will not produce any more elements; otherwise the consumers block forever once the collection is empty
  • I assume that your infinite loop simulates some real processing logic
    • Based on your code I don’t see how it will move from the first element to the next, since you have an infinite loop inside the loop body
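
To illustrate both points, here is a minimal sketch of a consumer that finishes each element and moves on, paired with a producer that calls CompleteAdding. The processed queue and the three hard-coded ids are assumptions made for the demonstration; they are not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var eventIds = new BlockingCollection<int>();
var processed = new ConcurrentQueue<int>(); // hypothetical record of handled ids

// Consumer: handles each id once, then moves on to the next one.
Task consumer = Task.Run(() =>
{
    foreach (int id in eventIds.GetConsumingEnumerable())
    {
        processed.Enqueue(id); // stand-in for the real per-event work
    }
    // Reached only after CompleteAdding was called and the collection is drained.
});

// Producer: adds a few ids, then signals that no more will follow.
for (int id = 1; id <= 3; id++)
{
    eventIds.Add(id);
}
eventIds.CompleteAdding();

await consumer;
Console.WriteLine(string.Join(", ", processed)); // prints "1, 2, 3"
```

Without the CompleteAdding call, the await on the consumer would never return, because the foreach keeps waiting for further elements.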

StartAsync

  • I do believe you should kick off your concurrent Process workers/consumers here, not inside the constructor
    • With that you would be able to pass the CancellationToken to Task.Run and to Process as well
  • I would also recommend adding protection against multiple StartAsync calls
    • A StartAsync call should have an effect only if it has not been called before, or if a StopAsync call completed before it
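
One possible shape for these two suggestions is sketched below. Everything in it is an assumption made for illustration: EventWorkerHost is a hypothetical class that only mirrors the StartAsync/StopAsync signatures of IHostedService (so the sample runs without the hosting package), TotalLaunchedWorkers exists purely to make the guard observable, and ProcessAsync is a placeholder loop.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var host = new EventWorkerHost(maxDegreeOfParallelism: 2);
await host.StartAsync(CancellationToken.None);
await host.StartAsync(CancellationToken.None); // no effect: already started
Console.WriteLine(host.TotalLaunchedWorkers);  // prints 2, not 4
await host.StopAsync(CancellationToken.None);

// Hypothetical class that mirrors the StartAsync/StopAsync shape of
// IHostedService without referencing the hosting package.
public sealed class EventWorkerHost
{
    private readonly int _maxDegreeOfParallelism;
    private int _isStarted; // 0 = stopped, 1 = started
    private CancellationTokenSource? _stoppingSource;
    private Task _workers = Task.CompletedTask;

    public EventWorkerHost(int maxDegreeOfParallelism)
        => _maxDegreeOfParallelism = maxDegreeOfParallelism;

    public int TotalLaunchedWorkers { get; private set; }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        // Only the call that flips 0 -> 1 launches workers; repeats are ignored.
        if (Interlocked.CompareExchange(ref _isStarted, 1, 0) != 0)
            return Task.CompletedTask;

        // Linked source: cancelled by the caller's token or by StopAsync.
        _stoppingSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        CancellationToken token = _stoppingSource.Token;

        var workers = new Task[_maxDegreeOfParallelism];
        for (int i = 0; i < workers.Length; i++)
        {
            workers[i] = Task.Run(() => ProcessAsync(token), token);
            TotalLaunchedWorkers++;
        }
        _workers = Task.WhenAll(workers);
        return Task.CompletedTask;
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        if (Volatile.Read(ref _isStarted) == 0) return;

        _stoppingSource!.Cancel();
        try { await _workers; }
        catch (OperationCanceledException) { /* expected on shutdown */ }

        _stoppingSource.Dispose();
        Volatile.Write(ref _isStarted, 0); // a later StartAsync may start again
    }

    // Placeholder for the real per-event processing.
    private static async Task ProcessAsync(CancellationToken token)
    {
        while (true)
        {
            await Task.Delay(50, token); // throws when cancellation is requested
        }
    }
}
```

The sketch only guards against repeated StartAsync calls. It does not try to handle StartAsync and StopAsync racing each other from different threads; that would need a lock or a richer state machine.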

DoWork

  • It took me a couple of seconds to realize that DoWork has to match the TimerCallback delegate, which is why it has an object? state parameter
    • Please consider adding a comment there for future maintainers, to improve legibility
  • As I said several times, please try to use better naming
    • Here your DoWork acts as a single producer, so please try to capture this in the method name
  • Please bear in mind that ConcurrentBag is thread-safe only for individual operations
    • Performing Contains and then Add is not atomic, so the combination is not thread-safe
    • Please consider using a lock or a ConcurrentDictionary, which exposes TryAdd
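
A minimal sketch of the ConcurrentDictionary variant follows. The names EnqueueNewEventIds, eventIdsInProcessing and queued are hypothetical, the byte value is just an unused placeholder, and Parallel.For merely simulates overlapping timer callbacks.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Keys are the event ids; the byte value is an unused placeholder.
var eventIdsInProcessing = new ConcurrentDictionary<int, byte>();
var queued = new ConcurrentQueue<int>(); // stand-in for the eventIds collection

// Hypothetical producer step: queue only the ids that were not seen before.
void EnqueueNewEventIds(IEnumerable<int> cachedIds)
{
    foreach (int id in cachedIds)
    {
        // TryAdd checks and inserts in one atomic step, so exactly one
        // caller wins for each id even when callbacks overlap.
        if (eventIdsInProcessing.TryAdd(id, 0))
        {
            queued.Enqueue(id);
        }
    }
}

// Simulate several overlapping timer callbacks seeing the same cached ids.
int[] cached = { 1, 2, 3 };
Parallel.For(0, 8, _ => EnqueueNewEventIds(cached));

Console.WriteLine(queued.Count); // prints 3: every id was queued exactly once
```

With the original Contains-then-Add sequence, two overlapping callbacks could both find an id missing and both queue it; TryAdd closes that window. When an event ends, TryRemove on the same dictionary would allow the id to be picked up again later.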

Dispose
