Reading the outcome of parallel loops in .NET C#

The Parallel.For() and Parallel.ForEach() methods both return a ParallelLoopResult object. This object has two properties that tell you whether Break or Stop was called:

  • IsCompleted: true if all loop iterations have been completed without calling either Break or Stop
  • LowestBreakIteration: a nullable long holding the index of the lowest iteration in which the Break method was called; it has no value if Break was never called

Example:

ParallelLoopResult parallelLoopResult =
        Parallel.For(0, 10, (int index, ParallelLoopState parallelLoopState) =>
	{
		if (index == 5)
		{
			parallelLoopState.Stop();
		}
	});

Console.WriteLine("IsCompleted: {0}", parallelLoopResult.IsCompleted);
Console.WriteLine("BreakValue: {0}",
	parallelLoopResult.LowestBreakIteration.HasValue
		? parallelLoopResult.LowestBreakIteration.Value
		: -1);

The properties return the following values:

IsCompleted (IC): false
LowestBreakIteration.HasValue (LBI): false

Here come the possible value pairs and their meaning:

  • IC true, LBI false: all iterations were completed without breaking or stopping
  • IC false, LBI false: Stop was called
  • IC false, LBI true: Break was called
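
The three value pairs can be told apart in code. The following is only a minimal sketch: the class name LoopResultDemo and the helper Describe are made up for illustration and are not part of the Task Parallel Library.

```csharp
using System;
using System.Threading.Tasks;

public static class LoopResultDemo
{
    // Hypothetical helper: maps a ParallelLoopResult to one of the
    // three value pairs listed above.
    public static string Describe(ParallelLoopResult result)
    {
        if (result.IsCompleted)
        {
            return "Completed";
        }
        return result.LowestBreakIteration.HasValue
            ? "Break at " + result.LowestBreakIteration.Value
            : "Stopped";
    }

    public static void Main()
    {
        // No Break or Stop: IsCompleted is true.
        ParallelLoopResult completed = Parallel.For(0, 10, index => { });

        // Stop: IsCompleted is false, LowestBreakIteration has no value.
        ParallelLoopResult stopped = Parallel.For(0, 10, (index, state) =>
        {
            if (index == 5) state.Stop();
        });

        // Break: IsCompleted is false, LowestBreakIteration has a value.
        ParallelLoopResult broken = Parallel.For(0, 10, (index, state) =>
        {
            if (index == 5) state.Break();
        });

        Console.WriteLine(Describe(completed)); // Completed
        Console.WriteLine(Describe(stopped));   // Stopped
        Console.WriteLine(Describe(broken));    // Break at 5
    }
}
```

Break is only ever called in iteration 5 here, which is why the last line reports index 5.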

View the list of posts on the Task Parallel Library here.

Parallel LINQ in .NET C#: using AsUnordered()

In this post we saw how to keep the order of item processing in a parallel query using the AsOrdered() extension method. We also mentioned that this comes at a slight performance cost.

The effect of using AsOrdered() in combined queries is that the order is restored at every step in the query. The performance cost of restoring the order occurs multiple times:

int[] integerList = new int[100];
for (int i = 0; i < integerList.Length; i++)
{
	integerList[i] = i;
}

var result =
	integerList.AsParallel().AsOrdered()
	.Take(10)
	.Select(item => new
	{
		SourceValue = item,
		ResultValue = Math.Pow(item, 2)
	});

foreach (var v in result)
{
	Console.WriteLine("Source {0}, Result {1}",
		v.SourceValue, v.ResultValue);
}

In this example the first 10 items are taken after AsOrdered() is called. AsOrdered() imposes the ordering on the Select subquery as well. Ordering is therefore enforced again on a subquery that could run without it, which wastes work.

The solution comes with the AsUnordered() extension method:

int[] integerList = new int[100];
for (int i = 0; i < integerList.Length; i++)
{
	integerList[i] = i;
}

var result =
	integerList.AsParallel().AsOrdered()
	.Take(10).AsUnordered()
	.Select(item => new
	{
		SourceValue = item,
		ResultValue = Math.Pow(item, 2)
	});

foreach (var v in result)
{
	Console.WriteLine("Source {0}, Result {1}",
		v.SourceValue, v.ResultValue);
}

AsUnordered() tells PLINQ that ordering no longer matters from that point on. The first ten items are still taken in order as before, because Take(10) follows AsOrdered(). The taken items then form the input of the Select subquery, which can be performed without ordering.

Conclusion: the AsOrdered() and AsUnordered() extension methods apply to all subqueries that follow them. In the above example, if the Select query is followed by other queries then they will be unordered as well.
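
To see the effect, the sketch below compares the unordered result with the expected values after sorting. The class name UnorderedDemo and the method SquaresOfFirstTen are invented for this illustration. Which items are selected is fixed by AsOrdered().Take(10), while the order in which Select emits them is not guaranteed, so the result is sorted before printing.

```csharp
using System;
using System.Linq;

public static class UnorderedDemo
{
    // Hypothetical helper: takes the first ten items in order,
    // then lets Select run without any ordering guarantee.
    public static int[] SquaresOfFirstTen()
    {
        int[] integerList = Enumerable.Range(0, 100).ToArray();

        return integerList.AsParallel().AsOrdered()
            .Take(10).AsUnordered()
            .Select(item => item * item)
            .ToArray();
    }

    public static void Main()
    {
        int[] squares = SquaresOfFirstTen();

        // The output order may vary between runs, so sort before printing.
        Array.Sort(squares);
        Console.WriteLine(string.Join(", ", squares));
    }
}
```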

Deferred execution in parallel LINQ in .NET C#

If you are familiar with LINQ then you are probably aware of the notion of deferred execution: queries are not carried out until their results are needed. Parallel LINQ is no different. Let’s see an example:

Set up a data source:

int[] integerArray = new int[100];
for (int i = 0; i < integerArray.Length; i++)
{
	integerArray[i] = i;
}

Define a parallel query:

IEnumerable<double> results =
	integerArray.AsParallel().Select(item =>
	{
		return Math.Sqrt(item);
	});

The query has not been carried out at this point. It is carried out when the following foreach loop starts:

double sum = 0;
foreach (double result in results)
{
	sum += result;
}
Console.WriteLine("Total {0}", sum);

You can force query execution with the same extension methods as in “normal” LINQ, such as ToList, ToArray etc.:

IEnumerable<double> results =
	integerArray.AsParallel().Select(item =>
	{
		return Math.Sqrt(item);
	}).ToList();

In this case the query is executed as soon as it has been defined.
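
One way to observe deferred execution is to count how often the Select delegate runs. This is a rough sketch only; the names DeferredDemo and CountCalls are assumptions made for the example.

```csharp
using System;
using System.Linq;
using System.Threading;

public static class DeferredDemo
{
    // Hypothetical helper: returns the number of delegate calls seen
    // before and after the query is forced with ToArray().
    public static int[] CountCalls()
    {
        int calls = 0;

        var query = Enumerable.Range(0, 100).AsParallel().Select(item =>
        {
            // Several threads run this delegate, so count atomically.
            Interlocked.Increment(ref calls);
            return Math.Sqrt(item);
        });

        // The query has only been defined, nothing has run yet.
        int before = calls;

        // ToArray() forces execution of the whole query.
        double[] results = query.ToArray();
        int after = calls;

        return new[] { before, after };
    }

    public static void Main()
    {
        int[] counts = CountCalls();
        Console.WriteLine("Calls before execution: {0}", counts[0]);
        Console.WriteLine("Calls after execution: {0}", counts[1]);
    }
}
```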

Handling exceptions in parallel LINQ in .NET C#

We saw in this and this post how to handle exceptions that Tasks throw. It is not much different in parallel LINQ: the exception will be wrapped in an AggregateException.

The exception will be thrown when the query is executed. So defining a parallel query will not throw an exception even if you explicitly throw one within the query. If you force the execution of the query with extension methods such as ToList, ToArray, ForAll etc., then the exception will be thrown immediately. Let’s see an example.

Define the data source:

int[] integerArray = new int[100];
for (int i = 0; i < integerArray.Length; i++)
{
	integerArray[i] = i;
}

Define the query:

IEnumerable<double> query =
	integerArray.AsParallel()
	.Select(item =>
	{
		if (item == 50)
		{
			throw new Exception();
		}
		return Math.Sqrt(item);
	});

Go through the results and handle the exception:

try
{
	foreach (double item in query)
	{
		Console.WriteLine("Result {0}", item);
	}
}
catch (AggregateException aggregateException)
{
	aggregateException.Handle(exception =>
	{
		Console.WriteLine("Handled exception of type: {0}",
			exception.GetType());
		return true;
	});
}

Run the code with Ctrl+F5. You’ll see that the exception is thrown when the items are processed and that it is then handled. Items that were being processed when the exception was thrown will run to completion, so don’t assume that the parallel query is interrupted at that moment.
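
The same behaviour can be seen when execution is forced with ToArray. The sketch below is illustrative only: the names PlinqExceptionDemo and CollectExceptionTypes are made up, and InvalidOperationException merely stands in for whatever exception your query might throw.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PlinqExceptionDemo
{
    // Hypothetical helper: forces the query and returns the type names
    // of the exceptions wrapped in the AggregateException.
    public static List<string> CollectExceptionTypes()
    {
        var typeNames = new List<string>();

        // Defining the query does not throw anything yet.
        var query = Enumerable.Range(0, 100).AsParallel().Select(item =>
        {
            if (item == 50)
            {
                throw new InvalidOperationException("Item 50 is not allowed");
            }
            return Math.Sqrt(item);
        });

        try
        {
            // Forcing execution is what surfaces the exception.
            query.ToArray();
        }
        catch (AggregateException aggregateException)
        {
            // Flatten() unwraps any nested AggregateException instances.
            foreach (Exception inner in aggregateException.Flatten().InnerExceptions)
            {
                typeNames.Add(inner.GetType().Name);
            }
        }

        return typeNames;
    }

    public static void Main()
    {
        foreach (string typeName in CollectExceptionTypes())
        {
            Console.WriteLine("Caught: {0}", typeName);
        }
    }
}
```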

How to cancel parallel loops in .NET C#

Cancelling parallel loops in C# is similar to cancelling “normal” tasks. You will need to supply a ParallelOptions object to the parallel loop, with a cancellation token assigned to its CancellationToken property. If you are not familiar with these objects, it is worth reading up on task cancellation first.

When you cancel the cancellation token then no new iterations will be started in a parallel loop. However, those already running will finish. Parallel.For and Parallel.ForEach will then throw an OperationCanceledException.

Declare the cancellation token:

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

Create a Task that will cancel the token after 5 seconds:

Task.Factory.StartNew(() =>
{
	Thread.Sleep(5000);
	cancellationTokenSource.Cancel();
	Console.WriteLine("Token cancelled");
});

Define the parallel loop options and assign the cancellation token:

ParallelOptions parallelLoopOptions =
	new ParallelOptions()
	{
		CancellationToken = cancellationTokenSource.Token
	};

Perform some loop that is guaranteed to take more than 5 seconds:

try
{				
	Parallel.For(0, Int64.MaxValue, parallelLoopOptions, index =>
	{
		double result = Math.Pow(index, 3);
		Console.WriteLine("Index {0}, result {1}", index, result);
		Thread.Sleep(100);
	});
}
catch (OperationCanceledException)
{
	Console.WriteLine("Cancellation exception caught!");
}

Run the code and you’ll see that the parallel loop will likely run for slightly more than 5 seconds. This is because iterations that are already running when the token is cancelled are allowed to complete.
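
A more compact variant lets the token source cancel itself after a delay instead of starting a separate task. This is a sketch under assumptions: the names LoopCancellationDemo and RunUntilCancelled are invented here, and the 200 ms delay is arbitrary. It relies on the CancellationTokenSource constructor that accepts a delay in milliseconds.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LoopCancellationDemo
{
    // Hypothetical helper: returns true if the loop ended because
    // the token was cancelled.
    public static bool RunUntilCancelled(int cancelAfterMilliseconds)
    {
        // The token source cancels itself after the given delay.
        using (var cancellationTokenSource =
            new CancellationTokenSource(cancelAfterMilliseconds))
        {
            var parallelLoopOptions = new ParallelOptions
            {
                CancellationToken = cancellationTokenSource.Token
            };

            try
            {
                // Far too many iterations to finish before the token is cancelled.
                Parallel.For(0, int.MaxValue, parallelLoopOptions, index =>
                {
                    Thread.Sleep(10);
                });
                return false;
            }
            catch (OperationCanceledException)
            {
                return true;
            }
        }
    }

    public static void Main()
    {
        Console.WriteLine("Cancelled: {0}", RunUntilCancelled(200));
    }
}
```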

Breaking parallel loops in .NET C# using the Break method

It’s not uncommon to break the execution of a for/foreach loop using the ‘break’ keyword. A for loop can look through a list of integers and if the loop body finds some matching value then the loop can be exited. It’s another discussion that ‘while’ and ‘do-while’ loops might be a better alternative, but there you go.

You cannot simply break out from a parallel loop using the break keyword. However, we can achieve the effect with the ParallelLoopState class. In this post we looked at using the Stop method of the ParallelLoopState class. Here we’ll look at a slightly different method of the same class: Break(). Let’s say we have the following list of integers…:

List<int> integers = new List<int>();

for (int i = 0; i <= 100; i++)
{
	integers.Add(i);
}

…and we want to break the loop as soon as we’ve found a number higher than 50. Both Parallel.For() and Parallel.ForEach() accept an Action<T> parameter as we saw before. There is also an overload that takes an Action<T, ParallelLoopState>. The loop state is created automatically by the Parallel class.

The loop state object has a Break method which stops the loop execution. To be more precise: if Break is called in the 5th iteration, then only those iterations will be started afterwards that are required to process items 1-4. Other iterations may already have been started by the scheduler of course, and they will run to completion. So it is guaranteed that at least the first five items will be processed.

Break() can even be called multiple times if the processing of multiple items leads to breaking the loop. In the below example, if n separate threads are started with an integer higher than 50 then Break will be called n times:

Parallel.ForEach(integers, (int item, ParallelLoopState state) =>
{
	if (item > 50)
	{
		Console.WriteLine("Higher than 50: {0}, exiting loop.", item);
		state.Break();
	}
	else
	{
		Console.WriteLine("50 or less: {0}", item);
	}
});

If Break is called more than once then the lowest item will be taken as a boundary by the Parallel class. If Break is called at items 5, 10 and 15 then all iterations required to process items 1-5 will be completed.
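
The guarantee can be checked by recording which iterations actually ran. The following is a minimal sketch with made-up names (BreakDemo, LowestBreakIfNoGaps); it uses Parallel.For so that the iteration index and the item value coincide.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BreakDemo
{
    // Hypothetical helper: returns the lowest break index, or -1 if any
    // iteration at or below that index was not processed.
    public static long LowestBreakIfNoGaps()
    {
        // Thread-safe record of the iterations that actually ran.
        var processed = new ConcurrentDictionary<int, bool>();

        ParallelLoopResult result = Parallel.For(0, 1000, (index, state) =>
        {
            processed[index] = true;
            if (index > 50)
            {
                // May be called by several iterations; the lowest one counts.
                state.Break();
            }
        });

        if (!result.LowestBreakIteration.HasValue)
        {
            return -1;
        }

        long lowest = result.LowestBreakIteration.Value;
        for (int i = 0; i <= lowest; i++)
        {
            if (!processed.ContainsKey(i))
            {
                return -1;
            }
        }
        return lowest;
    }

    public static void Main()
    {
        Console.WriteLine("Lowest break iteration: {0}", LowestBreakIfNoGaps());
        Console.WriteLine("All iterations up to it were processed unless -1 is shown.");
    }
}
```

Iterations above the boundary may also have run, so the dictionary usually contains more entries than the guaranteed ones.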

How to cancel parallel LINQ queries in .NET C#

We saw in several posts on TPL on this blog how the CancellationToken object can be used to cancel Tasks. It can be used to cancel parallel queries as well. An instance of the token must be supplied to the WithCancellation extension method.

Define the cancellation token and the data source:

CancellationTokenSource cancellationTokenSource
	= new CancellationTokenSource();

int[] integerArray = new int[10000000];
for (int i = 0; i < integerArray.Length; i++)
{
	integerArray[i] = i;
}

Define the query. Notice how the token is provided to the query:

IEnumerable<double> query = integerArray
	.AsParallel()
	.WithCancellation(cancellationTokenSource.Token)
	.Select(item =>
	{
		return Math.Sqrt(item);
	});

Start a separate task that will cancel the token after 5 seconds:

Task.Factory.StartNew(() =>
{
	Thread.Sleep(5000);
	cancellationTokenSource.Cancel();
	Console.WriteLine("Token source cancelled");
});

Loop through the query results and catch the OperationCanceledException:

try
{
	foreach (double d in query)
	{
		Console.WriteLine("Result: {0}", d);
	}
}
catch (OperationCanceledException)
{
	Console.WriteLine("Caught cancellation exception");
}

Do not assume that cancelling the token will cause the item processing to stop immediately. Items that were already being processed when the token was cancelled will be completed.
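
The whole flow can be condensed into one method. This is only a sketch: the names QueryCancellationDemo and QueryIsCancelled are assumptions, the 200 ms delay is arbitrary, and Thread.Sleep(1) merely simulates work so that the query cannot finish before the token is cancelled.

```csharp
using System;
using System.Linq;
using System.Threading;

public static class QueryCancellationDemo
{
    // Hypothetical helper: returns true if enumerating the query ended
    // with an OperationCanceledException.
    public static bool QueryIsCancelled(int cancelAfterMilliseconds)
    {
        // The token source cancels itself after the given delay.
        using (var cancellationTokenSource =
            new CancellationTokenSource(cancelAfterMilliseconds))
        {
            var query = Enumerable.Range(0, int.MaxValue)
                .AsParallel()
                .WithCancellation(cancellationTokenSource.Token)
                .Select(item =>
                {
                    Thread.Sleep(1); // simulated work
                    return Math.Sqrt(item);
                });

            try
            {
                foreach (double d in query)
                {
                    // Results are ignored; we only wait for the cancellation.
                }
                return false;
            }
            catch (OperationCanceledException)
            {
                return true;
            }
        }
    }

    public static void Main()
    {
        Console.WriteLine("Query cancelled: {0}", QueryIsCancelled(200));
    }
}
```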
