myCSharp.de - DIE C# und .NET Community (https://www.mycsharp.de/wbb2/index.php)
- Gemeinschaft (https://www.mycsharp.de/wbb2/board.php?boardid=64)
-- .NET-Komponenten und C#-Snippets (https://www.mycsharp.de/wbb2/board.php?boardid=67)
--- Throttling -- Begrenzung der max. Anzahl an gleichzeitig ausgeführten asynchronen Vorgängen (https://www.mycsharp.de/wbb2/thread.php?threadid=119133)


Geschrieben von gfoidl am 11.05.2017 um 14:23:
  Throttling -- Begrenzung der max. Anzahl an gleichzeitig ausgeführten asynchronen Vorgängen
Beschreibung:

Mit dieser Erweiterungsmethode für System.Threading.Tasks.TaskFactory kann die max. Anzahl an gleichzeitigen asynchronen Vorgänge begrenzt werden. Sobald ein Task fertig ist, wird ein neuer gestartet, damit die max. Anzahl so lange es geht aufrecht erhalten wird.

Die Verwendung bietet sich v.a. bei asynchronen IO-Vorgängen wie Webrequests, Webservice-Aufrufen und Datenbank-Aktionen an, wenn viele gleichzeitig aber nicht alle auf einmal durchgeführt werden sollen.

Wenn beispielsweise eine Webrequest oft und maximal 18x gleichzeitig durchgeführt werden soll, so kann mein Beispielcode in  Wie mehrere HttpWebRequest Parallel durchführen ?
mit dieser Erweiterungsmethode direkter und ohne Infrastrukturrauschen geschrieben werden.

C#-Code:
public async Task<IList<string>> DownloadDataAsync(string url, int count, int maxParallelRequests = 18)
{
    using (HttpClient httpClient = new HttpClient())
    {
        // Ein Func die den Download-Task erzeugt, sobald die Func ausgeführt wird
        Func<string, Func<Task<string>>> taskFunc = u => () => httpClient.GetStringAsync(u);

        var resultList = new List<string>(count);
        var taskFuncs  = Enumerable.Repeat(url, count).Select(taskFunc);

        foreach (var completedTask in Task.Factory.Throttling(taskFuncs, maxParallelRequests))
        {
            string result = await completedTask.ConfigureAwait(false);

            resultList.Add(result);
        }

        return resultList;
    }
}

Außerdem ist diese Variante ressourcenschonender als der "triviale" Ansatz im verlinkten Code. Dort muss für Task.WhenAny und Task.WhenAll zu jedem Task immer wieder eine Continuation hinzugefügt werden (intern durch die Infrastruktur von Task.WhenXXX). Hier wird zu jedem Task nur einmal eine Continuation angehängt. Weiters ist keine List<Task> nötig der Task hinzugefügt und entfernt werden.

C#-Code:
using System.Collections.Generic;
using System.Linq;

namespace System.Threading.Tasks
{
    public static class TaskFactoryExtensions
    {
        public static IEnumerable<Task<T>> Throttling<T>(this TaskFactory factory, IEnumerable<Func<Task<T>>> taskFuncs, int concurrencyLevel = int.MaxValue)
        {
            if (factory   == null) throw new ArgumentNullException(nameof(factory));
            if (taskFuncs == null) throw new ArgumentNullException(nameof(taskFuncs));

            return ThrottlingCore(taskFuncs.ToList(), concurrencyLevel);
        }
        //---------------------------------------------------------------------
        private static IEnumerable<Task<T>> ThrottlingCore<T>(List<Func<Task<T>>> taskFuncs, int concurrencyLevel)
        {
            var sources              = taskFuncs.Select(t => new TaskCompletionSource<T>()).ToList();
            var nextTaskEnumerator   = taskFuncs.GetEnumerator();
            var nextSourceEnumerator = sources.GetEnumerator();
            var syncRoot             = new object();

            Action<Task<T>> continuation = null;
            continuation = completed =>
            {
                lock (syncRoot)
                {
                    if (nextSourceEnumerator.MoveNext())
                    {
                        var source = nextSourceEnumerator.Current;

                        if (completed.IsFaulted)
                            source.TrySetException(completed.Exception.InnerExceptions);
                        else if (completed.IsCanceled)
                            source.TrySetCanceled();
                        else
                            source.TrySetResult(completed.Result);
                    }

                    if (nextTaskEnumerator.MoveNext())
                    {
                        Func<Task<T>> taskFunc = nextTaskEnumerator.Current;

                        taskFunc().ContinueWith(
                            continuation,
                            CancellationToken.None,
                            TaskContinuationOptions.ExecuteSynchronously,
                            TaskScheduler.Default);
                    }
                }
            };

            lock (syncRoot)
            {
                int counter = 0;
                while (counter++ < concurrencyLevel && nextTaskEnumerator.MoveNext())
                {
                    Func<Task<T>> taskFunc = nextTaskEnumerator.Current;

                    Task<T> task = taskFunc();
                    task.ContinueWith(
                        continuation,
                        CancellationToken.None,
                        TaskContinuationOptions.ExecuteSynchronously,
                        TaskScheduler.Default);
                }
            }

            return sources.Select(s => s.Task);
        }
    }
}

Cancellation ist in diesem Snippet nicht berücksichtig, kann jedoch leicht nachgerüstet werden.

Anmerkung: Inspiration zu diesem Snippet, das bei mir schon länger herumlag, hab ich aus  Task-based Asynchronous Pattern Abschnitte Throttling / Interleaving

Schlagwörter: Task, Throttling, Begrenzung, Speedski, async/await


© Copyright 2003-2020 myCSharp.de-Team | Impressum | Datenschutz | Alle Rechte vorbehalten. | Dieses Portal verwendet zum korrekten Betrieb Cookies. 25.05.2020 22:38