Laden...

Dataflow LinkTo TransformBlock ist langsam

Erstellt von BlackMatrix vor 6 Jahren Letzter Beitrag vor 6 Jahren 3.853 Views
B
BlackMatrix Themenstarter:in
218 Beiträge seit 2012
vor 6 Jahren
Dataflow LinkTo TransformBlock ist langsam

Hallo,

ich konnte bei Stackoverflow leider noch keine passende Antwort zu einer ähnlichen Frage erhalten, daher probiere ich es hier.

Ich habe zwei TransformBlock, die in einem Kreislauf angeordnet sind und ihre Daten weiterreichen. TransformBlock 1 stellt einen Block dar, der I/O Operationen durchführt und auf maximal 50 parallel arbeitende Tasks beschränkt werden soll. Danach sollen die eingelesenen Daten + Metadaten des Einlesens an den nächsten Block weitergereicht werden, der dann aufgrund der Metadaten entscheidet, ob diese erneut (nach einer gewissen Wartezeit) in den I/O Prozess geschickt werden sollen oder aber ob sie aussortiert werden.
Der I/O Block ist auf 50 begrenz (MaxDegreeOfParallelism = 50), der Warte/Aussortierblock ist unlimitiert.
Nun habe ich das Phänomen, schicke (TransformBlock.Post) ich eine große Anzahl von Elementen in den I/O Block und sind fertig bearbeitet, habe ich festgestellt, dass es eine halbe Ewigkeit dauert, bis diese in den 2. Block weitergegeben werden. Ich habe es mit Logeinträgen mal beobachtet und gesehen, dass etwa 10 Minuten kein Eintrag "gelinkt" wird, dann aber gut 1000 Einträge mit einmal. Teilweise sind die Auswirkungen so groß, dass dann mein 1. Block unausgelastet ist, weil der 2. Block viel zu spät seine Daten erhält.

Normalerweise würde ich es so implementieren, weil es sich so "richtiger" anfühlt:


    public void Start()
    {
        _ioBlock = new TransformBlock<Data,Tuple<Data, MetaData>>(async data =>
        {
            var metaData = await ReadAsync(data).ConfigureAwait(false);

            return new Tuple<Data, MetaData>(data, metaData);

        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 });

        _waitBlock = new TransformBlock<Tuple<Data, MetaData>,Data>(async dataMetaData =>
        {
            var data = dataMetaData.Item1;
            var metaData = dataMetaData.Item2;

            if (!metaData.Repost)
            {
                return null;
            }

            await Task.Delay(TimeSpan.FromMinutes(1)).ConfigureAwait(false);

            return data;

        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        _ioBlock.LinkTo(_waitBlock);
        _waitBlock.LinkTo(_ioBlock, data => data != null);
        _waitBlock.LinkTo(DataflowBlock.NullTarget<Data>());

        foreach (var data in Enumerable.Range(0, 2000).Select(i => new Data(i)))
        {
            _ioBlock.Post(data);
        }
    }

Aufgrund des beschriebenen Problems muss ich es aber so schreiben, damit es meiner Vorstellung entspricht. Dadurch werden die Daten schneller an den nächsten Block weitergeleitet. Ich finde aber, dass es eher ein "Hack" ist:


 public void Start()
    {
        _ioBlock = new ActionBlock<Data>(async data =>
        {
            var metaData = await ReadAsync(data).ConfigureAwait(false);

            var dataMetaData= new Tuple<Data, MetaData>(data, metaData);

            _waitBlock.Post(dataMetaData);

        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 });

        _waitBlock = new ActionBlock<Tuple<Data, MetaData>>(async dataMetaData =>
        {
            var data = dataMetaData.Item1;
            var metaData = dataMetaData.Item2;

            if (metaData.Repost)
            {
                await Task.Delay(TimeSpan.FromMinutes(1)).ConfigureAwait(false);

                _ioBlock.Post(data);
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        foreach (var data in Enumerable.Range(0, 2000).Select(i => new Data(i)))
        {
            _ioBlock.Post(data);
        }
    }

Weiß jemand Rat?

16.806 Beiträge seit 2008
vor 6 Jahren

Du baust hier irgendwie TPL Pipelining nach, kann das sein?
Wenn ja - warum?

B
BlackMatrix Themenstarter:in
218 Beiträge seit 2012
vor 6 Jahren

Du baust hier irgendwie TPL Pipelining nach, kann das sein?
Wenn ja - warum?

Was meinst du mit nachbauen? Ich nutze sie?

System.Threading.Tasks.Dataflow

https://docs.microsoft.com/de-de/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline

16.806 Beiträge seit 2008
vor 6 Jahren

Nein, Du benutzt nicht das Prinzip von TPL Piplines mit den entsprechenden Collections.

Ich sehe jetzt nicht, warum Du nicht den Pipline Pattern verwenden könntest.
Bekommst geschenkt.

B
BlackMatrix Themenstarter:in
218 Beiträge seit 2012
vor 6 Jahren

Wo genau liegt jetzt der Unterschied zwischen den beiden? Auf den ersten Blick scheint mir TPL Pipelines veraltet und weniger "griffig". Auch scheint der Grad der Parallelisierung nicht mit wenig Aufwand gesetzt werden zu können?

W
955 Beiträge seit 2010
vor 6 Jahren

Hi,
Du könntest mal schauen ob der Concurrency Visualizer dir helfen kann.

16.806 Beiträge seit 2008
vor 6 Jahren

Veraltet ist ein mutiger Begriff, denn TPL Dataflow stammt aus der Prototyp-Entwicklung von async/await aber hat es dann nie wirklich ins Framework geschafft; hat also bezogen auf den von Dir verwendeten Begriff "veraltet" schon ein wenig auf dem Buckel (~6-7 Jahre) - kam dann aber doch als NuGet raus.

Meiner Meinung nach wurde da auch weniger rein investiert, da ganz viele Szenarien von TPL Dataflow eben schon durch Reactive Extensions super einfach lösbar sind und das bereits eine große Verbreitung und Community hat.
(Wir hatten dazu erst gestern einen Thread, aber mit WPF-UI Fokus Reactive Extensions (RX) in WPF: Erfahrungen und/oder Bewertungen?)

Ich sehe auch den Pipeline Pattern aus Architektur-sicht weit dem Dataflow überlegen, da ich viel isolierter arbeiten kann und das Skalieren viel einfacher umsetzbar ist.
Während ich bei Pipelines die Paralellisierung von Außen dynamisch bestimmen und kontrollieren kann, geht das bei DataFlow nur umständlich und auch nur von Innen.
Ich muss ja auch den Code wirklich mit allen möglichen DataFlow-Klassen und -Elementen anreichern, damit das alles klappt, was man eben bei Pipelines nicht muss.

Sehe wenig Nutzen derzeit am Dataflow Feature von TPL und würde es in meinen Projekten auch nicht verwenden.

B
BlackMatrix Themenstarter:in
218 Beiträge seit 2012
vor 6 Jahren

Ich sehe auch den Pipeline Pattern aus Architektur-sicht weit dem Dataflow überlegen, da ich viel isolierter arbeiten kann und das Skalieren viel einfacher umsetzbar ist.
Während ich bei Pipelines die Paralellisierung von Außen dynamisch bestimmen und kontrollieren kann, geht das bei DataFlow nur umständlich und auch nur von Innen.

Ja, wenn das der Fall wäre, dann müsste man sich tatsächlich über eine Alternative Gedanken machen. Dataflow funktioniert am besten, wenn man statische Blöcke hat, bei denen die der Parallelisierungsgrad fix ist.

Ich habe jetzt mal auf TPL Pipelines umgebaut und ich muss ganz ehrlich sagen, da ist viel Code dabei, den ich bei Dataflow eindeutig nicht schreiben muss. Beim Schreiben des Codes erinnert mich das ein wenig an meine zweite Variante mit dem direkten TransformBlock.Post anstatt dem Linken. Hinzu kommt, dass ich noch nicht den Fall implementiert habe, wenn die IO-Pipeline (Max. 50 Pipeline) mal nicht prall gefüllt ist. Scheinbar muss ich da noch einen weiteren Buffer einführen mit BoundedCapacities limitiert auf 50.

Ist denn der erstmal so angedacht mit TPL Pipelining und entspricht in etwa dem von oben oder siehst du da noch irgendwelche Fallstricke, die ich (noch) nicht sehe?


        public void Start()
        {
            var ioBuffer = new BlockingCollection<Data>();
            var waitBuffer = new BlockingCollection<Tuple<Data, MetaData>>();

            ConsumeIOBuffer(ioBuffer, waitBuffer);
            ConsumeWaitBuffer(waitBuffer, ioBuffer);

            foreach (var data in Enumerable.Range(0, 2000).Select(i => new Data(i)))
            {
                ioBuffer.Add(data);
            }
        }

        private static void ConsumeWaitBuffer(BlockingCollection<Tuple<Data, MetaData>> waitBuffer, BlockingCollection<Data> ioBuffer)
        {
            Task.Run(() =>
            {
                while (true)
                {
                    var dataMetaData = waitBuffer.Take();

                    var _ = WaitAndAddAsync(dataMetaData, ioBuffer);
                }
            });
        }

        private static async Task WaitAndAddAsync(Tuple<Data, MetaData> dataMetaData, BlockingCollection<Data> ioBuffer)
        {
            var data = dataMetaData.Item1;
            var metaData = dataMetaData.Item2;

            if (metaData.Repost)
            {
                await Task.Delay(TimeSpan.FromMinutes(1)).ConfigureAwait(false);
                ioBuffer.Add(data);
             }

            
        }

        private void ConsumeIOBuffer(BlockingCollection<Data> ioBuffer, BlockingCollection<Tuple<Data, MetaData>> waitBuffer)
        {
            Task.Run(async () =>
            {
                var tasks = new Dictionary<Task<MetaData>, Data>();

                const int maxDegreeOfParallelism = 50;

                while (true)
                {
                    var data = ioBuffer.Take();

                    var task = ReadAsync(data);

                    tasks[task] = data;

                    if (tasks.Count >= maxDegreeOfParallelism)
                    {
                        var finishedTask = await Task.WhenAny(tasks.Keys).ConfigureAwait(false);

                        var metaData = await finishedTask.ConfigureAwait(false);

                        var dataMetaData = new Tuple<Data, MetaData>(tasks[finishedTask], metaData);

                        waitBuffer.Add(dataMetaData);

                        tasks.Remove(finishedTask);
                    }
                }
            });
        }

Edit:

So wie ich das sehe, wenn man den Fall kleiner maxDegreeOfParallelism betrachtet, wird es richtig kompliziert. Jedes Hinzufügen eines Elements zum IOBuffer müsste erneut schauen, welcher Task fertig ist (Task.WhenAny) um ihn an den WaitBuffer weiterzugeben?

Sehe wenig Nutzen derzeit am Dataflow Feature von TPL und würde es in meinen Projekten auch nicht verwenden.

Es würde mich ja schon mal interessieren, wie das in meinem Fall mit TPL Pipelines mit evtl. minimal größerem Aufwand bewerkstelligt werden könnte.

B
BlackMatrix Themenstarter:in
218 Beiträge seit 2012
vor 6 Jahren

Also mir fehlt noch ein bisschen der Durchblick bei den Pipelines. Ich hab mir jetzt gedacht, es müssen 3 Buffer werden. Der 1. Buffer nimmt erstmal alle Datenelemente entgegen und ist unlimitert. Der 2. Buffer müsste der I/O Buffer werden, der dann auf 50 mittels bounded capacity begrenzt wird. Nun ist die Frage von welchem Datentyp dieser sein müsste. Entweder wieder ein BlockingCollection<Data>, der eben auf 50 limitert ist. Dann hätte man so etwas was ich im letzten Beispiel in der Methode ConsumeIOBuffer versucht hatte. Ich glaube da baue ich aber etwas nach, was eigentlich schon durch die BlockingCollection erreicht werden sollte. Oder aber man baut einen Buffer ala BlockingCollection<Task<...>>, dann hätte ich aber das Problem, dass ich nicht gezielt fertiggestellte/completed Tasks aus der BlockingCollection nehmen (Take) kann.
Übersehe ich noch etwas oder wo ist mein Denkfehler?

16.806 Beiträge seit 2008
vor 6 Jahren

Es werden niemals die Buffer selbst limitiert, sondern die verarbeiteten Elemente - zB. die Consumer Tasks.

Der Buffer selbst weiß niemals, wie vielen Producern er eigentlich Daten eingeschoben bekommt bzw. von Consumer wieder entnommen werden.
Ist ihm auch wurst.

Hangfire verfolgt genau das gleiche Ziel, hat einfach einen In-Process Buffer in Form einer BlockingCollection, sondern zB. eine SQL Datenbank oder einen Redis.
Der Buffer-Typ ist technisch wurst; ergibt sich nur durch die Anforderungen (flüchtig vs. persistent).

D
985 Beiträge seit 2014
vor 6 Jahren

mMn macht es durchaus Sinn auch die (InMemory-)Buffer zu begrenzen, denn ansonsten kann es passieren, dass der Speicher überläuft.

Das passiert immer dann, wenn der Buffer schneller befüllt als abgearbeitet werden kann und es eben mehr Elemente gibt als in den Speicher passen.

16.806 Beiträge seit 2008
vor 6 Jahren

Wenn Du in so eine Sizing Situation kommst, dann ist InMemory einfach auch das Falsche.

B
BlackMatrix Themenstarter:in
218 Beiträge seit 2012
vor 6 Jahren

🤔

16.806 Beiträge seit 2008
vor 6 Jahren

Was ist Dir unklar?

6.911 Beiträge seit 2009
vor 6 Jahren

Hallo,

ich sehe das so wie Sir Rufo. Der Buffer kann / muss eine Limitierung haben und wenn diese erreicht ist, so werden die Producer-Tasks blockiert. Mit der BlockingCollection<T> aus der TPL ist genau so und nur so kann sicher ein "Überlauf" verhindert werden.

Das hat auch nichts mit einer Limitierung von In-Memory zu tun od. sonst etwas.
Ließe sich durch eine andere Architekture diese Limitierung vermeiden, so wäre das besser und die Anwendung skaliert auch besser. Aber auch in solch einer Architektur sollte zur Sicherheit eine Buffer-Limitierung vorhanden sein.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

16.806 Beiträge seit 2008
vor 6 Jahren

Hber auch in solch einer Architektur sollte zur Sicherheit eine Buffer-Limitierung vorhanden sein.

Nein. Meine Meinung nach ein deutliches Nein.

Was soll das denn für eine Sicherheit sein? Soll dann das Add blockieren und Du kommst in eine Situation, an dem die Anwendung hart abbrechen muss, weil das schließlich eine Situation ist, die nicht automatisch gelöst werden kann?
Welchen Sinn hat das denn?

Der Buffer ist doch genau dazu da, dass Messages auflaufen dürfen, wenn nicht schnell genug abgearbeitet werden kann, zB weil derzeit / temporär ein Consumer nicht online ist.

Macht absolut kein Sinn in meinen Augen.

6.911 Beiträge seit 2009
vor 6 Jahren

Hallo Abt,

warum soll die Anwendung hart abbrechen müssen?
Der Producer ist einfach blockiert*, bis der Buffer wieder Platz hat. Standard Producer/Consumer-Szenario.

Ist es deiner Meinung nach besser wenn der Buffer "übergeht" und es so zu einer OutOfMemoryException kommt? Das kann doch keine Logik sein.

Der Buffer ist doch genau dazu da, dass Messages auflaufen dürfen,

Genau, aber nicht bis zum App-Crash, sondern nur bis zu einem bestimmten Limit.

So etwas macht absolut Sinn in meinen Augen.

* bzw. durch die TryXxx-Methoden ist sogar keine Thread-Blockade durch eine Synchronisationsprimitive nötig, sondern es kann eine alternative Ausführungspfad gewählt werden.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

16.806 Beiträge seit 2008
vor 6 Jahren

Naja, dann muss man auch lesen, was ich geschrieben hab.

Wenn ein Buffer in ein Szenario kommt, dass er Größen wie eine OutOfMemory-Exception kommt, dann ist ein InMemory-Szenario halt auch völlig deplatziert.
Um sowas vorzubeugen gibt es persistierende Queues.

Und nein, einen Producer zu blockieren ist kein adäquatives Mittel.
Ich wüsste nicht wieso. Es macht auch keinen Sinn.
Das ist nicht notwendig, wenn die Queue die richtige Technologie hat.
Dafür ist sie da.

6.911 Beiträge seit 2009
vor 6 Jahren

Hallo Abt,

also gut ich fasse zusammen: Wenn du "Nein" sagst, sowie dass es keinen Sinn macht und du nicht weißt wieso, dann gilt automatisch deine Antwort die auch gelesen werden soll. Super! Da ist mir die Zeit zu schade um zu diskutieren.

Nur noch so viel: hast du dir einmal überlegt wo überall ein Producer/Consumer eingesetzt wird und wieviele davon ohne Limitierung auskommen? Vermutlich schon...wozu frag ich überhaupt.

Hallo zusammen,

es gibt etliche Implementierung eines Buffers für ein Producer/Consumer-Szenario die eine Limitierung / Bounded Capacity haben und das aus gutem Grund, denn es mag verschiedene Anwendungsfälle geben, in denen eine maximale Bufferkapazität nicht überschritten werden darf. BlockingCollection<T> ist so ein Beispiel für einen In-Memory Buffer der in .net dabei ist.

Zitat von: Erzeuger-Verbraucher-Problem
Ist die Aufnahmekapazität der Datenstruktur beschränkt, so soll die Zugriffsregelung ferner verhindern, dass ein erzeugender Prozess auf die Datenstruktur zugreift, wenn die Aufnahmekapazität der Datenstruktur bereits ausgeschöpft ist.

Es gibt aber auch Anwendungsfälle, bei denen ein Producer nicht blockiert werden darf. Auch hierfür gibt es gute Gründe, wie z.B. dass periodische Abtastwerte nicht "verschluckt" werden dürfen.
Für diese Fälle hat Abt passende Möglichkeiten genannt.

Hallo BlackMatrix,

ich konnte bei Stackoverflow leider noch keine passende Antwort

Kannst du dann wenigstens die andere Frage verlinken?

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

B
BlackMatrix Themenstarter:in
218 Beiträge seit 2012
vor 6 Jahren

ich konnte bei Stackoverflow leider noch keine passende Antwort
Kannst du dann wenigstens die andere Frage verlinken?

Kann ich 😃

TPL Dataflow LinkTo TransformBlock is very slow

Hatte den Code oben mal mit Logeinträgen versehen:

https://pastebin.com/zRYYWDGx

Resultat:

https://pastebin.com/i3QLSc6Z

Es dauert eben bis zu 50 Sekunden ein fertiges Ergebnis des I/O Blocks an den Waitblock übergeben wird.

2018-03-21 15:37:03.6352|DEBUG|Starter.StackoverFlowTest|Exit I/O block 29...|
2018-03-21 15:37:56.9446|DEBUG|Starter.StackoverFlowTest|Enter wait block 29...|

16.806 Beiträge seit 2008
vor 6 Jahren

Meine eigentliche Aussage wird irgendwie nicht beachtet:

Wenn die BlockingCollection künstlich blockiert werden muss - weil der Speicher sonst platzt - ist denn dann ein InMemory-Verfahren überhaupt nocht der richtige weg und kann das nicht ausgelagert werden?

Wenn die Datenbank nicht mehr in den Speicher passt, dann lagern wir das doch auch aus.
Wieso nicht denn auch mit Buffern? Das ist Alltag (auch wenn nur ein kleiner Bestandteil) in verteilten Architekturen.

Es ist was völlig anderes, wenn das InMemory-Verfahren nicht ersetzt werden kann.

@BlackMatrix: bitte keine externen Bildhoster.
[Hinweis] Wie poste ich richtig? 6.1 Bilder als Dateianhang hochladen

Ich versteh trotzdem Dein Problem nicht.
Es hatte sich so angehört (für mich), dass Deine Anwendung 50 Sekunden nichts macht.

Jetzt geht es Dir einfach nur um die Reihenfolge der Tasks, oder wie?
Es gibt keine Garantie in Deinem Code, dass nach einem Exit sofort ein Enter kommt.
Die Reihenfolge des Task Schedulers kannst Du hier gar nicht in der Form vorgeben - ist ja auch nicht sinn der Sache.
Oder um was geht es Dir? Du sprichst ja von langsam. Langsam ist es ja nicht, nur nicht in der Reihenfolge, wie Du es Dir vorstellst - oder?

Ohne den Link auf das Pastebin der Logs ist Deine Ansicht, was passiert, völlig verfälscht mit dem, was Du suggerierst. 😉

B
BlackMatrix Themenstarter:in
218 Beiträge seit 2012
vor 6 Jahren

Ich entschuldige mich, wenn ich mich falsch ausgedrückt habe.

Mir geht es halt darum, dass es mit der LinkTo-Variante zu lange dauert, bis die eigentliche Message/Daten in dem 2. Block bzw. Waitblock ankommen/entern. Anstatt eine fertige Message direkt an den WaitBlock weiterzuleiten, gibt es scheinbar einen sehr niedrig priorisierten Task, der dann mit einmal alle fertig bearbeiteten Messages des I/O-Blocks in den WaitBlock "schiebt". Weiß jemand wann das ausgeführt wird?
Die Reihenfolge ist mir egal, es ist mir aber wichtig, dass die Messages nicht so blockweise transferiert werden. (Nein, es ist nicht wegen des fixen Delays wie bei Stackoverflow gedacht wird). Mit dem direkten Aufruf der Post-Methode kann ich dieses Problem umgehen, aber scheint mir wie gesagt ein Hack zu sein.

B
BlackMatrix Themenstarter:in
218 Beiträge seit 2012
vor 6 Jahren

OMG! Lass mich raten mit ExecutionDataflowBlockOptions.EnsureOrdered ist das Problem behoben -.-

Wieso ist das standardmäßig true -.-

16.806 Beiträge seit 2008
vor 6 Jahren

Also anhand der Dokumentation und der Beschreibung von Dataflow: Degree of Parallelism hätte ich jetzt auch nicht dieses Verhalten erwartet.

B
BlackMatrix Themenstarter:in
218 Beiträge seit 2012
vor 6 Jahren

Mit Setzen auf false ist das Problem damit behoben.

Widerspricht das nicht dem ganzen Konzept, wenn das standardmäßig true ist? Erst kann ich die Parallelisierung mittels MaxDegreeOfParallelism hochschrauben und dann warte ich doch auf den langsamsten Task?

6.911 Beiträge seit 2009
vor 6 Jahren

Hallo BlackMatrix,

kennst du das Whitepaper Introduction to TPL Dataflow? Vllt. hilft das fürs Verständnis warum und wieso manche Einstellungen standardmäßig so sind.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"