Problem
I am trying to work out the best way to go about a task which relys on multiple long running tasks taking place.
My use case is that I want to have multiple events running for set periods of time, where I have websocket connections to each event for that period.
My thoughts were that I keep a conurrent list of all active events, when a new event pops into the list, it spawns a new thread to handle the event, when the event pops off the list, this thread will be closed.
Is this a good way to go about it? I am trying to set up a proof of concept, where all I am doing is logging out the event ID to the console for now, it kind of works, but I haven’t worked out a way to remove the thread yet etc.
Any advise anyone can give I would be really appreciative.
public class EventProcessingService : IHostedService, IDisposable
{
private readonly ILogger<EventProcessingService> _logger;
private readonly ICacheService _cacheService;
private const int MaxThreads = 10;
private static readonly CountdownEvent cde = new CountdownEvent(MaxThreads);
public static readonly BlockingCollection<int> eventIds = new BlockingCollection<int>();
ConcurrentBag<int> EventIdsProcessing = new ConcurrentBag<int>();
private Timer _timer = null!;
public EventProcessingService(ILogger<EventProcessingService> logger, ICacheService cacheService)
{
_logger = logger;
_cacheService = cacheService;
for (int i = 0; i < MaxThreads; i++)
{
Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
}
}
public Task StartAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Timed Hosted Service running.");
_timer = new Timer(DoWork, null, TimeSpan.Zero,
TimeSpan.FromSeconds(5));
return Task.CompletedTask;
}
private void DoWork(object? state)
{
var ids = _cacheService.GetCachedEventIds();
foreach (var id in ids)
{
if (!EventIdsProcessing.Contains(id))
{
EventIdsProcessing.Add(id);
eventIds.Add(id);
}
}
cde.Wait();
}
private async Task Process()
{
foreach (var id in eventIds.GetConsumingEnumerable())
{
cde.Signal();
while (true)
{
Console.WriteLine(id);
await Task.Delay(1000);
}
}
}
public Task StopAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Timed Hosted Service is stopping.");
_timer?.Change(Timeout.Infinite, 0);
return Task.CompletedTask;
}
public void Dispose()
{
_timer?.Dispose();
}
}
Solution
private
members
- I know naming is hard and MSDN has a lots of bad examples but
cde
is not a really good name- Try to capture what does it limit, like
ConcurrentProcessThrottler
ConcurrentProcessLimiter
- etc.
- Try to capture what does it limit, like
- Same applies for
_timer
, try to capture the essence why did you introduce it- I’m not sure that you really need to use the null-forgiving / damn-it operator
- Please try to follow consistent naming pattern
- Inconsistent:
EventIdsProcessing
,_cacheService
,cde
, etc. - Either use underscore prefix for all your
private
members or do not prefix them
- Inconsistent:
- I know it is a POC but I would suggest to receive the
maxThreads
as a constructor parameter rather than using a hard-codedconst
- Tasks are not Threads, so a way better name would be
MaxDegreeOfParallelism
ThresholdForMaxConcurrency
- etc.
- Tasks are not Threads, so a way better name would be
public
member
- Please try to use Pascal Casing for public member (
eventIds
)- It is unclear why it should be
public
- It is unclear why it should be
EventProcessingService
constructor
- Try to express your intent by using the discard operator
- If you want to just fire off a new
Task
and you don’t care about theTask
itself then make this intent explicit
- If you want to just fire off a new
_ = Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
- Here the
StartNew
returns aTask<Task>
so you need to callUnwrap
to have a flattenedTask
- Please prefer
Task.Run
overStartNew
since the latter one might be dangerous
- Please prefer
Process
- Using
GetConsumingEnumerable
works fine if the producer side calls theCompleteAdding
to signal it will not produce new elements - I assume that your infinite loop simulates some real processing logic
- Based on your code I don’t see how will it move from the first element to the next since you have an infinite loop inside the loop body
StartAsync
- I do believe you should kick off your concurrent
Process
workers/consumers here, not inside the constructor- With that you would be able to pass the
CancellationToken
to theTask.Run
and to theProcess
as well
- With that you would be able to pass the
- I would also recommend to add protection against multiple
StartAsync
calls- A
StartAsync
should have any affect only if it was not called before or if there was a completedStopAsync
prior it
- A
DoWork
- It took me a couple of seconds to realize that
DoWork
has to match toTimerCallback
delegate that’s why it has aobject? state
parameter- Please consider to add a comment there for future maintainers or to enhance legibility
- As I said several times please try to use better naming
- Here your
DoWork
acts like a single producer, please try to capture this information inside the method name
- Here your
- Please bear in my that
ConcurrentBag
is thread-safe if you perform atomic operation- Performing
Contains
thenAdd
is not atomic << not thread-safe - Please consider to use
lock
or useConcurrentDictionary
which does exposeTryAdd
- Performing
Dispose
- Please try to implement the Dispose pattern as it should be