Data Model Async

Hi. I have seen a million solutions built on ThreadPool, and sometimes even on the new async keyword in C# 5.0, and so many times I have wanted to tell people that they are doing it the wrong way. About half a year ago, someone told me that I tend to keep too much to myself, and I am trying to change that. So, what is the best way to do async? In my view it is a data model with publisher/consumer dispatching, built on a ConcurrentQueue collection and the SpinWait.SpinUntil method. And how do you wait for an async invocation? The best way is to use a ConcurrentDictionary with a Guid key as the transaction id. So let me show you a simple example and walk through its benefits.

namespace DataModelAsyncSandbox
{
    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    public class AsyncInvoker<T , TResult> : IDisposable
    {
        readonly ConcurrentQueue<Tuple<Guid, T>> consumeProcessingQueue;
        readonly Func<Guid, T, TResult> consumeBegin;
        readonly Action<Guid, TResult> consumeEnd;
        readonly Action<Exception> consumeThrow;
        public AsyncInvoker(
            Func<Guid, T, TResult> consumeBegin,
            Action<Guid, TResult> consumeEnd,
            Action<Exception> consumeThrow)
        {
            if (consumeBegin == null)
            {
                throw new ArgumentNullException("consumeBegin");
            }
            if (consumeEnd == null)
            {
                throw new ArgumentNullException("consumeEnd");
            }
            if (consumeThrow == null)
            {
                throw new ArgumentNullException("consumeThrow");
            }
            this.consumeProcessingQueue = new ConcurrentQueue<Tuple<Guid, T>>();
            this.consumeBegin = consumeBegin;
            this.consumeEnd = consumeEnd;
            this.consumeThrow = consumeThrow;
            new Thread(Consume){IsBackground=true}.Start();
        }
        // Written by Dispose, read by the consumer thread, hence volatile.
        volatile bool disposed;
        void Consume()
        {
            while (!disposed)
            {
                if (SpinWait.SpinUntil(() => !consumeProcessingQueue.IsEmpty, 1000))
                {
                    while (!consumeProcessingQueue.IsEmpty)
                    {
                        Tuple<Guid, T> data;
                        if (consumeProcessingQueue.TryDequeue(out data))
                        {
                            try
                            {
                                TResult result = consumeBegin.Invoke(
                                    data.Item1,
                                    data.Item2);
                                consumeEnd.Invoke(
                                    data.Item1,
                                    result);
                            }
                            catch (Exception exception)
                            {
                                try
                                {
                                    consumeThrow.Invoke(exception);
                                }
                                catch { }
                            }
                        }
                    }
                }
            }
        }
        public void InvokeAsync(Guid transactionId, T data)
        {
            consumeProcessingQueue.Enqueue(new Tuple<Guid, T>(transactionId, data));
        }
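        // No finalizer: the running consumer thread keeps this instance
        // reachable, so a finalizer would not run while the loop is alive.
        // Dispose is what stops the loop.
        public void Dispose()
        {
            disposed = true;
        }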
    }
    class TestProgram
    {
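        static int ProcessData(Guid transactionId, int input)
        {
            var output = input + 1;
            return output; // return the computed value, not the input
        }
        static int count;
        static void ProcessDataEnd(Guid transactionId, int output)
        {
            // Runs only on the consumer thread; Interlocked makes the new
            // value visible to the thread polling count in Main.
            Interlocked.Increment(ref count);
        }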
        static void CatchException(Exception exception)
        {
        }
        static void Main(string[] args)
        {
            var invoker = new AsyncInvoker<int, int>(
                ProcessData, ProcessDataEnd, CatchException);
            for (var t = 0; t < 10; ++t)
            {
                count = 0;
                var meter = Stopwatch.StartNew();
                var meterAll = Stopwatch.StartNew();
                Parallel.For(0, 1000000, i =>
                {
                    invoker.InvokeAsync(Guid.NewGuid(), i);
                });
                meter.Stop();
                Console.WriteLine(
                    "1 million async invocations took about {0} ms.",
                    meter.ElapsedMilliseconds);
                // Wait until the consumer thread has processed every item.
                SpinWait.SpinUntil(() => Volatile.Read(ref count) == 1000000);
                meterAll.Stop();
                Console.WriteLine(
                    "1 million async invocations processing took about {0} ms.",
                    meterAll.ElapsedMilliseconds);
            }
        }
    }
}

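As you can see, I am using only a ConcurrentQueue as the data model for async operations, and it scales very well. You may say: OK, but you invoke everything synchronously. That's right: each invoker runs its callbacks one after another on a single consumer thread. That guarantees not only that items are processed in the order they were enqueued, but also that the callbacks never run concurrently with each other. So I do not need any lock in my code, and it is still truly async, because InvokeAsync returns as soon as the item is in the queue. So, I hope you now know how to build things along the lines of the previous post? I almost forgot… is this truly a multi-core solution? It is once you have, for example, 64 such invokers across the application, each with its own thread working on its own queue. And the best part is that the number of threads stays constant for the entire runtime of your app. If you want several threads per invoker, but still a constant number, just create them all in the AsyncInvoker constructor by wrapping its last line, the one that creates the new Thread, in a loop; keep in mind that with more than one consumer thread the ordering and no-lock guarantees described here no longer hold for the callbacks. Finally, below is the output of running the test program. In this recorded run both lines of each pair show the same figure, because the second line printed the enqueue stopwatch (meter) again instead of meterAll, so the numbers show how long enqueueing took rather than the total processing time.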
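
The idea of creating a fixed set of consumer threads in the constructor could be sketched roughly as follows. This is only an illustration under assumptions: the class name MultiThreadInvoker and the workerCount parameter are hypothetical and not part of the original AsyncInvoker, and the consumeThrow callback is left out for brevity.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical variant of AsyncInvoker with a fixed number of consumer threads.
public sealed class MultiThreadInvoker<T, TResult> : IDisposable
{
    readonly ConcurrentQueue<Tuple<Guid, T>> queue =
        new ConcurrentQueue<Tuple<Guid, T>>();
    readonly Func<Guid, T, TResult> consumeBegin;
    readonly Action<Guid, TResult> consumeEnd;
    volatile bool disposed;

    public MultiThreadInvoker(
        int workerCount, // assumed parameter, not in the original class
        Func<Guid, T, TResult> consumeBegin,
        Action<Guid, TResult> consumeEnd)
    {
        if (workerCount < 1) throw new ArgumentOutOfRangeException("workerCount");
        if (consumeBegin == null) throw new ArgumentNullException("consumeBegin");
        if (consumeEnd == null) throw new ArgumentNullException("consumeEnd");
        this.consumeBegin = consumeBegin;
        this.consumeEnd = consumeEnd;
        // The thread count is fixed here and never changes afterwards.
        for (var i = 0; i < workerCount; ++i)
        {
            new Thread(Consume) { IsBackground = true }.Start();
        }
    }

    void Consume()
    {
        while (!disposed)
        {
            // Sleep-spin for up to one second while the queue is empty.
            if (!SpinWait.SpinUntil(() => !queue.IsEmpty, 1000)) continue;
            Tuple<Guid, T> data;
            while (queue.TryDequeue(out data))
            {
                try { consumeEnd(data.Item1, consumeBegin(data.Item1, data.Item2)); }
                catch { /* error reporting omitted in this sketch */ }
            }
        }
    }

    public void InvokeAsync(Guid transactionId, T data)
    {
        queue.Enqueue(Tuple.Create(transactionId, data));
    }

    public void Dispose() { disposed = true; }
}

public static class MultiThreadDemo
{
    static int processed;

    public static void Main()
    {
        using (var invoker = new MultiThreadInvoker<int, int>(
            4, (id, x) => x + 1, (id, y) => Interlocked.Increment(ref processed)))
        {
            for (var i = 0; i < 1000; ++i) invoker.InvokeAsync(Guid.NewGuid(), i);
            SpinWait.SpinUntil(() => Volatile.Read(ref processed) == 1000);
            Console.WriteLine("Processed {0} items.", processed);
        }
    }
}
```

With more than one consumer the callbacks can run at the same time and in any order, which is why the demo counts with Interlocked.Increment instead of a plain ++.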

1 million async invocations took about 188 ms.
1 million async invocations processing took about 188 ms.
1 million async invocations took about 186 ms.
1 million async invocations processing took about 186 ms.
1 million async invocations took about 176 ms.
1 million async invocations processing took about 176 ms.
1 million async invocations took about 175 ms.
1 million async invocations processing took about 175 ms.
1 million async invocations took about 178 ms.
1 million async invocations processing took about 178 ms.
1 million async invocations took about 181 ms.
1 million async invocations processing took about 181 ms.
1 million async invocations took about 196 ms.
1 million async invocations processing took about 196 ms.
1 million async invocations took about 186 ms.
1 million async invocations processing took about 186 ms.
1 million async invocations took about 185 ms.
1 million async invocations processing took about 185 ms.
1 million async invocations took about 180 ms.
1 million async invocations processing took about 180 ms.
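
The introduction mentions waiting for an async invocation through a ConcurrentDictionary keyed by a Guid transaction id, which the sample does not show. One way this might look is sketched below. Everything here is an assumption for illustration: the names results and TryInvokeAndWait are hypothetical, and a bare queue plus thread stands in for AsyncInvoker so that the block compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class WaitByTransactionIdDemo
{
    // Pending work: (transaction id, input), as in AsyncInvoker.
    static readonly ConcurrentQueue<Tuple<Guid, int>> queue =
        new ConcurrentQueue<Tuple<Guid, int>>();

    // Finished work, keyed by transaction id (hypothetical name).
    static readonly ConcurrentDictionary<Guid, int> results =
        new ConcurrentDictionary<Guid, int>();

    // Stand-in for the consumer thread of AsyncInvoker.
    static void Consume()
    {
        while (true)
        {
            Tuple<Guid, int> data;
            if (SpinWait.SpinUntil(() => !queue.IsEmpty, 1000)
                && queue.TryDequeue(out data))
            {
                // Plays the role of consumeEnd: publish the result by id.
                results[data.Item1] = data.Item2 + 1;
            }
        }
    }

    // Enqueue one item, then wait until its result appears or the timeout hits.
    public static bool TryInvokeAndWait(int input, int timeoutMs, out int result)
    {
        var transactionId = Guid.NewGuid();
        queue.Enqueue(Tuple.Create(transactionId, input));
        var value = 0;
        var done = SpinWait.SpinUntil(
            () => results.TryRemove(transactionId, out value), timeoutMs);
        result = value;
        return done;
    }

    public static void Main()
    {
        new Thread(Consume) { IsBackground = true }.Start();
        int result;
        if (TryInvokeAndWait(41, 5000, out result))
        {
            Console.WriteLine("Result: {0}", result);
        }
    }
}
```

The caller removes its own entry once the result arrives, so the dictionary holds only unfinished or unclaimed transactions. One caveat of this sketch: if the wait times out, a late result stays in the dictionary until something cleans it up.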

P ;).
