Using the BlockingCollection for thread-safe producer-consumer scenarios in .NET Part 3

In the previous post of this mini-series we started building our model for the producer-consumer scenario. We have two objects to begin with. First, WorkTask, defined in WorkTask.cs, represents the task that’s added to the work queue by the producers and retrieved by the consumers to act upon. Second, WorkQueue encapsulates the work queue itself. It also acts as the consumer of the work queue through its MonitorWorkQueue method.

We haven’t seen the producer class yet. We’ll do that in this post.

The producer will need a work queue to which it can add work items. It will also need a method to continuously produce work items. We’ll let the infinite loop sleep for 2 seconds between iterations:

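using System;
using System.Diagnostics;
using System.Threading;

public class WorkItemProducer
{
	// The shared queue is assigned once in the constructor and never replaced.
	private readonly WorkQueue _workQueue;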

	public WorkItemProducer(WorkQueue workQueue)
	{
		_workQueue = workQueue;
	}

	public void ProduceWorkItems()
	{
		while (true)
		{
			Guid jobId = Guid.NewGuid();
			WorkTask wt = new WorkTask(string.Concat("Work with job ID ", jobId), DateTime.UtcNow);
			Debug.WriteLine(string.Format("Thread {0} added work {1} at {2} to the work queue.",
				Thread.CurrentThread.ManagedThreadId, wt.Description, wt.InsertedUtc));
			_workQueue.AddTask(wt);
			Thread.Sleep(2000);
		}
	}
}

The following method will start 3 producers and 2 consumers:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class BlockingCollectionSampleService
{
	public void RunBlockingCollectionCodeSample()
	{
		WorkQueue workQueue = new WorkQueue(new ConcurrentQueue<WorkTask>());
		WorkItemProducer producerOne = new WorkItemProducer(workQueue);
		WorkItemProducer producerTwo = new WorkItemProducer(workQueue);
		WorkItemProducer producerThree = new WorkItemProducer(workQueue);

		Task producerTaskOne = Task.Run(() => producerOne.ProduceWorkItems());
		Task producerTaskTwo = Task.Run(() => producerTwo.ProduceWorkItems());
		Task producerTaskThree = Task.Run(() => producerThree.ProduceWorkItems());

		Task consumerTaskOne = Task.Run(() => workQueue.MonitorWorkQueue());
		Task consumerTaskTwo = Task.Run(() => workQueue.MonitorWorkQueue());

		Console.WriteLine("Tasks started...");
	}
}

If you call RunBlockingCollectionCodeSample, you’ll see something like the following in the Debug window:

Thread 11 added work Work with job ID 7ec0fada-66b7-4696-867f-2987a4089402 at 2015-06-06 19:38:54 to the work queue.
Thread 9 added work Work with job ID 3f81b5b2-2050-4807-8d41-2e048fe31bfb at 2015-06-06 19:38:54 to the work queue.
Thread 10 added work Work with job ID 0fe46057-c812-454c-8aa0-cbcd96b58af9 at 2015-06-06 19:38:54 to the work queue.
Thread 12 processing work task Work with job ID 7ec0fada-66b7-4696-867f-2987a4089402, entered on 2015-06-06 19:38:54
Thread 12 processing work task Work with job ID 3f81b5b2-2050-4807-8d41-2e048fe31bfb, entered on 2015-06-06 19:38:54
Thread 12 processing work task Work with job ID 0fe46057-c812-454c-8aa0-cbcd96b58af9, entered on 2015-06-06 19:38:54
Thread 11 added work Work with job ID 6d73b9a2-db3b-4d32-b432-6ade85d6f567 at 2015-06-06 19:38:56 to the work queue.
Thread 10 added work Work with job ID 1f846bdf-3c91-4d28-bf6e-f1e4e3690d4f at 2015-06-06 19:38:56 to the work queue.
Thread 12 processing work task Work with job ID 6d73b9a2-db3b-4d32-b432-6ade85d6f567, entered on 2015-06-06 19:38:56
Thread 13 processing work task Work with job ID 1f846bdf-3c91-4d28-bf6e-f1e4e3690d4f, entered on 2015-06-06 19:38:56
Thread 9 added work Work with job ID f68abbdf-14d2-4975-82e0-ba179443e04b at 2015-06-06 19:38:56 to the work queue.
Thread 12 processing work task Work with job ID f68abbdf-14d2-4975-82e0-ba179443e04b, entered on 2015-06-06 19:38:56
Thread 11 added work Work with job ID bf279813-b563-4ecc-9bae-23c692c0890a at 2015-06-06 19:38:58 to the work queue.
Thread 10 added work Work with job ID 1f188f5c-5a38-49f9-a810-5459b3a1c675 at 2015-06-06 19:38:58 to the work queue.
Thread 13 processing work task Work with job ID bf279813-b563-4ecc-9bae-23c692c0890a, entered on 2015-06-06 19:38:58

So in my case threads 9, 10 and 11 were the producers, whereas threads 12 and 13 ran the consumers. The task with ID 7ec0fada-66b7-4696-867f-2987a4089402 was added on thread 11 and consumed on thread 12. The task with ID 1f846bdf-3c91-4d28-bf6e-f1e4e3690d4f was inserted on thread 10 and consumed on thread 13.
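
If you’d like to check the "added on one thread, consumed on another" behaviour without the infinite loops, here’s a minimal, self-contained sketch. It’s not the code from this series: it uses a BlockingCollection<string> directly instead of our WorkQueue and WorkTask classes, and the names ProducerConsumerSketch, Run, consumedBy and claimed are made up for this illustration. The consumers stop after a fixed number of items by claiming a slot with a counter before each Take. That’s just a trick to keep the sketch finite, not a way of signalling completion.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration only; not part of the series' code.
public static class ProducerConsumerSketch
{
	// Returns a map of work item -> managed thread ID of the consumer that took it.
	public static ConcurrentDictionary<string, int> Run(int producerCount, int itemsPerProducer, int consumerCount)
	{
		// BlockingCollection<T> uses a ConcurrentQueue<T> internally by default.
		using (BlockingCollection<string> queue = new BlockingCollection<string>())
		{
			ConcurrentDictionary<string, int> consumedBy = new ConcurrentDictionary<string, int>();
			int totalItems = producerCount * itemsPerProducer;
			int claimed = 0;

			Task[] producers = Enumerable.Range(0, producerCount)
				.Select(p => Task.Run(() =>
				{
					for (int i = 0; i < itemsPerProducer; i++)
					{
						queue.Add(string.Format("producer {0}, item {1}", p, i));
					}
				}))
				.ToArray();

			Task[] consumers = Enumerable.Range(0, consumerCount)
				.Select(c => Task.Run(() =>
				{
					// Each consumer claims a slot before taking, so the number of
					// Take() calls equals the number of items and nobody blocks forever.
					while (Interlocked.Increment(ref claimed) <= totalItems)
					{
						string item = queue.Take(); // blocks until an item is available
						if (!consumedBy.TryAdd(item, Thread.CurrentThread.ManagedThreadId))
						{
							throw new InvalidOperationException("Item consumed twice: " + item);
						}
					}
				}))
				.ToArray();

			Task.WaitAll(producers);
			Task.WaitAll(consumers);
			return consumedBy;
		}
	}
}
```

Calling ProducerConsumerSketch.Run(3, 5, 2) from a console application should return 15 entries, one per produced item, each mapped to the thread that consumed it. Which consumer thread picks up which item will vary from run to run.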

This was probably the simplest possible setup of a producer-consumer scenario using a blocking collection. Both consumers and producers run forever, or at least until an unhandled exception is thrown.

We’ll add a way for the producer to indicate to the consumer when it’s finished producing work items in the next post.

View the list of posts on the Task Parallel Library here.

About Andras Nemes
I'm a .NET/Java developer living and working in Stockholm, Sweden.

One Response to Using the BlockingCollection for thread-safe producer-consumer scenarios in .NET Part 3

  1. Michael says:

    hi thanks for your code. it was very helpful. i tried your consumer producer queue without thread.sleep to test how fast it is and my I7 could only add 20 items per second into the queue with one producer and one consumer. I really thought this would be faster. hmm…
