実践!Go並行処理!

20 Jul 2019

Daigo Ikeda

Knightso, LLC

Profile

Daigo Ikeda
@hogedigo

Knightso, LLC
http://www.knightso.co.jp/
Shizuoka, JAPAN

2

本発表の趣旨

3

流れ

4

Beforehand

GorountieとChannelをおさらい

5

Let's start!

6

お題1: 外部IO

API呼び出し関数(モック)

7

メイン処理

8

Try!

Before:

9

After 1: WaitGroupを利用した場合

Adter 2: ErrGroupを利用した場合

10

お題2: たくさん外部IO

API呼び出し関数(モック)

11

メイン処理

12

Try!

Before:

13

After: Channelを用いたFutureパターン

14

お題3: 同時実行数の抑制

メイン処理

15

Try!

Before:

16

After: バッファ付きchannelを使ったセマフォ

参考

17

お題4: 巨大なデータの変換処理

18

メイン処理

19

package main

import (
	"archive/zip"
	"bufio"
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"math/rand"
	"time"

	"github.com/knightso/base/errors"
)

func main() {
	fmt.Println("start")
	start := time.Now()

	// START1 OMIT
	urlsScanner := bufio.NewScanner(newURLsReader())

	buf := bytes.NewBuffer(make([]byte, 0, 2048))

	for urlsScanner.Scan() {
		line := urlsScanner.Text()

		links, err := findLinks(line)
		if err != nil {
			log.Fatal(err)
		}

		for _, link := range links {
			if _, err := fmt.Fprintln(buf, link); err != nil {
				log.Fatal(err)
			}
		}
	}
	time.Sleep(1000)

	if err := urlsScanner.Err(); err != nil {
		log.Fatal(err)
	}
	// END1 OMIT

    fmt.Println(buf.String()) // just log

    zipBuf := bytes.NewBuffer(make([]byte, 0, 2048))

    if err := writeZip("test.zip", zipBuf, buf); err != nil {
        log.Fatal(err)
    }

    if err := writeToRemote(zipBuf); err != nil {
        log.Fatal(err)
    }

	fmt.Println("time: ", time.Now().Sub(start))
}

func newURLsReader() io.Reader {
	// mocking remote storage file
	r, w := io.Pipe()
	go func() {
		defer w.Close()
		for i := 0; i < 1000; i++ {
			if _, err := fmt.Fprintf(w, "http://www.example.com/%d\n", i); err != nil {
				log.Fatal(err)
			}
			time.Sleep(time.Millisecond) // mock latency
		}
	}()
	return r
}

func findLinks(url string) ([]string, error) {
	// mocking remote fetch
	latency := rand.Intn(10) + 1
	time.Sleep(time.Duration(latency) * time.Millisecond)

	links := make([]string, 0, 3)
	for i := 0; i < 3; i++ {
		links = append(links, fmt.Sprintf("%s/%d", url, i))
	}

	return links, nil
}

func writeZip(fileName string, w io.Writer, r io.Reader) error {
	zipWriter := zip.NewWriter(w)

	fileWriter, err := zipWriter.Create(fileName)
	if err != nil {
		return errors.WrapOr(err)
	}

	if count, err := io.Copy(fileWriter, r); err != nil {
		return errors.WrapOr(err)
	} else {
		fmt.Println("zipped", count)
	}

	if err := zipWriter.Flush(); err != nil {
		return errors.WrapOr(err)
	}

	if err := zipWriter.Close(); err != nil {
		return errors.WrapOr(err)
	}

	return nil
}

func writeToRemote(reader io.Reader) error {
	// mocking write to remote storage
	size, err := io.Copy(ioutil.Discard, reader)
	if err != nil {
		return err
	}

	fmt.Printf("wrote: %d\n", size)
	return nil
}
20

Try!

Before:

21

After:

22

Thank you

20 Jul 2019

Daigo Ikeda

Knightso, LLC

Use the left and right arrow keys or click the left and right edges of the page to navigate between slides.
(Press 'H' or navigate to hide this message.)