LanguageExt.Streaming
5.0.0-beta-54
dotnet add package LanguageExt.Streaming --version 5.0.0-beta-54
The Streaming library of language-ext is all about compositional streams. There are two key types of streaming
functionality: closed-streams and open-streams...
Closed streams
Closed streams are facilitated by the Pipes system. The types in the Pipes system are compositional
monad-transformers that 'fuse' together to produce an EffectT<M, A>. This effect is a closed system,
meaning that there is no way (from the API) to directly interact with the effect from the outside: it can be executed
and will return a result if it terminates.
The pipeline components are ProducerT, PipeT, and ConsumerT.
These are the components that fuse together (using the | operator) to make an EffectT<M, A>. The
types are monad-transformers that support lifting monads with the MonadIO trait only (which constrains M). This
makes sense, otherwise the closed-system would have no effect other than heating up the CPU.
There are also more specialised versions of the above that only support the lifting of the Eff<RT, A> effect-monad: Producer, Pipe, and Consumer.
They all fuse together into an Effect<RT, A>.
Pipes are especially useful if you want to build reusable streaming components that you can glue together ad infinitum. Pipes are, arguably, less useful for day-to-day stream processing, like handling events, but your mileage may vary.
More details on the Pipes page.
Open streams
Open streams are closer to what most C# devs have used classically. They are like events or IObservable streams.
They yield values and (under certain circumstances) accept inputs.
- Source and SourceT yield values synchronously or asynchronously, depending on their construction.
- Sink and SinkT receive values and propagate them through the channel they're attached to.
- Conduit and ConduitT provide an input transducer (which acts like a Sink), an internal buffer, and an output transducer (which acts like a Source).
I'm calling these 'open streams' because we can Post values to a Sink/SinkT and we can Reduce values yielded by Source/SourceT. So, they are 'open' for public manipulation, unlike Pipes, which fuse the public access away.
Source
Source<A> is the 'classic stream': you can lift any of the following types into it: System.Threading.Channels.Channel<A>,
IEnumerable<A>, IAsyncEnumerable<A>, or singleton values. To process a stream, you need to use one of the Reduce
or ReduceAsync variants. These take Reducer delegates as arguments. They are essentially a fold over the stream of
values, which results in an aggregated state once the stream has completed. These reducers can be seen to play a similar
role to Subscribe in IObservable streams, but are more principled because they return a value (which we can leverage
to carry state for the duration of the stream).
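To make the 'fold over the stream' idea concrete, here is a minimal sketch that uses only System.Threading.Channels and IAsyncEnumerable. It is not the LanguageExt API: FoldAsync is a hypothetical helper that plays the role a Reduce variant might play, and the reducer is an ordinary Func<S, A, S>.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper (not part of LanguageExt): fold every value read from
// the stream into a state and return that state once the stream completes.
static async Task<S> FoldAsync<S, A>(IAsyncEnumerable<A> stream, S initial, Func<S, A, S> reducer)
{
    var state = initial;
    await foreach (var item in stream)
        state = reducer(state, item);
    return state;
}

// A BCL channel stands in for the source of values.
var channel = Channel.CreateUnbounded<int>();
for (var i = 1; i <= 5; i++)
    channel.Writer.TryWrite(i);
channel.Writer.Complete();   // completing the writer ends the stream

// The reducer carries a running total for the duration of the stream.
var total = await FoldAsync(channel.Reader.ReadAllAsync(), 0, (sum, x) => sum + x);
Console.WriteLine(total);    // prints 15
```

Unlike a Subscribe callback, the fold hands back a value (here the running total), which is the point being made about carrying state.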
Source also supports some built-in reducers:
- Last - aggregates no state, simply returns the last item yielded
- Iter - this forces evaluation of the stream, aggregating no state, and ignoring all yielded values.
- Collect - adds all yielded values to a Seq<A>, which is then returned upon stream completion.
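Each of these can be read as a particular choice of reducer. The sketch below expresses the three behaviours with plain LINQ over an array; it only illustrates the shape of each fold and does not call the library's Last, Iter, or Collect. A List<int> stands in for Seq<A>.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var items = new[] { 10, 20, 30 };

// Last: no accumulated state, the reducer just keeps the most recent value.
int? last = items.Aggregate((int?)null, (_, x) => x);

// Iter: evaluate the whole stream for its side effects and keep nothing.
var visited = 0;
foreach (var item in items) visited++;

// Collect: append every value to a collection and return it at the end
// (a List<int> stands in for Seq<A> here).
var collected = items.Aggregate(new List<int>(), (acc, x) => { acc.Add(x); return acc; });

Console.WriteLine($"{last} {visited} {string.Join(",", collected)}");   // prints 30 3 10,20,30
```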
SourceT
SourceT<M, A> is the classic-stream embellished - it turns the stream into a monad-transformer that can
lift any MonadIO-enabled monad (M), allowing side effects to be embedded into the stream in a principled way.
So, for example, to use the IO<A> monad with SourceT, simply use: SourceT<IO, A>. Then you can use one of the
following static methods on the SourceT type to lift IO<A> effects into a stream:
- SourceT.liftM(IO<A> effect) creates a singleton-stream
- SourceT.foreverM(IO<A> effect) creates an infinite stream, repeating the same effect over and over
- SourceT.liftM(Channel<IO<A>> channel) lifts a System.Threading.Channels.Channel of effects
- SourceT.liftM(IEnumerable<IO<A>> effects) lifts an IEnumerable of effects
- SourceT.liftM(IAsyncEnumerable<IO<A>> effects) lifts an IAsyncEnumerable of effects
Obviously, when lifting non-IO monads, the types above change.
SourceT also supports the same built-in convenience reducers as Source (Last, Iter, Collect).
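As a rough illustration of what 'a stream of effects' means, the sketch below uses Func<Task<A>> as a stand-in for IO<A>, since both describe a computation that only runs when asked to. Singleton and Forever are hypothetical helpers that mirror the singleton and infinite shapes listed above; none of this is the SourceT API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helpers: a stream holding one effect, and a stream that
// repeats the same effect without end.
static IEnumerable<Func<Task<A>>> Singleton<A>(Func<Task<A>> effect)
{
    yield return effect;
}

static IEnumerable<Func<Task<A>>> Forever<A>(Func<Task<A>> effect)
{
    while (true) yield return effect;
}

// A side-effecting computation: every run bumps the counter.
var counter = 0;
Func<Task<int>> next = () => Task.FromResult(++counter);

// Reducing the stream is what runs the effects, one per yielded item.
var sum = 0;
foreach (var effect in Forever(next).Take(3))
    sum += await effect();

Console.WriteLine(sum);   // prints 6 (the effect ran three times: 1 + 2 + 3)
```

The same effect value is yielded each time, but because it is only a description of work, each run can produce a different result.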
Sink
Sink<A> provides a way to accept many input values. The values are buffered until consumed. The sink can be
thought of as a System.Threading.Channels.Channel (which is the buffer that collects the values) that happens to
manipulate the values being posted to the buffer just before they are stored.
This manipulation is possible because the Sink is a CoFunctor (contravariant functor). This is the dual of Functor: we can think of Functor.Map as converting a value from A -> B, whereas CoFunctor.Comap converts from B -> A.
So, to manipulate values coming into the Sink, use Comap. It will give you a new Sink with the manipulation 'built-in'.
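The direction of Comap can feel backwards at first, so here is a small sketch of the idea. An Action<A> stands in for a sink of A, and the Comap function below is a hypothetical helper written for this example, not the library's method. Given a sink of string and a function int -> string, it produces a sink of int.

```csharp
using System;
using System.Threading.Channels;

// Hypothetical stand-in: a sink is just "something that accepts a value".
// Comap pre-composes a mapping B -> A, giving a sink that accepts B instead.
static Action<B> Comap<A, B>(Action<A> sink, Func<B, A> f) =>
    b => sink(f(b));

// The channel plays the part of the buffer behind the sink.
var buffer = Channel.CreateUnbounded<string>();

Action<string> stringSink = s => buffer.Writer.TryWrite(s);
Action<int> intSink = Comap<string, int>(stringSink, n => $"#{n}");

intSink(42);   // converted to "#42" just before it is stored

buffer.Reader.TryRead(out var stored);
Console.WriteLine(stored);   // prints #42
```

The mapping runs on the way in, which is why the function goes from the new input type back to the type the existing sink already understands.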
SinkT
SinkT<M, A> provides a way to accept many input values. The values are buffered until consumed. The sink can
be thought of as a System.Threading.Channels.Channel (which is the buffer that collects the values) that happens to
manipulate the values being posted to the buffer just before they are stored.
This manipulation is possible because the SinkT is a CoFunctor (contravariant functor). This is the dual of Functor: we can think of Functor.Map as converting a value from A -> B, whereas CoFunctor.Comap converts from B -> A.
So, to manipulate values coming into the SinkT, use Comap. It will give you a new SinkT with the manipulation 'built-in'.
SinkT is also a transformer that lifts types of K<M, A>.
Conduit
Conduit<A, B> can be pictured as so:
+----------------------------------------------------------------+
| |
| A --> Transducer --> X --> Buffer --> X --> Transducer --> B |
| |
+----------------------------------------------------------------+
- A value of A is posted to the Conduit (via Post)
- It flows through an input Transducer, mapping the A value to X (an internal type you can't see)
- The X value is then stored in the conduit's internal buffer (a System.Threading.Channels.Channel)
- Any invocation of Reduce will force the consumption of the values in the buffer
- Each value X then flows through the output Transducer, which maps it to B
So the input and output transducers allow for pre and post-processing of values as they flow through the conduit.
Conduit is a CoFunctor, call Comap to manipulate the pre-processing transducer. Conduit is also a Functor, call
Map to manipulate the post-processing transducer. There are other non-trait, but common behaviours, like FoldWhile,
Filter, Skip, Take, etc.
Conduit supports access to a Sink and a Source for more advanced processing.
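The flow in the diagram can be sketched with a BCL channel and two plain functions. This is only a model of the shape, assuming A = string, X = int, and B = string: Post and Reduce here are hypothetical local functions, not the Conduit members of the same name.

```csharp
using System;
using System.Threading.Channels;

// Stand-ins for the two transducers.
Func<string, int> input = s => s.Length;              // pre-processing,  A -> X
Func<int, string> output = x => new string('*', x);   // post-processing, X -> B

// The internal buffer only ever holds X values.
var buffer = Channel.CreateUnbounded<int>();

// "Post": run the input transducer, then store the result in the buffer.
void Post(string a) => buffer.Writer.TryWrite(input(a));

// "Reduce": drain the buffer, run the output transducer on each value,
// and fold the results into a single state.
string Reduce(string initial, Func<string, string, string> reducer)
{
    var state = initial;
    while (buffer.Reader.TryRead(out var x))
        state = reducer(state, output(x));
    return state;
}

Post("ab");
Post("abc");

var result = Reduce("", (acc, b) => acc + b + "|");
Console.WriteLine(result);   // prints **|***|
```

In this model, swapping the input function corresponds to Comap and swapping the output function corresponds to Map.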
ConduitT
ConduitT<M, A, B> can be pictured as so:
+------------------------------------------------------------------------------------------+
| |
| K<M, A> --> TransducerM --> K<M, X> --> Buffer --> K<M, X> --> TransducerM --> K<M, B> |
| |
+------------------------------------------------------------------------------------------+
- A value of K<M, A> is posted to the ConduitT (via Post)
- It flows through an input TransducerM, mapping the K<M, A> value to K<M, X> (an internal type you can't see)
- The K<M, X> value is then stored in the conduit's internal buffer (a System.Threading.Channels.Channel)
- Any invocation of Reduce will force the consumption of the values in the buffer
- Each value K<M, X> then flows through the output TransducerM, which maps it to K<M, B>
So the input and output transducers allow for pre and post-processing of values as they flow through the conduit.
ConduitT is a CoFunctor, call Comap to manipulate the pre-processing transducer. ConduitT is also a Functor, call
Map to manipulate the post-processing transducer. There are other non-trait, but common behaviours, like FoldWhile,
Filter, Skip, Take, etc.
ConduitT supports access to a SinkT and a SourceT for more advanced processing.
Open to closed streams
Clearly, even for 'closed systems' like the Pipes system, it would be beneficial to be able to post values
into the streams from the outside. And so, the open-stream components can all be converted into Pipes components
like ProducerT and ConsumerT.
- Conduit and ConduitT support ToProducer, ToProducerT, ToConsumer, and ToConsumerT.
- Sink and SinkT support ToConsumer and ToConsumerT.
- Source and SourceT support ToProducer and ToProducerT.
This allows for the ultimate flexibility in your choice of streaming effect. It also allows for efficient concurrency in
the more abstract and compositional world of the pipes. In fact ProducerT.merge, which merges many streams into one,
uses ConduitT internally to collect the values and to merge them into a single ProducerT.
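To give a feel for why a buffered conduit is a natural fit for merging, here is a sketch of the general technique using a plain BCL channel: every input stream writes into one shared buffer, and a single reader drains it. Merge, Pump, and Range are hypothetical names for this example. This is not how ProducerT.merge is implemented, only the same idea in miniature.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: merge several async streams by pumping each one
// into a shared channel and exposing the channel as a single stream.
static IAsyncEnumerable<A> Merge<A>(params IAsyncEnumerable<A>[] sources)
{
    var channel = Channel.CreateUnbounded<A>();

    async Task Pump(IAsyncEnumerable<A> source)
    {
        await foreach (var item in source)
            await channel.Writer.WriteAsync(item);
    }

    // Complete the channel once every source has finished (or failed).
    _ = Task.WhenAll(sources.Select(s => Pump(s)))
            .ContinueWith(t => channel.Writer.Complete(t.Exception));

    return channel.Reader.ReadAllAsync();
}

// A small async stream to merge.
static async IAsyncEnumerable<int> Range(int start, int count)
{
    for (var i = start; i < start + count; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

var seen = new List<int>();
await foreach (var x in Merge(Range(0, 3), Range(100, 3)))
    seen.Add(x);

seen.Sort();   // arrival order is not deterministic, so sort before printing
Console.WriteLine(string.Join(",", seen));   // prints 0,1,2,100,101,102
```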
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net8.0
  - LanguageExt.Core (>= 5.0.0-beta-54)
NuGet packages (2)
Showing the top 2 NuGet packages that depend on LanguageExt.Streaming:
| Package | Downloads |
|---|---|
| LanguageExt.Parsec: Parser combinators library based on Haskell Parsec. This is part of the LanguageExt functional framework and requires LanguageExt.Core | |
| LanguageExt.Sys: Extensions to language-ext framework effects system that wraps the IO behaviours from the .NET BCL | |
| Version | Downloads | Last Updated |
|---|---|---|
| 5.0.0-beta-54 | 3,338 | 5/8/2025 |
| 5.0.0-beta-53 | 182 | 5/7/2025 |
| 5.0.0-beta-52 | 324 | 4/30/2025 |
| 5.0.0-beta-51 | 415 | 4/23/2025 |