引言
Go 语言的设计鼓励开发者编写高效的并发程序。
并发模式
- 生产者-消费者模式:
- 一个或多个生产者 goroutine 通过 channel 将数据发送给消费者 goroutine。
- 示例:
ch := make(chan int)
go producer(ch)
go consumer(ch)- 工作池模式:
- 创建一组 worker goroutine 处理任务队列中的任务。
- 示例:
jobs := make(chan int, 100)
results := make(chan int, 100)
go workers(jobs, results)
for i := 0; i < 100; i++ {
jobs <- i
}
close(jobs)- 扇入/扇出模式:
- 扇入:多个 goroutine 向同一个 channel 发送数据。
- 扇出:一个 goroutine 向多个 channel 发送数据。
- 示例(扇入):
go func() { fanIn <- 1 }()
go func() { fanIn <- 2 }()
result := <-fanIn- 管道模式:
- 一系列 goroutine 通过 channel 连接,形成数据处理管道。
- 示例:
ch1 := make(chan int)
ch2 := make(chan int)
go producer(ch1)
go transformer(ch1, ch2)
go consumer(ch2)最佳实践
- 避免共享内存:
- 使用 channel 而不是共享内存来通信。
- 示例:
ch := make(chan int)
go func() {
ch <- 42
}()
result := <-ch- 错误处理:
- 在 goroutine 中处理错误,并确保错误能够正确传播。
- 示例:
chErr := make(chan error)
go func() {
chErr <- doSomething()
}()
if err := <-chErr; err != nil {
handle(err)
}- 通道关闭:
- 使用 close 函数关闭不再使用的通道。
- 示例:
close(ch)- goroutine 数量控制:
- 控制 goroutine 的数量以避免不必要的资源消耗。
- 示例:
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// 执行任务
}()
}
wg.Wait()- 使用 context:
- 使用 context 包来管理 goroutine 的生命周期和取消操作。
- 示例:
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-ctx.Done():
fmt.Println("Canceled")
case <-time.After(1 * time.Second):
fmt.Println("Timeout")
}
}()
time.Sleep(500 * time.Millisecond)
cancel()- 死锁预防:
- 确保 goroutine 正确地释放资源,避免死锁。
- 示例:
var mu sync.Mutex
mu.Lock()
defer mu.Unlock() // 执行任务- 竞争状态检测:
- 使用 sync/atomic 包和 testing 包中的 TestMain 函数来检测竞争状态。
- 示例:
func TestConcurrentAccess(t *testing.T) {
t.Parallel()
var counter int32
for i := 0; i < 1000; i++ {
go func() {
atomic.AddInt32(&counter, 1)
}()
}
time.Sleep(1 * time.Second)
if atomic.LoadInt32(&counter) != 1000 {
t.Errorf("Counter expected to be 1000, got %d", counter)
}
}编码实践
// https://go.dev/play/p/xgyepsc_jw7
package main
import (
"context"
"fmt"
"time"
)
func producer(ch chan<- int) {
for i := 0; i < 10; i++ {
ch <- i
time.Sleep(100 * time.Millisecond)
}
close(ch)
}
func consumer(ch <-chan int) {
for value := range ch {
fmt.Println("Received:", value)
}
}
func workers(jobs <-chan int, results chan<- int) {
for j := range jobs {
results <- j * j
}
}
func main() {
// 生产者-消费者模式
ch := make(chan int)
go producer(ch)
go consumer(ch)
// 工作池模式
jobs := make(chan int, 100)
results := make(chan int, 100)
go workers(jobs, results)
for i := 0; i < 100; i++ {
jobs <- i
}
close(jobs)
for i := 0; i < 100; i++ {
fmt.Println(<-results)
}
// 使用 context 取消 goroutine
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-ctx.Done():
fmt.Println("Canceled")
case <-time.After(1 * time.Second):
fmt.Println("Timeout && exit")
}
}()
time.Sleep(500 * time.Millisecond)
cancel()
} 