Using the BlockingCollection for thread-safe producer-consumer scenarios in .NET Part 4
September 11, 2015
In the previous post we saw how to wire up the producer and the consumer in a very simplistic producer-consumer scenario in .NET using a BlockingCollection. We started a number of producer and consumer threads and checked their output in the Debug window.
In this post we’ll see how we can send a signal to the consumers that all producers have finished adding new items to the work queue. This simulates a scenario where you can determine in advance when all expected items have been added to the work queue.
The key to adding this functionality is the CompleteAdding() method of BlockingCollection. It marks the collection as closed, so it won’t accept any more items. Consumers can still take the items that are already in the collection, but once it is both closed and empty, calling BlockingCollection.Take() will throw an exception of type InvalidOperationException, so we’ll need to catch that.
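To make the behaviour of CompleteAdding() concrete, here is a minimal, self-contained sketch that is separate from the WorkQueue class. The class name CompleteAddingDemo and the string items are made up for illustration; the only BlockingCollection members used are Add, Take, CompleteAdding and IsCompleted. It shows that items added before the call can still be taken, and that both Take() and Add() throw an InvalidOperationException afterwards.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical demo class, not part of the series' code.
public static class CompleteAddingDemo
{
    public static void Main()
    {
        // The default backing store is a ConcurrentQueue, so items come out in FIFO order.
        var queue = new BlockingCollection<string>();
        queue.Add("first");
        queue.Add("second");

        // Close the collection: no further items are accepted.
        queue.CompleteAdding();

        // Items that were added before the call can still be taken.
        Console.WriteLine(queue.Take()); // first
        Console.WriteLine(queue.Take()); // second

        // IsCompleted is true only when adding is complete AND the collection is empty.
        Console.WriteLine(queue.IsCompleted); // True

        try
        {
            queue.Take();
        }
        catch (InvalidOperationException)
        {
            Console.WriteLine("Take() threw: the queue is closed and empty.");
        }

        try
        {
            queue.Add("third");
        }
        catch (InvalidOperationException)
        {
            Console.WriteLine("Add() threw: the queue no longer accepts items.");
        }
    }
}
```

This is why the consumer loop in the WorkQueue class treats InvalidOperationException as the signal to stop.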
Here’s the updated WorkQueue class:
public class WorkQueue
{
    private BlockingCollection<WorkTask> _workQueue;

    public WorkQueue(IProducerConsumerCollection<WorkTask> workTaskCollection)
    {
        _workQueue = new BlockingCollection<WorkTask>(workTaskCollection);
    }

    public void AddTask(WorkTask workTask)
    {
        _workQueue.Add(workTask);
    }

    // Signals that no more work tasks will be added.
    public void AllItemsAdded()
    {
        _workQueue.CompleteAdding();
    }

    public void MonitorWorkQueue()
    {
        while (true)
        {
            try
            {
                // Blocks until an item is available; throws once the queue is closed and empty.
                WorkTask wt = _workQueue.Take();
                Debug.WriteLine(string.Format("Thread {0} processing work task {1}, entered on {2}", Thread.CurrentThread.ManagedThreadId, wt.Description, wt.InsertedUtc));
            }
            catch (InvalidOperationException)
            {
                Debug.WriteLine(string.Format("Work queue on thread {0} has been closed.", Thread.CurrentThread.ManagedThreadId));
                break;
            }
        }
    }
}
For the producer we’ll generate a random upper limit for a for-loop. The upper limit determines how many work items the producer adds to the queue. Since the second argument of Random.Next is exclusive, that is between 5 and 10 items. Here’s the updated ProduceWorkItems method:
public void ProduceWorkItems()
{
    Random random = new Random();
    // Next(5, 11) returns a value from 5 to 10; the upper bound is exclusive.
    int upperLimit = random.Next(5, 11);
    for (int i = 0; i < upperLimit; i++)
    {
        Guid jobId = Guid.NewGuid();
        WorkTask wt = new WorkTask(string.Concat("Work with job ID ", jobId), DateTime.UtcNow);
        Debug.WriteLine(string.Format("Thread {0} added work {1} at {2} to the work queue in iteration {3}.", Thread.CurrentThread.ManagedThreadId, wt.Description, wt.InsertedUtc, i + 1));
        _workQueue.AddTask(wt);
        Thread.Sleep(2000);
    }
}
We also need to modify the code that starts up the threads. Since each producer stops after a fixed number of items, we can wait for all producer tasks to finish. When they have completed their work we can tell the consumers that the queue is closed:
public class BlockingCollectionSampleService
{
    public void RunBlockingCollectionCodeSample()
    {
        WorkQueue workQueue = new WorkQueue(new ConcurrentQueue<WorkTask>());
        WorkItemProducer producerOne = new WorkItemProducer(workQueue);
        WorkItemProducer producerTwo = new WorkItemProducer(workQueue);
        WorkItemProducer producerThree = new WorkItemProducer(workQueue);

        Task producerTaskOne = Task.Run(() => producerOne.ProduceWorkItems());
        Task producerTaskTwo = Task.Run(() => producerTwo.ProduceWorkItems());
        Task producerTaskThree = Task.Run(() => producerThree.ProduceWorkItems());

        Task consumerTaskOne = Task.Run(() => workQueue.MonitorWorkQueue());
        Task consumerTaskTwo = Task.Run(() => workQueue.MonitorWorkQueue());

        Task.WaitAll(producerTaskOne, producerTaskThree, producerTaskTwo);
        workQueue.AllItemsAdded();
        Task.WaitAll(consumerTaskOne, consumerTaskTwo);

        Console.WriteLine("Tasks finished...");
    }
}
The output in the debug window will be similar to what we saw before, except that the consumer loops now exit once the queue has been closed and emptied:
Thread 12 processing work task Work with job ID d493a853-ae6a-442e-b12a-87bb3db23488, entered on 2015-06-06 20:18:58
Thread 10 added work Work with job ID e176bc3a-6131-4dfc-b4fd-dc4ee9181589 at 2015-06-06 20:18:58 to the work queue in iteration 6.
Thread 13 processing work task Work with job ID e176bc3a-6131-4dfc-b4fd-dc4ee9181589, entered on 2015-06-06 20:18:58
Thread 6 added work Work with job ID 99757e78-b7c3-4368-90b9-8eab6054a9a3 at 2015-06-06 20:19:00 to the work queue in iteration 7.
Thread 12 processing work task Work with job ID 99757e78-b7c3-4368-90b9-8eab6054a9a3, entered on 2015-06-06 20:19:00
Work queue on thread 12 has been closed.
Work queue on thread 13 has been closed.
We’ll finish off this mini-series in the next post where we’ll take a look at some other customisation points of a blocking collection.
Great series of articles, very clear and objective! I come from a Go background, which uses this pattern a lot with channels and goroutines, and I wasn’t very happy with what .NET had to offer until I read your posts.
Just a question though: after reading this I didn’t like the fact that an exception is thrown to indicate that the queue was completed. I strongly believe exceptions should only be thrown if something unexpected happens, and because the queue is expected to complete eventually, that is not the case here.
So I did some research and I found the IsCompleted property:
“Gets whether this BlockingCollection has been marked as complete for adding and is empty.”
The last part is extremely important because you don’t want your loop to end before reading all items.
So your MonitorWorkQueue method could be rewritten as:
public void MonitorWorkQueue()
{
    while (!_workQueue.IsCompleted)
    {
        WorkTask wt = _workQueue.Take();
        Debug.WriteLine(string.Format("Thread {0} processing work task {1}, entered on {2}", Thread.CurrentThread.ManagedThreadId, wt.Description, wt.InsertedUtc));
    }
}
Is there a specific reason why you used the exception instead? I believe the IsCompleted property provides a simpler and cleaner solution.
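One thing to watch with the IsCompleted loop: the check and the Take() call are two separate steps. With several consumers, a consumer can see IsCompleted as false, block in Take() on an empty queue, and still get an InvalidOperationException when CompleteAdding() is called or when another consumer takes the last item. One way to avoid both the exception handling and that gap is GetConsumingEnumerable(), which blocks while the queue is empty and ends the loop once the queue is closed and drained. The sketch below is a standalone illustration under stated assumptions: the ConsumingEnumerableDemo class, the Consume helper and the int items are hypothetical and not part of the WorkQueue class from the post.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the series' code.
public static class ConsumingEnumerableDemo
{
    // Consumes items until the queue is marked complete and empty.
    // Returns the number of items this consumer processed.
    public static int Consume(BlockingCollection<int> queue)
    {
        int count = 0;
        // Blocks while the queue is empty; the loop ends without an exception
        // once CompleteAdding() has been called and all items are taken.
        foreach (int item in queue.GetConsumingEnumerable())
        {
            count++;
        }
        return count;
    }

    public static void Main()
    {
        var queue = new BlockingCollection<int>();

        // Two consumers share the same queue; each item goes to exactly one of them.
        Task<int> consumerOne = Task.Run(() => Consume(queue));
        Task<int> consumerTwo = Task.Run(() => Consume(queue));

        for (int i = 0; i < 10; i++)
        {
            queue.Add(i);
        }
        queue.CompleteAdding();

        Task.WaitAll(consumerOne, consumerTwo);

        // How the items are split between the consumers varies, but the total is fixed.
        Console.WriteLine(consumerOne.Result + consumerTwo.Result); // 10
    }
}
```

Applied to the WorkQueue class, the same idea would replace the while loop in MonitorWorkQueue with a foreach over _workQueue.GetConsumingEnumerable().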