Parallel Programming Notes - Parallel Tasks
data parallelism - single operation is applied to many inputs e.g. parallel loop
task parallelism - multiple operations each with its own input
tasks implemented by System.Threading.Tasks.Task type
static Task.Factory used to create and schedule new tasks
new tasks are placed in work queue
tasks run when the associated task scheduler removes them from the queue
task scheduler attempts to optimize overall throughput by controlling system's degree of concurrency
The Basics
sequential operation is
    DoLeft();
    DoRight();
methods are independent
call in parallel using
   Parallel.Invoke(DoLeft, DoRight);
Invoke method takes params array argument list
Invoke method returns when all tasks have completed
can't assume all parallel tasks will run immediately
delegate methods can either complete or throw an exception
all exceptions are rethrown as inner exceptions of AggregateException
Parallel.Invoke behaves roughly as if each delegate were started as a task and then waited on
    Task t1 = Task.Factory.StartNew(DoLeft);
    Task t2 = Task.Factory.StartNew(DoRight);

    Task.WaitAll(t1, t2);
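a minimal sketch of the points above, assuming two hypothetical methods DoLeft and DoRight (their bodies are invented for illustration); it shows that Parallel.Invoke returns only after both delegates finish and that a thrown exception arrives as an inner exception of AggregateException

```csharp
using System;
using System.Threading.Tasks;

static class InvokeDemo
{
    // hypothetical work methods, not from the original notes
    static void DoLeft() { Console.WriteLine("left done"); }
    static void DoRight() { throw new InvalidOperationException("right failed"); }

    static void Main()
    {
        try
        {
            // blocks until both delegates have completed or thrown
            Parallel.Invoke(DoLeft, DoRight);
        }
        catch (AggregateException ae)
        {
            // each delegate's exception is one entry in InnerExceptions
            foreach (Exception e in ae.InnerExceptions)
                Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
        }
    }
}
```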
    
Example
image processing is done on two bitmaps
bitmaps are then alpha-blended
SetToGray and Rotate are independent methods
    static int SequentialImageProcessing(Bitmap source1, Bitmap source2, Bitmap layer1, Bitmap layer2, Graphics blender)
    {
        SetToGray(source1, layer1);
        Rotate(source2, layer2);
        Blend(layer1, layer2, blender);

        return source1.Width;
    }
explicitly assigning tasks to run in parallel
    static int ParallelImageProcessing(Bitmap source1, Bitmap source2, Bitmap layer1, Bitmap layer2, Graphics blender)
    {
        Task toGray = Task.Factory.StartNew(() => SetToGray(source1, layer1));
        Task rotate = Task.Factory.StartNew(() => Rotate(source2, layer2));
        Task.WaitAll(toGray, rotate);
        Blend(layer1, layer2, blender);

        return source1.Width;
    }
same code using parallel invoke
    static int ParallelImageProcessing(Bitmap source1, Bitmap source2, Bitmap layer1, Bitmap layer2, Graphics blender)
    {
        // Parallel.Invoke waits for both delegates, so no explicit WaitAll is needed
        Parallel.Invoke(() => SetToGray(source1, layer1), () => Rotate(source2, layer2));
        Blend(layer1, layer2, blender);

        return source1.Width;
    }
Canceling a Task
tasks use a cooperative cancellation model
running task must poll for cancellation request at appropriate intervals
.NET uses two types
CancellationTokenSource allows program to request cancellation
CancellationToken checks for cancellation requests
    CancellationTokenSource cts = new CancellationTokenSource();
    CancellationToken token = cts.Token;

    Task myTask = Task.Factory.StartNew(() =>
    {
        for (...)
        {
            // throws OperationCanceledException
            token.ThrowIfCancellationRequested();
            ...
        }
    }, token);

    // elsewhere
    cts.Cancel();
        
OperationCanceledException should not be handled within task
use profiling to determine where and how often cancellation checks should be made
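a runnable sketch of the cooperative model, under assumptions: the Thread.Sleep calls stand in for real work and the loop bound and timings are arbitrary; the task polls the token once per iteration and the caller filters the cancellation exception after waiting

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class CancelDemo
{
    static void Main()
    {
        var cts = new CancellationTokenSource();
        CancellationToken token = cts.Token;

        Task worker = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 1000; i++)
            {
                // poll once per iteration; throws OperationCanceledException
                token.ThrowIfCancellationRequested();
                Thread.Sleep(10); // stand-in for one unit of real work
            }
        }, token);

        Thread.Sleep(100); // let the task run briefly
        cts.Cancel();      // request cancellation from outside the task

        try
        {
            worker.Wait();
        }
        catch (AggregateException ae)
        {
            // a canceled task surfaces as TaskCanceledException,
            // which derives from OperationCanceledException
            ae.Handle(e => e is OperationCanceledException);
        }

        // the task ends in the Canceled state because the exception
        // carries the same token that was passed to StartNew
        Console.WriteLine(worker.Status);
        cts.Dispose();
    }
}
```

the exception is not caught inside the task body; it is left to escape so the task can record the cancellation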
Handling Exceptions
if a Task object's delegate throws an unhandled exception, the task terminates and its Status property is set to TaskStatus.Faulted
exception is temporarily unobserved; the Task Parallel Library (TPL) catches the exception and records its details
Ways to Observe an Unhandled Task Exception
invoking faulted task's Wait method causes exception to be observed, exception is thrown in the calling context of Wait method
Task type's static WaitAll method observes unhandled exceptions in more than one task
Parallel.Invoke includes implicit call to WaitAll
WaitAll invocation groups unhandled exceptions as details in AggregateException object
getting Exception property of faulted task causes unhandled exception to be observed, returns an AggregateException object, exception is not rethrown
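a small sketch contrasting two of the ways listed above, assuming a task that always throws; Task.WaitAny is used only to let the task finish without throwing, reading the Exception property returns the AggregateException without rethrowing, and Wait rethrows it

```csharp
using System;
using System.Threading.Tasks;

static class ObserveDemo
{
    static void Main()
    {
        // illustrative failing delegate, not from the original notes
        Action fail = () => { throw new InvalidOperationException("boom"); };
        Task faulty = Task.Factory.StartNew(fail);

        // WaitAny returns once the task has finished and does not throw
        Task.WaitAny(faulty);

        // reading the property observes the exception without rethrowing it
        AggregateException recorded = faulty.Exception;
        Console.WriteLine(faulty.Status);
        Console.WriteLine(recorded.InnerException.Message);

        try
        {
            // Wait rethrows in the calling context
            faulty.Wait();
        }
        catch (AggregateException ae)
        {
            Console.WriteLine(ae.InnerExceptions.Count);
        }
    }
}
```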
AggregateExceptions
runtime collects unhandled exceptions and wraps them in an AggregateException
TPL throws exception in context of waiting thread
InnerExceptions property is collection of unhandled exceptions
inner exceptions' stack trace is preserved
when a waited task was canceled, an OperationCanceledException (typically the derived TaskCanceledException) will be an inner exception
The Handle Method
AggregateException.Handle invokes user-provided delegate method for each of the inner exceptions
return value of delegate is a boolean representing whether the exception was handled or not
    try
    {
        Task t = Task.Factory.StartNew(...);
        ...
        t.Wait();
    }
    catch(AggregateException ae)
    {
        ae.Handle(e =>
        {
            if(e is MyException)
            {
                ...
                return true;
            }
            else
            {
                return false;
            }
        });
    }
if any unhandled exceptions remain the Handle method packages the exceptions into an AggregateException which is then rethrown
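a sketch of the rethrow behaviour, assuming two illustrative failing delegates; the Handle delegate accepts only ArgumentException, so the other exception should come back wrapped in a new AggregateException caught by the outer block

```csharp
using System;
using System.Threading.Tasks;

static class HandleDemo
{
    static void Main()
    {
        // illustrative failing delegates, not from the original notes
        Action failA = () => { throw new ArgumentException("handled here"); };
        Action failB = () => { throw new InvalidOperationException("not handled here"); };

        try
        {
            try
            {
                Task.WaitAll(Task.Factory.StartNew(failA),
                             Task.Factory.StartNew(failB));
            }
            catch (AggregateException ae)
            {
                // returning true marks an inner exception as handled
                ae.Handle(e => e is ArgumentException);
            }
        }
        catch (AggregateException rethrown)
        {
            // only the exceptions the delegate rejected remain
            foreach (Exception e in rethrown.InnerExceptions)
                Console.WriteLine(e.GetType().Name);
        }
    }
}
```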
The Flatten Method
when tasks are nested an AggregateException can contain AggregateExceptions
Flatten method converts tree of inner exceptions into a flat sequence of inner exceptions
    try
    {
        Task t1 = Task.Factory.StartNew(() =>
        {
            Task t2 = Task.Factory.StartNew(() =>
            {
                ...
                throw new MyException();
            });
            t2.Wait();
        });
        ...
        t1.Wait();
    }
    catch(AggregateException ae)
    {
        ae.Flatten().Handle(e =>
        {
            if(e is MyException)
            {
                ...
                return true;
            }
            else
            {
                return false;
            }
        });
    }
Waiting for the First Task to Complete
Task.WaitAll method returns when a set of tasks completes
Task.WaitAny method returns when the first of a set of tasks completes
    var taskIndex = -1;
    Task[] tasks = new Task[]
                    {
                        Task.Factory.StartNew(DoLeft),
                        Task.Factory.StartNew(DoRight),
                        Task.Factory.StartNew(DoCenter)
                    };
    Task[] allTasks = tasks;
    
    // print completion notices as tasks finish
    while(tasks.Length > 0)
    {
        taskIndex = Task.WaitAny(tasks);
        Console.WriteLine("Finished task : {0}", taskIndex + 1);
        tasks = tasks.Where((t) => t != tasks[taskIndex]).ToArray();
    }
    // observe any exceptions
    try
    {
        Task.WaitAll(allTasks);
    }
    catch(AggregateException ex)
    {
        ...
    }
Task.WaitAny does not observe exceptions
Speculative Execution
perform an operation in anticipation of a particular result from another operation
if the other operation's result is different, cancel operation and restart using unexpected result
run parallel searches
    SpeculativeInvoke(SearchLeft, SearchRight, SearchMiddle);
    ...
    public static void SpeculativeInvoke(params Action<CancellationToken>[] actions)
    {
        var cts = new CancellationTokenSource();
        var token = cts.Token;
        var tasks = (from a in actions
                     select Task.Factory.StartNew(() => a(token), token)).ToArray();
        // wait for fastest task to complete
        Task.WaitAny(tasks);
        // cancel slower tasks
        cts.Cancel();
        // wait for cancellation to finish and observe exceptions
        try
        {
            Task.WaitAll(tasks);
        }
        catch(AggregateException ae)
        {
            // filter out the exception caused by cancellation
            ae.Flatten().Handle(e => e is OperationCanceledException);
        }
        finally
        {
            if(cts != null)
                cts.Dispose();
        }
    }
Creating Tasks with Custom Scheduling
pass a custom task scheduler to the overloaded versions of Task.Factory.StartNew that accept a TaskScheduler
common case is when task needs to run in context of a particular thread, thread affinity required
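a sketch of thread affinity through a custom scheduler, under assumptions: SingleThreadScheduler is a hypothetical class written for this example (not part of .NET) that runs every queued task on one dedicated thread; the StartNew overload taking a TaskScheduler is the real API

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// hypothetical scheduler: runs every task on one dedicated thread
sealed class SingleThreadScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> queue = new BlockingCollection<Task>();
    private readonly Thread thread;

    public SingleThreadScheduler()
    {
        thread = new Thread(() =>
        {
            // blocks until tasks arrive; ends after CompleteAdding
            foreach (Task t in queue.GetConsumingEnumerable())
                TryExecuteTask(t);
        });
        thread.IsBackground = true;
        thread.Start();
    }

    protected override void QueueTask(Task task) { queue.Add(task); }

    // inline only when already on the dedicated thread, to keep affinity
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return Thread.CurrentThread == thread && TryExecuteTask(task);
    }

    protected override IEnumerable<Task> GetScheduledTasks() { return queue.ToArray(); }

    public void Dispose() { queue.CompleteAdding(); }
}

static class SchedulerDemo
{
    static void Main()
    {
        using (var scheduler = new SingleThreadScheduler())
        {
            var tasks = new Task<int>[3];
            for (int i = 0; i < tasks.Length; i++)
            {
                tasks[i] = Task.Factory.StartNew(
                    () => Thread.CurrentThread.ManagedThreadId,
                    CancellationToken.None, TaskCreationOptions.None, scheduler);
            }
            Task.WaitAll(tasks);

            // every task ran on the scheduler's single thread
            Console.WriteLine(tasks[0].Result == tasks[1].Result
                              && tasks[1].Result == tasks[2].Result);
        }
    }
}
```

in UI applications the same need is usually met with TaskScheduler.FromCurrentSynchronizationContext rather than a hand-written scheduler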
Anti-Patterns
Variables Captured by Closures
closures may refer to variables declared outside of their lexical scope
    for(int i = 0; i < 4; i++)
    {
        // buggy
        Task.Factory.StartNew(() => Console.WriteLine(i));
    }
i is shared by all the closures of the loop
    for(int i = 0; i < 4; i++)
    {
        var tmp = i;
        Task.Factory.StartNew(() => Console.WriteLine(tmp));
    }
tmp is not shared
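an alternative sketch that avoids capturing the loop variable at all, using the StartNew overload that takes a state object; the value of i is copied when StartNew is called, so each task sees its own value (output order is not guaranteed)

```csharp
using System;
using System.Threading.Tasks;

static class ClosureDemo
{
    static void Main()
    {
        var tasks = new Task[4];
        for (int i = 0; i < 4; i++)
        {
            // i is boxed and copied here; the lambda captures nothing
            tasks[i] = Task.Factory.StartNew(
                state => Console.WriteLine((int)state), i);
        }
        Task.WaitAll(tasks);
    }
}
```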
Disposing of a Resource Needed by a Task
    Task<string> t;
    using (var file = new StringReader("text"))
    {
        t = Task<string>.Factory.StartNew(() => file.ReadLine());
    }
    // bug - file may already have been disposed before the task reads from it
    Console.WriteLine(t.Result);
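one possible fix, sketched under the assumption that blocking inside the using block is acceptable: read the task's Result before the reader goes out of scope, so the task finishes while the resource is still alive

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

static class DisposeDemo
{
    static void Main()
    {
        string line;
        using (var file = new StringReader("text"))
        {
            Task<string> t = Task<string>.Factory.StartNew(() => file.ReadLine());
            // Result blocks until the task finishes, while file is still open
            line = t.Result;
        }
        Console.WriteLine(line); // prints "text"
    }
}
```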
Avoid Thread Abort
terminating tasks with Thread.Abort leaves the App Domain in a potentially unstable state
aborting thread pool worker thread is never recommended
Design Notes
Tasks & Threads
when a task runs the task scheduler invokes the task's delegate in a thread of its choosing
a task does not migrate across threads once it has started running
Task Life Cycle
Waiting To Run -> Canceled
Waiting To Run -> Running -> Canceled
Waiting To Run -> Running -> Faulted
Waiting To Run -> Running -> Ran To Completion (TaskStatus.RanToCompletion)
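a sketch that drives three tasks to the three final states, assuming an illustrative failing delegate and a token that is canceled before the task is created (so that task never runs); intermediate states are not printed because they depend on timing

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class StatusDemo
{
    static void Main()
    {
        var cts = new CancellationTokenSource();
        cts.Cancel(); // canceled before the third task is created

        // illustrative failing delegate, not from the original notes
        Action fail = () => { throw new InvalidOperationException(); };

        Task ok = Task.Factory.StartNew(() => { });
        Task faulted = Task.Factory.StartNew(fail);
        Task canceled = Task.Factory.StartNew(() => { }, cts.Token);

        try { Task.WaitAll(ok, faulted, canceled); }
        catch (AggregateException) { /* observed; only the statuses matter here */ }

        Console.WriteLine(ok.Status);       // RanToCompletion
        Console.WriteLine(faulted.Status);  // Faulted
        Console.WriteLine(canceled.Status); // Canceled
    }
}
```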
Writing a Custom Task Scheduler
.NET framework includes two task scheduler implementations
default task scheduler
task scheduler that runs tasks on a target synchronization context (obtained with TaskScheduler.FromCurrentSynchronizationContext)
scenarios
want single degree of parallelism across multiple loops and tasks instead of across a single loop
want to implement alternative scheduling algorithms
want to use specific set of threads such as STA threads instead of thread pool worker threads
Unobserved Task Exceptions
unobserved task exceptions will be observed in the finalizer thread context
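if an unhandled exception occurs in the finalizer, the run-time terminates the process by default (.NET Framework 4 behavior; from .NET Framework 4.5 on, unobserved task exceptions no longer terminate the process by default)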
existing unmanaged resource handles will not be released
can subscribe to the UnobservedTaskException event of the TaskScheduler class & handle the exceptions as they propagate in the finalizer context
during finalization the task's status determines how the unobserved task exception is treated
OperationCanceledException types are only reported via the UnobservedTaskException event, they are not propagated
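a sketch of subscribing to the event, assuming an illustrative task that faults and is never waited on; the handler runs only when the faulted task is finalized, so whether and when it fires depends on the garbage collector and build settings, and the forced collection here is for demonstration only

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class UnobservedDemo
{
    static void Main()
    {
        TaskScheduler.UnobservedTaskException += (sender, e) =>
        {
            Console.WriteLine("unobserved: " + e.Exception.InnerException.Message);
            e.SetObserved(); // mark as handled so it does not propagate further
        };

        // illustrative failing delegate; the task is never waited on
        Action fail = () => { throw new InvalidOperationException("never waited on"); };
        Task.Factory.StartNew(fail);

        Thread.Sleep(100); // give the task time to fault

        // demonstration only: encourage finalization of the faulted task
        GC.Collect();
        GC.WaitForPendingFinalizers();
    }
}
```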
Relationship Between Data Parallelism & Task Parallelism
Parallel.Invoke is task parallelism
TPL may use parallel loops (data parallelism) to perform the same operation (invoking the delegate) on each piece of data
Default Task Scheduler
default task scheduler is tightly integrated with the thread pool
worker threads are managed by ThreadPool type
conceptually, the default task scheduler is an improved thread pool
work items return a handle that a thread can use as a wait condition
The Thread Pool
simplest form consists of a global queue & threads processing work items on a FIFO basis
scaling issues, each end of queue can only be accessed by one thread at a time
all forms of synchronization have potential to block
heavy synchronization expense
Decentralized Scheduling Techniques
.NET framework provides local task queues for each worker thread in thread pool
local queue needs very limited synchronization
local queues constructed using special concurrent data structure known as a work-stealing queue
queue allows lock-free pushes & pops
makes task execution less predictable
local queues use LIFO to avoid synchronization costs
Work Stealing
when a worker thread finds both its local queue and the global queue empty, the system 'steals' a task from the public end of another worker thread's local queue
Top-level Tasks in Global Queue
tasks are placed in global queue when task factory method is invoked by a thread which is not a thread pool worker
tasks in global queue are top-level tasks
Subtasks in Local Queue
when thread pool worker creates a task, the task scheduler puts the task in the thread's local queue
assumption that minimizing worst case latency isn't important
goal is to optimize overall system throughput
tasks in local queue are subtasks
Inlined Execution of Subtasks
sometimes a task needs a second task to complete before it can continue
if Task.Wait or Task.WaitAll is called from thread pool worker TPL can determine if second task has started
if second task has not started it can be run in the context of the first task's thread
Thread Injection
thread pool manages number of threads in the pool
built-in heuristics determine when to add & remove threads
two mechanisms
starvation avoidance mechanism adds worker threads when no progress is made on queued items
hill-climbing heuristic to maximize throughput using fewest threads possible
goal of starvation avoidance is to prevent deadlock
goal of hill-climbing heuristic is to improve utilization of cores
Bypassing The Thread Pool
pass the TaskCreationOptions.LongRunning option as an argument to task factory methods
mostly used for tasks with long I/O related wait conditions
cannot use inline execution for subtasks
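a sketch comparing a default task with a LongRunning one; each task reports whether it ran on a thread pool thread. With the default scheduler the LongRunning task typically gets a dedicated thread, but that is scheduler behaviour rather than a guarantee, so no output is asserted here

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class LongRunningDemo
{
    static void Main()
    {
        Task<bool> pooled = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread);

        // hint to the scheduler that this task should bypass the pool
        Task<bool> dedicated = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            TaskCreationOptions.LongRunning);

        Console.WriteLine("default task on pool thread: {0}", pooled.Result);
        Console.WriteLine("LongRunning task on pool thread: {0}", dedicated.Result);
    }
}
```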
n4jvp.com