MicroPlumberd 1.0.14.57
See the version list below for details.
dotnet add package MicroPlumberd --version 1.0.14.57
NuGet\Install-Package MicroPlumberd -Version 1.0.14.57
<PackageReference Include="MicroPlumberd" Version="1.0.14.57" />
paket add MicroPlumberd --version 1.0.14.57
#r "nuget: MicroPlumberd, 1.0.14.57"
// Install MicroPlumberd as a Cake Addin #addin nuget:?package=MicroPlumberd&version=1.0.14.57 // Install MicroPlumberd as a Cake Tool #tool nuget:?package=MicroPlumberd&version=1.0.14.57
micro-plumberd
Micro library for EventStore, CQRS and EventSourcing Just eXtreamly simple.
Documentation can be found here: MicroPlumberd Documentation
Getting started
Install nugets:
# For your domain
dotnet add package MicroPlumberd
dotnet add package MicroPlumberd.SourceGenerators
If you'd like to use direct dotnet-dotnet communication to execute command-handlers install MicroPlumberd.DirectConnect
# For application-layer using EventStore as message-bus.
dotnet add package MicroPlumberd.Services
# For application-layer communicating (dotnet-2-dotnet) using GRPC:
dotnet add package MicroPlumberd.Services.Grpc.DirectConnect
# EXPERIMENTAL ProcessManager support can be found here:
dotnet add package MicroPlumberd.Services.ProcessManagers
Configure plumber
/// change to your connection-string.
string connectionString = $"esdb://admin:changeit@localhost:2113?tls=false&tlsVerifyCert=false";
var settings = EventStoreClientSettings.Create(connectionString);
var plumber = Plumber.Create(settings);
If you'd want to do it at service-level with DI:
/// change to your connection-string.
string connectionString = $"esdb://admin:changeit@localhost:2113?tls=false&tlsVerifyCert=false";
var settings = EventStoreClientSettings.Create(connectionString);
services.AddPlumberd(settings);
Aggregates
- Write an aggregate.
[Aggregate]
public partial class FooAggregate(Guid id) : AggregateBase<FooAggregate.FooState>(id)
{
internal new FooState State => base.State;
public record FooState { public string Name { get; set; } };
private static FooState Given(FooState state, FooCreated ev) => state with { Name = ev.Name };
private static FooState Given(FooState state, FooUpdated ev) => state with { Name =ev.Name };
public void Open(string msg) => AppendPendingChange(new FooCreated() { Name = msg });
public void Change(string msg) => AppendPendingChange(new FooUpdated() { Name = msg });
}
// And events:
public record FooCreated { public string? Name { get; set; } }
public record FooUpdated { public string? Name { get; set; } }
Comments:
- State is encapsulated in nested class FooState.
- Given methods, that are used when loading aggregate from the EventStoreDB are private and static. State is encouraged to be immutable.
- [Aggregate] attribute is used by SourceGenerator that will generate dispatching code and handy metadata.
- Consume an aggregate.
If you want to create a new aggregate and save it to EventStoreDB:
FooAggregate aggregate = FooAggregate.New(Guid.NewGuid());
aggregate.Open("Hello");
await plumber.SaveNew(aggregate);
If you want to load aggregate from EventStoreDB, change it and save back to EventStoreDB
var aggregate = await plumber.Get<FooAggregate>("YOUR_ID");
aggregate.Change("World");
await plumber.SaveChanges(aggregate);
Write a read-model/processor
- Read-Models
[EventHandler]
public partial class FooModel
{
private async Task Given(Metadata m, FooCreated ev)
{
// your code
}
private async Task Given(Metadata m, FooUpdated ev)
{
// your code
}
}
Comments:
- ReadModels have private async Given methods. Since they are async, you can invoke SQL here, or othere APIs to store your model.
- Metadata contains standard stuff (Created, CorrelationId, CausationId), but can be reconfigured.
var fooModel = new FooModel();
var sub= await plumber.SubscribeModel(fooModel);
// or if you want to persist progress of your subscription
var sub2= await plumber.SubscribeModelPersistently(fooModel);
With SubscribeModel you can subscribe from start, from certain moment or from the end of the stream.
- Processors
[EventHandler]
public partial class FooProcessor(IPlumber plumber)
{
private async Task Given(Metadata m, FooUpdated ev)
{
var agg = FooAggregate.New(Guid.NewGuid());
agg.Open(ev.Name + " new");
await plumber.SaveNew(agg);
}
}
Implementing a processor is technically the same as implementing a read-model, but inside the Given method you would typically invoke a command or execute an aggregate.
Features
Conventions
- SteamNameConvention - from aggregate type, and aggregate id
- EventNameConvention - from aggregate? instance and event instance
- MetadataConvention - to enrich event with metadata based on aggregate instance and event instance
- EventIdConvention - from aggregate instance and event instance
- OutputStreamModelConvention - for output stream name from model-type
- GroupNameModelConvention - for group name from model-type
Ultra development cycle for Read-Models (EF example).
Imagine this:
- You create a read-model that subscribes persistently.
- You subscribe it with plumber.
- You changed something in the event and want to see the new model.
- Instead of re-creating old read-model, you can easily create new one. Just change MODEL_VER to reflect new version.
Please note that Sql schema create/drop auto-generation script will be covered in a different article. (For now we leave it for developers.)
Comments:
- By creating a new read-model you can always compare the differences with the previous one.
- You can leverage canary-deployment strategy and have 2 versions of your system running in parallel.
[OutputStream(FooModel.MODEL_NAME)]
[EventHandler]
public partial class FooModel : DbContext
{
internal const string MODEL_VER = "_v1";
internal const string MODEL_NAME = $"FooModel{MODEL_VER}";
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder
.Entity<FooEntity>()
.ToTable($"FooEntities{MODEL_VER}");
}
private async Task Given(Metadata m, FooCreated ev)
{
// your code
}
private async Task Given(Metadata m, FooUpdated ev)
{
// your code
}
}
Subscription Sets - Models ultra-composition
- You can easily create a stream that joins events together by event-type, and subscribe many read-models at once. Here it is named 'MasterStream', which is created out of events used to create DimentionLookupModel and MasterModel.
- In this way, you can easily manage the composition and decoupling of read-models. You can nicely composite your read-models. And if you don't wish to decouple read-models, you can reuse your existing one.
/// Given simple models, where master-model has foreign-key used to obtain value from dimentionLookupModel
var dimentionTable = new DimentionLookupModel();
var factTable = new MasterModel(dimentionTable);
await plumber.SubscribeSet()
.With(dimentionTable)
.With(factTable)
.SubscribeAsync("MasterStream", FromStream.Start);
EventStoreDB as message-bus
If you want to start as quickly as possible, you can start with EventStoreDB as command-message-bus.
services.AddPlumberd()
.AddCommandHandler<FooCommandHandler>()
// on the client side:
ICommandBus bus; // from DI
bus.SendAsync(Guid.NewGuid(), new CreateFoo() { Name = "Hello" });
If you are running many replicas of your service, you need to switch command-execution to persistent mode:
services.AddPlumberd(configure: c => c.Conventions.ServicesConventions().AreHandlersExecutedPersistently = () => true)
.AddCommandHandler<FooCommandHandler>()
This means, that once your microservice subscribes to commands, it will execute all. So if your service is down, and commands are saved, once your service is up, they will be executed. To skip old commands, you can configure a filter.
services.AddPlumberd(configure: c => {
c.Conventions.ServicesConventions().AreHandlersExecutedPersistently = () => true;
c.Conventions.ServicesConventions().CommandHandlerSkipFilter = (m,ev) => DateTimeOffset.Now.Substract(m.Created()) > TimeSpan.FromSeconds(60);
})
.AddCommandHandler<FooCommandHandler>()
GRPC Direct communication
If you prefer direct communication (like REST-API, but without the hassle for contract generation/etc.) you can use direct communication where client invokes command handle using grpc. Command is not stored in EventStore.
/// Let's configure server:
services.AddCommandHandler<FooCommandHandler>().AddServerDirectConnect();
/// Add mapping to direct-connect service
app.MapDirectConnect();
Here is an example of a command handler code:
[CommandHandler]
public partial class FooCommandHandler(IPlumber plumber)
{
[ThrowsFaultException<BusinessFault>]
public async Task Handle(Guid id, CreateFoo cmd)
{
if (cmd.Name == "error")
throw new BusinessFaultException("Foo");
var agg = FooAggregate.New(id);
agg.Open(cmd.Name);
await plumber.SaveNew(agg);
}
[ThrowsFaultException<BusinessFault>]
public async Task<HandlerOperationStatus> Handle(Guid id, ChangeFoo cmd)
{
if (cmd.Name == "error")
throw new BusinessFaultException("Foo");
var agg = await plumber.Get<FooAggregate>(id);
agg.Change(cmd.Name);
await plumber.SaveChanges(agg);
return HandlerOperationStatus.Ok();
}
}
And how on the client side:
service.AddClientDirectConnect().AddCommandInvokers();
// And invocation
var clientPool = sp.GetRequiredService<IRequestInvokerPool>();
var invoker = clientPool.Get("YOUR_GRPC_URL");
await invoker.Execute(Guid.NewId(), new CreateFoo(){});
EXPERIMENTAL Process-Manager
Given diagram:
The code of Order Process Manager looks like this:
[ProcessManager]
public class OrderProcessManager(IPlumberd plumberd)
{
public async Task<ICommandRequest<MakeReservation>> StartWhen(Metadata m, OrderCreated e)
{
return CommandRequest.Create(Guid.NewId(), new MakeReservation());
}
public async Task<ICommandRequest<MakePayment>> When(Metadata m, SeatsReserved e)
{
return CommandRequest.Create(Guid.NewId(), new MakePayment());
}
public async Task When(Metadata m, PaymentAccepted e)
{
var order = await plumberd.Get<Order>(this.Id);
order.Confirm();
await plumberd.SaveChanges(order);
}
// Optional
private async Task Given(Metadata m, OrderCreated v){
// this will be used to rehydrate state of process-manager
// So that when(SeatsReserved) you can adjust the response.
}
// Optional 2
private async Task Given(Metadata m, CommandEnqueued<MakeReservation> e){
// same here.
}
}
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
-
net8.0
- EventStore.Client.Grpc (>= 23.2.1)
- EventStore.Client.Grpc.PersistentSubscriptions (>= 23.2.1)
- EventStore.Client.Grpc.ProjectionManagement (>= 23.2.1)
- EventStore.Client.Grpc.Streams (>= 23.2.1)
NuGet packages (7)
Showing the top 5 NuGet packages that depend on MicroPlumberd:
Package | Downloads |
---|---|
MicroPlumberd.CommandBus.Abstractions
Package Description |
|
MicroPlumberd.Services
CQRS/EventSourcing made eXtreamly simple. Application-Layer: Command-Handlers, Command-Bus |
|
MicroPlumberd.ProcessManager.Abstractions
Package Description |
|
MicroPlumberd.Services.Grpc.DirectConnect
Package Description |
|
MicroPlumberd.Encryption
SecretObject that uses certificates for message confidentiality when using with EventStore as a CommandBus. |
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
1.0.77.125 | 71 | 11/26/2024 |
1.0.76.125 | 137 | 10/12/2024 |
1.0.75.125 | 140 | 10/12/2024 |
1.0.74.125 | 126 | 10/12/2024 |
1.0.73.124 | 125 | 10/12/2024 |
1.0.72.122 | 240 | 6/9/2024 |
1.0.71.121 | 153 | 6/7/2024 |
1.0.70.121 | 164 | 6/6/2024 |
1.0.69.119 | 165 | 5/15/2024 |
1.0.68.118 | 148 | 5/15/2024 |
1.0.67.118 | 150 | 5/15/2024 |
1.0.66.118 | 141 | 5/15/2024 |
1.0.65.117 | 141 | 5/15/2024 |
1.0.64.116 | 150 | 5/14/2024 |
1.0.63.115 | 132 | 5/11/2024 |
1.0.62.114 | 135 | 5/11/2024 |
1.0.61.113 | 126 | 5/11/2024 |
1.0.60.112 | 154 | 5/8/2024 |
1.0.58.112 | 140 | 5/8/2024 |
1.0.57.111 | 238 | 4/26/2024 |
1.0.55.111 | 170 | 4/23/2024 |
1.0.54.110 | 160 | 4/23/2024 |
1.0.53.110 | 160 | 4/23/2024 |
1.0.51.109 | 155 | 4/22/2024 |
1.0.50.109 | 146 | 4/22/2024 |
1.0.49.108 | 132 | 4/22/2024 |
1.0.48.108 | 155 | 4/20/2024 |
1.0.46.107 | 149 | 4/20/2024 |
1.0.45.106 | 160 | 4/20/2024 |
1.0.44.106 | 150 | 4/20/2024 |
1.0.43.100 | 150 | 4/17/2024 |
1.0.41.97 | 150 | 4/11/2024 |
1.0.40.95 | 152 | 4/11/2024 |
1.0.39.94 | 162 | 4/11/2024 |
1.0.37.94 | 137 | 4/9/2024 |
1.0.36.93 | 134 | 4/9/2024 |
1.0.35.90 | 165 | 4/8/2024 |
1.0.34.87 | 144 | 4/8/2024 |
1.0.29.85 | 153 | 4/7/2024 |
1.0.27.84 | 115 | 4/7/2024 |
1.0.26.83 | 158 | 4/7/2024 |
1.0.20.72 | 158 | 3/24/2024 |
1.0.19.71 | 164 | 3/24/2024 |
1.0.17.71 | 161 | 3/24/2024 |
1.0.16.71 | 153 | 3/23/2024 |
1.0.15.70 | 139 | 3/23/2024 |
1.0.14.57 | 170 | 3/21/2024 |
1.0.13.56 | 146 | 3/21/2024 |
1.0.11.54 | 156 | 3/21/2024 |
1.0.10.40 | 139 | 3/17/2024 |
1.0.9.40 | 131 | 3/17/2024 |
1.0.8.32 | 140 | 3/13/2024 |
1.0.5.18 | 135 | 3/12/2024 |
1.0.0 | 119 | 3/10/2024 |
0.0.4.18 | 113 | 3/12/2024 |