Batch Processing Aspect in C#

Hi, today I want to share with you the idea of a batch processing aspect. It solves the problem of calling T-SQL Server procedures with 1-by-1 requests, for example for inserts. Instead of executing the calls 1-by-1, I prepared an aspect that you still call 1-by-1, but which does the work in batches — for example, up to 100-by-100, as in my test example. Below you can find code that includes the aspect and a simple test. The most important quality factor of this aspect is execution performance. For example, on a local MS SQL Server machine, where network latency is minimal, executing 1000 inserts 1-by-1 took more than 6 seconds, while executing them 100-by-100 took about 0.8 seconds. It also scales very well, especially when network latency increases the round-trip time of every call. For 1 million requests in my test, I needed only 193 seconds to put them into the database, which is 10 times faster than the NHibernate or Entity Framework ORMs. Most importantly, using the aspect is simple: you request entity changes 1-by-1, and the aspect does the work and groups them 100-by-100. Of course, this depends on how the aspect's constructor is configured. In the code below, batches are up to 100 entities, with a delay of up to 1000 milliseconds.

namespace BatchProcessingAspectSandbox
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Data.SqlClient;
    using System.Diagnostics;
    using System.Threading;
    /// <summary>
    /// Marker interface for entities that can be submitted to a <see cref="BatchProcessingAspect{T}"/>.
    /// </summary>
    public interface IBatchProcessingAspectEntity { }

    /// <summary>
    /// Accepts entities one at a time and passes them to a bulk action in batches, on a dedicated
    /// background thread. A batch is flushed as soon as <c>maxCount</c> entities are queued, or after
    /// waiting up to <c>maxTimeout</c> milliseconds with whatever is queued at that point.
    /// </summary>
    /// <typeparam name="T">Entity type handed to the batch action.</typeparam>
    public class BatchProcessingAspect<T> : IDisposable where T : IBatchProcessingAspectEntity
    {
        readonly Action<T[]> dbAction;
        readonly int maxTimeout;
        readonly int maxCount;
        readonly ConcurrentQueue<T> entities = new ConcurrentQueue<T>();
        readonly Thread consumer;
        // Cleared by Dispose on a caller thread and read by the consumer thread, hence volatile.
        volatile bool working;
        // Entities submitted but not yet finished by dbAction: the queued ones plus the batch
        // currently executing. This is what lets IsIdle report completion, not just an empty queue.
        int pending;

        /// <summary>Creates the aspect and starts its background consumer thread.</summary>
        /// <param name="dbAction">Bulk action invoked with each batch. Exceptions it throws are
        /// written to the console and that batch is dropped.</param>
        /// <param name="maxTimeout">Maximum milliseconds to wait for a full batch before flushing a
        /// partial one; -1 waits for a full batch indefinitely.</param>
        /// <param name="maxCount">Maximum number of entities per batch; must be at least 1.</param>
        /// <exception cref="ArgumentNullException"><paramref name="dbAction"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxTimeout"/> is below -1,
        /// or <paramref name="maxCount"/> is below 1.</exception>
        public BatchProcessingAspect (
            Action<T[]> dbAction,
            int maxTimeout = 1000,
            int maxCount = 100
        )
        {
            if (dbAction == null)
            {
                throw new ArgumentNullException("dbAction");
            }
            // SpinWait.SpinUntil rejects timeouts below -1. Failing here is better than an unhandled
            // exception on the background thread, which would terminate the process.
            if (maxTimeout < -1)
            {
                throw new ArgumentOutOfRangeException(
                    "maxTimeout", maxTimeout, "Must be -1 (infinite) or non-negative.");
            }
            // With maxCount < 1 no entity could ever be dequeued and the consumer would spin forever.
            if (maxCount < 1)
            {
                throw new ArgumentOutOfRangeException("maxCount", maxCount, "Must be at least 1.");
            }
            this.dbAction = dbAction;
            this.maxTimeout = maxTimeout;
            this.maxCount = maxCount;
            this.working = true;
            this.consumer = new Thread(Consume) { IsBackground = true };
            this.consumer.Start();
        }

        /// <summary>
        /// Consumer loop. It collects up to maxCount entities per iteration and hands them to dbAction.
        /// After Dispose, it keeps looping until everything already queued has been flushed.
        /// </summary>
        void Consume()
        {
            var batch = new List<T>();
            // Hoisted so the wait predicate is not re-allocated on every iteration.
            Func<bool> batchReadyOrStopping = () => !working || entities.Count >= maxCount;
            while (working || !entities.IsEmpty)
            {
                // Wait for a full batch, a stop request, or maxTimeout - whichever comes first.
                SpinWait.SpinUntil(batchReadyOrStopping, maxTimeout);
                batch.Clear();
                T entity;
                while (batch.Count < maxCount && entities.TryDequeue(out entity))
                {
                    batch.Add(entity);
                }
                if (batch.Count == 0)
                {
                    continue;
                }
                try
                {
                    dbAction(batch.ToArray());
                }
                catch (Exception exception)
                {
                    // Best effort: the failed batch is logged and dropped; later batches still run.
                    Console.WriteLine(exception.ToString());
                }
                finally
                {
                    // Release the batch from the pending count whether or not dbAction succeeded.
                    Interlocked.Add(ref pending, -batch.Count);
                }
            }
        }

        /// <summary>Queues <paramref name="entity"/> for a later batch and returns immediately.</summary>
        /// <exception cref="ObjectDisposedException">The aspect has been disposed. An entity submitted
        /// concurrently with Dispose may still slip in after the consumer exits and stay unprocessed.</exception>
        public void BatchInvocation(T entity)
        {
            if (!working)
            {
                throw new ObjectDisposedException(GetType().Name);
            }
            // Count before enqueueing so IsIdle can never observe a queued but uncounted entity.
            Interlocked.Increment(ref pending);
            entities.Enqueue(entity);
        }

        /// <summary>
        /// True when no entity is waiting in the queue. A dequeued batch may still be executing;
        /// use <see cref="IsIdle"/> to wait for completion.
        /// </summary>
        public bool IsEmpty
        {
            get { return entities.IsEmpty; }
        }

        /// <summary>
        /// True when every submitted entity has been passed to dbAction and that call has returned
        /// (successfully or not).
        /// </summary>
        public bool IsIdle
        {
            get { return Volatile.Read(ref pending) == 0; }
        }

        /// <summary>
        /// Stops accepting entities and blocks until the consumer thread has flushed everything
        /// already queued.
        /// </summary>
        public void Dispose()
        {
            working = false;
            // Joining from inside dbAction (i.e. on the consumer thread itself) would deadlock.
            if (Thread.CurrentThread != consumer)
            {
                consumer.Join();
            }
        }
    }
    /// <summary>
    /// Console harness: posts <c>count</c> chat messages through a <see cref="BatchProcessingAspect{T}"/>
    /// backed by the <c>PostsChat</c> stored procedure, then reports elapsed time and PASS/FAIL.
    /// </summary>
    class PostChatTest
    {
        // Separator expected by [dbo].[SplitArguments]; it must not occur inside any argument.
        const string SqlSeparator = "·";

        class PostChatEntity : IBatchProcessingAspectEntity
        {
            // Database ID written back by PostsChat once the entity's batch is inserted; 0 until then.
            public int ID { get; set; }
            public string Room { get; set; }
            public string Chat { get; set; }
            public string User { get; set; }
            public string Message { get; set; }
        }

        /// <summary>
        /// Batch action: inserts <paramref name="entities"/> with one stored-procedure call, then
        /// writes the returned IDs back to them, position by position.
        /// </summary>
        /// <exception cref="InvalidOperationException">The procedure returned a different number of
        /// IDs than entities were sent, so the IDs cannot be matched to entities.</exception>
        static void PostsChat(PostChatEntity[] entities)
        {
            var rooms = new string[entities.Length];
            var chats = new string[entities.Length];
            var users = new string[entities.Length];
            var messages = new string[entities.Length];
            for (var i = 0; i < entities.Length; ++i)
            {
                rooms[i] = entities[i].Room;
                chats[i] = entities[i].Chat;
                users[i] = entities[i].User;
                messages[i] = entities[i].Message;
            }
            var results = DbPostsChat(rooms, chats, users, messages);
            // Assigning a shorter or longer ID list by position would give entities the wrong IDs.
            if (results.Length != entities.Length)
            {
                throw new InvalidOperationException(string.Format(
                    "PostsChat returned {0} IDs for {1} entities.", results.Length, entities.Length));
            }
            for (var i = 0; i < entities.Length; ++i)
            {
                entities[i].ID = results[i];
            }
        }

        /// <summary>
        /// Calls the <c>PostsChat</c> stored procedure. Each array is joined into one
        /// separator-delimited NVARCHAR(MAX) argument; a null array omits that parameter. Returns the
        /// IDs from the procedure's second result set.
        /// </summary>
        /// <exception cref="ArgumentException">An argument contains the separator.</exception>
        /// <exception cref="InvalidOperationException">The procedure returned rows in its first
        /// (errors) result set.</exception>
        /// <exception cref="SqlException">The database call failed.</exception>
        static int[] DbPostsChat (
            string[] rooms, string[] chats, string[] users, string[] messages
        )
        {
            var sqlProcedure = "PostsChat";
            var connectionStringBuilder = new SqlConnectionStringBuilder();
            connectionStringBuilder.DataSource = "MACBOOKPRO";
            connectionStringBuilder.ApplicationName = "BatchProcessingAspect";
            connectionStringBuilder.InitialCatalog = "CrisisManagement";
            connectionStringBuilder.IntegratedSecurity = true;
            // NOTE(review): 0 means "no limit", so an unreachable server blocks the consumer thread
            // indefinitely; consider a finite timeout outside this sandbox.
            connectionStringBuilder.ConnectTimeout = 0;
            using (var connection
                   = new SqlConnection(connectionStringBuilder.ToString()))
            {
                connection.Open();
                using (var command = new SqlCommand(sqlProcedure, connection))
                {
                    command.CommandType = System.Data.CommandType.StoredProcedure;
                    AddArgumentsParameter(command, "Rooms", rooms);
                    AddArgumentsParameter(command, "Chats", chats);
                    AddArgumentsParameter(command, "Users", users);
                    AddArgumentsParameter(command, "Messages", messages);
                    // The procedure returns single-column result sets in this order.
                    var errors = new List<string>();
                    var ids = new List<string>();
                    var names = new List<string>();
                    var values = new List<string>();
                    var flow = new List<string>[] { errors, ids, names, values };
                    // Database errors propagate to the caller instead of being logged here and
                    // turned into a silently short ID list.
                    using (var reader = command.ExecuteReader())
                    {
                        var flowStep = 0;
                        do
                        {
                            while (reader.Read())
                            {
                                flow[flowStep].Add(reader.GetString(0));
                            }
                            ++flowStep;
                        }
                        while (flowStep < flow.Length && reader.NextResult());
                    }
                    if (errors.Count > 0)
                    {
                        throw new InvalidOperationException(string.Join(Environment.NewLine, errors));
                    }
                    var response = new int[ids.Count];
                    for (var i = 0; i < ids.Count; ++i)
                    {
                        response[i] = int.Parse(ids[i], System.Globalization.CultureInfo.InvariantCulture);
                    }
                    return response;
                }
            }
        }

        /// <summary>
        /// Adds <paramref name="arguments"/> as one NVARCHAR(MAX) parameter named
        /// <paramref name="name"/>, joined with the separator. Does nothing when
        /// <paramref name="arguments"/> is null.
        /// </summary>
        /// <exception cref="ArgumentException">An argument contains the separator, which would shift
        /// every following argument on the server side.</exception>
        static void AddArgumentsParameter(SqlCommand command, string name, string[] arguments)
        {
            if (arguments == null)
            {
                return;
            }
            foreach (var argument in arguments)
            {
                if (argument != null && argument.Contains(SqlSeparator))
                {
                    throw new ArgumentException(
                        "A " + name + " argument contains the separator '" + SqlSeparator + "'.",
                        "arguments");
                }
            }
            // Size -1 declares NVARCHAR(MAX) explicitly; AddWithValue would size it per string length.
            command.Parameters.Add(name, System.Data.SqlDbType.NVarChar, -1).Value =
                string.Join(SqlSeparator, arguments);
        }

        static BatchProcessingAspect<PostChatEntity> batchProcessingAspect
        = new BatchProcessingAspect<PostChatEntity>
        (dbAction: PostsChat, maxTimeout: 1000, maxCount: 100);

        /// <summary>
        /// Queues <c>count</c> entities, waits for the aspect to persist them, and prints the elapsed
        /// time followed by PASS when every entity received a database ID (FAIL otherwise).
        /// </summary>
        static void Main(string[] args)
        {
            var count = 1000;
            Console.WriteLine("Test Start");
            var entities = new PostChatEntity[count];
            for (var i = 0; i < entities.Length; ++i)
            {
                entities[i] =
                new PostChatEntity
                {
                    Room = "Room1",
                    Chat = "Chat_1_1",
                    User = "user1",
                    Message = "Message+" + i.ToString().PadLeft(6, '0')
                };
            }
            Stopwatch measure = Stopwatch.StartNew();
            for (var i = 0; i < entities.Length; ++i)
            {
                batchProcessingAspect.BatchInvocation (
                    entities[i]
                );
            }
            // An empty queue only means the last batch was dequeued; its database call may still run.
            SpinWait.SpinUntil(() => batchProcessingAspect.IsEmpty);
            // Wait (with a 2.5 s grace period) until every entity has its ID, and stop the clock at that
            // moment so the measured time includes the last round trip.
            var pass = SpinWait.SpinUntil(
                () => Array.TrueForAll(entities, entity => entity.ID != 0), 2500);
            measure.Stop();
            Console.Write("Test Stop: took {0} ms... ", measure.ElapsedMilliseconds);
            Console.WriteLine(pass ? "PASS" : "FAIL");
            Console.ReadKey(true);
        }
    }
}

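And here is an extension of the database code from the Simple Service Bus Training database I posted some time ago. The procedure below is ready for bulk execution with many arguments. The only general limitation is the maximum request size of MS SQL Server. In addition, each individual argument must fit in NVARCHAR(4000) and must not contain the '·' separator character.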

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
-- =============================================
-- Author:       Piotr Sowa
-- Create Date: July 10th, 2015
-- Description:  Split Arguments Util Function
--   Splits @Arguments on the N'·' separator into rows whose [Index] starts at 0.
--   N separators always produce N + 1 rows; empty segments are returned as NULL.
--   A NULL input returns no rows. Each segment must fit in NVARCHAR(4000).
-- =============================================
CREATE FUNCTION [dbo].[SplitArguments] (
@Arguments [NVARCHAR](MAX)
)
RETURNS
@ReturnArguments TABLE (
[Index] [INT] IDENTITY(0,1) PRIMARY KEY,
[Argument] [NVARCHAR](4000))
AS
BEGIN
	DECLARE @delimiter NVARCHAR(1)
	SET @delimiter = N'·'
	IF @Arguments IS NULL RETURN
	DECLARE	@iStart INT,
			@iPos INT,
			@length INT
	-- DATALENGTH / 2 rather than LEN: LEN ignores trailing spaces. With LEN, the last argument
	-- lost its trailing spaces, and a trailing empty or blank argument was dropped entirely,
	-- which misaligned the positional joins done by callers.
	SET @length = DATALENGTH(@Arguments) / 2
	SET @iStart = 1
	WHILE 1 = 1
	BEGIN
		SET @iPos = CHARINDEX(@delimiter, @Arguments, @iStart)
		-- No further separator: the current segment runs to the end of the string.
		IF @iPos = 0
			SET @iPos = @length + 1
		INSERT INTO @ReturnArguments
		VALUES (CASE WHEN @iPos > @iStart
		             THEN SUBSTRING(@Arguments, @iStart, @iPos - @iStart)
		             ELSE NULL END)
		-- The segment just emitted ended at the end of the string: done.
		IF @iPos > @length
			BREAK
		SET @iStart = @iPos + 1
	END
	RETURN
END
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
-- =============================================
-- Author:       Piotr Sowa
-- Create Date: July 21st, 2016
-- Description:  PostChat in CrisisManagement
--   Bulk-inserts chat posts. @Rooms, @Chats, @Users and @Messages are N'·'-separated
--   lists split by [dbo].[SplitArguments]; items at the same position form one post.
--   Returns four single-column result sets: errors, new [ChatPosts].[ID] values
--   (in argument order), names, values.
--   NOTE(review): a post whose room or chat name has no match in [dbo].[Rooms] /
--   [dbo].[Chats] is dropped by the INNER JOINs and gets no ID, so callers then receive
--   fewer IDs than arguments. Chats are matched by name only, not scoped to the room.
-- =============================================
CREATE PROCEDURE [dbo].[PostsChat]
@Rooms NVARCHAR(MAX),
@Chats NVARCHAR(MAX),
@Users NVARCHAR(MAX),
@Messages NVARCHAR(MAX)
AS
BEGIN
	SET NOCOUNT ON
	DECLARE @ErrorsTable TABLE(
[Index] [INT] IDENTITY(1,1) PRIMARY KEY,
[Error] [NVARCHAR](4000))
	DECLARE @IDsTable TABLE(
[Index] [INT] IDENTITY(1,1) PRIMARY KEY,
[ID] [NVARCHAR](4000))
	DECLARE @NamesTable TABLE(
[Index] [INT] IDENTITY(1,1) PRIMARY KEY,
[Name] [NVARCHAR](4000))
	DECLARE @ValuesTable TABLE(
[Index] [INT] IDENTITY(1,1) PRIMARY KEY,
[Value] [NVARCHAR](4000))
	DECLARE @ArgumentsInTable TABLE(
[Index] [INT] PRIMARY KEY,
[C_ID] INT,
[User] [NVARCHAR](4000),
[Message] [NVARCHAR](4000))
	-- Raw identity values captured by OUTPUT; BIGINT so any integer identity type fits.
	DECLARE @InsertedTable TABLE(
[ID] [BIGINT] PRIMARY KEY)
	INSERT INTO @ArgumentsInTable([Index], [C_ID], [User], [Message])
	SELECT [R].[Index], [CT].[ID], [U].[Argument], [M].[Argument]
	FROM SplitArguments(@Rooms) AS [R]
	INNER JOIN [dbo].[Rooms] AS [RT] ON [RT].[Name] = [R].Argument
	INNER JOIN SplitArguments(@Chats) AS [C] ON [R].[Index] = [C].[Index]
	INNER JOIN [dbo].[Chats] AS [CT] ON [CT].[Name] = [C].[Argument]
	INNER JOIN SplitArguments(@Users) AS [U] ON [R].[Index] = [U].[Index]
	INNER JOIN SplitArguments(@Messages) AS [M] ON [R].[Index] = [M].[Index]
	ORDER BY [R].[Index]
	-- OUTPUT does not guarantee the order of its rows. INSERT ... SELECT ... ORDER BY does,
	-- however, assign identity values in ORDER BY order, so the IDs are captured first and
	-- re-sorted below to line up with argument positions.
	-- Assumes [ChatPosts].[ID] is an IDENTITY with a positive increment - TODO confirm.
	INSERT INTO [ChatPosts]([ChatID], [UserName], [Message], [Status])
	OUTPUT INSERTED.ID
        INTO @InsertedTable([ID])
	SELECT
	[A].[C_ID], [A].[User], [A].[Message], 0
	FROM @ArgumentsInTable [A]
	ORDER BY [A].[Index]
	INSERT INTO @IDsTable([ID])
	SELECT CAST([ID] AS NVARCHAR(4000)) FROM @InsertedTable ORDER BY [ID]
	SELECT [Error] FROM @ErrorsTable ORDER BY [Index]
	SELECT [ID] FROM @IDsTable ORDER BY [Index]
	SELECT [Name] FROM @NamesTable ORDER BY [Index]
	SELECT [Value] FROM @ValuesTable ORDER BY [Index]
END
GO

p ;).

One Reply to “Batch Processing Aspect in C#”

  1. Pingback: Batch Processing Aspect in Java - coding by to design

Leave a Reply

Your email address will not be published. Required fields are marked *

*

This site uses Akismet to reduce spam. Learn how your comment data is processed.