Laden...

Mehrere Enumerables ohne großen Buffer zusammenfassen

Erstellt von Uwe81 vor 5 Jahren Letzter Beitrag vor 5 Jahren 1.926 Views
U
Uwe81 Themenstarter:in
282 Beiträge seit 2008
vor 5 Jahren
Mehrere Enumerables ohne großen Buffer zusammenfassen

Hallo,

Ich habe ein Array von Enumerables


IEnumerable<T>[] sources;

Typischerweise iterieren diese Enumerables nicht schnell, sind z.B. Dateien aus einem Document Management System. Der Overhead ist hoch, sodass sich wegen der hohen Latenz Parallelisierung lohnen würde, z.B. weil ich bei drei verschiedenen Repositories Dokumente runterlade.

Das Interface gibt mir für ein Repository alle Dokumente als IEnumerable<T>

Jetzt suche ich ein Enumerable, was sich wiefolgt berechnet.
Für jedes Sourceelement gibt es einen kleinen Buffer.
Die Enumerables werden parallel durchlaufen, d.h. wenn der Buffer für ein Enumerable nicht mehr gefüllt ist, wird das nächste Element bereits abgerufen.
Wenn ich für das "merged Enumerable" das nächste Element abfrage, erhalte ich irgend eines aus den buffern der unterliegenen enumerables, oder halt das nächste was da ist.

Ich bin neun in Tasks, PLINQ und parallel foreach. Daher verliere ich mich bei der Implementierung.

Das beste was ich bisher hinbekommen habe ist eine manuelle Implementierung mit Blocking Collections (noch nicht getestet)
Geht das wirklich nicht eleganter?


        public int MaxCapacity { get; set; }
        public IList<IIterateStrategy> IterateStrategies { get; set; };

        public IEnumerable<DSDownloadEntry> Iterate() {
            var collection = new BlockingCollection<DSDownloadEntry>(MaxCapacity);
            var tasks = new Task[IterateStrategies.Count];
            for(int i = 0; i<IterateStrategies.Count; i++) {
                int index = i;
                var strategy = IterateStrategies[index];
                tasks[i] = Task.Factory.StartNew(() => {
                    foreach (var item in IterateStrategies[index].Iterate()) {
                        collection.Add(new DSDownloadEntry() { Container = item });
                    }
                });
            }
            Task.Factory.ContinueWhenAll(tasks, _ => collection.CompleteAdding());
            return collection.GetConsumingEnumerable();
        }

Ergänzung:

Ich habe jetzt nochmal PLINQ versucht:


    class Producer {
        public string Prefix;
        public int Count;

        public IEnumerable<string> GetResults() {
            Random random = new Random();
            for (int i = 0; i < Count; i++) {
                var text = string.Format("{0}{1:000000}", Prefix, i);
                Console.WriteLine("Producing " + text);
                yield return text;
            }
            Console.WriteLine("############### DONE " + Prefix);
        }
    }
    class Program {
        public static void Main() {
            var producers = new[] {
                new Producer() {Prefix = "A", Count = 100000},
                new Producer() {Prefix = "B", Count = 100000},
                new Producer() {Prefix = "C", Count = 100000},
            };

            var merged = from p in producers.AsParallel()
                         from r in p.GetResults()
                         select r;

            foreach (var r in merged) {
                Console.WriteLine("Consumint " + r);
            }
        }
    }

Das Problem hier ist, das der Producer bereits weiterläuft, auch wenn nur wenig abgerufen wird.
Eine ergänzung "AsParallel().WithMergeOptions" hat nichts verändert.

Mein Wunsch wäre eben, dass pro Producer nur wenige Einträge gebuffert werden.

W
955 Beiträge seit 2010
vor 5 Jahren

Hallo,
fassen wir mal zusammen:
* du hast mehrere Producer welche Objekte aufwändig von jeweils einer zugeordneten Datenquelle abrufen.
* diese Producer sollen vorab Objekte laden da dies lange dauern kann. Sie sollen aber nicht zuviele Objekte laden das dies zuviel Speicher verbrauchen kann.
* ein oder mehrere Konsumenten verarbeiten diese Objekte in dem die verschiedenen Ladepuffer zusammengeführt werden.
Was kann man machen?
* vllt TPL Dataflow verwenden. Die Idee dort ist die Datenströme wie ein Fließband/Materialflußanlage aufzuteilen. Es gibt dort z.B. ActionBlocks die zu einem Ganzen zusammengesteckt werden können und die auch begrenzt/gedrosselt werden können. Hier noch ein Artikel mit einer Beispielimplementierung.
* RX / Reactive Extensions / Reactive Framework: Dort werden Actionen definiert die ausgeführt werden wenn ein Ereignis (ein Dokument wurde heruntergeladen) eingetreten ist.
* self-made: Task.WhenAny, Concurent Queues ...

4.221 Beiträge seit 2005
vor 5 Jahren

(ich habe nicht alles im Detail durchgelesen).

Aber wie wäre es mit einem ganz anderen Ansatz.

Mit einem ContinueWith.

https://msdn.microsoft.com/de-de/library/dd537612(v=vs.110).aspx

Die Objekte würden sich dann selber für den nächsten Prozesschritt anmelden sobald sie verfügbar sind.

Früher war ich unentschlossen, heute bin ich mir da nicht mehr so sicher...