oohcode

$\bigodot\bigodot^H \rightarrow CODE$

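How Go channels work

This article describes how chan is implemented internally (based on Go 1.12), using source code and diagrams to show the internal structure of a chan and what happens when you operate on it.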

make chan

Before diving into the source, suppose we do not yet know where the relevant code lives. We start by simply creating a chan:

package main

func main() {
	_ = make(chan int, 3)
}

To see how it is implemented, we can compile it with the compile tool and have it print the pseudo-assembly:

go tool compile -S chan1.go

The relevant part of the generated assembly is:

"".main STEXT size=71 args=0x0 locals=0x20
0x0000 00000 (chan1.go:3) TEXT "".main(SB), ABIInternal, $32-0
...
0x0031 00049 (chan1.go:4) CALL runtime.makechan(SB)
...
0x0045 00069 (chan1.go:3) JMP 0

As you can see, make ends up calling runtime.makechan, which is implemented in runtime/chan.go:

func makechan(t *chantype, size int) *hchan {
	elem := t.elem
	...
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	...
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.kind&kindNoPointers != 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	...
	return c
}
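
The three-way switch in makechan can be restated as a small decision function. This is only a sketch: allocStrategy is a hypothetical helper, and elemSize and hasPointers are plain inputs standing in for elem.size and the kindNoPointers flag rather than values read from the runtime. The element size of 8 assumes a 64-bit platform.

```go
package main

import "fmt"

// allocStrategy is a hypothetical helper that mirrors the switch in makechan.
// It only names the branch that would be taken; it allocates nothing.
func allocStrategy(elemSize uintptr, hasPointers bool, size int) string {
	mem := elemSize * uintptr(size) // the runtime also checks this product for overflow
	switch {
	case mem == 0:
		// Zero-size elements or an unbuffered chan: no buffer is needed.
		return "hchan only, no buffer"
	case !hasPointers:
		// Pointer-free elements: hchan and buf share one allocation.
		return "hchan and buf in one allocation"
	default:
		// Elements contain pointers: buf is allocated with type information.
		return "hchan and buf allocated separately"
	}
}

func main() {
	fmt.Println(allocStrategy(0, false, 10)) // like make(chan struct{}, 10)
	fmt.Println(allocStrategy(8, false, 0))  // like make(chan int)
	fmt.Println(allocStrategy(8, false, 3))  // like make(chan int, 3)
	fmt.Println(allocStrategy(8, true, 3))   // like make(chan *int, 3)
}
```

The demo chan in this article, make(chan int, 3), falls into the second case.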

makechan returns a *hchan, which is the struct behind a chan:

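type hchan struct {
	qcount   uint           // number of elements currently in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to the circular queue's backing array
	elemsize uint16
	closed   uint32 // non-zero once the chan has been closed
	elemtype *_type // element type
	sendx    uint   // index in buf where the next sent element is stored
	recvx    uint   // index in buf of the next element to be received
	recvq    waitq  // queue of Gs waiting to receive
	sendq    waitq  // queue of Gs waiting to send
	lock     mutex  // held while operating on the chan
}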

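After the make call above, the resulting chan is in this state: qcount is 0, dataqsiz is 3, buf points to three empty int slots, sendx and recvx are both 0, and recvq and sendq are empty.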
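
Part of this initial state is visible from ordinary Go code: len reports the number of queued elements and cap the buffer capacity, which correspond to qcount and dataqsiz. The helper name freshState below is made up for this sketch.

```go
package main

import "fmt"

// freshState is a hypothetical helper: it makes a chan with the given buffer
// size and returns what len and cap report for it right after creation.
func freshState(size int) (length, capacity int) {
	c := make(chan int, size)
	return len(c), cap(c)
}

func main() {
	length, capacity := freshState(3)
	fmt.Println(length, capacity) // 0 3
}
```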

send chan

To find out what happens when we send to a chan, we again start with a demo:

package main

func main() {
	c := make(chan int, 3)
	c <- 3
}

Its assembly output:

"".main STEXT size=97 args=0x0 locals=0x20
0x0000 00000 (chan2.go:3) TEXT "".main(SB), ABIInternal, $32-0
...
0x0031 00049 (chan2.go:4) CALL runtime.makechan(SB)
...
0x004b 00075 (chan2.go:5) CALL runtime.chansend1(SB)
...
0x005f 00095 (chan2.go:3) JMP 0

Sending to a chan calls runtime.chansend1. That function is very simple and only calls runtime.chansend, so we focus on the implementation of runtime.chansend:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	...
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}
	...
	lock(&c.lock)
	// Sending on a closed chan panics immediately.
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	...
	// If a receiver is waiting in recvq, hand the value over via send.
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	...
	// No waiting receiver and buf is not full: store the value in buf.
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz { // sendx has reached the end of buf,
			c.sendx = 0 // so wrap around to 0; this makes buf a circular queue
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Get a sudog and record the current g and the value to send in it.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg) // put the sudog on the send queue
	// Park the current g until it is woken, either because its value
	// was taken by a receiver or because the chan was closed.
	goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}
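
Two of the early exits in chansend are easy to observe from user code. The sketch below assumes that a select with a default case takes the non-blocking path (block == false); the helper names trySendNil and sendOnClosed are invented for illustration.

```go
package main

import "fmt"

// trySendNil attempts a non-blocking send on a nil chan. With block == false
// chansend returns false right away, so the default case runs.
func trySendNil() bool {
	var c chan int // nil chan
	select {
	case c <- 1:
		return true
	default:
		return false
	}
}

// sendOnClosed sends on a closed chan and returns the recovered panic message.
func sendOnClosed() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	c := make(chan int, 3)
	close(c)
	c <- 1 // panics even though the buffer still has room
	return "no panic"
}

func main() {
	fmt.Println(trySendNil())   // false
	fmt.Println(sendOnClosed()) // send on closed channel
}
```

A blocking send on a nil chan is not shown because it parks the goroutine forever.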

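The process can be shown as a diagram, which breaks down into the following steps:

  1. The 1st value is sent to the hchan initialized above: it is stored in buf[0].
  2. The 2nd value is sent: it is stored in buf[1].
  3. The 3rd value is sent: it is stored in buf[2], and buf is now full.
  4. A 4th value is sent while buf is full: g1 is wrapped in a sudog, enqueued on sendq, and waits to be woken.
  5. A 5th value is sent while buf is full: g2 is wrapped in a sudog, enqueued on sendq, and waits to be woken.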
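
The steps above can be replayed against a real chan. To keep the example from deadlocking, the sketch uses a non-blocking send (select with default), so the 4th and 5th sends report failure where a plain send would park g1 and g2 on sendq. The helper names trySend and fill are assumptions made for this example.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value was accepted.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

// fill sends the values 1..n into a chan with the given buffer size and
// returns, per send, whether it was accepted, plus the final len of the chan.
func fill(size, n int) (accepted []bool, length int) {
	c := make(chan int, size)
	for v := 1; v <= n; v++ {
		accepted = append(accepted, trySend(c, v))
	}
	return accepted, len(c)
}

func main() {
	accepted, length := fill(3, 5)
	fmt.Println(accepted, length) // [true true true false false] 3
}
```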

recv chan

As before, we first write a demo to see which function a receive calls:

package main

func main() {
	c := make(chan int, 3)
	<-c
}

Its assembly output:

"".main STEXT size=94 args=0x0 locals=0x20
...
0x0031 00049 (chan3.go:4) CALL runtime.makechan(SB)
...
0x0048 00072 (chan3.go:5) CALL runtime.chanrecv1(SB)
...
0x005c 00092 (chan3.go:3) JMP 0

Likewise, runtime.chanrecv1 simply calls runtime.chanrecv, whose code is:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}

	lock(&c.lock)
	// If the chan is closed and qcount == 0, return the zero value and false
	// (for x, ok := <-c, x is the zero value and ok is false).
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}
	// If a sender is waiting in sendq, run recv.
	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	// There is a buf and it holds data.
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx) // address of the slot at recvx
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp) // copy the value at recvx into the receiving variable
		}
		typedmemclr(c.elemtype, qp) // clear the slot at recvx
		c.recvx++
		if c.recvx == c.dataqsiz { // recvx has passed the last slot,
			c.recvx = 0 // so wrap around to 0; this makes buf a circular queue
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}
	gp := getg()
	mysg := acquireSudog() // get a sudog and record the g and the receiving variable's address in it
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg) // put the sudog on the receive queue
	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3) // park the current g until it is woken

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}
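
The closed-chan branch of chanrecv only fires once qcount has dropped to 0, so values buffered before close are still delivered. A minimal sketch of that behaviour, with drain as a made-up helper name:

```go
package main

import "fmt"

// drain buffers two values, closes the chan, then receives three times and
// records each value together with its ok flag.
func drain() (vals []int, oks []bool) {
	c := make(chan int, 3)
	c <- 1
	c <- 2
	close(c)
	for i := 0; i < 3; i++ {
		v, ok := <-c // the first two come from buf; the third hits the closed branch
		vals = append(vals, v)
		oks = append(oks, ok)
	}
	return vals, oks
}

func main() {
	vals, oks := drain()
	fmt.Println(vals, oks) // [1 2 0] [true true false]
}
```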

As mentioned above, when the send queue is non-empty the recv function is executed. Its implementation:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// For an unbuffered chan, copy the data directly from the sender.
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx) // slot to receive from
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp) // copy the value at recvx into the receiving variable
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem) // copy the waiting sender's value into the slot at recvx
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		// The sender's value went into the slot recvx just left, so sendx
		// has to follow recvx to keep the slots in order.
		// These steps are what keep the chan FIFO.
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1) // mark the dequeued g ready so it can run at the next scheduling; it is no longer blocked
}

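The receive process can also be shown as a diagram, which breaks down into the following steps:

  1. The initial hchan is the one left behind by the sends above.
  2. g3 receives: g1, the first G in the send queue, is dequeued; the value in buf[0] is copied to g3, then g1's value is copied into buf[0].
  3. g3 receives again: g2, the second G in the send queue, is dequeued; the value in buf[1] is copied to g3, then g2's value is copied into buf[1].
  4. The send queue is now empty, so the value in buf[2] is copied directly to g3.
  5. The next value, in buf[0], is copied to g3.
  6. The last value, in buf[1], is copied to g3.
  7. No values are left for g3, so g3 is wrapped in a sudog, enqueued on the receive queue, and blocks.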
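
These steps can be traced with a toy, single-threaded model of the buffer bookkeeping. Everything here is a simplification made for illustration: toyChan is not the runtime's hchan, sendq holds plain values instead of sudogs, nothing is locked or parked, and the model assumes a buffer size greater than 0.

```go
package main

import "fmt"

// toyChan is a hypothetical model; field names mirror hchan for readability.
type toyChan struct {
	buf          []int
	qcount       int
	sendx, recvx int
	sendq        []int // values of "blocked" senders, in arrival order
}

// send stores v in buf, or queues it on sendq when buf is full.
func (c *toyChan) send(v int) {
	if c.qcount < len(c.buf) {
		c.buf[c.sendx] = v
		c.sendx = (c.sendx + 1) % len(c.buf)
		c.qcount++
		return
	}
	c.sendq = append(c.sendq, v)
}

// recv returns the next value, or false where the runtime would park the receiver.
func (c *toyChan) recv() (int, bool) {
	if len(c.sendq) > 0 {
		// buf is full: take the head, then refill the same slot from the
		// first waiting sender. qcount stays the same.
		v := c.buf[c.recvx]
		c.buf[c.recvx] = c.sendq[0]
		c.sendq = c.sendq[1:]
		c.recvx = (c.recvx + 1) % len(c.buf)
		c.sendx = c.recvx
		return v, true
	}
	if c.qcount > 0 {
		v := c.buf[c.recvx]
		c.buf[c.recvx] = 0 // clear the slot
		c.recvx = (c.recvx + 1) % len(c.buf)
		c.qcount--
		return v, true
	}
	return 0, false
}

// fifoOrder sends 1..5 into a toy chan of size 3 and drains it.
func fifoOrder() []int {
	c := &toyChan{buf: make([]int, 3)}
	for v := 1; v <= 5; v++ {
		c.send(v)
	}
	var out []int
	for v, ok := c.recv(); ok; v, ok = c.recv() {
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(fifoOrder()) // [1 2 3 4 5]
}
```

Values 4 and 5 enter buf[0] and buf[1] only after 1 and 2 have left those slots, which is why setting sendx to recvx preserves arrival order.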

send chan again

The send section mentioned that when a value is sent while recvq is non-empty, the send function is called. Its implementation:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg) // unbuffered: synchronize directly
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			qp := chanbuf(c, c.recvx) // slot at recvx
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep) // copy the value straight to the g dequeued from recvq
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1) // mark g ready so it can be scheduled again; it is no longer blocked
}
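
A hand-off to a waiting receiver looks like this from user code. User code cannot observe whether the buffer was bypassed, and the scheduler decides whether the receiver has already parked on recvq when the send happens, so treat this as an illustration of the scenario rather than proof of the code path. The helper name handoff is an assumption.

```go
package main

import "fmt"

// handoff starts a receiver on an empty buffered chan, sends one value, and
// returns the value the receiver got plus len(c) afterwards.
func handoff(v int) (got, buffered int) {
	c := make(chan int, 3)
	result := make(chan int, 1)
	go func() {
		// If this runs before the send, the goroutine parks on c.recvq
		// and the sender copies the value to it directly.
		result <- <-c
	}()
	c <- v
	got = <-result
	return got, len(c) // nothing is left in the buffer either way
}

func main() {
	fmt.Println(handoff(42)) // 42 0
}
```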

close

What happens when we close a chan? Here is a close demo:

package main

func main() {
	c := make(chan int, 3)
	close(c)
}

Its assembly output:

"".main STEXT size=85 args=0x0 locals=0x20
...
0x0031 00049 (chan4.go:4) CALL runtime.makechan(SB)
...
0x003f 00063 (chan4.go:5) CALL runtime.closechan(SB)
0x0053 00083 (chan4.go:3) JMP 0

As you can see, it calls runtime.closechan, whose code is:

func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel")) // a chan that is already closed cannot be closed again
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1 // mark the chan as closed

	var glist gList
	// release all readers
	// Walk recvq: dequeue every sudog, clear its elem, and collect its g in glist.
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// Walk sendq: dequeue every sudog and collect its g in glist.
	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	// Mark every g collected in glist as ready so it is no longer blocked.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

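Let us look at the two cases separately:

  1. When recvq is non-empty: every waiting receiver is woken, and its receive returns the zero value with ok set to false.

  2. When sendq is non-empty: every waiting sender is woken and panics with "send on closed channel".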
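
Both cases can be reproduced with ordinary goroutines. The sketch does not control whether a goroutine has already parked when close runs, but the observable result is the same either way: receivers see the zero value with ok == false and the sender panics. The helper names are invented for this example.

```go
package main

import "fmt"

// closeWakesReceivers starts n receivers on an empty chan, closes it, and
// counts how many of them observed the zero value with ok == false.
func closeWakesReceivers(n int) int {
	c := make(chan int)
	results := make(chan bool, n)
	for i := 0; i < n; i++ {
		go func() {
			v, ok := <-c
			results <- v == 0 && !ok
		}()
	}
	close(c)
	woken := 0
	for i := 0; i < n; i++ {
		if <-results {
			woken++
		}
	}
	return woken
}

// senderPanics starts a sender on an unbuffered chan, closes the chan, and
// returns the panic message recovered inside the sender.
func senderPanics() string {
	c := make(chan int)
	msg := make(chan string, 1)
	go func() {
		defer func() { msg <- fmt.Sprint(recover()) }()
		c <- 1 // never delivered: there is no receiver
	}()
	close(c)
	return <-msg
}

func main() {
	fmt.Println(closeWakesReceivers(3)) // 3
	fmt.Println(senderPanics())         // send on closed channel
}
```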

no buffer chan

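Everything so far was about a buffered chan. The other commonly used kind is the unbuffered chan, which is actually simpler to handle. The source code above already covers it, so here is just the sequence of operations:

  1. make an unbuffered chan.
  2. g1 sends to the chan; with no receiver present it blocks and is put on sendq.
  3. g2 also sends to the chan and is put on sendq as well.
  4. A receiver g3 arrives: g1 is dequeued from sendq and its elem value is copied to g3's variable x.
  5. g3 receives again: g2 is dequeued from sendq and its elem value is copied to g3's variable x.
  6. The send queue is now empty, so g3 blocks as well and is put on recvq.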
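
A short sketch of the same behaviour on a real unbuffered chan. The two senders run as goroutines, so which one is received first is up to the scheduler; the example therefore only checks the sum. The helper names are assumptions.

```go
package main

import "fmt"

// trySendNoReceiver attempts a non-blocking send on an unbuffered chan with
// no receiver waiting; there is no buffer to hold the value, so it fails.
func trySendNoReceiver() bool {
	c := make(chan int)
	select {
	case c <- 1:
		return true
	default:
		return false
	}
}

// sumTwoSenders starts two senders (g1 and g2 in the steps above) and
// receives both values from the main goroutine (g3).
func sumTwoSenders() int {
	c := make(chan int)
	go func() { c <- 1 }()
	go func() { c <- 2 }()
	x := <-c
	x += <-c
	return x
}

func main() {
	c := make(chan int)
	fmt.Println(len(c), cap(c))      // 0 0
	fmt.Println(trySendNoReceiver()) // false
	fmt.Println(sumTwoSenders())     // 3
}
```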

Below is the same process in diagram form:

References

图解Go的channel底层原理 (An illustrated guide to the internals of Go channels)
Go 1.12 runtime/chan.go
GopherCon 2017: Kavya Joshi - Understanding Channels