Go · #concurrency#go#channel#patterns

Go Channel模式与并发编程实战

2023.07.12 Go 8 min 3.2k
// 目录 · contents

引言

Channel是Go语言并发编程的核心原语之一。Go的设计哲学——“Don’t communicate by sharing memory; share memory by communicating”——正是通过channel来实现的。本文将从channel的底层实现出发,系统介绍各种channel并发模式,并通过实际案例展示如何在生产环境中运用这些模式。

Channel内部结构

每个channel在runtime中由hchan结构体表示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// runtime/chan.go (simplified)
type hchan struct {
qcount uint // 当前队列中的元素数量
dataqsiz uint // 环形缓冲区大小(buffered channel的容量)
buf unsafe.Pointer // 指向环形缓冲区
elemsize uint16 // 每个元素的大小
closed uint32 // 是否已关闭
elemtype *_type // 元素类型
sendx uint // 发送索引
recvx uint // 接收索引
recvq waitq // 等待接收的goroutine队列
sendq waitq // 等待发送的goroutine队列
lock mutex // 互斥锁
}
graph LR
    subgraph "hchan 结构"
        Lock["mutex lock"]
        Buf["环形缓冲区<br/>[0][1][2]...[n]"]
        SendQ["sendq<br/>等待发送队列"]
        RecvQ["recvq<br/>等待接收队列"]
        SendX["sendx: 发送指针"]
        RecvX["recvx: 接收指针"]
    end

    G1["G1 (sender)"] -->|阻塞| SendQ
    G2["G2 (receiver)"] -->|阻塞| RecvQ
    SendX --> Buf
    RecvX --> Buf

Buffered vs Unbuffered Channel

sequenceDiagram
    participant S as Sender
    participant CH as Unbuffered Channel
    participant R as Receiver

    Note over CH: Unbuffered (同步)
    S->>CH: send (阻塞等待接收者)
    R->>CH: receive
    CH-->>S: 解除阻塞
    CH-->>R: 获取数据

    Note over CH: Buffered (异步)
    S->>CH: send (缓冲区未满,立即返回)
    S->>CH: send (缓冲区已满,阻塞)
    R->>CH: receive (缓冲区非空,立即返回)
    CH-->>S: 缓冲区有空位,解除阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import "fmt"

func main() {
// Unbuffered channel: 发送和接收必须同时就绪
unbuffered := make(chan int)
go func() {
unbuffered <- 42 // 阻塞,直到有接收者
}()
val := <-unbuffered // 阻塞,直到有发送者
fmt.Println("Unbuffered:", val)

// Buffered channel: 缓冲区未满时发送不阻塞
buffered := make(chan int, 3)
buffered <- 1 // 不阻塞
buffered <- 2 // 不阻塞
buffered <- 3 // 不阻塞
// buffered <- 4 // 会阻塞,因为缓冲区已满

fmt.Println("Buffered:", <-buffered, <-buffered, <-buffered)
}

Channel操作的行为总结

操作 nil channel closed channel 正常 channel
发送 永久阻塞 panic 阻塞或成功
接收 永久阻塞 返回零值 阻塞或成功
关闭 panic panic 成功关闭

模式一:Fan-Out / Fan-In

Fan-Out将工作分发给多个goroutine并行处理,Fan-In将多个goroutine的结果汇聚到一个channel。

graph LR
    subgraph "Fan-Out"
        Source["数据源"] --> W1["Worker 1"]
        Source --> W2["Worker 2"]
        Source --> W3["Worker 3"]
    end

    subgraph "Fan-In"
        W1 --> Merge["合并"]
        W2 --> Merge
        W3 --> Merge
        Merge --> Output["输出"]
    end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package main

import (
"fmt"
"sync"
)

// fanOut 将任务分发给多个worker
func fanOut(input <-chan int, numWorkers int) []<-chan int {
workers := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
workers[i] = worker(input, i)
}
return workers
}

// worker 处理输入并返回结果channel
func worker(input <-chan int, id int) <-chan int {
output := make(chan int)
go func() {
defer close(output)
for val := range input {
// 模拟处理:将值翻倍
result := val * 2
fmt.Printf("Worker %d: %d -> %d\n", id, val, result)
output <- result
}
}()
return output
}

// fanIn 将多个channel合并为一个
func fanIn(channels ...<-chan int) <-chan int {
merged := make(chan int)
var wg sync.WaitGroup

for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for val := range c {
merged <- val
}
}(ch)
}

go func() {
wg.Wait()
close(merged)
}()

return merged
}

func main() {
// 创建数据源
input := make(chan int)
go func() {
defer close(input)
for i := 1; i <= 20; i++ {
input <- i
}
}()

// Fan-Out: 3个worker并行处理
workers := fanOut(input, 3)

// Fan-In: 合并结果
results := fanIn(workers...)

// 收集结果
var total int
for result := range results {
total += result
}
fmt.Printf("Total: %d\n", total)
}

模式二:Pipeline

Pipeline模式将处理过程分解为多个阶段,每个阶段通过channel连接,形成数据处理流水线。

graph LR
    A["生成器<br/>generate"] -->|chan int| B["过滤器<br/>filter"]
    B -->|chan int| C["转换器<br/>transform"]
    C -->|chan int| D["消费者<br/>consume"]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import "fmt"

// generate 生成整数序列
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}

// filter 只保留偶数
func filter(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 == 0 {
out <- n
}
}
}()
return out
}

// square 计算平方
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}

func main() {
// 构建 pipeline: generate -> filter(偶数) -> square
nums := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
evens := filter(nums)
squared := square(evens)

// 消费结果
for result := range squared {
fmt.Println(result) // 4, 16, 36, 64, 100
}
}

模式三:Timeout与Cancellation

使用select + time.After实现超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"time"
)

func slowOperation() <-chan string {
ch := make(chan string)
go func() {
time.Sleep(3 * time.Second) // 模拟慢操作
ch <- "operation completed"
}()
return ch
}

func main() {
result := slowOperation()

select {
case val := <-result:
fmt.Println("Got result:", val)
case <-time.After(2 * time.Second):
fmt.Println("Timeout! Operation took too long.")
}
}

使用done channel实现取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"fmt"
"time"
)

// longRunningTask 支持通过done channel取消
func longRunningTask(done <-chan struct{}) <-chan int {
results := make(chan int)
go func() {
defer close(results)
for i := 0; ; i++ {
select {
case <-done:
fmt.Println("Task cancelled")
return
case results <- i:
time.Sleep(200 * time.Millisecond)
}
}
}()
return results
}

func main() {
done := make(chan struct{})
results := longRunningTask(done)

// 读取5个结果后取消
for i := 0; i < 5; i++ {
fmt.Println("Result:", <-results)
}

close(done) // 发送取消信号
time.Sleep(100 * time.Millisecond) // 等待清理
fmt.Println("Done")
}

模式四:Rate Limiting(速率限制)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main

import (
"fmt"
"time"
)

func main() {
requests := make(chan int, 10)
for i := 1; i <= 10; i++ {
requests <- i
}
close(requests)

// 简单速率限制:每200ms处理一个请求
limiter := time.Tick(200 * time.Millisecond)
for req := range requests {
<-limiter // 等待令牌
fmt.Println("Processing request", req, "at", time.Now().Format("15:04:05.000"))
}

// 突发速率限制:允许短时间突发3个请求
burstyLimiter := make(chan time.Time, 3)
// 预填充3个令牌
for i := 0; i < 3; i++ {
burstyLimiter <- time.Now()
}
// 每200ms补充一个令牌
go func() {
for t := range time.Tick(200 * time.Millisecond) {
burstyLimiter <- t
}
}()

burstyRequests := make(chan int, 10)
for i := 1; i <= 10; i++ {
burstyRequests <- i
}
close(burstyRequests)

for req := range burstyRequests {
<-burstyLimiter
fmt.Println("Bursty request", req, "at", time.Now().Format("15:04:05.000"))
}
}

模式五:Or-Done Channel

Or-Done模式用于从一个channel读取数据,同时支持提前取消:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package main

import "fmt"

// orDone 包装一个channel,使其在done关闭时也能退出
func orDone(done <-chan struct{}, c <-chan int) <-chan int {
valStream := make(chan int)
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if !ok {
return
}
select {
case valStream <- v:
case <-done:
return
}
}
}
}()
return valStream
}

// or 合并多个done channel,任一关闭则返回
func or(channels ...<-chan struct{}) <-chan struct{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}

orDone := make(chan struct{})
go func() {
defer close(orDone)
switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-or(append(channels[3:], orDone)...):
}
}
}()
return orDone
}

func main() {
// 使用or模式合并多个取消信号
done := make(chan struct{})
data := make(chan int)

go func() {
defer close(data)
for i := 0; i < 100; i++ {
data <- i
}
}()

// 读取前5个值后取消
count := 0
for val := range orDone(done, data) {
fmt.Println(val)
count++
if count >= 5 {
close(done)
break
}
}
}

模式六:Semaphore(信号量)

使用buffered channel实现信号量,控制并发度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main

import (
"fmt"
"sync"
"time"
)

// Semaphore 使用buffered channel实现信号量
type Semaphore struct {
sem chan struct{}
}

func NewSemaphore(maxConcurrency int) *Semaphore {
return &Semaphore{
sem: make(chan struct{}, maxConcurrency),
}
}

func (s *Semaphore) Acquire() {
s.sem <- struct{}{} // 获取令牌,满则阻塞
}

func (s *Semaphore) Release() {
<-s.sem // 释放令牌
}

func main() {
sem := NewSemaphore(3) // 最多3个并发
var wg sync.WaitGroup

for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
sem.Acquire()
defer sem.Release()

fmt.Printf("Worker %d started at %s\n", id, time.Now().Format("15:04:05"))
time.Sleep(1 * time.Second) // 模拟工作
fmt.Printf("Worker %d finished at %s\n", id, time.Now().Format("15:04:05"))
}(i)
}

wg.Wait()
}

模式七:Generator(生成器)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import "fmt"

// fibonacci 返回一个斐波那契数列生成器channel
func fibonacci(done <-chan struct{}) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
a, b := 0, 1
for {
select {
case <-done:
return
case ch <- a:
a, b = b, a+b
}
}
}()
return ch
}

func main() {
done := make(chan struct{})
fib := fibonacci(done)

// 获取前10个斐波那契数
for i := 0; i < 10; i++ {
fmt.Printf("F(%d) = %d\n", i, <-fib)
}
close(done)
}

性能考量

Channel vs Mutex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"sync"
"sync/atomic"
"testing"
)

// channel方式的计数器
func BenchmarkChannelCounter(b *testing.B) {
ch := make(chan int, 1)
ch <- 0
for i := 0; i < b.N; i++ {
val := <-ch
val++
ch <- val
}
}

// mutex方式的计数器
func BenchmarkMutexCounter(b *testing.B) {
var mu sync.Mutex
counter := 0
for i := 0; i < b.N; i++ {
mu.Lock()
counter++
mu.Unlock()
}
}

// atomic方式的计数器
func BenchmarkAtomicCounter(b *testing.B) {
var counter int64
for i := 0; i < b.N; i++ {
atomic.AddInt64(&counter, 1)
}
}

一般规则: - 通信场景用channel:goroutine之间传递数据 - 保护共享资源用mutex:简单的共享状态 - 简单数值操作用atomic:计数器、标志位

避免Channel泄漏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 错误示例:goroutine泄漏
func leakySearch(query string) <-chan string {
ch := make(chan string)
go func() {
// 如果没人接收,这个goroutine永远不会结束
ch <- doSearch(query)
}()
return ch
}

// 正确做法:使用buffered channel或context
func safeSearch(ctx context.Context, query string) <-chan string {
ch := make(chan string, 1) // buffered,即使无人接收也不会泄漏
go func() {
select {
case ch <- doSearch(query):
case <-ctx.Done():
}
}()
return ch
}

总结

本文系统介绍了Go Channel的核心知识和常用并发模式:

  1. Channel内部结构:基于环形缓冲区和等待队列的实现
  2. Fan-Out/Fan-In:并行处理与结果汇聚
  3. Pipeline:多阶段流水线处理
  4. Timeout/Cancellation:超时控制与优雅取消
  5. Rate Limiting:速率限制与突发控制
  6. Or-Done:组合取消信号
  7. Semaphore:并发度控制

掌握这些模式并理解其适用场景,是编写健壮、高效的Go并发程序的关键。在实际开发中,建议优先使用标准库提供的高层抽象(如contexterrgroup),只在需要精细控制时才使用底层的channel模式。

作者 · authorzt
发布 · date2023-07-12
篇幅 · length3.2k 字 · 8 min
许可 · licenseCC BY-SA 4.0
$ echo "comments" · 评论