Using a thread-safe queue collection in .NET

We looked at the Queue collection type in .NET in this post. We saw how it could be used as a first-in-first-out collection. A new element is placed at the end of the queue and the first element to be removed from it is the one in front of the queue.

A Queue is an ideal container for tasks. Let’s create a class called WorkTask to represent tasks. I didn’t want to call this object a “Task” so that it is not confused with the Task object in the Task Parallel Library (TPL). If you don’t know what TPL means then take a look at the post referenced at the end of this article.

Here’s the WorkTask object:

public class WorkTask
{
	public string Description { get; set; }
}

Consider the following code where we fill up a queue in two subsequent method calls and then dequeue the items one by one:

public class WorkTaskSampleService
{
	private Queue<WorkTask> _workTasks;

	public WorkTaskSampleService()
	{
		_workTasks = new Queue<WorkTask>();
	}

	public void RunSequentialSample()
	{
		AddTasks("John", "Wake up", "Eat breakfast", "Get dressed", "Go to work", "Drop the kids");
		PerformTasks();
		AddTasks("Karen", "Have lunch", "Talk to colleagues", "Go home", "Take shower", "Go to bed");
		PerformTasks();
	}

	private void PerformTasks()
	{
		while (_workTasks.Count > 0)
		{
			WorkTask wt = _workTasks.Dequeue();
			Debug.WriteLine("Performing task: " + wt.Description);
		}
	}

	private void AddTasks(string user, params string[] workTasks)
	{
		for (int i = 0; i < workTasks.Length; i++)
		{
			_workTasks.Enqueue(new WorkTask() { Description = string.Format("Work name: {0}, given to {1}", workTasks[i], user) });
		}
	}
}

If you call the RunSequentialSample method then you’ll get the following debug output:

Performing task: Work name: Wake up, given to John
Performing task: Work name: Eat breakfast, given to John
Performing task: Work name: Get dressed, given to John
Performing task: Work name: Go to work, given to John
Performing task: Work name: Drop the kids, given to John
Performing task: Work name: Have lunch, given to Karen
Performing task: Work name: Talk to colleagues, given to Karen
Performing task: Work name: Go home, given to Karen
Performing task: Work name: Take shower, given to Karen
Performing task: Work name: Go to bed, given to Karen

This is exactly as expected, there’s nothing special here.

Now let’s say that you’d like to fill up and consume the Queue on different threads. We simulate that multiple processes are allowed to add new tasks to the queue. We’ll use the Task.Run method we saw in this post:

public void RunParallelSample()
{
	Task firstTaskList = Task.Run(() => AddTasks("John", "Wake up", "Eat breakfast", "Get dressed", "Go to work", "Drop the kids"));
	Task performInitialTasks = Task.Run(() => PerformTasks());
	Task secondTaskList = Task.Run(() => AddTasks("Karen", "Have lunch", "Talk to colleagues", "Go home", "Take shower", "Go to bed"));
	Task performAdditionalTasks = Task.Run(() => PerformTasks());
	Task.WaitAll(firstTaskList, performInitialTasks, secondTaskList, performAdditionalTasks);
}

If you call the RunParallelSample method then depending on your CPU and the default task scheduler you’ll probably most often get the same output as above. However, if you run the method enough times, say 10-15, then you’ll get an exception eventually. The exception is most likely to occur at this part of the code:

Debug.WriteLine("Performing task: " + wt.Description);

The exception will tell you that “wt” is null. The reason is that there are two treads trying to read from the same queue. One thread retrieves the last element and the other one still remains within the while loop as the condition yielded count > 0 probably just a millisecond before that. Then Dequeue yields null and wt.Description fails.

It’s easy to see why it’s dangerous to share objects like that among threads that all read and write to that object.

In this very case the solution is not too complex as .NET has a built-in thread-safe variant of the Queue class, the aptly called ConcurrentQueue. It resides in the System.Collections.Concurrent namespace. It has no Dequeue method. Instead the TryDequeue method returns a bool which indicates whether an item was dequeued or not.

We first need to change the queue type:

private ConcurrentQueue<WorkTask> _workTasks;

public WorkTaskSampleService()
{
	_workTasks = new ConcurrentQueue<WorkTask>();
}

…and modify the PerformTasks method as follows:

private void PerformTasks()
{
	while (_workTasks.Count > 0)
	{
		WorkTask wt;
		bool dequeueSuccess = _workTasks.TryDequeue(out wt);
		if (dequeueSuccess)
		{
			Debug.WriteLine("Performing task: " + wt.Description);
		}
	}
}

View the list of posts on the Task Parallel Library here.

Advertisements

About Andras Nemes
I'm a .NET/Java developer living and working in Stockholm, Sweden.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

ultimatemindsettoday

A great WordPress.com site

Elliot Balynn's Blog

A directory of wonderful thoughts

Robin Sedlaczek's Blog

Developer on Microsoft Technologies

HarsH ReaLiTy

A Good Blog is Hard to Find

Softwarearchitektur in der Praxis

Wissenswertes zu Webentwicklung, Domain-Driven Design und Microservices

the software architecture

thoughts, ideas, diagrams,enterprise code, design pattern , solution designs

Technology Talks

on Microsoft technologies, Web, Android and others

Software Engineering

Web development

Disparate Opinions

Various tidbits

chsakell's Blog

Anything around ASP.NET MVC,WEB API, WCF, Entity Framework & AngularJS

Cyber Matters

Bite-size insight on Cyber Security for the not too technical.

Guru N Guns's

OneSolution To dOTnET.

Johnny Zraiby

Measuring programming progress by lines of code is like measuring aircraft building progress by weight.

%d bloggers like this: