実践!Go並行処理!

20 Jul 2019

Daigo Ikeda

Knightso, LLC

Profile

Daigo Ikeda
@hogedigo

Knightso, LLC
http://www.knightso.co.jp/
Shizuoka, JAPAN

2

本発表の趣旨

3

流れ

4

Beforehand

GoroutineとChannelをおさらい

5

Let's start!

6

お題1: 外部IO

API呼び出し関数(モック)

// callAPI is a mock of a slow external API call. It blocks for a randomly
// chosen whole number of seconds between 1 and 3 and returns that number.
// The returned error is always nil.
func callAPI() (int, error) {
	// mocking IO latency: rand.Intn(3) yields 0..2, shifted to 1..3 seconds
	seconds := 1 + rand.Intn(3)
	delay := time.Duration(seconds) * time.Second
	time.Sleep(delay)

	return seconds, nil
}
7

メイン処理

package main

import (
	"fmt"
	"log"
	"math/rand"
	"time"
)

// main calls callAPI three times, one call after another, prints the three
// results on one line, and finally prints the total elapsed time.
// If any call returns an error the program terminates via log.Fatal.
func main() {
	fmt.Println("start")
	start := time.Now()

	res1, err := callAPI()
	if err != nil {
		log.Fatal(err)
	}

	res2, err := callAPI()
	if err != nil {
		log.Fatal(err)
	}

	res3, err := callAPI()
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(res1, res2, res3)

	// time.Since(start) is the idiomatic spelling of time.Now().Sub(start).
	fmt.Println("time: ", time.Since(start))
}

// STARTAPI OMIT
// callAPI stands in for a slow external API. It sleeps for a random whole
// number of seconds in the range 1..3 and returns that number together with
// a nil error.
func callAPI() (int, error) {
	// mocking IO latency
	secs := rand.Intn(3) + 1
	wait := time.Second * time.Duration(secs)
	time.Sleep(wait)

	return secs, nil
}

// ENDAPI OMIT
8

Try!

Before:

9

After 1: WaitGroupを利用した場合

After 2: ErrGroupを利用した場合

10

お題2: たくさん外部IO

API呼び出し関数(モック)

// callAPI is a mock of an unreliable external API call.
//
// It sleeps for a random 1..100 milliseconds. When that latency is a
// multiple of 3 it returns an empty string and an error; otherwise it
// returns "<id>:<latency>" and a nil error.
func callAPI(id int) (string, error) {
	// mocking IO latency
	ms := 1 + rand.Intn(100)
	time.Sleep(time.Millisecond * time.Duration(ms))

	// mocking error
	switch ms % 3 {
	case 0:
		return "", fmt.Errorf("got stupid! latency:%d", ms)
	default:
		return fmt.Sprintf("%d:%d", id, ms), nil
	}
}
11

メイン処理

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// strOrErr bundles the two results of one callAPI call (the string and the
// error) into a single value so that they can be stored together in a slice.
// It is built with the positional literal strOrErr{res, err}, so the field
// order matters.
type strOrErr struct {
	// value is the string returned by the call.
	value string
	// err is the error returned by the call (nil on success).
	err   error
}

// main calls callAPI 100 times sequentially (ids 0..99), collecting every
// result/error pair, then prints all pairs in call order followed by the
// total elapsed time. Errors from callAPI are recorded and printed, not
// treated as fatal.
func main() {
	fmt.Println("start")
	start := time.Now()

	reslist := make([]strOrErr, 0, 100)

	for i := 0; i < 100; i++ {
		res, err := callAPI(i)
		reslist = append(reslist, strOrErr{res, err})
	}

	for _, res := range reslist {
		fmt.Printf("res:%s, err:%v\n", res.value, res.err)
	}

	// time.Since(start) is the idiomatic spelling of time.Now().Sub(start).
	fmt.Println("time: ", time.Since(start))
}

// STARTAPI OMIT
// callAPI simulates an external API that is slow and sometimes fails.
//
// The call blocks for a random latency of 1..100 ms. A latency divisible by
// 3 is reported as a failure ("" plus an error); any other latency yields
// the string "<id>:<latency>" with a nil error.
func callAPI(id int) (string, error) {
	// mocking IO latency
	latencyMs := rand.Intn(100) + 1
	pause := time.Duration(latencyMs) * time.Millisecond
	time.Sleep(pause)

	// mocking error
	failed := latencyMs%3 == 0
	if !failed {
		return fmt.Sprintf("%d:%d", id, latencyMs), nil
	}

	return "", fmt.Errorf("got stupid! latency:%d", latencyMs)
}

// ENDAPI OMIT
12

Try!

Before:

13

After: Channelを用いたFutureパターン

14

お題3: 同時実行数の抑制

メイン処理

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// strOrErr bundles the two results of one callAPI call (the string and the
// error) into a single value so that both can be sent over one channel.
// It is built with the positional literal strOrErr{res, err}, so the field
// order matters.
type strOrErr struct {
	// value is the string returned by the call.
	value string
	// err is the error returned by the call (nil on success).
	err   error
}

// main starts 100 goroutines at once, one per callAPI call (ids 0..99).
// Each goroutine sends its result/error pair on its own unbuffered channel;
// main then receives from the channels in id order, prints each pair, and
// finally prints the total elapsed time.
//
// No limit is placed on how many calls run at the same time.
func main() {
	fmt.Println("start")
	start := time.Now()

	reslist := make([]chan strOrErr, 100)

	for i := 0; i < 100; i++ {
		reslist[i] = make(chan strOrErr)

		// i is passed as an argument so each goroutine works on its own copy.
		go func(i int) {
			res, err := callAPI(i)
			reslist[i] <- strOrErr{res, err}
		}(i)
	}

	// Receiving in slice order keeps the output ordered by id regardless of
	// which call finishes first.
	for _, ch := range reslist {
		res := <-ch
		fmt.Printf("res:%s, err:%v\n", res.value, res.err)
	}

	// time.Since(start) is the idiomatic spelling of time.Now().Sub(start).
	fmt.Println("time: ", time.Since(start))
}

// STARTAPI OMIT
// callAPI mocks a flaky external API call identified by id.
//
// It waits a random 1..100 ms. If the chosen latency is a multiple of 3 the
// call fails with an error and an empty string; otherwise it succeeds and
// returns "<id>:<latency>".
func callAPI(id int) (string, error) {
	// mocking IO latency
	n := rand.Intn(100) + 1
	time.Sleep(time.Duration(n) * time.Millisecond)

	// mocking error
	if n%3 != 0 {
		result := fmt.Sprintf("%d:%d", id, n)
		return result, nil
	}

	return "", fmt.Errorf("got stupid! latency:%d", n)
}

// ENDAPI OMIT
15

Try!

Before:

16

After: バッファ付きchannelを使ったセマフォ

参考

17

お題4: 巨大なデータの変換処理

18

メイン処理

package main

import (
	"archive/zip"
	"bufio"
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"math/rand"
	"time"

	"github.com/knightso/base/errors"
)

// main runs the whole conversion pipeline sequentially:
//
//  1. read URLs line by line from newURLsReader,
//  2. expand each URL into links with findLinks and append them, one per
//     line, to an in-memory buffer,
//  3. print the buffer, zip it as "test.zip" into a second buffer,
//  4. hand the zipped bytes to writeToRemote,
//
// and finally prints the total elapsed time. Any error terminates the
// program via log.Fatal.
func main() {
	fmt.Println("start")
	start := time.Now()

	urlsScanner := bufio.NewScanner(newURLsReader())

	buf := bytes.NewBuffer(make([]byte, 0, 2048))

	for urlsScanner.Scan() {
		line := urlsScanner.Text()

		links, err := findLinks(line)
		if err != nil {
			log.Fatal(err)
		}

		for _, link := range links {
			if _, err := fmt.Fprintln(buf, link); err != nil {
				log.Fatal(err)
			}
		}
	}
	// NOTE(review): this was written as time.Sleep(1000); a bare constant is
	// taken as nanoseconds, i.e. a 1µs pause. The unit is spelled out here
	// without changing the duration — confirm whether a longer pause (or
	// none at all) was intended.
	time.Sleep(1000 * time.Nanosecond)

	// Scan returning false can mean EOF or a read error; Err tells them apart.
	if err := urlsScanner.Err(); err != nil {
		log.Fatal(err)
	}

	// START2 OMIT
	fmt.Println(buf.String()) // just log

	zipBuf := bytes.NewBuffer(make([]byte, 0, 2048))

	if err := writeZip("test.zip", zipBuf, buf); err != nil {
		log.Fatal(err)
	}

	if err := writeToRemote(zipBuf); err != nil {
		log.Fatal(err)
	}

	// END2 OMIT

	// time.Since(start) is the idiomatic spelling of time.Now().Sub(start).
	fmt.Println("time: ", time.Since(start))
}

// newURLsReader returns a reader that yields 1000 lines of the form
// "http://www.example.com/<i>\n" for i = 0..999.
//
// The lines are produced by a background goroutine writing into an io.Pipe,
// with a one-millisecond pause after each line. The goroutine closes the
// write end when it is done and calls log.Fatal if a write fails.
func newURLsReader() io.Reader {
	// mocking remote storage file
	pr, pw := io.Pipe()

	produce := func() {
		defer pw.Close()

		for n := 0; n < 1000; n++ {
			_, err := fmt.Fprintf(pw, "http://www.example.com/%d\n", n)
			if err != nil {
				log.Fatal(err)
			}
			time.Sleep(time.Millisecond) // mock latency
		}
	}
	go produce()

	return pr
}

// findLinks mocks fetching a page and extracting its links. It sleeps for a
// random 1..10 ms and then returns three links derived from url:
// "<url>/0", "<url>/1" and "<url>/2". The returned error is always nil.
func findLinks(url string) ([]string, error) {
	// mocking remote fetch
	delay := time.Duration(1+rand.Intn(10)) * time.Millisecond
	time.Sleep(delay)

	const linkCount = 3
	links := make([]string, linkCount)
	for i := range links {
		links[i] = fmt.Sprintf("%s/%d", url, i)
	}

	return links, nil
}

// writeZip copies everything read from r into a single zip entry named
// fileName and writes the resulting archive to w.
//
// It prints the number of bytes copied into the entry ("zipped <n>"), then
// flushes and closes the zip writer. Every error is passed through
// errors.WrapOr before being returned; nil is returned on success.
func writeZip(fileName string, w io.Writer, r io.Reader) error {
	zipWriter := zip.NewWriter(w)

	fileWriter, err := zipWriter.Create(fileName)
	if err != nil {
		return errors.WrapOr(err)
	}

	// Early return on error keeps the success path unindented (no else).
	count, err := io.Copy(fileWriter, r)
	if err != nil {
		return errors.WrapOr(err)
	}
	fmt.Println("zipped", count)

	if err := zipWriter.Flush(); err != nil {
		return errors.WrapOr(err)
	}

	if err := zipWriter.Close(); err != nil {
		return errors.WrapOr(err)
	}

	return nil
}

// writeToRemote mocks uploading to remote storage: it drains reader into
// ioutil.Discard and, on success, prints how many bytes were consumed
// ("wrote: <n>"). A read error is returned unchanged and nothing is printed.
func writeToRemote(reader io.Reader) error {
	// mocking write to remote storage
	written, copyErr := io.Copy(ioutil.Discard, reader)
	if copyErr == nil {
		fmt.Printf("wrote: %d\n", written)
	}

	return copyErr
}
19

package main

import (
	"archive/zip"
	"bufio"
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"math/rand"
	"time"

	"github.com/knightso/base/errors"
)

// main runs the whole conversion pipeline sequentially: it reads URLs line
// by line from newURLsReader, expands each one with findLinks, collects the
// links (one per line) in an in-memory buffer, prints that buffer, zips it
// as "test.zip" into a second buffer, passes the zipped bytes to
// writeToRemote, and finally prints the total elapsed time.
// Any error terminates the program via log.Fatal.
func main() {
	fmt.Println("start")
	start := time.Now()

	// START1 OMIT
	urlsScanner := bufio.NewScanner(newURLsReader())

	buf := bytes.NewBuffer(make([]byte, 0, 2048))

	for urlsScanner.Scan() {
		line := urlsScanner.Text()

		links, err := findLinks(line)
		if err != nil {
			log.Fatal(err)
		}

		for _, link := range links {
			if _, err := fmt.Fprintln(buf, link); err != nil {
				log.Fatal(err)
			}
		}
	}
	// NOTE(review): this was written as time.Sleep(1000); a bare constant is
	// taken as nanoseconds, i.e. a 1µs pause. The unit is spelled out here
	// without changing the duration — confirm whether a longer pause (or
	// none at all) was intended.
	time.Sleep(1000 * time.Nanosecond)

	// Scan returning false can mean EOF or a read error; Err tells them apart.
	if err := urlsScanner.Err(); err != nil {
		log.Fatal(err)
	}
	// END1 OMIT

	fmt.Println(buf.String()) // just log

	zipBuf := bytes.NewBuffer(make([]byte, 0, 2048))

	if err := writeZip("test.zip", zipBuf, buf); err != nil {
		log.Fatal(err)
	}

	if err := writeToRemote(zipBuf); err != nil {
		log.Fatal(err)
	}

	// time.Since(start) is the idiomatic spelling of time.Now().Sub(start).
	fmt.Println("time: ", time.Since(start))
}

// newURLsReader returns the read end of an io.Pipe that is fed by a
// background goroutine. The goroutine writes the 1000 lines
// "http://www.example.com/0\n" .. "http://www.example.com/999\n", sleeping
// one millisecond after each, and closes the write end when finished.
// A failed write ends the program via log.Fatal.
func newURLsReader() io.Reader {
	// mocking remote storage file
	const total = 1000
	src, sink := io.Pipe()

	go func() {
		defer sink.Close()

		idx := 0
		for idx < total {
			if _, werr := fmt.Fprintf(sink, "http://www.example.com/%d\n", idx); werr != nil {
				log.Fatal(werr)
			}
			time.Sleep(time.Millisecond) // mock latency
			idx++
		}
	}()

	return src
}

// findLinks is a mock of a remote fetch that discovers links on a page.
// After sleeping a random 1..10 ms it returns exactly three links built by
// appending "/0", "/1" and "/2" to url. It never returns a non-nil error.
func findLinks(url string) ([]string, error) {
	// mocking remote fetch
	ms := 1 + rand.Intn(10)
	time.Sleep(time.Millisecond * time.Duration(ms))

	found := make([]string, 0, 3)
	for n := 0; n < cap(found); n++ {
		link := fmt.Sprintf("%s/%d", url, n)
		found = append(found, link)
	}

	return found, nil
}

// writeZip creates a zip archive on w containing one entry, fileName, whose
// content is everything read from r.
//
// After copying it prints the copied byte count ("zipped <n>"), then flushes
// and closes the zip writer. Each failure is passed through errors.WrapOr
// before being returned; the function returns nil on success.
func writeZip(fileName string, w io.Writer, r io.Reader) error {
	zipWriter := zip.NewWriter(w)

	fileWriter, err := zipWriter.Create(fileName)
	if err != nil {
		return errors.WrapOr(err)
	}

	// Guard-clause form: handle the error first, then continue un-nested
	// instead of printing from an else branch.
	count, err := io.Copy(fileWriter, r)
	if err != nil {
		return errors.WrapOr(err)
	}
	fmt.Println("zipped", count)

	if err := zipWriter.Flush(); err != nil {
		return errors.WrapOr(err)
	}

	if err := zipWriter.Close(); err != nil {
		return errors.WrapOr(err)
	}

	return nil
}

// writeToRemote stands in for a write to remote storage. It reads reader to
// the end, discarding the bytes, and prints the number of bytes read as
// "wrote: <n>". If reading fails, the error is returned as-is and nothing is
// printed.
func writeToRemote(reader io.Reader) error {
	// mocking write to remote storage
	n, err := io.Copy(ioutil.Discard, reader)
	switch {
	case err != nil:
		return err
	default:
		fmt.Printf("wrote: %d\n", n)
		return nil
	}
}
20

Try!

Before:

21

After:

22

Thank you

20 Jul 2019

Daigo Ikeda

Knightso, LLC

Use the left and right arrow keys or click the left and right edges of the page to navigate between slides.
(Press 'H' or navigate to hide this message.)