Exception handling in the .NET Task Parallel Library with C#: using Handle()

In this post we saw how to catch unhandled exceptions thrown by tasks. Recall that the AggregateException object has a list of inner exceptions thrown by the tasks that are waited on.

You may want to handle each type of exception in a different way. There may be types of exception that you can handle locally and some other unexpected ones that should be propagated further up the stack trace. The AggregateException object has a Handle() method where you can specify the action taken depending on the type of the inner exception. The function should return true if the exception can be dealt with locally and false if it should be propagated upwards.

Construct two tasks using the cancellation token in the first one:

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = cancellationTokenSource.Token;

Task firstTask = Task.Factory.StartNew(() =>
{
	cancellationToken.WaitHandle.WaitOne(-1);
	throw new OperationCanceledException(cancellationToken);
}, cancellationToken);

Task secondTask = Task.Factory.StartNew(() =>
{
	throw new NullReferenceException();
});

The effect of WaitHandle.WaitOne(-1) is to wait forever until the token has been cancelled.

Cancel the token which will interrupt the first task:

cancellationTokenSource.Cancel();

Wait for the tasks, catch the AggregateException and call its Handle method:

try
{
	Task.WaitAll(firstTask, secondTask);
}
catch (AggregateException ex)
{
	ex.Handle((innerException) =>
	{
		if (innerException is OperationCanceledException)
		{
			return true;
		}
		else
		{
			return false;
		}
	}
}

We declare that OperationCanceledException is an expected type and can be dealt with locally. All other types should be re-thrown.

If you run the above code in a console app make sure to run it without debugging (Ctrl + F5), otherwise the code execution will unnecessarily stop when the OperationCanceledException is thrown in the first task.

You’ll see that the NullReferenceException is re-thrown as we return false for all types except for the OperationCanceledException type. Test returning true instead: the exception will be “swallowed” within the Handle method.

View the list of posts on the Task Parallel Library here.

Exception handling in the .NET Task Parallel Library with C#: the basics

Handling exceptions in threading is essential. Unhandled exceptions can cause your application to exit in an unexpected and unpredictable way.

If a task throws an exception that’s not caught within the task body then the exception remains hidden at first. It bubbles up again when a trigger method is called, such as Task.Wait, Task.WaitAll, etc. The exception will then be wrapped in an AggregateException object which has an InnerException property. This inner exception will hold the actual type of exception thrown by the tasks that are waited on.

Construct and start the tasks:

Task firstTask = Task.Factory.StartNew(() =>
{
	ArgumentNullException exception = new ArgumentNullException();
	exception.Source = "First task";
	throw exception;
});
Task secondTask = Task.Factory.StartNew(() =>
{
	throw new ArgumentException();
});
Task thirdTask = Task.Factory.StartNew(() =>
{
	Console.WriteLine("Hello from the third task.");
});

Wait for all the tasks, catch the aggregate exception and analyse its inner exceptions:

try
{
	Task.WaitAll(firstTask, secondTask, thirdTask);
}
catch (AggregateException ex)
{
	foreach (Exception inner in ex.InnerExceptions)
	{
		Console.WriteLine("Exception type {0} from {1}",
			inner.GetType(), inner.Source);
	}
}

Note: if you run the above code in Debug mode in a console application then the execution will stop at each exception thrown. To simulate what’s going to happen in a deployed application make sure you run the code without debugging (Ctlr + F5).

View the list of posts on the Task Parallel Library here.

Share data between Tasks using locks in .NET C# in separate regions

In the this post we saw how to use the ‘lock’ keyword to restrict access to a single critical region. You can reuse the same lock object to synchronise access to the shared variable in separate regions. The modified version of the code in the referenced post looks as follows with two critical regions:

BankAccount account = new BankAccount();

List<Task> incrementTasks = new List<Task>();
List<Task> decrementTasks = new List<Task>();

object lockObject = new object();

for (int i = 0; i < 5; i++)
{
	incrementTasks.Add(Task.Factory.StartNew(() =>
	{
		for (int j = 0; j < 1000; j++)
		{
			lock (lockObject)
			{
				account.Balance++;
			}
		}
	}));
}

for (int i = 0; i < 5; i++)
{
	decrementTasks.Add(Task.Factory.StartNew(() =>
	{
		for (int j = 0; j < 1000; j++)
		{
			lock (lockObject)
			{
				account.Balance--;
			}
		}
	}));
}

Task.WaitAll(incrementTasks.ToArray());
Task.WaitAll(decrementTasks.ToArray());

Console.WriteLine(string.Concat( "Expected value: 0, actual value: ", account.Balance));

We have two sets of tasks: one that increases the balance and another that reduces it. By reusing the same lock object in both cases we can ensure that it’s always at most one task that has access to the shared variable.

View the list of posts on the Task Parallel Library here.

Thread safe collections in .NET: ConcurrentQueue

Concurrent collections in .NET work very much like their single-thread counterparts with the difference that they are thread safe. These collections can be used in scenarios where you need to share a collection between Tasks. They are typed and use a lightweight synchronisation mechanism to ensure that they are safe and fast to use in parallel programming.

Concurrent queues

If you don’t know what Queues are then you can read about them here. The Queue of T generic collection has a thread-safe counterpart called ConcurrentQueue. Important methods:

  • Enqueue(T element): adds an item of type T to the collection
  • TryPeek(out T): tries to retrieve the next element from the collection without removing it. The value is set to the out parameter if the method succeeds. Otherwise it returns false.
  • TryDequeue(out T): tries to get the first element. It removes the item from the collection and sets the out parameter to the retrieved element. Otherwise the method returns false

The ‘try’ bit in the method names implies that your code needs to prepare for the event where the element could not be retrieved. If multiple threads retrieve elements from the same queue you cannot be sure what’s in there when a specific thread tries to read from it.

Example

Declare and fill a concurrent queue:

ConcurrentQueue<int> concurrentQueue = new ConcurrentQueue<int>();

for (int i = 0; i < 5000; i++)
{
	concurrentQueue.Enqueue(i);
}

We’ll want to get the items from this collection and check if all of them have been retrieved using a counter. The counter will also be shared among the threads using the ‘lock’ technique we saw in this post – or actually something that is similar to the ‘lock’ keyword: the Interlocked class. Interlocked has an Increment method which accepts a ref int parameter. It will increment the incoming integer in an atomic operation.

int itemCount = 0;

Task[] queueTasks = new Task[20];
for (int i = 0; i < queueTasks.Length; i++)
{
	queueTasks[i] = Task.Factory.StartNew(() =>
	{
		while (concurrentQueue.Count > 0)
		{
			int currentElement;
			bool success = concurrentQueue.TryDequeue(out currentElement);
			if (success)
			{
				Interlocked.Increment(ref itemCount);
			}
		}
	});
}

The while loop will ensure that we’ll try to dequeue the items as long as there’s something left in the collection.

Wait for the tasks and print the number of items processed – the counter should have the same value as the number of items in the queue:

Task.WaitAll(queueTasks);
Console.WriteLine("Counter: {0}", itemCount);

View the list of posts on the Task Parallel Library here.

Suspending a Task using Thread.Sleep in .NET C#

You may want to put a Task to sleep for some time. You might want to check the state of an object before continuing. The Task continues after the sleep time.

One way to solve this is using the classic Thread.Sleep method since the Task library uses .NET threading support in the background.

First construct your cancellation token:

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = cancellationTokenSource.Token;

Use this token in the constructor of the task:

Task task = Task.Factory.StartNew(() =>
{
	for (int i = 0; i < 100000; i++)
	{
		Thread.Sleep(10000);
		Console.WriteLine(i);
		cancellationToken.ThrowIfCancellationRequested();
	}
}, cancellationToken);

Note the Sleep method where we suspend the Task for 10 seconds.

You can use the cancellation token to interrupt the task:

cancellationTokenSource.Cancel();

In the previous post we discussed how to suspend a task using the WaitOne method of the cancellation token. The main difference between WaitOne and Sleep is that Sleep will block even if the task is cancelled whereas WaitOne will exit if the cancellation come before the sleep time is over.

View the list of posts on the Task Parallel Library here.

Suspending a Task using a CancellationToken in .NET C#

You may want to put a Task to sleep for some time. You might want to check the state of an object before continuing. The Task continues after the sleep time.

One way to solve this is using the WaitOne method of the WaitHandle property of the CancellationToken object.

Construct the CancellationToken object as follows:

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = cancellationTokenSource.Token;

The WaitOne() method without parameters suspends the Task execution until the Cancel() method of the CancellationToken object has been called. It has two overloads where you can specify some time: the amount of time to wait in milliseconds or some TimeSpan. The task suspension lasts until the sleep time is over OR the Cancel() method on the cancellation token has been called, whichever happens first. The WaitOne method also has a boolean return value where ‘true’ means that the Task has been cancelled and false means the the allotted sleep time was over.

You can use the cancellation token and the WaitOne method as follows:

Task task = Task.Factory.StartNew(() =>
{
	for (int i = 0; i < 1000000; i++)
	{
		bool cancelled = cancellationToken.WaitHandle.WaitOne(10000);
		Console.WriteLine("Value {0}. Cancelled? {1}", i, cancelled);
		if (cancelled)
		{
			throw new OperationCanceledException(cancellationToken);
		}
	}
}, cancellationToken);

You can cancel the task as follows:

cancellationTokenSource.Cancel();

View the list of posts on the Task Parallel Library here.

Thread safe collections in .NET: ConcurrentBag

Concurrent collections in .NET work very much like their single-thread counterparts with the difference that they are thread safe. These collections can be used in scenarios where you need to share a collection between Tasks. They are typed and use a lightweight synchronisation mechanism to ensure that they are safe and fast to use in parallel programming.

Concurrent bags

Concurrent bags are similar to concurrent stacks and concurrent queues but there’s a key difference. Bags are unordered collections. This means that the order of the items is not the same as how they were inserted. So concurrent bags are ideal if you would like to share a List of T generic collection among several tasks.

Important methods:

  • Add(T element): adds an item of type T to the collection
  • TryPeek(out T): tries to retrieve the next element from the collection without removing it. The value is set to the out parameter if the method succeeds. Otherwise it returns false.
  • TryTake(out T): tries to get the first element. It removes the item from the collection and sets the out parameter to the retrieved element. Otherwise the method returns false

The ‘try’ bit in the method names imply that your code needs to prepare for the event where the element could not be retrieved. If multiple threads retrieve elements from the same list you cannot be sure what’s in there when a specific thread tries to read from it.

Example

The example is almost identical to what we saw for the collections discussed previously in the posts on TPL.

Declare and fill a concurrent bag:

ConcurrentBag<int> concurrentBag = new ConcurrentBag<int>();

for (int i = 0; i < 5000; i++)
{
	concurrentBag.Add(i);
}

Next we’ll try to take every item from the bag. The bag will be accessed by several tasks at the same time. The counter variable – which is also shared and synchronised- will be used to check if all items have been retrieved.

int counter = 0;

Task[] bagTasks = new Task[20];
for (int i = 0; i < bagTasks.Length; i++)
{
	bagTasks[i] = Task.Factory.StartNew(() =>
	{
		while (concurrentBag.Count > 0)
		{
			int bagElement;
			bool success = concurrentBag.TryTake(out bagElement);
			if (success)
			{
				Interlocked.Increment(ref counter);
			}
		}
	});
}

The while loop will ensure that we’ll try to take the items as long as there’s something left in the collection.

Wait for the tasks and print the number of items processed – the counter should have the same value as the number of items in the bag:

Task.WaitAll(bagTasks);
Console.WriteLine("Counter: {0}", counter);

View the list of posts on the Task Parallel Library here.

Share data between Tasks using locks in .NET C#

You need to be careful when sharing mutable data across several tasks. If you don’t restrict the access to a shared variable then the tasks will all modify it in an unpredictable way. It’s because you cannot be sure how the tasks will be started and managed by the operating system: it depends on a range of parameters, such as the CPU usage, available memory, etc. If each task is supposed to modify a variable then maybe the first task won’t finish its work before the second task reads the same variable.

Consider the following:

class BankAccount
{
	public int Balance { get; set; }
}

The following code builds a list of tasks that each increase the balance by 1:

BankAccount account = new BankAccount();
List<Task> tasks = new List<Task>();

for (int i = 0; i < 5; i++)
{
	tasks.Add(Task.Factory.StartNew(() =>
	{
		for (int j = 0; j < 1000; j++)
		{
			account.Balance = account.Balance + 1;
		}
	}));
}
Task.WaitAll(tasks.ToArray());
Console.WriteLine(string.Concat("Expected value 5000", ", actual value ",account.Balance));

We expect the final balance to be 5000. Run the code and you will most likely get something less, such as 3856 or 4652. At times you get exactly 5000 but it’s unpredictable.

In such cases you need to identify the volatile critical regions in your code. In this example it’s simple:

account.Balance = account.Balance + 1;

It is the account balance that is modified by each thread so we need to lock it for access so that only one thread can modify it at a certain time. You can take the word “lock” almost literally as we use the lock keyword to do that. We take an object which will act as the synchronisation primitive. The lock object must be visible to all tasks:

object lockObject = new object();

You use the lock keyword as follows:

for (int i = 0; i < 5; i++)
{
	tasks.Add(Task.Factory.StartNew(() =>
	{
		for (int j = 0; j < 1000; j++)
		{
			lock (lockObject)
			{
				account.Balance = account.Balance + 1;
			}
		}
	}));
}

The volatile region of the code is locked using the lock object. The lock is released as soon as the current task has finished working with it.

Run the modified code and you should get the correct result.

View the list of posts on the Task Parallel Library here.

Getting a return value from a Task with C#

Sometimes you want to get a return value from a Task as opposed to letting it run in the background and forgetting about it. You’ll need to specify the return type as a type parameter to the Task object: a Task of T.

.NET 4.0

Without specifying an input parameter:

Task<int> task = new Task<int>(() => 
{
    int total = 0;
    for (int i = 0; i < 500; i++)
    {
        total += i;
    }
    return total;
});

task.Start();
int result = Convert.ToInt32(task.Result);

We count to 500 and return the sum. The return value of the Task can be retrieved using the Result property which can be converted to the desired type.

You can provide an input parameter as well:

Task<int> task = new Task<int>(obj => 
{
    int total = 0;
    int max = (int)obj;
    for (int i = 0; i < max; i++)
    {
        total += i;
    }
    return total;
}, 300);

task.Start();
int result = Convert.ToInt32(task.Result);

We specify that we want to count to 300.

.NET 4.5

The recommended way in .NET 4.5 is to use Task.FromResult, Task.Run or Task.Factory.StartNew:

FromResult:

public async Task DoWork()
{
       int res = await Task.FromResult<int>(GetSum(4, 5));	
}

private int GetSum(int a, int b)
{
	return a + b;
}

Please check out Stefan’s comments on the usage of FromResult in the comments section below the post.

Task.Run:

public async Task DoWork()
{
	Func<int> function = new Func<int>(() => GetSum(4, 5));
	int res = await Task.Run<int>(function);
}

private int GetSum(int a, int b)
{
	return a + b;
}

Task.Factory.StartNew:

public async Task DoWork()
{
	Func<int> function = new Func<int>(() => GetSum(4, 5));
	int res = await Task.Factory.StartNew<int>(function);			
}

private int GetSum(int a, int b)
{
	return a + b;
}

View the list of posts on the Task Parallel Library here.

Thread safe collections in .NET: ConcurrentDictionary

Concurrent collections in .NET work very much like their single-thread counterparts with the difference that they are thread safe. These collections can be used in scenarios where you need to share a collection between Tasks. They are typed and use a lightweight synchronisation mechanism to ensure that they are safe and fast to use in parallel programming.

Concurrent dictionaries

Concurrent dictionaries are thread-safe equivalent collections of “normal” dictionaries, i.e. key-value pair collections. Concurrent dictionaries are ideal if you would like to share a key-pair collection of types K and V among several tasks.

Important methods:

  • TryAdd(K key, V value): adds an new key-value pair to the collection. Returns true if the insertion was successful
  • TryGetValue(K key, out V value): tries to retrieve the value of the key. Returns true if the extraction was successful and the value is assigned to the out parameter
  • TryRemove(K key, out V value): tries to remove the key-value pair associated with the key. Returns true if the deletion was successful and the value is assigned to the out parameter
  • TryUpdate(K key, V new, V current): update the value of the key-value pair with the ‘new’ value if the current value is equal to ‘current’. Returns true if the update was successful
  • ContainsKey(K key): same as ContainsKey of the normal Dictionary class, i.e. returns true if the key is found in the dictionary

The ‘try’ bit in the method names imply that your code needs to prepare for the event where the element could not be retrieved. If multiple threads retrieve elements from the same collection you cannot be sure what’s in there when a specific thread tries to read from it. E.g. even if ContainsKey returns true there’s no guarantee that the key-value pair is still present when the thread tries to read from it as another thread might have already removed it.

Example

We’ll need a simple object with a single property for the example:

class Series
{
	public int CurrentValue
	{
		get;
		set;
	}
}

The following code creates 20 tasks and each task increases the value of the CurrentValue property in the shared dictionary by 1000 in loop. So we’re expecting the final value to be 20000. We fill up the task array in a loop and start the tasks individually. The key-value in the dictionary may look like the following at a certain stage:

key: 0 (the task ID represented by the taskParameter object, which is the same as ‘i’ in the main loop), value: 40
key: 1, value 46
key: 2: value 43
.
.
.
key: 19, value 45

After the loop the values of each thread are added to the CurrentValue property:

Series series = new Series();
ConcurrentDictionary<int, int> concurrentDictionary = new ConcurrentDictionary<int, int>();
Task<int>[] taskArray = new Task<int>[20];
for (int i = 0; i < taskArray.Length; i++)
{
	concurrentDictionary.TryAdd(i, series.CurrentValue);

	taskArray[i] = Task.Factory.StartNew<int>((taskParameter) =>
	{
		int current;
		bool valueRetrieved;
		int key = Convert.ToInt32(taskParameter);
		for (int j = 0; j < 1000; j++)
		{
			valueRetrieved = concurrentDictionary.TryGetValue(key, out current);
			concurrentDictionary.TryUpdate(key, current + 1, current);
		}

		int result;
		valueRetrieved = concurrentDictionary.TryGetValue(key, out result);
		if (valueRetrieved)
		{
			return result;
		}
		else
		{
			throw new Exception(String.Format("No data item available for key {0}", taskParameter));
		}
	}, i);
}

for (int i = 0; i < taskArray.Length; i++)
{
	series.CurrentValue += taskArray[i].Result;
}

Console.WriteLine("Expected value {0}, Balance: {1}", 20000, series.CurrentValue);

View the list of posts on the Task Parallel Library here.

Elliot Balynn's Blog

A directory of wonderful thoughts

Software Engineering

Web development

Disparate Opinions

Various tidbits

chsakell's Blog

WEB APPLICATION DEVELOPMENT TUTORIALS WITH OPEN-SOURCE PROJECTS

Once Upon a Camayoc

ARCHIVED: Bite-size insight on Cyber Security for the not too technical.