Managing Concurrency With Trees[2]

by Administrator 4. April 2008 02:15

You may want to be familiar with these articles before continuing:

Managing Concurrency With Trees[0]

Managing Concurrency With Trees[1]

Client Code

Now that I’ve built and populated my Tree<IExecutable>, I need something to go to work on it.  The client code is very simple:

            Console.WriteLine("=========Task Tree Scheduler=========");
            TaskTreeScheduler scheduler = new TaskTreeScheduler();
            scheduler.HardwareThreadsOnly = false;
            scheduler.Run(workTree);

The last line of code shown here will block until all the work contained in the tree is done.
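
For readers who have not gone through the earlier parts, here is a minimal sketch of the shapes the scheduler leans on. It is reconstructed only from how this post uses them (Value, ChildNodes, TrueForAll, FindTreeByValue, Execute, IsComplete, Complete), so the real definitions in parts 0 and 1 may well differ. DemoTask, AddChild, and ContractDemo are hypothetical names added for illustration, and FriendlyClassName() is left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical reconstruction: inferred from usage in this post, not the
// actual definitions from the earlier parts of the series.
public delegate void CompleteCallback(IExecutable task);

public interface IExecutable
{
    bool IsComplete { get; }
    CompleteCallback Complete { get; set; }
    void Execute();
}

public class Tree<T>
{
    public Tree(T value) { Value = value; ChildNodes = new List<Tree<T>>(); }

    public T Value { get; private set; }
    public List<Tree<T>> ChildNodes { get; private set; }

    // Assumed helper: attaches a dependent task beneath this node.
    public Tree<T> AddChild(T value)
    {
        Tree<T> child = new Tree<T>(value);
        ChildNodes.Add(child);
        return child;
    }

    // True only if the predicate holds for this node and every descendant.
    public bool TrueForAll(Predicate<T> match)
    {
        if (!match(Value)) return false;
        foreach (Tree<T> child in ChildNodes)
        {
            if (!child.TrueForAll(match)) return false;
        }
        return true;
    }

    // Depth-first search for the node holding the given value; null if absent.
    public Tree<T> FindTreeByValue(T value)
    {
        if (EqualityComparer<T>.Default.Equals(Value, value)) return this;
        foreach (Tree<T> child in ChildNodes)
        {
            Tree<T> found = child.FindTreeByValue(value);
            if (found != null) return found;
        }
        return null;
    }
}

// Hypothetical task that completes immediately and notifies its callback.
public class DemoTask : IExecutable
{
    public bool IsComplete { get; private set; }
    public CompleteCallback Complete { get; set; }

    public void Execute()
    {
        IsComplete = true;
        if (Complete != null) Complete(this);
    }
}

public static class ContractDemo
{
    public static void Main()
    {
        DemoTask root = new DemoTask();
        DemoTask leaf = new DemoTask();
        Tree<IExecutable> workTree = new Tree<IExecutable>(root);
        workTree.AddChild(leaf);

        root.Execute();
        // The leaf has not run yet, so the tree is not fully complete.
        Console.WriteLine(workTree.TrueForAll(t => t.IsComplete));
        leaf.Execute();
        Console.WriteLine(workTree.TrueForAll(t => t.IsComplete));
    }
}
```

The point to notice is that the tree only describes who depends on whom; whether a task has finished lives on the task itself, which is what lets the scheduler ask the whole tree "is everything done?" with a single predicate.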

The TaskTreeScheduler Class

So, how does the work actually get done?

    public class TaskTreeScheduler
    {
        public TaskTreeScheduler()
        {
            _maxThreads = Environment.ProcessorCount;
            _eligible = new Queue<IExecutable>();
            _syncRoot = new object();
        }

        private Tree<IExecutable> _work;
        private int _maxThreads;
        private int _runningThreads;
        private object _syncRoot;
        private Queue<IExecutable> _eligible;

As you can already see, we use the Tree structure to describe dependencies, but we will gradually build a flattened view of the tree in a Queue<IExecutable> as tasks become eligible to run.  The next two methods show how we get some threads started:

 

        private void AddEligible(IExecutable task)
        {
            lock (_syncRoot)
            {
                _eligible.Enqueue(task);
                Console.WriteLine("{0} tasks eligible to run", _eligible.Count);
            }
        }

        public void Run(Tree<IExecutable> workTree)
        {
            _work = workTree;
            AddEligible(_work.Value);
            ThreadStart ts = new ThreadStart(RunTrafficCop);
            Thread cop = new Thread(ts);
            cop.Name = "TrafficCop";
            cop.Start();
            cop.Join();
        }

When we pass a Tree<IExecutable> to Run, a “traffic cop” thread gets started.  The traffic cop could be the UI thread; however, I wanted to allow the UI to block or go about its business, so Run uses Thread.Join().  The first item in the Queue is the Value of the root of the tree, and we print some simple debug information to help visualize what goes on as work happens.  The entire RunTrafficCop method is fairly short:

        public void RunTrafficCop()
        {
            while (true)
            {
                int work = 0;
                lock (_syncRoot)
                {
                    work = _eligible.Count;
                }
                bool allComplete = _work.TrueForAll(new Predicate<IExecutable>(Complete));
                while (work > 0 || !allComplete)
                {
                    IExecutable task = null;
                    // Queue<T> is not thread-safe: check and dequeue under the same
                    // lock that OnTaskComplete holds when it enqueues children.
                    lock (_syncRoot)
                    {
                        if (_eligible.Count > 0)
                        {
                            Console.WriteLine("***Waiting for work***");
                            task = _eligible.Dequeue();
                        }
                    }
                    if (task == null)
                    {
                        Thread.Sleep(2);//Traffic cop sleeps
                        allComplete = _work.TrueForAll(new Predicate<IExecutable>(Complete));
                        continue;
                    }
                    StartThread(task);

                    lock (_syncRoot)
                    {
                        work = _eligible.Count;
                    }
                    allComplete = _work.TrueForAll(new Predicate<IExecutable>(Complete));
                }
                return;
            }
        }

We could do with one fewer while() loop here, but here’s what is going on.  The root task will be in the queue of eligible work.  As the traffic cop iterates, it will either Dequeue() the next item that can be executed or enter a very brief wait state and recheck the tree, using the Predicate<IExecutable> to test the IsComplete flag of each instance.  If work is found, we call StartThread:

        public void StartThread(IExecutable task)
        {
            Interlocked.Increment(ref _runningThreads);
            PrintThreadUsage();
            task.Complete = new CompleteCallback(OnTaskComplete);
            ThreadStart ts = new ThreadStart(task.Execute);
            Thread t = new Thread(ts);
            t.Name = task.FriendlyClassName();
            t.Start();
        }

So here we’ve started a thread for an eligible-to-run IExecutable instance.  This is where the light plumbing of the IExecutable contract comes into play, and we see the expected behavior of implementing classes:

        public override void Execute()
        {
            base.Execute();
            Console.WriteLine("OddJob::Execute");
            Spin(2500);
            IsComplete = true;
            Complete(this);
        }

Spin(int) exists on the base class and merely uses Thread.Sleep to simulate some work being done for 2500 milliseconds.  When the work is done we set IsComplete to true and execute the callback delegate supplied by the TaskTreeScheduler.  The remaining interesting code lives in TaskTreeScheduler.OnTaskComplete:

        public void OnTaskComplete(IExecutable task)
        {
            StopThread();
            Tree<IExecutable> childTree = _work.FindTreeByValue(task);
            lock (_syncRoot)
            {
                foreach (Tree<IExecutable> tree in childTree.ChildNodes)
                {
                    _eligible.Enqueue(tree.Value);
                }
            }
        }

OnTaskComplete finds the Tree whose Value is the task that just finished executing.  Now that the Parent is done executing, any children are eligible to run, and when they complete their children will be eligible to run, and so forth.  Running the program now gives me output like this:

I realize there are a few implementation details I left out.  I will post the Solution in a .zip file eventually.  The static structure of the solution is as follows:

The next post wraps up my initial thoughts in this realm, although I’m sure I’ll come back to this.  I have alluded to some experimentation with the Task Parallel Library in addition to my custom code; the next part deals with the TPL.
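
As a closing aside, the traffic cop above wakes every 2 milliseconds to rescan the tree. One alternative is to let it sleep on Monitor.Wait and have each finishing task call Monitor.Pulse. What follows is a rough sketch of that idea under my own assumptions, not the code from the solution: WorkNode, WaitingScheduler, and the _outstanding counter are hypothetical stand-ins, the work is simulated with a short sleep, and there is no thread throttling.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for Tree<IExecutable>: a named task plus its dependents.
public class WorkNode
{
    public WorkNode(string name) { Name = name; Children = new List<WorkNode>(); }
    public string Name { get; private set; }
    public List<WorkNode> Children { get; private set; }
}

public class WaitingScheduler
{
    private readonly object _syncRoot = new object();
    private readonly Queue<WorkNode> _eligible = new Queue<WorkNode>();
    private readonly List<string> _finished = new List<string>();
    private int _outstanding; // tasks that are queued or still running

    // Blocks until every node has run; returns the names in completion order.
    public List<string> Run(WorkNode root)
    {
        lock (_syncRoot)
        {
            _eligible.Enqueue(root);
            _outstanding = 1;
            while (_outstanding > 0)
            {
                if (_eligible.Count == 0)
                {
                    // Releases the lock and sleeps until a worker pulses.
                    Monitor.Wait(_syncRoot);
                    continue;
                }
                WorkNode next = _eligible.Dequeue();
                Thread worker = new Thread(() => Execute(next));
                worker.Name = next.Name;
                worker.Start();
            }
            return new List<string>(_finished);
        }
    }

    private void Execute(WorkNode node)
    {
        Thread.Sleep(10); // simulated work, done outside the lock
        lock (_syncRoot)
        {
            _finished.Add(node.Name);
            foreach (WorkNode child in node.Children)
            {
                _eligible.Enqueue(child);
            }
            // This task is done; its children are now outstanding instead.
            _outstanding += node.Children.Count - 1;
            Monitor.Pulse(_syncRoot);
        }
    }
}

public static class WaitingSchedulerDemo
{
    public static void Main()
    {
        WorkNode root = new WorkNode("root");
        WorkNode a = new WorkNode("a");
        a.Children.Add(new WorkNode("a1"));
        root.Children.Add(a);
        root.Children.Add(new WorkNode("b"));

        List<string> order = new WaitingScheduler().Run(root);
        // "root" always finishes first; the order of the rest can vary.
        Console.WriteLine(string.Join(", ", order.ToArray()));
    }
}
```

Because the queue and the counter only change while the lock is held, the traffic cop always rechecks them before it waits, so a pulse that arrives while it is busy starting a thread is not lost. The trade-off is one more piece of state to keep correct in place of the tree scan.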


About the author

Damon Payne is a Microsoft MVP specializing in Smart Client solution architecture. 
