内容目录
概述
在并发编程中,我们经常会遇到多个线程或协程访问共享资源的情况。为了保护这些资源不被同时修改,我们会用到"锁"的概念。
Go中提供了读写锁:sync.RWMutex。
sync.RWMutex是Go语言提供的一个基础同步原语,它是Reader/Writer Mutual Exclusion Lock的缩写,通常被称为"读写锁"。
读写锁允许多个读锁同时拥有者,但在任何时间点只允许一个写锁拥有者,或者没有锁拥有者。
这让读多写少的场景获得了更高的并发性能。
应用场景
- 典型应用场景就是读多写少
- 一写多读
提供的方法
sync.RWMutex提供了以下方法:
type RWMutex
// 获取写锁,有读锁或者写锁被其他goroutine使用则阻塞等待
func (rw *RWMutex) Lock()
// 尝试获取写锁,获取到则返回true,没有获取到则为false
func (rw *RWMutex) TryLock() bool
// 释放写锁
func (rw *RWMutex) Unlock()
// 获取读锁,
func (rw *RWMutex) RLock()
// 尝试获取读锁,获取到则返回true,没有获取到则为false
func (rw *RWMutex) TryRLock() bool
// 释放读锁
func (rw *RWMutex) RUnlock()
// 返回Locker
func (rw *RWMutex) RLocker() Locker
注意
使用RWMutex的时候,一旦调用了Lock方法,就不能再把该锁复制到其他地方使用,否则可能会出现各种问题。这是由于锁的状态(被哪个协程持有,是否已经被锁定等)是存储在RWMutex的结构体中,如果复制了RWMutex,那么复制后的RWMutex就会有一个全新的状态,锁的行为就会变得不可预测。
RWMutex和Mutex一样,一旦有了Lock调用就不能到处copy了,否则出现各种问题。
源码实现
RWMutex结构体
让我们一起深入Go的源码,看看RWMutex是如何实现的。
RWMutex 的结构体主要包括五个主要的字段,这些字段描述了锁的当前状态和持有者信息:
type RWMutex struct {
// Mutex,互斥锁。写者互斥锁,所有的写者加锁都调用w.Lock或者w.TryLock
w Mutex
// 写者信号量。当最后一个读者释放了锁,会触发一个信号通知writerSem
writerSem uint32
// 读者信号量。当写者释放了锁,会触发一个信号通知readerSem
readerSem uint32
// readerCount 记录当前持有读锁的协程数量。如果为负数,表示有写者在等待所有读者释放锁。如果为0,表示没有任何协程持有锁
readerCount atomic.Int32
// readerWait 记录写者需要等待的读者数量。当一个写者获取了锁之后,readerWait会设置为当前readerCount的值。当读者释放锁时,readerWait会递减1
readerWait atomic.Int32
}
读者加锁RLock()
加读锁时非常简单,就是将结构体中的readerCount加1,如果+1后为负数表示有写者等待则等待写者执行完成。
实现代码
func (rw *RWMutex) RLock() {
// 读者数量+1
if rw.readerCount.Add(1) < 0 {
// 加1以后如果readerCount是负数表示有写者持有了互斥锁
// 读者等待信号量释放
// 此时读锁已经加上了,等待写者释放信号量就可以了
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}
读者RTryLock()
这个函数是RWMutex中的TryRLock方法,它试图以非阻塞的方式获取读锁。让我们一步一步地看它是如何工作的。
先看图:
实现代码
func (rw *RWMutex) TryRLock() bool {
for {
// 查看当前读者数量
c := rw.readerCount.Load()
if c < 0 {
// 小于0表示有写者已经Penging,加锁失败
return false
}
// 读者数量+1,加读锁成功
if rw.readerCount.CompareAndSwap(c, c+1) {
return true
}
}
}
读者释放读锁RUnlock()
RUnlock方法用于释放读锁。 当一个读者完成读操作并想要释放锁时,就可以调用这个方法。
实现代码
func (rw *RWMutex) RUnlock() {
// 释放锁就是-1,
// 如果readerCount小于0表示有写者Pending
// 进入rUnlockSlow
if r := rw.readerCount.Add(-1); r < 0 {
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
// 边界问题处理
// r+1 ==0 表示没有读者加锁,却调用了释放读锁
// r+1 == -rwmutexMaxReaders表示没有读者加锁,有写者持有互斥锁却释放读锁
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
fatal("sync: RUnlock of unlocked RWMutex")
}
// 这表示这是最后一个读者了,最后一个读者要发送信号量通知写者不用等了
if rw.readerWait.Add(-1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
写者加锁Lock()
实现代码
const rwmutexMaxReaders = 1 << 30
func (rw *RWMutex) Lock() {
// 先持有互斥锁,已经有其他写者持有了互斥锁则等待
rw.w.Lock()
// rw.readerCount.Add(-rwmutexMaxReaders)这个表示先将readerCount设置为负数表示有写者在等待
// 再+rwmutexMaxReaders是为了求出当前reader的数量
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
// 将当前reader的数量加到readerWait表示要等待的读者完成的个数
if r != 0 && rw.readerWait.Add(r) != 0 {
// 阻塞等待万有的读者完成释放信号量了
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
}
写者加锁TryLock()
实现代码
func (rw *RWMutex) TryLock() bool {
// 调用互斥锁的TryLock,互斥锁TryLock返回false这儿也直接返回false
if !rw.w.TryLock() {
return false
}
// 加锁成功后
// 如果当前还有写者,CompareAndSwap就返回失败
if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
// 返回失败就释放互斥锁
rw.w.Unlock()
// 加锁失败
return false
}
// 加锁成功
return true
}
写者解锁Unlock()
实现代码
func (rw *RWMutex) Unlock() {
// 这里是对Lock readerCount的逆向操作
// 在Lock的时候对readerCount减去了rwmutexMaxReaders,这次加回来;这样就还原了readerCount,即使在Lock之后依然有读者加锁
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
fatal("sync: Unlock of unlocked RWMutex")
}
// 然后循环看当前有多少读者正在等待信号,就释放多少次心血号
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
}
测试
package mutex_test
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// 测试读写互斥锁在正常读锁定和解锁情况下的成功执行
func TestRWMutex_ShouldSuccess_WhenNormalReaderLockAndUnLock(t *testing.T) {
// 初始化一个读写互斥锁
rwmutex := sync.RWMutex{}
// 获取读锁
rwmutex.RLock()
// 设置成功标志为true,使用defer确保在函数结束时释放读锁
isSuccess := true
defer rwmutex.RUnlock()
// 记录日志表示测试成功
t.Log("success")
// 断言成功标志为true
assert.True(t, isSuccess)
}
// 测试RWMutex的写锁功能是否正常
func TestRWMutex_ShouldSuccess_WhenNormalWriterLockAndUnLock(t *testing.T) {
rwmutex := sync.RWMutex{} // 创建一个sync.RWMutex类型的变量
rwmutex.Lock() // 获取写锁
isSuccess := true // 标记为成功状态
defer rwmutex.Unlock() // 确保在函数退出时释放锁,避免死锁
t.Log("success") // 记录测试日志
assert.True(t, isSuccess) // 断言isSuccess为true,验证操作成功
}
// 函数测试了在正常情况下,
// 读写锁(RWMutex)的读锁(RLock)和写锁(Lock)的加锁与解锁操作是否成功。
func TestRWMutex_ShouldSuccess_WhenNormalReaderWriterLockAndUnLock(t *testing.T) {
// 初始化一个读写锁
rwmutex := sync.RWMutex{}
// 尝试获取读锁并立即释放
rwmutex.RLock()
rwmutex.RUnlock()
// 尝试获取写锁并立即释放
rwmutex.Lock()
rwmutex.Unlock()
// 标记测试为成功
isSuccess := true
// 记录测试成功日志
t.Log("success")
// 断言测试结果为真
assert.True(t, isSuccess)
}
// 测试读写锁在多协程情况下的读写互斥
func TestRWMutex_ShouldSuccess_WhenReaderAndWriterInDifferentRoutine(t *testing.T) {
// 初始化一个读写锁和等待组,用于协调不同协程的操作。
rwmutex := sync.RWMutex{}
wg := sync.WaitGroup{}
wg.Add(2) // 预期有两个协程完成操作
// 启动一个协程作为读锁持有者
go func() {
rwmutex.RLock() // 获取读锁
println("reader") // 打印读操作标识
rwmutex.RUnlock() // 释放读锁
wg.Done() // 表示读操作完成
}()
// 启动另一个协程作为写锁持有者
go func() {
rwmutex.Lock() // 获取写锁
println("writer") // 打印写操作标识
rwmutex.Unlock() // 释放写锁
wg.Done() // 表示写操作完成
}()
wg.Wait() // 等待所有协程完成操作
isSuccess := true
t.Log("success") // 记录测试成功
assert.True(t, isSuccess) // 断言测试结果为真
}
// 测试读写锁在多个读锁情况下的读写互斥
func TestRWMutex_ShouldBlockWriter_WhenMultipleReader(t *testing.T) {
rwmutex := sync.RWMutex{}
ch := make(chan bool)
wg := sync.WaitGroup{}
wg.Add(2)
for i := 0; i < 2; i++ {
go func(i int) {
wg.Done()
rwmutex.RLock()
println("reader Locked", i)
time.Sleep(10 * time.Second)
rwmutex.RUnlock()
println("reader UnLocked", i)
}(i)
}
go func() {
wg.Wait()
println("writer try to accquire wlock")
rwmutex.Lock()
println("writer has accquired wlock")
defer rwmutex.Unlock()
ch <- true
}()
<-ch
isSuccess := true
t.Log("success")
assert.True(t, isSuccess)
}
// 测试读写锁在多个写锁情况下的读写互斥
func TestRWMutex_ShouldBlockReaders_WhenWriterIsPresent(t *testing.T) {
rwmutex := sync.RWMutex{}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
println("writer try to accquire wlock")
rwmutex.Lock()
println("writer has accquired wlock")
wg.Done()
time.Sleep(10 * time.Second)
defer rwmutex.Unlock()
println("writer has released wlock")
}()
wg.Wait()
wg.Add(2)
for i := 0; i < 2; i++ {
go func(i int) {
println("reader try to lock", i)
rwmutex.RLock()
println("reader Locked", i)
rwmutex.RUnlock()
println("reader UnLocked", i)
wg.Done()
}(i)
}
wg.Wait()
isSuccess := true
t.Log("success")
assert.True(t, isSuccess)
}
// 测试读写锁在多个写锁情况下的读写互斥
func TestRWMutex_ShouldBlockConcurrentWriters(t *testing.T) {
rwmutex := sync.RWMutex{}
var blockedWriter bool
ch := make(chan bool)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wg.Done()
println("Writer 1 try to accquire wlock")
rwmutex.Lock()
println("Writer 1 has accquired wlock")
defer rwmutex.Unlock()
time.Sleep(15 * time.Second)
}()
go func() {
wg.Wait()
println("Writer 2 try to accquire wlock")
rwmutex.Lock()
println("Writer 2 has accquired wlock")
ch <- true
defer rwmutex.Unlock()
}()
select {
case <-ch:
blockedWriter = false
case <-time.After(20 * time.Second):
blockedWriter = true
}
assert.True(t, blockedWriter)
}
// 测试读写锁在多个读锁情况下的读写互斥
func TestRWMutex_ShouldLockSuccess_WhenTryingToReadLockTwice(t *testing.T) {
rwmutex := sync.RWMutex{}
writerWaitGroup := sync.WaitGroup{}
writerWaitGroup.Add(1)
go func() {
rwmutex.RLock()
println("readlock locked once")
rwmutex.RLock()
println("readlock locked twice")
rwmutex.RUnlock()
rwmutex.RUnlock()
defer writerWaitGroup.Done()
}()
writerWaitGroup.Wait()
isSuccess := true
assert.True(t, isSuccess)
}
// 测试读写锁在多个写锁情况下的读写互斥
func TestRWMutex_ShouldBeBlocked_WhenTryingToWriteLockTwice(t *testing.T) {
rwmutex := sync.RWMutex{}
ch := make(chan bool)
go func() {
rwmutex.Lock()
println("writelock locked once")
rwmutex.Lock()
println("writelock locked twice")
rwmutex.Unlock()
rwmutex.Unlock()
ch <- true
}()
isBlocked := false
select {
case <-ch:
println("should not execute this block")
assert.False(t, isBlocked)
case <-time.After(10 * time.Second):
isSuccess := true
println("executed timeout block")
assert.True(t, isSuccess)
}
}
// 测试读写锁在多个读锁情况下的读写互斥
func TestRWMutex_ShouldBeBlocked_WhenAccquireWriteLockThenReadLock(t *testing.T) {
rwmutex := sync.RWMutex{}
ch := make(chan bool)
go func() {
rwmutex.Lock()
println("writelock locked once")
rwmutex.RLock()
println("readlock locked twice")
rwmutex.RUnlock()
rwmutex.Unlock()
ch <- true
}()
isBlocked := false
select {
case <-ch:
println("should not execute this block")
assert.False(t, isBlocked)
case <-time.After(10 * time.Second):
isSuccess := true
println("executed timeout block")
assert.True(t, isSuccess)
}
}
// 测试读写锁在多个读锁情况下的读写互斥
func TestRWMutex_ShouldBeBlocked_WhenAccquireReadLockThenWriteLock(t *testing.T) {
rwmutex := sync.RWMutex{}
ch := make(chan bool)
go func() {
rwmutex.RLock()
println("readlock locked once")
rwmutex.Lock()
println("writelock locked twice")
rwmutex.Unlock()
rwmutex.RUnlock()
ch <- true
}()
isBlocked := false
select {
case <-ch:
println("should not execute this block")
assert.False(t, isBlocked)
case <-time.After(10 * time.Second):
isSuccess := true
println("executed timeout block")
assert.True(t, isSuccess)
}
}
// 测试读写锁在多个读锁情况下的读写互斥
func TestRWMutex_ShouldDeadlockOrBlocked_WhenLockOneGoroutineAccquiredLockAndAnotherGoroutineAccquireLockAgain(t *testing.T) {
var rwmutex1, rwmutex2 sync.RWMutex
wg := sync.WaitGroup{}
wg1 := sync.WaitGroup{}
ch := make(chan bool)
wg.Add(1)
wg1.Add(1)
go func() {
rwmutex1.Lock()
println("rwmutex1 locked")
wg.Done()
wg1.Wait()
println("rwmutex2 try to accquire lock")
rwmutex2.Lock()
}()
go func() {
wg.Wait()
rwmutex2.Lock()
println("rwmutex2 locked")
wg1.Done()
println("rwmutex1 try to accquire lock")
rwmutex1.Lock()
ch <- true
}()
isBlocked := false
select {
case <-ch:
println("should not execute this block")
assert.False(t, isBlocked)
case <-time.After(10 * time.Second):
isSuccess := true
println("executed timeout block")
assert.True(t, isSuccess)
}
}