IglooSoftware.Sdk.EventSourcing
0.0.103
Event Sourcing for Azure!
IglooSoftware.Sdk.EventSourcing is an SDK for building event-sourced applications on Azure.
It is built on the following principles:
- Event Sourcing - All changes to application state are stored as a sequence of events.
- CQRS - The application state is separated into two models, one for reading and one for writing.
- Domain Driven Design - The application is built around domain objects, which encapsulate both state and behaviour.
- Eventual Consistency - The application state is eventually consistent, meaning that the read model is not updated immediately after a write.
Table of Contents
- Installation
- Motivation
- Main Concepts
- Azure Resources
- IglooSoftware.Sdk.EventSourcing
- Unit Testing
- Event Storming
Installation
NuGet Package
IglooSoftware.Sdk.EventSourcing is available on NuGet.
> dotnet add package IglooSoftware.Sdk.EventSourcing
Service Setup
public class Startup : FunctionsStartup
{
public override void Configure(IFunctionsHostBuilder builder)
{
builder.Services.AddEventSourcing(EventSourcingConfiguration);
}
}
Event Sourcing Configuration
Here is the default configuration for IglooSoftware.Sdk.EventSourcing:
"AzureStorageTableName": "eventStorageFor{specificService}",
"AzureStorageCommitTableName": "eventStorageCommits",
"AzureStorageDeadLetterTableName": "eventStorageDeadLetter",
"AzureStorageContainerName": "event-storage-sequences",
"AzureStorageSnapshotsName": "event-storage-snapshots",
"AzureServiceBusTopicName": "event-sourcing-topic",
"AzureServiceBusSagaCommandTopicName": "saga-command-topic"
It consists of the following properties:
- AzureStorageTableName - The name of the Azure Table Storage table used to store events. Every event is linked to its Aggregate through the PartitionKey property. The RowKey property is a unique identifier for the event and its sequence number, used for ordering.
- AzureStorageCommitTableName - The name of the Azure Table Storage table used to store event commits. It contains two properties, PartitionKey and RowKey, which are the same as the PartitionKey and RowKey properties of the event. After an event is sent to Azure Service Bus, it is removed from the commit table.
- AzureStorageDeadLetterTableName - The name of the Azure Table Storage table used to store dead letter events. It contains the properties needed to retry dead letter events and an Error property, which contains the error message. If an Aggregate is dead lettered, all further events for that Aggregate are dead lettered as well. After the issue is fixed and the first dead letter event of the Aggregate stream is retried, all other dead letter events are retried as well, in the correct order.
- AzureStorageContainerName - The name of the Azure Blob Storage container used to store event sequence numbers. It contains one file per Aggregate, holding the current sequence number of that Aggregate. In specific scenarios, it is used to check efficiently whether a cached Aggregate is up to date.
- AzureStorageSnapshotsName - The name of the Azure Blob Storage container used to store event snapshots. It contains one file per Aggregate, holding a snapshot of the Aggregate state as a JSON structure. It is used to restore the Aggregate state from the snapshot instead of replaying all events from the beginning.
- AzureServiceBusTopicName - The name of the Azure Service Bus topic used to publish events. It contains one subscription per Event Processor, through which that Event Processor receives events.
- AzureServiceBusSagaCommandTopicName - The name of the Azure Service Bus topic used to publish commands from a specific Saga to other services. It contains one subscription per Command Handler, through which that Command Handler receives commands.
Do not connect from your local environment using the default configuration from local.settings.json,
because it will affect the dev environment and make it inconsistent.
Azure Resources Setup
Each service that uses IglooSoftware.Sdk.EventSourcing must run the following job on service startup.
The job creates or updates the Azure resources required for event sourcing.
[assembly: WebJobsStartup(typeof(InitializationService))]
namespace Igloo.UserService.Init
{
public class InitializationService : IWebJobsStartup
{
public void Configure(IWebJobsBuilder builder)
{
// Register the AzureResources extension
builder.AddExtension<AzureResourcesConfigProvider>();
}
}
// AzureResources extension
[Extension("AzureResources")]
internal class AzureResourcesConfigProvider : IExtensionConfigProvider
{
private readonly IServiceScopeFactory _scopeFactory;
public AzureResourcesConfigProvider(IServiceScopeFactory scopeFactory)
{
_scopeFactory = scopeFactory;
}
public void Initialize(ExtensionConfigContext context)
{
using IServiceScope scope = _scopeFactory.CreateScope();
EventSourcingConfiguration eventSourcingConfiguration = scope.ServiceProvider.GetService<EventSourcingConfiguration>();
ILogger<AzureResourcesConfigProvider> logger = scope.ServiceProvider.GetService<ILogger<AzureResourcesConfigProvider>>();
// Run the create/update azure resources job and wait for it to finish
Task.Run(async () =>
{
// Get the service bus namespace resource
ServiceBusNamespaceResource namespaceResource = await AzureUtil.GetServiceBusNamespaceResource(eventSourcingConfiguration.ServiceBusSecondaryNamespace);
if (namespaceResource != null)
{
logger.LogInformation("Preparing event processors using service bus namespace: {0}", namespaceResource.Data.Name);
// Create or get event-sourcing-topic resource
ServiceBusTopicResource eventSourcingTopicResource = await AzureUtil.PrepareEventSourcingTopic(namespaceResource, eventSourcingConfiguration.AzureServiceBusTopicName);
// Create or update subscriptions for event processors and saga event processors
await AzureUtil.UpdateEventProcessors(AzureUtil.GetAllEventProcessors(), eventSourcingTopicResource);
await AzureUtil.UpdateSagaEventProcessors(AzureUtil.GetAllSagaEventProcessors(), eventSourcingTopicResource);
// In case service may receive commands on saga-command-topic, create or get saga-command-topic resource
if (eventSourcingConfiguration.AzureServiceBusSagaCommandTopicName != null)
{
ServiceBusTopicResource sagaCommandTopicResource = await AzureUtil.PrepareEventSourcingTopic(namespaceResource, eventSourcingConfiguration.AzureServiceBusSagaCommandTopicName);
// Create or update subscriptions for command handlers
await AzureUtil.UpdateCommandHandlers(AzureUtil.GetAllCommandHandlers(), sagaCommandTopicResource);
}
}
else
{
logger.LogError("Service Bus Namespace Resource not found.");
}
// Get the storage account resource
StorageAccountResource storageAccountResource = await AzureUtil.GetAccountStorageResource(eventSourcingConfiguration.TableStorageAccountUrl);
if (storageAccountResource != null)
{
logger.LogInformation("Preparing event storage using account storage: {0}", storageAccountResource.Data.Name);
// Create azure resources, tables and blob containers, inside storage account
await AzureUtil.PrepareEventStorage(storageAccountResource, eventSourcingConfiguration);
}
else
{
logger.LogError("Account Storage Resource not found.");
}
}).Wait();
}
}
}
Running Data Migrations
Sooner or later, you will need to run data migrations on the event storage and the projection database.
To support this, IglooSoftware.Sdk.EventSourcing provides a MigrationService, which can be used to run data migrations.
To track whether a specific migration has already run, MigrationService uses a MigrationAggregate, which stores the migration state in event storage.
If a migration has already run, MigrationService skips it and continues with the next migration.
If a migration has not run yet or has failed, MigrationService runs it again on the next service startup.
It is recommended to run data migrations on service startup, before the service starts processing commands and events.
Here is an example of how to run data migrations on service startup:
[assembly: WebJobsStartup(typeof(InitializationService))]
namespace Igloo.UserService.Init
{
public class InitializationService : IWebJobsStartup
{
public void Configure(IWebJobsBuilder builder)
{
builder.AddExtension<MigrationsConfigProvider>();
}
}
[Extension("Migrations")]
internal class MigrationsConfigProvider : IExtensionConfigProvider
{
private readonly IServiceScopeFactory _scopeFactory;
public MigrationsConfigProvider(IServiceScopeFactory scopeFactory)
{
_scopeFactory = scopeFactory;
}
public void Initialize(ExtensionConfigContext context)
{
using IServiceScope scope = _scopeFactory.CreateScope();
MigrationCommandHandler migrationCommandHandler = scope.ServiceProvider.GetService<MigrationCommandHandler>();
ITenantConfigService tenantConfigService = scope.ServiceProvider.GetService<ITenantConfigService>();
Task.Run(async () =>
{
// Iterate through all tenants and run data migrations for each tenant
await foreach (TenantConfigModel tenantConfig in tenantConfigService.GetTenantConfigsForAllTenants(Constants.ServiceKey))
{
// Run migration to migrate users to aggregates
await migrationCommandHandler.Handle(new MigrateUsersToAggregatesCommand(tenantConfig));
// Run other migrations
// ...
}
}).Wait(); // Wait for the migrations to finish before the service starts processing
}
}
}
Running Locally
To run a service locally, you need to set up Azurite and your own tables and blob containers in the storage account.
Here is an example of event sourcing configuration for local environment:
"AzureStorageTableName": "{username}EventStorageFor{specificService}",
"AzureStorageCommitTableName": "{username}EventStorageCommits",
"AzureStorageDeadLetterTableName": "{username}EventStorageDeadLetter",
"AzureStorageContainerName": "{username}-event-storage-sequences",
"AzureStorageSnapshotsName": "{username}-event-storage-snapshots",
"AzureServiceBusTopicName": "{username}-event-sourcing-topic",
"AzureServiceBusSagaCommandTopicName": "{username}-saga-command-topic"
Replace {username} with your username.
To use your own CosmosDB database, you need to create your own CosmosDB containers inside the local-data Azure database.
This is easy to set up using the UseCustomTenantConfig configuration property inside local.settings.json.
Do not connect from your local environment using the default configuration from local.settings.json,
because it will affect the dev environment and make it inconsistent.
Motivation
Main Concepts
Event Sourcing
Event Sourcing captures and stores events that represent actions or changes in an application's state. By maintaining a list of events, Event Sourcing provides a natural audit trail and ensures the events serve as the source of truth. Unlike traditional state storage, Event Sourcing offers additional business value and facilitates flexible designs when combined with CQRS. It also supports ACID operations for write databases and eventual consistency for read databases, striking a balance between data consistency and system scalability.
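As a minimal sketch of the idea, the current state can be derived by applying an ordered list of events. The `AccountEvent`, `Deposited`, `Withdrawn` and `AccountProjection` names below are hypothetical and are not part of IglooSoftware.Sdk.EventSourcing; they only illustrate that the event list, not a stored balance, is the source of truth.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event types for illustration only; not part of the SDK.
public abstract record AccountEvent(int SequenceNumber);
public record Deposited(int SequenceNumber, decimal Amount) : AccountEvent(SequenceNumber);
public record Withdrawn(int SequenceNumber, decimal Amount) : AccountEvent(SequenceNumber);

public static class AccountProjection
{
    // Rebuilds the current balance by applying every event in order.
    public static decimal Replay(IEnumerable<AccountEvent> events)
    {
        decimal balance = 0m;
        foreach (AccountEvent e in events)
        {
            balance = e switch
            {
                Deposited d => balance + d.Amount,
                Withdrawn w => balance - w.Amount,
                _ => balance // Unknown events do not change the balance
            };
        }
        return balance;
    }
}

public static class Program
{
    public static void Main()
    {
        var events = new List<AccountEvent>
        {
            new Deposited(1, 100m),
            new Withdrawn(2, 30m),
            new Deposited(3, 5m)
        };
        Console.WriteLine(AccountProjection.Replay(events)); // prints 75
    }
}
```

Because the balance is never stored directly, the same event list also serves as the audit trail mentioned above.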
CQRS
Command Query Responsibility Segregation (CQRS) is an architectural pattern that promotes efficient scalability in applications. By decoupling write and read operations and introducing separate databases for each, CQRS enables optimized write and read performance. This pattern is especially beneficial when using NoSQL databases.
Combining CQRS with Event Sourcing simplifies write operations by appending new events instead of updating the database directly.
Domain Driven Design
Domain Driven Design (DDD) is a popular approach for aligning business and technical terms, isolating business logic, and simplifying the determination of bounded contexts. It provides a powerful solution for bounding microservices and minimizing unnecessary communication between them. DDD is particularly effective when designing services that utilize NoSQL databases, such as CosmosDB, Blob, or Table Storage. By referencing specific units or Aggregates, events stored in Event Storage gain wider meaning, enabling efficient Event Sourcing at the Aggregate level.
Eventual Consistency
CQRS ensures data consistency by adopting eventual consistency, where the write database remains strongly consistent, while the read database can be eventually consistent. Eventual consistency is achieved by listening to changes from the write database through a Service Bus Topic.
Event Snapshotting
To address performance concerns related to the large number of events for an Aggregate, event snapshotting is employed. Snapshotting involves periodically storing the state of an Aggregate in Blob Storage. When loading an Aggregate, the latest snapshot is loaded, and only the subsequent events are applied. This method efficiently handles performance issues associated with event sourcing and large numbers of aggregate events.
When working with snapshots we must be careful, especially during migrations. Solutions such as aggregate snapshot versions or snapshot recreation based on all events are available to manage snapshot-related challenges.
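The loading step described above can be sketched as follows. This is an in-memory illustration under stated assumptions: `CounterEvent`, `CounterSnapshot` and `SnapshotLoader` are hypothetical names, and the SDK's actual snapshot format in Blob Storage is not shown here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical types for illustration only; not part of the SDK.
public record CounterEvent(long SequenceNumber, int Delta);

public record CounterSnapshot(long SequenceNumber, int Value)
{
    // State of an Aggregate that has no snapshot and no events yet.
    public static readonly CounterSnapshot Empty = new(0, 0);
}

public static class SnapshotLoader
{
    // Starts from the given snapshot and applies only the events recorded after it.
    public static CounterSnapshot Load(CounterSnapshot snapshot, IEnumerable<CounterEvent> events)
    {
        long snapshotSequence = snapshot.SequenceNumber;
        CounterSnapshot state = snapshot;
        foreach (CounterEvent next in events
                     .Where(ev => ev.SequenceNumber > snapshotSequence)
                     .OrderBy(ev => ev.SequenceNumber))
        {
            state = new CounterSnapshot(next.SequenceNumber, state.Value + next.Delta);
        }
        return state;
    }
}

public static class Program
{
    public static void Main()
    {
        var events = new List<CounterEvent>
        {
            new(1, 10), new(2, 20), new(3, 30), new(4, 40)
        };
        // Snapshot taken after event 2 (10 + 20 = 30): only events 3 and 4 are applied.
        var fromSnapshot = SnapshotLoader.Load(new CounterSnapshot(2, 30), events);
        // Without a snapshot, all four events are replayed.
        var fromScratch = SnapshotLoader.Load(CounterSnapshot.Empty, events);
        Console.WriteLine(fromSnapshot.Value); // prints 100
        Console.WriteLine(fromScratch.Value);  // prints 100
    }
}
```

Both paths must produce the same state. This is why a snapshot that no longer matches the Aggregate (for example after a migration) has to be versioned or recreated from all events.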
Azure Resources
Azure Functions
Azure Functions is a serverless compute service that enables you to run code on-demand without having to explicitly provision or manage infrastructure. Use Azure Functions to run a script or piece of code in response to a variety of events. Azure Functions supports triggers, which are ways to start execution of your code, and bindings, which are ways to simplify coding for input and output data.
Durable Functions is an extension of Azure Functions that lets you write stateful functions in a serverless compute environment. These functions orchestrate calls to other functions, including Queries and Commands. In case of failures, Durable Functions handle dispatching compensation operations to ensure eventual data consistency.
Table Storage
Table Storage is used to store the events that determine the state of an Aggregate in a strongly consistent manner. The PartitionKey identifies the Aggregate, while the RowKey is the sequence number, written in a form that sorts correctly as a string.
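The exact RowKey encoding used by the SDK is not documented here. One common way to make a number sort correctly as a string is fixed-width zero padding, sketched below; the `RowKeyFormat` class and the width of 19 digits are assumptions made for illustration.

```csharp
using System;
using System.Globalization;

// Hypothetical helper for illustration only; not the SDK's actual encoding.
public static class RowKeyFormat
{
    // Assumed width: 19 digits is enough for any non-negative long.
    private const int Width = 19;

    // Pads the sequence number with leading zeros so string order equals numeric order.
    public static string ToRowKey(long sequenceNumber)
    {
        if (sequenceNumber < 0) throw new ArgumentOutOfRangeException(nameof(sequenceNumber));
        return sequenceNumber.ToString("D" + Width, CultureInfo.InvariantCulture);
    }

    public static long FromRowKey(string rowKey) =>
        long.Parse(rowKey, CultureInfo.InvariantCulture);
}

public static class Program
{
    public static void Main()
    {
        // Unpadded strings sort incorrectly: "9" comes after "10".
        Console.WriteLine(string.CompareOrdinal("9", "10") > 0); // prints True
        // Padded keys sort in numeric order.
        Console.WriteLine(string.CompareOrdinal(
            RowKeyFormat.ToRowKey(9), RowKeyFormat.ToRowKey(10)) < 0); // prints True
        Console.WriteLine(RowKeyFormat.FromRowKey(RowKeyFormat.ToRowKey(42))); // prints 42
    }
}
```

Table Storage returns the entities of a partition ordered by RowKey, so an encoding like this lets the events of one Aggregate be read back in sequence order.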
Blob Storage
Blob Storage is used for storing the sequence number of the Aggregate. Additionally, Blob Storage is employed for storing Aggregate snapshots, offering performance improvements when dealing with a large number of events for a specific Aggregate.
Service Bus
Service Bus enables communication between different function apps or functions using commands and events. It transfers events from Table Storage to the projections side, specifically the Event Processors responsible for making necessary changes within the CosmosDB database, which is primarily used for querying purposes.
For Commands, Service Bus is used to transfer them from a Saga to the target function apps and their Command Handlers.
If handling a Command fails, the Command is retried a specified number of times and then moved to the Dead Letter Queue, and the Saga is notified about the failure.
Cosmos DB
CosmosDB serves as the data storage solution for storing projections based on events originating from Table Storage. It facilitates efficient querying of data.
SignalR
SignalR is utilized to provide information about processed commands from the Event Processor to specific users or groups of users interested in the changes. It enables real-time updates and notifications.
IglooSoftware.Sdk.EventSourcing
To facilitate the integration of Azure Services and the use of the relevant patterns, the IglooSoftware.Sdk.EventSourcing library is introduced. It exposes three interfaces that Function Apps/Functions should use to ensure the proper implementation of the patterns and to derive the associated business benefits.
Injectable Components
IEventStorage
Provides methods for loading an Aggregate by its Aggregate identifier and for appending new events to an Aggregate.
There are two implementations of IEventStorage:
- DefaultEventStorage - for production purposes
- InMemoryEventStorage - for testing purposes
DefaultEventStorage
DefaultEventStorage is the default implementation of IEventStorage.
Main responsibilities of DefaultEventStorage are:
- Loading a specific Aggregate by reading its events from Table Storage and applying them to the Aggregate through its Handle methods.
- If an Aggregate snapshot exists, loading it and applying only the events recorded after the snapshot was created.
- Storing an event commit in the Commit table, then appending the new event to Table Storage.
- Sending the event to the Service Bus topic for further processing by Event Processors, then removing the event from the Commit table.
- Creating snapshots periodically for all or specific Aggregates that contain a large number of events, or creating a snapshot after every N events.
- Supporting an in-memory cache for Aggregates to avoid loading them unnecessarily. If a conflict is detected, the cached Aggregate instance is replaced with a new one.
- Supporting event versioning through "Upscale" methods. Each event may define a static Upscale method containing the rules for migrating the event from one version to the next. The rules are executed starting from the version the event had when it was saved, up to the current version of the event.
InMemoryEventStorage
An in-memory implementation of IEventStorage, used for testing purposes only.
IEventProcessor
Provides methods for processing events from Event Storage / Service Bus, primarily for updating CosmosDB projections.
It implements a dead-letter table in an Aggregate-safe, consistent way: events are guaranteed to be executed in order, and if one event fails, all further events for that Aggregate are also moved to the dead-letter topic to avoid data consistency issues. After events are resubmitted from the dead-letter topic, all resubmitted events are executed in order.
It also provides the ReplayEvents method, which replays all events from event storage (or only those of a specific Aggregate) and applies them to a provided supplier, e.g. to recreate the CosmosDB database, or to recreate it for a specific Aggregate only.
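The per-Aggregate dead-letter behaviour described above can be sketched in memory as follows. The `StoredEvent` and `OrderedProcessor` types are hypothetical and only illustrate the ordering guarantee; the SDK's own implementation and storage are not shown here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory sketch for illustration only; not part of the SDK.
public record StoredEvent(string StreamId, long SequenceNumber);

public class OrderedProcessor
{
    private readonly Action<StoredEvent> _handler;
    private readonly Dictionary<string, List<StoredEvent>> _deadLetters = new();

    public OrderedProcessor(Action<StoredEvent> handler) => _handler = handler;

    public IReadOnlyDictionary<string, List<StoredEvent>> DeadLetters => _deadLetters;

    public void Process(StoredEvent e)
    {
        // Once a stream has a dead-lettered event, park every later event behind it.
        if (_deadLetters.TryGetValue(e.StreamId, out var parked))
        {
            parked.Add(e);
            return;
        }
        try
        {
            _handler(e);
        }
        catch (Exception)
        {
            _deadLetters[e.StreamId] = new List<StoredEvent> { e };
        }
    }

    // Retries the parked events of one stream in their original order.
    public void Retry(string streamId)
    {
        if (!_deadLetters.Remove(streamId, out var parked)) return;
        foreach (StoredEvent e in parked) Process(e);
    }
}

public static class Program
{
    public static void Main()
    {
        var handled = new List<string>();
        bool broken = true;
        var processor = new OrderedProcessor(e =>
        {
            if (broken && e.StreamId == "a" && e.SequenceNumber == 2)
                throw new InvalidOperationException("projection failed");
            handled.Add($"{e.StreamId}{e.SequenceNumber}");
        });

        processor.Process(new StoredEvent("a", 1));
        processor.Process(new StoredEvent("a", 2)); // fails and is dead-lettered
        processor.Process(new StoredEvent("a", 3)); // parked behind a2
        processor.Process(new StoredEvent("b", 1)); // other streams are unaffected
        broken = false;                              // the issue is fixed
        processor.Retry("a");
        Console.WriteLine(string.Join(",", handled)); // prints a1,b1,a2,a3
    }
}
```

If the first retried event fails again, it is dead-lettered once more and the remaining events are parked behind it, so order within the stream is preserved.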
ICommandRouter
Provides methods for sending commands from one function app to another through Service Bus. In case of executing a command from Saga, the corresponding SagaId will be provided and events dispatched as a consequence of that Command will contain the same SagaId in response, so the corresponding Saga can receive the response and continue execution.
It also provides a method for handling Commands. For communication inside the same function app, direct communication is preferred because there is no need to go through Service Bus.
IEventSourcingProvider
EventSourcingProvider is a wrapper around IEventStorage, IEventProcessor, and ICommandRouter.
It provides methods for loading specific EventStorage, EventProcessor, and CommandRouter based on the provided tenant configuration.
To keep good performance, components are cached inside the provider and reused for further requests.
Base Components
BaseAggregate
An abstract class that needs to be extended to create a specific Aggregate inside a specific service. An Aggregate is referenced by its identifier, which IEventStorage uses to load all related events and build the aggregate state from them.
The Aggregate Id needs to be designed carefully, because the aggregate can be accessed only through its Aggregate Id.
Each aggregate contains a list of "Handle" methods, where each handler updates the aggregate state based on the event it receives.
Here is an example of a simple Aggregate:
public class UserAggregate : BaseAggregate
{
#region Aggregate State
public Guid UserId { get; set; }
public string UserName { get; set; }
public string Email { get; set; }
#endregion
public UserAggregate() {
// Create a snapshot after every 512 events.
// Optional parameter, used as a performance improvement.
// Use it carefully because it may cause JSON serialization issues.
// If snapshotting is used and the Aggregate is changed in a way
// that makes existing snapshots impossible to deserialize,
// remove the incompatible snapshots from Blob Storage and they will be recreated automatically.
CreateSnapshotAfter = 512;
}
// Each Handle method updates aggregate state based on the event it receives
// Method which creates aggregate state should set Id property
// Based on Id property it's possible to know if aggregate is new or already exists
#region Handle Methods
public UserCreatedEvent Handle(UserCreatedEvent e)
{
Id = e.StreamId; // Set Id on aggregate creation
UserId = e.CreatedUserId;
UserName = e.UserName;
Email = e.Email;
return e;
}
// Mutating aggregate state based on event
public UserUpdatedEvent Handle(UserUpdatedEvent e)
{
Email = e.Email;
return e;
}
// In case of "deleting" aggregate, Id property should be set to null
public UserDeletedEvent Handle(UserDeletedEvent e)
{
Id = null;
return e;
}
#endregion
// Static method which generates StreamId (Aggregate Id) based on passed parameters
// StreamId is used for identifying aggregate in EventStorage
public static string GetStreamId(Guid tenantId, Guid userId)
{
return $"{tenantId}_{userId}";
}
}
Each Aggregate will contain fields:
- Id - Aggregate Id, used for identifying aggregate in EventStorage
- Type - Aggregate Type, used for identifying Aggregate Type while serializing/deserializing snapshots
- Module - Module name, used for identifying module where to search for Aggregate class
- SequenceNumber - Sequence number, value of last event applied to Aggregate
BaseEvent
An abstract class that needs to be extended by all events dispatched by a specific service. These events will be used as a source for creating an Aggregate state.
Each event will contain following properties:
- Id - Event Id, unique identifier of the event used for deduplication
- StreamId - Aggregate Id, used for identifying aggregate in EventStorage to which event belongs
- SequenceNumber - Field which guarantees that events will be stored and loaded in the corresponding order.
- AggregateType - Aggregate Type to which event belongs
- SagaId - Optional parameter used for identifying Saga which dispatched command based on which event was dispatched
- Type - Event Type, used for identifying Event Type while serializing/deserializing events
- Module - Module name, used for identifying module where to search for Event class
- SkipEventProcessing - Set it to true in case event should be skipped while processing events from EventStorage. In that case it will be used only for Aggregate state mutation.
- Version - Event version, used for versioning events. In case of using versioning, it's possible to upscale events from one version to another using Upscale methods.
- EntityId - Entity Id, used for identifying entity to which event belongs
An Event can also contain complex data such as lists, dictionaries, and custom models; the content is flattened and stored in Table Storage.
If this results in a large number of columns, it's possible to serialize the content as a string and store it as a separate property marked with the [IgnoreProperty] attribute.
There is a special type of BaseEvent called TenantEvent, which contains a TenantId property and three optional properties: WorkplaceId, UserId and SupportUserId. Events of this kind are specific to multi-tenant applications.
Here is an example of a simple Event:
public class UserUpdatedEvent : TenantEvent
{
public Guid UpdatedUserId { get; set; }
public string Email { get; set; }
// Complex property, will be flattened and stored in Table storage
public Name Name { get; set; }
// Ignore complex property while serializing/deserializing
[JsonIgnore]
public IDictionary<Guid, string> UserGroups { get; set; }
// Serialize the complex property as a string and
// set its value on the original property while deserializing
public string UserGroupsSerialized
{
get => JsonConvert.SerializeObject(UserGroups);
set => UserGroups = JsonConvert.DeserializeObject<IDictionary<Guid, string>>(value);
}
public UserUpdatedEvent(string streamId, Guid tenantId, Guid workplaceId, Guid userId) : base(streamId, tenantId, workplaceId, userId) { }
}
Here is an example of an event that uses versioning:
public class UserUpdatedEvent : BaseEvent
{
public Guid UpdatedUserId { get; set; }
public string UserEmail { get; set; }
public override int Version { get; set; } = 2;
// The Upscale method is called while loading the event from EventStorage.
// Its changes are applied to the event source before it is converted to the specific Event.
public static void Upscale(IDictionary<string, object> source)
{
switch (GetVersion(source))
{
case 0:
// Migrate from old property name
source["Email"] = source["Mail"];
goto case 1;
case 1:
// Migrate from old property name again
source[nameof(UserEmail)] = source["Email"];
break;
}
}
}
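To see how the chained rules behave, the following self-contained sketch runs the same Upscale logic against plain dictionaries. `UserUpdatedEventUpscaler` is a hypothetical name, and its `GetVersion` is a stand-in for the SDK helper of the same name, assumed here to read a "Version" key and to treat a missing key as version 0.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-alone version of the Upscale rules; for illustration only.
public static class UserUpdatedEventUpscaler
{
    // Assumed stand-in for the SDK's GetVersion helper: a missing "Version" means version 0.
    public static int GetVersion(IDictionary<string, object> source) =>
        source.TryGetValue("Version", out var value) ? Convert.ToInt32(value) : 0;

    public static void Upscale(IDictionary<string, object> source)
    {
        switch (GetVersion(source))
        {
            case 0:
                // Version 0 -> 1: "Mail" was renamed to "Email"
                source["Email"] = source["Mail"];
                goto case 1;
            case 1:
                // Version 1 -> 2: "Email" was renamed to "UserEmail"
                source["UserEmail"] = source["Email"];
                break;
        }
    }
}

public static class Program
{
    public static void Main()
    {
        // An event stored at version 0 passes through both rules.
        var v0 = new Dictionary<string, object> { ["Mail"] = "a@example.com" };
        UserUpdatedEventUpscaler.Upscale(v0);
        Console.WriteLine(v0["UserEmail"]); // prints a@example.com

        // An event stored at version 1 only needs the second rule.
        var v1 = new Dictionary<string, object> { ["Version"] = 1, ["Email"] = "b@example.com" };
        UserUpdatedEventUpscaler.Upscale(v1);
        Console.WriteLine(v1["UserEmail"]); // prints b@example.com
    }
}
```

The `goto case` chain means each stored version enters at its own rule and falls through every later rule, so only one new case is needed per version bump.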
BaseCommand
An abstract class that needs to be extended by all commands dispatched by a specific service. These commands can be internal or external. If the command is part of a specific Saga then the SagaId property needs to be provided while creating the command instance. If the command is created based on an eventually consistent state and it’s crucial that the command is executed with strong consistency then field Version needs to be provided inside the command so it can be validated inside specific CommandHandler.
Each command contains the following properties:
- Id - Command Id, unique identifier of the command used for deduplication
- SagaId - Optional parameter used for identifying the Saga from which the command was dispatched
- SagaName - Optional parameter used for identifying the name of the Saga from which the command was dispatched
- Type - Command Type, used for identifying the Command Type while serializing/deserializing commands
- Module - Module name, used for identifying the module where to search for the Command class
- Timeout - Command timeout, used for setting a timeout for command execution. If the command is not executed within the specified time, it is moved to the dead letter queue and then a CommandErrorEvent is dispatched to the corresponding Saga.
- Version - Optional parameter used for validating the command version. It's used when the command is created based on an eventually consistent state and it's crucial that the command is executed with strong consistency. If an inconsistency is detected, InconsistentStateException is dispatched to the corresponding Saga.
There is a special type of BaseCommand called TenantCommand, which contains a TenantId property and three optional properties: WorkplaceId, UserId and SupportUserId. Commands of this kind are specific to multi-tenant applications.
There are two constructors for TenantCommand, one receiving a TenantId as a parameter and another receiving a TenantConfigModel. The constructor that receives a TenantConfigModel should be used for internal command handlers only.
Here is an example of a simple Command:
public class UpdateUserCommand : TenantCommand
{
public Guid UserIdToUpdate { get; set; }
public string Email { get; set; }
public Name Name { get; set; }
public UpdateUserCommand(Guid tenantId, Guid workplaceId, Guid userId)
: base(tenantId, workplaceId, userId)
{
// Command will expire after 60 minutes and CommandErrorEvent will be dispatched to Saga
Timeout = TimeSpan.FromMinutes(60);
}
}
Command Handler
Each service contains one or more Command Handlers that receive specific Commands. Based on a Command, a handler loads one or more aggregates, validates them and, depending on the command action and the aggregate state, either dispatches events using IEventStorage or throws an error. Command Handlers never return results, except when an aggregate is created; in that case, only the Aggregate identifier is returned.
To receive Commands from other function apps, the Command Handler subscribes to "saga-command-topic" using a specific subscription to receive specific Commands only.
Here is an example of a simple Command Handler:
// CommandHandler attribute is used for registering Command Handler
// Parameter maxDeliveryCount is used for setting maximum number of times that a command message can be delivered
[CommandHandler(CommandHandlerName, maxDeliveryCount: 2)]
public class UserActivationCommandHandler : BaseCommandHandler
{
// CommandHandlerName is used for identifying Command Handler, it's used for creating Azure Topic Subscription
// and it's referenced from corresponding listener function also
internal const string CommandHandlerName = $"{Constants.ServiceKey}{nameof(UserActivationCommandHandler)}";
private readonly ITenantConfigService _tenantConfigService;
private readonly IEventSourcingProvider _eventSourcingProvider;
private readonly IUserActivationService _userActivationService;
public UserActivationCommandHandler(
IEventSourcingProvider eventSourcingProvider, ILogger<UserActivationCommandHandler> logger,
ITenantConfigService tenantConfigService, IUserActivationService userActivationService) : base(logger)
{
_eventSourcingProvider = eventSourcingProvider;
_tenantConfigService = tenantConfigService;
_userActivationService = userActivationService;
}
// List of Handle methods for each Command based on which Azure Topic Subscription query will be created for specific Command Handler
#region Command Handlers
public async Task Handle(CreateUserActivationCommand command)
{
// Get stream id based on provided params
string streamId = UserActivationAggregate.GetStreamId(command.TenantId, command.ForUserId);
TenantConfigModel tenantConfig = await _tenantConfigService.GetTenantConfig(command.TenantId);
// Get EventStorage for specific Tenant
IEventStorage eventStorage = _eventSourcingProvider.GetEventStorage(tenantConfig);
// Appending new event to specific Aggregate loaded by provided stream id
await eventStorage.AppendEvent<UserActivationAggregate>(streamId,
state =>
{
// Do specific validation based on current state and dispatch error if needed
// If command is dispatched from Saga, exception will be wrapped in CommandErrorEvent and dispatched to target Saga
if (state.Id != null)
{
throw new UserActivationAlreadyExistsException(state.Id, state.ActivationCode);
}
// Dispatching new event to specific Aggregate
// In case null is returned, event will not be dispatched and no changes will be applied to Aggregate
return new UserActivationCreatedEvent(streamId, command.TenantId, command.WorkplaceId!.Value, command.UserId!.Value)
{
FirstName = command.FirstName,
LastName = command.LastName,
ForUserId = command.ForUserId,
Username = command.Username,
Email = command.Email,
ActivationCode = _userActivationService.GenerateActivationCode(),
ExpiresOn = DateTime.UtcNow.AddHours(Startup.ActivationExpiryHours),
SagaId = command.SagaId
};
});
}
#endregion
}
Consistency Check
If a command is created based on an eventually consistent state and it's crucial that the command is executed with strong consistency, the Version field needs to be provided inside the command so it can be validated inside the specific CommandHandler.
The inconsistency check uses the Version provided inside the command and a list of event types that may cause a collision (e.g. UserUpdatedEvent, UserDeletedEvent). If one of those events has been applied to the specific aggregate in the meantime, InconsistentStateException is dispatched to the corresponding Saga.
await eventStorage.AppendEvent<UserAggregate>(streamId,
state =>
{
if (state.Id == null || state.IsDeleted)
{
throw new UserNotFoundException(command.UserIdToUpdate);
}
return new UserUpdatedEvent(streamId, command.TenantId, command.WorkplaceId, command.UserId)
{
UpdatedUserId = command.UserIdToUpdate,
Email = command.Email,
IdentityProviderId = command.IdentityProviderId,
ActivityFeedId = command.ActivityFeedId,
Name = command.Name,
LastModifiedOn = DateTime.UtcNow,
SagaId = command.SagaId
};
}, command.Version, new[] { typeof(UserUpdatedEvent), typeof(UserDeletedEvent) });
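The idea behind the check can be pictured with a small stand-alone sketch. It is not the SDK's implementation: StoredEvent, ConsistencyCheck.HasConflict and the empty event classes are hypothetical names, and the sketch assumes that Version is the sequence number of the last event the caller has seen.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical minimal event record: a sequence number plus the event's CLR type.
public sealed record StoredEvent(long SequenceNumber, Type EventType);

// Hypothetical marker classes standing in for real events.
public sealed class UserUpdatedEvent { }
public sealed class UserDeletedEvent { }
public sealed class UserLoggedInEvent { }

public static class ConsistencyCheck
{
    // Returns true when an event of a conflicting type was appended
    // after the version the command was based on.
    public static bool HasConflict(
        IEnumerable<StoredEvent> stream, long commandVersion, IReadOnlyCollection<Type> conflictingTypes)
    {
        return stream.Any(e => e.SequenceNumber > commandVersion && conflictingTypes.Contains(e.EventType));
    }
}

public static class Program
{
    public static void Main()
    {
        var stream = new List<StoredEvent>
        {
            new(1, typeof(UserUpdatedEvent)),
            new(2, typeof(UserLoggedInEvent)),
            new(3, typeof(UserDeletedEvent)),
        };
        var conflicting = new[] { typeof(UserUpdatedEvent), typeof(UserDeletedEvent) };

        // The command saw everything up to version 3: nothing newer, no conflict.
        Console.WriteLine(ConsistencyCheck.HasConflict(stream, 3, conflicting)); // False
        // The command saw only version 2: the delete at version 3 collides.
        Console.WriteLine(ConsistencyCheck.HasConflict(stream, 2, conflicting)); // True
    }
}
```

Events of other types (UserLoggedInEvent here) do not count as collisions, which is why the list of conflicting types is passed explicitly.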
Command Retry
If an event is appended to an outdated aggregate version, the supplier that produces the event is restarted on the newer version of the aggregate and the append is attempted again. This is repeated until the event is appended successfully.
If the supplier returns null or throws an exception, an additional check determines whether the aggregate version was updated in the meantime. If it was, the supplier is restarted on the newer version of the aggregate.
This ensures that the event is appended to the aggregate in a consistent way.
To force a restart on a newer version of the aggregate, throw RestartCommandException from the supplier. This is useful when old data exists in the projection database but no aggregate snapshot exists. In that case the supplier can create the aggregate snapshot itself and then restart, so that the next run loads the aggregate state from the snapshot it just created.
Here is an example of an append event supplier that creates the aggregate snapshot if it doesn't exist:
await eventStorage.AppendEvent<UserActivationAggregate>(streamId,
async state =>
{
if (state.Id == null)
{
// In case that aggregate snapshot doesn't exist, create it and restart supplier on newer version of aggregate
// created from snapshot data based on projection database
if (await InitializeSnapshotForOldData(streamId, tenantConfig, command.ForUserId, eventStorage))
{
throw new RestartCommandException();
}
throw new UserActivationNotFoundException(command.ForUserId);
}
return new UserActivationResetEvent(
streamId, command.TenantId, command.WorkplaceId, command.UserId!.Value)
{
ForUserId = command.ForUserId,
ActivationCode = _userActivationService.GenerateActivationCode(),
ExpiresOn = DateTime.UtcNow.AddHours(Startup.ActivationExpiryHours),
SagaId = command.SagaId
};
});
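The retry behaviour described in this section resembles a classic optimistic concurrency loop. The following stand-alone sketch illustrates that loop under simplifying assumptions; InMemoryStream, RestartException and RetryingAppender are hypothetical names and do not reflect the SDK's internals.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical in-memory stream; the version is simply the number of stored events.
public sealed class InMemoryStream
{
    private readonly List<string> _events = new();
    public int Version => _events.Count;
    public IReadOnlyList<string> Events => _events;

    // Appends only if nobody else appended since expectedVersion was read.
    public bool TryAppend(string @event, int expectedVersion)
    {
        if (expectedVersion != Version) return false;
        _events.Add(@event);
        return true;
    }
}

// Hypothetical stand-in for the SDK's RestartCommandException.
public sealed class RestartException : Exception { }

public static class RetryingAppender
{
    // Runs the supplier and repeats it whenever the stream moved on in the meantime.
    // A supplier that always throws RestartException would loop forever in this sketch.
    public static (bool Appended, int Attempts) Append(
        InMemoryStream stream, Func<IReadOnlyList<string>, string?> supplier)
    {
        var attempts = 0;
        while (true)
        {
            attempts++;
            var version = stream.Version;
            string? @event;
            try
            {
                @event = supplier(stream.Events);
            }
            catch (RestartException)
            {
                continue; // explicit restart requested by the supplier
            }
            catch (Exception) when (stream.Version != version)
            {
                continue; // the failure was decided on stale state
            }

            if (@event is null)
            {
                if (stream.Version != version) continue; // null was decided on stale state
                return (false, attempts);
            }
            if (stream.TryAppend(@event, version)) return (true, attempts);
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var stream = new InMemoryStream();
        var firstRun = true;
        var result = RetryingAppender.Append(stream, _ =>
        {
            if (firstRun)
            {
                firstRun = false;
                // Simulate a concurrent writer that appends while the supplier is running.
                stream.TryAppend("UserActivationCreated", stream.Version);
            }
            return "UserActivationReset";
        });
        Console.WriteLine($"{result.Appended} after {result.Attempts} attempts");
        Console.WriteLine(string.Join(", ", stream.Events));
    }
}
```

In the demo the first append fails because the simulated concurrent writer raised the version, so the supplier runs a second time and the event lands after the competing one.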
Dead Letter
A command received from the Service Bus topic command-saga-topic is forwarded to the corresponding CommandHandler.
If the CommandHandler throws an exception, the command message is forwarded to the dead letter queue.
Each CommandHandler that accepts commands from outside needs a corresponding dead letter topic listener, which sends a CommandErrorEvent to the Saga identified by the SagaName property of the command message on the dead letter queue.
Here is an example of a dead letter topic listener for UserActivationCommandHandler:
[FunctionName($"DeadLetterFor{nameof(UserActivationCommandHandler)}")]
public async Task DeadLetterForUserActivationCommandHandlerListener([
ServiceBusTrigger("%AzureServiceBusSagaCommandTopicName%", $"{UserActivationCommandHandler.CommandHandlerName}/$deadletterqueue",
Connection = "ServiceBusSecondaryConnection")] ServiceBusReceivedMessage message)
{
await DefaultCommandRouter.SendCommandErrorEvent(
message, async command => _eventSourcingProvider.GetCommandRouter(await _tenantConfigService.GetTenantConfig(command.TenantId)));
}
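Conceptually, the listener turns a dead-lettered command into an error event and hands it to the saga named in the command. The sketch below shows only that routing step; DeadLetteredCommand, CommandError and SagaErrorRouter are hypothetical types, not the shapes used by DefaultCommandRouter.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical shapes; the SDK's real message and event types carry more data.
public sealed record DeadLetteredCommand(string CommandType, string SagaName, string SagaId, string Reason);
public sealed record CommandError(string SagaId, string CommandType, string Reason);

public sealed class SagaErrorRouter
{
    private readonly Dictionary<string, Action<CommandError>> _sagas = new();

    public void Register(string sagaName, Action<CommandError> onError) => _sagas[sagaName] = onError;

    // Converts a dead-lettered command into an error event and passes it
    // to the saga named in the command. Returns false for unknown sagas.
    public bool Route(DeadLetteredCommand command)
    {
        if (!_sagas.TryGetValue(command.SagaName, out var onError)) return false;
        onError(new CommandError(command.SagaId, command.CommandType, command.Reason));
        return true;
    }
}

public static class Program
{
    public static void Main()
    {
        var router = new SagaErrorRouter();
        router.Register("ImportUserSaga", error =>
            Console.WriteLine($"Saga {error.SagaId}: {error.CommandType} failed ({error.Reason})"));

        var routed = router.Route(new DeadLetteredCommand(
            "CreateUserActivationCommand", "ImportUserSaga", "saga-42", "UserActivationAlreadyExistsException"));
        Console.WriteLine(routed);
    }
}
```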
Event Processor
Each service contains one or more event processors responsible for:
- Listening for events from Service Bus through a dedicated subscription that delivers the events of one Aggregate. There is a one-to-one mapping between EventProcessor and Aggregate.
- Updating the state in the projection database (e.g. a CosmosDB database) based on these events, in an eventually consistent way.
- Retrying failed updates. If an update to CosmosDB fails, the event is retried up to the number of times defined on the Service Bus Topic Subscription; once that number is exceeded, the event is moved to the Dead-Letter topic. While the event is being retried, a record with PartitionKey equal to the Aggregate Id and RowKey equal to the Sequence Number is kept in the Event Storage Dead Letter Table. This record prevents the EventProcessor from executing later events of the same Aggregate while an earlier event has failed. When the event is executed successfully, the record is removed from the Dead Letter Table and subsequent events can be processed. This guarantees that the database state stays consistent in case of partial failure. Events can be resubmitted from the Dead Letter Topic to finish the database update.
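The Dead Letter Table guard from the last point can be sketched in isolation as follows. This is an illustration under simplifying assumptions: FailedEventTable is a hypothetical in-memory stand-in for the table, and a real processor would presumably rethrow on failure so that Service Bus redelivers the message, whereas the sketch just returns an outcome.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public enum Outcome { Processed, Failed, Blocked }

// Hypothetical stand-in for the Event Storage Dead Letter Table:
// PartitionKey = aggregate id, RowKey = sequence number.
public sealed class FailedEventTable
{
    private readonly HashSet<(string AggregateId, long SequenceNumber)> _rows = new();

    public void Insert(string aggregateId, long sequenceNumber) => _rows.Add((aggregateId, sequenceNumber));
    public void Remove(string aggregateId, long sequenceNumber) => _rows.Remove((aggregateId, sequenceNumber));

    // True when an earlier event of the same aggregate is still marked as failed.
    public bool HasEarlierFailure(string aggregateId, long sequenceNumber) =>
        _rows.Any(r => r.AggregateId == aggregateId && r.SequenceNumber < sequenceNumber);
}

public static class GuardedProcessor
{
    public static Outcome Process(
        FailedEventTable table, string aggregateId, long sequenceNumber, Action updateProjection)
    {
        if (table.HasEarlierFailure(aggregateId, sequenceNumber)) return Outcome.Blocked;
        try
        {
            updateProjection();
            table.Remove(aggregateId, sequenceNumber); // clears the marker left by an earlier failed attempt
            return Outcome.Processed;
        }
        catch (Exception)
        {
            table.Insert(aggregateId, sequenceNumber); // blocks later events of this aggregate
            return Outcome.Failed;
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var table = new FailedEventTable();
        Console.WriteLine(GuardedProcessor.Process(table, "user-1", 1,
            () => throw new InvalidOperationException("projection update failed"))); // Failed
        Console.WriteLine(GuardedProcessor.Process(table, "user-1", 2, () => { })); // Blocked
        Console.WriteLine(GuardedProcessor.Process(table, "user-2", 1, () => { })); // Processed
        Console.WriteLine(GuardedProcessor.Process(table, "user-1", 1, () => { })); // Processed
        Console.WriteLine(GuardedProcessor.Process(table, "user-1", 2, () => { })); // Processed
    }
}
```

Only events of the aggregate with the failed event are held back; other aggregates keep flowing.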
Here is an example of a simple Event Processor:
// EventProcessor attribute is used for registering Event Processor using specific name
// Optional parameter maxDeliveryCount is used for setting maximum number of times that an event can be delivered
// Optional parameter requiresSession is used for setting if Event Processor requires session to be enabled on Service Bus Topic Subscription
[EventProcessor(EventProcessorName, typeof(UserActivationAggregate), maxDeliveryCount: 3, requiresSession: true)]
public class UserActivationEventProcessor : BaseEventProcessor
{
#region Properties
// EventProcessorName is used for identifying Event Processor, it's used for creating Azure Topic Subscription
// and it's referenced from corresponding listener function also
internal const string EventProcessorName = $"{Constants.ServiceKey}{nameof(UserActivationEventProcessor)}";
private readonly ITenantConfigService _tenantConfigService;
private readonly ICosmosClientFactory _cosmosClientFactory;
#endregion
#region Constructors
public UserActivationEventProcessor(ILogger<UserActivationEventProcessor> logger,
ITenantConfigService tenantConfigService, ICosmosClientFactory cosmosClientFactory) : base(logger)
{
_tenantConfigService = tenantConfigService;
_cosmosClientFactory = cosmosClientFactory;
}
#endregion
// List of Handle methods for each Event which needs to be processed by Event Processor
#region Event Handlers
internal async Task Handle(UserActivationResetEvent e)
{
TenantConfigModel tenantConfigModel = await _tenantConfigService.GetTenantConfig(e.TenantId);
// Load CosmosDB client for specific Tenant
IAzureCosmosClient<UserActivationDataModel> cosmosClient = _cosmosClientFactory.Get<UserActivationDataModel>(tenantConfigModel);
// Load existing projection data from CosmosDB
UserActivationDataModel existing = await cosmosClient.Get(e.ForUserId.ToString(), $"{e.TenantId}_{Constants.Type}");
existing.ActivationCode = e.ActivationCode;
existing.ExpiresOn = e.ExpiresOn;
existing.Status = Models.Enums.Status.Pending;
existing.LastModifiedBy = e.UserId!.Value;
existing.LastModifiedOn = e.DispatchedOn;
existing.Version = e.SequenceNumber;
// Update projection data in CosmosDB
await cosmosClient.Update(existing);
}
#endregion
}
Saga Event Processor & Saga Orchestrator
A SagaEventProcessor is similar to an EventProcessor. The difference is that it uses a subscription through which it can also receive events dispatched on behalf of other Aggregates or by other function apps.
These events are not only used to update the projection database. They may carry a SagaId property, in which case the event processor must pass them to the corresponding Saga Orchestrator.
Saga Orchestrator is a special type of Azure Function that is responsible for orchestrating long-running business processes.
It's based on Azure Durable Functions and is started by a specific event from the SagaEventProcessor. The SagaOrchestrator contains business logic that orchestrates one or more calls to external function apps or external services. These calls can be HTTP calls or event-based Commands sent through ICommandRouter.
The SagaOrchestrator "sleeps" until the result of the Command comes back through the SagaEventProcessor or the HTTP result is returned. In the meantime, the Orchestrator saves its state so it can continue from the point where it went to sleep.
Here is an example of a simple Saga Event Processor with a Saga Orchestrator:
// SagaEventProcessor attribute is used for registering Saga Event Processor using specific name
// Optional parameter maxDeliveryCount is used for setting maximum number of times that an event can be delivered
// Optional parameter requiresSession is used for setting if Saga Event Processor requires session to be enabled on Service Bus Topic Subscription
[SagaEventProcessor(SagaName, maxDeliveryCount: 2, requiresSession: true)]
public class ImportUserSaga : BaseEventProcessor
{
#region Properties
// SagaName is used for identifying Saga Event Processor, it's used for creating Azure Topic Subscription
// and it's referenced from corresponding listener function also
internal const string SagaName = $"{Constants.ServiceKey}{nameof(ImportUserSaga)}";
#endregion
// List of Handle methods for each Event which needs to be processed by Saga Event Processor
// based on which Azure Subscription Query is created
#region Event Handlers
internal async Task Handle(UserImportStartedEvent @event, IDurableOrchestrationClient orchestrationClient)
{
// Start new Saga Orchestrator by running Azure durable function which will run ImportUserSaga.RunSaga method
await orchestrationClient.StartNewAsync(
nameof(ImportUserSagaFunctions.ImportUserSagaOrchestrator), $"{@event.StreamId}_{@event.Data.Username}", @event);
}
// Orchestrate method is used for orchestrating events to corresponding Saga Orchestrator
// Event received by Orchestrate method must contain SagaId property
internal async Task Orchestrate(UserCreatedEvent @event, IDurableOrchestrationClient orchestrationClient)
{
await orchestrationClient.RaiseEventAsync(@event.SagaId, @event.GetType().Name, @event);
}
internal async Task Orchestrate(UserUpdatedEvent @event, IDurableOrchestrationClient orchestrationClient)
{
await orchestrationClient.RaiseEventAsync(@event.SagaId, @event.GetType().Name, @event);
}
internal async Task Handle(CommandErrorEvent @event, IDurableOrchestrationClient orchestrationClient)
{
await RaiseEventAsync(@event, orchestrationClient);
}
// Base Handle method which will be called in case there is no Handle method for specific event
// In case SagaId is provided it will forward event to corresponding Orchestrate method
internal async Task Handle(BaseEvent e, IDurableOrchestrationClient orchestrationClient)
{
if (e.SagaId != null)
{
await Orchestrate((dynamic)e, orchestrationClient);
return;
}
Logger.LogInformation("Unsupported event type " + e.Type);
}
#endregion
#region Internal Methods
// RunSaga method is used for running Saga Orchestrator
// Each Saga Orchestrator will be started using some input event which will orchestrate further commands
// A timer should be used in case the orchestrator needs a timeout
internal async Task RunSaga(IDurableOrchestrationContext context, ILogger log)
{
var inputEvent = context.GetInput<UserImportStartedEvent>();
using var timeoutCts = new CancellationTokenSource();
// Orchestrator Id is used as SagaId for referencing commands which needs to be sent in
// the name of Saga Orchestrator
var sagaId = context.InstanceId;
try
{
var timeoutTask = context.CreateTimer(context.CurrentUtcDateTime.Add(TimeSpan.FromHours(2)), timeoutCts.Token);
var (userId, existingUser) = await CreateOrUpdateUserWithRetry(context, timeoutTask, inputEvent, sagaId, log);
// orchestration logic
await CompleteUserImport(context, inputEvent, userId!.Value, sagaId, log);
}
catch (Exception e)
{
log.LogError(e, "Error in saga orchestration");
await FailUserImport(context, inputEvent, sagaId, log);
throw;
}
finally
{
// Cancel timeout task in case orchestration is finished before timeout
timeoutCts.Cancel();
}
}
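// Azure durable function which will be started by Saga Event Processor using specific event
// (referenced above as ImportUserSagaFunctions.ImportUserSagaOrchestrator; it delegates to RunSaga)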
[FunctionName(nameof(ImportUserSagaOrchestrator))]
public async Task ImportUserSagaOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
{
log = context.CreateReplaySafeLogger(log);
log.LogInformation($"Starting saga orchestration for {context.InstanceId}");
await _importUserSaga.RunSaga(context, log);
}
}
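The example above creates a timeout timer but hides the waiting step inside CreateOrUpdateUserWithRetry. The pattern of racing a reply event against a timeout can be sketched with plain tasks, as below. SagaWait is a hypothetical helper. Inside a real orchestrator the two tasks would come from context.WaitForExternalEvent and context.CreateTimer instead of TaskCompletionSource and Task.Delay, because orchestrator code must stay deterministic.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SagaWait
{
    // Waits for the reply event, but gives up once the timeout elapses.
    public static async Task<string> WaitForReplyAsync(Task<string> replyEvent, TimeSpan timeout)
    {
        using var timeoutCts = new CancellationTokenSource();
        var timeoutTask = Task.Delay(timeout, timeoutCts.Token);

        var winner = await Task.WhenAny(replyEvent, timeoutTask);
        if (winner == timeoutTask)
        {
            throw new TimeoutException("No reply event arrived in time.");
        }

        timeoutCts.Cancel(); // the timer is no longer needed
        return await replyEvent;
    }
}

public static class Program
{
    public static async Task Main()
    {
        // Reply arrives: this plays the role of the event raised by the Saga Event Processor.
        var reply = new TaskCompletionSource<string>();
        var waiting = SagaWait.WaitForReplyAsync(reply.Task, TimeSpan.FromSeconds(5));
        reply.SetResult("UserCreatedEvent");
        Console.WriteLine(await waiting);

        // Reply never arrives: the timeout wins.
        var never = new TaskCompletionSource<string>();
        try
        {
            await SagaWait.WaitForReplyAsync(never.Task, TimeSpan.FromMilliseconds(50));
        }
        catch (TimeoutException e)
        {
            Console.WriteLine(e.Message);
        }
    }
}
```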
Unit Testing
For testing purposes, InMemoryEventStorage can be used. It's an implementation of the IEventStorage interface that stores events in memory.
Its constructor receives an Event Processor supplier function as a parameter, which is invoked for each dispatched event and can be used to check which events are produced as output.
SampleEvent expectedEvent = null;
// Arrange
_eventSourcingProvider.Setup(x => x.GetEventStorage(It.IsAny<TenantConfigModel>()))
.Returns(new InMemoryEventStorage(@event =>
{
expectedEvent = (SampleEvent)@event;
}));
With it, a test can pass a Command to a Command Handler and assert that the Command Handler produces the expected events or throws the expected exception.
// Act
var sampleCommandHandler = new SampleCommandHandler(_eventSourcingProvider.Object, _nullLogger, tenantConfigService.Object);
await sampleCommandHandler.Handle(
new SampleCommand(Guid.NewGuid().ToString(), Guid.NewGuid(), _tenantConfigModel, _userToken));
// Assert
Assert.IsType<SampleEvent>(expectedEvent);
It's also possible to provide an initial state for the Aggregate and check the state of the Aggregate after the Command is executed.
// initial aggregate state
var sampleAggregate = new SampleAggregate
{
Names = new List<string>{"Test1", "Test2"}
};
SampleEvent expectedEvent = null;
// Arrange
_eventSourcingProvider.Setup(x => x.GetEventStorage(It.IsAny<TenantConfigModel>()))
.Returns(new InMemoryEventStorage(new Dictionary<Type, BaseAggregate>
{
{
typeof(SampleAggregate), sampleAggregate
}
}, @event =>
{
expectedEvent = (SampleEvent)@event;
}));
// Assert (after the Command Handler has handled the command)
Assert.Equal(3, sampleAggregate.Names.Count);
If a Command Handler dispatches multiple events, it's possible to assert the order of the events, the number of events, or the resulting Aggregate state.
IList<BaseEvent> expectedEvents = new List<BaseEvent>();
// Arrange
_eventSourcingProvider.Setup(x => x.GetEventStorage(It.IsAny<TenantConfigModel>()))
.Returns(new InMemoryEventStorage(@event =>
{
expectedEvents.Add(@event);
}));
var createUserCommand = new CreateUserCommand
{
UserName = "test",
TenantConfig = _tenantConfigModel
};
// Act
var result = await _userCommandHandler.Handle(createUserCommand);
// Assert
Assert.Equal(2, expectedEvents.Count);
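The example above checks only the number of events. A check of the order could compare the recorded event types position by position, as in this stand-alone sketch. The event records and EventAssertions.TypesInOrder are hypothetical; in an xUnit test the same check could be written with Assert.Collection and one Assert.IsType inspector per expected event.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event hierarchy standing in for the SDK's BaseEvent and real events.
public abstract record BaseEvent;
public sealed record UserCreatedEvent(string UserName) : BaseEvent;
public sealed record UserActivationCreatedEvent(string UserName) : BaseEvent;

public static class EventAssertions
{
    // Compares the recorded event types with the expected sequence, position by position.
    public static bool TypesInOrder(IEnumerable<BaseEvent> recorded, params Type[] expected) =>
        recorded.Select(e => e.GetType()).SequenceEqual(expected);
}

public static class Program
{
    public static void Main()
    {
        // Events in the order in which the callback would have recorded them.
        var recorded = new List<BaseEvent>
        {
            new UserCreatedEvent("test"),
            new UserActivationCreatedEvent("test"),
        };

        Console.WriteLine(EventAssertions.TypesInOrder(
            recorded, typeof(UserCreatedEvent), typeof(UserActivationCreatedEvent))); // True
        Console.WriteLine(EventAssertions.TypesInOrder(
            recorded, typeof(UserActivationCreatedEvent), typeof(UserCreatedEvent))); // False
    }
}
```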
Event Storming
Event Storming is a workshop format for quickly exploring complex business domains. It was created by Alberto Brandolini.
It starts with a big picture view and then drills down into the details. It’s a great way to explore a business domain and to identify the events that occur within it.
Besides events, it can be used to identify Commands, Aggregates, Bounded Contexts and Sagas.
The result of the workshop is an Event Storming diagram.
Based on the Event Storming diagram it's easy to identify events, commands, aggregates, bounded contexts and sagas and to create the corresponding implementation classes.
Product | Versions (compatible and additional computed target framework versions) |
---|---|
.NET | net6.0 is compatible. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
net6.0
- Azure.Data.Tables (>= 12.8.0)
- Azure.ResourceManager.ServiceBus (>= 1.0.0)
- Azure.ResourceManager.Storage (>= 1.1.1)
- IglooSoftware.Sdk.Authorization (>= 0.0.62)
- IglooSoftware.Sdk.Eventing (>= 0.0.100)
- JsonFlatten (>= 1.0.2)
- Microsoft.Azure.Functions.Worker.Extensions.ServiceBus (>= 5.19.0)
- Microsoft.Azure.WebJobs.Extensions.DurableTask (>= 2.9.0)
- Microsoft.Azure.WebJobs.Extensions.ServiceBus (>= 5.3.0)
- Microsoft.Azure.WebJobs.Extensions.Storage.Blobs (>= 5.0.1)
- System.Linq.Async (>= 6.0.1)
- System.Security.Cryptography.Algorithms (>= 4.3.1)
Version | Downloads | Last updated |
---|---|---|
0.0.103 | 89 | 10/30/2024 |
0.0.102 | 90 | 10/24/2024 |
0.0.101 | 965 | 9/12/2024 |
0.0.100 | 94 | 9/12/2024 |
0.0.99 | 121 | 9/11/2024 |
0.0.98 | 3,172 | 7/4/2024 |
0.0.97 | 82 | 7/2/2024 |
0.0.96 | 255 | 6/11/2024 |
0.0.95 | 1,783 | 5/1/2024 |
0.0.94 | 341 | 3/20/2024 |
0.0.92 | 2,890 | 9/8/2023 |
0.0.91 | 139 | 9/6/2023 |
0.0.90 | 477 | 7/20/2023 |
0.0.89 | 1,143 | 6/13/2023 |
0.0.88 | 1,163 | 6/8/2023 |
0.0.87 | 317 | 6/7/2023 |
0.0.86 | 168 | 6/6/2023 |
0.0.85 | 164 | 6/6/2023 |
0.0.84 | 174 | 6/6/2023 |
0.0.83 | 166 | 6/6/2023 |
0.0.82 | 155 | 6/6/2023 |
0.0.81 | 160 | 6/6/2023 |
0.0.80 | 213 | 5/26/2023 |
0.0.79 | 155 | 5/23/2023 |
0.0.78 | 155 | 5/23/2023 |
0.0.77 | 165 | 5/15/2023 |
0.0.76 | 362 | 5/15/2023 |
0.0.75 | 163 | 5/11/2023 |
0.0.74 | 611 | 5/5/2023 |
0.0.73 | 167 | 5/5/2023 |
0.0.72 | 254 | 5/4/2023 |
0.0.71 | 170 | 5/4/2023 |
0.0.70 | 183 | 5/3/2023 |
0.0.69 | 166 | 4/27/2023 |
0.0.68 | 387 | 4/23/2023 |
0.0.67 | 195 | 4/20/2023 |
0.0.66 | 201 | 4/20/2023 |
0.0.65 | 193 | 4/20/2023 |
0.0.64 | 203 | 4/20/2023 |
0.0.63 | 186 | 4/20/2023 |
0.0.62 | 197 | 4/20/2023 |
0.0.61 | 218 | 4/18/2023 |
0.0.60 | 201 | 4/18/2023 |
0.0.59 | 227 | 4/18/2023 |
0.0.58 | 182 | 4/17/2023 |
0.0.57 | 216 | 4/13/2023 |
0.0.56 | 199 | 4/13/2023 |
0.0.55 | 202 | 4/13/2023 |
0.0.54 | 213 | 4/12/2023 |
0.0.53 | 249 | 4/12/2023 |
0.0.52 | 220 | 4/11/2023 |
0.0.51 | 210 | 4/11/2023 |
0.0.50 | 217 | 4/11/2023 |
0.0.49 | 221 | 4/10/2023 |
0.0.48 | 206 | 4/10/2023 |
0.0.47 | 191 | 4/10/2023 |
0.0.46 | 202 | 4/10/2023 |
0.0.45 | 195 | 4/10/2023 |
0.0.44 | 206 | 4/10/2023 |
0.0.43 | 211 | 4/10/2023 |
0.0.42 | 212 | 4/10/2023 |
0.0.41 | 238 | 4/7/2023 |
0.0.40 | 200 | 4/7/2023 |
0.0.39 | 215 | 4/6/2023 |
0.0.38 | 196 | 4/6/2023 |
0.0.37 | 206 | 4/5/2023 |
0.0.36 | 211 | 4/5/2023 |
0.0.35 | 208 | 4/5/2023 |
0.0.34 | 189 | 4/5/2023 |
0.0.33 | 234 | 4/4/2023 |
0.0.32 | 245 | 4/4/2023 |
0.0.31 | 208 | 4/4/2023 |
0.0.30 | 198 | 4/4/2023 |
0.0.29 | 213 | 4/4/2023 |
0.0.28 | 215 | 4/3/2023 |
0.0.27 | 265 | 3/30/2023 |
0.0.26 | 219 | 3/30/2023 |
0.0.25 | 232 | 3/29/2023 |
0.0.24 | 221 | 3/29/2023 |
0.0.23 | 234 | 3/29/2023 |
0.0.22 | 222 | 3/29/2023 |
0.0.21 | 249 | 3/28/2023 |
0.0.20 | 244 | 3/27/2023 |
0.0.19 | 225 | 3/27/2023 |
0.0.18 | 232 | 3/27/2023 |
0.0.17 | 242 | 3/22/2023 |
0.0.16 | 228 | 3/22/2023 |
0.0.15 | 229 | 3/22/2023 |
0.0.14 | 215 | 3/22/2023 |
0.0.13 | 223 | 3/22/2023 |
0.0.12 | 256 | 3/21/2023 |
0.0.11 | 226 | 3/21/2023 |
0.0.10 | 238 | 3/21/2023 |
0.0.9 | 230 | 3/21/2023 |
0.0.8 | 229 | 3/21/2023 |
0.0.7 | 221 | 3/21/2023 |
0.0.6 | 247 | 3/20/2023 |
0.0.5 | 273 | 3/19/2023 |
0.0.4 | 242 | 3/18/2023 |
0.0.3 | 290 | 3/15/2023 |
0.0.2 | 231 | 3/15/2023 |
0.0.1 | 253 | 3/15/2023 |