概述
在并发编程的世界里,Go 语言以其轻量级线程——goroutines 和优秀的同步机制——channels 而备受青睐。但除了channels,Go 标准库sync包下的WaitGroup(等待组)也是管理goroutines同步的重要工具。今天,我们就来聊聊如何使用WaitGroup来等待一组goroutines的完成,以及它的一些注意点。
WaitGroup 简介
想象一下,你的应用程序启动了一大堆goroutines来处理各种任务,咱们的主goroutine需要等待它们都跑完才能继续进行。这时候,WaitGroup就登场了。
简单来说,WaitGroup用来等待一组goroutines的结束。主goroutine会通过调用Add()
方法来告知WaitGroup需要等待多少个goroutines。每当一个子goroutine完成任务后,就调用Done()
方法表示它跑完了。主goroutine则可以通过Wait()
方法阻塞自己,直至所有子goroutine都告一段落。
但要小心,一旦开始用WaitGroup,就不要复制它,否则会导致不可预料的结果。同时,你也需要确保上一轮的Wait()
调用完成后,才能开始下一轮的任务添加(Add),这样才能复用WaitGroup。
WaitGroup 在实战中
让我们来看看一些基本的WaitGroup使用示例,以及它们是如何测试的。
1. 基础使用
你首先要做的是初始化一个WaitGroup,给它告知你将要等待的goroutines数量,启动goroutine后进行相应的任务,然后在任务完成后调用Done()
。最后,用Wait()
阻塞等待所有goroutine完成。
var wg sync.WaitGroup
wg.Add(1) // 我们将等待一个goroutine
go func() {
// 执行任务...
wg.Done() // 任务完成,通知WaitGroup
}()
wg.Wait() // 阻塞,直到所有goroutine完成
2. 并发中的使用
WaitGroup同样适用于并发情况。如果你启动了多个goroutines,只需要在Add()
方法中传入正确的数量,然后每个goroutine在完成时调用一次Done()
。
var wg sync.WaitGroup
numOfGoroutines := 10 // 假设我们有10个并发的goroutines
wg.Add(numOfGoroutines) // 设置WaitGroup等待的goroutines数量
for i := 0; i < numOfGoroutines; i++ {
go func(i int) {
// 执行任务...
wg.Done() // 此goroutine完成,通知WaitGroup
}(i)
}
wg.Wait() // 等所有goroutine运行完毕
3. 复用WaitGroup
WaitGroup是可以复用的,条件是你必须在上一轮的Wait()
调用结束后进行新一轮的添加。这保证了每一轮的goroutines都能正确地被等待。
4. 超时等待
有时候,你可能会担心某些goroutine会永久阻塞WaitGroup,这时可以通过带有超时机制的方式来防范。
var wg sync.WaitGroup
ch := make(chan bool)
wg.Add(1)
go func() {
// 执行长时间任务...
wg.Done()
}()
// 另一个goroutine等待或超时
go func() {
select {
case <-ch:
// WaitGroup已经解锁
case <-time.After(5 * time.Second):
// 超时
}
}()
wg.Wait()
5. 错误使用场景
WaitGroup是很有用,但它也有一些规研功夫。例如,你不能添加负数的goroutines,这在逻辑上是没有意义的。同样,如果没有先调用Add()
,直接调用Done()
是无效的,会引起panic。总之,遵循它的规则,它就能帮你很好地管理goroutines的同步问题。
package waitgroup_test
import (
"fmt"
"runtime"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// 这是对Go语标准库中sync包下的WaitGroup的描述。
// WaitGroup用于等待一组并发的goroutine结结束。
// 主goroutine会调用Add方法来设置需要等待的goroutine的数量。
// 然后,每个goroutine在结束之后需要调用Done方法。
// 同时,可以使用Wait方法阻塞,直到所有的goroutine都结束。
// 在使用WaitGroup的过程中,一旦开始使用就不能再进行复制。
// 在 Go 内存模型的术语中,对 Done 的调用 “在” 它解除的任何 Wait 调用返回前“同步”。
// 这是一种保证内存可见性的机制,即一旦Done被调用,Wait就能获知并返回。
// 在并发编程中,这种机制能确保相互依赖的操作的正确顺序,避免了潜在的竞态条件。
// [很重要]如果你想复用一个WaitGroup,等待几组不相关的事件,你必须确保在调用新一轮的Add之前,所有先前的Wait调用都已经返回完毕。
// TestWaitGroup_ShouldComplete_WhenAddCountEqualsDoneCount 测试了一个正常的
// WaitGroup 使用场景。我们添加了一个需要等待的 goroutine,当这个 goroutine 完成时,
// WaitGroup 的 Wait 方法应该返回。这是最基础的使用 WaitGroup 的场景。
func TestWaitGroup_ShouldComplete_WhenAddCountEqualsDoneCount(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
println("Hello WaitGroup")
wg.Done()
}()
wg.Wait()
completed := 1
assert.Equal(t, 1, completed)
}
// TestWaitGroup_ShouldCompleted_WhenMultipleGoroutineDoNormalAddAndDone 用于测试多个
// 并发的 Goroutine 是否能够正确地被 WaitGroup 跟踪以及同步。
// 我们创建了 loopCount 个并发的 Goroutine,并且每个 Goroutine 会在完成任务后调用 Done 方法标记。
// 我们在主 Goroutine 中 阻塞等待所有的 Goroutine 完成。
// 如果所有的 goroutine 都能顺利地完成数据处理且调用了Done 方法,WaitGroup 的 Wait 方法将会顺利返回,解除阻塞并且继续执行。
func TestWaitGroup_ShouldCompleted_WhenMultipleGoroutineDoNormalAddAndDone(t *testing.T) {
wg := sync.WaitGroup{}
loopCount := 100
wg.Add(loopCount)
for i := 0; i < loopCount; i++ {
go func(i int) {
println("executed ", i, "")
wg.Done()
}(i)
}
isSuccess := true
wg.Wait()
assert.True(t, isSuccess)
}
// TestWaitGroup_ShouldCompleted_WhenReuseWaitGroupAfterWaitReturnedAtFirstRound 用于测试在 WaitGroup 对象的 Wait 方法返回后,
// 是否可以再次复用 WaitGroup 对象来等待新的并发的 Goroutine。
// 首先,我们创建了 loopCount 个并发的 Goroutine,每个 Goroutine 完成任务之后都会调用 Done 方法。
// 在主 Goroutine 中我们调用 Wait 方法来阻塞等待这一组 Goroutine 的完成。
// 如果所有的 Goroutine 完成任务并调用了 Done 方法,那么 Wait 方法应该能够返回,说明第一轮的并发操作成功完成。
// 在第一轮的并发操作完成后,我们再次复用了这个 WaitGroup 对象,再次添加 loopCount 个并发的 Goroutine 并在主 Goroutine 中调用 Wait 方法阻塞等待。
// 如果这一轮的并发操作也能够成功完成,那么说明 WaitGroup 对象在 Wait 方法返回后可以被再次复用。
func TestWaitGroup_ShouldCompleted_WhenReuseWaitGroupAfterWaitReturnedAtFirstRound(t *testing.T) {
wg := sync.WaitGroup{}
loopCount := 100
// 第一轮
wg.Add(loopCount)
for i := 0; i < loopCount; i++ {
go func(i int) {
println("executed ", i, "")
wg.Done()
}(i)
}
isFirstRoundSuccess := false
wg.Wait()
isFirstRoundSuccess = true
// 第二轮
wg.Add(loopCount)
for i := 0; i < loopCount; i++ {
go func(i int) {
println("executed ", i, "")
wg.Done()
}(i)
}
isNextRoundSuccess := false
wg.Wait()
isNextRoundSuccess = true
assert.True(t, isFirstRoundSuccess)
assert.True(t, isNextRoundSuccess)
}
// TestWaitGroup_ShouldAllReturned_WhenMultipleGoroutineWaitForDone 测试了多个Goroutine
// 在同一时刻等待一个WaitGroup。在这个测试中,添加了一个Goroutine到WaitGroup并且该Goroutine会在
// 一段时间后调用Done()方法。而在其他的Goroutine中,它们通过调用Wait()方法等待这个WaitGroup。
// 所有等待该WaitGroup的Goroutine都应该在Done被调用后解锁。如果所有的Goroutine都顺利解锁并且Wait方法返回,
// 则表明WaitGroup可以正确地等待所有的Goroutine完成。
func TestWaitGroup_ShouldAllReturned_WhenMultipleGoroutineWaitForDone(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
time.Sleep(time.Second)
wg.Done()
println("Done")
}()
go func() {
wg.Wait()
println("Wait complete 1")
}()
go func() {
wg.Wait()
println("Wait complete 2")
}()
isSuccess := false
wg.Wait()
isSuccess = true
assert.True(t, isSuccess)
}
// TestWaitGroup_ShouldBlocked_WhenAddCountGreaterThanDoneCount 测试了一种特殊情况
// 当我们添加了更多的需要等待的 goroutine 比实际完成的 goroutine 时,WaitGroup 的 Wait
// 方法应该阻塞,直至等待的 goroutine 完成。在这个测试用例中,我们专门添加了一个超时
// 机制来判断 Wait 是否正常阻塞。
func TestWaitGroup_ShouldBlocked_WhenAddCountGreaterThanDoneCount(t *testing.T) {
wg := sync.WaitGroup{}
ch := make(chan bool)
wg.Add(2)
go func() {
println("Hello WaitGroup")
wg.Done()
}()
go func() {
wg.Wait()
ch <- true
}()
isCompleted := false
isTimeout := false
select {
case <-ch:
isCompleted = true
case <-time.After(10 * time.Second):
isTimeout = true
}
assert.False(t, isCompleted)
assert.True(t, isTimeout)
}
// TestWaitGroup_ShouldPanic_WhenAddNegtiveCounterWithoutAddPositiveCounter 在添加负数
// goroutine 时,WaitGroup 应该 panic。这是因为添加负数的 goroutine 没有任何意义,反而
// 会导致等待的 goroutine 数量变为负数,这是一种错误的使用方式。
func TestWaitGroup_ShouldPanic_WhenAddNegtiveCounterWithoutAddPositiveCounter(t *testing.T) {
wg := sync.WaitGroup{}
assert.Panics(t, func() { wg.Add(-1) })
}
// TestWaitGroup_ShouldPainic_WhenTryToReusePreviousWaitHasReturned 当我们尝试在上一个
// Wait 调用还没有返回时,复用 WaitGroup,WaitGroup 应该 panic。
// 这是因为如果我们在上一个Wait 调用没有返回之前就复用 WaitGroup,那么新增的 goroutine 可能会影响上一个 Wait 调用
// 的正确返回结果。
func TestWaitGroup_ShouldPainic_WhenTryToReusePreviousWaitHasReturned(t *testing.T) {
var wg sync.WaitGroup
runtime.GOMAXPROCS(3)
wg.Add(1)
go func() {
fmt.Println("Wait executed")
wg.Wait()
fmt.Println("Wait completed")
}()
go func() {
time.Sleep(time.Second * 5)
wg.Done()
fmt.Println("Add executed")
wg.Add(1)
}()
go func() {
time.Sleep(time.Second * 5)
fmt.Println("Done executed")
wg.Done()
}()
time.Sleep(100 * time.Second)
}
// TestWaitGroup_ShouldPanic_WhenNoAddCalled 用于测试在没有通过 Add 方法添加任何需要
// 等待的 Goroutine 的情况下,直接调用 Done 方法是否会抛出 panic。
// 根据 WaitGroup 的使用规则,我们必须显式地通过调用 Add 方法告诉 WaitGroup,有多少个 Goroutine 需要等待完成。
// 如果我们没有通过 Add 方法添加任何需要等待的 Goroutine,而直接调用 Done 方法,那么应该
// 会抛出 panic,因为 Done 方法预期会有一些任务需要它去完成,然而这些任务并未被正确添加到
// WaitGroup 中。
func TestWaitGroup_ShouldPanic_WhenNoAddCalled(t *testing.T) {
wg := sync.WaitGroup{}
assert.Panics(t, func() {
wg.Done()
})
}
倒数第二个测试用例的截图:
总结
sync.WaitGroup
是Go语言中处理并发编程时非常实用的工具,能够帮助你优雅地同步多个goroutines的执行。不过,要记住,对其正确使用是至关重要的,否则可能会引入bug或导致程序崩溃。正确地使用WaitGroup,你的并发程序会稳如老狗,放心大胆地去编并发吧!
有一些场景不好复现,大家有兴趣可以看下sync.WaitGroup源码。源码还是挺简单的