EasyRabbitFlow 1.0.0

There is a newer version of this package available.
See the version list below for details.
dotnet add package EasyRabbitFlow --version 1.0.0
NuGet\Install-Package EasyRabbitFlow -Version 1.0.0
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="EasyRabbitFlow" Version="1.0.0" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add EasyRabbitFlow --version 1.0.0
#r "nuget: EasyRabbitFlow, 1.0.0"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install EasyRabbitFlow as a Cake Addin
#addin nuget:?package=EasyRabbitFlow&version=1.0.0

// Install EasyRabbitFlow as a Cake Tool
#tool nuget:?package=EasyRabbitFlow&version=1.0.0

RabbitFlow Documentation

Welcome to the documentation for the RabbitFlow library! This guide will walk you through the process of configuring and using RabbitMQ consumers in your application using RabbitFlow.

Table of Contents

  1. Introduction
  2. Installation
  3. Configuration
  4. Consumers
  5. Middleware
  6. Publishing Messages
  7. Queue State
  8. Configure Temporary Queues
  9. Examples

1. Introduction

Welcome to the documentation for the RabbitFlow library! This comprehensive guide will walk you through the process of configuring and using RabbitMQ consumers in your application using RabbitFlow.

RabbitFlow is a powerful and intuitive library designed to simplify the management of RabbitMQ consumers in your application. This documentation provides step-by-step instructions for setting up, configuring, and using RabbitFlow effectively in your projects.

RabbitFlow is specifically designed to interact with pre-defined exchanges and queues within RabbitMQ. The rationale behind this approach is to separate the concepts of usage from the creation and configuration of infrastructure.

Why Pre-defined Exchanges and Queues?

By interacting with pre-defined exchanges and queues, RabbitFlow allows you to decouple the infrastructure setup from your application's logic. This separation of concerns offers several advantages:

  • Clarity and Separation of Roles: It ensures that infrastructure-related tasks, such as defining exchanges and queues and configuring their properties, are handled independently from your application's business logic. This separation makes your codebase more organized and easier to maintain.

  • Reusability: You can reuse the same pre-configured exchanges and queues across multiple applications or components. This promotes code reusability and streamlines the deployment process.

  • Flexibility: You have the flexibility to configure exchanges and queues according to your specific requirements before integrating them with RabbitFlow. This means you can tailor the RabbitMQ infrastructure to suit your application's needs precisely.

  • Maintenance and Scalability: Separating the infrastructure setup allows for easier maintenance and scalability. You can update, modify, or scale your exchanges and queues independently of your application's codebase.

In essence, RabbitFlow simplifies the process of managing RabbitMQ consumers by focusing on the interaction with well-defined infrastructure components. This approach not only enhances the clarity and modularity of your code but also offers flexibility and scalability as your application evolves.

Now, let's dive into the details of how to set up, configure, and use RabbitFlow effectively in your projects.

2. Installation

To integrate the RabbitFlow library into your project, you can use the NuGet package manager:

dotnet add package EasyRabbitFlow

3. Configuration

To configure RabbitFlow in your application, use the AddRabbitFlow method:

builder.Services.AddRabbitFlow(opt =>
{
    // Configure host settings
    opt.ConfigureHost(hostSettings =>
    {
        hostSettings.Host = "rabbitmq.example.com";
        hostSettings.Username = "guest";
        hostSettings.Password = "guest";
    });

    // Configure JSON serialization options
    opt.ConfigureJsonSerializerOptions(jsonSettings =>
    {
        jsonSettings.PropertyNameCaseInsensitive = true;
        jsonSettings.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
        jsonSettings.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull;
    });

    // Add and configure consumers
    // ...
});
  • 3.1 Host Configuration

The ConfigureHost method allows you to specify the connection details for your RabbitMQ host:

opt.ConfigureHost(hostSettings =>
{
    hostSettings.Host = "rabbitmq.example.com";
    hostSettings.Username = "guest";
    hostSettings.Password = "guest";
});
  • 3.2 JSON Serialization Options

To configure RabbitFlow in your application, use the AddRabbitFlow method:

opt.ConfigureJsonSerializerOptions(jsonSettings =>
{
    jsonSettings.PropertyNameCaseInsensitive = true;
    jsonSettings.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
    jsonSettings.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull;
});

Configuring JSON Serialization Options is optional, if no configuration is provided, the default options will be used.

  • 3.3 Publisher Options

Optionally, you may configure the publisher that you intend to use by defining the 'DisposePublisherConnection' variable. This variable determines whether the connection established by the publisher with RabbitMQ should be kept active or terminated upon the completion of the process.

Furthermore, the 'PublishAsync' method of the publisher offers an optional parameter named 'useNewConnection,' which defaults to 'false.' This parameter allows for the explicit control of whether a new connection should be established, irrespective of the global configuration set for the publisher. It is essential to note that the same publisher can be employed to transmit messages to different queues, each with its unique configuration settings.

 opt.ConfigurePublisher(publisherSettings => publisherSettings.DisposePublisherConnection = true);

4. Consumers

  • 4.1 Creating Consumers

Define and configure consumers for specific queues using the AddConsumer method:

opt.AddConsumer("email-queue", consumerSettings =>
{
    consumerSettings.AutoAck = true;
    consumerSettings.PrefetchCount = 1;
    // ... other settings ...
    consumerSettings.SetConsumerHandler<EmailConsumer>();
});
Consumer Settings
- AutoAck: Automatically acknowledge messages after processing.
- PrefetchCount: Limit unacknowledged messages to consume at a time.
- Timeout: Set maximum processing time for each message.
4.2 Retry Policies

Configure a retry policy for handling message processing failures:

consumerSettings.ConfigureRetryPolicy<EmailConsumer>(retryPolicy =>
{
    retryPolicy.MaxRetryCount = 3;
    retryPolicy.RetryInterval = 1000;
    retryPolicy.ExponentialBackoff = true;
    retryPolicy.ExponentialBackoffFactor = 2;
});

4.3 Consumer Interface
// Consumers must implement the IRabbitFlowConsumer<TEvent> interface:

public interface IRabbitFlowConsumer<TEvent>
{
    Task HandleAsync(TEvent message, CancellationToken cancellationToken);
}

5. Middleware

Use the UseConsumer middleware to efficiently process messages using registered consumers:

app.UseConsumer<EmailEvent, EmailConsumer>();

app.UseConsumer<WhatsAppEvent, WhatsAppConsumer>(opt =>
{
    opt.PerMessageInstance = true;
    opt.Active = false;
});
Middleware Options

Use the ConsumerRegisterSettings class to configure the middleware:

  • Active: If set to true, the middleware will be enabled. If set to false, the middleware will be disabled, meaning the consumer won't process any message from the queue.
  • PerMessageInstance: If set to true, a new instance of the consumer service handler will be created for each message. If set to false, the same instance of the service will be used for all messages.

6. Publishing Messages

Publisher Interface

Use the IRabbitFlowPublisher interface to publish messages to a RabbitMQ exchange:

public interface IRabbitFlowPublisher
{
    Task PublishAsync<T>(T message, string exchangeName, string routingKey, bool useNewConnection = false);
}

7. Queue State

Queue State Interface

The IRabbitFlowState interface provides methods to query queue status:

public interface IRabbitFlowState
{
    bool IsEmptyQueue(string queueName);
    uint GetQueueLength(string queueName);
}

8. Configure Temporary Queues

RabbitFlow's temporary queues functionality offers a convenient way to manage short-lived queues in RabbitMQ. Temporary queues are ideal for scenarios where you need to exchange messages efficiently between components without the need for long-term queue management. With RabbitFlow, you can easily publish events to these queues and establish temporary consumer handlers for efficient message consumption. This feature simplifies the process of handling transient message exchanges, allowing you to focus on your application's core functionality.

To use temporary queues, you must first configure the RabbitFlow host as mentioned before. Then you can use the interface IRabbitFlowTemporaryQueue.

The IRabbitFlowTemporaryQueue interface represents a service for managing temporary RabbitMQ queues and handling message publishing and consumption. This interface defines methods that allow you to publish events to queues and set up temporary consumer handlers.

Methods

1. PublishAsync<TEvent>(string queueName, TEvent @event, CancellationToken cancellationToken)

Publishes an event to a specified queue asynchronously.

  • queueName: The name of the queue to publish the event to.
  • event: The event to publish.
  • cancellationToken: A cancellation token to cancel the operation.
2. void SetTemporaryConsumerHandler<TEvent>(string queueName, TemporaryConsummerSettings settings, Func<TEvent, CancellationToken, Task> temporaryConsumerHandler = default!);

Sets up a temporary consumer handler for the specified queue, allowing message consumption.

  • queueName: The name of the queue to consume from.
  • settings: Temporary consumer settings.
  • temporaryConsumerHandler: The handler to process received events.

TemporaryConsummerSettings parameter contains the following properties:

  • PrefetchCount: Gets or sets the number of messages that the consumer can prefetch.
  • Timeout: Gets or sets the timeout duration for processing a single message.

temporaryConsumerHandler Is a function that takes the following parameters:

  • event: The event to process.
  • cancellationToken: A cancellation token to cancel the operation.

Check the example below for more details.

8.1 Temporary Queues Lifecycle

Temporary queues are created when you define a temporary consumer handler, and they are deleted when the service is manually disposed or automatically when it detects that in a max period of 5 minutes, no message was sent to the queue.

This means that you can handle the disposal of the service manually or let it dispose by itself.

9. Examples

9.1 Basic Consumer Setup
opt.AddConsumer("whatsapp-queue", consumerSettings =>
{
    consumerSettings.SetConsumerHandler<WhatsAppConsumer>();
});

Middleware Usage

app.UseConsumer<EmailEvent, EmailConsumer>();
9.2 Consumer Implementation
public class EmailConsumer : IRabbitFlowConsumer<EmailEvent>
{
	public async Task HandleAsync(EmailEvent message, CancellationToken cancellationToken)
	{
		// Process message
	}
}
9.3 Set Temporary Queues
public class TemporaryQueueConsumer
{
   
    private readonly IRabbitFlowTemporaryQueue _rabbitFlowTemporary;
    private readonly ISomeTranscientService _someTranscientService;
    private readonly ILogger<TemporaryQueueConsumer> _logger;
    private int totalMessages = 0;
    private int processedMessages = 0;

    public TemporaryQueueConsumer(IRabbitFlowTemporaryQueue rabbitFlowTemporary, ISomeTranscientService someTranscientService, ILogger<TemporaryQueueConsumer> logger)
    {
        _rabbitFlowTemporary = rabbitFlowTemporary;
        _someTranscientService = someTranscientService;

        //setting up a temporary consumer handler for the queue "long-task-queue"
        _rabbitFlowTemporary.SetTemporaryConsumerHandler<LongTaskItem>("long-task-queue", new TemporaryConsummerSettings { PrefetchCount = 1 }, ConsumerHandler);
        
        _logger = logger;
    }
}

In the TemporaryQueueConsumer class constructor, an instance of IRabbitFlowTemporaryQueue is injected, allowing you to interact with temporary queues.

The SetTemporaryConsumerHandler method is used to establish a temporary consumer handler for the "long-task-queue".

The consumer handler method ConsumerHandler is defined later in the example.

Consumer Handler
private async Task ConsumerHandler(LongTaskItem longTaskItem, CancellationToken cancellation)
{
    LongTimeOperation(longTaskItem);

    processedMessages++;

    if (processedMessages == totalMessages)
    {
        _rabbitFlowTemporary.Dispose();
    }

    await Task.CompletedTask;
}

The ConsumerHandler method processes received messages from the "long-task-queue". In this example, a long operation LongTimeOperation is simulated. Once all messages are processed, the temporary consumer is manually disposed.

Product Compatible and additional computed target framework versions.
.NET net5.0 was computed.  net5.0-windows was computed.  net6.0 was computed.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed. 
.NET Core netcoreapp3.0 was computed.  netcoreapp3.1 was computed. 
.NET Standard netstandard2.1 is compatible. 
MonoAndroid monoandroid was computed. 
MonoMac monomac was computed. 
MonoTouch monotouch was computed. 
Tizen tizen60 was computed. 
Xamarin.iOS xamarinios was computed. 
Xamarin.Mac xamarinmac was computed. 
Xamarin.TVOS xamarintvos was computed. 
Xamarin.WatchOS xamarinwatchos was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
1.0.1 86 4/26/2024
1.0.0 134 10/3/2023