Messaging through a service bus in .NET using MassTransit part 8: observing events in the bus

Introduction

In the previous post we learned how to intercept messages in MassTransit. We can intercept messages using various observer interfaces when sending, publishing, receiving and consuming messages. We cannot modify the message content so they are read-only operations. The implemented interfaces need to be registered with the bus so that we can do any extra work on the messages such as logging or debugging.

In this post we’ll see how to register events that happen in the bus.

The bus observer

The IBusObserver interface is similar to the ones we saw in the previous post in that it has a couple of events that are triggered at various stages of the bus life cycle. All methods return a Task so they are prepared for the async-await paradigm.

The following BusObserver class in the MassTransit.Receiver project of the demo application shows a minimal implementation:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MassTransit.Receiver
{
	public class BusObserver : IBusObserver
	{
		private readonly string _logger = "[BusObserver]";

		public async Task CreateFaulted(Exception exception)
		{
			Console.WriteLine(string.Concat(_logger, ": Bus exception: ", exception.Message));
			await Task.FromResult(0);
		}

		public async Task PostCreate(IBus bus)
		{
			Console.WriteLine(string.Concat(_logger, ": Bus has been created with address ", bus.Address));
			await Task.FromResult(0);
		}

		public async Task PostStart(IBus bus, Task busReady)
		{
			Console.WriteLine(string.Concat(_logger, ": Bus has been started with address ", bus.Address));
			await busReady;
		}

		public async Task PostStop(IBus bus)
		{
			Console.WriteLine(string.Concat(_logger, ": Bus has been stopped with address ", bus.Address));
			await Task.FromResult(0);
		}

		public async Task PreStart(IBus bus)
		{
			Console.WriteLine(string.Concat(_logger, ": Bus is about to start with address ", bus.Address));
			await Task.FromResult(0);
		}

		public async Task PreStop(IBus bus)
		{
			Console.WriteLine(string.Concat(_logger, ": Bus is about to stop with address ", bus.Address));
			await Task.FromResult(0);
		}

		public async Task StartFaulted(IBus bus, Exception exception)
		{
			Console.WriteLine(string.Concat(_logger, ": Bus exception at start-up: ", exception.Message
				, " for bus ", bus.Address));
			await Task.FromResult(0);
		}

		public async Task StopFaulted(IBus bus, Exception exception)
		{
			Console.WriteLine(string.Concat(_logger, ": Bus exception at shut-down: ", exception.Message
				, " for bus ", bus.Address));
			await Task.FromResult(0);
		}
	}
}

The method names reveal that we can monitor events related to creating, starting and stopping the bus. We can also observe exceptions. The IBus object offers most bus-related methods and functions we used for our IBusControl object such as Send, Publish, ConnectConsumer etc., so there’s nothing stopping us from sending an extra message to a queue from within the bus observer.

We can register the bus observer within the block where we create the IBusControl object. The RunMassTransitReceiverWithRabbit method in Program.cs of MassTransit.Receiver where we start and stop the bus can be extended with an extra line of code:

IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
	IRabbitMqHost rabbitMqHost = rabbit.Host(new Uri("rabbitmq://localhost:5672/accounting"), settings =>
	{
		settings.Password("accountant");
		settings.Username("accountant");
	});

	rabbit.BusObserver(new BusObserver());
        //rest of code ignored
}

I’ve commented out all other observers we created in the previous post to easily identify the messages from the bus observer. We can additionally put a Console.ReadKey after the code that stops the bus so that we can see the messages related to stopping the bus:

rabbitBusControl.Start();
Console.ReadKey();

rabbitBusControl.Stop();
Console.ReadKey();

For the demo it is enough to start the receiver, we don’t need to send messages here. There should be messages similar to the following after a successful bus start-up:

[BusObserver]: Bus has been created with address rabbitmq://localhost:5672/accounting/bus-ANDRASPC1-MassTransit.Receiver.vshost-4g7yyyg5o96d6puubdj658n3ra?durable=false&autodelete=true&prefetch=8
[BusObserver]: Bus is about to start with address rabbitmq://localhost:5672/accounting/bus-ANDRASPC1-MassTransit.Receiver.vshost-4g7yyyg5o96d6puubdj658n3ra?durable=false&autodelete=true&prefetch=8
[BusObserver]: Bus has been started with address rabbitmq://localhost:5672/accounting/bus-ANDRASPC1-MassTransit.Receiver.vshost-4g7yyyg5o96d6puubdj658n3ra?durable=false&autodelete=true&prefetch=8

Press a key so that the bus is stopped. We’ll see some additional messages:

[BusObserver]: Bus is about to stop with address rabbitmq://localhost:5672/accounting/bus-ANDRASPC1-MassTransit.Receiver.vshost-4g7yyyg5o96d6puubdj658n3ra?durable=false&autodelete=true&prefetch=8
[BusObserver]: Bus has been stopped with address rabbitmq://localhost:5672/accounting/bus-ANDRASPC1-MassTransit.Receiver.vshost-4g7yyyg5o96d6puubdj658n3ra?durable=false&autodelete=true&prefetch=8

We can also simulate an exception where RabbitMq is unreachable. Open the Services application – search for “Services” using the standard Windows search function – and locate the service called RabbitMQ and press Stop:

stop-the-rabbitmq-service-for-masstransit

Start the Receiver now. You should see that the same RabbitMqConnectionException is thrown repeatedly with exception messages similar to the following:

[BusObserver]: Bus has been created with address rabbitmq://localhost:5672/accounting/bus-ANDRASPC1-MassTransit.Receiver.vshost-4g7yyyg5o96d97ucbdj658joby?durable=false&autodelete=true&prefetch=8
[BusObserver]: Bus is about to start with address rabbitmq://localhost:5672/accounting/bus-ANDRASPC1-MassTransit.Receiver.vshost-4g7yyyg5o96d97ucbdj658joby?durable=false&autodelete=true&prefetch=8
RabbitMQ connection failed: Connect failed: accountant@localhost:5672/accounting
[BusObserver]: Bus is about to stop with address rabbitmq://localhost:5672/accounting/bus-ANDRASPC1-MassTransit.Receiver.vshost-4g7yyyg5o96d97ucbdj658joby?durable=false&autodelete=true&prefetch=8
[BusObserver]: Bus has been stopped with address rabbitmq://localhost:5672/accounting/bus-ANDRASPC1-MassTransit.Receiver.vshost-4g7yyyg5o96d97ucbdj658joby?durable=false&autodelete=true&prefetch=8
[BusObserver]: Bus exception at start-up: Connect failed: accountant@localhost:5672/accounting for bus rabbitmq://localhost:5672/accounting/bus-ANDRASPC1-MassTransit.Receiver.vshost-4g7yyyg5o96d97ucbdj658joby?durable=false&autodelete=true&prefetch=8
RabbitMQ connection failed: Connect failed: accountant@localhost:5672/accounting
[BusObserver]: Bus is about to start with address rabbitmq://localhost:5672/accounting/bus-ANDRASPC1-MassTransit.Receiver.vshost-4g7yyyg5o96d97ucbdj658joby?durable=false&autodelete=true&prefetch=8
RabbitMQ receive transport failed: The supervisor is stopping, no additional scopes can be created
RabbitMQ receive transport failed: The supervisor is stopping, no additional scopes can be created
RabbitMQ receive transport failed: The supervisor is stopping, no additional scopes can be created
RabbitMQ receive transport failed: The supervisor is stopping, no additional scopes can be created
[BusObserver]: Bus is about to stop with address rabbitmq://localhost:5672/accounting/bus-ANDRASPC1-MassTransit.Receiver.vshost-4g7yyyg5o96d97ucbdj658joby?durable=false&autodelete=true&prefetch=8
[BusObserver]: Bus has been stopped with address rabbitmq://localhost:5672/accounting/bus-ANDRASPC1-MassTransit.Receiver.vshost-4g7yyyg5o96d97ucbdj658joby?durable=false&autodelete=true&prefetch=8
[BusObserver]: Bus exception at start-up: The supervisor is stopping, no additional scopes can be created for bus rabbitmq://localhost:5672/accounting/bus-ANDRASPC1-MassTransit.Receiver.vshost-4g7yyyg5o96d97ucbdj658joby?durable=false&autodelete=true&prefetch=8
RabbitMQ receive transport failed: The supervisor is stopping, no additional scopes can be created

The above test also reveals how MassTransit doesn’t just give up if RabbitMq is not reachable. Instead, it will try to connect to the provided RabbitMq endpoint repeatedly.

In fact we can perform at least one more test. Restart the RabbitMQ service in the Services window and start the Receiver project. Let the bus be created and started. Then stop the RabbitMQ service again. You’ll see that the bus will start complaining:

[BusObserver]: Bus has been started with address rabbitmq://localhost:5672/accounting/bus-ANDRASPC1-MassTransit.Receiver.vshost-4g7yyyg5o96d96b8bdj658qfgw?durable=false&autodelete=true&prefetch=8
Timeout waiting for consumer to exit: rabbitmq://localhost:5672/accounting/mycompany.domains.queues?prefetch=8
RabbitMQ connection failed: Connect failed: accountant@localhost:5672/accounting
RabbitMQ connection failed: Connect failed: accountant@localhost:5672/accounting
RabbitMQ connection failed: Connect failed: accountant@localhost:5672/accounting
RabbitMQ connection failed: Connect failed: accountant@localhost:5672/accounting
.
.
.

Start the RabbitMQ service again. After a couple of seconds the MassTransit bus will be up again and the “connection failed” messages will stop appearing. You can now start the Publisher and you’ll see that the message was consumed by the Consumer like before, i.e. a gap in the RabbitMq operation wasn’t fatal to MassTransit.

We’ll continue in the next post.

View the list of posts on Messaging here.

About Andras Nemes
I'm a .NET/Java developer living and working in Stockholm, Sweden.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

ultimatemindsettoday

A great WordPress.com site

iReadable { }

.NET Tips & Tricks

Robin Sedlaczek's Blog

Love to use Microsoft Technologies

HarsH ReaLiTy

A Good Blog is Hard to Find

Ricos Blog zu Softwaredesign- und architektur

Ideen und Gedanken rund um Softwaredesign und -architektur, Domain-Driven Design, C# und Windows Azure

the software architecture

thoughts, ideas, diagrams,enterprise code, design pattern , solution designs

Technology Talks

on Microsoft technologies, Web, Android and others

Software Engineering

Web development

Disparate Opinions

Various tidbits

chsakell's Blog

Anything around ASP.NET MVC,WEB API, WCF, Entity Framework & AngularJS

Cyber Matters

Bite-size insight on Cyber Security for the not too technical.

Guru N Guns's

OneSolution To dOTnET.

Johnny Zraiby

Measuring programming progress by lines of code is like measuring aircraft building progress by weight.

%d bloggers like this: