myCSharp.de - DIE C# und .NET Community
Willkommen auf myCSharp.de! Anmelden | kostenlos registrieren
 
 | Suche | FAQ

» Hauptmenü
myCSharp.de
» Startseite
» Forum
» FAQ
» Artikel
» C#-Snippets
» Jobbörse
» Suche
» Regeln
» Wie poste ich richtig?
» Forum-FAQ

Mitglieder
» Liste / Suche
» Wer ist wo online?

Ressourcen
» openbook: Visual C#
» openbook: OO
» Microsoft Docs

Team
» Kontakt
» Übersicht
» Wir über uns

» myCSharp.de Diskussionsforum
Du befindest Dich hier: Community-Index » Diskussionsforum » Gemeinschaft » .NET-Komponenten und C#-Snippets » Throttling -- Begrenzung der max. Anzahl an gleichzeitig ausgeführten asynchronen Vorgängen
Letzter Beitrag | Erster ungelesener Beitrag Druckvorschau | Thema zu Favoriten hinzufügen

Antwort erstellen
Zum Ende der Seite springen  

Throttling -- Begrenzung der max. Anzahl an gleichzeitig ausgeführten asynchronen Vorgängen

 
Autor
Beitrag « Vorheriges Thema | Nächstes Thema »
gfoidl gfoidl ist männlich
myCSharp.de-Team

avatar-2894.jpg


Dabei seit: 07.06.2009
Beiträge: 6.600
Entwicklungsumgebung: VS 2019
Herkunft: Waidring


gfoidl ist offline

Throttling -- Begrenzung der max. Anzahl an gleichzeitig ausgeführten asynchronen Vorgängen

Beitrag: beantworten | zitieren | editieren | melden/löschen       | Top

Beschreibung:

Mit dieser Erweiterungsmethode für System.Threading.Tasks.TaskFactory kann die max. Anzahl an gleichzeitigen asynchronen Vorgänge begrenzt werden. Sobald ein Task fertig ist, wird ein neuer gestartet, damit die max. Anzahl so lange es geht aufrecht erhalten wird.

Die Verwendung bietet sich v.a. bei asynchronen IO-Vorgängen wie Webrequests, Webservice-Aufrufen und Datenbank-Aktionen an, wenn viele gleichzeitig aber nicht alle auf einmal durchgeführt werden sollen.

Wenn beispielsweise eine Webrequest oft und maximal 18x gleichzeitig durchgeführt werden soll, so kann mein Beispielcode in  Wie mehrere HttpWebRequest Parallel durchführen ?
mit dieser Erweiterungsmethode direkter und ohne Infrastrukturrauschen geschrieben werden.

C#-Code:
public async Task<IList<string>> DownloadDataAsync(string url, int count, int maxParallelRequests = 18)
{
    using (HttpClient httpClient = new HttpClient())
    {
        // Ein Func die den Download-Task erzeugt, sobald die Func ausgeführt wird
        Func<string, Func<Task<string>>> taskFunc = u => () => httpClient.GetStringAsync(u);

        var resultList = new List<string>(count);
        var taskFuncs  = Enumerable.Repeat(url, count).Select(taskFunc);

        foreach (var completedTask in Task.Factory.Throttling(taskFuncs, maxParallelRequests))
        {
            string result = await completedTask.ConfigureAwait(false);

            resultList.Add(result);
        }

        return resultList;
    }
}

Außerdem ist diese Variante ressourcenschonender als der "triviale" Ansatz im verlinkten Code. Dort muss für Task.WhenAny und Task.WhenAll zu jedem Task immer wieder eine Continuation hinzugefügt werden (intern durch die Infrastruktur von Task.WhenXXX). Hier wird zu jedem Task nur einmal eine Continuation angehängt. Weiters ist keine List<Task> nötig der Task hinzugefügt und entfernt werden.

C#-Code:
using System.Collections.Generic;
using System.Linq;

namespace System.Threading.Tasks
{
    public static class TaskFactoryExtensions
    {
        public static IEnumerable<Task<T>> Throttling<T>(this TaskFactory factory, IEnumerable<Func<Task<T>>> taskFuncs, int concurrencyLevel = int.MaxValue)
        {
            if (factory   == null) throw new ArgumentNullException(nameof(factory));
            if (taskFuncs == null) throw new ArgumentNullException(nameof(taskFuncs));

            return ThrottlingCore(taskFuncs.ToList(), concurrencyLevel);
        }
        //---------------------------------------------------------------------
        private static IEnumerable<Task<T>> ThrottlingCore<T>(List<Func<Task<T>>> taskFuncs, int concurrencyLevel)
        {
            var sources              = taskFuncs.Select(t => new TaskCompletionSource<T>()).ToList();
            var nextTaskEnumerator   = taskFuncs.GetEnumerator();
            var nextSourceEnumerator = sources.GetEnumerator();
            var syncRoot             = new object();

            Action<Task<T>> continuation = null;
            continuation = completed =>
            {
                lock (syncRoot)
                {
                    if (nextSourceEnumerator.MoveNext())
                    {
                        var source = nextSourceEnumerator.Current;

                        if (completed.IsFaulted)
                            source.TrySetException(completed.Exception.InnerExceptions);
                        else if (completed.IsCanceled)
                            source.TrySetCanceled();
                        else
                            source.TrySetResult(completed.Result);
                    }

                    if (nextTaskEnumerator.MoveNext())
                    {
                        Func<Task<T>> taskFunc = nextTaskEnumerator.Current;

                        taskFunc().ContinueWith(
                            continuation,
                            CancellationToken.None,
                            TaskContinuationOptions.ExecuteSynchronously,
                            TaskScheduler.Default);
                    }
                }
            };

            lock (syncRoot)
            {
                int counter = 0;
                while (counter++ < concurrencyLevel && nextTaskEnumerator.MoveNext())
                {
                    Func<Task<T>> taskFunc = nextTaskEnumerator.Current;

                    Task<T> task = taskFunc();
                    task.ContinueWith(
                        continuation,
                        CancellationToken.None,
                        TaskContinuationOptions.ExecuteSynchronously,
                        TaskScheduler.Default);
                }
            }

            return sources.Select(s => s.Task);
        }
    }
}

Cancellation ist in diesem Snippet nicht berücksichtig, kann jedoch leicht nachgerüstet werden.

Anmerkung: Inspiration zu diesem Snippet, das bei mir schon länger herumlag, hab ich aus  Task-based Asynchronous Pattern Abschnitte Throttling / Interleaving

Schlagwörter: Task, Throttling, Begrenzung, Speedski, async/await
11.05.2017 14:23 Beiträge des Benutzers | zu Buddylist hinzufügen
Baumstruktur | Brettstruktur       | Top 
myCSharp.de | Forum Der Startbeitrag ist älter als 2 Jahre.
Der letzte Beitrag ist älter als 2 Jahre.
Antwort erstellen


© Copyright 2003-2019 myCSharp.de-Team | Impressum | Datenschutz | Alle Rechte vorbehalten. | Dieses Portal verwendet zum korrekten Betrieb Cookies. 13.11.2019 13:54