跳至主要內容

Go中的并发问题

程序员李某某原创Golang并发大约 4 分钟

Go中的并发问题

临界变量

先看个并发问题的示例

var wg sync.WaitGroup
var x int = 0	// 共享变量
func main(){
    wg.Add(2)
    
    go add()
    go add()
    
   	wg.Wait()
	fmt.Println(x)
}

func add(){
	defer wg.Done()
	for i := 0 ;i < 100000;i++{
		x++
	}
}

数量不对

互斥锁

加上锁

var lock sync.Mutex		// 互斥锁
func add() {
	defer wg.Done()
	for i := 0; i < 50000; i++ {
		lock.Lock()
		x++
		lock.Unlock()
	}
}

读写锁

适用于读远远大于写的情况,读不锁,写锁

var lock sync.RWMutex

func main() {
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go write()
	}
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go read()
	}
	wg.Wait()
}

func read(){
	defer wg.Done()
	rwLock.RLock()//如果只是读数据,那么这个锁不产生影响,但是读写同时发生的时候,就会有影响
	fmt.Println("开始读取数据")
	time.Sleep(time.Second)
	fmt.Println("读取数据成功")
	rwLock.RUnlock()
}
func write(){
	defer wg.Done()
	rwLock.Lock()
	fmt.Println("开始修改数据")
	time.Sleep(time.Second * 10)
	fmt.Println("修改数据成功")
	rwLock.Unlock()
}

Once

很多场景下我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、只关闭一次通道等

sync包中提供了一个针对只执行一次场景的解决方案–sync.Once

sync.Once只有一个Do方法

加载配置文件示例

  • 延迟一个开销很大的初始化操作到真正用到它的时候再执行是一个很好的实践。

  • 因为预先初始化一个变量(比如在init函数中完成初始化)会增加程序的启动耗时,

  • 而且有可能实际执行过程中这个变量没有用上,那么这个初始化操作就不是必须要做的

  • 所以需要延迟加载(懒加载)

var icons map[string]image.Image

func loadIcons() {
	icons = map[string]image.Image{
		"left":  Icons("left.png"),
		"up":    Icons("up.png"),
		"right": Icons("right.png"),
		"down":  Icons("down.png"),
	}
}

// Icon 被多个goroutine调用时不是并发安全的
func Icon(name string) image.Image {
	if icons == nil {
		loadIcons()
	}
	return icons[name]
}

多个goroutine并发调用Icon函数时不是并发安全的,现代的编译器和CPU可能会在保证每个goroutine都满足串行一致的基础上自由地重排访问内存的顺序。loadIcons函数可能会被重排为以下结果:

func loadIcons() {
	icons = make(map[string]image.Image)
	icons["left"] = loadIcon("left.png")
	icons["up"] = loadIcon("up.png")
	icons["right"] = loadIcon("right.png")
	icons["down"] = loadIcon("down.png")
}

在这种情况下就会出现即使判断了icons不是nil也不意味着变量初始化完成了。考虑到这种情况,我们能想到的办法就是添加互斥锁,保证初始化icons的时候不会被其他的goroutine操作,但是这样做又会引发性能问题。

使用sync.Once改造的示例代码如下:

var loadIconsOnce sync.Once

// Icon 是并发安全的
func Icon(name string) image.Image {
	loadIconsOnce.Do(loadIcons)
	return icons[name]
}

并发安全的单例模式

type Singleton struct {}
var instance *Singleton
var once sync.Once

func GetInstance() *Singleton{
    once.Do(func (){
        instance = &Singleton{}
    })
    return instance
}

看看Once的结构体就知道,内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。

Map

map不是并发安全的,看个示例

func main() {

	m := make(map[string]int)

	for i := 0; i < 5; i++ {
		wg.Add(1)
    i := i
		go func() {
			defer wg.Done()
			m[strconv.Itoa(i)] = i
		}()
	}

	wg.Wait()
  fmt.Println(m)  // fatal error: concurrent map writes
}

使用并发的Map,sync.Map内置了诸如Store、Load、LoadOrStore、Delete、Range等操作方法

func main() {

	var m sync.Map
	for i := 0; i < 100; i++ {
		wg.Add(1)
		i := i
		go func() {
			defer wg.Done()
			m.Store(strconv.Itoa(i), i)
			v, _ := m.Load(strconv.Itoa(i))
			fmt.Printf("k = %d, v = %d\n", i, v)
		}()
	}
	wg.Wait()
}

原子操作

atomic包提供了很多原子操作

方法解释
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
读取操作
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
写入操作
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
修改操作
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
交换操作
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
比较并交换操作

生产消费模型

package main
import (
	"fmt"
	"strconv"
)

func main(){
	
	productChan := make(chan Product ,100)
	shopChan := make(chan Product ,100)
	exitChan := make(chan bool,1)

	// 10个生产
	for  i:=0;i<10;i++{
		go producer(productChan,100,i)
	}
	// 1个运输
	go storage(productChan,shopChan)
	// 1个消费
	go cousumer(shopChan,1000,exitChan)
	
	for ;<-exitChan;{
		continue
	}
}
type Product struct{
	name string
}
func producer(productChan chan<- Product, count,i int){
	for{
		product := Product{name:"生产线"+strconv.Itoa(i)+"\t商品"+strconv.Itoa(count)}
		productChan<-product
		count--
		if count < 1{
			return
		}
	}
}

func storage(productChan <-chan Product, shopChan chan<- Product){
	for{
		product:=<-productChan
		shopChan<-product
		// fmt.Println("运输",product.name)
	}
}

func cousumer(shopChan <-chan Product, count int, exitChan chan<- bool){
	for{
		product:=<-shopChan
		fmt.Println("消费了",product.name)
		
		count--
		if count < 1{
			exitChan<-true
			return
		}
	}
}

上次编辑于:
贡献者: ext.liyuanhao3