Using the BlockingCollection for thread-safe producer-consumer scenarios in .NET Part 2

In the previous post we briefly went through the producer-consumer scenario in general. We also mentioned another class in the System.Collections.Concurrent namespace that we haven’t seen before, i.e. the BlockingCollection of T.

We’ll start looking into this object in this post.

The BlockingCollection object can act as a wrapper around another concurrent collection which implements the IProducerConsumer interface. By now we know that there are 3 of them: ConcurrentBag, ConcurrentQueue and ConcurrentStack. The BlockingCollection can offer at least one extra service that those “plain” thread-safe collections cannot. It has a mechanism that will let the consumer thread to awaken only if there’s a new element in the work queue. You can thereby avoid having to loop constantly to check for new elements in the collection with some interval.

Let’s start by a simple object that will represent the work to be performed by the consumer threads:

public class WorkTask
{
	public WorkTask(string description, DateTime insertedUtc)
	{
		Description = description;
		InsertedUtc = insertedUtc;
	}

	public string Description { get; set; }
	public DateTime InsertedUtc { get; set; }
}

That’s fairly simple I hope.

Next we’ll need an object that holds the work queue itself. Before we see the entire class here’s how you can construct a blocking collection with a concurrent queue as the underlying thread-safe collection:

public class WorkQueue
{
	private BlockingCollection<WorkTask> _workQueue;

	public WorkQueue()
	{
		_workQueue = new BlockingCollection<WorkTask>(new ConcurrentQueue<WorkTask>());
	}
}

So we pass in an object which fulfils the IProducerConsumer interface. Let’s make this more generic so that the caller can decide which implementation to pass in:

public class WorkQueue
{
	private BlockingCollection<WorkTask> _workQueue;

	public WorkQueue(IProducerConsumerCollection<WorkTask> workTaskCollection)
	{
		_workQueue = new BlockingCollection<WorkTask>(workTaskCollection);
	}
}

We’ll also need a public method to add a new WorkItem object to the work queue and another method to monitor the queue. The ConcurrentCollection.Take() method will block the thread until there’s an element to be retrieved. Here’s the first version of the WorkQueue object:

public class WorkQueue
{
	private BlockingCollection<WorkTask> _workQueue;

	public WorkQueue(IProducerConsumerCollection<WorkTask> workTaskCollection)
	{
		_workQueue = new BlockingCollection<WorkTask>(workTaskCollection);
	}

	public void AddTask(WorkTask workTask)
	{
		_workQueue.Add(workTask);
	}

	public void MonitorWorkQueue()
	{
		while (true)
		{
			WorkTask wt = _workQueue.Take();
			Debug.WriteLine(string.Format("Thread {0} processing work task {1}, entered on {2}", Thread.CurrentThread.ManagedThreadId, wt.Description, wt.InsertedUtc));
		}
	}
}

We’ll keep exploring how blocking collections are used in .NET in the next post.

View the list of posts on the Task Parallel Library here.

About Andras Nemes
I'm a .NET/Java developer living and working in Stockholm, Sweden.

3 Responses to Using the BlockingCollection for thread-safe producer-consumer scenarios in .NET Part 2

  1. Bhushan says:

    Really good tutorial. Loved it !!!

    How do I use this code in my projects? Any licensing in particular?

  2. Yash says:

    I enjoy your posts on all items..very weel explained with good examples

Leave a comment

Elliot Balynn's Blog

A directory of wonderful thoughts

Software Engineering

Web development

Disparate Opinions

Various tidbits

chsakell's Blog

WEB APPLICATION DEVELOPMENT TUTORIALS WITH OPEN-SOURCE PROJECTS

Once Upon a Camayoc

Bite-size insight on Cyber Security for the not too technical.