Parallel LINQ in .NET C#: a basic example

Parallel LINQ – PLINQ – can be applied to LINQ to Objects, i.e. the type of LINQ that works with the IEnumerable and IEnumerable of T data types. The PLINQ library provides extension methods that let the results of queries be processed in parallel, so that multiple data items are processed concurrently.

In fact, it is not difficult to turn a “normal” LINQ statement into PLINQ.

Set up the data source:

int[] sourceData = new int[1000];
for (int i = 0; i < sourceData.Length; i++)
{
	sourceData[i] = i;
}

We want to extract all even numbers. We also want the items to be processed in a parallel fashion. The following query will do just that:

IEnumerable<int> parallelResults =
	from item in sourceData.AsParallel()
	where item % 2 == 0
	select item;

Note the AsParallel() extension method. Behind the scenes it wraps the source in a ParallelQuery object. Without the AsParallel extension you’d be using the standard LINQ operators defined in the Enumerable class. This little extension makes sure that the parallel implementations in the ParallelEnumerable class are used instead.

Print the results:

foreach (int item in parallelResults)
{
	Console.WriteLine("Item {0}", item);
}

Run the code and you’ll see that the integers do not follow any particular order. This is the result of the parallel execution of the query. Remove the AsParallel extension from the query and the integers will be printed in ascending order. The method hides a lot of complexity from the programmer: it is up to PLINQ to decide how to parallelise the query; we only express the request through the extension. PLINQ will try to optimise the query execution based on a range of parameters. If sequential execution seems a better fit, the items may still be processed sequentially despite your wish.
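
The query can be checked end to end with a small console program. This is a sketch rather than the post’s own code: the class name PlinqDemo is hypothetical, and AsOrdered() is an addition not covered above. It is the PLINQ operator that asks for the source order to be preserved in the output, usually at some cost in speed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical demo class; the query mirrors the one in the post.
public static class PlinqDemo
{
    public static int[] BuildSource()
    {
        int[] sourceData = new int[1000];
        for (int i = 0; i < sourceData.Length; i++)
        {
            sourceData[i] = i;
        }
        return sourceData;
    }

    // Default PLINQ: the order of the output is not guaranteed.
    public static List<int> EvensUnordered(int[] source) =>
        (from item in source.AsParallel()
         where item % 2 == 0
         select item).ToList();

    // AsOrdered(): PLINQ keeps the order of the source in the output.
    public static List<int> EvensOrdered(int[] source) =>
        (from item in source.AsParallel().AsOrdered()
         where item % 2 == 0
         select item).ToList();

    public static void Main()
    {
        int[] source = BuildSource();
        List<int> unordered = EvensUnordered(source);
        List<int> ordered = EvensOrdered(source);

        // Both queries select the same 500 items; only their order may differ.
        Console.WriteLine("Unordered: {0} items, first is {1}", unordered.Count, unordered[0]);
        Console.WriteLine("Ordered: {0} items, first is {1}", ordered.Count, ordered[0]);
    }
}
```

Leave AsOrdered() out unless the order really matters: without it PLINQ is free to hand items back as soon as any partition produces them.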

View the list of posts on the Task Parallel Library here.

Lazy task execution in .NET C#

Tasks in .NET can be executed lazily, i.e. only at the time when their result is first needed, which in some situations may be never. You can build tasks that are not started until a lazy variable is read; the lazy variable is not initialised until then.

You can construct a lazy task in two separate steps:

Func<string> functionBody = new Func<string>(() =>
{
	Console.WriteLine("Function initialiser starting...");
	return "Some return value from the function initialiser";
});

Lazy<Task<string>> lazyInitialiser = new Lazy<Task<string>>
	(() =>
		Task<string>.Factory.StartNew(functionBody)
	);

Here we first create a function, which the initialisation function passed to the Lazy constructor then hands over to Task.Factory.StartNew. The type argument of Lazy specifies the type of object that will be lazily initialised, here a Task of string. The overload of the Lazy constructor used here accepts a Func of that type, i.e. a Func of Task of string, as the initialisation function. Task.Factory.StartNew comes in handy here as we can supply our function to it.

You can achieve the same initialisation in one step as follows:

Lazy<Task<string>> inlineLazyInitialiser = new Lazy<Task<string>>(
	() => Task<string>.Factory.StartNew(() =>
	{
		Console.WriteLine("Inline initialiser starting...");
		return "Some return value from the inline lazy initialiser.";
	}));

We read the results as follows:

Console.WriteLine("Calling function initialiser within Lazy...");
Console.WriteLine(string.Concat("Result from task: ", lazyInitialiser.Value.Result));

Console.WriteLine("Calling inline lazy initialiser...");
Console.WriteLine(string.Concat("Result from task: ", inlineLazyInitialiser.Value.Result));
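
To see that nothing runs before Value is read, the sketch below counts how often the task body starts. The class name and the counter are hypothetical additions for illustration. The Lazy constructor that takes only a factory uses the ExecutionAndPublication thread-safety mode, which is what ensures the factory, and therefore StartNew, runs at most once.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; the counter exists only to make the lazy start observable.
public static class LazyTaskDemo
{
    private static int _starts;

    // Number of times a task created through Create() has actually started its body.
    public static int Starts => Volatile.Read(ref _starts);

    public static Lazy<Task<string>> Create() =>
        new Lazy<Task<string>>(() => Task<string>.Factory.StartNew(() =>
        {
            Interlocked.Increment(ref _starts);
            return "Some return value from the lazy initialiser";
        }));

    public static void Main()
    {
        Lazy<Task<string>> lazyTask = Create();

        // No task exists yet: the factory delegate runs on the first read of Value.
        Console.WriteLine("Created: {0}, starts: {1}", lazyTask.IsValueCreated, Starts);

        // The first read of Value starts the task; Result blocks until it completes.
        Console.WriteLine(lazyTask.Value.Result);

        // Later reads return the same Task instance, so the body does not run again.
        Console.WriteLine(lazyTask.Value.Result);
        Console.WriteLine("Created: {0}, starts: {1}", lazyTask.IsValueCreated, Starts);
    }
}
```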

Continuing with an attached child Task in .NET C#

A child Task, a.k.a. a nested Task, is a Task that’s started within the body of another Task. The containing Task is called the parent.

We saw here how to create an attached child Task and here how to create continuation tasks. We can now marry the two concepts. If you are not familiar with them, make sure to check out those posts as well.

This is how you attach a child task to a parent, create a continuation task to the child task and attach the continuation task to the parent as well:

Task parent = Task.Factory.StartNew(() =>
{
	Console.WriteLine("Starting child task...");

	Task child = Task.Factory.StartNew(() =>
	{
		Console.WriteLine("Child running. Going to sleep for a sec.");
		Thread.Sleep(1000);
		Console.WriteLine("Child finished and throws an exception.");
		throw new Exception();
	}, TaskCreationOptions.AttachedToParent);

	child.ContinueWith(antecedent =>
	{
		// write out a message and wait
		Console.WriteLine("Continuation of child task running");
		Thread.Sleep(1000);
		Console.WriteLine("Continuation finished");
	}, TaskContinuationOptions.AttachedToParent
		| TaskContinuationOptions.OnlyOnFaulted);				
});

try
{
	Console.WriteLine("Waiting for parent task");
	parent.Wait();
	Console.WriteLine("Parent task finished");
}
catch (AggregateException ex)
{
	// The child's exception is nested in its own AggregateException; Flatten() unwraps it
	Console.WriteLine("Exception: {0}", ex.Flatten().InnerException.GetType());
}

Note the TaskCreationOptions parameter in the StartNew method: AttachedToParent. We also apply a filter to the continuation task: it runs only if its antecedent, i.e. ‘child’, has faulted, which will happen as we throw an exception in the body of ‘child’.

If you run this code then you’ll see that the exception thrown by the child task is caught in the try-catch block. The original exception arrives wrapped twice: in an AggregateException for the child, nested inside the parent’s own AggregateException. Calling Flatten() on the caught exception removes the nesting. Also, the Wait() method waits for the parent task to finish, which in turn waits for its attached children, here both ‘child’ and the attached continuation. So Wait() indirectly waits for any attached children the parent Task may have. Finally, you’ll see that the continuation task runs as ‘child’ has faulted.
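
A condensed sketch of the example above that returns what it observes instead of only printing it. The class name and the use of InvalidOperationException are assumptions for illustration; the Thread.Sleep calls are dropped because the attachment alone makes the parent wait.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class condensing the attached child + continuation example.
public static class AttachedContinuationDemo
{
    public static (Type Nested, Type Flattened, bool ContinuationRan) Run()
    {
        bool continuationRan = false;

        Task parent = Task.Factory.StartNew(() =>
        {
            Task child = Task.Factory.StartNew(() =>
            {
                throw new InvalidOperationException("Child failed");
            }, TaskCreationOptions.AttachedToParent);

            // Runs only because 'child' faults; attached, so the parent waits for it too.
            child.ContinueWith(antecedent =>
            {
                continuationRan = true;
            }, TaskContinuationOptions.AttachedToParent
                | TaskContinuationOptions.OnlyOnFaulted);
        });

        try
        {
            parent.Wait();
        }
        catch (AggregateException ex)
        {
            // InnerException is the child's AggregateException;
            // Flatten() exposes the original exception instead.
            return (ex.InnerException.GetType(),
                    ex.Flatten().InnerException.GetType(),
                    continuationRan);
        }

        return (null, null, continuationRan);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("Without Flatten(): {0}", result.Nested);
        Console.WriteLine("With Flatten(): {0}", result.Flattened);
        Console.WriteLine("Continuation ran: {0}", result.ContinuationRan);
    }
}
```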

Creating an attached child Task in .NET C#

A child Task, a.k.a. a nested Task, is a Task that’s started within the body of another Task. The containing Task is called the parent.

We saw here how to create detached child Tasks. They are not very interesting, really. An attached child Task, however, has a special relationship with its parent:

  • The parent waits for the child to finish before it completes
  • If the child Task throws an exception then it is propagated to the parent, which re-throws it to whoever waits on the parent
  • The status of the parent depends on the status of the child

This is how you attach a child task to a parent:

Task parentTask = Task.Factory.StartNew(() =>
{
	Console.WriteLine("Starting child task...");
	Task childTask = Task.Factory.StartNew(() =>
	{
		Console.WriteLine("Child running. Going to sleep for a sec.");
		Thread.Sleep(1000);
		Console.WriteLine("Child finished and throws an exception.");
		throw new Exception();
	}, TaskCreationOptions.AttachedToParent);
});

try
{
	Console.WriteLine("Waiting for parent task");
	parentTask.Wait();
	Console.WriteLine("Parent task finished");
}
catch (AggregateException ex)
{
	// The child's exception is nested in its own AggregateException; Flatten() unwraps it
	Console.WriteLine("Exception: {0}", ex.Flatten().InnerException.GetType());
}

Note the TaskCreationOptions parameter in the StartNew method: AttachedToParent. If you run this code then you’ll see that the exception thrown by the child task is caught in the try-catch block. The original exception has been packaged within an AggregateException for the child, which is itself nested in the parent’s AggregateException; calling Flatten() on the caught exception removes the nesting. Also, the Wait() method waits for the parent task to finish, which in turn waits for the child to finish. So Wait() indirectly waits for any attached children the parent Task may have. Note also that once the parent’s own body has finished but its attached child is still running, the parent’s status is ‘WaitingForChildrenToComplete’, which means exactly what it implies.
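
Catching the parent in the WaitingForChildrenToComplete state with Thread.Sleep alone is timing-dependent. This sketch instead blocks the child on a ManualResetEventSlim until the parent’s status has been read. The class name, the event and the 5-second polling limit are hypothetical additions.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; the event replaces Thread.Sleep to avoid timing guesses.
public static class AttachedChildStatusDemo
{
    public static (TaskStatus WhileChildRuns, TaskStatus AfterChildFinished) Run()
    {
        using var releaseChild = new ManualResetEventSlim(false);

        Task parentTask = Task.Factory.StartNew(() =>
        {
            Task.Factory.StartNew(() =>
            {
                // The child stays busy until the caller releases it.
                releaseChild.Wait();
            }, TaskCreationOptions.AttachedToParent);
        });

        // The parent's own body returns at once, but the blocked child keeps it from completing.
        SpinWait.SpinUntil(
            () => parentTask.Status == TaskStatus.WaitingForChildrenToComplete, 5000);
        TaskStatus whileChildRuns = parentTask.Status;

        releaseChild.Set();
        parentTask.Wait();
        return (whileChildRuns, parentTask.Status);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("While the child runs: {0}", result.WhileChildRuns);
        Console.WriteLine("After the child finished: {0}", result.AfterChildFinished);
    }
}
```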

Creating a detached child Task in .NET C#

A child Task, a.k.a. a nested Task, is a Task that’s started within the body of another Task. The containing Task is called the parent.

A detached child Task is one which doesn’t have any relationship with the parent. The detached child Task will be scheduled normally and will have no effect on the parent.

There’s nothing special about creating a detached child Task:

Task parent = Task.Factory.StartNew(() =>
{
	Console.WriteLine("Starting child task...");
	Task childTask = Task.Factory.StartNew(() =>
	{
		Console.WriteLine("Child task running and stopping for a second");
		Thread.Sleep(1000);
		Console.WriteLine("Child task finished");
	});
});

Console.WriteLine("Waiting for parent task");
parent.Wait();
Console.WriteLine("Parent task finished");

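We start the parent Task, within which we start a child task. As the child is detached, parent.Wait() does not wait for it, so “Parent task finished” may well be printed before “Child task finished”. You can nest the tasks as you like: the child task can have its own child task(s), which in turn can have child tasks, etc.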
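
A sketch that makes the lack of a relationship observable. The child below is blocked on a ManualResetEventSlim, a hypothetical addition replacing Thread.Sleep, so after parent.Wait() returns we can check that the child has not finished yet. The class name is also hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; the event makes the check independent of timing.
public static class DetachedChildDemo
{
    public static (bool ParentCompleted, bool ChildCompletedWithParent) Run()
    {
        using var releaseChild = new ManualResetEventSlim(false);
        Task childTask = null;

        Task parent = Task.Factory.StartNew(() =>
        {
            // No TaskCreationOptions: the child is detached from the parent.
            childTask = Task.Factory.StartNew(() => releaseChild.Wait());
        });

        // Returns once the parent's own body is done; it does not wait for the child.
        parent.Wait();
        bool childCompleted = childTask.IsCompleted;

        releaseChild.Set();
        childTask.Wait();
        return (parent.IsCompleted, childCompleted);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("Parent completed: {0}", result.ParentCompleted);
        Console.WriteLine("Child completed at the same time: {0}", result.ChildCompletedWithParent);
    }
}
```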

Continuation tasks in .NET TPL: exception handling in task chains

We saw in previous posts on TPL how to chain tasks using the ContinueWhenAll and ContinueWhenAny methods. We also discussed strategies of exception handling in tasks. If you don’t know how to use these techniques make sure to read those posts first.

If you have multiple tasks chained together then special care should be taken when handling exceptions. There’s no automatic way to propagate an exception thrown by a task to its continuation task(s). However, the continuation task can always check the status of the antecedent, extract any exceptions and then process them in some way.

We start the first task:

Task antecedentTask = Task.Factory.StartNew(() =>
{
	Console.WriteLine("Mother of all tasks.");
});

The first continuation task throws an exception:

Task continuationTaskWithException = antecedentTask.ContinueWith(antecedent =>
{
	Console.WriteLine("Task that throws an exception.");
	throw new Exception();
});

The continuation of the continuation checks the status of the above task and re-throws the inner exception of its aggregate exception:

Task taskToRunAfterExceptionTask = continuationTaskWithException.ContinueWith(antecedent =>
{
	if (antecedent.Status == TaskStatus.Faulted)
	{
		throw antecedent.Exception.InnerException;
	}
	Console.WriteLine("Task that checks if previous task has faulted.");
});

We wait for the second continuation task to finish and handle the aggregate exception in some way:

try
{
	taskToRunAfterExceptionTask.Wait();
}
catch (AggregateException ex)
{
	ex.Handle(innerException =>
	{
		Console.WriteLine("Handled exception is of type: {0}", innerException.GetType());
		return true;
	});
}

If the second continuation task does not check the status of the task before it, then the exception thrown by continuationTaskWithException goes unobserved: taskToRunAfterExceptionTask completes normally and Wait() throws nothing.
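
The two cases can be compared side by side in a self-contained sketch. The class name, the failingTask variable and the InvalidOperationException are illustrative assumptions; the checking continuation follows the post’s pattern of re-throwing the inner exception.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class comparing a continuation that checks its antecedent with one that does not.
public static class ContinuationExceptionDemo
{
    public static (TaskStatus Checking, Type CaughtType, TaskStatus NonChecking) Run()
    {
        Task failingTask = Task.Factory.StartNew(() =>
        {
            throw new InvalidOperationException("Failure in the chain");
        });

        // Checks the antecedent and re-throws its inner exception, as in the post.
        Task checking = failingTask.ContinueWith(antecedent =>
        {
            if (antecedent.Status == TaskStatus.Faulted)
            {
                throw antecedent.Exception.InnerException;
            }
        });

        // Ignores the antecedent's status, so nothing is propagated through this branch.
        Task nonChecking = failingTask.ContinueWith(antecedent => { });

        Type caughtType = null;
        try
        {
            checking.Wait();
        }
        catch (AggregateException ex)
        {
            ex.Handle(innerException =>
            {
                caughtType = innerException.GetType();
                return true;
            });
        }

        nonChecking.Wait(); // completes normally, nothing is thrown
        return (checking.Status, caughtType, nonChecking.Status);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("Checking continuation: {0}, caught {1}", result.Checking, result.CaughtType);
        Console.WriteLine("Non-checking continuation: {0}", result.NonChecking);
    }
}
```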

You can handle exceptions in a similar way when doing multitask continuations:

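Task<int>[] motherTasks = new Task<int>[10];
// Fill the array with started tasks first: ContinueWhenAll throws an ArgumentException if the array contains null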

Task<int> continuation = Task.Factory.ContinueWhenAll<int>(motherTasks, antecedents =>
{
	foreach (Task<int> task in antecedents)
	{
		if (task.Status == TaskStatus.Faulted)
		{
			//do something with the exception
		}
		else
		{
			//normal operations
		}
	}
	return 1234;
});

Handling exceptions in a ContinueWhenAny scenario is somewhat more complicated. Recall that although the continuation task receives only one antecedent, the one that finishes first, the other tasks in the antecedent task array continue to run in the background. If one of those background tasks throws an exception, it will go unobserved. A possible solution is to add a separate ContinueWhenAll continuation purely for exception handling:

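Task<int>[] motherTasks = new Task<int>[10];
// Assumed to be filled with started tasks before this point

// ContinueWhenAny does not accept NotOn*/OnlyOn* options (it throws an
// ArgumentOutOfRangeException), so the status is checked inside the continuation
Task continuationTask = Task.Factory.ContinueWhenAny<int>(motherTasks,
	(Task<int> antecedent) =>
	{
		if (antecedent.Status != TaskStatus.Faulted)
		{
			Console.WriteLine("Continuation task.");
		}
	});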

Task exceptionHandlingContinuation = Task.Factory.ContinueWhenAll(motherTasks
	, antecedents =>
		{
			foreach (Task task in antecedents)
			{
				if (task.Status == TaskStatus.Faulted)
				{
					//exception handling code here
				}
			}
		});
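
A runnable sketch of this pattern with three antecedents. One finishes at once, while the other two, one of which fails, are held back by a ManualResetEventSlim until the ContinueWhenAny continuation has run. Names, values and the event are hypothetical. The status check happens inside the continuation because TaskFactory rejects the NotOn* and OnlyOn* options for multi-task continuations.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; values and task bodies are illustrative.
public static class AnyPlusAllDemo
{
    public static (int FirstResult, int FaultedCount) Run()
    {
        using var releaseOthers = new ManualResetEventSlim(false);

        Task<int>[] motherTasks =
        {
            Task.Factory.StartNew(() => 1),
            Task.Factory.StartNew<int>(() =>
            {
                releaseOthers.Wait();
                throw new InvalidOperationException("Background task failed");
            }),
            Task.Factory.StartNew(() =>
            {
                releaseOthers.Wait();
                return 3;
            })
        };

        int firstResult = 0;
        // Only the first task can finish before releaseOthers is set.
        Task anyContinuation = Task.Factory.ContinueWhenAny(motherTasks, (Task<int> antecedent) =>
        {
            if (antecedent.Status == TaskStatus.RanToCompletion)
            {
                firstResult = antecedent.Result;
            }
        });

        int faultedCount = 0;
        // Observes every antecedent, including the ones still running in the background.
        Task exceptionHandling = Task.Factory.ContinueWhenAll(motherTasks, (Task<int>[] antecedents) =>
        {
            foreach (Task<int> task in antecedents)
            {
                if (task.Status == TaskStatus.Faulted)
                {
                    // Reading Exception marks the fault as observed.
                    Console.WriteLine("Handled: {0}", task.Exception.InnerException.Message);
                    faultedCount++;
                }
            }
        });

        anyContinuation.Wait();
        releaseOthers.Set();
        exceptionHandling.Wait();
        return (firstResult, faultedCount);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("First result: {0}, faulted tasks: {1}", result.FirstResult, result.FaultedCount);
    }
}
```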

Continuation tasks in .NET TPL: cancelling continuation tasks

Tasks in .NET TPL make it easy to assign tasks that should run upon the completion of a certain task.

We saw in previous posts on TPL how to define continuation tasks. You’ll also find examples showing how to cancel tasks in TPL. Continuation tasks can be cancelled using the same techniques as with “normal” tasks. If you don’t know the basics, then make sure to check out the related posts on this page.

We declare the cancellation token source:

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

We then start the antecedent task and pass the cancellation token to StartNew:

Task motherTask = Task.Factory.StartNew(() =>
{
	Console.WriteLine("Mother task running.");
	cancellationTokenSource.Token.WaitHandle.WaitOne();
	cancellationTokenSource.Token.ThrowIfCancellationRequested();
}, cancellationTokenSource.Token);

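The task blocks on the WaitOne() method until the token source is cancelled. ThrowIfCancellationRequested() then throws an OperationCanceledException carrying the same token, so the task ends in the Canceled state.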

The following continuation task is created with the same cancellation token. This has the effect that if the token source is cancelled then both the antecedent and the continuation tasks will be cancelled:

Task taskSharingCancellationToken = motherTask.ContinueWith(antecedent =>
{
	Console.WriteLine("This continuation task will never run as it shares the cancellation token with the antecedent.");
}, cancellationTokenSource.Token);

If you remove the token from the ContinueWith call then this continuation task will run after the antecedent has been cancelled.

The following continuation task may seem correct at first as it should only run if the antecedent has been cancelled:

Task incorrectContinuation = motherTask.ContinueWith(antecedent =>
{
	Console.WriteLine("This task will never be scheduled");
}, cancellationTokenSource.Token, TaskContinuationOptions.OnlyOnCanceled, TaskScheduler.Current);

However, it also shares the cancellation token with the antecedent, so it will never run. In other words, if you want a continuation task to run when the antecedent has been cancelled, do not share the same cancellation token across the tasks.

The below task will run as it doesn’t share the cancellation token:

Task correctContinuation = motherTask.ContinueWith(antecedent =>
{
	Console.WriteLine("Continuation running as there's no cancellation token sharing.");
}, TaskContinuationOptions.OnlyOnCanceled);

We cancel the token and wait for the correct continuation task to finish:

cancellationTokenSource.Cancel();
correctContinuation.Wait();
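
To check the final status of each task, here is a self-contained sketch of the same setup. The class name is hypothetical and the continuation bodies are left empty. Task.WaitAll is only used to let every task finish before the token source is disposed; it throws because some tasks end up cancelled, which is expected here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class reproducing the post's three continuations with empty bodies.
public static class CancelContinuationDemo
{
    public static (TaskStatus Mother, TaskStatus Sharing, TaskStatus Incorrect, TaskStatus Correct) Run()
    {
        using var cancellationTokenSource = new CancellationTokenSource();
        CancellationToken token = cancellationTokenSource.Token;

        Task motherTask = Task.Factory.StartNew(() =>
        {
            token.WaitHandle.WaitOne();
            token.ThrowIfCancellationRequested();
        }, token);

        // Shares the token: cancelled together with the antecedent.
        Task sharing = motherTask.ContinueWith(antecedent => { }, token);

        // Also shares the token, so OnlyOnCanceled never gets a chance to apply.
        Task incorrect = motherTask.ContinueWith(antecedent => { }, token,
            TaskContinuationOptions.OnlyOnCanceled, TaskScheduler.Current);

        // No shared token: runs because the antecedent ends up Canceled.
        Task correct = motherTask.ContinueWith(antecedent => { },
            TaskContinuationOptions.OnlyOnCanceled);

        cancellationTokenSource.Cancel();
        correct.Wait();

        try
        {
            Task.WaitAll(motherTask, sharing, incorrect);
        }
        catch (AggregateException)
        {
            // Expected: these three tasks are cancelled.
        }

        return (motherTask.Status, sharing.Status, incorrect.Status, correct.Status);
    }

    public static void Main() => Console.WriteLine(Run());
}
```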

Continuation tasks in .NET TPL: any task in an array continued by a single task

Tasks in .NET TPL make it easy to assign tasks that should run upon the completion of a certain task.

In this post we saw how to create many tasks followed by a single continuation task that runs after all the antecedents have completed. We can also have the following scenario: a group of antecedent tasks are started and a single continuation task will be scheduled to run when the first task in the array of antecedent tasks has completed. You use the ContinueWhenAny() method to achieve this.

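Make sure to read the post referred to above as ContinueWhenAll() and ContinueWhenAny() are very similar. Read especially the section on return values as it also applies to this post: just replace “ContinueWhenAll” with “ContinueWhenAny” in the examples. Also, read this post on applying a condition that controls when a continuation task is scheduled; note, however, that ContinueWhenAny() rejects the NotOn* and OnlyOn* options, so with multiple antecedents such a condition has to be checked inside the continuation.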

In the below example we’ll create a range of tasks that all return an integer: the number of milliseconds they were put to sleep. The wait handle of the cancellation token is used to introduce a random pause of between 100 and 999 milliseconds (Random.Next excludes its upper bound). Access to the shared Random object is synchronised with a lock, as Random is not thread-safe.

We declare a couple of elements first:

Task<int>[] motherTasks = new Task<int>[10];
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

Random random = new Random();
Object syncPrimitive = new object();

The task array is filled and each task is started and returns an integer:

for (int i = 0; i < 10; i++)
{
	motherTasks[i] = Task.Factory.StartNew<int>(() =>
	{
		int taskSleepInterval;
		lock (syncPrimitive)
		{
			taskSleepInterval = random.Next(100, 1000);
		}
		cancellationTokenSource.Token.WaitHandle.WaitOne(taskSleepInterval);
		cancellationTokenSource.Token.ThrowIfCancellationRequested();
		return taskSleepInterval;
	}, cancellationTokenSource.Token);
}

Next we declare the continuation task:

Task continuationTask = Task.Factory.ContinueWhenAny<int>(motherTasks,
	(Task<int> antecedent) =>
	{
		Console.WriteLine("The first task slept for {0} milliseconds",
			antecedent.Result);
	});

The continuation task will be scheduled to run as soon as the first task in the antecedent task array has completed.

We wait for the continuation task to finish and then cancel the token source:

continuationTask.Wait();
cancellationTokenSource.Cancel();

Run the code and you’ll see that the continuation task reports the result of whichever task finished first.
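
With random intervals the winner changes from run to run. The sketch below fixes the intervals, an assumption made for a predictable outcome: one task waits 150 ms, while the others wait on the token with Timeout.Infinite, so they can only end through cancellation. The class name is hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class: fixed intervals instead of random ones.
public static class ContinueWhenAnyDemo
{
    public static int Run()
    {
        using var cancellationTokenSource = new CancellationTokenSource();
        CancellationToken token = cancellationTokenSource.Token;

        // Timeout.Infinite (-1): these tasks only stop when the token is cancelled.
        int[] sleepIntervals = { 150, Timeout.Infinite, Timeout.Infinite };
        var motherTasks = new Task<int>[sleepIntervals.Length];

        for (int i = 0; i < motherTasks.Length; i++)
        {
            int interval = sleepIntervals[i]; // copied so each closure gets its own value
            motherTasks[i] = Task.Factory.StartNew(() =>
            {
                token.WaitHandle.WaitOne(interval);
                token.ThrowIfCancellationRequested();
                return interval;
            }, token);
        }

        int firstResult = 0;
        Task continuationTask = Task.Factory.ContinueWhenAny(motherTasks, (Task<int> antecedent) =>
        {
            firstResult = antecedent.Result;
        });

        continuationTask.Wait();
        cancellationTokenSource.Cancel();

        // Let the cancelled tasks finish before the token source is disposed.
        try
        {
            Task.WaitAll(motherTasks);
        }
        catch (AggregateException)
        {
            // Expected: the remaining tasks end in the Canceled state.
        }
        return firstResult;
    }

    public static void Main() =>
        Console.WriteLine("The first task slept for {0} milliseconds", Run());
}
```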

Continuation tasks in .NET TPL: many tasks continued by a single task

Tasks in .NET TPL make it easy to assign tasks that should run upon the completion of a certain task.

We’ll need a basic object with a single property:

public class Series
{
	public int CurrentValue
	{
		get;
		set;
	}
}

In previous installments of the posts on TPL we saw how to create tasks that are scheduled to run after a single task has completed: check here and here. It is also possible to create many tasks and a single continuation task that runs after all the antecedents have completed. The ContinueWhenAll() method takes an array of Task objects, all of which have to complete before the continuation task can proceed.

We create 10 tasks, each of which increases its local copy of the series value by 1000 in a loop:

Series series = new Series();

Task<int>[] motherTasks = new Task<int>[10];

for (int i = 0; i < 10; i++)
{
	motherTasks[i] = Task.Factory.StartNew<int>((stateObject) =>
	{
		int seriesValue = (int)stateObject;
		for (int j = 0; j < 1000; j++)
		{
			seriesValue++;
		}
		return seriesValue;
	}, series.CurrentValue);
}

All 10 tasks are antecedent tasks in the chain. Let’s declare the continuation task:

Task continuation = Task.Factory.ContinueWhenAll<int>(motherTasks, antecedents =>
{
	foreach (Task<int> task in antecedents)
	{
		series.CurrentValue += task.Result;
	}
});

continuation.Wait();
Console.WriteLine(series.CurrentValue);

We extract the series value from each antecedent task and increase the overall series value by the individual task results, i.e. 1000 each, so the value printed is 10000. The lambda expression looks a bit awkward: the array of antecedent tasks appears twice, once as the first argument to the ContinueWhenAll method and then as the input to the lambda expression.
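
A self-contained version of this example that also returns the final value from the continuation. The class name is hypothetical. The two-type-argument overload ContinueWhenAll<TAntecedentResult, TResult> is used so that the result types of both the antecedents and the continuation are explicit.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class; Series is the same simple type as in the post.
public static class ContinueWhenAllDemo
{
    public class Series
    {
        public int CurrentValue { get; set; }
    }

    public static int Run()
    {
        Series series = new Series();
        Task<int>[] motherTasks = new Task<int>[10];

        for (int i = 0; i < motherTasks.Length; i++)
        {
            // Each task receives the starting value (0) as its state object and counts up by 1000.
            motherTasks[i] = Task.Factory.StartNew<int>(stateObject =>
            {
                int seriesValue = (int)stateObject;
                for (int j = 0; j < 1000; j++)
                {
                    seriesValue++;
                }
                return seriesValue;
            }, series.CurrentValue);
        }

        // Antecedents return int, and so does the continuation.
        Task<int> continuation = Task.Factory.ContinueWhenAll<int, int>(motherTasks, antecedents =>
        {
            foreach (Task<int> task in antecedents)
            {
                series.CurrentValue += task.Result;
            }
            return series.CurrentValue;
        });

        return continuation.Result;
    }

    public static void Main() => Console.WriteLine(Run());
}
```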

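There’s one more thing to note about the syntax. With a single type argument, ContinueWhenAll of T can mean two things. If the continuation returns nothing, T is the return type of the antecedent tasks. If the continuation returns a value, T is the continuation’s return type and the antecedents are passed in as plain Task objects, hence the cast to Task of int in the foreach loops. ContinueWhenAll<TAntecedentResult, TResult> spells out both. Here’s a list of possibilities: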

The continuation and the antecedent tasks both return an int:

Task<int> continuation = Task.Factory.ContinueWhenAll<int, int>(motherTasks, antecedents =>
{
	foreach (Task<int> task in antecedents)
	{
		series.CurrentValue += task.Result;
	}
	return 1234;
});

Continuation is void and antecedent tasks return an int:

Task continuation = Task.Factory.ContinueWhenAll<int>(motherTasks, antecedents =>
{
	foreach (Task<int> task in antecedents)
	{
		series.CurrentValue += task.Result;
	}
});

All of them are void:

Task continuation = Task.Factory.ContinueWhenAll(motherTasks, antecedents =>
{
	foreach (Task<int> task in antecedents)
	{
		//do something
	}
});

Continuation returns an int and antecedent tasks are void:

Task<int> continuation = Task<int>.Factory.ContinueWhenAll(motherTasks, antecedents =>
{
	foreach (Task<int> task in antecedents)
	{
		//do something
	}
	return 1234;
});

Note that it’s enough to start the antecedent tasks, which we did with Task.Factory.StartNew. The continuation task will be scheduled automatically.

Continuation tasks in .NET TPL: continuation with many tasks

Tasks in .NET TPL make it easy to assign tasks that should run upon the completion of a certain task.

We’ll need a basic object with a single property:

public class Series
{
	public int CurrentValue
	{
		get;
		set;
	}
}

Declare and start a task that increases the CurrentValue in a loop and returns the Series. This task is called the antecedent task in a chain of tasks.

Task<Series> motherTask = Task.Factory.StartNew<Series>(() =>
{
	Series series = new Series();
	for (int i = 0; i < 5000; i++)
	{
		series.CurrentValue++;
	}
	return series;
});

So this simple task returns a Series object with a current value property.

You can create other tasks that will continue upon the completion of the antecedent task. You can also specify a condition that determines when the continuation task should start. By default, the continuation task is scheduled to start when the preceding task has completed, whatever its final status. See the next post on TPL for examples.

You can combine continuations in different ways. You can have a linear chain:

  1. First task completes
  2. Second task follows first task and completes
  3. Third task follows second task and completes
  4. etc. with task #4, #5…

…or

  1. First task completes
  2. Second and third tasks both follow the first task and run
  3. Fourth task starts after second task and completes
  4. etc. with task #6, #7…

So you can create a whole tree of continuations as you wish. You can build up each continuation task one by one. Let’s declare the first continuation task:

Task<int> firstContinuationTask
	= motherTask.ContinueWith<int>((Task<Series> antecedent) =>
	{
		Console.WriteLine("First continuation task reporting series value: {0}", antecedent.Result.CurrentValue);
		return antecedent.Result.CurrentValue * 2;
	});

The first continuation task reads the Series object built by the antecedent, the “mother” task. It prints out the CurrentValue of the Series. The continuation task also returns an integer: the series value multiplied by two.

We’ll let a second continuation task print out the integer returned by the first continuation task. I.e. from the point of view of the second continuation task, the first continuation task is the antecedent. We can say that “motherTask” is the “grandma” 🙂

Task secondContinuationTask
	= firstContinuationTask.ContinueWith((Task<int> antecedent) =>
	{
		Console.WriteLine("Second continuation task reporting series value: {0}", antecedent.Result);
	});

Note that it’s enough to start the first task, which we did with Task.Factory.StartNew. The continuation tasks will be scheduled automatically.

Alternatively you can declare the continuation tasks in a chained manner:

motherTask.ContinueWith<int>((Task<Series> antecedent) =>
{
	Console.WriteLine("First continuation task reporting series value: {0}", antecedent.Result.CurrentValue);
	return antecedent.Result.CurrentValue * 2;
}).ContinueWith((Task<int> antecedent) =>
{
	Console.WriteLine("Second continuation task reporting series value: {0}", antecedent.Result);
});
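
The post also mentions trees of continuations. This small sketch attaches two continuations to the same mother task and chains a third onto one of them. The class name and the second branch, which adds one to the value, are hypothetical illustrations.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class; Series is the same simple type as in the post.
public static class ContinuationTreeDemo
{
    public class Series
    {
        public int CurrentValue { get; set; }
    }

    public static (int Doubled, int Incremented) Run()
    {
        Task<Series> motherTask = Task.Factory.StartNew(() =>
        {
            Series series = new Series();
            for (int i = 0; i < 5000; i++)
            {
                series.CurrentValue++;
            }
            return series;
        });

        // Two branches that both continue the mother task.
        Task<int> doubled = motherTask.ContinueWith((Task<Series> antecedent) =>
        {
            return antecedent.Result.CurrentValue * 2;
        });
        Task<int> incremented = motherTask.ContinueWith((Task<Series> antecedent) =>
        {
            return antecedent.Result.CurrentValue + 1;
        });

        // A third task chained onto the first branch: motherTask is its "grandma".
        Task report = doubled.ContinueWith((Task<int> antecedent) =>
        {
            Console.WriteLine("Chained continuation reporting: {0}", antecedent.Result);
        });

        report.Wait();
        return (doubled.Result, incremented.Result);
    }

    public static void Main()
    {
        var (doubledValue, incrementedValue) = Run();
        Console.WriteLine("Doubled: {0}, incremented: {1}", doubledValue, incrementedValue);
    }
}
```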
