WaiterAspect And PublishersConsumerAspect<T>

imageHi, today I am happy to show you a WaiterAspect that has 5 us frequency and PublishersConsumerAspect<T> pattern implementation that has 5 us frequency too. So there are great. Also I want to share with you information that both today presented aspects are created in pure .NET Framework 2.0 and C# 2.0. So it is very easy to use them both in any legacy solution for example for having low latency waiting switching between threads. Or to create protector for multi threading environment with second aspect. So there are great aspects for multithreading and low latency enthusiasts.

Here it is WaiterAspects implementation.

namespace WaiterAspectSandbox
{
  using System;
  using System.Diagnostics;
  using System.Runtime.CompilerServices;
  using System.Threading;
  public class WaiterAspect
  {
    private object locker = new object();
    private volatile bool waiting;
    public void Wait() {
      lock (locker) {
        waiting = true;
        while(waiting)
          Monitor.Wait(locker);
      }
    }
    public void Set() {
      lock (locker) {
        Monitor.Pulse(locker);
        waiting = false;
      }
    }
    public bool IsWaiting { get { return waiting; } }
  }
  class WaiterAspectTests
  {
    static bool WaiterAspectTest(int count) {
      var waiter = new WaiterAspect();
      var counter = 0;
      var t = new Thread(() => {
        for (var i = 0; i < count; ++i) {
          ++counter;
          waiter.Wait();
        }
      });
      t.Start();
      for (var i = 0; i < count; ++i) {
        while (!waiter.IsWaiting)
          Thread.SpinWait(1);
        waiter.Set();
      }
      t.Join();
      return count == counter;
    }
    static void PerformanceWaiterAspectTest(int count) {
      var meter = Stopwatch.StartNew();
      var isPass = WaiterAspectTest(count);
      meter.Stop();
      Console.WriteLine(
        "WaiterAspectTest: {0} and it took {1} ms for {2} times.",
        isPass ? "PASS" : "FAILD",
        meter.ElapsedMilliseconds,
        count);
    }
    static void Main(string[] args) {
      for (var i = 0; i < 10; ++i)
        PerformanceWaiterAspectTest(1000000);
      Console.ReadKey(true);
    }
  }
}

And below it is a performance test output.

WaiterAspectTest: PASS and it took 3672 ms for 1000000 times.
WaiterAspectTest: PASS and it took 4221 ms for 1000000 times.
WaiterAspectTest: PASS and it took 3690 ms for 1000000 times.
WaiterAspectTest: PASS and it took 4146 ms for 1000000 times.
WaiterAspectTest: PASS and it took 3717 ms for 1000000 times.
WaiterAspectTest: PASS and it took 4213 ms for 1000000 times.
WaiterAspectTest: PASS and it took 3716 ms for 1000000 times.
WaiterAspectTest: PASS and it took 4311 ms for 1000000 times.
WaiterAspectTest: PASS and it took 3743 ms for 1000000 times.
WaiterAspectTest: PASS and it took 4092 ms for 1000000 times.

Here it is PublishersConsumerAspect<T> implementation.

namespace PublishersConsumerAspectSandbox
{
  using System;
  using System.Threading;
  using System.Collections.Generic;
  using System.Diagnostics;
  public class PublishersConsumerAspect<T> : IDisposable
  {
    readonly Thread worker;
    readonly Action<T> invokeAction;
    readonly Action<Exception> exceptionAction;
    public PublishersConsumerAspect(
      Action<T> invokeAction,
      Action<Exception> exceptionAction) {
      this.invokeAction = invokeAction;
      this.exceptionAction = exceptionAction;
      worker = new Thread(Consumer) { IsBackground = true };
      worker.Start();
    }
    volatile bool working = true;
    public void Dispose() {
      working = false;
      worker.Join();
    }
    readonly object locker = new object();
    readonly Queue<T> actions = new Queue<T>();
    public void Publish(T item) {
      lock (locker) {
        actions.Enqueue(item);
        Monitor.Pulse(locker);
      }
    }
    void Consumer() {
      while (working) {
        T item;
        lock (locker) {
          while (actions.Count == 0) Monitor.Wait(locker);
          item = actions.Dequeue();
        }
        if (item != null) {
          try {
            invokeAction.Invoke(item);
          }
          catch (Exception ex) {
            try {
              exceptionAction.Invoke(ex);
            }
            catch { }
          }
        }
      }
    }
  }
  class PublishersConsumerAspectTest
  {
    static void ParallelFor(int min, int max, Action<int> invoker) {
      for (var i = min; i < max; ++i)
        invoker.Invoke(i);
    }
    static void PerformanceWaiterAspectTest(int count) {
      int counter = 0;
      PublishersConsumerAspect<int> commandsInvoker = new PublishersConsumerAspect<int>(
        i => { Interlocked.Increment(ref counter); },
        e => { Console.WriteLine(e.ToString()); }
      );
      var procesorCount = Environment.ProcessorCount * 5;
      ThreadPool.SetMinThreads(procesorCount, procesorCount);
      var meter = Stopwatch.StartNew();
      for (var min = 0; min < count; min += count / procesorCount) {
        ThreadPool.QueueUserWorkItem(delegate {
          var max = min + count / procesorCount;
          for (var i = min; i < max; ++i)
            commandsInvoker.Publish(i);
        });
      }
      while (counter != count) Thread.SpinWait(1);
      meter.Stop();
      Console.WriteLine(
        "PublishersConsumerAspectTest: {0} and it took {1} ms for {2} times.",
        counter == count ? "PASS" : "FAIL",
        meter.ElapsedMilliseconds,
        count);
    }
    static void Main(string[] args) {
      for (var i = 0; i < 10; ++i)
        PerformanceWaiterAspectTest(1000000);
      Console.ReadKey(true);
    }
  }
}

And below it is a performance test output.

PublishersConsumerAspectTest: PASS and it took 6216 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 5573 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 3269 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 4008 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 4338 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 8090 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 4995 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 5078 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 5633 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 6008 ms for 1000000 times.

No big deal yes? So lets try compile this solution on .NET 4.5 😉

WiterAspect

WaiterAspectTest: PASS and it took 3828 ms for 1000000 times.
WaiterAspectTest: PASS and it took 3837 ms for 1000000 times.
WaiterAspectTest: PASS and it took 3697 ms for 1000000 times.
WaiterAspectTest: PASS and it took 3919 ms for 1000000 times.
WaiterAspectTest: PASS and it took 3678 ms for 1000000 times.
WaiterAspectTest: PASS and it took 3783 ms for 1000000 times.
WaiterAspectTest: PASS and it took 3575 ms for 1000000 times.
WaiterAspectTest: PASS and it took 3815 ms for 1000000 times.
WaiterAspectTest: PASS and it took 4076 ms for 1000000 times.
WaiterAspectTest: PASS and it took 3521 ms for 1000000 times.

PublishersConsumerAspect<T>

PublishersConsumerAspectTest: PASS and it took 248 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 255 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 287 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 227 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 291 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 222 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 225 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 338 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 224 ms for 1000000 times.
PublishersConsumerAspectTest: PASS and it took 229 ms for 1000000 times.

Do you know why? Anyway… I hope you enjoy new aspects idea.

p ;).

Leave a Reply

Your email address will not be published. Required fields are marked *

*

This site uses Akismet to reduce spam. Learn how your comment data is processed.