Mutex解析
初版的 Mutex
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| type Mutex struct { key int32; sema int32; } func xadd(val *int32, delta int32) (new int32) { for { v := *val; if cas(val, v, v+delta) { return v+delta; } } panic("unreached") } func (m *Mutex) Lock() { if xadd(&m.key, 1) == 1 { return; } sys.semacquire(&m.sema); } func (m *Mutex) Unlock() { if xadd(&m.key, -1) == 0 { return; } sys.semrelease(&m.sema); }
|
可以看到,这简单几行就可以实现一个排外锁。通过cas
对 key
进行加一, 如果key
的值是从0
加到1
, 则直接获得了锁。否则通过semacquire
进行sleep, 被唤醒的时候就获得了锁。
源码分析
互斥锁有两种状态:正常状态和饥饿状态。
在正常状态下,所有等待锁的goroutine按照FIFO顺序等待。唤醒的goroutine不会直接拥有锁,而是会和新请求锁的goroutine竞争锁的拥有。新请求锁的goroutine具有优势:它正在CPU上执行,而且可能有好几个,所以刚刚唤醒的goroutine有很大可能在锁竞争中失败。在这种情况下,这个被唤醒的goroutine会加入到等待队列的前面。 如果一个等待的goroutine超过1ms没有获取锁,那么它将会把锁转变为饥饿模式。
在饥饿模式下,锁的所有权将从unlock的gorutine直接交给交给等待队列中的第一个。新来的goroutine将不会尝试去获得锁,即使锁看起来是unlock状态, 也不会去尝试自旋操作,而是放在等待队列的尾部。
如果一个等待的goroutine获取了锁,并且满足一以下其中的任何一个条件:(1)它是队列中的最后一个;(2)它等待的时候小于1ms。它会将锁的状态转换为正常状态。
正常状态有很好的性能表现,饥饿模式也是非常重要的,因为它能阻止尾部延迟的现象。
我们要从多线程(goroutine)的并发场景去理解为什么实现中有很多的分支。
当一个goroutine获取这个锁的时候, 有可能这个锁根本没有竞争者, 那么这个goroutine轻轻松松获取了这个锁。而如果这个锁已经被别的goroutine拥有, 就需要考虑怎么处理当前的期望获取锁的goroutine。同时, 当并发goroutine很多的时候,有可能会有多个竞争者, 而且还会有通过信号量唤醒的等待者。
1 2 3 4
| type Mutex struct { state int32 sema uint32 }
|
state
是一个共用的字段, 第0个 bit 标记这个mutex
是否已被某个goroutine所拥有, 下面为了描述方便称之为state
已加锁,或者mutex
已加锁。 如果第0个 bit为0, 下文称之为state
未被锁, 此mutex目前没有被某个goroutine所拥有。
第1个 bit 标记这个mutex
是否已唤醒, 也就是有某个唤醒的goroutine
要尝试获取锁。
第2个 bit 标记这个mutex
状态, 值为1表明此锁已处于饥饿状态。
其余bit用于记录等待得到锁的goroutine数
同时,尝试获取锁的goroutine也有状态,有可能它是新来的goroutine,也有可能是被唤醒的goroutine, 可能是处于正常状态的goroutine, 也有可能是处于饥饿状态的goroutine。
Lock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
| func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break } queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } }
|
Unlock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| func (m *Mutex) Unlock() { new := atomic.AddInt32(&m.state, -mutexLocked) if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false) return } old = m.state } } else { runtime_Semrelease(&m.sema, true) } }
|
根据Unlock
的代码分析,下面的哪个答案正确?
如果一个goroutine g1
通过Lock
获取了锁, 在持有锁的期间, 另外一个goroutine g2
调用Unlock
释放这个锁, 会出现什么现象?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| package main import ( "sync" "time" ) func main() { var mu sync.Mutex go func() { mu.Lock() time.Sleep(10 * time.Second) mu.Unlock() }() time.Sleep(time.Second) mu.Unlock() select {} }
|
锁被另一个goroutine解了,再次解锁时会panic