In-Proc Agents Communications
Hello, this is my first article on Aspect Coder Network, and I decided to start with a very modern software architecture subject. I have been working on it for a very long time, implementing the idea on the .NET/C# technology stack, and it took me about four years to find the best practices. The subject is modeling in-process communication as the key aspect of designing a multi-agent environment. It is a useful technique, especially when you are creating a backend sub-system, a part of a computing grid, or even a multi-core desktop application modeled as a group of independent agents working together. This kind of modeling can lead to a very clean design, because you can start with a group of agents, give each of them a name, and then focus on the functional specialization of each one separately. This article covers some techniques that answer the following questions. How do you create such a design starting from a clean sheet of paper and a pen? How do you prepare a testing environment for your agents with the stubbing technique? How many operations per second can you get? How can you calculate the total computing bandwidth of the solution? Everything I cover in this article is the result of my own experiments and knowledge.
So, let’s begin. I will show you the first example that came to my mind: an easy-to-understand, brief design for a financial risk calculation sub-system. The main functionality of this sub-system is calculating the risk of financial operations. I am not trying to cover all aspects of this kind of solution now. I only want to show you how to design such a system efficiently, and how to calculate the number of operations it can handle. The details of the calculations, data storage, and prediction algorithms are not the subject here. They can be your next step once you have made sure that everything works together fast and efficiently. Then you can code your agents precisely and focus only on the algorithms and the business logic. So let me show you an example of such a high-level system design on a diagram.
Now, let me explain it. As you can see, we have the following agents in our design: ControllerAgent, AuditLogAgent, ValidationAgent, CalculationAgent, RepositoryAgent, and CachingAgent. All agents coexist and work together, and I have drawn all the communication messages that need to be exchanged. I hope the names of the messages and agents help you understand the functional role of each of them, but I will explain the particular role of every one. ControllerAgent can be used by any controller of your ASP.NET MVC application, or by a Web API or WCF application. It is responsible for interpreting what each request needs. Please do not tie it to any particular technology yet, because it can also be used on its own in an operating system service. ControllerAgent creates requests synchronously, which means it needs to wait for a response. As you already know, though, requests can be produced on many threads in all the technologies mentioned, for example by the Thread Pool or by TPL-based engines. The controller’s main job is simple: make sure that every request is processed and that every response is created and returned to the client of our sub-system.
The agent mentioned above cooperates with AuditLogAgent, which has the audit logging role: every single request is logged. If you are wondering about a good logging technique, it is the Trace and Debug classes from the System.Diagnostics namespace. If you use them correctly, you can store about 50 million logs per second. You do not need any other logging library; everything is built into the .NET Framework and waiting for you. But that was only a small digression. Let’s return to the subject, which is that AuditLogAgent is responsible for logging every request. It should not slow down the process, but this is a non-functional requirement, so all data comes to this agent synchronously.
Next, you see ValidationAgent, which is responsible for validating every request against any business rules you want. It checks whether the request should be rejected because it is invalid. In that case no calculation response can be created, so the agent can return a response to ControllerAgent immediately after validation, with the reason why the validation failed. ControllerAgent can then create a forbidden response to this particular client request.
On the other hand, if the request is valid, it can be passed on to CalculationAgent. This is the main business logic agent of the entire application. Let’s say that it is responsible for calculating the risk of financial operations. To do so, it gets the transaction prediction data that it needs to calculate and store the risk. Before CalculationAgent gives a response, for example a group of risks of a particular financial operation at different moments in the future, it needs to get the calculation data.
To get that data, CalculationAgent asks RepositoryAgent for all the needed information. RepositoryAgent can then be smart and ask CachingAgent about data already fetched for the same subject. When CachingAgent has all of the data, or any portion of it, that helps RepositoryAgent deliver the data faster. But if the data is not in the memory of CachingAgent, RepositoryAgent needs to get it from storage. That can be a relational database server, an in-memory storage server, an object database, or anything else. When RepositoryAgent has the data, it sends it to both CachingAgent and CalculationAgent. Moreover, RepositoryAgent can notify CachingAgent that some of its data should be collected (evicted) because the information needs to be calculated again, for example when RepositoryAgent gets a notification from storage. Another way data can be collected from CachingAgent memory is by CachingAgent’s own decision, based on the agent’s statistical logic.
Of course, when CalculationAgent receives all the necessary data, it creates a response for ControllerAgent, and then ControllerAgent creates a response for the client.
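The cooperation between RepositoryAgent and CachingAgent described above can be sketched in a few lines. This is only a minimal illustration and not the code from my library: the two agent names come from the design, but every member name, the string subject key, the double[] payload, and the storage delegate are assumptions made for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical sketch: keeps already fetched data in memory.
public sealed class CachingAgent
{
    private readonly ConcurrentDictionary<string, double[]> _memory =
        new ConcurrentDictionary<string, double[]>();

    public bool TryGet(string subject, out double[] data)
    {
        return _memory.TryGetValue(subject, out data);
    }

    public void Store(string subject, double[] data)
    {
        _memory[subject] = data;
    }

    // Collects (evicts) data that has to be fetched again.
    public void Evict(string subject)
    {
        double[] removed;
        _memory.TryRemove(subject, out removed);
    }
}

// Hypothetical sketch: asks the cache first and falls back to storage.
public sealed class RepositoryAgent
{
    private readonly CachingAgent _cache;
    private readonly Func<string, double[]> _loadFromStorage; // assumed storage access
    private int _storageReads;

    public RepositoryAgent(CachingAgent cache, Func<string, double[]> loadFromStorage)
    {
        _cache = cache;
        _loadFromStorage = loadFromStorage;
    }

    // Number of times the storage had to be read (useful in tests).
    public int StorageReads { get { return _storageReads; } }

    public double[] GetData(string subject)
    {
        double[] cached;
        if (_cache.TryGet(subject, out cached))
            return cached;                       // served from CachingAgent memory

        Interlocked.Increment(ref _storageReads);
        var data = _loadFromStorage(subject);    // slow path: go to storage
        _cache.Store(subject, data);             // share the result with the cache
        return data;
    }

    // Called when storage notifies that the data for a subject has changed.
    public void OnStorageChanged(string subject)
    {
        _cache.Evict(subject);
    }
}
```

Passing the storage access in as a delegate also makes the agent easy to stub in tests, which is the same idea as stubbing agent communication.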
Ok, so you may wonder how to make this design possible. As software engineers, coders, and designers, we all live in a world of layers. Am I right? Everyone told us we needed layers with separation. In my opinion, it depends. There is a time for layers, and there is a time for agents. The solution in this entry shows you how to use agents and model their communication. I like the agent idea because modeling each agent’s functionality is elementary and the design comes naturally. As you can see in my design drawing, it is easy to create. I will also show you how easily you can create a system that works in exactly the same way. All you need are the following five aspects: RequesterAspect with a request-only message type, RequesterAspect with both request and response message types, ReceiverAspect with a request-only message type, ReceiverAspect with both request and response message types, and, to cover all possibilities, ResponderAspect with both request and response message types. The main difference between ReceiverAspect and ResponderAspect with two message types is that ReceiverAspect can respond but does not have to, whereas ResponderAspect must create a response every time, no matter what. All the aspects mentioned should be disposable. So here is a set of contracts that describes all of these aspects as a solution for every agent.
namespace AspectCoder.Communication.Contracts
{
    using System;

    // Marker interface for every message exchanged between agents.
    public interface IMessage
    {
    }
}

namespace AspectCoder.Communication.Contracts
{
    using System;

    interface ITransactId
    {
        long Value { get; }
    }
}

namespace AspectCoder.Communication.Contracts
{
    using System;

    // Requester with a request-only message type.
    public interface IRequesterAspect<TMessageReq> : IDisposable
        where TMessageReq : IMessage, new()
    {
        void RequestAsync(TMessageReq message);
    }

    // Requester with both request and response message types.
    public interface IRequesterAspect<TMessageReq, TMessageRes> : IDisposable
        where TMessageReq : IMessage, new()
        where TMessageRes : IMessage, new()
    {
        Action<TMessageRes> ResponseAction { get; set; }
        void RequestAsync(TMessageReq message);
        void RequestSync(TMessageReq message);
        void RequestAsync(TMessageReq message, Action<TMessageRes> responseAction);
        void RequestSync(TMessageReq message, Action<TMessageRes> responseAction);
    }
}

namespace AspectCoder.Communication.Contracts
{
    using System;

    // Receiver with a request-only message type.
    public interface IReceiverAspect<TMessageReq> : IDisposable
        where TMessageReq : IMessage, new()
    {
        Action<TMessageReq> RecieveAction { get; set; }
    }

    // Receiver with both message types: it can respond, but does not have to.
    public interface IReceiverAspect<TMessageReq, TMessageRes>
        where TMessageReq : IMessage, new()
        where TMessageRes : IMessage, new()
    {
        Action<Guid, TMessageReq> RecieveAction { get; set; }
        void ResponseAsync(Guid id, TMessageRes message);
    }
}

namespace AspectCoder.Communication.Contracts
{
    using System;

    // Responder: creates a response for every request.
    public interface IResponderAspect<TMessageReq, TMessageRes>
        where TMessageReq : IMessage, new()
        where TMessageRes : IMessage, new()
    {
        Func<TMessageReq, TMessageRes> ResponseFunc { get; set; }
    }
}
Now, you may wonder how to make it happen and how to implement the interfaces above. To be honest, it can be done in a good way and in a bad way. I hope you are not wondering how to do it badly, so let me explain the good implementation. First, you need to protect all invocations with the Publisher-Consumer (producer-consumer) pattern. It can be done with the ConcurrentQueue collection from .NET 4.0 and one Thread that consumes the messages. That gives you a solution without locks in your production code. You may need some synchronization techniques inside your messaging aspects, but that is not certain. I used the slim reader-writer locks (ReaderWriterLockSlim) available in .NET 4.0, and they work great. The second thing to remember is that all aspects must somehow be able to find each other, and that you can have a different number of requesters than receivers. If you want to store transactions in progress, you can use the ConcurrentDictionary collection from .NET 4.0, but remember to add timeouts and to collect old transactions from this dictionary. Ok, now it is time for a challenge. My benchmark project for you is available for download as New OpenSource NoLock InProc Communication Challenge. If you want, you can try to make something faster and better. The code also shows you how to test aspects, how to stub agents’ communication, and how to measure the number of messages you can transfer in your sub-system. You can find my results below. This is also a challenge: try to get better performance if you can, or prepare a better library inside this challenge. To be sure you have won, you need to extend the benchmark tests with usages of both the included AspectCoder.dll assembly and your own assembly, and then compare the results. Good luck!
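To make the first point more concrete, here is a minimal sketch of a ConcurrentQueue drained by a single consumer Thread. It is not the implementation from the benchmark project; the Mailbox name, its members, the AutoResetEvent used for wake-up, and the 50 ms wait are all assumptions chosen for the illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical sketch: many producers enqueue, exactly one thread consumes.
public sealed class Mailbox<TMessage> : IDisposable
{
    private readonly ConcurrentQueue<TMessage> _queue = new ConcurrentQueue<TMessage>();
    private readonly AutoResetEvent _signal = new AutoResetEvent(false);
    private readonly Action<TMessage> _handler;
    private readonly Thread _consumer;
    private volatile bool _stopping;

    public Mailbox(Action<TMessage> handler)
    {
        _handler = handler;
        _consumer = new Thread(Consume) { IsBackground = true };
        _consumer.Start();
    }

    // Can be called from any number of threads; the caller takes no lock.
    public void Post(TMessage message)
    {
        _queue.Enqueue(message);
        _signal.Set(); // wake the consumer if it is waiting
    }

    private void Consume()
    {
        while (true)
        {
            TMessage message;
            while (_queue.TryDequeue(out message))
                _handler(message); // handlers run on this one thread only

            if (_stopping && _queue.IsEmpty)
                return;

            _signal.WaitOne(50); // assumed wait; also bounds the shutdown delay
        }
    }

    // Drains the remaining messages, then stops the consumer thread.
    public void Dispose()
    {
        _stopping = true;
        _signal.Set();
        _consumer.Join();
        _signal.Dispose();
    }
}
```

Because the handler always runs on the same thread, the agent’s own state needs no locking. A real aspect would also have to decide what happens when the handler throws, which this sketch leaves out.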
Testing communication...
Syncronously sending 10 events: (1)Requester(msg)->(2)Receiver(msg) and NO BACK... PASS.
Syncronously sending 10 events: (1)Requester(msg1)->(1)Responder(msg1,msg2)->(1)Receiver(msg1,msg2) and NO BACK... PASS.
Syncronously sending 10 events: (1)Requester(msg1,msg2)->Responder(msg1,msg2) and BACK... PASS.
Asyncronously sending 10 events: (1)Receiver(msg)->(1)Requester(msg) and NO BACK... PASS.
Asyncronously sending 10 * 50 000 events: (1)Requester(msg1,msg2)->(1)Responder(msg1,msg2) and BACK...
Benchmark start 50 000 events: 53 milliseconds, 943396.2264 per second.
Benchmark start 50 000 events: 64 milliseconds, 781250 per second.
Benchmark start 50 000 events: 83 milliseconds, 602409.6386 per second.
Benchmark start 50 000 events: 53 milliseconds, 943396.2264 per second.
Benchmark start 50 000 events: 66 milliseconds, 757575.7576 per second.
Benchmark start 50 000 events: 80 milliseconds, 625000 per second.
Benchmark start 50 000 events: 54 milliseconds, 925925.9259 per second.
Benchmark start 50 000 events: 43 milliseconds, 1162790.6977 per second.
Benchmark start 50 000 events: 78 milliseconds, 641025.641 per second.
Benchmark start 50 000 events: 78 milliseconds, 641025.641 per second.
Disposing... done after 0 milliseconds. PASS.
Syncronously sending 10 * 100 000 events: (1)Requester(msg1,msg2)->(1)Receiver(msg1,msg2)-> (1)Requester(msg3,msg4)->(1)Receiver(msg3,msg4)-> (1)Requester(msg5)->(1)Receiver(msg5) and (2)BACK(msg2,msg4)...
Benchmark start 100 000 events: 934 milliseconds, 107066.3812 per second.
Benchmark start 100 000 events: 801 milliseconds, 124843.9451 per second.
Benchmark start 100 000 events: 901 milliseconds, 110987.7913 per second.
Benchmark start 100 000 events: 902 milliseconds, 110864.745 per second.
Benchmark start 100 000 events: 896 milliseconds, 111607.1429 per second.
Benchmark start 100 000 events: 910 milliseconds, 109890.1099 per second.
Benchmark start 100 000 events: 953 milliseconds, 104931.7943 per second.
Benchmark start 100 000 events: 979 milliseconds, 102145.046 per second.
Benchmark start 100 000 events: 738 milliseconds, 135501.355 per second.
Benchmark start 100 000 events: 968 milliseconds, 103305.7851 per second.
Disposing... done after 0 milliseconds. PASS.
Press any key to continue...
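The per-second figures in the output are simply the number of events divided by the elapsed time. The helper below is a small sketch of that arithmetic and of timing a run with Stopwatch; the Throughput class and its method names are hypothetical and are not taken from the benchmark project.

```csharp
using System;
using System.Diagnostics;

// Hypothetical helper for the throughput arithmetic used in the benchmark output.
public static class Throughput
{
    // events per second = events * 1000 / elapsed milliseconds
    public static double PerSecond(long events, long elapsedMilliseconds)
    {
        if (elapsedMilliseconds <= 0)
            throw new ArgumentOutOfRangeException("elapsedMilliseconds");
        return events * 1000.0 / elapsedMilliseconds;
    }

    // Runs sendAll once, measures it with Stopwatch and returns events per second.
    public static double Measure(long events, Action sendAll)
    {
        var watch = Stopwatch.StartNew();
        sendAll();
        watch.Stop();
        return events / watch.Elapsed.TotalSeconds;
    }
}
```

For example, Throughput.PerSecond(50000, 53) reproduces the 943396.2264 per second reported for the first 50 000-event run, and Throughput.PerSecond(100000, 934) reproduces the 107066.3812 of the first 100 000-event run.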
In this article I focused mostly on in-process communication between the agents of a solution. In the next article, I will show you a solution for creating agents or groups of agents that communicate across many processes and hosts, for example on a computing grid farm or a multi-core computing system. Good luck with the challenge, and see you next time on Aspect Coder Network.
Coder
Out-Proc and In-Net Agents Communications
Hello, this is my next article on Aspect Coder Network, and I am very excited to show you this subject. There are many ways to communicate across processes and inside a network. Truth be told, I have spent more than the last two years looking for the best way to do that both efficiently and securely. The implementation I present here is based on the best interoperability I know, implemented in Apache Thrift version 0.9.0. The best news is that with this solution you can communicate across processes, inside the network, and across all modern languages and technologies at the same time. For example, between Objective C, C++, C#, Cocoa, D, Delphi, Erlang, Haskell, Java, JavaScript, OCaml, Perl, PHP, Python, Ruby, Smalltalk, and probably more. It is also enjoyable to make a generic design for common usage on top of it. Today I want to start with the following Cloud usage example. It is a notification monitoring system for a few agents inside a Cloud implementation on a hosting web farm. We often need to build agent solutions inside high-speed secured networks, for example ones separated by Layer-3 VLANs. And to build the best backend, we often need good and speedy notifications from many resources of our cloud application. So we can imagine the following design.
In the center of our design we have the LongRunProcessing agent/service, and we have three sources of long-running processing tasks: WebApp, DbTriggers, and BackendAgents. We want to use NotificationAgent for all processing notifications. When any of the three task sources sends a task to the LongRunProcessing agent, it needs to send a notification that it is trying to contact this agent. The LongRunProcessing agent also needs to notify about the beginning and the end of its work. All this information needs to go to NotificationAgent. Our NotificationAgent stores all incoming notifications in the relational database server used by WebApp, for future analysis. There are two kinds of principles here. The first is that all nodes need to know what is going on, so notifications go to NotificationAgent and to all nodes that send them or, in this solution, publish notifications. All design elements are independent processes that coexist in the same TCP/IP network.
I could tell you a lot about the implementation I did for NotificationAgent in this solution. Still, I think it is most efficient to share only a few pieces of information for a basic understanding, and to give you a working implementation challenge to experiment with yourself. So, the implementation uses the long polling pattern (spelled LongPooling in the code) for getting all published notifications and broadcasting them to nodes. It also allows the Publish operation for all nodes that connect to the service. The responsibility for getting notifications from NotificationAgent is on the node side: notifications are prepared but not sent, so you need to fetch them. If NotificationAgent has no notifications, it performs a long wait to keep the client connected. If there are still no notifications after the long wait, an empty array of notifications is returned. This keeps the connection alive. What is more, and more important than getting the notifications: there is no need to accept incoming connections (SYN packets) on the node side, only on the NotificationAgent side.
Ok, I hope you are interested in how it can be implemented and will try to challenge my implementation and benchmark. So here it is: New OpenSource NoLock OutProc Communication Challenge. It is the whole library, refactored a bit since the first article to hold all in-process and out-of-process communication in the same place for reuse. I will be happy if you find a better solution and let me know. Here are the results of my benchmark.
COMMUNICATION OUT-PROC CHALLENGES
Syncronously sending 10 * 100 000 messages each with 10 characters: (1)LongPoolServer->(1)LongPoolClient...
100k messagess, each with 10 characters received in 490196.0784 per second.
100k messagess, each with 10 characters received in 495049.505 per second.
100k messagess, each with 10 characters received in 537634.4086 per second.
100k messagess, each with 10 characters received in 531914.8936 per second.
100k messagess, each with 10 characters received in 495049.505 per second.
100k messagess, each with 10 characters received in 492610.8374 per second.
100k messagess, each with 10 characters received in 537634.4086 per second.
100k messagess, each with 10 characters received in 492610.8374 per second.
100k messagess, each with 10 characters received in 534759.3583 per second.
100k messagess, each with 10 characters received in 534759.3583 per second.
Syncronously sending 10 * 100 000 messages each with 100 characters: (1)LongPoolServer->(1)LongPoolClient...
100k messagess, each with 100 characters received in 401606.4257 per second.
100k messagess, each with 100 characters received in 495049.505 per second.
100k messagess, each with 100 characters received in 492610.8374 per second.
100k messagess, each with 100 characters received in 427350.4274 per second.
100k messagess, each with 100 characters received in 458715.5963 per second.
100k messagess, each with 100 characters received in 456621.0046 per second.
100k messagess, each with 100 characters received in 429184.5494 per second.
100k messagess, each with 100 characters received in 458715.5963 per second.
100k messagess, each with 100 characters received in 400000 per second.
100k messagess, each with 100 characters received in 458715.5963 per second.
Syncronously sending 10 * 100 000 messages each with 1000 characters: (1)LongPoolServer->(1)LongPoolClient...
100k messagess, each with 1000 characters received in 114155.2511 per second.
100k messagess, each with 1000 characters received in 112485.9393 per second.
100k messagess, each with 1000 characters received in 106723.5859 per second.
100k messagess, each with 1000 characters received in 118483.4123 per second.
100k messagess, each with 1000 characters received in 116550.1166 per second.
100k messagess, each with 1000 characters received in 114285.7143 per second.
100k messagess, each with 1000 characters received in 112359.5506 per second.
100k messagess, each with 1000 characters received in 112359.5506 per second.
100k messagess, each with 1000 characters received in 112485.9393 per second.
100k messagess, each with 1000 characters received in 116414.4354 per second.
Press any key to continue...
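Before moving on, the long-wait behaviour described earlier (wait for a notification, return an empty array if none arrives) can be modeled in memory in a few lines. This is only a sketch of the idea for a single client’s queue, without Thrift and without broadcasting to many nodes; the LongPollingBuffer class and its members are hypothetical and do not come from the library.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical in-memory model of one client's long-polling queue.
public sealed class LongPollingBuffer
{
    private readonly BlockingCollection<string> _pending = new BlockingCollection<string>();

    // Called by any publisher: the notification is prepared, not pushed.
    public void Publish(string notification)
    {
        _pending.Add(notification);
    }

    // Called by the client: waits up to longWait for the first notification,
    // then drains everything else that is already available.
    // Returns an empty array when nothing arrived during the long wait.
    public string[] Poll(TimeSpan longWait)
    {
        var batch = new List<string>();
        string item;
        if (_pending.TryTake(out item, longWait))
        {
            batch.Add(item);
            while (_pending.TryTake(out item))
                batch.Add(item);
        }
        return batch.ToArray();
    }
}
```

The client simply calls Poll in a loop. An empty result is not an error; it only means the long wait elapsed and the client should ask again, which is what keeps the connection in use.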
This is not the end. I want to show you how simply a NotificationAgent can be built. Here are two simple code examples. I think they do not need to be explained, because they are almost trivial. Just remember to split the code samples into two Console Application projects based on the namespace names.
namespace NotificationAgentSimpleServer
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using AspectCoder.CommunicationOutProc;
    using AspectCoder.CommunicationOutProc.Generated;

    class SimpleServer
    {
        static void Main(string[] args)
        {
            var address = args.Length == 1
                ? args[0]
                : "net.tcp://127.27.27.27:2727/NotificationAgentServer";
            Uri uri;
            if (!Uri.TryCreate(address, UriKind.RelativeOrAbsolute, out uri)) return;

            var serviceServer = new LongPoolingNotificationEntityImplementationThrift(5000, 1000000);
            var communicatorServer = new LongPoolingNotificationEntityPublishServiceImplementationThrift();
            var server = communicatorServer.CreateServer(uri.Port);
            try
            {
                Console.WriteLine("Hello, what is server name?");
                var name = Console.ReadLine();
                Console.WriteLine("Welcome '{0}' as notification server.", name);
                Console.WriteLine("write 'quit' command to quit.");

                // Serve connections on a background thread so the console stays responsive.
                new Thread(server.Serve) { IsBackground = true }.Start();
                Console.WriteLine("Server is running...");

                // Every line typed on the console is published as a notification.
                string text = string.Empty;
                while ((text = Console.ReadLine()) != "quit")
                {
                    serviceServer.Publish(
                        new List<LongPoolingNotificationEntityThrift>
                        {
                            new LongPoolingNotificationEntityThrift
                            {
                                DataString = string.Format("{0}> {1}", name, text)
                            }
                        });
                }
            }
            finally
            {
                server.Stop();
            }
        }
    }
}

namespace NotificationAgentSimpleClient
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using AspectCoder.CommunicationOutProc;
    using AspectCoder.CommunicationOutProc.Generated;
    using AspectCoder.CommunicationOutProc.Consumers;
    using Thrift.Transport;

    class SimpleClient
    {
        static void Main(string[] args)
        {
            var address = args.Length == 1
                ? args[0]
                : "net.tcp://127.27.27.27:2727/NotificationAgentServer";
            Uri uri;
            if (!Uri.TryCreate(address, UriKind.RelativeOrAbsolute, out uri)) return;

            var communicatorClient = new LongPoolingNotificationEntityPublishServiceImplementationThrift();
            var consumer = new LongPoolingNotificationPublishEntityConsumerThrift();

            // Connection used by the consumer to receive notifications.
            TTransport transport = null;
            Func<LongPoolingNotificationEntityPublishServiceThrift.Client> clientCreator = () =>
            {
                if (transport != null && transport.IsOpen) transport.Close();
                transport = null;
                var clientCreated = communicatorClient.CreateClient(uri.Host, uri.Port, out transport);
                SpinWait.SpinUntil(() => transport.IsOpen, 1000);
                return clientCreated;
            };

            // Separate connection used only for publishing.
            TTransport transportPublish = null;
            Func<LongPoolingNotificationEntityPublishServiceThrift.Client> clientCreatorPublish = () =>
            {
                if (transportPublish != null && transportPublish.IsOpen) transportPublish.Close();
                transportPublish = null;
                var clientCreated = communicatorClient.CreateClient(uri.Host, uri.Port, out transportPublish);
                SpinWait.SpinUntil(() => transportPublish.IsOpen, 1000);
                return clientCreated;
            };
            var clientPublish = clientCreatorPublish();

            // Print every notification received from the server.
            consumer.Notified += notification => Console.WriteLine(notification.DataString);
            try
            {
                consumer.StartWorking(clientCreator);
                Console.WriteLine("Hello, what is your client name?");
                var name = Console.ReadLine();
                Console.WriteLine("Welcome '{0}' on notification server.", name);
                Console.WriteLine("write 'quit' command to quit.");

                string text = string.Empty;
                while ((text = Console.ReadLine()) != "quit")
                {
                    Publish(
                        new List<LongPoolingNotificationEntityThrift>
                        {
                            new LongPoolingNotificationEntityThrift
                            {
                                DataString = string.Format("{0}> {1}", name, text)
                            }
                        },
                        ref clientPublish, clientCreatorPublish, transportPublish);
                }
            }
            finally
            {
                consumer.StopWorking();
            }
        }

        // Publishes the data and keeps retrying until it succeeds:
        // after a failure it waits one second and recreates the client.
        static void Publish(
            List<LongPoolingNotificationEntityThrift> data,
            ref LongPoolingNotificationEntityPublishServiceThrift.Client clientP,
            Func<LongPoolingNotificationEntityPublishServiceThrift.Client> clientCreatorP,
            TTransport transportP)
        {
            while (true)
            {
                try
                {
                    clientP.Publish(data);
                    return;
                }
                catch (Exception)
                {
                    Thread.Sleep(1000);
                    clientP = clientCreatorP();
                }
            }
        }
    }
}
In this article I focused mostly on out-of-process and in-network communication between agents in an internal cloud environment. Good luck with the challenge, and see you next time on Aspect Coder Network!
Coder