逐步学习Go-sync.Mutex(详解与实战)

内容目录

概述

Go中提供了互斥锁:sync.Mutex。sync.Mutex提供了以下方法:


type Mutex
// 加锁。如果已经有goroutine持有了锁,那么就阻塞等待直到持有锁
func (m *Mutex) Lock()

// 尝试加锁。如果加锁成功就返回true,否则返回失败
func (m *Mutex) TryLock() bool

// 解锁。Lock和TryLock都使用Unlock来解锁。
// 如果Mutex没有调用Lock直接调用Unlock会panic。我们有UT来测试这个场景。
func (m *Mutex) Unlock()

使用锁的最佳实践:加锁之后立即调用解锁,示例如下:


mux := sync.Mutex{}

mux.Lock()
defer mux.Unlock()
//接下来的所业务逻辑

sync.Mutex在Lock之后不能进行copy否则造成各种问题。

公平性

sync.Mutex的公平性在实现源码里面有详细的说明。Mutex共有两种操作模式:

  1. 普通
  2. 饥饿

普通操作模式

在普通操作模式下,goroutine在没有获取到锁时会加入等待队列,队列是先进先出(FIFO order),但是如果其他goroutine释放了锁,在队列中等待锁的gorouine会和新到达的goroutine竞争锁的拥有权。一般来说新到达的goroutine会获胜因为新到达的goroutine正在CPU上运行,所以等待队列中的goroutine很可能会竞争失败。但是在竞争失败后会将唤醒的goroutine放到队列前面。如果等待的goroutine尝试获取锁的时间超过了1毫秒,那么Mutex切换到饥饿模式。

总结: 普通操作模式下,新的goutines可能会插队(抢占),这样提高了新任务的处理效率,但是可能导致队列中的goroutine一直等不到锁。为了防止等待的goroutine一直等不到锁而饿死,Mutex会被切换到饥饿操作模式。

饥饿操作模式

饥饿操作模式下,新的goroutines达到后不会抢占锁而是排队等待,排在队尾。

其他goroutines释放了锁以后队头的goroutines会获取锁。在等待队列中的最后一个goroutine获取到锁以后会将Mutex的操作模式切换到普通操作模式。

总结:在饥饿模式下,解锁的goroutine会直接将锁的所有权交给等待队列中的第一个goroutine,从而保证等待者能够及时获取到锁,防止饥饿现象。

对比

对比项 普通模式 饥饿模式
锁的所有权转移 等待者和新到来的goroutines向互斥锁竞争 解锁的goroutine直接将所有权交给队列中的第一个等待者
性能 高,一个goroutine可以连续多次获得互斥锁,即使有等待者被阻塞 稍低,因为每次解锁后,都会直接将所有权传递给等待队列中的下一个goroutine
公平性 较差,新到来的goroutine更有可能获得锁 高,等待时间较长的goroutine会被优先考虑
防止饥饿现象 无明显机制 有利于阻止饥饿现象,因为锁的所有权直接从当前的持有者转交给等待队列中的下一个goroutine
转换触发机制 如果一个等待者尝试获取锁超过1ms都失败了,就会切换到饥饿模式 如果一个goroutine得到锁后,看到它是队列中的最后一个等待者,或者它等待的时间小于1ms,就会切换回普通模式

普通模式和饥饿模式是性能和公平性的一个权衡。

sync.Mutex也有锁升级

熟悉Java的小伙伴们可能知道Java中synchronized会锁升级:无锁->偏向锁->轻量级锁->重量级锁。
ReentrantLock默认就是非公平锁:新到来的线程也会抢占一下锁,不行再排队。

默认情况下,Go语言的 sync.Mutex 是处于普通模式(类似于轻量级锁),其中主要使用的是自旋等待的方式(我个人测试是自旋4次),并且当锁被释放时,新到来的goroutine和等待队列中的goroutine并没有明确的优先级,任何一个goroutine都有可能获取到锁。

当一个goroutine在等待锁超过一定时间(默认为1ms)后,会将互斥锁设置为饥饿模式(类似于重量级锁)。在这个模式下,对锁的竞争会变得更加有序,锁会直接从当前的持有者传递给等待队列中的下一个goroutine,这就避免了新到来的goroutine可能"插队"成功的情况,降低了等待队列中的goroutine的饥饿可能。此模式下锁的获取由原来的可能的"插队"成功,变成了公平的FIFO顺序,意思是系统会保证等待时间最长的goroutine能优先获取到锁。饥饿模式虽然更公平但是会带来更多的上下文切换开销。

我们可以以锁升级的方式来理解普通模式到饥饿模式再到普通模式切换过程。

使用测试

其实测试也没太多好测的,因为使用场景比较简单。测试用例覆盖了以下场景:

  1. 测试互斥锁在未锁定状态时能否成功锁定。
  2. 测试当一个协程已经锁定互斥锁时,其他协程尝试锁定是否会被阻塞。
  3. 测试在没有其他互斥锁的情况下,尝试使用 TryLock 方法是否能成功锁定互斥锁。
  4. 测试当已有协程锁定互斥锁时,尝试使用 TryLock 方法是否不能成功锁定互斥锁。
  5. 测试在多个协程并发情况下,使用互斥锁来保护自增操作是否线程安全。
  6. 测试在未锁定互斥锁的情况下进行解锁操作是否会报错。
  7. 测试当一协程长时间持有锁时,其他协程尝试获取互斥锁是否会发生饥饿现象。

测试代码


import (
    "runtime"
    "sync"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
)

// TestMutex_ShouldLock_WhenNotLockedBefore 测试互斥锁在未锁定状态时能否成功锁定。
// 参数 t 用于测试上下文,提供测试控制和日志记录功能。
func TestMutex_ShouldLock_WhenNotLockedBefore(t *testing.T) {
    // 创建一个互斥锁
    mux := sync.Mutex{}
    // 尝试锁定互斥锁
    mux.Lock()
    // 标记锁定是否成功
    isSuccess := true
    // 确保在函数退出时解锁,避免死锁
    defer mux.Unlock()

    // 验证锁定是否成功
    assert.True(t, isSuccess)
}

// TestMutex_ShouldBlock_WhenUsingLockAndOneRoutineHasLocked 测试当一个协程已经锁定互斥锁时,
// 其他协程尝试锁定是否会被阻塞。
// 参数 t 用于测试上下文,提供测试控制和日志记录功能。
func TestMutex_ShouldBlock_WhenUsingLockAndOneRoutineHasLocked(t *testing.T) {
    // 创建一个互斥锁
    mux := sync.Mutex{}
    // 创建一个等待组,用于同步协程
    wg := sync.WaitGroup{}
    // 添加一个计数,表示要等待的一个协程
    wg.Add(1)

    // 启动一个协程去锁定互斥锁
    go func() {
        // 表示协程启动完成
        wg.Done()
        // 尝试锁定互斥锁
        mux.Lock()
        // 确保在函数退出时解锁
        defer mux.Unlock()
        // 持锁一段时间,模拟锁定状态
        time.Sleep(5 * time.Second)
    }()

    // 等待协程启动完成
    wg.Wait()
    println("go routine started")
    // 主协程尝试锁定互斥锁,应被阻塞
    mux.Lock()
    // 标记锁定是否成功
    isSuccess := true
    // 确保解锁
    defer mux.Unlock()

    // 验证主协程是否成功锁定
    assert.True(t, isSuccess)
}

// TestMutex_ShouldLocked_WhenTryLockAndNoOtherLockers 测试在没有其他锁定的情况下,
// 尝试使用 TryLock 方法是否能成功锁定互斥锁。
// 参数 t 用于测试上下文,提供测试控制和日志记录功能。
func TestMutex_ShouldLocked_WhenTryLockAndNoOtherLockers(t *testing.T) {
    // 创建一个互斥锁
    mux := sync.Mutex{}
    // 尝试立即锁定互斥锁
    isSuccess := mux.TryLock()
    // 确保解锁
    defer mux.Unlock()

    // 验证是否成功锁定
    assert.True(t, isSuccess)
}

// TestMutex_ShouldNotLocked_WhenTryLockAndOtherLockers 测试当已有协程锁定互斥锁时,
// 尝试使用 TryLock 方法是否不能成功锁定互斥锁。
// 参数 t 用于测试上下文,提供测试控制和日志记录功能。
func TestMutex_ShouldNotLocked_WhenTryLockAndOtherLockers(t *testing.T) {
    // 创建一个互斥锁
    mux := sync.Mutex{}
    // 创建一个等待组,用于同步协程
    wg := sync.WaitGroup{}
    // 添加一个计数,表示要等待的一个协程
    wg.Add(1)
    // 启动一个协程去锁定互斥锁
    go func() {
        // 表示协程启动完成
        //这里先调用Done是为了不继续阻塞wg.Wait()让wg.Wait()之后的mutex.Lock可以继续执行
        wg.Done()
        // 尝试锁定互斥锁
        mux.Lock()
        // 确保在函数退出时解锁
        defer mux.Unlock()
        // 持锁一段时间,模拟锁定状态
        time.Sleep(5 * time.Second)
    }()

    // 等待协程启动完成
    wg.Wait()
    // 尝试立即锁定互斥锁
    isSuccess := mux.TryLock()
    // 确保解锁
    defer mux.Unlock()

    // 验证是否未能成功锁定
    assert.False(t, isSuccess)
}

// TestMutex_ShouldIncrementCounterSuccess_WhenUseMultipleGoroutineAndAddCounterInLock 测试在多个协程并发情况下,使用Mutex锁定来增加计数器是否成功
// 参数:
// - t *testing.T: 测试环境的句柄,用于报告测试失败和日志记录
// 返回值: 无
func TestMutex_ShouldIncrementCounterSuccess_WhenUseMultipleGoroutineAndAddCounterInLock(t *testing.T) {
    // 初始化互斥锁、等待组和计数器
    mux := sync.Mutex{}
    wg := sync.WaitGroup{}
    counter := 0
    // 设置运行时的最大协程数为10,以模拟并发环境
    runtime.GOMAXPROCS(10)

    // 添加100个协程来并发执行增加计数器的操作
    wg.Add(100)
    for i := 0; i < 100; i++ {
        go func() {
            // 在协程结束时释放等待组
            defer wg.Done()
            // 加锁以确保对计数器的操作是互斥的
            mux.Lock()
            defer mux.Unlock()
            // 增加计数器
            counter++
        }()
    }
    // 等待所有协程完成
    wg.Wait()

    // 验证计数器是否被正确地增加了100次
    assert.Equal(t, 100, counter)
}

// TestMutex_ShouldPanic_WhenUnlockWithoutLock 测试当互斥锁没有被锁定时,尝试解锁是否会引发Panic。
// 参数 t *testing.T 用于测试的上下文,提供测试控制和日志记录功能。
func TestMutex_ShouldPanic_WhenUnlockWithoutLock(t *testing.T) {
    // 创建一个 sync.Mutex 实例。
    mux := sync.Mutex{}
    // 使用 assert 包的 Panics 函数来断言是否会引发Panic。
    assert.Panics(t, func() {
        // 尝试解锁一个没有被锁定的互斥锁。
        mux.Unlock()
    })
}

// TestMutex_ShouldStarvation_WhenOneRoutineHoldLockedForLongTime 测试当一个协程长时间持有互斥锁时,
// 其他协程是否会出现饥饿现象。参数 t *testing.T 用于测试的上下文,提供测试控制和日志记录功能。
func TestMutex_ShouldStarvation_WhenOneRoutineHoldLockedForLongTime(t *testing.T) {
    // 定义一个 sync.Mutex 实例用于测试互斥锁饥饿问题。
    var mu sync.Mutex
    // 用于同步协程开始的 waitgroup。
    var start sync.WaitGroup
    // 用于同步协程结束的 waitgroup。
    var done sync.WaitGroup

    start.Add(1) // 准备启动一个协程。
    done.Add(1)  // 等待一个协程完成。
    go func() {  // 启动一个协程长时间持有锁。
        start.Done() // 表示协程已启动并准备好。
        mu.Lock()    // 获取锁并长时间持有。
        time.Sleep(1000 * time.Second)
        mu.Unlock() // 最终释放锁。
        done.Done() // 表示协程已完成。
    }()

    start.Wait()                 // 等待协程开始并持有锁。
    time.Sleep(time.Millisecond) // 稍微延时以确保锁被持有。

    start.Add(1) // 准备启动另一个协程。
    done.Add(1)  // 等待另一个协程完成。
    go func() {  // 启动另一个协程尝试获取锁,以测试是否出现饥饿。
        start.Done()                             // 表示协程已启动并准备好。
        mu.Lock()                                // 尝试获取锁。
        t.Log("Starving goroutine got the lock") // 如果获取到锁,则记录日志。
        mu.Unlock()                              // 最终释放锁。
        done.Done()                              // 表示协程已完成。
    }()

    start.Wait()                 // 等待第二个协程开始。
    time.Sleep(time.Millisecond) // 稍微延时以确保尝试获取锁的协程已运行。

    mu.Lock()                            // 主协程尝试获取锁,以进一步测试饥饿情况。
    t.Log("Main goroutine got the lock") // 如果获取到锁,则记录日志。
    mu.Unlock()                          // 释放主协程持有的锁。

    done.Wait() // 等待所有协程完成,确保测试完整执行。
}

发表评论

您的电子邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部