Go源码解析之mutex

分享到:

Overview


概要

今天我们来看看Go中的互斥锁 sync/mutex。本文基于go1.15.5 进行分析。

我们借用互斥锁在维基百科上的定义:互斥锁(英语:Mutual exclusion,缩写 Mutex)是一种用于多线程编程中,防止两条线程同时对同一公共资源(比如全局变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区域(critical section)达成。临界区域指的是一块对公共资源进行访问的代码,并非一种机制或是算法。一个程序、进程、线程可以拥有多个临界区域,但是并不一定会应用互斥锁。

在Go中我们无法直接操作线程,使用 go 关键字启动的是goroutine,但goroutine的背后还是操作系统的线程,所以在此我们讨论的是多个goroutine之间的互斥锁。

用法

互斥锁的使用非常简单,初始化一个mutex,它的默认状态是unlock的。

Lock 方法表示这是临界区的开始,后续代码在访问公共资源时是受控的。调用该方法时,如果互斥锁已经是加锁的状态,goroutine将一直阻塞,直到锁释放。

Unlock 方法表示这是临界区的结束,之前的代码在访问公共资源时是受控的,但之后的将不再受控。调用该方法时,如果互斥锁是未加锁的状态,将会产生一个runtime error。

举个反例

 1package main
 2
 3import (
 4	"fmt"
 5	"runtime"
 6	"sync"
 7	"time"
 8)
 9
10var (
11	number int
12	mutex  sync.Mutex
13)
14
15func main() {
16	runtime.GOMAXPROCS(10)
17
18	for i := 0; i < 1000; i++ {
19		go Add()
20	}
21
22	time.Sleep(time.Second)
23	fmt.Println(number)
24}
25
26func Add() {
27	number++
28}

源代码可以在 Playground 查看。

我们预先声明了两个变量: number 是全局变量,即公共资源;mutex 即互斥锁。

然后在main函数中的 runtime.GOMAXPROCS(10) 即Line#16,十分重要。它表示Go将启动10个线程来处理任务,这样才能模拟多线程的情况。如果你是单核单线程的CPU或者设置了runtime.GOMAXPROCS(1),本例子将不适用,因为单线程环境不存在并发访问。

之后我们启动1000个goroutine对全局变量 number 进行累加。

最后等待1s(1000次累加肯定不会超过1s,这里仅做demo,真实环境不要这么玩),并打印结果。

将该程序执行多次,我们会发现打印的结果并不总是1000,有时是979,有时是941等等结果。

这就是因为 number 是公共资源,多个goroutine在对其进行累加时可能是多线程并发进行的,累加时有些线程获取到的是旧值,累加完成之后又将旧值的计算结果赋值给了 number,导致部分并发计算的结果被覆盖了。

正确写法

 1package main
 2
 3import (
 4	"fmt"
 5	"runtime"
 6	"sync"
 7	"time"
 8)
 9
10var (
11	number int
12	mutex  sync.Mutex
13)
14
15func main() {
16	runtime.GOMAXPROCS(10)
17
18	for i := 0; i < 1000; i++ {
19		go Add()
20	}
21
22	time.Sleep(time.Second)
23	fmt.Println(number)
24}
25
26func Add() {
27	mutex.Lock()
28	defer mutex.Unlock()
29	number++
30}

源代码可以在 Playground 查看。

这段代码与之前的区别就是Line#27,Line#28,我们对累加操作进行了保护,声明这里是临界区,禁止多个goroutine并发访问。

无论我们执行多少次代码,发现结果始终都是1000。这在Go中是如何实现的呢?请继续往下看。

一探究竟

打开源代码,我们看到一段关于 Mutex fairness 的注释。

 1	// Mutex fairness.
 2	//
 3	// Mutex can be in 2 modes of operations: normal and starvation.
 4	// In normal mode waiters are queued in FIFO order, but a woken up waiter
 5	// does not own the mutex and competes with new arriving goroutines over
 6	// the ownership. New arriving goroutines have an advantage -- they are
 7	// already running on CPU and there can be lots of them, so a woken up
 8	// waiter has good chances of losing. In such case it is queued at front
 9	// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
10	// it switches mutex to the starvation mode.
11	//
12	// In starvation mode ownership of the mutex is directly handed off from
13	// the unlocking goroutine to the waiter at the front of the queue.
14	// New arriving goroutines don't try to acquire the mutex even if it appears
15	// to be unlocked, and don't try to spin. Instead they queue themselves at
16	// the tail of the wait queue.
17	//
18	// If a waiter receives ownership of the mutex and sees that either
19	// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
20	// it switches mutex back to normal operation mode.
21	//
22	// Normal mode has considerably better performance as a goroutine can acquire
23	// a mutex several times in a row even if there are blocked waiters.
24	// Starvation mode is important to prevent pathological cases of tail latency.

大致意思是:

  1. Mutex是一把公平锁(Mutex fairness)
  2. Mutex有两种模式:正常模式饥饿模式
    1. 正常模式:在正常模式,waiters是按照先进先出的顺序进行排队,但是一个被唤醒的waiter不会直接占有锁,而是需要和其他新请求锁的goroutines一起竞争锁的所有权。新请求锁的goroutines有一个优势 --- 它们已经运行在CPU中并且可能数量不少,所以一个被唤醒的waiter有很大机会会竞争输了。在这种情况它将被排在等待队列的前面。如果waiter超过1ms没有成功获取锁,锁将切换为饥饿模式
    2. 饥饿模式:在饥饿模式,锁的所有权直接从一个刚解锁的goroutine手中直接传递到等待队列最前面的waiter手中。新请求锁的goroutines不会尝试获取锁即使看起来是未锁的状态,也不会尝试自旋。取而代之的是,它们将排在等待队列的队尾。
  3. 如果waiter获取到锁的所有权时,发现自己是队列中最后一个waiter或者自己等待时间小于1ms,那么锁将切换回正常模式
  4. 正常模式拥有非常好的性能表现,因为即使存在阻塞的waiter,一个goroutine也能够多次获取锁。
  5. 饥饿模式对于预防极端的长尾时延(tail latency)非常重要。

PS:这里waiterwaiters表示等待的goroutines。

作者详细的描述了互斥锁的设计思路与运行过程,这对于我们理解代码至关重要。

在我们开始阅读代码之前,这里有几个const值非常重要,我们先介绍一下。

1const (
2	mutexLocked = 1 << iota // mutex is locked
3	mutexWoken
4	mutexStarving
5	mutexWaiterShift = iota
6	starvationThresholdNs = 1e6
7)

mutexLocked 值是1,代表已锁状态。

mutexWoken 值是2,代表唤醒状态。

mutexStarving 值是4,代表饥饿状态。

mutexWaiterShift 值是3,代表waiter计数器开始的位移量。

starvationThresholdNs 代表1ms

让我们继续,Mutex的定义非常简单,但没有注释,我们并不知道state与sema分别是做什么用的。

1type Mutex struct {
2	state int32
3	sema  uint32
4}

Lock

那我们就带着问题看看Lock方法吧。

 1func (m *Mutex) Lock() {
 2	// Fast path: grab unlocked mutex.
 3	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
 4		if race.Enabled {
 5			race.Acquire(unsafe.Pointer(m))
 6		}
 7		return
 8	}
 9	// Slow path (outlined so that the fast path can be inlined)
10	m.lockSlow()
11}

Line#3 是一个CompareAndSwapInt32 。这个方法我们之前聊过(请查看Go源码解析之atomic),将 m.state0 比较,如果相等那么 mutexLocked 的值将赋值给 m.state 并返回 true。也就是说这里就可以初步判断锁的状态。 mutexLocked 是在代码的开头部分声明的常量。通过变量名与此我们可以推断出来state可以表示互斥锁的状态

Line#4-6 是开启race检测时的一些逻辑,我们暂时忽略,下次专门写一篇文章介绍。

Line#10 我们看到调用了自身的方法 lockSlow() ,让我们看看这个方法。

slowLock 这段代码稍微长一些,我们直接在代码里通过注释进行解析。

  1func (m *Mutex) lockSlow() {
  2	var waitStartTime int64 // 当前goroutine的等待时间
  3	starving := false       // 当前goroutine是否饥饿状态
  4	awoke := false          // 当前goroutine是否唤醒状态
  5	iter := 0               // 当前goroutine自旋次数
  6	old := m.state          // copy锁的状态为历史状态
  7
  8	for {
  9		// Don't spin in starvation mode, ownership is handed off to waiters
 10		// so we won't be able to acquire the mutex anyway.
 11		// 在饥饿模式不进行自旋,锁的所有权会自己移交给waiters。
 12		// 所以无论如何我们都无法获取锁。
 13
 14		// 当锁是locked状态并且当前goroutine可以自旋时,开始自旋。
 15		// 当锁是starving状态,就直接false,不自旋。
 16		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
 17			// Active spinning makes sense.
 18			// Try to set mutexWoken flag to inform Unlock
 19			// to not wake other blocked goroutines.
 20			// 触发自旋是有意义的。
 21			// 尝试设置woken标志来通知unlock,以便不唤起其他阻塞的goroutines。
 22
 23			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
 24				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
 25				// 如果当前goroutine是未唤醒状态,互斥锁也是未唤醒状态,并且互斥锁的waiter数量不等于0,
 26				// 就比较锁的最新状态(m.state)和历史状态(old),如果未发生改变,将锁的状态更新为woken。
 27				// 并且设置当前goroutine为awoke状态。
 28				awoke = true
 29			}
 30
 31			// 自旋
 32           // https://github.com/golang/go/blob/go1.15.5/src/runtime/proc.go#L6055-L6057
 33            // https://github.com/golang/go/blob/go1.15.5/src/runtime/asm_amd64.s#L574-L580
 34			runtime_doSpin()
 35
 36			// 自旋次数递增
 37			iter++
 38
 39			// copy锁的状态为历史状态,自旋期间其他goroutine可能修改了state,所以要更新。
 40			old = m.state
 41
 42			// 继续尝试自旋
 43			continue
 44		}
 45
 46		new := old // copy锁的历史状态为new状态。
 47
 48		// Don't try to acquire starving mutex, new arriving goroutines must queue.
 49		// 饥饿模式时不尝试获取锁,新来的goroutines必须排队。
 50		// 如果锁的历史状态(old)不是starving状态,将锁的新状态(new)更新为locked状态。
 51		if old&mutexStarving == 0 {
 52			new |= mutexLocked
 53		}
 54
 55		// 如果锁的历史状态(old)是locked状态或者是starving状态,将锁的waiter数量加1。
 56		if old&(mutexLocked|mutexStarving) != 0 {
 57			new += 1 << mutexWaiterShift
 58		}
 59
 60		// The current goroutine switches mutex to starvation mode.
 61		// But if the mutex is currently unlocked, don't do the switch.
 62		// Unlock expects that starving mutex has waiters, which will not
 63		// be true in this case.
 64		// 当前goroutine切换锁为饥饿模式。
 65		// 当锁是unlocked状态时,不切换为饥饿模式。
 66		// unlock期望饥饿模式的锁有waiters,但是在本例中不会出现。
 67
 68		// 如果当前goroutine是starving状态且锁的历史状态(old)是locked状态,将锁的新状态(new)更新为starving状态。
 69		if starving && old&mutexLocked != 0 {
 70			new |= mutexStarving
 71		}
 72
 73
 74
 75		// 如果当前goroutine是awoke状态
 76		if awoke {
 77			// The goroutine has been woken from sleep,
 78			// so we need to reset the flag in either case.
 79			// goroutine已经从sleep状态被唤醒。
 80			// 我们需要重置flag状态。
 81
 82			if new&mutexWoken == 0 { // 如果锁的新状态(new)不是woken状态,抛异常,状态不一致。
 83				throw("sync: inconsistent mutex state")
 84			}
 85
 86			// &^ 是 bit clear (AND NOT)
 87			// https://golang.org/ref/spec#Arithmetic_operators
 88			// 取消锁的新状态(new)的woken状态标志。
 89			new &^= mutexWoken
 90		}
 91
 92
 93
 94		// 比较锁的最新状态(m.state)和历史状态(old),如果未发生改变,那么更新为new。
 95		if atomic.CompareAndSwapInt32(&m.state, old, new) {
 96			// 如果cas更新成功,并且锁的历史状态(old)即不是locked也不是starving,那么结束循环,通过CAS加锁成功。
 97			if old&(mutexLocked|mutexStarving) == 0 {
 98				break // locked the mutex with CAS
 99			}
100
101			// If we were already waiting before, queue at the front of the queue.
102			// 如果之前已经等待,将排在队列前面。
103
104			// 当前goroutine是否等待过。
105			queueLifo := waitStartTime != 0
106
107			// 如果开始等待时间为0,更新为当前时间为开始等待时间。
108			if waitStartTime == 0 {
109				waitStartTime = runtime_nanotime()
110			}
111
112			// 通过信号量获取锁
113			// runtime实现代码:https://github.com/golang/go/blob/go1.15.5/src/runtime/sema.go#L69-L72
114			// runtime信号量获取:https://github.com/golang/go/blob/go1.15.5/src/runtime/sema.go#L98-L153
115			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
116
117			// 如果当前goroutine是starving状态或者等待时间大于1ms,更新当前goroutine为starving状态。
118			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
119
120			// 更新锁的历史状态(old)
121			old = m.state
122
123			// 如果锁是饥饿状态,才执行里面的代码。
124			if old&mutexStarving != 0 {
125				// If this goroutine was woken and mutex is in starvation mode,
126				// ownership was handed off to us but mutex is in somewhat
127				// inconsistent state: mutexLocked is not set and we are still
128				// accounted as waiter. Fix that.
129				// 如果当前goroutine是唤醒状态并且锁在饥饿模式,
130				// 锁的所有权转移给当前goroutine,但是锁处于不一致的状态中:mutexLocked没有设置
131				// 并且我们将任然被认为是waiter。这个状态需要被修复。
132
133				// 如果锁的历史状态(old)是locked或者woken的,或者waiters的数量不为0,触发锁状态异常。
134				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
135					throw("sync: inconsistent mutex state")
136				}
137
138				// 当前goroutine获取锁,waiter数量-1
139				delta := int32(mutexLocked - 1<<mutexWaiterShift)
140
141				// 如果当前goroutine不是starving状态或者锁的历史状态(old)的waiter数量是1,delta减去3。
142				if !starving || old>>mutexWaiterShift == 1 {
143					// Exit starvation mode.
144					// Critical to do it here and consider wait time.
145					// Starvation mode is so inefficient, that two goroutines
146					// can go lock-step infinitely once they switch mutex
147					// to starvation mode.
148					// 退出饥饿模式
149					// 在这里这么做至关重要,还要考虑等待时间。
150					// 饥饿模式是非常低效率的,一旦两个goroutine将互斥锁切换为饥饿模式,它们便可以无限锁。
151
152					delta -= mutexStarving
153				}
154
155				// 更新锁的状态
156				atomic.AddInt32(&m.state, delta)
157				break
158			}
159
160			// 当前goroutine更新为awoke状态
161			awoke = true
162
163			// 当前goroutine自旋次数清零
164			iter = 0
165		} else {
166			// 更新锁的历史状态(old)
167			old = m.state
168		}
169	}
170
171	if race.Enabled {
172		race.Acquire(unsafe.Pointer(m))
173	}
174}

主流程并不是很容易理解,建议多阅读几遍。这里一定要弄清楚的是,这个方法会被多个goroutine并发调用,局部变量的状态表示当前goroutine的状态,m.state就是锁的状态。

我们继续深入了解一下主流程中的 runtime_doSpin 方法,可以看到doSpin就是循环执行 PAUSE 指令30次。PAUSE 指令简单来说就是提升自旋等待循环(spin-wait loop)的性能,还可以省电。

1//go:linkname sync_runtime_doSpin sync.runtime_doSpin
2//go:nosplit
3func sync_runtime_doSpin() {
4	procyield(active_spin_cnt)  // 	active_spin_cnt = 30
5}

procyield 汇编代码如下:

1TEXT runtime·procyield(SB),NOSPLIT,$0-0
2	MOVL	cycles+0(FP), AX
3again:
4	PAUSE
5	SUBL	$1, AX
6	JNZ	again
7	RET

我们再来看看主流程中的 runtime_SemacquireMutex 方法,这是一个信号量操作。关于信号量,以下是go给出的描述:

 1// Semaphore implementation exposed to Go.
 2// Intended use is provide a sleep and wakeup
 3// primitive that can be used in the contended case
 4// of other synchronization primitives.
 5// Thus it targets the same goal as Linux's futex,
 6// but it has much simpler semantics.
 7//
 8// That is, don't think of these as semaphores.
 9// Think of them as a way to implement sleep and wakeup
10// such that every sleep is paired with a single wakeup,
11// even if, due to races, the wakeup happens before the sleep.
12//
13// See Mullender and Cox, ``Semaphores in Plan 9,''
14// https://swtch.com/semaphore.pdf

大意就是,Go的信号量与其他同步原语中的信号量不同,在Go中应该把信号量比作sleep与wakeup的机制。

runtime_SemacquireMutex

1//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
2func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
3	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
4}

semacquire1

 1func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
 2	gp := getg()
 3	if gp != gp.m.curg {
 4		throw("semacquire not on the G stack")
 5	}
 6
 7	// Easy case.
 8	if cansemacquire(addr) {
 9		return
10	}
11
12	// Harder case:
13	//	increment waiter count
14	//	try cansemacquire one more time, return if succeeded
15	//	enqueue itself as a waiter
16	//	sleep
17	//	(waiter descriptor is dequeued by signaler)
18	s := acquireSudog()
19	root := semroot(addr)
20	t0 := int64(0)
21	s.releasetime = 0
22	s.acquiretime = 0
23	s.ticket = 0
24	if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
25		t0 = cputicks()
26		s.releasetime = -1
27	}
28	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
29		if t0 == 0 {
30			t0 = cputicks()
31		}
32		s.acquiretime = t0
33	}
34	for {
35		lockWithRank(&root.lock, lockRankRoot)
36		// Add ourselves to nwait to disable "easy case" in semrelease.
37		atomic.Xadd(&root.nwait, 1)
38		// Check cansemacquire to avoid missed wakeup.
39		if cansemacquire(addr) {
40			atomic.Xadd(&root.nwait, -1)
41			unlock(&root.lock)
42			break
43		}
44		// Any semrelease after the cansemacquire knows we're waiting
45		// (we set nwait above), so go to sleep.
46		root.queue(addr, s, lifo)
47		goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
48		if s.ticket != 0 || cansemacquire(addr) {
49			break
50		}
51	}
52	if s.releasetime > 0 {
53		blockevent(s.releasetime-t0, 3+skipframes)
54	}
55	releaseSudog(s)
56}

这里信号量获取操作简单来说就是把自己丢进等待队列,然后等待被唤起。

我们继续看看 Unlock 是怎么实现的。

Unlock

Unlock 的源代码相较于Lock就简单很多,首先看到 Fast path,就是去除锁的标志位,看是否已解锁。

 1// Unlock unlocks m.
 2// It is a run-time error if m is not locked on entry to Unlock.
 3//
 4// A locked Mutex is not associated with a particular goroutine.
 5// It is allowed for one goroutine to lock a Mutex and then
 6// arrange for another goroutine to unlock it.
 7func (m *Mutex) Unlock() {
 8	if race.Enabled {
 9		_ = m.state
10		race.Release(unsafe.Pointer(m))
11	}
12
13	// Fast path: drop lock bit.
14    // 如果waiter数量为0,三个标志位去除locked后也为0,那么可以解锁了。
15	new := atomic.AddInt32(&m.state, -mutexLocked)
16	if new != 0 {
17		// Outlined slow path to allow inlining the fast path.
18		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
19		m.unlockSlow(new)
20	}
21}

我们继续看看 unlockSlow 的源代码。

 1func (m *Mutex) unlockSlow(new int32) {
 2	if (new+mutexLocked)&mutexLocked == 0 {
 3		throw("sync: unlock of unlocked mutex")
 4	}
 5
 6	if new&mutexStarving == 0 {
 7		// 如果不是饥饿模式
 8		old := new
 9		for {
10			// If there are no waiters or a goroutine has already
11			// been woken or grabbed the lock, no need to wake anyone.
12			// In starvation mode ownership is directly handed off from unlocking
13			// goroutine to the next waiter. We are not part of this chain,
14			// since we did not observe mutexStarving when we unlocked the mutex above.
15			// So get off the way.
16			// 如果waiter数量为0,锁的三个标志位任一非0,直接返回
17			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
18				return
19			}
20
21			// Grab the right to wake someone.
22			// 尝试将锁更新为woken状态,如果成功了,就通过信号量去唤醒goroutine。
23			new = (old - 1<<mutexWaiterShift) | mutexWoken
24			if atomic.CompareAndSwapInt32(&m.state, old, new) {
25				runtime_Semrelease(&m.sema, false, 1)
26				return
27			}
28			old = m.state
29		}
30	} else {
31		// Starving mode: handoff mutex ownership to the next waiter, and yield
32		// our time slice so that the next waiter can start to run immediately.
33		// Note: mutexLocked is not set, the waiter will set it after wakeup.
34		// But mutex is still considered locked if mutexStarving is set,
35		// so new coming goroutines won't acquire it.
36		// 饥饿模式直接手把手交接锁的控制权
37		runtime_Semrelease(&m.sema, true, 1)
38	}
39}

到这里我们已经大概清楚 Mutex 加锁与解锁的过程。

在这个过程中,经过分析和推理,我们可以断定:

sema是信号量状态标志。

state被被切分为4块,每块分别有不同的作用,如下表:

31-3位2位1位0位
Waiter的数量Starving状态Woken状态Locked状态

总结

通过源码分析,我们了解到Mutex的内部实现非常的巧妙。Go的作者为了保证性能与公平思考的非常多,但是这并不是银弹,在实际开发中我们不能肆无忌惮的使用互斥锁,尤其是一些对性能要求较高的场景和业务。在设计程序时,我们应当使用恰当的锁(比如读写锁),并尽量降低锁的颗粒度避免频繁互斥,或者使用一些无锁的方案。

参考文档

  1. 一份详细注释的go Mutex源码
  2. 源码剖析 golang 中 sync.Mutex
comments powered by Disqus