同步
一、锁的使用
I. 互斥锁 是并发程序对共享资(临界区)源进行访问控制的主要手段,由标准库sync.Mutex类型提供。
II. sync.Mutex零值表示未被锁定的互斥量,提供俩个方法 Lock和Unlock,前者用于锁定,后者用于解锁
III.//案例1
var mx sync.Mutex
mx.Lock()
defer mx.Unlock()
//案例2
func main() {
var mx sync.Mutex
log.Println("Lock the lock (G0)")
mx.Lock()
log.Println("the lock is locked.(G0)")
for i:=1;i<=3;i++ {
go func(i int) {
log.Printf("lock the lock (G%d)\n",i)
mx.Lock()
log.Printf("the lock is locked (G%d)\n",i)
}(i)
}
time.Sleep(time.Second)
log.Println("unlock the (G0)")
mx.Unlock()
log.Println("the lock is unlocked.(G0)")
time.Sleep(time.Second)
}
/**
E:\source\go\src\exam>go run main.go
2022/10/08 14:11:46 Lock the lock (G0)
2022/10/08 14:11:46 the lock is locked.(G0)
2022/10/08 14:11:46 lock the lock (G3)
2022/10/08 14:11:46 lock the lock (G2)
2022/10/08 14:11:46 lock the lock (G1)
2022/10/08 14:11:47 unlock the (G0)
2022/10/08 14:11:47 the lock is unlocked.(G0)
2022/10/08 14:11:47 the lock is locked (G3)
*/
IV. main函数的goroutine 称G0,另外又开了3个goroutine 分别称G1,G2,G3.在启用这3个Goroutine之前已经对互斥锁mx进行了锁定。
VI. 我们看到G1,G2,G3 在G0 锁定后,都被阻塞了。原因是该互斥锁已经处于锁定状态。
VII. 随后我们释放了G0,之后G3获得了互斥锁的锁定,其余依然处于锁定状态。
VIII. 其他语言可能会忘记锁定的互斥量,忘记解锁。然而Go语言提供了defer mx.Unlock 语句 极大地降低了可能性
IX. 互斥锁的锁定操作和解锁操作必须成对出现,如果对于一个已经锁定的互斥量,重复锁定将会被阻塞,直到该互斥锁回到解锁状态。
X. 虽然互斥锁可以在多个goroutine共享,但我们强烈建议把同一个互斥锁的成对锁定与解锁操作放在同一个层次中。
I. 读写锁针对于读写操作的互斥锁,与普通的互斥锁最大的不同在于 可以分别针对读操作和写操作进行 锁定和解锁操作。
II. 读写锁允许任意个读操作同时进行,同一时刻只允许有一个写操作在进行;在写操作时,读操作也不被允许。
III.
var mx sync.RWMutex
mx.Lock()
mx.Unlock()
mx.RLock()
mx.RUnlock()
IV. 前者对应写操作的锁定和解锁,后者对应读操作的锁定和解锁;写解锁会唤醒被阻塞的读锁goroutine 。读解锁在确定无其他读锁时,会唤醒被阻塞的写锁goroutine。
VI. 对于未被写锁定的读写锁 进行写解锁,会引发运行时恐慌;对于未被读锁定的读写锁 进行 读解锁不会。
I. 创建一个文件来存放数据
II. 同一时刻允许有多个goroutine分别对文件写操作和读操作,写入操作不能穿插,并且彼此独立。读操作要相互独立,每次读取的数据块不能重复,且按顺序读取。
III.package main
import (
"errors"
"io"
"os"
"sync"
)
type Data []byte
type DataFile interface {
//一次读取一个数据块
Read() (rsn int64,d Data,err error)
//一次写入一个数据块
Write(d Data) (wsn int64,err error)
//返回最后读取数据块的序号
Rsn() int64
//返回最后写入数据块的序号
Wsn() int64
//写入或读取数据块的 长度
DataLen() int64
}
type myDataFile struct {
//文件描述符
f *os.File
//读写锁
fMutex sync.RWMutex
//写入偏移量
wOffset int64
//读入偏移量
rOffset int64
//写偏移量 互斥锁
wMutex sync.Mutex
//读偏移量互斥锁
rMutex sync.Mutex
//数据块长度
dataLen uint32
}
func NewDataFile(path string,dataLen uint32) (DataFile,error) {
f,err := os.Create(path)
if err != nil {
return nil, err
}
if dataLen ==0 {
return nil, errors.New("invalid data length")
}
df := &myDataFile{f: f,dataLen: dataLen}
return df,nil
}
/**
1. 获取并更新读偏移量
2. 根据偏移量 从文件中读取指定len一块数据
3. 把数据块封装成Data类型,
*/
func (f *myDataFile) Read() (rsn int64,d Data,err error) {
//1. 获取并更新读偏移量
var offset int64
f.rMutex.Lock()
offset = f.rOffset
f.rOffset += int64(f.dataLen)
f.rMutex.Unlock()
//2. 根据偏移量 从文件中读取指定len一块数据
rsn = offset/int64(f.dataLen)
bytes := make([]byte,f.dataLen)
// 3. 把数据块封装成Data类型,
for {
f.fMutex.RLock()
_,err := f.f.WriteAt(bytes,offset)
if err != nil {
if err == io.EOF {
f.fMutex.RUnlock()
continue
}
f.fMutex.RUnlock()
return 0, nil, err
}
}
d = bytes
return
}
func (f *myDataFile) Write(d Data) (wsn int64,err error) {
//1. 读入写入偏移量并修改
var offset int64
f.wMutex.Lock()
offset = f.wOffset
f.wOffset += int64(f.dataLen)
f.wMutex.Unlock()
//写入数据块
wsn = offset/int64(f.dataLen)
var bytes []byte
if len(d) > int(f.dataLen) {
bytes = d[:f.dataLen]
} else {
bytes = d
}
f.fMutex.Lock()
defer f.fMutex.Unlock()
_,err = f.f.WriteAt(bytes,offset)
return wsn, err
}
/**
*/
func (f *myDataFile) Rsn() int64 {
f.rMutex.Lock()
defer f.rMutex.Unlock()
return f.rOffset/int64(f.dataLen)
}
/**
1. 互斥锁 锁定操作
2. 同时通过defer及时解锁
*/
func (f *myDataFile) Wsn() int64 {
f.wMutex.Lock()
defer f.wMutex.Unlock()
return f.wOffset/int64(f.dataLen)
}
func (f *myDataFile) DataLen() int64 {
return 0
}
IV. 需要设置三个锁,对文件的读写锁,分别
对读偏移量,写偏移量的两个互斥锁。
VI. 但在读的goroutine > 写goroutine时,写goroutine阻塞,导致写操作没有机会。最终致使没有数据可读/io.EOF,需要针对这种边界做特别处理
二、条件变量
func (f *myDataFile) Write(d Data) (wsn int64,err error) {
//1. 读入写入偏移量并修改
var offset int64
f.wMutex.Lock()
offset = f.wOffset
f.wOffset += int64(f.dataLen)
f.wMutex.Unlock()
//写入数据块
wsn = offset/int64(f.dataLen)
var bytes []byte
if len(d) > int(f.dataLen) {
bytes = d[:f.dataLen]
} else {
bytes = d
}
f.fMutex.Lock()
defer f.fMutex.Unlock()
_,err = f.f.WriteAt(bytes,offset)
f.cond.Signal() //通知读锁定获得锁
return wsn, err
}
三、原子操作
四、执行一次
五、WaitGroup
六、临时对象池