mike-neckのブログ

Java or Groovy or Swift or Golang

複数の goroutine で処理をした結果を集計するようなタイプの goroutine pipeline

chansync.WaitGroupclose(channel) を駆使して組み立てる

f:id:mike_neck:20181121224358p:plain

package main

import (
    "fmt"
    "time"
    "sync"
)

var items = []string {
    "foo",
    "bar",
    "baz",
    "qux",
    "quux",
}

func main() {
    producer := make(chan string)
    consumer := make(chan string)
    var wg sync.WaitGroup
    for index := 0; index < 3; index++ {
        wg.Add(1)
        go worker(index, producer, consumer, &wg)
    }

    go func() {
        wg.Wait()
        close(consumer)
    }()

    go func() {
        for _, item := range items {
            producer <- item
            time.Sleep(130 * time. Millisecond)
        }
        close(producer)
    }()

    for item := range consumer {
        fmt.Println(item)
    }
}

func worker(num int, producer, consumer chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    for item := range producer {
        consumer <- fmt.Sprintf("[worker-%d]%s[len: %d]", num, item, len(item))
        time.Sleep(300 * time. Millisecond)
    }
}