Hallo,
ich konnte bei Stackoverflow leider noch keine passende Antwort zu einer ähnlichen Frage erhalten, daher probiere ich es hier.
Ich habe zwei TransformBlock, die in einem Kreislauf angeordnet sind und ihre Daten weiterreichen. TransformBlock 1 stellt einen Block dar, der I/O Operationen durchführt und auf maximal 50 parallel arbeitende Tasks beschränkt werden soll. Danach sollen die eingelesenen Daten + Metadaten des Einlesens an den nächsten Block weitergereicht werden, der dann aufgrund der Metadaten entscheidet, ob diese erneut (nach einer gewissen Wartezeit) in den I/O Prozess geschickt werden sollen oder aber ob sie aussortiert werden.
Der I/O Block ist auf 50 begrenz (MaxDegreeOfParallelism = 50), der Warte/Aussortierblock ist unlimitiert.
Nun habe ich das Phänomen, schicke (TransformBlock.Post) ich eine große Anzahl von Elementen in den I/O Block und sind fertig bearbeitet, habe ich festgestellt, dass es eine halbe Ewigkeit dauert, bis diese in den 2. Block weitergegeben werden. Ich habe es mit Logeinträgen mal beobachtet und gesehen, dass etwa 10 Minuten kein Eintrag "gelinkt" wird, dann aber gut 1000 Einträge mit einmal. Teilweise sind die Auswirkungen so groß, dass dann mein 1. Block unausgelastet ist, weil der 2. Block viel zu spät seine Daten erhält.
Normalerweise würde ich es so implementieren, weil es sich so "richtiger" anfühlt:
public void Start()
{
_ioBlock = new TransformBlock<Data,Tuple<Data, MetaData>>(async data =>
{
var metaData = await ReadAsync(data).ConfigureAwait(false);
return new Tuple<Data, MetaData>(data, metaData);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 });
_waitBlock = new TransformBlock<Tuple<Data, MetaData>,Data>(async dataMetaData =>
{
var data = dataMetaData.Item1;
var metaData = dataMetaData.Item2;
if (!metaData.Repost)
{
return null;
}
await Task.Delay(TimeSpan.FromMinutes(1)).ConfigureAwait(false);
return data;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
_ioBlock.LinkTo(_waitBlock);
_waitBlock.LinkTo(_ioBlock, data => data != null);
_waitBlock.LinkTo(DataflowBlock.NullTarget<Data>());
foreach (var data in Enumerable.Range(0, 2000).Select(i => new Data(i)))
{
_ioBlock.Post(data);
}
}
Aufgrund des beschriebenen Problems muss ich es aber so schreiben, damit es meiner Vorstellung entspricht. Dadurch werden die Daten schneller an den nächsten Block weitergeleitet. Ich finde aber, dass es eher ein "Hack" ist:
public void Start()
{
_ioBlock = new ActionBlock<Data>(async data =>
{
var metaData = await ReadAsync(data).ConfigureAwait(false);
var dataMetaData= new Tuple<Data, MetaData>(data, metaData);
_waitBlock.Post(dataMetaData);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 });
_waitBlock = new ActionBlock<Tuple<Data, MetaData>>(async dataMetaData =>
{
var data = dataMetaData.Item1;
var metaData = dataMetaData.Item2;
if (metaData.Repost)
{
await Task.Delay(TimeSpan.FromMinutes(1)).ConfigureAwait(false);
_ioBlock.Post(data);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
foreach (var data in Enumerable.Range(0, 2000).Select(i => new Data(i)))
{
_ioBlock.Post(data);
}
}
Weiß jemand Rat?
Du baust hier irgendwie TPL Pipelining nach, kann das sein?
Wenn ja - warum?
- performance is a feature -
Microsoft MVP - @Website - @AzureStuttgart - github.com/BenjaminAbt - Sustainable Code
Du baust hier irgendwie TPL Pipelining nach, kann das sein?
Wenn ja - warum?
Was meinst du mit nachbauen? Ich nutze sie?
System.Threading.Tasks.Dataflow
Nein, Du benutzt nicht das Prinzip von TPL Piplines mit den entsprechenden Collections.
Ich sehe jetzt nicht, warum Du nicht den Pipline Pattern verwenden könntest.
Bekommst geschenkt.
- performance is a feature -
Microsoft MVP - @Website - @AzureStuttgart - github.com/BenjaminAbt - Sustainable Code
Wo genau liegt jetzt der Unterschied zwischen den beiden? Auf den ersten Blick scheint mir TPL Pipelines veraltet und weniger "griffig". Auch scheint der Grad der Parallelisierung nicht mit wenig Aufwand gesetzt werden zu können?
Veraltet ist ein mutiger Begriff, denn TPL Dataflow stammt aus der Prototyp-Entwicklung von async/await aber hat es dann nie wirklich ins Framework geschafft; hat also bezogen auf den von Dir verwendeten Begriff "veraltet" schon ein wenig auf dem Buckel (~6-7 Jahre) - kam dann aber doch als NuGet raus.
Meiner Meinung nach wurde da auch weniger rein investiert, da ganz viele Szenarien von TPL Dataflow eben schon durch Reactive Extensions super einfach lösbar sind und das bereits eine große Verbreitung und Community hat.
(Wir hatten dazu erst gestern einen Thread, aber mit WPF-UI Fokus Reactive Extensions (RX) in WPF: Erfahrungen und/oder Bewertungen?)
Ich sehe auch den Pipeline Pattern aus Architektur-sicht weit dem Dataflow überlegen, da ich viel isolierter arbeiten kann und das Skalieren viel einfacher umsetzbar ist.
Während ich bei Pipelines die Paralellisierung von Außen dynamisch bestimmen und kontrollieren kann, geht das bei DataFlow nur umständlich und auch nur von Innen.
Ich muss ja auch den Code wirklich mit allen möglichen DataFlow-Klassen und -Elementen anreichern, damit das alles klappt, was man eben bei Pipelines nicht muss.
Sehe wenig Nutzen derzeit am Dataflow Feature von TPL und würde es in meinen Projekten auch nicht verwenden.
- performance is a feature -
Microsoft MVP - @Website - @AzureStuttgart - github.com/BenjaminAbt - Sustainable Code
Ich sehe auch den Pipeline Pattern aus Architektur-sicht weit dem Dataflow überlegen, da ich viel isolierter arbeiten kann und das Skalieren viel einfacher umsetzbar ist.
Während ich bei Pipelines die Paralellisierung von Außen dynamisch bestimmen und kontrollieren kann, geht das bei DataFlow nur umständlich und auch nur von Innen.
Ja, wenn das der Fall wäre, dann müsste man sich tatsächlich über eine Alternative Gedanken machen. Dataflow funktioniert am besten, wenn man statische Blöcke hat, bei denen die der Parallelisierungsgrad fix ist.
Ich habe jetzt mal auf TPL Pipelines umgebaut und ich muss ganz ehrlich sagen, da ist viel Code dabei, den ich bei Dataflow eindeutig nicht schreiben muss. Beim Schreiben des Codes erinnert mich das ein wenig an meine zweite Variante mit dem direkten TransformBlock.Post anstatt dem Linken. Hinzu kommt, dass ich noch nicht den Fall implementiert habe, wenn die IO-Pipeline (Max. 50 Pipeline) mal nicht prall gefüllt ist. Scheinbar muss ich da noch einen weiteren Buffer einführen mit BoundedCapacities limitiert auf 50.
Ist denn der erstmal so angedacht mit TPL Pipelining und entspricht in etwa dem von oben oder siehst du da noch irgendwelche Fallstricke, die ich (noch) nicht sehe?
public void Start()
{
var ioBuffer = new BlockingCollection<Data>();
var waitBuffer = new BlockingCollection<Tuple<Data, MetaData>>();
ConsumeIOBuffer(ioBuffer, waitBuffer);
ConsumeWaitBuffer(waitBuffer, ioBuffer);
foreach (var data in Enumerable.Range(0, 2000).Select(i => new Data(i)))
{
ioBuffer.Add(data);
}
}
private static void ConsumeWaitBuffer(BlockingCollection<Tuple<Data, MetaData>> waitBuffer, BlockingCollection<Data> ioBuffer)
{
Task.Run(() =>
{
while (true)
{
var dataMetaData = waitBuffer.Take();
var _ = WaitAndAddAsync(dataMetaData, ioBuffer);
}
});
}
private static async Task WaitAndAddAsync(Tuple<Data, MetaData> dataMetaData, BlockingCollection<Data> ioBuffer)
{
var data = dataMetaData.Item1;
var metaData = dataMetaData.Item2;
if (metaData.Repost)
{
await Task.Delay(TimeSpan.FromMinutes(1)).ConfigureAwait(false);
ioBuffer.Add(data);
}
}
private void ConsumeIOBuffer(BlockingCollection<Data> ioBuffer, BlockingCollection<Tuple<Data, MetaData>> waitBuffer)
{
Task.Run(async () =>
{
var tasks = new Dictionary<Task<MetaData>, Data>();
const int maxDegreeOfParallelism = 50;
while (true)
{
var data = ioBuffer.Take();
var task = ReadAsync(data);
tasks[task] = data;
if (tasks.Count >= maxDegreeOfParallelism)
{
var finishedTask = await Task.WhenAny(tasks.Keys).ConfigureAwait(false);
var metaData = await finishedTask.ConfigureAwait(false);
var dataMetaData = new Tuple<Data, MetaData>(tasks[finishedTask], metaData);
waitBuffer.Add(dataMetaData);
tasks.Remove(finishedTask);
}
}
});
}
Edit:
So wie ich das sehe, wenn man den Fall kleiner maxDegreeOfParallelism betrachtet, wird es richtig kompliziert. Jedes Hinzufügen eines Elements zum IOBuffer müsste erneut schauen, welcher Task fertig ist (Task.WhenAny) um ihn an den WaitBuffer weiterzugeben?
Sehe wenig Nutzen derzeit am Dataflow Feature von TPL und würde es in meinen Projekten auch nicht verwenden.
Es würde mich ja schon mal interessieren, wie das in meinem Fall mit TPL Pipelines mit evtl. minimal größerem Aufwand bewerkstelligt werden könnte.
Also mir fehlt noch ein bisschen der Durchblick bei den Pipelines. Ich hab mir jetzt gedacht, es müssen 3 Buffer werden. Der 1. Buffer nimmt erstmal alle Datenelemente entgegen und ist unlimitert. Der 2. Buffer müsste der I/O Buffer werden, der dann auf 50 mittels bounded capacity begrenzt wird. Nun ist die Frage von welchem Datentyp dieser sein müsste. Entweder wieder ein BlockingCollection<Data>, der eben auf 50 limitert ist. Dann hätte man so etwas was ich im letzten Beispiel in der Methode ConsumeIOBuffer versucht hatte. Ich glaube da baue ich aber etwas nach, was eigentlich schon durch die BlockingCollection erreicht werden sollte. Oder aber man baut einen Buffer ala BlockingCollection<Task<...>>, dann hätte ich aber das Problem, dass ich nicht gezielt fertiggestellte/completed Tasks aus der BlockingCollection nehmen (Take) kann.
Übersehe ich noch etwas oder wo ist mein Denkfehler?
Es werden niemals die Buffer selbst limitiert, sondern die verarbeiteten Elemente - zB. die Consumer Tasks.
Der Buffer selbst weiß niemals, wie vielen Producern er eigentlich Daten eingeschoben bekommt bzw. von Consumer wieder entnommen werden.
Ist ihm auch wurst.
Hangfire verfolgt genau das gleiche Ziel, hat einfach einen In-Process Buffer in Form einer BlockingCollection, sondern zB. eine SQL Datenbank oder einen Redis.
Der Buffer-Typ ist technisch wurst; ergibt sich nur durch die Anforderungen (flüchtig vs. persistent).
- performance is a feature -
Microsoft MVP - @Website - @AzureStuttgart - github.com/BenjaminAbt - Sustainable Code
mMn macht es durchaus Sinn auch die (InMemory-)Buffer zu begrenzen, denn ansonsten kann es passieren, dass der Speicher überläuft.
Das passiert immer dann, wenn der Buffer schneller befüllt als abgearbeitet werden kann und es eben mehr Elemente gibt als in den Speicher passen.
Wenn Du in so eine Sizing Situation kommst, dann ist InMemory einfach auch das Falsche.
- performance is a feature -
Microsoft MVP - @Website - @AzureStuttgart - github.com/BenjaminAbt - Sustainable Code
Was ist Dir unklar?
- performance is a feature -
Microsoft MVP - @Website - @AzureStuttgart - github.com/BenjaminAbt - Sustainable Code
Hallo,
ich sehe das so wie Sir Rufo. Der Buffer kann / muss eine Limitierung haben und wenn diese erreicht ist, so werden die Producer-Tasks blockiert. Mit der BlockingCollection<T> aus der TPL ist genau so und nur so kann sicher ein "Überlauf" verhindert werden.
Das hat auch nichts mit einer Limitierung von In-Memory zu tun od. sonst etwas.
Ließe sich durch eine andere Architekture diese Limitierung vermeiden, so wäre das besser und die Anwendung skaliert auch besser. Aber auch in solch einer Architektur sollte zur Sicherheit eine Buffer-Limitierung vorhanden sein.
mfG Gü
Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.
"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"
Hber auch in solch einer Architektur sollte zur Sicherheit eine Buffer-Limitierung vorhanden sein.
Nein. Meine Meinung nach ein deutliches Nein.
Was soll das denn für eine Sicherheit sein? Soll dann das Add blockieren und Du kommst in eine Situation, an dem die Anwendung hart abbrechen muss, weil das schließlich eine Situation ist, die nicht automatisch gelöst werden kann?
Welchen Sinn hat das denn?
Der Buffer ist doch genau dazu da, dass Messages auflaufen dürfen, wenn nicht schnell genug abgearbeitet werden kann, zB weil derzeit / temporär ein Consumer nicht online ist.
Macht absolut kein Sinn in meinen Augen.
- performance is a feature -
Microsoft MVP - @Website - @AzureStuttgart - github.com/BenjaminAbt - Sustainable Code
Hallo Abt,
warum soll die Anwendung hart abbrechen müssen?
Der Producer ist einfach blockiert*, bis der Buffer wieder Platz hat. Standard Producer/Consumer-Szenario.
Ist es deiner Meinung nach besser wenn der Buffer "übergeht" und es so zu einer OutOfMemoryException kommt? Das kann doch keine Logik sein.
Der Buffer ist doch genau dazu da, dass Messages auflaufen dürfen,
Genau, aber nicht bis zum App-Crash, sondern nur bis zu einem bestimmten Limit.
So etwas macht absolut Sinn in meinen Augen.
* bzw. durch die TryXxx-Methoden ist sogar keine Thread-Blockade durch eine Synchronisationsprimitive nötig, sondern es kann eine alternative Ausführungspfad gewählt werden.
mfG Gü
Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.
"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"
Naja, dann muss man auch lesen, was ich geschrieben hab.
Wenn ein Buffer in ein Szenario kommt, dass er Größen wie eine OutOfMemory-Exception kommt, dann ist ein InMemory-Szenario halt auch völlig deplatziert.
Um sowas vorzubeugen gibt es persistierende Queues.
Und nein, einen Producer zu blockieren ist kein adäquatives Mittel.
Ich wüsste nicht wieso. Es macht auch keinen Sinn.
Das ist nicht notwendig, wenn die Queue die richtige Technologie hat.
Dafür ist sie da.
- performance is a feature -
Microsoft MVP - @Website - @AzureStuttgart - github.com/BenjaminAbt - Sustainable Code
Hallo Abt,
also gut ich fasse zusammen: Wenn du "Nein" sagst, sowie dass es keinen Sinn macht und du nicht weißt wieso, dann gilt automatisch deine Antwort die auch gelesen werden soll. Super! Da ist mir die Zeit zu schade um zu diskutieren.
Nur noch so viel: hast du dir einmal überlegt wo überall ein Producer/Consumer eingesetzt wird und wieviele davon ohne Limitierung auskommen? Vermutlich schon...wozu frag ich überhaupt.
Hallo zusammen,
es gibt etliche Implementierung eines Buffers für ein Producer/Consumer-Szenario die eine Limitierung / Bounded Capacity haben und das aus gutem Grund, denn es mag verschiedene Anwendungsfälle geben, in denen eine maximale Bufferkapazität nicht überschritten werden darf. BlockingCollection<T>
ist so ein Beispiel für einen In-Memory Buffer der in .net dabei ist.
Zitat von: Erzeuger-Verbraucher-Problem
Ist die Aufnahmekapazität der Datenstruktur beschränkt, so soll die Zugriffsregelung ferner verhindern, dass ein erzeugender Prozess auf die Datenstruktur zugreift, wenn die Aufnahmekapazität der Datenstruktur bereits ausgeschöpft ist.
Es gibt aber auch Anwendungsfälle, bei denen ein Producer nicht blockiert werden darf. Auch hierfür gibt es gute Gründe, wie z.B. dass periodische Abtastwerte nicht "verschluckt" werden dürfen.
Für diese Fälle hat Abt passende Möglichkeiten genannt.
Hallo BlackMatrix,
ich konnte bei Stackoverflow leider noch keine passende Antwort
Kannst du dann wenigstens die andere Frage verlinken?
mfG Gü
Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.
"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"
ich konnte bei Stackoverflow leider noch keine passende Antwort
Kannst du dann wenigstens die andere Frage verlinken?
Kann ich 😃
TPL Dataflow LinkTo TransformBlock is very slow
Hatte den Code oben mal mit Logeinträgen versehen:
Resultat:
Es dauert eben bis zu 50 Sekunden ein fertiges Ergebnis des I/O Blocks an den Waitblock übergeben wird.
2018-03-21 15:37:03.6352|DEBUG|Starter.StackoverFlowTest|Exit I/O block 29...|
2018-03-21 15:37:56.9446|DEBUG|Starter.StackoverFlowTest|Enter wait block 29...|
Meine eigentliche Aussage wird irgendwie nicht beachtet:
Wenn die BlockingCollection künstlich blockiert werden muss - weil der Speicher sonst platzt - ist denn dann ein InMemory-Verfahren überhaupt nocht der richtige weg und kann das nicht ausgelagert werden?
Wenn die Datenbank nicht mehr in den Speicher passt, dann lagern wir das doch auch aus.
Wieso nicht denn auch mit Buffern? Das ist Alltag (auch wenn nur ein kleiner Bestandteil) in verteilten Architekturen.
Es ist was völlig anderes, wenn das InMemory-Verfahren nicht ersetzt werden kann.
@BlackMatrix: bitte keine externen Bildhoster.
[Hinweis] Wie poste ich richtig? 6.1 Bilder als Dateianhang hochladen
Ich versteh trotzdem Dein Problem nicht.
Es hatte sich so angehört (für mich), dass Deine Anwendung 50 Sekunden nichts macht.
Jetzt geht es Dir einfach nur um die Reihenfolge der Tasks, oder wie?
Es gibt keine Garantie in Deinem Code, dass nach einem Exit sofort ein Enter kommt.
Die Reihenfolge des Task Schedulers kannst Du hier gar nicht in der Form vorgeben - ist ja auch nicht sinn der Sache.
Oder um was geht es Dir? Du sprichst ja von langsam. Langsam ist es ja nicht, nur nicht in der Reihenfolge, wie Du es Dir vorstellst - oder?
Ohne den Link auf das Pastebin der Logs ist Deine Ansicht, was passiert, völlig verfälscht mit dem, was Du suggerierst. 😉
- performance is a feature -
Microsoft MVP - @Website - @AzureStuttgart - github.com/BenjaminAbt - Sustainable Code
Ich entschuldige mich, wenn ich mich falsch ausgedrückt habe.
Mir geht es halt darum, dass es mit der LinkTo-Variante zu lange dauert, bis die eigentliche Message/Daten in dem 2. Block bzw. Waitblock ankommen/entern. Anstatt eine fertige Message direkt an den WaitBlock weiterzuleiten, gibt es scheinbar einen sehr niedrig priorisierten Task, der dann mit einmal alle fertig bearbeiteten Messages des I/O-Blocks in den WaitBlock "schiebt". Weiß jemand wann das ausgeführt wird?
Die Reihenfolge ist mir egal, es ist mir aber wichtig, dass die Messages nicht so blockweise transferiert werden. (Nein, es ist nicht wegen des fixen Delays wie bei Stackoverflow gedacht wird). Mit dem direkten Aufruf der Post-Methode kann ich dieses Problem umgehen, aber scheint mir wie gesagt ein Hack zu sein.
OMG! Lass mich raten mit ExecutionDataflowBlockOptions.EnsureOrdered ist das Problem behoben -.-
Wieso ist das standardmäßig true -.-
Also anhand der Dokumentation und der Beschreibung von Dataflow: Degree of Parallelism hätte ich jetzt auch nicht dieses Verhalten erwartet.
- performance is a feature -
Microsoft MVP - @Website - @AzureStuttgart - github.com/BenjaminAbt - Sustainable Code
Mit Setzen auf false ist das Problem damit behoben.
Widerspricht das nicht dem ganzen Konzept, wenn das standardmäßig true ist? Erst kann ich die Parallelisierung mittels MaxDegreeOfParallelism hochschrauben und dann warte ich doch auf den langsamsten Task?
Hallo BlackMatrix,
kennst du das Whitepaper Introduction to TPL Dataflow? Vllt. hilft das fürs Verständnis warum und wieso manche Einstellungen standardmäßig so sind.
mfG Gü
Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.
"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"