前言
皇天不负有心人,终于我们到了运行 main goroutine 环节了。让我们走起来,看看一个 goroutine 到底是怎么运行的。
运行 goroutine
稍微回顾下前面的内容,第一讲 Go 程序初始化,介绍了 Go 程序是怎么进入到 runtime 的,随之揭开 runtime 的面纱。第二讲,介绍了调度器的初始化,要运行 goroutine 调度器是必不可少的,只有调度器准备就绪才能开始工作。第三讲,介绍了 main goroutine 是如何创建出来的,只有创建一个 goroutine 才能开始运行,否则执行代码无从谈起。这一讲,我们继续介绍如何运行 main goroutine。
我们知道 main goroutine 此时处于 _Grunnable 状态,要使得 main goroutine 处于 _Grunning 状态,还需要将它和 P 绑定。毕竟 P 是负责调度任务给线程处理的,只有和 P 绑定线程才能处理相应的 goroutine。
绑定 P
回到代码 newproc:
1func newproc(fn *funcval) {
2 gp := getg()
3 pc := getcallerpc()
4 systemstack(func() {
5 newg := newproc1(fn, gp, pc) // 创建 newg,这里是 main goroutine
6
7 pp := getg().m.p.ptr() // 获取当前工作线程绑定的 P,这里是 g0.m.p = allp[0]
8 runqput(pp, newg, true) // 绑定 allp[0] 和 main goroutine
9
10 if mainStarted { // mainStarted 还未启动,这里是 false
11 wakep()
12 }
13 })
14}
进入 runqput 函数查看 main goroutine 是怎么和 allp[0] 绑定的:
1// runqput tries to put g on the local runnable queue.
2// If next is false, runqput adds g to the tail of the runnable queue.
3// If next is true, runqput puts g in the pp.runnext slot.
4// If the run queue is full, runnext puts g on the global queue.
5// Executed only by the owner P.
6func runqput(pp *p, gp *g, next bool) {
7 ...
8 if next {
9 retryNext:
10 oldnext := pp.runnext // 从 P 的 runnext 获取下一个将要执行的 goroutine,这里 pp.runnext = nil
11 if !pp.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) { // 将 P 的 runnext 更新为 gp,这里的 gp 是 main goroutine
12 goto retryNext
13 }
14 if oldnext == 0 { // 如果 P 原来要执行的 goroutine 是 nil,则直接返回,这里创建的是 main goroutine 将直接返回
15 return
16 }
17 gp = oldnext.ptr() // 如果不为 nil,表示是一个将要执行的 goroutine。后续对这个被赶走的 goroutine 进行处理
18 }
19
20retry:
21 h := atomic.LoadAcq(&pp.runqhead)
22 t := pp.runqtail
23
24 if t-h < uint32(len(pp.runq)) { // P 的队尾和队头指向本地运行队列 runq,如果当前队列长度小于 runq 则将赶走的 goroutine 添加到队尾
25 pp.runq[t%uint32(len(pp.runq))].set(gp)
26 atomic.StoreRel(&pp.runqtail, t+1)
27 return
28 }
29
30 if runqputslow(pp, gp, h, t) { // 如果当前 P 的队列长度等于不小于 runq,表示本地队列满了,将赶走的 goroutine 添加到全局队列中
31 return
32 }
33
34 goto retry
35}
runqput 函数绑定 P 和 goroutine,同时处理 P 中的本地运行队列。基本流程在注释中已经介绍的比较清楚了。
这里我们绑定的是 main goroutine,直接绑定到 P 的 runnext 成员即可。不过对于 runqput 的整体处理来说,还需要在介绍一下 runqputslow 函数:
1// Put g and a batch of work from local runnable queue on global queue.
2// Executed only by the owner P.
3func runqputslow(pp *p, gp *g, h, t uint32) bool {
4 var batch [len(pp.runq)/2 + 1]*g // 定义 batch,长度是 P.runq 的一半。batch 用来装 g
5
6 // First, grab a batch from local queue.
7 n := t - h
8 n = n / 2
9 if n != uint32(len(pp.runq)/2) {
10 throw("runqputslow: queue is not full")
11 }
12 for i := uint32(0); i < n; i++ {
13 batch[i] = pp.runq[(h+i)%uint32(len(pp.runq))].ptr() // 从 P 的 runq 中拿出一半的 g 到 batch 中
14 }
15 if !atomic.CasRel(&pp.runqhead, h, h+n) { // cas-release, commits consume // 更新 P 的 runqhead 的指向,它指向的是本地队列的头
16 return false
17 }
18 batch[n] = gp // 将赶走的 goroutine 放到 batch 尾
19
20 if randomizeScheduler { // 如果是随机调度的话,这里还要打乱 batch 中 g 的顺序以保证随机性
21 for i := uint32(1); i <= n; i++ {
22 j := fastrandn(i + 1)
23 batch[i], batch[j] = batch[j], batch[i]
24 }
25 }
26
27 // Link the goroutines.
28 for i := uint32(0); i < n; i++ {
29 batch[i].schedlink.set(batch[i+1]) // batch 中 goroutine 的 schedlink 按顺序指向其它 goroutine,构造一个链表
30 }
31 var q gQueue // gQueue 是一个包含头和尾的指针,将头和尾指针分别指向 batch 的头 batch[0] 和尾 batch[n]
32 q.head.set(batch[0])
33 q.tail.set(batch[n])
34
35 // Now put the batch on global queue.
36 lock(&sched.lock) // 操作全局变量 sched,为 sched 加锁
37 globrunqputbatch(&q, int32(n+1)) // globrunqputbatch 将 q 指向的 batch 传给全局变量 sched
38 unlock(&sched.lock) // 解锁
39 return true
40}
41
42func globrunqputbatch(batch *gQueue, n int32) {
43 assertLockHeld(&sched.lock)
44
45 sched.runq.pushBackAll(*batch) // 这里将 sched.runq 指向 batch
46 sched.runqsize += n // sched 的 runqsize 加 n,n 表示新添加进 sched.runq 的 goroutine
47 *batch = gQueue{}
48}
如果 P 的本地队列已满,则在 runqputslow 中拿出本地队列的一半 goroutine 放到 sched.runq 全局队列中。这里本地队列是固定长度,容量有限,用数组来表示队列。而全局队列长度是不固定的,用链表来表示全局队列。
我们可以画出示意图如下图,注意示意图只是加深理解,和我们这里运行 main goroutine 的流程没关系:

运行 main goroutine
P 和 main goroutine 绑定之后,理论上已经可以运行 main goroutine 了。继续看代码执行的什么:
1> runtime.rt0_go() /usr/local/go/src/runtime/asm_amd64.s:358 (PC: 0x45434a)
2Warning: debugging optimized function
3 353: PUSHQ AX
4 354: CALL runtime·newproc(SB)
5 355: POPQ AX
6 356:
7 357: // start this M
8=> 358: CALL runtime·mstart(SB) // 调用 mstart 意味着当前线程开始工作了;mstart 是一个永不返回的函数
9 359:
10 360: CALL runtime·abort(SB) // mstart should never return
11 361: RET
12 362:
向下执行:
1(dlv) si
2> runtime.mstart() /usr/local/go/src/runtime/asm_amd64.s:394 (PC: 0x4543c0)
3Warning: debugging optimized function
4TEXT runtime.mstart(SB) /usr/local/go/src/runtime/asm_amd64.s
5=> asm_amd64.s:394 0x4543c0 e87b290000 call $runtime.mstart0
6 asm_amd64.s:395 0x4543c5 c3 ret
调用 runtime.mstart0:
1func mstart0() {
2 gp := getg() // gp = g0
3 ...
4 mstart1()
5 ...
6}
调用 mstart1:
1func mstart1() {
2 gp := getg() // gp = g0
3
4 // 保存线程执行的栈,当线程进入 schedule 函数就不会返回,这意味着线程执行的栈是可复用的
5 gp.sched.g = guintptr(unsafe.Pointer(gp))
6 gp.sched.pc = getcallerpc()
7 gp.sched.sp = getcallersp()
8
9 ...
10 if fn := gp.m.mstartfn; fn != nil { // 执行 main goroutine,fn == nil
11 fn()
12 }
13
14 ...
15 schedule() // 线程进入 schedule 调度循环,该循环是永不返回的
16}
进入 schedule:
1func schedule() {
2 mp := getg().m // mp = m0
3 ...
4top:
5 pp := mp.p.ptr() // pp = allp[0]
6 pp.preempt = false
7
8 // 线程有两种状态,自旋和非自旋。自旋表示线程没有工作,在找工作阶段。非自旋表示线程正在工作
9 // 这里如果线程自旋,但是线程绑定的 P 本地队列有 goroutine 则报异常
10 if mp.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
11 throw("schedule: spinning with local work")
12 }
13
14 // blocks until work is available
15 gp, inheritTime, tryWakeP := findRunnable() // 找一个处于 _Grunnable 状态的 goroutine 出来
16
17 ...
18 execute(gp, inheritTime) // 运行该 goroutine,这里运行的是 main goroutine
19}
schedule 中的重点是 findRunaable 函数,进入该函数:
1// Finds a runnable goroutine to execute.
2// Tries to steal from other P's, get g from local or global queue, poll network.
3// tryWakeP indicates that the returned goroutine is not normal (GC worker, trace
4// reader) so the caller should try to wake a P.
5func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
6 mp := getg().m // mp = m0
7
8top:
9 pp := mp.p.ptr() // pp = allp[0] = p0
10
11 ...
12 // Check the global runnable queue once in a while to ensure fairness.
13 // Otherwise two goroutines can completely occupy the local runqueue
14 // by constantly respawning each other.
15 // 官方的注释对这一段逻辑已经解释的很详细了,我们就跳过了,偷个懒
16 if pp.schedtick%61 == 0 && sched.runqsize > 0 {
17 lock(&sched.lock)
18 gp := globrunqget(pp, 1)
19 unlock(&sched.lock)
20 if gp != nil {
21 return gp, false, false
22 }
23 }
24
25 // local runq
26 // 从 P 的本地队列找 goroutine
27 if gp, inheritTime := runqget(pp); gp != nil {
28 return gp, inheritTime, false
29 }
30 ...
31}
findRunnable 中首先为了公平,每调用 schedule 函数 61 次就要从全局可运行队列中获取 goroutine,防止全局队列中的 goroutine 被“饿死”。接着从 P 的本地队列中获取 goroutine,这里运行的是 main goroutine 将从 P 的本地队列中获取 goroutine。查看 runqget:
1func runqget(pp *p) (gp *g, inheritTime bool) {
2 // If there's a runnext, it's the next G to run.
3 next := pp.runnext
4 // If the runnext is non-0 and the CAS fails, it could only have been stolen by another P,
5 // because other Ps can race to set runnext to 0, but only the current P can set it to non-0.
6 // Hence, there's no need to retry this CAS if it fails.
7 if next != 0 && pp.runnext.cas(next, 0) {
8 return next.ptr(), true
9 }
10
11 for {
12 h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with other consumers
13 t := pp.runqtail
14 if t == h {
15 return nil, false
16 }
17 gp := pp.runq[h%uint32(len(pp.runq))].ptr()
18 if atomic.CasRel(&pp.runqhead, h, h+1) { // cas-release, commits consume
19 return gp, false
20 }
21 }
22}
注释已经比较详细了,首先拿到 P 的 runnext 作为要运行的 goroutine。如果拿到的 goroutine 不是空,则重置 runnext,并且返回拿到的 goroutine。如果拿到的 goroutine 是空的,则从本地队列中拿 goroutine。
通过 findRunnable 我们拿到可执行的 main goroutine。接着调用 execute 执行 main goroutine。
进入 execute:
1func execute(gp *g, inheritTime bool) {
2 mp := getg().m // mp = m0
3
4 mp.curg = gp // mp.curg = g1
5 gp.m = mp // gp.m = m0
6 casgstatus(gp, _Grunnable, _Grunning) // 更新 goroutine 的状态为 _Grunning
7 gp.waitsince = 0
8 gp.preempt = false
9 gp.stackguard0 = gp.stack.lo + stackGuard
10 if !inheritTime {
11 mp.p.ptr().schedtick++
12 }
13
14 ...
15 gogo(&gp.sched)
16}
在 execute 中将线程和 gouroutine 关联起来,更新 goroutine 的状态,然后调用 gogo 完成从 g0 栈到 gp 栈的切换,gogo 是用汇编编写的,原因如下:
1gogo 函数也是通过汇编语言编写的,这里之所以需要使用汇编,是因为 goroutine 的调度涉及不同执行流之间的切换。
2
3前面我们在讨论操作系统切换线程时已经看到过,执行流的切换从本质上来说就是 CPU 寄存器以及函数调用栈的切换,然而不管是 go 还是 c 这种高级语言都无法精确控制 CPU 寄存器,因而高级语言在这里也就无能为力了,只能依靠汇编指令来达成目的。
进入 gogo,gogo 传入的是 goroutine 的 sched 结构:
1TEXT runtime·gogo(SB), NOSPLIT, $0-8
2 MOVQ buf+0(FP), BX // gobuf
3 MOVQ gobuf_g(BX), DX // gobuf 的 g 赋给 DX
4 MOVQ 0(DX), CX // make sure g != nil
5 JMP gogo<>(SB) // 跳转到私有函数 gogo<>
6
7TEXT gogo<>(SB), NOSPLIT, $0
8 get_tls(CX) // 获取当前线程 tls 中的 goroutine
9 MOVQ DX, g(CX)
10 MOVQ DX, R14 // set the g register
11 MOVQ gobuf_sp(BX), SP // restore SP
12 MOVQ gobuf_ret(BX), AX // AX = gobuf.ret
13 MOVQ gobuf_ctxt(BX), DX // DX = gobuf.ctxt
14 MOVQ gobuf_bp(BX), BP // BP = gobuf.bp
15 MOVQ $0, gobuf_sp(BX) // clear to help garbage collector
16 MOVQ $0, gobuf_ret(BX)
17 MOVQ $0, gobuf_ctxt(BX)
18 MOVQ $0, gobuf_bp(BX)
19 MOVQ gobuf_pc(BX), BX // BX = gobuf.pc
20 JMP BX // 跳转到 gobuf.pc
在 gogo<> 中完成 g0 到 gp 栈的切换:MOVQ gobuf_sp(BX), SP,并且跳转到 gobuf.pc 执行。我们看 gobuf.pc 要执行的指令地址是什么:
1asm_amd64.s:421 0x45363a 488b5b08 mov rbx, qword ptr [rbx+0x8]
2=> asm_amd64.s:422 0x45363e ffe3 jmp rbx
3(dlv) regs
4 Rbx = 0x000000000042ee80
执行 JMP BX 跳转到 0x000000000042ee80:
1(dlv) si
2> runtime.main() /usr/local/go/src/runtime/proc.go:144 (PC: 0x42ee80)
3Warning: debugging optimized function
4TEXT runtime.main(SB) /usr/local/go/src/runtime/proc.go
5=> proc.go:144 0x42ee80 4c8d6424e8 lea r12, ptr [rsp-0x18]
终于我们揭开了它的神秘面纱,这个指令指向的是 runtime.main 函数的第一条汇编指令。也就是说,跳转到了 runtime.main,这个函数会调用我们 main 包下的 main 函数。查看 runtime.main 函数:
1// The main goroutine.
2func main() {
3 mp := getg().m // mp = m0
4
5 if goarch.PtrSize == 8 {
6 maxstacksize = 1000000000 // 扩栈,栈的最大空间是 1GB
7 } else {
8 maxstacksize = 250000000
9 }
10
11 ...
12 // Allow newproc to start new Ms.
13 mainStarted = true
14
15 if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
16 systemstack(func() {
17 newm(sysmon, nil, -1) // 开启监控线程,这个线程很重要,我们后续会讲,这里先放着,让 sysmon 飞一会儿
18 })
19 }
20
21 ...
22 // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime
23 fn := main_main // 这里的 main_main 链接的是 main 包中的 main 函数
24 fn() // 执行 main.main
25 ...
26 runExitHooks(0)
27
28 exit(0) // 执行完 main.main 之后调用 exit 退出线程
29 for {
30 var x *int32
31 *x = 0
32 }
33}
runtime.main 是在 main goroutine 栈中执行的。在函数中调用 main.main 执行我们写的用户代码:
1(dlv) n
2266: fn := main_main // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime
3=> 267: fn()
4(dlv) s
5> main.main() ./hello.go:3 (PC: 0x45766a)
6Warning: debugging optimized function
7 1: package main
8 2:
9=> 3: func main() {
10 4: println("Hello World")
11 5: }
main.main 执行完之后线程调用 exit(0) 退出程序。
小结
至此我们的 main goroutine 就执行完了,花了四讲才算走通了一个 main goroutine,真不容易呀。当然,关于 Go runtime 调度器的故事还没结束,下一讲我们继续。