Messaging with RabbitMQ and .NET review part 11: various other topics

Introduction

In the previous post we looked at the scatter/gather message exchange pattern. It is similar to RPC in that the sender will be expecting a response from the receiver. The main difference is that in this scenario the sender can collect a range of responses from various receivers. The sender will set up a temporary response queue where the receivers can send their responses. This MEP is suitable for scenarios that require 2 way communication with more than a single consumer. An example would be a system where we’re sending out a notice to some selected construction companies asking for a price offer. The companies then can respond using the message broker and the temporary response queue.

In this post, which will also finish the series, we’ll look at various smaller topics around the RabbitMq client.

Mandatory queuing

If a message cannot be forwarded to any queue from an exchange it’s lost by default. In other words the publisher doesn’t know whether a message has been relayed to at least one queue. There is a way around that via an overload of the BasicPublish method. We also need to set up an event handler for the BasicReturn event of the IModel object. The event handler is triggered in case there was no matching queue for the message. The following example deliberately sets up an exchange with no queue:

private static void SetUpDirectExchange()
{
	ConnectionFactory connectionFactory = new ConnectionFactory();

	connectionFactory.Port = 5672;
	connectionFactory.HostName = "localhost";
	connectionFactory.UserName = "accountant";
	connectionFactory.Password = "accountant";
	connectionFactory.VirtualHost = "accounting";

	IConnection connection = connectionFactory.CreateConnection();
	IModel channel = connection.CreateModel();

	channel.ExchangeDeclare("no.queue.exchange", ExchangeType.Direct, true, false, null);
	IBasicProperties properties = channel.CreateBasicProperties();
	channel.BasicReturn += Channel_BasicReturn;
							
	channel.BasicPublish("no.queue.exchange", "", true, properties, Encoding.UTF8.GetBytes("This is a message from the RabbitMq .NET driver"));

	channel.Close();
	connection.Close();
}

private static void Channel_BasicReturn(object sender, BasicReturnEventArgs e)
{
	Debug.WriteLine(string.Concat("Queue is missing for the message: ", Encoding.UTF8.GetString(e.Body)));
	Debug.WriteLine(string.Concat("Reply code and text: ", e.ReplyCode, " ", e.ReplyText));
}

The above example produces the following output in the Debug window:

Queue is missing for the message: This is a message from the RabbitMq .NET driver
Reply code and text: 312 NO_ROUTE

Confirmation from the exchange

The publisher can receive a confirmation from RabbitMq whether a message successfully reached the exchange. Note the following three components in the example code:

  • ConfirmSelects: it activates feedback mechanism for the publisher
  • The BasicAcks event handler which is called in case the message broker has acknowledged the message from the publisher
  • The BasicNacks event handler which is triggered in case RabbitMq for some reason could not acknowledge a message. In this case you can re-send a message if it’s of critical importance
private static void SetUpDirectExchange()
{
	ConnectionFactory connectionFactory = new ConnectionFactory();

	connectionFactory.Port = 5672;
	connectionFactory.HostName = "localhost";
	connectionFactory.UserName = "accountant";
	connectionFactory.Password = "accountant";
	connectionFactory.VirtualHost = "accounting";

	IConnection connection = connectionFactory.CreateConnection();
	IModel channel = connection.CreateModel();

	channel.ExchangeDeclare("my.first.exchange", ExchangeType.Direct, true, false, null);			
	channel.QueueDeclare("my.first.queue", true, false, false, null);			
	channel.QueueBind("my.first.queue", "my.first.exchange", "");
	channel.ConfirmSelect();
	channel.BasicAcks += Channel_BasicAcks;
	channel.BasicNacks += Channel_BasicNacks;

	IBasicProperties properties = channel.CreateBasicProperties();					
	PublicationAddress address = new PublicationAddress(ExchangeType.Direct, "my.first.exchange", "");		
	channel.BasicPublish(address, properties, Encoding.UTF8.GetBytes("This is a message from the RabbitMq .NET driver"));		
			
	channel.Close();
	connection.Close();
}

private static void Channel_BasicNacks(object sender, BasicNackEventArgs e)
{
	Console.WriteLine(string.Concat("Message broker could not acknowledge message with tag: ", e.DeliveryTag));
}

private static void Channel_BasicAcks(object sender, BasicAckEventArgs e)
{
	Console.WriteLine(string.Concat("Message broker has acknowledged message with tag: ", e.DeliveryTag));
}

Unacknowledged messages

A recurring line of code in our demos was that the message registered by the receiver had to be acknowledged:

channel.BasicAck(deliveryTag, false);

The reason we did that was that the “noAck” flag in the channel.BasicConsume function was set to false. That configuration ensures that a message is not deleted from the queue until it has been acknowledged by the consumer:

channel.BasicConsume("my.first.queue", false, basicConsumer);

As soon as that publisher has acknowledged the message it is deleted from the queue. However, what if there is no acknowledgement? It can happen in at least two cases:

  • If there’s an exception during the message processing then the receiver might want to force resending the message. The message will then be requeued and redelivered
  • If the consumer crashes after receiving the message but before sending the acknowledgement then either another consumer instance can receive it or the same instance after rebooting

The “redelivered” flag will be true in both cases so you’ll have to consider it in your code. If a message is delivered more than once then there might be something wrong with it.

To actively “unacknowledge” a message use the BasicNack function:

channel.BasicNack(deliveryTag, false, true);

The first two parameters are the same as for BasicAck, the last one means whether the message should be requeued.

If “noAck” in BasicConsume is set to true then the message will be deleted from the queue as soon as it’s been delivered.

Object serialisation

So far we’ve only sent simple string messages back and forth. However, in reality you’ll often need to send objects across the wire. We took up data serialisation in the original series and those posts are still applicable. Keep in mind that the receiver examples in that series are based on an outdated object. Make sure you use the receiver techniques we’ve seen in this blog.

Large messages

Sometimes we need to send large messages through RabbitMq, although the idea with messaging is that message size should be as little as possible. In case you need a solution for large messages you can refer back to the previous RabbitMq series. Like above, make sure you update the receiver:

RabbitMQ in .NET: handling large messages

In general you should avoid large messages and message dependencies. This latter means that message A is followed by message B which in turn is followed by message C and the three are correlated, then having one exception can cause an unnecessarily complex message handling logic.

Exception handling

Finally we have the topic of exception handling. The following posts from the previous series…

…take up the following topics:

  • Rejecting a faulty message in the receiver
  • Requeing a faulty message for a retry
  • Retries

That’s the end of this series. The next series will also be related to messaging: we’ll take a look at the MassTransit service bus solution.

View the list of posts on Messaging here.

Advertisements

About Andras Nemes
I'm a .NET/Java developer living and working in Stockholm, Sweden.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

ultimatemindsettoday

A great WordPress.com site

Elliot Balynn's Blog

A directory of wonderful thoughts

Robin Sedlaczek's Blog

Developer on Microsoft Technologies

HarsH ReaLiTy

A Good Blog is Hard to Find

Softwarearchitektur in der Praxis

Wissenswertes zu Webentwicklung, Domain-Driven Design und Microservices

the software architecture

thoughts, ideas, diagrams,enterprise code, design pattern , solution designs

Technology Talks

on Microsoft technologies, Web, Android and others

Software Engineering

Web development

Disparate Opinions

Various tidbits

chsakell's Blog

Anything around ASP.NET MVC,WEB API, WCF, Entity Framework & AngularJS

Cyber Matters

Bite-size insight on Cyber Security for the not too technical.

Guru N Guns's

OneSolution To dOTnET.

Johnny Zraiby

Measuring programming progress by lines of code is like measuring aircraft building progress by weight.

%d bloggers like this: