mike-neckのブログ

Java or Groovy or Swift or Golang

複数の goroutine で処理をした結果を集計するようなタイプの goroutine pipeline 改

この前書いた記事のコードを改善する。

f:id:mike_neck:20181121224358p:plain

改善のヒントは『Go 言語による並行処理』p.77〜78 の部分。チャネルの所有権を割り振ることで、デッドロック、 panic 、不適合なデータの書き込みを防げるようになる。

  • チャネルの所有者は…
    • チャネルを初期化
    • 書き込みを行うか、他の goroutine に所有権を移譲する
    • チャネルを閉じる
    • 読み込み専用チャネルを公開する

チャネルの所有者というのは関数を用いて表現する。チャネルの所有権のスコープを極力狭くすることで問題を回避する。


この前書いたコードでは次のように main で複数のチャネルを作っていた。

func main() {
  producer := make(chan string)
  consumer := make(chan string)
// 以下省略
}

改善版では、そうではなく、データ生成の goroutine を起動する関数の中でチャネルを作って、読み取り専用のチャネルを返すようにする。

func producer() <-chan string {
  output := make(chan string)
  go func() {
    defer close(output)
    for _, item := range items {
      output <- item
    }
  }
  return output
}

前回のコードは改善後にこのようになる。

import (
    "fmt"
    "time"
    "sync"
)

var items = []string {
    "foo",
    "bar",
    "baz",
    "qux",
    "quux",
}

func main() {
  prd := producer()
  con := worker(3, prd)
  consumer(con)
}

func producer() <-chan string {
  output := make(chan string)
  go func() {
    defer close(output)
    for _, item := range items {
      output <- item
      time.Sleep(130 * time. Millisecond)
    }
  }()
  return output
}

func worker(size int, input <-chan string) <-chan string {
  output := make(chan string)
  var task sync.WaitGroup

  for index := 0; index < size; index++ {
    num := index
    task.Add(1)
    go func() {
      defer task.Done()
      for item := range input {
        output <- fmt.Sprintf("[worker-%d]%s[len: %d]", num, item, len(item))
        time.Sleep(300 * time. Millisecond)
      }
    }()
  }

  go func() {
    task.Wait()
    close(output)
  }()

  return output
}

func consumer(input <-chan string) {
  for item := range input {
    fmt.Println(item)
  }
}

play.golang.org