Problem
I am trying to exploit asynchronism for parallelism. This is my first attempt at a parallel event subscriber. In your expert opinions, is this a valid approach?
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using EventStore.ClientAPI;
namespace Sandbox
{
public class SomeEventSubscriber
{
private Position? _latestPosition;
private readonly Dictionary<Type, Action<object>> _eventHandlerMapping;
private IEventStoreConnection _connection;
public Dictionary<Type, Action<object>> EventHandlerMapping
{
get { return _eventHandlerMapping; }
}
public SomeEventSubscriber()
{
_eventHandlerMapping = CreateEventHandlerMapping();
_latestPosition = Position.Start;
}
public void Start()
{
ConnectToEventstore();
}
private void ConnectToEventstore()
{
_connection = EventStoreConnectionWrapper.Connect();
_connection.Connected +=
(sender, args) => _connection.SubscribeToAllFrom(_latestPosition, false, EventOccured, LiveProcessingStarted, HandleSubscriptionDropped);
}
private Dictionary<Type, Action<object>> CreateEventHandlerMapping()
{
return new Dictionary<Type, Action<object>>
{
{typeof (FakeEvent1), o => Handle(o as FakeEvent1)},
{typeof (FakeEvent2), o => Handle(o as FakeEvent2)},
};
}
private void Handle(FakeEvent1 eventToHandle)
{
SomethingLongRunning(eventToHandle);
}
private void Handle(FakeEvent2 eventToHandle)
{
SomethingLongRunning(eventToHandle);
}
private void SomethingLongRunning(BaseFakeEvent eventToHandle)
{
Console.WriteLine("Start Handling: " + eventToHandle.GetType());
for (int i = 0; i < 10000000; i++)
{
}
Console.WriteLine("Finished Handling: " + eventToHandle.GetType());
}
private void EventOccured(EventStoreCatchUpSubscription eventStoreCatchUpSubscription,
ResolvedEvent resolvedEvent)
{
if (resolvedEvent.OriginalEvent.EventType.StartsWith("$") || resolvedEvent.OriginalEvent.EventStreamId.StartsWith("$"))
return;
var @event = EventSerialization.DeserializeEvent(resolvedEvent.OriginalEvent);
if (@event != null)
{
var eventType = @event.GetType();
if (_eventHandlerMapping.ContainsKey(eventType))
{
var task = Task.Factory.StartNew(() => _eventHandlerMapping[eventType](@event));
Console.WriteLine("The task is running asynchronously...");
}
}
if (resolvedEvent.OriginalPosition != null) _latestPosition = resolvedEvent.OriginalPosition.Value;
}
private void HandleSubscriptionDropped(EventStoreCatchUpSubscription subscription, SubscriptionDropReason dropReason, Exception ex)
{
if (dropReason == SubscriptionDropReason.ProcessingQueueOverflow)
{
//TODO: Wait and reconnect probably with back off
}
if (dropReason == SubscriptionDropReason.UserInitiated)
return;
if (SubscriptionDropMayBeRecoverable(dropReason))
{
Start();
}
}
private static bool SubscriptionDropMayBeRecoverable(SubscriptionDropReason dropReason)
{
return dropReason == SubscriptionDropReason.Unknown || dropReason == SubscriptionDropReason.SubscribingError ||
dropReason == SubscriptionDropReason.ServerError || dropReason == SubscriptionDropReason.ConnectionClosed;
}
private static void LiveProcessingStarted(EventStoreCatchUpSubscription eventStoreCatchUpSubscription)
{
}
}
}
Solution
var task = Task.Factory.StartNew(() => _eventHandlerMapping[eventType](@event));
Console.WriteLine("The task is running asynchronously...");
task.Wait();
This code doesn’t make any sense. You’re running the event handler on another thread, but then you’re immediately waiting for it to finish. The end result is your code will behave exactly as if you called the handler directly, only with more code and some added overhead.