namespace BlockingCollectionConsumerSandbox
{
    using System;
    using System.Diagnostics;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    /// <summary>
    /// Console benchmark: publishes one million integers into a
    /// <see cref="BlockingCollectionConsumer{T}"/> from parallel producers and
    /// measures how long it takes until all of them have been consumed.
    /// The measurement is repeated ten times.
    /// </summary>
    class Program
    {
        // Number of items consumed in the current round. It is incremented by the
        // consumer callback and polled by the main thread, so it is declared
        // volatile to guarantee that the polling thread observes fresh values.
        static volatile int count;

        /// <summary>
        /// Runs the ten benchmark rounds, prints the elapsed time of each round,
        /// then stops the consumer and waits for a key press.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            var consumer = new BlockingCollectionConsumer<int>(Consume);
            var c = 1000000;
            Stopwatch meter = new Stopwatch();

            for (int i = 0; i < 10; ++i)
            {
                // Restart() zeroes the elapsed time and starts measuring in one call.
                meter.Restart();

                Parallel.For(0, c, item => consumer.Publish(item));

                // Publishing returns as soon as items are queued; spin until the
                // consumer has actually processed every one of them.
                SpinWait.SpinUntil(() => c == count);

                meter.Stop();
                Console.WriteLine("1 mln trys took about {0} ms.", meter.ElapsedMilliseconds);

                // Safe to reset here: the queue is drained (count reached c) and
                // nothing is published again until the next iteration.
                count = 0;
            }

            consumer.StopAndWait();

            Console.WriteLine("Press any key to continue...");
            Console.ReadKey();
        }

        /// <summary>
        /// Consumer callback: counts the consumed item.
        /// </summary>
        /// <param name="item">The consumed value (ignored).</param>
        static void Consume(int item)
        {
            // A plain increment is sufficient as long as this callback is invoked
            // from a single consumer thread; the volatile field only has to make
            // the new value visible to the polling main thread.
            ++count;
        }
    }
}
Sample output:
1 mln trys took about 400 ms. 1 mln trys took about 411 ms. 1 mln trys took about 397 ms. 1 mln trys took about 402 ms. 1 mln trys took about 411 ms. 1 mln trys took about 408 ms. 1 mln trys took about 420 ms. 1 mln trys took about 411 ms. 1 mln trys took about 409 ms. 1 mln trys took about 399 ms. Press any key to continue...
Implementation:
namespace BlockingCollectionConsumerSandbox
{
    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    /// <summary>
    /// Single-consumer queue: items passed to <see cref="Publish"/> are handed,
    /// one at a time, to the action supplied at construction, on one background task.
    /// </summary>
    /// <typeparam name="T">Type of the published items.</typeparam>
    public class BlockingCollectionConsumer<T>
    {
        private readonly BlockingCollection<T> collection = new BlockingCollection<T>();
        private readonly Task consumerTask;

        /// <summary>
        /// Creates the consumer and immediately starts its background consuming loop.
        /// </summary>
        /// <param name="consumeAction">Action invoked for every published item.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="consumeAction"/> is <c>null</c>.
        /// </exception>
        public BlockingCollectionConsumer(System.Action<T> consumeAction)
        {
            // Fail fast here instead of faulting the background task on the first item.
            if (consumeAction == null)
            {
                throw new ArgumentNullException("consumeAction");
            }

            // The loop blocks for the whole lifetime of this object, so hint the
            // scheduler with LongRunning instead of tying up a pooled worker, and
            // pin the default scheduler so the ambient TaskScheduler.Current
            // cannot redirect the loop elsewhere.
            consumerTask = Task.Factory.StartNew(
                () => Consumer(consumeAction),
                CancellationToken.None,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);
        }

        // Consuming loop: takes items until adding is completed and the
        // collection has been drained, invoking the action for each item.
        private void Consumer(System.Action<T> consumeAction)
        {
            foreach (T item in collection.GetConsumingEnumerable())
            {
                consumeAction.Invoke(item);
            }
        }

        /// <summary>
        /// Queues an item for consumption. May be called from multiple threads.
        /// </summary>
        /// <param name="item">The item to enqueue.</param>
        /// <exception cref="InvalidOperationException">
        /// The consumer has already been stopped via <see cref="StopAndWait"/>.
        /// </exception>
        public void Publish(T item)
        {
            collection.Add(item);
        }

        /// <summary>
        /// Marks the queue as complete for adding and blocks until the consumer
        /// task has processed all remaining items and finished.
        /// </summary>
        /// <exception cref="AggregateException">
        /// The consume action threw while processing an item.
        /// </exception>
        public void StopAndWait()
        {
            collection.CompleteAdding();
            consumerTask.Wait();
        }
    }
}
no comments, enjoy,
P ;).