Publish-Subscribe
The publish-subscribe pattern lets multiple consumers react to the same message. From the publisher's perspective, messages are "fire-and-forget". Publishing still follows the normal C# async pattern: it is an async Task operation that you await, rather than an async void method or an async call that is never awaited.
IConsumer Interface
public interface IConsumer<TMessage>
{
    Task Consume(TMessage message);
}

Implement this interface to receive messages of type TMessage.
Creating a Consumer
Basic Consumer
public class OrderCreatedMessage
{
    public string OrderId { get; init; }
    public decimal Total { get; init; }
}

public class NotifyWarehouseConsumer : IConsumer<OrderCreatedMessage>
{
    private readonly IWarehouseService _warehouse;

    public NotifyWarehouseConsumer(IWarehouseService warehouse)
    {
        _warehouse = warehouse;
    }

    public async Task Consume(OrderCreatedMessage message)
    {
        await _warehouse.PrepareOrder(message.OrderId);
    }
}

Consumers are resolved from the DI container, so constructor injection works normally.
Multiple Consumers for Same Message
Multiple consumers can handle the same message type. All are invoked in parallel:
// All of these receive OrderCreatedMessage
public class NotifyWarehouseConsumer : IConsumer<OrderCreatedMessage> { ... }
public class SendConfirmationEmailConsumer : IConsumer<OrderCreatedMessage> { ... }
public class UpdateAnalyticsConsumer : IConsumer<OrderCreatedMessage> { ... }

Publishing Messages
Basic Publishing
[Inject]
private IMessageBus MessageBus { get; set; }

private async Task CreateOrder()
{
    // Create order logic...
    await MessageBus.PublishAsync(new OrderCreatedMessage
    {
        OrderId = order.Id,
        Total = order.Total
    });
}

With Timeout and Cancellation
await MessageBus.PublishAsync(
    new OrderCreatedMessage { OrderId = "123" },
    new PublishConfiguration
    {
        Timeout = TimeSpan.FromSeconds(10),
        CancellationToken = cancellationToken
    });

PublishConfiguration Properties:
| Property | Default | Description |
|---|---|---|
| Timeout | 30 seconds | Maximum time to wait for all consumers |
| CancellationToken | None | Token to cancel the operation |
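
To illustrate how a timeout and a caller-supplied token can work together, here is a minimal sketch. It is not the library's implementation: TimeoutSketch and RunWithTimeoutAsync are hypothetical names, and the sketch assumes that a timeout and a caller cancellation are treated the same way.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutSketch
{
    // Hypothetical helper: runs `work` with a token that is cancelled when either
    // the caller's token fires or the timeout elapses, whichever comes first.
    public static async Task<bool> RunWithTimeoutAsync(
        Func<CancellationToken, Task> work,
        TimeSpan timeout,
        CancellationToken callerToken = default)
    {
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(callerToken);
        linked.CancelAfter(timeout);
        try
        {
            await work(linked.Token);
            return true;   // finished before the timeout or cancellation
        }
        catch (OperationCanceledException) when (linked.IsCancellationRequested)
        {
            return false;  // timed out or cancelled by the caller
        }
    }
}
```

In this sketch the work is only interrupted if it observes the token it is given; work that ignores the token keeps running past the deadline.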
Registration
Automatic Discovery
Consumers are automatically discovered and registered when using AddMessageBus():
// Discovers consumers in the framework assembly
builder.Services.AddMessageBus();

// Discovers consumers in multiple assemblies
builder.Services.AddMessageBus(
    typeof(Program).Assembly,
    typeof(MyPlugin).Assembly);

After the service provider is built:

app.Services.UseMessageBus();

Manual Registration
For explicit control:
// Register consumer type (resolved from DI)
messageBus.RegisterConsumerType<OrderCreatedMessage, NotifyWarehouseConsumer>();

ComponentConsumer for Blazor
Use ComponentConsumer<TMessage> when a Blazor component needs to receive messages:
@inherits ComponentConsumer<ThemeChangedMessage>

<div class="@_themeClass">
    Content here
</div>

@code {
    private string _themeClass = "light";

    protected override async Task Consume(ThemeChangedMessage message, CancellationToken cancellationToken)
    {
        _themeClass = message.IsDark ? "dark" : "light";
        StateHasChanged();
    }
}

Key features:
- Auto-subscribes in OnInitialized
- Auto-unsubscribes in Dispose
- Wraps Consume in InvokeAsync for thread safety
- Provides CancellationToken for async operations
Exception Handling
Consumer exceptions are isolated — one consumer failing doesn't affect others:
public class RiskyConsumer : IConsumer<SomeMessage>
{
    public async Task Consume(SomeMessage message)
    {
        throw new Exception("This won't stop other consumers");
    }
}

public class ReliableConsumer : IConsumer<SomeMessage>
{
    public async Task Consume(SomeMessage message)
    {
        // This still executes even if RiskyConsumer throws
        await DoWork();
    }
}

Exceptions are logged but not propagated to the publisher.
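
The isolation described above can be pictured with a small dispatcher that wraps each consumer call in its own try/catch. This is only a sketch under assumptions, not the message bus's actual code: IsolationSketch and DispatchAsync are hypothetical names, consumers are modelled as plain delegates, and failures are collected in a queue where a real bus would write to a log.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class IsolationSketch
{
    // Hypothetical dispatcher: starts every handler, waits for all of them, and
    // records failures instead of rethrowing them to the caller.
    public static async Task<IReadOnlyCollection<Exception>> DispatchAsync<TMessage>(
        TMessage message,
        IEnumerable<Func<TMessage, Task>> handlers)
    {
        var failures = new ConcurrentQueue<Exception>();

        async Task RunIsolated(Func<TMessage, Task> handler)
        {
            try
            {
                await handler(message);
            }
            catch (Exception ex)
            {
                // A real bus would log here; the sketch just collects the exception.
                failures.Enqueue(ex);
            }
        }

        // Each handler gets its own try/catch, so one failure cannot fault the others.
        await Task.WhenAll(handlers.Select(h => RunIsolated(h)));
        return failures;
    }
}
```

Because every handler is awaited inside its own wrapper, the task returned by DispatchAsync completes normally even when some handlers throw, which matches the "logged but not propagated" behaviour.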
Best Practices
Message Design
// Good: Immutable, descriptive
public class UserRegisteredMessage
{
    public required string UserId { get; init; }
    public required string Email { get; init; }
    public DateTime RegisteredAt { get; init; } = DateTime.UtcNow;
}

// Avoid: Mutable, generic
public class Message
{
    public object Data { get; set; }
}

Consumer Responsibility
Keep consumers focused on a single responsibility:
// Good: Single responsibility
public class SendWelcomeEmailConsumer : IConsumer<UserRegisteredMessage> { ... }
public class CreateUserProfileConsumer : IConsumer<UserRegisteredMessage> { ... }
// Avoid: Multiple responsibilities
public class UserRegisteredConsumer : IConsumer<UserRegisteredMessage>
{
    public async Task Consume(UserRegisteredMessage message)
    {
        await SendEmail();
        await CreateProfile();
        await NotifyAdmins();
    }
}

Idempotency
Design consumers to handle duplicate messages safely:
public class ProcessPaymentConsumer : IConsumer<PaymentMessage>
{
    public async Task Consume(PaymentMessage message)
    {
        // Check if already processed
        if (await _repository.PaymentExists(message.PaymentId))
            return;

        await _paymentService.Process(message);
    }
}
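
A check followed by a separate write can still let two concurrent duplicates through, since both may pass the check before either records the payment. One way to close that gap is to claim the payment ID atomically before processing. The following is a minimal in-memory sketch of that idea: ProcessedPaymentGuard and IdempotencySketch are hypothetical names, and the payment ID is assumed to be a string.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical in-memory guard; a real system would typically rely on durable
// storage (for example a unique constraint) so the record survives restarts.
public class ProcessedPaymentGuard
{
    private readonly ConcurrentDictionary<string, bool> _claimed = new();

    // Returns true only for the first caller to claim a given payment ID.
    public bool TryClaim(string paymentId) => _claimed.TryAdd(paymentId, true);

    // Releases a claim so the payment can be retried after a failure.
    public void Release(string paymentId) => _claimed.TryRemove(paymentId, out _);
}

public static class IdempotencySketch
{
    // Runs `process` for a payment ID only if no other call has claimed it.
    public static async Task<bool> ProcessOnceAsync(
        ProcessedPaymentGuard guard, string paymentId, Func<Task> process)
    {
        if (!guard.TryClaim(paymentId))
            return false; // duplicate: already processed or in progress

        try
        {
            await process();
            return true;
        }
        catch
        {
            guard.Release(paymentId); // allow a later retry
            throw;
        }
    }
}
```

Releasing the claim on failure is a design choice of this sketch: it favours retrying a failed payment over strictly never running the handler twice.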