KafkaGenericProcessor.Core 0.0.2

dotnet add package KafkaGenericProcessor.Core --version 0.0.2
                    
NuGet\Install-Package KafkaGenericProcessor.Core -Version 0.0.2
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="KafkaGenericProcessor.Core" Version="0.0.2" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="KafkaGenericProcessor.Core" Version="0.0.2" />
                    
Directory.Packages.props
<PackageReference Include="KafkaGenericProcessor.Core" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add KafkaGenericProcessor.Core --version 0.0.2
                    
#r "nuget: KafkaGenericProcessor.Core, 0.0.2"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package KafkaGenericProcessor.Core@0.0.2
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=KafkaGenericProcessor.Core&version=0.0.2
                    
Install as a Cake Addin
#tool nuget:?package=KafkaGenericProcessor.Core&version=0.0.2
                    
Install as a Cake Tool

KafkaGenericProcessor.Core

NOTE: This library is 100% vibe code material, the requirements were wrapping the KafkaFlow library to provide a simplified consume + consumeproduce + produce abilities with an easy simple fluent pattern. this library is NOT production ready, do NOT use it in production, contributions are welcomed, but please vibe code them 🤟

NuGet License: MIT

A robust, type-safe framework for building Kafka message processing applications in .NET. Simplify your Kafka consumer and producer implementation with a comprehensive set of features designed for enterprise applications.

Features

  • Type-Safe Processing: Handle Kafka messages with strongly-typed processors and serializers
  • Middleware Pipeline: Process messages through configurable middleware chains
  • Validation: Built-in support for message validation before processing
  • Error Handling: Comprehensive exception hierarchy with automatic handling
  • Retry Policies: Exponential backoff retry mechanism with configurable parameters
  • Health Checks: Built-in health monitoring for Kafka connectivity
  • Structured Logging: Consistent, correlation-based logging throughout the processing pipeline
  • Performance Metrics: Track processing times and throughput
  • Correlation IDs: Trace messages through the entire processing pipeline

Quick Start

Installation

dotnet add package KafkaGenericProcessor.Core

Basic Configuration

Add the following to your appsettings.json:

{
  "Kafka": {
    "Configurations": {
      "order-processor": {
        "Brokers": ["kafka-broker:9092"],
        "ConsumerTopic": "incoming-orders",
        "ProducerTopic": "processed-orders",
        "GroupId": "order-processing-group",
        "WorkersCount": 10,
        "BufferSize": 100,
        "CreateTopicsIfNotExists": true
      }
    }
  }
}

Register Services

// In Program.cs or Startup.cs
public void ConfigureServices(IServiceCollection services)
{
    // Add required services
    services.AddLogging();
    
    // Register your message processors and validators
    services.AddKeyedTransient<IMessageProcessor<OrderMessage, ProcessedOrderMessage>, OrderProcessor>("order-processor");
    services.AddKeyedTransient<IMessageValidator<OrderMessage>, OrderValidator>("order-processor");
    
    // Configure Kafka Generic Processor
    services
        .AddKafkaGenericProcessors(Configuration)
        .AddConsumerProducerProcessor<OrderMessage, ProcessedOrderMessage>("order-processor")
        .Build();
}

Create Message Types

public class OrderMessage
{
    public string OrderId { get; set; }
    public string CustomerName { get; set; }
    public decimal Amount { get; set; }
}

public class ProcessedOrderMessage
{
    public string OrderId { get; set; }
    public string Status { get; set; }
    public DateTimeOffset ProcessedAt { get; set; }
}

Implement Processor

public class OrderProcessor : IMessageProcessor<OrderMessage, ProcessedOrderMessage>
{
    private readonly ILogger<OrderProcessor> _logger;
    
    public OrderProcessor(ILogger<OrderProcessor> logger)
    {
        _logger = logger;
    }
    
    public async Task<ProcessedOrderMessage> ProcessAsync(
        OrderMessage input, 
        string correlationId, 
        CancellationToken cancellationToken = default)
    {
        _logger.LogInformation(
            "Processing order {OrderId} for customer {CustomerName}. CorrelationId: {CorrelationId}", 
            input.OrderId, input.CustomerName, correlationId);
            
        // Process the order (your business logic here)
        await Task.Delay(100, cancellationToken);
        
        return new ProcessedOrderMessage
        {
            OrderId = input.OrderId,
            Status = "Processed",
            ProcessedAt = DateTimeOffset.UtcNow
        };
    }
}

Implement Validator

public class OrderValidator : IMessageValidator<OrderMessage>
{
    public async Task<bool> ValidateAsync(
        OrderMessage message, 
        string correlationId, 
        CancellationToken cancellationToken = default)
    {
        var errors = await GetValidationErrorsAsync(message, correlationId, cancellationToken);
        return !errors.Any();
    }
    
    public Task<IReadOnlyList<ValidationError>> GetValidationErrorsAsync(
        OrderMessage message, 
        string correlationId, 
        CancellationToken cancellationToken = default)
    {
        var errors = new List<ValidationError>();
        
        if (string.IsNullOrEmpty(message.OrderId))
        {
            errors.Add(new ValidationError("OrderId", "Order ID is required"));
        }
        
        if (string.IsNullOrEmpty(message.CustomerName))
        {
            errors.Add(new ValidationError("CustomerName", "Customer name is required"));
        }
        
        if (message.Amount <= 0)
        {
            errors.Add(new ValidationError("Amount", "Amount must be greater than zero"));
        }
        
        return Task.FromResult<IReadOnlyList<ValidationError>>(errors);
    }
}

Documentation

For more detailed documentation, please refer to:

Contributing

Contributions are welcome! Please see our contributing guidelines for details.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Product Compatible and additional computed target framework versions.
.NET net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
0.0.2 228 5/15/2025
0.0.1 222 5/14/2025