Problem
What do you think? In particular, I’m not sure about my exception handling approach to raise errors when posting objects; but I couldn’t come up with anything better.
/// <summary>
/// Sample payload used to exercise the batcher; it carries a single number.
/// </summary>
public class Thing
{
    /// <summary>Gets or sets the number carried by this instance.</summary>
    public int Number { get; set; }

    /// <summary>Creates a thing carrying <paramref name="number"/>.</summary>
    /// <param name="number">Initial value of <see cref="Number"/>.</param>
    public Thing(int number)
    {
        this.Number = number;
    }
}
/// <summary>
/// Console demo: posts an ever-increasing sequence of <see cref="Thing"/> instances
/// into a batcher and prints what happens.
/// </summary>
class Program
{
    /// <summary>
    /// Posts things numbered from 0 upwards until the batcher rejects one, reports
    /// the rejection on the console and rethrows it.
    /// </summary>
    static void Main(string[] args)
    {
        // Batches of 100 items, or whatever has arrived once 30 seconds pass without a new item.
        var thingBatcher = new Batcher<Thing>(ProcessBatch, 100, TimeSpan.FromSeconds(30));
        try
        {
            var number = 0;
            while (number < int.MaxValue)
            {
                thingBatcher.Post(new Thing(number));
                number++;
            }
        }
        catch (Exception failure)
        {
            Console.WriteLine(string.Concat("Here:", failure.Message));
            throw;
        }

        Console.ReadLine();
    }

    /// <summary>
    /// Batch callback: prints the batch size and fails when the batch contains
    /// the thing numbered 400.
    /// </summary>
    /// <param name="things">The items of one batch.</param>
    /// <exception cref="InvalidOperationException">The batch contains number 400.</exception>
    private static void ProcessBatch(IEnumerable<Thing> things)
    {
        var batchSize = things.Count();
        Console.WriteLine("Batch found: " + batchSize);

        var containsFailingNumber = things.Select(thing => thing.Number).Contains(400);
        if (!containsFailingNumber)
        {
            return;
        }

        throw new InvalidOperationException("Something bad happened");
    }
}
/// <summary>
/// Groups posted items into batches and hands every batch to a callback.
/// A batch is emitted as soon as <c>batchSize</c> items are available, or when no new
/// item has arrived for <c>maxTimeWaitingForBatch</c>, whichever happens first.
/// </summary>
/// <typeparam name="T">Type of the items being batched.</typeparam>
public class Batcher<T>
{
    private const string FailedStateMessage =
        "The batcher is not accepting new objects as it is in a failed state";

    private const string CompletedStateMessage =
        "The batcher is not accepting new objects as it has been completed";

    private readonly BufferBlock<T> _buffer;
    private readonly ActionBlock<IEnumerable<T>> _actionBlock;

    /// <summary>Builds the batching pipeline.</summary>
    /// <param name="processBatch">Callback invoked once per batch.</param>
    /// <param name="batchSize">Number of items that make up a full batch.</param>
    /// <param name="maxTimeWaitingForBatch">
    /// Idle time after the most recent item before a partial batch is emitted.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="processBatch"/> is null.</exception>
    public Batcher(
        Action<IEnumerable<T>> processBatch,
        int batchSize,
        TimeSpan maxTimeWaitingForBatch)
    {
        // Validate before anything is allocated so a bad argument cannot leave a live timer behind.
        if (processBatch == null)
        {
            throw new ArgumentNullException("processBatch");
        }

        _buffer = new BufferBlock<T>();
        var batchStockEvents = new BatchBlock<T>(batchSize);

        // Use a timer to make sure that items do not remain in memory for too long.
        var triggerBatchTimer = new Timer(_ => batchStockEvents.TriggerBatch());

        // Use a transform block to reset the timer whenever an item is inserted to avoid unnecessary batches.
        var timeoutBlock = new TransformBlock<T, T>(value =>
        {
            // One-shot timer: fire once after the idle period, never periodically.
            triggerBatchTimer.Change(maxTimeWaitingForBatch, Timeout.InfiniteTimeSpan);
            return value;
        });

        _actionBlock = new ActionBlock<IEnumerable<T>>(processBatch);

        // Propagate completion AND faults downstream. Without this a failure in an
        // upstream block would go unnoticed and Complete() would never reach the action block.
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        _buffer.LinkTo(timeoutBlock, propagate);
        timeoutBlock.LinkTo(batchStockEvents, propagate);
        batchStockEvents.LinkTo(_actionBlock, propagate);

        // Once the transform block has finished nothing re-arms the timer any more, so release it.
        // A completing BatchBlock emits its remaining items itself, so no pending flush is lost.
        timeoutBlock.Completion.ContinueWith(
            _ => triggerBatchTimer.Dispose(),
            TaskContinuationOptions.ExecuteSynchronously);

        _actionBlock.Completion.ContinueWith(x =>
        {
            // todo log exception here
            Console.WriteLine(x.Exception.Message);
        }, TaskContinuationOptions.OnlyOnFaulted);
    }

    /// <summary>
    /// Task that finishes when every accepted item has been processed after
    /// <see cref="Complete"/>, or faults as soon as the pipeline fails.
    /// Wait on it to observe failures that happen after the last <see cref="Post"/>.
    /// </summary>
    public Task Completion
    {
        get { return _actionBlock.Completion; }
    }

    /// <summary>
    /// Stops accepting new items; items already accepted, including a trailing
    /// partial batch, are still handed to the callback.
    /// </summary>
    public void Complete()
    {
        _buffer.Complete();
    }

    /// <summary>Queues an item for batching.</summary>
    /// <param name="someObject">The item to add to the next batch.</param>
    /// <exception cref="InvalidOperationException">
    /// The batcher has failed (the original failure is available as
    /// <see cref="Exception.InnerException"/>) or has been completed.
    /// </exception>
    public void Post(T someObject)
    {
        if (!_actionBlock.Completion.IsCompleted && _buffer.Post(someObject))
        {
            return;
        }

        // Task.Exception is null unless the task faulted; reading it here tells a
        // failure apart from an orderly Complete().
        var failure = _actionBlock.Completion.Exception;
        if (failure != null)
        {
            throw new InvalidOperationException(FailedStateMessage, failure);
        }

        throw new InvalidOperationException(CompletedStateMessage);
    }
}
Solution
var batchStockEvents = new BatchBlock<T>(batchSize);
I think that `batchStockEvents` is a weird name for a `BatchBlock`. It sounds like it’s a collection of events, or something like that.
new Timer(delegate { batchStockEvents.TriggerBatch(); });
While using `delegate` this way allows you to ignore the parameter, I think it’s cleaner (and shorter) to use a lambda with `_` as the parameter name to indicate that it’s ignored:
new Timer(_ => batchStockEvents.TriggerBatch());
// Use a transform block to reset the timer whenever an item is inserted to avoid unnessary batches
Wouldn’t it be enough to reset the timer whenever a batch is triggered? It would be more efficient and you would have one less block.
Also, the comment has a typo, it should be “unnecessary”.
What’s the purpose of the `_buffer` block? Couldn’t `Post()` post directly to `timeoutBlock`?
In particular, I’m not sure about my exception handling approach to raise errors when posting objects; but I couldn’t come up with anything better.
Yeah, you can’t rely on that check in `Post()`, because by the time the failure happens, all items could already have been posted. But if you just want to stop doing useless work by trying to `Post()` items that can never be processed, then I think this is the right approach.