MultiThreadingProtectorAspect

392735_2313298187894_683652837_n[1]Hi, Have you ever wants to protect your method against many concurrent thread in multi-threading environment? Let’s say you have WCF service implementation. In that implementation, the best practice is to create a static class with static methods because of performance (static is about 40% faster than non-static). Oh, right, and let’s say that you need to access something, for example, a file stored in that file, or you have a field that should be protected, or you can simply want to write everything without locking. That last reason is key to success in concurrent backend systems. Of course, you can use locking. That’s the easiest and lazy way. But you can also think twice and create a protector that does not buffer anything but is ready for concurrent invocation and can protect your resource from access by many threads simultaneously. So I think about such kind of usage you can see below.

using System;
using System.Diagnostics;
using System.Globalization;
using System.Threading.Tasks;
namespace MultiThreadingProtectorAspect
{
  internal class Program
  {
    private static int _count;
    private static readonly MultiThreadingProtectorAspect<string>
      _protector = new MultiThreadingProtectorAspect<string>
                   (ProtectForManyThreads, HandleException);
    static void Main() {
      _count = 0;
      Console.WriteLine("==> 1 mln of invocations start.");
      var meter = Stopwatch.StartNew();
      Parallel.For(
          0,
          1000000,
          i => ServiceMethod(i.ToString()));
      meter.Stop();
      Console.WriteLine("==> 1 mln of invocations took {0} ms, count field equals {1}.",
        meter.ElapsedMilliseconds,
        _count);
      Console.WriteLine("Press any key to close...");
      Console.ReadKey();
    }
    public static void ServiceMethod(string arg)
    {
      _protector.Invoke(arg);
    }
    static void ProtectForManyThreads(string arg) {
      _count++;
      if (_count == 250000)
        throw new InvalidOperationException("Booo!");
    }
    static void HandleException(Exception exception) {
      var color = Console.ForegroundColor;
      Console.ForegroundColor = ConsoleColor.Yellow;
      Console.WriteLine(exception.ToString());
      Console.ForegroundColor = color;
    }
  }
}

As you can see I have a _count filed lets say that I need to protect this counter for some reason. If you invoke the method ProtectForManyThreads in “Parallel.For” 1 mln times, you can be sure that incrementing the counter by the ++ operator gives you the wrong count value. It is a classic problem. Very often, the resolution is by interlocked invocation. But I need something much nicer to protect everything, not only this one method. And for that, I invented this aspect. as you can see, the aspect hides the original invocation, takes responsibility in the ServiceMethod method for invocation, and protects our static method. It can be a method in WCF service, and “Parallel.For” was created to simulate concurrent invocation. So you are probably wondering about the output of this program and about implementation. I start with output, and it looks like it is shown below.

==> 1 mln of invocations start.
System.InvalidOperationException: Booo!
   at MultiThreadingProtectorAspect.Program.ProtectForManyThreads(String arg) in D:\Projekty\MultiThreadingProtectorAspect\MultiThreadingProtectorAspect\Program.cs:line 42
   at MultiThreadingProtectorAspect.MultiThreadingProtectorAspect`1.NestedInvoker`1.Invoke() in D:\Projekty\MultiThreadingProtectorAspect\MultiThreadingProtectorAspect\MultiThreadingProtectorAspect.cs:line 69
==> 1 mln of invocations took 608 ms, count field equals 1000000.
Press any key to close...

I have very slow laptop but event on it 1 million invocations took about 600 milliseconds that means that per 1 invocation you need about 0,0006 milliseconds. So, it is pretty fast. And moreover, you can see that count filed is concurrent, so nothing was loosed. Ok, so as I promised just before, this is an implementation of this aspect below.

using System;
using System.Collections.Concurrent;
using System.Threading;
namespace MultiThreadingProtectorAspect
{
  public class MultiThreadingProtectorAspect<T> : IDisposable
  {
    private readonly Action<T> _invoker;
    private readonly Action<Exception> _exceptionHandler;
    private bool _disposed;
    public MultiThreadingProtectorAspect(
        Action<T> invoker,
        Action<Exception> exceptionHandler = null) {
      _invoker = invoker;
      _exceptionHandler = exceptionHandler;
      _disposed = false;
      new Thread(Consume) { IsBackground = true }.Start();
    }
    private readonly ConcurrentQueue<NestedInvoker<T>> _invokeQueue
        = new ConcurrentQueue<NestedInvoker<T>>();
    public void Invoke(T arg) {
      var nestedInvoker = new NestedInvoker<T>(arg, _invoker, _exceptionHandler);
      _invokeQueue.Enqueue(nestedInvoker);
      nestedInvoker.Wait();
    }
    public void InvokeAsync(T arg) {
      var nestedInvoker = new NestedInvoker<T>(arg, _invoker, _exceptionHandler);
      _invokeQueue.Enqueue(nestedInvoker);
    }
    private void Consume() {
      NestedInvoker<T> invoker;
      while (!_disposed)
        if (SpinWait.SpinUntil(() => !_invokeQueue.IsEmpty, 100) &&
            _invokeQueue.TryDequeue(out invoker))
          invoker.Invoke();
    }
    public void Dispose() {
      _disposed = true;
    }
    private class NestedInvoker<TItem>
    {
      private readonly TItem _item;
      private readonly Action<TItem> _invoker;
      private readonly Action<Exception> _exceptionHandler;
      private bool _invoked;
      public NestedInvoker(
          TItem item,
          Action<TItem> invoker,
          Action<Exception> exceptionHandler = null) {
        _invoker = invoker;
        _item = item;
        _exceptionHandler = exceptionHandler;
      }
      public void Invoke() {
        try {
          _invoker.Invoke(_item);
        } catch (Exception exception) {
          if (_exceptionHandler != null)
            _exceptionHandler.Invoke(exception);
        } finally {
          _invoked = true;
        }
      }
      public void Wait() {
        SpinWait.SpinUntil(() => _invoked);
      }
    }
  }
}

How it is build? The Source of the solution is based on ConcurrentQueue<T> which is my favorite concurrent collection in .NET 4.0 because of one pretty amazing feature you can add to the queue elements without locking, and it always works, except, of course, OutOfMemmory exception situation :). Ok, let me return to the subject. I used this queue and freed the queue one by one element synchronically. If you study this example carefully, you can see no buffer, even with the queue. Of course, you can say it is not parallel because it works synchronically, but it is only part of the truth. Most important, the order of the invocation is protection without locking. Now it is time for you please examine how fast your solution will be with lock and 100 or later 500 concurrent threads? Please let me know about the result of that experiment, and then I will tell you why you observed such behavior. Another exercise is to protect the method that returns the value. But I am sure I was shown you how you can easily pass through this last problem. I hope you enjoy this entry.

P ;).

Leave a Reply

Your email address will not be published. Required fields are marked *

*

This site uses Akismet to reduce spam. Learn how your comment data is processed.