Go源码解析之mutex
Overview
概要
今天我们来看看Go中的互斥锁 sync/mutex
。本文基于go1.15.5
进行分析。
我们借用互斥锁在维基百科上的定义:互斥锁(英语:Mutual exclusion,缩写 Mutex)是一种用于多线程编程中,防止两条线程同时对同一公共资源(比如全局变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区域(critical section)达成。临界区域指的是一块对公共资源进行访问的代码,并非一种机制或是算法。一个程序、进程、线程可以拥有多个临界区域,但是并不一定会应用互斥锁。
在Go中我们无法直接操作线程,使用 go
关键字启动的是goroutine,但goroutine的背后还是操作系统的线程,所以在此我们讨论的是多个goroutine之间的互斥锁。
用法
互斥锁的使用非常简单,初始化一个mutex,它的默认状态是unlock的。
Lock
方法表示这是临界区的开始,后续代码在访问公共资源时是受控的。调用该方法时,如果互斥锁已经是加锁的状态,goroutine将一直阻塞,直到锁释放。
Unlock
方法表示这是临界区的结束,之前的代码在访问公共资源时是受控的,但之后的将不再受控。调用该方法时,如果互斥锁是未加锁的状态,将会产生一个runtime error。
举个反例
1package main
2
3import (
4 "fmt"
5 "runtime"
6 "sync"
7 "time"
8)
9
10var (
11 number int
12 mutex sync.Mutex
13)
14
15func main() {
16 runtime.GOMAXPROCS(10)
17
18 for i := 0; i < 1000; i++ {
19 go Add()
20 }
21
22 time.Sleep(time.Second)
23 fmt.Println(number)
24}
25
26func Add() {
27 number++
28}
源代码可以在 Playground 查看。
我们预先声明了两个变量: number
是全局变量,即公共资源;mutex
即互斥锁。
然后在main函数中的 runtime.GOMAXPROCS(10) 即Line#16,十分重要。它表示Go将启动10个线程来处理任务,这样才能模拟多线程的情况。如果你是单核单线程的CPU或者设置了runtime.GOMAXPROCS(1)
,本例子将不适用,因为单线程环境不存在并发访问。
之后我们启动1000个goroutine对全局变量 number
进行累加。
最后等待1s(1000次累加肯定不会超过1s,这里仅做demo,真实环境不要这么玩),并打印结果。
将该程序执行多次,我们会发现打印的结果并不总是1000,有时是979,有时是941等等结果。
这就是因为 number
是公共资源,多个goroutine在对其进行累加时可能是多线程并发进行的,累加时有些线程获取到的是旧值
,累加完成之后又将旧值的计算结果赋值给了 number
,导致部分并发计算的结果被覆盖了。
正确写法
1package main
2
3import (
4 "fmt"
5 "runtime"
6 "sync"
7 "time"
8)
9
10var (
11 number int
12 mutex sync.Mutex
13)
14
15func main() {
16 runtime.GOMAXPROCS(10)
17
18 for i := 0; i < 1000; i++ {
19 go Add()
20 }
21
22 time.Sleep(time.Second)
23 fmt.Println(number)
24}
25
26func Add() {
27 mutex.Lock()
28 defer mutex.Unlock()
29 number++
30}
源代码可以在 Playground 查看。
这段代码与之前的区别就是Line#27
,Line#28
,我们对累加操作进行了保护,声明这里是临界区,禁止多个goroutine并发访问。
无论我们执行多少次代码,发现结果始终都是1000。这在Go中是如何实现的呢?请继续往下看。
一探究竟
打开源代码,我们看到一段关于 Mutex fairness 的注释。
1 // Mutex fairness.
2 //
3 // Mutex can be in 2 modes of operations: normal and starvation.
4 // In normal mode waiters are queued in FIFO order, but a woken up waiter
5 // does not own the mutex and competes with new arriving goroutines over
6 // the ownership. New arriving goroutines have an advantage -- they are
7 // already running on CPU and there can be lots of them, so a woken up
8 // waiter has good chances of losing. In such case it is queued at front
9 // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
10 // it switches mutex to the starvation mode.
11 //
12 // In starvation mode ownership of the mutex is directly handed off from
13 // the unlocking goroutine to the waiter at the front of the queue.
14 // New arriving goroutines don't try to acquire the mutex even if it appears
15 // to be unlocked, and don't try to spin. Instead they queue themselves at
16 // the tail of the wait queue.
17 //
18 // If a waiter receives ownership of the mutex and sees that either
19 // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
20 // it switches mutex back to normal operation mode.
21 //
22 // Normal mode has considerably better performance as a goroutine can acquire
23 // a mutex several times in a row even if there are blocked waiters.
24 // Starvation mode is important to prevent pathological cases of tail latency.
大致意思是:
Mutex
是一把公平锁(Mutex fairness)
。Mutex
有两种模式:正常模式
和饥饿模式
正常模式
:在正常模式,waiters
是按照先进先出的顺序进行排队,但是一个被唤醒的waiter
不会直接占有锁,而是需要和其他新请求锁的goroutines一起竞争锁的所有权。新请求锁的goroutines有一个优势 --- 它们已经运行在CPU中并且可能数量不少,所以一个被唤醒的waiter有很大机会会竞争输了。在这种情况它将被排在等待队列的前面。如果waiter
超过1ms
没有成功获取锁,锁将切换为饥饿模式
。饥饿模式
:在饥饿模式,锁的所有权直接从一个刚解锁的goroutine手中直接传递到等待队列最前面的waiter
手中。新请求锁的goroutines不会尝试获取锁即使看起来是未锁的状态,也不会尝试自旋。取而代之的是,它们将排在等待队列的队尾。
- 如果
waiter
获取到锁的所有权时,发现自己是队列中最后一个waiter
或者自己等待时间小于1ms
,那么锁将切换回正常模式
。 正常模式
拥有非常好的性能表现,因为即使存在阻塞的waiter
,一个goroutine也能够多次获取锁。饥饿模式
对于预防极端的长尾时延(tail latency)非常重要。
PS:这里waiter
和waiters
表示等待的goroutines。
作者详细的描述了互斥锁的设计思路与运行过程,这对于我们理解代码至关重要。
在我们开始阅读代码之前,这里有几个const值非常重要,我们先介绍一下。
1const (
2 mutexLocked = 1 << iota // mutex is locked
3 mutexWoken
4 mutexStarving
5 mutexWaiterShift = iota
6 starvationThresholdNs = 1e6
7)
mutexLocked
值是1
,代表已锁状态。
mutexWoken
值是2
,代表唤醒状态。
mutexStarving
值是4,代表饥饿状态。
mutexWaiterShift
值是3,代表waiter计数器开始的位移量。
starvationThresholdNs
代表1ms
。
让我们继续,Mutex
的定义非常简单,但没有注释,我们并不知道state与sema分别是做什么用的。
1type Mutex struct {
2 state int32
3 sema uint32
4}
Lock
那我们就带着问题看看Lock
方法吧。
1func (m *Mutex) Lock() {
2 // Fast path: grab unlocked mutex.
3 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
4 if race.Enabled {
5 race.Acquire(unsafe.Pointer(m))
6 }
7 return
8 }
9 // Slow path (outlined so that the fast path can be inlined)
10 m.lockSlow()
11}
Line#3
是一个CompareAndSwapInt32
。这个方法我们之前聊过(请查看Go源码解析之atomic),将 m.state
与 0
比较,如果相等那么 mutexLocked
的值将赋值给 m.state
并返回 true。也就是说这里就可以初步判断锁的状态。 mutexLocked
是在代码的开头部分声明的常量。通过变量名与此我们可以推断出来state可以表示互斥锁的状态
。
Line#4-6
是开启race检测时的一些逻辑,我们暂时忽略,下次专门写一篇文章介绍。
Line#10
我们看到调用了自身的方法 lockSlow()
,让我们看看这个方法。
slowLock 这段代码稍微长一些,我们直接在代码里通过注释进行解析。
1func (m *Mutex) lockSlow() {
2 var waitStartTime int64 // 当前goroutine的等待时间
3 starving := false // 当前goroutine是否饥饿状态
4 awoke := false // 当前goroutine是否唤醒状态
5 iter := 0 // 当前goroutine自旋次数
6 old := m.state // copy锁的状态为历史状态
7
8 for {
9 // Don't spin in starvation mode, ownership is handed off to waiters
10 // so we won't be able to acquire the mutex anyway.
11 // 在饥饿模式不进行自旋,锁的所有权会自己移交给waiters。
12 // 所以无论如何我们都无法获取锁。
13
14 // 当锁是locked状态并且当前goroutine可以自旋时,开始自旋。
15 // 当锁是starving状态,就直接false,不自旋。
16 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
17 // Active spinning makes sense.
18 // Try to set mutexWoken flag to inform Unlock
19 // to not wake other blocked goroutines.
20 // 触发自旋是有意义的。
21 // 尝试设置woken标志来通知unlock,以便不唤起其他阻塞的goroutines。
22
23 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
24 atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
25 // 如果当前goroutine是未唤醒状态,互斥锁也是未唤醒状态,并且互斥锁的waiter数量不等于0,
26 // 就比较锁的最新状态(m.state)和历史状态(old),如果未发生改变,将锁的状态更新为woken。
27 // 并且设置当前goroutine为awoke状态。
28 awoke = true
29 }
30
31 // 自旋
32 // https://github.com/golang/go/blob/go1.15.5/src/runtime/proc.go#L6055-L6057
33 // https://github.com/golang/go/blob/go1.15.5/src/runtime/asm_amd64.s#L574-L580
34 runtime_doSpin()
35
36 // 自旋次数递增
37 iter++
38
39 // copy锁的状态为历史状态,自旋期间其他goroutine可能修改了state,所以要更新。
40 old = m.state
41
42 // 继续尝试自旋
43 continue
44 }
45
46 new := old // copy锁的历史状态为new状态。
47
48 // Don't try to acquire starving mutex, new arriving goroutines must queue.
49 // 饥饿模式时不尝试获取锁,新来的goroutines必须排队。
50 // 如果锁的历史状态(old)不是starving状态,将锁的新状态(new)更新为locked状态。
51 if old&mutexStarving == 0 {
52 new |= mutexLocked
53 }
54
55 // 如果锁的历史状态(old)是locked状态或者是starving状态,将锁的waiter数量加1。
56 if old&(mutexLocked|mutexStarving) != 0 {
57 new += 1 << mutexWaiterShift
58 }
59
60 // The current goroutine switches mutex to starvation mode.
61 // But if the mutex is currently unlocked, don't do the switch.
62 // Unlock expects that starving mutex has waiters, which will not
63 // be true in this case.
64 // 当前goroutine切换锁为饥饿模式。
65 // 当锁是unlocked状态时,不切换为饥饿模式。
66 // unlock期望饥饿模式的锁有waiters,但是在本例中不会出现。
67
68 // 如果当前goroutine是starving状态且锁的历史状态(old)是locked状态,将锁的新状态(new)更新为starving状态。
69 if starving && old&mutexLocked != 0 {
70 new |= mutexStarving
71 }
72
73
74
75 // 如果当前goroutine是awoke状态
76 if awoke {
77 // The goroutine has been woken from sleep,
78 // so we need to reset the flag in either case.
79 // goroutine已经从sleep状态被唤醒。
80 // 我们需要重置flag状态。
81
82 if new&mutexWoken == 0 { // 如果锁的新状态(new)不是woken状态,抛异常,状态不一致。
83 throw("sync: inconsistent mutex state")
84 }
85
86 // &^ 是 bit clear (AND NOT)
87 // https://golang.org/ref/spec#Arithmetic_operators
88 // 取消锁的新状态(new)的woken状态标志。
89 new &^= mutexWoken
90 }
91
92
93
94 // 比较锁的最新状态(m.state)和历史状态(old),如果未发生改变,那么更新为new。
95 if atomic.CompareAndSwapInt32(&m.state, old, new) {
96 // 如果cas更新成功,并且锁的历史状态(old)即不是locked也不是starving,那么结束循环,通过CAS加锁成功。
97 if old&(mutexLocked|mutexStarving) == 0 {
98 break // locked the mutex with CAS
99 }
100
101 // If we were already waiting before, queue at the front of the queue.
102 // 如果之前已经等待,将排在队列前面。
103
104 // 当前goroutine是否等待过。
105 queueLifo := waitStartTime != 0
106
107 // 如果开始等待时间为0,更新为当前时间为开始等待时间。
108 if waitStartTime == 0 {
109 waitStartTime = runtime_nanotime()
110 }
111
112 // 通过信号量获取锁
113 // runtime实现代码:https://github.com/golang/go/blob/go1.15.5/src/runtime/sema.go#L69-L72
114 // runtime信号量获取:https://github.com/golang/go/blob/go1.15.5/src/runtime/sema.go#L98-L153
115 runtime_SemacquireMutex(&m.sema, queueLifo, 1)
116
117 // 如果当前goroutine是starving状态或者等待时间大于1ms,更新当前goroutine为starving状态。
118 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
119
120 // 更新锁的历史状态(old)
121 old = m.state
122
123 // 如果锁是饥饿状态,才执行里面的代码。
124 if old&mutexStarving != 0 {
125 // If this goroutine was woken and mutex is in starvation mode,
126 // ownership was handed off to us but mutex is in somewhat
127 // inconsistent state: mutexLocked is not set and we are still
128 // accounted as waiter. Fix that.
129 // 如果当前goroutine是唤醒状态并且锁在饥饿模式,
130 // 锁的所有权转移给当前goroutine,但是锁处于不一致的状态中:mutexLocked没有设置
131 // 并且我们将任然被认为是waiter。这个状态需要被修复。
132
133 // 如果锁的历史状态(old)是locked或者woken的,或者waiters的数量不为0,触发锁状态异常。
134 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
135 throw("sync: inconsistent mutex state")
136 }
137
138 // 当前goroutine获取锁,waiter数量-1
139 delta := int32(mutexLocked - 1<<mutexWaiterShift)
140
141 // 如果当前goroutine不是starving状态或者锁的历史状态(old)的waiter数量是1,delta减去3。
142 if !starving || old>>mutexWaiterShift == 1 {
143 // Exit starvation mode.
144 // Critical to do it here and consider wait time.
145 // Starvation mode is so inefficient, that two goroutines
146 // can go lock-step infinitely once they switch mutex
147 // to starvation mode.
148 // 退出饥饿模式
149 // 在这里这么做至关重要,还要考虑等待时间。
150 // 饥饿模式是非常低效率的,一旦两个goroutine将互斥锁切换为饥饿模式,它们便可以无限锁。
151
152 delta -= mutexStarving
153 }
154
155 // 更新锁的状态
156 atomic.AddInt32(&m.state, delta)
157 break
158 }
159
160 // 当前goroutine更新为awoke状态
161 awoke = true
162
163 // 当前goroutine自旋次数清零
164 iter = 0
165 } else {
166 // 更新锁的历史状态(old)
167 old = m.state
168 }
169 }
170
171 if race.Enabled {
172 race.Acquire(unsafe.Pointer(m))
173 }
174}
主流程并不是很容易理解,建议多阅读几遍。这里一定要弄清楚的是,这个方法会被多个goroutine并发调用,局部变量的状态表示当前goroutine的状态,m.state就是锁的状态。
我们继续深入了解一下主流程中的 runtime_doSpin 方法,可以看到doSpin就是循环执行 PAUSE
指令30次。PAUSE
指令简单来说就是提升自旋等待循环(spin-wait loop)的性能,还可以省电。
1//go:linkname sync_runtime_doSpin sync.runtime_doSpin
2//go:nosplit
3func sync_runtime_doSpin() {
4 procyield(active_spin_cnt) // active_spin_cnt = 30
5}
procyield 汇编代码如下:
1TEXT runtime·procyield(SB),NOSPLIT,$0-0
2 MOVL cycles+0(FP), AX
3again:
4 PAUSE
5 SUBL $1, AX
6 JNZ again
7 RET
我们再来看看主流程中的 runtime_SemacquireMutex
方法,这是一个信号量操作。关于信号量,以下是go给出的描述:
1// Semaphore implementation exposed to Go.
2// Intended use is provide a sleep and wakeup
3// primitive that can be used in the contended case
4// of other synchronization primitives.
5// Thus it targets the same goal as Linux's futex,
6// but it has much simpler semantics.
7//
8// That is, don't think of these as semaphores.
9// Think of them as a way to implement sleep and wakeup
10// such that every sleep is paired with a single wakeup,
11// even if, due to races, the wakeup happens before the sleep.
12//
13// See Mullender and Cox, ``Semaphores in Plan 9,''
14// https://swtch.com/semaphore.pdf
大意就是,Go的信号量与其他同步原语中的信号量不同,在Go中应该把信号量比作sleep与wakeup的机制。
1//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
2func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
3 semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
4}
1func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
2 gp := getg()
3 if gp != gp.m.curg {
4 throw("semacquire not on the G stack")
5 }
6
7 // Easy case.
8 if cansemacquire(addr) {
9 return
10 }
11
12 // Harder case:
13 // increment waiter count
14 // try cansemacquire one more time, return if succeeded
15 // enqueue itself as a waiter
16 // sleep
17 // (waiter descriptor is dequeued by signaler)
18 s := acquireSudog()
19 root := semroot(addr)
20 t0 := int64(0)
21 s.releasetime = 0
22 s.acquiretime = 0
23 s.ticket = 0
24 if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
25 t0 = cputicks()
26 s.releasetime = -1
27 }
28 if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
29 if t0 == 0 {
30 t0 = cputicks()
31 }
32 s.acquiretime = t0
33 }
34 for {
35 lockWithRank(&root.lock, lockRankRoot)
36 // Add ourselves to nwait to disable "easy case" in semrelease.
37 atomic.Xadd(&root.nwait, 1)
38 // Check cansemacquire to avoid missed wakeup.
39 if cansemacquire(addr) {
40 atomic.Xadd(&root.nwait, -1)
41 unlock(&root.lock)
42 break
43 }
44 // Any semrelease after the cansemacquire knows we're waiting
45 // (we set nwait above), so go to sleep.
46 root.queue(addr, s, lifo)
47 goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
48 if s.ticket != 0 || cansemacquire(addr) {
49 break
50 }
51 }
52 if s.releasetime > 0 {
53 blockevent(s.releasetime-t0, 3+skipframes)
54 }
55 releaseSudog(s)
56}
这里信号量获取操作简单来说就是把自己丢进等待队列,然后等待被唤起。
我们继续看看 Unlock
是怎么实现的。
Unlock
Unlock 的源代码相较于Lock就简单很多,首先看到 Fast path
,就是去除锁的标志位,看是否已解锁。
1// Unlock unlocks m.
2// It is a run-time error if m is not locked on entry to Unlock.
3//
4// A locked Mutex is not associated with a particular goroutine.
5// It is allowed for one goroutine to lock a Mutex and then
6// arrange for another goroutine to unlock it.
7func (m *Mutex) Unlock() {
8 if race.Enabled {
9 _ = m.state
10 race.Release(unsafe.Pointer(m))
11 }
12
13 // Fast path: drop lock bit.
14 // 如果waiter数量为0,三个标志位去除locked后也为0,那么可以解锁了。
15 new := atomic.AddInt32(&m.state, -mutexLocked)
16 if new != 0 {
17 // Outlined slow path to allow inlining the fast path.
18 // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
19 m.unlockSlow(new)
20 }
21}
我们继续看看 unlockSlow 的源代码。
1func (m *Mutex) unlockSlow(new int32) {
2 if (new+mutexLocked)&mutexLocked == 0 {
3 throw("sync: unlock of unlocked mutex")
4 }
5
6 if new&mutexStarving == 0 {
7 // 如果不是饥饿模式
8 old := new
9 for {
10 // If there are no waiters or a goroutine has already
11 // been woken or grabbed the lock, no need to wake anyone.
12 // In starvation mode ownership is directly handed off from unlocking
13 // goroutine to the next waiter. We are not part of this chain,
14 // since we did not observe mutexStarving when we unlocked the mutex above.
15 // So get off the way.
16 // 如果waiter数量为0,锁的三个标志位任一非0,直接返回
17 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
18 return
19 }
20
21 // Grab the right to wake someone.
22 // 尝试将锁更新为woken状态,如果成功了,就通过信号量去唤醒goroutine。
23 new = (old - 1<<mutexWaiterShift) | mutexWoken
24 if atomic.CompareAndSwapInt32(&m.state, old, new) {
25 runtime_Semrelease(&m.sema, false, 1)
26 return
27 }
28 old = m.state
29 }
30 } else {
31 // Starving mode: handoff mutex ownership to the next waiter, and yield
32 // our time slice so that the next waiter can start to run immediately.
33 // Note: mutexLocked is not set, the waiter will set it after wakeup.
34 // But mutex is still considered locked if mutexStarving is set,
35 // so new coming goroutines won't acquire it.
36 // 饥饿模式直接手把手交接锁的控制权
37 runtime_Semrelease(&m.sema, true, 1)
38 }
39}
到这里我们已经大概清楚 Mutex
加锁与解锁的过程。
在这个过程中,经过分析和推理,我们可以断定:
sema是信号量状态标志。
state被被切分为4块,每块分别有不同的作用,如下表:
31-3位 | 2位 | 1位 | 0位 |
---|---|---|---|
Waiter的数量 | Starving状态 | Woken状态 | Locked状态 |
总结
通过源码分析,我们了解到Mutex的内部实现非常的巧妙。Go的作者为了保证性能与公平思考的非常多,但是这并不是银弹,在实际开发中我们不能肆无忌惮的使用互斥锁,尤其是一些对性能要求较高的场景和业务。在设计程序时,我们应当使用恰当的锁(比如读写锁),并尽量降低锁的颗粒度避免频繁互斥,或者使用一些无锁的方案。