Laden...

Async Task Pipelining

Erstellt von BlackMatrix vor 8 Jahren Letzter Beitrag vor 8 Jahren 1.897 Views
B
BlackMatrix Themenstarter:in
218 Beiträge seit 2012
vor 8 Jahren
Async Task Pipelining

Hallo,

ich möchte eine Pipeline entwickeln, die als Aufgabe die Bearbeitung von Bildern hat. Dabei gibt es 3 Tasks (LoadAsync, EditAsync, RecognizeAsync), wovon die ersten beiden Tasks mit maximaler Parallelität nacheinander abgearbeitet werden können. Der letzte Task darf nicht parallel abgearbeitet werden und darf nachdem die ersten beiden Tasks verarbeitet wurden, immer nur durch einen Thread abgearbeitet werden.
Wie erreiche ich das?

Ich gebe zu, dass meine async/await Kenntnisse noch nicht ausgereift sind, daher wäre es schön, wenn ihr mich auch auf allgemeine Fehler im folgenden Code hinweisen könntet.
Der Ablauf sieht so aus, dass der Nutzer Dateien (Paths) auswählen kann, dann die Möglichkeit optional eine Filtermethode auszuwählen (SelectedFilterMethod) und zu guter Letzt eine Analyse (RecognizeAsync) durchführen. Diese Methode darf immer nur von einem bearbeitet werden. Sobald eine Verarbeitung vollständig ist, soll der Path aus der Oberfläche gelöscht werden und das Ergebnis angezeigt werden.


	public class BatchViewModel : ViewModelBase
	{
		private readonly ObservableCollection<FilterMethodBase> _filterMethods = new ObservableCollection<FilterMethodBase>();
		private readonly ObservableCollection<ResultViewModel> _results = new ObservableCollection<ResultViewModel>();
		private readonly ObservableCollection<string> _paths = new ObservableCollection<string>();
		private RelayCommand _executeCommand;

		public ObservableCollection<ResultViewModel> Results
		{
			get { return _results; }
		}

		public ObservableCollection<string> Paths
		{
			get { return _paths; }
		}
		public ObservableCollection<FilterMethodBase> FilterMethods
		{
			get { return _filterMethods; }
		}

		public ICommand ExecuteCommand
		{
			get { return _executeCommand ?? (_executeCommand = new RelayCommand(ExecuteAsync)); }
		}

		private async void ExecuteAsync()
		{
			Results.Clear();

			var dic = Paths.ToDictionary(ProcessAsync);
			while (dic.Count > 0)
			{
				var task = await Task.WhenAny(dic.Keys);
				Paths.Remove(dic[task]);
				dic.Remove(task);

				var result = await task;

				Results.Add(result);
			}
		}

		private async Task<ResultViewModel> ProcessAsync(string path)
		{
			byte[] imageToProceed;
			byte[] edit = null;
			var source = await LoadAsync(path);

			if (SelectedFilterMethod == null)
			{
				imageToProceed = source;
			}
			else
			{
				edit = await EditAsync(source);
				imageToProceed = edit;
			}

			var result = await RecognizeAsync(imageToProceed);

			return new ResultViewModel
			{
				Source = GetImageSource(source),
				Edit = edit == null ? null : GetImageSource(edit),
				Result = result
			};
		}

		private async Task<byte[]> LoadAsync(string path)
		{
			using (var fs = new FileStream(path,FileMode.Open, FileAccess.Read, FileShare.Read,4096, true))
			{
				var bytes = new byte[fs.Length];
				await fs.ReadAsync(bytes, 0, bytes.Length);
				return bytes;
			}
		}

		private Task<byte[]> EditAsync(byte[] bytes)
		{
			return SelectedFilterMethod.ExecuteAsync(bytes);
		}

		private async Task<string> RecognizeAsync(byte[] bytes)
		{
			string result;

// Dieser Task darf keine Parallelität aufweisen

			return result;
		}
}

6.911 Beiträge seit 2009
vor 8 Jahren

Hallo BlackMatrix,

ich hab mir deinen Code nicht angeschaut, aber ich denke wenn du dir Pipelines wirst du die Grundzüge erlernen. Mit TPL Dataflow gibt es eine Bibliothek die dir bei der Umsetzung auch helfen wird. So ist es wesentlich einfacher und mit viel weniger Infrastrukturrauschen umsetzbar.

mfG Gü

Stellt fachliche Fragen bitte im Forum, damit von den Antworten alle profitieren. Daher beantworte ich solche Fragen nicht per PM.

"Alle sagten, das geht nicht! Dann kam einer, der wusste das nicht - und hat's gemacht!"

B
BlackMatrix Themenstarter:in
218 Beiträge seit 2012
vor 8 Jahren

Sollte das genannte Problem nicht ausschließlich durch eine geschickte Anwendung von async/await und Task.WhenAny und Task.WhenAll gelöst werden können?

16.807 Beiträge seit 2008
vor 8 Jahren

Du hast Dir doch das Stichwort "Pipeline" selbst schon vorgelegt; und in der TPL gibt es genau für diese Zwecke das Pipelining mit entsprechenden Pattern.
Das ist nicht nur viel übersichtlicher; es skaliert auch um Welten besser.