Messaging with RabbitMQ and .NET review part 8: routing and topics

Introduction

In the previous post we looked at two-way messaging in RabbitMq. This messaging type corresponds to the Remote Procedure Call (RPC) messaging pattern. RPC is slightly different from the previous MEPs in that there’s a response queue involved. The sender sends an initial message to a destination queue via the default exchange. The message properties include a temporary queue where the consumer can reply. The receiver processes the message and responds using the response queue extracted from the message properties. The sender then processes the response. We managed to set up a rudimentary chat application in our demo project at the end of the post.

In this post we’ll concentrate on two message filtering techniques: routing keys and topics. The two are quite similar so a single post is enough to handle them both.

Messaging with RabbitMQ and .NET review part 7: two way messaging

Introduction

In the previous post we looked at the fanout exchange type. This exchange type corresponds to the publish-subscribe message exchange pattern. This MEP is very similar to one-way messaging. The major difference is that there can be multiple queues bound to an exchange. The incoming messages are forwarded to every queue where each queue is monitored by a listener. This exchange type is suitable for cases where you know that there will be multiple listeners.

So far we’ve looked at patterns where the publisher sent a message to a queue and didn’t care about any response back from the consumer. In this post we’ll see how to set up two-way messaging where a publisher receives a response from the consumer.

Messaging with RabbitMQ and .NET review part 6: the fanout exchange type

Introduction

In the previous post we looked at an alternative way to consume messages from a queue in RabbitMq. In particular we discussed the usage of the EventingBasicConsumer which is an event and delegate based alternative to the DefaultBasicConsumer class. The outcome is the same in both cases, i.e. the consumer monitors the assigned queue and pulls messages from it.

In this post we’ll discuss how to work with the fanout exchange type.

Messaging with RabbitMQ and .NET review part 5: one way messaging with an event based consumer

Introduction

In the previous post we saw how to process messages from a queue using a receiver we derived from a default basic consumer. We implemented the HandleBasicDeliver function for that purpose. We also discussed two message exchange patterns (MEPs), one-way and worker queues. The two are practically identical in code but the worker queues MEP implies that we have 2 or more consumers competing for the messages from the queue. That way we can spread out the message load across multiple consumer instances.

In this short post we’ll look at an alternative way to consume messages from a queue in code.

Messaging with RabbitMQ and .NET review part 4: one way messaging with a basic consumer

Introduction

In the previous post we looked at the RabbitMq .NET client. The client is a library that can be downloaded from NuGet and which allows us to work with RabbitMq messages in our .NET projects in an object-oriented way. In particular we saw how to create an exchange, a queue and a binding in code. We also successfully sent a message to the queue we created in a simple .NET console application. We also discussed the notion of durability whereby we can make all resources in RabbitMq fault tolerant so that they survive a server restart.

In this post we’ll see how to consume one-way direct messages in code.

Messaging with RabbitMQ and .NET review part 3: the .NET client and some initial code

Introduction

In the previous post we installed the RabbitMq service on Windows. I think you’ll agree that it wasn’t a very complicated process. We then logged into the management GUI using the default “guest” administrator user. We finally looked at how to create users and virtual hosts. We said that a virtual host was a container or namespace to delimit groups of resources within RabbitMq, such as “sales” or “accounting”. We also created a new user called “accountant”.

In this post we’ll start working with RabbitMq in Visual Studio. We’ll in particular start exploring the RabbitMq .NET client library.

Messaging with RabbitMQ and .NET review part 2: installation and setup

Introduction

In the previous post we went through a general introduction of RabbitMq and its terminology. RabbitMq is a message broker that helps to solve communication between disparate systems in a reliable and maintainable manner. It is a very fast and highly scalable open-source messaging system which by default supports the AMQP messaging protocol. We discussed the key terms exchange, binding, queue, connection and channel. We also listed the 4 exchange types which are direct, headers, topic and fanout.

In this post we’ll install RabbitMq on Windows. I have Windows 10 on my laptop but the RabbitMq installation package should work equally well on other versions of Windows. The most recent version of RabbitMq at the time of writing this post is 3.6.4. There may be a later version by the time you read this.

Messaging with RabbitMQ and .NET review part 1: foundations and terminology

Introduction

RabbitMQ is a message broker that helps to solve communication between disparate systems in a reliable and maintainable manner. There can be various platforms that need to communicate with each other: a Windows service, a Java servlet based web service, an MVC web application etc. Messaging aims to integrate these systems so that they can exchange information in a decoupled and platform independent fashion.

There have been numerous ways to solve messaging in the past: Java Message Service, MSMQ, IBM MQ, but they never really became widespread, mostly because they are tied to a specific system, like Windows. Messaging systems based on those technologies were complex, expensive, difficult to connect to and in general difficult to work with. Also, they didn’t follow any particular messaging standard; each vendor had their own standards that the customers had to adhere to.

In this new series on RabbitMq we will revisit some concepts and techniques we discussed in the original series here. As a user commented on the original series, there have been a number of changes, extensions and new concepts in RabbitMq and its .NET client so it’s time for a review.

Domain Driven Design with Web API extensions part 7: domain events with RabbitMq completed

Introduction

In the previous post we set up the local RabbitMq environment. We also created a simple console application that built the message queue where the DDD demo project and the simulated financial application will communicate. We also prepared the way for the completion of the messaging process by creating an app setting reader and adding the RabbitMq related settings to web config.

In this post we’ll complete the demo by sending a message to the queue and reading it from the dummy financial console application.

RabbitMQ in .NET C#: more complex error handling in the Receiver

Introduction

In the previous part on RabbitMQ .NET we looked at ways to reject a message if there was an exception while handling the message on the Receiver’s side. The message could then be discarded or re-queued for a retry. However, the exception handling logic was very primitive in that the same message could potentially be thrown at the receiver infinitely, causing a traffic jam in the messages.

This post builds upon the basics of RabbitMQ in .NET. If you are new to this topic you should check out all the previous posts listed on this page. I won’t provide any details on bits of code that we’ve gone through before.

Most of the posts on RabbitMQ on this blog are based on the work of RabbitMQ guru Michael Stephenson.

Back to the problem at hand: we cannot just keep retrying forever. Instead, we can discard the message for good after a certain number of retries, or depending on what kind of exception was encountered.

The logic around retries must be implemented in the receiver as there’s no simple method in RabbitMQ .NET, like “BasicRetry”. Why should there be anyway? Retry strategies can be very diverse so it’s easier to let the receiver handle it.

The strategy here is to reject the message without re-queuing it. We’ll then create a new message based on the one that caused the exception and attach an integer value to it indicating the number of retries. Then depending on a maximum ceiling we either create yet another message for re-queuing or discard it altogether.

We’ll build on the demo we started on in the previous post referred to above so have it ready.

Demo

We’ll reuse the queue from the previous post which we called “BadMessageQueue”. We’ll also reuse the code in BadMessageSender as there’s no variation on the Sender side.

BadMessageReceiver will however handle the messages in a different way. Currently there’s a method called ReceiveBadMessages which is called from Main. Comment out that method call. Insert the following method in Program.cs of BadMessageReceiver and call it from Main:

private static void ReceiveBadMessageExtended(IModel model)
{
	model.BasicQos(0, 1, false);
	QueueingBasicConsumer consumer = new QueueingBasicConsumer(model);
	model.BasicConsume(RabbitMqService.BadMessageBufferedQueue, false, consumer);
	string customRetryHeaderName = "number-of-retries";
	int maxNumberOfRetries = 3;
	//create the random number generator once, outside the loop
	Random random = new Random();
	while (true)
	{
		BasicDeliverEventArgs deliveryArguments = consumer.Queue.Dequeue() as BasicDeliverEventArgs;
		string message = Encoding.UTF8.GetString(deliveryArguments.Body);
		Console.WriteLine("Message from queue: {0}", message);
		//0, 1 or 2: only 2 counts as success
		int i = random.Next(0, 3);
		int retryCount = GetRetryCount(deliveryArguments.BasicProperties, customRetryHeaderName);
		if (i == 2) //no exception, accept message
		{
			Console.WriteLine("Message {0} accepted. Number of retries: {1}", message, retryCount);
			model.BasicAck(deliveryArguments.DeliveryTag, false);
		}
		else //simulate exception: accept message, but create copy and throw back
		{
			if (retryCount < maxNumberOfRetries)
			{
				Console.WriteLine("Message {0} has thrown an exception. Current number of retries: {1}", message, retryCount);
				IBasicProperties propertiesForCopy = model.CreateBasicProperties();
				IDictionary<string, object> headersCopy = CopyHeaders(deliveryArguments.BasicProperties);
				propertiesForCopy.Headers = headersCopy;
				propertiesForCopy.Headers[customRetryHeaderName] = ++retryCount;
				model.BasicPublish(deliveryArguments.Exchange, deliveryArguments.RoutingKey, propertiesForCopy, deliveryArguments.Body);
				model.BasicAck(deliveryArguments.DeliveryTag, false);
				Console.WriteLine("Message {0} thrown back at queue for retry. New retry count: {1}", message, retryCount);
			}
			else //must be rejected, cannot process
			{
				Console.WriteLine("Message {0} has reached the max number of retries. It will be rejected.", message);
				model.BasicReject(deliveryArguments.DeliveryTag, false);
			}
		}
	}
}

…where CopyHeaders and GetRetryCount look as follows:

private static IDictionary<string, object> CopyHeaders(IBasicProperties originalProperties)
{
	IDictionary<string, object> dict = new Dictionary<string, object>();
	IDictionary<string, object> headers = originalProperties.Headers;
	if (headers != null)
	{
		foreach (KeyValuePair<string, object> kvp in headers)
		{
			dict[kvp.Key] = kvp.Value;
		}
	}

	return dict;
}

private static int GetRetryCount(IBasicProperties messageProperties, string countHeader)
{
	IDictionary<string, object> headers = messageProperties.Headers;
	int count = 0;
	if (headers != null)
	{
		if (headers.ContainsKey(countHeader))
		{
			string countAsString = Convert.ToString( headers[countHeader]);
			count = Convert.ToInt32(countAsString);
		}
	}

	return count;
}

Let’s see what’s going on here. We define a custom header to store the number of retries for a message. We also set an upper limit of 3 on the number of retries. Then we receive the messages in the usual way. A random number between 0 and 3 is generated, where the upper limit is exclusive, to decide whether to simulate an exception or not. Only if this number is 2 do we accept and acknowledge the message, so roughly two out of three attempts end in “throwing an exception”, just to make this demo more interesting. We also extract the current number of retries using the GetRetryCount method. This helper method simply checks the headers of the message for the presence of the custom retry count header and returns 0 if it isn’t there.

If we simulate an exception then we need to check if the current retry count has reached the max number of retries. If not then the exciting new stuff begins. We create a new message where we copy the elements of the original message. We also set the new value of the retry count header. We send the message copy back to where it came from and acknowledge the original message. Otherwise if the max number of retries has been reached we reject the message completely using the BasicReject method we saw in the previous part.
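The three possible outcomes described above can also be written as a small function that doesn’t touch RabbitMQ at all, which makes the branching easy to test in isolation. This is only a sketch: the names RetryAction, RetryDecision and Decide are made up for illustration and are not part of the demo project or the RabbitMQ .NET client.

```csharp
using System;

// Hypothetical names, introduced only for this sketch.
public enum RetryAction
{
    Acknowledge,        // processing succeeded: BasicAck
    RepublishWithRetry, // failed, retries remain: publish a copy, then BasicAck the original
    Reject              // failed, no retries left: BasicReject without re-queuing
}

public static class RetryDecision
{
    // Mirrors the if/else structure of ReceiveBadMessageExtended.
    public static RetryAction Decide(bool processedSuccessfully, int retryCount, int maxNumberOfRetries)
    {
        if (processedSuccessfully)
        {
            return RetryAction.Acknowledge;
        }

        return retryCount < maxNumberOfRetries
            ? RetryAction.RepublishWithRetry
            : RetryAction.Reject;
    }
}
```

With a maximum of 3, a message that fails with a retry count of 0, 1 or 2 is republished, and one that fails with a retry count of 3 is rejected.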

Run both the Sender and Receiver apps and start sending messages from the Sender. Depending on the random number generated in the Receiver you’ll see a differing number of retries but you may get something like this:

[Screenshot: advanced retry console output]

We can see the following here:

  • Message hello was rejected at first and then accepted after 1 retry
  • Message hi was accepted immediately
  • Message bye was accepted after 2 retries
  • Message seeyou was rejected completely

So we’ve seen how to add some more logic into how to handle exceptions.

Other considerations and extensions:

  • You can specify different max retries depending on the exception type. In that case you can add the exception type to the headers as well
  • You might consider storing the retry count somewhere other than the message itself, e.g. within the Receiver. The advantage of storing the retry count in the message is that if you have multiple receivers waiting for messages from the same queue then they will all have access to the retry property
  • If there’s a dependency between messages then exception handling becomes a bigger challenge: if message B depends on message A and message A throws an exception, what do we do with message B? You can force related messages to be processed in an ordered fashion, which will have a negative impact on the message throughput. On the other hand you may simply ignore this scenario if it’s not important enough for your case, where “enough” depends on the cost of slower message throughput versus the cost of an exception in interdependent messages. Somewhere between these two extremes you can decide to keep the order of related messages only and let all others be delivered normally. In this case you can put the sequence number, such as “5/10”, in the header so that the receiver can check if all messages have come in correctly. If you have multiple receivers then the sequence number must be stored externally so that all receivers will have access to the same information. Alternatively, you can have a separate queue or even a separate RabbitMQ instance for related messages in case the proportion of related messages in the total number of messages is small.
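
As a rough illustration of the first point in the list, the retry ceiling could be looked up by exception type instead of being a single constant. The class name RetryPolicy, its methods and the limits below are all invented for this sketch; pick values that make sense for your own exceptions.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the demo project.
public static class RetryPolicy
{
    // Fallback, same value as maxNumberOfRetries in the demo.
    private const int DefaultMaxRetries = 3;

    // Illustrative limits only: a timeout may be worth more attempts,
    // a badly formatted message will not get better by retrying.
    private static readonly Dictionary<Type, int> MaxRetriesByException = new Dictionary<Type, int>
    {
        { typeof(TimeoutException), 5 },
        { typeof(FormatException), 0 }
    };

    public static int GetMaxRetries(Exception exception)
    {
        int max;
        // Exact type match only; derived exception types fall back to the default.
        return MaxRetriesByException.TryGetValue(exception.GetType(), out max)
            ? max
            : DefaultMaxRetries;
    }

    public static bool ShouldRetry(Exception exception, int retryCount)
    {
        return retryCount < GetMaxRetries(exception);
    }
}
```

In the receiver, the check against maxNumberOfRetries would then become a call such as RetryPolicy.ShouldRetry(exception, retryCount), made inside the catch block where the real exception is available.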

View the list of posts on Messaging here.
