Parallel Programming Notes - Pipelines
the pipeline pattern uses parallel tasks & concurrent queues to process a sequence of input values
each task implements a stage of the pipeline
queues act as buffers
outputs occur in the same order as inputs
composed of a series of producer/consumer stages
use when there are too many dependencies for a parallel loop
The Basics
buffers are usually based on the BlockingCollection<T> type
pipeline with 4 stages
    input
    // stage 1
    read strings
    buffer 1
    // stage 2
    correct casing
    buffer 2
    // stage 3
    create sentences
    buffer 3
    // stage 4
    write sentences
    output
each stage reads from a dedicated input and writes to a particular output
all stages can execute at the same time because concurrent queues buffer shared inputs and outputs
a stage can add its value to its output buffer as long as there is room
if there is no room, the stage blocks until buffer space becomes available
blocking can also occur on input
BlockingCollection<T> method CompleteAdding signals 'EOF'
the consumer can then shut down once the queue is empty
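a minimal sketch of the blocking and 'EOF' behavior described above, run on a single thread. The class name BoundedBufferSketch and the capacity of 2 are assumptions for illustration; TryAdd is used in place of Add so the full buffer reports false instead of blocking the demo.

```csharp
using System;
using System.Collections.Concurrent;

static class BoundedBufferSketch
{
    // Returns a short trace of what happened at each step.
    public static string Run()
    {
        var buffer = new BlockingCollection<int>(2);    // room for two items
        buffer.Add(1);
        buffer.Add(2);

        // The buffer is full: Add would block here, so use TryAdd, which
        // returns false immediately instead of waiting for space.
        bool addedWhileFull = buffer.TryAdd(3);

        int first = buffer.Take();                      // frees one slot
        bool addedAfterTake = buffer.TryAdd(3);

        buffer.CompleteAdding();                        // signal 'EOF'
        bool completedBeforeDrain = buffer.IsCompleted; // false: items remain

        int drained = 0;
        foreach (var item in buffer.GetConsumingEnumerable())
            drained++;                                  // loop ends once the queue is empty

        return string.Join(",", addedWhileFull, first, addedAfterTake,
                           completedBeforeDrain, drained, buffer.IsCompleted);
    }
}
```

calling BoundedBufferSketch.Run() should show the add failing while the buffer is full, succeeding after a Take, and IsCompleted turning true only after the remaining items are consumed.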
implementation of pipeline
    int seed = ...
    int bufferSize = ...
    var buffer1 = new BlockingCollection<string>(bufferSize);
    var buffer2 = new BlockingCollection<string>(bufferSize);
    var buffer3 = new BlockingCollection<string>(bufferSize);

    var f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

    var stage1 = f.StartNew(() => ReadStrings(buffer1, ...));
    var stage2 = f.StartNew(() => CorrectCase(buffer1, buffer2));
    var stage3 = f.StartNew(() => CreateSentences(buffer2, buffer3));
    var stage4 = f.StartNew(() => WriteSentences(buffer3));

    Task.WaitAll(stage1, stage2, stage3, stage4);
    
first stage of pipeline includes sequential loop that writes to the output buffer
loop populates output buffer with values
values accessed by PhraseSource method which returns ordinary single-threaded instance of IEnumerable<string>
call to CompleteAdding method is done in finally block
    static void ReadStrings(BlockingCollection<string> output, int seed)
    {
        try
        {
            foreach(var phrase in PhraseSource(seed))
            {
                stage1AdditionalWork();
                output.Add(phrase);
            }
        }
        finally
        {
            output.CompleteAdding();
        }
    }
implementations of stages 2 & 3 are structured similarly to this code
    void DoStage(BlockingCollection<string> input, BlockingCollection<string> output)
    {
        try
        {
            foreach(var item in input.GetConsumingEnumerable())
            {
                var result = ...
                output.Add(result);
            }
        }
        finally
        {
            output.CompleteAdding();
        }
    }
last stage writes values to a stream
    static void WriteSentences(BlockingCollection<string> input)
    {
        using (StreamWriter outfile = new StreamWriter(...))
        {
            foreach (var sentence in input.GetConsumingEnumerable())
            {
                var printSentence = ...
                outfile.WriteLine(printSentence);
            }
        }
    }
Performance Characteristics
when stages of a pipeline don't take the same amount of time, the speed of the pipeline is approximately the same as the speed of its slowest stage
the most efficient pipelines have stages of near equal speed
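a rough estimate of the effect, under an idealized model that is an assumption here and not a measurement: one thread per stage, buffers never the bottleneck, fixed per-item stage times. The names PipelineEstimate, PipelinedTime and SequentialTime are hypothetical.

```csharp
using System;
using System.Linq;

static class PipelineEstimate
{
    // The first item must pass through every stage; after that, one item
    // finishes each time the slowest stage completes.
    public static double PipelinedTime(double[] stageTimes, int itemCount)
    {
        if (itemCount <= 0) return 0;
        return stageTimes.Sum() + (itemCount - 1) * stageTimes.Max();
    }

    // Baseline: every item runs through all stages on a single thread.
    public static double SequentialTime(double[] stageTimes, int itemCount)
    {
        return itemCount <= 0 ? 0 : stageTimes.Sum() * itemCount;
    }
}
```

with stage times of 1, 2, 4 and 1 ms and 100 items, the model gives 8 + 99 * 4 = 404 ms pipelined against 800 ms sequential. Rebalancing the same total work to 2 ms per stage gives 8 + 99 * 2 = 206 ms.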
Canceling a Pipeline
observe cancellation token in two places
    void DoStage(BlockingCollection<string> input, BlockingCollection<string> output, CancellationToken token)
    {
        try
        {
            foreach(var item in input.GetConsumingEnumerable())
            {
                if(token.IsCancellationRequested)
                    break;
                var result = ...
                output.Add(result, token);
            }
        }
        catch(OperationCanceledException)
        {
        }
        finally
        {
            output.CompleteAdding();
        }
    }
natural place to check for cancellation is at beginning of loop
pass the token to the output BlockingCollection<T> to avoid possible deadlock
if type T implements IDisposable, stages must call Dispose on any item they still hold when cancellation occurs
    int count = 0;
    int clockOffset = Environment.TickCount;
    var token = cts.Token;
    ImageInfo info = null;
    try
    {
        foreach (var fileName in fileNames)
        {
            if (token.IsCancellationRequested)
                break;
            info = LoadImage(fileName, sourceDir, count, clockOffset);
            original.Add(info, token);
            count += 1;
            info = null;
        }                
    }
    catch (Exception e)
    {
        // in case of exception, signal shutdown to other pipeline tasks
        cts.Cancel();
        if (!(e is OperationCanceledException))
            throw;
    }
    finally
    {
        original.CompleteAdding();
        if (info != null) info.Dispose();
    }
need to clean up the queues on error or cancellation
    static void RunPipelined(IEnumerable<string> fileNames, string sourceDir, int queueLength, Action<ImageInfo> displayFn, CancellationTokenSource cts)
    {
        // Data pipes 
        var originalImages = new BlockingCollection<ImageInfo>(queueLength);
        var thumbnailImages = new BlockingCollection<ImageInfo>(queueLength);
        var filteredImages = new BlockingCollection<ImageInfo>(queueLength);
        try
        {
            var f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
            Action<ImageInfo> updateStatisticsFn = info =>
            {
                info.QueueCount1 = originalImages.Count();
                info.QueueCount2 = thumbnailImages.Count();
                info.QueueCount3 = filteredImages.Count();
            };

            // Start pipelined tasks
            var loadTask = f.StartNew(() => LoadPipelinedImages(fileNames, sourceDir, originalImages, cts));
            var scaleTask = f.StartNew(() => ScalePipelinedImages(originalImages, thumbnailImages, cts));
            var filterTask = f.StartNew(() => FilterPipelinedImages(thumbnailImages, filteredImages, cts));
            var displayTask = f.StartNew(() => DisplayPipelinedImages(filteredImages.GetConsumingEnumerable(), displayFn, updateStatisticsFn, cts));
            Task.WaitAll(loadTask, scaleTask, filterTask, displayTask);
        }
        finally
        {
            // in case of exception or cancellation, there might be bitmaps
            // that need to be disposed.
            DisposeImagesInQueue(originalImages);
            DisposeImagesInQueue(thumbnailImages);
            DisposeImagesInQueue(filteredImages);
        }
    }
    static void DisposeImagesInQueue(BlockingCollection<ImageInfo> queue)
    {
        if (queue != null)
        {
            queue.CompleteAdding();
            foreach (var info in queue)
            {
                info.Dispose();
            }
        }
    }
Handling Pipeline Exceptions
exceptions are similar to cancellations
difference is no default notification
when an exception occurs in a stage use special instantiation of CancellationTokenSource type to coordinate pipeline shutdown
    static void DoPipeline(CancellationToken token)
    {
        using (CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(token))
        {
            var f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

            var stage1 = f.StartNew(() => DoStage1(..., cts));
            var stage2 = f.StartNew(() => DoStage2(..., cts));
            var stage3 = f.StartNew(() => DoStage3(..., cts));
            var stage4 = f.StartNew(() => DoStage4(..., cts));

            Task.WaitAll(stage1, stage2, stage3, stage4);
        }
    }
  
CancellationTokenSource.CreateLinkedTokenSource method creates a handle that allows the pipeline to respond to an external cancellation request and also to initiate & respond to an internal cancellation request
pass the linked CancellationTokenSource to the stages so each stage can both observe and request cancellation
    static void DoStage(... , CancellationTokenSource cts)
    {
        var token = cts.Token;
        try
        {
            foreach(var item in input.GetConsumingEnumerable())
            {
                if(token.IsCancellationRequested)
                    break;
                var result = ...
                output.Add(result, token);
            }
        }
        catch (Exception e)
        {
            cts.Cancel();
            if (!(e is OperationCanceledException))
                throw;
        }
        finally
        {
            ...
        }
    }        
after all pipeline tasks stop, the Task.WaitAll method throws an AggregateException that wraps the exceptions from the faulted stages
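a small sketch of how a caller might observe that AggregateException. The two tasks stand in for pipeline stages; WaitAllSketch, Fail and the message text are hypothetical names used only for illustration.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

static class WaitAllSketch
{
    // Stand-in for a stage that hits an error.
    static void Fail()
    {
        throw new InvalidOperationException("stage failed");
    }

    public static string[] RunAndCollectErrors()
    {
        var ok = Task.Factory.StartNew(() => { });
        var failing = Task.Factory.StartNew(() => Fail());
        try
        {
            // WaitAll waits for every task, then rethrows the failures together.
            Task.WaitAll(ok, failing);
            return new string[0];
        }
        catch (AggregateException ae)
        {
            // Flatten unwraps nested AggregateExceptions into one list.
            return ae.Flatten().InnerExceptions.Select(e => e.Message).ToArray();
        }
    }
}
```

RunAndCollectErrors() returns one message per faulted task, so a pipeline host can log or rethrow them after every stage has stopped.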
Load Balancing using Multiple Producers
BlockingCollection<T> allows a consumer to read values from more than one producer
use static TakeFromAny method to implement load-balancing strategies for some scenarios
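a hedged sketch of one consumer reading from two producers. It uses TryTakeFromAny, the timeout-based sibling of TakeFromAny, so the loop can re-check whether every queue is complete; the queue sizes, item values, 50 ms timeout and the name MultiProducerSketch are assumptions.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class MultiProducerSketch
{
    public static List<int> Run()
    {
        var queues = new[]
        {
            new BlockingCollection<int>(4),
            new BlockingCollection<int>(4)
        };

        // Two producers, each filling and then completing its own queue.
        var producers = queues.Select((queue, index) => Task.Factory.StartNew(() =>
        {
            try
            {
                for (int i = 0; i < 5; i++) queue.Add(index * 100 + i);
            }
            finally
            {
                queue.CompleteAdding();
            }
        })).ToArray();

        // One consumer takes from whichever queue has an item ready.
        var received = new List<int>();
        while (!queues.All(q => q.IsCompleted))
        {
            int item;
            int source = BlockingCollection<int>.TryTakeFromAny(queues, out item, 50);
            if (source >= 0) received.Add(item);   // -1 means nothing arrived in time
        }

        Task.WaitAll(producers);
        return received;
    }
}
```

the order in which items from the two producers interleave is not deterministic; only the order within each producer's own queue is preserved.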
Pipelines & Streams
blocking collections & streams have some similarities
can use pipeline pattern with library methods that read & write to streams
can create a stream whose underlying implementation relies on tasks and blocking collections
Asynchronous Pipelines
in an asynchronous pipeline tasks aren't created until data becomes available
AsyncCall type is a queue that a producer puts data into
if no task is processing the queue, a new task is created
when the task empties its queue, the task exits
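a simplified sketch of the idea, not the actual AsyncCall implementation. SimpleAsyncCall and its members are hypothetical names; error handling is omitted, so a handler that throws would leave the sketch stalled.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an AsyncCall-style stage.
sealed class SimpleAsyncCall<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly Action<T> _handler;
    private int _running;   // 0 = no task is draining the queue, 1 = a task is active

    public SimpleAsyncCall(Action<T> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        _handler = handler;
    }

    // Producer side: enqueue, then start a task only if none is active.
    public void Post(T item)
    {
        _queue.Enqueue(item);
        if (Interlocked.CompareExchange(ref _running, 1, 0) == 0)
            Task.Factory.StartNew(() => Drain());
    }

    private void Drain()
    {
        do
        {
            T item;
            while (_queue.TryDequeue(out item))
                _handler(item);
            // Queue looked empty: give up the "active" flag so the task can exit.
            Interlocked.Exchange(ref _running, 0);
        }
        // An item may have arrived just before the flag was cleared;
        // reclaim the flag and keep going if so.
        while (!_queue.IsEmpty && Interlocked.CompareExchange(ref _running, 1, 0) == 0);
    }
}
```

a stage built this way holds no thread while idle, which is the main contrast with the LongRunning tasks used elsewhere in these notes.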
Anti-Patterns
Thread Starvation
pipeline requires all of its tasks to execute concurrently
if there are not enough threads, blocking collections can fill up & block indefinitely
to guarantee a thread for each task use the LongRunning task creation option
Infinite Blocking Collection Waits
avoid by calling CancellationTokenSource.Cancel when an exception occurs in a pipeline stage
Forgetting GetConsumingEnumerable()
use GetConsumingEnumerable() method to iterate blocking collection
iterating the queue directly through IEnumerable<T> enumerates a snapshot of the queue rather than consuming the queue itself
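a short sketch contrasting the two ways of iterating. EnumerationSketch and the three sample strings are assumptions for illustration.

```csharp
using System;
using System.Collections.Concurrent;

static class EnumerationSketch
{
    public static string Run()
    {
        var queue = new BlockingCollection<string>();
        queue.Add("a");
        queue.Add("b");
        queue.Add("c");
        queue.CompleteAdding();

        // Plain foreach: enumerates a snapshot; nothing is removed.
        int seenInSnapshot = 0;
        foreach (var item in queue) seenInSnapshot++;
        int countAfterSnapshot = queue.Count;

        // GetConsumingEnumerable: removes each item as it is returned.
        int consumed = 0;
        foreach (var item in queue.GetConsumingEnumerable()) consumed++;
        int countAfterConsuming = queue.Count;

        return string.Join(",", seenInSnapshot, countAfterSnapshot,
                           consumed, countAfterConsuming);
    }
}
```

the snapshot loop sees all three items but leaves them in the queue, so a stage written that way never drains its input and the upstream stage eventually blocks on a full buffer.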
Using Other Producer/Consumer Collections
can use custom storage mechanism in place of BlockingCollection<T>
must implement IProducerConsumerCollection<T>
other implementations include ConcurrentBag<T> (unordered) & ConcurrentStack<T> (LIFO) types
generally FIFO ordering is used in pipelines
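a minimal sketch of swapping the underlying storage by passing an IProducerConsumerCollection<T> to the BlockingCollection<T> constructor. StorageSketch and Drain are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

static class StorageSketch
{
    // Adds 1, 2, 3 to a blocking collection backed by the given storage,
    // then returns the items in the order a consumer would receive them.
    public static List<int> Drain(IProducerConsumerCollection<int> storage)
    {
        var buffer = new BlockingCollection<int>(storage);
        for (int i = 1; i <= 3; i++) buffer.Add(i);
        buffer.CompleteAdding();
        return new List<int>(buffer.GetConsumingEnumerable());
    }
}
```

Drain(new ConcurrentQueue<int>()) yields 1, 2, 3 while Drain(new ConcurrentStack<int>()) yields 3, 2, 1, which is why a stack-backed buffer would break the same-order-as-input property of a pipeline.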
Design Notes
many patterns, including the pipeline pattern, do not automatically scale with the number of cores
this is a limitation unless additional parallelism is introduced within a pipeline stage
for high parallelism stages' execution times should all be similar
pipeline is gated by its slowest stage
buffer size is important
too small & the pipeline may be blocked
buffers should be large enough to absorb variability in pipeline flow but no larger
n4jvp.com