Concurrency 2.0.2

.NET 5.0
Install-Package Concurrency -Version 2.0.2
dotnet add package Concurrency --version 2.0.2
<PackageReference Include="Concurrency" Version="2.0.2" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add Concurrency --version 2.0.2
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: Concurrency, 2.0.2"
#r directive can be used in F# Interactive, C# scripting and .NET Interactive. Copy this into the interactive tool or source code of the script to reference the package.
// Install Concurrency as a Cake Addin
#addin nuget:?package=Concurrency&version=2.0.2

// Install Concurrency as a Cake Tool
#tool nuget:?package=Concurrency&version=2.0.2
The NuGet Team does not provide support for this client. Please contact its maintainers for support.

Concurrency

NuGet version Build (PR) Build (Merge & Publish)

Similar to Task.WhenAll but allows you to limit the number of tasks in flight (max concurrency).

NOTE: This has been written to run tasks concurrently, not in parallel. A single enumerator is shared to enumerate and start the tasks. If you instead want to run in parallel and limit the degree of parallelism, I recommend:

// Cap the parallel loop at four simultaneous workers.
var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.ForEach(someList, parallelOptions, item => { /*...*/ });

Here is some example usage, taken from the unit tests:

/// <summary>
/// Entry recorded by the example tests each time a task body runs.
/// </summary>
public class Log
{
    /// <summary>Identifier of the task that produced this entry.</summary>
    public int Id { get; set; }
    /// <summary>
    /// Value of the test's running-task counter at the moment the entry was recorded;
    /// stays at its default of 0 in tests that do not set it.
    /// </summary>
    public int NumTasksRunning { get; set; }
}

/// <summary>
/// Checks that <c>ConcurrentTask.WhenAll</c> invokes every task factory it is handed:
/// five factories each record one <see cref="Log"/>, and all five must be present afterwards.
/// </summary>
[TestMethod]
public async Task ShouldRunAllTasks()
{
    // Arrange: every factory appends a Log carrying its own id.
    var recorded = new List<Log>();

    Task Record(Log entry)
    {
        recorded.Add(entry);
        return Task.FromResult(0);
    }

    List<Func<Task>> factories = Enumerable
        .Range(start: 1, count: 5)
        .Select(id => (Func<Task>)(async () => await Record(new Log { Id = id })))
        .ToList();

    // Act: allow all five to be in flight at once.
    await ConcurrentTask.WhenAll(factories, maxConcurrency: 5);

    // Assert: one entry per id, 1 through 5.
    List<Log> expected = Enumerable
        .Range(start: 1, count: 5)
        .Select(id => new Log { Id = id })
        .ToList();
    recorded.ShouldBeEquivalentTo(expected);
}

/// <summary>
/// Checks that <c>ConcurrentTask.WhenAll</c> never has more than <c>maxConcurrency</c>
/// task bodies started-but-unfinished: each body bumps a counter, snapshots it into a
/// <see cref="Log"/>, then waits on its own gate before decrementing the counter.
/// </summary>
[TestMethod]
public async Task ShouldLimitConcurrency()
{
    // Arrange: one manually completed gate per task, so bodies stay "in flight" until released.
    var entries = new List<Log>();
    var inFlight = 0;
    var gates = new List<TaskCompletionSource<int>>();
    for (var i = 0; i < 10; i++)
    {
        gates.Add(new TaskCompletionSource<int>());
    }

    async Task Track(int taskId)
    {
        inFlight++;
        entries.Add(new Log { Id = taskId, NumTasksRunning = inFlight });
        await gates[taskId - 1].Task; // task ids are 1-based, gates are 0-based
        inFlight--;
    }

    List<Func<Task>> factories = Enumerable
        .Range(start: 1, count: 10)
        .Select(taskId => (Func<Task>)(async () => await Track(taskId)))
        .ToList();

    // Act: start the run first, then release every gate in order.
    int maxConcurrency = 3;
    var pending = ConcurrentTask.WhenAll(factories, maxConcurrency);
    foreach (var gate in gates)
    {
        gate.SetResult(0);
    }
    await pending;

    // Assert: the highest counter snapshot equals the configured limit.
    var peak = entries.Max(entry => entry.NumTasksRunning);
    peak
        .Should()
        .Be(maxConcurrency, because: $"there should be up to {maxConcurrency} tasks in flight.");
}

/// <summary>
/// Checks that the results of <c>ConcurrentTask.WhenAll</c> come back in the order the
/// task factories were supplied, even though the underlying tasks are completed in a
/// scrambled order.
/// </summary>
[TestMethod]
public async Task ShouldReturnTasksInOrderDespiteWhenTheyResolve()
{
    // Arrange: task N (1-based) yields N once gate N-1 has been completed.
    var gates = new List<TaskCompletionSource<int>>();
    for (var i = 0; i < 10; i++)
    {
        gates.Add(new TaskCompletionSource<int>());
    }

    async Task<int> Resolve(int taskId)
    {
        await gates[taskId - 1].Task;
        return taskId;
    }

    Func<Task<int>> FactoryFor(int taskId) => async () => await Resolve(taskId);

    // Deliberately left as a lazy sequence (not materialised) before being handed over.
    IEnumerable<Func<Task<int>>> factories = Enumerable
        .Range(start: 1, count: 10)
        .Select(taskId => FactoryFor(taskId));

    // Act: start the run, then complete the gates out of order.
    int maxConcurrency = 3;
    var pendingResults = ConcurrentTask.WhenAll(factories, maxConcurrency);
    var completionOrder = new[] { 4, 1, 0, 9, 2, 6, 8, 3, 7, 5 };
    foreach (var gateIndex in completionOrder)
    {
        gates[gateIndex].SetResult(0);
    }
    var results = await pendingResults;

    // Assert: results are 1..10 in exactly that order.
    int[] expected = Enumerable.Range(start: 1, count: 10).ToArray();
    results.ShouldBeEquivalentTo(expected, ops => ops.WithStrictOrdering());
}
Product Versions
.NET net5.0 net5.0-windows net6.0 net6.0-android net6.0-ios net6.0-maccatalyst net6.0-macos net6.0-tvos net6.0-windows
Compatible target framework(s)
Additional computed target framework(s)
Learn more about Target Frameworks and .NET Standard.
  • net5.0

    • No dependencies.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
2.0.2 181 10/1/2021
2.0.1 131 10/1/2021
1.2.11 5,915 5/2/2018
1.2.10 5,390 3/14/2018
1.2.9 5,376 3/14/2018
1.2.8 5,398 3/14/2018
1.2.7 5,418 3/14/2018
1.2.6 5,402 3/14/2018
1.0.11 5,377 1/12/2018
1.0.10 5,291 1/12/2018
1.0.9 5,445 1/12/2018
1.0.8 5,267 1/12/2018
1.0.7 5,424 1/12/2018
1.0.6 5,306 1/12/2018
1.0.5 5,403 1/12/2018