Hi, today I will try to solve, in an elegant and generic way, the problem of fetching a very large amount of data from a WCF server. My solution is based on materializing a generic IEnumerable interface on both the client and the server side. On the server side, I prepare a generic way of getting any kind of data from any kind of data source. I also use a small data buffer, because I do not need to keep all the data in memory; I can imagine that the amount of data is simply too big to store all at once. Instead, I fetch it elegantly by injecting an enumerator and consuming this elephant-sized amount of data piece by piece.
So let's start with the ServiceContract and DataContract implementation. Everything is generic, because I need to serve any kind of data row by row and decide on the WCF host what kind of data I serve. Generics give me the possibility to create whatever enumerator a real implementation requires.
using System;
using System.Runtime.Serialization;
using System.ServiceModel;

namespace ForeachWCFContracts.Service
{
    [ServiceContract]
    public interface IForeachWCF<TKey, TValue>
    {
        [OperationContract]
        TValue[] GetNext(TKey key);
    }

    [DataContract]
    [Serializable]
    public class Entity
    {
        [DataMember]
        public string Value { get; set; }
    }
}
Now I need an IForeachWCF implementation, and I will try to make it as simple as I can, but with some extra benefits. I want to inject any IEnumerable implementation into this service, and the enumerator needs to be generic so it can serve anything I want. I also need to automatically reclaim unnecessary buffers from memory on the server side when clients stop fetching data or lose their connection.
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.ServiceModel;
using System.Threading;
using ForeachWCFContracts.Service;

namespace ForeachWCFServer.Service
{
    [ServiceBehavior(
        InstanceContextMode = InstanceContextMode.Single,
        ConcurrencyMode = ConcurrencyMode.Multiple)]
    public class ForeachWCF<TKey, TValue> : IForeachWCF<TKey, TValue>
        where TValue : class, new()
    {
        private readonly Func<IEnumerable<TValue>> Enumerator;
        private readonly int BufferSize;
        private readonly int MaxTimeout;

        public ForeachWCF(Func<IEnumerable<TValue>> enumerator, int bufferSize, int maxTimeout)
        {
            if (enumerator == null)
                throw new InvalidOperationException("Enumerator cannot be null.");
            if (bufferSize <= 0)
                throw new InvalidOperationException("BufferSize cannot be less than or equal to 0.");
            if (maxTimeout <= 0)
                throw new InvalidOperationException("MaxTimeout cannot be less than or equal to 0.");
            Enumerator = enumerator;
            BufferSize = bufferSize;
            MaxTimeout = maxTimeout;
        }

        private readonly ConcurrentDictionary<TKey, ConcurrentQueue<TValue>> enumerators =
            new ConcurrentDictionary<TKey, ConcurrentQueue<TValue>>();

        public TValue[] GetNext(TKey key)
        {
            var count = BufferSize;
            var data = new TValue[count];
            ConcurrentQueue<TValue> queue = null;
            if (!enumerators.ContainsKey(key))
            {
                // First call for this key: create a buffer and start a
                // background producer thread for it.
                queue = new ConcurrentQueue<TValue>();
                enumerators.TryAdd(key, queue);
                new Thread(() => FetchDataFromDataStore(key, queue))
                {
                    IsBackground = true
                }.Start();
            }
            else
            {
                queue = enumerators[key];
            }
            SpinWait.SpinUntil(() => !queue.IsEmpty);
            TValue value;
            var fetched = 0;
        RETRY:
            if (queue.TryDequeue(out value))
            {
                if (value == null)
                {
                    // Null is the end-of-stream marker: drop the buffer and
                    // pad the remaining slots with nulls for the client.
                    enumerators.TryRemove(key, out queue);
                    for (var pos = fetched; pos < count; ++pos)
                        data[pos] = value;
                    return data;
                }
                data[fetched++] = value;
                if (fetched < count)
                {
                    SpinWait.SpinUntil(() => !queue.IsEmpty);
                    goto RETRY;
                }
            }
            else
            {
                SpinWait.SpinUntil(() => !queue.IsEmpty);
                goto RETRY;
            }
            return data;
        }

        private void FetchDataFromDataStore(TKey key, ConcurrentQueue<TValue> queue)
        {
            foreach (var next in Enumerator())
            {
                if (SpinWait.SpinUntil(() => queue.Count < BufferSize, MaxTimeout))
                {
                    queue.Enqueue(next);
                }
                else
                {
                    // The client has not consumed data within MaxTimeout,
                    // so the buffer is dropped and fetching stops.
                    enumerators.TryRemove(key, out queue);
                    break;
                }
            }
        }
    }
}
For every fetch of data I start a new background thread; this implementation could be extended to use the ThreadPool, but I try to keep everything as simple as possible in my example. As you can probably see, I use the SpinWait class to regulate the server buffer size, and I also stop fetching data when a client does not ask for the next element for longer than the timeout. Everything is parameterized by generic types and constructor parameters. As you can see, I inject the enumerator implementation as a constructor parameter using a Func delegate.
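The ThreadPool extension mentioned above could be sketched like this. This is only a hypothetical variant, not part of the original service: the producer is scheduled with Task.Run instead of a dedicated thread, and a Lazy wrapper inside GetOrAdd ensures only one producer is ever started per key, even under concurrent first calls (the names PoolBasedFetcher and GetOrStart are illustrative).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class PoolBasedFetcher<TKey, TValue>
{
    // Lazy<> guards against GetOrAdd invoking the value factory more than
    // once under a race: only the winning Lazy's Value ever runs the task.
    private readonly ConcurrentDictionary<TKey, Lazy<ConcurrentQueue<TValue>>> enumerators =
        new ConcurrentDictionary<TKey, Lazy<ConcurrentQueue<TValue>>>();

    // Returns the per-key buffer, starting a pooled producer on first use.
    public ConcurrentQueue<TValue> GetOrStart(
        TKey key, Action<TKey, ConcurrentQueue<TValue>> fetch)
    {
        return enumerators.GetOrAdd(key, k => new Lazy<ConcurrentQueue<TValue>>(() =>
        {
            var queue = new ConcurrentQueue<TValue>();
            // Task.Run queues the producer on the thread pool instead of
            // spawning a dedicated background thread per client.
            Task.Run(() => fetch(k, queue));
            return queue;
        })).Value;
    }
}
```

Repeated calls with the same key return the same queue, so the service's GetNext logic could sit on top of this unchanged.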
Next, we can host this service implementation on a WCF endpoint. I chose NetTcpBinding with its security mode set to None, but feel free to use another binding if you need to. In this code example, you can see the injection of the enumerator. You can imagine that this enumerator fetches data from an RDBMS data store in batches of 1k elements or so and then serves it one by one with yield return statements to our ForeachWCF service implementation.
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.ServiceModel.Description;
using ForeachWCFContracts.Service;

namespace ForeachWCFServer
{
    class Server
    {
        static void Main(string[] args)
        {
            ServiceHost serviceHost = null;
            try
            {
                string addressStr = "net.tcp://localhost:27272/ForeachWCF";
                string addressMexStr = "net.tcp://localhost:27272/ForeachWCF/MEX";
                var netTcpBinding = new NetTcpBinding(SecurityMode.None);
                var addressUri = new Uri(addressStr);
                var service = new Service.ForeachWCF<Guid, Entity>(
                    // Enumerator injection.
                    GetEnumeratorFromDataStore,
                    // 1000 entities is the max buffer size for one enumeration.
                    1000,
                    // 10 seconds is the max timeout for getting the next value from the server.
                    10000);
                serviceHost = new ServiceHost(service, addressUri);
                var metadataBehavior =
                    serviceHost.Description.Behaviors.Find<ServiceMetadataBehavior>();
                if (metadataBehavior == null)
                {
                    metadataBehavior = new ServiceMetadataBehavior();
                    serviceHost.Description.Behaviors.Add(metadataBehavior);
                }
                serviceHost.AddServiceEndpoint(
                    typeof(IForeachWCF<Guid, Entity>), netTcpBinding, addressStr);
                serviceHost.AddServiceEndpoint(
                    typeof(IMetadataExchange), netTcpBinding, addressMexStr);
                serviceHost.Open();
                Console.WriteLine("ForeachWCF service is running, press any key to stop.");
                Console.ReadKey();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }
            finally
            {
                if (serviceHost != null)
                    serviceHost.Close();
            }
        }

        #region Enumerator for Injection
        // The last yield return shall be null; it marks the end of the stream.
        private static IEnumerable<Entity> GetEnumeratorFromDataStore()
        {
            for (int i = 0; i < 1000000; ++i)
            {
                yield return new Entity { Value = i.ToString() };
            }
            yield return null;
        }
        #endregion
    }
}
As you can see, GetEnumeratorFromDataStore is injected into the service host, and I generate 1 million elements to fetch. This fetching is automatically regulated by the service host thanks to the buffer size. In the GetEnumeratorFromDataStore method, we could fetch data in 1k-row batches and then enumerate it one by one using the yield return construction.
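A batched variant of that enumerator could look like the sketch below. FetchPage is a stand-in for a paged RDBMS query (for example an OFFSET/FETCH statement); here it just fabricates rows so the sketch is self-contained, and all names are illustrative rather than part of the original post.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class BatchedSource
{
    // Stand-in for a paged database query such as
    // "SELECT ... ORDER BY Id OFFSET @offset ROWS FETCH NEXT @size ROWS ONLY".
    static string[] FetchPage(int offset, int size, int total)
    {
        return Enumerable.Range(offset, Math.Max(0, Math.Min(size, total - offset)))
                         .Select(i => i.ToString())
                         .ToArray();
    }

    // Pulls data in pages of pageSize and yields it row by row;
    // the trailing null is the end-of-stream marker GetNext() relies on.
    public static IEnumerable<string> GetEnumeratorFromDataStore(int total, int pageSize)
    {
        for (var offset = 0; offset < total; offset += pageSize)
        {
            foreach (var row in FetchPage(offset, pageSize, total))
                yield return row;
        }
        yield return null;
    }
}
```

Only one page is materialized at a time, so memory usage stays bounded by pageSize regardless of the total row count.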
And now we only need a simple client implementation. Our service host has a regular binding and a MEX binding for clients who want to consume the service and create their own proxy classes. So when we consume this service in a client application, we can add a service reference to our project and write code similar to the one shown below.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.ServiceModel;
using System.Text;
using ForeachWCFContracts.Service;

namespace ForeachWCFClient
{
    class Client
    {
        static void Main(string[] args)
        {
            var count = 0;
            var meter = Stopwatch.StartNew();
            foreach (var element in ForeachWCFConsumer.GetEnumerator())
            {
                var value = element.Value;
                ++count;
                //Console.WriteLine(value);
            }
            meter.Stop();
            Console.WriteLine("{0} elements were fetched in {1} ms.",
                count, meter.ElapsedMilliseconds);
            Console.ReadKey();
        }
    }

    public static class ForeachWCFConsumer
    {
        static IForeachWCF<Guid, Entity> client;

        static ForeachWCFConsumer()
        {
            var binding = new NetTcpBinding(SecurityMode.None);
            var address = new EndpointAddress("net.tcp://localhost:27272/ForeachWCF");
            var channelFactory =
                new ChannelFactory<IForeachWCF<Guid, Entity>>(binding, address);
            client = channelFactory.CreateChannel();
        }

        public static IEnumerable<Entity> GetEnumerator()
        {
            var key = Guid.NewGuid();
            Entity[] nexts = null;
            while ((nexts = client.GetNext(key)) != null)
            {
                for (int i = 0; i < nexts.Length; ++i)
                {
                    var value = nexts[i];
                    // A null element marks the end of the stream.
                    if (value == null)
                        yield break;
                    yield return value;
                }
            }
        }
    }
}
The output on the client side, running on the same machine, was something like this:
1 000 000 elements were fetched in 9 400 ms.
I measured the performance of this WCF server with 4 clients: the 4 clients together used about 10% of CPU time and the server about 5%, which I think is pretty good.
Memory usage stays constant the whole time, and so does the number of threads.
OK, so let me know if you enjoyed this example and feel free to leave me any comments. I wonder about your experience with consuming large amounts of data in a simple manner. Can you share it in the comments?
P ;).
All good, but isn’t sending petabytes of data SOAP-encoded an anti-pattern? Web services (and WCF is a web service technology, no matter which binding is used) are not meant to send big data. Period. This is the domain of ETL solutions.
Hi Szymon, I have focused mainly on a design pattern that keeps an eye on memory usage. I agree that this is the domain of ETL solutions, but the only difference would be in the transport layer. WCF is something of a baseline for me; it gives a view of how much better a specialized transport layer can be. Very interesting work is also version 4.0 of WCF REST (WebHttpBinding and WebHttpBehavior plus ChannelFactory on the client side).
The goto and the inline implementation obscure the server code a lot :P, but I guess that was done to compact the code for the sake of the example. Usually fetching big data is done through a TCP binding. On the server side I tend to split the data into chunks, where each chunk carries its own id that also gets sent; if the id is -1, this means there is no data left to fetch. This is serviced by a thread pool and glued together. Now, if the data chunks need to arrive in the correct order as a business requirement, I set up wait handles on the chunks to improve performance, so that chunk (n), in order to be sent, needs to be signaled by chunk (n - 1). This solution is generally acceptable if we can spare memory to improve performance; if we can't, then I leverage the threads so that they only fetch the next 1-2 chunks of data to send.
On the front end I simply do: do { // call web service } while (response.hasNextChunk);
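The chunk-id protocol described above could be sketched like this. Everything here (Chunk, IChunkService, InMemoryChunkService, FetchAll) is a hypothetical illustration of the commenter's scheme, not code from the post: each response carries its chunk id, and an id of -1 signals there is nothing left to fetch.

```csharp
using System;
using System.Collections.Generic;

class Chunk { public int Id; public int[] Data; }

// Illustrative service contract: in the real setup this would be a
// web service call over a TCP binding.
interface IChunkService { Chunk GetNextChunk(int lastId); }

class InMemoryChunkService : IChunkService
{
    private readonly int[][] chunks;
    public InMemoryChunkService(int[][] chunks) { this.chunks = chunks; }

    public Chunk GetNextChunk(int lastId)
    {
        var next = lastId + 1;
        return next < chunks.Length
            ? new Chunk { Id = next, Data = chunks[next] }
            : new Chunk { Id = -1, Data = new int[0] };   // -1: no data left
    }
}

class ChunkClient
{
    public static List<int> FetchAll(IChunkService service)
    {
        var result = new List<int>();
        var lastId = -1;
        do
        {
            var chunk = service.GetNextChunk(lastId);   // "call web service"
            if (chunk.Id == -1) break;                  // sentinel: stop fetching
            result.AddRange(chunk.Data);
            lastId = chunk.Id;
        } while (true);
        return result;
    }
}
```

The do/while on the front end maps directly onto the loop in FetchAll; ordering falls out for free here because the client asks for chunks sequentially.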
Another, more complex thing to do in such a case is to implement a chunk scheduler based around a heap, to keep the most “hot” data at the root and send it right away; this way, statistically, we process data right away. But that all depends on the use case, and in the situation you provided it's simply not applicable, as there is not enough data.
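A minimal sketch of that heap-based idea, under the assumption that each chunk carries a heat score (HotChunk and the scheduler class are hypothetical names, and a SortedSet with a custom comparer stands in for a hand-rolled heap):

```csharp
using System;
using System.Collections.Generic;

class HotChunk { public int Id; public int Heat; }

class HotChunkScheduler
{
    // The comparer orders hottest-first, breaking ties by id so that
    // distinct chunks with equal heat are not collapsed by the set.
    private readonly SortedSet<HotChunk> heap = new SortedSet<HotChunk>(
        Comparer<HotChunk>.Create((a, b) =>
            a.Heat != b.Heat ? b.Heat.CompareTo(a.Heat) : a.Id.CompareTo(b.Id)));

    public void Add(HotChunk chunk) { heap.Add(chunk); }

    // Returns the hottest pending chunk, or null when nothing is queued.
    public HotChunk TakeHottest()
    {
        if (heap.Count == 0) return null;
        var top = heap.Min;   // hottest chunk sits first under this ordering
        heap.Remove(top);
        return top;
    }
}
```

A sender loop would then just call TakeHottest in a loop, so statistically the most-requested data goes out first.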