Thread-safe collections in .NET: ConcurrentQueue

Concurrent collections in .NET work very much like their single-threaded counterparts, with the difference that they are thread-safe. These collections can be used in scenarios where you need to share a collection between tasks. They are strongly typed and use lightweight synchronisation mechanisms to ensure that they are safe and fast to use in parallel programming.

Concurrent queues

If you don’t know what queues are then you can read about them here. The generic Queue<T> collection has a thread-safe counterpart called ConcurrentQueue<T>. Its most important methods are:

  • Enqueue(T element): adds an item of type T to the end of the queue
  • TryPeek(out T): tries to retrieve the next element in the queue without removing it. If the method succeeds it assigns the element to the out parameter and returns true; otherwise it returns false.
  • TryDequeue(out T): tries to remove the first element of the queue. If the method succeeds it removes the item from the collection, assigns it to the out parameter and returns true; otherwise it returns false.

The ‘Try’ prefix in the method names implies that your code needs to be prepared for the case where an element could not be retrieved. If multiple threads retrieve elements from the same queue you cannot be sure what’s in it when a specific thread tries to read from it.
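
Here is a minimal sketch of that pattern – the queue and its contents are made up for illustration, and the System and System.Collections.Concurrent namespaces are assumed to be imported:

ConcurrentQueue<string> messages = new ConcurrentQueue<string>();
messages.Enqueue("first");
messages.Enqueue("second");

string next;
// Look at the head of the queue without removing it
if (messages.TryPeek(out next))
{
	Console.WriteLine("Next element: {0}", next);
}

// Remove the head of the queue if there is one
if (messages.TryDequeue(out next))
{
	Console.WriteLine("Dequeued: {0}", next);
}
else
{
	Console.WriteLine("The queue was empty, nothing to dequeue");
}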

Example

Declare and fill a concurrent queue:

ConcurrentQueue<int> concurrentQueue = new ConcurrentQueue<int>();

for (int i = 0; i < 5000; i++)
{
	concurrentQueue.Enqueue(i);
}

We’ll want to get the items from this collection and use a counter to check whether all of them have been retrieved. The counter will also be shared among the threads, so it needs the ‘lock’ technique we saw in this post – or actually something similar to the ‘lock’ keyword: the Interlocked class. Interlocked has an Increment method which accepts a ref int parameter and increments the incoming integer in an atomic operation.
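
As a rough comparison – the counter and syncRoot variables below are only illustrative, and the System.Threading namespace is assumed to be imported – the two approaches look like this:

int counter = 0;
object syncRoot = new object();

// Protecting the increment with the 'lock' keyword
lock (syncRoot)
{
	counter++;
}

// The same effect as a single atomic operation, no explicit lock needed
Interlocked.Increment(ref counter);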

int itemCount = 0;

Task[] queueTasks = new Task[20];
for (int i = 0; i < queueTasks.Length; i++)
{
	queueTasks[i] = Task.Factory.StartNew(() =>
	{
		// Keep going as long as the queue appears to contain items
		while (concurrentQueue.Count > 0)
		{
			int currentElement;
			// TryDequeue can still fail if another task removed the last item first
			bool success = concurrentQueue.TryDequeue(out currentElement);
			if (success)
			{
				Interlocked.Increment(ref itemCount);
			}
		}
	});
}

The while loop ensures that we keep trying to dequeue items as long as there appears to be something left in the collection. Note that the Count check is only a snapshot: another task may empty the queue between the check and the call to TryDequeue, which is why we only increment the counter when TryDequeue reports success.

Wait for the tasks and print the number of items processed – the counter should have the same value as the number of items in the queue:

Task.WaitAll(queueTasks);
Console.WriteLine("Counter: {0}", itemCount);

View the list of posts on the Task Parallel Library here.
