Thread safe collections in .NET: ConcurrentQueue

Concurrent collections in .NET work very much like their single-thread counterparts with the difference that they are thread safe. These collections can be used in scenarios where you need to share a collection between Tasks. They are typed and use a lightweight synchronisation mechanism to ensure that they are safe and fast to use in parallel programming.

Concurrent queues

If you don’t know what Queues are then you can read about them here. The Queue of T generic collection has a thread-safe counterpart called ConcurrentQueue. Important methods:

  • Enqueue(T element): adds an item of type T to the collection
  • TryPeek(out T): tries to retrieve the next element from the collection without removing it. The value is set to the out parameter if the method succeeds. Otherwise it returns false.
  • TryDequeue(out T): tries to get the first element. It removes the item from the collection and sets the out parameter to the retrieved element. Otherwise the method returns false

The ‘try’ bit in the method names implies that your code needs to prepare for the event where the element could not be retrieved. If multiple threads retrieve elements from the same queue you cannot be sure what’s in there when a specific thread tries to read from it.

Example

Declare and fill a concurrent queue:

ConcurrentQueue<int> concurrentQueue = new ConcurrentQueue<int>();

for (int i = 0; i < 5000; i++)
{
	concurrentQueue.Enqueue(i);
}

We’ll want to get the items from this collection and check if all of them have been retrieved using a counter. The counter will also be shared among the threads using the ‘lock’ technique we saw in this post – or actually something that is similar to the ‘lock’ keyword: the Interlocked class. Interlocked has an Increment method which accepts a ref int parameter. It will increment the incoming integer in an atomic operation.

int itemCount = 0;

Task[] queueTasks = new Task[20];
for (int i = 0; i < queueTasks.Length; i++)
{
	queueTasks[i] = Task.Factory.StartNew(() =>
	{
		while (concurrentQueue.Count > 0)
		{
			int currentElement;
			bool success = concurrentQueue.TryDequeue(out currentElement);
			if (success)
			{
				Interlocked.Increment(ref itemCount);
			}
		}
	});
}

The while loop will ensure that we’ll try to dequeue the items as long as there’s something left in the collection.

Wait for the tasks and print the number of items processed – the counter should have the same value as the number of items in the queue:

Task.WaitAll(queueTasks);
Console.WriteLine("Counter: {0}", itemCount);

View the list of posts on the Task Parallel Library here.

Suspending a Task using Thread.Sleep in .NET C#

You may want to put a Task to sleep for some time. You might want to check the state of an object before continuing. The Task continues after the sleep time.

One way to solve this is using the classic Thread.Sleep method since the Task library uses .NET threading support in the background.

First construct your cancellation token:

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = cancellationTokenSource.Token;

Use this token in the constructor of the task:

Task task = Task.Factory.StartNew(() =>
{
	for (int i = 0; i < 100000; i++)
	{
		Thread.Sleep(10000);
		Console.WriteLine(i);
		cancellationToken.ThrowIfCancellationRequested();
	}
}, cancellationToken);

Note the Sleep method where we suspend the Task for 10 seconds.

You can use the cancellation token to interrupt the task:

cancellationTokenSource.Cancel();

In the previous post we discussed how to suspend a task using the WaitOne method of the cancellation token. The main difference between WaitOne and Sleep is that Sleep will block even if the task is cancelled whereas WaitOne will exit if the cancellation come before the sleep time is over.

View the list of posts on the Task Parallel Library here.

Suspending a Task using a CancellationToken in .NET C#

You may want to put a Task to sleep for some time. You might want to check the state of an object before continuing. The Task continues after the sleep time.

One way to solve this is using the WaitOne method of the WaitHandle property of the CancellationToken object.

Construct the CancellationToken object as follows:

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = cancellationTokenSource.Token;

The WaitOne() method without parameters suspends the Task execution until the Cancel() method of the CancellationToken object has been called. It has two overloads where you can specify some time: the amount of time to wait in milliseconds or some TimeSpan. The task suspension lasts until the sleep time is over OR the Cancel() method on the cancellation token has been called, whichever happens first. The WaitOne method also has a boolean return value where ‘true’ means that the Task has been cancelled and false means the the allotted sleep time was over.

You can use the cancellation token and the WaitOne method as follows:

Task task = Task.Factory.StartNew(() =>
{
	for (int i = 0; i < 1000000; i++)
	{
		bool cancelled = cancellationToken.WaitHandle.WaitOne(10000);
		Console.WriteLine("Value {0}. Cancelled? {1}", i, cancelled);
		if (cancelled)
		{
			throw new OperationCanceledException(cancellationToken);
		}
	}
}, cancellationToken);

You can cancel the task as follows:

cancellationTokenSource.Cancel();

View the list of posts on the Task Parallel Library here.

Thread safe collections in .NET: ConcurrentBag

Concurrent collections in .NET work very much like their single-thread counterparts with the difference that they are thread safe. These collections can be used in scenarios where you need to share a collection between Tasks. They are typed and use a lightweight synchronisation mechanism to ensure that they are safe and fast to use in parallel programming.

Concurrent bags

Concurrent bags are similar to concurrent stacks and concurrent queues but there’s a key difference. Bags are unordered collections. This means that the order of the items is not the same as how they were inserted. So concurrent bags are ideal if you would like to share a List of T generic collection among several tasks.

Important methods:

  • Add(T element): adds an item of type T to the collection
  • TryPeek(out T): tries to retrieve the next element from the collection without removing it. The value is set to the out parameter if the method succeeds. Otherwise it returns false.
  • TryTake(out T): tries to get the first element. It removes the item from the collection and sets the out parameter to the retrieved element. Otherwise the method returns false

The ‘try’ bit in the method names imply that your code needs to prepare for the event where the element could not be retrieved. If multiple threads retrieve elements from the same list you cannot be sure what’s in there when a specific thread tries to read from it.

Example

The example is almost identical to what we saw for the collections discussed previously in the posts on TPL.

Declare and fill a concurrent bag:

ConcurrentBag<int> concurrentBag = new ConcurrentBag<int>();

for (int i = 0; i < 5000; i++)
{
	concurrentBag.Add(i);
}

Next we’ll try to take every item from the bag. The bag will be accessed by several tasks at the same time. The counter variable – which is also shared and synchronised- will be used to check if all items have been retrieved.

int counter = 0;

Task[] bagTasks = new Task[20];
for (int i = 0; i < bagTasks.Length; i++)
{
	bagTasks[i] = Task.Factory.StartNew(() =>
	{
		while (concurrentBag.Count > 0)
		{
			int bagElement;
			bool success = concurrentBag.TryTake(out bagElement);
			if (success)
			{
				Interlocked.Increment(ref counter);
			}
		}
	});
}

The while loop will ensure that we’ll try to take the items as long as there’s something left in the collection.

Wait for the tasks and print the number of items processed – the counter should have the same value as the number of items in the bag:

Task.WaitAll(bagTasks);
Console.WriteLine("Counter: {0}", counter);

View the list of posts on the Task Parallel Library here.

Share data between Tasks using locks in .NET C#

You need to be careful when sharing mutable data across several tasks. If you don’t restrict the access to a shared variable then the tasks will all modify it in an unpredictable way. It’s because you cannot be sure how the tasks will be started and managed by the operating system: it depends on a range of parameters, such as the CPU usage, available memory, etc. If each task is supposed to modify a variable then maybe the first task won’t finish its work before the second task reads the same variable.

Consider the following:

class BankAccount
{
	public int Balance { get; set; }
}

The following code builds a list of tasks that each increase the balance by 1:

BankAccount account = new BankAccount();
List<Task> tasks = new List<Task>();

for (int i = 0; i < 5; i++)
{
	tasks.Add(Task.Factory.StartNew(() =>
	{
		for (int j = 0; j < 1000; j++)
		{
			account.Balance = account.Balance + 1;
		}
	}));
}
Task.WaitAll(tasks.ToArray());
Console.WriteLine(string.Concat("Expected value 5000", ", actual value ",account.Balance));

We expect the final balance to be 5000. Run the code and you will most likely get something less, such as 3856 or 4652. At times you get exactly 5000 but it’s unpredictable.

In such cases you need to identify the volatile critical regions in your code. In this example it’s simple:

account.Balance = account.Balance + 1;

It is the account balance that is modified by each thread so we need to lock it for access so that only one thread can modify it at a certain time. You can take the word “lock” almost literally as we use the lock keyword to do that. We take an object which will act as the synchronisation primitive. The lock object must be visible to all tasks:

object lockObject = new object();

You use the lock keyword as follows:

for (int i = 0; i < 5; i++)
{
	tasks.Add(Task.Factory.StartNew(() =>
	{
		for (int j = 0; j < 1000; j++)
		{
			lock (lockObject)
			{
				account.Balance = account.Balance + 1;
			}
		}
	}));
}

The volatile region of the code is locked using the lock object. The lock is released as soon as the current task has finished working with it.

Run the modified code and you should get the correct result.

View the list of posts on the Task Parallel Library here.

Getting a return value from a Task with C#

Sometimes you want to get a return value from a Task as opposed to letting it run in the background and forgetting about it. You’ll need to specify the return type as a type parameter to the Task object: a Task of T.

.NET 4.0

Without specifying an input parameter:

Task<int> task = new Task<int>(() => 
{
    int total = 0;
    for (int i = 0; i < 500; i++)
    {
        total += i;
    }
    return total;
});

task.Start();
int result = Convert.ToInt32(task.Result);

We count to 500 and return the sum. The return value of the Task can be retrieved using the Result property which can be converted to the desired type.

You can provide an input parameter as well:

Task<int> task = new Task<int>(obj => 
{
    int total = 0;
    int max = (int)obj;
    for (int i = 0; i < max; i++)
    {
        total += i;
    }
    return total;
}, 300);

task.Start();
int result = Convert.ToInt32(task.Result);

We specify that we want to count to 300.

.NET 4.5

The recommended way in .NET 4.5 is to use Task.FromResult, Task.Run or Task.Factory.StartNew:

FromResult:

public async Task DoWork()
{
       int res = await Task.FromResult<int>(GetSum(4, 5));	
}

private int GetSum(int a, int b)
{
	return a + b;
}

Please check out Stefan’s comments on the usage of FromResult in the comments section below the post.

Task.Run:

public async Task DoWork()
{
	Func<int> function = new Func<int>(() => GetSum(4, 5));
	int res = await Task.Run<int>(function);
}

private int GetSum(int a, int b)
{
	return a + b;
}

Task.Factory.StartNew:

public async Task DoWork()
{
	Func<int> function = new Func<int>(() => GetSum(4, 5));
	int res = await Task.Factory.StartNew<int>(function);			
}

private int GetSum(int a, int b)
{
	return a + b;
}

View the list of posts on the Task Parallel Library here.

Thread safe collections in .NET: ConcurrentDictionary

Concurrent collections in .NET work very much like their single-thread counterparts with the difference that they are thread safe. These collections can be used in scenarios where you need to share a collection between Tasks. They are typed and use a lightweight synchronisation mechanism to ensure that they are safe and fast to use in parallel programming.

Concurrent dictionaries

Concurrent dictionaries are thread-safe equivalent collections of “normal” dictionaries, i.e. key-value pair collections. Concurrent dictionaries are ideal if you would like to share a key-pair collection of types K and V among several tasks.

Important methods:

  • TryAdd(K key, V value): adds an new key-value pair to the collection. Returns true if the insertion was successful
  • TryGetValue(K key, out V value): tries to retrieve the value of the key. Returns true if the extraction was successful and the value is assigned to the out parameter
  • TryRemove(K key, out V value): tries to remove the key-value pair associated with the key. Returns true if the deletion was successful and the value is assigned to the out parameter
  • TryUpdate(K key, V new, V current): update the value of the key-value pair with the ‘new’ value if the current value is equal to ‘current’. Returns true if the update was successful
  • ContainsKey(K key): same as ContainsKey of the normal Dictionary class, i.e. returns true if the key is found in the dictionary

The ‘try’ bit in the method names imply that your code needs to prepare for the event where the element could not be retrieved. If multiple threads retrieve elements from the same collection you cannot be sure what’s in there when a specific thread tries to read from it. E.g. even if ContainsKey returns true there’s no guarantee that the key-value pair is still present when the thread tries to read from it as another thread might have already removed it.

Example

We’ll need a simple object with a single property for the example:

class Series
{
	public int CurrentValue
	{
		get;
		set;
	}
}

The following code creates 20 tasks and each task increases the value of the CurrentValue property in the shared dictionary by 1000 in loop. So we’re expecting the final value to be 20000. We fill up the task array in a loop and start the tasks individually. The key-value in the dictionary may look like the following at a certain stage:

key: 0 (the task ID represented by the taskParameter object, which is the same as ‘i’ in the main loop), value: 40
key: 1, value 46
key: 2: value 43
.
.
.
key: 19, value 45

After the loop the values of each thread are added to the CurrentValue property:

Series series = new Series();
ConcurrentDictionary<int, int> concurrentDictionary = new ConcurrentDictionary<int, int>();
Task<int>[] taskArray = new Task<int>[20];
for (int i = 0; i < taskArray.Length; i++)
{
	concurrentDictionary.TryAdd(i, series.CurrentValue);

	taskArray[i] = Task.Factory.StartNew<int>((taskParameter) =>
	{
		int current;
		bool valueRetrieved;
		int key = Convert.ToInt32(taskParameter);
		for (int j = 0; j < 1000; j++)
		{
			valueRetrieved = concurrentDictionary.TryGetValue(key, out current);
			concurrentDictionary.TryUpdate(key, current + 1, current);
		}

		int result;
		valueRetrieved = concurrentDictionary.TryGetValue(key, out result);
		if (valueRetrieved)
		{
			return result;
		}
		else
		{
			throw new Exception(String.Format("No data item available for key {0}", taskParameter));
		}
	}, i);
}

for (int i = 0; i < taskArray.Length; i++)
{
	series.CurrentValue += taskArray[i].Result;
}

Console.WriteLine("Expected value {0}, Balance: {1}", 20000, series.CurrentValue);

View the list of posts on the Task Parallel Library here.

Thread safe collections in .NET: ConcurrentStack

Concurrent collections in .NET work very much like their single-thread counterparts with the difference that they are thread safe. These collections can be used in scenarios where you need to share a collection between Tasks. They are typed and use a lightweight synchronisation mechanism to ensure that they are safe and fast to use in parallel programming.

Concurrent stacks

If you don’t know what Stacks are then you can read about them here. The Stack of T generic collection has a thread-safe counterpart called ConcurrentStack. Important methods:

  • Push(T element): adds an item of type T to the collection
  • PushRange(T[] elements) and PushRange(T[] elements, int, int): same as Push but is used for adding an array of items to the collection
  • TryPeek(out T): tries to retrieve the next element from the collection without removing it. The value is set to the out parameter if the method succeeds. Otherwise it returns false.
  • TryPop(out T): tries to get the first element. It removes the item from the collection and sets the out parameter to the retrieved element. Otherwise the method returns false
  • TryPopRange(out T[] elements) and TryPopRange(out T[], int, int): same as TryPop but is used for arrays

The ‘try’ bit in the method names imply that your code needs to prepare for the event where the element could not be retrieved. If multiple threads retrieve elements from the same stack you cannot be sure what’s in there when a specific thread tries to read from it.

Example

Declare and fill a concurrent stack:

ConcurrentStack<int> concurrentStack = new ConcurrentStack<int>();

for (int i = 0; i < 5000; i++)
{
	concurrentStack.Push(i);
}

Next we’ll try to pop every item from the stack. The stack will be accessed by several tasks at the same time. The counter variable – which is also shared – will be used to check if all items have been retrieved.

int counter = 0;

Task[] stackTasks = new Task[10];
for (int i = 0; i < stackTasks.Length; i++)
{
	stackTasks[i] = Task.Factory.StartNew(() =>
	{
		while (concurrentStack.Count > 0)
		{
			int currentElement;
			bool success = concurrentStack.TryPop(out currentElement);
			if (success)
			{
				Interlocked.Increment(ref counter);
			}
		}
	});
}

The while loop will ensure that we’ll try to pop the items as long as there’s something left in the collection.

Wait for the tasks and print the number of items processed – the counter should have the same value as the number of items in the stack:

Task.WaitAll(stackTasks);
Console.WriteLine("Counter: {0}", counter);

View the list of posts on the Task Parallel Library here.

7 ways to start a Task in .NET C#

New threads can be started using the Task Programming Library in .NET in – at last – 5 different ways.

You’ll first need to add the following using statement:

using System.Threading.Tasks;

The most direct way

Task.Factory.StartNew(() => {Console.WriteLine("Hello Task library!"); });

Using Action

Task task = new Task(new Action(PrintMessage));
task.Start();

…where PrintMessage is a method:

private void PrintMessage()
{
    Console.WriteLine("Hello Task library!");
}

Using a delegate

Task task = new Task(delegate { PrintMessage(); });
task.Start();

Lambda and named method

Task task = new Task( () => PrintMessage() );
task.Start();

Lambda and anonymous method

Task task = new Task( () => { PrintMessage(); } );
task.Start();

Using Task.Run in .NET4.5

public async Task DoWork()
{
	await Task.Run(() => PrintMessage());
}

Using Task.FromResult in .NET4.5 to return a result from a Task

public async Task DoWork()
{
	int res = await Task.FromResult<int>(GetSum(4, 5));	
}

private int GetSum(int a, int b)
{
	return a + b;
}

You cannot start a task that has already completed. If you need to run the same task you’ll need to initialise it again.

View the list of posts on the Task Parallel Library here.

Elliot Balynn's Blog

A directory of wonderful thoughts

Software Engineering

Web development

Disparate Opinions

Various tidbits

chsakell's Blog

WEB APPLICATION DEVELOPMENT TUTORIALS WITH OPEN-SOURCE PROJECTS

Once Upon a Camayoc

ARCHIVED: Bite-size insight on Cyber Security for the not too technical.