Extensibility
The MessageBus supports extensibility through pipes and filters, allowing cross-cutting concerns like logging, validation, and access control.
Message Flow
PublishAsync(message)
│
▼
┌───────────────────┐
│ Global Pipes │ ◄── IMessagePipe (all messages)
│ (logging, etc.) │ Return false to block
└───────┬───────────┘
│
▼
┌───────────────────┐
│ Typed Pipes │ ◄── IMessagePipe<TMessage> (specific type)
│ (validation) │ Return false to block
└───────┬───────────┘
│
▼
┌───────────────────┐
│ Get Consumers │ Registered + subscribed consumers
└───────┬───────────┘
│
▼
┌───────────────────┐
│ Consumer Filters │ ◄── IConsumerFilter (per-consumer)
│ (access control) │ Return false to skip consumer
└───────┬───────────┘
│
▼
┌───────────────────┐
│ Consumer.Consume │ Parallel execution
└───────────────────┘Message Pipes
Pipes process messages before they reach consumers. They can:
- Log or monitor messages
- Validate message content
- Transform messages
- Block messages entirely
IMessagePipe (Global)
Processes all messages regardless of type:
public interface IMessagePipe
{
Task<bool> ProcessAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
where TMessage : class;
}Example: Logging Pipe
public class MessageLoggingPipe : IMessagePipe
{
private readonly ILogger<MessageLoggingPipe> _logger;
public MessageLoggingPipe(ILogger<MessageLoggingPipe> logger)
{
_logger = logger;
}
public Task<bool> ProcessAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
where TMessage : class
{
_logger.LogInformation(
"MessageBus: Publishing {MessageType} - {Message}",
typeof(TMessage).Name,
message);
return Task.FromResult(true); // Continue processing
}
}Registration:
// Generic registration
services.AddMessagePipe<MessageLoggingPipe>();
// Or use the built-in convenience method
services.AddMessageLogging();IMessagePipe<TMessage> (Typed)
Processes only messages of a specific type:
public interface IMessagePipe<TMessage> where TMessage : class
{
Task<bool> ProcessAsync(TMessage message, CancellationToken cancellationToken);
}Example: Order Validation Pipe
public class OrderValidationPipe : IMessagePipe<CreateOrderMessage>
{
public Task<bool> ProcessAsync(CreateOrderMessage message, CancellationToken cancellationToken)
{
if (message.Items.Count == 0)
{
// Block empty orders from being processed
return Task.FromResult(false);
}
if (message.Total <= 0)
{
return Task.FromResult(false);
}
return Task.FromResult(true);
}
}Registration:
services.AddMessagePipe<OrderValidationPipe, CreateOrderMessage>();Blocking Messages
Return false from ProcessAsync to prevent the message from reaching consumers:
public class RateLimitingPipe : IMessagePipe
{
private readonly Dictionary<Type, DateTime> _lastPublish = new();
private readonly TimeSpan _minInterval = TimeSpan.FromMilliseconds(100);
public Task<bool> ProcessAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
where TMessage : class
{
var messageType = typeof(TMessage);
if (_lastPublish.TryGetValue(messageType, out var last) &&
DateTime.UtcNow - last < _minInterval)
{
return Task.FromResult(false); // Block rapid-fire messages
}
_lastPublish[messageType] = DateTime.UtcNow;
return Task.FromResult(true);
}
}Consumer Filters
Filters determine which consumers receive a message. Unlike pipes (which affect all consumers), filters are evaluated per-consumer.
IConsumerFilter
public interface IConsumerFilter
{
bool ShouldInvoke<TMessage>(IConsumer<TMessage> consumer, TMessage message)
where TMessage : class;
}All registered filters must return true for a consumer to receive the message.
Example: Plugin Filter
The framework includes DisabledPluginConsumerFilter which prevents consumers from disabled plugins from receiving messages:
public class DisabledPluginConsumerFilter : IConsumerFilter
{
private readonly PluginState _pluginState;
public DisabledPluginConsumerFilter(PluginState pluginState)
{
_pluginState = pluginState;
}
public bool ShouldInvoke<TMessage>(IConsumer<TMessage> consumer, TMessage message)
where TMessage : class
{
var consumerAssembly = consumer.GetType().Assembly;
// Find if consumer belongs to a plugin
var plugin = _pluginState.Plugins
.FirstOrDefault(p => p.Assembly == consumerAssembly);
// Non-plugin consumers always receive messages
if (plugin is null)
return true;
// Plugin consumers only receive if enabled
return plugin.IsEnabled && _pluginState.PluginsActive;
}
}Registration:
// Generic registration
services.AddConsumerFilter<DisabledPluginConsumerFilter>();
// Or use the convenience method
services.AddPluginConsumerFilter();Built-in Extensions
Message Logging
services.AddMessageLogging();Logs all messages at Information level. Useful for debugging message flow.
Plugin Consumer Filter
services.AddPluginConsumerFilter();Prevents consumers from disabled plugins from receiving messages. Requires AddPluginFramework() to be called first.
Registration Methods Summary
| Method | Purpose |
|---|---|
AddMessagePipe<TPipe>() | Register global pipe |
AddMessagePipe<TPipe, TMessage>() | Register typed pipe for specific message |
AddConsumerFilter<TFilter>() | Register consumer filter |
AddMessageLogging() | Add built-in logging pipe |
AddPluginConsumerFilter() | Add built-in plugin filter |
Order of Execution
- Global pipes execute in registration order
- Typed pipes execute after global pipes
- Consumer filters evaluate for each consumer
- Consumers execute in parallel (order not guaranteed)
Best Practices
Keep Pipes Fast
Pipes run synchronously in the message path. Avoid slow operations:
// Good: Fast, synchronous check
public Task<bool> ProcessAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
{
return Task.FromResult(IsValid(message));
}
// Avoid: Slow database call in pipe
public async Task<bool> ProcessAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
{
var isAllowed = await _database.CheckPermissionAsync(message); // Slow!
return isAllowed;
}Use Filters for Access Control
Filters are the right place for consumer-specific access control:
// Good: Filter checks if consumer should receive message
public bool ShouldInvoke<TMessage>(IConsumer<TMessage> consumer, TMessage message)
{
return HasPermission(consumer.GetType());
}Combine Pipes and Filters
Use pipes for message-level concerns, filters for consumer-level:
// Setup
services.AddMessagePipe<ValidationPipe>(); // Message validation
services.AddMessagePipe<AuditLoggingPipe>(); // Audit trail
services.AddConsumerFilter<PermissionFilter>(); // Access control
services.AddConsumerFilter<DisabledPluginConsumerFilter>(); // Plugin lifecycle