GoroutineとChannel

Go言語でConcurrencyプログラミング

6 September 2014

Daigo Ikeda

Knightso, LLC

アジェンダ

本スライドで使用するサンプルコード

Goroutine

Goroutineとは

Go言語でConcurrencyを実現する仕組み

Concurrencyとは・・・複数のタスクを同時に処理すること

Goroutineの特徴

とにかく使ってみよう

関数呼び出しの前に「go」つけるだけ!カンタン!

package main

import "fmt"

func hoge() {
    fmt.Println("hoge!")
}

func main() {
    // ↓にgoつけてみよう!
    hoge()
}

goつける前と後で結果を比べてみましょう。

あれ?

結果が[no output]になりましたね?

これは、関数hogeがGoroutineとして呼び出され呼び出し元(main関数)に制御が戻っているからです。

main関数はfmt.Printlnの実行前に終了してしまっている為、プログラムが全体が終了しています。

それならば・・・

Goroutine実行後少しsleepしてみましょう。

package main

import (
    "fmt"
    "time"
)

func hoge() {
    fmt.Println("hoge!")
}

func main() {
    go hoge()
    time.Sleep(time.Microsecond)
}

"hoge!"が出力されましたね!\(^o^)/

1マイクロ秒待っている間に関数hoge(Goroutine)が実行されました!

実は・・・

GoはデフォルトでCPUを一つしか使わない設定の為、その設定のままでは並列処理が行われているわけではありません。同時に動いているGoroutineは一つだけで、その他は待機しています。
各GoroutineはsleepやIO待ちが合った場合に他Goroutineに制御がスイッチします。

runtime.GOMAXPROCS関数で使用するCPU数を指定出来ます。その場合、Goroutineはその数だけ並列に実行可能となります。

    runtime.GOMAXPROCS(4)

runtime.NumCPU関数でその環境で使用可能なCPU数が取得出来るのでそれを指定してもよいでしょう。

    runtime.GOMAXPROCS(runtime.NumCPU())

残念ながら本スライドやGo PlaygroundはGoogle App Engine上で動いている為、NumCPUは1になっており、並列処理を試すことは出来ません(´・ω・`)

Goroutineと匿名関数

Goroutineは匿名関数に対しても呼び出すことが出来ます。

package main

import (
    "fmt"
    "time"
)

func main() {
    go func() {
        fmt.Println("hoge!")
    }()

    time.Sleep(time.Microsecond)
}

Goroutineとメソッド

Goroutineはメソッドに対しても呼び出すことが出来ます。

package main

import (
	"fmt"
	"time"
)

type hoge string

func (h hoge) hello() {
    fmt.Printf("hello %s!\n", h)
}

func main() {
    var h hoge = "hoge"
    go h.hello()
    time.Sleep(time.Microsecond)
}

但しこれは発表者個人の考えですが、あまり推奨しません。
後述しますが、複数Goroutineから共通のリソースにアクセスするには細心の注意を払う必要があります。
そもそもメソッドはレシーバ(メソッドの持ち主)のリソースにアクセスする使い方が多いため、気軽に使ってしまうと知らずにrace conditionを生み出す可能性が高いと思っています。

Goroutineとクロージャ

Goroutineはクロージャに対しても呼び出すことが出来ます。

package main

import (
	"fmt"
	"time"
)

func inc() func() {
    i := 0
    return func() {
        i++
        fmt.Printf("i=%d\n", i)
    }
}

func main() {
    f := inc()
    go f()
    go f()
    go f()
    time.Sleep(time.Second)
}

これも推奨しません。理由はメソッドとほぼ同じです。
クロージャは通常レキシカルスコープの変数を参照・更新する用途で使用される為、そのままGoroutineで実行してしまうとほぼ問題が起きます。
※上記コード例にも深刻な問題があります。考えてみて下さい。

Goroutineの落とし穴

Goroutineと言うよりは、マルチスレッドプログラミング全般に言えることですが・・・

複数のGoroutineから同じリソースにアクセスした場合、競合する可能性があります。

例えば・・・

銀行口座に預金する処理を考えてみます。

package main

import (
	"fmt"
	"time"
)

var balance int = 0
func deposit(money int) {
    newbalance := balance + money
    balance = newbalance
}
func main() {
    go deposit(100)
    go deposit(100)
    go deposit(100)
    time.Sleep(time.Second)
    fmt.Printf("balance:%d\n", balance)
}

実行すると期待する結果が見られると思います。
これは使用CPU数が1の為各goroutineがアトミックに動作している為です。
newbalanceの算出と、balanceの更新の行の間にsleepを入れてみましょう。
預金したはずの残高が記録されてませんね・・・(´・ω・`)

問題を解決する為には

共有リソースに対する処理を複数Goroutineが同時に行えない様に排他処理(ロック)を組み込みます。
ここではsyncパッケージのMutexを使用します。

package main

import (
	"fmt"
	"time"
	"sync"
)

var balance int = 0
var mutex sync.Mutex
func deposit(money int) {
    mutex.Lock()
    defer mutex.Unlock()
    newbalance := balance + money
    time.Sleep(time.Microsecond)
    balance = newbalance
}
// start 2 OMIT
func main() {
	go deposit(100)
	go deposit(100)
	go deposit(100)
	time.Sleep(time.Second) // これ!
	fmt.Printf("balance:%d\n", balance)
}
// end 2 OMIT

期待する結果がでましたね!\(^o^)/

おまけ

今迄main関数が先に終了しない様にSleep入れていましたが、これイケてないですよね?

func main() {
    go deposit(100)
    go deposit(100)
    go deposit(100)
    time.Sleep(time.Second) // これ!
    fmt.Printf("balance:%d\n", balance)
}

Goroutineがもし高速で終了していたら余計なSleepをしてしまいますし、もしGoroutine処理が予想よりも時間がかかってしまったら結局同じ問題が起きてしまいます(´・ω・`)

そんな貴方にWaitGroup!

syncパッケージのWaitGroupを使うと、複数Goroutineの終了を待つことができます。

package main

import (
	"fmt"
	"time"
	"sync"
)

var balance int = 0
var mutex sync.Mutex
func deposit(money int, wg *sync.WaitGroup) {
    defer wg.Done() // Goroutine終了を通知
    mutex.Lock()
    defer mutex.Unlock()
    newbalance := balance + money
    time.Sleep(time.Microsecond)
    balance = newbalance
}
func main() {
    var wg sync.WaitGroup
    wg.Add(3) // 3個のGoroutineを待つよ!
    go deposit(100, &wg)
    go deposit(100, &wg)
    go deposit(100, &wg)
    wg.Wait() // 全部終了まで待機!
    fmt.Printf("balance:%d\n", balance)
}

Channel

Channelとは

Go言語にはシンプルにConcurrencyなプログラムを行う為のしくみが組み込みで用意されています。

それがChannelです!

Channelとは

とにかく使ってみよう

ChannelはSliceやMapと同様参照型で、make()関数で生成できます。

package main

import "fmt"

func main() {
    ch := make(chan string)

    go func() {
        ch <- "hello!" // 送信!
        ch <- "world!" // 送信!
    }()

    fmt.Println(<-ch) // 受信!
    fmt.Println(<-ch) // 受信!
}

SleepやWaitGroupを使わなくてもmain関数が先に終了していないのは、channelの受信がデータを受け取るまでブロックされているからです。

バッファ付きチャネル

make関数の第2引数でバッファサイズを指定出来ます。
バッファサイズがフルになるまでは送信がブロックされません。

package main

import "fmt"
import "time"

func main() {
    ch := make(chan int, 10)

    go func() {
        for i := 0; i < 10; i++ {
            ch <- i // 送信!
            fmt.Printf("sent %d\n", i)
        }
    }()

    for i := 0; i < 10; i++ {
        time.Sleep(time.Microsecond)
        fmt.Println(<-ch) // 受信!
        fmt.Printf("received %d\n", i)
    }
}

バッファサイズを変更して結果を見てみましょう。

close

close関数でchannelを閉じます。
閉じたchannelには送信出来ません。パニクります。
受信は、値がすでに入っていれば閉じたchannelに対しても可能です。空の閉じたchannelに対して受信を行った場合は値は0値になり、第2戻り値(bool)にfalseが返ります。これをチェックすることでGoroutineの終了を監視することもできます。

close

package main

import "fmt"

func main() {
    ch := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            ch <- i // 送信!
            fmt.Printf("sent %d\n", i)
        }
        close(ch) 
    }()

    for {
        i, ok := <-ch
        if !ok {
            break
        }
        fmt.Printf("received %d\n", i)
    }
}

rangeでイテレート

ChannelはSliceやMapの様にrange文でイテレートすることが出来ます。
受信されるまでブロックし、受信されたらそのデータを受け取ります。
Channelがcloseされたらループが終了します。

package main

import "fmt"

func main() {
    ch := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            ch <- i // 送信!
            fmt.Printf("sent %d\n", i)
        }
        close(ch) 
    }()

    for i := range ch{
        fmt.Printf("received %d\n", i)
    }
}

受信専用channel、送信専用channel

Channel型には受信専用、送信専用を明示することが出来ます。
これにより関数の引数や戻り値などで間違った使われ方を防ぐことができます。
デフォルトの送受信用Channelは、受信専用、送信専用にキャスト出来ます。逆は出来ません。

受信専用channel、送信専用channel

package main

import "fmt"

func send(ch chan<- int) {
    for i := 0; i < 10; i++ {
        ch <- i // 送信!
    }
    close(ch)
}
func receive(ch <-chan int) {
    for {
        i, ok := <-ch
        if !ok {
            break
        }
        fmt.Printf("received %d\n", i)
    }
}
func main() {
    ch := make(chan int)

    go send(ch)

    receive(ch)
}

デッドロック

送信可能なgoroutineのいない空のchannelから受信を行おうとするとデッドロックでパニクります。

package main

func main() {
    ch := make(chan int)
    <-ch
}

デッドロック

受信可能なgoroutineのいないバッファに空きのないchannelに送信を行おうとしてもデッドロックでパニクります。

package main

func main() {
    ch := make(chan int)
    ch<-999
}

バッファに空きがある場合はデッドロックになりません。

select文

select〜case文を使用すると、複数channelを同時に監視することが出来ます。

package main

import (
	"fmt"
	"sync"
)

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    var wg sync.WaitGroup

    wg.Add(1)
    go func() {
        select { // いずれか受信するまでブロック
        case v := <-ch1:
            fmt.Println(v)
        case v := <-ch2:
            fmt.Println(v)
        }
        wg.Done()
    }()

    ch2<-999
    wg.Wait()
}

select文

受信だけでなく送信もcaseに記述することが出来ます。

package main

import (
	"fmt"
	"sync"
)

func main() {
    in := make(chan int)
    out := make(chan string)
    var wg sync.WaitGroup

    wg.Add(1)
    go func() {
        select { // いずれか受信するまでブロック
        case v := <-in:
            fmt.Println(v)
        case out<-"out!!":
        }
        wg.Done()
    }()
    fmt.Println(<-out)
    wg.Wait()
}

select文 - default

select文にもswitchと同様defaultが指定出来ます。
defaultが無い場合いずれかのcaseのchannel送受信が行われるまでブロックされますが、defaultがあると全てのChannel送受信がなかった場合にブロックされずにdefaultが実行されます。

select文 - default

package main

import (
	"fmt"
	"sync"
)

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        select {
        case v := <-ch1:
            fmt.Println(v)
        case v := <-ch2:
            fmt.Println(v)
        default: // ch1, ch2のどちらも受信出来なかった場合
            fmt.Println("default!")
        }
        fmt.Println("done!")
        wg.Done()
    }()

    wg.Wait()
}

Channelの送受信を行いたいが、出来ない場合にブロックされたくない場合にも使用出来ます。

プログラミングパターン

for-select loop

    req := make(chan int)
    count := make(chan int)

    go func() { // メインループ
        var c int
        for {
            select {
            case v := <-req:
                fmt.Printf("received %d\n", v)
                c++
            case count <- c:
            }
        }
    }()

Goroutineで無限forループを回してselect文+Channelで外部とやり取りします。
デーモンやサーバーなどの常駐プログラム作る際に便利です。
goroutineのローカル変数で状態を管理出来るので、とてもシンプルにConcurrentなプログラムを書く事が出来ます。

for-select loop

package main

import (
	"fmt"
	"time"
)

func main() {
	// start 1 OMIT
	req := make(chan int)
	count := make(chan int)

	go func() { // メインループ
		var c int
		for {
			select {
			case v := <-req:
				fmt.Printf("received %d\n", v)
				c++
			case count <- c:
			}
		}
	}()
	// end 1 OMIT

    go func() { // リクエスト送信ループ
        for i := 0; ; i++ {
            time.Sleep(time.Second)
            req <- i
        }
    }()

    for { // 5秒に一回count確認
        time.Sleep(5 * time.Second)
        fmt.Printf("count=%d\n", <-count)
    }
}

上記サンプルは無限ループなのでkillして終了させて下さい。

Request/Response

発表者はchan chanパターンとも呼んでいます(^^;

channel経由でメッセージを送る際に、そのメッセージに対する結果を受け取りたいことがあります。
そのchannelは双方向ではないのでレスポンス用のchannelを用意する必要がありますが、複数Goroutineが共有するchannelだと自分のメッセージに対するレスポンスを特定するのが面倒です。
そんなときにこのパターンが使用出来ます。

リクエスト用Channelにレスポンス用Channelを送信します。リクエストを受け取った側は結果をレスポンス用Channelに送信します。そうすることで送信者が結果を受け取ることができます。

Request/Response

    req := make(chan int)
    count := make(chan int)
    stop := make(chan chan error) // ここがchan chan

    go func() { // メインループ
        var c int
        for {
            select {
            case v := <-req:
                fmt.Printf("received %d\n", v)
                c++
            case count <- c:
            case sch := <- stop:
                // tried some stop process, but failed
                sch<-errors.New("stop failed...")
            }
        }
    }()

mainループに対してChannel経由で停止要求を送ります。
このサンプルでは停止処理失敗のエラーをレスポンスで返しています。

Request/Response

package main

import (
	"fmt"
	"time"
	"errors"
)

func main() {
	// start 1 OMIT
	req := make(chan int)
	count := make(chan int)
	stop := make(chan chan error) // ここがchan chan

	go func() { // メインループ
		var c int
		for {
			select {
			case v := <-req:
				fmt.Printf("received %d\n", v)
				c++
			case count <- c:
			case sch := <- stop:
				// tried some stop process, but failed
				sch<-errors.New("stop failed...")
			}
		}
	}()
	// end 1 OMIT

    go func() { // リクエスト送信ループ
        for i := 0; ; i++ {
            time.Sleep(time.Second)
            req <- i
        }
    }()

    go func() {
        <-time.After(15 * time.Second) // 15秒後に終了要求
        sch := make(chan error)
        stop<-sch
        if err := <-sch; err != nil {
            fmt.Println(err)
        }
    }()

    c := time.Tick(5 * time.Second) // 5秒に一回count確認
    for _ = range c {
        fmt.Printf("count=%d\n", <-count)
    }
}

Future

非同期で起動した処理(Goroutine)の結果を、後で必要な時に参照するパターンです。
参照時にまだ結果が得られていない場合はブロックされます。
Go言語ではChannelがそのままFutureの役割をします。

package main

import (
	"fmt"
	"time"
)

func someLongJob(i int) <-chan string {
    future := make(chan string)
    go func() {
        time.Sleep(time.Duration(5 - i) * time.Second) // some long job
        future <- fmt.Sprintf("success! %d", i)
    }()
    return future
}
func main() {
    futures := [](<-chan string){}
    for i := 0; i < 5; i++ { // 先ずGoroutineを全て起動
        futures = append(futures, someLongJob(i))
    }
    for _, future := range futures { // 全て起動し終わってから結果を参照
        fmt.Println(<-future)
    }
}

Coroutine/Generator

Concurrencyとは直接関係ないですが、GoroutineとChannelを使用する事で他言語にあるCoroutineやGeneratorと同等の機能を実現することが出来ます。

package main

import (
	"fmt"
)

func seq(max int) <-chan int {
    gen := make(chan int)
    go func() {
        for i := 0; i <= max; i++ {
            gen<-i
        }
        close(gen)
    }()
    return gen
}
func main() {
    g := seq(20)

    fmt.Println(<-g)
    fmt.Println(<-g)

    for i := range g { // rangeでイテレートも可能
        fmt.Println(i)
    }
}

サンプルはシーケンスを生成するGeneratorです。

最後に

FYI:

Thank you

Daigo Ikeda

Knightso, LLC