RabbitMQ in .NET C#: basic error handling in Receiver

Introduction

This post builds upon the basics of RabbitMQ in .NET. If you are new to this topic you should check out all the previous posts listed on this page. I won’t provide any details on bits of code that we’ve gone through before.

Most of the posts on RabbitMQ on this blog are based on the work of RabbitMQ guru Michael Stephenson.

It can happen that the Receiver is unable to process a message it has received from the message queue.

In some cases the receiver may not be able to accept an otherwise well-formed message. That message needs to be put back into the queue for later re-processing.

There’s also a case where processing a message throws an exception every time the receiver tries to process it. It will keep putting the message back to the queue only to receive the same exception over and over again. This also blocks the other messages from being processed. We call such a message a Poison Message.

In a third scenario the Receiver simply might not understand the message. It is malformed, contains unexpected properties etc.

The receiver can follow 2 basic strategies: retry processing the message or discard it after the first exception. Both options are easy to implement with RabbitMQ .NET.

Demo

If you’ve gone through the other posts on RabbitMQ on this blog then you’ll have a Visual Studio solution ready to be extended. Otherwise just create a new blank solution in Visual Studio 2012 or 2013. Add a new solution folder called FailingMessages to the solution. In that folder add the following projects:

  • A console app called BadMessageReceiver
  • A console app called BadMessageSender
  • A C# library called MessageService

Add the following NuGet package to all three projects:

RabbitMQ.Client (the RabbitMQ .NET client package on NuGet)

Add a project reference to MessageService from BadMessageReceiver and BadMessageSender. Add a class called RabbitMqService to MessageService with the following code to set up the connection with the local RabbitMQ instance:

public class RabbitMqService
{
		private string _hostName = "localhost";
		private string _userName = "guest";
		private string _password = "guest";

		public static string BadMessageBufferedQueue = "BadMessageQueue";

		public IConnection GetRabbitMqConnection()
		{
			ConnectionFactory connectionFactory = new ConnectionFactory();
			connectionFactory.HostName = _hostName;
			connectionFactory.UserName = _userName;
			connectionFactory.Password = _password;

			return connectionFactory.CreateConnection();
		}
}

Let’s set up the queue. Add the following code to Main of BadMessageSender:

RabbitMqService rabbitMqService = new RabbitMqService();
IConnection connection = rabbitMqService.GetRabbitMqConnection();
IModel model = connection.CreateModel();
model.QueueDeclare(RabbitMqService.BadMessageBufferedQueue, true, false, false, null);

Run the Sender project. Check in the RabbitMq management console that the queue has been set up.

Comment out the call to model.QueueDeclare; the queue exists now so we won’t need it again.

Add the following code in Program.cs of the Sender:

private static void RunBadMessageDemo(IModel model)
{
	Console.WriteLine("Enter your message. Quit with 'q'.");
	while (true)
	{
		string message = Console.ReadLine();
		if (message.ToLower() == "q") break;
		IBasicProperties basicProperties = model.CreateBasicProperties();
		basicProperties.SetPersistent(true);
		byte[] messageBuffer = Encoding.UTF8.GetBytes(message);
		model.BasicPublish("", RabbitMqService.BadMessageBufferedQueue, basicProperties, messageBuffer);
	}
}

This is probably the most basic message sending logic available in RabbitMQ .NET. Insert a call to this method from Main.

Now let’s turn to the Receiver. Add the following code to Main in Program.cs of BadMessageReceiver:

RabbitMqService messageService = new RabbitMqService();
IConnection connection = messageService.GetRabbitMqConnection();
IModel model = connection.CreateModel();
ReceiveBadMessages(model);

…where ReceiveBadMessages looks as follows:

private static void ReceiveBadMessages(IModel model)
{
	model.BasicQos(0, 1, false);
	QueueingBasicConsumer consumer = new QueueingBasicConsumer(model);
	model.BasicConsume(RabbitMqService.BadMessageBufferedQueue, false, consumer);
	//create the generator once: instances created in quick succession may share the same seed
	Random random = new Random();
	while (true)
	{
		BasicDeliverEventArgs deliveryArguments = consumer.Queue.Dequeue() as BasicDeliverEventArgs;
		String message = Encoding.UTF8.GetString(deliveryArguments.Body);
		Console.WriteLine("Message from queue: {0}", message);
		int i = random.Next(0, 2);

		//pretend that message cannot be processed and must be rejected
		if (i == 1) //reject the message and discard completely
		{
			Console.WriteLine("Rejecting and discarding message {0}", message);
			model.BasicReject(deliveryArguments.DeliveryTag, false);
		}
		else //reject the message but push back to queue for later re-try
		{
			Console.WriteLine("Rejecting message and putting it back to the queue: {0}", message);
			model.BasicReject(deliveryArguments.DeliveryTag, true);
		}
	}
}

The only new bit compared to the basics is the BasicReject method. It accepts the delivery tag and a boolean requeue parameter. If that’s set to false then the rejection is sent to RabbitMQ, which in turn discards the message, i.e. the message is not re-entered into the queue. If it’s true then the message is put back into the queue for a retry.

Let’s run the demo. Start the Sender app first. Then right-click the Receiver app in VS and select Debug, Start new instance. You’ll have two console windows up and running. Start sending messages from the Sender. Depending on the outcome of the random integer on the Receiver side you should see an output similar to this one:

[Screenshot: basic retry console output 1]

In the above case the following has happened:

  • Message “hello” was received and immediately discarded
  • Same happened to “hello again”
  • Message “bye” was put back into the queue several times before it was finally discarded – see the output below

[Screenshot: basic retry console output 2]

Note that I didn’t type “bye” multiple times. The reject-requeue-retry cycle was handled automatically.

The message “bye” in this case was an example of a Poison Message. In the code it was eventually discarded because the random number generator finally produced a 1; every 0 before that put it back into the queue.

This strategy was OK for demo purposes but you should do something more sophisticated in a real project. You can’t just rely on random numbers. On the other hand if you don’t build in any mechanism to finally discard a message then it will just keep coming back to the receiver. That will cause a “traffic jam” in the message queue as all messages will keep waiting to be delivered.
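One possible way to stop a message from circulating forever is to count the failed attempts and discard the message once a limit is reached. The sketch below is only an illustration of that idea and does not use the RabbitMQ client at all. RetryTracker, the limit of 3 and the message ID "message-1" are hypothetical names and values; it also assumes that the Sender puts a unique ID on every message, e.g. in basicProperties.MessageId, and that a single Receiver keeps the counters in memory.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: counts failed attempts per message ID in memory.
public class RetryTracker
{
    private readonly int _maxAttempts;
    private readonly Dictionary<string, int> _attempts = new Dictionary<string, int>();

    public RetryTracker(int maxAttempts)
    {
        _maxAttempts = maxAttempts;
    }

    // Registers one failed attempt and returns true if the message should be
    // requeued, false if it has reached the limit and should be discarded.
    public bool ShouldRequeue(string messageId)
    {
        int count;
        _attempts.TryGetValue(messageId, out count);
        count++;
        if (count >= _maxAttempts)
        {
            _attempts.Remove(messageId); // forget the message once we give up on it
            return false;
        }
        _attempts[messageId] = count;
        return true;
    }
}

public static class RetryTrackerDemo
{
    public static void Main()
    {
        RetryTracker tracker = new RetryTracker(3);
        for (int attempt = 1; attempt <= 3; attempt++)
        {
            bool requeue = tracker.ShouldRequeue("message-1");
            // In the Receiver this value would be passed on as:
            // model.BasicReject(deliveryArguments.DeliveryTag, requeue);
            Console.WriteLine("Attempt {0}: requeue = {1}", attempt, requeue);
        }
    }
}
```

The first two attempts report requeue = True and the third one reports requeue = False. Note that the counters are lost when the Receiver restarts, so this is a starting point rather than a complete solution.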

We’ll look at some other strategies in the next post.

View the list of posts on Messaging here.

Joining common values from two sequences using the LINQ Intersect operator

Say you have the following two sequences:

string[] first = new string[] {"hello", "hi", "good evening", "good day", "good morning", "goodbye" };
string[] second = new string[] {"whatsup", "how are you", "hello", "bye", "hi"};

If you’d like to find the common elements in the two arrays and put them into another sequence then it’s very easy with the Intersect operator:

IEnumerable<string> intersect = first.Intersect(second);
foreach (string value in intersect)
{
	Console.WriteLine(value);
}

The ‘intersect’ variable will include “hello” and “hi” as they are common elements to both arrays.

The intersect operator uses a comparer to determine whether two elements are equal. In this case .NET has a built-in default comparer to compare strings so you didn’t have to implement any custom comparer. However, if you have custom objects in the two arrays then the default object reference comparer won’t be enough:

public class Singer
{
	public int Id { get; set; }
	public string FirstName { get; set; }
	public string LastName { get; set; }
}

IEnumerable<Singer> singersA = new List<Singer>() 
{
	new Singer(){Id = 1, FirstName = "Freddie", LastName = "Mercury"} 
	, new Singer(){Id = 2, FirstName = "Elvis", LastName = "Presley"}
	, new Singer(){Id = 3, FirstName = "Chuck", LastName = "Berry"}

};

IEnumerable<Singer> singersB = new List<Singer>() 
{
	new Singer(){Id = 1, FirstName = "Freddie", LastName = "Mercury"} 
	, new Singer(){Id = 2, FirstName = "Elvis", LastName = "Presley"}
	, new Singer(){Id = 4, FirstName = "Ray", LastName = "Charles"}
	, new Singer(){Id = 5, FirstName = "David", LastName = "Bowie"}
};

IEnumerable<Singer> singersIntersection = singersA.Intersect(singersB);
foreach (Singer s in singersIntersection)
{
	Console.WriteLine(s.Id);
}

The singersIntersection sequence will be empty of course as each object is different as far as their references are concerned. This is where another overload of the operator enters the scene, one where you can supply your own equality comparer:

public class DefaultSingerComparer : IEqualityComparer<Singer>
{
	public bool Equals(Singer x, Singer y)
	{
		return x.Id == y.Id;
	}

	public int GetHashCode(Singer obj)
	{
		return obj.Id.GetHashCode();
	}
}

So we say that singerA == singerB if their IDs are equal. You can use this comparer as follows:

IEnumerable<Singer> singersIntersection = singersA.Intersect(singersB, new DefaultSingerComparer());
foreach (Singer s in singersIntersection)
{
	Console.WriteLine(s.Id);
}

singersIntersection will now include singers #1 and #2.
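The comparer overload is not limited to custom classes. As a small sketch, the example below passes the built-in StringComparer.OrdinalIgnoreCase to Intersect so that strings are matched regardless of casing. The class and method names (IntersectIgnoreCaseDemo, CommonIgnoringCase) and the sample arrays are made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class IntersectIgnoreCaseDemo
{
    // Returns the elements of 'first' that also occur in 'second', ignoring case.
    public static List<string> CommonIgnoringCase(IEnumerable<string> first, IEnumerable<string> second)
    {
        return first.Intersect(second, StringComparer.OrdinalIgnoreCase).ToList();
    }

    public static void Main()
    {
        string[] first = { "Hello", "hi", "good day" };
        string[] second = { "HELLO", "bye", "Hi" };

        // The default string comparer is case-sensitive, so nothing matches here.
        Console.WriteLine(first.Intersect(second).Count()); // prints 0

        // With the case-insensitive comparer "Hello" and "hi" are found.
        foreach (string value in CommonIgnoringCase(first, second))
        {
            Console.WriteLine(value);
        }
    }
}
```

The returned elements come from the first sequence, which is why the output shows “Hello” and “hi” rather than “HELLO” and “Hi”.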

You can view all LINQ-related posts on this blog here.

RabbitMQ in .NET: handling large messages

Introduction

This post builds upon the basics of RabbitMQ in .NET. If you are new to this topic you should check out all the previous posts listed on this page. I won’t provide any details on bits of code that we’ve gone through before.

Most of the posts on RabbitMQ on this blog are based on the work of RabbitMQ guru Michael Stephenson.

Messaging systems that handle very large amounts of messages per second are normally designed to take care of small and concise messages. This is logical; it is a lot more efficient to process a small message than a large one.

RabbitMQ can handle large messages with 2 different techniques:

  • Chunking: the large message is chunked into smaller units by the Sender and reassembled by the Receiver
  • Buffering: the message is buffered and sent in one piece

However, note that handling large messages means a negative impact on performance depending on the storage mechanism of the message: in memory – not persistent – or on disk – persistent.

Despite the general recommendation for small messages there may be occasions where you simply have to deal with large ones. A typical example is when you need to send the contents of a file.

A strategy you may follow is to have a special dedicated server with a RabbitMQ instance installed which is designated to handle large messages. “Normal” short messages are then handled by the main RabbitMQ instances.

There’s no magic built-in method in the RabbitMQ library to handle chunking and buffering so we’ll have to write some code to make them work. Don’t worry, it’s just simple standard .NET File I/O.
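Before involving RabbitMQ it may help to see the chunking idea on its own. The following sketch splits a byte array into pieces of at most 4096 bytes and joins them again, entirely in memory. ChunkingSketch, Split and Reassemble are hypothetical names used only for this illustration; in the real demo each chunk would be the body of one message.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

public static class ChunkingSketch
{
    // Splits 'data' into consecutive pieces of at most 'chunkSize' bytes.
    public static List<byte[]> Split(byte[] data, int chunkSize)
    {
        List<byte[]> chunks = new List<byte[]>();
        for (int offset = 0; offset < data.Length; offset += chunkSize)
        {
            int length = Math.Min(chunkSize, data.Length - offset);
            byte[] chunk = new byte[length];
            Array.Copy(data, offset, chunk, 0, length);
            chunks.Add(chunk);
        }
        return chunks;
    }

    // Concatenates the chunks in the order they are given.
    public static byte[] Reassemble(IEnumerable<byte[]> chunks)
    {
        using (MemoryStream stream = new MemoryStream())
        {
            foreach (byte[] chunk in chunks)
            {
                stream.Write(chunk, 0, chunk.Length);
            }
            return stream.ToArray();
        }
    }

    public static void Main()
    {
        byte[] original = new byte[10000];
        new Random(42).NextBytes(original);

        List<byte[]> chunks = Split(original, 4096);
        byte[] restored = Reassemble(chunks);

        Console.WriteLine("Chunks: {0}", chunks.Count);                        // prints Chunks: 3
        Console.WriteLine("Identical: {0}", original.SequenceEqual(restored)); // prints Identical: True
    }
}
```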

Buffered message demo

If you’ve gone through the other posts on RabbitMQ on this blog then you’ll have a Visual Studio solution ready to be extended. Otherwise just create a new blank solution in Visual Studio 2012 or 2013. Add a new solution folder called LargeMessages to the solution. In that folder add the following projects:

  • A console app called LargeMessageReceiver
  • A console app called LargeMessageSender
  • A C# library called MessagingService

Add the following NuGet package to all three projects:

RabbitMQ.Client (the RabbitMQ .NET client package on NuGet)

Add a project reference to MessagingService from LargeMessageReceiver and LargeMessageSender. Add a class called RabbitMqService to MessagingService with the following code:

public class RabbitMqService
{
	private string _hostName = "localhost";
	private string _userName = "guest";
	private string _password = "guest";

	public static string LargeMessageBufferedQueue = "LargeMessageBufferedQueue";

	public IConnection GetRabbitMqConnection()
	{
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.HostName = _hostName;
		connectionFactory.UserName = _userName;
		connectionFactory.Password = _password;

		return connectionFactory.CreateConnection();
	}
}

Let’s set up the queue. Add the following code to Main of LargeMessageSender:

RabbitMqService rabbitMqService = new RabbitMqService();
IConnection connection = rabbitMqService.GetRabbitMqConnection();
IModel model = connection.CreateModel();
model.QueueDeclare(RabbitMqService.LargeMessageBufferedQueue, true, false, false, null);

Run the Sender project. Check in the RabbitMq management console that the queue has been set up.

Comment out the call to model.QueueDeclare; the queue exists now so we won’t need it again.

Next get a text file ready that is about 15-18 MB in size. Copy or download some large text from the internet and save it on your hard drive somewhere.

Add the following code in Program.cs of the Sender:

private static void RunBufferedMessageExample(IModel model)
{
	string filePath = @"c:\large_file.txt";
	ConsoleKeyInfo keyInfo = Console.ReadKey();
	while (true)
	{
		if (keyInfo.Key == ConsoleKey.Enter)
		{
			IBasicProperties basicProperties = model.CreateBasicProperties();
			basicProperties.SetPersistent(true);
			byte[] fileContents = File.ReadAllBytes(filePath);
			model.BasicPublish("", RabbitMqService.LargeMessageBufferedQueue, basicProperties, fileContents);
		}
		keyInfo = Console.ReadKey();
	}
}

So when we press Enter then the large file is read into a byte array. The byte array is then sent to the queue we’ve just set up. Insert a call to this method from Main.

Now let’s turn to the Receiver. Add the following code to Main in Program.cs of LargeMessageReceiver:

RabbitMqService commonService = new RabbitMqService();
IConnection connection = commonService.GetRabbitMqConnection();
IModel model = connection.CreateModel();
ReceiveBufferedMessages(model);

…where ReceiveBufferedMessages looks as follows:

private static void ReceiveBufferedMessages(IModel model)
{
	model.BasicQos(0, 1, false);
	QueueingBasicConsumer consumer = new QueueingBasicConsumer(model);
	model.BasicConsume(RabbitMqService.LargeMessageBufferedQueue, false, consumer);
	while (true)
	{
		BasicDeliverEventArgs deliveryArguments = consumer.Queue.Dequeue() as BasicDeliverEventArgs;
		byte[] messageContents = deliveryArguments.Body;
		string randomFileName = string.Concat(@"c:\large_file_from_rabbit_", Guid.NewGuid(), ".txt");
		Console.WriteLine("Received message, will save it to {0}", randomFileName);
		File.WriteAllBytes(randomFileName, messageContents);
		model.BasicAck(deliveryArguments.DeliveryTag, false);
	}
}

Run the Sender application. Next, start the Receiver as well: right-click on it in VS, select Debug, Start new instance. There should be 2 console windows up and running. Have the Sender as active and press Enter. The file should be read and sent over to the receiver and saved under the random file name:

[Screenshot: large file received by the Receiver]

Chunked messages

Let’s set up a different queue for this demo. Add the following static string to RabbitMqService:

public static string ChunkedMessageBufferedQueue = "ChunkedMessageBufferedQueue";

We’ll reorganise the code a bit in Main of LargeMessageSender.Program.cs:

static void Main(string[] args)
{
	RabbitMqService rabbitMqService = new RabbitMqService();
	IConnection connection = rabbitMqService.GetRabbitMqConnection();
	IModel model = connection.CreateModel();
	//model.QueueDeclare(RabbitMqService.LargeMessageBufferedQueue, true, false, false, null);
	//RunBufferedMessageExample(model);
	model.QueueDeclare(RabbitMqService.ChunkedMessageBufferedQueue, true, false, false, null);
}

Run the Sender to create the queue. Check in the RabbitMq management console that it was in fact created. Comment out the call to model.QueueDeclare. Add the following private method to Program.cs of LargeMessageSender:

private static void RunChunkedMessageExample(IModel model)
{
	string filePath = @"c:\large_file.txt";
	int chunkSize = 4096;	
	while (true)
	{
		ConsoleKeyInfo keyInfo = Console.ReadKey();
		if (keyInfo.Key == ConsoleKey.Enter)
		{
			Console.WriteLine("Starting file read operation...");
			string randomFileName = string.Concat("large_chunked_file_", Guid.NewGuid(), ".txt");
			//the using block closes the file once all chunks have been published
			using (FileStream fileStream = File.OpenRead(filePath))
			{
				long remainingFileSize = fileStream.Length;
				while (remainingFileSize > 0)
				{
					//the last chunk is usually smaller than chunkSize
					int bytesToRead = (int)Math.Min(chunkSize, remainingFileSize);
					byte[] buffer = new byte[bytesToRead];
					int read = fileStream.Read(buffer, 0, bytesToRead);
					if (read == 0) break; //unexpected end of file
					//Read may return fewer bytes than requested so only publish what was actually read
					if (read < buffer.Length) Array.Resize(ref buffer, read);
					remainingFileSize -= read;
					bool finished = remainingFileSize <= 0;

					IBasicProperties basicProperties = model.CreateBasicProperties();
					basicProperties.SetPersistent(true);
					basicProperties.Headers = new Dictionary<string, object>();
					basicProperties.Headers.Add("output-file", randomFileName);
					basicProperties.Headers.Add("finished", finished);

					model.BasicPublish("", RabbitMqService.ChunkedMessageBufferedQueue, basicProperties, buffer);
				}
			}
			Console.WriteLine("Chunks complete.");
		}
	}
}

That’s a bit longer than what we normally have. We define a chunk size of 4 KB. Upon pressing Enter we start reading the file in chunks of at most 4 KB into the variable called ‘buffer’. The inner while loop keeps reading until all bytes have been processed. With each chunk we send some metadata in the Headers section: the name of the file that the receiver should save the data into and whether this is the last chunk. We then publish the partial message. Add a call to this method from Main.

Now let’s turn to the Receiver. Re-organise the current code in Main as follows:

static void Main(string[] args)
{
	RabbitMqService commonService = new RabbitMqService();
	IConnection connection = commonService.GetRabbitMqConnection();
	IModel model = connection.CreateModel();
	//ReceiveBufferedMessages(model);
	ReceiveChunkedMessages(model);
}

…where ReceiveChunkedMessages looks as follows:

private static void ReceiveChunkedMessages(IModel model)
{
	model.BasicQos(0, 1, false);
	QueueingBasicConsumer consumer = new QueueingBasicConsumer(model);
	model.BasicConsume(RabbitMqService.ChunkedMessageBufferedQueue, false, consumer);
	while (true)
	{
		BasicDeliverEventArgs deliveryArguments = consumer.Queue.Dequeue() as BasicDeliverEventArgs;
		Console.WriteLine("Received a chunk!");
		IDictionary<string, object> headers = deliveryArguments.BasicProperties.Headers;
		string randomFileName = Encoding.UTF8.GetString((headers["output-file"] as byte[]));
		bool isLastChunk = Convert.ToBoolean(headers["finished"]);
		string localFileName = string.Concat(@"c:\", randomFileName);
		using (FileStream fileStream = new FileStream(localFileName, FileMode.Append, FileAccess.Write))
		{
			fileStream.Write(deliveryArguments.Body, 0, deliveryArguments.Body.Length);
			fileStream.Flush();
		}
		Console.WriteLine("Chunk saved. Finished? {0}", isLastChunk);
		model.BasicAck(deliveryArguments.DeliveryTag, false);
	}
}

Most of this is standard RabbitMQ code from previous posts. The new things are that we read the headers and append the contents of the message body to a file on the C drive. Note that the string header arrives as a byte array, which is why it’s decoded with Encoding.UTF8.
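Reading headers is easy to get wrong because a missing key throws an exception and string values come back as byte arrays. The helper below is a minimal sketch of a more defensive way to read a header. HeaderReader and GetString are hypothetical names, and the dictionary in Main only simulates the headers that the Receiver gets for a chunk.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class HeaderReader
{
    // Returns the header as text, or null if the header is missing.
    public static string GetString(IDictionary<string, object> headers, string key)
    {
        object value;
        if (headers == null || !headers.TryGetValue(key, out value) || value == null)
        {
            return null;
        }
        byte[] bytes = value as byte[];
        // String headers are delivered as UTF-8 bytes; other values fall back to ToString().
        return bytes != null ? Encoding.UTF8.GetString(bytes) : value.ToString();
    }

    public static void Main()
    {
        // Simulated headers, shaped like the ones sent along with each chunk.
        IDictionary<string, object> headers = new Dictionary<string, object>();
        headers.Add("output-file", Encoding.UTF8.GetBytes("large_chunked_file_demo.txt"));
        headers.Add("finished", true);

        Console.WriteLine(GetString(headers, "output-file"));     // prints large_chunked_file_demo.txt
        Console.WriteLine(GetString(headers, "finished"));        // prints True
        Console.WriteLine(GetString(headers, "missing") == null); // prints True
    }
}
```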

Run the Sender application. Then run the Receiver the same way as in the previous demo. You’ll have two console windows up and running. Make sure that the Sender is selected and press Enter. You’ll see that the chunks are sent over to the Receiver and are processed accordingly:

[Screenshot: chunks complete]

Check the target file destination to see if the file has been saved.

With the chunking pattern it’s probably a good idea to keep your infrastructure as simple as possible:

  • Start with a single Receiver: you can have multiple receivers as we saw in the post on worker queues but then you’ll face the challenge of putting the chunks into the right order
  • Have a dedicated queue for chunked messages: multi-purpose queues are cumbersome as we saw [here], so you shouldn’t add chunking to the complexity if you can avoid it
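If you do end up with several receivers then the chunks need something to be ordered by. One possible approach, sketched below, is to let the Sender number the chunks and have the receiving side sort by that number before joining the bodies. This is not part of the demo above: the Chunk class and its Sequence property are hypothetical, and in practice the number would have to travel in an extra message header.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

// Hypothetical chunk shape: 'Sequence' would travel in a message header set by the Sender.
public class Chunk
{
    public int Sequence { get; set; }
    public byte[] Body { get; set; }
}

public static class ChunkOrdering
{
    // Puts the chunks back into Sender order and joins their bodies.
    public static byte[] Assemble(IEnumerable<Chunk> chunks)
    {
        using (MemoryStream stream = new MemoryStream())
        {
            foreach (Chunk chunk in chunks.OrderBy(c => c.Sequence))
            {
                stream.Write(chunk.Body, 0, chunk.Body.Length);
            }
            return stream.ToArray();
        }
    }

    public static void Main()
    {
        // Chunks arriving out of order, as could happen with several receivers.
        List<Chunk> received = new List<Chunk>
        {
            new Chunk { Sequence = 2, Body = Encoding.UTF8.GetBytes("world") },
            new Chunk { Sequence = 0, Body = Encoding.UTF8.GetBytes("hello") },
            new Chunk { Sequence = 1, Body = Encoding.UTF8.GetBytes(", ") }
        };

        Console.WriteLine(Encoding.UTF8.GetString(Assemble(received))); // prints hello, world
    }
}
```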

Read the next part in this series here.


Reading the outcome of parallel loops in .NET C#

The Parallel.For() and Parallel.ForEach() methods both return a ParallelLoopResult object. This object has two properties which you can use to read if Break or Stop have been called:

  • IsCompleted: true if all loop iterations have been completed without calling either Break or Stop
  • LowestBreakIteration: the index of the lowest iteration in which the Break method was called

Example:

ParallelLoopResult parallelLoopResult =
        Parallel.For(0, 10, (int index, ParallelLoopState parallelLoopState) =>
	{
		if (index == 5)
		{
			parallelLoopState.Stop();
		}
	});

Console.WriteLine("IsCompleted: {0}", parallelLoopResult.IsCompleted);
Console.WriteLine("BreakValue: {0}", parallelLoopResult.LowestBreakIteration.HasValue
	? parallelLoopResult.LowestBreakIteration.Value
	: -1);

The properties return the following values:

IsCompleted (IC): false
LowestBreakIteration.HasValue (LBI): false

Here come the possible value pairs and their meaning:

  • IC true, LBI false: all iterations were completed without breaking or stopping
  • IC false, LBI false: Stop was called
  • IC false, LBI true: Break was called
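The three cases can be reproduced with a small test program. The sketch below runs one loop to completion, one with Stop and one with Break, and translates the result into a label. LoopOutcome and Describe are names invented for this example.

```csharp
using System;
using System.Threading.Tasks;

public static class LoopOutcome
{
    // Translates the two properties into one of the three cases listed above.
    public static string Describe(ParallelLoopResult result)
    {
        if (result.IsCompleted) return "Completed";
        return result.LowestBreakIteration.HasValue ? "Break" : "Stop";
    }

    public static void Main()
    {
        ParallelLoopResult completed = Parallel.For(0, 10, (index, state) => { });
        ParallelLoopResult stopped = Parallel.For(0, 10, (index, state) =>
        {
            if (index == 5) state.Stop();
        });
        ParallelLoopResult broken = Parallel.For(0, 10, (index, state) =>
        {
            if (index == 5) state.Break();
        });

        Console.WriteLine(Describe(completed));         // prints Completed
        Console.WriteLine(Describe(stopped));           // prints Stop
        Console.WriteLine(Describe(broken));            // prints Break
        Console.WriteLine(broken.LowestBreakIteration); // prints 5
    }
}
```

Break is only ever called in iteration 5 here, which is why LowestBreakIteration is 5 in the last case.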

View the list of posts on the Task Parallel Library here.

Determine if two sequences are equal with LINQ C#

Say you have two sequences of objects:

string[] bands = { "ACDC", "Queen", "Aerosmith", "Iron Maiden", "Megadeth", "Metallica", "Cream", "Oasis", "Abba", "Blur", "Chic", "Eurythmics", "Genesis", "INXS", "Midnight Oil", "Kent", "Madness", "Manic Street Preachers"
, "Noir Desir", "The Offspring", "Pink Floyd", "Rammstein", "Red Hot Chili Peppers", "Tears for Fears"
, "Deep Purple", "KISS"};

string[] bandsTwo = { "ACDC", "Queen", "Aerosmith", "Iron Maiden", "Megadeth", "Metallica", "Cream", "Oasis", "Abba", "Blur", "Chic", "Eurythmics", "Genesis", "INXS", "Midnight Oil", "Kent", "Madness", "Manic Street Preachers"
, "Noir Desir", "The Offspring", "Pink Floyd", "Rammstein", "Red Hot Chili Peppers", "Tears for Fears"
, "Deep Purple", "KISS"};

If you’d like to check whether the two sequences contain the same elements in the same order then you can use the SequenceEqual LINQ operator:

bool equal = bands.SequenceEqual(bandsTwo);
Console.WriteLine(equal);

This approach works fine for objects where a good built-in comparer exists. .NET can compare two strings, two integers etc. and determine whether they are equal. It’s a different story with your own custom classes:

public class Singer
{
	public int Id { get; set; }
	public string FirstName { get; set; }
	public string LastName { get; set; }
	public int BirthYear { get; set; }
}

IEnumerable<Singer> singers = new List<Singer>() 
			{
				new Singer(){Id = 1, FirstName = "Freddie", LastName = "Mercury", BirthYear=1964}
				, new Singer(){Id = 2, FirstName = "Elvis", LastName = "Presley", BirthYear = 1954}
				, new Singer(){Id = 3, FirstName = "Chuck", LastName = "Berry", BirthYear = 1954}
				, new Singer(){Id = 4, FirstName = "Ray", LastName = "Charles", BirthYear = 1950}
				, new Singer(){Id = 5, FirstName = "David", LastName = "Bowie", BirthYear = 1964}
			};

IEnumerable<Singer> singersTwo = new List<Singer>() 
			{
				new Singer(){Id = 1, FirstName = "Freddie", LastName = "Mercury", BirthYear=1964}
				, new Singer(){Id = 2, FirstName = "Elvis", LastName = "Presley", BirthYear = 1954}
				, new Singer(){Id = 3, FirstName = "Chuck", LastName = "Berry", BirthYear = 1954}
				, new Singer(){Id = 4, FirstName = "Ray", LastName = "Charles", BirthYear = 1950}
				, new Singer(){Id = 5, FirstName = "David", LastName = "Bowie", BirthYear = 1964}
			};

bool singersEqual = singers.SequenceEqual(singersTwo);
Console.WriteLine(singersEqual);

This will yield false as .NET doesn’t automatically know how to compare the Singer objects in a way that makes sense to you. Instead, the comparison will be based on reference equality.

This is where an overloaded version of SequenceEqual enters the scene, one where you can specify your own equality comparer:

public class DefaultSingerComparer : IEqualityComparer<Singer>
{
	public bool Equals(Singer x, Singer y)
	{
		return x.Id == y.Id;
	}

	public int GetHashCode(Singer obj)
	{
		return obj.Id.GetHashCode();
	}
}

So we say that if the singer Ids are equal then the Singer objects are equal:

bool singersEqual = singers.SequenceEqual(singersTwo, new DefaultSingerComparer());
Console.WriteLine(singersEqual);

…which yields true.
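Keep in mind that SequenceEqual compares the sequences position by position, so the order of the elements matters. The sketch below shows this with a shuffled copy of a short band list and one possible way around it, which is to sort both sequences first. SequenceOrderDemo and SameElementsAnyOrder are hypothetical names, and the helper assumes that the elements can be sorted with their default comparer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SequenceOrderDemo
{
    // True if both sequences hold the same elements, regardless of order.
    // Assumes the elements can be sorted with their default comparer.
    public static bool SameElementsAnyOrder<T>(IEnumerable<T> first, IEnumerable<T> second)
    {
        return first.OrderBy(x => x).SequenceEqual(second.OrderBy(x => x));
    }

    public static void Main()
    {
        string[] bands = { "ACDC", "Queen", "Aerosmith" };
        string[] shuffled = { "Queen", "Aerosmith", "ACDC" };

        Console.WriteLine(bands.SequenceEqual(shuffled));         // prints False
        Console.WriteLine(SameElementsAnyOrder(bands, shuffled)); // prints True
    }
}
```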


RabbitMQ in .NET: data serialisation II

Introduction

In the previous post we discussed the basics of data serialisation in RabbitMQ .NET. We saw how to set the content type and the object type.

This last point is open for further investigation as the object type is a string which gives you a very wide range of possibilities how to define the object type.

In this post we’ll take a closer look at the scenario where the same .NET objects are used in both the sender and receiver applications.

We’ll build on the demo we started in the previous post so have it ready.

.NET objects

If it’s guaranteed that both the Sender and the Receiver are .NET projects then the fully qualified type name is a good way to denote the object type. We had the following object in the SharedObjects library:

[Serializable]
public class Customer
{
     public string Name { get; set; }
}

Insert another one which has the same structure but a different class name:

[Serializable]
public class NewCustomer
{
	public string Name { get; set; }
}

Add two new Console apps to the Serialisation folder: DotNetObjectSender and DotNetObjectReceiver. Add the following NuGet packages to both:

RabbitMQ.Client (the RabbitMQ .NET client package on NuGet)

Newtonsoft.Json (the Json.NET package on NuGet)

Add a reference to the SharedObjects library to both console apps.

Let’s set up the queue. Add the following field to CommonService.cs:

public static string DotNetObjectQueueName = "DotNetObjectQueue";

Add the following code to Program.cs Main of the Sender app:

CommonService commonService = new CommonService();
IConnection connection = commonService.GetRabbitMqConnection();
IModel model = connection.CreateModel();
model.QueueDeclare(CommonService.DotNetObjectQueueName, true, false, false, null);

Run DotNetObjectSender so that the queue is created. You can check in the RabbitMq management console if the queue has been set up. Comment out the call to model.QueueDeclare.

We’ll go with JSON serialisation as it is very popular, but XML and binary serialisation are also possible. We saw in the previous post how to serialise and deserialise in those formats if you need them. Add the following method to Program.cs of the Sender and call it from Main:

private static void RunDotNetObjectDemo(IModel model)
{
	Console.WriteLine("Enter customer name. Quit with 'q'.");
	//create the random number generator once, outside the loop
	Random random = new Random();
	while (true)
	{
		string customerName = Console.ReadLine();
		if (customerName.ToLower() == "q") break;
		int i = random.Next(0, 2);
		String type = "";
		String jsonified = "";
		if (i == 0)
		{
			Customer customer = new Customer() { Name = customerName };
			jsonified = JsonConvert.SerializeObject(customer);
			type = customer.GetType().AssemblyQualifiedName;
		}
		else
		{
			NewCustomer newCustomer = new NewCustomer() { Name = customerName };
			jsonified = JsonConvert.SerializeObject(newCustomer);
			type = newCustomer.GetType().AssemblyQualifiedName;
		}
				
		IBasicProperties basicProperties = model.CreateBasicProperties();
		basicProperties.SetPersistent(true);
		basicProperties.ContentType = "application/json";
		basicProperties.Type = type;
		byte[] customerBuffer = Encoding.UTF8.GetBytes(jsonified);
		model.BasicPublish("", CommonService.DotNetObjectQueueName, basicProperties, customerBuffer);
	}
}

All of this should be familiar from the previous discussion. We randomly construct either a Customer or a NewCustomer and set the message type accordingly.

Let’s turn to the Receiver and see how it can read the message. Add the following code to Main in Program.cs:

CommonService commonService = new CommonService();
IConnection connection = commonService.GetRabbitMqConnection();
IModel model = connection.CreateModel();
ReceiveDotNetObjects(model);

…where ReceiveDotNetObjects looks as follows:

private static void ReceiveDotNetObjects(IModel model)
{
	model.BasicQos(0, 1, false);
	QueueingBasicConsumer consumer = new QueueingBasicConsumer(model);
	model.BasicConsume(CommonService.DotNetObjectQueueName, false, consumer);
	while (true)
	{			
		BasicDeliverEventArgs deliveryArguments = consumer.Queue.Dequeue() as BasicDeliverEventArgs;
		string objectType = deliveryArguments.BasicProperties.Type;
		Type t = Type.GetType(objectType);				
		String jsonified = Encoding.UTF8.GetString(deliveryArguments.Body);
		object rawObject = JsonConvert.DeserializeObject(jsonified, t);
		Console.WriteLine("Object type: {0}", objectType);
								
		if (rawObject.GetType() == typeof(Customer))
		{
			Customer customer = rawObject as Customer;
			Console.WriteLine("Customer name: {0}", customer.Name);
		}
		else if (rawObject.GetType() == typeof(NewCustomer))
		{
			NewCustomer newCustomer = rawObject as NewCustomer;
			Console.WriteLine("NewCustomer name: {0}", newCustomer.Name);
		}
		model.BasicAck(deliveryArguments.DeliveryTag, false);
	}
}

We read the assembly-qualified type name from the Type property of the message, resolve it to a .NET type with Type.GetType and deserialise the JSON body into that type.
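Type.GetType returns null if the type name cannot be resolved, which would make the deserialisation fail further down. A more careful Receiver could check the resolved type against a list of message types it actually expects. The following is a minimal sketch of such a check. MessageTypeResolver and Resolve are hypothetical names, and the two customer classes are redeclared here only to keep the example self-contained.

```csharp
using System;
using System.Collections.Generic;

public class Customer { public string Name { get; set; } }
public class NewCustomer { public string Name { get; set; } }

public static class MessageTypeResolver
{
    // Only these types are accepted; anything else is treated as unknown.
    private static readonly HashSet<Type> KnownTypes =
        new HashSet<Type> { typeof(Customer), typeof(NewCustomer) };

    // Returns the resolved type, or null if the name is missing, cannot be
    // resolved or is not one of the known message types.
    public static Type Resolve(string typeName)
    {
        if (string.IsNullOrEmpty(typeName)) return null;
        Type type = Type.GetType(typeName); // null when the type cannot be found
        return type != null && KnownTypes.Contains(type) ? type : null;
    }

    public static void Main()
    {
        string good = typeof(Customer).AssemblyQualifiedName;
        Console.WriteLine(Resolve(good) == typeof(Customer));                     // prints True
        Console.WriteLine(Resolve("No.Such.Type") == null);                       // prints True
        Console.WriteLine(Resolve(typeof(string).AssemblyQualifiedName) == null); // prints True
    }
}
```

In the Receiver a null result could then be handled with BasicReject instead of letting the deserialisation throw.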

Start the Sender application. Then right-click the Receiver project in Visual Studio and select Debug, Start new instance. You’ll have two console windows up and running. Start sending customer names to the Receiver. You’ll see that the Receiver can handle both Customer and NewCustomer objects:

[Screenshot: .NET objects serialised]

Read the next part in this series here.


Determine if all elements fulfil a condition in a sequence with LINQ C#

Say we have the following string array:

string[] bands = { "ACDC", "Queen", "Aerosmith", "Iron Maiden", "Megadeth", "Metallica", "Cream", "Oasis", "Abba", "Blur", "Chic", "Eurythmics", "Genesis", "INXS", "Midnight Oil", "Kent", "Madness", "Manic Street Preachers"
, "Noir Desir", "The Offspring", "Pink Floyd", "Rammstein", "Red Hot Chili Peppers", "Tears for Fears"
, "Deep Purple", "KISS"};

Say we’d like to determine if all elements in the sequence fulfil a certain condition. Nothing could be easier using the All operator:

bool all = bands.All(b => b.StartsWith("A"));
Console.WriteLine(all);

This yields false as not all band names start with an A. However, their length is certainly longer than 2 characters so the below query returns true:

bool all = bands.All(b => b.Length > 2);
Console.WriteLine(all);
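Two properties of All are worth knowing: it returns true for an empty sequence, and it stops enumerating at the first element that fails the condition. The short program below demonstrates both on a shortened band list; AllOperatorDemo is just a name chosen for this example.

```csharp
using System;
using System.Linq;

public static class AllOperatorDemo
{
    public static void Main()
    {
        string[] bands = { "ACDC", "Queen", "Abba" };
        string[] empty = new string[0];

        // All is vacuously true for an empty sequence.
        Console.WriteLine(empty.All(b => b.StartsWith("A"))); // prints True

        // "All elements match" is the same as "no element fails to match".
        bool viaAll = bands.All(b => b.Length > 2);
        bool viaAny = !bands.Any(b => b.Length <= 2);
        Console.WriteLine(viaAll == viaAny);                  // prints True

        // Enumeration stops at the first element that fails the condition.
        int checkedCount = 0;
        bool allStartWithA = bands.All(b => { checkedCount++; return b.StartsWith("A"); });
        Console.WriteLine("{0}, checked {1}", allStartWithA, checkedCount); // prints False, checked 2
    }
}
```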


RabbitMQ in .NET: data serialisation I

Introduction

We went through the basic messaging concepts in RabbitMQ in a previous series. You’ll find the links to all installments on this page. In this new series we’ll continue our discussion of messaging concepts in RabbitMQ. If you’re entirely new to RabbitMQ then you should at least skim through the foundations as I won’t provide any detailed description of the code that was covered before.

So far we’ve kept our messages simple in order to concentrate on the key concepts: we only sent simple text messages to RabbitMQ. However, in reality we normally send objects with properties and not only text. The object needs to be serialised into a byte array so that it can be included in the message body. On the receiving end the serialised object needs to be deserialised.

We’re going to look at different ways of achieving this goal.

Most of the posts on RabbitMQ on this blog are based on the work of RabbitMQ guru Michael Stephenson.

Demo: JSON serialisation

I’m using Visual Studio 2012 for this demo but you’ll probably be fine with VS 2010 and VS 2013 as well. If you followed along the original tutorial then you can open that solution. Otherwise create a new blank solution in Visual Studio and insert a solution folder called Serialisation. Add two Console app projects to this folder: SerialisationSender and SerialisationReceiver. Add the following NuGet packages to both:

RabbitMQ new client package NuGet

Newtonsoft JSON.NET NuGet package

Add a class library called SharedObjects and add the same RabbitMQ NuGet package to it as above. Insert a class called Customer:

public class Customer
{
     public string Name { get; set; }
}

Set a reference to this class library from both the Sender and the Receiver console apps. Insert another class called CommonService to the class library:

public class CommonService
{
	private string _hostName = "localhost";
	private string _userName = "guest";
	private string _password = "guest";

	public static string SerialisationQueueName = "SerialisationDemoQueue";

	public IConnection GetRabbitMqConnection()
	{
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.HostName = _hostName;
		connectionFactory.UserName = _userName;
		connectionFactory.Password = _password;

		return connectionFactory.CreateConnection();
	}
}

Next we’ll set up the queue for this demo. Add the following code to Program.cs in the Sender app:

static void Main(string[] args)
{
	CommonService commonService = new CommonService();
	IConnection connection = commonService.GetRabbitMqConnection();
	IModel model = connection.CreateModel();
	SetupSerialisationMessageQueue(model);
}

private static void SetupSerialisationMessageQueue(IModel model)
{
	model.QueueDeclare(CommonService.SerialisationQueueName, true, false, false, null);
}

Run the Sender app and check in the RabbitMQ management console that the queue has been created. Comment out the call to SetupSerialisationMessageQueue in Main. Insert the following method to the Sender:

private static void RunSerialisationDemo(IModel model)
{
	Console.WriteLine("Enter customer name. Quit with 'q'.");
	while (true)
	{
		string customerName = Console.ReadLine();
		if (customerName.ToLower() == "q") break;
		Customer customer = new Customer() { Name = customerName };
		IBasicProperties basicProperties = model.CreateBasicProperties();
		basicProperties.SetPersistent(true);
		String jsonified = JsonConvert.SerializeObject(customer);
		byte[] customerBuffer = Encoding.UTF8.GetBytes(jsonified);
		model.BasicPublish("", CommonService.SerialisationQueueName, basicProperties, customerBuffer);
	}
}

There’s not much magic going on: we enter the customer name, construct the Customer object, serialise it into a JSON string, get the byte array out of the JSON string and send it to the message queue. Add a call to this method from Main:

RunSerialisationDemo(model);

In the SerialisationReceiver add the following code to Program.cs:

static void Main(string[] args)
{
	CommonService commonService = new CommonService();
	IConnection connection = commonService.GetRabbitMqConnection();
	IModel model = connection.CreateModel();
	ReceiveSerialisationMessages(model);
}

private static void ReceiveSerialisationMessages(IModel model)
{
	model.BasicQos(0, 1, false);
	QueueingBasicConsumer consumer = new QueueingBasicConsumer(model);
	model.BasicConsume(CommonService.SerialisationQueueName, false, consumer);
	while (true)
	{
		BasicDeliverEventArgs deliveryArguments = consumer.Queue.Dequeue() as BasicDeliverEventArgs;
		String jsonified = Encoding.UTF8.GetString(deliveryArguments.Body);
		Customer customer = JsonConvert.DeserializeObject<Customer>(jsonified);
		Console.WriteLine("Pure json: {0}", jsonified);
		Console.WriteLine("Customer name: {0}", customer.Name);
		model.BasicAck(deliveryArguments.DeliveryTag, false);
	}
}

This bit of code should look very familiar to you from the foundations course: we’re listening to a specific queue and extracting any incoming messages. The only new bit is that we deserialise the JSON string into a Customer object.

The demo is ready to run. Start the Sender app. Then right-click the Receiver project, select Debug, Start new instance. You’ll have two console windows up and running. Start sending customer names to the Receiver from the Sender:

Serialised message output

The same technique works for more complicated objects with multiple properties and other objects as properties.
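As a rough sketch of that claim, the example below round-trips an object that has a nested object and a list as properties. It assumes the same Newtonsoft JSON.NET package as above. The CustomerWithDetails and Address classes and the JsonRoundTrip helper are hypothetical and not part of the demo solution.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using Newtonsoft.Json;

// Hypothetical types, only for illustration
public class Address
{
    public string City { get; set; }
}

public class CustomerWithDetails
{
    public string Name { get; set; }
    public Address HomeAddress { get; set; }
    public List<string> Tags { get; set; }
}

public static class JsonRoundTrip
{
    // Object -> JSON string -> UTF-8 bytes, as in RunSerialisationDemo
    public static byte[] ToMessageBody(CustomerWithDetails customer)
    {
        return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(customer));
    }

    // UTF-8 bytes -> JSON string -> object, as in ReceiveSerialisationMessages
    public static CustomerWithDetails FromMessageBody(byte[] body)
    {
        return JsonConvert.DeserializeObject<CustomerWithDetails>(Encoding.UTF8.GetString(body));
    }

    public static void Main()
    {
        CustomerWithDetails customer = new CustomerWithDetails
        {
            Name = "Jane",
            HomeAddress = new Address { City = "Stockholm" },
            Tags = new List<string> { "new", "priority" }
        };

        CustomerWithDetails copy = FromMessageBody(ToMessageBody(customer));
        Console.WriteLine("{0}, {1}, {2} tags", copy.Name, copy.HomeAddress.City, copy.Tags.Count);
    }
}
```

The byte array returned by ToMessageBody is what would be passed to BasicPublish as the message body.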

Setting the content type

JSON is not the only message type we can send. The other two common message formats are XML and binary. No matter how you format your message, it will need to be sent to the message queue as a byte array. We’ve already seen how to serialise a JSON message. For XML you can use the following method:

private byte[] SerialiseIntoXml(Customer customer)
{
	using (MemoryStream memoryStream = new MemoryStream())
	{
		XmlSerializer xmlSerialiser = new XmlSerializer(customer.GetType());
		xmlSerialiser.Serialize(memoryStream, customer);
		// ToArray returns only the bytes written; GetBuffer would also include unused buffer space
		return memoryStream.ToArray();
	}
}

…and to serialise the Customer object into a binary format you can use the following method:

private byte[] SerialiseIntoBinary(Customer customer)
{
	using (MemoryStream memoryStream = new MemoryStream())
	{
		BinaryFormatter binaryFormatter = new BinaryFormatter();
		binaryFormatter.Serialize(memoryStream, customer);
		// ToArray returns only the bytes written; GetBuffer would also include unused buffer space
		return memoryStream.ToArray();
	}
}

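Note that the Customer class must be marked as serialisable for binary serialisation to work. The XML serialiser doesn’t need the attribute; it only requires a public class with a parameterless constructor: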

[Serializable]
public class Customer
{
      public string Name { get; set; }
}

So now we can serialise our message in 3 different ways. It now makes sense to denote the content type of the message. The IBasicProperties interface has a property called ContentType where you can set the MIME type using the well-known values below:

  • JSON: application/json
  • XML: text/xml
  • Binary: application/octet-stream

Example:

IBasicProperties basicProperties = model.CreateBasicProperties();
basicProperties.SetPersistent(true);
basicProperties.ContentType = "application/json";

The properties are sent along in the BasicPublish method so the MIME type is preserved.

On the Receiver side you can read the MIME type as follows:

BasicDeliverEventArgs deliveryArguments = consumer.Queue.Dequeue() as BasicDeliverEventArgs;
string contentType = deliveryArguments.BasicProperties.ContentType;

Also, the Receiver will need to deserialise the message. We’ve already seen how to do that in the case of the JSON format. For XML you can have the following helper method:

private Customer DeserialiseFromXml(byte[] messageBody)
{
	MemoryStream memoryStream = new MemoryStream();
	memoryStream.Write(messageBody, 0, messageBody.Length);
	memoryStream.Seek(0, SeekOrigin.Begin);
	XmlSerializer xmlSerialiser = new XmlSerializer(typeof(Customer));
	return xmlSerialiser.Deserialize(memoryStream) as Customer;
}

…and for the binary format you use something like this:

private Customer DeserialiseFromBinary(byte[] messageBody)
{
	MemoryStream memoryStream = new MemoryStream();
	memoryStream.Write(messageBody, 0, messageBody.Length);
	memoryStream.Seek(0, SeekOrigin.Begin);
	BinaryFormatter binaryFormatter = new BinaryFormatter();
	return binaryFormatter.Deserialize(memoryStream) as Customer;
}

Note that the Customer object must be shared between Sender and the Receiver for binary serialisation to work.
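Once the content type is available, the Receiver can pick the matching deserialiser. The following is a minimal sketch of that idea and not code from the demo solution: the MessageReader class and its Deserialise method are hypothetical names, and only the JSON and XML branches are shown. A binary branch would call a helper like DeserialiseFromBinary above.

```csharp
using System;
using System.IO;
using System.Text;
using System.Xml.Serialization;
using Newtonsoft.Json;

public class Customer
{
    public string Name { get; set; }
}

// Hypothetical helper: chooses a deserialiser based on the MIME type of the message
public static class MessageReader
{
    public static Customer Deserialise(byte[] messageBody, string contentType)
    {
        switch (contentType)
        {
            case "application/json":
                return JsonConvert.DeserializeObject<Customer>(Encoding.UTF8.GetString(messageBody));
            case "text/xml":
                using (MemoryStream memoryStream = new MemoryStream(messageBody))
                {
                    XmlSerializer xmlSerialiser = new XmlSerializer(typeof(Customer));
                    return xmlSerialiser.Deserialize(memoryStream) as Customer;
                }
            default:
                // Unknown or missing content type: refuse rather than guess
                throw new NotSupportedException("Unsupported content type: " + contentType);
        }
    }

    public static void Main()
    {
        byte[] json = Encoding.UTF8.GetBytes("{\"Name\":\"Jane\"}");
        byte[] xml = Encoding.UTF8.GetBytes("<Customer><Name>John</Name></Customer>");

        Console.WriteLine(Deserialise(json, "application/json").Name);
        Console.WriteLine(Deserialise(xml, "text/xml").Name);
    }
}
```

In the Receiver loop the two arguments would come from deliveryArguments.Body and deliveryArguments.BasicProperties.ContentType.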

Denoting the type of the message

It can happen that the Receiver doesn’t know in advance what type of message is coming, i.e. if it’s a Customer, an Order, a Product etc.

You can solve this issue using the Type property of the IBasicProperties object when sending the message from the Sender. It is a string property:

IBasicProperties basicProperties = model.CreateBasicProperties();
basicProperties.SetPersistent(true);
basicProperties.ContentType = "application/json";
basicProperties.Type = "Customer";

And then you can read the type in the Receiver as follows:

BasicDeliverEventArgs deliveryArguments = consumer.Queue.Dequeue() as BasicDeliverEventArgs;
string contentType = deliveryArguments.BasicProperties.ContentType;
string objectType = deliveryArguments.BasicProperties.Type;

You are free to set the value of the Type property. You can do it in a simple way like above, but the Receiver will need to know those values. In the world of open APIs and automatic documentation generators this shouldn’t be a serious obstacle. Other solutions:

  • Fully qualified name, including the namespace, such as “SharedObjects.Customer”. This is easy to retrieve with the typeof keyword: typeof(Customer).ToString(). This approach is mostly viable within the .NET world, where both the Sender and the Receiver can work with .NET objects
  • Canonical messages where the object type is described using XSD along with the root element. This approach works best if interoperability between disparate systems is a must
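As a rough illustration of the fully qualified name option, the Receiver could keep a small registry of the types it is willing to accept and look up the incoming Type value there. This is only a sketch under stated assumptions: the TypedMessageReader class and the Order class are hypothetical, and the message body is assumed to be JSON.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using Newtonsoft.Json;

namespace SharedObjects
{
    public class Customer { public string Name { get; set; } }
    public class Order { public int Quantity { get; set; } } // hypothetical second message type
}

// Hypothetical helper: maps the Type property of a message to a known .NET type
public static class TypedMessageReader
{
    private static readonly Dictionary<string, Type> KnownTypes = new Dictionary<string, Type>
    {
        { typeof(SharedObjects.Customer).ToString(), typeof(SharedObjects.Customer) },
        { typeof(SharedObjects.Order).ToString(), typeof(SharedObjects.Order) }
    };

    public static object Deserialise(byte[] messageBody, string objectType)
    {
        Type targetType;
        if (objectType == null || !KnownTypes.TryGetValue(objectType, out targetType))
        {
            throw new NotSupportedException("Unknown message type: " + objectType);
        }
        return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(messageBody), targetType);
    }

    public static void Main()
    {
        // The Sender would set basicProperties.Type to this value
        string typeName = typeof(SharedObjects.Customer).ToString();
        Console.WriteLine(typeName); // SharedObjects.Customer

        byte[] body = Encoding.UTF8.GetBytes("{\"Name\":\"Jane\"}");
        SharedObjects.Customer customer = (SharedObjects.Customer)Deserialise(body, typeName);
        Console.WriteLine(customer.Name);
    }
}
```

A fixed registry keeps the Receiver from instantiating arbitrary types named by a message; only the listed types are ever deserialised.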

We’ll look at the first option in the next post.

View the list of posts on Messaging here.

Determine if a sequence contains a certain element with LINQ C#

Say we have the following string list:

string[] bands = { "ACDC", "Queen", "Aerosmith", "Iron Maiden", "Megadeth", "Metallica", "Cream", "Oasis", "Abba", "Blur", "Chic", "Eurythmics", "Genesis", "INXS", "Midnight Oil", "Kent", "Madness", "Manic Street Preachers"
, "Noir Desir", "The Offspring", "Pink Floyd", "Rammstein", "Red Hot Chili Peppers", "Tears for Fears"
, "Deep Purple", "KISS"};

If you’d like to check if a certain string element is present in the sequence then you can use the Contains operator in LINQ:

bool contains = bands.Contains("Queen");
Console.WriteLine(contains);

This yields true as you might expect.

This was a straightforward case as .NET has a good built-in implementation of string equality. It works just as well with primitive types like int or long. .NET can determine if two integers are equal so the default version of Contains is sufficient.

It is different with your custom objects, such as this one:

public class Singer
{
	public int Id { get; set; }
	public string FirstName { get; set; }
	public string LastName { get; set; }
	public int BirthYear { get; set; }
}

Consider the following list:

IEnumerable<Singer> singers = new List<Singer>() 
			{
				new Singer(){Id = 1, FirstName = "Freddie", LastName = "Mercury", BirthYear=1964}
				, new Singer(){Id = 2, FirstName = "Elvis", LastName = "Presley", BirthYear = 1954}
				, new Singer(){Id = 3, FirstName = "Chuck", LastName = "Berry", BirthYear = 1954}
				, new Singer(){Id = 4, FirstName = "Ray", LastName = "Charles", BirthYear = 1950}
				, new Singer(){Id = 5, FirstName = "David", LastName = "Bowie", BirthYear = 1964}
			};

…and the following code:

Singer s = new Singer() { Id = 2, FirstName = "Elvis", LastName = "Presley", BirthYear = 1954 };
bool containsSinger = singers.Contains(s);
Console.WriteLine(containsSinger);

This will of course yield false: by default, equality of class instances is based on references, and there’s no element in the sequence with the same reference as “s”. In this case we can use the overloaded version of Contains where you can supply an equality comparer:

public class DefaultSingerComparer : IEqualityComparer<Singer>
{
	public bool Equals(Singer x, Singer y)
	{
		return x.Id == y.Id;
	}

	public int GetHashCode(Singer obj)
	{
		return obj.Id.GetHashCode();
	}
}

So equality is based on the ID. We can rewrite our query as follows:

Singer s = new Singer() { Id = 2, FirstName = "Elvis", LastName = "Presley", BirthYear = 1954 };
bool containsSinger = singers.Contains(s, new DefaultSingerComparer());
Console.WriteLine(containsSinger);

…which yields true.
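If a comparer class feels like too much for a one-off check, the Any operator with a predicate is a possible alternative. The sketch below is only an illustration; the ContainsDemo class and the ContainsSingerWithId helper are made-up names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class Singer
{
    public int Id { get; set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public int BirthYear { get; set; }
}

public static class ContainsDemo
{
    // True if at least one singer in the sequence has the given ID
    public static bool ContainsSingerWithId(IEnumerable<Singer> singers, int id)
    {
        return singers.Any(singer => singer.Id == id);
    }

    public static void Main()
    {
        IEnumerable<Singer> singers = new List<Singer>()
        {
            new Singer() { Id = 1, FirstName = "Freddie", LastName = "Mercury", BirthYear = 1964 },
            new Singer() { Id = 2, FirstName = "Elvis", LastName = "Presley", BirthYear = 1954 }
        };

        Console.WriteLine(ContainsSingerWithId(singers, 2));  // True
        Console.WriteLine(ContainsSingerWithId(singers, 42)); // False
    }
}
```

The comparer is the better choice when the same equality rule is needed in several places, e.g. also in Distinct or Except.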

You can view all LINQ-related posts on this blog here.

Reversing a sequence using the Reverse operator in .NET LINQ

Reversing the order of a sequence with LINQ is extremely simple: just use the Reverse() operator.

Example data structure:

string[] bands = { "ACDC", "Queen", "Aerosmith", "Iron Maiden", "Megadeth", "Metallica", "Cream", "Oasis", "Abba", "Blur", "Chic", "Eurythmics", "Genesis", "INXS", "Midnight Oil", "Kent", "Madness", "Manic Street Preachers"
, "Noir Desir", "The Offspring", "Pink Floyd", "Rammstein", "Red Hot Chili Peppers", "Tears for Fears"
, "Deep Purple", "KISS"};

You can use the Reverse operator as follows:

IEnumerable<string> bandsReversed = bands.Reverse();
foreach (string item in bandsReversed)
{
	Console.WriteLine(item);
}

…which will print the above array in reverse order, i.e. starting with KISS and finishing with ACDC.
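Note that the LINQ Reverse operator doesn’t modify the source; it yields the elements in reverse order. The sketch below contrasts it with Array.Reverse, which reverses the array in place. The ReverseDemo class and the ReversedCopy helper are made up for this illustration, and the operator is called in its static form, Enumerable.Reverse.

```csharp
using System;
using System.Linq;

public static class ReverseDemo
{
    // Returns a reversed copy and leaves the source array untouched
    public static string[] ReversedCopy(string[] source)
    {
        return Enumerable.Reverse(source).ToArray();
    }

    public static void Main()
    {
        string[] bands = { "ACDC", "Queen", "KISS" };

        string[] reversed = ReversedCopy(bands);
        Console.WriteLine(string.Join(", ", reversed)); // KISS, Queen, ACDC
        Console.WriteLine(string.Join(", ", bands));    // ACDC, Queen, KISS

        // Array.Reverse changes the array itself
        Array.Reverse(bands);
        Console.WriteLine(string.Join(", ", bands));    // KISS, Queen, ACDC
    }
}
```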

You can view all LINQ-related posts on this blog here.
