并发任务的固定模式 定时任务并发

虽然我们可以自己使用 chan 来实现很多并发的框架和模式,但实际应用的时候先查查 sync 包总是没错的,比如 sync/atomic 包对原子操作进行了支持,上一个章节的 *x = *x +1,其实可以直接使用 atomic.AddUint64(&x, 1) 来实现。

标准库 sync.Once 可以实现只执行一次的功能,比如你指向创建一个对象(单件)。下面这个 born 函数永远只会造同一个人:

type person struct{}
var instance *person
var once sync.Once
func born() *person {
once.Do(func() {
fmt.Println("构建 instance 对象")
instance = &person{}
})
return instance
}

我们启动 10 个 goroutine 来测试一下:

func getInstance() {
wg := sync.WaitGroup{}
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
instance := born()
fmt.Printf("%p\n", instance) // 0x1254790
}()
}
wg.Wait()
}

每次获取到的 instance 的地址都是一样的,说明是同一个对象,它的实现机制如下:

func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}

首先是对通过 mutex 和 atomic.StoreUint32 配合来实现,看到这里 doSlow 函数就奇怪了,你不是有 mutex 吗? 为什么还要来一发 aotmic 的函数操作呢? doSlow 函数名其实已经说明了一切。doSlow 说明这个函数很慢,因为 mutex 是一个成本很高的操作,而整数的原子读写却轻量很多,所以 atomic.LoadUint32 先判断了一下 o.done 是不是为 0,不是的话就滚蛋了,少量的竞争者最后能到达 if o.done == 0 的判断,这个提前的判断提高了性能(请注意 defer 语句执行的顺序,对 mutex 解锁 Unlock 比设置 o.done 为 1 的操作后执行。

sync/atomic 不仅能对基本的数值类型进行原子操作,atomic.Value 还能对 interface{} 数据类型进行操作。

既然系统包这么好用,为什么不把所有的应用模式封装完呢? 有时候系统包并不好用,看下面的 mutex 的例子:

func main() {
var mu sync.Mutex
go func(){
fmt.Println("NO.1中国人")
mu.Lock()
}()
mu.Unlock()
}

程序的意图简洁清晰明了,看起来好像没有什么问题,但是执行大概率是报错:

fatal error: sync: unlock of unlocked mutex

因为 go func 的 goroutine 和 main 的 goroutine 执行顺序并不能保证谁先谁后,mu.Unlock 的时候 mu.Lock 还未执行,所以 Unlock 直接抛出了错误。要修改它也很麻烦,要这样写:

func main() {
var mu sync.Mutex
mu.Lock
go func(){
fmt.Println("NO.1中国人")
mu.Unock()
}()
mu.lock()
}

是不是不好理解? 第二个 mu.Lock 的时候会等着 mu.Unlock 先执行,有点绕口。但是如果使用无缓冲的通道来重构代码不但简单而且很可靠:

func main() {
exit := make(chan int)
go func() {
fmt.Println("NO.1中国人")
<-exit
}()
exit <- 1
}

为什么很可靠? 因为 exit 没有缓冲,不管 exit <- 1 先执行到 还是 <-exit 先执行到,都得等待同步交接,咱们谁先到都见面聊。这个例子中 <- exit 和 exit <- 1 的位置可以交换,他们的读写本身并没有意义,chan 的类型也是无意义的,用 string、struct{} 都行,但是 chan 完成了比 mutex 很易懂的代码。把这个例子扩展一下,启动指定数量的任务:

func lockBatch(taskCount int) {
waitGroup := make(chan struct{}, taskCount)
for i := 0; i < cap(waitGroup); i++ {
go func(index int) {
fmt.Printf("NO.1中国人%d\n", index)
waitGroup <- struct{}{}
}(i)
}
for i := 0; i < cap(waitGroup); i++ {
<-waitGroup
}
}

这不就是 sync.WaitGroup 干的事情吗? 这就是框架,或者说模式。

这个模式可以启动 N 个协程,来做 N 件事情,每个协程和任务的比例是 1:1,不过太多的协程可能不利于管理和回收,能不能抽象出 N 个协程做 M 件事情的模式呢? 为了让 goroutine 可以不断的做任务,可以使用一个有缓冲的通道来保存任务,极端的情况是只有一个 goroutine 来做任务,它不断的从缓冲通道里取出任务执行,知道通道关闭为止,由于不同的 goroutine 可以安全的从通道中读取任务,因此可以随意调节 goroutine 的数目而不用修改代码。定一个 handle 函数表示做任务的工人:

var wg sync.WaitGroup
func handle(tasks chan string, index int) {
defer wg.Done()
for {
// 读取关闭的无缓冲通道,返回零值和false
// 读取关闭的有缓冲通道,先把缓存数据读完后,再读取返回零值和false
task, ok := <-tasks
if !ok {
// 任务通道关闭了,不会再有新任务了
fmt.Printf("handle %d over\n", index)
return
}
fmt.Printf("干活儿 %s By 工人 %d\n", task, index)
}
}

tasks 表示任务的需求,如果只需要一个工人,直接 go handle 就行了,多个工人只需一个循环,工人下班的标志依靠 wg 来保证,干完活就能下班,而 tasks 通道用来接收任务。

func createTasks(goroutines, taskCount int) {
tasks := make(chan string, taskCount)
// 启动 N
wg.Add(goroutines)
for i := 0; i < goroutines; i++ {
go handle(tasks, i)
}
// 设置 M
for k := 0; k <= taskCount; k++ {
tasks <- fmt.Sprintf("任务 %d", k)
}
close(tasks)
wg.Wait()
}

参数 goroutines 是工人的个数(协程的个数),而 taskCount 是任务的数量。需要注意的时候,在通道 close 关闭后,通道中的数据依然可以读取直到读完后再次读取事 task 是零值(这里是空字符串,因为 chan 的类型是 string),ok 为 false,也就是没有任务了,工人都可以下班了(wg.Done),而 wg.Wait 会等所有工人都下班后,才把工厂的门关上(函数返回)。如此,就完成了 N:M 的任务模型,这里 N 和 M 可以通过调用参数传递数量,而你不用修改内部的代码。

进一步延伸这个例子,就是生产者和消费者,而生产者和消费者的数量可以是任何比例。

func producer(name string, tasks chan<- string) {
index := 0
for {
tasks <- fmt.Sprintf("%s-task-%d", name, index)
time.Sleep(50 * time.Millisecond)
index++
}
}
func consumer(name string, tasks <-chan string) {
for task := range tasks {
fmt.Printf("%s-%s has been token\n", name, task)
time.Sleep(50 * time.Millisecond)
}
}

producer 生产者是个是循环,源源不断的造东西,而 consumer 一直从 tasks 里取货,模拟一个工厂的代码:

func createFactory(stockSize, producerCount, consumerCount int) {
tasks := make(chan string, stockSize)
for i := 0; i <= producerCount; i++ {
go producer(fmt.Sprintf("p%d", i), tasks)
}
for i := 0; i <= consumerCount; i++ {
go consumer(fmt.Sprintf("c%d", i), tasks)
}
exit := make(chan os.Signal, 1)
// ctrl+c 中断程序
signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM)
<-exit
}

stockSize 是车间大小,表示了最多能存多少货,producerCount 是生产者个数,consumerCount 是消费者个数,当生产者 pdoducer 足够多的时候,把仓库 stockSize 塞满了就阻塞了没法再生产了,除非消费者 consumer 来拿走一些;反之如果消费者足够多的时候,仓库里没货也只能等着了,除非生产者 producer 造点货出来。

为了达到平衡,如果希望生产者以稳定的效率工作,不多不少,如何控制呢? 非常简单:

func producer2(max int, tasks chan<- string, doSth func() string) {
lines := make(chan int, max)
for {
lines <- 1
tasks <- doSth()
<-lines
}
}

max 表示最大的生产线,每次开一个生产线,就往通道里计数一个,生产完了,再从通道取出一个,因为通道最大是 max,满了就阻塞了,随时都保证了只有 max 条生产线存在。

下面的这个例子实现的功能是,等待一个对象完成自己的工作,和前面不同的是,它把 chan 的状态作为对象的一部分:

type request struct {
data []interface{} // 模拟要处理的数据
finish chan struct{} // 完成后的标志
}
func newRequest(data ...interface{}) *request {
return &request{data, make(chan struct{}, 1)}
}
func doWork(req *request) {
time.Sleep(1 * time.Second) // 模拟耗时
fmt.Println(req.data) // 工作内容就是打印
close(req.finish) // req.finish <- struct{}{} 也行
}
func createWork() {
req := newRequest(1, "string", true)
go doWork(req)
for i := 0; i < 100; i++ {
time.Sleep(50 * time.Millisecond)
fmt.Println("继续做事,我很忙")
}
<-req.finish
}

createWork 函数通过并发提高了处理效率,单独开了一条线去处理 request,同时它可以继续往后做事,但是它保证在函数返回前 req 对象肯定被处理了。关于 chan 和并发,还有很多有用的固定模式,希望这几个例子可以抛砖引玉。

本章节的代码 https://github.com/developdeveloper/go-demo/tree/master/16-concurrent-pattern

Published by

风君子

独自遨游何稽首 揭天掀地慰生平

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注