Introduction to WebSockets with SignalR in .NET Part 3

Introduction

We’ll continue our discussion of SignalR where we left off in the previous post. So open the SignalRWeb demo project and let’s get to it!

Demo continued

We left off having the following “scripts” section in Index.cshtml:

@section scripts
{
    <script src="~/Scripts/jquery.signalR-2.0.3.js"></script>
    <script src="~/Scripts/knockout-3.1.0.js"></script>
    <script src="~/Scripts/results.js"></script>
}

We’ll build on this example to see how a client can interact with the server through WebSockets. At present we have an empty Hello() method in our ResultsHub class. Insert another method which will allow the clients to send a message to the server:

public void SendMessage(String message)
{

}

There’s a property called Context that comes with SignalR. It is similar to HttpContext in ASP.NET: it contains a lot of information about the current connection: headers, query string, authentication etc. Just type “Context.” in the method body and inspect the available properties, they should be self-explanatory. We have no authentication so we’ll use the connection ID property of Context to give the sender some identifier:

public void SendMessage(String message)
{
	string completeMessage = string.Concat(Context.ConnectionId
		, " has registered the following message: ", message);

	Clients.All.registerMessage(completeMessage);
}

You’ll recall from the previous post that we need to deal with JavaScript in SignalR. The above piece of code will result in a JavaScript method called “registerMessage” to be invoked from the server. Where is that method? We need to write it of course. We inserted a JavaScript file called “results.js” previously. Open that file and enter the following stub:

(function () {
    var resultsHub = $.connection.resultsHub;
}());

resultsHub is a reference to the hub we’ve been working on. The “connection” property comes from the SignalR jQuery library and creates a SignalR connection. Through the connection property you’ll be able to reference your hubs. There’s no IntelliSense for the hub names, so be careful with the spelling.

We need to stop for a second and go back to Index.cshtml. There’s one more JavaScript source we need to reference, but it’s not available in the Scripts folder. Recall that we used the MapSignalR OWIN extension in Startup.cs. That extension will map the SignalR hubs to the /signalr endpoint. Add the following script declaration to the scripts section below the jquery.signalR-2.x.x.js script reference:

<script src="~/SignalR/hubs"></script>

Start the application and navigate to http://localhost:xxxxx/SignalR/hubs. You should see some JavaScript related to SignalR in the browser, so the script reference is valid. Basically this code generates proxies for the hubs. If you scroll down you’ll see that it has found the ResultsHub and its two dynamic methods:

proxies.resultsHub = this.createHubProxy('resultsHub'); 
proxies.resultsHub.client = { };
proxies.resultsHub.server = {
   hello: function () {
    return proxies.resultsHub.invoke.apply(proxies.resultsHub, $.merge(["Hello"], $.makeArray(arguments)));
   },

   sendMessage: function (message) {
      return proxies.resultsHub.invoke.apply(proxies.resultsHub, $.merge(["SendMessage"], $.makeArray(arguments)));
             }
};

As you add other hubs and other dynamic methods they will be registered here in this script generated by SignalR using Reflection. Note that the hello and sendMessage functions have been registered on the server – proxies.resultsHub.server – which is expected.

Add the following code to results.js just below the resultsHub reference:

$.connection.hub.logging = true;
$.connection.hub.start();

We turn on logging so that we can see what SignalR is doing behind the scenes. Then we tell SignalR to start the communication. This method will go through the 4 ways of establishing a connection with the client and determine which one works best.

Next we need the JavaScript methods that will be invoked: hello and registerMessage. Add the following stubs just below the call to hub.start():

resultsHub.client.hello = function(){

}

resultsHub.client.registerMessage = function (message) {

};

We’ll concentrate on the registerMessage function, hello can remain empty. Next we need to show the responses on the screen. As mentioned before we’ll use knockout.js as it provides for a very responsive GUI. I in fact only know the basics of knockout.js, as client side programming is not really my cup of tea. If you don’t know anything about knockout.js then you might want to go through the first couple of tutorials here. It is very much based on view-models which are bound to HTML elements. As the properties change so do the values of those elements in a seamless fashion. We won’t go into any detail about the knockout specific details here. Add the following code below the resultsHub.client.registerMessage stub:

var messageModel = function () {
      this.registeredMessage = ko.observable(""),
      this.registeredMessageList = ko.observableArray()
};

The registeredMessage property will show a message sent by the client. registeredMessageList will hold all messages that have been sent from the server. Next add a model prototype:

messageModel.prototype = {

        newMessage: function () {
            resultsHub.server.sendMessage(this.registeredMessage());
            this.registeredMessage("");
        },
        addMessageToList: function (message) {
            this.registeredMessageList.push(message);
        }

    };

newMessage is meant to be a function that can be invoked upon a button click. It sends the message to the server and then clears it. Recall that we called our function in ResultsHub “SendMessage”. You can call it using the “server” property followed by the name of the function you’d like to invoke. addMessageToList does exactly what the function name implies. Then we create an instance of the view-model and instruct knockout to start binding it to our GUI elements:

var viewModel = new messageModel();
$(function () {
        ko.applyBindings(viewModel);
    });

We can now fill in the resultsHub.client.registerMessage stub:

resultsHub.client.registerMessage = function (message) {
        viewModel.addMessageToList(message);
};

It simply calls upon the view-model instance to add the new message to the message list.

Now we can create the corresponding HTML code on index.cshtml. Add the following right above the scripts section.

<div>
    <input type="text" placeholder="Your message..." data-bind="value:registeredMessage" />
    <button data-bind="click:newMessage">Register message</button>
</div>

This bit of markup will serve as the “register” section where the user enters a message in the text box and sends it to the server using the button. Note the knockout-related data-bind attributes. The text box is bound to the registeredMessage property and the click event of the button is bound to the newMessage function of the model, both defined in our results.js file.

Add the following HTML below the “register” section:

<div>
    <div data-bind="foreach:registeredMessageList">
        <div data-bind="text: $data"></div>
    </div>
</div>

This is a knockout foreach expression: for each message in the registeredMessageList array we print the message in a separate div. We bind the text property of the div to the message. The current value in the foreach loop can be accessed using “$data” in knockout.

Let’s go through the expected process step by step:

  1. The user enters a message in the text box and presses the register message button
  2. The button invokes the newMessage function of the knockout view-model ‘messageModel’
  3. The newMessage function will invoke the SendMessage(string input) method of the ResultsHub.cs Hub
  4. The SendMessage function constructs some return message and calls the registerMessage JavaScript function on each client, in our case defined in results.js
  5. The regiesterMessage function receives the string returned by the SendMessage function of ResultsHub
  6. regiesterMessage adds the message to the messages array of the view-model
  7. The view-model is updated and knockout magically updates the screen with the new list of messages

Set a breakpoint within SendMessage of ResultsHub.cs. Start the application in Chrome and press F12 to open the developer tools. You should see some SignalR related messages in the log:

Connecting to WebSockets in Devtools

It has found the ResultsHub and managed to open the web socket endpoint. Note the protocol of ‘ws’. Now insert a message in the text box and press register. Code execution should stop at the breakpoint in Visual Studio meaning that we’ve managed to wire up the components correctly. I encourage you to inspect the Context property and see what’s available in it. You should then see your message under the textbox as returned by the SendMessage:

Message collection by SignalR

In the meantime the DevTools log should fill up with interesting messages such as:

  • SignalR: Invoking resultshub.SendMessage
  • SignalR: Triggering client hub event ‘registerMessage’ on hub ‘ResultsHub’.
  • SignalR: webSockets reconnecting.

You’ll see this third message if you stop at the breakpoint to check out the available properties. As this artificially slows down the response time the web sockets connection is lost and then re-opened.

If you want to talk to yourself on two different screens then open up another browser window, Firefox or IE, and navigate to the same localhost address as in Chrome. Then start sending messages. You should see them in both windows:

Messages in two browser windows

So a message sent from one browser is immediately propagated to all listeners by SignalR with minimal code from the developer.

Read the next post in this series here.

View the list of posts on Messaging here.

Joining unique values from two sequences with the LINQ Union operator

Say you have two sequences of the same object type:

string[] first = new string[] {"hello", "hi", "good evening", "good day", "good morning", "goodbye" };
string[] second = new string[] {"whatsup", "how are you", "hello", "bye", "hi"};

You’d then like to join the two sequences containing the values from both but filtering out duplicates. Here’s how to achieve that with the first prototype of the LINQ Union operator:

IEnumerable<string> union = first.Union(second);
foreach (string value in union)
{
	Console.WriteLine(value);
}

You’ll see that “hello” and “hi” were filtered out from the second sequence as they already figure in the first. This version of the Union operator used a default comparer to compare the string values. As .NET has a good default comparer for strings you could rely on that to filter out duplicates.

However, if you have custom objects then .NET won’t automatically know how to compare them so the comparison will be based on reference equality which is not what you want. Say you have the following object:

public class Singer
{
	public int Id { get; set; }
	public string FirstName { get; set; }
	public string LastName { get; set; }
}

…and the following sequences:

IEnumerable<Singer> singersA = new List<Singer>() 
{
	new Singer(){Id = 1, FirstName = "Freddie", LastName = "Mercury"} 
	, new Singer(){Id = 2, FirstName = "Elvis", LastName = "Presley"}
	, new Singer(){Id = 3, FirstName = "Chuck", LastName = "Berry"}
				
};

IEnumerable<Singer> singersB = new List<Singer>() 
{
	new Singer(){Id = 1, FirstName = "Freddie", LastName = "Mercury"} 
	, new Singer(){Id = 2, FirstName = "Elvis", LastName = "Presley"}
	, new Singer(){Id = 4, FirstName = "Ray", LastName = "Charles"}
	, new Singer(){Id = 5, FirstName = "David", LastName = "Bowie"}
};

If you try the following:

IEnumerable<Singer> singersUnion = singersA.Union(singersB);
foreach (Singer s in singersUnion)
{
	Console.WriteLine(s.Id);
}

…then you’ll see that the duplicates weren’t in fact filtered out and that’s expected. This is where the second version of Union enters the picture where you can provide your custom comparer, like the following:

public class DefaultSingerComparer : IEqualityComparer<Singer>
{
	public bool Equals(Singer x, Singer y)
	{
		return x.Id == y.Id;
	}

	public int GetHashCode(Singer obj)
	{
		return obj.Id.GetHashCode();
	}
}

You can use this comparer as follows:

IEnumerable<Singer> singersUnion = singersA.Union(singersB, new DefaultSingerComparer());
foreach (Singer s in singersUnion)
{
	Console.WriteLine(s.Id);
}

Problem solved!

You can view all LINQ-related posts on this blog here.

Performing joins across two sequences with the LINQ Join operator

With the Join operator in LINQ you can perform joins similar to using the JOIN keyword in SQL: the result will be a join on two sequences based on some common key.

We’ll use the following data structures:

public class Singer
{
	public int Id { get; set; }
	public string FirstName { get; set; }
	public string LastName { get; set; }
}

public class Concert
{
	public int SingerId { get; set; }
	public int ConcertCount { get; set; }
	public int Year { get; set; }
}

public static IEnumerable<Singer> GetSingers()
{
	return new List<Singer>() 
	{
		new Singer(){Id = 1, FirstName = "Freddie", LastName = "Mercury"} 
		, new Singer(){Id = 2, FirstName = "Elvis", LastName = "Presley"}
		, new Singer(){Id = 3, FirstName = "Chuck", LastName = "Berry"}
		, new Singer(){Id = 4, FirstName = "Ray", LastName = "Charles"}
		, new Singer(){Id = 5, FirstName = "David", LastName = "Bowie"}
	};
}

public static IEnumerable<Concert> GetConcerts()
{
	return new List<Concert>()
	{
		new Concert(){SingerId = 1, ConcertCount = 53, Year = 1979}
		, new Concert(){SingerId = 1, ConcertCount = 74, Year = 1980}
		, new Concert(){SingerId = 1, ConcertCount = 38, Year = 1981}
		, new Concert(){SingerId = 2, ConcertCount = 43, Year = 1970}
		, new Concert(){SingerId = 2, ConcertCount = 64, Year = 1968}
		, new Concert(){SingerId = 3, ConcertCount = 32, Year = 1960}
		, new Concert(){SingerId = 3, ConcertCount = 51, Year = 1961}
		, new Concert(){SingerId = 3, ConcertCount = 95, Year = 1962}
		, new Concert(){SingerId = 4, ConcertCount = 42, Year = 1950}
		, new Concert(){SingerId = 4, ConcertCount = 12, Year = 1951}
		, new Concert(){SingerId = 5, ConcertCount = 53, Year = 1983}
	};
}

I think most LINQ operators are quite straightforward to use. Join is probably one of the more complex ones with its Func delegates. The signature of the operator looks as follows:

IEnumerable<TResult> result = IEnumerable<TOuter>.Join<TOuter, TInner, TKey, TResult>(IEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, TInner, TResult> resultSelector);

Let’s see an example of how the singer and concert objects can be joined using the singer ids:

var singerConcerts = singers.Join(concerts, s => s.Id, c => c.SingerId, (s, c) => new
{
	Id = s.Id,
	SingerName = string.Concat(s.FirstName, " ", s.LastName),
	ConcertCount = c.ConcertCount,
	Year = c.Year
});

foreach (var res in singerConcerts)
{
	Console.WriteLine(string.Concat(res.Id, ": ", res.SingerName, ", ", res.Year, ", ", res.ConcertCount));
}

Join operator output

  • ‘singers’ is the outer sequence of the two sequences
  • ‘concerts’ will be the inner sequence
  • s – s.Id: the outer key selector, in this case the singer’s ID
  • c – c.SingerId: the inner key selector, in this case the SingerId secondary key of a Concert object
  • The last element is the result selector where we declare what type of object we want returned from the operation

The Join operator has an overload which accepts an EqualityComparer of TKey. In our case it’s not necessary as the TKey will be the singer ID, i.e. an integer, and .NET can easily compare those. However, if the comparison key is an object, then you can implement the IEqualityComparer interface. Here’s an example how to do that.

You can view all LINQ-related posts on this blog here.

Introduction to WebSockets with SignalR in .NET Part 2: code basics

Introduction

In the previous post of this series we looked at the foundations of WebSockets and SignalR. We’re now ready to look at some action.

Demo

I’ll build this demo on Visual Studio 2013 Express for Web. If you have the full suite of VS 2013 Pro then you’ll be fine too. The goal is to build two model applications with SignalR: a rudimentary chat site and an equally rudimentary stock ticker site where the current stock price is updated real-time based on what the server is reporting to it. Hopefully we’ll learn enough so that you can build your own scenario: show real-time data on performance data, sport scores, the number of people on Earth etc.

Open VS 2013 and select the ASP.NET Web Application template in the New Project window with .NET 4.5 as the underlying framework. Call it SignalRWeb, click OK and then pick the MVC template. On the same window click “Change Authentication” and select the No Authentication option:

Create Web Project

We don’t want to deal with authentication issues as it would only divert us from the main topic. Click OK to create the MVC app. SignalR is not added automatically to the project. Add the Microsoft.AspNet.SignalR NuGet package to the solution:

SignalR Nuget package

This will install a number of other NuGet packages and dependencies. You will be greeted with a readme.txt file in Visual Studio giving you hints about the initial steps. If you followed along the introductory posts on Owin then you’ll recognise the short sample code:

public class Startup
{
     public void Configuration(IAppBuilder app)
     {
         app.MapSignalR();
     }
}

We have the Startup class with the Configuration method that accepts an IAppBuilder object. We call an extension method on this app builder. If you don’t understand what this means then it’s best if you at least skim through the posts on OWIN first.

Normally if you create an MVC app with the default “Individual user Accounts” authentication option then the project starting point will already have Katana components in it that are related to authentication: OAuth – Facebook, Google, etc., or authentication using some other user store. However, we have no authentication at all in our project and there’s no OWIN Startup entry point class either. In fact, the Owin libraries were only added when we downloaded the SignalR NuGet package and its dependencies. This means we’ll need to create it ourselves, but that’s OK. Add a new class to the project called Startup.cs. Be sure to call it “Startup” and not “startup” or “StartUp” so that we follow the naming conventions. Let’s do exactly as it says in readme.txt and add the following code to Startup.cs:

public void Configuration(IAppBuilder app)
{
     app.MapSignalR();
}

You’ll need to import the Owin namespace. This extension will make sure that our app starts listening to requests. You’ll recall from the previous post that we’ll need to create at least one Hub that clients can connect to. Add a new folder called Hubs. In that folder add a new class. In the Add New Item window you’ll notice that there are some new template types under the Web/SignalR category:

SignalR hub class template

Select the Hub Class template. The Persistent Connection template is similar but you’ll need to deal with connections at a lower level whereas the Hub Class represents a higher level of abstraction. I’ve never used the Persistent Connection template but I believe it’s rather for experienced programmers who may want to control connections at a lower level for customisation purposes.

Call the class “ResultsHub.cs”. You’ll be provided a very short class implementation that derives from Hub:

public class ResultsHub : Hub
{
     public void Hello()
     {
         Clients.All.hello();
     }
}

Let’s explore this a little before we continue with our main goal. Clients.All represents all clients that have connected to this hub which has a simple hello() method. “All” is a dynamic property so you can attach any method name to it: elvisPresley(), mickeyMouse() etc., so don’t expect any IntelliSense there. hello() is therefore a dynamic method, you won’t find it through IntelliSense either. This bit of code means that each client should be contacted and a JavaScript function called “hello” should be invoked on each of them. Keep in mind that we’re programming a lot against JavaScript in SignalR hence the need for dynamic methods. IntelliSense cannot expect the exact JavaScript method names, right?

You don’t have to contact every caller. Start typing “Clients.” within the Hello function and you’ll see other options, e.g.:

  • Contact certain groups
  • Contact every client except some with certain connection IDs
  • One specific user

So you have some control over who will receive updates from this hub. We can send a greeting to the method if there’s a matching JavaScript method on the client side:

public class ResultsHub : Hub
{
     public void Hello()
     {
         Clients.All.hello("Welcome from SignalR!");
     }
}

Let’s see how this can be used in a view. Locate the Views/Index.cshtml view. Erase all code from it except:

@{
    ViewBag.Title = "Home Page";
}

We’ll need some JavaScript on this page. The SignalR NuGet package has installed a couple of jQuery libraries specific to SignalR. Insert the following script section stub to Index.cshtml:

@section scripts
{
<script src="~/Scripts/jquery.signalR-2.0.3.js"></script>
}

The easiest way to achieve this is to locate the jquery.signalR-2.x.x file in the Scripts folder which you can drag and drop within the curly braces of the scripts section. That will create the script tag for you automatically. This library has a dependency on jQuery but that’s already imported in Shared/_Layout.cshtml in a bundle. You’ll also find the scripts section declared in that common layout file which is why we can extend it in Index.cshtml.

We’ll also need some custom javascript. Add a new JS file into the Scripts folder called results.js. Reference it already now on the Index page:

@section scripts
{
    <script src="~/Scripts/jquery.signalR-2.0.3.js"></script>
    <script src="~/Scripts/results.js"></script>
}

We’ll also need knockout.js to make the updates on the page very responsive and fluent. It’s not currently available in our project so let’s add the following NuGet package:

knockout js NuGet package

Add it to the scripts section:

@section scripts
{
    <script src="~/Scripts/jquery.signalR-2.0.3.js"></script>
    <script src="~/Scripts/knockout-3.1.0.js"></script>
    <script src="~/Scripts/results.js"></script>
}

We’ve laid the foundations for our SignalR app. We’ll continue in the next post.

View the list of posts on Messaging here.

Projection in LINQ C# with the SelectMany operator

The SelectMany operator creates a one-to-many output projection sequence over an input sequence. SelectMany will return 0 or more output elements for every input element. Let’s see an example.

Data source:

string[] bands = { "ACDC", "Queen", "Aerosmith", "Iron Maiden", "Megadeth", "Metallica", "Cream", "Oasis", "Abba", "Blur", "Chic", "Eurythmics", "Genesis", "INXS", "Midnight Oil", "Kent", "Madness", "Manic Street Preachers"
, "Noir Desir", "The Offspring", "Pink Floyd", "Rammstein", "Red Hot Chili Peppers", "Tears for Fears"
, "Deep Purple", "KISS"};

Consider the following code:

IEnumerable<char> characters = bands.SelectMany(b => b.ToArray());
foreach (char item in characters)
{
	Console.WriteLine(item);
}

We provide a string input to SelectMany and apply the ToArray extension to it which will be an array of chars. So we have a single input – a string – and a collection of many elements as output – the sequence of characters from each band name.

Here’s a small part of the output:

SelectMany operator simple example

This is an interesting but probably not too useful application of SelectMany. We can do more interesting projections with it. Consider the following classes:

public class Singer
{
	public int Id { get; set; }
	public string FirstName { get; set; }
	public string LastName { get; set; }
}
public class Concert
{
	public int SingerId { get; set; }
	public int ConcertCount { get; set; }
	public int Year { get; set; }
}

We have the following data in the data collection:

public static IEnumerable<Singer> GetSingers()
{
	return new List<Singer>() 
	{
		new Singer(){Id = 1, FirstName = "Freddie", LastName = "Mercury"} 
		, new Singer(){Id = 2, FirstName = "Elvis", LastName = "Presley"}
		, new Singer(){Id = 3, FirstName = "Chuck", LastName = "Berry"}
		, new Singer(){Id = 4, FirstName = "Ray", LastName = "Charles"}
		, new Singer(){Id = 5, FirstName = "David", LastName = "Bowie"}
	};
}
public static IEnumerable<Concert> GetConcerts()
{
	return new List<Concert>()
	{
		new Concert(){SingerId = 1, ConcertCount = 53, Year = 1979}
		, new Concert(){SingerId = 1, ConcertCount = 74, Year = 1980}
		, new Concert(){SingerId = 1, ConcertCount = 38, Year = 1981}
		, new Concert(){SingerId = 2, ConcertCount = 43, Year = 1970}
		, new Concert(){SingerId = 2, ConcertCount = 64, Year = 1968}
		, new Concert(){SingerId = 3, ConcertCount = 32, Year = 1960}
		, new Concert(){SingerId = 3, ConcertCount = 51, Year = 1961}
		, new Concert(){SingerId = 3, ConcertCount = 95, Year = 1962}
		, new Concert(){SingerId = 4, ConcertCount = 42, Year = 1950}
		, new Concert(){SingerId = 4, ConcertCount = 12, Year = 1951}
		, new Concert(){SingerId = 5, ConcertCount = 53, Year = 1983}
	};
}

We can create a join with the SelectMany operator to see how many concerts each singer gave per year. Consider the following query:

IEnumerable<Singer> singers = GetSingers();
IEnumerable<Concert> concerts = GetConcerts();

var singerConcerts = singers.SelectMany(s => concerts.Where(c => c.SingerId == s.Id)
	.Select(c => new {Year = c.Year, ConcertCount = c.ConcertCount, Name = string.Concat(s.FirstName, " ", s.LastName) }));

foreach (var item in singerConcerts)
{
	Console.WriteLine(string.Concat(item.Name, ", ", item.Year, ", ", item.ConcertCount));
}

First every Singer object is passed into the lambda expression of SelectMany, which will be ‘s’ parameter. In the lambda expression we retrieve every Concert of each singer using the Where extension. We’re in effect joining the two data collections on the Id/SingerId properties. Finally we construct an anonymous object collection with the Select operator. Here’s the output:

SelectMany operator complex example

You can project the results into “real” objects as opposed to anonymous ones. We have the following object:

public class SingerConcert
{
	public string SingerName { get; set; }
	public int Year { get; set; }
	public int ConcertCount { get; set; }
}

Our query can be modified as follows to build a sequence of SingerConcert objects:

IEnumerable<Singer> singers = DemoCollections.GetSingers();
IEnumerable<Concert> concerts = DemoCollections.GetConcerts();

IEnumerable<SingerConcert> singerConcerts = singers.SelectMany(s => concerts.Where(c => c.SingerId == s.Id)
	.Select(c => new SingerConcert() { Year = c.Year, ConcertCount = c.ConcertCount, SingerName = string.Concat(s.FirstName, " ", s.LastName) }));

foreach (SingerConcert item in singerConcerts)
{
	Console.WriteLine(string.Concat(item.SingerName, ", ", item.Year, ", ", item.ConcertCount));
}

…which provides the same output as the anonymous class example above.

You can view all LINQ-related posts on this blog here.

Introduction to WebSockets with SignalR in .NET Part 1: the basics

Introduction

The normal communication flow between a browser and a web app on a server is quite limited: the client sends a HTTP request to the server and receives a HTTP response back. If the client needs some new information then it will need to send another request to the server. Normally a web server will not just send out HTTP responses to a client without a HTTP request first to act upon.

This is true even for pages where you view the same type of data which is updated periodically. A typical example is a sports site where you can view the current standing of a game. You as the viewer would probably like to have real time updates so that you can be happy when your team scores – or smash your smart phone against the wall if it’s losing.

There are various ways to solve this problem on a web site:

  • Put a big “Refresh” button on the page which requests and updated set of objects from the server – very straightforward, but not too convenient for the client
  • Put some kind of JavaScript timer function on the web page which periodically requests an update from the server and refreshes the page
  • Same as above but use an Ajax call to only refresh the page partially

…and possibly others.

The underlying method with all three solutions is that the browser will need to keep sending the same request to the server over and over again. Imagine that this site is very popular so their server receives tens of thousands of essentially the same request. The company will need to invest in extra infrastructure, such as a web farm, a load balancer, CDN etc. Wouldn’t it be easier if the browser could tell the server: “hey, server, I need updates from you on the same set of data periodically, thanks.” And then the communication channel is magically kept open between the client and the server. The server will be able to send updates over that channel automatically without waiting for new requests. This can potentially result in an alleviated server load, a very responsive web UI and significant savings for the company in terms of reduced infrastructure.

Web Sockets

These communication channels through Web Sockets have been available for some years now, so it’s not some kind of a dream. A web socket is a TCP socket connection between the client and the server. This socket is maintained between two parties. It is a two-way full duplex communication channel: the client can send data to the server and the server can send data back to the client over the same channel at any time.

The protocol for a web socket is “ws”, such as “ws://mysite.com”. Similarly a secure web socket follows the “wss” scheme. Note that not all browsers support web sockets. Check out the table on this web site to see which browsers support it.

SignalR is a technology that alleviates the burden of setting up a web app which uses web sockets. There are many things you need to take care of with web sockets communication: serialising data, deserialising data, maintaining the connection, processing messages that arrive in no particular order. SignalR simplifies the construction of sockets-aware web apps by hiding a lot of the underlying complexity. You as a developer can then concentrate on the fun parts instead of worrying too much about low level details. SignalR can even help older browsers “understand” web sockets by emulating its behaviour.

So what’s SignalR?

As hinted on above SignalR is a Microsoft framework that makes working with web sockets easier for the programmer. You can make it available in your web project by downloading the Microsoft.AspNet.SignalR NuGet package. The most recent version at the time of writing this post is 2.0.3:

SignalR Nuget package

By the time you read this post there might be an updated package. If you see that some functionality described in this blog series is not compatible with a newer version then please comment about it and then I’ll update the posts.

As we said before Web Sockets can be quite challenging to work with if you try to do so using some low-level JavaScript package. WebSockets is not the only type of connection that enables real-time two-way communication between the client and the server. Here come some more:

SignalR will test all 4 of these until it finds one that’s available. It always tries WebSockets first as it is the the most efficient one of the 4 as far as overhead is concerned. Normally if the browser supports WebSockets then it can be used for communication. However, this is not always the case. In that case it will test the other three one after the other.

For WebSockets to succeed then even the web server will need to support this technology. You are presumably a .NET web developer so chances are high that you’ll deploy your web app on IIS. Only IIS8 and above will support WebSockets. I have Window8.1 on the PC I’m building the demo on but IIS8 should be available on Windows8 as well. The built-in IIS Express server of Visual Studio 2013 supports WebSockets. If you’d like to deploy and test your web app locally on your PC then search for IIS on your PC. If you don’t find it then you’ll need to enable it first:

Turn windows features on and off

Enable IIS

Also enable the WebSocket feature:

Enable WebSockets

Start IIS and check its version:

Check IIS version

If you’re deploying your app an Windows Server then you’ll need Windows Server 2012 and you’ll need to enable the WebSocket feature as shown above.

The SignalR NuGet package will enable us to work with both the client side and server side part of the communication. On the server side the most important component we’ll work with is called a Hub which is a class that hides the implementation details of the communication on the server side. On the client side we’ll need some JavaScript code that can be called by SignalR to connect to this Hub.

In the next post we’ll start our demo.

View the list of posts on Messaging here.

Projection in LINQ C# with the Select operator

You can use the Select() extension method in LINQ to create an output of type T from an input sequence of type other than T. Let’s see some examples:

Source data:

string[] bands = { "ACDC", "Queen", "Aerosmith", "Iron Maiden", "Megadeth", "Metallica", "Cream", "Oasis", "Abba", "Blur" , "Chic", "Eurythmics", "Genesis", "INXS", "Midnight Oil", "Kent", "Madness", "Manic Street Preachers", "Noir Desir", "The Offspring", "Pink Floyd", "Rammstein", "Red Hot Chili Peppers", "Tears for Fears", "Deep Purple", "KISS"};

Say you want to collect the lengths of each string in the array:

IEnumerable<int> lengths = bands.Select(b => b.Length);
foreach (int l in lengths)
{
	Console.WriteLine(l);
}

Select operator simple output

You can also project to a sequence of anonymous objects…:

var customObjects = bands.Select(b => new { Name = b, Length = b.Length });
foreach (var item in customObjects)
{
	Console.WriteLine("Band name: {0}, length: {1}", item.Name, item.Length);
}

Select operator extended output

…or to different concrete objects:

public class Band
{
	public string Name { get; set; }
	public int NameLength { get; set; }
	public string AllCapitals { get; set; }
}

IEnumerable<Band> bandList = bands.Select(b => new Band() { AllCapitals = b.ToUpper(), Name = b, NameLength = b.Length });
foreach (Band band in bandList)
{
	Console.WriteLine(string.Concat(band.Name, ", ", band.NameLength, ", ", band.AllCapitals));
}

Select operator custom object output

An overload of Select() allows us to read an index value:

public class Band
{
	public string Name { get; set; }
	public int NameLength { get; set; }
	public string AllCapitals { get; set; }
	public int BandIndex { get; set; }
}

IEnumerable<Band> bandList = bands.Select((b, i) => new Band() { AllCapitals = b.ToUpper(), BandIndex = i + 1, Name = b, NameLength = b.Length });
foreach (Band band in bandList)
{
	Console.WriteLine(string.Concat(band.BandIndex, ": ", band.Name, ", ", band.NameLength, ", ", band.AllCapitals));
}

Select operator indexed object output

You can view all LINQ-related posts on this blog here.

Convert array of strings to integers with C# LINQ

Consider the following array of strings:

string[] numbersAsString = new string[] { "3", "1", "2", "4" };

You can easily convert each member and store them in an integer array with just one line of code:

int[] numbersAsInt = numbersAsString.Select(s => int.Parse(s)).ToArray();

You can sort the integers too:

int[] numbersAsInt = numbersAsString.Select(s => int.Parse(s)).OrderBy(s => s).ToArray();

You can view all LINQ-related posts on this blog here.

Messaging with RabbitMQ and .NET C# part 5: headers and scatter/gather

Introduction

In the previous post on RabbitMQ .NET we looked at the Routing and Topics exchange patterns. In this post we’ll continue looking at RabbitMQ in .NET. In particular we’ll talk about routing messages using the following two patterns:

  • Headers
  • Scatter/gather

We’ll use the demo application we’ve been working on in this series so have it ready in Visual Studio. Also, log onto the RabbitMQ management console on http://localhost:15672/

Most of the posts on RabbitMQ on this blog are based on the work of RabbitMQ guru Michael Stephenson.

Headers

The Headers exchange pattern is very similar to Topics we saw in the previous part of this series. The sender sends a message of type Headers to RabbitMQ. The message is routed based on the header value. All queues with a matching key will receive the message. We’ll dedicate an exchange to deliver the messages but the routing key will be ignored as it is the headers that will be the basis for the match. We can specify more than one header and a rule that says if all headers must match or just one using the “x-match” property which can have 2 values: “any” or “all”. The default value of this property is “all” so all headers must match for a queue to receive a message.

We’ll create one dedicated exchange and three queues. Add a new Console app to the solution called HeadersSender. Like before, add references to the RabbitMQ NuGet package and the RabbitMqService library in the solution. Insert the following code to Main:

AmqpMessagingService messagingService = new AmqpMessagingService();
IConnection connection = messagingService.GetRabbitMqConnection();
IModel model = connection.CreateModel();
messagingService.SetUpExchangeAndQueuesForHeadersDemo(model);

…where SetUpExchangeAndQueuesForHeadersDemo in AmqpMessagingService looks like this:

public void SetUpExchangeAndQueuesForHeadersDemo(IModel model)
{
	model.ExchangeDeclare(_headersExchange, ExchangeType.Headers, true);
	model.QueueDeclare(_headersQueueOne, true, false, false, null);
	model.QueueDeclare(_headersQueueTwo, true, false, false, null);
	model.QueueDeclare(_headersQueueThree, true, false, false, null);
			
	Dictionary<string,object> bindingOneHeaders = new Dictionary<string,object>();
	bindingOneHeaders.Add("x-match", "all");
	bindingOneHeaders.Add("category", "animal");
	bindingOneHeaders.Add("type", "mammal");
	model.QueueBind(_headersQueueOne, _headersExchange, "", bindingOneHeaders);

	Dictionary<string, object> bindingTwoHeaders = new Dictionary<string, object>();
	bindingTwoHeaders.Add("x-match", "any");
	bindingTwoHeaders.Add("category", "animal");
	bindingTwoHeaders.Add("type", "insect");
	model.QueueBind(_headersQueueTwo, _headersExchange, "", bindingTwoHeaders);

	Dictionary<string, object> bindingThreeHeaders = new Dictionary<string, object>();
	bindingThreeHeaders.Add("x-match", "any");
	bindingThreeHeaders.Add("category", "plant");
	bindingThreeHeaders.Add("type", "flower");
	model.QueueBind(_headersQueueThree, _headersExchange, "", bindingThreeHeaders);
}

The following private fields will be necessary as well:

private string _headersExchange = "HeadersExchange";
private string _headersQueueOne = "HeadersQueueOne";
private string _headersQueueTwo = "HeadersQueueTwo";
private string _headersQueueThree = "HeadersQueueThree";

We specify the headers in a dictionary. The first dictionary means that the queue will be interested in messages with headers of category = animal and type = mammal. The x-match property of “all” indicates that the queue wants to see both headers. You can probably understand the other two header bindings. As the default value of the x-match header is “all”, we could ignore adding that header but I prefer to be explicit in a demo like this.

Set HeadersSender as the start up project and start the application. Check in the RabbitMQ management UI whether the exchange and the queues have been set up correctly. Check the bindings on the exchange as well, you should see the correct header values.

Comment out the call to messagingService.SetUpExchangeAndQueuesForHeadersDemo. Back in AmqpMessageService.cs add the following method to send a message with headers:

public void SendHeadersMessage(string message, Dictionary<string,object> headers, IModel model)
{
	IBasicProperties basicProperties = model.CreateBasicProperties();
	basicProperties.SetPersistent(_durable);
	basicProperties.Headers = headers;
	byte[] messageBytes = Encoding.UTF8.GetBytes(message);
	model.BasicPublish(_headersExchange, "", basicProperties, messageBytes);
}

In HeadersSender.cs insert the following private method which reads the header values using delimiters and calls upon the SendHeadersMessage method:

private static void RunHeadersDemo(IModel model, AmqpMessagingService messagingService)
{
	Console.WriteLine("Enter your message as follows: the header values for 'category' and 'type separated by a colon. Then put a semicolon, and then the message. Quit with 'q'.");
	while (true)
	{
		string fullEntry = Console.ReadLine();
		string[] parts = fullEntry.Split(new char[] { ';' }, StringSplitOptions.RemoveEmptyEntries);
		string headers = parts[0];
		string[] headerValues = headers.Split(new char[] { ',' }, StringSplitOptions.RemoveEmptyEntries);
		Dictionary<string, object> headersDictionary = new Dictionary<string, object>();
		headersDictionary.Add("category", headerValues[0]);
		headersDictionary.Add("type", headerValues[1]);
		string message = parts[1];
		if (message.ToLower() == "q") break;
		messagingService.SendHeadersMessage(message, headersDictionary, model);
	}
}

Add a call to this private method from Main:

RunHeadersDemo(model, messagingService);

It’s time to set up the receivers. They will be very similar to what we have seen before. In preparation for the receiver projects insert the following three methods into AmqpMessagingService.cs:

public void ReceiveHeadersMessageReceiverOne(IModel model)
{
	model.BasicQos(0, 1, false);
	Subscription subscription = new Subscription(model, _headersQueueOne, false);
	while (true)
	{
		BasicDeliverEventArgs deliveryArguments = subscription.Next();
		StringBuilder messageBuilder = new StringBuilder();
		String message = Encoding.UTF8.GetString(deliveryArguments.Body);
		messageBuilder.Append("Message from queue: ").Append(message).Append(". ");
		foreach (string headerKey in deliveryArguments.BasicProperties.Headers.Keys)
		{
			byte[] value = deliveryArguments.BasicProperties.Headers[headerKey] as byte[];
			messageBuilder.Append("Header key: ").Append(headerKey).Append(", value: ").Append(Encoding.UTF8.GetString(value)).Append("; ");
		}
		
		Console.WriteLine(messageBuilder.ToString());
		subscription.Ack(deliveryArguments);
	}
}

public void ReceiveHeadersMessageReceiverTwo(IModel model)
{
	model.BasicQos(0, 1, false);
	Subscription subscription = new Subscription(model, _headersQueueTwo, false);
	while (true)
	{
		BasicDeliverEventArgs deliveryArguments = subscription.Next();
		StringBuilder messageBuilder = new StringBuilder();
		String message = Encoding.UTF8.GetString(deliveryArguments.Body);
		messageBuilder.Append("Message from queue: ").Append(message).Append(". ");
		foreach (string headerKey in deliveryArguments.BasicProperties.Headers.Keys)
		{
			byte[] value = deliveryArguments.BasicProperties.Headers[headerKey] as byte[];
			messageBuilder.Append("Header key: ").Append(headerKey).Append(", value: ").Append(Encoding.UTF8.GetString(value)).Append("; ");
		}

		Console.WriteLine(messageBuilder.ToString());
		subscription.Ack(deliveryArguments);
	}
}

public void ReceiveHeadersMessageReceiverThree(IModel model)
{
	model.BasicQos(0, 1, false);
	Subscription subscription = new Subscription(model, _headersQueueThree, false);
	while (true)
	{
		BasicDeliverEventArgs deliveryArguments = subscription.Next();
		StringBuilder messageBuilder = new StringBuilder();
		String message = Encoding.UTF8.GetString(deliveryArguments.Body);
		messageBuilder.Append("Message from queue: ").Append(message).Append(". ");
		foreach (string headerKey in deliveryArguments.BasicProperties.Headers.Keys)
		{
			byte[] value = deliveryArguments.BasicProperties.Headers[headerKey] as byte[];
			messageBuilder.Append("Header key: ").Append(headerKey).Append(", value: ").Append(Encoding.UTF8.GetString(value)).Append("; ");
		}
        	Console.WriteLine(messageBuilder.ToString());
		subscription.Ack(deliveryArguments);
	}
}

The only new bit of code is that we’re extracting the header values from the incoming payload. Otherwise the code should be very familiar by now.

Add three new console applications to the solution: HeadersReceiverOne, HeadersReceiverTwo, HeadersReceiverThree. Add references to the RabbitMQ NuGet package and the RabbitMqService library in all three. Insert the following bits of code…:

…to HeadersReceiverOne.Main:

AmqpMessagingService messagingService = new AmqpMessagingService();
IConnection connection = messagingService.GetRabbitMqConnection();
IModel model = connection.CreateModel();
messagingService.ReceiveHeadersMessageReceiverOne(model);

…to HeadersReceiverTwo.Main:

AmqpMessagingService messagingService = new AmqpMessagingService();
IConnection connection = messagingService.GetRabbitMqConnection();
IModel model = connection.CreateModel();
messagingService.ReceiveHeadersMessageReceiverTwo(model);

…and to HeadersReceiverThree.Main:

AmqpMessagingService messagingService = new AmqpMessagingService();
IConnection connection = messagingService.GetRabbitMqConnection();
IModel model = connection.CreateModel();
messagingService.ReceiveHeadersMessageReceiverThree(model);

Perform these steps to run all relevant console apps:

  1. Make sure that HeadersSender is set as the start up project and start the application
  2. Start the receivers by right-clicking them on Visual Studio and selecting Debug, Start new instance
  3. You should have 4 console windows up and running on your screen

Start sending messages from the HeadersSender. Be careful with the delimiters: ‘,’ for the headers and ‘;’ for the message. The message should be routed according to the specified routing rules:

Headers MEP console

Scatter/gather

This pattern is similar to the RPC message exchange pattern we saw in a previous post of this series in that the sender will be expecting a response from the receiver. The main difference is that in this scenario the sender can collect a range of responses from various receivers. The sender will set up a temporary response queue where the receivers can send their responses. It’s possible to implement this pattern using any exchange type: fanout, direct, headers and topic depending on how you’ve set up the exchange/queue binding. You can also specify a routing key in the binding as we saw before.

I think this is definitely a message exchange pattern which can be widely used in real applications out there that require 2 way communication with more than 2 parties. Consider that you send out a request to construction companies asking for a price offer. The companies then can respond using the message broker and the temporary response queue.

We’ll re-use several ideas and bits of code from the RPC pattern so make sure you understand the basics of that MEP as well. I won’t explain the same ideas again.

Let’s set up the exchange and the queue first as usual. Insert the following private fields to AmqpMessagingService.cs:

private string _scatterGatherExchange = "ScatterGatherExchange";
private string _scatterGatherReceiverQueueOne = "ScatterGatherReceiverQueueOne";
private string _scatterGatherReceiverQueueTwo = "ScatterGatherReceiverQueueTwo";
private string _scatterGatherReceiverQueueThree = "ScatterGatherReceiverQueueThree";

The following method in AmqpMessagingService.cs will set up the necessary pieces:

public void SetUpExchangeAndQueuesForScatterGatherDemo(IModel model)
{
	model.ExchangeDeclare(_scatterGatherExchange, ExchangeType.Topic, true);
	model.QueueDeclare(_scatterGatherReceiverQueueOne, true, false, false, null);
	model.QueueDeclare(_scatterGatherReceiverQueueTwo, true, false, false, null);
	model.QueueDeclare(_scatterGatherReceiverQueueThree, true, false, false, null);

	model.QueueBind(_scatterGatherReceiverQueueOne, _scatterGatherExchange, "cars");
	model.QueueBind(_scatterGatherReceiverQueueOne, _scatterGatherExchange, "trucks");

	model.QueueBind(_scatterGatherReceiverQueueTwo, _scatterGatherExchange, "cars");
	model.QueueBind(_scatterGatherReceiverQueueTwo, _scatterGatherExchange, "aeroplanes");
	model.QueueBind(_scatterGatherReceiverQueueTwo, _scatterGatherExchange, "buses");

	model.QueueBind(_scatterGatherReceiverQueueThree, _scatterGatherExchange, "cars");
	model.QueueBind(_scatterGatherReceiverQueueThree, _scatterGatherExchange, "buses");
	model.QueueBind(_scatterGatherReceiverQueueThree, _scatterGatherExchange, "tractors");
}

You’ll notice that we are going to go for the Topic exchange type and that we’ll bind 3 queues to the exchange. The routing keys will tell you what each receiver is interested in. E.g. all queues will receive a message with a routing key of “cars”.

Add a new Console application called ScatterGatherSender to the solution. Add a reference to the RabbitMQ NuGet package and the RabbitMqService library. Insert the following code to Main:

AmqpMessagingService messagingService = new AmqpMessagingService();
IConnection connection = messagingService.GetRabbitMqConnection();
IModel model = connection.CreateModel();
messagingService.SetUpExchangeAndQueuesForScatterGatherDemo(model);

Set ScatterGatherSender as the start up project and run the application. Check in the RabbitMQ console that all elements have been set up correctly. Comment out the call to messagingService.SetUpExchangeAndQueuesForScatterGatherDemo.

Next we’ll set up the message sending logic in AmqpMessagingService.cs. Like in RPC we’ll need a queue that the Sender will dynamically set up. Insert the following private fields in AmqpMessagingService:

private QueueingBasicConsumer _scatterGatherConsumer;
private string _scatterGatherResponseQueue;

The following method will take care of sending the message to the exchange and collect the responses from the receivers:

public List<string> SendScatterGatherMessageToQueues(string message, IModel model, TimeSpan timeout, string routingKey, int minResponses)
{
	List<string> responses = new List<string>();
	if (string.IsNullOrEmpty(_scatterGatherResponseQueue))
	{
		_scatterGatherResponseQueue = model.QueueDeclare().QueueName;
	}

	if (_scatterGatherConsumer == null)
	{
		_scatterGatherConsumer = new QueueingBasicConsumer(model);
		model.BasicConsume(_scatterGatherResponseQueue, true, _scatterGatherConsumer);
	}

	string correlationId = Guid.NewGuid().ToString();
	IBasicProperties basicProperties = model.CreateBasicProperties();
	basicProperties.ReplyTo = _scatterGatherResponseQueue;
	basicProperties.CorrelationId = correlationId;

	byte[] messageBytes = Encoding.UTF8.GetBytes(message);
	model.BasicPublish(_scatterGatherExchange, routingKey, basicProperties, messageBytes);
			
	DateTime timeoutDate = DateTime.UtcNow + timeout;
	while (DateTime.UtcNow <= timeoutDate)
	{
		BasicDeliverEventArgs deliveryArguments;
		_scatterGatherConsumer.Queue.Dequeue(500, out deliveryArguments);
		if (deliveryArguments != null && deliveryArguments.BasicProperties != null
			&& deliveryArguments.BasicProperties.CorrelationId == correlationId)
		{
			string response = Encoding.UTF8.GetString(deliveryArguments.Body);
			responses.Add(response);
			if (responses.Count >= minResponses)
			{
				break;
			}
		}
	}

	return responses;
}

This piece of code looks very much like what we saw with the RPC pattern. The first key difference is that we need to wait for a range of responses, not just a single one, hence the return type of List of string. The purpose of the minResponse input parameter is that in practice the sender will probably not know how many responses it could receive so it specifies a minimum. The Dequeue() method has an interesting overload for a scenario where the sender doesn’t know how long it can take for each receiver to respond:

Dequeue(int millisecondsTimeout, out BasicDeliverEventArgs eventArgs);

If the timeout is passed then the BasicDeliverEventArgs eventArgs out parameter will be null, so we effectively ignore all responses that came in after the timeout. In the RPC example code we didn’t specify any such timeout so the Dequeue() code will block the code execution until there’s a message. In reality the sender could wait for a long time or even for ever to get a response so a timeout parameter can be very useful. Imagine that the sender specifies a min response count of 5 and only 3 responses are received. Then without a timeout parameter in Dequeue the sender would have to wait for ever which is not optimal. Instead we periodically check the queue, wait for 500 milliseconds and then try again until the timeOut date parameter is up. If the response count reaches the minimum before that then the response list is returned. Otherwise a shorter list will be returned. The sender can of course omit a minimum response count and simply wait until the timeout has been passed. This simulates the scenario where applicants are allowed to participate in an open tender until some specified deadline and the number of applications can be anything from 0 to int.MaxValue.

This method can be called from ScatterGatherSender as follows:

private static void RunScatterGatherDemo(IModel model, AmqpMessagingService messagingService)
{
	Console.WriteLine("Enter your message as follows: the routing key, followed by a semicolon, and then the message. Quit with 'q'.");
	while (true)
	{
		string fullEntry = Console.ReadLine();
		string[] parts = fullEntry.Split(new char[] { ';' }, StringSplitOptions.RemoveEmptyEntries);
		string key = parts[0];
		string message = parts[1];
		if (message.ToLower() == "q") break;
		List<string> responses = messagingService.SendScatterGatherMessageToQueues(message, model, TimeSpan.FromSeconds(20), key, 3);
		Console.WriteLine("Received the following messages: ");
		foreach (string response in responses)
		{
			Console.WriteLine(response);
		}
	}
}

So the receivers will have 20 seconds to respond.

Call this private method from Main:

RunScatterGatherDemo(model, messagingService);

Back in AmqpMessagingService.cs we’ll prepare the code which will receive the scatter/gather messages and send the responses from the receivers. The code is actually identical to ReceiveRpcMessage(IModel model) we saw earlier so I won’t explain it again:

public void ReceiveScatterGatherMessageOne(IModel model)
{
	ReceiveScatterGatherMessage(model, _scatterGatherReceiverQueueOne);
}

public void ReceiveScatterGatherMessageTwo(IModel model)
{
	ReceiveScatterGatherMessage(model, _scatterGatherReceiverQueueTwo);
}

public void ReceiveScatterGatherMessageThree(IModel model)
{
	ReceiveScatterGatherMessage(model, _scatterGatherReceiverQueueThree);
}

private void ReceiveScatterGatherMessage(IModel model, string queueName)
{
	model.BasicQos(0, 1, false);
	QueueingBasicConsumer consumer = new QueueingBasicConsumer(model);
	model.BasicConsume(queueName, false, consumer);
	while (true)
	{
		BasicDeliverEventArgs deliveryArguments = consumer.Queue.Dequeue() as BasicDeliverEventArgs;
		string message = Encoding.UTF8.GetString(deliveryArguments.Body);
		Console.WriteLine("Message: {0} ; {1}", message, " Enter your response: ");
		string response = Console.ReadLine();
		IBasicProperties replyBasicProperties = model.CreateBasicProperties();
		replyBasicProperties.CorrelationId = deliveryArguments.BasicProperties.CorrelationId;
		byte[] responseBytes = Encoding.UTF8.GetBytes(response);
		model.BasicPublish("", deliveryArguments.BasicProperties.ReplyTo, replyBasicProperties, responseBytes);
		model.BasicAck(deliveryArguments.DeliveryTag, false);
	}
}

Insert threw new Console applications: ScatterGatherReceiverOne, ScatterGatherReceiverTwo, ScatterGatherReceiverThree. Add references to the RabbitMQ NuGet package and the RabbitMqService library to all 3. Insert the following bits of code.

To ScatterGatherReceiverOne.Main:

AmqpMessagingService messagingService = new AmqpMessagingService();
IConnection connection = messagingService.GetRabbitMqConnection();
IModel model = connection.CreateModel();
messagingService.ReceiveScatterGatherMessageOne(model);

…to ScatterGatherReceiverTwo.Main:

AmqpMessagingService messagingService = new AmqpMessagingService();
IConnection connection = messagingService.GetRabbitMqConnection();
IModel model = connection.CreateModel();
messagingService.ReceiveScatterGatherMessageTwo(model);

…and to ScatterGatherReceiverThree.Main:

AmqpMessagingService messagingService = new AmqpMessagingService();
IConnection connection = messagingService.GetRabbitMqConnection();
IModel model = connection.CreateModel();
messagingService.ReceiveScatterGatherMessageThree(model);

Follow these steps to start the demo:

  1. Make sure that ScatterGatherSender is set as the start up project and start the application
  2. Start all 3 receivers by the usual technique: right-click in VS, Debug, Start new instance
  3. You’ll have 4 console windows up and running on your screen

Start sending messages from the Sender. Take care when entering the message so you delimit the routing key and the message:

scatter gather console

Read the next part of this series here.

View the list of posts on Messaging here.

Parallel LINQ in .NET C#: using AsUnordered()

In this post we saw how to keep the order of item processing in a parallel query using the AsOrdered() extension method. We also mentioned that this comes at a slight performance cost.

The effect of using AsOrdered() in combined queries is that the order is restored at every step in the query. The performance cost of restoring the order occurs multiple times:

int[] integerList = new int[100];
for (int i = 0; i < integerList.Length; i++)
{
	integerList[i] = i;
}

var result =
	integerList.AsParallel().AsOrdered()
	.Take(10)
	.Select(item => new
	{
		SourceValue = item,
		ResultValue = Math.Pow(item, 2)
	});

foreach (var v in result)
{
	Console.WriteLine("Source {0}, Result {1}",
		v.SourceValue, v.ResultValue);
}

In this example the first 10 items are taken after AsOrdered() is called. AsOrdered() imposes the ordering on the Select subquery as well. Therefore ordering will be performed once again on a subquery that can be performed without ordering. This is not too efficient.

The solution comes with the AsUnordered() extension method:

int[] integerList = new int[100];
for (int i = 0; i < integerList.Length; i++)
{
	integerList[i] = i;
}

var result =
	integerList.AsParallel().AsOrdered()
	.Take(10).AsUnordered()
	.Select(item => new
	{
		SourceValue = item,
		ResultValue = Math.Pow(item, 2)
	});

foreach (var v in result)
{
	Console.WriteLine("Source {0}, Result {1}",
		v.SourceValue, v.ResultValue);
}

AsUnordered notifies PLINQ that ordering is not important. The first ten items are taken from the data source after the AsParallel() extension as before. The taken items will then be the basis for the Select subquery which can be performed without ordering.

Conclusion: the AsOrdered and AsUnordered extension methods are applied to all subqueries. In the above example if the Select query is followed by other queries then they will be unordered as well.

View the list of posts on the Task Parallel Library here.

Elliot Balynn's Blog

A directory of wonderful thoughts

Software Engineering

Web development

Disparate Opinions

Various tidbits

chsakell's Blog

WEB APPLICATION DEVELOPMENT TUTORIALS WITH OPEN-SOURCE PROJECTS

Once Upon a Camayoc

ARCHIVED: Bite-size insight on Cyber Security for the not too technical.