mike-neckのブログ

Java or Groovy or Swift or Golang

JSUG勉強会 - 「Spring Boot ベースの DDD サンプル徹底解説」に行ってきた話

JSUG 勉強会 Spring Boot ベースの DDD サンプル徹底解説 に行ってきました。

jsug.doorkeeper.jp

DDD については、考え方的にはわかっているところもあるのですが、実際のコードを見てがっかりしたり、コードの例が理想的すぎて現実味がないといったことから、理解から遠いところにある印象があったので、この勉強会のアイデアがあると増田さんがツイートしていたときから興味を持っていました。しかし、最近 jsugjjug の勉強会の募集を開始した旨のメール/ツイートを見逃してしまうことがあり(ここ2〜3回見逃している)、この勉強会もその一つで実際に見逃していて、申し込んだときにはキャンセル待ちが発生していました。前世での徳が高かったのか、前日の夕方くらいになって参加できることになり、キャンセルしていただいた方には非常に感謝しております。

f:id:mike_neck:20180802004547p:plain

続きを読む

複数の goroutine で処理をした結果を集計するようなタイプの goroutine pipeline 改

この前書いた記事のコードを改善する。

f:id:mike_neck:20181121224358p:plain

改善のヒントは『Go 言語による並行処理』p.77〜78 の部分。チャネルの所有権を割り振ることで、デッドロック、 panic 、不適合なデータの書き込みを防げるようになる。

  • チャネルの所有者は…
    • チャネルを初期化
    • 書き込みを行うか、他の goroutine に所有権を移譲する
    • チャネルを閉じる
    • 読み込み専用チャネルを公開する

チャネルの所有者というのは関数を用いて表現する。チャネルの所有権のスコープを極力狭くすることで問題を回避する。


この前書いたコードでは次のように main で複数のチャネルを作っていた。

func main() {
  producer := make(chan string)
  consumer := make(chan string)
// 以下省略
}

改善版では、そうではなく、データ生成の goroutine を起動する関数の中でチャネルを作って、読み取り専用のチャネルを返すようにする。

func producer() <-chan string {
  output := make(chan string)
  go func() {
    defer close(output)
    for _, item := range items {
      output <- item
    }
  }
  return output
}

前回のコードは改善後にこのようになる。

import (
    "fmt"
    "time"
    "sync"
)

var items = []string {
    "foo",
    "bar",
    "baz",
    "qux",
    "quux",
}

func main() {
  prd := producer()
  con := worker(3, prd)
  consumer(con)
}

func producer() <-chan string {
  output := make(chan string)
  go func() {
    defer close(output)
    for _, item := range items {
      output <- item
      time.Sleep(130 * time. Millisecond)
    }
  }()
  return output
}

func worker(size int, input <-chan string) <-chan string {
  output := make(chan string)
  var task sync.WaitGroup

  for index := 0; index < size; index++ {
    num := index
    task.Add(1)
    go func() {
      defer task.Done()
      for item := range input {
        output <- fmt.Sprintf("[worker-%d]%s[len: %d]", num, item, len(item))
        time.Sleep(300 * time. Millisecond)
      }
    }()
  }

  go func() {
    task.Wait()
    close(output)
  }()

  return output
}

func consumer(input <-chan string) {
  for item := range input {
    fmt.Println(item)
  }
}

play.golang.org

複数の goroutine で処理をした結果を集計するようなタイプの goroutine pipeline

chansync.WaitGroupclose(channel) を駆使して組み立てる

f:id:mike_neck:20181121224358p:plain

package main

import (
    "fmt"
    "time"
    "sync"
)

var items = []string {
    "foo",
    "bar",
    "baz",
    "qux",
    "quux",
}

func main() {
    producer := make(chan string)
    consumer := make(chan string)
    var wg sync.WaitGroup
    for index := 0; index < 3; index++ {
        wg.Add(1)
        go worker(index, producer, consumer, &wg)
    }

    go func() {
        wg.Wait()
        close(consumer)
    }()

    go func() {
        for _, item := range items {
            producer <- item
            time.Sleep(130 * time. Millisecond)
        }
        close(producer)
    }()

    for item := range consumer {
        fmt.Println(item)
    }
}

func worker(num int, producer, consumer chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    for item := range producer {
        consumer <- fmt.Sprintf("[worker-%d]%s[len: %d]", num, item, len(item))
        time.Sleep(300 * time. Millisecond)
    }
}