TurboMqtt 0.1.1


TurboMqtt

TurboMqtt is a high-speed Message Queue Telemetry Transport (MQTT) client designed to support large-scale IoT workloads, handling over 100k msg/s from any MQTT 3.1.1+ broker.

TurboMqtt is written on top of Akka.NET and Akka.Streams, which is the secret to its efficient use of resources and high throughput.

Key Features

  • MQTT 3.1.1 support;
  • Extremely high performance - hundreds of thousands of messages per second;
  • Extremely resource-efficient - pools memory and leverages asynchronous I/O best practices;
  • Extremely robust fault tolerance - this is one of Akka.NET's great strengths and we've leveraged it in TurboMqtt;
  • Supports all MQTT quality of service levels, with automatic publishing retries for QoS 1 and 2;
  • Full OpenTelemetry support;
  • Automatic retry-reconnect in broker disconnect scenarios;
  • Full support for IAsyncEnumerable and backpressure on the receiver side;
  • Automatically de-duplicates packets on the receiver side; and
  • Automatically acks QoS 1 and QoS 2 packets.

TurboMqtt offers a simple interface that works at very high message rates with minimal resource utilization.

Documentation

  1. QuickStart
  2. Performance
  3. OpenTelemetry Support
  4. MQTT 3.1.1 Roadmap
  5. MQTT 5.0 Roadmap
  6. MQTT over Quic Roadmap

QuickStart

To get started with TurboMqtt:

dotnet add package TurboMqtt

And from there, you can call AddTurboMqttClientFactory on your IServiceCollection:

var builder = new HostBuilder();

builder
    .ConfigureAppConfiguration(configBuilder =>
    {
        configBuilder
            .AddJsonFile("appsettings.json", optional: false);
    })
    .ConfigureLogging(logging =>
    {
        logging.ClearProviders();
        logging.AddConsole();
    })
    .ConfigureServices(s =>
    {
        s.AddTurboMqttClientFactory();

        // HostedService is going to use TurboMqtt
        s.AddHostedService<MqttProducerService>();
        
    });

var host = builder.Build();

await host.RunAsync();

Then inject IMqttClientFactory into your ASP.NET Controllers, SignalR Hubs, gRPC services, IHostedServices, etc., and create IMqttClient instances:

var tcpClientOptions = new MqttClientTcpOptions(config.Host, config.Port);
var clientConnectOptions = new MqttClientConnectOptions(config.ClientId, MqttProtocolVersion.V3_1_1)
{
    UserName = config.User,
    Password = config.Password
};

await using IMqttClient client = await _clientFactory.CreateTcpClient(clientConnectOptions, tcpClientOptions);

// connect to the broker
var connectResult = await client.ConnectAsync(linkedCts.Token);
            
if (!connectResult.IsSuccess)
{
    _logger.LogError("Failed to connect to MQTT broker at {0}:{1} - {2}", config.Host, config.Port,
        connectResult.Reason);
    return;
}

Publishing Messages

Publishing messages with TurboMqtt is easy:

foreach (var i in Enumerable.Range(0, config.MessageCount))
{
    var msg = new MqttMessage(config.Topic, CreatePayload(i, TargetMessageSize.EightKb))
                {
                    QoS = QualityOfService.AtLeastOnce
                };

    IPublishResult publishResult = await client.PublishAsync(msg, stoppingToken);
    if(i % 1000 == 0)
    {
        _logger.LogInformation("Published {0} messages", i);
    }
}

The IPublishResult.IsSuccess property will return true when:

  1. QualityOfService.AtMostOnce (QoS 0) - as soon as the message has been queued for delivery;
  2. QualityOfService.AtLeastOnce (QoS 1) - after we've received a PubAck from the broker, confirming receipt; and
  3. QualityOfService.ExactlyOnce (QoS 2) - after we've completed the full MQTT QoS 2 exchange and received the final PubComp acknowledgement from the broker.

TurboMqtt will automatically retry delivery of messages in the event of overdue ACKs from the broker.
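
The retry-on-overdue-ACK behavior described above can be illustrated with a small, self-contained sketch. This is not TurboMqtt's implementation: `SendAndAwaitAckAsync` and `PublishWithRetryAsync` are hypothetical names, and the "broker" is a stand-in that only acknowledges the third attempt. It uses only `Task.WaitAsync`, which is available from .NET 6 onward.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a broker: it only acknowledges the third delivery attempt.
var attemptsSeen = 0;
Task<bool> SendAndAwaitAckAsync()
{
    attemptsSeen++;
    // A task that never completes models an ACK that never arrives.
    return attemptsSeen < 3
        ? new TaskCompletionSource<bool>().Task
        : Task.FromResult(true);
}

// Re-send until an ACK arrives within ackTimeout, or maxAttempts is exhausted.
async Task<bool> PublishWithRetryAsync(Func<Task<bool>> send, TimeSpan ackTimeout, int maxAttempts)
{
    for (var attempt = 1; attempt <= maxAttempts; attempt++)
    {
        try
        {
            // WaitAsync throws TimeoutException when the ACK is overdue.
            return await send().WaitAsync(ackTimeout);
        }
        catch (TimeoutException)
        {
            Console.WriteLine($"ACK overdue on attempt {attempt}, retrying");
        }
    }
    return false; // gave up: no ACK within the allowed attempts
}

var delivered = await PublishWithRetryAsync(
    SendAndAwaitAckAsync, TimeSpan.FromMilliseconds(50), maxAttempts: 5);
Console.WriteLine($"Delivered: {delivered} after {attemptsSeen} attempts");
```

With TurboMqtt you do not write this loop yourself; the sketch only shows why a publish at QoS 1 or 2 can take longer than one round trip before IsSuccess is reported.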

Receiving Messages

TurboMqtt is backpressure-aware and thus exposes the stream of received MqttMessages to consumers via a System.Threading.Channels.ChannelReader<MqttMessage>:

ISubscribeResult subscribeResult = await client.SubscribeAsync(config.Topic, config.QoS, linkedCts.Token);
if (!subscribeResult.IsSuccess)
{
    _logger.LogError("Failed to subscribe to topic {0} - {1}", config.Topic, subscribeResult.Reason);
    return;
}

_logger.LogInformation("Subscribed to topic {0}", config.Topic);

var received = 0;
ChannelReader<MqttMessage> receivedMessages = client.ReceivedMessages;
while (await receivedMessages.WaitToReadAsync(stoppingToken))
{
    while (receivedMessages.TryRead(out MqttMessage m))
    {
        _logger.LogInformation("Received message [{0}] for topic [{1}]", m.Payload, m.Topic);
    }
}
}
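
To show what backpressure means for a channel-based receiver, here is a minimal sketch that uses only System.Threading.Channels and no TurboMqtt types. The producer task is a hypothetical stand-in for the network side of a client. Because the channel is bounded, the writer waits whenever the reader falls behind, and `ReadAllAsync` exposes the same reader as an `IAsyncEnumerable<T>`.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// A bounded channel: once 2 items are buffered, WriteAsync waits for the reader to catch up.
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.Wait
});

// Hypothetical producer standing in for messages arriving from a broker.
var producer = Task.Run(async () =>
{
    for (var i = 0; i < 5; i++)
    {
        await channel.Writer.WriteAsync($"message-{i}");
    }
    channel.Writer.Complete(); // signals that no more messages will arrive
});

// ReadAllAsync turns the reader into an IAsyncEnumerable<string>, so await foreach works.
var received = 0;
await foreach (var payload in channel.Reader.ReadAllAsync())
{
    received++;
    Console.WriteLine($"Received {payload}");
}

await producer;
Console.WriteLine($"Total received: {received}");
```

The same `await foreach` pattern should apply to any `ChannelReader<T>`, which is one way to consume received messages as an asynchronous stream instead of the nested `WaitToReadAsync`/`TryRead` loop.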

If you've subscribed using QualityOfService.AtLeastOnce or QualityOfService.ExactlyOnce, TurboMqtt has already fully ACKed the message for you by the time you receive it. A per-topic de-duplication buffer detects and removes duplicates.
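
As an illustration of what a per-topic de-duplication buffer can look like, the sketch below keeps a bounded set of recently seen packet ids for each topic. It is an assumption-laden example, not TurboMqtt's actual data structure: `TryAccept`, the capacity of 3, and the choice to key on the packet id are all hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical buffer: recently seen packet ids per topic, bounded to `capacity` entries each.
const int capacity = 3;
var seen = new Dictionary<string, (HashSet<ushort> Ids, Queue<ushort> Order)>();

// Returns true the first time a (topic, packetId) pair is observed, false for a duplicate.
bool TryAccept(string topic, ushort packetId)
{
    if (!seen.TryGetValue(topic, out var buffer))
    {
        buffer = (new HashSet<ushort>(), new Queue<ushort>());
        seen[topic] = buffer;
    }

    if (!buffer.Ids.Add(packetId))
        return false; // already delivered once; drop the redelivery

    buffer.Order.Enqueue(packetId);
    if (buffer.Order.Count > capacity)
        buffer.Ids.Remove(buffer.Order.Dequeue()); // evict the oldest id to bound memory

    return true;
}

Console.WriteLine(TryAccept("sensors/temp", 1));     // True
Console.WriteLine(TryAccept("sensors/temp", 1));     // False
Console.WriteLine(TryAccept("sensors/humidity", 1)); // True
```

Bounding the buffer trades memory for a small risk of accepting a very late duplicate, which is the usual compromise for this kind of structure.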

Licensing

You do not need a license for evaluation, development, test environments, or personal projects. Feel free to install TurboMqtt from NuGet.org if that's your situation.

However, if you are a for-profit company earning in excess of $1m USD annually or a non-profit with a budget in excess of $1m USD annually, you will need to purchase an annual license to TurboMqtt. Licenses include support and troubleshooting.

You can purchase a license and read our full commercial license terms here: https://sdkbin.com/publisher/petabridge/product/turbomqtt#license

Support

To get support with TurboMqtt, either fill out the help form on Sdkbin or file an issue on the TurboMqtt repository.

TurboMqtt is developed and maintained by Petabridge, the company behind Akka.NET.

Target frameworks: net8.0 is compatible. Computed as compatible: net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows.

| Version     | Downloads | Last updated |
|------------ |----------:|------------- |
| 0.2.0       |       388 | 6/17/2024    |
| 0.1.1       |       379 | 5/2/2024     |
| 0.1.0       |       106 | 5/1/2024     |
| 0.1.0-beta3 |       115 | 4/26/2024    |

TurboMqtt v0.1.1 includes critical bug fixes and massive performance improvements over v0.1.0.

**Bug Fixes and Improvements**

* [Fixed QoS=1 packet handling - was previously treating it like QoS=2](https://github.com/petabridge/TurboMqtt/pull/103).
* [Improved flow control inside `ClientAckHandler`](https://github.com/petabridge/TurboMqtt/pull/105) - result is a massive performance improvement when operating at QoS 1 and 2.
* [Fixed OpenTelemetry `TagList` for clientId and MQTT version](https://github.com/petabridge/TurboMqtt/pull/104) - now we can accurately track metrics per clientId via OpenTelemetry.

**Performance**

```

BenchmarkDotNet v0.13.12, Windows 11 (10.0.22631.3447/23H2/2023Update/SunValley3)
12th Gen Intel Core i7-1260P, 1 CPU, 16 logical and 12 physical cores
.NET SDK 8.0.101
 [Host]     : .NET 8.0.1 (8.0.123.58001), X64 RyuJIT AVX2
 Job-FBXRHG : .NET 8.0.1 (8.0.123.58001), X64 RyuJIT AVX2

InvocationCount=1  LaunchCount=10  RunStrategy=Monitoring  
UnrollFactor=1  WarmupCount=10  

```
| Method                    | QoSLevel    | PayloadSizeBytes | ProtocolVersion | Mean      | Error     | StdDev   | Median    | Req/sec    |
|-------------------------- |------------ |----------------- |---------------- |----------:|----------:|---------:|----------:|-----------:|
| **PublishAndReceiveMessages** | **AtMostOnce**  | **10**               | **V3_1_1**          |  **5.175 μs** | **0.6794 μs** | **2.003 μs** |  **4.345 μs** | **193,230.35** |
| **PublishAndReceiveMessages** | **AtLeastOnce** | **10**               | **V3_1_1**          | **26.309 μs** | **1.4071 μs** | **4.149 μs** | **25.906 μs** |  **38,010.35** |
| **PublishAndReceiveMessages** | **ExactlyOnce** | **10**               | **V3_1_1**          | **44.501 μs** | **2.2778 μs** | **6.716 μs** | **42.175 μs** |  **22,471.53** |


[Learn more about TurboMqtt's performance figures here](https://github.com/petabridge/TurboMqtt/blob/dev/docs/Performance.md).
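
The Req/sec column in the table above appears to be the reciprocal of the mean time per message. The short sketch below recomputes it from the rounded Mean values. Small differences from the published figures are expected, since the table presumably derives them from unrounded means. `ToMessagesPerSecond` is a helper name introduced here for illustration.

```csharp
using System;

// Mean per-message times in microseconds, copied from the benchmark table above.
var meanMicroseconds = new (string QoS, double Mean)[]
{
    ("AtMostOnce", 5.175),
    ("AtLeastOnce", 26.309),
    ("ExactlyOnce", 44.501),
};

// Throughput is the reciprocal of the mean: 1,000,000 microseconds per second / mean microseconds.
double ToMessagesPerSecond(double meanUs) => 1_000_000.0 / meanUs;

foreach (var (qos, mean) in meanMicroseconds)
{
    Console.WriteLine($"{qos}: {ToMessagesPerSecond(mean):N0} msg/s");
}
```

Read this way, the drop from QoS 0 to QoS 2 reflects the extra acknowledgement round trips each message needs, not a change in payload size.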