Cwww3's Blog

Record what you think

0%

WaitGroup

WaitGroup介绍

WaitGroup 提供了三个方法:

1
2
3
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
  • Add,用来设置 WaitGroup 的计数值;
  • Done,用来将 WaitGroup 的计数值减 1,其实就是调用了 Add(-1);
  • Wait,调用这个方法的 goroutine 会一直阻塞,直到 WaitGroup 的计数值变为 0。

解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type noCopy struct{}

type WaitGroup struct {
// 避免复制使用的一个技巧,可以告诉vet工具违反了复制使用的规则
noCopy noCopy
// 一个复合值,用来表示waiter数、计数值、信号量
state1 [3]uint32
}
// 获取state的地址和信号量的地址
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// 如果地址是64bit对齐的,数组前两个元素做state,后一个元素做信号量
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
// 如果地址是32bit对齐的,数组后两个元素用来做state,它可以用来做64bit的原子操作,第一个元素32bit用来做信号量
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}

WaitGroup中state方法的内存对齐

noCopy是一个空的结构体,大小为0,不需要做内存对齐,所以可以忽略这个字段。

使用了uint32的数组来构造state1字段,然后根据系统的位数的不同构造不同的返回值,下面说说怎么通过sate1这个字段构建waiter数、计数值、信号量的。

state1状态和信号量处理

state1这里总共被分配了12个字节,这里被设计了三种状态:

  • 其中对齐的8个字节作为状态,高32位为计数的数量,低32位为等待的goroutine数量
  • 其中的4个字节作为信号量存储

提供了(wg *WaitGroup) state() (statep *uint64, semap *uint32)state1字段中取出他的状态和信号量。

首先unsafe.Pointer来获取state1的地址值然后转换成uintptr类型的,然后判断一下这个地址值是否能被8整除,这里通过地址 mod 8的方式来判断地址是否是64位对齐。

因为有内存对齐的存在,在64位架构里面WaitGroup结构体state1起始的位置肯定是64位对齐的,所以在64位架构上用state1前两个元素并成uint64来表示statep,state1最后一个元素表示semap;

但是在32位架构里面,一个字长是4bytes,要操作64位的数据分布在两个数据块中,需要两次操作才能完成访问。如果两次操作中间有可能别其他操作修改,不能保证原子性。

同理32位架构想要原子性的操作8bytes,需要由调用方保证其数据地址是64位对齐的,否则原子访问会有异常。

在32位架构中,WaitGroup在初始化的时候,分配内存地址的时候是随机的,所以WaitGroup结构体state1起始的位置不一定是64位对齐,可能会是:uintptr(unsafe.Pointer(&wg.state1))%8 = 4,如果出现这样的情况,那么就需要用state1的第一个元素做padding,用state1的后两个元素合并成uint64来表示statep。

image-20210819203755697 image-20210819204844516

Add 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (wg *WaitGroup) Add(delta int) {
// 获取状态值
statep, semap := wg.state()
...
// 高32bit是计数值v,所以把delta左移32,增加到计数上
state := atomic.AddUint64(statep, uint64(delta)<<32)
// 获取计数器的值
v := int32(state >> 32)
// 获取waiter的值
w := uint32(state)
...
// 任务计数器不能为负数
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// wait不等于0说明已经执行了Wait,此时不容许Add(正数)
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 计数器的值大于或者没有waiter在等待,直接返回
if v > 0 || w == 0 {
return
}
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 根据上面的条件 此时v等于0且w>0
// statep清零 可以在Wait()阻塞之后可以重用
*statep = 0
// w一般等于1,一般是main函数执行了一次Wait()方法
for ; w != 0; w-- {
//释放信号量,执行一次释放一个,唤醒一个等待者,一般是唤醒main(goroutine)
runtime_Semrelease(semap, false, 0)
}
}

Wait方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
...
for {
state := atomic.LoadUint64(statep)
// 获取counter
v := int32(state >> 32)
// 获取waiter
w := uint32(state)
// counter为零,不需要等待直接返回
if v == 0 {
...
return
}
// 使用CAS将waiter加1 执行一次Wait方法,wait值就加一,通过循环保证一定能加上
if atomic.CompareAndSwapUint64(statep, state, state+1) {
...
// 挂起等待唤醒 等待Add方法中runtime_Semrelease唤醒
runtime_Semacquire(semap)
// 唤醒之后statep应该是清零的 如果不为零,表示WaitGroup又被重复使用,这会panic
// 在全部调用Wait的goroutine都被唤醒后wg才能再次使用
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
...
// 直接返回
return
}
}
}

Tip

写个例子去debug一下,看一下个状态下w,v的值,就会清晰很多

Donate comment here.