C# Parallel Programming: PLINQ Reduction Operations and Aggregate Functions

Overview

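PLINQ simplifies combining all members of a sequence or group into a single value by repeatedly applying the same function. This process is called a reduction operation, and functions such as Sum() are reductions. PLINQ also provides the Aggregate method, whose overloads let you supply your own reduction functions.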

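A reduction operates on every member. Once it finishes, the partial results may need to be combined and summarized into one final result. That is the idea behind aggregation.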
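The following minimal sketch shows both ideas side by side. Sum() is a built-in reduction, and the simplest Aggregate overload expresses the same reduction with a user-supplied function. It is written with C# 9 top-level statements for brevity. The variable names are illustrative only.

```csharp
using System;
using System.Linq;

// A source of 1..100 that PLINQ may split into partitions
var numbers = ParallelEnumerable.Range(1, 100);

// Built-in reduction
int builtInSum = numbers.Sum();

// The same reduction written with Aggregate. The function should be
// associative and commutative, because PLINQ may combine partial results
// from different partitions in any grouping and order.
int customSum = numbers.Aggregate((acc, x) => acc + x);

Console.WriteLine($"Sum() = {builtInSum}, Aggregate() = {customSum}");
```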

Reduction operations

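The example divides every number between 1 and 50,000,000 that is divisible by 5 by π and computes the average of the results. This can be done with LINQ or with PLINQ.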
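The multiples of 5 in this range are 5k for k = 1, ..., 10,000,000, so the expected result has a closed form. It is handy for checking the program output:

$$
\frac{1}{10^{7}}\sum_{k=1}^{10^{7}}\frac{5k}{\pi}
= \frac{5}{\pi}\cdot\frac{10^{7}+1}{2}
= \frac{25000002.5}{\pi}
\approx 7957747.95
$$

Both the LINQ and the PLINQ version should print a value close to this. The last few digits can differ, because PLINQ adds partial sums in a different order and floating-point addition is not associative.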

Code example:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;

namespace Sample6_2_plinq_calculate
{
    class Program
    {
        static int NUM_INTS = 50000000;

        // Sequential source for LINQ to Objects
        static IEnumerable<int> GenerateInputData()
        {
            return Enumerable.Range(1, NUM_INTS);
        }

        // Parallel source for PLINQ
        static ParallelQuery<int> GenerateInputData4Parallel()
        {
            return ParallelEnumerable.Range(1, NUM_INTS);
        }

        static void Main(string[] args)
        {
            var seqTarget = GenerateInputData();
            Console.WriteLine("============================================================");
            Console.WriteLine("TEST NORMAL LINQ");
            Console.WriteLine("============================================================");
            var swatchpn = Stopwatch.StartNew();
            // Average of intNum / PI over every intNum divisible by 5
            var seqQuery = (from intNum in seqTarget
                            where (intNum % 5) == 0
                            select intNum / Math.PI).Average();
            swatchpn.Stop();
            Console.WriteLine("LINQ Result: {0}  LINQ Use Time: {1}", seqQuery, swatchpn.Elapsed);

            var palTarget = GenerateInputData4Parallel();
            Console.WriteLine("\n\n");
            Console.WriteLine("============================================================");
            Console.WriteLine("TEST PARALLEL LINQ");
            Console.WriteLine("============================================================");
            var swatchp = Stopwatch.StartNew();
            // palTarget is already a ParallelQuery<int>, so no extra AsParallel() call is needed
            var palQuery = (from intNum in palTarget
                            where (intNum % 5) == 0
                            select intNum / Math.PI).Average();
            swatchp.Stop();
            Console.WriteLine("PLINQ Result: {0}  PLINQ Use Time: {1}", palQuery, swatchp.Elapsed);
            Console.ReadLine();
        }
    }
}
```

Test results:

[Figure: screenshot of the test results]

Aggregation operations

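The code example illustrates aggregation by computing the standard deviation, skewness, and kurtosis of an array.
A quick math refresher first: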

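Standard deviation: a measure of how widely data are spread, that is, how far the values deviate from the arithmetic mean. The smaller the standard deviation, the less the values deviate from the mean, and vice versa. Its size can be judged by comparing it with the mean (the ratio of the two).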
[Figure: standard deviation formula. The formula images in this article are from Baidu Baike.]
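The code below uses the sample standard deviation, which divides by n − 1. Here it is written in terms of the mean x̄:

$$
s = \sqrt{\frac{1}{n-1}\sum_{i=1}^{n}\left(x_i-\bar{x}\right)^2}
$$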


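Skewness: the skewness coefficient describes how far a distribution departs from symmetry. For a symmetric distribution it is 0. When it is greater than 0, the heavier tail is on the right and the distribution is right-skewed. When it is less than 0, the heavier tail is on the left and the distribution is left-skewed.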
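The code computes the adjusted sample skewness. It uses the mean x̄ and the sample standard deviation s defined above:

$$
G_1 = \frac{n}{(n-1)(n-2)}\sum_{i=1}^{n}\left(\frac{x_i-\bar{x}}{s}\right)^3
$$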



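Kurtosis: indicates whether a distribution is more peaked or flatter than the normal distribution. A positive value means a relatively peaked distribution, and a negative value a relatively flat one. Put simply, kurtosis describes how steep or gentle the shape of a distribution is. Another way to see it: for the same standard deviation, a larger kurtosis means more extreme values. The remaining values must then be concentrated more tightly around the mode, so the distribution is steeper.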
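The code computes the sample excess kurtosis, so a normal distribution scores about 0. It again uses x̄ and s. This is the same estimator as spreadsheet functions such as Excel's KURT:

$$
G_2 = \frac{n(n+1)}{(n-1)(n-2)(n-3)}\sum_{i=1}^{n}\left(\frac{x_i-\bar{x}}{s}\right)^4 - \frac{3(n-1)^2}{(n-2)(n-3)}
$$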



For a description of the Aggregate parameters, see:
https://msdn.microsoft.com/en-us/zh-en/library/dd383667(v=vs.110).aspx

A brief explanation of the parameters:

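seed: the initial value of the accumulator.

update accumulator function: applied to each element of the array. PLINQ partitions the data source and processes the partitions in parallel, so this step actually produces one accumulated result per partition.

combine accumulator function: combines the per-partition results into a single accumulated result for the whole array.

result selector: transforms the combined accumulator value into the final result, which is the return value.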
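The following small sketch maps each of the four parameters onto a concrete lambda. It computes the mean of the squares of 1..10. The names `data`, `partial`, `left`, `right`, and `total` are illustrative only. It uses C# 9 top-level statements for brevity.

```csharp
using System;
using System.Linq;

int[] data = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

double meanOfSquares = data.AsParallel().Aggregate(
    0L,                                      // seed: initial value of each partition's accumulator
    (partial, x) => partial + (long)x * x,   // update: runs on every element inside one partition
    (left, right) => left + right,           // combine: merges the per-partition totals
    total => (double)total / data.Length);   // result selector: turns the total into the final value

Console.WriteLine($"Mean of squares = {meanOfSquares}");
```

Every partition starts from the seed, so the seed should be a neutral value for the combine step (0 for addition). Otherwise it would be counted once per partition. For a reference-type accumulator, the overload that takes a seed factory function is the safer choice, so that partitions do not share one mutable object.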

The point of this example is not the arithmetic itself. It shows that Aggregate() can apply a computation to every element of the data source, combine the partial results, and then transform the combined result, all in one call. This saves writing those steps separately. Also, the data is partitioned and the partitions are processed as parallel tasks.

Below is the code for the calculation:

```csharp
using System;
using System.Linq;

namespace Sample6_2_plink_aggregate
{
    class Program
    {
        static void Main(string[] args)
        {
            int[] inputInts = { 0, 3, 4, 8, 15, 22, 34, 57, 68, 32, 30 };
            double n = inputInts.Length; // sample size as double, so the formulas use floating-point arithmetic

            var mean = inputInts.AsParallel().Average();

            var standarddeviation = inputInts.AsParallel().Aggregate(
                0d, // seed
                // update accumulator function:
                // invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow(thisNumber - mean, 2),
                // combine accumulator function:
                // invoked on the accumulator result yielded by each partition
                (total, thisTask) => total + thisTask,
                // result selector:
                // transforms the final accumulator value into the result value
                finalSum => Math.Sqrt(finalSum / (n - 1)));

            var skewness = inputInts.AsParallel().Aggregate(
                0d,
                (subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean) / standarddeviation, 3),
                (total, thisTask) => total + thisTask,
                finalSum => finalSum * n / ((n - 1) * (n - 2)));

            var kurtosis = inputInts.AsParallel().Aggregate(
                0d,
                (subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean) / standarddeviation, 4),
                (total, thisTask) => total + thisTask,
                // Sample excess kurtosis; the correction term uses (n - 1)^2
                finalSum => finalSum * n * (n + 1) / ((n - 1) * (n - 2) * (n - 3))
                            - 3 * Math.Pow(n - 1, 2) / ((n - 2) * (n - 3)));

            Console.WriteLine("============================================================");
            Console.WriteLine("TEST Parallel LINQ Calculate Result");
            Console.WriteLine("============================================================");
            Console.WriteLine("Mean : {0}", mean);
            Console.WriteLine("Standard Deviation : {0}", standarddeviation);
            Console.WriteLine("Skewness : {0}", skewness);
            Console.WriteLine("Kurtosis : {0}", kurtosis);
            Console.ReadLine();
        }
    }
}
```
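Each of the queries above makes its own pass over the data. As a variation that is not part of the original sample, a single Aggregate can carry several running totals at once. The following sketch computes the mean and the sample standard deviation in one pass, using a value-tuple accumulator (C# 7+) and C# 9 top-level statements. It relies on the textbook running-sums identity s² = (Σx² − (Σx)²/n)/(n − 1). That identity is fine for small integer data like this but can lose precision on large, tightly clustered values.

```csharp
using System;
using System.Linq;

int[] inputInts = { 0, 3, 4, 8, 15, 22, 34, 57, 68, 32, 30 };

// Accumulator: (count, sum, sum of squares) carried through a single pass
var (mean, stdDev) = inputInts.AsParallel().Aggregate(
    (Count: 0L, Sum: 0d, SumSq: 0d),                                       // seed (a value type)
    (acc, x) => (acc.Count + 1, acc.Sum + x, acc.SumSq + (double)x * x),   // update within a partition
    (a, b) => (a.Count + b.Count, a.Sum + b.Sum, a.SumSq + b.SumSq),       // combine partitions
    acc =>                                                                 // result selector
    {
        double n = acc.Count;
        double m = acc.Sum / n;
        double variance = (acc.SumSq - acc.Sum * acc.Sum / n) / (n - 1);
        return (Mean: m, StdDev: Math.Sqrt(variance));
    });

Console.WriteLine($"Mean : {mean}");
Console.WriteLine($"Standard Deviation : {stdDev}");
```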

Concurrent PLINQ tasks and task cancellation

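PLINQ can also be combined with other forms of concurrent tasks. Consider the computation of the standard deviation, skewness, and kurtosis. In the example above the steps actually run strictly in sequence:
mean => standard deviation => skewness => kurtosis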

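Judging by the formulas, however, skewness and kurtosis can be computed in parallel. The standard deviation (together with the mean) is their common input:
mean => standard deviation => { skewness, kurtosis }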

These dependencies map directly onto ContinueWith continuations. If you need timeouts or cancellation, use the WithCancellation() method.
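The following minimal sketch shows WithCancellation on its own. The token is cancelled before the query runs, so the outcome is deterministic. In practice you would call cts.Cancel() from elsewhere, or use cts.CancelAfter(...) for a timeout. When the token passed to WithCancellation is cancelled, PLINQ reports it as an OperationCanceledException. The example uses C# 9 top-level statements.

```csharp
using System;
using System.Linq;
using System.Threading;

using var cts = new CancellationTokenSource();
cts.Cancel(); // cancel up front so the query is guaranteed to observe it

bool cancelled = false;
try
{
    double avg = ParallelEnumerable.Range(1, 10_000_000)
        .WithCancellation(cts.Token)
        .Select(i => i / Math.PI)
        .Average();
    Console.WriteLine($"Average: {avg}");
}
catch (OperationCanceledException)
{
    cancelled = true;
    Console.WriteLine("Query was cancelled.");
}
```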

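Code example:
The code wraps each PLINQ operation in its own method and calls those methods from Tasks, so that independent steps run concurrently. deferredCancelTask is a disruptor task. If you uncomment it, it signals cancellation after 2 seconds, which cancels the pending work, and the exception handler then prints the status of each task.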

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Sample6_4_parallel_task_with_plinq
{
    class Program
    {
        // Element count kept as a constant, so the result selectors neither
        // re-enumerate 100 million elements with Count() nor overflow int arithmetic
        private const int NUM_INTS = 100000000;
        private static readonly ParallelQuery<int> inputInts = ParallelEnumerable.Range(1, NUM_INTS);

        private static double CalculateMean(CancellationToken ct)
        {
            return inputInts.WithCancellation(ct).Average();
        }

        private static double CalculateStandardDeviation(CancellationToken ct, double mean)
        {
            double n = NUM_INTS;
            return inputInts.WithCancellation(ct).Aggregate(
                0d,                                                                  // seed
                (subTotal, thisNumber) => subTotal + Math.Pow(thisNumber - mean, 2), // update accumulator (per partition)
                (total, thisTask) => total + thisTask,                               // combine partition results
                finalSum => Math.Sqrt(finalSum / (n - 1)));                          // result selector
        }

        private static double CalculateSkewness(CancellationToken ct, double mean, double standarddeviation)
        {
            double n = NUM_INTS;
            return inputInts.WithCancellation(ct).Aggregate(
                0d,
                (subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean) / standarddeviation, 3),
                (total, thisTask) => total + thisTask,
                finalSum => finalSum * n / ((n - 1) * (n - 2)));
        }

        private static double CalculateKurtosis(CancellationToken ct, double mean, double standarddeviation)
        {
            double n = NUM_INTS;
            return inputInts.WithCancellation(ct).Aggregate(
                0d,
                (subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean) / standarddeviation, 4),
                (total, thisTask) => total + thisTask,
                // Sample excess kurtosis; the correction term uses (n - 1)^2
                finalSum => finalSum * n * (n + 1) / ((n - 1) * (n - 2) * (n - 3))
                            - 3 * Math.Pow(n - 1, 2) / ((n - 2) * (n - 3)));
        }

        static void Main(string[] args)
        {
            Console.WriteLine("============================================================");
            Console.WriteLine("TEST Parallel TASK work with PLINQ");
            Console.WriteLine("============================================================");
            var cts = new CancellationTokenSource();
            var ct = cts.Token;

            // mean => standard deviation => { skewness, kurtosis } (the last two run in parallel)
            var TaskMean = new Task<double>(() => CalculateMean(ct), ct);
            var TaskSTDev = TaskMean.ContinueWith(t => CalculateStandardDeviation(ct, t.Result),
                ct, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default);
            var TaskSkewness = TaskSTDev.ContinueWith(t => CalculateSkewness(ct, TaskMean.Result, t.Result),
                ct, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default);
            var TaskKurtosis = TaskSTDev.ContinueWith(t => CalculateKurtosis(ct, TaskMean.Result, t.Result),
                ct, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default);

            // Uncomment to signal cancellation after 2 seconds
            //var deferredCancelTask = Task.Factory.StartNew(() => { Thread.Sleep(2000); cts.Cancel(); });

            try
            {
                TaskMean.Start();
                Task.WaitAll(TaskSkewness, TaskKurtosis);
                Console.WriteLine("Mean : {0}", TaskMean.Result);
                Console.WriteLine("Standard Deviation : {0}", TaskSTDev.Result);
                Console.WriteLine("Skewness : {0}", TaskSkewness.Result);
                Console.WriteLine("Kurtosis : {0}", TaskKurtosis.Result);
            }
            catch (AggregateException aex)
            {
                // TaskCanceledException derives from OperationCanceledException
                if (aex.InnerExceptions.Any(ex => ex is OperationCanceledException))
                {
                    Console.WriteLine("Mean Task: {0}", TaskMean.Status);
                    Console.WriteLine("Standard Deviation Task: {0}", TaskSTDev.Status);
                    Console.WriteLine("Skewness Task: {0}", TaskSkewness.Status);
                    Console.WriteLine("Kurtosis Task: {0}", TaskKurtosis.Status);
                }
            }
            Console.ReadLine();
        }
    }
}
```

Category: Default | Date: 2015-03-07