Beschreibung:
Um einem einmal gestarteten Worker-Thread (weitere) Arbeitsaufträge zu übermitteln, kann man die folgende kleine Klasse SyncQueue <T> verwenden. Da man den Typ T frei wählen kann, kann man selbst festlegen, welche Informationen man an den Worker-Thread übermittelt und kann dafür natürlich auch eine eigene, passende Arbeitsauftragsklasse definieren.
Der Worker-Thread braucht dann nur in seiner Hauptschleife das jeweils nächste Element aus der Queue zu holen (Dequeue) und die Verarbeitung durchzuführen. Steht kein Element in der Queue, blockiert der Thread automatisch solange, bis ein neues Element eingestellt wird.
Jeder andere Thread kann zu einem beliebigen Zeitpunkt mit Enqueue einen Auftrag einstellen. Mehrere Worker-Threads können sich bei Bedarf eine SyncQueue <T> teilen. Es dürfen also gleichzeitig beliebig viele Threads Elemente in die Queue einstellen und/oder Elemente aus der Queue herausholen.
Synchronisiert ist natürlich nur die Queue selbst, nicht die Elemente in der Queue. Deshalb sollte ein Thread, nachdem er einen Arbeitsauftrag in die Queue eingestellt hat, nicht mehr auf den Arbeitsauftrag zugreifen. Auch alle anderen Zugriffe der (Worker-)Threads auf gemeinsame Daten müssen weiterhin selbst synchronisiert werden.
Gegenüber der synchronisierten Queue aus dem .NET Framework, die man mit Queue.Synchronized erhält, gibt es zwei wichtige Unterschiede:*Queue.Synchronized liefert eine untypisierte Collection, wogegen meine Queue generisch und damit typsicher ist. *Queue.Dequeue wirft eine Exception, wenn die Queue leer ist, statt wie meine Queue still und brav zu warten, bis wieder ein Element mit Enqueue hinzugefügt wurde. Meine Queue macht also genau das, was man für eine Job-Queue braucht.
Gegenüber ThreadPool.QueueUserWorkItem hat man eine bessere Kontrolle darüber, wie viele Worker-Threads die Anfragen bearbeiten sollen. So kann bei ThreadPool.QueueUserWorkItem "die Anzahl der Arbeitsthreads [...] nicht auf eine Anzahl festgelegt werden, die kleiner als die Anzahl der Prozessoren im Computer ist." Außerdem kann man der SyncQueue die Arbeitsaufträge in Form beliebiger Objekte, aber trotzdem typsicher übergeben.
Hier der Code inkl. einem kleinen Test-Programm:
// --- schnipp ---
using System;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics;
//*****************************************************************************
public class SyncQueue <T>
{
//--------------------------------------------------------------------------
private Queue <T> _q = new Queue <T> ();
//==========================================================================
// <summary>Trägt einen Eintrag (ohne zu warten) ein</summary>
public void Enqueue (T tItem)
{
lock (this) {
_q.Enqueue (tItem);
Monitor.Pulse (this);
Debug.WriteLine (Thread.CurrentThread.Name + " Enqueue ==> " + _q.Count);
}
}
//==========================================================================
// <summary>
// Holt einen Eintrag aus der Queue heraus und wartet dabei nötigenfalls
// solange bis wieder ein Eintrag vorhanden ist.
// </summary>
public T Dequeue ()
{
lock (this) {
while (_q.Count == 0) {
Debug.WriteLine (Thread.CurrentThread.Name + " Wait");
Monitor.Wait (this);
}
Debug.WriteLine (Thread.CurrentThread.Name + " Dequeue ==> " + (_q.Count - 1));
return _q.Dequeue ();
}
}
}
// --- schnapp ---
//*****************************************************************************
static class App
{
//-------------------------------------------------------------------------
private static SyncQueue <int> _sq = new SyncQueue <int> ();
private static Random _rand = new Random ();
//==========================================================================
public static void Main (string [] astrArg)
{
Debug.Listeners.Clear ();
Debug.Listeners.Add (new ConsoleTraceListener ());
for (int i = 0; i < 4; ++i) {
Thread t = new Thread (RunDequeue);
t.Name = "D" + i;
t.Start ();
}
for (int i = 0; i < 4; ++i) {
Thread t = new Thread (RunEnqueue);
t.Name = "E" + i;
t.Start ();
}
}
//==========================================================================
private static void RunEnqueue ()
{
for (int i = 1 ; i <= 25; ++i) {
_sq.Enqueue (100);
Thread.Sleep (_rand.Next (1000));
}
}
//==========================================================================
private static void RunDequeue ()
{
for (int i = 1 ; i <= 25; ++i) {
_sq.Dequeue ();
Thread.Sleep (_rand.Next (995));
}
}
}
PS: Der Code der SyncQueue ist schon länger unter Applikation mit Warteschlange [SyncQueue<T>-Klasse] verfügbar. Da er oft verlinkt wurde, habe ich mich jetzt entschlossen, ihn auch hier bei den Snippets einzustellen.
PPS: Ich verwende absichtlich und entgegen der Empfehlungen von Microsoft **lock (this)**
, auch wenn das zugegeben erst bei Erweiterungen der Queue (Sichtwort: Enumerator) seine vollen Vorteile ausspielen würde. Die Gründe habe ich in Warum lock (lock_object) statt lock (eigentliches_object)? und den beiden verlinkten Threads dargelegt.
PPPS: Ab .NET 4.0 gibt es die BlockingCollection<T>-Klasse aus dem Namespace System.Collections.Concurrent. Diese bietet mit den Methoden Add und Take ein mit meiner SyncQueue vergleichbares Verhalten.
PPPPS: Es ist relativ einfach, die SyncQueue bei Bedarf auf Arbeitsaufträge mit Prioritäten zu erweitern. Im Grunde muss man nur die interne Queue _q
durch eine (eigene) Queue ersetzen, die immer den Arbeitsauftrag mit der höchsten Priorität zuerst zurück gibt. Dann muss man an der Synchronisierung überhaupt nichts ändern. Die Priorität eines Arbeitsauftrags kann man als zusätzlichen Parameter der Enqueue-Methode übergeben, die sie einfach an die Enqueue-Methode der inneren Queue weiterreicht.
PPPPS: Es gibt eine Variante der SyncQueue von Corpsegrinder mit maximalem Füllstand, zu finden im Das Programmier-Spiel: nette Übungsaufgaben für zwischendurch, die ich für korrekt implementiert halte.
Schlagwörter: 1000 Worte, Queue, Schlange, Warteschlange, Jobqueue, Auflistung, Collection, synchronisiert, synchronisierte, synchronisieren, synchronize, synchronized, synchronizing, SynchronizedQueue, threadsafety, thread safety, threadsafe, thread safe, threadsicher, thread-sicher Job, Jobs, Auftrag, Aufträge, Arbeitsauftrag, Arbeitsaufträge, Methode, Methoden, Thread, Threads, BackgroundWorker, ThreadPool, ausführen, starten, übergeben, Worker, Reader, Writer, read, write, add, get, remove
thx für das snippet, hab ich Monitor.Pulse bischen kennengelernt.
Aber wie beendet man das Spielchen eigentlich richtig?
Ich denk, es ist ja nur ausnahmsweise so, dass der Consumer-Thread weiß, wie viele Item abzuholen sind.
Habe ich testweise mal das Enqueuing etwas verkleinert, und natürlich hing die Anwendung dann mit 4 Threads, die am Waiten waren.
Der frühe Apfel fängt den Wurm.
Hallo ErfinderDesRades,
wie man den Worker am besten beendet, hängt von den Rahmenbedingungen ab (z.B. davon wie lange die Operationen, die der Worker ausführt, im Schnitt bzw. maximal dauern) und davon, was man erreichen will (z.B. ob diese Operationen bis zum Ende ausgeführt werden sollen oder abgebrochen werden können).
Wenn der Worker erst beendet werden soll, wenn alle anstehenden Operationen abgearbeitet sind, dann reicht es eine Quit-Nachricht in die Queue zu stellen, auf die hin sich der Worker selbst beendet (return).
Wenn der Worker nach Abschluss der gerade laufenden Operation beendet werden soll, würde es an sich reichen in der Hauptschleife ein Abbruch-Flag (volatile boolsche Variable) zu prüfen, das (die) von außen gesetzt wird. Aber da es sein kann, dass gerade keine Operation läuft, muss man zusätzlich noch eine Quit- oder Dummy-Nachricht in die Queue stellen.
Wenn es mehrere Worker gibt, die sich dieselbe Queue teilen, können diese zwar alle dasselbe Abbruch-Flag verwenden, aber man muss für jeden Worker eine Quit- bzw. Dummy-Nachricht schicken.
Wenn die laufende Operation so schnell wie möglich abgebrochen werden soll, muss es wohl IsBackground=true und/oder Thread.Abort sein. Mit allen Gefahren, die das mit sich bringt, eine laufende Operation mittendrin zu killen. Aber das wurde ja schon an anderer Stelle im Forum ausführlich besprochen.
herbivore
Hi!
Also ich hab das mal probiert, mit mehreren Workern, die dann beenden, wenn sie eine Quit-Nachricht auffinden.
Da muß man gleich unterscheiden: bei Klassen kann man einfach null
als Quit-Nachricht verwenden, bei structs
muß man intern eine Queue<Nullable<T>>
wrappern, scheint mir.
Bei diesem Teil liefert Dequeue()
beim Antreffen einer Quit-Nachricht einerseits null
zurück, stellt es aber zusätzlich in die Queue zurück, damit die anderen Threads die Quit-Nachricht ebenfalls erhalten.
Außerdem habich gefunden, dass Monitor.Wait()
nicht in eine While-Schleife muß, sondern if
reicht, weil der Thread wird ja mit Sicherheit nur dann gepulst, wenn ein Item drin ist.
Diese Implementation scheint sogar mit dem gegebenen Szenario fertigzuwerden: nämlich mehrere Producer und mehrere Consumer-Threads.
Mehrere Producer, das führt hier dazu, dass mehrere Quit-Nachrichten eingestellt werden, was eigentlich ja Quatsch ist.
Diese Queue reagiert darauf, indem die Quit-Nachrichten übersprungen werden, falls noch weitere (ordentliche) Items drin sind.
Es kann aber auch vorkommen, dass alle Consumer beendet sind, und dann danach noch ein Producer ein Item einstellt, und das bleibt dann unverarbeitet.
Also lieber nur einen Producer-Thread nehmen.
public class SyncQueue<T>where T:struct {
private Queue<Nullable<T>> _q = new Queue<Nullable<T>>();
public void Finish() {
lock(this) {
_q.Enqueue(null);
Monitor.Pulse(this);
}
}
public void Enqueue(T itm) {
lock(this) {
Debug.WriteLine(string.Format("{0}: _q[{1}] = {2}",
Thread.CurrentThread.Name, _q.Count, itm));
_q.Enqueue(itm);
Monitor.Pulse(this);
}
}
public Nullable<T> Dequeue() {
lock(this) {
//if(_q.Count == 0) Monitor.Wait(this); Achtung: if reicht nicht, siehe Beitrag von herbivore
while(_q.Count == 0) Monitor.Wait(this);
var itm = _q.Dequeue();
while(!itm.HasValue) {
if(_q.Count == 0) {
_q.Enqueue(null);
Monitor.Pulse(this);
return null;
}
itm = _q.Dequeue();
}
return itm;
}
}
}
static class App {
private static SyncQueue<int> _sq = new SyncQueue<int>();
private static Queue _doneJobs = Queue.Synchronized(new Queue());
private static Random _rand = new Random();
public static void Main(string[] astrArg) {
Debug.Listeners.Clear();
Debug.Listeners.Add(new ConsoleTraceListener());
for(int i = 0; i < 4; ++i) {
Thread t = new Thread(RunDequeue);
t.Name = "D" + i;
t.Start();
}
for(int i = 0; i < 4; ++i) {
Thread t = new Thread(RunEnqueue);
t.Name = "E" + i;
t.Start();
}
}
private static void RunEnqueue() {
var offs = int.Parse(Thread.CurrentThread.Name.Substring(1)) * 25;
for(int i = 0; i < 25; ++i) {
var n = offs + i;
_sq.Enqueue(n);
Thread.Sleep(_rand.Next(1000));
}
_sq.Finish();
}
private static void RunDequeue() {
var itm = _sq.Dequeue();
while(itm.HasValue) {
Debug.WriteLine(Thread.CurrentThread.Name +": dequeue "+ itm.Value);
Thread.Sleep(_rand.Next(995));
_doneJobs.Enqueue(itm.Value);
itm = _sq.Dequeue();
}
Debug.WriteLine(Thread.CurrentThread.Name + " finished" + _doneJobs.Count);
}
}
Der frühe Apfel fängt den Wurm.
Hallo ErfinderDesRades,
Außerdem habich gefunden, dass
Monitor.Wait()
nicht in eine While-Schleife muß, sondernif
reicht, weil der Thread wird ja mit Sicherheit nur dann gepulst, wenn ein Item drin ist.
nein, ein if reicht nicht, es muss ein while sein. Folgendes Szenario ist möglich:
1.Zwei Consumer, ein Producer, Queue leer, keiner im lock, keiner im Wait
1.Consumer1 Dequeue: betritt die Sperre (bei lock
) und läuft - da die Queue leer ist - aufs Wait (womit die Sperre wieder frei ist)
1.ProducerA Enqueue: betritt die Sperre (bei lock
), aber dann erfolgt zufällig ein Threadwechsel, bevor das Pulse erreicht wurde
1.Consumer2 Dequeue: versucht die Sperre zu betreten, aber da ist ja ProducerA drin, also wartet Consumer2 auf die Sperre (bei lock
)
1.ProducerA läuft weiter und führt das Pulse aus.
1.Dadurch wartet Consumer1 wieder auf die Sperre (bei Wait
), stellt sich also hinter dem schon wartenden Consumer2 an.
1.ProducerA verlässt die Sperre
1.Ein Item in der Queue, Consumer2 und Consumer1 warten in dieser Reihenfolge auf die Sperre
1.Consumer2 betritt die Sperre (bei lock
), holt das gerade eingestellt Element ab und verlässt die Sperre
1.Queue leer, Consumer1 wartet auf die Speere
1.Consumer1 betritt die Sperre (bei Wait
), obwohl die Queue schon wieder leer ist.
Dass durch das Pulse nicht automatisch der Thread, der gepulst wurde, als nächster die Sperre erhält, steht sogar explizit in der :rtfm: Doku. Der gepulste Thread muss also damit rechnen, dass ihm ein anderer Thread "sein" Element weggeschnappt hat. Das ist zwar unwahrscheinlich und daher durch Testen schwer zu finden, aber möglich. Deshalb die while-Schleife.
Aus vergleichbaren Gründen muss das Pulse im Enqueue auch jedesmal ausgeführt werden und nicht etwa nur wenn _q.Count == 1. Sonst könnte es durch einen ungünstigen Threadwechsel passieren, dass zwei Enqueues direkt hintereinander durchgeführt werden, obwohl schon zwei Dequeues wegen leerer Queue warten. Beim zweitem Enqueue wäre dann aber _q.Count == 2 und wenn es nun kein Pulse auslösen würde, würde das zweite Dequeue weiter warten, obwohl ein Element in der Queue vorhanden ist.
Das zeigt auch sehr schön, dass richtige Synchronisierung kein leichtes Geschäft ist (in Programmier-Spiel gibt es weitere Beispiele für Fallstricke). Für den Verwender ist es daher ein großer Vorteil, dass die korrekte Synchronisierung schon in der Klasse SyncQueue steckt und von dieser gekapselt wird, so dass er sich nicht mehr selbst darum kümmern muss.
bei Klassen kann man einfach null als Quit-Nachricht verwenden
Null als Quit-Nachricht zu verwenden, kann man durchaus machen. Aber ich denke, es sollte nicht die Queue festlegen, dass Null als Quit-Nachricht anzusehen ist, sondern eine Vereinbarung zwischen Producern und Consumern sein.
Diese Queue reagiert darauf, indem die Quit-Nachrichten übersprungen werden, falls noch weitere (ordentliche) Items drin sind.
Es kann zwar Fälle geben, in denen es sinnvoll ist, die Quit-Nachricht ans Ende zu verschieben, aber ich halte es nicht für ausgemacht, dass das immer oder auch nur meistens das Gewünschte ist. Deshalb lasse ich meine Queue so wie sie ist. 🙂
herbivore
Hallo,
Zitat:
Diese Queue reagiert darauf, indem die Quit-Nachrichten übersprungen werden, falls noch weitere (ordentliche) Items drin sind.
Es kann zwar Fälle geben, in denen es sinnvoll ist, die Quit-Nachricht ans Ende zu verschieben, aber ich halte es nicht für ausgemacht, dass das immer oder auch nur meistens das Gewünschte ist. Deshalb lasse ich meine Queue so wie sie ist. 😃
Für diesen Fall könnte man SyncQueue
um die Eigenschaft IsEmpty
erweitern, welche bei einem Quit-Objekt prüft, ob die Queue leer ist. Falls nicht, wird das Objekt einfach wieder in die Queue gelegt, d.h. hier wären die Consumer wieder in der Pflicht.
Nobody is perfect. I'm sad, i'm not nobody 🙁
Ich habe bei QuickIO.NET - Performante Dateioperationen ebenfalls mit null als Quit-Nachrichten getestet und es war enorm fehleranfällig (vor allem beim re-enqueue von Elementen (Second-Try-Ansatz)).
Genau dieses Vorgehen hab ich nämlich in C# – Warteschlangen – Die BlockingCollection gezeigt - das muss ich noch verbessern.
Das Abfragen von Flags ist definitiv die bessere Variante. Im QuickIO-Source sieht das so aus (Methode: StartConsuming)
Im Prinzip ist in QuickIO eine sehr performante und komfortable Queue umgesetzt; könnte man vielleicht extrahieren und ebenfalls hier als Snippet bereit stellen.
- performance is a feature -
Microsoft MVP - @Website - @AzureStuttgart - github.com/BenjaminAbt - Sustainable Code