Modifying a shared integer in a thread-safe manner in .NET

The Interlocked class in the System.Threading namespace provides a number of useful methods to modify the value of an integer that is shared among multiple threads.

Consider the following code. It updates the same shared integer in a loop on 5 different threads and prints the final value at the end. 3 threads add 1 to the integer 600 times each and 2 threads subtract 1 from it 600 times each, so we expect the shared integer to be 600 at the end:

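The original code snippet is missing from this excerpt, so here is a minimal sketch of what such a test could look like, using Interlocked.Increment and Interlocked.Decrement (variable names are illustrative, and the System.Threading and System.Collections.Generic namespaces are assumed to be imported):

int sharedInteger = 0;
List<Thread> threads = new List<Thread>();

//3 threads, each adding 1 to the shared integer 600 times
for (int i = 0; i < 3; i++)
{
	threads.Add(new Thread(() =>
	{
		for (int j = 0; j < 600; j++)
		{
			Interlocked.Increment(ref sharedInteger);
		}
	}));
}

//2 threads, each subtracting 1 from the shared integer 600 times
for (int i = 0; i < 2; i++)
{
	threads.Add(new Thread(() =>
	{
		for (int j = 0; j < 600; j++)
		{
			Interlocked.Decrement(ref sharedInteger);
		}
	}));
}

threads.ForEach(t => t.Start());
threads.ForEach(t => t.Join());

//expected final value: 3*600 - 2*600 = 600
Console.WriteLine("Final value: {0}", sharedInteger);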

Reading the outcome of parallel loops in .NET C#

The Parallel.For() and Parallel.ForEach() methods both return a ParallelLoopResult object. This object has two properties which you can use to find out whether Break or Stop was called:

  • IsCompleted: true if all loop iterations have been completed without calling either Break or Stop
  • LowestBreakIteration: the index of the lowest iteration in which the Break method was called

Example:

ParallelLoopResult parallelLoopResult =
        Parallel.For(0, 10, (int index, ParallelLoopState parallelLoopState) =>
	{
		if (index == 5)
		{
			parallelLoopState.Stop();
		}
	});

Console.WriteLine("IsCompleted: {0}", parallelLoopResult.IsCompleted);
Console.WriteLine("BreakValue: {0}", parallelLoopResult.LowestBreakIteration.HasValue?       parallelLoopResult.LowestBreakIteration.Value
				: -1);

The properties return the following values:

IsCompleted (IC): false
LowestBreakIteration.HasValue (LBI): false

Here are the possible value pairs and their meaning:

  • IC true, LBI false: all iterations were completed without breaking or stopping
  • IC false, LBI false: Stop was called
  • IC false, LBI true: Break was called (see the sketch below)
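
For comparison, here is a minimal sketch of the Break case (IC false, LBI true), assuming the same loop as above with Break() called instead of Stop():

ParallelLoopResult breakLoopResult =
	Parallel.For(0, 10, (int index, ParallelLoopState parallelLoopState) =>
	{
		if (index == 5)
		{
			parallelLoopState.Break();
		}
	});

//IsCompleted is false, LowestBreakIteration.HasValue is true (5 in this example)
Console.WriteLine("IsCompleted: {0}", breakLoopResult.IsCompleted);
Console.WriteLine("LowestBreakIteration: {0}", breakLoopResult.LowestBreakIteration);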

View the list of posts on the Task Parallel Library here.

Parallel LINQ in .NET C#: using AsUnordered()

In this post we saw how to keep the order of item processing in a parallel query using the AsOrdered() extension method. We also mentioned that this comes at a slight performance cost.

The effect of using AsOrdered() in a composed query is that the order is restored at every subsequent step in the query, so the performance cost of restoring the order is incurred multiple times:

int[] integerList = new int[100];
for (int i = 0; i < integerList.Length; i++)
{
	integerList[i] = i;
}

var result =
	integerList.AsParallel().AsOrdered()
	.Take(10)
	.Select(item => new
	{
		SourceValue = item,
		ResultValue = Math.Pow(item, 2)
	});

foreach (var v in result)
{
	Console.WriteLine("Source {0}, Result {1}",
		v.SourceValue, v.ResultValue);
}

In this example the first 10 items are taken after AsOrdered() is called. AsOrdered() imposes the ordering on the Select subquery as well. Therefore ordering will be performed once again on a subquery that can be performed without ordering. This is not too efficient.

The solution comes with the AsUnordered() extension method:

int[] integerList = new int[100];
for (int i = 0; i < integerList.Length; i++)
{
	integerList[i] = i;
}

var result =
	integerList.AsParallel().AsOrdered()
	.Take(10).AsUnordered()
	.Select(item => new
	{
		SourceValue = item,
		ResultValue = Math.Pow(item, 2)
	});

foreach (var v in result)
{
	Console.WriteLine("Source {0}, Result {1}",
		v.SourceValue, v.ResultValue);
}

AsUnordered notifies PLINQ that ordering is not important. The first ten items are taken from the data source after the AsParallel() extension as before. The taken items will then be the basis for the Select subquery which can be performed without ordering.

Conclusion: the AsOrdered and AsUnordered extension methods affect all subsequent operators in the query. In the above example, if the Select query is followed by further operators then they will run unordered as well.
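
As an illustration (a sketch only; the final Where clause is hypothetical and not part of the original example), every operator placed after AsUnordered() runs unordered unless AsOrdered() is applied again:

var unorderedChain =
	integerList.AsParallel().AsOrdered()
	.Take(10)
	.AsUnordered()
	//from this point on the operators run unordered
	.Select(item => item * item)
	.Where(item => item % 2 == 0);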

View the list of posts on the Task Parallel Library here.

Deferred execution in parallel LINQ in .NET C#

If you are familiar with LINQ then you are probably aware of the notion of deferred execution: queries are not carried out until their results are needed. This is no different in parallel LINQ. Let’s see an example:

Set up a data source:

int[] integerArray = new int[100];
for (int i = 0; i < integerArray.Length; i++)
{
	integerArray[i] = i;
}

Define a parallel query:

IEnumerable<double> results =
	integerArray.AsParallel().Select(item =>
	{
		return Math.Sqrt(item);
	});

The query has not been carried out at this point. It is carried out when the following foreach loop starts:

double sum = 0;
foreach (double result in results)
{
	sum += result;
}
Console.WriteLine("Total {0}", sum);

You can force query execution with the same extension methods as in “normal” LINQ, such as ToList, ToArray etc.:

IEnumerable<double> results =
	integerArray.AsParallel().Select(item =>
	{
		return Math.Sqrt(item);
	}).ToList();

In this case the query is executed as soon as it has been defined.

View the list of posts on the Task Parallel Library here.

Handling exceptions in parallel LINQ in .NET C#

We saw in earlier posts how to handle exceptions that Tasks throw. It is not much different in parallel LINQ: the exception will be wrapped in an AggregateException.

The exception will be thrown when the query is executed. So defining a parallel query will not throw an exception even if you explicitly throw one within the query body. If you force the execution of the query with extension methods such as ToList, ToArray, ForAll etc., then the exception is thrown at that point. Let’s see an example.

Define the data source:

int[] integerArray = new int[100];
for (int i = 0; i < integerArray.Length; i++)
{
	integerArray[i] = i;
}

Define the query:

IEnumerable<double> query =
	integerArray.AsParallel()
	.Select(item =>
	{
		if (item == 50)
		{
			throw new Exception();
		}
		return Math.Sqrt(item);
	});

Go through the results and handle the exception:

try
{
	foreach (double item in query)
	{
		Console.WriteLine("Result {0}", item);
	}
}
catch (AggregateException aggregateException)
{
	aggregateException.Handle(exception =>
	{
		Console.WriteLine("Handled exception of type: {0}",
			exception.GetType());
		return true;
	});
}

Run the code with Ctrl+F5. You’ll see that the exception is thrown while the items are being processed and is then handled. Items that were already being processed when the exception was thrown will run to completion, so don’t assume that the parallel query is interrupted at that exact moment.
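
If the execution had been forced with ToList instead, the AggregateException would surface at that point rather than in the foreach loop; a minimal sketch using the same query variable:

try
{
	//forcing execution: the exception is thrown here
	List<double> forcedResults = query.ToList();
}
catch (AggregateException aggregateException)
{
	Console.WriteLine("Caught {0} inner exception(s)",
		aggregateException.InnerExceptions.Count);
}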

View the list of posts on the Task Parallel Library here.

Cancelling a Task with a composite cancellation token in .NET C#

You cannot directly interrupt a Task in .NET while it’s running. You can do it indirectly through the CancellationTokenSource object. This object has a Token property of type CancellationToken which must be passed into the constructor of the Task:

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = cancellationTokenSource.Token;

You can create a composite token that consists of several other tokens. The Task will then be cancelled if any of the underlying tokens has been cancelled. Here’s how you create a composite token:

CancellationTokenSource cancellationTokenSourceOne = new CancellationTokenSource();
CancellationTokenSource cancellationTokenSourceTwo = new CancellationTokenSource();
CancellationTokenSource cancellationTokenSourceThree = new CancellationTokenSource();
CancellationTokenSource compositeTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationTokenSourceOne.Token, cancellationTokenSourceTwo.Token, cancellationTokenSourceThree.Token);

Use this composite in the constructor of the Task:

Task task = Task.Factory.StartNew(() =>
{
	CancellationToken compositeToken = compositeTokenSource.Token;
	for (int i = 0; i < 100000; i++)
	{
		if (compositeToken.IsCancellationRequested)
		{
			Console.WriteLine("Task cancellation requested");
			throw new OperationCanceledException(compositeToken);
		}
		else
		{
			Console.WriteLine(i);
		}
	}
}, compositeTokenSource.Token);

You can cancel the task by calling the Cancel() method of any of the linked token sources:


cancellationTokenSourceTwo.Cancel();

Note that this method only signals the wish to cancel the task. .NET will not actively interrupt the task; you’ll have to monitor the status through the token’s IsCancellationRequested property. It is your responsibility to stop the task. In this example we throw an OperationCanceledException which is a must in order to correctly acknowledge the cancellation. If you forget this step then the task status will not be set correctly. Once the task has been requested to stop it cannot be restarted.
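
As a hedged illustration of why the acknowledgement matters: after calling Cancel() as above you can wait for the task and inspect its Status property, which reads Canceled only because the OperationCanceledException carrying the task’s own token was thrown inside the task body:

try
{
	task.Wait();
}
catch (AggregateException)
{
	//the cancellation exception thrown in the task body surfaces here, wrapped
}

//prints Canceled; without the exception it would print RanToCompletion
Console.WriteLine("Task status: {0}", task.Status);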

If that’s all you want to do, i.e. throw an OperationCanceledException, then there’s a shorter version:

cancellationToken.ThrowIfCancellationRequested();

This will perform the cancellation check and throw the exception in one step. The loop can thus be simplified as follows:

Task task = Task.Factory.StartNew(() =>
{
	for (int i = 0; i < 100000; i++)
	{
		//shorthand: checks the token and throws OperationCanceledException if cancellation was requested
		compositeTokenSource.Token.ThrowIfCancellationRequested();
		Console.WriteLine(i);
	}
}, compositeTokenSource.Token);

View the list of posts on the Task Parallel Library here.

How to cancel parallel loops in .NET C#

Cancelling parallel loops in C# is similar to cancelling “normal” tasks. You will need to supply a ParallelOptions object to the parallel loop, assigning a cancellation token to its CancellationToken property. If you don’t know these objects then make sure to check out the earlier posts on task cancellation on this blog.

When the cancellation token is cancelled then no new iterations will be started in the parallel loop. However, iterations already running will finish. Parallel.For and Parallel.ForEach will then throw an OperationCanceledException.

Declare the cancellation token:

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

Create a Task that will cancel the token after 5 seconds:

Task.Factory.StartNew(() =>
{
	Thread.Sleep(5000);
	cancellationTokenSource.Cancel();
	Console.WriteLine("Token cancelled");
});

Define the parallel loop options and assign the cancellation token:

ParallelOptions parallelLoopOptions =
	new ParallelOptions()
	{
		CancellationToken = cancellationTokenSource.Token
	};

Perform some loop that is guaranteed to take more than 5 seconds:

try
{				
	Parallel.For(0, Int64.MaxValue, parallelLoopOptions, index =>
	{
		double result = Math.Pow(index, 3);
		Console.WriteLine("Index {0}, result {1}", index, result);
		Thread.Sleep(100);
	});
}
catch (OperationCanceledException)
{
	Console.WriteLine("Cancellation exception caught!");
}

Run the code and you’ll see that the parallel loop will likely run for slightly more than 5 seconds; this is because iterations already running when the cancellation token is cancelled are allowed to complete.

View the list of posts on the Task Parallel Library here.

Breaking parallel loops in .NET C# using the Break method

It’s not uncommon to break the execution of a for/foreach loop using the ‘break’ keyword. A for loop can look through a list of integers and if the loop body finds some matching value then the loop can be exited. It’s another discussion whether ‘while’ and ‘do-while’ loops might be a better alternative, but there you go.

You cannot simply break out of a parallel loop using the break keyword. However, we can achieve the same effect with the ParallelLoopState class. In this post we looked at using the Stop method of the ParallelLoopState class. Here we’ll look at a slightly different method of the same class: Break(). Let’s say we have the following list of integers…:

List<int> integers = new List<int>();

for (int i = 0; i <= 100; i++)
{
	integers.Add(i);
}

…and we want to break the loop as soon as we’ve found a number higher than 50. Both Parallel.For() and Parallel.ForEach() accept an Action of T parameter as we saw before. This Action object has an overloaded version: Action of T and ParallelLoopState. The loop state is created automatically by the Parallel class. The loop state object has a Break method which stops the loop execution. To be more precise: if Break is called in the 5th iteration, then only those iterations will be started afterwards that are required to process items 1-4. Other iterations may already have been started by the scheduler of course and they will run to completion. So it is guaranteed that at least the first five items will be processed. Break() can even be called multiple times if the processing of multiple items results in breaking out of the loop. In the below example if n separate threads are started with an integer higher than 50 then Break will be called n times:

Parallel.ForEach(integers, (int item, ParallelLoopState state) =>
{
	if (item > 50)
	{
		Console.WriteLine("Higher than 50: {0}, exiting loop.", item);
		state.Break();
	}
	else
	{
		Console.WriteLine("Less than 50: {0}", item);
	}
});

If Break is called more than once then the lowest item will be taken as a boundary by the Parallel class. If Break is called at items 5, 10 and 15 then all iterations required to process items 1-5 will be completed.
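
As a hedged illustration, the boundary can also be read off the ParallelLoopResult returned by the loop; its LowestBreakIteration property holds the lowest iteration index at which Break was called:

ParallelLoopResult loopResult = Parallel.ForEach(integers, (int item, ParallelLoopState state) =>
{
	if (item > 50)
	{
		state.Break();
	}
});

if (loopResult.LowestBreakIteration.HasValue)
{
	Console.WriteLine("Lowest break iteration: {0}", loopResult.LowestBreakIteration.Value);
}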

View the list of posts on the Task Parallel Library here.

How to cancel parallel LINQ queries in .NET C#

We saw in several posts on the TPL on this blog how the CancellationToken object can be used to cancel Tasks. Cancellation tokens can be used to cancel parallel queries as well. An instance of the token must be supplied to the WithCancellation extension method.

Define the cancellation token and the data source:

CancellationTokenSource cancellationTokenSource
	= new CancellationTokenSource();

int[] integerArray = new int[10000000];
for (int i = 0; i < integerArray.Length; i++)
{
	integerArray[i] = i;
}

Define the query. Notice how the token is provided to the query:

IEnumerable<double> query = integerArray
	.AsParallel()
	.WithCancellation(cancellationTokenSource.Token)
	.Select(item =>
	{
		return Math.Sqrt(item);
	});

Start a separate task that will cancel the token after 5 seconds:

Task.Factory.StartNew(() =>
{
	Thread.Sleep(5000);
	cancellationTokenSource.Cancel();
	Console.WriteLine("Token source cancelled");
});

Loop through the query results and catch the OperationCanceledException:

try
{
	foreach (double d in query)
	{
		Console.WriteLine("Result: {0}", d);
	}
}
catch (OperationCanceledException)
{
	Console.WriteLine("Caught cancellation exception");
}

Do not assume that cancelling the token will cause the item processing to stop immediately. Items that were already being processed when the token was cancelled will be completed.

View the list of posts on the Task Parallel Library here.

Parallel LINQ in .NET C#: keeping the order

In this post we saw a simple example of PLINQ. We saw that the items are processed in an arbitrary order.

Consider the following:

int[] sourceData = new int[10];
for (int i = 0; i < sourceData.Length; i++)
{
	sourceData[i] = i;
}

IEnumerable<int> parallelResults =
	from item in sourceData.AsParallel()
	where item % 2 == 0
	select item;

foreach (int item in parallelResults)
{
	Console.WriteLine("Item {0}", item);
}

When I ran the code on my machine I got the following output:

0, 4, 6, 8, 2

What if you want the items to be processed in an ascending order? Just append the AsOrdered() extension method:

IEnumerable<int> parallelResults =
	from item in sourceData.AsParallel().AsOrdered()
	where item % 2 == 0
	select item;

This produces the same order as a sequential query execution but with the benefits of parallel execution. However, there’s a small cost to this. Without ordering PLINQ can freely decide how to schedule and optimise the work across threads; restoring the order with the AsOrdered extension method carries a performance cost. Keep this in mind and only use ordering if it really matters. Otherwise you can order the resulting list with “normal” LINQ afterwards.
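
For instance (a sketch only, not part of the original example), you could run the query unordered and restore the order with standard LINQ once the results have been materialised:

IEnumerable<int> unorderedResults =
	from item in sourceData.AsParallel()
	where item % 2 == 0
	select item;

//sort sequentially after the parallel query has run
List<int> orderedResults = unorderedResults.OrderBy(x => x).ToList();

foreach (int item in orderedResults)
{
	Console.WriteLine("Item {0}", item);
}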

View the list of posts on the Task Parallel Library here.
