Marille 0.5.1
See the version list below for details.
dotnet add package Marille --version 0.5.1
NuGet\Install-Package Marille -Version 0.5.1
<PackageReference Include="Marille" Version="0.5.1" />
paket add Marille --version 0.5.1
#r "nuget: Marille, 0.5.1"
// Install Marille as a Cake Addin #addin nuget:?package=Marille&version=0.5.1 // Install Marille as a Cake Tool #tool nuget:?package=Marille&version=0.5.1
marille
Marille is thin layer on top of System.Threading.Channels that provides a in-process publisher/subscriber hub.
The aim of the library is to simplify the management of several Channels<T>
within an application. The general idea
is that users can create different "topics" that will later be divided in subtopics based on the type of
messages that they distribute. The normal pattern for usage is as follows:
- Create a topic, which is a pair of a topic name and a type of messages.
- Add 1 or more workers which will be used to consume events.
- Use the Hub to publish events to a topic.
- Workers will pick the messages and consume them based on the topic strategy that was chosen.
You can think about Marille as an in-process pub/sub similar to a distributed queue with the difference that the constraints are much simpler. Working within the same process makes dealing with failure much easier.
The library does not impose any thread usage and it fully relies on the default TaskScheduler used in the application, that is, there are no Task.Run calls to be found in the source of the library.
Examples
Here are some examples of the API usage:
Single Publisher/Worker
In this pattern we will create a single topic with a unique consumer. The Hub will only send messages to the registered consumer.
- Declare your consumer implementation:
// define a message type to consumer
public record struct MyMessage (string Id, string Operation, int Value);
// create a worker type with the minimum implementation
class MyWorker : IWorker<MyMessage> {
// Workers will be scheduled by the default TaskScheduler from dotnet
public Task ConsumeAsync (MyMessage message, CancellationToken cancellationToken = default)
=> Task.FromResult (Completion.TrySetResult(true));
}
// creeate an error worker to handle exceptions from workers
class ErrorWorker : IErrorWorker<MyMessage> {
public Task ConsumeAsync (MyMessage message, Exception exception, CancellationToken cancellationToken = default)
=> Task.FromResult (Completion.TrySetResult(true));
}
- Setup the topic via a new Hub:
private Hub _hub;
public Task CreateChannels () {
_hub = new Hub ();
// configuration to be used to create the topic
var configuration = new() { Mode = ChannelDeliveryMode.AtMostOnce };
var topic = "topic";
var worker = new MyWorker ();
var errorWorker = new ErrorWorker ();
// workers can be either added during the channel creation or after the fact
// with the TryRegister method.
return _hub.CreateAsync (topic, configuration, errorWorer, worker);
}
Lambdas can also be registered as workers:
Func<MyMessage, CancellationToken, Task> worker = (_, _) => Task.FromResult (true);
return _hub.CreateAsync (topic, configuration, worker);
- Publish messages via the hub.
public Task ProduceMessage (string id, string operation, int value) =>
_hub.Publish (topic, new MyMessage (di, operation, value));
- Cancel and wait for events to be completed
Becasue the entire library puspose of the library is to be able to process events in a multithreaded manner, the main thread has to wait until the events are processed. That can be done by waiting on a Channel to be closed:
// close a specific topic
await _hub.CloseAsync<MyMessage> (topic);
// close all topcis
await _hub.CloseAsync<MyMessage> (topic1);
- Error handling
The library provides a way to handle exceptions that are thrown by the workers. The error worker is called whenever an exception is thrown by a worker that is consuming a topic. It is up to the implementation to decide if the message should be retried or not.
If we do not want to retry a message a worker can throw and exception, such exception will be added to a queue that will be consumed by the error worker that was used when the topic was created. Each topic has its own error queue, that means that the error worker will only consume errors from the topic that it was created for.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
-
net8.0
- No dependencies.
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
0.5.8 | 79 | 10/7/2024 |
0.5.7 | 94 | 9/3/2024 |
0.5.6 | 97 | 8/29/2024 |
0.5.5 | 94 | 8/28/2024 |
0.5.3 | 131 | 8/25/2024 |
0.5.2 | 131 | 8/22/2024 |
0.5.1 | 119 | 8/22/2024 |
0.5.0 | 120 | 8/22/2024 |
0.4.3 | 119 | 8/21/2024 |
0.4.2 | 109 | 8/19/2024 |
0.4.1 | 109 | 8/16/2024 |
0.3.8 | 135 | 6/12/2024 |
0.3.7 | 95 | 6/12/2024 |
0.2.0 | 118 | 5/28/2024 |
0.0.1 | 110 | 5/26/2024 |