golang并发模式的思考

原创内容,转载请注明出处

Posted by Weakyon Blog on October 6, 2017

golang的使用一直是野路子。在看了几个国外开源项目(nsq,skynetservices)以后,才发现管道的重要性。

其实在公司的大型项目开发过程中早就意识到了这个问题,那就是mutex的使用是可以完全被channel所替代的。

但是channel的一些场景用mutex去实现会非常复杂。

一 实际开发中的例子

举一个资源池的例子

package main

import (
	"fmt"
	"sync"
	"time"
)

type Pool struct {
	ids		[]int
	maxSize int
	lock	sync.Mutex
}

func NewPool() (*Pool, error){
	p := &Pool{}
	return p, nil
}

func (this *Pool) Get() int{
	this.lock.Lock()
	defer this.lock.Unlock()
	if len(this.ids) == 0 {
		this.maxSize++
		return this.maxSize
    }
	i := this.ids[0]
	this.ids = this.ids[1:]
	return i
}

func (this *Pool) Put(i int) {
	this.lock.Lock()
	defer this.lock.Unlock()
	this.ids = append(this.ids, i)
	return
}

func main() {
	p,_ := NewPool()
	for i := 0;i < 3;i++ {
		go func() {
			for ;; {
				i := p.Get()
				fmt.Println(i)
				p.Put(i)
				time.Sleep(time.Second)
            }
        }()
    }
	time.Sleep(1000 * time.Second)
}

这里是一个简单的资源池,利用mutex互斥来getput资源。

这种情况下mutex完全能够胜任。

但是如果情况更复杂一些呢?

1.资源池有大小限制

2.资源的创建耗时较久,get资源时需要超时

此时用mutex去实现也可以,但是会很复杂,然而使用channel则简单的多

package main

import (
	"fmt"
	"time"
)

type getMsg struct {
	rch	chan int
}

type putMsg struct {
	res int
}

const max_size = 100

func (this *Pool) get(m *getMsg) {
	if len(this.ids) != 0 {
		i := this.ids[0]
		this.ids = this.ids[1:]
		m.rch <- i
		return
    }
	if this.counts >= max_size {
		this.getWaits = append(this.getWaits, m)
		return
    }
	time.Sleep(time.Second * 1)
	m.rch <- this.counts
	this.counts++
	return
}

func (this *Pool) put(m *putMsg) {
	if len(this.getWaits) != 0 {
		waitM := this.getWaits[0]
		waitM.rch <- m.res
		this.getWaits = this.getWaits[1:]
		return
    }
	this.ids = append(this.ids, m.res)
	return
}

type Pool struct {
	ids			[]int
	counts		int
    getWaits	[]*getMsg
	getChan chan *getMsg
	putChan chan *putMsg
}

func NewPool() (*Pool, error){
	p := &Pool{}
	p.getChan = make(chan *getMsg)
	p.putChan = make(chan *putMsg)
	go p.loop()
	return p, nil
}

func (this *Pool) Get() int{
	m := &getMsg{}
	m.rch = make(chan int, 1)
	this.getChan <- m

	return <-m.rch
}

func (this *Pool) Put(i int) {
	m := &putMsg{}
	m.res = i
	this.putChan <- m
}

func (this *Pool) loop() {
	for ;; {
		select {
		case m := <-this.getChan:
			this.get(m)
		case m := <-this.putChan:
			this.put(m)
        }
    }
}

func main() {
	p,_ := NewPool()
	for i := 0;i < 300;i++ {
		go func(index int) {
			for ;; {
				t := time.Now()
				i := p.Get()
				fmt.Println(index, i, time.Since(t))
				p.Put(i)
            }
        }(i)
    }
	time.Sleep(1000 * time.Second)
}
  1. channel的这个资源池实现,完成了大小限制的功能

  2. 加入超时功能也很简单

func (this *Pool) Get(dur time.Duration) (int, error){
	m := &getMsg{}
	m.rch = make(chan int, 1)
	this.getChan <- m

	select {
    case i := <-m.rch:
        return i, nil 
    case <-time.After(dur):
        return 0, fmt.Errorf("timeout")
    }
}

3.甚至还有一些优化

func (this *Pool) get(m *getMsg) {
    ...
	if this.counts >= max_size {
		this.getWaits = append(this.getWaits, m)
		return
    }
	done := make(chan int)
	dealDone := func() {
		i := <-done
		putM := &putMsg{}
		putM.res = i
		this.put(putM)
	}
	go func() {
		time.Sleep(time.Second * 1)
		i := int(time.Now().UnixNano())
		this.counts++
		done <- i
    }()
	select {
	case i := <-done:
		m.rch <- i
	case putM := <-this.putChan:
		m.rch <- putM.res
		dealDone()	
    }
	return
}

将阻塞的创建行为异步化,当创建未能成功,但是已经put释放了资源时。

将直接使用新的资源


可以看到,当使用channel实现时整个都是无锁化的。并且实现一些复杂功能时代码很直观。

可是如何选择golang中是使用channel还是mutex呢?大部分语言都是有mutex的支持来实现并发的,这种设计方式肯定是经过时间考验的。

那么golang为什么要搞出来channel这么一个“野路子”呢?这里得从golang并发模式的哲学开始说

二 golang的并发模式

在2012年的googleIO上,有一个关于并发模式的talks

Go videos from Google I/O 2012,这一篇的Go concurrency patterns

还有这一篇是一个补充,形象说明了golang的并发设计模式

https://blog.golang.org/concurrency-is-not-parallelism

这两个talks解释了协程+Channel并发模式的”Where does the idea come from”

Why?
Look around you. What do you see?
Do you see a single-stepping world doing one thing at a time?
Or do you see a complex world of interacting, independently behaving pieces?
That's why. Sequential processing on its own does not model the world's behavior.

以现实生活作为参考。这个世界不是单步运行的,而是将独立的行为组合起来的复杂世界。

协程+Channel对应完成了并发+通信的模式,

并发是指将程序分解成小片段独立执行的设计方法,通信是指的各个小片段之间的合作

这是golang的并发设计模式,包括Erlang等语言都是基于这种CSP模式(C. A. R. Hoare: Communicating Sequential Processes (CACM 1978))

可以看到,这是有理论支持的。

通过这种方式,可以将一个demo很简单的拓展成一个快速健壮的程序

三 MutexOrChannel

googleIO的这一篇talks还特别指出了Channel的使用场景

multiple inputs
multiple outputs
timeouts
failure

在多项输入输出时(和通信有关的程序为主),涉及到超时逻辑时,以及错误的传递(return层层传递错误当然没有管道来的方便)时,使用Channel会很方便

但是channel也不是万能的,talks说到滥用channel也是很麻烦的,例如一个计数器,原子锁很简单,非要用channel就很麻烦了

google有一篇文章专门讲了这个问题

https://github.com/golang/go/wiki/MutexOrChannel

这里对mutex和channel做了一个归纳

As a general guide, though:

Channel                             Mutex
passing ownership of data,          caches,
distributing units of work,         state,
communicating async results,

在传递数据的所有权,分发任务单位,等待异步结果时使用channel

在缓存数据的获取时(???没理解),在表达状态切换时,使用mutex

总结

上面都是举得一些例子,如果要总结的话。

有一句很有名的话 “Don’t communicate by sharing memory, share memory by communicating.”

其实不太完善

当你需要共享数据,而且共享数据这部分不会阻塞时(耗时较少),可以用mutex。

但是当如果耗时较多,那么在golang中使用channel来完成异步附带超时的设计较好。

参考资料:

Golang里的Future/Promise

golang实践-异步系统的无锁

google io 2012-Go concurrency patterns

https://blog.golang.org/concurrency-is-not-parallelism

google io 2013-Advanced Go Concurrency Patterns

https://blog.golang.org/pipelines

https://blog.golang.org/share-memory-by-communicating

channel独木难支

06 Oct 2017