ParallelExecutorAspect in C#

imageHi, today I would like to share with you idea of right parallel execution of actions/methods. Imagine at the beginning that you have a queue of messages and even some threads that, in parallel, get messages from the queue and then do background work. If that message executes actions on the relational database, you will quickly find that some of your work is done right. Still, you get timeouts, deadlocks, or any other errors that are difficult to repeat. That mostly is because you parallel actions that want to use the same data resources. Now you can go to your data architect and application architect and ask them to start using some strong transaction isolation levels. Still, the only thing you get will be a lot of resources consumed in your database environment. To solve that issue, I want to present to you ParallelExecutorAspect, which can automatically figure out what type of work can be parallelized. Of course, different actions can be and cannot be executed in parallel, right? And to say this aspect what is different and what is the same, you can just pass some keys to the Execute method that characterize your piece of work. For example, Queue Name, Customer Name, and Action Name. After that, all magic starts to happen, and you quickly discover that all your work is done without database interaction errors.

namespace ParallelExecutorAspect
{
    using System;
    using System.Collections.Concurrent;
    using System.Globalization;
    using System.Linq;
    using System.Runtime.CompilerServices;
    using System.Threading;
    public class ParallelExecutorAspect<T, U>
    {
        class ParallelExecutorUnit<TU, TW>
        {
            private readonly ParallelExecutorUnit<TU, TW> parent;
            private readonly ConcurrentDictionary<string, ParallelExecutorUnit<TU, TW>> node;
            private readonly ConcurrentQueue<Tuple<Func<TU, TW>, TU>> queue;
            private readonly int maxThreadLifeTimeout;
            private readonly int maxThreadCount;
            private readonly Action<Exception> exceptionAction;
            private bool unitIsActive;
            private bool unitIsWorking;
            private int unitCount;
            internal ParallelExecutorUnit(
                int maxThreadLifeTimeout,
                int maxThreadCount,
                Action<Exception> exceptionAction,
                ParallelExecutorUnit<TU, TW> parent)
            {
                this.maxThreadLifeTimeout = maxThreadLifeTimeout;
                this.maxThreadCount = maxThreadCount;
                this.exceptionAction = exceptionAction;
                this.parent = parent ?? this;
                this.unitCount = 0;
                this.node = new ConcurrentDictionary<string, ParallelExecutorUnit<TU, TW>>();
                this.queue = new ConcurrentQueue<Tuple<Func<TU, TW>, TU>>();
            }
            TW ExecuteMethod()
            {
                TW returnValue = default(TW);
                Tuple<Func<TU, TW>, TU> method;
                if (this.queue.TryDequeue(out method))
                {
                    // if you want to try-catch exceptions and implement
                    // the retry solution that is right place to do that.
                    try
                    {
                        returnValue = method.Item1.Invoke(method.Item2);
                    }
                    catch (Exception exception)
                    {
                        try
                        {
                            var exceptionInvoker = this.exceptionAction;
                            if (exceptionInvoker != null)
                            {
                                exceptionInvoker.Invoke(exception);
                            }
                        }
                        catch { }
                    }
                }
                else
                {
                    this.unitIsWorking = false;
                }
                return returnValue;
            }
            void ExecuteQueue(object state)
            {
                try
                {
                    this.unitIsActive = true;
                    Interlocked.Increment(ref this.parent.unitCount);
                    while (this.unitIsWorking)
                    {
                        if (SpinWait.SpinUntil(
                                () => !this.queue.IsEmpty || this.parent.unitCount == this.maxThreadCount,
                                this.maxThreadLifeTimeout
                            )
                        )
                        {
                            if (!this.queue.IsEmpty)
                            {
                                ExecuteMethod();
                            }
                            else
                            {
                                this.unitIsWorking = false;
                            }
                        }
                        else
                        {
                            this.unitIsWorking = false;
                        }
                    }
                }
                finally
                {
                    Interlocked.Decrement(ref this.parent.unitCount);
                    this.unitIsActive = false;
                }
            }
            [MethodImpl(MethodImplOptions.Synchronized)]
            void ReCreateThread()
            {
                if (this.unitIsWorking)
                {
                    return;
                }
                SpinWait.SpinUntil(
                    () => !this.unitIsActive &&
                    this.parent.unitCount < this.maxThreadCount);
                this.unitIsActive = false;
                this.unitIsWorking = true;
                new Thread(ExecuteQueue) { IsBackground = true }.Start();
                SpinWait.SpinUntil(() => this.unitIsActive);
            }
            [MethodImpl(MethodImplOptions.Synchronized)]
            internal ParallelExecutorUnit<TU, TW> GetOrSet(string key)
            {
                ParallelExecutorUnit<TU, TW> value;
                if (this.node.TryGetValue(key, out value))
                {
                    return value;
                }
                value = new ParallelExecutorUnit<TU, TW>(
                    this.maxThreadLifeTimeout,
                    this.maxThreadCount,
                    this.exceptionAction,
                    this.parent);
                this.node.TryAdd(key, value);
                return value;
            }
            internal void ExecuteAsync(Func<TU, TW> executeMethod, TU args)
            {
                SpinWait.SpinUntil(() => this.queue.IsEmpty);
                this.queue.Enqueue(new Tuple<Func<TU, TW>, TU>(executeMethod, args));
                ReCreateThread();
            }
            internal TW ExecuteSync(Func<TU, TW> executeMethod, TU args)
            {
                SpinWait.SpinUntil(() => this.queue.IsEmpty);
                TW returnValue = default(TW);
                bool finished = false;
                this.queue.Enqueue(new Tuple<Func<TU, TW>, TU>(
                    (eargs) =>
                    {
                        returnValue = executeMethod(eargs);
                        finished = true;
                        return returnValue;
                    },
                    args));
                ReCreateThread();
                SpinWait.SpinUntil(() => finished);
                return returnValue;
            }
        }
        private readonly Func<T, U> executeMethod;
        private readonly ParallelExecutorUnit<T, U> tree;
        public ParallelExecutorAspect(
            Func<T, U> executeMethod,
            int maxThreadLifeTimeout = 30000,
            int maxThreadCount = 100,
            Action<Exception> exceptionAction = null)
        {
            this.executeMethod = executeMethod;
            this.tree = new ParallelExecutorUnit<T, U>(
                maxThreadLifeTimeout,
                maxThreadCount,
                exceptionAction,
                null);
        }
        public void ExecuteAsync(T args, params string[] keys)
        {
            var node = keys.Aggregate(this.tree, (current, key) => current.GetOrSet(key));
            node.ExecuteAsync(this.executeMethod, args);
        }
        public U ExecuteSync(T args, params string[] keys)
        {
            var node = keys.Aggregate(this.tree, (current, key) => current.GetOrSet(key));
            return node.ExecuteSync(this.executeMethod, args);
        }
    }
    class TestProgram
    {
        internal class TestEntity
        {
            public string Queue { get; set; }
            public string Customer { get; set; }
            public string Action { get; set; }
            public string Params { get; set; }
        }
        static readonly ParallelExecutorAspect<TestEntity, bool>
            ExecutorTest = new ParallelExecutorAspect<TestEntity, bool>(
            executeMethod: ExecuteTestAction,
            maxThreadLifeTimeout: 5000,
            maxThreadCount: 10,
            exceptionAction: ExceptionTestAction);
        static readonly ParallelExecutorAspect<TestEntity, bool>
            ExecutorPerformanceTest = new ParallelExecutorAspect<TestEntity, bool>(
            executeMethod: ExecutePerformanceAction,
            maxThreadLifeTimeout: 5000,
            maxThreadCount: 10,
            exceptionAction: ExceptionTestAction);
        private static bool ExecuteTestAction(TestEntity entity)
        {
            // lets say that it takes a bit more than 1 second.
            Thread.Sleep(1000);
            // you can do do something with entity.Params
            // in this example or just write the action data.
            Console.WriteLine(
                "Q: {0}\tC: {1}\tA: {2}",
                entity.Queue,
                entity.Customer,
                entity.Action);
            return true;
        }
        private static int _counter;
        private static bool ExecutePerformanceAction(TestEntity entity)
        {
            Interlocked.Increment(ref _counter);
            Console.WriteLine(
                "Q: {0}\tC: {1}\tA: {2}",
                entity.Queue,
                entity.Customer,
                entity.Action);
            return true;
        }
        // can be sahred between invokers.
        private static void ExceptionTestAction(Exception exception)
        {
            // you can log exception here, or send alter, or aggregate it.
        }
        private static void Test()
        {
            var entity = new TestEntity { Queue = "A", Customer = "X", Action = "Add" };
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);
            entity = new TestEntity { Queue = "A", Customer = "Z", Action = "Upd1" };
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);
            entity = new TestEntity { Queue = "A", Customer = "Z", Action = "Upd2" };
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);
            entity = new TestEntity { Queue = "A", Customer = "Z", Action = "Upd3" };
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);
            entity = new TestEntity { Queue = "A", Customer = "X", Action = "Del" };
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);
            // even if all threads was finished after 7 seconds.
            Thread.Sleep(7000);
            // you can still use new one for below action.
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);
            // now lets switch between Threads 20 times.
            Thread.Sleep(2000);
            for (var i = 0; i < 20; ++i)
            {
                var tentity = new TestEntity
                {
                    Queue = "A",
                    Customer = "X",
                    Action =
                    "Action"
                    + i.ToString(CultureInfo.InvariantCulture).PadLeft(2, '0')
                };
                ExecutorTest.ExecuteAsync(tentity,
                    tentity.Queue, tentity.Customer, tentity.Action);
            }
        }
        private static void TestPerformance()
        {
            _counter = 0;
            const int count = 2000;
            for (var i = 0; i < count; ++i)
            {
                var entity = new TestEntity
                {
                    Queue = "A",
                    Customer = "X",
                    Action =
                    "PerfAction"
                    + i.ToString(CultureInfo.InvariantCulture).PadLeft(4, '0')
                };
                ExecutorPerformanceTest.ExecuteAsync(entity,
                    entity.Queue, entity.Customer);
            }
            SpinWait.SpinUntil(() => _counter == count, 30 * 1000);
            Console.WriteLine("Performance Test {0}.",
                              _counter == count ? "PASS" : "FAIL");
        }
        static void Main()
        {
            Console.WriteLine("Test 01 - Proof of Concept");
            Test();
            Thread.Sleep(2000);
            Console.WriteLine("Test 02 - Performance Test");
            TestPerformance();
            Console.ReadKey(true);
        }
    }
}

There is one more thing. ParallelExecutorAspect automatically releases not used Threads and reuse Treads if possible, so your overall execution performance is also better thanks to that solution. And you can replace your classic throttling with parallel threads and use this automatic solution with only 1 thread per queue and max thread count that decide in short, how many threads you have for this aspect, for example in my test shown above, you can control how many parallel customers you have. Last thing is that this aspect is a pure orthogonal solution and does not depends on any functional requirements. Enjoy!

p ;).

Leave a Reply

Your email address will not be published.

*

This site uses Akismet to reduce spam. Learn how your comment data is processed.