Defender.Mongo.MessageBroker
8.0.0
See the version list below for details.
dotnet add package Defender.Mongo.MessageBroker --version 8.0.0
NuGet\Install-Package Defender.Mongo.MessageBroker -Version 8.0.0
<PackageReference Include="Defender.Mongo.MessageBroker" Version="8.0.0" />
paket add Defender.Mongo.MessageBroker --version 8.0.0
#r "nuget: Defender.Mongo.MessageBroker, 8.0.0"
// Install Defender.Mongo.MessageBroker as a Cake Addin
#addin nuget:?package=Defender.Mongo.MessageBroker&version=8.0.0
// Install Defender.Mongo.MessageBroker as a Cake Tool
#tool nuget:?package=Defender.Mongo.MessageBroker&version=8.0.0
Defender.Mongo.MessageBroker
A message broker with topics based on Mongo, aiming to be a free and simplified version of Kafka under MongoDB.
How to configure
- Add the
Defender.Mongo.MessageBroker
NuGet package. - Call
AddMongoMessageBrokerServices
to register all the necessary services and options. Pass anAction<MessageBrokerOptions>
as the first parameter to set the configuration.
services.AddMongoMessageBrokerServices(opt =>
{
configuration.GetSection(nameof(MessageBrokerOptions)).Bind(opt);
});
MessageBrokerOptions
is required and has four parameters.MongoDbConnectionString
- Connection string to your Mongo DBMongoDbDatabaseName
- Database nameMaxTopicDocuments
- The maximum number of messages allowed in a topic (default = 1000).MaxTopicByteSize
- The maximum size limit in bytes (default = 1000000, about 1 MB).
How to use
Let's review the basic entities the library provides:
BaseTopicMessage
ITopicMessage
IConsumer
IProducer
ITopicMessage and BaseTopicMessage
using Defender.Mongo.MessageBroker.Models.TopicMessage;
public class TextMessage : BaseTopicMessage
{
public TextMessage() { }
public TextMessage(string text) { Text = text; }
public string Text { get; set; }
}
All models you publish and receive must inherit from one of these. You don't need to worry about the fields.
IConsumer
public class BackgroundListener : BackgroundService
{
private readonly IConsumer _consumer;
public BackgroundListener(IConsumer consumer)
{
_consumer = consumer
.SetTopic("topic-name")
.SetMessageType(MessageType.ClassName);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _consumer.SubscribeAsync<TextMessage>((text) => Log.AddLog(text.Text), stoppingToken);
}
}
- Use
SetTopic
to specify the topic to subscribe to. By default, it is set to "default". - Use
SetMessageType
to indicate the type of messages to listen to (it will skip other types). Possible values:NoType
(subscribe to all),ClassName
(set the type the same as the class name of the message),Custom
(e.g.,.SetMessageType("custom-type"))
. By default, it'sNoType
. - To subscribe to the topic, use
SubscribeAsync
and pass a delegate with a parameter to specify what to do with the message after it is received.
IProducer
public class MessagingService
{
private readonly IProducer _producer;
public MessagingService(IProducer producer)
{
_producer = producer;
_producer.SetTopic(Topics.TextTopic).SetMessageType(MessageType.ClassName);
}
public async Task PublishTextAsync(string text)
{
var message = new TextMessage(text);
await _producer.PublishAsync(message);
}
}
Everything is the same, but to publish a message, use the PublishAsync
method.
Behavior details
- Currently, it starts subscribing to the topic from the moment the application starts.
- It will create an instant connection to your MongoDB cluster and run a cursor (waiting for new documents in the collection).
- If something happens to the connection, it will constantly try to reconnect.
- After the connection is restored, it processes all messages that are later than the last message processed.
- If you set
MaxTopicDocuments
orMaxTopicByteSize
too small, messages might be discarded and never be processed after a crash. - If you need to ensure that only one service instance receives a message, create an auxiliary Consumer service that will be subscribed to the queue and will redirect the message to your service.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
-
net8.0
- Microsoft.Extensions.Options (>= 8.0.0)
- MongoDB.Driver (>= 2.23.1)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.