WaitGroup介绍
WaitGroup 提供了三个方法:
1 2 3
| func (wg *WaitGroup) Add(delta int) func (wg *WaitGroup) Done() func (wg *WaitGroup) Wait()
|
- Add,用来设置 WaitGroup 的计数值;
- Done,用来将 WaitGroup 的计数值减 1,其实就是调用了 Add(-1);
- Wait,调用这个方法的 goroutine 会一直阻塞,直到 WaitGroup 的计数值变为 0。
解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| type noCopy struct{}
type WaitGroup struct { noCopy noCopy state1 [3]uint32 }
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2] } else { return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0] } }
|
WaitGroup中state方法的内存对齐
noCopy
是一个空的结构体,大小为0,不需要做内存对齐,所以可以忽略这个字段。
使用了uint32
的数组来构造state1
字段,然后根据系统的位数的不同构造不同的返回值,下面说说怎么通过sate1这个字段构建waiter数、计数值、信号量的。
state1
状态和信号量处理
state1
这里总共被分配了12
个字节,这里被设计了三种状态:
- 其中对齐的
8
个字节作为状态,高32
位为计数的数量,低32
位为等待的goroutine
数量
- 其中的
4
个字节作为信号量存储
提供了(wg *WaitGroup) state() (statep *uint64, semap *uint32)
从state1
字段中取出他的状态和信号量。
首先unsafe.Pointer
来获取state1的地址值然后转换成uintptr类型的,然后判断一下这个地址值是否能被8整除,这里通过地址 mod 8的方式来判断地址是否是64位对齐。
因为有内存对齐的存在,在64位架构里面WaitGroup结构体state1起始的位置肯定是64位对齐的,所以在64位架构上用state1前两个元素并成uint64来表示statep,state1最后一个元素表示semap;
但是在32位架构里面,一个字长是4bytes,要操作64位的数据分布在两个数据块中,需要两次操作才能完成访问。如果两次操作中间有可能别其他操作修改,不能保证原子性。
同理32位架构想要原子性的操作8bytes,需要由调用方保证其数据地址是64位对齐的,否则原子访问会有异常。
在32位架构中,WaitGroup在初始化的时候,分配内存地址的时候是随机的,所以WaitGroup结构体state1起始的位置不一定是64位对齐,可能会是:uintptr(unsafe.Pointer(&wg.state1))%8 = 4
,如果出现这样的情况,那么就需要用state1的第一个元素做padding,用state1的后两个元素合并成uint64来表示statep。
Add 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| func (wg *WaitGroup) Add(delta int) { statep, semap := wg.state() ... state := atomic.AddUint64(statep, uint64(delta)<<32) v := int32(state >> 32) w := uint32(state) ... if v < 0 { panic("sync: negative WaitGroup counter") } if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } if v > 0 || w == 0 { return } if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } *statep = 0 for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) } }
|
Wait方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| func (wg *WaitGroup) Wait() { statep, semap := wg.state() ... for { state := atomic.LoadUint64(statep) v := int32(state >> 32) w := uint32(state) if v == 0 { ... return } if atomic.CompareAndSwapUint64(statep, state, state+1) { ... runtime_Semacquire(semap) if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } ... return } } }
|
Tip
写个例子去debug一下,看一下个状态下w,v的值,就会清晰很多