Thread safe collections in .NET: ConcurrentQueue

Concurrent collections in .NET work very much like their single-thread counterparts with the difference that they are thread safe. These collections can be used in scenarios where you need to share a collection between Tasks. They are typed and use a lightweight synchronisation mechanism to ensure that they are safe and fast to use in parallel programming.

Concurrent queues

If you don’t know what Queues are then you can read about them here. The Queue of T generic collection has a thread-safe counterpart called ConcurrentQueue. Important methods:

  • Enqueue(T element): adds an item of type T to the collection
  • TryPeek(out T): tries to retrieve the next element from the collection without removing it. The value is set to the out parameter if the method succeeds. Otherwise it returns false.
  • TryDequeue(out T): tries to get the first element. It removes the item from the collection and sets the out parameter to the retrieved element. Otherwise the method returns false

The ‘try’ bit in the method names implies that your code needs to prepare for the event where the element could not be retrieved. If multiple threads retrieve elements from the same queue you cannot be sure what’s in there when a specific thread tries to read from it.

Example

Declare and fill a concurrent queue:

ConcurrentQueue<int> concurrentQueue = new ConcurrentQueue<int>();

for (int i = 0; i < 5000; i++)
{
	concurrentQueue.Enqueue(i);
}

We’ll want to get the items from this collection and check if all of them have been retrieved using a counter. The counter will also be shared among the threads using the ‘lock’ technique we saw in this post – or actually something that is similar to the ‘lock’ keyword: the Interlocked class. Interlocked has an Increment method which accepts a ref int parameter. It will increment the incoming integer in an atomic operation.

int itemCount = 0;

Task[] queueTasks = new Task[20];
for (int i = 0; i < queueTasks.Length; i++)
{
	queueTasks[i] = Task.Factory.StartNew(() =>
	{
		while (concurrentQueue.Count > 0)
		{
			int currentElement;
			bool success = concurrentQueue.TryDequeue(out currentElement);
			if (success)
			{
				Interlocked.Increment(ref itemCount);
			}
		}
	});
}

The while loop will ensure that we’ll try to dequeue the items as long as there’s something left in the collection.

Wait for the tasks and print the number of items processed – the counter should have the same value as the number of items in the queue:

Task.WaitAll(queueTasks);
Console.WriteLine("Counter: {0}", itemCount);

View the list of posts on the Task Parallel Library here.

Suspending a Task using a CancellationToken in .NET C#

You may want to put a Task to sleep for some time. You might want to check the state of an object before continuing. The Task continues after the sleep time.

One way to solve this is using the WaitOne method of the WaitHandle property of the CancellationToken object.

Construct the CancellationToken object as follows:

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = cancellationTokenSource.Token;

The WaitOne() method without parameters suspends the Task execution until the Cancel() method of the CancellationToken object has been called. It has two overloads where you can specify some time: the amount of time to wait in milliseconds or some TimeSpan. The task suspension lasts until the sleep time is over OR the Cancel() method on the cancellation token has been called, whichever happens first. The WaitOne method also has a boolean return value where ‘true’ means that the Task has been cancelled and false means the the allotted sleep time was over.

You can use the cancellation token and the WaitOne method as follows:

Task task = Task.Factory.StartNew(() =>
{
	for (int i = 0; i < 1000000; i++)
	{
		bool cancelled = cancellationToken.WaitHandle.WaitOne(10000);
		Console.WriteLine("Value {0}. Cancelled? {1}", i, cancelled);
		if (cancelled)
		{
			throw new OperationCanceledException(cancellationToken);
		}
	}
}, cancellationToken);

You can cancel the task as follows:

cancellationTokenSource.Cancel();

View the list of posts on the Task Parallel Library here.

Thread safe collections in .NET: ConcurrentBag

Concurrent collections in .NET work very much like their single-thread counterparts with the difference that they are thread safe. These collections can be used in scenarios where you need to share a collection between Tasks. They are typed and use a lightweight synchronisation mechanism to ensure that they are safe and fast to use in parallel programming.

Concurrent bags

Concurrent bags are similar to concurrent stacks and concurrent queues but there’s a key difference. Bags are unordered collections. This means that the order of the items is not the same as how they were inserted. So concurrent bags are ideal if you would like to share a List of T generic collection among several tasks.

Important methods:

  • Add(T element): adds an item of type T to the collection
  • TryPeek(out T): tries to retrieve the next element from the collection without removing it. The value is set to the out parameter if the method succeeds. Otherwise it returns false.
  • TryTake(out T): tries to get the first element. It removes the item from the collection and sets the out parameter to the retrieved element. Otherwise the method returns false

The ‘try’ bit in the method names imply that your code needs to prepare for the event where the element could not be retrieved. If multiple threads retrieve elements from the same list you cannot be sure what’s in there when a specific thread tries to read from it.

Example

The example is almost identical to what we saw for the collections discussed previously in the posts on TPL.

Declare and fill a concurrent bag:

ConcurrentBag<int> concurrentBag = new ConcurrentBag<int>();

for (int i = 0; i < 5000; i++)
{
	concurrentBag.Add(i);
}

Next we’ll try to take every item from the bag. The bag will be accessed by several tasks at the same time. The counter variable – which is also shared and synchronised- will be used to check if all items have been retrieved.

int counter = 0;

Task[] bagTasks = new Task[20];
for (int i = 0; i < bagTasks.Length; i++)
{
	bagTasks[i] = Task.Factory.StartNew(() =>
	{
		while (concurrentBag.Count > 0)
		{
			int bagElement;
			bool success = concurrentBag.TryTake(out bagElement);
			if (success)
			{
				Interlocked.Increment(ref counter);
			}
		}
	});
}

The while loop will ensure that we’ll try to take the items as long as there’s something left in the collection.

Wait for the tasks and print the number of items processed – the counter should have the same value as the number of items in the bag:

Task.WaitAll(bagTasks);
Console.WriteLine("Counter: {0}", counter);

View the list of posts on the Task Parallel Library here.

Share data between Tasks using locks in .NET C#

You need to be careful when sharing mutable data across several tasks. If you don’t restrict the access to a shared variable then the tasks will all modify it in an unpredictable way. It’s because you cannot be sure how the tasks will be started and managed by the operating system: it depends on a range of parameters, such as the CPU usage, available memory, etc. If each task is supposed to modify a variable then maybe the first task won’t finish its work before the second task reads the same variable.

Consider the following:

class BankAccount
{
	public int Balance { get; set; }
}

The following code builds a list of tasks that each increase the balance by 1:

BankAccount account = new BankAccount();
List<Task> tasks = new List<Task>();

for (int i = 0; i < 5; i++)
{
	tasks.Add(Task.Factory.StartNew(() =>
	{
		for (int j = 0; j < 1000; j++)
		{
			account.Balance = account.Balance + 1;
		}
	}));
}
Task.WaitAll(tasks.ToArray());
Console.WriteLine(string.Concat("Expected value 5000", ", actual value ",account.Balance));

We expect the final balance to be 5000. Run the code and you will most likely get something less, such as 3856 or 4652. At times you get exactly 5000 but it’s unpredictable.

In such cases you need to identify the volatile critical regions in your code. In this example it’s simple:

account.Balance = account.Balance + 1;

It is the account balance that is modified by each thread so we need to lock it for access so that only one thread can modify it at a certain time. You can take the word “lock” almost literally as we use the lock keyword to do that. We take an object which will act as the synchronisation primitive. The lock object must be visible to all tasks:

object lockObject = new object();

You use the lock keyword as follows:

for (int i = 0; i < 5; i++)
{
	tasks.Add(Task.Factory.StartNew(() =>
	{
		for (int j = 0; j < 1000; j++)
		{
			lock (lockObject)
			{
				account.Balance = account.Balance + 1;
			}
		}
	}));
}

The volatile region of the code is locked using the lock object. The lock is released as soon as the current task has finished working with it.

Run the modified code and you should get the correct result.

View the list of posts on the Task Parallel Library here.

Getting a return value from a Task with C#

Sometimes you want to get a return value from a Task as opposed to letting it run in the background and forgetting about it. You’ll need to specify the return type as a type parameter to the Task object: a Task of T.

.NET 4.0

Without specifying an input parameter:

Task<int> task = new Task<int>(() => 
{
    int total = 0;
    for (int i = 0; i < 500; i++)
    {
        total += i;
    }
    return total;
});

task.Start();
int result = Convert.ToInt32(task.Result);

We count to 500 and return the sum. The return value of the Task can be retrieved using the Result property which can be converted to the desired type.

You can provide an input parameter as well:

Task<int> task = new Task<int>(obj => 
{
    int total = 0;
    int max = (int)obj;
    for (int i = 0; i < max; i++)
    {
        total += i;
    }
    return total;
}, 300);

task.Start();
int result = Convert.ToInt32(task.Result);

We specify that we want to count to 300.

.NET 4.5

The recommended way in .NET 4.5 is to use Task.FromResult, Task.Run or Task.Factory.StartNew:

FromResult:

public async Task DoWork()
{
       int res = await Task.FromResult<int>(GetSum(4, 5));	
}

private int GetSum(int a, int b)
{
	return a + b;
}

Please check out Stefan’s comments on the usage of FromResult in the comments section below the post.

Task.Run:

public async Task DoWork()
{
	Func<int> function = new Func<int>(() => GetSum(4, 5));
	int res = await Task.Run<int>(function);
}

private int GetSum(int a, int b)
{
	return a + b;
}

Task.Factory.StartNew:

public async Task DoWork()
{
	Func<int> function = new Func<int>(() => GetSum(4, 5));
	int res = await Task.Factory.StartNew<int>(function);			
}

private int GetSum(int a, int b)
{
	return a + b;
}

View the list of posts on the Task Parallel Library here.

Thread safe collections in .NET: ConcurrentDictionary

Concurrent collections in .NET work very much like their single-thread counterparts with the difference that they are thread safe. These collections can be used in scenarios where you need to share a collection between Tasks. They are typed and use a lightweight synchronisation mechanism to ensure that they are safe and fast to use in parallel programming.

Concurrent dictionaries

Concurrent dictionaries are thread-safe equivalent collections of “normal” dictionaries, i.e. key-value pair collections. Concurrent dictionaries are ideal if you would like to share a key-pair collection of types K and V among several tasks.

Important methods:

  • TryAdd(K key, V value): adds an new key-value pair to the collection. Returns true if the insertion was successful
  • TryGetValue(K key, out V value): tries to retrieve the value of the key. Returns true if the extraction was successful and the value is assigned to the out parameter
  • TryRemove(K key, out V value): tries to remove the key-value pair associated with the key. Returns true if the deletion was successful and the value is assigned to the out parameter
  • TryUpdate(K key, V new, V current): update the value of the key-value pair with the ‘new’ value if the current value is equal to ‘current’. Returns true if the update was successful
  • ContainsKey(K key): same as ContainsKey of the normal Dictionary class, i.e. returns true if the key is found in the dictionary

The ‘try’ bit in the method names imply that your code needs to prepare for the event where the element could not be retrieved. If multiple threads retrieve elements from the same collection you cannot be sure what’s in there when a specific thread tries to read from it. E.g. even if ContainsKey returns true there’s no guarantee that the key-value pair is still present when the thread tries to read from it as another thread might have already removed it.

Example

We’ll need a simple object with a single property for the example:

class Series
{
	public int CurrentValue
	{
		get;
		set;
	}
}

The following code creates 20 tasks and each task increases the value of the CurrentValue property in the shared dictionary by 1000 in loop. So we’re expecting the final value to be 20000. We fill up the task array in a loop and start the tasks individually. The key-value in the dictionary may look like the following at a certain stage:

key: 0 (the task ID represented by the taskParameter object, which is the same as ‘i’ in the main loop), value: 40
key: 1, value 46
key: 2: value 43
.
.
.
key: 19, value 45

After the loop the values of each thread are added to the CurrentValue property:

Series series = new Series();
ConcurrentDictionary<int, int> concurrentDictionary = new ConcurrentDictionary<int, int>();
Task<int>[] taskArray = new Task<int>[20];
for (int i = 0; i < taskArray.Length; i++)
{
	concurrentDictionary.TryAdd(i, series.CurrentValue);

	taskArray[i] = Task.Factory.StartNew<int>((taskParameter) =>
	{
		int current;
		bool valueRetrieved;
		int key = Convert.ToInt32(taskParameter);
		for (int j = 0; j < 1000; j++)
		{
			valueRetrieved = concurrentDictionary.TryGetValue(key, out current);
			concurrentDictionary.TryUpdate(key, current + 1, current);
		}

		int result;
		valueRetrieved = concurrentDictionary.TryGetValue(key, out result);
		if (valueRetrieved)
		{
			return result;
		}
		else
		{
			throw new Exception(String.Format("No data item available for key {0}", taskParameter));
		}
	}, i);
}

for (int i = 0; i < taskArray.Length; i++)
{
	series.CurrentValue += taskArray[i].Result;
}

Console.WriteLine("Expected value {0}, Balance: {1}", 20000, series.CurrentValue);

View the list of posts on the Task Parallel Library here.

Thread safe collections in .NET: ConcurrentStack

Concurrent collections in .NET work very much like their single-thread counterparts with the difference that they are thread safe. These collections can be used in scenarios where you need to share a collection between Tasks. They are typed and use a lightweight synchronisation mechanism to ensure that they are safe and fast to use in parallel programming.

Concurrent stacks

If you don’t know what Stacks are then you can read about them here. The Stack of T generic collection has a thread-safe counterpart called ConcurrentStack. Important methods:

  • Push(T element): adds an item of type T to the collection
  • PushRange(T[] elements) and PushRange(T[] elements, int, int): same as Push but is used for adding an array of items to the collection
  • TryPeek(out T): tries to retrieve the next element from the collection without removing it. The value is set to the out parameter if the method succeeds. Otherwise it returns false.
  • TryPop(out T): tries to get the first element. It removes the item from the collection and sets the out parameter to the retrieved element. Otherwise the method returns false
  • TryPopRange(out T[] elements) and TryPopRange(out T[], int, int): same as TryPop but is used for arrays

The ‘try’ bit in the method names imply that your code needs to prepare for the event where the element could not be retrieved. If multiple threads retrieve elements from the same stack you cannot be sure what’s in there when a specific thread tries to read from it.

Example

Declare and fill a concurrent stack:

ConcurrentStack<int> concurrentStack = new ConcurrentStack<int>();

for (int i = 0; i < 5000; i++)
{
	concurrentStack.Push(i);
}

Next we’ll try to pop every item from the stack. The stack will be accessed by several tasks at the same time. The counter variable – which is also shared – will be used to check if all items have been retrieved.

int counter = 0;

Task[] stackTasks = new Task[10];
for (int i = 0; i < stackTasks.Length; i++)
{
	stackTasks[i] = Task.Factory.StartNew(() =>
	{
		while (concurrentStack.Count > 0)
		{
			int currentElement;
			bool success = concurrentStack.TryPop(out currentElement);
			if (success)
			{
				Interlocked.Increment(ref counter);
			}
		}
	});
}

The while loop will ensure that we’ll try to pop the items as long as there’s something left in the collection.

Wait for the tasks and print the number of items processed – the counter should have the same value as the number of items in the stack:

Task.WaitAll(stackTasks);
Console.WriteLine("Counter: {0}", counter);

View the list of posts on the Task Parallel Library here.

7 ways to start a Task in .NET C#

New threads can be started using the Task Programming Library in .NET in – at last – 5 different ways.

You’ll first need to add the following using statement:

using System.Threading.Tasks;

The most direct way

Task.Factory.StartNew(() => {Console.WriteLine("Hello Task library!"); });

Using Action

Task task = new Task(new Action(PrintMessage));
task.Start();

…where PrintMessage is a method:

private void PrintMessage()
{
    Console.WriteLine("Hello Task library!");
}

Using a delegate

Task task = new Task(delegate { PrintMessage(); });
task.Start();

Lambda and named method

Task task = new Task( () => PrintMessage() );
task.Start();

Lambda and anonymous method

Task task = new Task( () => { PrintMessage(); } );
task.Start();

Using Task.Run in .NET4.5

public async Task DoWork()
{
	await Task.Run(() => PrintMessage());
}

Using Task.FromResult in .NET4.5 to return a result from a Task

public async Task DoWork()
{
	int res = await Task.FromResult<int>(GetSum(4, 5));	
}

private int GetSum(int a, int b)
{
	return a + b;
}

You cannot start a task that has already completed. If you need to run the same task you’ll need to initialise it again.

View the list of posts on the Task Parallel Library here.

Exception handling in async methods in .NET4.5 MVC4 with C#

In this post we’ll take a look at how to handle exceptions that are thrown by actions that are awaited. My previous post already included some exception handling techniques in MVC4 but here we will concentrate on exceptions thrown by await actions. Check my previous 3 posts for the full story behind the code examples shown here.

We will simulate some problems by intentionally throwing an exception in GetResultAsync and GetDataAsync:

public async Task<String> GetDataAsync(CancellationToken ctk)
        {
            StringBuilder dataBuilder = new StringBuilder();
            dataBuilder.Append("Starting GetData on thread id ").Append(Thread.CurrentThread.ManagedThreadId)
                .Append(". ");
            ctk.ThrowIfCancellationRequested();
            await Task.Delay(2000);
            throw new Exception("Something terrible has happened!");
            dataBuilder.Append("Results from the database. ").Append(Environment.NewLine);
            dataBuilder.Append("Finishing GetData on thread id ").Append(Thread.CurrentThread.ManagedThreadId)
                .Append(".");
            return dataBuilder.ToString();
        }
public async Task<String> GetResultAsync(CancellationToken ctk)
        {
            StringBuilder resultBuilder = new StringBuilder();
            resultBuilder.Append("Starting GetResult on thread id ").Append(Thread.CurrentThread.ManagedThreadId)
                .Append(". ");
            ctk.ThrowIfCancellationRequested();
            await Task.Delay(2000);
            throw new Exception("The service is down!");
            resultBuilder.Append("This is the result of a long running calculation. ");
            resultBuilder.Append("Finishing GetResult on thread id ").Append(Thread.CurrentThread.ManagedThreadId)
                .Append(".");
            return resultBuilder.ToString();
        }

Also, increase the timeout value of the Index action so that we do not get a Timeout Exception:

[AsyncTimeout(4000)]
[HandleError(ExceptionType = typeof(TimeoutException), View = "Timeout")]
public async Task<ActionResult> Index(CancellationToken ctk)

Before you run the application change the ‘mode’ attribute of the customErrors element in the web.config to “Off” as we want to see the debug data.

It does not come as a surprise that we run into an exception:

Intentional exception YSOD

If you had worked with the TPL library before then you may have expected an AggregateException that wraps all exceptions encountered during the parallel calls. However, TPL behaves slightly differently in conjunction with the await keyword. It is still an AggregateException that is instantiated behind the scenes but the .NET runtime will only throw the first exception that was encountered during the method execution.

This is good news: we can set up our try-catch structures as usual; we don’t need to worry about inspecting an AggregateException anymore.

View the list of MVC and Web API related posts here.

Timeout exceptions with async/await in .NET4.5 MVC4 with C#

This post will discuss timeouts that occur with await and async in .NET4.5. For clarity on async and await in MVC4 check out my previous two blog posts: Await and async in .NET4.5 and Async controllers and actions in .NET4.5 MVC4

As await operations may involve some seriously long running actions, such as calling a slow web service, it can be a good idea to specify a timeout. We may not want to make the visitor wait 60 seconds just to see an error message afterwards. If your experience tells you that a web service normally responds within 5 seconds at most then it may be pointless waiting 50-60 seconds as you can be sure something has gone wrong. ASP.NET has a default request timeout of 90 seconds – correct me here if I’m wrong – but we can specify other values directly in code with an attribute: AsyncTimeout that takes the timeout value in milliseconds as parameter.

In addition to the AsyncTimeout attribute you’ll also need to supply an additional parameter of type CancellationToken to the async action. This parameter can be used by the long running services to check if the user has requested a cancellation. The CancellationToken has an IsCancellationRequested property which provides exactly this type of information. In our example we’ll pass this token to the service calls and use it to throw an exception if the request has been cancelled. As our services are not real service calls, there is no clean-up work to do but imagine that if an IO operation is interrupted by a user then the cancellation token can throw an exception and you can clean up all open resources or roll back the database operations in a catch clause.

You can read more about cancellation tokens on MSDN: Cancellation token on MSDN

Update service methods:

public async Task<String> GetDataAsync(CancellationToken ctk)
        {
            StringBuilder dataBuilder = new StringBuilder();
            dataBuilder.Append("Starting GetData on thread id ").Append(Thread.CurrentThread.ManagedThreadId)
                .Append(". ");
            ctk.ThrowIfCancellationRequested();
            await Task.Delay(2000);
            dataBuilder.Append("Results from the database. ").Append(Environment.NewLine);
            dataBuilder.Append("Finishing GetData on thread id ").Append(Thread.CurrentThread.ManagedThreadId)
                .Append(".");
            return dataBuilder.ToString();
        }
public async Task<String> GetResultAsync(CancellationToken ctk)
        {
            StringBuilder resultBuilder = new StringBuilder();
            resultBuilder.Append("Starting GetResult on thread id ").Append(Thread.CurrentThread.ManagedThreadId)
                .Append(". ");
            ctk.ThrowIfCancellationRequested();
            await Task.Delay(2000);
            resultBuilder.Append("This is the result of a long running calculation. ");
            resultBuilder.Append("Finishing GetResult on thread id ").Append(Thread.CurrentThread.ManagedThreadId)
                .Append(".");
            return resultBuilder.ToString();
        }

We know that the Index() action needs about 2 seconds to complete so let’s try something more aggressive to see what happens:

[AsyncTimeout(1000)]
        public async Task<ActionResult> Index(CancellationToken ctk)
        {
            DateTime startDate = DateTime.UtcNow;

            HomePageViewModel viewModel = new HomePageViewModel();
            viewModel.AddMessage(string.Concat("Starting Action on thread id ", Thread.CurrentThread.ManagedThreadId));
            CalculationService calcService = new CalculationService();
            DatabaseService dataService = new DatabaseService();

            Task<String> calculationResultTask = calcService.GetResultAsync(ctk);
            Task<String> databaseResultTask = dataService.GetDataAsync(ctk);

            await Task.WhenAll(calculationResultTask, databaseResultTask);

            viewModel.AddMessage(calculationResultTask.Result);
            viewModel.AddMessage(databaseResultTask.Result);

            DateTime endDate = DateTime.UtcNow;
            TimeSpan diff = endDate - startDate;

            viewModel.AddMessage(string.Concat("Finishing Action on thread id ", Thread.CurrentThread.ManagedThreadId));
            viewModel.AddMessage(string.Concat("Action processing time: ", diff.TotalSeconds));
            return View(viewModel);
        }

It is no surprise that we get a timout exception upon running the application:

Timeout YSOD

The yellow screen of death is great for debugging but not so nice in a production environment. To turn on custom error messages you must change web.config: locate the customErrors tag under system.web and change the mode attribute to “On” for the production environment. If your web.config does not have this tag then add it:

<system.web>
    <customErrors mode="On"></customErrors>

There is a default view in the Shared folder within Views called Error.cshtml. After modifying the web.config file the user will be redirected to that view upon an unhandled exception:

Error.cshtml screen

You can of course create custom views for errors and then specify which error view to show using attributes. Example:

[AsyncTimeout(1000)]
        [HandleError(ExceptionType = typeof(TimeoutException), View = "Timeout")]
        public async Task<ActionResult> Index(CancellationToken ctk)

This way you can specify error views for specific types of unhandled exceptions.

The next post will look at exception handling in async methods.

View the list of MVC and Web API related posts here.

Elliot Balynn's Blog

A directory of wonderful thoughts

Software Engineering

Web development

Disparate Opinions

Various tidbits

chsakell's Blog

WEB APPLICATION DEVELOPMENT TUTORIALS WITH OPEN-SOURCE PROJECTS

Once Upon a Camayoc

ARCHIVED: Bite-size insight on Cyber Security for the not too technical.