TurboMqtt 0.1.1
See the version list below for details.
dotnet add package TurboMqtt --version 0.1.1
NuGet\Install-Package TurboMqtt -Version 0.1.1
<PackageReference Include="TurboMqtt" Version="0.1.1" />
paket add TurboMqtt --version 0.1.1
#r "nuget: TurboMqtt, 0.1.1"
// Install TurboMqtt as a Cake Addin #addin nuget:?package=TurboMqtt&version=0.1.1 // Install TurboMqtt as a Cake Tool #tool nuget:?package=TurboMqtt&version=0.1.1
TurboMqtt
TurboMqtt is a high-speed Message Queue Telemetry Transport (MQTT) client designed to support large-scale IOT workloads, handling over 100k msg/s from any MQTT 3.1.1+ broker.
TurboMqtt is written on top of Akka.NET and Akka.Streams, which is the secret to its efficient use of resources and high throughput.
Key Features
- MQTT 3.1.1 support;
- Extremely high performance - hundreds of thousands of messages per second;
- Extremely resource-efficient - pools memory and leverages asynchronous I/O best practices;
- Extremely robust fault tolerance - this is one of Akka.NET's great strengths and we've leveraged it in TurboMqtt;
- Supports all MQTT quality of service levels, with automatic publishing retries for QoS 1 and 2;
- Full OpenTelemetry support;
- Automatic retry-reconnect in broker disconnect scenarios;
- Full support for IAsyncEnumerable and backpressure on the receiver side;
- Automatically de-duplicates packets on the receiver side; and
- Automatically acks QoS 1 and QoS 2 packets.
Simple interface that works at very high rates of speed with minimal resource utilization.
Documentation
- QuickStart
- Performance
- OpenTelemetry Support
- MQTT 3.1.1 Roadmap
- MQTT 5.0 Roadmap
- MQTT over Quic Roadmap
QuickStart
To get started with TurboMqtt:
dotnet add package TurboMqtt
And from there, you can call AddTurboMqttClientFactory
on your IServiceCollection
:
var builder = new HostBuilder();
builder
.ConfigureAppConfiguration(configBuilder =>
{
configBuilder
.AddJsonFile("appsettings.json", optional: false);
})
.ConfigureLogging(logging =>
{
logging.ClearProviders();
logging.AddConsole();
})
.ConfigureServices(s =>
{
s.AddTurboMqttClientFactory();
// HostedService is going to use TurboMqtt
s.AddHostedService<MqttProducerService>();
});
var host = builder.Build();
await host.RunAsync();
And inject IMqttClientFactory
into your ASP.NET Controllers, SignalR Hubs, gRPC services, IHostedService
s, etc and create IMqttClient
instances:
var tcpClientOptions = new MqttClientTcpOptions(config.Host, config.Port);
var clientConnectOptions = new MqttClientConnectOptions(config.ClientId, MqttProtocolVersion.V3_1_1)
{
UserName = config.User,
Password = config.Password
};
await using IMqttClient client = await _clientFactory.CreateTcpClient(clientConnectOptions, tcpClientOptions);
// connect to the broker
var connectResult = await client.ConnectAsync(linkedCts.Token);
if (!connectResult.IsSuccess)
{
_logger.LogError("Failed to connect to MQTT broker at {0}:{1} - {2}", config.Host, config.Port,
connectResult.Reason);
return;
}
Publishing Messages
Publishing messages with TurboMqtt is easy:
foreach (var i in Enumerable.Range(0, config.MessageCount))
{
var msg = new MqttMessage(config.Topic, CreatePayload(i, TargetMessageSize.EightKb))
{
QoS = QualityOfService.AtLeastOnce
};
IPublishResult publishResult = await client.PublishAsync(msg, stoppingToken);
if(i % 1000 == 0)
{
_logger.LogInformation("Published {0} messages", i);
}
}
The IPublishResult.IsSuccess
property will return true
when:
QualityOfService.AtMostOnce
(QoS 0) - as soon as the message has queued for delivery;QualityOfService.AtLeastOnce
(QoS 1) - after we've received aPubAck
from the broker, confirming receipt; andQualityOfService.ExactlyOnce
(QoS 2) - after we've completed the full MQTT QoS 2 exchange and received the finalPubComp
acknowledgement from the broker.
TurboMqtt will automatically retry delivery of messages in the event of overdue ACKs from the broker.
Receiving Messages
TurboMqtt is backpressure-aware and thus exposes the stream of received MqttMessage
s to consumers via System.Threading.Channel<MqttMessage>
:
ISubscribeResult subscribeResult = await client.SubscribeAsync(config.Topic, config.QoS, linkedCts.Token);
if (!subscribeResult.IsSuccess)
{
_logger.LogError("Failed to subscribe to topic {0} - {1}", config.Topic, subscribeResult.Reason);
return;
}
_logger.LogInformation("Subscribed to topic {0}", config.Topic);
var received = 0;
ChannelRead<MqttMessage> receivedMessages = client.ReceivedMessages;
while (await receivedMessages.WaitToReadAsync(stoppingToken))
{
while (receivedMessages.TryRead(out MqttMessage m))
{
_logger.LogInformation("Received message [{0}] for topic [{1}]", m.Payload, m.Topic);
}
}
If we've subscribed using QualityOfService.AtMostOnce
or QualityOfService.ExactlyOnce
, TurboMqtt has already fully ACKed the message for you by the time you receive it and we use a per-topic de-duplication buffer to detect and remove duplicates.
Licensing
You do not need a license for evaluation, development, test environments, or personal projects. Feel free to install TurboMqtt from NuGet.org if that's your situation.
However, if you are a for-profit company earning in excess of $1m USD annually or a non-profit with a budget in excess of $1m USD annually, you will need to purchase an annual license to TurboMqtt. Licenses include support and troubleshooting.
You can purchase a license and read our full commerical license terms here: https://sdkbin.com/publisher/petabridge/product/turbomqtt#license
Support
To get support with TurboMqtt, either fill out the help form on Sdkbin or file an issue on the TurboMqtt repository.
TurboMqtt developed and maintained by Petabridge, the company behind Akka.NET.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
-
net8.0
- Akka (>= 1.5.20)
- Akka.Streams (>= 1.5.20)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 8.0.1)
- OpenTelemetry.Api.ProviderBuilderExtensions (>= 1.8.1)
- System.IO.Pipelines (>= 8.0.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
0.2.0 | 388 | 6/17/2024 |
0.1.1 | 379 | 5/2/2024 |
0.1.0 | 106 | 5/1/2024 |
0.1.0-beta3 | 115 | 4/26/2024 |
TurboMqtt v0.1.1 includes critical bug fixes and massive performance improvements over v0.1.0.
**Bug Fixes and Improvements**
* [Fixed QoS=1 packet handling - was previously treating it like QoS=2](https://github.com/petabridge/TurboMqtt/pull/103).
* [Improved flow control inside `ClientAckHandler`](https://github.com/petabridge/TurboMqtt/pull/105) - result is a massive performance improvement when operating at QoS 1 and 2.
* [Fix OpenTelemetry `TagList` for clientId and MQTT version](https://github.com/petabridge/TurboMqtt/pull/104) - now we can accurately track metrics per clientId via OpenTelemetry.
**Performance**
```
BenchmarkDotNet v0.13.12, Windows 11 (10.0.22631.3447/23H2/2023Update/SunValley3)
12th Gen Intel Core i7-1260P, 1 CPU, 16 logical and 12 physical cores
.NET SDK 8.0.101
[Host] : .NET 8.0.1 (8.0.123.58001), X64 RyuJIT AVX2
Job-FBXRHG : .NET 8.0.1 (8.0.123.58001), X64 RyuJIT AVX2
InvocationCount=1 LaunchCount=10 RunStrategy=Monitoring
UnrollFactor=1 WarmupCount=10
```
| Method | QoSLevel | PayloadSizeBytes | ProtocolVersion | Mean | Error | StdDev | Median | Req/sec |
|-------------------------- |------------ |----------------- |---------------- |----------:|----------:|---------:|----------:|-----------:|
| **PublishAndReceiveMessages** | **AtMostOnce** | **10** | **V3_1_1** | **5.175 μs** | **0.6794 μs** | **2.003 μs** | **4.345 μs** | **193,230.35** |
| **PublishAndReceiveMessages** | **AtLeastOnce** | **10** | **V3_1_1** | **26.309 μs** | **1.4071 μs** | **4.149 μs** | **25.906 μs** | **38,010.35** |
| **PublishAndReceiveMessages** | **ExactlyOnce** | **10** | **V3_1_1** | **44.501 μs** | **2.2778 μs** | **6.716 μs** | **42.175 μs** | **22,471.53** |
[Learn more about TurboMqtt's performance figures here](https://github.com/petabridge/TurboMqtt/blob/dev/docs/Performance.md).