Tree processing for series and parallel execution

Posted on

Problem

I have a tree processing application where some of the nodes are “executed” in parallel and some in series. I managed to do this but I feel that the solution looks a bit “hacked up” and can be improved on.

I have a JobObject class which I am using to build a tree:

[Serializable]
public class JobObject
{
    public int JobId { get; set; }
    public string ParalleJoblId { get; set; }
    public bool ContinueOnFailure { get; set; }
    public int? ParentJobId { get; set; }
    public int? LinkJobId { get; set; }
    public bool IsRoot { get; set; }

    public List<JobObject> Children = new List<JobObject>();
}

The LinkJobId is used to define a link to another node where a node can have multiple parents. Basically a node dependent on more than one node or to be precise, more than one sequence. For example, a node with JobId 4 depends on both Jobs (IDs: 1,2) and Job (IDs: 3) and only when these two sequences complete execution, will I be able to execute Job (Id 4) and its following sequence.

The ParallelJobId is a GUID which is used to identify the parallel sequences.

Then there is a StatusInfo class which is associated (later on) with the tree:

[Serializable]
public class StatusInfo
{
    public DateTime? EndTime;
    public DateTime? StarTime;
    public Status Status;
    public int JobId;

    public StatusInfo()
    {
        Status = Status.Idle;
        StarTime = null;
        EndTime = null;
    }
}

This is how my Main looks like:

static void Main(string[] args)
{
    var uniqueId1 = Controller.UniqueName;
    var uniqueId2 = Controller.UniqueName;

    var jobObjects = new List<JobObject>
    {
        new JobObject {JobId = 1, ContinueOnFailure = true, ParalleJoblId = null, ParentJobId = null, LinkJobId = null},
        new JobObject {JobId = 2, ContinueOnFailure = true, ParalleJoblId = null, ParentJobId = 1 ,LinkJobId = null},
        new JobObject {JobId = 3, ContinueOnFailure = true, ParalleJoblId = uniqueId1, ParentJobId = 2 ,LinkJobId = 4},
        new JobObject {JobId = 4, ContinueOnFailure = true, ParalleJoblId = uniqueId1, ParentJobId = 3 ,LinkJobId = 6},
        new JobObject {JobId = 5, ContinueOnFailure = false, ParalleJoblId =uniqueId1 , ParentJobId = 2 ,LinkJobId = 6},
        new JobObject {JobId = 6, ContinueOnFailure = false, ParalleJoblId =null , ParentJobId = null,LinkJobId = null},
        new JobObject {JobId = 7, ContinueOnFailure = false, ParalleJoblId =uniqueId1 , ParentJobId = 2,LinkJobId = 8},
        new JobObject {JobId = 8, ContinueOnFailure = false, ParalleJoblId =uniqueId1 , ParentJobId = 7,LinkJobId = 6},
        new JobObject {JobId = 9, ContinueOnFailure = false, ParalleJoblId =null , ParentJobId = 6,LinkJobId = null},

        new JobObject {JobId = 10, ContinueOnFailure = false, ParalleJoblId =uniqueId2 , ParentJobId = 9,LinkJobId = 12},
        new JobObject {JobId = 11, ContinueOnFailure = false, ParalleJoblId =uniqueId2, ParentJobId = 9,LinkJobId = 12},
        new JobObject {JobId = 12, ContinueOnFailure = false, ParalleJoblId =null, ParentJobId = null,LinkJobId = null},
    };

    var tree = Utils.BuildTreeAndReturnRootNodes(jobObjects);

    var flatTree = Utils.Flatten(tree);

    var statusInfos = Utils.StatusInfos(flatTree);

    Utils.ExecuteJob(tree, ref statusInfos);
}

The supporting helper functions are something like this

public static JobObject BuildTreeAndReturnRootNodes(List<JobObject> flatItems)
{
    var byIdLookup = flatItems.ToLookup(i => i.JobId);
    foreach (var item in flatItems)
    {
        if (item.ParentJobId != null)
        {
            var parent = byIdLookup[item.ParentJobId.Value].First();
            parent.Children.Add(item);

            if (item.LinkJobId != null)
            {
                var child = byIdLookup[item.LinkJobId.Value].First() ?? null;
                if (child.ParentJobId == null)
                {
                    item.Children.Add(child);
                }
            }
        }

    }
    return flatItems.First(i => i.ParentJobId == null);
}

public static List<JobObject> Flatten(JobObject root)
{
    var flattened = new List<JobObject> { root };
    var children = root.Children;
    if (children != null)
    {
        foreach (var child in children)
        {
            flattened.AddRange(Flatten(child));
        }
    }

    return flattened.Distinct().ToList();
}

public static List<StatusInfo> StatusInfos(List<JobObject> jobs)
{
    var result = new List<StatusInfo>();

    jobs.ForEach(job => result.Add(new StatusInfo { JobId = job.JobId }));

    return result;
}

The ExecuteJob is the function responsible to execute the sequences. Here, a root and runner are initially pointed to the parent node and runner iterates through the tree and finds out the series and parallel sequences and passes the execution to the the helper functions for series and parallel runs.

public static void ExecuteJob(JobObject jobTreeRootNode, ref List<StatusInfo> statusInfos)
{
    JobObject runner = jobTreeRootNode;

    JobObject root = jobTreeRootNode;

    while (runner?.Children != null)
    {
        if (runner.Children.Count == 0)
        {
            break;
        }
        if (runner.Children.Count == 1)
        {
            runner = runner.Children.FirstOrDefault();
            continue;
        }
        if (runner.Children.Count > 1)
        {
            ExecuteSeriesRun(root, runner.JobId, ref statusInfos);
            root = runner = ExecuteParallelRun(runner.Children, ref statusInfos);
        }
    }

    ExecuteSeriesRun(root, root.JobId, ref statusInfos);

    statusInfos.ForEach(job => Console.WriteLine(Environment.NewLine + "Job {0} : Execution Status: {1} : Execution Start: {3} :  Execution Completed @ {2}", job.JobId, job.Status, job.EndTime, job.StarTime));

    Console.WriteLine("Completed Execution!!!");
}

ExecuteParallelRun uses TPL to run the jobs in parallel:

public static JobObject ExecuteParallelRun(List<JobObject> parallelSequences, ref List<StatusInfo> statusInfos)
{
    var endOfParallelChain = parallelSequences[0].Descendants().FirstOrDefault(node => node.ParalleJoblId == null);

    if (endOfParallelChain != null)
    {
        var taskList = new List<Task>();

        var infos = statusInfos;

        parallelSequences.ForEach(job =>
        {
            Action action = () => ExecuteJob(job, ref infos, false, endOfParallelChain.LinkJobId);
            var t = new Task(action);
            taskList.Add(t);
            t.Start();
        });

        var tasks = taskList.ToArray();
        Task.WaitAll(tasks);
    }

    return endOfParallelChain;
}

The series helper function looks like this:

public static void ExecuteSeriesRun(JobObject sequenceRoot, int? stopByJobId, ref List<StatusInfo> statusInfos)
{
    while (sequenceRoot != null)
    {
        if (
            (from statusObj in statusInfos where statusObj.JobId == sequenceRoot.JobId select statusObj.Status)
                .FirstOrDefault() == Status.Idle)
        {
            statusInfos.Where(w => w.JobId == sequenceRoot.JobId).ToList().ForEach(s =>
            {
                s.StarTime = DateTime.Now;
                s.Status = Status.Running;
            });

            LogManager.GetLogger<ParallelJob>()
                .Info(string.Format("Running Job {1} at {0}", DateTime.Now.ToString("O"), sequenceRoot.JobId));
            System.Threading.Thread.Sleep(2000);

            statusInfos.Where(w => w.JobId == sequenceRoot.JobId).ToList().ForEach(s =>
            {
                s.Status = Status.Completed;
                s.EndTime = DateTime.Now;
            });
        }
        if (sequenceRoot.JobId == stopByJobId)
            break;
        sequenceRoot = sequenceRoot.Children.FirstOrDefault();
    }
}

Parallel nodes are found using the ParallelJobId using the function:

public static IEnumerable<JobObject> Descendants(this JobObject root)
{
    var nodes = new Stack<JobObject>(new[] { root });
    while (nodes.Any())
    {
        JobObject node = nodes.Pop();
        yield return node;
        foreach (var n in node.Children) nodes.Push(n);
    }
}

As I said in the beginning, the overall solution does work. But introducing ParallelJobId and LinkJobId to get to the parallel sequence is a bit of a hack which I wanted to eliminate but cant get to fixing the code. Any help/code via which I can eliminate this would be really helpful.

Solution

JobObject

Why isn’t List<JobObject> Children a property but a field ?

StatusInfo

You have a spelling error. It should be StartTime instead of StarTime.

public static JobObject BuildTreeAndReturnRootNodes()

A public method which doesn’t check the validness of the passed arguments is a red sign. Always check if the passed arguments are valid like != null.

The name of the method implies that multiple values will be returned. Change the name to make it clear that onle one RootNode will be returned.

If by any chance there is an item which ParentJobId.Value isn’t in the lookup then an InvalidOperationException is thrown at var parent = byIdLookup[item.ParentJobId.Value].First();.

The very same is true for var child = byIdLookup[item.LinkJobId.Value].First() ?? null; where the null coalescing operator is pretty senseless, because if something is null why should one replace it with null. But hey its not only useless but dangerous as well, because if it would work at all, then the next statement would throw a NullReferenceException. So using FirstOrDefault() instead of First() together with a null check should be done.

Because you are only interested in items where ParentJobId != null you could use this to narrow your flatItems using a Where() like so

foreach (var item in flatItems.Where(i => i.ParentJobId != null))  

which will reduce the horizontal spacing by one level.

Because you aren’t using any List<T> specific methods you should change the arguments type to an IList<T>.

Summing this up will lead to

public static JobObject BuildTreeAndReturnRootNode(IList<JobObject> flatItems)
{
    if (flatItems == null) { throw new ArgumentNullException("flatItems"); }
    if (flatItems.Count == 0 ) { return null; }

    var byIdLookup = flatItems.ToLookup(i => i.JobId);
    foreach (var item in flatItems.Where(i => i.ParentJobId != null))
    {
        var parent = byIdLookup[item.ParentJobId.Value].FirstOrDefault();

        if (parent == null) { continue; }

        parent.Children.Add(item);

        if (item.LinkJobId != null)
        {
            var child = byIdLookup[item.LinkJobId.Value].FirstOrDefault() 

            if (child != null && child.ParentJobId == null)
            {
                item.Children.Add(child);
            }
        }
    }
    return flatItems.FirstOrDefault(i => i.ParentJobId == null);
}

ExecuteJob()

The while loop can be improved by using the condition runner.Children.Count > 0 as well. This will eleminate the needing of the first if statement.

Instead of having an if with a continue and another if you should just use a else.

Using FirstOrDefault() if you are sure that the collection contains an item is not needed. Just change it to First() like so

while (runner?.Children != null && runner.Children.Count > 0)
{

    if (runner.Children.Count == 1)
    {
        runner = runner.Children.First();
    } 
    else
    {
        ExecuteSeriesRun(root, runner.JobId, ref statusInfos);
        root = runner = ExecuteParallelRun(runner.Children, ref statusInfos);
    }
}

But hey this method is doing to much because it is aranging the runner‘s, executing them and print the result of the statusInfos. At least the least the printing should be done somewhere else.

Leave a Reply

Your email address will not be published. Required fields are marked *