Using the BlockingCollection for thread-safe producer-consumer scenarios in .NET Part 4

In the previous post we saw how to wire up the producer and the consumer in a very simplistic producer-consumer scenario in .NET using a BlockingCollection. We started a number of producer and consumer threads and checked their output in the Debug window.

In this post we’ll see how we can send a signal to the consumer that all producers finished adding new items to the work queue. This is to simulate a scenario where you can determine in advance when all expected items have been added to the work queue.

The key to adding this functionality is the CompleteAdding() method of BlockingCollection. It marks the collection as closed and won’t accept any more items. Calling BlockingCollection.Take() will throw an exception of type InvalidOperationException so we’ll need to catch that.

Here’s the updated WorkQueue class:

public class WorkQueue
{
	private BlockingCollection<WorkTask> _workQueue;

	public WorkQueue(IProducerConsumerCollection<WorkTask> workTaskCollection)
	{
		_workQueue = new BlockingCollection<WorkTask>(workTaskCollection);
	}

	public void AddTask(WorkTask workTask)
	{
		_workQueue.Add(workTask);
	}

	public void AllItemsAdded()
	{
		_workQueue.CompleteAdding();
	}

	public void MonitorWorkQueue()
	{
		while (true)
		{
			try
			{
				WorkTask wt = _workQueue.Take();
				Debug.WriteLine(string.Format("Thread {0} processing work task {1}, entered on {2}", Thread.CurrentThread.ManagedThreadId, wt.Description, wt.InsertedUtc));
			}
			catch (InvalidOperationException e)
			{
				Debug.WriteLine(string.Format("Work queue on thread {0} has been closed.", Thread.CurrentThread.ManagedThreadId));
				break;
			}
		}
	}
}

For the producer we’ll generate a random upper limit for a for-loop. The upper limit will indicate the maximum number of work items to be added to the queue. Here’s the updated ProduceWorkItems method:

public void ProduceWorkItems()
{
	Random random = new Random();
	int upperLimit = random.Next(5, 11);
	for (int i = 0; i <= upperLimit; i++)
	{
		Guid jobId = Guid.NewGuid();
		WorkTask wt = new WorkTask(string.Concat("Work with job ID ", jobId), DateTime.UtcNow);
		Debug.WriteLine(string.Format("Thread {0} added work {1} at {2} to the work queue in iteration {3}.",
			Thread.CurrentThread.ManagedThreadId, wt.Description, wt.InsertedUtc, i + 1));
		_workQueue.AddTask(wt);
		Thread.Sleep(2000);
	}			
}

We also need to modify the code that starts up the threads. As we know when the producers will be done we can wait for all producer threads to finish. When the producer threads have completed their work we can tell the consumer that it can be closed:

public class BlockingCollectionSampleService
{
	public void RunBlockingCollectionCodeSample()
	{
		WorkQueue workQueue = new WorkQueue(new ConcurrentQueue<WorkTask>());
		WorkItemProducer producerOne = new WorkItemProducer(workQueue);
		WorkItemProducer producerTwo = new WorkItemProducer(workQueue);
		WorkItemProducer producerThree = new WorkItemProducer(workQueue);

		Task producerTaskOne = Task.Run(() => producerOne.ProduceWorkItems());
		Task producerTaskTwo = Task.Run(() => producerTwo.ProduceWorkItems());
		Task producerTaskThree = Task.Run(() => producerThree.ProduceWorkItems());
		Task consumerTaskOne = Task.Run(() => workQueue.MonitorWorkQueue());
		Task consumerTaskTwo = Task.Run(() => workQueue.MonitorWorkQueue());
		Task.WaitAll(producerTaskOne, producerTaskThree, producerTaskTwo);
		workQueue.AllItemsAdded();			
		Task.WaitAll(consumerTaskOne, consumerTaskTwo);
		Console.WriteLine("Tasks finished...");
	}
}

The output in the debug window will be similar to what we saw before except that the threads will exit when they have returned:

Thread 12 processing work task Work with job ID d493a853-ae6a-442e-b12a-87bb3db23488, entered on 2015-06-06 20:18:58
Thread 10 added work Work with job ID e176bc3a-6131-4dfc-b4fd-dc4ee9181589 at 2015-06-06 20:18:58 to the work queue in iteration 6.
Thread 13 processing work task Work with job ID e176bc3a-6131-4dfc-b4fd-dc4ee9181589, entered on 2015-06-06 20:18:58
Thread 6 added work Work with job ID 99757e78-b7c3-4368-90b9-8eab6054a9a3 at 2015-06-06 20:19:00 to the work queue in iteration 7.
Thread 12 processing work task Work with job ID 99757e78-b7c3-4368-90b9-8eab6054a9a3, entered on 2015-06-06 20:19:00
Work queue on thread 12 has been closed.
Work queue on thread 13 has been closed.

We’ll finish off this mini-series in the next post where we’ll take a look at some other customisation points of a blocking collection.

View the list of posts on the Task Parallel Library here.

Advertisements

About Andras Nemes
I'm a .NET/Java developer living and working in Stockholm, Sweden.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

ultimatemindsettoday

A great WordPress.com site

Elliot Balynn's Blog

A directory of wonderful thoughts

Robin Sedlaczek's Blog

Developer on Microsoft Technologies

HarsH ReaLiTy

My goal with this blog is to offend everyone in the world at least once with my words… so no one has a reason to have a heightened sense of themselves. We are all ignorant, we are all found wanting, we are all bad people sometimes.

Softwarearchitektur in der Praxis

Wissenswertes zu Webentwicklung, Domain-Driven Design und Microservices

the software architecture

thoughts, ideas, diagrams,enterprise code, design pattern , solution designs

Technology Talks

on Microsoft technologies, Web, Android and others

Software Engineering

Web development

Disparate Opinions

Various tidbits

chsakell's Blog

Anything around ASP.NET MVC,WEB API, WCF, Entity Framework & AngularJS

Cyber Matters

Bite-size insight on Cyber Security for the not too technical.

Guru N Guns's

OneSolution To dOTnET.

Johnny Zraiby

Measuring programming progress by lines of code is like measuring aircraft building progress by weight.

%d bloggers like this: