MicroPlumberd 1.0.11.54
See the version list below for details.
dotnet add package MicroPlumberd --version 1.0.11.54
NuGet\Install-Package MicroPlumberd -Version 1.0.11.54
<PackageReference Include="MicroPlumberd" Version="1.0.11.54" />
paket add MicroPlumberd --version 1.0.11.54
#r "nuget: MicroPlumberd, 1.0.11.54"
// Install MicroPlumberd as a Cake Addin #addin nuget:?package=MicroPlumberd&version=1.0.11.54 // Install MicroPlumberd as a Cake Tool #tool nuget:?package=MicroPlumberd&version=1.0.11.54
micro-plumberd
Micro library for EventStore, CQRS and EventSourcing Just eXtreamly simple.
Documentation can be found here: MicroPlumberd Documentation
Getting started
Install nugets:
# For your domain
dotnet add package MicroPlumberd
dotnet add package MicroPlumberd.SourceGenerators
If you'd like to use direct dotnet-dotnet communication to execute command-handlers install MicroPlumberd.DirectConnect
# For application-layer using EventStore as message-bus.
dotnet add package MicroPlumberd.Services
# For application-layer communicating (dotnet-2-dotnet) using GRPC:
dotnet add package MicroPlumberd.Services.Grpc.DirectConnect
Configure plumber
/// change to your connection-string.
string connectionString = $"esdb://admin:changeit@localhost:2113?tls=false&tlsVerifyCert=false";
var settings = EventStoreClientSettings.Create(connectionString);
var plumber = Plumber.Create(settings);
Aggregates
- Write an aggregate.
[Aggregate]
public partial class FooAggregate(Guid id) : AggregateBase<FooAggregate.FooState>(id)
{
internal new FooState State => base.State;
public record FooState { public string Name { get; set; } };
private static FooState Given(FooState state, FooCreated ev) => state with { Name = ev.Name };
private static FooState Given(FooState state, FooUpdated ev) => state with { Name =ev.Name };
public void Open(string msg) => AppendPendingChange(new FooCreated() { Name = msg });
public void Change(string msg) => AppendPendingChange(new FooUpdated() { Name = msg });
}
// And events:
public record FooCreated { public string? Name { get; set; } }
public record FooUpdated { public string? Name { get; set; } }
Comments:
- State is encapsulated in nested class FooState.
- Given methods, that are used when loading aggregate from the EventStoreDB are private and static. State is encouraged to be immutable.
- [Aggregate] attribute is used by SourceGenerator that will generate dispatching code and handy metadata.
- Consume an aggregate.
If you want to create a new aggregate and save it to EventStoreDB:
FooAggregate aggregate = FooAggregate.New(Guid.NewGuid());
aggregate.Open("Hello");
await plumber.SaveNew(aggregate);
If you want to load aggregate from EventStoreDB, change it and save back to EventStoreDB
var aggregate = await plumber.Get<FooAggregate>("YOUR_ID");
aggregate.Change("World");
await plumber.SaveChanges(aggregate);
Write a read-model/processor
- Read-Models
[EventHandler]
public partial class FooModel
{
private async Task Given(Metadata m, FooCreated ev)
{
// your code
}
private async Task Given(Metadata m, FooUpdated ev)
{
// your code
}
}
Comments:
- ReadModels have private async Given methods. Since they are async, you can invoke SQL here, or othere APIs to store your model.
- Metadata contains standard stuff (Created, CorrelationId, CausationId), but can be reconfigured.
var fooModel = new FooModel();
var sub= await plumber.SubscribeModel(fooModel);
// or if you want to persist progress of your subscription
var sub2= await plumber.SubscribeModelPersistently(fooModel);
With SubscribeModel you can subscribe from start, from certain moment or from the end of the stream.
- Processors
[EventHandler]
public partial class FooProcessor(IPlumber plumber)
{
private async Task Given(Metadata m, FooUpdated ev)
{
var agg = FooAggregate.New(Guid.NewGuid());
agg.Open(ev.Name + " new");
await plumber.SaveNew(agg);
}
}
Implementing a processor is technically the same as implementing a read-model, but inside the Given method you would typically invoke a command or execute an aggregate.
Features
Conventions
- SteamNameConvention - from aggregate type, and aggregate id
- EventNameConvention - from aggregate? instance and event instance
- MetadataConvention - to enrich event with metadata based on aggregate instance and event instance
- EventIdConvention - from aggregate instance and event instance
- OutputStreamModelConvention - for output stream name from model-type
- GroupNameModelConvention - for group name from model-type
Ultra development cycle for Read-Models (EF example).
Imagine this:
- You create a read-model that subscribes persistently.
- You subscribe it with plumber.
- You changed something in the event and want to see the new model.
- Instead of re-creating old read-model, you can easily create new one. Just change MODEL_VER to reflect new version.
Please note that Sql schema create/drop auto-generation script will be covered in a different article. (For now we leave it for developers.)
Comments:
- By creating a new read-model you can always compare the differences with the previous one.
- You can leverage canary-deployment strategy and have 2 versions of your system running in parallel.
[OutputStream(FooModel.MODEL_NAME)]
[EventHandler]
public partial class FooModel : DbContext
{
internal const string MODEL_VER = "_v1";
internal const string MODEL_NAME = $"FooModel{MODEL_VER}";
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder
.Entity<FooEntity>()
.ToTable($"FooEntities{MODEL_VER}");
}
private async Task Given(Metadata m, FooCreated ev)
{
// your code
}
private async Task Given(Metadata m, FooUpdated ev)
{
// your code
}
}
Subscription Sets - Models ultra-composition
- You can easily create a stream that joins events together by event-type, and subscribe many read-models at once. Here it is named 'MasterStream', which is created out of events used to create DimentionLookupModel and MasterModel.
- In this way, you can easily manage the composition and decoupling of read-models. You can nicely composite your read-models. And if you don't wish to decouple read-models, you can reuse your existing one.
/// Given simple models, where master-model has foreign-key used to obtain value from dimentionLookupModel
var dimentionTable = new DimentionLookupModel();
var factTable = new MasterModel(dimentionTable);
await plumber.SubscribeSet()
.With(dimentionTable)
.With(factTable)
.SubscribeAsync("MasterStream", FromStream.Start);
EventStoreDB as message-bus
If you want to start as quickly as possible, you can start with EventStoreDB as command-message-bus.
services.AddPlumberd()
.AddCommandHandler<FooCommandHandler>()
// on the client side:
ICommandBus bus; // from DI
bus.SendAsync(Guid.NewGuid(), new CreateFoo() { Name = "Hello" });
GRPC Direct communication
If you prefer direct communication (like REST-API, but without the hassle for contract generation/etc.) you can use direct communication where client invokes command handle using grpc. Command is not stored in EventStore.
/// Let's configure server:
services.AddCommandHandler<FooCommandHandler>().AddServerDirectConnect();
/// Add mapping to direct-connect service
app.MapDirectConnect();
Here is an example of a command handler code:
[CommandHandler]
public partial class FooCommandHandler(IPlumber plumber)
{
[ThrowsFaultException<BusinessFault>]
public async Task Handle(Guid id, CreateFoo cmd)
{
if (cmd.Name == "error")
throw new BusinessFaultException("Foo");
var agg = FooAggregate.New(id);
agg.Open(cmd.Name);
await plumber.SaveNew(agg);
}
[ThrowsFaultException<BusinessFault>]
public async Task<HandlerOperationStatus> Handle(Guid id, ChangeFoo cmd)
{
if (cmd.Name == "error")
throw new BusinessFaultException("Foo");
var agg = await plumber.Get<FooAggregate>(id);
agg.Change(cmd.Name);
await plumber.SaveChanges(agg);
return HandlerOperationStatus.Ok();
}
}
And how on the client side:
service.AddClientDirectConnect().AddCommandInvokers();
// And invocation
var clientPool = sp.GetRequiredService<IRequestInvokerPool>();
var invoker = clientPool.Get("YOUR_GRPC_URL");
await invoker.Execute(Guid.NewId(), new CreateFoo(){});
Aspects
You can easily inject aspects through decorator pattern.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
-
net8.0
- EventStore.Client.Grpc (>= 23.2.1)
- EventStore.Client.Grpc.PersistentSubscriptions (>= 23.2.1)
- EventStore.Client.Grpc.ProjectionManagement (>= 23.2.1)
- EventStore.Client.Grpc.Streams (>= 23.2.1)
NuGet packages (7)
Showing the top 5 NuGet packages that depend on MicroPlumberd:
Package | Downloads |
---|---|
MicroPlumberd.CommandBus.Abstractions
Package Description |
|
MicroPlumberd.Services
CQRS/EventSourcing made eXtreamly simple. Application-Layer: Command-Handlers, Command-Bus |
|
MicroPlumberd.ProcessManager.Abstractions
Package Description |
|
MicroPlumberd.Services.Grpc.DirectConnect
Package Description |
|
MicroPlumberd.Encryption
SecretObject that uses certificates for message confidentiality when using with EventStore as a CommandBus. |
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
1.0.77.125 | 71 | 11/26/2024 |
1.0.76.125 | 137 | 10/12/2024 |
1.0.75.125 | 140 | 10/12/2024 |
1.0.74.125 | 126 | 10/12/2024 |
1.0.73.124 | 125 | 10/12/2024 |
1.0.72.122 | 240 | 6/9/2024 |
1.0.71.121 | 153 | 6/7/2024 |
1.0.70.121 | 164 | 6/6/2024 |
1.0.69.119 | 165 | 5/15/2024 |
1.0.68.118 | 148 | 5/15/2024 |
1.0.67.118 | 150 | 5/15/2024 |
1.0.66.118 | 141 | 5/15/2024 |
1.0.65.117 | 141 | 5/15/2024 |
1.0.64.116 | 150 | 5/14/2024 |
1.0.63.115 | 132 | 5/11/2024 |
1.0.62.114 | 135 | 5/11/2024 |
1.0.61.113 | 126 | 5/11/2024 |
1.0.60.112 | 154 | 5/8/2024 |
1.0.58.112 | 140 | 5/8/2024 |
1.0.57.111 | 238 | 4/26/2024 |
1.0.55.111 | 170 | 4/23/2024 |
1.0.54.110 | 160 | 4/23/2024 |
1.0.53.110 | 160 | 4/23/2024 |
1.0.51.109 | 155 | 4/22/2024 |
1.0.50.109 | 146 | 4/22/2024 |
1.0.49.108 | 132 | 4/22/2024 |
1.0.48.108 | 155 | 4/20/2024 |
1.0.46.107 | 149 | 4/20/2024 |
1.0.45.106 | 160 | 4/20/2024 |
1.0.44.106 | 150 | 4/20/2024 |
1.0.43.100 | 150 | 4/17/2024 |
1.0.41.97 | 150 | 4/11/2024 |
1.0.40.95 | 152 | 4/11/2024 |
1.0.39.94 | 162 | 4/11/2024 |
1.0.37.94 | 137 | 4/9/2024 |
1.0.36.93 | 134 | 4/9/2024 |
1.0.35.90 | 165 | 4/8/2024 |
1.0.34.87 | 144 | 4/8/2024 |
1.0.29.85 | 153 | 4/7/2024 |
1.0.27.84 | 115 | 4/7/2024 |
1.0.26.83 | 158 | 4/7/2024 |
1.0.20.72 | 158 | 3/24/2024 |
1.0.19.71 | 164 | 3/24/2024 |
1.0.17.71 | 161 | 3/24/2024 |
1.0.16.71 | 153 | 3/23/2024 |
1.0.15.70 | 139 | 3/23/2024 |
1.0.14.57 | 170 | 3/21/2024 |
1.0.13.56 | 146 | 3/21/2024 |
1.0.11.54 | 156 | 3/21/2024 |
1.0.10.40 | 139 | 3/17/2024 |
1.0.9.40 | 131 | 3/17/2024 |
1.0.8.32 | 140 | 3/13/2024 |
1.0.5.18 | 135 | 3/12/2024 |
1.0.0 | 119 | 3/10/2024 |
0.0.4.18 | 113 | 3/12/2024 |