MultiThreadingProtectorAspect

Hi, today I would like to share another aspect for protecting a service method in a multi-threaded environment. I also examine how fast this solution is and how much CPU time it consumes. Today’s implementation is extended to cover the most common case: a protected method that returns a value. I will show you a test program, a console application that drives a multi-threaded service invoker, then two services that implement the same interface, and then the implementation of my protector class. At the end, I will present the benchmark results along with some notes and conclusions. There is a lot to cover, so let’s start with the benchmark application, shown below.

using System;
using System.Diagnostics;
namespace MultiThreadingProtectorAspectSandbox
{
  class Program
  {
    static void Main()
    {
      int numThreads = 250;
      int count = 1000000;
      Console.WriteLine("==> Experiment with {0} threads <==", numThreads);
      var meter = new Stopwatch();
      Console.WriteLine("==> 1 mln of locked invocations start.");
      meter.Start();
      var countWithLock = ServiceInvokerTester<TestServiceWithLock>.RunServiceTest(
            new TestServiceWithLock(),
            numThreads,
            count);
      meter.Stop();
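      // Note: ElapsedMilliseconds is in milliseconds for the whole run; the "us"
      // label is kept only so that the message matches the recorded output.
      Console.WriteLine("==> locked invocation took {0} us, count field equals {1}.",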
        meter.ElapsedMilliseconds,
        countWithLock);
      meter.Reset();
      Console.WriteLine("==> 1 mln of unlocked invocations start.");
      meter.Start();
      var countWithoutLock = ServiceInvokerTester<TestServiceWithoutLock>.RunServiceTest(
            new TestServiceWithoutLock(),
            numThreads,
            count);
      meter.Stop();
      // Note: as above, the value printed here is in milliseconds, not "us".
      Console.WriteLine("==> unlocked invocation took {0} us, count field equals {1}.",
        meter.ElapsedMilliseconds,
        countWithoutLock);
      meter.Reset();
      Console.WriteLine("Press any key to close...");
      Console.ReadKey();
    }
  }
}

As you can see, I use ServiceInvokerTester, whose static method runs the service calls on multiple threads (250 in this experiment). It is a kind of lightweight service invoker simulator, similar to what you find in WCF or in open source interoperability frameworks like Thrift, ZeroC Ice, or ZeroMQ. The tester code is shown below.

using System;
using System.Threading;
namespace MultiThreadingProtectorAspectSandbox
{
  public class ServiceInvokerTester<T> where T : ITestService
  {
    public static int RunServiceTest(T service, int numThreads, int count)
    {
      ThreadRunner(service.ServiceMethod, numThreads, count);
      SpinWait.SpinUntil(() => count == service.GetCount());
      return service.GetCount();
    }
    static void ThreadRunner(Func<int, int> invoker, int numThreads, int count)
    {
      var threads = new Thread[numThreads];
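      for (var i = 0; i < numThreads; ++i)
      {
        // Copy the loop variable: a lambda that captures `i` directly shares one
        // variable with every other thread and would see its final value.
        var index = i;
        threads[i] = new Thread(
        () =>
        {
          var inv = invoker;
          var min = ((count / numThreads) * index);
          var max = ((count / numThreads) * (index + 1));
          Runner(inv, min, max);
        }
        );
      }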
      for (var i = numThreads - 1; i >= 0; --i)
        threads[i].Start();
    }
    static void Runner(Func<int, int> invoker, int min, int max)
    {
      for (var arg = min; arg < max; ++arg)
        invoker(arg);
    }
  }
}

As you can see, I use a trivial partitioning of the invocations and then wait until the service reports that all of them have completed. You probably wonder what the ITestService interface looks like. It is shown below.

namespace MultiThreadingProtectorAspectSandbox
{
  public interface ITestService
  {
    int GetCount();
    int ServiceMethod(int arg);
  }
}

This is the public service contract, with two methods. The first one, GetCount, reads the service state. The second one, ServiceMethod, does some work and, as you may guess, changes the state that the first method reads. This may be a little unclear at the moment, but I am sure it will become evident when you see the two implementations. The first one, which uses locking, is shown below.

using System;
using System.Runtime.CompilerServices;
namespace MultiThreadingProtectorAspectSandbox
{
  public class TestServiceWithLock : ITestService
  {
    private int _count;
    public TestServiceWithLock()
    {
      _count = 0;
    }
    [MethodImpl(MethodImplOptions.Synchronized)]
    public int ServiceMethod(int arg)
    {
      try
      {
        return ProtectForManyThreads(arg);
      }
      catch (Exception exception)
      {
        HandleException(exception);
      }
      return default(int);
    }
    public int GetCount()
    {
      return _count;
    }
    private void HandleException(Exception exception)
    {
      Console.WriteLine(exception.ToString());
    }
    private int ProtectForManyThreads(int arg)
    {
      // Artificial delay: a short busy loop (the JIT may optimize it away).
      for (var i = 0; i < 1000; ++i)
        ;
      _count++;
      if (_count == 250000)
        throw new InvalidOperationException(string.Format("Booo! at {0}", arg));
      return arg + 1;
    }
  }
}
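For an instance method, MethodImplOptions.Synchronized takes a lock on the instance itself, much like wrapping the method body in lock (this). The following is a minimal sketch of that equivalence, not part of the benchmark: the class names SynchronizedCounter, LockCounter and CounterDemo are hypothetical, and the second counter uses a private lock object instead of the instance.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading;

// Hypothetical counter guarded by the attribute (locks on the instance).
public class SynchronizedCounter
{
    private int _count;

    [MethodImpl(MethodImplOptions.Synchronized)]
    public void Increment() { _count++; }

    public int Count { get { return _count; } }
}

// Hypothetical counter guarded by an explicit lock on a private object.
public class LockCounter
{
    private readonly object _gate = new object();
    private int _count;

    public void Increment() { lock (_gate) { _count++; } }

    public int Count { get { lock (_gate) { return _count; } } }
}

public static class CounterDemo
{
    // Runs `action` perThread times on each of numThreads threads and waits for all of them.
    public static void RunThreads(Action action, int numThreads, int perThread)
    {
        var threads = new Thread[numThreads];
        for (var t = 0; t < numThreads; ++t)
        {
            threads[t] = new Thread(() =>
            {
                for (var n = 0; n < perThread; ++n)
                    action();
            });
            threads[t].Start();
        }
        foreach (var thread in threads)
            thread.Join();
    }

    public static void Main()
    {
        var synchronizedCounter = new SynchronizedCounter();
        var lockCounter = new LockCounter();
        RunThreads(synchronizedCounter.Increment, 8, 10000);
        RunThreads(lockCounter.Increment, 8, 10000);
        Console.WriteLine(synchronizedCounter.Count); // 80000
        Console.WriteLine(lockCounter.Count);         // 80000
    }
}
```

Both counters end with the same value. The private lock object in LockCounter only changes who else can take the same lock: code outside the class cannot accidentally lock on it.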

The TestServiceWithLock implementation is trivial. The state of the service is the _count field, which must be protected against concurrent changes, and here a lock is the protector. The locking is done with the MethodImpl attribute: MethodImplOptions.Synchronized makes the whole method run under a lock (for an instance method, a lock on the instance). I consider it the best way to lock an entire method from the performance perspective. I also prepared a service implementation with a protector, shown below.

using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace MultiThreadingProtectorAspectSandbox
{
  public class TestServiceWithoutLock : ITestService
  {
    private readonly MultiThreadingProtectorAspect<int, int>
      _protector;
    private int _count;
    public TestServiceWithoutLock()
    {
      _protector = new MultiThreadingProtectorAspect<int, int>
                   (ProtectForManyThreads, HandleException);
      _count = 0;
    }
    public int ServiceMethod(int arg)
    {
      return _protector.Invoke(arg);
    }
    public int GetCount()
    {
      return _count;
    }
    private void HandleException(Exception exception)
    {
      Console.WriteLine(exception.ToString());
    }
    private int ProtectForManyThreads(int arg)
    {
      // Artificial delay: a short busy loop (the JIT may optimize it away).
      for (var i = 0; i < 1000; ++i)
        ;
      _count++;
      if (_count == 250000)
        throw new InvalidOperationException(string.Format("Booo! at {0}", arg));
      return arg + 1;
    }
  }
}

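OK, that is it for the test implementation. I hope you are curious about the results :). The output of the run is shown below. Note that the argument values in the exception messages are above 1,000,000: in the recorded run, the thread lambdas captured the loop variable i directly, and since the threads started only after the loop had finished, every thread worked on the same range, from 1,000,000 to 1,004,000. The total number of invocations is still 1,000,000.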

==> Experiment with 250 threads <==
==> 1 mln of locked invocations start.
System.InvalidOperationException: Booo! at 1000470
   at MultiThreadingProtectorAspectSandbox.TestServiceWithLock.ProtectForManyThreads(Int32 arg) in C:\Stuff\Projects\MultiThreadingProtectorAspectSandbox\MultiThreadingProtectorAspectSandbox\TestServiceWithLock.cs:line 47
   at MultiThreadingProtectorAspectSandbox.TestServiceWithLock.ServiceMethod(Int32 arg) in C:\Stuff\Projects\MultiThreadingProtectorAspectSandbox\MultiThreadingProtectorAspectSandbox\TestServiceWithLock.cs:line 20
==> locked invocation took 893 us, count field equals 1000000.
==> 1 mln of unlocked invocations start.
System.InvalidOperationException: Booo! at 1000663
   at MultiThreadingProtectorAspectSandbox.TestServiceWithoutLock.ProtectForManyThreads(Int32 arg) in C:\Stuff\Projects\MultiThreadingProtectorAspectSandbox\MultiThreadingProtectorAspectSandbox\TestServiceWithoutLock.cs:line 45
   at MultiThreadingProtectorAspectSandbox.MultiThreadingProtectorAspect`2.NestedInvoker`2.Invoke() in C:\Stuff\Projects\MultiThreadingProtectorAspectSandbox\MultiThreadingProtectorAspectSandbox\MultiThreadingProtectorAspect.cs:line 94
==> unlocked invocation took 1077 us, count field equals 1000000.
Press any key to close...
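
The out-of-range arguments in the output above (1000470 and 1000663, with only 1,000,000 invocations requested) are what you get when a lambda captures a for loop variable and runs after the loop has ended. The following is a minimal sketch of that behavior, independent of the tester. The names CaptureDemo, CapturedShared and CapturedCopy are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CaptureDemo
{
    // Every lambda captures the same variable `i`, so all of them
    // observe the value it has once the loop is over.
    public static int[] CapturedShared(int n)
    {
        var funcs = new List<Func<int>>();
        for (var i = 0; i < n; ++i)
            funcs.Add(() => i);
        return funcs.Select(f => f()).ToArray();
    }

    // Each lambda captures its own copy, created inside the loop body.
    public static int[] CapturedCopy(int n)
    {
        var funcs = new List<Func<int>>();
        for (var i = 0; i < n; ++i)
        {
            var index = i;
            funcs.Add(() => index);
        }
        return funcs.Select(f => f()).ToArray();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", CapturedShared(3))); // 3,3,3
        Console.WriteLine(string.Join(",", CapturedCopy(3)));   // 0,1,2
    }
}
```

In the tester, the same effect means that a thread started after the loop computes its range from the final value of i, which equals numThreads.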

Before the conclusions, here is the protector implementation.

using System;
using System.Collections.Concurrent;
using System.Threading;
namespace MultiThreadingProtectorAspectSandbox
{
  public class MultiThreadingProtectorAspect<T, TResult> : IDisposable
  {
    private readonly Func<T, TResult> _invoker;
    private readonly ThreadLocal<NestedInvoker<T, TResult>> _localNestedInvoker;
    private readonly Action<Exception> _exceptionHandler;
    // volatile: written by the disposing thread and polled by the consumer thread.
    private volatile bool _disposed;
    public MultiThreadingProtectorAspect(
        Func<T, TResult> invoker,
        Action<Exception> exceptionHandler = null) {
      _invoker = invoker;
      _exceptionHandler = exceptionHandler;
      _disposed = false;
      _localNestedInvoker = new ThreadLocal<NestedInvoker<T, TResult>>
          (() => new NestedInvoker<T, TResult>(_invoker, _exceptionHandler));
      new Thread(Consume) { IsBackground = true }.Start();
    }
    private readonly ConcurrentQueue<NestedInvoker<T, TResult>> _nestedInvokers
        = new ConcurrentQueue<NestedInvoker<T, TResult>>();
    public TResult Invoke(T arg) {
      var nestedInvoker = _localNestedInvoker.Value;
      nestedInvoker.SetItem(arg);
      _nestedInvokers.Enqueue(nestedInvoker);
      nestedInvoker.Wait();
      return nestedInvoker.GetResult();
    }
    public Tuple<Action, Func<TResult>> BeginInvoke(T arg) {
      var nestedInvoker = new NestedInvoker<T, TResult>(_invoker, _exceptionHandler);
      nestedInvoker.SetItem(arg);
      _nestedInvokers.Enqueue(nestedInvoker);
      return new Tuple<Action,Func<TResult>>(nestedInvoker.Wait,
        nestedInvoker.GetResult);
    }
    public TResult EndInvoke(Tuple<Action, Func<TResult>> asyncState) {
      var waiter = asyncState.Item1;
      var result = asyncState.Item2;
      waiter.Invoke();
      return result.Invoke();
    }
    private void Consume() {
      NestedInvoker<T, TResult> invoker;
      while (!_disposed)
        if (SpinWait.SpinUntil(() => !_nestedInvokers.IsEmpty, 100) &&
            _nestedInvokers.TryDequeue(out invoker))
          invoker.Invoke();
    }
    public void Dispose() {
      _disposed = true;
    }
    private class NestedInvoker<TItem, TItemResult>
    {
      private TItem _item;
      private TItemResult _result;
      private readonly Func<TItem, TItemResult> _invoker;
      private readonly Action<Exception> _exceptionHandler;
      // volatile: written by the consumer thread and polled by the waiting caller.
      private volatile bool _invoked;
      public NestedInvoker(
          Func<TItem, TItemResult> invoker,
          Action<Exception> exceptionHandler = null) {
        _invoker = invoker;
        _exceptionHandler = exceptionHandler;
      }
      public void Invoke() {
        try {
          _result = _invoker.Invoke(_item);
        } catch (Exception exception) {
          _result = default(TItemResult);
          if (_exceptionHandler != null)
            _exceptionHandler.Invoke(exception);
        } finally {
          _invoked = true;
        }
      }
      public void SetItem(TItem value) { _item = value; }
      public TItemResult GetResult() { return _result; }
      public void Wait() {
        SpinWait.SpinUntil(() => _invoked);
        _invoked = false;
      }
    }
  }
}

OK, that last piece of code is the protector aspect. I will try not to write a saga about it and focus only on the results. A very experienced Technical Architect once told me that when I use “SpinWait.SpinUntil” and ConcurrentQueue<T>, I am probably doing something wrong. I cannot agree with that. As you can see, the results are excellent: 1,000,000 invocations took 893 ms with locking and 1077 ms with the protector, so the protector is almost as fast as locking. (The output labels the times as “us”, but the values come from Stopwatch.ElapsedMilliseconds, so they are milliseconds for the whole run.) That works out to roughly 1 microsecond per invocation, which I think is an excellent result on my old, slow laptop, and on production servers it should be much better. That means you can prepare high-speed implementations for your services in this way. I hope you enjoyed this blog entry. Maybe someday I will win the Ultimate edition of Visual Studio to check performance with its concurrency profilers. Until then, I have only intuition and console application tests :).
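
For readers who want to compare the spinning approach with a blocking one, here is a minimal sketch of the same single-consumer idea built on BlockingCollection<T> and TaskCompletionSource<TResult> instead of SpinWait.SpinUntil and ConcurrentQueue<T>. This is not the protector from this post and it was not benchmarked here. The names BlockingProtector, WorkItem and BlockingProtectorDemo are hypothetical, and the exception handling (call the handler, return the default value) is an assumption modeled on the protector above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant: one consumer thread, callers block instead of spinning.
public sealed class BlockingProtector<T, TResult> : IDisposable
{
    private sealed class WorkItem
    {
        public T Argument;
        public readonly TaskCompletionSource<TResult> Completion =
            new TaskCompletionSource<TResult>();
    }

    private readonly Func<T, TResult> _invoker;
    private readonly Action<Exception> _exceptionHandler;
    private readonly BlockingCollection<WorkItem> _queue =
        new BlockingCollection<WorkItem>();
    private readonly Thread _consumer;

    public BlockingProtector(Func<T, TResult> invoker,
                             Action<Exception> exceptionHandler = null)
    {
        _invoker = invoker;
        _exceptionHandler = exceptionHandler;
        _consumer = new Thread(Consume) { IsBackground = true };
        _consumer.Start();
    }

    // Queues the call and blocks the caller until the consumer has run it.
    public TResult Invoke(T arg)
    {
        var item = new WorkItem { Argument = arg };
        _queue.Add(item);
        return item.Completion.Task.Result;
    }

    // Runs every queued call on the single consumer thread, one at a time.
    private void Consume()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            TResult result;
            try
            {
                result = _invoker(item.Argument);
            }
            catch (Exception exception)
            {
                result = default(TResult);
                if (_exceptionHandler != null)
                    _exceptionHandler(exception);
            }
            item.Completion.SetResult(result);
        }
    }

    // Stops accepting work, lets the consumer drain the queue, then cleans up.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _consumer.Join();
        _queue.Dispose();
    }
}

public static class BlockingProtectorDemo
{
    public static int RunDemo(int numThreads, int perThread)
    {
        var counter = 0; // only the consumer thread ever modifies this
        using (var protector =
            new BlockingProtector<int, int>(x => { counter++; return x + 1; }))
        {
            var threads = new Thread[numThreads];
            for (var t = 0; t < numThreads; ++t)
            {
                threads[t] = new Thread(() =>
                {
                    for (var n = 0; n < perThread; ++n)
                        protector.Invoke(n);
                });
                threads[t].Start();
            }
            foreach (var thread in threads)
                thread.Join();
        }
        return counter;
    }

    public static void Main()
    {
        Console.WriteLine(RunDemo(8, 1000)); // 8000
    }
}
```

The trade-off is the usual one: blocking waits leave the CPU free while a caller is parked, but each call allocates a work item and pays for a wake-up, so whether it is faster or slower than spinning has to be measured on the target machine.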

P ;).
