Using a thread-safe queue collection in .NET
July 22, 2015
We looked at the Queue collection type in .NET in this post. We saw how it can be used as a first-in-first-out collection: a new element is placed at the end of the queue, and the first element to be removed is the one at the front of the queue.
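As a quick refresher, here is a minimal sketch of that first-in-first-out behaviour. The FifoDemo class and its DrainInOrder method are names made up for this illustration; only Queue<T>, Enqueue, Dequeue and Count come from the framework.

```csharp
using System.Collections.Generic;

// Hypothetical helper, used only to illustrate FIFO ordering.
public static class FifoDemo
{
    // Enqueues three strings and returns them in the order Dequeue hands them back.
    public static List<string> DrainInOrder()
    {
        Queue<string> queue = new Queue<string>();
        queue.Enqueue("first");
        queue.Enqueue("second");
        queue.Enqueue("third");

        List<string> dequeued = new List<string>();
        while (queue.Count > 0)
        {
            // Dequeue removes and returns the element at the front of the queue.
            dequeued.Add(queue.Dequeue());
        }
        return dequeued; // "first", "second", "third"
    }
}
```

Calling FifoDemo.DrainInOrder() gives back the items in the same order in which they were enqueued.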
A Queue is an ideal container for tasks. Let’s create a class called WorkTask to represent tasks. I didn’t want to call this object a “Task” so that it is not confused with the Task object in the Task Parallel Library (TPL). If you don’t know what TPL means then take a look at the post referenced at the end of this article.
Here’s the WorkTask object:
public class WorkTask
{
    public string Description { get; set; }
}
Consider the following code where we fill up a queue in two subsequent method calls and then dequeue the items one by one:
using System.Collections.Generic;
using System.Diagnostics;

public class WorkTaskSampleService
{
    private Queue<WorkTask> _workTasks;

    public WorkTaskSampleService()
    {
        _workTasks = new Queue<WorkTask>();
    }

    public void RunSequentialSample()
    {
        AddTasks("John", "Wake up", "Eat breakfast", "Get dressed", "Go to work", "Drop the kids");
        PerformTasks();
        AddTasks("Karen", "Have lunch", "Talk to colleagues", "Go home", "Take shower", "Go to bed");
        PerformTasks();
    }

    // Dequeues and "performs" tasks until the queue is empty.
    private void PerformTasks()
    {
        while (_workTasks.Count > 0)
        {
            WorkTask wt = _workTasks.Dequeue();
            Debug.WriteLine("Performing task: " + wt.Description);
        }
    }

    // Enqueues one WorkTask per description for the given user.
    private void AddTasks(string user, params string[] workTasks)
    {
        for (int i = 0; i < workTasks.Length; i++)
        {
            _workTasks.Enqueue(new WorkTask()
            {
                Description = string.Format("Work name: {0}, given to {1}", workTasks[i], user)
            });
        }
    }
}
If you call the RunSequentialSample method then you’ll get the following debug output:
Performing task: Work name: Wake up, given to John
Performing task: Work name: Eat breakfast, given to John
Performing task: Work name: Get dressed, given to John
Performing task: Work name: Go to work, given to John
Performing task: Work name: Drop the kids, given to John
Performing task: Work name: Have lunch, given to Karen
Performing task: Work name: Talk to colleagues, given to Karen
Performing task: Work name: Go home, given to Karen
Performing task: Work name: Take shower, given to Karen
Performing task: Work name: Go to bed, given to Karen
This is exactly as expected. There’s nothing special here.
Now let’s say that you’d like to fill up and consume the Queue on different threads. We simulate a situation where multiple threads are allowed to add new tasks to the queue while others consume them. We’ll use the Task.Run method we saw in this post:
public void RunParallelSample()
{
    Task firstTaskList = Task.Run(() => AddTasks("John", "Wake up", "Eat breakfast", "Get dressed", "Go to work", "Drop the kids"));
    Task performInitialTasks = Task.Run(() => PerformTasks());
    Task secondTaskList = Task.Run(() => AddTasks("Karen", "Have lunch", "Talk to colleagues", "Go home", "Take shower", "Go to bed"));
    Task performAdditionalTasks = Task.Run(() => PerformTasks());
    Task.WaitAll(firstTaskList, performInitialTasks, secondTaskList, performAdditionalTasks);
}
If you call the RunParallelSample method then, depending on your CPU and the default task scheduler, you’ll most often get the same output as above. However, if you run the method enough times, say 10-15, then you’ll eventually get an exception. The exception is most likely to occur at this part of the code:
Debug.WriteLine("Performing task: " + wt.Description);
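The exception will tell you that “wt” is null. The reason is that Queue<T> is not thread-safe, yet there are two threads trying to read from the same queue. One thread retrieves the last element while the other one is still within the while loop, because its Count > 0 check passed probably just a millisecond before that. The check and the Dequeue call are two separate steps, so the queue can change between them. Under such unsynchronized access Dequeue can hand back null, and wt.Description fails. You may also see an InvalidOperationException instead, which is what Dequeue throws when it finds the queue empty.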
It’s easy to see why it’s dangerous to share objects like that among threads that all read and write to that object.
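One way to picture the problem is that "check the count" and "dequeue" must happen as a single step. The sketch below is not the approach this post ends up with; it is a hand-rolled alternative that wraps a plain Queue<T> in a lock. The LockedWorkQueue class and its Add and TryTake methods are hypothetical names made up for this illustration.

```csharp
using System.Collections.Generic;

// Hypothetical wrapper: guards a non-thread-safe Queue<T> with a lock.
public class LockedWorkQueue
{
    private readonly Queue<string> _items = new Queue<string>();
    private readonly object _sync = new object();

    public void Add(string item)
    {
        lock (_sync)
        {
            _items.Enqueue(item);
        }
    }

    // The emptiness check and the removal run under the same lock,
    // so no other thread can take the last item in between.
    public bool TryTake(out string item)
    {
        lock (_sync)
        {
            if (_items.Count > 0)
            {
                item = _items.Dequeue();
                return true;
            }
            item = null;
            return false;
        }
    }
}
```

A caller checks the bool returned by TryTake before touching the item, instead of testing the count first and dequeuing afterwards.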
In this case the solution is not too complex, as .NET has a built-in thread-safe variant of the Queue class, the aptly named ConcurrentQueue. It resides in the System.Collections.Concurrent namespace. It has no Dequeue method. Instead, the TryDequeue method returns a bool which indicates whether an item was dequeued or not.
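Here is a small sketch of how TryDequeue behaves on an empty and on a non-empty queue. The TryDequeueDemo class and its two methods are hypothetical names for this illustration; ConcurrentQueue<T>, Enqueue and TryDequeue are the real framework members.

```csharp
using System.Collections.Concurrent;

// Hypothetical helper, used only to show what TryDequeue returns.
public static class TryDequeueDemo
{
    // On an empty queue TryDequeue returns false instead of throwing.
    public static bool DequeueFromEmpty()
    {
        ConcurrentQueue<string> queue = new ConcurrentQueue<string>();
        string item;
        return queue.TryDequeue(out item);
    }

    // With one element in the queue TryDequeue returns true and fills the out parameter.
    public static string DequeueSingle()
    {
        ConcurrentQueue<string> queue = new ConcurrentQueue<string>();
        queue.Enqueue("Wake up");
        string item;
        bool success = queue.TryDequeue(out item);
        return success ? item : null;
    }
}
```

The out parameter should only be used when the method returned true.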
We first need to change the queue type:
private ConcurrentQueue<WorkTask> _workTasks;

public WorkTaskSampleService()
{
    _workTasks = new ConcurrentQueue<WorkTask>();
}
…and modify the PerformTasks method as follows:
private void PerformTasks()
{
    while (_workTasks.Count > 0)
    {
        WorkTask wt;
        bool dequeueSuccess = _workTasks.TryDequeue(out wt);
        if (dequeueSuccess)
        {
            Debug.WriteLine("Performing task: " + wt.Description);
        }
    }
}
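To check that every item is handed out exactly once, the following self-contained sketch lets two producers fill a ConcurrentQueue in parallel and then lets two consumers drain it in parallel. ConcurrentDrainDemo and Run are hypothetical names, and the sketch makes two assumptions that differ from the sample above: it loops directly on TryDequeue instead of checking Count first, and it waits for the producers to finish before starting the consumers, since a consumer that starts early may find the queue empty and exit.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: counts how many items two parallel consumers dequeue.
public static class ConcurrentDrainDemo
{
    public static int Run(int itemsPerProducer)
    {
        ConcurrentQueue<int> queue = new ConcurrentQueue<int>();

        // Two producers enqueue in parallel; Enqueue is thread-safe.
        Task producerA = Task.Run(() => { for (int i = 0; i < itemsPerProducer; i++) queue.Enqueue(i); });
        Task producerB = Task.Run(() => { for (int i = 0; i < itemsPerProducer; i++) queue.Enqueue(i); });
        Task.WaitAll(producerA, producerB);

        int processed = 0;
        Action drain = () =>
        {
            int item;
            // TryDequeue checks and removes in one atomic step,
            // so the loop ends as soon as the queue is empty.
            while (queue.TryDequeue(out item))
            {
                Interlocked.Increment(ref processed);
            }
        };

        // Two consumers drain the same queue in parallel.
        Task consumerA = Task.Run(drain);
        Task consumerB = Task.Run(drain);
        Task.WaitAll(consumerA, consumerB);

        return processed;
    }
}
```

ConcurrentDrainDemo.Run(1000) returns 2000: each of the 2000 enqueued items is dequeued by exactly one of the two consumers.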
View the list of posts on the Task Parallel Library here.