网站开发 属于研发费用吗,wordpress怎么删除主题,网站开发属于IT行业,中山网站建设文化如何概述
Go的Routines并发模型是基于CSP#xff0c;如果你看过七周七并发#xff0c;那么你应该了解。
什么是CSP#xff1f;
Communicating Sequential Processes#xff08;CSP#xff09;这个词组的含义来自其英文直译以及在计算机科学中的使用环境。
CSP…概述
Go的Routines并发模型是基于CSP如果你看过七周七并发那么你应该了解。
什么是CSP
Communicating Sequential ProcessesCSP这个词组的含义来自其英文直译以及在计算机科学中的使用环境。
CSP是 Tony Hoare 在1978年提出的论文地址在Communicating sequential processes | Communications of the ACM
拆字解释下
Communicating Sequential Processes(CSP)的三个单词 C for Communicating: 通信什么的通信那进程/线程/协程的通信。 S for Sequenctial: 顺序的什么的顺序进程/线程/协程之间执行任务时应该是有顺序的完全并行执行是理想化的现实中就是要先指定完第一个或者第一批任务才能执行第二个或者第二批任务。 P for Processses: 进程这个是进程估计是因为这个概念提出来的时候比较早。我们这儿得抽象一下Processes指的是进程/线程/协程。
那么我们来总结一下CSPCSP就是多个能够进行通信并且按照顺序执行任务的独立进程。这些进程在各自执行自己的任务的时候还可以通过某种方式是进行通信。
在Golang中就是通过Channel进行通信。
好了CSP解释完了我们来看Go中的Channel另外CSP的参与者Go Routine我在之前的文章中有提到过大家可以去逐步学习Go-协程goroutine
这张图就描述了CSP编程模型。 Go中routine代表图中的ProcessChannel就是goroutine之间的连接。通道可以让一个goroutine发送信息到另一个goroutine。 Go中的channel
Go中Channel有两种类型
无缓冲Channel(Unbuffered)有缓冲Channel(Buffered) 有缓冲的Channel其实就是一个环形缓冲队列无缓冲的没有队列因为读写都会阻塞。
Channel的定义
var channel名称 chan channel类型// 类型自动推断
channel名称 : make(chan channel类型, buffer数量int可以为0)
COPY
比如我们可以这样来定义:
// 定义了一个channel还没有make不确定是否为有缓冲和无缓冲channel
var ch chan int// 定义了一个chnnel, 容量为0无缓冲channel
ch : make(chan int, 0)// 定义了一个channel容量为1有缓冲channel
ch : make(chan int, 1)
COPY
我们实际使用的时候把Channel理解为队列就可以了。
Go中的Channel有两种类型
无缓冲channel有缓冲channel
无缓冲和有缓冲的特性如下
无缓冲Channel 无缓冲Channel没有存储数据的能力发送方向Channel中发送数据的时候发送方会阻塞直到有接受者接受这个数据无缓冲Channel典型应用就是go协程同步通信无缓冲Channel保证通信双方都要准备好数据交换有缓冲Channel 有缓冲Channel需要定义Channel的容量发送方向有缓冲Channel发送数据的时候只有容量满的时候才会阻塞接收方只有在有缓冲Channel为空时才会阻塞有缓冲通道的典型应用场景是生产者和消费者
Channel的操作
Channel主要支持2中操作
发送(send)接收(recv)
这三种操作在代码中的的定义和使用
发送和接收都使用-
来看代码
// 先定义一个无缓冲channel
ch : make(chan int, 0)
ch : make(chan int)// 发送数据到channel
ch - 1// 从channel中接收数据
- ch
COPY
我们看到发送和接收都是使用-,差别在于
ch在-的左边操作为发送ch在-的右边操作为接收
另外channel在使用之前都要先创建使用完毕后要关闭分别使用make和close关闭。
// 创建相当于分配channelallocation
ch : make(chan int, 0)// 关闭channel释放channel资源
defer close(ch)
COPY
channel创建完直接关闭了还能操作发送和接收吗
这个问题我们通过写代码来测试我们先来测试发送然后再测试接收。 发送数据到关闭的Channel func TestUnbufferedChannel_ShouldPanic_whenWriteValueToAClosedChannel(t *testing.T) {f : func() {ch : make(chan int)close(ch)ch - 1
}assert.Panics(t, f, should panic)
} COPY 运行截图
我们的UT PASS了表示发生了panic这就说明我们不能向已经关闭的channel发送数据。
在已经关闭的Channel上接收 func TestUnbufferedChannel_ShouldSuccess_whenRecvValueAtAClosedChannel(t *testing.T) {ch : make(chan int)close(ch)var val -chassert.Equal(t, 0, val)
}func TestUnbufferedChannel_ShouldSuccess_whenRecvEmptyValueAtAClosedChannel(t *testing.T) {ch : make(chan string)close(ch)var val -chassert.Equal(t, , val)
}
COPY
运行截图 这两个UT都可以PASS我只截图了一个PASS,这说明我们可以在一个关闭的channel上接收数据只是接收到的都是0值。关于0值要特别说明一下0值是针对不同类型的比如int的0值就是0string的0值就是空字符串指针的0值就是nil看下面代码 并非“任何后续的接收操作都将立即返回零值”而是当channel中所有已发送的值都被接收后接下来的接收操作会立即返回零值。
无缓冲channel
无缓冲通道顾名思义就是没有数据缓冲能力的Channel有goroutine向无缓冲Channel发送了数据就必须有另一个goroutine来接受否则发送的goroutine会阻塞反之有goroutine从这个channel接受数据而没有另一个goroutine向这个channel发送那么接受的goroutine也会阻塞。
应用场景
部分任务需要同步就用无缓冲channel
来看场景代码
有发送无接受
发送goroutine会被阻塞。 func TestUnbufferedChannel_ShouldWriteTimeout_WhenNoRoutineReadTheChannel(t *testing.T) {// 创建无缓冲channelc : make(chan int)is_timeout : falsetry_to_write_value : 1// whenselect {// 直接向channel中发送case c - try_to_write_value:case -time.After(3 * time.Second):// shouldis_timeout true}assert.True(t, is_timeout)}COPY 有接受无发送
接收goroutine会被阻塞 func TestUnbufferedChannel_ShouldReadTimeout_WhenNoValueWriteToChannel(t *testing.T) {// 创建无缓冲channelc : make(chan int)is_timeout : falseselect {// 直接接受channel中的数据case -c:case -time.After(3 * time.Second):// shouldis_timeout true}// 三秒后超时assert.True(t, is_timeout)}COPY
有发送有接受
有发送有接收一切正常。
func sum(s []int, c chan int) {sum : 0for _, v : range s {sum v}// 将累加结果发送到channelc - sum
}func TestUnbufferedChannel_ShouldRecvValues_WhenWriteValueToChannel(t *testing.T) {// 创建无缓冲channelc : make(chan int)// givens : []int{1, 2, 3, 4, 5, 6}// when// 执行数组累加go sum(s[:], c)ret1 : -c// should// 和应该是21assert.Equal(t, 21, ret1)
}
COPY 使用无缓冲Channel控制并发 // 先定义一个worker函数
// worker函数从无缓冲channel中接收
// 可以接到到数据就执行后面的打印内容
// 打印完成后退出
func worker(id int, lock chan bool) {var shouldRun -lockif shouldRun {fmt.Printf(time: %v Worker %d is working\n, time.Now(), id)time.Sleep(time.Second)fmt.Printf(time: %v Worker %d has finished\n, time.Now(), id)}
}func TestUnbufferedChannel_ShouldRunOneByOne_When(t *testing.T) {lock : make(chan bool, 1)// 启动5个goroutine等待释放接收for i : 0; i 5; i {go worker(i, lock)}// 发送5个true到channelfor i : 0; i 5; i {lock - truetime.Sleep(time.Second)}close(lock)time.Sleep(10 * time.Second)
}COPY 使用无缓冲Channel实现CompleteFuture.anyOf()
CompleteFuture.anyOf() 是 Java 中的一个函数它返回一个新的 CompletableFuture当给定的任何 CompletableFuture 完成时返回的 CompletableFuture 也完成并带有完成的 CompletableFuture 的结果。 // future函数使用time.Sleep模拟实际业务处理延迟
// 业务处理完成后将业务数据写入无缓冲Channel
func future(id int, delay time.Duration, resChan chan int) {time.Sleep(delay)fmt.Printf(Hi, I have finished my task, my id is %d\n, id)resChan - id
}// 接收一系列上面的future, 然后使用go routine启动这些future函数并将结果写入到result channel最后再返回result channel。
func anyOf(futures ...-chan int) -chan int {result : make(chan int)for _, future : range futures {go func(f -chan int) {result - -f}(future)}return result
}func TestAnyOf_ShouldSuccess(t *testing.T) {// 创建无缓冲的 channelresChan1 : make(chan int)resChan2 : make(chan int)resChan3 : make(chan int)// 启动 goroutinesgo future(1, 3*time.Second, resChan1)go future(2, 2*time.Second, resChan2)go future(3, 5*time.Second, resChan3)result : anyOf(resChan1, resChan2, resChan3)assert.Equal(t, 2, -result)
}COPY
上面有两个比较让人纠结的语法
-chan intresult - -f
-chan int表示只读通道anyOf只能读取通道内的数据有了只读就有只写只写通道chan- intresult - -f表示从通道f中接收数据并将数据写入到result通道。这一行相当于执行了 v : -f
result - v COPY
有缓冲channel
有缓冲channel就是你可以暂时把数据发送到channel如果channel的缓冲区没有被占用完就不会阻塞缓冲区被占用完了就被阻塞了。
特性
发送goroutine在缓冲区没有用完之前不会阻塞缓冲区被使用完了之后发送goroutine就会被阻塞接受goroutine在缓冲区有数据时不会阻塞缓冲区没有数据时会被阻塞
有缓冲channel应用场景是什么
任务队列就是最典型的场景生产者消费者模型其他无缓冲channel搞不定的就用有缓冲channel
实现一个有缓冲channel的RateLimiter
import (syncsync/atomictestingfmttimegithub.com/stretchr/testify/assert
)type RateLimiter struct {tokens chan struct{}refillTicker *time.TickercloseCh chan struct{}
}func NewRateLimiter(rate int) *RateLimiter {r : RateLimiter{tokens: make(chan struct{}, rate),refillTicker: time.NewTicker(time.Second / time.Duration(rate)),closeCh: make(chan struct{}),}go r.refill()return r
}func (r *RateLimiter) refill() {for {select {case -r.refillTicker.C:select {case r.tokens - struct{}{}:default:}case -r.closeCh:r.refillTicker.Stop()return}}
}func (r *RateLimiter) Acquire() {-r.tokens
}func (r *RateLimiter) TryAcquire() bool {select {case -r.tokens:return truedefault:return false}
}func (r *RateLimiter) Close() {close(r.closeCh)
}func myTask(id int) {fmt.Printf(time: %v workder %d is working\n, time.Now(), id)time.Sleep(20 * time.Millisecond)fmt.Printf(time: %v workder %d has finished\n, time.Now(), id)
}func TestRateLimiter_ShouldPermitWithBlocking_WhenRequestOnce(t *testing.T) {rateLimiter : NewRateLimiter(100)startTime : time.Now()for i : 0; i 1; i {rateLimiter.TryAcquire()myTask(i)}endTime : time.Now()elapsedTime : endTime.Sub(startTime)fmt.Printf(elapsed time: %v\n, elapsedTime)fmt.Printf(explect time: %v\n, 300*time.Millisecond)assert.True(t, elapsedTime 300*time.Millisecond)
}func TestRateLimiter_ShouldLimitPermits_WhenGivenLimitedResource(t *testing.T) {var counter int32 0rateLimiter : NewRateLimiter(100)wg : sync.WaitGroup{}startTime : time.Now()for i : range 1000 {wg.Add(1)go func() {rateLimiter.Acquire()myTask(i)atomic.AddInt32(counter, 1)wg.Done()}()}wg.Wait()endTime : time.Now()elapsedTime : endTime.Sub(startTime)fmt.Printf(elapsed time: %v\n, elapsedTime)fmt.Printf(should greater than explect time: %v\n, 10*time.Second)assert.Equal(t, counter, int32(1000))assert.True(t, 10*time.Second elapsedTime)
}COPY 实现无缓冲Channel实现Java中的CyclicBarrier
CyclicBarrier 是一个同步工具它允许一组线程互相等待直到他们都到达了一个共同的屏障点。在涉及固定大小的线程团队必须偶尔相互等待的程序中CyclicBarriers 非常有用。之所以称之为“循环”屏障是因为在等待的线程被释放之后它可以被重复使用。
await() 所有的参与者都调用了wait方法后返回或者被中断 我们就实现这个await方法暂时不支持中断代码如下 package mainimport (fmtsyncsync/atomictime
)// CyclicBarrier 让一组goroutine在到达某个点之后才能继续执行
type CyclicBarrier struct {// 总goroutine数量participant int// 用于等待所有goroutine准备好waitGroup sync.WaitGroup// 无缓冲channel用于goroutine间同步barrierChan chan struct{}running int32
}// NewCyclicBarrier 创建一个新的CyclicBarrier
func NewCyclicBarrier(participant int) *CyclicBarrier {b : CyclicBarrier{participant: participant,barrierChan: make(chan struct{}),running: int32(participant),}// 设置等待的goroutine数b.waitGroup.Add(participant)return b
}// 当一个goroutine调用Wait时
// 它将在屏障处等待
// 直到所有goroutine都到达这里
func (b *CyclicBarrier) Wait() {// 一个goroutine准备好了b.waitGroup.Done()// 等待所有goroutine都准备好b.waitGroup.Wait()// 当所有goroutine都准备好了关闭channel进行广播通知if atomic.AddInt32(b.running, -1) 0 {close(b.barrierChan)} else {// 等待通知-b.barrierChan}}// 阻塞调用goroutine直到所有goroutine都调用了Wait方法
// 屏障开放后重新置为待关闭状态
func (b *CyclicBarrier) Await() {// 等待屏障开放的信号-b.barrierChan// 重置屏障状态b.barrierChan make(chan struct{})b.waitGroup.Add(b.participant)
}func (b *CyclicBarrier) Close() {close(b.barrierChan)
}func main() {// 这里我们设置3个goroutine参与barrier : NewCyclicBarrier(100)for i : 0; i 100; i {go func(i int) {fmt.Printf(Goroutine %d is working...\n, i)// 模拟工作time.Sleep(time.Duration(i1) * time.Second)fmt.Printf(Goroutine %d reached the barrier.\n, i)barrier.Wait()fmt.Printf(Goroutine %d passed the barrier.\n, i)}(i)}// 主goroutine等待所有goroutine都到达屏障barrier.Await()fmt.Println(All goroutines have passed the barrier)
}COPY
参考
go/src/runtime/chan.go at master · golang/go · GitHub逐步学习Go-并发通道chan(channel) – FOF编程网
编写不易如有问题请评论告知