Thread safe queues in .NET
July 14, 2015 1 Comment
We saw how standard Queues work in .NET in this post. Queues are first-in-first-out collections where the first element added to the collection will the first to be removed. Queues are ideal to model consumer-producer scenarios where a producer adds items to a queue which are then processed by a consumer in the order they came in.
If you’d like to have a Queue that is shared among multiple threads in a multi-threaded application then the “normal” Queue of T object won’t be enough. You can never be sure of the current state of the queue in the very moment when a certain thread tries to dequeue an item from it. The queue may have been modified by another thread just a millisecond before and then the Dequeue method will fail. In general you should be very careful with how you share the resources among different threads.
The Queue object has a thread-safe equivalent in .NET called ConcurrentQueue which resides in the System.Collections.Concurrent namespace. Here’s some important terminology:
- We add items to the concurrent queue through the Enqueue method just like in the case of the single-threaded Queue object
- The Dequeue method has been replaced by TryDequeue which returns false in case there was nothing to retrieve from the queue. If it returns true then the dequeued item is returned as an “out” parameter
- Similarly the Peek method has been replaced with TryPeek which works in an analogous way
The following code fills up a queue of integers with 1000 elements and starts 4 different threads that all read from the shared queue.
public class ConcurrentQueueSampleService { private ConcurrentQueue<int> _integerQueue = new ConcurrentQueue<int>(); public void RunConcurrentQueueSample() { FillUpQueue(1000); Task readerOne = Task.Run(() => GetFromQueue()); Task readerTwo = Task.Run(() => GetFromQueue()); Task readerThree = Task.Run(() => GetFromQueue()); Task readerFour = Task.Run(() => GetFromQueue()); Task.WaitAll(readerOne, readerTwo, readerThree, readerFour); } private void FillUpQueue(int max) { for (int i = 0; i <= max; i++) { _integerQueue.Enqueue(i); } } private void GetFromQueue() { int res; bool success = _integerQueue.TryDequeue(out res); while (success) { Debug.WriteLine(res); success = _integerQueue.TryDequeue(out res); } } }
Each thread will try to dequeue items from the shared queue as long as TryDequeue returns false. In that case we know that there’s nothing more left in the collection.
View the list of posts on the Task Parallel Library here.
Pingback: Summary of thread-safe collections in .NET – .NET training with Jead