Hi, today I will show you a special concurrent collection built with the Publisher->Consumer pattern. Below is the C# 4.0 code, together with a small benchmark. Enjoy :).
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace PublisherConsumerCollection
{
    /// <summary>
    /// A list whose operations are published as messages to a queue and applied
    /// to the underlying <see cref="List{T}"/> by a single consumer thread.
    /// Every public operation enqueues a message and then waits until the
    /// consumer has processed it, so the underlying list is only ever touched
    /// by the consumer thread.
    /// </summary>
    /// <typeparam name="T">Type of the stored elements.</typeparam>
    public class PublisherConsumerCollection<T> : IList<T>, IDisposable
    {
        /// <summary>Identifies which list operation a message asks the consumer to perform.</summary>
        private enum MessageKind
        {
            Insert,
            SetAt,
            RemoveAt,
            Add,
            Remove,
            Clear,
            CopyTo,
            IndexOf,
            Contains,
            GetAt,
            Count,
            Snapshot,
        }

        /// <summary>
        /// A single request travelling from a publisher to the consumer, together with
        /// the completion flag, result and error travelling back.
        /// </summary>
        private sealed class ChangeMessage
        {
            private readonly PublisherConsumerCollection<T> owner;

            // Written by the consumer thread and polled by the publisher thread, hence volatile.
            // 'error' and 'ReturnValue' are written before this flag is set, so a publisher
            // that observes invoked == true also observes them.
            private volatile bool invoked;
            private Exception error;

            public ChangeMessage(PublisherConsumerCollection<T> owner)
            {
                this.owner = owner;
            }

            public MessageKind Message { get; set; }

            public T Item { get; set; }

            public T[] ItemsArray { get; set; }

            public int Index { get; set; }

            public object ReturnValue { get; set; }

            /// <summary>
            /// Runs <paramref name="method"/> on the consumer thread. An exception is
            /// captured instead of escaping (an unhandled exception on the consumer
            /// thread would never reach the caller that requested the operation).
            /// </summary>
            public void Run(Action method)
            {
                try
                {
                    method();
                }
                catch (Exception ex)
                {
                    error = ex;
                }
                finally
                {
                    invoked = true;
                }
            }

            /// <summary>
            /// Same as <see cref="Run"/>, but stores the value produced by
            /// <paramref name="method"/> in <see cref="ReturnValue"/>.
            /// </summary>
            public void Evaluate<TResult>(Func<TResult> method)
            {
                try
                {
                    ReturnValue = method();
                }
                catch (Exception ex)
                {
                    error = ex;
                }
                finally
                {
                    invoked = true;
                }
            }

            /// <summary>
            /// Spins until the consumer has processed this message, then rethrows any
            /// exception the operation raised.
            /// </summary>
            /// <exception cref="ObjectDisposedException">
            /// The consumer loop ended before this message was processed.
            /// </exception>
            public void Wait()
            {
                SpinWait.SpinUntil(() => invoked || owner.consumerStopped);

                // Re-read the flag: the consumer sets 'invoked' before 'consumerStopped',
                // so a message that was processed just before shutdown is still honoured.
                if (!invoked)
                {
                    throw new ObjectDisposedException(owner.GetType().Name);
                }

                if (error != null)
                {
                    // Deliberately rethrows the captured object on the caller's thread so the
                    // caller sees the usual IList<T> exception type. The original stack trace
                    // is lost; .NET 4.0 has no ExceptionDispatchInfo to preserve it.
                    throw error;
                }
            }
        }

        private readonly ConcurrentQueue<ChangeMessage> ChangeMessages = new ConcurrentQueue<ChangeMessage>();
        private readonly IList<T> data = new List<T>();

        // Both flags are written by one thread and polled by others.
        private volatile bool disposed;
        private volatile bool consumerStopped;

        /// <summary>
        /// Consumer loop: dequeues messages and applies them to the underlying list
        /// until <see cref="Dispose"/> is called. The constructor starts this method on
        /// a background thread; running it on additional threads at the same time would
        /// let several threads mutate the list concurrently.
        /// </summary>
        public void Consumer()
        {
            try
            {
                while (!disposed)
                {
                    ChangeMessage message;
                    if (ChangeMessages.TryDequeue(out message))
                    {
                        Process(message);
                    }
                    else
                    {
                        // Nothing to do: spin for up to 100 ms, waking early on new work or disposal.
                        SpinWait.SpinUntil(() => disposed || !ChangeMessages.IsEmpty, 100);
                    }
                }
            }
            finally
            {
                // Releases publishers still waiting on messages that will never be processed.
                consumerStopped = true;
            }
        }

        /// <summary>Applies one message to the underlying list. Called on the consumer thread only.</summary>
        private void Process(ChangeMessage message)
        {
            switch (message.Message)
            {
                case MessageKind.Insert:
                    message.Run(() => data.Insert(message.Index, message.Item));
                    break;
                case MessageKind.SetAt:
                    message.Run(() => data[message.Index] = message.Item);
                    break;
                case MessageKind.RemoveAt:
                    message.Run(() => data.RemoveAt(message.Index));
                    break;
                case MessageKind.Add:
                    message.Run(() => data.Add(message.Item));
                    break;
                case MessageKind.Remove:
                    message.Evaluate(() => data.Remove(message.Item));
                    break;
                case MessageKind.CopyTo:
                    message.Run(() => data.CopyTo(message.ItemsArray, message.Index));
                    break;
                case MessageKind.Clear:
                    message.Run(() => data.Clear());
                    break;
                case MessageKind.IndexOf:
                    message.Evaluate(() => data.IndexOf(message.Item));
                    break;
                case MessageKind.Contains:
                    message.Evaluate(() => data.Contains(message.Item));
                    break;
                case MessageKind.GetAt:
                    message.Evaluate(() => data[message.Index]);
                    break;
                case MessageKind.Count:
                    message.Evaluate(() => data.Count);
                    break;
                case MessageKind.Snapshot:
                    message.Evaluate(() => Snapshot());
                    break;
                default:
                    // Complete the message anyway so the publisher does not wait forever.
                    message.Run(() => { throw new InvalidOperationException("Unknown message kind: " + message.Message); });
                    break;
            }
        }

        /// <summary>Copies the current contents of the list into a new array. Consumer thread only.</summary>
        private T[] Snapshot()
        {
            var copy = new T[data.Count];
            data.CopyTo(copy, 0);
            return copy;
        }

        /// <summary>Enqueues <paramref name="message"/> for the consumer.</summary>
        /// <exception cref="ObjectDisposedException">The collection has been disposed.</exception>
        private void Post(ChangeMessage message)
        {
            if (disposed)
            {
                throw new ObjectDisposedException(GetType().Name);
            }

            ChangeMessages.Enqueue(message);
        }

        /// <summary>Creates the collection and starts its consumer on a background thread.</summary>
        public PublisherConsumerCollection()
        {
            new Thread(Consumer) { IsBackground = true }.Start();
        }

        /// <summary>
        /// Signals the consumer loop to stop. Operations posted afterwards, or still
        /// unprocessed when the loop ends, throw <see cref="ObjectDisposedException"/>.
        /// </summary>
        public void Dispose()
        {
            disposed = true;
        }

        /// <summary>Returns the index of the first occurrence of <paramref name="item"/>, or -1.</summary>
        public int IndexOf(T item)
        {
            var message = new ChangeMessage(this) { Message = MessageKind.IndexOf, Item = item };
            Post(message);
            message.Wait();
            return (int)message.ReturnValue;
        }

        /// <summary>Inserts <paramref name="item"/> at <paramref name="index"/>.</summary>
        public void Insert(int index, T item)
        {
            var message = new ChangeMessage(this) { Message = MessageKind.Insert, Index = index, Item = item };
            Post(message);
            message.Wait();
        }

        /// <summary>Removes the element at <paramref name="index"/>.</summary>
        public void RemoveAt(int index)
        {
            var message = new ChangeMessage(this) { Message = MessageKind.RemoveAt, Index = index };
            Post(message);
            message.Wait();
        }

        /// <summary>Gets or sets the element at <paramref name="index"/>.</summary>
        public T this[int index]
        {
            get
            {
                var message = new ChangeMessage(this) { Message = MessageKind.GetAt, Index = index };
                Post(message);
                message.Wait();
                return (T)message.ReturnValue;
            }
            set
            {
                var message = new ChangeMessage(this) { Message = MessageKind.SetAt, Index = index, Item = value };
                Post(message);
                message.Wait();
            }
        }

        /// <summary>Appends <paramref name="item"/> to the end of the list.</summary>
        public void Add(T item)
        {
            var message = new ChangeMessage(this) { Message = MessageKind.Add, Item = item };
            Post(message);
            message.Wait();
        }

        /// <summary>Removes the first occurrence of <paramref name="item"/>.</summary>
        /// <returns><c>true</c> if an element was removed; otherwise <c>false</c>.</returns>
        public bool Remove(T item)
        {
            var message = new ChangeMessage(this) { Message = MessageKind.Remove, Item = item };
            Post(message);
            message.Wait();
            return (bool)message.ReturnValue;
        }

        /// <summary>Removes all elements.</summary>
        public void Clear()
        {
            var message = new ChangeMessage(this) { Message = MessageKind.Clear };
            Post(message);
            message.Wait();
        }

        /// <summary>Returns whether the list contains <paramref name="item"/>.</summary>
        public bool Contains(T item)
        {
            var message = new ChangeMessage(this) { Message = MessageKind.Contains, Item = item };
            Post(message);
            message.Wait();
            return (bool)message.ReturnValue;
        }

        /// <summary>Copies the elements into <paramref name="array"/> starting at <paramref name="arrayIndex"/>.</summary>
        public void CopyTo(T[] array, int arrayIndex)
        {
            var message = new ChangeMessage(this) { Message = MessageKind.CopyTo, ItemsArray = array, Index = arrayIndex };
            Post(message);
            message.Wait();
        }

        /// <summary>Gets the number of elements.</summary>
        public int Count
        {
            get
            {
                var message = new ChangeMessage(this) { Message = MessageKind.Count };
                Post(message);
                message.Wait();
                return (int)message.ReturnValue;
            }
        }

        /// <summary>Gets whether the underlying list is read-only.</summary>
        public bool IsReadOnly
        {
            get { return data.IsReadOnly; }
        }

        /// <summary>
        /// Returns an enumerator over a snapshot of the list taken by the consumer
        /// thread; later changes to the collection are not reflected in it.
        /// </summary>
        public IEnumerator<T> GetEnumerator()
        {
            var message = new ChangeMessage(this) { Message = MessageKind.Snapshot };
            Post(message);
            message.Wait();
            IEnumerable<T> snapshot = (T[])message.ReturnValue;
            return snapshot.GetEnumerator();
        }

        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
        {
            return this.GetEnumerator();
        }
    }

    /// <summary>Demo and micro-benchmark comparing the collection with ConcurrentDictionary.</summary>
    class Program
    {
        static void Main(string[] args)
        {
            using (PublisherConsumerCollection<string> collection = new PublisherConsumerCollection<string>())
            {
                collection.Add("A");
                collection.Add("B");
                collection.Add("C");

                Console.Write("Collection with 3 elements: ");
                Console.WriteLine(string.Join<string>(",", collection));

                collection[1] = "X";
                Console.Write("Collection after change 'B' with 'X' with 3 elements: ");
                Console.WriteLine(string.Join<string>(",", collection));

                collection.Remove("C");
                Console.Write("Collection after remove 'C' with 2 elements: ");
                Console.WriteLine(string.Join<string>(",", collection));

                collection.Clear();

                var stopwatchCollection = Stopwatch.StartNew();
                Parallel.For(0, 1000000, (i) =>
                {
                    collection.Add(i.ToString());
                });
                stopwatchCollection.Stop();
                Console.WriteLine("Adding (PublisherConsumer) 1 000 000 elements took {0} ms.", stopwatchCollection.ElapsedMilliseconds);

                IDictionary<string, string> dictionary = new ConcurrentDictionary<string, string>();
                var stopwatchConcurrentDictionary = Stopwatch.StartNew();
                Parallel.For(0, 1000000, (i) =>
                {
                    dictionary.Add(i.ToString(), i.ToString());
                });
                stopwatchConcurrentDictionary.Stop();
                Console.WriteLine("Adding (ConcurrentDictionary) 1 000 000 elements took {0} ms.", stopwatchConcurrentDictionary.ElapsedMilliseconds);

                Console.ReadKey();
            }
        }
    }
}
Benchmark output:
Collection with 3 elements: A,B,C Collection after change 'B' with 'X' with 3 elements: A,X,C Collection after remove 'C' with 2 elements: A,X Adding (PublisherConsumer) 1 000 000 elements took 1461 ms. Adding (ConcurrentDictionary) 1 000 000 elements took 1427 ms.
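And here is the output when you comment out the `message.Wait();` line in the Add method (line 177 of the original listing), so that Add no longer waits for the consumer: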
Collection with 3 elements: A,B,C Collection after change 'B' with 'X' with 3 elements: A,X,C Collection after remove 'C' with 2 elements: A,X Adding (PublisherConsumer) 1 000 000 elements took 674 ms. Adding (ConcurrentDictionary) 1 000 000 elements took 1712 ms.
How you adapt this example is up to you, but you can see that the Publisher->Consumer pattern is, in some cases, twice as fast as the very good .NET 4.0 ConcurrentDictionary<TKey,TValue> :).
Regards,
P ;).
P.S. You can try benchmarking methods other than Add :).
I think you have observed some very interesting details; I appreciate the post.
Thanks Barnel,
I know, it is the kind of example that has many layers, and my point was only to show the example, not the technical aspects of every layer. I did not want to give you the answers, only examples to think about :).