Hi, today I created a powerful combination of two performance aspects: MemoryManagerAspect&lt;T&gt; and ThreadsManagerAspect. Both ideas are aimed at high-speed services running on multicore systems. I will show you the code that produces the following output, as a proof that it is possible to write very low-latency, asynchronous code in C#.
Async invoke 2500000 times took 821 ms, means 3045067 per second.
Async invoke 2500000 times took 811 ms, means 3082614 per second.
Async invoke 2500000 times took 813 ms, means 3075031 per second.
Async invoke 2500000 times took 800 ms, means 3125000 per second.
Async invoke 2500000 times took 836 ms, means 2990431 per second.
Async invoke 2500000 times took 814 ms, means 3071253 per second.
Async invoke 2500000 times took 822 ms, means 3041363 per second.
Async invoke 2500000 times took 826 ms, means 3026634 per second.
Async invoke 2500000 times took 810 ms, means 3086420 per second.
Async invoke 2500000 times took 813 ms, means 3075031 per second.
Press any key to close...
This result comes from combining the two performance aspects: the first manages memory to take work off the garbage collector, and the second manages threads. The test code looks like this:
namespace PerformanceManagersSandbox
{
    using System;
    using System.Diagnostics;
    using System.Threading;

    class TestProgram
    {
        static void Main(string[] args)
        {
            var pool = new ThreadsManagerAspect(BackgroundAction);
            var count = 2500000;

            for (var i = 0; i < 10; ++i)
                Test(ref pool, count);

            Console.WriteLine("Press any key to close...");
            Console.ReadKey();
        }

        static int invokeCount = 0;

        // Block until all queued actions have run, then reset the counter.
        static void Wait(int count)
        {
            SpinWait.SpinUntil(() => invokeCount == count);
            invokeCount = 0;
        }

        static void Test(ref ThreadsManagerAspect pool, int count)
        {
            var meter = Stopwatch.StartNew();

            for (var i = 0; i < count; ++i)
                pool.InvokeAsync();

            Wait(count);
            meter.Stop();

            Console.WriteLine(
                "Async invoke {0} times took {1} ms,{2}means {3} per second.",
                count,
                meter.ElapsedMilliseconds,
                Environment.NewLine,
                Math.Round(1d * count / meter.ElapsedMilliseconds * 1000d));
        }

        static void BackgroundAction()
        {
            Interlocked.Increment(ref invokeCount);
        }
    }
}
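A quick note on the numbers: the throughput figure is simply count divided by the elapsed milliseconds, scaled to a second, so for example 2,500,000 invokes in 821 ms gives roughly 3,045,067 per second. Besides the default-action path used in the test, the implementation further below also exposes an InvokeAsync overload that takes a custom Action by ref. A minimal sketch of driving the same counter through that overload could look like the method below; TestWithCustomAction is purely illustrative and not part of the original test program.

        // Hypothetical variant of the Test method: instead of relying on the
        // default action passed to the constructor, it pushes a custom Action
        // through the second InvokeAsync overload shown further below.
        static void TestWithCustomAction(ThreadsManagerAspect pool, int count)
        {
            Action custom = () => Interlocked.Increment(ref invokeCount);

            for (var i = 0; i < count; ++i)
                pool.InvokeAsync(ref custom);

            Wait(count);
        }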
It is very similar to the code from yesterday, but the implementation is completely new, fresh stuff that came out of my head about an hour ago, and it looks like this. I am showing both pieces of code in one section because the real strength lies in the correct combination of the two.
namespace PerformanceManagersSandbox
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading;

    public class MemoryManagerAspect<T>
    {
        private readonly ConcurrentQueue<T> dataStore = new ConcurrentQueue<T>();
        private readonly Func<T> funcCreate;
        private readonly int poolSize;

        public MemoryManagerAspect(Func<T> funcCreate, int poolSize)
        {
            this.funcCreate = funcCreate;
            this.poolSize = poolSize;

            // Pre-allocate the whole pool up front so nothing is allocated
            // on the hot path later.
            for (var i = 0; i < poolSize; ++i)
            {
                var data = funcCreate.Invoke();
                dataStore.Enqueue(data);
            }
        }

        public void Create(out T data)
        {
            T dataRestored;

        RETRY:
            // Spin until an instance is available, then try to take it;
            // retry if another thread was faster.
            if (dataStore.IsEmpty)
                SpinWait.SpinUntil(() => !dataStore.IsEmpty);

            if (dataStore.TryDequeue(out dataRestored))
            {
                data = dataRestored;
            }
            else
            {
                goto RETRY;
            }
        }

        public void Reuse(ref T data)
        {
            dataStore.Enqueue(data);
        }
    }

    public class ThreadsManagerAspect
    {
        private static readonly MemoryManagerAspect<NestedInvoker> invokeStore;
        private static int workerThreads, completionPortThreads, instancesCount, processorCount;

        static ThreadsManagerAspect()
        {
            processorCount = Environment.ProcessorCount;
            invokeStore = new MemoryManagerAspect<NestedInvoker>(
                InvokerCreate, 512 * processorCount);
            ThreadPool.GetMaxThreads(
                out workerThreads, out completionPortThreads);
        }

        private static List<Action<Exception>> ActionExceptions = new List<Action<Exception>>();

        private static NestedInvoker InvokerCreate()
        {
            return new NestedInvoker(ReuseThread, ActionExceptions);
        }

        private readonly Action actionAsync;

        public ThreadsManagerAspect(
            Action actionAsync,
            Action<Exception> actionException = null)
        {
            this.actionAsync = actionAsync;

            if (actionException != null)
                ActionExceptions.Add(actionException);

            // Grow the thread pool ceiling with every live instance.
            Interlocked.Increment(ref instancesCount);
            ThreadPool.SetMaxThreads(
                32 * processorCount * instancesCount, completionPortThreads);
        }

        ~ThreadsManagerAspect()
        {
            // Decrement first so the new ceiling reflects the remaining instances.
            Interlocked.Decrement(ref instancesCount);
            ThreadPool.SetMaxThreads(
                32 * processorCount * instancesCount, completionPortThreads);
        }

        private static void ReuseThread(NestedInvoker invoker)
        {
            invokeStore.Reuse(ref invoker);
        }

        private void CreateThread(out NestedInvoker invoker)
        {
            invokeStore.Create(out invoker);
        }

        private void Invoker(ref NestedInvoker invoker, ref Action action)
        {
            invoker.InvokeAsync(action);
        }

        public void InvokeAsync()
        {
            if (actionAsync == null)
                throw new InvalidOperationException(
                    "There is no default Action defined in a constructor.");

            NestedInvoker invoker;
            Action action = actionAsync;

            CreateThread(out invoker);
            Invoker(ref invoker, ref action);
        }

        public void InvokeAsync(ref Action invokeAction)
        {
            NestedInvoker invoker;
            Action action = invokeAction;

            CreateThread(out invoker);
            Invoker(ref invoker, ref action);
        }

        private class NestedInvoker
        {
            private readonly Action<NestedInvoker> actionReuse;
            private readonly List<Action<Exception>> actionExceptions;

            public NestedInvoker(
                Action<NestedInvoker> actionReuse,
                List<Action<Exception>> actionExceptions)
            {
                this.actionReuse = actionReuse;
                this.actionExceptions = actionExceptions;
            }

            private void InvokeSync(object action)
            {
                Action actionAsync = (Action)action;

                try
                {
                    actionAsync.Invoke();
                }
                catch (Exception ex)
                {
                    foreach (var actionException in actionExceptions)
                    {
                        try { actionException.Invoke(ex); }
                        catch { }
                    }
                }
                finally
                {
                    // Always hand the invoker back to the pool,
                    // even when the action throws.
                    actionReuse.Invoke(this);
                }
            }

            public void InvokeAsync(Action action)
            {
                ThreadPool.QueueUserWorkItem(InvokeSync, action);
            }
        }
    }
}
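To make the combination a bit clearer, here is a minimal usage sketch of the two aspects taken separately: MemoryManagerAspect&lt;T&gt; works on its own as an object pool (pre-allocating instances and recycling them instead of leaving garbage for the collector), and ThreadsManagerAspect accepts an optional exception callback in its constructor. The DemoUsage class and the pooled StringBuilder are purely illustrative and assume the sketch lives in the PerformanceManagersSandbox namespace with the usual using System; directive.

    public static class DemoUsage
    {
        public static void Run()
        {
            // Standalone use of MemoryManagerAspect<T> as an object pool:
            // pre-allocate 64 StringBuilder instances and recycle them
            // instead of allocating a new one on every call.
            var builders = new MemoryManagerAspect<System.Text.StringBuilder>(
                () => new System.Text.StringBuilder(256), 64);

            System.Text.StringBuilder sb;
            builders.Create(out sb);          // take an instance from the pool
            sb.Clear().Append("hello pool");
            Console.WriteLine(sb.ToString());
            builders.Reuse(ref sb);           // hand it back for the next caller

            // ThreadsManagerAspect with the optional exception callback:
            // any exception thrown by the async action ends up in the callback
            // instead of killing a thread-pool thread.
            var pool = new ThreadsManagerAspect(
                () => Console.WriteLine("work item executed"),
                ex => Console.WriteLine("work item failed: " + ex.Message));

            pool.InvokeAsync();
        }
    }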
P ;).