Laden...

Async Task-Synchronisation

Erstellt von Palladin007 vor einem Jahr Letzter Beitrag vor einem Jahr 865 Views
Palladin007 Themenstarter:in
2.078 Beiträge seit 2012
vor einem Jahr
Async Task-Synchronisation

'n Abend,

ich habe das leidige Thema lock in einem async Kontext - was nicht geht und mir ist auch klar, warum es nicht geht.
Meine Frage zielt eher darauf ab, wie man das umgehen könnte.

Für ein generelles lock (nur einer gleichzeitig) kann man ja ein AsyncSemaphore nehmen.
Allerdings scheitert das, wenn der selbe Task EnterAsync nochmal aufruft, bevor der Releaser disposed wurde, genau das kann/darf bei mir aber vorkommen.
Meine Überlegung ist daher, dass ein Objekt als Reservierungs-ID verwendet wird und nicht (wie bei lock/Monitor) als ID für die reservierte Ressource.

Ich versuche es Mal stichpunktartig zu beschreiben:* Enter mit Obj1 ⇒ darf weiter arbeiten.

  • Enter mit Obj1 ⇒ darf weiter arbeiten.
  • Enter mit Obj2 ⇒ muss warten.
  • Exit mit Obj1
  • Obj2 muss immer noch warten.
  • Exit mit Obj1
  • Obj2 darf weiter arbeiten.
  • Exit mit Obj2
  • Enter mit Obj3 ⇒ darf weiter arbeiten.

Und so weiter, wobei aber egal ist, aus welchem Thread oder Task das aufgerufen wird, es zählt nur, welches Objekt übergeben wurde.
Oder vielleicht wird es mit etwas Code klarer:


var taskCount = 5;
var counter = new CountdownEvent(taskCount);
var myLock = new AsyncLock();

for (int i = 0; i < taskCount; i++)
{
    var id = i;

    _ = Task.Run(async () =>
    {
        object obj = id;

        try
        {
            using (await myLock.EnterAsync(obj, default))
            {
                for (int i = 0; i < 3; i++)
                {
                    using (await myLock.EnterAsync(obj, default))
                    {
                        Thread.Sleep(10);
                        Console.Write(id);
                    }
                }

                Console.WriteLine();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
            throw;
        }
        finally
        {
            counter.Signal();
        }
    });
}

counter.Wait();

Die Ausgabe soll sowas sein wie:


111
444
000
222
333

Gibt es dafür schon etwas?

Mein erster Entwurf, der mit dem TestCode auch läuft:


public class AsyncLock
{
    private readonly object _syncRoot = new();
    private TaskCompletionSource? _waiter;
    private WeakReference? _obj;
    private uint _count;

    public Task<Releaser> EnterAsync(object obj, CancellationToken cancellationToken = default)
        => EnterAsync(obj, Timeout.InfiniteTimeSpan, cancellationToken);

    public async Task<Releaser> EnterAsync(object obj, TimeSpan timeout, CancellationToken cancellationToken = default)
    {
        var startTimestamp = DateTime.Now;

        while (true)
        {
            if (TryEnter(obj, out var releaser, out var waitTask))
                return releaser;

            if (timeout > Timeout.InfiniteTimeSpan)
                timeout -= DateTime.Now - startTimestamp;

            await waitTask
                .WaitAsync(timeout, cancellationToken)
                .ConfigureAwait(false);
        }
    }

    private bool TryEnter(object obj, [MaybeNullWhen(false)] out Releaser release, [MaybeNullWhen(true)] out Task waitTask)
    {
        lock (_syncRoot)
        {
            if (ValidateAccess(obj))
            {
                if (_obj?.IsAlive == false)
                    _count = 0;

                _count++;
                _obj = new(obj);

                release = new(this, obj);
                waitTask = null;
                return true;
            }
            else
            {
                _waiter ??= new();

                release = default;
                waitTask = _waiter.Task;
                return false;
            }
        }
    }

    public void Exit(object obj)
    {
        lock (_syncRoot)
        {
            if (ValidateAccess(obj))
            {
                if (_count == 1)
                {
                    _count = 0;
                    _obj = null;

                    Interlocked.Exchange(ref _waiter, null)?.TrySetResult();
                }
                else
                    _count--;
            }
        }
    }

    private bool ValidateAccess(object obj)
    {
        return _obj is null or { IsAlive: false }
            || ReferenceEquals(_obj.Target, obj);

    }

    public readonly struct Releaser : IDisposable
    {
        private readonly AsyncLock _owner;
        private readonly object _obj;

        internal Releaser(AsyncLock owner, object obj)
        {
            ArgumentNullException.ThrowIfNull(owner, nameof(owner));
            ArgumentNullException.ThrowIfNull(obj, nameof(obj));

            _owner = owner;
            _obj = obj;
        }

        public void Dispose()
            => _owner?.Exit(_obj);
    }
}

Mir wäre eine bestehende und gut getestete Lösung lieber, aber wenn es die nicht gibt:
Seht Ihr da mögliche Probleme? Ich habe nicht viel Erfahrung auf dem Gebiet
Wie könnte man das Ding nennen? 😁 AsyncLock oder AsyncMonitor finde ich nicht passend, da es sich ja eigentlich sehr anders verhält, als lock bzw. Monitor.

=========================

Die Lösung/Umsetzung des Problems: https://github.com/loop8ack/AsyncTicketLock

16.806 Beiträge seit 2008
vor einem Jahr

Kurze Hintergrundfrage aufgrund der Flow-Beschreibung: soll das eine State Machine werden?

Palladin007 Themenstarter:in
2.078 Beiträge seit 2012
vor einem Jahr

Nein, keine State Machine.
Die Flow-Beschreibung war nur ein Versuch, es halbwegs verständlich zu beschreiben, das ist bei so Themen ja immer etwas schwerer 🙂

6.911 Beiträge seit 2009
vor einem Jahr

Hallo Palladin007,

erstell dir eine eigene Datenstrukture welche für die Synchronisation zuständig ist.
Inter kann diese entweder via SemaphoreSlim arbeiten od. (was mir besser gefällt) mit Channel<T>s.

Bei der Variante mit den Channel<T> gibts die "Kapazitäten" vor und wenns 0 wird, so heißt es warten.
Da das alles in deiner Datenstruktur gekapselt ist, kannst du je nach übergebenen Objekt weiter entscheiden.

Der Vorteil von Channel<T> ist auch, dass diese ValueTask-basiert sind und somit bei vielen Vorgängen wenn keine "Contention" vorliegt performanter sind.
Grundsätzlich lässt sich das aber auch alles per Semaphore(Slim) erledigen.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

Palladin007 Themenstarter:in
2.078 Beiträge seit 2012
vor einem Jahr

erstell dir eine eigene Datenstrukture welche für die Synchronisation zuständig ist.

Also gibt's das noch nicht - das habe ich befürchtet.

Bei der Variante mit den Channel<T> gibts die "Kapazitäten" vor und wenns 0 wird, so heißt es warten.

Das wäre ja das Verhalten vom Semaphore, was ich aber gerade nicht haben möchte - deshalb ist auch eine auf Semaphore aufbauende Implementierung keine Lösung für mich.
Ich brauche das umgekehrte Verhalten, sodass 0 quasi ein Freifahrtschein für alle ist und der erste, der "gewinnt", darf danach exklusiv arbeiten, bis es wieder auf 0 steht.

Die Channel schaue ich mir aber trotzdem Mal an, ich hab nur grob was dazu gelesen und bin noch nicht tiefer eingestiegen - könnte sich noch lohnen.

Was sagst Du denn zu meinem Versuch? Das verhält sich bisher ja so, wie ich es brauche, nur wäre mir eine fertige Lösung lieber, vor allem, weil sich da so leicht schwer zu entdeckende Fehler einschleichen.
Man sagt dir ja einiges an Erfahrung und Detail-Wissen nach, über etwas Feedback/Kritik würde ich mich daher freuen 🙂
Alternativ kann ich auch unter "Code-Reviews" eine neue Frage auf machen (oder einer von euch spaltet ab), wenn es keine mehr oder weniger passende vorhandene Lösung gibt, ist es da vielleicht besser aufgehoben.

6.911 Beiträge seit 2009
vor einem Jahr

Hallo Palladin007 (Teil 1),

habs für dich mit ein paar Code-Beispielen mehr versehen (musste den Beitrag aufteilen, da zuviel Zeichen vorhanden sind) 😉

das umgekehrte Verhalten, sodass 0 quasi ein Freifahrtschein für alle ist und der erste, der "gewinnt", darf danach exklusiv arbeiten, bis es wieder auf 0 steht.

So hab ich das eigentlich eh verstanden.
Vllt. hab ich mich im vorigen Kommentar unglücklich ausgedrückt, denn das von dir zitierte war nur für die Channel als Ersatz für die Semaphore gedacht.

Hier als Beispiel:


using System.Threading.Channels;

const int N = 3;

using CancellationTokenSource cts                       = new();
MySemaphareMadeWithChannels mySemaphareMadeWithChannels = new(2);
List<Task> tasks                                        = new(capacity: 3);

Print("Start");

tasks.Add(Do(cts.Token, ConsoleColor.Cyan));
tasks.Add(Do(cts.Token, ConsoleColor.Magenta));
tasks.Add(Do(cts.Token, ConsoleColor.White));

await Task.WhenAll(tasks);
Print("Done");
//-----------------------------------------------------------------------------
async Task Do(CancellationToken cancellationToken, ConsoleColor color)
{
    // Ohne Sync-Context und mit dem Standard-TaskScheduler wird die Continuation
    // einfach dem ThreadPool hinzugefügt.
    await Task.Yield();

    Print("before signal", color, ConsoleColor.Red);
    await mySemaphareMadeWithChannels.WaitAsync(cancellationToken);
    Print("behind signal", color, ConsoleColor.Green);

    // Für Demo ist Thread.Sleep geeigneter, da mit Task.Delay der Ausführungs-Thread
    // gewechselt werden kann.
    //await Task.Delay(Random.Shared.Next(750, 1000), cancellationToken);
    Thread.Sleep(Random.Shared.Next(750, 1000));

    mySemaphareMadeWithChannels.Release();
    Print("released signal", color, ConsoleColor.Yellow);
}
//-----------------------------------------------------------------------------
void Print(string message, ConsoleColor? colorForId = null, ConsoleColor? colorForMessage = null)
{
    if (colorForId is not null && colorForMessage is not null)
    {
        lock (tasks)
        {
            Console.ForegroundColor = colorForId.Value;
            Console.Write($"Thread-Id: {Environment.CurrentManagedThreadId,2}, ");
            Console.ForegroundColor = colorForMessage.Value;
            Console.WriteLine($"free spaces: {mySemaphareMadeWithChannels.FreeSpaces}, {message}");
            Console.ResetColor();
        }
    }
    else
    {
        Console.WriteLine($"Thread-Id: {Environment.CurrentManagedThreadId,2}, free spaces: {mySemaphareMadeWithChannels.FreeSpaces}, {message}");
    }
}
//-----------------------------------------------------------------------------
public sealed class MySemaphareMadeWithChannels
{
    private readonly Channel<int> _channel;
    //-------------------------------------------------------------------------
    public MySemaphareMadeWithChannels(int taskCount)
    {
        _channel = Channel.CreateBounded<int>(new BoundedChannelOptions(taskCount));

        for (int i = 0; i < taskCount; ++i)
        {
            _channel.Writer.TryWrite(42);
        }
    }
    //-------------------------------------------------------------------------
    public int FreeSpaces => _channel.Reader.Count;
    //-------------------------------------------------------------------------
    public ValueTask WaitAsync(CancellationToken cancellationToken = default)
    {
        ValueTask<int> readTask = _channel.Reader.ReadAsync(cancellationToken);

        // Falls synchron fertig, so kann die async-Statemachine, welche C# erstellt, vermieden werden
        if (readTask.IsCompleted)
        {
            // Der ValueTask kann aus einer IValueTaskSource erstellt worden sein. Diese muss
            // zurückgesetzt werden und das geschieht per Zugriff auf Result.
            // Mit GetAwaiter().GetResult() geschieht das Gleich, nur effizienter.
            readTask.GetAwaiter().GetResult();
            return ValueTask.CompletedTask;
        }

        return Core(readTask);

        static async ValueTask Core(ValueTask<int> task)
        {
            await task.ConfigureAwait(false);
        }
    }
    //-------------------------------------------------------------------------
    public void Release() => _channel.Writer.TryWrite(42);
}

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

6.911 Beiträge seit 2009
vor einem Jahr

(Teil 2)

Zurück zu deinem Problem: das lässt sich mit Channel ganz gut abbilden -- (für mich) der Vorteil dabei ist, dass ValueTasks im Spiel sind und v.a. bei Channels sind diese mittels IValueTaskSource umgesetzt, so dass diese Tasks selbst keine zusätzlichen Allokationen haben (beim await die AsyncStateMachineBox, welche die lokalen Variablen, etc. hält, gibt es aber auch hier, geht nicht anders).
Nur dass du gedanklich das Problem da auch invertieren musst. Etwas fertiges kann ich nicht nennen, aber ein Beispiel (mit etlichen Kommentaren).
Hab die Klasse ReEntryLock genannt, keine Ahnung ob es bessere Bezeichnungen gibt -- Namensgebung ist ziemlich das schwerste 😉


//#define PRINT_ID

using System.Diagnostics;
using System.Threading.Channels;

const int N                       = 5;
using CancellationTokenSource cts = new();
ReEntryLock reEntryLock           = new();
List<Task> tasks                  = new(capacity: N);

Console.WriteLine("Start");

tasks.Add(Do(new UserState(1, ConsoleColor.Cyan)   , cts.Token));
tasks.Add(Do(new UserState(2, ConsoleColor.Magenta), cts.Token));
tasks.Add(Do(new UserState(3, ConsoleColor.White)  , cts.Token));

await Task.WhenAll(tasks);
Console.WriteLine("Done");
//-----------------------------------------------------------------------------
async Task Do(UserState state, CancellationToken cancellationToken)
{
    await Task.Yield();

    Print(state, ConsoleColor.Red, "before signal");
    using (await reEntryLock.EnterAsync(state, cancellationToken))
    {
        Print(state, ConsoleColor.Green, "behind signal");

        for (int i = 0; i < 3; ++i)
        {
#if PRINT_ID
            using (await reEntryLock.EnterAsync(state, cancellationToken))
            {
                Thread.Sleep(Random.Shared.Next(750, 1000));        // für Demo kein Task.Delay
                Console.Write(state.Id);
            }
#else
            Print(state, ConsoleColor.Red, "before signal");
            using (await reEntryLock.EnterAsync(state, cancellationToken))
            {
                Print(state, ConsoleColor.Green, "behind signal");
                Thread.Sleep(Random.Shared.Next(750, 1000));        // für Demo kein Task.Delay
            }
            Print(state, ConsoleColor.Yellow, "released signal");
#endif
        }
#if PRINT_ID
        Console.WriteLine();
#endif
    }
    Print(state, ConsoleColor.Yellow, "released signal");
}
//-----------------------------------------------------------------------------
[DebuggerStepThrough]
void Print(UserState state, ConsoleColor color, string message)
{
#if !PRINT_ID
    lock (tasks)
    {
        Console.ForegroundColor = state.Color;
        Console.Write($"Thread-Id: {Environment.CurrentManagedThreadId}, Id: {state.Id}, ");
        Console.ForegroundColor = color;
        Console.WriteLine($"{message}");
        Console.ResetColor();
    }
#endif
}
//-----------------------------------------------------------------------------
public record UserState(int Id, ConsoleColor Color);

Im nächsten Teil der eigentliche Teil, auf den ich schon die ganze Zeit hinaus will...

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

6.911 Beiträge seit 2009
vor einem Jahr

(Teil 3)


//-----------------------------------------------------------------------------
// Keine Ahnung ob der Name treffend ist.
public sealed class ReEntryLock
{
    // Ein Channel ist hier praktisch, da er die interne Synchronisation und Signalisierung
    // der Tasks übernimmt.
    // Hier verwendet wir einen Channel mit Kapazität 1, da nur mit dem gleichen userState-Objekt
    // gleichzeitig eingetreten werden darf.
    private readonly Channel<State> _channel = Channel.CreateBounded<State>(new BoundedChannelOptions(1));

    // Damit wir wenig Allokationen haben, cachen wir das State-Objekt.
    private readonly State _state = new();
    //-------------------------------------------------------------------------
    // Hier hab ich es EnterAsync genennt, da wegen mehrfachem Eintritt das besser passt
    // als WaitAsync.
    public async ValueTask<Releaser> EnterAsync(object userState, CancellationToken cancellationToken = default)
    {
        while (true)
        {
            // Dieser lock könnte fein-granularer geschrieben werden. Aber dann müsste bei jeder
            // _channel.{Reader,Writer}.TryXYZ-Methode auf das Ergebnis reagiert werden. Somit
            // würde der Code nur unnötig verkompliziert. Daher lieber einfacher (und auch sicherer)
            // mit dem "äußeren" lock.
            lock (_state)
            {
                // Wir schauen ob ein Element im Channel ist.
                // Falls ja, so wurde der Lock schon betreten, daher müssen wir schauen
                // ob es das gleiche userState-Objekt ist um erneut betreten zu dürfen.
                if (_channel.Reader.TryPeek(out State? stateInChannel))
                {
                    if (ReferenceEquals(userState, stateInChannel.UserState))
                    {
                        // Es ist das gleiche Objekt, daher Zähler erhöhen und eintreten lassen.
                        stateInChannel.IncrementEnteredCount();
                        return new Releaser(this, userState);
                    }
                }
                // Es ist kein Element im Channel, daher eintreten lassen.
                else
                {
                    bool written = _channel.Writer.TryWrite(_state);
                    Debug.Assert(written);

                    _state.Reset(userState);
                    return new Releaser(this, userState);
                }
            }

            // Bleibt nichts anderes übrig als zu warten bis wieder Platz im Channel ist.
            await _channel.Writer.WaitToWriteAsync(cancellationToken).ConfigureAwait(false);
        }
    }
    //-------------------------------------------------------------------------
    public void Release(object userState)
    {
        Debug.Assert(_channel.Reader.Count > 0);

        // Wir schauen ob das userState-Objekt zum Objekt im Channel passt.
        // Falls ja, so dekrementieren wir den Zähler vom State im Channel.
        // Ist der Zähler 0, so wird der Channel geleert und ist somit wieder frei.
        //
        // - Ist der Channel leer, so passiert nichts.
        // - Ist es ein anderes userState-Objekt so passiert nichts.
        if (_channel.Reader.TryPeek(out State? stateInChannel))
        {
            lock (_state)
            {
                if (ReferenceEquals(userState, stateInChannel.UserState)
                && stateInChannel.DecrementEnteredCount() == 0)
                {
                    bool freedChannel = _channel.Reader.TryRead(out State? stateRead);

                    // Nur zur Sicherheit und da ich Debug.Assert normal gerne als "Verträge"
                    // Code verwende, da sonst so gut wie keine Kommentare vorhanden sind.
                    Debug.Assert(freedChannel);
                    Debug.Assert(ReferenceEquals(stateRead!.UserState, stateInChannel.UserState));
                }
            }
        }
    }
    //-------------------------------------------------------------------------
    // Eine struct wäre schön, geht hier aber nicht, da sie mutable sein muss
    // für das Inkrementieren vom Zähler, aber beim Schreiben in den Channel
    // wird eine Kopie (wegen Werttyp) erstellt und das Inkrementieren ist sinnlos.
    // Daher muss es ein Referenztyp sein.
    [DebuggerDisplay("EnteredCount: {_enteredCount,nq}, UserState: {UserState,nq}")]
    private class State
    {
        private int _enteredCount;
        public object? UserState { get; private set; }
        //---------------------------------------------------------------------
        public void Reset(object userState)
        {
            _enteredCount  = 1;
            this.UserState = userState;
        }
        //---------------------------------------------------------------------
        public int IncrementEnteredCount() => ++_enteredCount;
        public int DecrementEnteredCount() => --_enteredCount;
    }
    //-------------------------------------------------------------------------
    // Ist nicht nötig, aber so kann per using eleganter gearbeitet werden,
    // anstatt manuell Release aufrufen zu müssen (das ist in deinem Code ja auch schon so).
    public readonly struct Releaser : IDisposable
    {
        private readonly ReEntryLock _parent;
        private readonly object      _userState;
        //---------------------------------------------------------------------
        internal Releaser(ReEntryLock parent, object userState) => (_parent, _userState) = (parent, userState);
        //---------------------------------------------------------------------
        public void Dispose() => _parent?.Release(_userState);
    }
}

Natürlich ginge das ohne Channel auch, aber der Channel nimmt uns sehr viel komplizierte Arbeit ab.
Die Tasks, welche warten müssen, könnten auch per TaskCompletionSource<T>, IValueTaskSource, etc. erstellt werden. Aber wie denen dann signalisiert wird dass es weiter geht ist doch recht aufwändig -- v.a. wenns robust und sicher gemacht werden soll. Je nachdem wo die Anwendung laufen soll, kann auch noch der ExecutionContext usw. ins Spiel kommen. Das will ich mir und dir ersparen, daher sind die Channels verwendet worden.

  
[MaybeNullWhen(false)] out Releaser release  
  

Releaser ist ein Werttype, kann also nicht null sein, daher ist diese Annotation umsonst.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

Palladin007 Themenstarter:in
2.078 Beiträge seit 2012
vor einem Jahr

Ok, damit hab ich nicht gerechnet 😁
Ich brauche (neben der Arbeit) noch etwas Zeit, um mich da Mal ausführlich einzuarbeiten, besonders da ich die Channel bisher auch nur vom Namen kenne.
Gerade bei so einem schwierigen Thema übernehme ich ungern Code, ohne jede Zeile erklären zu können, teils aus Interesse, teils weil ich ihn warten muss ^^

Bis dahin erst einmal ein riesiges Dankeschön 🙂

Releaser ist ein Werttype, kann also nicht null sein, daher ist diese Annotation umsonst.

Ich weiß ^^
Ich habe es aber bewusst rein geschrieben, um den Unterschied zum zweiten out-Parameter offensichtlich zu machen, also dass je nach Rückgabewert nur einer von beiden genutzt werden soll - und es tut ja auch nicht weh.
Die TryEnter-Methode ist auch erst zum Schluss entstanden, weil die Enter-Methode für meinen Geschmack zu groß wurde und sich an der Stelle gut aufsplitten ließ.
Ich hatte erst etwas ganz anderes vor, hab das dann aber für übertrieben befunden und bin zu der zwar etwas merkwürdigen, dafür aber einfachen out-Lösung gekommen.

Hab die Klasse ReEntryLock genannt, keine Ahnung ob es bessere Bezeichnungen gibt -- Namensgebung ist ziemlich das schwerste

Oh ja 😁
Vielleicht irgendetwas mit "Ticket"?
Die ganzen anderen Worte, die mir in den Sinn kommen, sind bereits irgendwie vergeben 😠

6.911 Beiträge seit 2009
vor einem Jahr

Hallo Palladin007,

und es tut ja auch nicht weh

Ist aber trotzdem falsch 😉

Noch zwei Dinge zu deinem Code:

  
if (ValidateAccess(obj))  
{  
    if (_obj?.IsAlive == false)  
        _count = 0;  
  
    _count++;  
    _obj = new(obj);  
  
    release = new(this, obj);  
    waitTask = null;  
    return true;  
}  
  

Würde ich als


if (ValidateAccess(obj))
{
    if (_obj?.IsAlive == false)
    {
        _count = 1;
        _obj = new(obj);
    }
    else
    {
        _count++;
    }

    release = new(this, obj);
    waitTask = null;
    return true;
}

schreiben. Das spart die wiederholte Allokation der WeakReference und _count kann gleich auf 1 gesetzt werden.

  
else  
{  
    _waiter ??= new();  
  
    release = default;  
    waitTask = _waiter.Task;  
    return false;  
}  
  

Da ist ein (latenter) Bug. Wenn TryEnter mit einem anderen obj aufgerufen wird als bisher in der WeakReference gehalten so wird jedesmal eine neue TaskCompletionSource erstellt. Bereits von der TaskCompletionSource (TCS) erzeugte Tasks können so nie komplettiert werden und das ist Fehlverhalten.
Um das zu korrigieren würde eine Liste mit den TCS benötigt werden. Das schaut den für die Allokationen gar nciht mehr gut aus und macht es doch recht kompliziert. Daher auch die Channels, da diese das ähnliche Problem intern recht elegant, aber nicht-trivial, lösen.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

Palladin007 Themenstarter:in
2.078 Beiträge seit 2012
vor einem Jahr

Das spart die wiederholte Allokation der WeakReference und _count kann gleich auf 1 gesetzt werden.

Stimmt, danke 🙂

Da ist ein (latenter) Bug. Wenn TryEnter mit einem anderen obj aufgerufen wird als bisher in der WeakReference gehalten so wird jedesmal eine neue TaskCompletionSource erstellt. Bereits von der TaskCompletionSource (TCS) erzeugte Tasks können so nie komplettiert werden und das ist Fehlverhalten.

Müsste das nicht durch das "??=" umgangen werden?
Die TCS wird nur dann erstellt, wenn es noch keine gibt, sodass am Ende alle auf den gleichen Task warten.
Oder ich übersehe etwas, denn die AsyncSemaphore-Implementierung von Stephen T nutzt ja auch eine Queue, ich dachte nur, dass ich die nicht brauche...

6.911 Beiträge seit 2009
vor einem Jahr

Hallo Palladin007,

Müsste das nicht durch das "??=" umgangen werden?

Ups, das hatte ich nur alt = gelesen. Sorry. Da hast du recht, das passt und wegen der Synchronisierung gibt es da auch kein Race.

auf den gleichen Task warten.
...denn die AsyncSemaphore-Implementierung ... nutzt ja auch eine Queue Task kann ja mehrfach erwartet werden, daher passt das schon. Bei ValueTask geht das i.d.R. nicht bzw. ist es im Allgemeinen besser den VT nur 1x zu erwarten.
Vllt. "korrekter" wäre es mit einer Liste (od. Queue) für die wartenden Tasks, da es im Grunde ja verschieden sind -- für jeden wartenden Aufrufer einer.

Gestern hab ich auch einen Versuch unternommen mit ValueTask (IValueTaskSource) ohne Channel. Wurde aber schnell sehr komplex, so dass das wieder verworfen wurde, da eben die Channel hier schon alles bietet was benötigt wird.

Unabhängig davon frag ich mich schon ob es nicht ein passendes Konstrukt dafür schon gibt -- v.a. mit Namen, denn es gibt ja (unabhängig von Programmiersprache) mehr od. weniger überall die gleichen Primitiven (die oft auf Dijkstra zurückgehen).

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

Palladin007 Themenstarter:in
2.078 Beiträge seit 2012
vor einem Jahr

Da hast du recht, das passt und wegen der Synchronisierung gibt es da auch kein Race.

Gut zu wissen, dass ich da richtig gedacht habe ^^

Gestern hab ich auch einen Versuch unternommen mit ValueTask (IValueTaskSource) ohne Channel. Wurde aber schnell sehr komplex, so dass das wieder verworfen wurde, da eben die Channel hier schon alles bietet was benötigt wird.

Klar, wenn die Channels sowas in der Art schon machen, dann nehme ich natürlich auch diese Lösung.
Darauf zielte meine Frage hier ja ab: Eine mehr oder weniger fertige Lösung finden, damit diese Komplexität eben nicht testen muss.

Unabhängig davon frag ich mich schon ob es nicht ein passendes Konstrukt dafür schon gibt

Naja, für einen Thread kann man das selber Verhalten ja mit dem klassischen lock erreichen - zumindest in meinem Fall.
Nur für Async geht genau das nicht, mein Gedanke mit dem anderen Verhalten soll das gleiche Verhalten wie vom lock ermöglichen.
Warum es dafür noch nichts gibt, wundert mich aber auch, das Problem sollte es ja auch in anderen Projekten geben, async Synchronisierung ist ja kein neues Thema.

Aber wenn es das wirklich nicht gibt - wovon ich jetzt mal ausgehe, wenn Du und Abt nichts kennen und wir nichts finden - dann mache ich daraus ein kleines NuGet-Package.

Palladin007 Themenstarter:in
2.078 Beiträge seit 2012
vor einem Jahr

Ich schaue mir gerade deinen Code an und habe eine Frage zur Release-Methode:


public void Release(object userState)
{
    Debug.Assert(_channel.Reader.Count > 0);

    // Wir schauen ob das userState-Objekt zum Objekt im Channel passt.
    // Falls ja, so dekrementieren wir den Zähler vom State im Channel.
    // Ist der Zähler 0, so wird der Channel geleert und ist somit wieder frei.
    //
    // - Ist der Channel leer, so passiert nichts.
    // - Ist es ein anderes userState-Objekt so passiert nichts.
    if (_channel.Reader.TryPeek(out State? stateInChannel))
    {
        lock (_state)
        {
            if (ReferenceEquals(userState, stateInChannel.UserState)
            && stateInChannel.DecrementEnteredCount() == 0)
            {
                bool freedChannel = _channel.Reader.TryRead(out State? stateRead);

                // Nur zur Sicherheit und da ich Debug.Assert normal gerne als "Verträge"
                // Code verwende, da sonst so gut wie keine Kommentare vorhanden sind.
                Debug.Assert(freedChannel);
                Debug.Assert(ReferenceEquals(stateRead!.UserState, stateInChannel.UserState));
            }
        }
    }
}

Ich wollte eigentlich ein Problem ansprechen, dann habe ich deine Debug.Assert-Zeilen gesehen und dass sie genau dieses Problem asserten 😁
Wäre es nicht theoretisch denkbar, dass in der Release-Methode, während sie auf das lock wartet, ein zweiter Thread den gleichen userState releasen will und danach ein dritter Thread mit EnterAsync einen neuen userState hinzufügt?
In dem Fall würde das Release vom ersten Thread den State vom dritten Thread verwerfen, was natürlich nicht sein darf.

Wäre es nicht besser, das TryPeek zwei Mal - einfach vor dem lock und einmal im lock - zu haben?
Das äußere TryPeek ist dann ein Versuch, möglichst unnötiges Warten auf das lock zu vermeiden und das innere lock ist eine Versicherung, dass es auch wirklich drin ist.
Mit deinem zweiten Debug.Assert prüfst Du ja genau diesen Fall, aber das fällt dann ja nur im Debug auf.

Mein Gedanke wäre also sowas:


public void Release(object userState)
{
    Debug.Assert(_channel.Reader.Count > 0);

    // Wir schauen ob das userState-Objekt zum Objekt im Channel passt.
    // Falls ja, so dekrementieren wir den Zähler vom State im Channel.
    // Ist der Zähler 0, so wird der Channel geleert und ist somit wieder frei.
    //
    // - Ist der Channel leer, so passiert nichts.
    // - Ist es ein anderes userState-Objekt so passiert nichts.
    if (_channel.Reader.TryPeek(out _))
    {
        lock (_state)
        {
            if (_channel.Reader.TryPeek(out State? stateInChannel))
            {
                if (ReferenceEquals(userState, stateInChannel.UserState)
                    && stateInChannel.DecrementEnteredCount() == 0)
                {
                    bool freedChannel = _channel.Reader.TryRead(out State? stateRead);

                    // Nur zur Sicherheit und da ich Debug.Assert normal gerne als "Verträge"
                    // Code verwende, da sonst so gut wie keine Kommentare vorhanden sind.
                    Debug.Assert(freedChannel);
                    Debug.Assert(ReferenceEquals(stateRead!.UserState, stateInChannel.UserState));
                }
            }
        }
    }
}

PS:
Debug.Assert habe ich bisher noch nie genutzt - keine Ahnung, warum 😁
Aber wenn ich so darüber nachdenke, gefällt mir das Konzept, auch wenn der Code damit irgendwie komisch aussieht

D
261 Beiträge seit 2015
vor einem Jahr

Ich weiß nicht ob ich dein Bedenken richtig verstanden habe, aber:

Du hast im State noch den Zähler, d.h. wenn zwei identische Objekte freigeben wollen, dann ist der Zähler auch auf 2 und muss erst auf 0 fallen, bevor ein anderes Objekt mit EnterAsync "eintreten" darf. D.h. bevor nicht jedes Objekt A den Zähler dekrementiert (und somit freigegeben) hat, kann auch kein Objekt B in den Channel kommen.

Palladin007 Themenstarter:in
2.078 Beiträge seit 2012
vor einem Jahr

Hmm - stimmt eigentlich.
Klar, es wäre möglich, dass der Zähler auf 1 steht und zwei Mal freigegeben werden soll, aber das ist dann ein Synchronisationsfehler des Aufrufers.
Die Klasse soll ja nur die Aufrufe mit verschiedenen Objekten synchronisieren, alles, was das gleiche Objekt hat, darf ungehindert tun und lassen, was es will

6.911 Beiträge seit 2009
vor einem Jahr

Hallo Palladin007,

ich hatte die gleiche Überlegung wie dannoe. Dieser Fall dürfte nie eintreten, sofern Enter auch richtig implementiert ist.

Im Allgemeinen hast du aber mit dem "double check lock" recht. Schnelle Prüfung, dann im lock nochmal Prüfen um zu Schauen ob eh durch kein Race sich der Zustand verändert hat.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

Palladin007 Themenstarter:in
2.078 Beiträge seit 2012
vor einem Jahr

Dann lasse ich es drin, tut ja nicht weh und der Channel sollte denke ich performant genug sein, dass das zweite Mal nicht auffällt.

Zum Namen: TicketLock? 😁
Es verhält sich wie ein Lock (bzw. Monitor), nur nicht anhand des Threads, sondern anhand eines Ticket-Objektes

Hier der aktuelle Code.
Ich habe mir erlaubt, ihn an meinen Stil anzupassen und ein paar Funktionen zu ergänzen - ich hoffe, Du nimmst mir das nicht übel 🙂


public sealed class AsyncTicketLock
{
    private readonly Channel<State> _channel = Channel.CreateBounded<State>(1);
    private readonly State _state = new();

    public async ValueTask<Releaser> EnterAsync(object ticket, CancellationToken cancellationToken = default)
    {
        while (true)
        {
            if (TryEnter(ticket, out var releaser))
                return releaser;

            await _channel.Writer
                .WaitToWriteAsync(cancellationToken)
                .ConfigureAwait(false);
        }
    }

    // Eine TryEnter-Methode brauche ich nicht, aber ich denke, dass sie für Andere nützlich sein kann
    public bool TryEnter(object ticket, out Releaser releaser)
    {
        var success = TryEnter(ticket);

        releaser = success
            ? new(this, ticket)
            : default;

        return success;
    }

    // Auch die ist nicht notwendig, aber ich finde sie übersichtlicher, wenn ich nicht bei jedem return den out-Parameter setzen muss
    public bool TryEnter(object ticket)
    {
        lock (_state)
        {
            if (_channel.Reader.TryPeek(out State? state))
            {
                if (ReferenceEquals(ticket, state.Ticket))
                {
                    state.IncrementEnteredCount();

                    return true;
                }

                return false;
            }
            else
            {
                var written = _channel.Writer.TryWrite(_state);

                Debug.Assert(written);

                _state.Reset(ticket);

                return true;
            }
        }
    }

    // Das "Try" brauche ich auch nicht, aber ich denke, dass es für Andere nützlich sein kann
    public bool Release(object ticket)
        => Release(ticket, all: false);

    // Anderer Code kann auf diese Weise garantiert alles freigeben
    // Wenn z.B. irgendwo ein Problem auftritt, dass nicht alles Released wurde,
    // dann kann diese Methode z.B. in Dispose oder finally aufgerufen und so jeder Zugang freigegeben werden
    public bool ReleaseAll(object ticket)
        => Release(ticket, all: true);

    private bool Release(object ticket, bool all)
    {
        Debug.Assert(_channel.Reader.Count > 0);

        if (!_channel.Reader.TryPeek(out _))
            return false;

        lock (_state)
        {
            if (!_channel.Reader.TryPeek(out var state))
                return false;

            if (!ReferenceEquals(ticket, state.Ticket))
                return false;

            var count = state.DecrementEnteredCount();

            Debug.Assert(count >= 0);

            if (all || count <= 0)
            {
                var freedChannel = _channel.Reader.TryRead(out var stateRead);

                Debug.Assert(freedChannel);
                Debug.Assert(ReferenceEquals(stateRead?.Ticket, state.Ticket));
            }

            return true;
        }
    }

    public readonly struct Releaser : IDisposable
    {
        private readonly AsyncTicketLock _parent;
        private readonly object _ticket;

        internal Releaser(AsyncTicketLock parent, object ticket)
            => (_parent, _ticket) = (parent, ticket);

        public void Dispose()
            => _parent?.Release(_ticket);
    }

    [DebuggerDisplay($"EnteredCount: {{{nameof(_enteredCount)},nq}}, UserState: {{{nameof(Ticket)},nq}}")]
    private sealed class State
    {
        private int _enteredCount;
        public object? Ticket { get; private set; }

        public void Reset(object ticket)
        {
            _enteredCount = 1;

            Ticket = ticket;
        }

        public int IncrementEnteredCount() => ++_enteredCount;
        public int DecrementEnteredCount() => --_enteredCount;
    }
}

Ich bin offen gegenüber Kritik, auch zum Stil ^^

6.911 Beiträge seit 2009
vor einem Jahr

Hallo Palladin007,

zum Namen: Ticket gefällt mir ganz gut. Statt TicketLock tendiere ich zu TicketMonitor, da Monitor mehr mit Überwachen zu tun hat als mit Verschließen. Nur so vom Gefühl her.

auch zum Stil ^^

Wenn du schon fragst: ich mag die var nicht, wenn nicht direkt ersichtlich ist welcher Typ es ist. Z.B.

  
if (TryEnter(ticket, out var releaser))  
    return releaser;  
  

Welcher Typ releaser ist erschließt sich nur aus dem Kontext, dem gefolgt werden muss. In VS mit Intellisense gehts, aber wenn nur der Text vom Code vorhanden ist kann das lästig werden.

Ähnlich bei

  
var success = TryEnter(ticket);  
  

Tipp-Ersparnis kanns nicht sein, v.a. mit Intellisense 😉
Es ist schon naheliegend dass success vom Typ bool ist, aber eindeutig lässt sich das nur beantworten wenn die Methode TryEnter bekannt ist.
Ich würde da immer bool direkt hinschreiben. Das vermeidet auch potentielle Fehler, falls (irgendwann) der Rückgabetyp von TryEnter geändert werden sollte, wie z.B. in eine Enum


public enum EnterStata
{
    Success,
    LockHeldByOtherTicket,
    GenericEnterFailure     // in Memoriem an die GDI+ Zeiten ;-)
}

Außer bei anonyment Type und vllt. Linq gibt es seit C# 9 (mit "target typed new") für mich keinen Grund var zu verwenden.

  
releaser = success  
    ? new(this, ticket)  
    : default;  
  

Auch hier würde ich gerne sehen welche Instanz da erstellt wird. Das target typed new ist nett -- und ein super Ersatzt für var -- aber es sollte damit nicht übertrieben werden.


var foo0 = new Foo();   // OK, aber mit target typed new nicht nötig
Foo foo1 = new();       // Finde ich besser, da wir von links nach rechts lesen und so der Typ sofort klar ist

Falls sonst der Typ nicht direkt klar ist, so sollte dieser unbedingt explizit angegeben werden. Außer bei, wie oben schon erwähnt, anonyment Typen (da gehts nicht anders) und ev. bei Linq (da kann es u.U. mehr verwirren als helfen).

V.a. wenn viel Code gelesen wird (und teilweise auf dem Handy) ist das ungemein praktisch.

Ansonsten passt dein Code schon. Das Aufteilen in Methoden, auch in Hinblick auf die TryEnter-Methoden, ist praktisch.

Vllt. sollte noch ein Überladung mit Timeout, wie im ganz ursprünglichen Code von dir, mit rein. Das könnte per verlinkter CancellationTokenSource durchgeführt werden, indem per CancellationTokenSource.CreateLinkedTokenSource eine neue CTS erstellt wird, welche dann mit cts.CancelAfter spätestens beim Timeout getriggert wird.
Hinweis: wenn eine CTS mit einem Timeout erstellt wurde, so muss diese unbedingt Disposed werden, da sonst die Timer-Queue (inter in .NET) wächst (memory leak).

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

Palladin007 Themenstarter:in
2.078 Beiträge seit 2012
vor einem Jahr

Ja, das Thema var, das gibt's ja häufiger, war klar, dass Du das schreibst, bei dir war kein einziges var 😁
Für mich ist das Argument, dass die Bedeutung nicht offensichtlich ist, eher ein Grund dafür, die Benennungen anzupassen, die ist mMn. wichtiger.
Außerdem sorgt das var dafür, dass eine Änderung des Typs nicht automatisch überall für Compile-Fehler führt - wobei das aber auch als Nachteil sehen könnte, je nach Standpunkt.

V.a. wenn viel Code gelesen wird (und teilweise auf dem Handy) ist das ungemein praktisch.

Auf dem Handy habe ich noch nie Code gelesen - aber die Frage "Wer tut sowas?" kann ich mir vermutlich sparen 😁

Vllt. sollte noch ein Überladung mit Timeout, wie im ganz ursprünglichen Code von dir, mit rein.

Das könnte per verlinkter CancellationTokenSource durchgeführt werden

Das habe ich mir auch überlegt und weg gelassen, da der Aufrufer das ja auch selber machen kann, ohne die Komplexität der Klasse kennen zu müssen.
Aber schaden würde es auch nicht und Du hast schon recht, es macht die Klasse irgendwie "vollständiger", auch mit dem Hintergrund, es veröffentlichen zu wollen.
Ich baue es noch mit ein.

Statt TicketLock tendiere ich zu TicketMonitor, da Monitor mehr mit Überwachen zu tun hat als mit Verschließen.

Allerdings ist die Klasse ja nicht zum Überwachen da, sie sperrt nur den Zugang, oder gibt ihn wieder frei.
Die Monitor-Klasse liefert ja auch noch mehr, was einem regelrechten Ballspiel gleicht, das ist es hier ja nicht.

Ich hätte auch noch eine andere Idee:

Eine zweite Klasse die die gleichen Methoden anbietet, nur ohne Ticket.
Sie arbeitet dann intern mit AsyncLocal<object> und synchronisiert auf diese Weise automatisch.
Das erspart die Arbeit, wenn eine Instanz von mehreren Tasks genutzt wird, selber ein AsyncLocal zu verwalten.
Ich müsste nur recherchieren, was mit den Referenzen passiert, wenn die Tasks beendet sind, wäre doof, wenn ich dadurch ein MemoryLeak einbaue

6.911 Beiträge seit 2009
vor einem Jahr

Hallo Palladin007,

bzgl. Lock <-> Monitor hast du recht. Wenns Monitor genannt würde, so sollte auch ein Pulse, etc. dabei sein. Daher ist wohl doch Lock-Suffix passender.

Eine zweite Klasse die die gleichen Methoden anbietet, nur ohne Ticket.

Das wäre dann ein normaler async lock?
Wie da jetzt das AsyncLocal<object> ins Spiel kommt erschließt sich mir nicht.
Die Klasse kann ja einfach an den TicketLock weiterdelegieren, wobei das Ticket intern erstellt wird.

Auf dem Handy habe ich noch nie Code gelesen - aber die Frage "Wer tut sowas?" kann ich mir vermutlich sparen

V.a. Code / PRs auf GitHub schau ich mir so auch an.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

Palladin007 Themenstarter:in
2.078 Beiträge seit 2012
vor einem Jahr

Neue Version* Timeout-Überladung der EnterAsync-Methode

  • Das Ticket wird jetzt als WeakReference gespeichert, wenn die Referenz verloren ist, gilt der State automatisch als freigegeben
  • Ein paar vars entfernt, wo es nicht direkt offensichtlich war. Bei Variablen mit "has" finde ich es aber ziemlich offensichtlich

public sealed class AsyncTicketLock
{
    private readonly Channel<State> _channel = Channel.CreateBounded<State>(1);
    private readonly State _state = new();

    public ValueTask<Releaser> EnterAsync(object ticket, int timeout, CancellationToken cancellationToken = default)
    {
        if (timeout == 0)
            return EnterAsync(ticket, cancellationToken);

        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        timeoutCts.CancelAfter(timeout);

        return EnterAsync(ticket, timeoutCts.Token);
    }

    public ValueTask<Releaser> EnterAsync(object ticket, TimeSpan timeout, CancellationToken cancellationToken = default)
    {
        if (timeout == TimeSpan.Zero)
            return EnterAsync(ticket, cancellationToken);

        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        timeoutCts.CancelAfter(timeout);

        return EnterAsync(ticket, timeoutCts.Token);
    }

    public async ValueTask<Releaser> EnterAsync(object ticket, CancellationToken cancellationToken = default)
    {
        while (true)
        {
            if (TryEnter(ticket, out Releaser releaser))
                return releaser;

            await _channel.Writer
                .WaitToWriteAsync(cancellationToken)
                .ConfigureAwait(false);
        }
    }

    public bool TryEnter(object ticket, out Releaser releaser)
    {
        var success = TryEnter(ticket);

        releaser = success
            ? new Releaser(this, ticket)
            : default;

        return success;
    }

    public bool TryEnter(object ticket)
    {
        lock (_state)
        {
            if (_channel.Reader.TryPeek(out State? state) && state.TryGetTicket(out object? stateTicket))
            {
                if (ReferenceEquals(ticket, stateTicket))
                {
                    state.IncrementEnteredCount();

                    return true;
                }

                return false;
            }
            else
            {
                var hasWritten = _channel.Writer.TryWrite(_state);

                Debug.Assert(hasWritten);

                _state.Reset(ticket);

                return true;
            }
        }
    }

    public bool Release(object ticket)
        => Release(ticket, all: false);

    public bool ReleaseAll(object ticket)
        => Release(ticket, all: true);

    private bool Release(object ticket, bool all)
    {
        if (!_channel.Reader.TryPeek(out _))
            return false;

        lock (_state)
        {
            if (!_channel.Reader.TryPeek(out State? state))
                return false;

            if (!state.TryGetTicket(out object? stateTicket))
                return false;

            if (!ReferenceEquals(ticket, stateTicket))
                return false;

            var count = state.DecrementEnteredCount();

            Debug.Assert(count >= 0);

            if (all || count <= 0)
            {
                var hasFreedChannel = _channel.Reader.TryRead(out State? readState);

                Debug.Assert(hasFreedChannel);

                if (readState is not null)
                {
                    readState.TryGetTicket(out object? readStateTicket);

                    Debug.Assert(ReferenceEquals(readStateTicket, stateTicket));
                }
            }

            return true;
        }
    }

    public readonly struct Releaser : IDisposable
    {
        private readonly AsyncTicketLock _parent;
        private readonly object _ticket;

        internal Releaser(AsyncTicketLock parent, object ticket)
        {
            _parent = parent;
            _ticket = ticket;
        }

        public void Dispose()
            => _parent?.Release(_ticket);
    }

    [DebuggerDisplay($"EnteredCount: {{{nameof(_enteredCount)},nq}}")]
    private sealed class State
    {
        private WeakReference<object>? _ticketRef;
        private int _enteredCount;

        public bool TryGetTicket([NotNullWhen(true)] out object? ticket)
        {
            if (_ticketRef is null)
            {
                ticket = null;
                return false;
            }

            return _ticketRef.TryGetTarget(out ticket);
        }

        public void Reset(object ticket)
        {
            _enteredCount = 1;

            _ticketRef = new WeakReference<object>(ticket);
        }

        public int IncrementEnteredCount() => ++_enteredCount;
        public int DecrementEnteredCount() => --_enteredCount;
    }
}

Das wäre dann ein normaler async lock?
Wie da jetzt das AsyncLocal<object> ins Spiel kommt erschließt sich mir nicht.

Ein normales AsyncLock erlaubt meine ich kein mehrfaches Enter, oder irre ich mich?
Ich meine sowas:


public sealed class LocalAsyncTicketLock
{
    private readonly AsyncLocal<object> _localTicket = new();
    private readonly AsyncTicketLock _ticketLock = new();

    private object Ticket
    {
        get => _localTicket.Value ??= new();
    }

    public ValueTask<AsyncTicketLock.Releaser> EnterAsync(int timeout, CancellationToken cancellationToken = default)
        => _ticketLock.EnterAsync(Ticket, timeout, cancellationToken);

    public ValueTask<AsyncTicketLock.Releaser> EnterAsync(TimeSpan timeout, CancellationToken cancellationToken = default)
        => _ticketLock.EnterAsync(Ticket, timeout, cancellationToken);

    public ValueTask<AsyncTicketLock.Releaser> EnterAsync(CancellationToken cancellationToken = default)
        => _ticketLock.EnterAsync(Ticket, cancellationToken);

    public bool TryEnter(out AsyncTicketLock.Releaser releaser)
        => _ticketLock.TryEnter(Ticket, out releaser);

    public bool TryEnter()
        => _ticketLock.TryEnter(Ticket);

    public bool Release()
        => _ticketLock.Release(Ticket);

    public bool ReleaseAll()
        => _ticketLock.ReleaseAll(Ticket);
}

6.911 Beiträge seit 2009
vor einem Jahr

Hallo Palladin007,

was noch fehlt sind null-Checks fürs übergebene Ticket.

Das Ticket wird jetzt als WeakReference gespeichert, wenn die Referenz verloren ist, gilt der State automatisch als freigegeben

Das ist aber nicht mehr sehr deterministisch und kann schnell zu unkontrolliertem Verhalten führen.
Zumindest sollte das konfigurierbar sein, z.B. durch ein TicketBehavior-Argument. Somit würde es dann zwei (od. je nachdem wieviele solcher Behaviors zur Verfügung stehen) spezielle State-Klassen geben (mit gemeinsamer abstrakter Basis, so dass die konkreten Typen sealed sein können)

Als Überladung für das Timout würde ich nur (mehr) TimeSpan anbieten. Die int-Überladung somit weg, denn das kann jeder Benutzer selbst erstellen.
Den Trend sehe ich auch bei neuen .NET-APIs.

Kleiner Perf-Tipp:


public ValueTask<Releaser> EnterAsync(object ticket, TimeSpan timeout, CancellationToken cancellationToken = default)
{
    return timeout == TimeSpan.Zero
        ? EnterAsync(ticket, cancellationToken)
        : WithTimeout(ticket, timeout, cancellationToken);

    ValueTask<Releaser> WithTimeout(object ticket, TimeSpan timeout, CancellationToken cancellationToken)
    {
        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        timeoutCts.CancelAfter(timeout);

        return EnterAsync(ticket, timeoutCts.Token);
    }
}

Ist schneller zur Laufzeit 😉
Das Erstellen der CancellationTokenSource erzeugt eine Menge (Maschinencode), daher wird EnterAsync eine recht große Methode, welche dann ihrerseits nicht mehr inlined wird bzw. werden soll, da einfach zu groß.
So wird die Methode aufgeteilt in eine lokale Funktion, so dass EnterAsync selbst klein bleibt und eher sicher vom JIT inlined wird.

Ein normales AsyncLock erlaubt meine ich kein mehrfaches Enter, oder irre ich mich?

Ich sprach auch nicht vom "AsyncLock", sondern vom "async lock" (also den selbst erstellten).

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

Palladin007 Themenstarter:in
2.078 Beiträge seit 2012
vor einem Jahr

was noch fehlt sind null-Checks fürs übergebene Ticket.

Stimmt - in einem nullable Projekt vergesse ich die gerne Mal ^^

Das ist aber nicht mehr sehr deterministisch und kann schnell zu unkontrolliertem Verhalten führen.

Welches unkontrollierte Verhalten meinst Du?
Wenn kein Task mehr eine Referenz auf das Ticket hat, gibt es doch auch keinen Task mehr, der etwas damit tun wollen könnte, oder?

Aber die Idee, das konfigurierbar zu machen, gefällt mir.
Microsoft verwendet ja auch immer häufiger Options-Typen dafür, daher übernehme ich das auch so

Das Erstellen der CancellationTokenSource erzeugt eine Menge (Maschinencode), daher wird EnterAsync eine recht große Methode, welche dann ihrerseits nicht mehr inlined wird bzw. werden soll, da einfach zu groß.

Also ich wusste ja, dass die StateMachine Zeit kostet, weshalb ich in den Überladungen auch kein await genutzt habe.
Aber auf so einen Zusammenhang wäre ich im Leben nicht gekommen 😁
Aber woher weiß ich, was viel Maschinencode erzeugt? Muss ich das selber herausfinden (gibt ja Tools, die mir zeigen, was der JIT erzeugt), oder gibt es dazu Quellen?

Ich sprach auch nicht vom "AsyncLock", sondern vom "async lock" (also den selbst erstellten).

Achso - ja, aber "async lock" gibt es ja nicht, alle Alternativen, die ich bisher gesehen habe, basieren auf einem AsyncSemaphore und das erlaubt keine mehrfachen Enter.
Aber ja, im Grunde verhält sich diese Klasse dann wie ein "richtiges" async lock - wobei aber auch mehrere parallel laufende Tasks den gleichen ExecutionContext haben können, wenn sie vom selben Task gestartet wurden, habe ich festgestellt, das wäre dann wieder ein Problem.

Die neue Version:

(Und jetzt ist der Code auch zu lang für eine Nachricht)

Es gibt jetzt die drei State-Klassen, wie Du vorgeschlagen hast.
Die Option ist in einem struct (weder readonly noch record), da es ja eigentlich nur ein bool halten und für die Zukunft Platz für weitere Flags bieten soll.
Null-Checks sorgen nur im EnterAsync für eine Exception, da die Anderen alle bool zurück geben, und ich dann erwarten würde, dass die keine Exception werfen, geben die bei ticket=null auch false zurück.

Palladin007 Themenstarter:in
2.078 Beiträge seit 2012
vor einem Jahr

public struct AsyncTicketLockOptions
{
    public bool WeakTicketReferencing { get; set; }
}

public sealed class AsyncTicketLock
{
    private readonly Channel<StateBase> _channel = Channel.CreateBounded<StateBase>(1);
    private readonly StateBase _state;

    public AsyncTicketLock()
        : this(default)
    {
    }
    public AsyncTicketLock(AsyncTicketLockOptions options)
    {
        _state = options.WeakTicketReferencing
            ? new WeakRefState()
            : new StrongRefState();
    }

    public ValueTask<Releaser> EnterAsync(object ticket, TimeSpan timeout, CancellationToken cancellationToken = default)
    {
        return timeout == TimeSpan.Zero
            ? EnterAsync(ticket, cancellationToken)
            : WithTimeout(ticket, timeout, cancellationToken);

        ValueTask<Releaser> WithTimeout(object ticket, TimeSpan timeout, CancellationToken cancellationToken)
        {
            using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

            timeoutCts.CancelAfter(timeout);

            return EnterAsync(ticket, timeoutCts.Token);
        }
    }

    public async ValueTask<Releaser> EnterAsync(object ticket, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(ticket, nameof(ticket));

        while (true)
        {
            if (TryEnter(ticket, out Releaser releaser))
                return releaser;

            await _channel.Writer
                .WaitToWriteAsync(cancellationToken)
                .ConfigureAwait(false);
        }
    }

    public bool TryEnter(object ticket, out Releaser releaser)
    {
        var success = TryEnter(ticket);

        releaser = success
            ? new Releaser(this, ticket)
            : default;

        return success;
    }

    public bool TryEnter(object ticket)
    {
        if (ticket is null)
            return false;

        lock (_state)
        {
            if (_channel.Reader.TryPeek(out StateBase? state) && state.TryGetTicket(out object? stateTicket))
            {
                if (ReferenceEquals(ticket, stateTicket))
                {
                    state.IncrementEnteredCount();

                    return true;
                }

                return false;
            }
            else
            {
                var hasWritten = _channel.Writer.TryWrite(_state);

                Debug.Assert(hasWritten);

                _state.Reset(ticket);

                return true;
            }
        }
    }

    public bool Release(object ticket)
        => Release(ticket, all: false);

    public bool ReleaseAll(object ticket)
        => Release(ticket, all: true);

    private bool Release(object ticket, bool all)
    {
        if (ticket is null)
            return false;

        if (!_channel.Reader.TryPeek(out _))
            return false;

        lock (_state)
        {
            if (!_channel.Reader.TryPeek(out StateBase? state))
                return false;

            if (!state.TryGetTicket(out object? stateTicket))
                return false;

            if (!ReferenceEquals(ticket, stateTicket))
                return false;

            var count = state.DecrementEnteredCount();

            Debug.Assert(count >= 0);

            if (all || count <= 0)
            {
                var hasFreedChannel = _channel.Reader.TryRead(out StateBase? readState);

                Debug.Assert(hasFreedChannel);

                if (readState is not null)
                {
                    readState.TryGetTicket(out object? readStateTicket);

                    Debug.Assert(ReferenceEquals(readStateTicket, stateTicket));
                }
            }

            return true;
        }
    }

    public readonly struct Releaser : IDisposable
    {
        private readonly AsyncTicketLock _parent;
        private readonly object _ticket;

        internal Releaser(AsyncTicketLock parent, object ticket)
        {
            _parent = parent;
            _ticket = ticket;
        }

        public void Dispose()
            => _parent?.Release(_ticket);
    }

    private abstract class StateBase
    {
        private int _enteredCount;

        public int EnteredCount => _enteredCount;

        public void Reset(object ticket)
        {
            _enteredCount = 1;

            SetTicket(ticket);
        }

        public abstract bool TryGetTicket([NotNullWhen(true)] out object? ticket);
        protected abstract void SetTicket(object ticket);

        public int IncrementEnteredCount() => ++_enteredCount;
        public int DecrementEnteredCount() => --_enteredCount;
    }

    [DebuggerDisplay($"EnteredCount: {{{nameof(EnteredCount)},nq}}, UserState: {{{nameof(_ticket)},nq}}")]
    private sealed class StrongRefState : StateBase
    {
        private object? _ticket;

        public override bool TryGetTicket([NotNullWhen(true)] out object? ticket)
        {
            ticket = _ticket;
            return ticket is not null;
        }

        protected override void SetTicket(object ticket)
        {
            _ticket = ticket;
        }
    }

    [DebuggerDisplay($"EnteredCount: {{{nameof(EnteredCount)},nq}}")]
    private sealed class WeakRefState : StateBase
    {
        private WeakReference<object>? _ticketRef;

        public override bool TryGetTicket([NotNullWhen(true)] out object? ticket)
        {
            if (_ticketRef is null)
            {
                ticket = null;
                return false;
            }

            return _ticketRef.TryGetTarget(out ticket);
        }

        protected override void SetTicket(object ticket)
        {
            _ticketRef = new WeakReference<object>(ticket);
        }
    }
}

Palladin007 Themenstarter:in
2.078 Beiträge seit 2012
vor einem Jahr

PS:


public ValueTask<Releaser> EnterAsync(object ticket, TimeSpan timeout, CancellationToken cancellationToken = default)
{
    return timeout == TimeSpan.Zero
        ? EnterAsync(ticket, cancellationToken)
        : WithTimeout(ticket, timeout, cancellationToken);

    async ValueTask<Releaser> WithTimeout(object ticket, TimeSpan timeout, CancellationToken cancellationToken)
    {
        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        timeoutCts.CancelAfter(timeout);

        try
        {
            return await EnterAsync(ticket, timeoutCts.Token);
        }
        catch (OperationCanceledException)
            when (timeoutCts.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
        {
            throw new TimeoutException();
        }
    }
}

Hätte den Vorteil, dass eine besser Verständliche TimeoutException geworfen wird, anstelle einer OperationCanceledException.
Was meinst Du dazu?
Nachteil ist, dass dadurch wieder eine StateMachine nötig wird.

6.911 Beiträge seit 2009
vor einem Jahr

Hallo,

Welches unkontrollierte Verhalten meinst Du?
Wenn kein Task mehr eine Referenz auf das Ticket hat, gibt es doch auch keinen Task mehr, der etwas damit tun wollen könnte, oder?

Bei der WeakReference kann der GC das Target-Objekt abräumen wenn er glaubt es sei richtig. Ob dies aber für die Synchronisierung richtig ist kann der GC nicht wissen. Daher ist das Verhlaten vom Lock nicht mehr vorhersehbar und kann zu Bugs führen. Es sei denn das ganze Programm ist dafür ausgelegt bzw. es stellt kein Problem für die Geschäftslogik dar wenn auf einmal andere Ticket bearbeitet werden können.

Aber woher weiß ich, was viel Maschinencode erzeugt?

Selbst heruasfinden. Entweder einen JIT-Dump ansehen (nur den erzeugten Maschinencode), das geht recht einfach mit https://sharplab.io/ und anderen offline Tools (da nehme ich ganz gerne bei BenchmarkDotNet den DisassemblyDiagnoser).
Hier hat aber ein Blick in den Quellcode der Implementierung gereicht um das abschätzen zu können. https://source.dot.net/ ist da ganz praktisch.

geben die bei ticket=null auch false zurück

Diesen Fall würde ich als "undefiniert" für die Synchronisation sehen und daher eine NullReferenceException werfen. "Fail early" ist ein gutes Paradigma, denn so kann unterschieden werden ob tatsächlich null übergeben wurde od. ob der lock einfach nicht betreten werden kann.

Hätte den Vorteil, dass eine besser Verständliche TimeoutException geworfen wird, anstelle einer OperationCanceledException.
Was meinst Du dazu?
Nachteil ist, dass dadurch wieder eine StateMachine nötig wird.

Gute Idee, finde ich besser als wenn nur die OperationCanceledException geworfen wird.
Im Exception-Filter würde es aber reichen when (!cancellationToken.IsCancellationRequested) zu prüfen, denn sonst kann es eh nirgends herkommen.
Wegen der State-Machine sehe ich hier keine relevanten Nachteile, da eh schon genug Code produziert wird, da fällt das eher nicht mehr in Gewicht.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

Palladin007 Themenstarter:in
2.078 Beiträge seit 2012
vor einem Jahr

Diesen Fall würde ich als "undefiniert" für die Synchronisation sehen und daher eine NullReferenceException werfen. "Fail early" ist ein gutes Paradigma, denn so kann unterschieden werden ob tatsächlich null übergeben wurde od. ob der lock einfach nicht betreten werden kann.

Stimmt - ja, ich versuche möglichst ohne Exceptions zu arbeiten, wo es nicht notwendig ist (z.B. bei Try-Methoden), aber hier ist es ja inhaltlich und dank nullable reference types offensichtlich, dass kein null übergeben werden darf.
Außerdem behandle ich das Disposed genauso: Es fliegt eine Exception für einen Fall, der "eigentlich" nicht vorkommen darf.

6.911 Beiträge seit 2009
vor einem Jahr

Hallo Palladin007,

mir erschließt sich kein Einsatz der WeakReference. Ich halte das sogar für gefährlich und als Bug-Quelle -- das gehört mMn wieder raus.

Warum?
Folgendes Szenario: Lock wurde mit ticket1 betreten, somit ist er für ticket2, etc. versperrt.
Im durch den Lock geschützten Abschnitt passiert jetzt eine Menge, so dass u.a. ein GC durchgeführt wird und der Target der WeakReference wird null. Dadurch ergibt dann state.TryGetTicket(out object? stateTicket) false und es wird versucht in den Channel zu schreiben, da ja klappen wird. D.h. ticket2 bekommt Eintritt zum Lock währen dieser Abschnitt eigentlich noch durch ticket1 geschützt sein sollte.

Od. gibt es einen konkreten Fall wo so ein Verhalten sinnvoll ist?
Das lässt sich äußerst schwer nachvollziehen und erst recht nicht vorhersagen.
Wenn das Verhalten drin bleiben sollte, so sollte auch Diagnose-Unterstützung (EventLog) dazu welche meldet dass ein GC passierte und das Ticket abgeräumt wurde.
Das Ganze wird dadurch aber eher ein Ungetum für etwas wofür ich keinen Einsatzbereich sehe.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

6.911 Beiträge seit 2009
vor einem Jahr

Hallo Palladin007,

jetzt hab ich deine Antwort vorhin übersehen...

aber hier ist es ja inhaltlich und dank nullable reference types offensichtlich, dass kein null übergeben werden darf.

Eben nur "darf". Das hindert aber niemanden dennoch null zu übergeben -- v.a. wenn man nicht weiß woher das Objekt kommt.
Unabhängig von Nullability Annotations soll / muss bei public APIs auf null geprüft werden.
Das ist (leider) ein Missverständnis dieses Sprachfeatures. Siehe dazu auch die Diskussion in https://github.com/dotnet/docs/pull/28890#discussion_r841062932.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

Palladin007 Themenstarter:in
2.078 Beiträge seit 2012
vor einem Jahr

Naja, meine eigentliche Grundidee (und auch der Anwendungsfall, aus dem sie entstanden ist) sieht so aus, dass es mehrere Klassen gibt, die auf eine Ressource zugreifen.
Das Lock existiert also klassenübergreifend, z.B. statisch oder in einem Singleton.
Die einzelnen arbeitenden Klassen haben dann ihr Ticket jeweils pro Instanz und können somit erzwingen, dass nur diese eine Instanz etwas tun darf, bis sie selber fertig sind.

Nun kann es aber sein, dass die arbeitende Instanz aus irgendeinem Grund (das wäre definitiv ein Bug) nicht mehr existiert, das Lock vorher aber nicht korrekt freigegeben wurde.
In diesem Fall würde das Lock dann auf ewig gesperrt bleiben, weil niemand mehr eine Referenz auf das Ticket hat - also ein Deadlock.
Durch die WeakReference würde dieser Deadlock irgendwann (indirekt durch den GC) wieder freigegeben werden.

6.911 Beiträge seit 2009
vor einem Jahr

Hallo Palladin007,

Nun kann es aber sein, dass die arbeitende Instanz aus irgendeinem Grund ... nicht mehr existiert, das Lock vorher aber nicht korrekt freigegeben wurde.

Dazu ist der Releaser ja mit using (bzw. try-finally-Dispose) versehen. Somit wird auch im Fehlerfall der Lock korrekt verlassen.

Durch die WeakReference würde dieser Deadlock irgendwann (indirekt durch den GC) wieder freigegeben werden.

Wahrscheinlicher ist aber, dass dadurch jemand Zugriff zum Lock hat der nicht sollte -- siehe oben.
Statt der WeakReference, die eben nicht deterministisch ist, wäre eine Art Deadlock-Detection möglich. Schauen wie viele beim Warten vor dem Lock sind und wie viele im Lock sind. Wenn da nichts passiert (eine bestimmte Zeit) so mag das als Indiz für den Deadlock verwendet werden, der dann entsperrt wird.

Od. weniger kompliziert: eine automatische Entsperrung nach einer bestimmten Zeit. Wenn z.B. die Arbeiten im geschützen Gebiet X Sekunden dauern, so wird das Ticket automatisch noch 2X entfernt. Wird der Lock verlassen, so den Timer zurücksetzen, etc.

Dann ist das Verhalten wenigsten deterministisch und keine Bug-Quelle.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"