Laden...

Parallel.For: Initalisierung des ThreadLocalState mehr als einmal - aber warum?

Erstellt von tonka vor 13 Jahren Letzter Beitrag vor 13 Jahren 1.716 Views
tonka Themenstarter:in
373 Beiträge seit 2006
vor 13 Jahren
Parallel.For: Initalisierung des ThreadLocalState mehr als einmal - aber warum?

Hy@all,

ich habe ein Problem bei der Initalisuerung des ThreadLocalStates.

Ich möchte pro Thread einen Clone eines Objekt ablegen. Nun habe ich einfache eine Init-Methode für den ThreadLocalState geschrieben, die diesen Clone erstellt und im localState zu Verfügung stellt. Nach dem Testen habe ich gesehen, das die Init-Methode öfters als die Anzahl der Threads aufgerufen wird - aber warum? Ich dachte bis jetzt das diese nur bei der Thread-Erstellung aufgerufen wird.

Hier der Testcode:


Parallel.For<double>(0, size, () => InitThreadLocalState(), (k, loop, localstate) =>
            {
                Vector3D r = new Vector3D(1, 0, 0);
                return localstate;

            },
            (localstate) =>
            {
                lock (localobj) { Sum += localstate; } }
                    );

Wenn diese Verhalten normal ist, wie kann ich dann meine Init-Methode nur zur Threaderstellung aufrufen?

MfG
Tonka

tonka Themenstarter:in
373 Beiträge seit 2006
vor 13 Jahren

Mittlerweile habe ich herausgefunden, das die "Parallel.For" einige Thread-Zyklen den LocalState mitspeichert und nach "Hausnummer 10" die localstates zusammenzählt, egal wieweit die Schleife ist, und dann wird der LocalState neu initialisiert.

Somit stellt sich für mich die Frage, wie kann ich einem Thread einmal ein Objekt mitgeben, das den ganzen Lebenszyklus des Thread hindurch nicht verändert wird. Das Objekt muss zur Erstellung des Thread erzeugt und beim Zerstören des Threads zerstört werden => Somit ist der LocalState dafür nicht geeignet! Außerdem darf das Objekt nicht geshared werden!

Weiß hier irgendwer wie man soetwas realisieren kann?

5.742 Beiträge seit 2007
vor 13 Jahren

Das Objekt muss zur Erstellung des Thread erzeugt und beim Zerstören des Threads zerstört werden

Sicher, dass du das Objekt nicht auch zerstören willst, nachdem alle Aufgaben abgearbeitet sind?

Weiß hier irgendwer wie man soetwas realisieren kann?

Dann bleibt dir vielleicht noch die Kombination aus mit ThreadStaticAttribute markierten WeakReferencen und einer statsischen Liste:
Prüfe jedes Mal (evtl. auch in dem Initialisierungscallback; dann ist ja egal, wie oft es aufgerufen wird), ob das entsprechende Feld null ist. Wenn ja, erstellst du es und speicherst es in der Liste.
Am Ende musst du dann nur alle Elemente in der Liste disposen bzw. die Liste nullen, damit die Objekte tatsächlich eingesammelt werden können.
Ohne die Verwendung einer Liste kann es vorkommen, dass die Objekte evtl. schon vom GC eingesammelt werden, obwohl der entsprechende Thread nochmal aktiv werden könnte - die WeakReferencen verhindern hingegen, dass ein Thread die Objekte länger als nötig am Leben erhält (wenn er z.B. wieder in den ThreadPool zurück kehrt).

Alternativ kannst du vielleicht auch eine Art Pool implementieren?

tonka Themenstarter:in
373 Beiträge seit 2006
vor 13 Jahren

Hallo winSharp93,

Danke für deine Antwort.

Sicher, dass du das Objekt nicht auch zerstören willst, nachdem alle Aufgaben abgearbeitet sind?

Das ist fürm mich eigentlich ein das selbe - ich geh mal davon aus das du meinst wann es von GC zerstört wird, oder?

Dann bleibt dir vielleicht noch die Kombination aus mit ThreadStaticAttribute markierten WeakReferencen und einer statsischen Liste:
Prüfe jedes Mal (evtl. auch in dem Initialisierungscallback; dann ist ja egal, wie oft es aufgerufen wird), ob das entsprechende Feld null ist. Wenn ja, erstellst du es und speicherst es in der Liste.
Am Ende musst du dann nur alle Elemente in der Liste disposen bzw. die Liste nullen, damit die Objekte tatsächlich eingesammelt werden können.
Ohne die Verwendung einer Liste kann es vorkommen, dass die Objekte evtl. schon vom GC eingesammelt werden, obwohl der entsprechende Thread nochmal aktiv werden könnte - die WeakReferencen verhindern hingegen, dass ein Thread die Objekte länger als nötig am Leben erhält (wenn er z.B. wieder in den ThreadPool zurück kehrt).

Das klingt sehr gut, wusste nicht das es soetwas gibt. Jedoch habe ich hier Code-technisch einige Probleme das umzusetzt, da mir nicht ganz klar ist wo genau ich im Code das platzieren soll.

Hier ist mein momentaner TestCode (vielleicht könntest du in korrigieren):


//LocalState.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ParallelDataAccessTest.Data
{
    public class LocalState
    {
        [ThreadStatic()]// <= meinst du das hier
        public readonly Scene3D scene = null;

        public LocalState(Scene3D s)
        {
            scene = s;
        }
    }
}


    public class Tracer
    {
        public Scene3D scene = null; // dieses Object wird von ausen zugewiesen und muss pro Thread geklont werden
        public List<Scene3D> scenes = null; // meintest du das mit der Liste???

        public LocalState InitThreadLocalState()
        {
            Console.WriteLine("Init threadlocalstate ThreadID= " + Thread.CurrentThread.ManagedThreadId.ToString());

// hier ist mir leider nicht ganz klar, wie ich das von dir beschrieben Verhalten implimentieren soll!!!
           
            LocalState loci = new LocalState();
            return loci;
        }

        public void Start()
        {
            Stopwatch watch = new Stopwatch();
            Int32 size = 1000;
            LocalState Sum = new LocalState();           

            scenes = new List<Scene3D>();

            watch.Restart();
            Object localobj = new object();
            Parallel.For<LocalState>(0, size, () => InitThreadLocalState(), (k, loop, localstate) =>
            {
                localstate += Trace();
                Console.WriteLine("ThreadID= " + Thread.CurrentThread.ManagedThreadId.ToString() + " k= " +k.ToString());
                return localstate;

            },
            (localstate) =>
            {
                lock (localobj) { Sum += localstate; } }
                    );
            watch.Stop();

            Console.WriteLine("zw= " + Sum.ToString());
            Console.WriteLine("time= " + watch.Elapsed.ToString());
        }

Was soll ich genau ion der Liste speichern, die LocalStates, oder meine Klon-Objekte? Soetwas ähnliches habe ich schon probiert und habe dann eine OutOfMemoery-Exception bekommen.

Was bringt mir eigentlich die WeakReferencen, denn was macht es für einen Sinn, das der GC das Objekt terminieren kann obwohl es noch in Verwendung ist???

Alternativ kannst du vielleicht auch eine Art Pool implementieren?

Das habe ich mir auch schon gedacht, würde aber lieber Parallel.For benutzten, da hier angeblich die Performance besser ist!

MfG

5.742 Beiträge seit 2007
vor 13 Jahren

Das ist fürm mich eigentlich ein das selbe - ich geh mal davon aus das du meinst wann es von GC zerstört wird, oder

Nein, nicht ganz.
Die Parallel-Klasse verwendet intern den ThreadPool, um die Aufgabe auf mehrere Threads zu verteilen.
Und dieser lässt Threads nicht sofort sterben, wenn sie ihre Arbeit getan haben - stattdessen werden sie nur "schlafen gelegt", um evtl. später nochmal zum Einsatz zu kommen.
Ein Thread kann also deutlich länger leben, als die Ausführung einer ihm zugeteilten Aufgabe beansprucht.
Somit würde auch ein in einem mit ThreadStatic markierten Feld gespeichertes Objekt deutlich länger leben, als man eigentlich will.

Das klingt sehr gut, wusste nicht das es soetwas gibt. Jedoch habe ich hier Code-technisch einige Probleme das umzusetzt, da mir nicht ganz klar ist wo genau ich im Code das platzieren soll.

Ein kleines Problem von ThreadStatic ist, dass man es nur auf statische Felder anwenden kann.
Das sollte insofern aber kein Problem darstellen, da ein Thread zu einer Zeit auch immer nur eine Aufgabe bearbeitet - somit sollten auch keine Inkonsistenzen entstehen.

Die Liste und die WeakReferencen haben eigentlich nur einen Sinn (habe ich schon versucht, im ersten Beitrag zu erklären):
Der Wert eines ThreadStatic Fields kann erst vom GC aufegräumt werden, wenn der entsprechende Thread endgültig terminiert.
Da der Threac aber - wie oben beschrieben - allerdings evtl. "nur schlafen geht" und evtl. deutlich länger lebt, könnte auf diese Weise ein Speicherleck entstehen.
Somit muss man eine WekaReference verwenden, die dafür sorgt, dass das entsprechende Objekt trotzdem eingesammelt werden kann.
Daraus folgt dann allerdings, dass es auch schon zu früh eingesammelt werden kann (nämlich: Obwohl der Thread es später eigentlich doch noch verwenden sollte).
Daher muss man es zusätzlich nochmal irgendwo anders speichern. Hier bietet sich eine Liste an.

Das habe ich mir auch schon gedacht, würde aber lieber Parallel.For benutzten, da hier angeblich die Performance besser ist!

Da missverstehst du mich:
Ich meinte, dass es evtl. sinnvoll ist, nicht pauschal für jeden Thread ein Objekt zu erstellen, sondern, dass sich jeder Thread ein solches Objekt erzeugt und, wenn er es nicht mehr braucht, in z.B. eine Liste legt. Von dort kann es dann der nächste Thread abholen.
Somit muss das entsprechende Objekt nicht andauernd neu erstellt werden.

Allerdings ist es sehr stark von dem tatsächlichen Einsatzgebiet abhängig, wie genau man verfährt.
Wie lange dauert denn ein Schleifendurchlauf? Länger als eine Sekunde?
Kann man diese Scene3D vielleicht doch irgendwie an mehrere Threads verteilen, solange nicht mehrere gleichzeitig darauf zugreifen?

5.742 Beiträge seit 2007
vor 13 Jahren

So - ich habe mal ein kleines Beispiel gebastelt:
"obj" ist analog zu deiner "Szene3D" das Objekt, das es für jeden Thread nur einmal geben soll.

Einmal mit ThreadStatic:


class Program
{
    [ThreadStatic]
    private static WeakReference _obj;

    static void Main(string[] args)
    {
        List<object> objects = new List<object>();

        Parallel.For(0, 100, () =>
        {
            if (_obj == null)
            {
                object obj = new object(); //Hier wird das entsprechende Objekt angelegt
                objects.Add(obj); //Objekt muss in Liste gespeichert werden, damit es am Leben bleibt
                _obj = new WeakReference(obj);
            }

            Debug.Assert(_obj.IsAlive);

            return _obj.Target;
        }, (i, state, data) =>
        {
            object obj = data;
            Console.WriteLine("Thread: {0}; Objekt: {1}", Thread.CurrentThread.ManagedThreadId, obj.GetHashCode());

            int result = i * i * i * i;

            return data;
        }, o => { });

        objects.Clear(); //Jetzt können die Objekte eingesammelt werden

        Console.ReadKey();

    }
}

Und einmal die Pool-Variante:


static void Main(string[] args)
{
    ConcurrentStack<object> objects = new ConcurrentStack<object>();

    Parallel.For(0, 100, i =>
    {
        object obj;
        if (!objects.TryPop(out obj))
            obj = new object(); //Hier wird ein Objekt angelegt (aber nur, wenn kein "freies" vorhanden ist)

        Console.WriteLine("Thread: {0}; Objekt: {1}", Thread.CurrentThread.ManagedThreadId, obj.GetHashCode());

        int result = i * i * i * i;

        objects.Push(obj); //Hier wird das Objekt wieder abgegeben - an den nächsten der es "will"
    });


    Console.ReadKey();
}

Aber Achtung: die beiden Varianten sind nicht equivalent!
Beide lassen sich auch bestimmt noch optimieren (und soagar kombinieren).

Leider verfälscht das "Console.WriteLine" das Ergebnis (führt locking durch), sodass man die Unterschiede nicht sofort sieht:
Bei ersterer Variante hat jeder Thread sein eigenes Objekt, bei letzterer Variante hat jeder Thread zu jedem Zeitpunkt sein eigenes Objekt.

[EDIT]
Die Überladung von Parallel.For zu verwenden, dass man Objekte damit bereitstellt, die jeweils für einen Thread gültig sind, ist ein wenig zweckentfremdet. Daher wird das Initialisierungscallback wohl auch evtl. mehrfach aufgerufen.
[/EDIT]

tonka Themenstarter:in
373 Beiträge seit 2006
vor 13 Jahren

So hast du das also gemeint. Ich habe kurz deinen Code in mein Schema übersetzt, jetzt bekomme ich aber ein NullReferenceException

NullReferenceException not set to an instance of an object
Object reference not set to an instance of an object.

Ich gehe mal davon aus dass das die Meldung daher kommt, das der GC das object terminiert hat, oder?


//Tracer.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;

namespace ParallelObjectTest
{
    public class Tracer
    {
        [ThreadStatic()]
        private static WeakReference _WeakRefScene = null;

        public readonly Scene3D scene = null;

        public Tracer(Scene3D Scene)
        {
            this.scene = Scene;
        }

        public Scene3D CloneScene(Scene3D SceneToClone)
        {
            // hier muss noch Clone implimentiert werden
            return null;
        }

        public void Start()
        {
            List<object> objects = new List<object>();
            object lockobj = new object();

            double sum = 0.0;

            Parallel.For<double>(0, 10000, () =>
                {
                    Scene3D s = null;
                    if (_WeakRefScene == null)
                    {
                        s = CloneScene(this.scene);
                        objects.Add(s);
                        _WeakRefScene = new WeakReference(s);                        
                        Console.WriteLine("neues scene-objekt für thread: " + Thread.CurrentThread.ManagedThreadId.ToString());
                    }
                    else
                    {
                        Console.WriteLine("bestehendes objekt fuer thread: " + Thread.CurrentThread.ManagedThreadId.ToString());
                        s = (Scene3D)_WeakRefScene.Target;
                    }
                    s.Intersect();// hier passiert die NullException
                    
                    return 0.0;
                },
                (k, state, localstate) =>
                {
                    Console.WriteLine("k= " + k);
                    Thread.Sleep(1);
                    return 0.5;
                },
                (localstate) =>
                {
                    lock (lockobj)
                    {
                        sum += localstate;
                    }
                });

            objects.Clear();

        }
    }
}


//Scene3D.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ParallelObjectTest
{
    public class Scene3D
    {
        public bool Intersect()
        {
            // Funktion für Verschneidung für der Dreiecke
            return true;
        }
    }
}


//Program.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ParallelObjectTest
{
    class Program
    {
        static void Main(string[] args)
        {
            Scene3D scene = new Scene3D();
            Tracer tracer = new Tracer(scene);
            tracer.Start();
            Console.WriteLine("press key for EXIT");
            Console.ReadLine();
        }
    }
}


Für mich kommt nur diese Variante in Frage, da das Scene3D Object dauernd arbeitet, somit bringt mir das zurücklegen nichts.

5.742 Beiträge seit 2007
vor 13 Jahren

Ich habe kurz deinen Code in mein Schema übersetzt, jetzt bekomme ich aber ein NullReferenceException

Warum wohl 😁

Tipp:
// hier muss noch Clone implimentiert werden
return null;

tonka Themenstarter:in
373 Beiträge seit 2006
vor 13 Jahren

Oooooh man, bin ich blind gggggg

Was lernt man daraus, man sollte NIE zu spät noch programmieren 😉

Probier das morgen bei großen Tracer aus!