Laden...

Abarbeitung von Jobs in Queue oder in List<T>? Da Jobs abgebrochen werden können sollen

Erstellt von jacques vor 9 Jahren Letzter Beitrag vor 9 Jahren 4.424 Views
J
jacques Themenstarter:in
18 Beiträge seit 2013
vor 9 Jahren
Abarbeitung von Jobs in Queue oder in List<T>? Da Jobs abgebrochen werden können sollen

Hallo,

ich habe eine grundsätzliche Frage zur sequentiellen Abarbeitung von Jobs. Angenommen ich hätte Objekte der Klasse MyJob,
und ein solches Objekt kann den Status "offen", "wartet auf Bearbeitung", "wird bearbeitet" und "beendet" haben.
An sich erfolgt die Abarbeitung nach dem FIFO Prinzip. Von dem her würde sich meines Erachtens der Einsatz einer Queue anbieten:
Das heißt, wenn das Objekt noch nicht "enqueued" wurde, hat es den Status "offen", sobald es enqueued wurde nimmt es den Status
"wartet auf Bearbeitung" an, und sobald es "dequeued" wird, wird es bearbeitet, etc.
Nun habe ich aber das Problem, dass ein Job der sich bereits in der Queue befindet evtl. abgebrochen werden soll.
Was ich bisher gelesen habe sind Queues aber nicht darauf ausgelegt den Zugriff auf das x-te Element zuzulassen. Das geht ja an der Grundidee einer Queue vorbei.
Ein anderer Ansatz wäre die Verwendung einer List<MyJob>. Hier kann ich das Queue Verhalten selbst implementieren, und
ggf. auf das x-te Element zugreifen. Gibt es für meinen Anwendungsfall evtl. noch einen eleganteren Ansatz den ich übersehe?
Eng mit meiner Frage verknüpft wäre auch die Frage nach dem simultanen Zugriff auf Queues bzw. Lists.
Muss man bei beiden Ansätzen mit locks arbeiten, sollte die Gefahr bestehen dass gleichzeitig hinzugefügt / gelöscht wird?
Ich wäre für jeden Tipp dankbar.

Vielen Dank.

1.346 Beiträge seit 2008
vor 9 Jahren

Hallo jacques,

Auch eine List ist nicht besonders gut dafür geeignet Elemente aus der mitte zu entfernen. Die List Klasse ist intern als Array implementiert, so das wenn man ein Element löscht jeweils der komplette rest umkopiert werden muss.

Ein Kandidat der das Effizienter kann ist die Linked List. Diese ist besonders effizient beim Einfügen/Löschen an beliebigen Positionen.

Nachteil: Die Linked list ist nichtthread safe. hier muss man also mit locks

An threadsicheren Auflistungen gibt es die Klassen in System.Collections.Concurrent, aber leider keine Linked List.

Lieben Gruß,

pdelvo

16.842 Beiträge seit 2008
vor 9 Jahren

Das ist das Musterbeispiel für TPL Pipelines und die BlockingCollection - mit der Ausnahme, dass es hier eine fixe Reihenfolge gibt.

1.346 Beiträge seit 2008
vor 9 Jahren

BlockingCollection wäre super, wenn nicht die Anforderung wäre Operationen die sich in der Auflistung befinden zwichenzeitlich abzubrechen, was diese leider nicht unterstützt (meines Wissens nach)

16.842 Beiträge seit 2008
vor 9 Jahren

Das kommt drauf an, wie die Anforderung im Speziellen aussieht. Aber mit TryTake kann man ja Elemente aus der Queue (das steckt hinter einer BlockingCollection) auch wieder entfernen.
Das ist also in meinen Augen kein Hindernis. Eine Priorisierung müsste man jedoch selbst entwickeln, falls das gefordert ist (was oft der Fall ist).

Aber das hab ich zB in QuickIO implementiert und könnte man 1:1 kopieren.

W
955 Beiträge seit 2010
vor 9 Jahren

... außerdem kann man das mit einer CancellationTokenSource implementieren. Jobs, die Status "cancelled" haben gelten als completed und werden damit nicht mehr berechnet.

J
jacques Themenstarter:in
18 Beiträge seit 2013
vor 9 Jahren

Hallo zusammen,

vielen Dank für die Tipps!
Für mein Szenario hatte ich mir in der Zwischenzeit folgendes überlegt:


...
private Queue<MyJob> jobQueue = new Queue<MyJob>();
private System.Timers.Timer jobTimer;
private IList<int> jobsToAbort;
...
jobTimer = new System.Timers.Timer(2000);
jobTimer.Elapsed += jobTimer_Elapsed;
...

public void addJob(MyJob newJob)
{
     jobQueue.enqueue(newJob);
}

public void abortJobByID(int jobID)
{
     jobsToAbort.add(jobID);
}

//Checks queue for new Jobs forever... 
private void jobTimer_Elapsed(object sender, ElapsedEventArgs e)
{
       while ((currentProcessCount < maxConcurrentProcesses) && (jobQueue.Count > 0))
       {
           //Get the next job from the queue
           MyJob currentJob = (MyJob)jobQueue.Dequeue();
                
           //check for Abort (semi pseudo code)
           if(jobsToAbort.Contains(currentJob.jobID))
           {
                 //don't do anything with this job
                 currentJob = null;
                 
                 jobsToAbort.RemoveItem(currentJob.jobID)
                 continue;
           }

           Thread jobThread = new Thread(delegate() { startJob(currentJob); });
           jobThread.start();
           currentProcessCount++;
       }
}

private void startJob(MyJob myJob)
{
     try
     {
           myJob.startProcess();
           myJob.waitUntilFinished();
           currentProcessCount--;
     }
     catch(Exception ex)
     {
           currentProcessCount--;
           //handle error...
     }
}


So in etwa hatte ich mir das vorgestellt... aber wahrscheinlich ist der Ansatz zu trivial als dass er zum Erfolg führen könnte 😃
Ich lese mich jetzt mal in Pipelines und BlockingCollections ein. Auf den ersten Blick schien mir der Pipelines Einsatz für mein Szenario ein wenig zu komplex zu sein, aber das weiß ich ja dann in ein paar Stunden hoffentlich besser.
Nochmals vielen Dank an alle!

16.842 Beiträge seit 2008
vor 9 Jahren

Pipelining ist kinder-leicht. Deines ist so für Produktiv-Code jedenfalls nicht zu gebrauchen 😉
zB Thread-Sicherheit, Timer.. und Co.

J
jacques Themenstarter:in
18 Beiträge seit 2013
vor 9 Jahren

Hallo,

ich habe jetzt mal eine Version mit Hilfe von Pipelines erstellt (siehe Anhang).
Ich weiß nicht ob die UI Updates so beim Ändern des Status eines Jobs so ausgeführt werden sollten, aber prinzipiell funzt es:


        private void startPipeLineTest()
        {
            int BufferSize = 32;

            var buffer1 = new BlockingCollection<MyJob>(BufferSize);
            var buffer2 = new BlockingCollection<MyJob>(BufferSize);

            var f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

            // Stage 1: Generate and show x new MyJob objects
            var stage1 = f.StartNew(() => GenerateJobs(buffer1, 10));

            // Stage 2: Start each Job
            var stage2 = f.StartNew(() => StartJobs(buffer1, buffer2));
        }

        //Generate new MyJobs
        private void GenerateJobs(BlockingCollection<MyJob> output, int jobCount)
        {
            IList<MyJob> myJobList = new List<MyJob>();

            for (int i = 0; i < jobCount; i++)
            {
                MyJob newJob = new MyJob(i, this);
                addItem(newJob.JobID, newJob.JobStatus); 
                myJobList.Add(newJob);
            }

            try
            {
                foreach(MyJob newJob in myJobList)
                {            
                    output.Add(newJob);
                }
            }
            finally
            {
                myJobList.Clear();
                myJobList = null;
                output.CompleteAdding();
            }
        }

        private void addItem(int JobID, string JobStatus)
        {
            if (InvokeRequired)
            {
                // We're not in the UI thread, so we need to call BeginInvoke  
                BeginInvoke(new AddItemDelegate(addItem), new object[] { JobID, JobStatus });
                return;
            }

            ListViewItem lvi = new ListViewItem(new string[] { JobID.ToString(), JobStatus });
            job_lv.Items.Add(lvi);
        }

        private void StartJobs(BlockingCollection<MyJob> input, BlockingCollection<MyJob> output)
        {
            try
            {
                foreach (MyJob currentJob in input.GetConsumingEnumerable())
                {
                    currentJob.startJob();
                    output.Add(currentJob);
                }
            }
            finally
            {
                output.CompleteAdding();
            }
        }

        public void updateListView(int JobID, string JobStatus)
        {
            if (InvokeRequired)
            {
                // We're not in the UI thread, so we need to call BeginInvoke  
                BeginInvoke(new UpdateItemDelegate(updateListView), new object[] { JobID, JobStatus });
                return;
            }
            // Must be on the UI thread if we've got this far  

            //Iterate through all ListViewItems
            foreach (ListViewItem currenItem in job_lv.Items)
            {
                //find the one with the specific ID we are looking for
                int currentCopyJobID = Int32.Parse(currenItem.SubItems[0].Text);

                if (currentCopyJobID == JobID)
                {
                    //We found the right listViewItem

                    //update the status
                    currenItem.SubItems[1].Text = JobStatus;
                    return;
                }
            }
        }

Nach dem Starten der Jobs durchlaufen diese nacheinander die verschiedenen Status und aktualisieren diesen auch in der UI.
Nun hätte ich noch zwei Fragen dazu:

  1. Ist es möglich, dass maximal 2 Jobs gleichzeitig gestartet werden?
  2. Wie erreicht man, dass die Pipeline immer auf neue Jobs wartet und mit der Abarbeitung anfängt sobald neue Jobs eingehen?

Vielen Dank.

J
jacques Themenstarter:in
18 Beiträge seit 2013
vor 9 Jahren

Hallo,

ein kleines Update von mir:
Anhand von Abts Beispiel C# – Warteschlangen – Die BlockingCollection (vielen Dank an Abt!) habe ich mein Projekt ein wenig umgestellt und nutze nun eine BlockingCollection (siehe Anhang).
Das funktioniert soweit super.
Nur das Abbrechen der Abarbeitung ist derzeit so konzipiert, dass alle bereits laufenden Jobs zu Ende laufen sollen:


public void Abschliessen( )
{
    // Abbrechen anfordern
    TokenSource.Cancel( );

    // Warten, bis sich alle Tasks beendet haben
    Task.WaitAll( _tasks);
}

Könnte man den bereits laufenden Tasks ebenfalls mitteilen, dass sie abbrechen sollen, bzw. auch einzelnen Jobs die noch nicht ausgeführt werden?
TryTake hilft mir in dem Fall meines Erachtens nicht viel wenn ich einen Job haben möchte der sich in der "Mitte" der BlockingCollection befindet.

Vielen Dank!

49.485 Beiträge seit 2005
vor 9 Jahren

Hallo jacques,

wenn ein Job abgebrochen werden soll, bevor er aus der Queue herausgeholt wurde, ist es doch ganz einfach. Mach eine Property Cancel in de Job-Klasse und frag diese direkt nach dem Herausholen ab. Wenn sie true ist, starte die Task erst gar nicht. Fertig.

Wenn du im gleichen gelockten Abschnitt, in dem die Property angefragt wird, dem Job z.B. durch eine weitere Property mitteilst, dass er nun gestartet wurde, kannst du beim vielleicht später stattfindenden Setzen der Property Cancel das Abbrechen der Task anstoßen.

Das Abbrechen von Task ist eine Standard-Aufgabe, für die es Standard-Löungen gibt. Sichtwort ist Cancelation. Siehe z.B. die schon genannte CancellationTokenSource-Klasse.

herbivore

16.842 Beiträge seit 2008
vor 9 Jahren

Das wird sogar in meinem Beispielcode gemacht...
Ansonsten ist das Warten ein Muss, da eben alles asynchron ist.

J
jacques Themenstarter:in
18 Beiträge seit 2013
vor 9 Jahren

Hallo herbivore,

danke für Deinen Tipp.
Was ich nicht so ganz verstehe ist, wie ich einem Job in meiner Queue ein Property setzen kann bevor ich den Job aus der Queue heraushole.
Alternativ würde mir nur einfallen, dass ich die IDs der abzubrechenden Jobs separat verwalte und beim Herausholen eines Jobs aus der Queue mit dieser Liste abgleiche um dann zu entscheiden ob ein Job abgebrochen werden soll oder nicht. Sorry dass ich mich bei dem Thema so anstelle.

Viele Grüße

49.485 Beiträge seit 2005
vor 9 Jahren

Hallo jacques,

irgendwoher muss du ja wissen, welcher Job abgebrochen werden soll. Typischerweise wirst du eine Referenz auf das betreffende Job-Objekt haben. Ob diese dann zusätzlich in der Queue eingetragen ist, spielt doch keine Rolle.

Wenn dir das ein Problem macht, dann reden wir womöglich über [Hinweis] Wie poste ich richtig? Punkt 1.1.1.

herbivore

J
jacques Themenstarter:in
18 Beiträge seit 2013
vor 9 Jahren

Hallo zusammen,

vielen Dank für die hilfreichen Tipps.

Viele Grüße