0


Go语言的并发安全与互斥锁

线程通讯

在程序中不可避免的出现并发或者并行,一般来说对于一个程序大多数是遵循开发语言的启动顺序。例如,对于go语言来说,一般入口为

  1. main

,main中依次导入

  1. import

导入的包,并按顺序执行

  1. init

方法,之后在按照调用顺序执行程序。所以一般情况下程序是串行的。如下所示:
在这里插入图片描述

在很多时候串行并不满足要求,程序同时需要满足,很多客户访问,例如web程序必须要设置为并发的才能满足众多请求。

go语言通过

  1. go

关键字实现新线程,如下·:

  1. gofunc(){
  2. fmt.Println("-------开启一个新线程-----")}()

线程通讯,go语言实现线程通讯是通过进程通讯实现内存共享。go语言内置

  1. chan

数据结构实现线程通讯与数据共享。

  1. package main
  2. import("fmt""sync""time")var ch =make(chanint)var wati sync.WaitGroup
  3. funcmain(){
  4. wati.Add(2)goproducter()gocustomer()
  5. wati.Wait()}funcproducter(){for i :=0; i <10; i++{
  6. ch <- i *10
  7. fmt.Println("-----等待1秒")
  8. time.Sleep(time.Second *1)}
  9. wati.Done()close(ch)}funccustomer(){for{
  10. x, ok :=<-ch
  11. fmt.Println("-----------------custome", x, ok)if ok {
  12. fmt.Println("-----", x)}else{
  13. fmt.Println("----no data")break}}
  14. wati.Done()}

chan本身在设计上是并发安全的。这意味着多个协程可以同时安全地对一个chan进行发送和接收操作,而无需额外的同步措施。Go 语言的运行时系统会自动处理这些操作的并发安全性,保证数据的正确传递和协程的正确同步。

线程等待

在多线程中,各个线程是是独立的,并不知道线程完成的次序,所以线程等待也很重要,如下代码:

  1. package main
  2. import("os""sync")var Filestream =make(chan[]byte)funcmain(){var str =[]byte("hello")goRead(str)goWrite()}funcRead(by []byte){
  3. Filestream <- by
  4. close(Filestream)}funcWrite()error{
  5. file, err := os.Create("test.txt")if err !=nil{return err
  6. }
  7. by :=<-Filestream
  8. _, err = file.Write(by)if err !=nil{return err
  9. }returnnil}

上述代码通过

  1. chan

实现了多线程创建文件,但是实际上执行代码并不能成功,这是由于主线程没有等待其他线程,导致主线程在过早结束,程序结束,通过

  1. sync.WaitGroup

库实现线程等待,改造后的代码如下:

  1. package main
  2. import("os""sync")var Filestream =make(chan[]byte)var wait sync.WaitGroup
  3. funcmain(){var str =[]byte("hello")
  4. wait.Add(2)goRead(str)goWrite()
  5. wait.Wait()}funcRead(by []byte){
  6. Filestream <- by
  7. close(Filestream)
  8. wait.Done()}funcWrite()error{
  9. file, err := os.Create("test.txt")if err !=nil{return err
  10. }
  11. by :=<-Filestream
  12. _, err = file.Write(by)if err !=nil{return err
  13. }
  14. wait.Done()returnnil}

并发安全

数据锁

在并发时就会出现资源被竞争的情况,这就是涉及到并发安全了,例如对于一个数组

  1. var list []string

在对这个数组插入数据时,对于同一时刻进行的操作数据就可能会丢失,因此在操作数据时一定要保证操作的数据结构是并发安全的,例如

  1. sync.Map

就是线程安全的go语言底层实现了。

那么如何对自定义的数据结构体实现并发安全呢,就要用到互斥锁了。互斥锁是一种用于多线程或多协程编程中的同步原语,其主要目的是保护共享资源,防止多个线程或协程同时访问和修改这些资源,从而避免数据竞争和并发冲突。

在并发编程中,多个线程或协程可能会同时访问和修改同一个共享变量。如果不加以控制,就可能导致数据竞争,即多个操作同时对同一个数据进行读写,从而导致数据的不一致性或错误结果。

互斥锁通过提供一种互斥访问的机制,确保在任何时刻只有一个线程或协程能够访问被保护的共享资源。当一个线程或协程获取了互斥锁后,其他试图获取该锁的线程或协程就会被阻塞,直到锁被释放。

Go语言的互斥锁被

  1. sync.Mutex

实现。在go语言中提供了

  1. sync.Map

map类型是并发安全的。要实现自己的并发安全需要借助``sync.Mutex`如下:

  • 并发安装的结构体
  1. type safeArr[T any]struct{
  2. sync.Mutex
  3. data []T
  4. }func(self *safeArr[T])Add(item T){
  5. self.Lock()
  6. self.data =append(self.data, item)
  7. self.Unlock()}func(self *safeArr[T])Remove(index int){
  8. self.Lock()if index >=0&& index <len(self.data){
  9. self.data =append(self.data[:index], self.data[index+1:]...)}
  10. self.Unlock()}func(self *safeArr[T])Data()[]T {
  11. self.Lock()
  12. self.Unlock()return self.data
  13. }

注意,data 是不能直接向外部暴露的,不可以使用

  1. append

直接操作,必须通过并发安装的

  1. Add

方法。

  • 测试
  1. var arr safeArr[string]funcTest(c *fiber.Ctx)error{
  2. arr.Add("sss")return c.JSON(fiber.Map{"data": arr.data,})}funcmain(){
  3. app := fiber.New()
  4. app.Get("/test", Test)gofunc(){for{if requestCount ==0{
  5. wg.Done()}}}()
  6. app.Listen(":8081")}

通过上述代码测试代码测试不用路由请求进来后的线程对并发安全的

  1. safeArr

的处理是否满足要求。如下所示:

请添加图片描述
同步的路由线程进来之后,对全局的变量可以实现操作,这里并不能体现它是并发安全的,我们将

  1. Add

方法的

  1. self.Unlock()

注释掉,再次启动服务,请求接口如下:

请添加图片描述

如上图所示,在注释掉解锁代码后,再次请求会一直堵塞等待解锁,可以看出上述定义的结构体就是并发安全的了。

互斥锁会使线程变为阻塞线程,等待解锁,而不是直接停掉。

方法锁

如何实现对方法枷锁,同一时刻只允许一个线程使用该方法。对方法加锁和对数据结构加锁一样,再方法内部加锁。

  1. var arr []stringvar wu sync.Mutex
  2. funcTest(c *fiber.Ctx)error{
  3. wu.Lock()defer wu.Unlock()
  4. arr =append(arr,"ssss")return c.JSON(fiber.Map{"data": arr,})}funcmain(){
  5. app := fiber.New()
  6. app.Get("/test", Test)gofunc(){for{if requestCount ==0{
  7. wg.Done()}}}()
  8. app.Listen(":8081")}

如果将

  1. defer wu.Unlock()

注释掉,也会是线程进入等待状态,如下:

请添加图片描述

但是又会有新的问题,由于线程是独立的,此时存在一个线程处于阻塞状态,但是却可以再次发送新的请求,再阻塞的这段时间内,可以一直发送请求,多次请求会也会造成数据的错误,如下所示:
请添加图片描述
如上图可以发现,互斥锁可以是方法变成并发安全的,但是在线程等待的过程中仍然可以发送请求。

Redis互斥锁

在上一节的方法互斥锁中,需要额外的需求,就是如果当前的线程占用某个资源时,新的线程不会处于阻塞状态而是直接停止,这时就需要有外部的标识记录资源的占用情况,就需要借助r内存数据库如redis了。如下:

  • redis初始化
  1. var lock sync.Mutex
  2. var Client *CustomRedisClient
  3. type CustomRedisClient struct{
  4. redis.Conn
  5. }funcInit(){
  6. conn, err := redis.Dial("tcp", config.RedisVar.Host)if err !=nil{panic(errors.New("conn redis failed"))}_, err = conn.Do("auth", config.RedisVar.Password)if err !=nil{panic(errors.New("redis auth failed"))}
  7. c := CustomRedisClient{conn}
  8. Client =&c
  9. fmt.Println("conn redis success")}func(c *CustomRedisClient)Cmd(commandName string, args ...interface{})(reply interface{}, err error){
  10. lock.Lock()defer lock.Unlock()return c.Do(commandName, args...)}
  • redis互斥锁
  1. funcLock(name string)(error,*string){
  2. apply, err := Client.Cmd("GET", name)if apply !=nil|| err !=nil{return errors.New("locking"),nil}
  3. uid := utils.UUID()
  4. fmt.Printf("name: %v, uid: %v \n", name, uid)_, err = Client.Cmd("SET", name, uid,"EX",5)if err !=nil{
  5. Client.Cmd("DEL", name)return errors.New("redis set err"),nil}returnnil,&uid
  6. }funcUnlock(name string, uid string)error{
  7. reply, err := Client.Cmd("GET", name)if reply ==nil&& err ==nil{returnnil}
  8. s :=string(reply.([]byte))if s != uid {
  9. Client.Cmd("DEL", name)return errors.New("can not unlock")}_, err = Client.Cmd("DEL", name)if err !=nil{
  10. Client.Cmd("DEL", name)return err
  11. }returnnil}

使用上述的redis互斥锁就可以显示,当某个资源被线程占用时,另一个资源进来会直接停掉。

对代码改造使用新的互斥锁,如下:

  1. funcTest(c *fiber.Ctx)error{
  2. err, s := redis_util.Lock("test")if err !=nil{if errors.Is(err, errors.New("locking")){return c.JSON(fiber.Map{"data":"locking",})}else{return c.JSON(fiber.Map{"data": err.Error(),})}}defer redis_util.Unlock("test",*s)
  3. time.Sleep(time.Second *15)
  4. arr =append(arr,"ssss")return c.JSON(fiber.Map{"data": arr,})}

请添加图片描述

如上图所示,当资源被占用是会返回

  1. locking

,需要注意的是redis存储的互斥标识的时间一定要大于或等于程序的执行时间,不然程序还未执行玩,redis占用标识就销毁了,导致错误。


本文转载自: https://blog.csdn.net/xwh3165037789/article/details/143597828
版权归原作者 xvwen 所有, 如有侵权,请联系我们删除。

“Go语言的并发安全与互斥锁”的评论:

还没有评论