Thread-safe, async MVVM-style messaging in Blazor

It’s been nearly three years since I described a message broker we use for sending messages around Blazor applications.

It worked well for us up until now, but today I hit a problem with it. A component was sending out a message as part of a section of code that accessed the database, and another component that picked up the message was accessing the database to get the changes. I know I could have just sent the updated data in the message, but there are some (vaguely) good reasons why I didn’t do that.

For some reason, I kept hitting the dreaded “A second operation was started on this context instance before a previous operation completed” exception. As we inject instances of IDbContextFactory everywhere, we rarely see this, however, it was biting us here.

Now we’ve used the message broker with async operations before, including ones that involved database operations, and never had this issue, so I was somewhat baffled as to why it was happening now. However, one thing was clear, and that was that a purely synchronous message broker is going to hit a problem with async calls sooner or later. So, it was time to rethink.

What I ended up with wasn’t that different in usage from the old static message broker, but is thread-safe, and seems to work fine with async operations.

The message broker class

In order to avoid a clash with the existing MessageBroker class that I showed in the previous post, I named this one ComponentMessageBroker (brilliant piece of naming there eh 😎)…

public class ComponentMessageBroker(ILogger<ComponentMessageBroker> logger) {
  private readonly ConcurrentDictionary<Type, Delegate> _handlers = new();

  public async Task SendMessage<T>(T data) {
    try {
      if (_handlers.TryGetValue(typeof(T), out Delegate? handler)) {
        if (handler is Func<T, Task> asyncHandler) {
          await asyncHandler(data);
        } else {
          logger.LogWarning("Handler for type {Type} is not async", typeof(T).Name);
        }
      }
    }
    catch (Exception ex) {
      logger.LogError(ex, "Error processing message of type {Type}", typeof(T).Name);
      throw;
    }
  }

  public void Subscribe<T>(Func<T, Task> handler) =>
    _handlers.AddOrUpdate(typeof(T), handler, (_, existingHandler) => {
      if (existingHandler is Func<T, Task> existing) {
        return (Func<T, Task>)(async data => {
          await existing(data);
          await handler(data);
        });
      }
      return handler;
    });

  public void Unsubscribe<T>(Func<T, Task> handler) {
    if (_handlers.TryGetValue(typeof(T), out Delegate? currentHandler)) {
      if (currentHandler is Func<T, Task> existingHandler) {
        if (existingHandler.Equals(handler)) {
          // This is the only handler, so remove it completely
          _handlers.TryRemove(typeof(T), out _);
        } else if (existingHandler.GetInvocationList().Length > 1) {
          // There are multiple handlers, so remove just this one
          Delegate? newDelegate = Delegate.Remove(existingHandler, handler);
          _handlers.TryUpdate(typeof(T), newDelegate, currentHandler);
        }
      }
    }
  }
}

As you can see, this one includes logging and exception handling, so is more robust than the previous one.

Sending and receiving messages

When a component wants to send or receive messages, they need to inject an instance of the message broker…

  [Inject]
  public ComponentMessageBroker ComponentMessageBroker { get; set; } = null!;

Sending messages is as simple as…

await ComponentMessageBroker.SendMessage(90125);

Receiving messages can be done by calling the Subscribe method and supplying a message hanlder. To avoid memory leaks, the component should implement IDisposable, and call the Unsubscribe method…

  protected override async Task OnInitializedAsync() =>
    ComponentMessageBroker.Subscribe<int>(MessageReceived);

  public void Dispose() =>
    ComponentMessageBroker.Unsubscribe<int>(MessageReceived);

The Subscribe and Unsubscribe methods are generic over the message type, so components will only receive messages of the type(s) to which they subscribed. A coponent can subscribe to as many types as required, as long as a handler of the appropriate type is supplied for each.

The message handler is an async Task method whose single parameter is of the type specified when subscribing…

private async Task MessageReceived(int n) =>
  // Do whatever you want here

Be First to Comment

Leave a Reply

Your email address will not be published.

This site uses Akismet to reduce spam. Learn how your comment data is processed.