Go语言中的互斥锁与同步机制:原子操作的妙用

发表时间: 2022-10-22 15:02

同步

一、锁的使用

  1. go语言除了提供并发编程模型还提供了同步工具(sync,sync/atomic)包括互斥锁,读写锁,
  2. 互斥锁

I. 互斥锁 是并发程序对共享资(临界区)源进行访问控制的主要手段,由标准库sync.Mutex类型提供。

II. sync.Mutex零值表示未被锁定的互斥量,提供俩个方法 Lock和Unlock,前者用于锁定,后者用于解锁

III.//案例1
var mx sync.Mutex
mx.Lock()
defer mx.Unlock()
//案例2
func main() {
var mx sync.Mutex
log.Println("Lock the lock (G0)")
mx.Lock()
log.Println("the lock is locked.(G0)")
for i:=1;i<=3;i++ {
go func(i int) {
log.Printf("lock the lock (G%d)\n",i)
mx.Lock()
log.Printf("the lock is locked (G%d)\n",i)
}(i)
}

time.Sleep(time.Second)
log.Println("unlock the (G0)")
mx.Unlock()
log.Println("the lock is unlocked.(G0)")
time.Sleep(time.Second)
}

/**
E:\source\go\src\exam>go run main.go
2022/10/08 14:11:46 Lock the lock (G0)
2022/10/08 14:11:46 the lock is locked.(G0)
2022/10/08 14:11:46 lock the lock (G3)
2022/10/08 14:11:46 lock the lock (G2)
2022/10/08 14:11:46 lock the lock (G1)
2022/10/08 14:11:47 unlock the (G0)
2022/10/08 14:11:47 the lock is unlocked.(G0)
2022/10/08 14:11:47 the lock is locked (G3)
*/

IV. main函数的goroutine 称G0,另外又开了3个goroutine 分别称G1,G2,G3.在启用这3个Goroutine之前已经对互斥锁mx进行了锁定。

VI. 我们看到G1,G2,G3 在G0 锁定后,都被阻塞了。原因是该互斥锁已经处于锁定状态。

VII. 随后我们释放了G0,之后G3获得了互斥锁的锁定,其余依然处于锁定状态。

VIII. 其他语言可能会忘记锁定的互斥量,忘记解锁。然而Go语言提供了defer mx.Unlock 语句 极大地降低了可能性

IX. 互斥锁的锁定操作和解锁操作必须成对出现,如果对于一个已经锁定的互斥量,重复锁定将会被阻塞,直到该互斥锁回到解锁状态。

X. 虽然互斥锁可以在多个goroutine共享,但我们强烈建议把同一个互斥锁的成对锁定与解锁操作放在同一个层次中。

  1. 读写锁

I. 读写锁针对于读写操作的互斥锁,与普通的互斥锁最大的不同在于 可以分别针对读操作和写操作进行 锁定和解锁操作。

II. 读写锁允许任意个读操作同时进行,同一时刻只允许有一个写操作在进行;在写操作时,读操作也不被允许。

III.

var mx sync.RWMutex
mx.Lock()
mx.Unlock()
mx.RLock()
mx.RUnlock()


IV. 前者对应写操作的锁定和解锁,后者对应读操作的锁定和解锁;写解锁会唤醒被阻塞的读锁goroutine 。读解锁在确定无其他读锁时,会唤醒被阻塞的写锁goroutine。

VI. 对于未被写锁定的读写锁 进行写解锁,会引发运行时恐慌;对于未被读锁定的读写锁 进行 读解锁不会。

  1. 锁的案例

I. 创建一个文件来存放数据

II. 同一时刻允许有多个goroutine分别对文件写操作和读操作,写入操作不能穿插,并且彼此独立。读操作要相互独立,每次读取的数据块不能重复,且按顺序读取。

III.package main

import (
"errors"
"io"
"os"
"sync"
)

type Data []byte

type DataFile interface {
//一次读取一个数据块
Read() (rsn int64,d Data,err error)
//一次写入一个数据块
Write(d Data) (wsn int64,err error)
//返回最后读取数据块的序号
Rsn() int64
//返回最后写入数据块的序号
Wsn() int64
//写入或读取数据块的 长度
DataLen() int64
}


type myDataFile struct {
//文件描述符
f *os.File
//读写锁
fMutex sync.RWMutex
//写入偏移量
wOffset int64
//读入偏移量
rOffset int64
//写偏移量 互斥锁
wMutex sync.Mutex
//读偏移量互斥锁
rMutex sync.Mutex
//数据块长度
dataLen uint32
}

func NewDataFile(path string,dataLen uint32) (DataFile,error) {
f,err := os.Create(path)
if err != nil {
return nil, err
}
if dataLen ==0 {
return nil, errors.New("invalid data length")
}
df := &myDataFile{f: f,dataLen: dataLen}
return df,nil
}

/**
1. 获取并更新读偏移量
2. 根据偏移量 从文件中读取指定len一块数据
3. 把数据块封装成Data类型,
*/
func (f *myDataFile) Read() (rsn int64,d Data,err error) {
//1. 获取并更新读偏移量
var offset int64
f.rMutex.Lock()
offset = f.rOffset
f.rOffset += int64(f.dataLen)
f.rMutex.Unlock()

//2. 根据偏移量 从文件中读取指定len一块数据
rsn = offset/int64(f.dataLen)

bytes := make([]byte,f.dataLen)
// 3. 把数据块封装成Data类型,
for {
f.fMutex.RLock()
_,err := f.f.WriteAt(bytes,offset)
if err != nil {
if err == io.EOF {
f.fMutex.RUnlock()
continue
}
f.fMutex.RUnlock()
return 0, nil, err
}
}
d = bytes
return
}
func (f *myDataFile) Write(d Data) (wsn int64,err error) {
//1. 读入写入偏移量并修改
var offset int64
f.wMutex.Lock()
offset = f.wOffset
f.wOffset += int64(f.dataLen)
f.wMutex.Unlock()
//写入数据块
wsn = offset/int64(f.dataLen)
var bytes []byte
if len(d) > int(f.dataLen) {
bytes = d[:f.dataLen]
} else {
bytes = d
}
f.fMutex.Lock()
defer f.fMutex.Unlock()
_,err = f.f.WriteAt(bytes,offset)
return wsn, err
}

/**
*/
func (f *myDataFile) Rsn() int64 {
f.rMutex.Lock()
defer f.rMutex.Unlock()
return f.rOffset/int64(f.dataLen)
}


/**
1. 互斥锁 锁定操作
2. 同时通过defer及时解锁
*/
func (f *myDataFile) Wsn() int64 {
f.wMutex.Lock()
defer f.wMutex.Unlock()
return f.wOffset/int64(f.dataLen)
}


func (f *myDataFile) DataLen() int64 {
return 0
}

IV. 需要设置三个锁,对文件的读写锁,分别

对读偏移量,写偏移量的两个互斥锁。

VI. 但在读的goroutine > 写goroutine时,写goroutine阻塞,导致写操作没有机会。最终致使没有数据可读/io.EOF,需要针对这种边界做特别处理

二、条件变量

  1. 需要借助sync.NewCond函数创建 sync.Cond条件变量类型。而sync.Cond有三个方法Wait,Signal,BroadCast方法。分别代表了等待通知,单发通知,广播通知。
  2. Wait方法会自动对与该条件变量关联的锁进行解锁,同时使调用方所在goroutine被阻塞。一旦该方法收到通知,就会尝试再次锁定该锁。如果锁定成功,就会唤醒被阻塞的goroutine
  3. Signal,BroadCast作用都是 发送 通知 来唤醒 被Wait阻塞的goroutine,不同的是,前者是单一通知。后者是广播通知
  4. /**
    1. 获取并更新读偏移量
    2. 根据偏移量 从文件中读取指定len一块数据
    3. 把数据块封装成Data类型,
    */
    func (f *myDataFile) Read() (rsn int64,d Data,err error) {
    //1. 获取并更新读偏移量
    var offset int64
    f.rMutex.Lock()
    offset = f.rOffset
    f.rOffset += int64(f.dataLen)
    f.rMutex.Unlock()

    //2. 根据偏移量 从文件中读取指定len一块数据
    f.cond= sync.NewCond(&f.fMutex)
    rsn = offset/int64(f.dataLen)
    bytes := make([]byte,f.dataLen)
    // 3. 把数据块封装成Data类型,
    f.fMutex.RLock()
    defer f.fMutex.Unlock()
    for {
    _,err := f.f.WriteAt(bytes,offset)
    if err != nil {
    if err == io.EOF {
    f.cond.Wait() //通过条件变量阻塞当前 goroutine 让其他goroutine 有写的机会

    continue
    }
    f.fMutex.RUnlock()
    return 0, nil, err
    }
    }

    d = bytes
    return
    }
  5. 文件内容读操作造成EOF错误时,通过条件变量让当前 goroutine暂时放弃对fmutex 读锁,阻塞当前携程,并等待通知的到来。
  6. 在写操作完成后,应该及时向条件变量发送signal通知唤醒读goroutine

func (f *myDataFile) Write(d Data) (wsn int64,err error) {
//1. 读入写入偏移量并修改
var offset int64
f.wMutex.Lock()
offset = f.wOffset
f.wOffset += int64(f.dataLen)
f.wMutex.Unlock()

//写入数据块
wsn = offset/int64(f.dataLen)
var bytes []byte
if len(d) > int(f.dataLen) {
bytes = d[:f.dataLen]
} else {
bytes = d
}
f.fMutex.Lock()
defer f.fMutex.Unlock()
_,err = f.f.WriteAt(bytes,offset)
f.cond.Signal() //通知读锁定获得锁
return wsn, err
}

三、原子操作

  1. 原子操作是过程不能被中断的操作,针对某个值的原子操作在进行中,cpu不会再进行其他针对该值的操作
  2. go语言提供int32,int64,uint32,uint64,uintptr,unsafe.Pointer,类型,对应的操作有 增/减/比较并交换/载入/存储/交换
  3. 原子增减操作即可实现对被操作值得增加或减小。atomic.AddInt64(&i,3)
  4. 比较并交换,atomic.CompareAndSwapInt32(&value,old,new).仅当old与value相等时,new会替换掉old值。
  5. 载入,atomic.LoadInt32(&value),原子读取变量value
  6. 存储,原子存储某个值得过程中,任何cpu都不会进行针对同一个值得读或写入操作。

四、执行一次

  1. sync.Once 类型变量 接受一个无参数,无结果的函数值。该方法一旦被调用,就会调用由参数传进来的那个函数。无论我们调用该方法多少次,无论我们多次调用传递给它的参数值是否相同,都仅有第一次调用有效。
  2. package main

    import (
    "fmt"
    "sync"
    "time"
    )
    var once sync.Once
    func main() {
    var num int
    sign := make(chan bool)
    f := func(ii int ) func() {
    return func() {
    num = num + ii *2
    sign <- true
    }
    }

    for i:=0;i<3;i++ {
    fi := f(i+1)
    go once.Do(fi)
    }

    for j:=0;j<3;j++ {
    select {
    case <-sign:
    fmt.Println("received a signal")
    case <-time.After(time.Millisecond* 100):
    fmt.Println("timeout.")

    }
    }
    fmt.Printf("Num:%d\n",num)
    }
    /**
    2022/10/09 11:27:44 0
    2022/10/09 11:27:44 received a signal
    2022/10/09 11:27:44 timeout.
    2022/10/09 11:27:44 timeout.
    2022/10/09 11:27:44 Num:2

    */

五、WaitGroup

  1. var wg sync.WaitGroup 是一个结构体类型,其中计数值是0;可以通过wg.Add方法增加或减少计数值,但千万别让计数值为负数,会产生运行恐慌。也可以通过wg.Done方法减少计数值。
  2. 调用wg.Wait方法会检查计数值是否为0;如果不为0,就会阻塞当前goroutine。
  3. package main

    import "sync"

    func main() {
    var wg sync.WaitGroup
    wg.Add(3)
    go func() { // G2
    wg.Done()
    }()
    go func() { // G3
    wg.Done()
    }()
    go func() { // G4
    wg.Done()
    }()
    wg.Wait()
    }

六、临时对象池

  1. package main

    import (
    "log"
    "runtime"
    "runtime/debug"
    "sync"
    "sync/atomic"
    )

    func main() {
    defer debug.SetGCPercent(debug.SetGCPercent(-1))
    var count int32
    newFunc := func() interface{} {
    return atomic.AddInt32(&count,1)
    }
    pool := sync.Pool{New: newFunc}
    v1 := pool.Get()
    log.Printf("v1:%v\n",v1)
    //临时对象池的存储
    pool.Put(newFunc())
    //pool.Put(newFunc())
    //pool.Put(newFunc())
    v2 := pool.Get()
    log.Printf("v2:%v\n",v2)

    //垃圾回收对临时对象池的影响
    debug.SetGCPercent(100)
    runtime.GC()
    v3 := pool.Get()
    log.Printf("v3:%v\n",v3)
    //pool.New = nil
    v4 := pool.Get()
    log.Printf("v4:%v\n",v4)
    }
    /**
    E:\source\go\src\exam>go run main.go
    2022/10/09 12:04:42 v1:1
    2022/10/09 12:04:42 v2:2
    2022/10/09 12:04:42 v3:3
    2022/10/09 12:04:42 v4:4
    */
  2. 临时对象池的对象值可能在任何时候被移除,并不会通知池的调用方。在垃圾回收器开始回收内存垃圾的时候。
  3. 被赋值给New字段的函数被临时用来创建对象