C#: BlockingCollection<T> am Beispiel MetroFtpClient

Innerhalb des MetroFtpClients (https://github.com/steve600/MetroFtpClient) gibt es eine Warteschlange um die auszuführenden Up- und Downloads zu verwalten. Bei der Abarbeitung der Warteschlange wünscht man sich nun oft einen gewissen Grad an Parallelität um die Performance zu steigern (z.B. mehrere simultane Downloads). Mit .NET 4.0 hat Microsoft einen großen Schritt in diese Richtung getan und den Entwicklern durch die sogenannten ConcurrentCollections (Quelle [1]) viel Arbeit abgenommen. In diesem Beitrag wird auf Basis des MetroFtpClients gezeigt wie man eine solche ConcurrentCollection einsetzen kann um mehrere simultane Downloads zu realisieren.

Grundlagen

Einen neuen Thread in C# zu starten ist kein größeres Problem. Die Herausforderung liegt eher darin, die Daten zu identifizieren auf welche potentiell aus mehreren Threads heraus simultan zugegriffen werden kann. Dann liegt es am Entwickler diese Daten zu synchronisieren um simultane Zugriffe zu verhindern, so dass „sicher“ auf die Daten zugegriffen werden kann (z.B. mit lock). Mit dem .NET Framework 4.0 führte Microsoft den System.Collections.Concurrent-Namespace ein, der mehrere Auflistungsklassen enthält, die sowohl threadsicher als auch skalierbar sind. Diesen Auflistungen können sicher und effizient aus mehreren Threads heraus Elemente hinzugefügt bzw. entfernt werden, ohne dass eine zusätzliche Synchronisierung im Benutzercode erforderlich ist. Hier mal eine Übersicht über die zur Verfügung gestellten Auflistungsklassen:

Threadsichere Auflistungstypen

NameBeschreibung
BlockingCollection<T>Stellt eine Auflistung bereit, die es ermöglicht Producer-/Consumer-Szenarios zu implementieren (wobei Producer Daten in die Collection schreiben während die Consumer Daten lesen). Dabei ist der Auflistungstyp generisch.

Darüber hinaus werden Begrenzungs- und Blockierungsfunktionen für jeden Typ bereitgestellt, der die IProducerConsumerCollection-Schnittstelle implementiert.
ConcurrentDictionary<TKey, TValue>Threadsichere Implementierung eines Wörterbuchs von Schlüssel-Wert-Paaren.
ConcurrentQueue<T> (*)Threadsichere Implementierung einer First In, First Out (FIFO)-Warteschlange.
ConcurrentStack<T> (*)Threadsichere Implementierung eines Last In, First Out (LIFO)-Stapels.
ConcurrentBag<T> (*)Threadsichere Implementierung einer ungeordneten Auflistung von Elementen.
IProducerConsumerCollection<T>Die Schnittstelle, die ein Typ implementieren muss, damit sie in BlockingCollection verwendet werden kann.

Die mit (*) gekennzeichneten Auflistungstypen implementieren das Interface IProducerConsumerCollection<T>.

In diesem Beitrag möchte ich jetzt nicht auf alle zur Verfügung gestellten Auflistungsklassen im Detail eingehen. Hier geht es vor allem um die BlockingCollection<T>-Klasse, welche im MetroFtpClient eingesetzt wird um eine Download-Queue zu implementieren, welche simultane Downloads/Uploads ermöglicht.

BlockingCollection<T>

Die BlockingCollection hat zwei Features, welche sie von anderen Auflistungsklassen unterscheidet:

  • Bounding
  • Blocking

Bounding gibt die maximale Anzahl von Objekten an, die innerhalb der BlockingCollection gespeichert werden können. Wenn ein Producer-Thread nun das maximale Limit erreicht wird dieser solange geblockt (Blocking) bis ein Consumer-Thread Einträge aus der BlockingCollection entfernt. Währenddessen geht der Producer-Thread in den Sleep-Modus. Ist die BlockingCollection dagegen leer wird der Consumer-Thread solange blockiert bis der Producer-Thread neue Einträge in die BlockingCollection einfügt.

BlockingCollection<T> erzeugen

Nachfolgend mal ein Beispiel zur Erzeugung einer BlockingCollection:

Default Collection Type

Standardmäßig nutzt die BlockingCollection die ConcurrentQeueue-Klasse für die Auflistung. Jetzt besteht die Möglichkeit noch andere Auflistungsklassen zu verwenden, aber nur solche, die das IProducerConsumerCollection-Interface implementieren. ConcurrentStack und ConcurrentBag implementieren out-of-the-box die IProducerConsumerCollection-Schnittstelle (siehe Tabelle weiter oben im Beitrag). Darüber hinaus besteht die Möglichkeit eine eigene Collection-Klasse auf Basis der IProducerConsumerCollection-Schnittstelle zu implementieren und diese zu verwenden. Nachfolgend mal ein Beispiel unter Verwendung der ConcurrentBag-Klasse mit einer oberen Grenze von 10 Einträgen:

DownloadQueue innerhalb des MetroFtpClient

Kommen wir zur Download-Queue innerhalb des MetroFtpClient. Hier wird eine BlockingCollection verwendet um eine Download/Upload-Queue zu realisieren. Da es bei der Klasse BlockingCollection<T> um eine generische Auflistung handelt können die Auflistungstypen frei gewählt werden. Für die angesprochene Download-Queue werden Objekte vom folgenden Typ innerhalb der BlockingCollection verwaltet:

Die Erzeugung der BlockingCollection ist dann ziemlich einfach:

Die BlockingCollection selbst wird aber innerhalb einer Klasse namens DownloadQueue erzeugt. Innerhalb dieser Klasse wird die eigentliche BlockingCollection verwaltet und einige Methoden für das Handling bereitgestellt (z.B. neuen Download in die Queue einreihen) bereitgestellt.

Im Konstruktor der Klasse werden die Worker-Threads für die simultanen Downloads erzeugt (die Anzahl der Worker-Threads kann als Parameter mitgegeben werden -> Zeile 15). Diese Threads führen die Consume-Methode aus (Zeile 22). Innerhalb der Consume-Methode wird die Warteschlange mittels einer foreach-Schleife abgearbeitet. Hier kommt die GetConsumingEnumerable()-Methode der BlockingCollection zum Einsatz (Zeile 81). Die GetConsumingEnumerable()-Methode blockiert jetzt so lange, bis ein Element entnommen werden konnte, die Warteschlange abgearbeitet wurde oder die maximale Anzahl an Threads erreicht wurde. Der eigentliche Download/Upload wird dann in Zeile 94 mittels eines Action-Delegaten ausgeführt (hier ist man dann flexibel und kann eine beliebige Methode mitgeben). Mit der Methode EnqueueTask in Zeile 42 und 54 können neue Downloads in die Warteschlange eingereiht werden (auch wenn die Warteschlange aktuell abgearbeitet wird).

Alternativen

Es gibt natürlich auch einige Alternativen um ein solches Szenario zu realisieren:

Bei Gelegenheit werde ich einen eigenen Beitrag dazu schreiben.

Fazit

Durch die Verwendung der BlockingCollection ist keine Thread-Synchronisierung (z.B. durch lock-Statements) mehr notwendig. Dies spart eine Menge an Quellcode und dies wirkt sich natürlich direkt auf die Wartbarkeit bzw. Code-Qualität aus (wie an der gezeigten DownloadQeue zu sehen sind nur wenige Zeilen Quellcode notwendig). Durch die Unterstützung von Bounding und Blocking wird die Umsetzung von Producer-Consumer-Szenarios erheblich vereinfacht.

Der Quellcode ist hier zu finden: https://github.com/steve600/MetroFtpClient

Literaturverzeichnis und Weblinks

Abk.Quelle
[1]System.Collections.Concurrent-Namespace
https://msdn.microsoft.com/de-de/library/system.collections.concurrent(v=vs.110).aspx
[2]BlockingCollection<T>-Klasse
https://msdn.microsoft.com/de-de/library/dd267312.aspx
[3]TPL Dataflow
https://msdn.microsoft.com/de-de/library/hh228603(v=vs.110).aspx
[4]Reactive-Extensions / Rx.NET
https://github.com/Reactive-Extensions/Rx.NET

Fork me on GitHub