求知若饥,虚心若愚
.NET4.0起发布的并行拓展库的两个基本组件:TPL(Task Parallel Library, 任务并行库)和PLINQ(Parallel LINQ,并行LINQ)。还有TAP(Task-based Asynchronous Pattern,基于任务的异步模式)以及配套的C#5.0语言支持。这些新高级抽象为多线程代码编写提供了有力支持,但老的多线程处理仍有意义。
多线程处理基础
多线程术语
核心/内核、进程、线程、线程安全、线程处理模型(代码要求调用者遵守的规则)、任务(任务代表要执行的一项工作,而线程代表做这项工作的工作者)、线程池
多线程处理的目标和实现
多线程处理主要用于两个方面:实现多任务(同时进行多项任务)和解决延迟(堵塞任务时的卡顿)
使用时间分片来实现并发运行,术语有时间片和上下文切换、异步
性能问题
上下文切换代价:将CPU当前的内部状态保存到内存中,还必须加载与新线程关联的状态。
上下文和时间分片都会造成性能消耗。
线程处理的问题
缺乏原子性、竞态条件、复杂的内存模型(寄存器同步)、死锁
使用System.Threading
使用并行扩展库的任务来编写多线程更方便,不用与线程打交道。但还是需要了解没有并行扩展库是怎么写的
使用System.Threading.Thread进行异步操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 using System; using System.Threading; public class RunningASeparateThread { public const int Repetitions = 1000; public static void Main() { ThreadStart threadStart = DoWork; Thread thread = new Thread(threadStart); thread.Start(); for(int count = 0; count < Repetitions; count ++) { Console.Write('-'); } //等等子线程结束,再运行下面 thread.Join(); } public static void DoWork() { Console.Write('+'); } } // 运行结果是两个交叉打印,一会子线程打印,一会主线程打印
代码为了在不同线程的上下文中运行,需要ThreadStart或ParameterizedThreadStart类型的委托来标志要执行的代码。(但其实可以传方法给线程构造)
线程管理
方法或属性 | 描述
---|---
Join | 让一个线程等待另一个线程,允许传入最长等候时间
IsBackGround | 标记为后台进程后,即使他还在运行操作系统也运行终止这个进程。
Priority | 优先级,时间片优先分给高优先级线程
ThreadState | 查看线程状态
Sleep | 不要在生产代码使用,用于让当前线程进入睡眠,让操作系统至少多少时间内(肯定比这个时间长)不分配资源,相当于准备睡觉不干活了。但可以传0表示可以将时间片分给其他人,下次再叫我。
Abort | 在.NET Core中不可用,尝试中断线程,不保证成功,可能会发生不可预测的结果。
线程池处理
- 目的在于避免启动和销毁线程的巨大开销
- 开发者对线程池进行操作,只需要告诉线程池需要做什么,它会分配线程去工作,工作完成会回收线程,重复利用。(有点类似对象池)
- 但它只适用于工作时间较短的作业,并且要求该工作能及时结束,这样才方便回归池里进行循环。如果池子用尽则会导致其他工作延迟,毕竟资源是有限的。
- 还有一点是无法对线程进行管理,这样多个线程同步就比较麻烦。
异步任务
当前我们已知的多线程是非常麻烦的,不知道异步操作何时完成(不要用轮询或者阻塞等待的方法)、线程池的使用(线程池避免创建太多线程,毕竟上下文也很花时间)、要避免死锁、要为不同操作提供原子性并同步数据访问。好在.NET4.0的TPL配合C#5.0可以帮助我们很多。
从Thread到Task
需要看Task改进了什么,相比Thread优势。再到TPL的异步写法,Task怎么分配线程(什么时候线程池什么时候独立分配)、存在什么问题(异步延续、异常处理),怎么演化出async和await(方便将同步代码改成异步代码)。内容多且难,需要多看几次。
并行迭代
Parallel.For
Parallel.ForEach
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 using System.Collections.Concurrent; class Program { public const int TestTimes = 100_000_000; public static void Main() { System.Diagnostics.Stopwatch stopwatch = new System.Diagnostics.Stopwatch(); stopwatch.Start(); // CalculateNormal(); // CalculateParallelFor(); // CalculateParallelForEach(); CalculatePLINQ(); stopwatch.Stop(); System.Console.WriteLine($"cost time {stopwatch.ElapsedMilliseconds} ms"); } private static void CalculateNormal() { var results = new double[TestTimes]; for (int i = 0; i < results.Length; i++) { results[i] = Factorial(i % 20 + 1); } System.Console.WriteLine(results.Average()); } private static void CalculateParallelFor() { var dict = new ConcurrentDictionary<int, int>(); var results = new double[TestTimes]; System.Threading.Tasks.Parallel.For(0, results.Length, i => { results[i] = Factorial(i % 20 + 1); dict.AddOrUpdate(Environment.CurrentManagedThreadId, 1, (_, count) => count + 1); }); System.Console.WriteLine(results.AsParallel().Average()); System.Console.WriteLine($"{dict.Count} thread used"); foreach (var pair in dict) System.Console.WriteLine($"Thread {pair.Key} use {pair.Value} time"); } private static void CalculateParallelForEach() { var values = Enumerable.Range(0, TestTimes); var results = new double[TestTimes]; Parallel.ForEach(values, (value, _) => { results[value] = Factorial(value % 20 + 1); }); System.Console.WriteLine(results.AsParallel().Average()); } private static void CalculatePLINQ() { var values = Enumerable.Range(0, TestTimes); System.Console.WriteLine((values.AsParallel().Select(value => Factorial(value % 20 + 1))).Average()); } private static double Factorial(int n) { return Enumerable.Range(1, n).Aggregate((a, b) => a * b); } }
并行执行LINQ查询
很多LINQ操作都看起来"极度可并行"
在集合后面调用AsParallel()后再调用相关查询即可启用并行处理
通过AggregateException来捕获并行中出现的所有异常
PLINQ也像并行任务那样可以取消的