TurboMqtt 0.2.0

TurboMqtt

TurboMqtt is a high-speed Message Queue Telemetry Transport (MQTT) client designed to support large-scale IoT workloads, handling over 100k msg/s from any MQTT 3.1.1+ broker.

TurboMqtt is written on top of Akka.NET and Akka.Streams, which is the secret to its efficient use of resources and high throughput.

Key Features

  • MQTT 3.1.1 support;
  • Extremely high performance - hundreds of thousands of messages per second;
  • Extremely resource-efficient - pools memory and leverages asynchronous I/O best practices;
  • Extremely robust fault tolerance - this is one of Akka.NET's great strengths and we've leveraged it in TurboMqtt;
  • Supports all MQTT quality of service levels, with automatic publishing retries for QoS 1 and 2;
  • Full OpenTelemetry support;
  • Automatic retry-reconnect in broker disconnect scenarios;
  • Full support for IAsyncEnumerable and backpressure on the receiver side;
  • Automatically de-duplicates packets on the receiver side; and
  • Automatically acks QoS 1 and QoS 2 packets.

TurboMqtt exposes a simple interface that sustains very high message rates with minimal resource utilization.

Documentation

  1. QuickStart
  2. Performance
  3. OpenTelemetry Support
  4. MQTT 3.1.1 Roadmap
  5. MQTT 5.0 Roadmap
  6. MQTT over Quic Roadmap

QuickStart

To get started with TurboMqtt:

dotnet add package TurboMqtt

And from there, you can call AddTurboMqttClientFactory on your IServiceCollection:

var builder = new HostBuilder();

builder
    .ConfigureAppConfiguration(configBuilder =>
    {
        configBuilder
            .AddJsonFile("appsettings.json", optional: false);
    })
    .ConfigureLogging(logging =>
    {
        logging.ClearProviders();
        logging.AddConsole();
    })
    .ConfigureServices(s =>
    {
        s.AddTurboMqttClientFactory();

        // HostedService is going to use TurboMqtt
        s.AddHostedService<MqttProducerService>();
        
    });

var host = builder.Build();

await host.RunAsync();

Then inject IMqttClientFactory into your ASP.NET Controllers, SignalR Hubs, gRPC services, IHostedServices, etc., and use it to create IMqttClient instances:

var tcpClientOptions = new MqttClientTcpOptions(config.Host, config.Port);
var clientConnectOptions = new MqttClientConnectOptions(config.ClientId, MqttProtocolVersion.V3_1_1)
{
    UserName = config.User,
    Password = config.Password
};

await using IMqttClient client = await _clientFactory.CreateTcpClient(clientConnectOptions, tcpClientOptions);

// connect to the broker
var connectResult = await client.ConnectAsync(linkedCts.Token);
            
if (!connectResult.IsSuccess)
{
    _logger.LogError("Failed to connect to MQTT broker at {0}:{1} - {2}", config.Host, config.Port,
        connectResult.Reason);
    return;
}

Publishing Messages

Publishing messages with TurboMqtt is easy:

foreach (var i in Enumerable.Range(0, config.MessageCount))
{
    var msg = new MqttMessage(config.Topic, CreatePayload(i, TargetMessageSize.EightKb))
    {
        QoS = QualityOfService.AtLeastOnce
    };

    IPublishResult publishResult = await client.PublishAsync(msg, stoppingToken);
    if (i % 1000 == 0)
    {
        _logger.LogInformation("Published {0} messages", i);
    }
}

The IPublishResult.IsSuccess property will return true when:

  1. QualityOfService.AtMostOnce (QoS 0) - as soon as the message has been queued for delivery;
  2. QualityOfService.AtLeastOnce (QoS 1) - after we've received a PubAck from the broker, confirming receipt; and
  3. QualityOfService.ExactlyOnce (QoS 2) - after we've completed the full MQTT QoS 2 exchange and received the final PubComp acknowledgement from the broker.

TurboMqtt will automatically retry delivery of messages in the event of overdue ACKs from the broker.
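The completion rules listed above follow the MQTT 3.1.1 acknowledgement flows: no acknowledgement for QoS 0, a single PubAck for QoS 1, and PubRec followed by PubComp for QoS 2. The sketch below models only the sender-side bookkeeping. The names Qos, Packet, and PublishTracker are hypothetical and are not part of the TurboMqtt API, and retries and timeouts are deliberately left out.

```csharp
using System;

// Hypothetical types for illustration only; not part of TurboMqtt.
public enum Qos { AtMostOnce = 0, AtLeastOnce = 1, ExactlyOnce = 2 }

public enum Packet { PubAck, PubRec, PubComp }

public sealed class PublishTracker
{
    private readonly Qos _qos;
    private bool _pubRecSeen;

    public PublishTracker(Qos qos)
    {
        _qos = qos;
        // QoS 0 has no acknowledgement, so the publish counts as complete once queued.
        IsComplete = qos == Qos.AtMostOnce;
    }

    public bool IsComplete { get; private set; }

    // Feed an acknowledgement packet received from the broker.
    public void OnAck(Packet packet)
    {
        switch (_qos, packet)
        {
            case (Qos.AtLeastOnce, Packet.PubAck):
                IsComplete = true;
                break;
            case (Qos.ExactlyOnce, Packet.PubRec):
                // In the real protocol the sender replies with PubRel at this point.
                _pubRecSeen = true;
                break;
            case (Qos.ExactlyOnce, Packet.PubComp) when _pubRecSeen:
                IsComplete = true;
                break;
            // Any other combination is ignored in this sketch.
        }
    }
}
```

A tracker created with Qos.ExactlyOnce stays incomplete after PubRec alone and completes only once PubComp arrives, which mirrors when IsSuccess becomes true for QoS 2 in the list above.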

Receiving Messages

TurboMqtt is backpressure-aware and thus exposes the stream of received MqttMessages to consumers via a System.Threading.Channels.ChannelReader<MqttMessage>:

ISubscribeResult subscribeResult = await client.SubscribeAsync(config.Topic, config.QoS, linkedCts.Token);
if (!subscribeResult.IsSuccess)
{
    _logger.LogError("Failed to subscribe to topic {0} - {1}", config.Topic, subscribeResult.Reason);
    return;
}

_logger.LogInformation("Subscribed to topic {0}", config.Topic);

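var received = 0;
ChannelReader<MqttMessage> receivedMessages = client.ReceivedMessages;

// WaitToReadAsync completes when messages are available (or returns false once the channel closes)
while (await receivedMessages.WaitToReadAsync(stoppingToken))
{
    // drain everything that is currently buffered
    while (receivedMessages.TryRead(out MqttMessage m))
    {
        received++;
        _logger.LogInformation("Received message [{0}] for topic [{1}]", m.Payload, m.Topic);
    }
}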
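To illustrate what backpressure through a channel looks like in general, here is a minimal sketch that uses only System.Threading.Channels from the .NET base class library. The bounded channel of integers stands in for client.ReceivedMessages; whether TurboMqtt's own channel is bounded, and with what capacity, is not stated here, so treat those details as assumptions. BackpressureDemo is a hypothetical name.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BackpressureDemo
{
    // Writes `count` integers into a bounded channel and drains them with
    // ReadAllAsync, returning the items in the order they were received.
    public static async Task<List<int>> RunAsync(int count, int capacity, CancellationToken ct = default)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity)
        {
            // Make writers wait (rather than drop items) when the buffer is full.
            FullMode = BoundedChannelFullMode.Wait
        });

        var producer = Task.Run(async () =>
        {
            for (var i = 0; i < count; i++)
            {
                // Completes only once there is room in the buffer.
                await channel.Writer.WriteAsync(i, ct);
            }

            // Signal that no more items will arrive so the reader loop can end.
            channel.Writer.Complete();
        }, ct);

        var received = new List<int>();
        ChannelReader<int> reader = channel.Reader;

        // ReadAllAsync exposes the channel as an IAsyncEnumerable<int>.
        await foreach (var item in reader.ReadAllAsync(ct))
        {
            received.Add(item);
        }

        await producer;
        return received;
    }
}
```

Calling `await BackpressureDemo.RunAsync(100, 4)` never holds more than four unread items in the buffer, because the producer is suspended whenever the consumer falls behind. The same `await foreach` pattern can be applied to any ChannelReader<T>.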

If we've subscribed using QualityOfService.AtLeastOnce or QualityOfService.ExactlyOnce, TurboMqtt has already fully ACKed the message for you by the time you receive it. TurboMqtt also uses a per-topic de-duplication buffer to detect and remove duplicates.
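How TurboMqtt implements its de-duplication buffer is not described here. As a rough illustration of the general idea, the sketch below keeps a bounded set of recently seen 16-bit MQTT packet identifiers per topic and evicts the oldest one when the buffer is full. TopicDeduplicator and its capacity parameter are hypothetical and should not be read as the library's actual design.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration only; not TurboMqtt's actual implementation.
public sealed class TopicDeduplicator
{
    private readonly int _capacityPerTopic;

    // For each topic: a set for O(1) lookups plus a queue recording arrival order.
    private readonly Dictionary<string, (HashSet<ushort> Seen, Queue<ushort> Order)> _buffers = new();

    public TopicDeduplicator(int capacityPerTopic)
    {
        if (capacityPerTopic < 1)
            throw new ArgumentOutOfRangeException(nameof(capacityPerTopic));
        _capacityPerTopic = capacityPerTopic;
    }

    // Returns true the first time a (topic, packetId) pair is seen and
    // false while that packetId is still held in the topic's buffer.
    public bool TryAccept(string topic, ushort packetId)
    {
        if (!_buffers.TryGetValue(topic, out var buffer))
        {
            buffer = (new HashSet<ushort>(), new Queue<ushort>());
            _buffers[topic] = buffer;
        }

        if (!buffer.Seen.Add(packetId))
            return false; // duplicate delivery

        buffer.Order.Enqueue(packetId);

        // Evict the oldest identifier once the buffer exceeds its capacity.
        if (buffer.Order.Count > _capacityPerTopic)
            buffer.Seen.Remove(buffer.Order.Dequeue());

        return true;
    }
}
```

Bounding the buffer matters because MQTT packet identifiers are reused over the lifetime of a connection, so an identifier has to be forgotten eventually or a later, legitimate message would be rejected.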

Licensing

TurboMqtt is available under the Apache 2.0 license.

Support

To get support with TurboMqtt, either fill out the help form on Sdkbin or file an issue on the TurboMqtt repository.

TurboMqtt is developed and maintained by Petabridge, the company behind Akka.NET.

Compatible and additional computed target framework versions

.NET: net8.0 is compatible. The following were computed: net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows, net9.0, net9.0-android, net9.0-browser, net9.0-ios, net9.0-maccatalyst, net9.0-macos, net9.0-tvos, net9.0-windows.

Version Downloads Last updated
0.2.0 403 6/17/2024
0.1.1 380 5/2/2024
0.1.0 107 5/1/2024
0.1.0-beta3 116 4/26/2024

* License has been migrated to Apache 2.0
* Upgraded to [Akka.NET v1.5.25](https://github.com/akkadotnet/akka.net/releases/tag/1.5.25).