福建路桥建设有限公司网站,建设网站iss,建筑工程网cnas,空包网站建设Go的并发模型已经在https://guisu.blog.csdn.net/article/details/129107148 详细说明。 1、channel使用详解 1、channel概述
Go的CSP并发模型#xff0c;是通过goroutine和channel来实现的。
channel是Go语言中各个并发结构体(goroutine)之前的通信机制。 通俗的讲#xf…Go的并发模型已经在https://guisu.blog.csdn.net/article/details/129107148 详细说明。 1、channel使用详解 1、channel概述
Go的CSP并发模型是通过goroutine和channel来实现的。
channel是Go语言中各个并发结构体(goroutine)之前的通信机制。 通俗的讲就是各个goroutine之间通信的”管道“有点类似于Linux中的管道。Go并发的核心哲学是不要通过共享内存进行通信; 相反通过沟通分享记忆。channel是Go提供goroutine间的通信方式使用channel可以使多个goroutine之间通信。channel是进程内的通信方式通过channel传递对象的过程和调用函数时的参数传递行为比较一致比如也可以传递指针等。
如需跨进程通信Go建议用分布式系统的方法来解决如使用Socket或者HTTP等通信协议Go语言在网络方面也有非常完善的支持。
主要应用场景
数据交流当作并发的 buffer 或者 queue解决生产者 - 消费者问题。多个 goroutine 可以并发当作生产者Producer和消费者Consumer。数据传递一个goroutine将数据交给另一个goroutine相当于把数据的拥有权托付出去。信号通知一个goroutine可以将信号(closingcloseddata ready等)传递给另一个或者另一组goroutine。任务编排可以让一组goroutine按照一定的顺序并发或者串行的执行这就是编排功能。锁机制利用channel实现互斥机制。2、channel基本语法
每个channel都有一个特殊的类型也就是channels可发送数据的类型。一个可以发送int类型数据的channel一般写为chan int。声明通道var 通道变量 chan 通道类型var channame chan ElementType创建通道makechan 数据类型, [缓冲大小]
channel跟map类似的在使用之前都需要使用make进行初始化ch1 : make(chan int, 5) 未初始化的channel零值默认为nil是一种特殊的 chan对值是 nil 的 chan 的发送接收调用者总是会阻塞。
var ch chan int
fmt.Println(ch) // nil
channel的基本用法非常简单它提供了三种类型分别为只能接收只能发送既能接收也能发送这三种类型 var channame chan - ElementType //只写只能发送ElementType var channame - chan ElementType //只读只能从chan里接收ElementType
var channame chan ElementType //能读能写既能接收也能发送
我们把既能发送也能接收的chan被称为双向chan把只能接收或者只能发送的chan称为单向。
而对于close方法只能是发送通道拥有。 箭头总是射向左边的元素类型总在最右边。 如果箭头指向 chan就表示可以往 chan 中发送(写)数据 如果箭头远离 chan就表示 chan 会往外吐数据即能从chan里接收(读)数据 channel - 1 //向channel添加一个值为1
- channel //从channel取出一个值
a : - channel //从channel取出一个值并赋值给a
a,b : - channel //从channel取出一个值赋值给a如果channel已经关闭或channel没有值b为false
3、通信机制
成对出现在通信过程中传数据channel - data和取数据-channel必然会成对出现因为这边传那边取两个goroutine之间才会实现通信。阻塞不管传还是取必阻塞直到另外的goroutine传或者取为止。channel仅允许被一个goroutine读写。
1只能接收数据的chan
package main import fmt// a 表示只能接收数据的chan
func goChanA(a -chan int) {b : -afmt.Println(只能接收数据的channal[a]接收到的数据值为, b)
}func main() {ch : make(chan int, 2)go goChanA(ch)// 往ch中写入数据值ch - 2time.Sleep(time.Second)
}结果只能接收数据的channal[a]接收到的数据值为 2
2只能发送数据的chan
package main import fmtfunc main() {ch : make(chan- int, 2)ch - 200
}往 chan 中发送一个数据使用“ch-”。 这里的 ch 是 chan int 类型或者是 chan -int。 3同步主协程和子协程之间通信
func main(){ch : make(chan int)go func() {ch - 996 //向ch添加元素}()a : - chfmt.Println(a)fmt.Println(程序结束)
}4)、两个子协程的通信
使用channel实现两个goroutine之间通信。
func two() {tc : make(chan string)ch : make(chan int)// 第一个协程go func() {tc - 协程A我在添加数据ch - 1}()// 第二个协程go func() {content : - tcfmt.Printf(协程B我在读取数据:%s\n,content)ch - 2}()- ch- chfmt.Println(程序结素)
}
func main(){two()
}5、channel仅允许被一个goroutine读写。
package main
import (fmttime
)
func goRoutineA(a -chan int) {val : -afmt.Println(goRoutineA received the data, val)
}
func goRoutineB(b chan int) {val : -bfmt.Println(goRoutineB received the data, val)
}
func main() {ch : make(chan int, 3)go goRoutineA(ch)go goRoutineB(ch)ch - 3time.Sleep(time.Second * 1)
}
6)、一直阻塞的情况
如果当前协程正在从一个没有任何值的通道中读取数据那么当前协程会阻塞并且等待其他协程往此通道写入值。因此读操作将被阻塞。类似的如果你发送数据到一个通道它将阻塞当前协程直到有其他协程从通道中读取数据。此时写操作将阻塞 。
主线程在进行通道操作的时候造成死锁
package mainimport fmtfunc main() {fmt.Println(mainGo start)channel : make(chan string)// 给通道 channel 传入一个数据GoLang.channel - GoLang//此时主线程将阻塞直到有协程接收这个数据. Go的调度器开始调度协程接收通道 channel 的数据// 但是由于没有协程接受没有协程是可被调度的。所有协程都进入休眠状态即是主程序阻塞了。fmt.Println(mainGo stop)
}/*
报错
mainGo go start
fatal error: all goroutines are asleep - deadlock! //所有协程都进入休眠状态,死锁goroutine 1 [chan send]:
main.main()
*/4、channel缓冲区
无缓冲通道make(chan int)指在接收前没有能力保存任何值的通道这种类型的通道要求发送goroutine和接收goroutine同时准备好才能完成发送和接收操作。有缓冲通道make(chan int, 2)指在被接收前能存储一个或者多个值的通道这种类型的通道并不强制要求goroutine之间必须同时完成发送和接收。 例子
package mainimport fmtfunc main() {ch1 : make(chan int)ch1 - 5rec : -ch1fmt.Println(ch1被接受程序结束:rec:,, rec)
}
//fatal error: all goroutines are asleep - deadlock! 由于ch1没有缓冲区channel没有缓冲区的话 只有在有接收方能够接收值的时候才能发送成功否则会一直处于等待发送的阶段。同理如果对一个无缓冲通道执行接收操作时没有任何向通道中发送值的操作那么也会导致接收操作阻塞。 如果想要运行成功那么在发送信息前就应该有另外的协程等待着接收
package mainimport (fmttime
)func main() {ch1 : make(chan int)go receive(ch1)ch1 - 5time.Sleep(time.Second)}
func receive(ch1 chan int) {for {select {case rec2 : -ch1:fmt.Println(ch1被接受程序结束:rec:,, rec2)}}
}
//ch1被接受程序结束:rec:, 5 但是如果有缓冲区就能避免程序阻塞可以将发送的channel放在缓冲区直至有接收方将它接收
向channel添加数据超过缓存会出现死锁
func main() {ch : make(chan int,3)ch - 1//- chch - 1ch - 1ch - 1fmt.Println(ok)
}-----
5、 阻塞的 gorutinue 与资源泄露
在 2012 年 Google I/O 大会上Rob Pike 的 Go Concurrency Patterns 演讲讨论 Go 的几种基本并发模式如 完整代码 中从数据集中获取第一条数据的函数 func First(query string, replicas []Search) Result {c : make(chan Result)replicaSearch : func(i int) { c - replicas[i](query) }for i : range replicas {go replicaSearch(i)}return -c}
在搜索重复时依旧每次都起一个 goroutine 去处理每个 goroutine 都把它的搜索结果发送到结果 channel 中channel 中收到的第一条数据会直接返回。
返回完第一条数据后其他 goroutine 的搜索结果怎么处理他们自己的协程如何处理
在 First() 中的结果 channel 是无缓冲的这意味着只有第一个 goroutine 能返回由于没有 receiver其他的 goroutine 会在发送上一直阻塞。如果你大量调用则可能造成资源泄露。
为避免泄露你应该确保所有的 goroutine 都能正确退出有 2 个解决方法 使用带缓冲的 channel确保能接收全部 goroutine 的返回结果 func First(query string, replicas ...Search) Result { c : make(chan Result,len(replicas)) searchReplica : func(i int) { c - replicas[i](query) }for i : range replicas {go searchReplica(i)}return -c}
使用 select 语句配合能保存一个缓冲值的 channel default 语句
default 的缓冲 channel 保证了即使结果 channel 收不到数据也不会阻塞 goroutine func First(query string, replicas ...Search) Result { c : make(chan Result,1)searchReplica : func(i int) {select {case c - replicas[i](query):default:}}for i : range replicas {go searchReplica(i)}return -c} 使用特殊的废弃cancellation channel 来中断剩余 goroutine 的执行 func First(query string, replicas ...Search) Result { c : make(chan Result)done : make(chan struct{})defer close(done)searchReplica : func(i int) {select {case c - replicas[i](query):case - done:}}for i : range replicas {go searchReplica(i)}return -c}
Rob Pike 为了简化演示没有提及演讲代码中存在的这些问题。不过对于新手来说可能会不加思考直接使用。 二、使用Select来进行调度 select就是用来监听和channel有关的IO操作当 IO 操作发生时触发相应的动作。
Select 和 swith结构很像但是select中的case的条件只能是I/O。 Select 的使用方式类似于 switch 语句它也有一系列 case 分支和一个默认的分支。 每个 case分支会对应一个通道的通信接收或发送过程。select 会一直等待直到其中的某个 case 的通信操作完成时就会执行该 case分支对应的语句。 具体格式如下
select {
case -ch1://...
case rec : -ch2://...
case ch3 - 10://...
default://默认操作
}
select里面case是随机执行的如果都不满足条件那么就执行default
select总结
每个case必须是一个I/O操作case是随机执行的如果多个 case 同时满足select 会随机选择一个执行。如果所有case不能执行那么会执行default如果所有case不能执行且没有default会出现阻塞对于没有 case 的 select 会一直阻塞可用于阻塞 main 函数防止退出
实现一个一直接收消息
func main() {ch : make(chan int)for i : 1; i 10; i {go func(j int) {ch - j}(i)}for {select {case a1 : - ch:fmt.Println(a1)default:}}
}
示例2:
package mainimport (fmttime
)func goRoutineD(ch chan int, i int) {time.Sleep(time.Second * 3)ch - i
}
func goRoutineE(chs chan string, i string) {time.Sleep(time.Second * 3)chs - i}func main() {ch : make(chan int, 5)chs : make(chan string, 5)go goRoutineD(ch, 5)go goRoutineE(chs, ok)select {case msg : -ch:fmt.Println( received the data , msg)case msgs : -chs:fmt.Println( received the data , msgs)default:fmt.Println(no data received )time.Sleep(time.Second * 1)}}
运行程序因为当前时间没有到3s所以select 选择defult
no data received
修改程序我们注释掉default并多执行几次结果为
received the data 5
received the data ok
received the data ok
received the data ok
select语句会阻塞直到监测到一个可以执行的IO操作为止而这里goRoutineD和goRoutineE睡眠时间是相同的都是3s从输出可看出从channel中读出数据的顺序是随机的。
再修改代码goRoutineD睡眠时间改成4s
func goRoutineD(ch chan int, i int) {time.Sleep(time.Second * 4)ch - i
}
此时会先执行goRoutineEselect 选择case msgs : -chs。
三、死锁deadlock 指两个或两个以上的协程的执行过程中由于竞争资源或由于彼此通信而造成的一种阻塞的现象。
在非缓冲信道若发生只流入不流出或只流出不流入就会发生死锁。
下面是一些死锁的例子
package mainfunc main() {ch : make(chan int)ch - 3
}
上面情况向非缓冲通道写数据会发生阻塞导致死锁。解决办法创建缓冲区 ch : make(chan int3)
package main
import (fmt
)func main() {ch : make(chan int)fmt.Println(-ch)
}
向非缓冲通道读取数据会发生阻塞导致死锁。 解决办法开启缓冲区先向channel写入数据。
package mainfunc main() {ch : make(chan int, 3)ch - 3ch - 4ch - 5ch - 6
}
写入数据超过缓冲区数量也会发生死锁。解决办法将写入数据取走。
死锁的情况有很多这里不再赘述。
还有一种情况向关闭的channel写入数据不会产生死锁产生panic。
package mainfunc main() {ch : make(chan int, 3)ch - 1close(ch)ch - 2
}
解决办法别向关闭的channel写入数据。 四、channel实现原理 1、channel数据结构
channel一个类型管道通过它可以在goroutine之间发送消息和接收消息。它是golang在语言层面提供的goroutine间的通信方式。通过源代码分析程序执行过程源码src/runtime/chan.go
channel结构体hchan
type hchan struct {qcount uint // 当前队列列中剩余元素个数dataqsiz uint // 环形队列长度即可以存放的元素个数即缓冲区的大小即makechan TNN.buf unsafe.Pointer // 环形队列列指针elemsize uint16 // 每个元素的⼤⼩closed uint32 // 标识关闭状态表示当前通道是否处于关闭状态。创建通道后该字段设置为0即通道打开; 通过调用close将其设置为1通道关闭。elemtype *_type // 元素类型用于数据传递过程中的赋值sendx uint // 队列下标指示元素写⼊入时存放到队列列中的位置 xrecvx uint // 队列下标指示元素从队列列的该位置读出 recvq waitq // 等待读消息的goroutine队列sendq waitq // 等待写消息的goroutine队列lock mutex // 互斥锁chan不允许并发读写
} type waitq struct {first *sudoglast *sudog
}从数据结构可以看出channel由队列、类型信息、goroutine等待队列组成。
2、实现方式
创建channel 有两种一种是带缓冲的channel一种是不带缓冲的channel
// 带缓冲
ch : make(chan Task, 6)
// 不带缓冲
ch : make(chan int)
下图展示了可缓存6个元素的channel底层的数据模型如下图
func makechan(t *chantype, size int) *hchan {elem : t.elem...
} dataqsiz指向队列的长度为6即可缓存6个元素buf指向队列的内存队列中还剩余两个元素qcount当前队列中剩余的元素个数sendx指后续写入元素的位置recvx指从该位置读取数据
等待队列
从channel中读数据如果channel缓冲区为空或者没有缓冲区当前goroutine会被阻塞向channel中写数据如果channel缓冲区已满或者没有缓冲区当前goroutine会被阻塞。
被阻塞的goroutine将会被挂在channel的等待队列中
因读阻塞的goroutine会被向channel写入数据的goroutine唤醒因写阻塞的goroutine会被从channel读数据的goroutine唤醒
下面展示了一个没有缓冲区的channel有几个goroutine阻塞等待数据 注意一般情况下recvq和sendq至少有一个为空。只有一个例外那就是同一个goroutine使用select语句向channel一边写数据一边读数据。
3、向channel写数据
ch : make(chan int, 3)
创建通道后的缓冲通道结构
hchan struct {qcount uint : 0 dataqsiz uint : 3 buf unsafe.Pointer : 0xc00007e0e0 elemsize uint16 : 8 closed uint32 : 0 elemtype *runtime._type : {size:8 ptrdata:0 hash:4149441018 tflag:7 align:8 fieldalign:8 kind:130 alg:0x55cdf0 gcdata:0x4d61b4 str:1055 ptrToThis:45152}sendx uint : 0 recvx uint : 0 recvq runtime.waitq : {first:nil last:nil}sendq runtime.waitq : {first:nil last:nil}lock runtime.mutex : {key:0}
} 写入数据ch - 3底层hchan数据流程如图 1、锁定整个通道结构。
2、确定写入如果recvq队列不为空说明缓冲区没有数据或者没有缓冲区此时直接从recvq等待队列中取出一个Ggoroutine并把数据写入最后把该G唤醒结束发送过程
3、如果recvq为Empty则确定缓冲区是否可用。如果可用从当前goroutine复制数据写入缓冲区结束发送过程。
4、如果缓冲区已满则要写入的元素将保存在当前正在执行的goroutine的结构中并且当前goroutine将在sendq中排队并从运行时挂起进入休眠等待被读goroutine唤醒。
5、写入完成释放锁。
这里我们要注意几个属性buf、sendx、lock的变化。 3、从channel读取操作
几乎和写入操作相同
func goRoutineA(a -chan int) {val : -afmt.Println(goRoutineA received the data, val)
}
底层hchan数据流程如图 1、先获取channel全局锁
2、如果等待发送队列sendq不为空有等待的goroutine 1若没有缓冲区直接从sendq队列中取出Ggoroutine直接取出goroutine并读取数据然后唤醒这个goroutine结束读取释放锁结束读取过程 2若有缓冲区说明此时缓冲区已满从缓冲队列中首部读取数据再从sendq等待发送队列中取出G把G中的数据写入缓冲区buf队尾结束读取释放锁
3、如果等待发送队列sendq为空没有等待的goroutine 1若缓冲区有数据直接读取缓冲区数据结束读取释放锁。 2没有缓冲区或缓冲区为空将当前的goroutine加入recvq排队进入睡眠等待被写goroutine唤醒。结束读取释放锁。
流程图
ecvq和
recvq和sendq 结构
recvq和sendq基本上是链表看起来基本如下