理解async/await语法糖

概念

什么是进程

进程(Process) 进程就是一个程序的执行实例,进程包括运行中的程序和程序所使用到的内存和系统资源。进程是资源分配的最小单位。每一个进程都有自己唯一的标识 pid,pid 就是进程号。注意,进程并不等于的程序,进程只是承载着程序运行。一个进程下可以有多个线程。

什么是线程

线程是操作系统能够进行运算调度的最小单位,是程序中的一个执行流,线程不占有内存空间,它包括在进程的内存空间中。一个进程至少有一个线程,这个线程也就是主线程,如果主线退出那么进程也就退出了。

每个线程都会有自己的一组 CPU 寄存器,也就是线程上下文。所以不同的线程可以执行同样的函数互不影响。

线程可以分为:

  • 内核线程:由操作系统内核创建和撤销。
  • 用户线程:不需要内核支持而在用户程序中实现的线程。

并发和并行

并行,指的就是在同一时刻,有两个或两个以上的任务(这里指进程)的代码在处理器上执行。多个处理器或多核处理器是并行执行的必要条件。

并发不是并行,并发关乎结构,并行关乎执行,并发是应用设计与实现阶段要考虑的问题。是逻辑层面的概念。而并行是操作系统实际执行时的问题。并发考虑的是如何将应用划分为多个互相配合的、可独立执行的模块的问题。 采用并发设计的程序并不一定是并行执行的。

为什么需要多线程

在多核 CPU 时代,如果是单线程应用会导致 CPU 资源利用不到,如果程序中有密集的 I/O 操作也会导致程序执行效率低下。

多线程主要是为了应对 I/O 密集型的应用(web 应用),可以提高 CPU 利用率,譬如一个线程发生长耗时的 I/O 时,会把该线程从 CPU 上调度下来,并把其他的线程调度上去,继续计算。

Thread

Thread 在2002年发布的 .Net1.0 中就存在的实现多线程方式,经过20年的迭代,Thread 已经慢慢被微软抛弃,现在官方推荐使用 Task 来实现多线程。但是要理解 .Net 多线程还是需要了解一下多线程的发展史。

通过 new Thread() 来创建一个新的线程, 它接受两个参数,第一个是两个类型的委托,ParameterizedThreadStartThreadStart,第二个是线程最大占用堆大小 maxStackSize。如果线程超过设置的阈值,会在 Start 后抛出 Stack overflow 异常。

ThreadStart 是一个无参无返回值的委托。ParameterizedThreadStart 是一个有参无返回值的委托。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// ThreadStart 
var start = new ThreadStart(() =>
{
Console.WriteLine($"t1,线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread}," +
$"是否后台线程:{Thread.CurrentThread.IsBackground}");
});
var t1 = new Thread(start,1);
t1.Start();

// ParameterizedThreadStart
var pstart = new ParameterizedThreadStart((s) =>
{
Console.WriteLine($"{s},线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread}," +
$"是否后台线程:{Thread.CurrentThread.IsBackground}");
});

var t2 = new Thread(pstart);
t2.Start("t2");

线程的调度完全由 CLR 来调度的,上面的例子 t1 t2 输出的顺序并不能控制。

但是对于一些优先级比较高的线程可以设置线程的优先级。CLR 会尽可能的在短时间内调度该线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 默认为Normal
t2.Priority = ThreadPriority.Highest;

// ThreadPriority 定义
public enum ThreadPriority
{
/*=========================================================================
** Constants for thread priorities.
=========================================================================*/
Lowest = 0,
BelowNormal = 1,
Normal = 2,
AboveNormal = 3,
Highest = 4
}

如果一定要 t1 先执行,可以在 Start 后使用 Join 来等待 t1 执行完毕后在继续执行下面的代码。

1
2
t1.Join();// 会等待t1 执行完毕后在继续执行下面代码。
...

默认情况下显式创建的线程是前台线程。前台线程可以在整个程序退出前正常执行完毕,而后台线程在程序退出时并不会考虑是否执行完毕而直接关闭。

1
2
3
4
5
6
7
8
9
var pstart = new ParameterizedThreadStart((s) =>
{
Thread.Sleep(TimeSpan.FromSeconds(1));
Console.WriteLine($"{s},线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread}," +
$"是否后台线程:{Thread.CurrentThread.IsBackground}");
});
var t2 = new Thread(pstart);
t2.IsBackground = true; // 设置后后台线程
t2.Start("t2");

在绝大数情况下,上面的代码将不会输出任务内容。因为 t2 是后台线程。主线程在退出的时候是不会检查后台线程是否执行完毕的。

通过 ThreadState 枚举来获取线程当前的状态,状态表如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 获取线程状态
var state= t2.ThreadState;

public enum ThreadState
{
Running = 0,
StopRequested = 1,
SuspendRequested = 2,
Background = 4,
Unstarted = 8,
Stopped = 16,
WaitSleepJoin = 32,
Suspended = 64,
AbortRequested = 128,
Aborted = 256
}
名称 描述
Running 线程正在正常运行
StopRequested 线程正在被请求停止,仅供程序内部使用。这个状态基本用不上
SuspendRequested 线程正在请求被挂起,但是还未响应
Background 线程正在以后台线程的方式运行
Unstarted 尚未在线程上调用 Start() 方法
Stopped 线程已停止
WaitSleepJoin 线程因为调用了 Wait(),Sleep() 或 Join() 等方法处于阻塞状态
Suspended 线程已挂起
AbortRequested 线程的 Thread.Abort() 方法已被调用,但是线程还没停止
Aborted 线程已停止

Thread等待

线程等待有两种模式,内核模式(Kernel Mode)和用户模式(User Model)。Thread 分别提供了来实现这个两个模式,Sleep()SpinWait()

Sleep() 是放弃当前线程的时间片,把时间片还给 CLR ,当到时间后会重新去竞争线程的时间切片,在这个过程中会发生线程的上下文切换。

SpinWait() 并没有放弃线程的时间片,而是让线程执行一些没有意义的运算来实现等待,也就是线程的自旋。这个过程中不会发生上下文切换。SpinWait() 只有在极少情况下是有用的,譬如很短暂的等待,可以减少上下文切换,但是大部分情况是不推荐使用的。 结构体 SpinWait 提供更加精细的自旋控制。

现在微软已经不在建议利用 Thread 的方式去创建使用线程,很多方法都已经过时,且使用上有太多的不便捷。

ThreadPool

ThreadPool 是一个线程池,目的是减少用程序创建线程,这些线程花费大量时间处于睡眠状态,等待事件发生,造成资源浪费。线程池的线程分为 workerThreads 工作线程,主要用于程序计算操作,completionPortThreads I/O 线程 主要用于 I/O 操作,譬如 mysql、redis 等。

.Net 6 开始 ThreadPool 所有的公开的 API 都被替换成 PortableThreadPool 实现。PortableThreadPool 是新的线程管理调度机制。源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// .Net 6 
public static bool SetMaxThreads(int workerThreads, int completionPortThreads)
{
if (UsePortableThreadPool)
{
return PortableThreadPool.ThreadPoolInstance.SetMaxThreads(workerThreads, completionPortThreads);
}

return
workerThreads >= 0 &&
completionPortThreads >= 0 &&
SetMaxThreadsNative(workerThreads, completionPortThreads);
}
// .net 5
public static bool SetMaxThreads(int workerThreads, int completionPortThreads)
{
return
workerThreads >= 0 &&
completionPortThreads >= 0 &&
SetMaxThreadsNative(workerThreads, completionPortThreads);
}

ThreadPool 在初始化的时候是没有线程的,ThreadPool 是以队列的方式去执行任务,内部有一个全局队列,和绑定在每个线程上的本地队列。本地队列会一直向全局队列领取任务。本队队列如果在全局队列中未取到任务,会有一个偷窃机制,简单来说就是会去别的线程本地队列中领取任务。

使用 ThreadPool.QueueUserWorkItem 函数让其任务进入队列排队, 其中 preferLocal 参数用来控制进入本地队列还是全局队列,true 首选当前线程的本地队列中对任务进行排队。false 默认进入全局队列 ,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 默认进入全局对面
ThreadPool.QueueUserWorkItem(_ =>
{
Console.WriteLine($"t1.线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread}," +
$"是否后台线程:{Thread.CurrentThread.IsBackground}");
});

Task.Run(() =>
{
Console.WriteLine($"t2.线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread}," +
$"是否后台线程:{Thread.CurrentThread.IsBackground}");
// 进入当前线程的本地队列
ThreadPool.QueueUserWorkItem(_ =>
{
Console.WriteLine($"t3.线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread}," +
$"是否后台线程:{Thread.CurrentThread.IsBackground}");
},true);
});

输出:

1
2
3
t1.线程Id:9,是否线程池线程:True,是否后台线程:True
t2.线程Id:8,是否线程池线程:True,是否后台线程:True
t3.线程Id:8,是否线程池线程:True,是否后台线程:True

要不要手动设置线程数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Console.WriteLine($"主线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread}," +
$"是否后台线程:{Thread.CurrentThread.IsBackground}。线程启动时间:{DateTime.Now.ToString("ss.ff")}");

ThreadPool.GetMaxThreads(out var maxWorkerThreads, out var maxCompletionPortThreads);
Console.WriteLine($"最大工作线程:{maxWorkerThreads},线程池中异步 I/O 线程:{maxCompletionPortThreads}");

// 处理器是10 核。默认就是10个
ThreadPool.GetMinThreads(workerThreads: out var minWorkerThreads, out var minCompletionPortThreads);
Console.WriteLine($"最小工作线程:{minWorkerThreads},线程池中异步 I/O 线程:{minCompletionPortThreads}");


for (var i = 1; i <= 20; i++)
{
ThreadPool.QueueUserWorkItem(async m =>
{
Console.WriteLine($"线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread}," +
$"是否后台线程:{Thread.CurrentThread.IsBackground},执行时间:{DateTime.Now.ToString("ss.ff")}");
Thread.Sleep(TimeSpan.FromSeconds(3));
});
}
Console.ReadLine();
Console.WriteLine($"线程池线程数: {ThreadPool.ThreadCount}");

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
主线程Id:1,是否线程池线程:False,是否后台线程:False。线程启动时间:48.017
最大工作线程:32767,线程池中异步 I/O 线程:1000
最小工作线程:10,线程池中异步 I/O 线程:10
1)线程Id:5,是否线程池线程:True,是否后台线程:True,执行时间:48.526
2)线程Id:8,是否线程池线程:True,是否后台线程:True,执行时间:48.526
3)线程Id:11,是否线程池线程:True,是否后台线程:True,执行时间:48.527
4)线程Id:9,是否线程池线程:True,是否后台线程:True,执行时间:48.527
5)线程Id:14,是否线程池线程:True,是否后台线程:True,执行时间:48.525
6)线程Id:13,是否线程池线程:True,是否后台线程:True,执行时间:48.525
7)线程Id:7,是否线程池线程:True,是否后台线程:True,执行时间:48.525
8)线程Id:10,是否线程池线程:True,是否后台线程:True,执行时间:48.528
9)线程Id:15,是否线程池线程:True,是否后台线程:True,执行时间:48.525
10)线程Id:12,是否线程池线程:True,是否后台线程:True,执行时间:48.525

11)线程Id:16,是否线程池线程:True,是否后台线程:True,执行时间:49.002
12)线程Id:17,是否线程池线程:True,是否后台线程:True,执行时间:50.000
13)线程Id:18,是否线程池线程:True,是否后台线程:True,执行时间:51.004

14)线程Id:10,是否线程池线程:True,是否后台线程:True,执行时间:51.533
15)线程Id:15,是否线程池线程:True,是否后台线程:True,执行时间:51.533
16)线程Id:7,是否线程池线程:True,是否后台线程:True,执行时间:51.533
17)线程Id:5,是否线程池线程:True,是否后台线程:True,执行时间:51.533
18)线程Id:8,是否线程池线程:True,是否后台线程:True,执行时间:51.533
19)线程Id:13,是否线程池线程:True,是否后台线程:True,执行时间:51.533
20)线程Id:14,是否线程池线程:True,是否后台线程:True,执行时间:51.533

线程池线程数: 13

最大工作线程默认值在32系统中默认最大工作线程是 1023,I/O 线程是 25,在64系统中默认最大工作线程是 32767,I/O 线程是 1000。

默认情况下最小线程数是宿主机的 CPU 核心数(虚拟核), 开发中非必要不要把最小值调到很大,可能会导致性能问题,因为在同一时间开始的任务过多,会导致所有的任务都会很慢。同样把最小值设置成小于核心数也会对性能有影响。

因为我的 CPU 是10核10线程。所以前10条输出几乎是并行执行的,从 11 执行开始因为前 10 条线程没返回给线程池,且线程池的线程数已经达到最小值,新的线程创建明显有一个滞后,因为新的任务会等待一段时间看是否有空闲线程,如果在等待后依旧没有空闲的线就会创建一个新的线程。14到20也并行执行,因为线程池已经有足够的线程去领取执行任务。

1
2
3
4
5
6
7
8
9
10
11
12
// 设置最大工作线程
ThreadPool.SetMaxThreads(10, 10);
for (var i = 1; i <= 20; i++)
{
ThreadPool.QueueUserWorkItem(async m =>
{
Console.WriteLine($"线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread}," +
$"是否后台线程:{Thread.CurrentThread.IsBackground},执行时间:{DateTime.Now.ToString("ss.fff")}");
});
}
Console.ReadLine();
Console.WriteLine($"线程池线程数: {ThreadPool.ThreadCount}");

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
线程Id:9,是否线程池线程:True,是否后台线程:True,执行时间:25.805
线程Id:5,是否线程池线程:True,是否后台线程:True,执行时间:25.805
线程Id:10,是否线程池线程:True,是否后台线程:True,执行时间:25.805
线程Id:8,是否线程池线程:True,是否后台线程:True,执行时间:25.806
线程Id:7,是否线程池线程:True,是否后台线程:True,执行时间:25.806
线程Id:11,是否线程池线程:True,是否后台线程:True,执行时间:25.805
线程Id:13,是否线程池线程:True,是否后台线程:True,执行时间:25.805
线程Id:12,是否线程池线程:True,是否后台线程:True,执行时间:25.806
线程Id:14,是否线程池线程:True,是否后台线程:True,执行时间:25.857
线程Id:15,是否线程池线程:True,是否后台线程:True,执行时间:25.857
//下面线程都是复用的,所以也基本是并行执行的
线程Id:5,是否线程池线程:True,是否后台线程:True,执行时间:25.864
线程Id:12,是否线程池线程:True,是否后台线程:True,执行时间:25.864
线程Id:8,是否线程池线程:True,是否后台线程:True,执行时间:25.864
线程Id:12,是否线程池线程:True,是否后台线程:True,执行时间:25.864
线程Id:9,是否线程池线程:True,是否后台线程:True,执行时间:25.864
线程Id:15,是否线程池线程:True,是否后台线程:True,执行时间:25.864
线程Id:7,是否线程池线程:True,是否后台线程:True,执行时间:25.865
线程Id:14,是否线程池线程:True,是否后台线程:True,执行时间:25.865
线程Id:12,是否线程池线程:True,是否后台线程:True,执行时间:25.865
线程Id:10,是否线程池线程:True,是否后台线程:True,执行时间:25.866

线程池线程数: 10

最大线程数是 10,20 次任务分两次在几乎同一时刻执行。

最大线程数不能小于 CPU 核心数,10 核的 CPU 设置成1是没用的。也不能把最大线程数设置成比最小线程数小。不建议调整最大值。

ThreadPool不足

ThreadPool 虽然可以很好的管理线程的创建销毁。但是使用中依旧存在一些交互上的不便。

  • 自身不支持线程退出,可以通过 CancellationTokenSource
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var token = new CancellationTokenSource();
ThreadPool.QueueUserWorkItem(async m =>
{
while (!token.IsCancellationRequested)
{
Console.WriteLine("线程正在工作");
}
Console.WriteLine("线程被要求退出");
});
ThreadPool.QueueUserWorkItem(async m =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(100));
// 发出取消信号
token.Cancel();
});

Console.ReadKey();
  • 自身不支持 join 类似的等待操作,可以通过 ManualResetEvent 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
var signal = new ManualResetEvent(false);
ThreadPool.QueueUserWorkItem(p =>
{
for (var i = 0; i < 10; i++)
{
Console.WriteLine(i);
}

// 发出信号已经完成。
signal.Set();
});
// 等待接受完成信号
signal.WaitOne();
Console.WriteLine("end");
  • 依旧不支持委托返回值。

Task

Task 全称 Task Parallel Library (TPL).NET 4.0 引进的并行库,Task 相对于 ThreadThreadPool 提供了丰富的 API,自身支持延续、取消、失败通知,并且返回一个 Task<TResult> 对象。 Task<TResult> 是一个处理基本的底层细节并提供了在任务生命周期内可由调用线程访问的方法和属性的对象。

Task 调度由 TaskScheduler 来负责的,TaskScheduler 提供两个类型 :

  • ThreadPoolTaskScheduler:会将所有的任务调度给 ThreadPool 线程池来执行,Task 默认这个机制。
  • FromCurrentSynchronizationContext:WPF 等GUI 应用程序专用,会把所有的任务都调度给主线程也就是UI线程去执行,所以不会使用线程池,
1
2
3
4
5
6
7
8
public static TaskScheduler Default => s_defaultTaskScheduler;
private static readonly TaskScheduler s_defaultTaskScheduler = new ThreadPoolTaskScheduler();

public static TaskScheduler FromCurrentSynchronizationContext()
{
return new SynchronizationContextTaskScheduler();
}

Task创建任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Task.Run(() =>
{
Console.WriteLine($"t1 线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread}," +
$"是否后台线程:{Thread.CurrentThread.IsBackground}");
});

Task.Factory.StartNew(() =>
{
Console.WriteLine($"t2 线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread}," +
$"是否后台线程:{Thread.CurrentThread.IsBackground}");
},TaskCreationOptions.LongRunning);

var t = new Task(() =>
{
Console.WriteLine($"t3 线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread}," +
$"是否后台线程:{Thread.CurrentThread.IsBackground}");
});
t.Start(TaskScheduler.Default);

当不需要对任务调度进行控制时推荐使用 Task.RunFactory.StartNew 适用于长时间运行的任务,或者需要精细控制调度时使用。

TaskScheduler说明

  • None 默认,完全由 ThreadPool 来调度。

  • PreferFairness:以一种尽可能公平的方式安排任务,这意味着较早安排的任务将更可能较早运行,而较晚安排运行的任务将更可能较晚运行。

  • LongRunning 指定任务将是长时间运行的,调度器不会委托 Threadpool来的调度,而是创建一个新的线程去执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[SecurityCritical]
protected internal override void QueueTask(Task task)
{
if ((task.Options & TaskCreationOptions.LongRunning) != 0)
{
// Run LongRunning tasks on their own dedicated thread.
Thread thread = new Thread(s_longRunningThreadWork);
thread.IsBackground = true; // Keep this thread from blocking process shutdown
thread.Start(task);
}
else
{
// Normal handling for non-LongRunning tasks.
bool forceToGlobalQueue = ((task.Options & TaskCreationOptions.PreferFairness) != 0);
ThreadPool.UnsafeQueueCustomWorkItem(task, forceToGlobalQueue);
}
}
  • AttachedToParent 将任务附加到任务层次中某个父级,默认情况下层级的任务是独立的,可以通过附加的方式使父子任务同步。
1
2
3
4
5
6
7
8
9
10
var t1 = Task.Factory.StartNew(() =>
{
Task.Factory.StartNew(() =>
{
Thread.Sleep(TimeSpan.FromSeconds(3));
Console.WriteLine("t2 线程执行");
}, TaskCreationOptions.AttachedToParent);
});
// t1 会等待自己的子节点任务t2执行完毕后在继续。
t1.Wait();
  • DenyChildAttach:当前任务节点不允许子任务附加。Task.Run 创建的任务默认使用,所以无法附加子项。
1
2
3
4
5
6
7
8
9
var t1 = Task.Factory.StartNew(() =>
{
Task.Factory.StartNew(() =>
{
Thread.Sleep(TimeSpan.FromSeconds(3));
Console.WriteLine("t2 线程执行");
}, TaskCreationOptions.AttachedToParent); // 这里无效附加
},TaskCreationOptions.DenyChildAttach);// 父节点不允许附加
t1.Wait();
  • HideScheduler 在task里面在创建任务将默认使用的TaskScheduler,基本用不上。

  • RunContinuationsAsynchronously 默认情况下任务的延续是不是一个线程由线程池的调度来决定, 可以手动强制延续任务以异步的方式执行。

1
2
3
4
5
6
7
8
Task.Factory.StartNew(() =>
{
Console.WriteLine($"t1线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread},是否后台线程:{Thread.CurrentThread.IsBackground}");
return 1;
}, TaskCreationOptions.RunContinuationsAsynchronously).ContinueWith(m =>
{
Console.WriteLine($"t1延续线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread},是否后台线程:{Thread.CurrentThread.IsBackground}");
});

输出:

1
2
t1线程Id:8,是否线程池线程:True,是否后台线程:True
t1延续线程Id:1,是否线程池线程:True,是否后台线程:True

但是不推荐使用 TaskCreationOptions 来控制,推荐使用 TaskContinuationOptions 来控制延续是否异步。

  • ExecuteSynchronously 强制使用同一线程;
  • RunContinuationsAsynchronously强制异步使用不同线程;

默认 RunContinuationsAsynchronously 优先级比ExecuteSynchronously 高。

1
2
3
4
5
6
7
8
Task.Factory.StartNew(() =>
{
Console.WriteLine($"t1,线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread},是否后台线程:{Thread.CurrentThread.IsBackground}");
return 1;
}).ContinueWith(m =>
{
Console.WriteLine($"t1延续线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread},是否后台线程:{Thread.CurrentThread.IsBackground}");
},TaskContinuationOptions.ExecuteSynchronously);

输出:

1
2
t1线程Id:8,是否线程池线程:True,是否后台线程:True
t1延续线程Id:8,是否线程池线程:True,是否后台线程:True

Task延续

通过 ContinueWith 延续任务

1
2
3
4
5
6
7
8
9
10
11
12
Task.Factory.StartNew(delegate
{
Console.WriteLine($"t1 程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread}," +
$"是否后台线程:{Thread.CurrentThread.IsBackground}");
Thread.Sleep(TimeSpan.FromSeconds(5));
return 1;
}).ContinueWith(delegate(Task<int> m)
{
Console.WriteLine($"t2 程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread}," +
$"是否后台线程:{Thread.CurrentThread.IsBackground},{m.Result}");
});

输出:

1
2
t1 程Id:5,是否线程池线程:True,是否后台线程:True
t2 程Id:5,是否线程池线程:True,是否后台线程:True

延续的线程可能会和任务本身是同一线程,这个取决于线程池的调度。

通过 GetAwaiter 延续任务

1
2
3
4
5
6
7
8
9
10
11
12
var t1 = Task.Run(delegate
{
Console.WriteLine($"t1.线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread}," +
$"是否后台线程:{Thread.CurrentThread.IsBackground}");
return 25;
}).GetAwaiter();

t1.OnCompleted(delegate(Task<int> m)
{
Console.WriteLine($"t2.线程Id:{Thread.CurrentThread.ManagedThreadId},是否线程池线程:{Thread.CurrentThread.IsThreadPoolThread}," +
$"是否后台线程:{Thread.CurrentThread.IsBackground}");
});

没有发现有 await/async 的味道了。

Task等待

Task 提供四种方式等待任务完成,都支持超时,间隔,取消操作。

名称 描述
Task.Wait 等待某个任务完成
Task.WaitAll 等待给定的一组任务全部完成
Task.WaitAny 等待给定的一组任务其中一个完成
Task.WaitAsync .Net6 新增的方法。等待某个任务完成,6以后的版本推荐使用此方法

用法都大致相同:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
var ts = new CancellationTokenSource();
var t1 = Task.Run(() => { });
t1.Wait();
// 指定毫秒内完成
t1.Wait(1000);
// 指定时间
t1.Wait(TimeSpan.FromSeconds(10));
// 活动取消
t1.Wait(ts.Token);
ts.Cancel();

var t2 = Task.Run(() => { });
var t3 = Task.Run(() => { });
// 等待t2或者t2其中一个执行完成
Task.WaitAny(t2, t3);

//等待 t2 和 t3都执行完成
Task.WaitAll(t2, t3);

// 指定时间内完成
t1.WaitAsync(TimeSpan.FromSeconds(10));

async await

async await.Net4.5 推出实现异步任务的语法。async await 其实就是GetAwaiter封装的语法糖。配合Task使用可以非常优雅的写异步操作代码,它本身并不会去创建一个新线程,线程的调度还是交给线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
namespace multithreading;

internal class Program
{
static async Task Main(string[] args)
{
Console.WriteLine(await t1());
}

static async Task<int> t1()
{
return await Task.Run(delegate { return 11; });
}
}

为什么说async await 其实就是GetAwaiter封装的语法糖呢?可以通过反编译软件看看上面代码编译之后是什么内容。

先看看反编译后的目录结构

看看 <Main>(string[]):void<Main>(string[]):Task 反编译后的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// <Main>(string[]):void
[SpecialName]
private static void <Main>(string[] args)
{
// 执行异步的 main
TaskAwaiter awaiter = Main(args).GetAwaiter();
((TaskAwaiter)(ref awaiter)).GetResult();
}

//<Main>(string[]):Task
[AsyncStateMachine(typeof(<Main>d__0))]
private static System.Threading.Tasks.Task Main(string[] args)
{
<Main>d__0 <Main>d__ = default(<Main>d__0);
// 初始化状态机
<Main>d__.<>t__builder = AsyncTaskMethodBuilder.Create();
// 设置-1状态 (未完成)
<Main>d__.<>1__state = -1;
// 启动状态机调用 MoveNext()
((AsyncTaskMethodBuilder)(ref <Main>d__.<>t__builder)).Start<<Main>d__0>(ref <Main>d__);
return ((AsyncTaskMethodBuilder)(ref <Main>d__.<>t__builder)).get_Task();
}

是不是一目了然,在 <Main>(string[]):void中,因为 Main 是一个 async 入口,所以会在入口处调用Main(args).GetAwaiter()得到返回 TaskAwaiter 对象,并调用 TaskAwaiter.GetResult() 等待任务执行结束。

编译器会为每个标识 async 的方法打上 AsyncStateMachine(异步状态机)特性,根据该特性会生成一个以方法名命名的类,Main 生成的类名是<Main>d__0,该类会继承 IAsyncStateMachine 接口,并实现里面最重要的 MoveNext 方法。 除此之外 <Main>(string[]):Task 还会初始一个状态机,并启动状态机调用 MoveNext

<Main>d__0 反编译后 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
[StructLayout(3)]
[CompilerGenerated]
private struct <Main>d__0 : IAsyncStateMachine
{
public int <>1__state;

public AsyncTaskMethodBuilder <>t__builder;

private TaskAwaiter<int> <>u__1;

private void MoveNext()
{

int num = <>1__state;
try
{
TaskAwaiter<int> awaiter;
if (num != 0)
{
awaiter = t1().GetAwaiter();
if (!awaiter.get_IsCompleted())
{
num = (<>1__state = 0);
<>u__1 = awaiter;
((AsyncTaskMethodBuilder)(ref <>t__builder)).AwaitUnsafeOnCompleted<TaskAwaiter<int>, <Main>d__0>(ref awaiter, ref this);
return;
}
}
else
{
// 重新获取异步task对象
awaiter = <>u__1;
<>u__1 = default(TaskAwaiter<int>);
num = (<>1__state = -1);
}
Console.WriteLine(awaiter.GetResult());
}
catch (System.Exception exception)
{
<>1__state = -2;
((AsyncTaskMethodBuilder)(ref <>t__builder)).SetException(exception);
return;
}
// 设置状态机 转态 -2(已完成)
<>1__state = -2;
((AsyncTaskMethodBuilder)(ref <>t__builder)).SetResult();
}

[DebuggerHidden]
private void SetStateMachine(IAsyncStateMachine stateMachine)
{
((AsyncTaskMethodBuilder)(ref <>t__builder)).SetStateMachine(stateMachine);
}
}

首先会在 MoveNext 中获取状态机的状态,判断状态机状态不等于 0(执行状态),调用 t1().GetAwaiter() 获取 TaskAwaiter 对象,判断 t1 线程是否完成,未完成修改状态机为 0(执行状态),并且调用AsyncTaskMethodBuilder.AsyncVoidMethodBuilder方法,简单来说就是该方法会注册一个回调函数,当任务完成后会再次调用 MoveNext 方法。也就是说 MoveNext 会被调用两次,一直是开始任务,一次是等任务结束后拿结果。

状态机的状态 -1(未执行) 和 -2(已结束) 其他的状态均为执行状态。

t1()<t1>d__1 反编译后的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
[AsyncStateMachine(typeof(<t1>d__1))]
private static System.Threading.Tasks.Task<int> t1()
{
<t1>d__1 <t1>d__ = default(<t1>d__1);
// 创建状态机
<t1>d__.<>t__builder = AsyncTaskMethodBuilder<int>.Create();
// 状态初始化-1
<t1>d__.<>1__state = -1;
// 调用<t1>d__1>.MoveNext()
<t1>d__.<>t__builder.Start<<t1>d__1>(ref <t1>d__);
return <t1>d__.<>t__builder.get_Task();
}

[StructLayout(3)]
[CompilerGenerated]
private struct <t1>d__1 : IAsyncStateMachine
{
public int <>1__state;

public AsyncTaskMethodBuilder<int> <>t__builder;

private TaskAwaiter<int> <>u__1;

private void MoveNext()
{
int num = <>1__state;
int result;
try
{
TaskAwaiter<int> awaiter;
if (num != 0)
{
// task.去执行任务。
awaiter = System.Threading.Tasks.Task.Run<int>((Func<int>)(() => 11)).GetAwaiter();
if (!awaiter.get_IsCompleted())
{
num = (<>1__state = 0);
<>u__1 = awaiter;
<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter<int>, <t1>d__1>(ref awaiter, ref this);
return;
}
}
else
{
awaiter = <>u__1;
<>u__1 = default(TaskAwaiter<int>);
num = (<>1__state = -1);
}
result = awaiter.GetResult();
}
catch (System.Exception exception)
{
<>1__state = -2;
<>t__builder.SetException(exception);
return;
}
<>1__state = -2;
<>t__builder.SetResult(result);
}

[DebuggerHidden]
private void SetStateMachine(IAsyncStateMachine stateMachine)
{
<>t__builder.SetStateMachine(stateMachine);
}
}

Main 反编译后的结果都是大同小异,不同的 Main 里面是调用的 t1().GetAwaiter() 而这里是 Task.Run 去执行给定的委托。

Main 大致执行流程图:

使用中常见问题

在异步函数里面调用同步函数。

1
2
3
4
5
6
7
8
9
10
11
12
internal class Program
{
static async Task Main(string[] args)
{
Console.WriteLine(t1());
}

static int t1()
{
return 1;
}
}

反编译后结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
[StructLayout(3)]
[CompilerGenerated]
private struct <Main>d__0 : IAsyncStateMachine
{
public int <>1__state;

public AsyncTaskMethodBuilder <>t__builder;

private void MoveNext()
{
try
{
Console.WriteLine(t1());
}
catch (System.Exception exception)
{
<>1__state = -2;
((AsyncTaskMethodBuilder)(ref <>t__builder)).SetException(exception);
return;
}
<>1__state = -2;
((AsyncTaskMethodBuilder)(ref <>t__builder)).SetResult();
}

[DebuggerHidden]
private void SetStateMachine(IAsyncStateMachine stateMachine)
{
((AsyncTaskMethodBuilder)(ref <>t__builder)).SetStateMachine(stateMachine);
}
}

可以看到在 async 中调用同步方法依旧会生成对应的类,但是并没有使用 Task 来执行,所以在异步函数里面只调用同步函数是有一定的性能损失的。

但是如果又有异步函数和同步函数呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
internal class Program
{
static async Task Main(string[] args)
{
t1();
await t2();
await Person.t3();
await t4();
}

static int t1()
{
return 1;
}

static async Task<int> t2()
{
return await Task.Run(delegate { return 1; });
}

static async Task<int> t4()
{
return await Task.Run(delegate { return 1; });
}
}

public static class Person
{
public static async Task<int> t3()
{
return await Task.Run(delegate { return 1; });
}
}

反编译后结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
[StructLayout(3)]
[CompilerGenerated]
private struct <Main>d__0 : IAsyncStateMachine
{
public int <>1__state;

public AsyncTaskMethodBuilder <>t__builder;

private TaskAwaiter<int> <>u__1;

private void MoveNext()
{
int num = <>1__state;
try
{
TaskAwaiter<int> awaiter;
switch (num)
{
default:
t1();
awaiter = t2().GetAwaiter();
if (!awaiter.get_IsCompleted())
{
//t2执行状态机状态=0
num = (<>1__state = 0);
<>u__1 = awaiter;
((AsyncTaskMethodBuilder)(ref <>t__builder)).AwaitUnsafeOnCompleted<TaskAwaiter<int>, <Main>d__0>(ref awaiter, ref this);
return;
}
// 如果t2 get_IsCompleted==true 直接执行t3(),一般是不会走到这里的。
goto IL_0072;
case 0:
awaiter = <>u__1;
<>u__1 = default(TaskAwaiter<int>);
num = (<>1__state = -1);
// t2()执行结束后执行t3()
goto IL_0072;
case 1:
awaiter = <>u__1;
<>u__1 = default(TaskAwaiter<int>);
num = (<>1__state = -1);
// t3()执行结束后执行t4
goto IL_00cd;
case 2:
{
awaiter = <>u__1;
<>u__1 = default(TaskAwaiter<int>);
num = (<>1__state = -1);
break;
}

IL_00cd:
awaiter.GetResult();
awaiter = t4().GetAwaiter();
if (!awaiter.get_IsCompleted())
{
// t4执行状态机状态=2
num = (<>1__state = 2);
<>u__1 = awaiter;
((AsyncTaskMethodBuilder)(ref <>t__builder)).AwaitUnsafeOnCompleted<TaskAwaiter<int>, <Main>d__0>(ref awaiter, ref this);
return;
}
break;

IL_0072:
awaiter.GetResult();
awaiter = Person.t3().GetAwaiter();
if (!awaiter.get_IsCompleted())
{
// t3执行状态机状态=1
num = (<>1__state = 1);
<>u__1 = awaiter;
((AsyncTaskMethodBuilder)(ref <>t__builder)).AwaitUnsafeOnCompleted<TaskAwaiter<int>, <Main>d__0>(ref awaiter, ref this);
return;
}
goto IL_00cd;
}
awaiter.GetResult();
}
catch (System.Exception exception)
{
<>1__state = -2;
((AsyncTaskMethodBuilder)(ref <>t__builder)).SetException(exception);
return;
}
<>1__state = -2;
((AsyncTaskMethodBuilder)(ref <>t__builder)).SetResult();
}

}

状态机为每个任务执行都给定了唯一的状态,通过分支判断和 goto 控制每个任务执行,大致执行流程:

  1. switch 进来状态机的状态为 -1 执行 default 分支,因为 t1() 是同步函数所以直接调用并不会更新状态机状态。
  2. 获取 t2() awaiter 对象,判断是否执行结束,未结束进入内部逻辑,更新状态机状态为 0 。且注册回调。
  3. t2 () 回调,状态机状态为 0 ,进入 case 0 分支,更新状态机状态 -1,然后 gotoIL_0072
  4. 在执行 t3() 前调用 t2().GetResult(),之后重复 2 3 4流程。直到最后一个任务执行完成后修改状态机状态成 -2。

使用场景

await/async 可以在开发中可以快速的写出健壮易读读的异步函数,但是await/async并不是万能的,有很多人认为await/async可以提高服务的响应速度,其实await/async并不能提高,await/async的优势在于充分利用 CPU 性能,让所有线程都忙起来,提高整体的服务吞吐量。

await/async 比较适用于一些耗时长的 I/O 操作,比如 mysqlredishttpclient 等网络操作。CPU 密集操作并不适用于 await/async

参考文章

总结

简单整理一下了 await/async 的来龙去脉。文中如果有错误请在评论区指出。谢谢大佬。


理解async/await语法糖
http://example.com/posts/33833.html
作者
她微笑的脸y
发布于
2022年7月20日
许可协议