Go语言零到一:并发编程(go 高并发编程)

引言

Go 语言的设计鼓励开发者编写高效的并发程序。

并发模式

  • 生产者-消费者模式
    • 一个或多个生产者 goroutine 通过 channel 将数据发送给消费者 goroutine。
    • 示例:
ch := make(chan int) 
go producer(ch) 
go consumer(ch)
  • 工作池模式
    • 创建一组 worker goroutine 处理任务队列中的任务。
    • 示例:
jobs := make(chan int, 100) 
results := make(chan int, 100) 
go workers(jobs, results) 
for i := 0; i < 100; i++ { 
 jobs <- i 
} 
close(jobs)
  • 扇入/扇出模式
    • 扇入:多个 goroutine 向同一个 channel 发送数据。
    • 扇出:一个 goroutine 向多个 channel 发送数据。
    • 示例(扇入):
go func() { fanIn <- 1 }() 
go func() { fanIn <- 2 }() 
result := <-fanIn
  • 管道模式
    • 一系列 goroutine 通过 channel 连接,形成数据处理管道。
    • 示例:
ch1 := make(chan int) 
ch2 := make(chan int) 
go producer(ch1) 
go transformer(ch1, ch2) 
go consumer(ch2)

最佳实践

  • 避免共享内存
    • 使用 channel 而不是共享内存来通信。
    • 示例:
ch := make(chan int) 
go func() { 
 ch <- 42 
}() 
result := <-ch
  • 错误处理
    • 在 goroutine 中处理错误,并确保错误能够正确传播。
    • 示例:
chErr := make(chan error) 
go func() { 
 chErr <- doSomething() 
}() 
if err := <-chErr; err != nil { 
 handle(err) 
}
  • 通道关闭
    • 使用 close 函数关闭不再使用的通道。
    • 示例:
close(ch)
  • goroutine 数量控制
    • 控制 goroutine 的数量以避免不必要的资源消耗。
    • 示例:
var wg sync.WaitGroup 
for i := 0; i < 10; i++ { 
	wg.Add(1) 
	go func() { 
	defer wg.Done() 
	// 执行任务 
 }() 
} 
wg.Wait()
  • 使用 context
    • 使用 context 包来管理 goroutine 的生命周期和取消操作。
    • 示例:
ctx, cancel := context.WithCancel(context.Background()) 
go func() { 
 select { 
 case <-ctx.Done(): 
	fmt.Println("Canceled") 
 case <-time.After(1 * time.Second): 
	fmt.Println("Timeout") 
 } 
}() 
time.Sleep(500 * time.Millisecond) 
cancel()
  • 死锁预防
    • 确保 goroutine 正确地释放资源,避免死锁。
    • 示例:
var mu sync.Mutex 
mu.Lock() 
defer mu.Unlock() // 执行任务
  • 竞争状态检测
    • 使用 sync/atomic 包和 testing 包中的 TestMain 函数来检测竞争状态。
    • 示例:
func TestConcurrentAccess(t *testing.T) { 
 t.Parallel() 
 var counter int32 
 for i := 0; i < 1000; i++ { 
 go func() { 
	atomic.AddInt32(&counter, 1) 
 }() 
 } 
 time.Sleep(1 * time.Second) 
 if atomic.LoadInt32(&counter) != 1000 { 
	t.Errorf("Counter expected to be 1000, got %d", counter) 
 } 
}

编码实践

// https://go.dev/play/p/xgyepsc_jw7
package main

import (
	"context"
	"fmt"
	"time"
)

func producer(ch chan<- int) {
	for i := 0; i < 10; i++ {
		ch <- i
		time.Sleep(100 * time.Millisecond)
	}
	close(ch)
}

func consumer(ch <-chan int) {
	for value := range ch {
		fmt.Println("Received:", value)
	}
}

func workers(jobs <-chan int, results chan<- int) {
	for j := range jobs {
		results <- j * j
	}
}

func main() {
	// 生产者-消费者模式
	ch := make(chan int)
	go producer(ch)
	go consumer(ch)

	// 工作池模式
	jobs := make(chan int, 100)
	results := make(chan int, 100)
	go workers(jobs, results)
	for i := 0; i < 100; i++ {
		jobs <- i
	}
	close(jobs)
	for i := 0; i < 100; i++ {
		fmt.Println(<-results)
	}

	// 使用 context 取消 goroutine
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		select {
		case <-ctx.Done():
			fmt.Println("Canceled")
		case <-time.After(1 * time.Second):
			fmt.Println("Timeout && exit")
		}
	}()
	time.Sleep(500 * time.Millisecond)
	cancel()
}
原文链接:,转发请注明来源!