前言
在 第四讲 我们介绍了 main goroutine 是如何运行的。其中针对 main goroutine 介绍了调度函数 schedule 是怎么工作的,对于整个调度器的调度策略并没有介绍,这点是不完整的,这一讲会完善调度器的调度策略部分。
调度时间点
runtime.schedule 实现了调度器的调度策略。那么对于调度时间点,查看哪些函数调用的 runtime.schedule 即可顺藤摸瓜理出调度器的调度时间点,如下图:

调度时间点不是本讲的重点,这里有兴趣的同学可以顺藤摸瓜,摸摸触发调度时间点的路径,这里就跳过了。
调度策略
调度策略才是我们的重点,进到 runtime.schedule:
1// One round of scheduler: find a runnable goroutine and execute it.
2// Never returns.
3func schedule() {
4 mp := getg().m // 获取当前执行线程
5
6top:
7 pp := mp.p.ptr() // 获取执行线程绑定的 P
8 pp.preempt = false
9
10 // Safety check: if we are spinning, the run queue should be empty.
11 // Check this before calling checkTimers, as that might call
12 // goready to put a ready goroutine on the local run queue.
13 if mp.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
14 throw("schedule: spinning with local work")
15 }
16
17 gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
18
19 ...
20 execute(gp, inheritTime) // 执行找到的 goroutine
21}
runtime.schedule 的重点在 findRunnable()。findRunnable() 函数很长,为避免影响可读性,这里对大部分流程做了注释,后面在有重点的加以介绍。进入 findRunnable():
1// Finds a runnable goroutine to execute.
2// Tries to steal from other P's, get g from local or global queue, poll network.
3// tryWakeP indicates that the returned goroutine is not normal (GC worker, trace
4// reader) so the caller should try to wake a P.
5func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
6 mp := getg().m // 获取当前执行线程
7
8top:
9 pp := mp.p.ptr() // 获取线程绑定的 P
10 ...
11
12 // Check the global runnable queue once in a while to ensure fairness.
13 // Otherwise two goroutines can completely occupy the local runqueue
14 // by constantly respawning each other.
15 if pp.schedtick%61 == 0 && sched.runqsize > 0 {
16 lock(&sched.lock)
17 gp := globrunqget(pp, 1)
18 unlock(&sched.lock)
19 if gp != nil {
20 return gp, false, false
21 }
22 }
23
24 // local runq
25 if gp, inheritTime := runqget(pp); gp != nil { // 从 P 的本地队列中获取 goroutine
26 return gp, inheritTime, false
27 }
28
29 // global runq
30 if sched.runqsize != 0 { // 如果本地队列获取不到就判断全局队列中有无 goroutine
31 lock(&sched.lock) // 如果有的话,为全局变量加锁
32 gp := globrunqget(pp, 0) // 从全局队列中拿 goroutine
33 unlock(&sched.lock) // 为全局变量解锁
34 if gp != nil {
35 return gp, false, false
36 }
37 }
38
39 // 如果全局队列中没有 goroutine 则从 network poller 中取 goroutine
40 if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 {
41 ...
42 }
43
44 // 如果 network poller 中也没有 goroutine,那么尝试从其它 P 中偷 goroutine
45 // Spinning Ms: steal work from other Ps.
46 //
47 // Limit the number of spinning Ms to half the number of busy Ps.
48 // This is necessary to prevent excessive CPU consumption when
49 // GOMAXPROCS>>1 but the program parallelism is low.
50 // 如果下面两个条件至少有一个满足,则进入偷 goroutine 逻辑
51 // 条件 1: 当前线程是 spinning 自旋状态
52 // 条件 2: 当前活跃的 P 要远大于自旋的线程,说明需要线程去分担活跃线程的压力,不要睡觉了
53 if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
54 if !mp.spinning { // 因为是两个条件至少满足一个即可,这里首先判断当前线程是不是自旋状态
55 mp.becomeSpinning() // 如果不是,更新线程的状态为自旋状态
56 }
57
58 gp, inheritTime, tnow, w, newWork := stealWork(now) // 偷 goroutine
59 if gp != nil {
60 // Successfully stole.
61 return gp, inheritTime, false // 如果 gp 不等于 nil,表示偷到了,返回偷到的 goroutine
62 }
63 if newWork {
64 // There may be new timer or GC work; restart to
65 // discover.
66 goto top // 如果 gp 不等于 nil,且 network 为 true,则跳到 top 标签重新找 goroutine
67 }
68
69 now = tnow
70 if w != 0 && (pollUntil == 0 || w < pollUntil) {
71 // Earlier timer to wait for.
72 pollUntil = w
73 }
74 }
75
76 ...
77 if sched.runqsize != 0 { // 偷都没偷到,还要在找一遍全局队列,防止偷的过程中,全局队列又有 goroutine 了
78 gp := globrunqget(pp, 0)
79 unlock(&sched.lock)
80 return gp, false, false
81 }
82
83 if !mp.spinning && sched.needspinning.Load() == 1 { // 在判断一遍,如果 mp 不是自旋状态,且 sched.needspinning == 1 则更新 mp 为自旋,调用 top 重新找一遍 goroutine
84 // See "Delicate dance" comment below.
85 mp.becomeSpinning()
86 unlock(&sched.lock)
87 goto top
88 }
89
90 // 实在找不到 goroutine,表明当前线程多, goroutine 少,准备挂起线程
91 // 首先,调用 releasep 取消线程和 P 的绑定
92 if releasep() != pp {
93 throw("findrunnable: wrong p")
94 }
95
96 ...
97 now = pidleput(pp, now) // 将解绑的 P 放到全局空闲队列中
98 unlock(&sched.lock)
99
100 wasSpinning := mp.spinning // 到这里 mp.spinning == true,线程处于自旋状态
101 if mp.spinning {
102 mp.spinning = false // 设置 mp.spinning = false,这是要准备休眠了
103 if sched.nmspinning.Add(-1) < 0 { // 将全局变量的自旋线程数减 1,因为当前线程准备休眠,不偷 goroutine 了
104 throw("findrunnable: negative nmspinning")
105 }
106 ...
107 }
108 stopm() // 线程休眠,直到唤醒
109 goto top // 能执行到这里,说明线程已经被唤醒了,继续找一遍 goroutine
110}
看完线程的调度策略我都要被感动到了,何其的敬业,穷尽一切方式去找活干,找不到活,休眠之前还要在找一遍,真的是劳模啊。
大致流程是比较清楚的,我们把其中一些值得深挖的部分在单拎出来。
首先,从本地队列中找 goroutine,如果找不到则进入全局队列找,这里如果看 gp := globrunqget(pp, 0) 可能会觉得疑惑,从全局队列中拿 goroutine 为什么要把 P 传进去,我们看这个函数在做什么:
1// Try get a batch of G's from the global runnable queue.
2// sched.lock must be held. // 注释说的挺清晰了,把全局队列的 goroutine 放到 P 的本地队列
3func globrunqget(pp *p, max int32) *g {
4 assertLockHeld(&sched.lock)
5
6 if sched.runqsize == 0 {
7 return nil
8 }
9
10 n := sched.runqsize/gomaxprocs + 1 // 全局队列是线程共享的,这里要除 gomaxprocs 平摊到每个线程绑定的 P
11 if n > sched.runqsize {
12 n = sched.runqsize // 执行到这里,说明 gomaxprocs == 1
13 }
14 if max > 0 && n > max {
15 n = max
16 }
17 if n > int32(len(pp.runq))/2 {
18 n = int32(len(pp.runq)) / 2 // 如果 n 比本地队列长度的一半要长,则 n == len(P.runq)/2
19 }
20
21 sched.runqsize -= n // 全局队列长度减 n,准备从全局队列中拿 n 个 goroutine 到 P 中
22
23 gp := sched.runq.pop() // 把全局队列队头的 goroutine 拿出来,这个 goroutine 是要返回的 goroutine
24 n-- // 拿出了一个队头的 goroutine,这里 n 要减 1
25 for ; n > 0; n-- {
26 gp1 := sched.runq.pop() // 循环拿全局队列中的 goroutine 出来
27 runqput(pp, gp1, false) // 将拿出的 goroutine 放到全局队列中
28 }
29 return gp
30}
调用 globrunqget 说明本地队列没有 goroutine 要从全局队列拿,那么就可以把全局队列中的 goroutine 放到 P 中,提高了全局队列 goroutine 的优先级。
如果全局队列也没找到 goroutine,在从 network poller 找,如果 network poller 也没找到,则准备进入自旋,从别的线程的 P 那里偷活干。我们看线程是怎么偷活的:
1// stealWork attempts to steal a runnable goroutine or timer from any P.
2//
3// If newWork is true, new work may have been readied.
4//
5// If now is not 0 it is the current time. stealWork returns the passed time or
6// the current time if now was passed as 0.
7func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
8 pp := getg().m.p.ptr() // pp 是当前线程绑定的 P
9
10 ranTimer := false
11
12 const stealTries = 4 // 线程偷四次,每次都要随机循环一遍所有 P
13 for i := 0; i < stealTries; i++ {
14 stealTimersOrRunNextG := i == stealTries-1
15
16 for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { // 为保证偷的随机性,随机开始偷 P。随机开始,后面每个 P 都可以轮到
17 ...
18 p2 := allp[enum.position()] // 从 allp 中获取 P
19 if pp == p2 {
20 continue // 如果获取的是当前线程绑定的 P,则继续循环下一个 P
21 }
22 ...
23 // Don't bother to attempt to steal if p2 is idle.
24 if !idlepMask.read(enum.position()) { // 判断拿到的 P 是不是 idle 状态,如果是,表明 P 还没有 goroutine,跳过它,偷下一家
25 if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil { // P 不是 idle,调用 runqsteal 偷它!
26 return gp, false, now, pollUntil, ranTimer
27 }
28 }
29 }
30 }
31
32 // No goroutines found to steal. Regardless, running a timer may have
33 // made some goroutine ready that we missed. Indicate the next timer to
34 // wait for.
35 return nil, false, now, pollUntil, ranTimer
36}
线程随机的偷一个可偷的 P,偷 P 的实现在 runqsteal,查看 runqsteal 怎么偷的:
1// Steal half of elements from local runnable queue of p2
2// and put onto local runnable queue of p.
3// Returns one of the stolen elements (or nil if failed). // 给宝宝饿坏了,直接偷一半的 goroutine 啊,够狠的!
4func runqsteal(pp, p2 *p, stealRunNextG bool) *g {
5 t := pp.runqtail // t 指向当前 P 本地队列的队尾
6 n := runqgrab(p2, &pp.runq, t, stealRunNextG) // runqgrab 把 P2 本地队列的一半 goroutine 拿到 P 的 runq 队列中
7 if n == 0 {
8 return nil
9 }
10 n--
11 gp := pp.runq[(t+n)%uint32(len(pp.runq))].ptr() // 把偷到的本地队列队尾的 goroutine 拿出来
12 if n == 0 {
13 return gp // 如果只偷到了这一个,则直接返回。有总比没有好
14 }
15 h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with consumers
16 if t-h+n >= uint32(len(pp.runq)) {
17 throw("runqsteal: runq overflow") // 如果 t-h+n >= len(p.runq) 表示偷多了...
18 }
19 atomic.StoreRel(&pp.runqtail, t+n) // 更新 P 的本地队列的队尾
20 return gp
21}
这个偷就是把“地主家”(P2)的余粮 (goroutine) 给它抢一半过来,没办法我也要吃饭啊。
如果连偷都没偷到(好吧,太惨了点…),那就准备休眠了,不干活了还不行嘛。不干活之前在去看看全局队列有没有 goroutine 了(口是心非的 M 人)。还是没活,好吧,准备休眠了。
准备休眠,首先解除和 P 的绑定:
1func releasep() *p {
2 gp := getg()
3
4 if gp.m.p == 0 {
5 throw("releasep: invalid arg")
6 }
7 pp := gp.m.p.ptr()
8 if pp.m.ptr() != gp.m || pp.status != _Prunning {
9 print("releasep: m=", gp.m, " m->p=", gp.m.p.ptr(), " p->m=", hex(pp.m), " p->status=", pp.status, "\n")
10 throw("releasep: invalid p state")
11 }
12 ...
13 gp.m.p = 0
14 pp.m = 0
15 pp.status = _Pidle
16 return pp
17}
就是指针的解绑操作,代码很清晰,连注释都不用,我们也不讲了。
解绑之后,pidleput 把空闲的 P 放到全局空闲队列中。
接着,更新线程的状态,从自旋更新为非自旋,调用 stopm 准备休眠:
1// Stops execution of the current m until new work is available.
2// Returns with acquired P.
3func stopm() {
4 gp := getg() // 当前线程执行的 goroutine
5
6 ...
7
8 lock(&sched.lock)
9 mput(gp.m) // 将线程放到全局空闲线程队列中
10 unlock(&sched.lock)
11 mPark()
12 acquirep(gp.m.nextp.ptr())
13 gp.m.nextp = 0
14}
stopm 将线程放到全局空闲线程队列,接着调用 mPark 休眠线程:
1// mPark causes a thread to park itself, returning once woken.
2//
3//go:nosplit
4func mPark() {
5 gp := getg()
6 notesleep(&gp.m.park) // notesleep 线程休眠
7 noteclear(&gp.m.park)
8}
9
10func notesleep(n *note) {
11 gp := getg()
12 if gp != gp.m.g0 {
13 throw("notesleep not on g0")
14 }
15 ns := int64(-1)
16 if *cgo_yield != nil {
17 // Sleep for an arbitrary-but-moderate interval to poll libc interceptors.
18 ns = 10e6
19 }
20 for atomic.Load(key32(&n.key)) == 0 { // 这里通过 n.key 判断线程是否唤醒,如果等于 0,表示未唤醒,线程继续休眠
21 gp.m.blocked = true
22 futexsleep(key32(&n.key), 0, ns) // 调用 futex 休眠线程,线程会“阻塞”在这里,直到被唤醒
23 if *cgo_yield != nil {
24 asmcgocall(*cgo_yield, nil)
25 }
26 gp.m.blocked = false // “唤醒”,设置线程的 blocked 标记为 false
27 }
28}
29
30// One-time notifications.
31func noteclear(n *note) {
32 n.key = 0 // 执行到 noteclear 说明,线程已经被唤醒了,这时候线程重置 n.key 标志位为 0
33}
线程休眠是通过调用 futex 进入操作系统内核完成线程休眠的,关于 futex 的内容可以参考 这里。
线程的 n.key 是休眠的标志位,当 n.key 不等于 0 时表示有线程在唤醒休眠线程,线程从休眠状态恢复到正常状态。唤醒休眠线程通过调用 notewakeup(&nmp.park) 函数实现:
1func notewakeup(n *note) {
2 old := atomic.Xchg(key32(&n.key), 1)
3 if old != 0 {
4 print("notewakeup - double wakeup (", old, ")\n")
5 throw("notewakeup - double wakeup")
6 }
7 futexwakeup(key32(&n.key), 1) // 调用 futexwakeup 唤醒休眠线程
8}
首先,线程是怎么找到休眠线程的?线程通过全局空闲线程队列找到空闲的线程,并且将空闲线程的休眠标志位 m.park 传给 notewakeup,最后调用 futexwakeup 唤醒休眠线程。
值得一提的是,唤醒的线程在唤醒之后还是会继续找可运行的 goroutine 直到找到:
1func stopm() {
2 ...
3 mPark() // 如果 mPark 返回,表示线程被唤醒,开始正常工作
4 acquirep(gp.m.nextp.ptr()) // 前面休眠前,线程已经和 P 解绑了。这里在给线程找一个 P 绑定
5 gp.m.nextp = 0 // 线程已经绑定到 P 了,重置 nextp
6}
基本这就是调度策略中很重要的一部分,线程如何找 goroutine。找到 goroutine 之后调用 gogo 执行该 goroutine。
小结
本讲继续丰富了调度器的调度策略,下一讲,我们开始非 main goroutine 的介绍。