
Batcher using Microsoft.Tpl.Dataflow

Problem

What do you think? In particular, I’m not sure about my exception-handling approach for raising errors when posting objects, but I couldn’t come up with anything better.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Thing
{
    public Thing(int number)
    {
        Number = number;
    }

    public int Number { get; set; }
}

class Program
{
    static void Main(string[] args)
    {
        var thingBatcher = new Batcher<Thing>(ProcessBatch, 100, TimeSpan.FromSeconds(30));

        try
        {
            for (var i = 0; i < int.MaxValue; i++)
            {
                thingBatcher.Post(new Thing(i));
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("Here:" + ex.Message);                
            throw;
        }

        Console.ReadLine();
    }

    private static void ProcessBatch(IEnumerable<Thing> things)
    {
        Console.WriteLine("Batch found: " + things.Count());


        if (things.Any(thing => thing.Number == 400))
        {
            throw new InvalidOperationException("Something bad happened");
        }
    }
}

public class Batcher<T>
{
    private BufferBlock<T> _buffer;

    private ActionBlock<IEnumerable<T>> _actionBlock;

    public Batcher(
        Action<IEnumerable<T>> processBatch,
        int batchSize,
        TimeSpan maxTimeWaitingForBatch)
    {
        Initialize(batchSize, processBatch, maxTimeWaitingForBatch);
    }

    private void Initialize(int batchSize, 
        Action<IEnumerable<T>> processBatch,
        TimeSpan maxTimeWaitingForBatch)
    {
        _buffer = new BufferBlock<T>();

        var batchStockEvents = new BatchBlock<T>(batchSize);

        // Use a timer to make sure that items do not remain in memory for too long
        var triggerBatchTimer = new Timer(delegate { batchStockEvents.TriggerBatch(); });

        // Use a transform block to reset the timer whenever an item is inserted to avoid unnessary batches
        var timeoutBlock = new TransformBlock<T, T>((value) =>
        {
            triggerBatchTimer.Change(maxTimeWaitingForBatch, TimeSpan.FromMilliseconds(-1));
            return value;
        });

        _actionBlock = new ActionBlock<IEnumerable<T>>(processBatch); 

        _buffer.LinkTo(timeoutBlock);
        timeoutBlock.LinkTo(batchStockEvents);
        batchStockEvents.LinkTo(_actionBlock);

        _actionBlock.Completion.ContinueWith(x =>
        {
            // todo log exception here
            Console.WriteLine(x.Exception.Message);

        }, TaskContinuationOptions.OnlyOnFaulted);
    }

    public void Post(T someObject)
    {
        if (_actionBlock.Completion.IsCompleted)
        {
            throw new InvalidOperationException("The batcher is not accepting new objects as it is in a failed state");    
        }

        _buffer.Post(someObject);
    }
}

Solution

var batchStockEvents = new BatchBlock<T>(batchSize);

I think that batchStockEvents is a weird name for a BatchBlock. It sounds like it’s a collection of events, or something like that.


new Timer(delegate { batchStockEvents.TriggerBatch(); });

While using delegate this way lets you ignore the parameter, I think it’s cleaner (and shorter) to use a lambda with _ as the parameter name to indicate that it’s ignored:

new Timer(_ => batchStockEvents.TriggerBatch());

// Use a transform block to reset the timer whenever an item is inserted to avoid unnessary batches

Wouldn’t it be enough to reset the timer whenever a batch is triggered? It would be more efficient and you would have one less block.

Also, the comment has a typo: it should be “unnecessary”.
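
To make the timer suggestion concrete, here is a rough sketch of Initialize along those lines, reusing the original fields and parameters; it drops the TransformBlock and restarts the one-shot timer from the batch handler, which is one way to read “reset the timer whenever a batch is triggered”:

private void Initialize(int batchSize,
    Action<IEnumerable<T>> processBatch,
    TimeSpan maxTimeWaitingForBatch)
{
    _buffer = new BufferBlock<T>();

    var batchBlock = new BatchBlock<T>(batchSize);

    // One-shot timer that flushes a partial batch if nothing arrives in time
    var triggerBatchTimer = new Timer(_ => batchBlock.TriggerBatch());
    triggerBatchTimer.Change(maxTimeWaitingForBatch, TimeSpan.FromMilliseconds(-1));

    _actionBlock = new ActionBlock<IEnumerable<T>>(batch =>
    {
        // Restart the timeout once per batch instead of once per posted item
        triggerBatchTimer.Change(maxTimeWaitingForBatch, TimeSpan.FromMilliseconds(-1));
        processBatch(batch);
    });

    _buffer.LinkTo(batchBlock);
    batchBlock.LinkTo(_actionBlock);

    // Completion continuation omitted for brevity
}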


What’s the purpose of the _buffer block? Couldn’t Post() post directly to timeoutBlock?
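
For example, if the class kept a reference to the first block in the pipeline instead (a hypothetical _timeoutBlock field in place of _buffer), Post could write to it directly:

private TransformBlock<T, T> _timeoutBlock;

public void Post(T someObject)
{
    if (_actionBlock.Completion.IsCompleted)
    {
        throw new InvalidOperationException("The batcher is not accepting new objects as it is in a failed state");
    }

    // No intermediate BufferBlock; post straight to the first real block
    _timeoutBlock.Post(someObject);
}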


In particular, I’m not sure about my exception-handling approach for raising errors when posting objects, but I couldn’t come up with anything better.

Yeah, you can’t rely on that check in Post() because by the time the failure happens, all of the items could already have been posted. But if you just want to avoid the useless work of trying to Post() items that can never be processed, then I think this is the right approach.
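
If the caller also needs to observe the failure itself, rather than just stop posting, one option (not in the original code) would be to expose the pipeline’s completion so the caller can await it or attach its own logging:

// Hypothetical addition to Batcher<T>: surfaces the action block's faulted task
public Task Completion => _actionBlock.Completion;

// Caller side (illustrative): awaiting rethrows the exception from processBatch
// await thingBatcher.Completion;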
