Parallel Programming Notes - Parallel Aggregation
works for any binary operation that is associative
.NET implementation also expects operations to be commutative
uses unshared local variables that are merged at the end of computation
aka Parallel Reduction as it combines multiple inputs to a single output
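why associativity matters can be shown with a small sketch. `AssociativityDemo` and `ChunkedReduce` are hypothetical names, not part of the notes or of .NET. The helper only imitates a parallel aggregation: it reduces fixed-size chunks independently and then merges the partial results with the same operator.

```csharp
using System;
using System.Linq;

public static class AssociativityDemo
{
    // Reduces each chunk on its own, then merges the partial results with the
    // same operator. This mimics the regrouping a parallel aggregation performs.
    public static double ChunkedReduce(double[] values, int chunkSize,
                                       Func<double, double, double> op)
    {
        var partials = values
            .Select((value, index) => new { value, index })
            .GroupBy(item => item.index / chunkSize)
            .Select(chunk => chunk.Select(item => item.value).Aggregate(op));

        return partials.Aggregate(op);
    }
}
```

with the values 1, 2, 3, 4 and a chunk size of 2, addition regroups as (1 + 2) + (3 + 4) and still matches the sequential sum. Subtraction regroups as (1 - 2) - (3 - 4), which differs from the sequential ((1 - 2) - 3) - 4, so it is not a safe aggregation operator.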
The Basics
sequential version calculates a sum
    double[] sequence = ...
    double sum = 0.0d;
    for(int i = 0; i < sequence.Length; i++)
    {
        sum += someMethod(sequence[i]);
    }
LINQ version is
    double[] sequence = ...
    double sum = (from x in sequence
                  select someMethod(x)).Sum();
PLINQ version is almost identical
    double[] sequence = ...
    double sum = (from x in sequence.AsParallel()
                  select someMethod(x)).Sum();
PLINQ has built-in standard query operators that count the number of elements and calculate the average, maximum or minimum
PLINQ has operators that create and combine sets
duplicate elimination
union
intersection
difference
and operators that transform sequences
concatenation
filtering
partitioning
and operators that group (projection)
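a few of the built-in operators listed above, as a minimal sketch. `PlinqOperatorsDemo`, `Summarize` and `CommonDistinct` are hypothetical names chosen for illustration; the PLINQ calls themselves (`Count`, `Average`, `Max`, `Min`, `Distinct`, `Intersect`, `OrderBy`) are standard `ParallelEnumerable` operators.

```csharp
using System;
using System.Linq;

public static class PlinqOperatorsDemo
{
    // Built-in aggregations: each call runs its own parallel reduction.
    public static (int Count, double Average, int Max, int Min) Summarize(int[] values)
    {
        return (values.AsParallel().Count(),
                values.AsParallel().Average(),
                values.AsParallel().Max(),
                values.AsParallel().Min());
    }

    // Set operators: duplicate elimination followed by intersection.
    // The result is sorted because PLINQ does not preserve source order by default.
    public static int[] CommonDistinct(int[] first, int[] second)
    {
        return first.AsParallel()
                    .Distinct()
                    .Intersect(second.AsParallel())
                    .OrderBy(x => x)
                    .ToArray();
    }
}
```

the second operand of `Intersect` is also converted with `AsParallel()`, since the PLINQ set operators expect a `ParallelQuery` on both sides.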
can use the Aggregate extension to define needed aggregation operator
    double[] sequence = ...
    return (from x in sequence.AsParallel()
            select someMethod(x)).Aggregate(1.0d, (y1, y2) => y1 * y2);
Example
social network where subscribers can designate other subscribers as friends
recommends new friends by identifying other subscribers who are friends of friends
only recommends candidates who have largest number of mutual friends
first candidates identified in independent parallel operations
then candidates are ranked and selected in an aggregation operation
data structure describing ID 0 with IDs 1 & 2 as friends
    0 -> { 1, 2 }
repository contains
    0 -> { 1, 2 }
    1 -> { 0, 2, 3 }
    2 -> { 0, 1, 3, 4 }
    3 -> { 1, 2 }
    4 -> { 2 }
subscribers with IDs 3 & 4 are to be recommended to subscriber ID 0
ID 3 has the higher rank with 2 mutual friends (IDs 1 & 2), ID 4 has 1 mutual friend (ID 2)
result described as a multiset, a set where each element is listed only once but carries a multiplicity so the set can represent duplicates
    { 3(2), 4(1) }
two phases to operation
map - create collection of candidate IDs that can contain duplicates, actually a collection of multisets (one per friend) where each candidate has a multiplicity of one
    { 3(1) }, { 3(1), 4(1) }
reduce - aggregates the collection into a single multiset by applying a binary operator, in this example a multiset union that adds multiplicities, with a result
    { 3(2), 4(1) }
follow map/reduce operation with
rank candidates by sorting against multiplicity
selects candidates with highest multiplicity
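the map, reduce and postprocess steps can be sketched against the sample repository. This is an assumption-laden illustration, not the notes' implementation: `FriendRecommendationSketch`, `Union` and `Recommend` are hypothetical names, a plain `Dictionary<int, int>` (ID to multiplicity) stands in for the multiset type, and the code is sequential LINQ to keep the data flow easy to follow.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class FriendRecommendationSketch
{
    // The sample repository from the notes: subscriber ID -> friend IDs.
    public static readonly Dictionary<int, HashSet<int>> Friends =
        new Dictionary<int, HashSet<int>>
        {
            { 0, new HashSet<int> { 1, 2 } },
            { 1, new HashSet<int> { 0, 2, 3 } },
            { 2, new HashSet<int> { 0, 1, 3, 4 } },
            { 3, new HashSet<int> { 1, 2 } },
            { 4, new HashSet<int> { 2 } },
        };

    // Multiset union: multiplicities of matching IDs are added together.
    public static Dictionary<int, int> Union(Dictionary<int, int> a, Dictionary<int, int> b)
    {
        var result = new Dictionary<int, int>(a);
        foreach (var pair in b)
        {
            result.TryGetValue(pair.Key, out int count); // count is 0 when the key is absent
            result[pair.Key] = count + pair.Value;
        }
        return result;
    }

    public static List<KeyValuePair<int, int>> Recommend(int id, int maxCandidates)
    {
        // map: one multiset per friend, each candidate with multiplicity 1
        var perFriend = Friends[id].Select(friend =>
            Friends[friend]
                .Where(foaf => foaf != id && !Friends[id].Contains(foaf))
                .ToDictionary(foaf => foaf, foaf => 1));

        // reduce: fold the multisets together with the union operator
        var candidates = perFriend.Aggregate(
            new Dictionary<int, int>(),
            (accumulated, next) => Union(accumulated, next));

        // postprocess: rank by multiplicity and keep the top candidates
        return candidates.OrderByDescending(pair => pair.Value)
                         .Take(maxCandidates)
                         .ToList();
    }
}
```

`Recommend(0, 2)` returns the ranked candidates for subscriber 0 as ID/multiplicity pairs. Because `Union` is associative and commutative, the fold could be regrouped across parallel tasks without changing the result.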
sequential version
    public IDMultisetItemList PotentialFriendsSequential(SubscriberID id, int maxCandidates)
    {
        // map
        var foafList = new List<IDMultiset>();
        foreach(SubscriberID friend in subscribers[id].Friends)
        {
            var foafs = subscribers[friend].FriendsCopy();
            foafs.RemoveWhere(foaf => foaf == id || subscribers[id].Friends.Contains(foaf));
            foafList.Add(Multiset.Create(foafs));
        }
        // reduce
        IDMultiset candidates = new IDMultiset();
        foreach(IDMultiset foafs in foafList)
        {
            candidates = Multiset.Union(foafs, candidates);
        }
        // postprocess
        return Multiset.MostNumerous(candidates, maxCandidates);
    }
PLINQ version
    public IDMultisetItemList PotentialFriendsPLinq(SubscriberID id, int maxCandidates)
    {
        var candidates = subscribers[id].Friends.AsParallel()
                            .SelectMany(friend => subscribers[friend].Friends)
                            .Where(foaf => foaf != id && !(subscribers[id].Friends.Contains(foaf)))
                            .GroupBy(foaf => foaf)
                            .Select(foafGroup => new IDMultisetItem(foafGroup.Key, foafGroup.Count()));


        // postprocess
        return Multiset.MostNumerous(candidates, maxCandidates);
    }
Using Parallel Loops for Aggregation
Parallel.ForEach & Parallel.For have overloaded methods that can implement the parallel aggregation pattern
    double[] sequence = ...
    object lockObject = new object();
    double sum = 0.0d;

    Parallel.ForEach( sequence, // values to be aggregated
                      () => 0.0d, // init local partial result
                      // the loop body
                      (x, loopState, partialResult) =>
                      {
                           return Normalize(x) + partialResult;
                      },
                      // final step of each local context
                      (localPartialSum) =>
                      {
                        lock(lockObject)
                        {
                            sum += localPartialSum;
                        }
                      });
    return sum;
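the same pattern with Parallel.For, as a minimal sketch. `ParallelForSum`, `Sum` and the `transform` parameter are hypothetical names; the `Parallel.For` overload taking `localInit`, `body` and `localFinally` delegates is part of the Task Parallel Library.

```csharp
using System;
using System.Threading.Tasks;

public static class ParallelForSum
{
    public static double Sum(double[] sequence, Func<double, double> transform)
    {
        object lockObject = new object();
        double sum = 0.0d;

        Parallel.For(0, sequence.Length,
            // localInit: each task starts with its own partial result
            () => 0.0d,
            // body: accumulate into the task-local value, no lock needed
            (i, loopState, partialResult) => partialResult + transform(sequence[i]),
            // localFinally: runs once per task, so the lock is taken rarely
            localPartialSum =>
            {
                lock (lockObject)
                {
                    sum += localPartialSum;
                }
            });

        return sum;
    }
}
```

the lock is taken once per task rather than once per element, which keeps contention low.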
Using a Range Partitioner for Aggregation
when each iteration does very little work, the cost of invoking the loop body delegate per iteration becomes significant overhead
Partitioner object allows an embedded sequential for loop inside a Parallel.ForEach loop
should profile app before deciding to use a Partitioner object or not
    double[] sequence = ...
    object lockObject = new object();
    double sum = 0.0d;
    var rangePartitioner = Partitioner.Create(0, sequence.Length);

    Parallel.ForEach( rangePartitioner, // input intervals
                      () => 0.0d, // init local partial result
                      // the loop body for each interval
                      (range, loopState, initialValue) =>
                      {
                           double partialSum = initialValue;
                           for(int i = range.Item1; i < range.Item2; i++)
                           {
                                partialSum += Normalize(sequence[i]);
                           }
                           return partialSum;
                      },
                      // final step of each local context
                      (localPartialSum) =>
                      {
                        lock(lockObject)
                        {
                            sum += localPartialSum;
                        }
                      });
    return sum;
Using PLINQ Aggregation with Range Selection
PLINQ Aggregate extension method includes overloaded version that allows very general application of parallel aggregation pattern
financial simulation example performs repeated simulations & aggregates results into a histogram
two dependencies
result histogram
instances of Random type
    int[] histogram = makeEmptyHistogram();
    
    return ParallelEnumerable.Range(0, count).Aggregate(
                            // create empty local accumulator object that includes all task-local state
                            () => new Tuple<int[], Random>(makeEmptyHistogram(), new Random(SampleUtils.MakeRandomSeed())),
                            // run simulation adding result to local accumulator
                            (localAccumulator, i) =>
                            {
                                // get next random value for each iteration
                                var sample = localAccumulator.Item2.NextDouble();

                                if (sample > 0.0 && sample < 1.0)
                                {
                                    // perform simulation
                                    var simulationResult = doSimulation(sample, mean, stdDev);
                                    // add result to histogram
                                    int histogramBucket = (int)Math.Floor(simulationResult / BucketSize);
                                    if (0 <= histogramBucket && histogramBucket < TableSize)
                                        localAccumulator.Item1[histogramBucket] += 1;    
                                }
                                return localAccumulator;
                            },
                            // combine local results pair-wise
                            (localAccumulator1, localAccumulator2) =>
                            {
                                return new Tuple<int[], Random>(combineHistograms(localAccumulator1.Item1, localAccumulator2.Item1), null);
                            },
                            // extract answer from final combination
                            finalAccumulator => finalAccumulator.Item1);
no locks required
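the four delegates of this Aggregate overload are easier to see in a smaller, deterministic example. A minimal sketch, assuming integer input and fixed-width buckets; `HistogramSketch`, `Build`, `bucketSize` and `bucketCount` are hypothetical names, not taken from the financial simulation.

```csharp
using System;
using System.Linq;

public static class HistogramSketch
{
    // Counts how many non-negative values fall into each bucket of width bucketSize.
    public static int[] Build(int[] values, int bucketSize, int bucketCount)
    {
        return values.AsParallel().Aggregate(
            // 1. seed factory: a fresh local histogram for each partition
            () => new int[bucketCount],
            // 2. update: add one value to the local histogram
            (local, value) =>
            {
                int bucket = value / bucketSize;
                if (value >= 0 && bucket < bucketCount)
                    local[bucket] += 1;
                return local;
            },
            // 3. combine: merge two local histograms element-wise
            (left, right) => left.Zip(right, (a, b) => a + b).ToArray(),
            // 4. result selector: the final accumulator is the answer
            final => final);
    }
}
```

each partition mutates only its own array, and the element-wise merge is associative and commutative, so the order in which PLINQ combines partitions does not affect the result.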
Design Notes
parallel aggregation requires additional step in algorithm to merge partial results
partitioned input values -> accumulate subtotals (independent tasks) -> add subtotal to result (coordinated by locks)
PLINQ version needs no explicit locks, final merge step is expressed as a binary operator
PLINQ requires less synchronization so it's more scalable
anti-pattern: taking a lock on every iteration to update a shared sum, runs slower than sequential because of synchronization overhead
    double[] sequence = ...
    object lockObject = new object();
    double sum = 0.0d;

    Parallel.For(0, sequence.Length, i =>
    {
        lock(lockObject)
        {
            sum += sequence[i];
        }
    });
    return sum;
    