ksqlDb.RestApi.Client.ProtoBuf 2.0.0

There is a newer version of this package available.
See the version list below for details.
dotnet add package ksqlDb.RestApi.Client.ProtoBuf --version 2.0.0                
NuGet\Install-Package ksqlDb.RestApi.Client.ProtoBuf -Version 2.0.0                
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="ksqlDb.RestApi.Client.ProtoBuf" Version="2.0.0" />                
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add ksqlDb.RestApi.Client.ProtoBuf --version 2.0.0                
#r "nuget: ksqlDb.RestApi.Client.ProtoBuf, 2.0.0"                
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install ksqlDb.RestApi.Client.ProtoBuf as a Cake Addin
#addin nuget:?package=ksqlDb.RestApi.Client.ProtoBuf&version=2.0.0

// Install ksqlDb.RestApi.Client.ProtoBuf as a Cake Tool
#tool nuget:?package=ksqlDb.RestApi.Client.ProtoBuf&version=2.0.0                

This package generates KSQL push and pull queries from your .NET C# LINQ queries. You can filter, project, limit, etc. your push notifications server side with ksqlDB push queries. You can continually process computations over unbounded (theoretically never-ending) streams of data. It also allows you to execute SQL statements via the REST API such as inserting records into streams and creating tables, types, etc. or execute admin operations such as listing streams.

ksqlDB.RestApi.Client is a contribution to Confluent ksqldb-clients

main

Install with NuGet package manager:

Install-Package ksqlDB.RestApi.Client

or with .NET CLI

dotnet add package ksqlDB.RestApi.Client

This adds a <PackageReference> to your csproj file, similar to the following:

<PackageReference Include="ksqlDB.RestApi.Client" Version="3.0.0" />

Alternative option is to use Protobuf content type:

dotnet add package ksqlDB.RestApi.Client.ProtoBuf

The following example can be tried out with a .NET interactive Notebook:

using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.KSql.Query.Options;

var ksqlDbUrl = @"http:\\localhost:8088";

var contextOptions = new KSqlDBContextOptions(ksqlDbUrl)
{
  ShouldPluralizeFromItemName = true
};

await using var context = new KSqlDBContext(contextOptions);

using var disposable = context.CreateQueryStream<Tweet>()
  .WithOffsetResetPolicy(AutoOffsetReset.Latest)
  .Where(p => p.Message != "Hello world" || p.Id == 1)
  .Select(l => new { l.Message, l.Id })
  .Take(2)
  .Subscribe(tweetMessage =>
  {
    Console.WriteLine($"{nameof(Tweet)}: {tweetMessage.Id} - {tweetMessage.Message}");
  }, error => { Console.WriteLine($"Exception: {error.Message}"); }, () => Console.WriteLine("Completed"));

Console.WriteLine("Press any key to stop the subscription");

Console.ReadKey();

public class Tweet : Record
{
  public int Id { get; set; }

  public string Message { get; set; }
}

An entity class in ksqlDB.RestApi.Client represents the structure of a table or stream. An instance of the class represents a record in that stream while properties are mapped to columns respectively.

LINQ code written in C# from the sample is equivalent to this KSQL query:

SELECT Message, Id
  FROM Tweets
 WHERE Message != 'Hello world' OR Id = 1 
  EMIT CHANGES 
 LIMIT 2;

In the above mentioned code snippet everything runs server side except of the IQbservable<TEntity>.Subscribe method. It subscribes to your ksqlDB stream created in the following manner:

using ksqlDB.RestApi.Client.KSql.RestApi.Http;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements;
using ksqlDB.RestApi.Client.KSql.RestApi;
using ksqlDB.Api.Client.Samples.Models;

EntityCreationMetadata metadata = new()
{
  KafkaTopic = nameof(Tweet),
  Partitions = 3,
  Replicas = 3
};

var httpClient = new HttpClient()
{
  BaseAddress = new Uri(@"http:\\localhost:8088")
};

var httpClientFactory = new HttpClientFactory(httpClient);
var restApiClient = new KSqlDbRestApiClient(httpClientFactory);

var httpResponseMessage = await restApiClient.CreateOrReplaceStreamAsync<Tweet>(metadata);

CreateOrReplaceStreamAsync executes the following statement:

CREATE OR REPLACE STREAM Tweets (
	Id INT,
	Message VARCHAR
) WITH ( KAFKA_TOPIC='Tweet', VALUE_FORMAT='Json', PARTITIONS='3', REPLICAS='3' );

Run the following insert statements to publish some messages with your ksqldb-cli

docker exec -it $(docker ps -q -f name=ksqldb-cli) ksql http://ksqldb-server:8088
INSERT INTO tweets (id, message) VALUES (1, 'Hello world');
INSERT INTO tweets (id, message) VALUES (2, 'ksqlDB rulez!');

or insert a record from C#:

var responseMessage = await new KSqlDbRestApiClient(httpClientFactory)
  .InsertIntoAsync(new Tweet { Id = 2, Message = "ksqlDB rulez!" });

or with KSqlDbContext:

await using var context = new KSqlDBContext(ksqlDbUrl);

context.Add(new Tweet { Id = 1, Message = "Hello world" });
context.Add(new Tweet { Id = 2, Message = "ksqlDB rulez!" });

var saveChangesResponse = await context.SaveChangesAsync();

Sample projects can be found under Samples solution folder in ksqlDB.RestApi.Client.sln

External dependencies:

Clone the repository

git clone https://github.com/tomasfabian/ksqlDB.RestApi.Client-DotNet.git

CD to Samples

CD Samples\ksqlDB.RestApi.Client.Sample\

run in command line:

docker compose up -d

AspNet Blazor server side sample:

  • set docker-compose.csproj as startup project in InsideOut.sln for an embedded Kafka connect integration and stream processing examples.

IQbservable<T> extension methods

As depicted bellow IObservable<T> is the dual of IEnumerable<T> and IQbservable<T> is the dual of IQueryable<T>. In all four cases LINQ providers are using deferred execution. While the first two are executed locally the latter two are executed server side. The server side execution is possible thanks to traversing ASTs (Abstract Syntax Trees) with visitors. The KSqlDbProvider will create the KSQL syntax for you from expression trees and pass it along to ksqlDB.

<img src="https://www.codeproject.com/KB/cs/646361/WhatHowWhere.jpg" />

List of supported push query extension methods:

Setting query parameters

Default settings: 'auto.offset.reset' is set to 'earliest' by default. New parameters could be added or existing ones changed in the following manner:

var contextOptions = new KSqlDBContextOptions(@"http:\\localhost:8088");

contextOptions.QueryStreamParameters["auto.offset.reset"] = "latest";

Overriding stream names

Stream names are generated based on the generic record types. They are pluralized with Pluralize.NET package.

By default the generated from item names such as stream and table names are pluralized. This behavior could be switched off with the following ShouldPluralizeStreamName configuration.

context.CreateQueryStream<Person>();
FROM People

This can be disabled:

var contextOptions = new KSqlDBContextOptions(@"http:\\localhost:8088")
{
  ShouldPluralizeFromItemName = false
};

new KSqlDBContext(contextOptions).CreateQueryStream<Person>();
FROM Person

Setting an arbitrary stream name (from_item name):

context.CreateQueryStream<Tweet>("custom_topic_name");
FROM custom_topic_name

Register the KSqlDbContext

IKSqlDBContext and IKSqlDbRestApiClient can be provided with dependency injection. These services can be registered during app startup and components that require these services, are provided with these services via constructor parameters.

To register KsqlDbContext as a service, open Program.cs, and add the lines to the ConfigureServices method shown bellow or see some more details in the workshop:

using ksqlDB.RestApi.Client.Sensors;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using ksqlDb.RestApi.Client.DependencyInjection;
using ksqlDB.RestApi.Client.Sensors.KSqlDb;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
      var ksqlDbUrl = @"http:\\localhost:8088";

      services.AddDbContext<ISensorsKSqlDbContext, SensorsKSqlDbContext>(
        options =>
        {
          var setupParameters = options.UseKSqlDb(ksqlDbUrl);

          setupParameters.SetAutoOffsetReset(AutoOffsetReset.Earliest);

        }, ServiceLifetime.Transient, restApiLifetime: ServiceLifetime.Transient);
    })
    .Build();

await host.RunAsync();

Aggregation functions

List of supported ksqldb aggregation functions:

Rest API reference

Some KSql function examples can be found here

List of supported data types:

List of supported Joins:

List of supported pull query extension methods:

List of supported ksqlDB SQL statements:

KSqlDbContext

Config

Operators

Data definitions

Miscelenaous

Functions

LinqPad samples

Push Query

Pull Query

Nuget

https://www.nuget.org/packages/ksqlDB.RestApi.Client/

Scalar functions

Aggregation functions

Push query

Acknowledgements:

"Buy Me A Coffee"

Product Compatible and additional computed target framework versions.
.NET net6.0 is compatible.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 is compatible.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
4.0.0 124 5/3/2024
3.0.0 2,412 11/17/2023
2.0.0 292 3/2/2023
2.0.0-rc.1 413 2/14/2023
1.0.0 284 1/8/2023
1.0.0-rc.1 195 12/30/2022
0.1.0 389 8/17/2022
0.1.0-rc.1 435 8/5/2022