Building a Message Bus with .NET Core and RabbitMQ

Fatih Dumanlı
7 min readNov 3, 2020

Designing a distributed system, not an easy task. Processes that run on different locations need to communicate somehow. They store data, process data, and publish events/commands to be handled by another process.

Legacy applications manage this concern using an RDBMS for communication. Generally, the emitter process writes the message on a table for whom may be interested, the worker process reads the messages and processes the message. This approach works but fails in terms of availability and scalability.

Message queues step in at this point to make our applications scalable and resilient.

What are Commands and Events

Command: Commands are the messages which are sent to a specific destination to demand a task from the application to which the command is sent. Commands cause a state change in the system and must be named in the imperative form. Like CreateOrderCommand, MarkAccountAsSuspendedCommand

Commands are sent to a single endpoint

Event: Events are messages that tell something notable happened in the publishing/emitting system. Events are the consequences of the commands. Each command causes an event to occur. In contrast with commands, events are published to the whole application instead of a single one. Events must be named in past tense like OrderCreated, AccountHasMarkedAsSuspended

Events are something happened in the past and sent to all endpoints

We need to keep our system in a consistent state and we need to establish resilient connections. I prefer to keep these topics superficial. They need to be elaborated in a different story.

My first challenge when getting familiar with a tool/framework is creating a Hello World example with that tool. After created a vanilla example, I’m getting eager to learn more and I start to elaborate on my example.

Each endpoint in a distributed system
1. Sends commands
2. Publishes events
3. Handles events/commands

Begin with a high-level picture.

Before delving into details, let’s see a high-level picture of what we’re about to create. I dashed down the diagram below to illustrate how processes talk to each other in a distributed system.

This is a very high-level picture of a distributed system. Beginning with high-level pictures is always a good idea.

I dash off three clients communicating with each other over RabbitMQ. Each client has its own queue to get messages which they are interested in. This queue is used for only receiving messages. When publishing, clients publish their messages to a centralized place called “Exchange” in terms of RabbitMQ.

Endpoints receive messages to their own queue, publishes messages directly to the exchange.

Send Messages To A Centralized Place

As shown above, each process sends a message when something notable happens in that context. Messages are sent to a centralized place like RabbitMQ Message Exchange for the sake of simplicity. They could be sent to the queues directly which are interested in this event, but this approach could become a nightmare. — Remember the mediator pattern — . Sending messages directly to the processes violates the idea behind of distributed system. We need to decouple components instead of coupling them. A centralized place is a good place to distribute all of the messages. Like a control tower.

Control towers acts as a message broker between pilots.
Control towers acts as a message broker between pilots.

Challenges

We have a bunch of challenges when implementing an event bus with RabbitMQ.

  1. Resilient connections
  2. Retries with exponential backoff
  3. Prevent data loss due to network issues/timeouts.

For commercial use, I strongly recommend using an abstraction layer like NServiceBus or Azure Service Bus to turn over those responsibilities to one of these products. But grasping how this concern is handled is important. I am going to try to cover how to handle these concerns at a basic level.

Ingredients

  1. Visual Studio Code
  2. .NET Core SDK installed
  3. RabbitMQ Client NuGet package
  4. Polly NuGet package

Creating the Project

A service bus (or a message bus, event bus) is a layer of abstraction for clients to publish/receive messages. The service bus is also responsible for error handling and making resilient connections to increase availability.

We’re going to create 2 building blocks.

  1. RabbitMQ Adapter: RabbitMQ adapter is a kind of brokerage between our service bus and official RabbitMQ.Client NuGet package. It manages relatively low-level operations to communicate with the RabbitMQ server here.
  2. MessageBus endpoint: The class in which clients interact to send commands and publish events.
A Service Bus implementation

I name this project ‘EventTower’ and when I reference the word ‘EventTower’ consider I reference the project we are about to create.

EventTower solution consists of two .NET class library.

  1. EventTower: a class library holds the whole logic. Includes RabbitMQAdapter and MessageBusEndpoint classes.
  2. EventTower.Messages: a class library that includes only message contracts.

Separating EventTower.Messages assembly from the main one makes sense because the messages used by multiple clients should be included in the common class library. The common class library can reference this EventTower.Messages assembly without referencing main MessageBus assembly.

1. Create the Messages Project

Create an empty folder, name it what you want to. And open the folder with Visual Studio Code.

Through the .NET Core CLI, run the following command to create a class library called EventTower.Messages. This project will include only message contracts. (IMessage.cs, IEvent.cs and ICommand.cs)

dotnet new classlib --name EventTower.Messages -f netcoreapp3.1

Create the interfaces

ust create the interfaces IMessage.cs, ICommand.cs and IEvent.cs. ICommand and IEvent interfaces must be implemented IMessage interface. (Actually, there is nothing to implement, they are just marker interface.)

We created three interfaces. IEvent and ICommand are also a IMessage. We do this to manage some common operations for both events and commands.
And we are done with EventTower.Messages project.

2. Create the Service Bus project

Now we need to create the Message Bus project which includes RabbitMQAdapter, MessageBusEndpoint components.

Run the following command through CLI at root folder.

dotnet new classlib --name EventTower -f netcoreapp3.1

Add RabbitMQ.Client and Polly packages

dotnet add package RabbitMQ.Client
dotnet add package Polly

This command creates a new class library named EventTower. Before going ahead, let’s look at what we have.

We have a Message Contract project and an empty class library called MessageBus that we just created.

Create a Message Handler interface

A message handler is a class in which received messages are handled in. Clients invoke their domain model/business logic when Handle() method is triggered by our service bus implementation.

Let’s create a generic interface for message handler classes with a single method named Handle(). Set IMessage constraint to avoid creating a handler class for any class other than IMessage. IMessageHandler<T> interface must be implemented for only IEvent and ICommand types.

Add a RabbitMQ Adapter

A RabbitMQ Adapter is the interface that is used to access the RabbitMQ server and perform publish/subscribe operations. Creating such an abstraction layer will make unit testing easy.

Using a retry policy when connecting to RabbitMQ Server

Connecting to the server might be interrupted due to network issues, we implement a retry policy to automatically retries to connect to the server in case of any interruption in TryConnect() method.

Publishing a command

When publishing a command, the adapter uses the BasicPublish() method of RabbitMQ.Client library. Adapter sends commands to the CommandExchange, which is a DirectExchange used to deliver the message to a single receiver. Sets the routingKey as destination endpoint. (i.e. Ordering)

Publishing an event

Unlike the commands, events are published to all endpoints in the system. So we’re going to FanoutExchange here. No need to set routingKey since it publishes the message to all receivers.

Creating consumer channel and set for receiving

RabbitMQAdapter creates a consumer channel for the endpoint. The consumer channel listens to both CommandExchange and EventExchange and declares a queue named with the endpoint name. Takes the commands which are sent to itself, and all events from the exchanges and puts those in the queue.

Once a command or event is received, RabbitMQAdapter raises an event called ‘MessageReceived’.

And the whole RabbitMQAdapter implementation looks like this:

Create MessageBusEndpoint class

MessageEndpoint is the class the clients are interacting with. Clients send command, publish events through a MessageEndpoint instance. This class listens for ‘MessageReceived’ event of RabbitMQAdapter. When the event is raised, perform some reflection magic and triggers Handle() method of received message handler.

How clients interact with the service bus?

An example of usage is shown below.

How the messages are handling?

Clients are handling messages by implementing IMessageHandler<T> interface.

The entire source code is available on GitHub.com/fatihdumanli/EventTower.

Further reading

Part 1: RabbitMQ Best Practices

--

--