SimpleDatabaseBroker

Hi, today I would like to share with you the implementation of the new SimpleServiceBus, which now uses Apache Thrift and Protobuf.NET and is faster because of that. The first reason for using those libraries was, of course, performance, but I also wanted to build something completely new: SimpleDatabaseBroker. To run this code in SQL Server 2012, I had to get rid of the dependency on System.ServiceModel.dll, because that assembly cannot be used in SQL CLR. In other words, you cannot use WCF in SQL CLR.

So I replaced the WCF communication model with Apache Thrift. I use it to send simple messages that consist of a generic List of byte arrays, and each byte array contains a message serialized with Protobuf.NET. Unfortunately, in more complicated communication scenarios Protobuf.NET deserialized some messages with null properties, but that does not happen with the communication model I used for SimpleDatabaseBroker.

Let me start with a simple example to show you how it works. Suppose you want to run the following T-SQL code on SQL Server 2012. It invokes the RequestAsync stored procedure with two arguments: a preconfigured queue name and the message content.

USE SimpleDatabaseBroker;
EXEC [dbo].[RequestAsync] '*', 'Hello SimpleDatabaseBroker!';
EXEC [dbo].[RequestAsync] 'emails', 'some.email@domain.somwhere.com';
EXEC [dbo].[RequestAsync] 'notifs', 'Hi!';
EXEC [dbo].[RequestAsync] 'emails', 'This is queue for emails notifies';
EXEC [dbo].[RequestAsync] 'notifs', 'This is queue for notification';
EXEC [dbo].[RequestAsync] '*', 'This is queue for rest messages';
EXEC [dbo].[RequestAsync] '*', 'key:A val:1';
EXEC [dbo].[RequestAsync] '*', 'key:B val:2';
EXEC [dbo].[RequestAsync] '*', 'key:C val:3';
EXEC [dbo].[RequestAsync] '*', 'key:D val:4';
EXEC [dbo].[RequestAsync] '*', 'key:E val:5';

While you do that, you can consume the messages from your queues at the same time in any service, for example for long-running query operations or background processing in your backend Cloud environment. For the T-SQL code above, the output of the tester consumers can look like the image below. The essential thing to remember is that SimpleDatabaseBroker is truly async: sending or requesting a message only puts that message into a data model for consumption. So when you use it in a trigger, for example a trigger that sends messages to customers, the request is truly async and your trigger is not blocked. For the code above, I started 4 consumer testers, and all of those consumers received all messages. The order per queue is correct, but the messages are consumed by separate threads, so the lines with message content can be written in a different order.

[image: output of the four consumer testers]

To use this solution, all you need is a SQL CLR database and the new version of SimpleServiceBus, which is an integral part of SimpleDatabaseBroker. Below you can find the T-SQL code that creates this database. I am not sure whether the UNSAFE permission set is required in this case. You can experiment with removing it if you like.

/* -- drop database under development of solution
USE master
GO
DROP DATABASE SimpleDatabaseBroker
GO
*/
CREATE DATABASE SimpleDatabaseBroker
GO
USE SimpleDatabaseBroker
GO
sp_configure 'CLR enabled', 1
GO
RECONFIGURE
GO
ALTER DATABASE SimpleDatabaseBroker
SET TRUSTWORTHY ON
GO
RECONFIGURE
GO
DECLARE @FPATH AS VARCHAR(100)
-- check this path on your System
SET @FPATH = 'C:\Windows\Microsoft.NET\Framework64\v4.0.30319';
CREATE ASSEMBLY System_Web
AUTHORIZATION [dbo]
FROM @FPATH + '\System.Web.dll'
WITH PERMISSION_SET = UNSAFE;
DECLARE @PATH AS VARCHAR(100)
-- check this path on your System
SET @PATH = 'C:\Projects\SimpleDatabaseBroker\SimpleDatabaseBroker\bin\Release';
CREATE ASSEMBLY CodingByToDesign_SimpleServiceBusThrift
AUTHORIZATION [dbo]
FROM @PATH + '\CodingByToDesign.SimpleServiceBusThrift.dll'
WITH PERMISSION_SET = UNSAFE;
CREATE ASSEMBLY CodingByToDesign_SimpleServiceBusProtobuf
AUTHORIZATION [dbo]
FROM @PATH + '\CodingByToDesign.SimpleServiceBusProtobuf.dll'
WITH PERMISSION_SET = UNSAFE;
CREATE ASSEMBLY CodingByToDesign_SimpleServiceBus
AUTHORIZATION [dbo]
FROM @PATH + '\CodingByToDesign.SimpleServiceBus.dll'
WITH PERMISSION_SET = UNSAFE;
CREATE ASSEMBLY CodingByToDesign_SimpleDatabaseBrokerProtocol
AUTHORIZATION [dbo]
FROM @PATH + '\CodingByToDesign.SimpleDatabaseBrokerProtocol.dll'
WITH PERMISSION_SET = UNSAFE;
CREATE ASSEMBLY CodingByToDesign_SimpleDatabaseBroker
AUTHORIZATION [dbo]
FROM @PATH + '\CodingByToDesign.SimpleDatabaseBroker.dll'
WITH PERMISSION_SET = UNSAFE;
GO
CREATE PROCEDURE RequestAsync (
 @queue nvarchar(max),
 @message nvarchar(max)
)
AS
EXTERNAL
NAME
CodingByToDesign_SimpleDatabaseBroker.[CodingByToDesign.SimpleDatabaseBroker.Broker].RequestAsync
GO
/* -- trivial test
EXEC dbo.RequestAsync '*', 'Hello SimpleDatabaseBroker!';
*/

I also want to show you the implementation of the almost trivial key elements of SimpleDatabaseBroker. First, here is the Broker class from the CodingByToDesign.SimpleDatabaseBroker.dll assembly. Behind the scenes, it opens listeners and accepts connections for, in this case, 3 configured Requesters, one per queue. The important thing is that it works truly async: it puts messages into a queue in the data model, where they can, but do not have to, be consumed by consumers. I decided to limit that queue to a buffer of 1 million messages. If you want to use this solution on production servers, consider adding Start, Stop, and GetState methods; the last one has to use output parameters, not a return value, to become a stored procedure.

namespace CodingByToDesign.SimpleDatabaseBroker
{
    using System;
    using System.Collections.Generic;
    using Microsoft.SqlServer.Server;
    using CodingByToDesign.SimpleDatabaseBrokerProtocol;
    using CodingByToDesign.SimpleServiceBus.Communication.Contracts;
    using CodingByToDesign.SimpleServiceBus.Communication;
    public static class Broker
    {
        static readonly IRequester<SimpleDatabaseBrokerMessage>[] requesters
            = new IRequester<SimpleDatabaseBrokerMessage>
              [SimpleDatabaseBrokerConfigs.Configs.Length];
        static readonly IDictionary<string, int> indexes
            = new Dictionary<string, int>
              (SimpleDatabaseBrokerConfigs.Configs.Length);
        static Broker()
        {
            for (var index = 0;
                 index < SimpleDatabaseBrokerConfigs.Configs.Length;
                 ++index)
            {
                var config = SimpleDatabaseBrokerConfigs.Configs[index];
                indexes.Add(config.QueueName, index);
                requesters[index] = new Requester<SimpleDatabaseBrokerMessage>
                (config.QueueNetTcpAddress);
            }
        }
        [SqlProcedure]
        public static void RequestAsync(string queue, string message)
        {
            int index;
            if (indexes.TryGetValue(queue, out index))
            {
                requesters[index].RequestAsync(new SimpleDatabaseBrokerMessage
                { Message = message });
            }
        }
    }
}
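
The Start, Stop, and GetState idea mentioned above could be sketched roughly as follows. This is only an illustration under my own assumptions, not code from SimpleDatabaseBroker: the class name BrokerStateSketch, the in-memory ConcurrentQueue, and the choice to silently drop messages when the broker is stopped or the buffer is full are all hypothetical. In SQL CLR each public method would also need the [SqlProcedure] attribute, which is left out here so the sketch compiles without the SQL Server assemblies.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical sketch, not part of SimpleDatabaseBroker.
public static class BrokerStateSketch
{
    // Buffer limit of 1 million messages, as described in the article.
    public const int Capacity = 1000000;

    static readonly ConcurrentQueue<string> buffer = new ConcurrentQueue<string>();

    // One-element array so the field itself can stay readonly:
    // running[0] == 1 means started, 0 means stopped.
    static readonly int[] running = { 0 };

    public static void Start() { Interlocked.Exchange(ref running[0], 1); }

    public static void Stop() { Interlocked.Exchange(ref running[0], 0); }

    // Never blocks the caller. Assumption of this sketch: the message is
    // dropped when the broker is stopped or the buffer is full. The capacity
    // check and the enqueue are not one atomic step, so under heavy
    // concurrency the limit is approximate.
    public static void RequestAsync(string message)
    {
        if (Volatile.Read(ref running[0]) == 1 && buffer.Count < Capacity)
        {
            buffer.Enqueue(message);
        }
    }

    // Reports state through output parameters instead of a return value.
    public static void GetState(out bool isRunning, out int buffered)
    {
        isRunning = Volatile.Read(ref running[0]) == 1;
        buffered = buffer.Count;
    }
}
```

Calling Start, then RequestAsync once, then GetState would report a running broker with one buffered message; whether dropping is the right policy for a full buffer depends on your system.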

Also interesting is the Consumer code, which consumes messages from the Broker. If you want to use it in production, remember that the Consumer machine in the backend Cloud system should take messages off the wire as fast as possible. So you can use the Publisher-Consumer pattern: enqueue all messages into concurrent queues and then process them on separate threads. If you do that, you will be able to control your queues and, for example, purge messages from processing very quickly. Or you can use IRequester and IReceiver from SimpleServiceBus in a few lines of code, because both of those components use the Publisher-Consumer pattern as well.

namespace CodingByToDesign.SimpleDatabaseBrokerConsumer
{
    using System;
    using System.Collections.Generic;
    using CodingByToDesign.SimpleDatabaseBrokerProtocol;
    using CodingByToDesign.SimpleServiceBus.Communication.Contracts;
    using CodingByToDesign.SimpleServiceBus.Communication;
    public static class Consumer
    {
        static readonly IReceiver<SimpleDatabaseBrokerMessage>[] receivers
            = new IReceiver<SimpleDatabaseBrokerMessage>
              [SimpleDatabaseBrokerConfigs.Configs.Length];
        static readonly IDictionary<string, int> indexes
            = new Dictionary<string, int>
              (SimpleDatabaseBrokerConfigs.Configs.Length);
        static Consumer()
        {
            for (var index = 0;
                 index < SimpleDatabaseBrokerConfigs.Configs.Length;
                 ++index)
            {
                var config = SimpleDatabaseBrokerConfigs.Configs[index];
                indexes.Add(config.QueueName, index);
                receivers[index] = new Receiver<SimpleDatabaseBrokerMessage>
                                   (config.QueueNetTcpAddress)
                {
                    RecieveAction = new Action<SimpleDatabaseBrokerMessage>(
                        message =>
                        {
                            var messageReceived = MessageReceived;
                            var queueName = config.QueueName;
                            if (messageReceived != null)
                            {
                                messageReceived.Invoke(queueName, message.Message);
                            }
                        }
                    )
                };
            }
        }
        public static Action<string, string> MessageReceived;
    }
}
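
The Publisher-Consumer approach described above could look roughly like this on the consumer side. It is a minimal sketch under my own assumptions: QueueWorker and its members are hypothetical names, not part of SimpleServiceBus, and it uses only BlockingCollection from the .NET base class library. One worker per queue keeps the order within that queue, while different workers run on different threads.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical sketch, not part of SimpleServiceBus: one worker per queue.
public sealed class QueueWorker : IDisposable
{
    // BlockingCollection wraps a ConcurrentQueue by default, so messages
    // are processed in the order they were enqueued.
    readonly BlockingCollection<string> pending = new BlockingCollection<string>();
    readonly Thread thread;

    public QueueWorker(string queueName, Action<string, string> process)
    {
        thread = new Thread(() =>
        {
            // Blocks until a message arrives; the loop ends once
            // CompleteAdding was called and the collection is empty.
            foreach (var message in pending.GetConsumingEnumerable())
            {
                process(queueName, message);
            }
        });
        thread.IsBackground = true;
        thread.Start();
    }

    // Publisher side: cheap, so the receive callback can return quickly.
    public void Enqueue(string message)
    {
        pending.Add(message);
    }

    // Discards all messages that have not been processed yet and
    // returns how many were dropped.
    public int Purge()
    {
        var dropped = 0;
        string ignored;
        while (pending.TryTake(out ignored))
        {
            ++dropped;
        }
        return dropped;
    }

    // Lets the worker finish the remaining messages, then stops it.
    public void Dispose()
    {
        pending.CompleteAdding();
        thread.Join();
        pending.Dispose();
    }
}
```

Assuming you keep one QueueWorker per queue name in a dictionary, the Consumer.MessageReceived delegate shown earlier could simply forward each (queue, message) pair to the matching worker's Enqueue method, so that the receiving thread is never held up by slow processing.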

For your own experiments, I am sharing the source code. Here it is: Source Code of SimpleServiceBus with Thrift and Protobuf (2937 downloads) and Source Code of SimpleDatabaseBroker (2960 downloads). One more thing: I decided to share the source code under the Apache License 2.0, so it can be reused in commercial software. Enjoy!

p ;).

2 Replies to “SimpleDatabaseBroker”

  1. I asked the author of Protobuf.NET, Marc Gravell, for help. Maybe he will help me pass the last performance test on SimpleServiceBus, which fails because a deserialized string property is null but should have content. I will update this post if he helps me or when I find out where the issue is.

  2. Pingback: Happy Holidays at The End of 2014 @ coding by to design
