我忘记在哪看到过陈硕说过,对于非内核开发者而言,看 Linux 内核源码有效的方式是只看主干逻辑,忽略错误分支,避免捡了芝麻丢了西瓜。本文及未来的源码解析都是建立在这个思想基础上的。

Channel 在使用 Go 进行并发编程中起到了非常核心的作用。Go 使用 goroutine 和 Channel 实现了 CSP 并发模型,关于 CSP,可以看这篇博客:并发之痛 Thread,Goroutine,Actor。本文无意于讨论并发编程,关注点只在 Channel 的具体实现上面,只有知道了其真正的实现方式,才能知道 Channel 能解决什么问题,不能解决什么。

  • Go 版本:1.9.2
  • 源码文件:runtime/chan.go

Struct

每个 Channel 都是一个 hcahn 数据结构:

type hchan struct {
	qcount   uint           // buffer中数据总数
	dataqsiz uint           // buffer的容量
	buf      unsafe.Pointer // 指向 buffer 的指针
	elemsize uint16         // channel中数据类型的大小
	closed   uint32         // channel是否关闭,0 => false,其他都是true
	elemtype *_type         // channel数据类型
	sendx    uint           // 对于 send 行为而言,该次可放入位置的 index
	recvx    uint           // 对于 recv 行为而言,该次可读取位置的 index
	recvq    waitq          // 接收 goroutine 等待队列
	sendq    waitq          // 发送 goroutine 等待队列

	lock     mutex          // 互斥锁
}

从这个数据结构,我们可以有个大体的印象,即 Channel 结构是一个头结点,保存一些元信息,其中有个指针指向真正存放数据的 buffer 对象,而这个 buffer 可以理解为一个数组。另外,通过 Mutex 来保护临界区的并发。

Make

创建一个 Channel 的代码结构如下:

func makechan(t *chantype, size int64) *hchan {
    elem := t.elem
    
    // 忽略了边界检查代码

    var c *hchan
    // 如果是无指针类型 || channel size 为0
    if elem.kind&kindNoPointers != 0 || size == 0 {
    	// 一次性把所需内存分配出来
    	// 注意, 这里的 hchanSize 已经是按 8 字节对齐后的大小了。
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        // size 大于 0,即分配了有效的 buffer 内存,令 c.buf 指向 buffer 数组
        if size > 0 && elem.size != 0 {
            c.buf = add(unsafe.Pointer(c), hchanSize)
        } else {
            // race detector uses this location for synchronization
            // Also prevents us from pointing beyond the allocation (see issue 9401).
            c.buf = unsafe.Pointer(c)
        }
    } else {
    	// 分两次分配内存
        c = new(hchan)
        c.buf = newarray(elem, int(size))
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    return c
}

有一个地方还没有想明白:

Note: 区分是否一次性分配所需内存的关键在于 kindNoPointers,什么是所谓的无指针类型?为什么要分开考虑是否一次性分配内存?

skoo 的博客上画了一张图,较好的总结了整个 Channel 的结构: hchan 当然,要注意, Hchan 这个头不一定和后面的 buffer 在内存分布上是连续的。(话又说回来,即使逻辑上连续,到真实物理内存上,也不一定连续,我说这个干嘛。。。)

Send

当我们在代码中写 ch <- x ,实际上是调用了 chansend 函数,该函数代码结构为:

// ep: 数据的指针
// block:是否阻塞,详见下文
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   // 往 nil 发数据
    if c == nil {
    	// 非阻塞情况下,直接返回 false
        if !block {
            return false
        }
        // gopark 将当前 goroutine 休眠,等待第一个参数唤醒,但这里传入的是nil,所以会一直休眠下去。
        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }
    // ...
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }
    // 开始加锁
    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    // 情况1:buf可用数据为空,有goroutine阻塞在取数据上
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        // 直接将数据发给该 goroutine.
        // send 会调用 goready,用以唤醒拿到数据的 goroutine.
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    // 情况2:没有阻塞在获取数据的 goroutine,且 buf 内还有空间
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        // 获取当前可插入数据的指针
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        // 复制数据
        typedmemmove(c.elemtype, qp, ep)
        // send index 前移
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // 可用数据数增加
        c.qcount++
        unlock(&c.lock)
        return true
    }
    // 情况3. buf 内空间已满
    // 非阻塞情况下,直接返回 false
    if !block {
        unlock(&c.lock)
        return false
    }
    // 否则,阻塞该 goroutine.
    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    // ...
    gp.waiting = mysg
    gp.param = nil
    // 将该 goroutine 的结构放入 sendq 队列
    c.sendq.enqueue(mysg)
    // 休眠
    // 等待 goready 唤醒
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
    // ...
    return true
}

关于阻塞非阻塞

一般情况下,传入的参数都是 block=true,即阻塞模式,一个往 channel 中插入数据的 goroutine 会阻塞到插入成功为止。

非阻塞是只这种情况:

select {
case c <- v:
	... foo
default:
	... bar
}

这种情况下,编译器会将其改为:

if selectnbsend(c, v) {
	... foo
} else {
	... bar
}

其中, selectnbsend 调用 chansend 时,传入的 block=false.

关于临界区

加锁解锁的代价非常低,当我们讨论锁的代价的时候,我们值得是临界区产生的锁竞争。下面分析下三种情况的临界区。

  • 情况 1

    goroutine 之间的数据复制

  • 情况 2

    数据复制到 buffer 中,更新两次(sendx / qcount)内存

  • 情况 3

    获取当前 goroutinne,构造 mysg 结构,放入队列

可以看到,情况1的临界区最小,情况3的临界区最大。

Recv

同样的,当我们代码中出现 x <- ch,实际最终调用的是 chanrecv 函数,该函数代码结构为:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// 从 nil 收数据,将会一直休眠
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
		throw("unreachable")
	}
	// 加锁
	lock(&c.lock)
	// ...
	
	// 情况1. sendq 队列不为空
	if sg := c.sendq.dequeue(); sg != nil {
		// 取 recv 队列队首数据给当前 goroutine
		// 将 sendq 队列队首数据放入 buffer 中相同的 slot
		// 通过 goready 唤醒相应 sendq 中的 goroutine
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	
	// 情况2. 没有阻塞在 sendq 的 goroutine,且buffer中有数据
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		// 复制数据
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		// recv index 前移
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		// 可用数据减少
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
	
	// 情况3. buffer空间为空,阻塞该 goroutine.
	gp := getg()
	mysg := acquireSudog()
	// ...
	// 将该 goroutine 相应的结构放入 recvq
	c.recvq.enqueue(mysg)
	// 休眠,等待相应的 goready 唤醒
	goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

	return true, !closed
}

recv 的逻辑和 send 互为表里,相互配合,各自维护一个 index 指针来指向自己的生产 / 消费队列,整体不难理解。

Close

关闭一个 Channel 只有一些逻辑上的考量,具体逻辑如下:

  1. 检查 channel 状态,不能重复关闭 channel
  2. 释放 recvq 中所有的 reader,将其加入 gist 队列中
  3. 释放 sendq 中所有的 sender,将其加入 gist 队列中(sender会panic)
  4. 遍历 gist 队列中的结构,通过 goready 唤醒这个 goroutine.

Summary

软件工程没有银弹,Channel 也不是万能药。在高并发写多读少的情况下,Channel 内部的那把锁产生的代价可能会非常大,这时可能就需要考虑使用其他的数据结构+共享机制了。

产生这个想法是在分析异步日志库的性能问题是发现的,多goroutine写+单goroutine读,中间只通过一个 Channel 来传递日志信息,性能损耗非常可观,关于异步日志库的问题,后续会有更多博客发出来。


参考: