2025年3月28日 星期五 甲辰(龙)年 月廿七 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 编程开发 > Go语言

Go Channel 应用模式

时间:12-14来源:作者:点击数:4
城东书院 www.cdsy.xyz

Channel 是 Go 中的一种类型,和 goroutine 一起为 Go 提供了并发技术, 它在开发中得到了广泛的应用。Go 鼓励人们通过 Channel 在 goroutine 之间传递数据的引用(就像把数据的 owner 从一个 goroutine 传递给另外一个 goroutine), Effective Go 总结了这么一句话:​

Do not communicate by sharing memory; instead, share memory by communicating.

在 Go 内存模型 指出了 channel 作为并发控制的一个特性:

A send on a channel happens before the corresponding receive from that channel completes. (Golang Spec)

除了正常的在 goroutine 之间安全地传递共享数据, Channel 还可以玩出很多的花样(模式), 本文列举了一些 channel 的应用模式。

促成本文诞生的因素主要包括:

  1. eapache 的 channels 库
  2. concurrency in go 这本书
  3. Francesc Campoy 的 justforfun 系列中关于 merge channel 的实现
  4. 我在出版 Scala 集合手册这本书中对 Scala 集合的启发

下面就让我们以实例的方式看看这么模式吧。

Lock/TryLock 模式

我们知道, Go 的标准库 sync 有 Mutex ,可以用来作为锁,但是 Mutex 却没有实现 TryLock 方法。

我们对于 TryLock 的定义是当前 goroutine 尝试获得锁, 如果成功,则获得了锁,返回 true, 否则返回 false。我们可以使用这个方法避免在获取锁的时候当前 goroutine 被阻塞住。

本来,这是一个常用的功能,在一些其它编程语言中都有实现,为什么 Go 中没有实现的? issue#6123 有详细的讨论,在我看来,Go 核心组成员本身对这个特性没有积极性,并且认为通过 channel 可以实现相同的方式。

Hacked Lock/TryLock 模式

其实,对于标准库的 sync.Mutex 要增加这个功能很简单,下面的方式就是通过 hack 的方式为 Mutex 实现了 TryLock 的功能。

  • const mutexLocked = 1 << iota
  • type Mutex struct {
  • mu sync.Mutex
  • }
  • func (m *Mutex) Lock() {
  • m.mu.Lock()
  • }
  • func (m *Mutex) Unlock() {
  • m.mu.Unlock()
  • }
  • func (m *Mutex) TryLock() bool {
  • return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.mu)), 0, mutexLocked)
  • }
  • func (m *Mutex) IsLocked() bool {
  • return atomic.LoadInt32((*int32)(unsafe.Pointer(&m.mu))) == mutexLocked
  • }

如果你看一下 Mutex 实现的源代码,就很容易理解上面的这段代码了,因为 mutex 实现锁主要利用 CAS 对它的一个 int32 字段做操作。

上面的代码还额外增加了一个 IsLocked 方法,不过这个方法一般不常用,因为查询和加锁这两个方法执行的时候不是一个原子的操作,素以这个方法一般在调试和打日志的时候可能有用。

TryLock By Channel

既然标准库中不准备在 Mutex 上增加这个方法,而是推荐使用 channel 来实现,那么就让我们看看如何使用 channel 来实现。

  • type Mutex struct {
  • ch chan struct{}
  • }
  • func NewMutex() *Mutex {
  • mu := &Mutex{make(chan struct{}, 1)}
  • mu.ch <- struct{}{}
  • return mu
  • }
  • func (m *Mutex) Lock() {
  • <-m.ch
  • }
  • func (m *Mutex) Unlock() {
  • select {
  • case m.ch <- struct{}{}:
  • default:
  • panic("unlock of unlocked mutex")
  • }
  • }
  • func (m *Mutex) TryLock() bool {
  • select {
  • case <-m.ch:
  • return true
  • default:
  • }
  • return false
  • }
  • func (m *Mutex) IsLocked() bool {
  • return len(m.ch) == 0
  • }

主要是利用 channel 边界情况下的阻塞特性实现的。

你还可以将缓存的大小从 1 改为 n,用来处理 n 个锁(资源)。

TryLock with Timeout

有时候,我们在获取一把锁的时候,由于有竞争的关系,在锁被别的 goroutine 拥有的时候,当前 goroutine 没有办法立即获得锁,只能阻塞等待。标准库并没有提供等待超时的功能,我们尝试实现它。

  • type Mutex struct {
  • ch chan struct{}
  • }
  • func NewMutex() *Mutex {
  • mu := &Mutex{make(chan struct{}, 1)}
  • mu.ch <- struct{}{}
  • return mu
  • }
  • func (m *Mutex) Lock() {
  • <-m.ch
  • }
  • func (m *Mutex) Unlock() {
  • select {
  • case m.ch <- struct{}{}:
  • default:
  • panic("unlock of unlocked mutex")
  • }
  • }
  • func (m *Mutex) TryLock(timeout time.Duration) bool {
  • timer := time.NewTimer(timeout)
  • select {
  • case <-m.ch:
  • timer.Stop()
  • return true
  • case <-time.After(timeout):
  • }
  • return false
  • }
  • func (m *Mutex) IsLocked() bool {
  • return len(m.ch) == 0
  • }

你也可以把它用 Context 来改造,不是利用超时,而是利用 Context 来取消/超时获得锁的操作,这个作业留给读者来实现。

Or Channel 模式

当你等待多个信号的时候,如果收到任意一个信号, 就执行业务逻辑,忽略其它的还未收到的信号。

举个例子, 我们往提供相同服务的 n 个节点发送请求,只要任意一个服务节点返回结果,我们就可以执行下面的业务逻辑,其它 n-1 的节点的请求可以被取消或者忽略。当 n=2 的时候,这就是 back request 模式。 这样可以用资源来换取 latency 的提升。

需要注意的是,当收到任意一个信号的时候,其它信号都被忽略。如果用 channel 来实现,只要从任意一个 channel 中接收到一个数据,那么所有的 channel 都可以被关闭了(依照你的实现,但是输出的 channel 肯定会被关闭)。

有三种实现的方式: goroutine、reflect 和递归。

Goroutine 方式

  • func or(chans ...<-chan interface{}) <-chan interface{} {
  • out := make(chan interface{})
  • go func() {
  • var once sync.Once
  • for _, c := range chans {
  • go func(c <-chan interface{}) {
  • select {
  • case <-c:
  • once.Do(func() { close(out) })
  • case <-out:
  • }
  • }(c)
  • }
  • }()
  • return out
  • }

or 函数可以处理 n 个 channel,它为每个 channel 启动一个 goroutine,只要任意一个 goroutine 从 channel 读取到数据,输出的 channel 就被关闭掉了。

为了避免并发关闭输出 channel 的问题,关闭操作只执行一次。

Reflect 方式

Go 的反射库针对 select 语句有专门的数据( reflect.SelectCase ) 和函数( reflect.Select ) 处理。

所以我们可以利用反射“随机”地从一组可选的 channel 中接收数据,并关闭输出 channel。

这种方式看起来更简洁。

  • func or(channels ...<-chan interface{}) <-chan interface{} {
  • switch len(channels) {
  • case 0:
  • return nil
  • case 1:
  • return channels[0]
  • }
  • orDone := make(chan interface{})
  • go func() {
  • defer close(orDone)
  • var cases []reflect.SelectCase
  • for _, c := range channels {
  • cases = append(cases, reflect.SelectCase{
  • Dir: reflect.SelectRecv,
  • Chan: reflect.ValueOf(c),
  • })
  • }
  • reflect.Select(cases)
  • }()
  • return orDone
  • }

递归方式

递归方式一向是比较开脑洞的实现,下面的方式就是分而治之的方式,逐步合并 channel,最终返回一个 channel。

  • func or(channels ...<-chan interface{}) <-chan interface{} {
  • switch len(channels) {
  • case 0:
  • return nil
  • case 1:
  • return channels[0]
  • }
  • orDone := make(chan interface{})
  • go func() {
  • defer close(orDone)
  • switch len(channels) {
  • case 2:
  • select {
  • case <-channels[0]:
  • case <-channels[1]:
  • }
  • default:
  • m := len(channels) / 2
  • select {
  • case <-or(channels[:m]...):
  • case <-or(channels[m:]...):
  • }
  • }
  • }()
  • return orDone
  • }

在后面的扇入(合并) 模式中,我们还是会使用相同样的递归模式来合并多个输入 channel,根据 justforfun 的测试结果,这种递归的方式要比 goroutine、Reflect 更有效。

Or-Done-Channel 模式

这种模式是我们经常使用的一种模式,通过一个信号 channel(done) 来控制(取消) 输入 channel 的处理。

一旦从 done channel 中读取到一个信号,或者 done channel 被关闭, 输入 channel 的处理则被取消。

这个模式提供一个简便的方法,把 done channel 和 输入 channel 融合成一个输出 channel。

  • func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} {
  • valStream := make(chan interface{})
  • go func() {
  • defer close(valStream)
  • for {
  • select {
  • case <-done:
  • return
  • case v, ok := <-c:
  • if ok == false {
  • return
  • }
  • select {
  • case valStream <- v:
  • case <-done:
  • }
  • }
  • }
  • }()
  • return valStream
  • }

扇入模式

扇入模式(FanIn) 是将多个同样类型的输入 channel 合并成一个同样类型的输出 channel,也就是 channel 的合并。

Goroutine 方式

每个 channel 起一个 goroutine。

  • func fanIn(chans ...<-chan interface{}) <-chan interface{} {
  • out := make(chan interface{})
  • go func() {
  • var wg sync.WaitGroup
  • wg.Add(len(chans))
  • for _, c := range chans {
  • go func(c <-chan interface{}) {
  • for v := range c {
  • out <- v
  • }
  • wg.Done()
  • }(c)
  • }
  • wg.Wait()
  • close(out)
  • }()
  • return out
  • }

Reflect

利用反射库针对 select 语句的处理合并输入 channel。

下面这种实现方式其实还是有些问题的, 在输入 channel 读取比较均匀的时候比较有效,否则性能比较低下。

  • func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
  • out := make(chan interface{})
  • go func() {
  • defer close(out)
  • var cases []reflect.SelectCase
  • for _, c := range chans {
  • cases = append(cases, reflect.SelectCase{
  • Dir: reflect.SelectRecv,
  • Chan: reflect.ValueOf(c),
  • })
  • }
  • for len(cases) > 0 {
  • i, v, ok := reflect.Select(cases)
  • if !ok { //remove this case
  • cases = append(cases[:i], cases[i+1:]...)
  • continue
  • }
  • out <- v.Interface()
  • }
  • }()
  • return out
  • }

递归方式

这种方式虽然理解起来不直观,但是性能还是不错的(输入 channel 不是很多的情况下递归层级不会很高,不会成为瓶颈)。

  • func fanInRec(chans ...<-chan interface{}) <-chan interface{} {
  • switch len(chans) {
  • case 0:
  • c := make(chan interface{})
  • close(c)
  • return c
  • case 1:
  • return chans[0]
  • case 2:
  • return mergeTwo(chans[0], chans[1])
  • default:
  • m := len(chans) / 2
  • return mergeTwo(
  • fanInRec(chans[:m]...),
  • fanInRec(chans[m:]...))
  • }
  • }
  • func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
  • c := make(chan interface{})
  • go func() {
  • defer close(c)
  • for a != nil || b != nil {
  • select {
  • case v, ok := <-a:
  • if !ok {
  • a = nil
  • continue
  • }
  • c <- v
  • case v, ok := <-b:
  • if !ok {
  • b = nil
  • continue
  • }
  • c <- v
  • }
  • }
  • }()
  • return c
  • }

Tee 模式

扇出模式(FanOut) 是将一个输入 channel 扇出为多个 channel。

扇出行为至少可以分为两种:

  1. 从输入 channel 中读取一个数据,发送给每个输入 channel,这种模式称之为 Tee 模式
  2. 从输入 channel 中读取一个数据,在输出 channel 中选择一个 channel 发送

本节只介绍第一种情况,下一节介绍第二种情况

Goroutine 方式

将读取的值发送给每个输出 channel, 异步模式可能会产生很多的 goroutine。

  • func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
  • go func() {
  • defer func() {
  • for i := 0; i < len(out); i++ {
  • close(out[i])
  • }
  • }()
  • for v := range ch {
  • v := v
  • for i := 0; i < len(out); i++ {
  • i := i
  • if async {
  • go func() {
  • out[i] <- v
  • }()
  • } else {
  • out[i] <- v
  • }
  • }
  • }
  • }()
  • }

Reflect 方式

这种模式一旦一个输出 channel 被阻塞,可能会导致后续的处理延迟。

  • func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
  • go func() {
  • defer func() {
  • for i := 0; i < len(out); i++ {
  • close(out[i])
  • }
  • }()
  • cases := make([]reflect.SelectCase, len(out))
  • for i := range cases {
  • cases[i].Dir = reflect.SelectSend
  • }
  • for v := range ch {
  • v := v
  • for i := range cases {
  • cases[i].Chan = reflect.ValueOf(out[i])
  • cases[i].Send = reflect.ValueOf(v)
  • }
  • for _ = range cases { // for each channel
  • chosen, _, _ := reflect.Select(cases)
  • cases[chosen].Chan = reflect.ValueOf(nil)
  • }
  • }
  • }()
  • }

分布模式

分布模式将从输入 channel 中读取的值往输出 channel 中的其中一个发送。

Goroutine 方式

roundrobin 的方式选择输出 channel。

  • func fanOut(ch <-chan interface{}, out []chan interface{}) {
  • go func() {
  • defer func() {
  • for i := 0; i < len(out); i++ {
  • close(out[i])
  • }
  • }()
  • // roundrobin
  • var i = 0
  • var n = len(out)
  • for v := range ch {
  • v := v
  • out[i] <- v
  • i = (i + 1) % n
  • }
  • }()
  • }

Reflect 方式

利用发射随机的选择。

  • func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
  • go func() {
  • defer func() {
  • for i := 0; i < len(out); i++ {
  • close(out[i])
  • }
  • }()
  • cases := make([]reflect.SelectCase, len(out))
  • for i := range cases {
  • cases[i].Dir = reflect.SelectSend
  • cases[i].Chan = reflect.ValueOf(out[i])
  • }
  • for v := range ch {
  • v := v
  • for i := range cases {
  • cases[i].Send = reflect.ValueOf(v)
  • }
  • _, _, _ = reflect.Select(cases)
  • }
  • }()
  • }

eapache

eapache/channels 提供了一些 channel 应用模式的方法,比如上面的扇入扇出模式等。

因为 go 本身的 channel 无法再进行扩展, eapache/channels 库定义了自己的 channel 接口,并提供了与 channel 方便的转换。

eapache/channels 提供了四个方法:

  • Distribute: 从输入 channel 读取值,发送到其中一个输出 channel 中。当输入 channel 关闭后,输出 channel 都被关闭
  • Tee: 从输入 channel 读取值,发送到所有的输出 channel 中。当输入 channel 关闭后,输出 channel 都被关闭
  • Multiplex: 合并输入 channel 为一个输出 channel, 当所有的输入都关闭后,输出才关闭
  • Pipe: 将两个 channel 串起来

同时对上面的四个函数还提供了 WeakXXX 的函数,输入关闭后不会关闭输出。

下面看看对应的函数的例子。

Distribute

  • func testDist() {
  • fmt.Println("dist:")
  • a := channels.NewNativeChannel(channels.None)
  • outputs := []channels.Channel{
  • channels.NewNativeChannel(channels.None),
  • channels.NewNativeChannel(channels.None),
  • channels.NewNativeChannel(channels.None),
  • channels.NewNativeChannel(channels.None),
  • }
  • channels.Distribute(a, outputs[0], outputs[1], outputs[2], outputs[3])
  • //channels.WeakDistribute(a, outputs[0], outputs[1], outputs[2], outputs[3])
  • go func() {
  • for i := 0; i < 5; i++ {
  • a.In() <- i
  • }
  • a.Close()
  • }()
  • for i := 0; i < 6; i++ {
  • var v interface{}
  • var j int
  • select {
  • case v = <-outputs[0].Out():
  • j = 0
  • case v = <-outputs[1].Out():
  • j = 1
  • case v = <-outputs[2].Out():
  • j = 2
  • case v = <-outputs[3].Out():
  • j = 3
  • }
  • fmt.Printf("channel#%d: %d\n", j, v)
  • }
  • }

Tee

  • func testTee() {
  • fmt.Println("tee:")
  • a := channels.NewNativeChannel(channels.None)
  • outputs := []channels.Channel{
  • channels.NewNativeChannel(channels.None),
  • channels.NewNativeChannel(channels.None),
  • channels.NewNativeChannel(channels.None),
  • channels.NewNativeChannel(channels.None),
  • }
  • channels.Tee(a, outputs[0], outputs[1], outputs[2], outputs[3])
  • //channels.WeakTee(a, outputs[0], outputs[1], outputs[2], outputs[3])
  • go func() {
  • for i := 0; i < 5; i++ {
  • a.In() <- i
  • }
  • a.Close()
  • }()
  • for i := 0; i < 20; i++ {
  • var v interface{}
  • var j int
  • select {
  • case v = <-outputs[0].Out():
  • j = 0
  • case v = <-outputs[1].Out():
  • j = 1
  • case v = <-outputs[2].Out():
  • j = 2
  • case v = <-outputs[3].Out():
  • j = 3
  • }
  • fmt.Printf("channel#%d: %d\n", j, v)
  • }
  • }

Multiplex

  • func testMulti() {
  • fmt.Println("multi:")
  • a := channels.NewNativeChannel(channels.None)
  • inputs := []channels.Channel{
  • channels.NewNativeChannel(channels.None),
  • channels.NewNativeChannel(channels.None),
  • channels.NewNativeChannel(channels.None),
  • channels.NewNativeChannel(channels.None),
  • }
  • channels.Multiplex(a, inputs[0], inputs[1], inputs[2], inputs[3])
  • //channels.WeakMultiplex(a, inputs[0], inputs[1], inputs[2], inputs[3])
  • go func() {
  • for i := 0; i < 5; i++ {
  • for j := range inputs {
  • inputs[j].In() <- i
  • }
  • }
  • for i := range inputs {
  • inputs[i].Close()
  • }
  • }()
  • for v := range a.Out() {
  • fmt.Printf("%d ", v)
  • }
  • }

Pipe

  • func testPipe() {
  • fmt.Println("pipe:")
  • a := channels.NewNativeChannel(channels.None)
  • b := channels.NewNativeChannel(channels.None)
  • channels.Pipe(a, b)
  • // channels.WeakPipe(a, b)
  • go func() {
  • for i := 0; i < 5; i++ {
  • a.In() <- i
  • }
  • a.Close()
  • }()
  • for v := range b.Out() {
  • fmt.Printf("%d ", v)
  • }
  • }

集合操作

从 channel 的行为来看,它看起来很像一个数据流,所以我们可以实现一些类似 Scala 集合的操作。

Scala 的集合类提供了丰富的操作(方法), 当然其它的一些编程语言或者框架也提供了类似的方法, 比如 Apache Spark、Java Stream、ReactiveX 等。

下面列出了一些方法的实现,我相信经过一些人的挖掘,相关的方法可以变成一个很好的类库,但是目前我们先看一些例子。

skip

skip 函数是从一个 channel 中跳过开一些数据,然后才开始读取。

skipN

skipN 跳过开始的 N 个数据。

  • func skipN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
  • takeStream := make(chan interface{})
  • go func() {
  • defer close(takeStream)
  • for i := 0; i < num; i++ {
  • select {
  • case <-done:
  • return
  • case <-valueStream:
  • }
  • }
  • for {
  • select {
  • case <-done:
  • return
  • case takeStream <- <-valueStream:
  • }
  • }
  • }()
  • return takeStream
  • }
skipFn

skipFn 提供 Fn 函数为 true 的数据,比如跳过偶数。

  • func skipFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
  • takeStream := make(chan interface{})
  • go func() {
  • defer close(takeStream)
  • for {
  • select {
  • case <-done:
  • return
  • case v := <-valueStream:
  • if !fn(v) {
  • takeStream <- v
  • }
  • }
  • }
  • }()
  • return takeStream
  • }
skipWhile

跳过开头函数 fn 为 true 的数据。

  • func skipWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
  • takeStream := make(chan interface{})
  • go func() {
  • defer close(takeStream)
  • take := false
  • for {
  • select {
  • case <-done:
  • return
  • case v := <-valueStream:
  • if !take {
  • take = !fn(v)
  • if !take {
  • continue
  • }
  • }
  • takeStream <- v
  • }
  • }
  • }()
  • return takeStream
  • }

take

skip 的反向操作,读取一部分数据。

takeN

takeN 读取开头 N 个数据。

  • func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
  • takeStream := make(chan interface{})
  • go func() {
  • defer close(takeStream)
  • for i := 0; i < num; i++ {
  • select {
  • case <-done:
  • return
  • case takeStream <- <-valueStream:
  • }
  • }
  • }()
  • return takeStream
  • }
takeFn

takeFn 只筛选满足 fn 的数据。

  • func takeFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
  • takeStream := make(chan interface{})
  • go func() {
  • defer close(takeStream)
  • for {
  • select {
  • case <-done:
  • return
  • case v := <-valueStream:
  • if fn(v) {
  • takeStream <- v
  • }
  • }
  • }
  • }()
  • return takeStream
  • }
takeWhile

takeWhile 只挑选开头满足 fn 的数据。

  • func takeWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
  • takeStream := make(chan interface{})
  • go func() {
  • defer close(takeStream)
  • for {
  • select {
  • case <-done:
  • return
  • case v := <-valueStream:
  • if !fn(v) {
  • return
  • }
  • takeStream <- v
  • }
  • }
  • }()
  • return takeStream
  • }

flat

平展(flat) 操作是一个有趣的操作。

如果输入是一个 channel,channel 中的数据还是相同类型的 channel, 那么 flat 将返回一个输出 channel,输出 channel 中的数据是输入的各个 channel 中的数据。

它与扇入不同,扇入的输入 channel 在调用的时候就是固定的,并且以数组的方式提供,而 flat 的输入是一个 channel,可以运行时随时的加入 channel。

  • func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} {
  • valStream := make(chan interface{})
  • go func() {
  • defer close(valStream)
  • for {
  • select {
  • case <-done:
  • return
  • case v, ok := <-c:
  • if ok == false {
  • return
  • }
  • select {
  • case valStream <- v:
  • case <-done:
  • }
  • }
  • }
  • }()
  • return valStream
  • }
  • func flat(done <-chan struct{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
  • valStream := make(chan interface{})
  • go func() {
  • defer close(valStream)
  • for {
  • var stream <-chan interface{}
  • select {
  • case maybeStream, ok := <-chanStream:
  • if ok == false {
  • return
  • }
  • stream = maybeStream
  • case <-done:
  • return
  • }
  • for val := range orDone(done, stream) {
  • select {
  • case valStream <- val:
  • case <-done:
  • }
  • }
  • }
  • }()
  • return valStream
  • }

map

map 和 reduce 是一组常用的操作。

map 将一个 channel 映射成另外一个 channel, channel 的类型可以不同。

  • func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
  • out := make(chan interface{})
  • if in == nil {
  • close(out)
  • return out
  • }
  • go func() {
  • defer close(out)
  • for v := range in {
  • out <- fn(v)
  • }
  • }()
  • return out
  • }

因为 map 是 go 的关键字,所以我们不能命名函数类型为 map ,这里用 mapChan 代替。

比如你可以处理一个公司员工工资的 channel, 输出一个扣税之后的员工工资的 channel。

reduce

  • func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {
  • if in == nil {
  • return nil
  • }
  • out := <-in
  • for v := range in {
  • out = fn(out, v)
  • }
  • return out
  • }

你可以用 reduce 实现 sum 、 max 、 min 等聚合操作。

总结

本文列出了 channel 的一些深入应用的模式,相信通过阅读本文,你可以更加深入的了解 Go 的 channel 类型,并在开发中灵活的应用 channel。也欢迎你在评论中提出更多的 channel 的应用模式。

所有的代码可以在 github 上找到:smallnest/channels 。

参考资料

  1. https://github.com/kat-co/concurrency-in-go-src
  2. https://github.com/campoy/justforfunc/tree/master/27-merging-chans
  3. https://github.com/eapache/channels
  4. https://github.com/LK4D4/trylock
  5. https://stackoverflow.com/questions/36391421/explain-dont-communicate-by-sharing-memory-share-memory-by-communicating
  6. https://github.com/lrita/gosync
  7. https://www.ardanlabs.com/blog/2017/10/the-behavior-of-channels.html
城东书院 www.cdsy.xyz
方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门
本栏推荐