SafeParallelAsync 0.1.0

There is a newer version of this package available; see the version list below for details.

.NET CLI:
    dotnet add package SafeParallelAsync --version 0.1.0

Package Manager (run in the Visual Studio Package Manager Console, which uses the NuGet module's version of Install-Package):
    NuGet\Install-Package SafeParallelAsync -Version 0.1.0

PackageReference (for projects that support it, copy this XML node into the project file):
    <PackageReference Include="SafeParallelAsync" Version="0.1.0" />

Paket CLI:
    paket add SafeParallelAsync --version 0.1.0

Script & Interactive (F# Interactive and Polyglot Notebooks; copy the directive into the interactive tool or the script source):
    #r "nuget: SafeParallelAsync, 0.1.0"

Cake:
    // Install SafeParallelAsync as a Cake Addin
    #addin nuget:?package=SafeParallelAsync&version=0.1.0

    // Install SafeParallelAsync as a Cake Tool
    #tool nuget:?package=SafeParallelAsync&version=0.1.0

SafeParallelAsync

  • Running async operations one at a time in a for-each loop forgoes the performance gain of running them in parallel.
  • Running too many async network operations in parallel will lead to port exhaustion and could easily take down a web server.
  • Running parallel async operations on a large enumerable can easily lead to running out of memory if you try to group the tasks or build up a single list with all the tasks to await at the end.

This is a micro library that solves this single problem: how to run a large (or small) number of async tasks in parallel, without exhausting your network sockets and without using more memory than strictly necessary.
It supports both IEnumerables and IAsyncEnumerables, can return values and can be chained.

The principles are simple and it's not many lines of code, but we found that every time we had to do this, it required too much thinking and took too long, even with a code example in our development guidelines. Feel free to use it from NuGet or just copy the relevant lines of code into your project.

The original inspiration for this came from a tweet by Clemens Vasters. We have used that code many times, but each time we have to stop and think for too long - so it felt sensible to do a micro library that, effectively, just provides that code without lots of other stuff.
There are also scenarios like very large enumerables, return values and the fancy new IAsyncEnumerable that we wanted to support.

Installation

    dotnet add package SafeParallelAsync

Basic Usage

If you wanted to just write all the items in an IEnumerable or IAsyncEnumerable to a queue, you could do something like this:

    await enumerable.SafeParallelAsync(async msg => await queueWriter.Write(msg));

By default, this will write 100 messages in parallel. It will not wait for all 100 to finish, but will keep writing so that at any one time there are 100 writes in operation.
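The "always 100 in flight" behaviour described above can be illustrated with a semaphore. The following is a minimal sketch of the general technique, not the library's actual source: the names ThrottleSketch and ForEachThrottledAsync are hypothetical, and the error handling (stop scheduling, wait for in-flight work, rethrow the first exception) is an assumption made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Hypothetical helper (not part of SafeParallelAsync): runs `action` over each
    // item with at most `maxParallelism` invocations in flight at any one time.
    public static async Task ForEachThrottledAsync<T>(
        this IEnumerable<T> source, Func<T, Task> action, int maxParallelism = 100)
    {
        var slots = new SemaphoreSlim(maxParallelism);
        Exception firstError = null;

        // Runs one item and frees its slot when done. No list of tasks is kept,
        // so completed Task objects become eligible for garbage collection.
        async Task RunOneAsync(T item)
        {
            try { await action(item); }
            catch (Exception ex) { Interlocked.CompareExchange(ref firstError, ex, null); }
            finally { slots.Release(); }
        }

        foreach (var item in source)
        {
            // Waits only until ONE slot is free, not until the whole batch is done.
            await slots.WaitAsync();
            if (Volatile.Read(ref firstError) != null) { slots.Release(); break; }
            _ = RunOneAsync(item);
        }

        // Drain: taking every slot means all in-flight operations have finished.
        for (var i = 0; i < maxParallelism; i++) await slots.WaitAsync();

        // Assumption for this sketch: surface the first failure to the caller.
        if (firstError != null) ExceptionDispatchInfo.Capture(firstError).Throw();
    }
}
```

Because the loop pulls the next item only when a slot frees up, the source enumerable is consumed lazily and is never materialised into a list.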

At the other end of the scale, you can chain things and handle cancellations like this:

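    // Use a CancellationTokenSource so that cancellation can actually be signalled
    // (call cts.Cancel()); a bare "new CancellationToken()" can never be cancelled.
    var cts = new CancellationTokenSource();
    var cancellationToken = cts.Token;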

    var process = idList.SafeParallelAsyncWithResult(id => dbReader.ReadData(id), 30)
                        .SafeParallelAsyncWithResult(readResult => dbWriter.WriteData(readResult.Output), 100)
                        .SafeParallelAsync(writeResult => queueWriter.Write(writeResult.Output.SomeDescription), 50, cancellationToken);

    await process;

The extension methods work for both IEnumerable and IAsyncEnumerable.

The SafeParallelAsyncWithResult methods return a result object you can query, or you can call the extension again to process the results. WithResult returns an IAsyncEnumerable, so you can optionally use the .WithCancellation(cancellationToken) syntax, even when your input is a normal IEnumerable. These methods have error handling and will catch exceptions; the exception is returned on the result object.

The SafeParallelAsync methods return a plain Task that you can await. This method will not catch errors - if anything throws an exception, it will propagate to your code and interrupt the processing.
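To make the difference between the two families concrete, here is a hedged sketch of the "catch and report on the result object" pattern. It is not the library's implementation: SketchResult, WithResultSketch and SelectThrottledAsync are hypothetical names, and the real result class may expose different members. For brevity it bounds parallelism with a queue of pending tasks and yields results in input order, which is a simplification.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical result type for this sketch; the library's own result class may differ.
public sealed class SketchResult<TIn, TOut>
{
    public SketchResult(TIn input, TOut output, Exception exception)
    {
        Input = input; Output = output; Exception = exception;
    }
    public TIn Input { get; }
    public TOut Output { get; }
    public Exception Exception { get; }          // null when the call succeeded
    public bool Succeeded => Exception == null;
}

public static class WithResultSketch
{
    // Hypothetical extension: at most `maxParallelism` calls are pending at once,
    // and a failure is captured on the result instead of being thrown.
    public static async IAsyncEnumerable<SketchResult<TIn, TOut>> SelectThrottledAsync<TIn, TOut>(
        this IEnumerable<TIn> source, Func<TIn, Task<TOut>> func, int maxParallelism = 100)
    {
        var pending = new Queue<Task<SketchResult<TIn, TOut>>>();
        foreach (var item in source)
        {
            // When the window is full, hand the oldest result to the caller first.
            if (pending.Count >= maxParallelism)
                yield return await pending.Dequeue();
            pending.Enqueue(CaptureAsync(item, func));
        }
        while (pending.Count > 0)
            yield return await pending.Dequeue();
    }

    // Never throws: any exception from `func` ends up on the result object.
    private static async Task<SketchResult<TIn, TOut>> CaptureAsync<TIn, TOut>(
        TIn input, Func<TIn, Task<TOut>> func)
    {
        try { return new SketchResult<TIn, TOut>(input, await func(input), null); }
        catch (Exception ex) { return new SketchResult<TIn, TOut>(input, default, ex); }
    }
}
```

A caller would consume this with await foreach and inspect each result, so one failing item does not interrupt the rest of the run.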

For more detailed usage instructions, see the Wiki.

Why

One of the very nice things about async in C# is that it makes it easy to do things in parallel. For example, if you need to write 100 messages to a queue, you can do it in parallel instead of one by one, potentially speeding up your code by up to 100x.
But:

  • If you do too many network writes in parallel, you will exhaust your sockets and effectively take your server down.
  • If you do too many operations in parallel, you may overload the backend you are talking to.
  • If you need to run a very large number of operations, you may use too much memory storing all the Task objects and may exhaust the maximum size of lists.

This micro library:

  • Provides extension methods for IEnumerable and IAsyncEnumerable that run an async Func over each item in the sequence.
  • Runs the Func in parallel, up to a maximum you specify.
  • Does not load all of the items from the enumerable into memory or aggregate them, so it can safely be used to "stream" data, for example when reading millions of records from one database and writing them to another.
  • Makes sure the Task objects go out of scope regularly, so you neither consume too much memory nor keep references to values from the enumerable.

Background information

I have given a talk a few times about how async in C# really works (it is probably different to what you think). A recording is available on my blog.

See the Wiki for more scenarios, including more details on cancellation and potential threading issues.

What about...

  • Parallel.ForEach? To my understanding, that method is more about threads (do correct me if I am wrong), whereas this micro library is exclusively about async.
  • See also CSRakowski.Parallel. It offers more functionality in some cases but has a different focus.

Product: compatible and additional computed target framework versions

  • .NET (all computed): net5.0, net5.0-windows, net6.0, net6.0-android, net6.0-ios, net6.0-maccatalyst, net6.0-macos, net6.0-tvos, net6.0-windows, net7.0, net7.0-android, net7.0-ios, net7.0-maccatalyst, net7.0-macos, net7.0-tvos, net7.0-windows, net8.0, net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows
  • .NET Core (computed): netcoreapp3.0, netcoreapp3.1
  • .NET Standard (compatible): netstandard2.1
  • MonoAndroid (computed): monoandroid
  • MonoMac (computed): monomac
  • MonoTouch (computed): monotouch
  • Tizen (computed): tizen60
  • Xamarin.iOS (computed): xamarinios
  • Xamarin.Mac (computed): xamarinmac
  • Xamarin.TVOS (computed): xamarintvos
  • Xamarin.WatchOS (computed): xamarinwatchos

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version            Downloads   Last updated
0.4.0-pre.124681   15,046      1/19/2022
0.3.0              12,665      11/18/2020
0.2.0              383         11/11/2020
0.1.0              417         11/2/2020
0.1.0-beta1        266         10/30/2020

Initial release