Hi, today I would like to share with you implementation of new SimpleServiceBus that now uses Apache.Thrift and Protobuf.NET and is faster because of that. The first reason for using those libraries was, of course, performance, but also, I wanted to build something completely new, SimpleDatabaseBroker. To use this code in SQL Server 2012, I needed something different than dependency for System.ServiceModel.dll because that cannot be used as SQL CLR. In other words, you cannot use WCF for SQL CLR. So, I replaced the communication model from WCF with Apache. Thrift. I used that solution for sending simple messages with a generic List of byte arrays. And that byte array contains serialized messages with Protobuf. NET. Unfortunately, Protobuf deserialized some messages with null properties with complicated communications, but not for the communication model I used for SimpleDatabaseBroker. Let me start with a simple example to show you how it works. For example, you want to invoke the following code in T-SQL on SQL Server 2012. Where you invoke RequestAsync stored procedure with 2 arguments, preconfigured queue name, and message content.
USE SimpleDatabaseBroker; EXEC [dbo].[RequestAsync] '*', 'Hello SimpleDatabaseBroker!'; EXEC [dbo].[RequestAsync] 'emails', 'some.email@domain.somwhere.com'; EXEC [dbo].[RequestAsync] 'notifs', 'Hi!'; EXEC [dbo].[RequestAsync] 'emails', 'This is queue for emails notifies; EXEC [dbo].[RequestAsync] 'notifs', 'This is queue for notification'; EXEC [dbo].[RequestAsync] '*', 'This is queue for rest messages'; EXEC [dbo].[RequestAsync] '*', 'key:A val:1'; EXEC [dbo].[RequestAsync] '*', 'key:B val:2'; EXEC [dbo].[RequestAsync] '*', 'key:C val:3'; EXEC [dbo].[RequestAsync] '*', 'key:D val:4'; EXEC [dbo].[RequestAsync] '*', 'key:E val:5';
And when you do that, you can at the same time consume your queue, messages information in any services for long-running query operations, or background processing in your backend Cloud environment. It can look like the below in tester consumers for the above code in T-SQL. The essential thing to remember is that SimpleDatabaseBroker is truly async, and sending or requesting messages is putting that messages into a data model for consumption. So when you use that trigger, for example, a trigger that sends messages to customers, you make a request truly async, and your trigger is not blocked. So for the above code, I started 4 consumer testers, and all of that consumers received all messages. Order per queue is correct, but all messages are consumed by separated threads, so writing lines with message content can have a different order.
To use this solution all you have to do is using SQL CLR database and new version of SimpleServiceBus that is integrated part of SimpleDatabaseBroker. Below you can find the T-SQL code that creates this database. I am not sure if UNSAFE permission is required for this case. You can experiment with removing that if you like.
/* -- drop database under development of solution USE master GO DROP DATABASE SimpleDatabaseBroker GO */ CREATE DATABASE SimpleDatabaseBroker GO USE SimpleDatabaseBroker GO sp_configure 'CLR enabled', 1 GO RECONFIGURE GO ALTER DATABASE SimpleDatabaseBroker SET TRUSTWORTHY ON GO RECONFIGURE GO DECLARE @FPATH AS VARCHAR(100) -- check this path on your System SET @FPATH = 'C:\Windows\Microsoft.NET\Framework64\v4.0.30319'; CREATE ASSEMBLY System_Web AUTHORIZATION [dbo] FROM @FPATH + '\System.Web.dll' WITH PERMISSION_SET = UNSAFE; DECLARE @PATH AS VARCHAR(100) -- check this path on your System SET @PATH = 'C:\Projects\SimpleDatabaseBroker\SimpleDatabaseBroker\bin\Release'; CREATE ASSEMBLY CodingByToDesign_SimpleServiceBusThrift AUTHORIZATION [dbo] FROM @PATH + '\CodingByToDesign.SimpleServiceBusThrift.dll' WITH PERMISSION_SET = UNSAFE; CREATE ASSEMBLY CodingByToDesign_SimpleServiceBusProtobuf AUTHORIZATION [dbo] FROM @PATH + '\CodingByToDesign.SimpleServiceBusProtobuf.dll' WITH PERMISSION_SET = UNSAFE; CREATE ASSEMBLY CodingByToDesign_SimpleServiceBus AUTHORIZATION [dbo] FROM @PATH + '\CodingByToDesign.SimpleServiceBus.dll' WITH PERMISSION_SET = UNSAFE; CREATE ASSEMBLY CodingByToDesign_SimpleDatabaseBrokerProtocol AUTHORIZATION [dbo] FROM @PATH + '\CodingByToDesign.SimpleDatabaseBrokerProtocol.dll' WITH PERMISSION_SET = UNSAFE; CREATE ASSEMBLY CodingByToDesign_SimpleDatabaseBroker AUTHORIZATION [dbo] FROM @PATH + '\CodingByToDesign.SimpleDatabaseBroker.dll' WITH PERMISSION_SET = UNSAFE; GO CREATE PROCEDURE RequestAsync ( @queue nvarchar(max), @message nvarchar(max) ) AS EXTERNAL NAME CodingByToDesign_SimpleDatabaseBroker.[CodingByToDesign.SimpleDatabaseBroker.Broker].RequestAsync GO /* -- trivial test EXEC dbo.RequestAsync '*', 'Hello SimpleDatabaseBroker!'; */
I want to also show you implementations of almost trivial key elements of SimpleDatabaseBroker here. First, I will show you the Broker class in CodingByToDesign.SimpleDatabaseBroker.dll assembly. Behind the scenes, it opens listeners and accepts connections for, in this case, 3 configured Requesters for separated queues. The important thing is that it truly works async and puts into the data model to queue information that can be but does not have to be consumed by consumers. And I decided to limit that queue to a 1 million messages buffer. If you like using this solution on production servers, you may consider adding Start, Stop, and GetState methods; the last one has to use output parameters, not return value, to become a stored procedure.
namespace CodingByToDesign.SimpleDatabaseBroker { using System; using System.Collections.Generic; using Microsoft.SqlServer.Server; using CodingByToDesign.SimpleDatabaseBrokerProtocol; using CodingByToDesign.SimpleServiceBus.Communication.Contracts; using CodingByToDesign.SimpleServiceBus.Communication; public static class Broker { static read-only IRequester<SimpleDatabaseBrokerMessage>[] requesters = new IRequester<SimpleDatabaseBrokerMessage> [SimpleDatabaseBrokerConfigs.Configs.Length]; static read-only IDictionary<string, int> indexes = new Dictionary<string, int> (SimpleDatabaseBrokerConfigs.Configs.Length); static Broker() { for (var index = 0; index < SimpleDatabaseBrokerConfigs.Configs.Length; ++index) { var config = SimpleDatabaseBrokerConfigs.Configs[index]; indexes.Add(config.QueueName, index); requesters[index] = new Requester<SimpleDatabaseBrokerMessage> (config.QueueNetTcpAddress); } } [SqlProcedure] public static void RequestAsync(string queue, string message) { int index; if (indexes.TryGetValue(queue, out index)) { requesters[index].RequestAsync(new SimpleDatabaseBrokerMessage { Message = message }); } } } }
Also interesting thing is Consumer code that is able to consume messages from Broker. But if you want to use that on production, remember that you should get messages as fast as possible on the Consumer machine in the backend Cloud system. So you can use the Publisher-Consumer pattern and enqueue all messages to concurrent queues and then consume in separated threads messages. If you do that, you will be able to control your queues and, for example, purge messages from processing very fast. Or you can use IRequester and IReceiver from SimpleServiceBus in a few lines of code because both those components use the Publisher-Consumer pattern as well.
namespace CodingByToDesign.SimpleDatabaseBrokerConsumer { using System; using System.Collections.Generic; using CodingByToDesign.SimpleDatabaseBrokerProtocol; using CodingByToDesign.SimpleServiceBus.Communication.Contracts; using CodingByToDesign.SimpleServiceBus.Communication; public static class Consumer { static readonly IReceiver<SimpleDatabaseBrokerMessage>[] receivers = new IReceiver<SimpleDatabaseBrokerMessage> [SimpleDatabaseBrokerConfigs.Configs.Length]; static readonly IDictionary<string, int> indexes = new Dictionary<string, int> (SimpleDatabaseBrokerConfigs.Configs.Length); static Consumer() { for (var index = 0; index < SimpleDatabaseBrokerConfigs.Configs.Length; ++index) { var config = SimpleDatabaseBrokerConfigs.Configs[index]; indexes.Add(config.QueueName, index); receivers[index] = new Receiver<SimpleDatabaseBrokerMessage> (config.QueueNetTcpAddress) { RecieveAction = new Action<SimpleDatabaseBrokerMessage>( message => { var messageReceived = MessageReceived; var queueName = config.QueueName; if (messageReceived != null) { messageReceived.Invoke(queueName, message.Message); } } ) }; } } public static Action<string, string> MessageReceived; } }
For your own experiments I am sharing source code. So here there are. Source Code of SimpleServiceBus with Thrift and Protobuf (2937 downloads) and Source Code of SimpleDatabaseBroker (2960 downloads). One more thing, I decided to share those source codes with Apache License 2.0 for reuse in commercial software. Enjoy!
p ;).
I asked for help author of Protobuf.NET, Marc Gravell. Maybe he will help me to pass last performance test on SimpleServiceBus that fail because of deserializaed string property that is null but should have content. I will update content if He help me or when I find out where is the issue.
Pingback: Happy Holidays at The End of 2014 @ coding by to design