逐步学习Go-并发通道chan(channel)

内容目录

概述

Go的Routines并发模型是基于CSP,如果你看过七周七并发,那么你应该了解。

什么是CSP?

"Communicating Sequential Processes"(CSP)这个词组的含义来自其英文直译以及在计算机科学中的使用环境。

CSP是 Tony Hoare 在1978年提出的,论文地址在:https://dl.acm.org/doi/10.1145/359576.359585

拆字解释下

Communicating Sequential Processes(CSP)的三个单词:

  • C for Communicating: 通信,什么的通信那?进程/线程/协程的通信。

  • S for Sequenctial: 顺序的,什么的顺序?进程/线程/协程之间执行任务时应该是有顺序的,完全并行执行是理想化的,现实中就是要先指定完第一个或者第一批任务才能执行第二个或者第二批任务。

  • P for Processses: 进程,这个是进程,估计是因为这个概念提出来的时候比较早。我们这儿得抽象一下,Processes指的是进程/线程/协程。

那么我们来总结一下CSP,CSP就是多个能够进行通信,并且按照顺序执行任务的独立进程。这些进程在各自执行自己的任务的时候,还可以通过某种方式是进行通信。

在Golang中就是通过Channel进行通信。

好了,CSP解释完了,我们来看Go中的Channel,另外CSP的参与者Go Routine我在之前的文章中有提到过,大家可以去:逐步学习Go-协程goroutine

这张图就描述了CSP编程模型。
file

Go中routine代表图中的Process,Channel就是goroutine之间的连接。通道可以让一个goroutine发送信息到另一个goroutine。

Go中的channel

Go中Channel有两种类型:

  1. 无缓冲Channel(Unbuffered)
  2. 有缓冲Channel(Buffered)
    有缓冲的Channel其实就是一个环形缓冲队列;无缓冲的没有队列,因为读写都会阻塞。

Channel的定义

var channel名称 chan channel类型

// 类型自动推断
channel名称 := make(chan channel类型, buffer数量(int可以为0))

比如:我们可以这样来定义:

// 定义了一个channel,还没有make,不确定是否为有缓冲和无缓冲channel
var ch chan int

// 定义了一个chnnel, 容量为0,无缓冲channel
ch := make(chan int, 0)

// 定义了一个channel,容量为1,有缓冲channel
ch := make(chan int, 1)

我们实际使用的时候把Channel理解为队列就可以了。

Go中的Channel有两种类型:

  1. 无缓冲channel
  2. 有缓冲channel

无缓冲和有缓冲的特性如下:

  • 无缓冲Channel
    • 无缓冲Channel没有存储数据的能力
    • 发送方向Channel中发送数据的时候,发送方会阻塞直到有接受者接受这个数据
    • 无缓冲Channel典型应用就是go协程同步通信
    • 无缓冲Channel保证通信双方都要准备好数据交换
  • 有缓冲Channel
    • 有缓冲Channel需要定义Channel的容量
    • 发送方向有缓冲Channel发送数据的时候,只有容量满的时候才会阻塞
    • 接收方只有在有缓冲Channel为空时才会阻塞
    • 有缓冲通道的典型应用场景是生产者和消费者

Channel的操作

Channel主要支持2中操作:

  1. 发送(send)
  2. 接收(recv)

这三种操作在代码中的的定义和使用:

  1. 发送和接收都使用<-

来看代码:

// 先定义一个无缓冲channel
ch := make(chan int, 0)
ch := make(chan int)

// 发送数据到channel
ch <- 1

// 从channel中接收数据
<- ch

我们看到发送和接收都是使用<-,差别在于:

  1. ch在<-的左边,操作为发送
  2. ch在<-的右边,操作为接收

另外,channel在使用之前都要先创建,使用完毕后要关闭,分别使用makeclose关闭。

// 创建相当于分配channel(allocation)
ch := make(chan int, 0)

// 关闭channel,释放channel资源
defer close(ch)

channel创建完直接关闭了还能操作发送和接收吗?

这个问题我们通过写代码来测试,我们先来测试发送,然后再测试接收。

  • 发送数据到关闭的Channel

    func TestUnbufferedChannel_ShouldPanic_whenWriteValueToAClosedChannel(t *testing.T) {
    
    f := func() {
        ch := make(chan int)
        close(ch)
    
        ch <- 1
    }
    
    assert.Panics(t, f, "should panic")
    }

    运行截图:
    file

我们的UT PASS了表示发生了panic,这就说明我们不能向已经关闭的channel发送数据。

  • 在已经关闭的Channel上接收

func TestUnbufferedChannel_ShouldSuccess_whenRecvValueAtAClosedChannel(t *testing.T) {
    ch := make(chan int)
    close(ch)
    var val = <-ch
    assert.Equal(t, 0, val)
}

func TestUnbufferedChannel_ShouldSuccess_whenRecvEmptyValueAtAClosedChannel(t *testing.T) {
    ch := make(chan string)
    close(ch)
    var val = <-ch
    assert.Equal(t, "", val)
}

运行截图:
file

这两个UT都可以PASS,我只截图了一个PASS,这说明我们可以在一个关闭的channel上接收数据,只是接收到的都是0值。关于0值要特别说明一下,0值是针对不同类型的,比如:int的0值就是0,string的0值就是空字符串,指针的0值就是nil,看下面代码:
file

并非“任何后续的接收操作都将立即返回零值”,而是当channel中所有已发送的值都被接收后,接下来的接收操作会立即返回零值。

无缓冲channel

无缓冲通道顾名思义:就是没有数据缓冲能力的Channel,有goroutine向无缓冲Channel发送了数据就必须有另一个goroutine来接受,否则发送的goroutine会阻塞;反之,有goroutine从这个channel接受数据而没有另一个goroutine向这个channel发送,那么接受的goroutine也会阻塞。

应用场景:

  1. 部分任务需要同步就用无缓冲channel

来看场景代码:

有发送无接受

发送goroutine会被阻塞。


func TestUnbufferedChannel_ShouldWriteTimeout_WhenNoRoutineReadTheChannel(t *testing.T) {

    // 创建无缓冲channel
    c := make(chan int)
    is_timeout := false
    try_to_write_value := 1

    // when
    select {
    // 直接向channel中发送
    case c <- try_to_write_value:
    case <-time.After(3 * time.Second):
        // should
        is_timeout = true
    }

    assert.True(t, is_timeout)

}

file

有接受无发送

接收goroutine会被阻塞


func TestUnbufferedChannel_ShouldReadTimeout_WhenNoValueWriteToChannel(t *testing.T) {

    // 创建无缓冲channel
    c := make(chan int)
    is_timeout := false

    select {
    // 直接接受channel中的数据
    case <-c:
    case <-time.After(3 * time.Second):
        // should
        is_timeout = true
    }

    // 三秒后超时
    assert.True(t, is_timeout)

}

有发送有接受

有发送有接收,一切正常。

func sum(s []int, c chan int) {
    sum := 0
    for _, v := range s {
        sum += v
    }
    // 将累加结果发送到channel
    c <- sum
}

func TestUnbufferedChannel_ShouldRecvValues_WhenWriteValueToChannel(t *testing.T) {

    // 创建无缓冲channel
    c := make(chan int)

    // given
    s := []int{1, 2, 3, 4, 5, 6}

    // when
    // 执行数组累加
    go sum(s[:], c)
    ret1 := <-c

    // should
    // 和应该是21
    assert.Equal(t, 21, ret1)
}

file

使用无缓冲Channel控制并发


// 先定义一个worker函数
// worker函数从无缓冲channel中接收
// 可以接到到数据就执行后面的打印内容
// 打印完成后退出
func worker(id int, lock chan bool) {
    var shouldRun = <-lock
    if shouldRun {
        fmt.Printf("time: %v Worker %d is working\n", time.Now(), id)
        time.Sleep(time.Second)
        fmt.Printf("time: %v Worker %d has finished\n", time.Now(), id)
    }
}

func TestUnbufferedChannel_ShouldRunOneByOne_When(t *testing.T) {
    lock := make(chan bool, 1)

    // 启动5个goroutine等待释放接收
    for i := 0; i < 5; i++ {
        go worker(i, lock)
    }

    // 发送5个true到channel
    for i := 0; i < 5; i++ {
        lock <- true
        time.Sleep(time.Second)
    }

    close(lock)

    time.Sleep(10 * time.Second)
}

file

使用无缓冲Channel实现CompleteFuture.anyOf()

CompleteFuture.anyOf() 是 Java 中的一个函数,它返回一个新的 CompletableFuture,当给定的任何 CompletableFuture 完成时,返回的 CompletableFuture 也完成,并带有完成的 CompletableFuture 的结果。


// future函数使用time.Sleep模拟实际业务处理延迟
// 业务处理完成后将业务数据写入无缓冲Channel
func future(id int, delay time.Duration, resChan chan int) {
    time.Sleep(delay)
    fmt.Printf("Hi, I have finished my task, my id is %d\n", id)
    resChan <- id
}

// 接收一系列上面的future, 然后使用go routine启动这些future函数并将结果写入到result channel,最后再返回result channel。
func anyOf(futures ...<-chan int) <-chan int {
    result := make(chan int)
    for _, future := range futures {
        go func(f <-chan int) {
            result <- <-f
        }(future)
    }
    return result
}

func TestAnyOf_ShouldSuccess(t *testing.T) {
    // 创建无缓冲的 channel
    resChan1 := make(chan int)
    resChan2 := make(chan int)
    resChan3 := make(chan int)

    // 启动 goroutines
    go future(1, 3*time.Second, resChan1)
    go future(2, 2*time.Second, resChan2)
    go future(3, 5*time.Second, resChan3)

    result := anyOf(resChan1, resChan2, resChan3)

    assert.Equal(t, 2, <-result)
}

上面有两个比较让人纠结的语法:

  1. <-chan int
  2. result <- <-f
  • <-chan int表示只读通道,anyOf只能读取通道内的数据;有了只读就有只写,只写通道chan<- int
  • result <- <-f表示从通道f中接收数据并将数据写入到result通道。这一行相当于执行了
    v := <-f
    result <- v

    file

有缓冲channel

有缓冲channel就是你可以暂时把数据发送到channel,如果channel的缓冲区没有被占用完就不会阻塞,缓冲区被占用完了就被阻塞了。

特性:

  1. 发送goroutine在缓冲区没有用完之前不会阻塞,缓冲区被使用完了之后发送goroutine就会被阻塞
  2. 接受goroutine在缓冲区有数据时,不会阻塞,缓冲区没有数据时会被阻塞

有缓冲channel应用场景是什么?

  1. 任务队列就是最典型的场景,生产者消费者模型
  2. 其他无缓冲channel搞不定的就用有缓冲channel

实现一个有缓冲channel的RateLimiter

import (
    "sync"
    "sync/atomic"
    "testing"

    "fmt"
    "time"

    "github.com/stretchr/testify/assert"
)

type RateLimiter struct {
    tokens       chan struct{}
    refillTicker *time.Ticker
    closeCh      chan struct{}
}

func NewRateLimiter(rate int) *RateLimiter {
    r := &RateLimiter{
        tokens:       make(chan struct{}, rate),
        refillTicker: time.NewTicker(time.Second / time.Duration(rate)),
        closeCh:      make(chan struct{}),
    }

    go r.refill()

    return r
}

func (r *RateLimiter) refill() {
    for {
        select {
        case <-r.refillTicker.C:
            select {
            case r.tokens <- struct{}{}:
            default:
            }
        case <-r.closeCh:
            r.refillTicker.Stop()
            return
        }
    }
}

func (r *RateLimiter) Acquire() {
    <-r.tokens
}

func (r *RateLimiter) TryAcquire() bool {
    select {
    case <-r.tokens:
        return true
    default:
        return false
    }
}

func (r *RateLimiter) Close() {
    close(r.closeCh)
}

func myTask(id int) {
    fmt.Printf("time: %v workder %d is working\n", time.Now(), id)
    time.Sleep(20 * time.Millisecond)
    fmt.Printf("time: %v workder %d has finished\n", time.Now(), id)
}

func TestRateLimiter_ShouldPermitWithBlocking_WhenRequestOnce(t *testing.T) {
    rateLimiter := NewRateLimiter(100)

    startTime := time.Now()
    for i := 0; i < 1; i++ {
        rateLimiter.TryAcquire()
        myTask(i)
    }
    endTime := time.Now()

    elapsedTime := endTime.Sub(startTime)
    fmt.Printf("elapsed time: %v\n", elapsedTime)
    fmt.Printf("explect time: %v\n", 300*time.Millisecond)
    assert.True(t, elapsedTime < 300*time.Millisecond)
}

func TestRateLimiter_ShouldLimitPermits_WhenGivenLimitedResource(t *testing.T) {
    var counter int32 = 0
    rateLimiter := NewRateLimiter(100)
    wg := sync.WaitGroup{}
    startTime := time.Now()
    for i := range 1000 {
        wg.Add(1)
        go func() {
            rateLimiter.Acquire()
            myTask(i)
            atomic.AddInt32(&counter, 1)
            wg.Done()
        }()

    }
    wg.Wait()
    endTime := time.Now()
    elapsedTime := endTime.Sub(startTime)
    fmt.Printf("elapsed time: %v\n", elapsedTime)
    fmt.Printf("should greater than explect time: %v\n", 10*time.Second)
    assert.Equal(t, counter, int32(1000))
    assert.True(t, 10*time.Second < elapsedTime)
}

file

实现无缓冲Channel实现Java中的CyclicBarrier

CyclicBarrier 是一个同步工具,它允许一组线程互相等待,直到他们都到达了一个共同的屏障点。在涉及固定大小的线程团队必须偶尔相互等待的程序中,CyclicBarriers 非常有用。之所以称之为“循环”屏障,是因为在等待的线程被释放之后,它可以被重复使用。

  • await() 所有的参与者都调用了wait方法后返回或者被中断
    我们就实现这个await方法,暂时不支持中断,代码如下:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// CyclicBarrier 让一组goroutine在到达某个点之后才能继续执行
type CyclicBarrier struct {
    // 总goroutine数量
    participant int
    // 用于等待所有goroutine准备好
    waitGroup sync.WaitGroup
    // 无缓冲channel,用于goroutine间同步
    barrierChan chan struct{}
    running     int32
}

// NewCyclicBarrier 创建一个新的CyclicBarrier
func NewCyclicBarrier(participant int) *CyclicBarrier {
    b := &CyclicBarrier{
        participant: participant,
        barrierChan: make(chan struct{}),
        running:     int32(participant),
    }
    // 设置等待的goroutine数
    b.waitGroup.Add(participant)
    return b
}

// 当一个goroutine调用Wait时,
// 它将在屏障处等待,
// 直到所有goroutine都到达这里
func (b *CyclicBarrier) Wait() {
    // 一个goroutine准备好了
    b.waitGroup.Done()

    // 等待所有goroutine都准备好
    b.waitGroup.Wait()

    // 当所有goroutine都准备好了,关闭channel进行广播通知
    if atomic.AddInt32(&b.running, -1) == 0 {
        close(b.barrierChan)
    } else {
        // 等待通知
        <-b.barrierChan
    }

}

// 阻塞调用goroutine直到所有goroutine都调用了Wait方法,
// 屏障开放后,重新置为待关闭状态
func (b *CyclicBarrier) Await() {
    // 等待屏障开放的信号
    <-b.barrierChan

    // 重置屏障状态
    b.barrierChan = make(chan struct{})
    b.waitGroup.Add(b.participant)
}

func (b *CyclicBarrier) Close() {
    close(b.barrierChan)
}

func main() {
    // 这里我们设置3个goroutine参与
    barrier := NewCyclicBarrier(100)

    for i := 0; i < 100; i++ {
        go func(i int) {
            fmt.Printf("Goroutine %d is working...\n", i)
            // 模拟工作
            time.Sleep(time.Duration(i+1) * time.Second)
            fmt.Printf("Goroutine %d reached the barrier.\n", i)
            barrier.Wait()

            fmt.Printf("Goroutine %d passed the barrier.\n", i)
        }(i)
    }

    // 主goroutine等待所有goroutine都到达屏障
    barrier.Await()
    fmt.Println("All goroutines have passed the barrier")
}

参考

  1. https://github.com/golang/go/blob/master/src/runtime/chan.go#L30

编写不易,如有问题请评论告知

1人评论了“逐步学习Go-并发通道chan(channel)”

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部