Go 并发编程-3.Mutex使用陷阱

Lock/Unlock 不匹配

比如以下示例中:

1
2
3
4
5
6

func foo() {
var mu sync.Mutex
defer mu.Unlock()
fmt.Println("hello world!")
}

运行的时候就会发生 panic

Copy 已使用的 Mutex

Package sync 的同步原语在使用后是不能复制的,原因在于,Mutex 是一个有状态的对象,它的 state 字段记录这个锁的状态。如果你要复制一个已经加锁的 Mutex 给一个新的变量,那么新的刚初始化的变量居然被加锁了,这显然不符合你的期望,因为你期望的是一个零值的 Mutex。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

type Counter struct {
sync.Mutex
Count int
}


func main() {
var c Counter
c.Lock()
defer c.Unlock()
c.Count++
foo(c) // 复制锁
}

// 这里Counter的参数是通过复制的方式传入的
func foo(c Counter) {
c.Lock()
defer c.Unlock()
fmt.Println("in foo")
}

运行代码就会出现 fatal error: all goroutines are asleep - deadlock!

或者使用 vet 进行检测。vet 只能检测 Mutex 的 copy,不能检测重入等情况。

重入

Mutex 不是可重入的锁。

当一个线程获取锁时,如果没有其它线程拥有这个锁,那么,这个线程就成功获取到这个锁。之后,如果其它线程再请求这个锁,就会处于阻塞等待的状态。但是,如果拥有这把锁的线程再请求这把锁的话,不会阻塞,而是成功返回,所以叫可重入锁(有时候也叫做递归锁)。只要你拥有这把锁,你可以可着劲儿地调用,比如通过递归实现一些算法,调用者不会阻塞或者死锁。

目前 Mutex 的实现中没有记录哪个 goroutine 拥有这把锁。理论上,任何 goroutine 都可以随意地 Unlock 这把锁,所以没办法计算重入条件。所以,一旦误用 Mutex 的重入,就会导致报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

func foo(l sync.Locker) {
fmt.Println("in foo")
l.Lock()
bar(l)
l.Unlock()
}


func bar(l sync.Locker) {
l.Lock()
fmt.Println("in bar")
l.Unlock()
}


func main() {
l := &sync.Mutex{}
foo(l)
}

运行上述代码也会出现死锁的情况。

死锁

以下是一个死锁的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

package main


import (
"fmt"
"sync"
"time"
)


func main() {
// 派出所证明
var psCertificate sync.Mutex
// 物业证明
var propertyCertificate sync.Mutex


var wg sync.WaitGroup
wg.Add(2) // 需要派出所和物业都处理


// 派出所处理goroutine
go func() {
defer wg.Done() // 派出所处理完成


psCertificate.Lock()
defer psCertificate.Unlock()


// 检查材料
time.Sleep(5 * time.Second)
// 请求物业的证明
propertyCertificate.Lock()
propertyCertificate.Unlock()
}()


// 物业处理goroutine
go func() {
defer wg.Done() // 物业处理完成


propertyCertificate.Lock()
defer propertyCertificate.Unlock()


// 检查材料
time.Sleep(5 * time.Second)
// 请求派出所的证明
psCertificate.Lock()
psCertificate.Unlock()
}()


wg.Wait()
fmt.Println("成功完成")
}

物业处理需要派出所的证明,派出所处理需要物业的证明。

如何实现可重入锁

goroutine id 实现

可重入锁关键就是,实现的锁要能记住当前是哪个 goroutine 持有这个锁。可以采用 goroutine id 和 token 两种方式实现。

获取 goroutine id 有两种方式,简单的方式,就是通过 runtime.Stack 方法获取栈帧信息,栈帧信息里包含 goroutine id。

1
2
3
4
5
6
7
8
9
10
11
func GoID() int {
var buf [64]byte
n := runtime.Stack(buf[:], false)
// 得到id字符串
idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
id, err := strconv.Atoi(idField)
if err != nil {
panic(fmt.Sprintf("cannot get goroutine id: %v", err))
}
return id
}

或者有现成的解决方案:https://github.com/petermattis/goid

1
2
3
4
5
6
7
8
9
10
package main

import (
"fmt"
"github.com/petermattis/goid"
)

func main() {
fmt.Println("Goroutine ID:", goid.Get())
}

可重入锁的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

// RecursiveMutex 包装一个Mutex,实现可重入
type RecursiveMutex struct {
sync.Mutex
owner int64 // 当前持有锁的goroutine id
recursion int32 // 这个goroutine 重入的次数
}

func (m *RecursiveMutex) Lock() {
gid := goid.Get()
// 如果当前持有锁的goroutine就是这次调用的goroutine,说明是重入
if atomic.LoadInt64(&m.owner) == gid {
m.recursion++
return
}
m.Mutex.Lock()
// 获得锁的goroutine第一次调用,记录下它的goroutine id,调用次数加1
atomic.StoreInt64(&m.owner, gid)
m.recursion = 1
}

func (m *RecursiveMutex) Unlock() {
gid := goid.Get()
// 非持有锁的goroutine尝试释放锁,错误的使用
if atomic.LoadInt64(&m.owner) != gid {
panic(fmt.Sprintf("wrong the owner(%d): %d!", m.owner, gid))
}
// 调用次数减1
m.recursion--
if m.recursion != 0 { // 如果这个goroutine还没有完全释放,则直接返回
return
}
// 此goroutine最后一次调用,需要释放锁
atomic.StoreInt64(&m.owner, -1)
m.Mutex.Unlock()
}

owner 字段,记录当前锁的拥有者 goroutine 的 id;recursion 是辅助字段,用于记录重入的次数。拥有者可以多次调用 Lock,但是也必须调用相同次数的 Unlock,这样才能把锁释放。

Token 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

// Token方式的递归锁
type TokenRecursiveMutex struct {
sync.Mutex
token int64
recursion int32
}

// 请求锁,需要传入token
func (m *TokenRecursiveMutex) Lock(token int64) {
if atomic.LoadInt64(&m.token) == token { //如果传入的token和持有锁的token一致,说明是递归调用
m.recursion++
return
}
m.Mutex.Lock() // 传入的token不一致,说明不是递归调用
// 抢到锁之后记录这个token
atomic.StoreInt64(&m.token, token)
m.recursion = 1
}

// 释放锁
func (m *TokenRecursiveMutex) Unlock(token int64) {
if atomic.LoadInt64(&m.token) != token { // 释放其它token持有的锁
panic(fmt.Sprintf("wrong the owner(%d): %d!", m.token, token))
}
m.recursion-- // 当前持有这个锁的token释放锁
if m.recursion != 0 { // 还没有回退到最初的递归调用
return
}
atomic.StoreInt64(&m.token, 0) // 没有递归调用了,释放锁
m.Mutex.Unlock()
}

调用者自己提供一个 token,获取锁的时候把这个 token 传入,释放锁的时候也需要把这个 token 传入

调用示例

main() 函数中,我们创建了 10 个 goroutine 并启动,每个 goroutine 都执行了两次 Lock() 方法和两次 Unlock() 方法。由于可重入锁的存在,即使是同一个 goroutine 连续多次调用了加锁操作,也不会造成死锁问题。最后,我们通过 Wait() 方法等待所有 goroutine 执行完毕。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func main() {
var wg sync.WaitGroup
var lock RecursiveMutex

for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()

lock.Lock()
fmt.Println("Locked")

lock.Lock() // 这里不会阻塞
fmt.Println("Nested locked")

lock.Unlock()
fmt.Println("Nested unlocked")

lock.Unlock()
fmt.Println("Unlocked")
}()
}

wg.Wait()
}