Willkommen auf myCSharp.de! Anmelden | kostenlos registrieren
 | Suche | FAQ

Hauptmenü
myCSharp.de
» Startseite
» Forum
» Suche
» Regeln
» Wie poste ich richtig?

Mitglieder
» Liste / Suche
» Wer ist online?

Ressourcen
» FAQ
» Artikel
» C#-Snippets
» Jobbörse
» Microsoft Docs

Team
» Kontakt
» Cookies
» Spenden
» Datenschutz
» Impressum

  • »
  • Community
  • |
  • Diskussionsforum
Mehrere Enumerables ohne großen Buffer zusammenfassen
Uwe81
myCSharp.de - Member



Dabei seit:
Beiträge: 285
Herkunft: Ludwigshafen

Themenstarter:

Mehrere Enumerables ohne großen Buffer zusammenfassen

beantworten | zitieren | melden

Hallo,

Ich habe ein Array von Enumerables


IEnumerable<T>[] sources;

Typischerweise iterieren diese Enumerables nicht schnell, sind z.B. Dateien aus einem Document Management System. Der Overhead ist hoch, sodass sich wegen der hohen Latenz Parallelisierung lohnen würde, z.B. weil ich bei drei verschiedenen Repositories Dokumente runterlade.

Das Interface gibt mir für ein Repository alle Dokumente als IEnumerable<T>


Jetzt suche ich ein Enumerable, was sich wiefolgt berechnet.
Für jedes Sourceelement gibt es einen kleinen Buffer.
Die Enumerables werden parallel durchlaufen, d.h. wenn der Buffer für ein Enumerable nicht mehr gefüllt ist, wird das nächste Element bereits abgerufen.
Wenn ich für das "merged Enumerable" das nächste Element abfrage, erhalte ich irgend eines aus den buffern der unterliegenen enumerables, oder halt das nächste was da ist.


Ich bin neun in Tasks, PLINQ und parallel foreach. Daher verliere ich mich bei der Implementierung.

Das beste was ich bisher hinbekommen habe ist eine manuelle Implementierung mit Blocking Collections (noch nicht getestet)
Geht das wirklich nicht eleganter?


        public int MaxCapacity { get; set; }
        public IList<IIterateStrategy> IterateStrategies { get; set; };

        public IEnumerable<DSDownloadEntry> Iterate() {
            var collection = new BlockingCollection<DSDownloadEntry>(MaxCapacity);
            var tasks = new Task[IterateStrategies.Count];
            for(int i = 0; i<IterateStrategies.Count; i++) {
                int index = i;
                var strategy = IterateStrategies[index];
                tasks[i] = Task.Factory.StartNew(() => {
                    foreach (var item in IterateStrategies[index].Iterate()) {
                        collection.Add(new DSDownloadEntry() { Container = item });
                    }
                });
            }
            Task.Factory.ContinueWhenAll(tasks, _ => collection.CompleteAdding());
            return collection.GetConsumingEnumerable();
        }



Ergänzung:

Ich habe jetzt nochmal PLINQ versucht:


    class Producer {
        public string Prefix;
        public int Count;

        public IEnumerable<string> GetResults() {
            Random random = new Random();
            for (int i = 0; i < Count; i++) {
                var text = string.Format("{0}{1:000000}", Prefix, i);
                Console.WriteLine("Producing " + text);
                yield return text;
            }
            Console.WriteLine("############### DONE " + Prefix);
        }
    }
    class Program {
        public static void Main() {
            var producers = new[] {
                new Producer() {Prefix = "A", Count = 100000},
                new Producer() {Prefix = "B", Count = 100000},
                new Producer() {Prefix = "C", Count = 100000},
            };

            var merged = from p in producers.AsParallel()
                         from r in p.GetResults()
                         select r;

            foreach (var r in merged) {
                Console.WriteLine("Consumint " + r);
            }
        }
    }

Das Problem hier ist, das der Producer bereits weiterläuft, auch wenn nur wenig abgerufen wird.
Eine ergänzung "AsParallel().WithMergeOptions" hat nichts verändert.

Mein Wunsch wäre eben, dass pro Producer nur wenige Einträge gebuffert werden.
Dieser Beitrag wurde 2 mal editiert, zum letzten Mal von Uwe81 am .
private Nachricht | Beiträge des Benutzers
witte
myCSharp.de - Member



Dabei seit:
Beiträge: 960

beantworten | zitieren | melden

Hallo,
fassen wir mal zusammen:
* du hast mehrere Producer welche Objekte aufwändig von jeweils einer zugeordneten Datenquelle abrufen.
* diese Producer sollen vorab Objekte laden da dies lange dauern kann. Sie sollen aber nicht zuviele Objekte laden das dies zuviel Speicher verbrauchen kann.
* ein oder mehrere Konsumenten verarbeiten diese Objekte in dem die verschiedenen Ladepuffer zusammengeführt werden.
Was kann man machen?
* vllt TPL Dataflow verwenden. Die Idee dort ist die Datenströme wie ein Fließband/Materialflußanlage aufzuteilen. Es gibt dort z.B. ActionBlocks die zu einem Ganzen zusammengesteckt werden können und die auch begrenzt/gedrosselt werden können. Hier noch ein Artikel mit einer Beispielimplementierung.
* RX / Reactive Extensions / Reactive Framework: Dort werden Actionen definiert die ausgeführt werden wenn ein Ereignis (ein Dokument wurde heruntergeladen) eingetreten ist.
* self-made: Task.WhenAny, Concurent Queues ...
Dieser Beitrag wurde 2 mal editiert, zum letzten Mal von witte am .
private Nachricht | Beiträge des Benutzers
Programmierhans
myCSharp.de - Experte

Avatar #avatar-1651.gif


Dabei seit:
Beiträge: 4318
Herkunft: Zentralschweiz

beantworten | zitieren | melden

(ich habe nicht alles im Detail durchgelesen).

Aber wie wäre es mit einem ganz anderen Ansatz.

Mit einem ContinueWith.
https://msdn.microsoft.com/de-de/library/dd537612(v=vs.110).aspx

Die Objekte würden sich dann selber für den nächsten Prozesschritt anmelden sobald sie verfügbar sind.
Früher war ich unentschlossen, heute bin ich mir da nicht mehr so sicher...
private Nachricht | Beiträge des Benutzers