Messaging through a service bus in .NET using MassTransit part 7: intercepting messages

Introduction

In the previous post we investigated how types are handled by MassTransit and how they are represented in RabbitMQ exchanges and queues. With MassTransit we can send and receive concrete objects and it will take care of creating the necessary queues and exchanges for us. The objects are serialised and deserialised without us having to worry about JSON or XML strings.

In this post we’ll see how to intercept messages in both the publisher and the receiver.

The message observers

Messages can be intercepted and inspected using five observer interfaces in MassTransit. Note that the observers are read-only: we can inspect the messages but we cannot modify them.

They can be divided into two groups:

Publisher-based message observers

These observers intercept sent and published messages of any message type. They are not generic, i.e. we cannot restrict them to a specific message type.

  • ISendObserver
  • IPublishObserver

Consumer-based message observers

With these observers we can intercept messages received and consumed.

  • IReceiveObserver: to intercept any received message of any concrete type
  • IConsumeObserver: to intercept any consumed message of any concrete type
  • IConsumeMessageObserver of T: the generic version of IConsumeObserver to intercept specific message types consumed

Intercepting messages received

IReceiveObserver is the largest of the options with 5 methods to be implemented:

  • ConsumeFault
  • PostConsume
  • PostReceive
  • PreReceive
  • ReceiveFault

The method names describe their respective roles. All of them return a Task, i.e. they are well suited for async-await operations. Insert the following MessageReceiveObserver class into the MassTransit.Receiver project of the demo application:

using System;
using System.Threading.Tasks;

namespace MassTransit.Receiver
{
	public class MessageReceiveObserver : IReceiveObserver
	{
		private readonly string _logger = "[MessageReceiveObserver]";

		public async Task ConsumeFault<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType, Exception exception) where T : class
		{
			Console.WriteLine(string.Concat(_logger, ": A fault is consumed for type ", context.Message.GetType(), ". Consumer type: ", consumerType, ", exception message: ", exception.Message));
			await context.CompleteTask;
		}

		public async Task PostConsume<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType) where T : class
		{
			Console.WriteLine(string.Concat(_logger, ": A message is consumed for type ", context.Message.GetType(), 
				". Consumer type: ", consumerType, ", duration in millis: ", duration.TotalMilliseconds));
			await context.CompleteTask;
		}

		public async Task PostReceive(ReceiveContext context)
		{
			Console.WriteLine(string.Concat(_logger, ": A message is past the receive stage. Has been delivered to at least one consumer: ", 
				context.IsDelivered));
			await context.CompleteTask;
		}

		public async Task PreReceive(ReceiveContext context)
		{
			Console.WriteLine(string.Concat(_logger, ": A message is before the receive stage. Has been delivered to at least one consumer: ",
				context.IsDelivered));
			await context.CompleteTask;
		}

		public async Task ReceiveFault(ReceiveContext context, Exception exception)
		{
			Console.WriteLine(string.Concat(_logger, ": A fault is received with exception ", exception.Message));
			await context.CompleteTask;
		}
	}
}

We need to register the observer with the bus control. Add the following line of code…

rabbitBusControl.ConnectReceiveObserver(new MessageReceiveObserver());

…just after…:

rabbitBusControl.Start();

…in the RunMassTransitReceiverWithRabbit method in Program.cs of MassTransit.Receiver. Start the receiver project and then the publisher MassTransit.Publisher. You can set breakpoints within the various methods of the observer to see how and when they are triggered. If everything goes fine then you should see messages similar to the following in the consumer’s command window:

[MessageReceiveObserver]: A message is before the receive stage. Has been delivered to at least one consumer: False
New domain registered. Target and importance: Customers / 1
[MessageReceiveObserver]: A message is consumed for type DynamicInternal08d3ec860054c792fc3fdb87d1ba0000.MyCompany.Messaging.IRegisterDomain. Consumer type: MassTransit.Receiver.RegisterDomainConsumer, duration in millis: 3,8467
A new customer has signed up, it’s time to register it in the command receiver. Details:
New Street
Nice people LTD
dc049caf-e645-4d31-8671-81b4f29178b0
True
The concrete customer repository was called for customer Nice people LTD
[MessageReceiveObserver]: A message is consumed for type DynamicInternal08d3ec860054c792fc3fdb87d1ba0000.MyCompany.Messaging.IRegisterCustomer. Consumer type: MassTransit.Receiver.RegisterCustomerConsumer, duration in millis: 84,9193
[MessageReceiveObserver]: A message is past the receive stage. Has been delivered to at least one consumer: True

Note that both IRegisterDomain and IRegisterCustomer are observed, as they should be. The ConsumeContext object is of the same kind as the one we saw in the various consumer classes like RegisterCustomerConsumer, i.e. it offers the same level of information such as the conversation ID and all the rest.
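To illustrate the kind of information the context carries, here is a minimal sketch of a helper that formats some of its identifiers. The ConsumeContextDescriber class and its Describe method are hypothetical names made up for this illustration. MessageId, ConversationId, CorrelationId, SourceAddress and DestinationAddress are properties exposed by the MassTransit message context; the ID properties are nullable, so any of them may come out empty.

```csharp
using System;
using MassTransit;

namespace MassTransit.Receiver
{
	// Hypothetical helper, not part of MassTransit or the demo application.
	public static class ConsumeContextDescriber
	{
		// Builds a one-line summary of the metadata travelling with a message.
		// string.Concat renders null values (e.g. a missing CorrelationId) as empty strings.
		public static string Describe<T>(ConsumeContext<T> context) where T : class
		{
			return string.Concat(
				"MessageId: ", context.MessageId,
				", ConversationId: ", context.ConversationId,
				", CorrelationId: ", context.CorrelationId,
				", source: ", context.SourceAddress,
				", destination: ", context.DestinationAddress);
		}
	}
}
```

It could be called from any of the generic observer methods, e.g. Console.WriteLine(ConsumeContextDescriber.Describe(context)) inside PostConsume.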

Let’s now try the consume observer interface. It has the PostConsume and ConsumeFault methods like above but it also has a PreConsume method, i.e. we get to analyse the context before the message is consumed.

Here’s a minimal implementation of the IConsumeObserver interface in the MassTransit.Receiver project:

using MassTransit.Pipeline;
using System;
using System.Threading.Tasks;

namespace MassTransit.Receiver
{
	public class MessageConsumeObserver : IConsumeObserver
	{
		private readonly string _logger = "[MessageConsumeObserver]";
		public async Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
		{			
			Console.WriteLine(string.Concat(_logger, ": A fault is consumed for type ", 
				context.Message.GetType(), ", exception message: ", exception.Message));
			await context.CompleteTask;
		}

		public async Task PostConsume<T>(ConsumeContext<T> context) where T : class
		{
			Console.WriteLine(string.Concat(_logger, ": A message is past the consume stage for type ", context.Message.GetType()));
			await context.CompleteTask;
		}

		public async Task PreConsume<T>(ConsumeContext<T> context) where T : class
		{
			Console.WriteLine(string.Concat(_logger, ": A message is before the consume stage for type ", context.Message.GetType()));
			await context.CompleteTask;
		}
	}
}

Again, we need to register the observer with the bus control:

//rabbitBusControl.ConnectReceiveObserver(new MessageReceiveObserver());
rabbitBusControl.ConnectConsumeObserver(new MessageConsumeObserver());

I commented out the MessageReceiveObserver registration so that we can easily see the messages from the consumer observer. The same test yields the following output in the receiver’s command window:

[MessageConsumeObserver]: A message is before the consume stage for type DynamicInternal08d3ec87df9416fbfc3fdb87d1ba0000.MyCompany.Messaging.IRegisterCustomer
A new customer has signed up, it’s time to register it in the command receiver. Details:
New Street
Nice people LTD
e2a1a036-d5f5-417d-83a8-d3d0b0a0ab76
True
The concrete customer repository was called for customer Nice people LTD
[MessageConsumeObserver]: A message is past the consume stage for type DynamicInternal08d3ec87df9416fbfc3fdb87d1ba0000.MyCompany.Messaging.IRegisterCustomer
[MessageConsumeObserver]: A message is before the consume stage for type DynamicInternal08d3ec87df9416fbfc3fdb87d1ba0000.MyCompany.Messaging.IRegisterDomain
New domain registered. Target and importance: Customers / 1
[MessageConsumeObserver]: A message is past the consume stage for type DynamicInternal08d3ec87df9416fbfc3fdb87d1ba0000.MyCompany.Messaging.IRegisterDomain
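Instead of commenting a registration in and out, an observer can also be detached at runtime. The sketch below assumes that the Connect... methods return a connect handle exposing a Disconnect method, as they do in MassTransit 3. The ObserverRegistration class and the ObserveTemporarily method are hypothetical names used only for this illustration.

```csharp
using System;
using MassTransit;

namespace MassTransit.Receiver
{
	// Hypothetical helper, not part of the demo application.
	public static class ObserverRegistration
	{
		// Connects the consume observer, waits for a key press and detaches it again.
		public static void ObserveTemporarily(IBusControl rabbitBusControl)
		{
			// Assumption: the returned handle offers Disconnect().
			var handle = rabbitBusControl.ConnectConsumeObserver(new MessageConsumeObserver());

			Console.WriteLine("Consume observer connected. Press Enter to disconnect it.");
			Console.ReadLine();

			// From here on the observer should no longer be notified.
			handle.Disconnect();
		}
	}
}
```

Keeping the handle around is only necessary if the observer should be removed before the bus is stopped.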

The generic IConsumeMessageObserver of T interface is the same as the non-generic IConsumeObserver but for specific message types, e.g.:

public class RegisterCustomerMessageObserver : IConsumeMessageObserver<IRegisterCustomer>
{
//code ignored
}

It has the same three methods as IConsumeObserver, except that they are not generic since the message type is fixed by the interface, so we’ll not run a separate test for it. It too must be registered with the bus:

rabbitBusControl.ConnectConsumeMessageObserver(new RegisterCustomerMessageObserver());
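For completeness, here is a minimal sketch of what such a typed observer might look like. It is an illustration rather than the code elided above: the log texts are made up, and it assumes that IConsumeMessageObserver lives in the MassTransit.Pipeline namespace like IConsumeObserver and that IRegisterCustomer comes from MyCompany.Messaging, as the console output suggests.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Pipeline;
using MyCompany.Messaging;

namespace MassTransit.Receiver
{
	public class RegisterCustomerMessageObserver : IConsumeMessageObserver<IRegisterCustomer>
	{
		private readonly string _logger = "[RegisterCustomerMessageObserver]";

		// Called just before an IRegisterCustomer message is handed to its consumer.
		public Task PreConsume(ConsumeContext<IRegisterCustomer> context)
		{
			Console.WriteLine(string.Concat(_logger, ": before the consume stage, message ID: ", context.MessageId));
			return Task.FromResult(0);
		}

		// Called after the consumer has finished without an exception.
		public Task PostConsume(ConsumeContext<IRegisterCustomer> context)
		{
			Console.WriteLine(string.Concat(_logger, ": past the consume stage, message ID: ", context.MessageId));
			return Task.FromResult(0);
		}

		// Called when the consumer has thrown an exception.
		public Task ConsumeFault(ConsumeContext<IRegisterCustomer> context, Exception exception)
		{
			Console.WriteLine(string.Concat(_logger, ": consume fault, exception message: ", exception.Message));
			return Task.FromResult(0);
		}
	}
}
```

Since the type parameter sits on the interface, this observer is only notified about IRegisterCustomer messages and IRegisterDomain messages pass by unobserved.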

Intercepting messages sent

ISendObserver and IPublishObserver are used in much the same way as the 3 receiver observers above. Note that at the time of writing this post there’s no generic version of either of these interfaces. Here’s an ISendObserver that I’m adding to the MassTransit.Publisher console application:

using MassTransit;
using System;
using System.Threading.Tasks;

namespace MassTransitTests
{
	public class SendObjectObserver : ISendObserver
	{
		private readonly string _logger = "[SendObjectObserver]";
		public async Task PostSend<T>(SendContext<T> context) where T : class
		{
			Console.WriteLine(string.Concat(_logger, ": A message is post the sending stage for type ", context.Message.GetType()));
			await Task.FromResult(0);
		}

		public async Task PreSend<T>(SendContext<T> context) where T : class
		{
			Console.WriteLine(string.Concat(_logger, ": A message is before the sending stage for type ", context.Message.GetType()));
			await Task.FromResult(0);
		}

		public async Task SendFault<T>(SendContext<T> context, Exception exception) where T : class
		{
			Console.WriteLine(string.Concat(_logger, ": A message fault is sent for the type ", context.Message.GetType()
				, ", exception: ", exception.Message));
			await Task.FromResult(0);
		}
	}
}

It needs to be registered with the bus in the RunMassTransitPublisherWithRabbit method:

rabbitBusControl.ConnectSendObserver(new SendObjectObserver());

…just after the IBusControl declaration.

The publisher’s command window will show the following messages:

[SendObjectObserver]: A message is before the sending stage for type DynamicInternal08d3ec8af8a9b9bffc3fdb87d1ba0000.MyCompany.Messaging.IRegisterCustomer
[SendObjectObserver]: A message is post the sending stage for type DynamicInternal08d3ec8af8a9b9bffc3fdb87d1ba0000.MyCompany.Messaging.IRegisterCustomer

The IPublishObserver has the same function but it’s for intercepting published messages. You can surely implement it yourself after these examples. It needs to be registered with the IBusControl.ConnectPublishObserver method.
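As a starting point, here is a minimal sketch of a publish observer modelled on SendObjectObserver. PublishObjectObserver is a hypothetical class name and the log texts are made up. The sketch assumes the interface consists of the PrePublish, PostPublish and PublishFault methods, each taking a PublishContext of T, and that it sits in the MassTransit namespace like ISendObserver.

```csharp
using MassTransit;
using System;
using System.Threading.Tasks;

namespace MassTransitTests
{
	// Hypothetical observer for published (as opposed to sent) messages.
	public class PublishObjectObserver : IPublishObserver
	{
		private readonly string _logger = "[PublishObjectObserver]";

		// Called before the message is handed over to the transport.
		public Task PrePublish<T>(PublishContext<T> context) where T : class
		{
			Console.WriteLine(string.Concat(_logger, ": A message is before the publishing stage for type ", context.Message.GetType()));
			return Task.FromResult(0);
		}

		// Called after the message has been published.
		public Task PostPublish<T>(PublishContext<T> context) where T : class
		{
			Console.WriteLine(string.Concat(_logger, ": A message is past the publishing stage for type ", context.Message.GetType()));
			return Task.FromResult(0);
		}

		// Called when publishing the message has thrown an exception.
		public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
		{
			Console.WriteLine(string.Concat(_logger, ": A publish fault occurred for type ", context.Message.GetType(),
				", exception: ", exception.Message));
			return Task.FromResult(0);
		}
	}
}
```

It would be registered with rabbitBusControl.ConnectPublishObserver(new PublishObjectObserver()) in the publisher.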

So, in case you want to do something extra with the messages sent, published, consumed or received then it’s a piece of cake using these interfaces.

We’ll continue in the next post by looking at intercepting bus-related events.

View the list of posts on Messaging here.

About Andras Nemes
I'm a .NET/Java developer living and working in Stockholm, Sweden.
