Monday, November 25, 2013

EasyNetQ’s Minimalist DI Container

I’ve been a long-time fan of IoC (or DI) containers ever since I first discovered Castle Windsor back in 2007. I’ve used Windsor in every major project I’ve been involved in since then, and if you’d gone to a developer event in the late noughties, you may well have encountered me speaking about Windsor. Indeed, Seb Lambla had the cheek to call me ‘Windsor man’. I shall label him ‘Rest-a-man’ in revenge.

When I started working on EasyNetQ in 2011, I initially thought it would be a very simple lightweight wrapper around the RabbitMQ.Client library. The initial versions were very procedural, ‘just get it done’, script-ish code burps. But as it turned into a more serious library, I started to factor the different pieces into more SRPish classes using dependency injection. At this point I was doing poor-man’s dependency injection, with an initial piece of wire-up code in the RabbitHutch class.

As EasyNetQ started to gain some traction outside 15below, I began to get questions like, “how do I use a different serializer?” and “I don’t like your error handling strategy, how can I implement my own?” I was also starting to get quite bored of maintaining the ever-growing wire-up code. The obvious solution was to introduce a DI container, but I was very reluctant to take a dependency on something like Windsor. Writing a library is a very different proposition than writing an application. Every dependency you introduce is a dependency that your user also has to take. Imagine you are happily using Autofac and suddenly Castle.Windsor appears in your code base, or even worse, you are an up-to-date Windsor user, but EasyNetQ insists on installing an old version of Windsor alongside. Nasty. Windsor is an amazing library; some of its capabilities are quite magical, but I didn’t need any of these advanced features in EasyNetQ. In fact I could be highly constrained in my DI container requirements:

  • I only needed singleton instances.
  • The lifetime of all DI provided components matches the lifetime of the main IBus interface.
  • I didn’t need the container to manage component disposal.
  • I didn’t need open generic type registration.
  • I was happy to explicitly register all my components, so I didn’t need convention based registration.
  • I only needed constructor injection.
  • I can guarantee that a component implementation will only have a single constructor.

With this highly simplified list of requirements, I realised that I could write a very simple DI container in just a few lines of code (currently 113 as it turns out).

My super simple container only provides two registration methods. The first takes a service interface type and an instance creation function:

IServiceRegister Register<TService>(Func<IServiceProvider, TService> serviceCreator) where TService : class;

The second takes a service type and an implementation type:

IServiceRegister Register<TService, TImplementation>()
where TService : class
where TImplementation : class, TService;

These are both defined in an IServiceRegister interface. There is also a single Resolve method in an IServiceProvider interface:

TService Resolve<TService>() where TService : class;

You can see all 113 lines of the implementation in the DefaultServiceProvider class. As you can see, it’s not at all complicated: just three dictionaries, one holding the service factories, another holding the registrations, and the last holding the instances. Each registration simply adds a record to the factories or registrations dictionary. When Resolve is called, a bit of reflection looks up the implementation type’s constructor and invokes it, recursively calling Resolve for each constructor argument. If the service is provided by a service factory, the factory is invoked instead of the constructor.
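To make that description concrete, here is a minimal sketch of a container along the same lines. It is not a copy of DefaultServiceProvider: the names SketchContainer, ILogger, ConsoleLogger, IGreeter and Greeter are hypothetical, and the sketch assumes singletons only, a single public constructor per implementation, and that the first registration for a service wins.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch, not EasyNetQ's DefaultServiceProvider.
public class SketchContainer
{
    private readonly Dictionary<Type, Func<SketchContainer, object>> factories =
        new Dictionary<Type, Func<SketchContainer, object>>();
    private readonly Dictionary<Type, Type> registrations = new Dictionary<Type, Type>();
    private readonly Dictionary<Type, object> instances = new Dictionary<Type, object>();

    private bool IsRegistered(Type serviceType)
    {
        return factories.ContainsKey(serviceType) || registrations.ContainsKey(serviceType);
    }

    public SketchContainer Register<TService>(Func<SketchContainer, TService> serviceCreator)
        where TService : class
    {
        // Assumption: the first registration wins, later ones are ignored.
        if (!IsRegistered(typeof(TService)))
            factories.Add(typeof(TService), container => serviceCreator(container));
        return this;
    }

    public SketchContainer Register<TService, TImplementation>()
        where TService : class
        where TImplementation : class, TService
    {
        if (!IsRegistered(typeof(TService)))
            registrations.Add(typeof(TService), typeof(TImplementation));
        return this;
    }

    public TService Resolve<TService>() where TService : class
    {
        return (TService)Resolve(typeof(TService));
    }

    private object Resolve(Type serviceType)
    {
        object instance;
        if (instances.TryGetValue(serviceType, out instance)) return instance; // singleton

        Func<SketchContainer, object> factory;
        Type implementationType;
        if (factories.TryGetValue(serviceType, out factory))
        {
            instance = factory(this);
        }
        else if (registrations.TryGetValue(serviceType, out implementationType))
        {
            // Assumes exactly one public constructor; resolve each argument recursively.
            var constructor = implementationType.GetConstructors().Single();
            var arguments = constructor.GetParameters()
                .Select(parameter => Resolve(parameter.ParameterType))
                .ToArray();
            instance = constructor.Invoke(arguments);
        }
        else
        {
            throw new InvalidOperationException("No service registered for " + serviceType.FullName);
        }

        instances.Add(serviceType, instance);
        return instance;
    }
}

public interface ILogger { void Log(string message); }
public class ConsoleLogger : ILogger
{
    public void Log(string message) { Console.WriteLine(message); }
}

public interface IGreeter { void Greet(); }
public class Greeter : IGreeter
{
    private readonly ILogger logger;
    public Greeter(ILogger logger) { this.logger = logger; }
    public void Greet() { logger.Log("Hello from the container"); }
}

public static class ContainerDemo
{
    public static void Main()
    {
        var container = new SketchContainer()
            .Register<ILogger>(_ => new ConsoleLogger())
            .Register<IGreeter, Greeter>();

        // Greeter's ILogger argument is resolved recursively.
        container.Resolve<IGreeter>().Greet();
    }
}
```

The sketch does no cycle detection, so two services that depend on each other would recurse until the stack overflows.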

I didn’t have any worries about performance. The registration and resolve code is only called once when a new instance of IBus is created. EasyNetQ is designed to be instantiated at application start-up and for that instance to last the lifetime of the application. For 90% of applications, you should only need a single IBus instance.

EasyNetQ’s components are registered in a ComponentRegistration class. This provides an opportunity for the user to register services ahead of the default registration, and since the first to register wins, it provides an easy path for users to replace default service implementations with their own. Here’s an example of replacing EasyNetQ’s default console logger with a custom logger:

var logger = new MyLogger();
var bus = RabbitHutch.CreateBus(connectionString, x => x.Register<IEasyNetQLogger>(_ => logger));

You can replace pretty much any of the internals of EasyNetQ in this way: the logger, the serializer, the error handling strategy, the dispatcher threading model … Check out the ComponentRegistration class to see the whole list.

Of course the magic of DI containers, the first rule of their use, and the thing that some people find extraordinarily hard to grok, is that I only need one call to Resolve in the entire EasyNetQ code base at line 146 in the RabbitHutch class:

return serviceProvider.Resolve<IBus>();

IoC/DI containers are often seen as very heavyweight beasts that only enterprise architecture astronauts could love, but nothing could be more wrong. A simple implementation only needs a few lines of code, but provides your application, or library, with considerable flexibility.

EasyNetQ is open source under the (very flexible) MIT licence. Feel free to cut-n-paste my DefaultServiceProvider class for your own use.

Thursday, November 21, 2013

EasyNetQ: Non-Generic Subscribe

Since its very first version, EasyNetQ has allowed you to subscribe to a message simply by providing a handler for a given message type (and a subscription id, but that’s another discussion).

bus.Subscribe<MyMessage>("subscriptionId", x => Console.WriteLine(x.Text));

But what do you do if you are discovering the message type at runtime? For example, you might have some system which loads add-ins and wants to subscribe to message types on their behalf. Before today (version 0.24) you would have had to employ some nasty reflection mojo to deal with this scenario, but now EasyNetQ provides you with non-generic subscription methods out-of-the-box.

Just add this using statement:

using EasyNetQ.NonGeneric;

Which provides you with these extension methods:

public static IDisposable Subscribe(this IBus bus, Type messageType, string subscriptionId, Action<object> onMessage)
public static IDisposable Subscribe(
this IBus bus,
Type messageType,
string subscriptionId,
Action<object> onMessage,
Action<ISubscriptionConfiguration> configure)
public static IDisposable SubscribeAsync(
this IBus bus,
Type messageType,
string subscriptionId,
Func<object, Task> onMessage)
public static IDisposable SubscribeAsync(
this IBus bus,
Type messageType,
string subscriptionId,
Func<object, Task> onMessage,
Action<ISubscriptionConfiguration> configure)

They are just like the Subscribe methods on IBus except that you provide a Type argument instead of the generic argument, and the message handler is an Action<object> instead of an Action<T>.

Here’s an example of the non-generic subscribe in use:

var messageType = typeof(MyMessage);
bus.Subscribe(messageType, "my_subscriptionId", x =>
{
    // x arrives as object, so cast it before reading its properties
    var message = (MyMessage)x;
    Console.Out.WriteLine("Got Message: {0}", message.Text);
});

Very useful I think, and one of the most commonly asked for features.
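For comparison, here is roughly the kind of reflection you would otherwise have to write yourself. This is only a sketch: ToyBus is a hypothetical stand-in with a single generic Subscribe<T> method, not EasyNetQ’s IBus, and ReflectionSubscribe and Adapt are names invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical stand-in for a bus that only offers a generic Subscribe<T>.
public class ToyBus
{
    private readonly Dictionary<Type, Action<object>> handlers =
        new Dictionary<Type, Action<object>>();

    public void Subscribe<T>(string subscriptionId, Action<T> onMessage) where T : class
    {
        handlers[typeof(T)] = message => onMessage((T)message);
    }

    public void Deliver(object message)
    {
        handlers[message.GetType()](message);
    }
}

public static class ReflectionSubscribe
{
    public static void Subscribe(ToyBus bus, Type messageType, string subscriptionId, Action<object> onMessage)
    {
        // Build an Action<T> for the runtime type that forwards to the Action<object>.
        var typedHandler = typeof(ReflectionSubscribe)
            .GetMethod("Adapt", BindingFlags.NonPublic | BindingFlags.Static)
            .MakeGenericMethod(messageType)
            .Invoke(null, new object[] { onMessage });

        // Close Subscribe<T> over the runtime type and invoke it.
        typeof(ToyBus)
            .GetMethod("Subscribe")
            .MakeGenericMethod(messageType)
            .Invoke(bus, new object[] { subscriptionId, typedHandler });
    }

    private static Action<T> Adapt<T>(Action<object> onMessage) where T : class
    {
        return message => onMessage(message);
    }
}

public class MyMessage { public string Text { get; set; } }

public static class ReflectionDemo
{
    public static void Main()
    {
        var bus = new ToyBus();
        ReflectionSubscribe.Subscribe(bus, typeof(MyMessage), "my_subscriptionId",
            x => Console.WriteLine("Got Message: {0}", ((MyMessage)x).Text));
        bus.Deliver(new MyMessage { Text = "Hello" });
    }
}
```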
 
Happy runtime discovery!

Monday, November 18, 2013

EasyNetQ: Send Receive Pattern

From version 0.22, EasyNetQ supports a new message pattern: Send/Receive.

Whereas the Publish/Subscribe and Request/Response patterns are location transparent, in that you don't need to specify where the consumer of the message is located, the Send/Receive pattern is specifically designed for communication via a named queue. It also makes no assumptions about the types of message that can be sent via the queue. This means that you can send different types of message via the same queue.

The send/receive pattern is ideal for creating 'command pipelines', where you want a buffered channel to a single command processor.

To send a message, use the Send method on IBus, specifying the name of the queue you wish to send the message to and the message itself:

bus.Send("the.queue", new MyMessage{ Text = "Hello Widgets!" });

To set up a message receiver for a particular message type, use the Receive method on IBus:

bus.Receive<MyMessage>("the.queue", message => Console.WriteLine("MyMessage: {0}", message.Text));

You can set up multiple receivers for different message types on the same queue by using the Receive overload that takes an Action<IReceiveRegistration>. For example, this sets up a receive for both MyMessage and MyOtherMessage:

bus.Receive("the.queue", x => x
.Add<MyMessage>(message => deliveredMyMessage = message)
.Add<MyOtherMessage>(message => deliveredMyOtherMessage = message));

If a message arrives on a receive queue that doesn't have a matching receiver, EasyNetQ will write the message to the EasyNetQ error queue with an exception saying 'No handler found for message type <the message type>'.

Note: You probably do not want to call bus.Receive more than once for the same queue. This will create a new consumer on the queue and RabbitMQ will round-robin between them. If you are consuming different types on different Receive calls (and thus different consumers), some of your messages will end up on the error queue because EasyNetQ will not find a handler for your message type associated with the consumer on which it is consumed.
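The reason is easier to see with a toy model of per-consumer dispatch. The sketch below is not EasyNetQ code: ToyReceiveRegistration is a hypothetical class that keeps one handler per message type, and the loop in Main only imitates round-robin delivery between two consumers on the same queue.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of one consumer's handler table, keyed by message type.
public class ToyReceiveRegistration
{
    private readonly Dictionary<Type, Action<object>> handlers =
        new Dictionary<Type, Action<object>>();

    public ToyReceiveRegistration Add<T>(Action<T> onMessage) where T : class
    {
        handlers[typeof(T)] = message => onMessage((T)message);
        return this;
    }

    public void Dispatch(object message)
    {
        Action<object> handler;
        if (!handlers.TryGetValue(message.GetType(), out handler))
            throw new InvalidOperationException(
                "No handler found for message type " + message.GetType().Name);
        handler(message);
    }
}

public class MyMessage { public string Text { get; set; } }
public class MyOtherMessage { public string Text { get; set; } }

public static class ReceiveDemo
{
    public static void Main()
    {
        // Two separate Receive calls on one queue behave like two consumers,
        // each knowing only about its own message type.
        var consumers = new[]
        {
            new ToyReceiveRegistration()
                .Add<MyMessage>(m => Console.WriteLine("MyMessage: {0}", m.Text)),
            new ToyReceiveRegistration()
                .Add<MyOtherMessage>(m => Console.WriteLine("MyOtherMessage: {0}", m.Text))
        };

        var messages = new object[] { new MyMessage { Text = "first" }, new MyMessage { Text = "second" } };

        for (var i = 0; i < messages.Length; i++)
        {
            try
            {
                // Imitates round-robin: alternate between the two consumers.
                consumers[i % consumers.Length].Dispatch(messages[i]);
            }
            catch (InvalidOperationException e)
            {
                Console.WriteLine("Would go to the error queue: {0}", e.Message);
            }
        }
    }
}
```

Registering both handlers on a single registration, as in the Receive overload shown earlier, avoids the problem because every message then reaches a consumer that knows all the types.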

Wednesday, November 13, 2013

EasyNetQ: Multiple Handlers per Consumer

A common feature request for EasyNetQ has been to have some way of implementing a command pipeline pattern. Say you’ve got a component that is emitting commands. In a .NET application each command would most probably be implemented as a separate class. A command might look something like this:

public class AddUser
{
public string Username { get; private set; }
public string Email { get; private set; }

public AddUser(string username, string email)
{
Username = username;
Email = email;
}
}

Another component might listen for commands and act on them. Previously in EasyNetQ it would have been difficult to implement this pattern because a consumer (Subscriber) was always bound to a given message type. You would have had to use the lower level IAdvancedBus binary message methods and implement your own serialization and dispatch infrastructure.

But now EasyNetQ comes with multiple handlers per consumer out of the box.

From version 0.20 there’s a new overload of the Consume method that provides a fluent way for you to register multiple message type handlers to a single consumer, and thus a single queue.

Here’s an example:

bus = RabbitHutch.CreateBus("host=localhost");

var queue = bus.Advanced.QueueDeclare("multiple_types");

bus.Advanced.Consume(queue, x => x
.Add<AddUser>((message, info) =>
{
Console.WriteLine("Add User {0}", message.Body.Username);
})
.Add<DeleteUser>((message, info) =>
{
Console.WriteLine("Delete User {0}", message.Body.Username);
})
);

Now we can publish multiple message types to the same queue:

bus.Advanced.Publish(Exchange.GetDefault(), queue.Name, false, false,
    new Message<AddUser>(new AddUser("Steve Howe", "steve@worlds-best-guitarist.com")));
bus.Advanced.Publish(Exchange.GetDefault(), queue.Name, false, false,
    new Message<DeleteUser>(new DeleteUser("Steve Howe")));

By default, if a matching handler cannot be found for a message, EasyNetQ will throw an exception. You can change this behaviour, and simply ignore messages that do not have a handler, by setting the ThrowOnNoMatchingHandler property to false, like this:

bus.Advanced.Consume(queue, x => x
.Add<AddUser>((message, info) =>
{
Console.WriteLine("Add User {0}", message.Body.Username);
})
.Add<DeleteUser>((message, info) =>
{
Console.WriteLine("Delete User {0}", message.Body.Username);
})
.ThrowOnNoMatchingHandler = false
);

Very soon there will be a send/receive pattern implemented at the IBus level to make this even easier. Watch this space!

Happy commanding!

Wednesday, November 06, 2013

EasyNetQ: Consumer Cancellation

Consumer cancellation has been a requested feature of EasyNetQ for a while now. I wasn’t intending to implement it immediately, but a pull request by Daniel White today made me look at the whole issue of consumer cancellation from the point of view of a deleted queue. It led rapidly to a quite straightforward implementation of user cancellations. The wonders of open source software development, and the generosity of people like Daniel, never fail to impress me.

So what is consumer cancellation? It means that you can stop consuming from a queue without having to dispose of the entire IBus instance and close the connection. All the IBus Subscribe methods, and the IAdvancedBus Consume methods, now return an IDisposable. To stop consuming, just call Dispose like this:

var cancelSubscription = bus.Subscribe<MyMessage>("subscriptionId", MessageHandler);
.
.
// sometime later stop consuming
cancelSubscription.Dispose();
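The pattern behind this, handing back an IDisposable whose Dispose removes the consumer, can be sketched in a few lines. ToyQueue and its nested Cancellation class are hypothetical and say nothing about how EasyNetQ implements cancellation internally.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory queue; Consume returns a handle that cancels the consumer.
public class ToyQueue
{
    private readonly List<Action<string>> consumers = new List<Action<string>>();

    public IDisposable Consume(Action<string> onMessage)
    {
        consumers.Add(onMessage);
        return new Cancellation(() => consumers.Remove(onMessage));
    }

    public void Deliver(string message)
    {
        // Copy the list so a handler may cancel itself while being called.
        foreach (var consumer in consumers.ToArray()) consumer(message);
    }

    private sealed class Cancellation : IDisposable
    {
        private Action cancel;

        public Cancellation(Action cancel) { this.cancel = cancel; }

        public void Dispose()
        {
            // Run the cancel action once; further Dispose calls do nothing.
            var action = cancel;
            cancel = null;
            if (action != null) action();
        }
    }
}

public static class CancellationDemo
{
    public static void Main()
    {
        var queue = new ToyQueue();
        var cancelSubscription = queue.Consume(m => Console.WriteLine("Got: {0}", m));

        queue.Deliver("before cancel");   // handled
        cancelSubscription.Dispose();
        queue.Deliver("after cancel");    // no consumer left, nothing is handled
    }
}
```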

Nice :)

Tuesday, November 05, 2013

EasyNetQ: Changes to Conventions With Version 0.18

TL;DR: Exchange and queue names used to have ‘.’ replaced with ‘_’. From version 0.18 this is no longer the case.

Yesterday I announced version 0.18 of EasyNetQ. The big change was the addition of polymorphic publish and subscribe.

I forgot to mention that there’s a slight change to the conventions that EasyNetQ uses, that might affect you when you upgrade.

EasyNetQ’s publish-subscribe pattern is implemented in AMQP as follows:

  • A topic exchange named after the published type is created.
  • A queue named by concatenating the published type and the subscriber id is created.
  • The exchange is bound to the queue with the wildcard, ‘#’, binding key.

So if you have this code:

bus.Subscribe<MyMessage>("test", MessageHandler);
bus.Publish<MyMessage>(message);

Note, in my case MyMessage is in an assembly ‘EasyNetQ.Tests’ with the same namespace.

You will see this in the RabbitMQ management UI:

[Image: default_binding]

Previously the exchange and queue names would have had the ‘.’ replaced with ‘_’: ‘EasyNetQ_Tests_MyMessage:EasyNetQ_tests’.

If you upgrade some services to version 0.18, but leave others at earlier version numbers, they won’t be able to communicate.
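The difference between the two conventions can be sketched as follows. This is an illustration, not EasyNetQ’s naming code: NamingSketch is a hypothetical class, the ‘full type name:assembly name’ format is inferred from the example name quoted above, and the ‘_’ separator between the type name and the subscription id is an assumption, since the post only says the two are concatenated.

```csharp
using System;

namespace EasyNetQ.Tests
{
    // Stand-in message type so the sketch compiles on its own.
    public class MyMessage { public string Text { get; set; } }
}

public static class NamingSketch
{
    // Assumed format: "<full type name>:<assembly name>".
    public static string ExchangeName(Type messageType)
    {
        return messageType.FullName + ":" + messageType.Assembly.GetName().Name;
    }

    // Assumption: "_" joins the type-based name and the subscription id.
    public static string QueueName(Type messageType, string subscriptionId)
    {
        return ExchangeName(messageType) + "_" + subscriptionId;
    }

    // Pre-0.18 behaviour as described in the post: every '.' became '_'.
    public static string Legacy(string name)
    {
        return name.Replace('.', '_');
    }

    public static void Main()
    {
        var type = typeof(EasyNetQ.Tests.MyMessage);

        Console.WriteLine("0.18 exchange:     {0}", ExchangeName(type));
        Console.WriteLine("0.18 queue:        {0}", QueueName(type, "test"));
        Console.WriteLine("pre-0.18 exchange: {0}", Legacy(ExchangeName(type)));
        Console.WriteLine("pre-0.18 queue:    {0}", Legacy(QueueName(type, "test")));
    }
}
```

Because the old and new names differ, a publisher on one version and a subscriber on the other end up with different exchanges and queues.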

Monday, November 04, 2013

EasyNetQ: Polymorphic Publish and Subscribe

From version 0.18 of EasyNetQ, you can now subscribe to an interface, then publish implementations of that interface.

Let's look at an example. I have an interface IAnimal and two implementations Cat and Dog:

public interface IAnimal
{
    string Name { get; set; }
}

public class Cat : IAnimal
{
    public string Name { get; set; }
    public string Meow { get; set; }
}

public class Dog : IAnimal
{
    public string Name { get; set; }
    public string Bark { get; set; }
}

I can subscribe to IAnimal and receive both Cat and Dog classes, or any other class that implements IAnimal:

bus.Subscribe<IAnimal>("polymorphic_test", @interface =>
    {
        var cat = @interface as Cat;
        var dog = @interface as Dog;

        if (cat != null)
        {
            Console.Out.WriteLine("Name = {0}", cat.Name);
            Console.Out.WriteLine("Meow = {0}", cat.Meow);
        }
        else if (dog != null)
        {
            Console.Out.WriteLine("Name = {0}", dog.Name);
            Console.Out.WriteLine("Bark = {0}", dog.Bark);
        }
        else
        {
            Console.Out.WriteLine("message was not a dog or a cat");
        }
    });

Let's publish a cat and a dog:

var cat = new Cat
{
    Name = "Gobbolino",
    Meow = "Purr"
};

var dog = new Dog
{
    Name = "Rover",
    Bark = "Woof"
};

bus.Publish<IAnimal>(cat);
bus.Publish<IAnimal>(dog);

Note that I have to explicitly specify that I am publishing IAnimal. EasyNetQ uses the generic type specified in the Publish and Subscribe methods to route the publications to the subscriptions.

Happy polymorphismising!

Friday, November 01, 2013

EasyNetQ: Big Breaking Changes to Request-Response

My intensive work on EasyNetQ (our super simple .NET API for RabbitMQ) continues. I’ve been taking lessons learned from nearly two years of production and the fantastic feedback from EasyNetQ’s users, mashing this together, and making lots of changes to both the internals and the API. I know that API changes cause problems for users; they break your application and force you to revisit your code. But the longer term benefits should outweigh the immediate costs as EasyNetQ slowly morphs into a solid, reliable library that really does make working with RabbitMQ as easy as possible.

Changes in version 0.17 are all around the request-response pattern. The initial implementation was very rough with lots of nasty ways that resource use could run away when things went wrong. The lack of timeouts also meant that your application could wait forever when messages got lost. Lastly the API was quite clunky, with call-backs where Tasks are a far better choice. All these problems have been corrected in this version.

API changes

There is now a synchronous Request method. Of course messaging is by nature an asynchronous operation, but sometimes you just want the simplest possible thing and you don’t care about blocking your thread while you wait for a response. Here’s what it looks like:

var response = bus.Request<TestRequestMessage, TestResponseMessage>(request);

The old call-back Request method has been removed. There was no need for it when the RequestAsync that returned a Task<TResult> was always a better choice:

var task = bus.RequestAsync<TestRequestMessage, TestResponseMessage>(request);

task.ContinueWith(response =>
{
    Console.WriteLine("Got response: '{0}'", response.Result.Text);
});

Timeouts

Timeouts are an essential ingredient of any distributed system. This probably deserves a blog post of its own, but no matter how resilient you make your architecture, if an important piece simply goes away (like the network for example), you need a circuit breaker. EasyNetQ now has a global timeout that you can configure via the connection string:

var bus = RabbitHutch.CreateBus("host=localhost;timeout=60");

Here we’ve configured the timeout as 60 seconds. The default is 10 seconds. If you make a request, but no response is received within the timeout period, a System.TimeoutException will be thrown.

If the connection goes away while a request is in-flight, EasyNetQ doesn’t wait for the timeout to fire, but immediately throws an EasyNetQException with a message saying that the connection has been lost. Your application should catch both TimeoutException and EasyNetQException and react appropriately.
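As a rough illustration of the timeout side of this, the sketch below blocks on a response task and throws a TimeoutException when nothing arrives in time. TimeoutSketch and WaitForResponse are hypothetical names, and the task that never completes stands in for a lost message; none of this is EasyNetQ’s internal code.

```csharp
using System;
using System.Threading.Tasks;

public static class TimeoutSketch
{
    // Blocks until the response arrives, or throws TimeoutException after the timeout.
    public static T WaitForResponse<T>(Task<T> response, TimeSpan timeout)
    {
        if (!response.Wait(timeout))
            throw new TimeoutException(
                "No response received within " + timeout.TotalSeconds + " seconds");
        return response.Result;
    }

    public static void Main()
    {
        // A task that never completes imitates a response that got lost.
        var lostResponse = new TaskCompletionSource<string>();

        try
        {
            WaitForResponse(lostResponse.Task, TimeSpan.FromMilliseconds(100));
        }
        catch (TimeoutException e)
        {
            Console.WriteLine("Timed out: {0}", e.Message);
        }
    }
}
```

In a real application the same catch site would also handle EasyNetQException, so that a lost connection and a lost message both get an explicit reaction.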

Internal Changes

My last blog post was a discussion of the implementation options of request-response with RabbitMQ. As I said there, I now believe that a single exclusive queue for all responses to a client is the best option. Version 0.17 implements this. When you call bus.Request(…) you will see a queue created named easynetq.response.<some guid>. This will last for the lifetime of the current connection.

Happy requesting!