GAE/Go 事例紹介

公共オープンデータバス配信基盤 Opentrans.it

15:00 28 Feb 2015

Daigo Ikeda

Knightso, LLC

アジェンダ

2

Opentrans.it紹介

3

Opentrans.itとは

4

Opentrans.itとは

5

実装概要

6

モデル設計

GTFSデータはDatastore(KVS)に保存する。
GTFSの各fileをモデル(Kind)に落とす。

7

モデル設計 - ER図

8

エンティティ設計のコツ

9

エンティティグループ図

10

エンティティグループの作り方

datastore.Keyに親エンティティのKeyを設定するだけ

func NewRouteIncompleteKey(c appengine.Context, agencyKey *datastore.Key) *datastore.Key {
    return datastore.NewIncompleteKey(c, KIND_ROUTE, agencyKey)
}
11

トランザクション

普通のトランザクション(Tx)

err = datastore.RunInTransaction(c, func(c appengine.Context) error {
    ...snip...
}, nil)

クロスグループトランザクション(XGTx)

err = datastore.RunInTransaction(c, func(c appengine.Context) error {
    ...snip...
}, &datastore.TransactionOptions{XG: true})

注意!
RunInTransactionはcommitで失敗すると3回までリトライする為、処理を冪等に記述する必要がある

12

トランザクション&タスクキュー

タスクキューへの追加をトランザクションに含める事で結果整合性を保証する。

var PutDocStopFunc = delay.Func("PutDocStopFunc", func(c appengine.Context, stopKey *datastore.Key, version int) error {
    ... save stop index for Search API ...
})
err := datastore.RunInTransaction(c, func(c appengine.Context) error {
    if err := ds.Put(c, p.Stop); err != nil {
        return errors.WrapOr(err)
    }
    ...snip...
    t, err := PutDocStopFunc.Task(c, p.Stop.Key, p.Stop.Version)
    if err != nil {
        return err
    }
    if _, err := taskqueue.Add(c, t, "queue1"); err != nil {
        return err
    }
    return nil
}, &datastore.TransactionOptions{XG: true})

タスクのAddはトランザクションで保証され、処理はリトライで保証される。

13

アーキテクチャ

今回はGAE/Goのハナシなのでバックエンドのみ紹介。。

14

Martini

Classy web development in Go

リフレクションバリバリで重いので最近嫌われている?

他には・・・

15

GAE & Martini

GAEもMartiniもnet/http準拠なので親和性が高い

シンプルな例

func init() {
    m := martini.Classic()
    m.Get("/hello/:name", func(params martini.Params, w http.ResponseWriter) string {
        w.Header().Set("Content-Type", "application/json")
        return "Hello! " + params["name"]
    })
    http.Handle("/", m)
}
16

Martini - URLルーティング

RESTなAPIを書くのに便利!

m.Group("/api/agencies", func(r martini.Router) {
    r.Get("", queryAgencies)
    r.Get("/:agencyId", getAgency)
    r.Post("", registerAgency)
    r.Put("/:agencyId", updateAgency)
    r.Delete("/:agencyId", deleteAgency)
})

パラメータの取得

func updateAgency(params martini.Params, rnd render.Render, c appengine.Context, w http.ResponseWriter, r *http.Request) {
    keyID, err := strconv.ParseInt(params["agencyId"], 10, 64)
    ...snip...
}
17

Martini - ミドルウェア

リクエストの前後に共通処理を埋め込む

r := martini.NewRouter()
m := martini.New()

m.Use(func(c martini.Context, r *http.Request) {
    ... pre-process ...
    c.Next()
    ... post-process ...
})
18

Martini - Dependency Injection

Handlerの引数はMartiniが型で判断してInjectしてくれる!

カスタムコンポーネントを使用したい場合はc.Mapで登録

m.Use(func(c martini.Context, r *http.Request) {
    ac := appengine.NewContext(r)
    c.Map(ac)
})
func someHandler(c appengine.Context, w http.ResponseWriter, r *http.Request) {
    ...snip...
}
19

GTFSフィード

パスの路線や便、時刻表データを配信

フォーマットは複数のcsvファイルをzipでまとめたもの

20

GTFSフィード - CSV

goの標準パッケージencoding/csvを使用

writer := csv.NewWriter(w)
 
titles := []string{
    "agency_id",
    "agency_name",
    "agency_url",
    "agency_timezone",
}
 
if err := writer.Write(titles); err != nil {
    return errors.WrapOr(err)
}
21

GTFSフィード - CSV

if err := mmaps.Agencies.ForEach(func(k *datastore.Key, v interface{}) error {
    agency := v.(*model.Agency)
    columns := []string{
        agency.ID,
        agency.Name,
        agency.URL,
    }

    err := writer.Write(columns)
    if err != nil {
        return errors.WrapOr(err)
    }

    return nil
}); err != nil {
    return errors.WrapOr(err)
}

writer.Flush()
if err := writer.Error(); err != nil {
    return errors.WrapOr(err)
}
22

GTFSフィード - ZIP

goの標準パッケージarchive/zipを使用

zipw := zip.NewWriter(w)

if feed, err := zipw.Create(GTFS_FEED_FILENAME_AGENCY); err != nil {
    return errors.WrapOr(err)
} else if err := generateCsvForAgency(mmaps, feed); err != nil {
    return errors.WrapOr(err)
}

if err := zipw.Close(); err != nil {
    return errors.WrapOr(err)
}
23

GTFS - データロード

24

GTFS - データロード

まず結果を格納する入れ物を用意

type Maps struct {
    Agencies      *ds.SyncMap
    Routes        *ds.SyncMap
    TripGroups    *ds.SyncMap
    Trips         *ds.SyncMap
    Stops         *ds.SyncMap
    Services      *ds.SyncMap
    Calendars     *ds.SyncMap
    CalendarDates *ds.SyncMap
    Shapes        *ds.SyncMap
}
25

GTFS - データロード

func (mmaps Maps) LoadAgencies(c appengine.Context, agencyKeys []*datastore.Key) chan error {
    ch := make(chan error)
    go func() {
        agencies := make([]*Agency, len(agencyKeys))
        for i := 0; i < len(agencyKeys); i++ {
            agencies[i] = new(Agency)
        }
        if err := ds.GetMulti(c, agencyKeys, agencies); err != nil {
            ch <- errors.WrapOr(err)
            return
        }
    
        var rerrChs [](chan error)
        for _, agency := range agencies {
            mmaps.Agencies.Put(agency.Key, agency)
            rerrChs = append(rerrChs, mmaps.LoadRoutes(c, agency.Routes))
            rerrChs = append(rerrChs, mmaps.LoadServices(c, agency.Services))
        }
    
        ch <- toErrors(rerrChs)
        return
    }()
    return ch
}
26

GTFS - データロード

func (mmaps Maps) LoadRoutes(c appengine.Context, routeKeys []*datastore.Key) chan error {
    ch := make(chan error)
    go func() {
        routes := make([]*Route, len(routeKeys))
        for i := 0; i < len(routeKeys); i++ {
            routes[i] = new(Route)
        }
        if err := ds.GetMulti(c, routeKeys, routes); err != nil {
            ch <- errors.WrapOr(err)
            return
        }
    
        rerrChs := make([](chan error), 0, len(routes))
        for _, route := range routes {
            mmaps.Routes.Put(route.Key, route)
            rerrChs = append(rerrChs, mmaps.LoadTripGroups(c, route.TripGroups))
        }
    
        ch <- toErrors(rerrChs)
        return
    }()
    return ch
}
27

GTFSリアルタイムフィード

バスの現在地情報を配信

フォーマットはProgocol Buffers

28

GTFSリアルタイム - ProtocolBuffers

下記ライブラリを使用

https://github.com/golang/protobuf

gtfs-realtime.protoをコンパイル

29

GTFSリアルタイムフィード

一部抜粋

message = &tsrt.FeedMessage{
    Header: &tsrt.FeedHeader{
        GtfsRealtimeVersion: proto.String("1.0"),
        Incrementality:      &incrementality,
        Timestamp:           proto.Uint64(uint64(time.Now().Unix())),
    },
}
entity := tsrt.FeedEntity{
    Id:        proto.String(fmt.Sprintf("%d", actualTrip.Device.IntID())),
    IsDeleted: proto.Bool(false),
}
data, err = proto.Marshal(message)
if err != nil {
    return nil, false, errors.WrapOr(err)
}
30

GTFSリアルタイムフィード

一般ユーザー画面(google maps)からポーリングしているので、edge cacheを効かせている

w.Header().Set("Cache-Control", fmt.Sprintf("Cache-Control: public, max-age=%d", cacheSec))
w.WriteHeader(200)

if _, err := w.Write(data); err != nil {
    return errors.WrapOr(err)
}
31

バス停検索

type docStop struct {
    ID                 search.Atom
    IDBigram           string
    Name               search.Atom
    NameBigram         string
    Yomi               search.Atom
    YomiBigram         string
    Desc               string
    Location           appengine.GeoPoint
    LocationType       search.Atom
    WheelchairBoarding search.Atom
    UpdatedAt          time.Time
    Deleted            search.Atom
}
32

バス停検索 - Index登録

index, err := search.Open(model.KIND_STOP)
if err != nil {
    return errors.WrapOr(err)
}
docID := strconv.FormatInt(stopKey.IntID(), 10)
_, err = index.Put(c, docID, newDocStop(&stop))
if err != nil {
    return errors.WrapOr(err)
}
33

バス停検索- Bigram

文字列を2文字ずつに分割してIndexに登録

作成したngramを空白区切りでSearch APIのInxexに突っ込めば勝手にインデックス作成してくれる

34

バス停検索

searchOptions := search.SearchOptions{
    Limit:   page * pageLen,
    IDsOnly: true,
    Sort: &search.SortOptions{
        Expressions: []search.SortExpression{
            search.SortExpression{
                Expr:    "Yomi",
                Reverse: true,
            },
        },
    },
}

var stationKeys []*datastore.Key
t := index.Search(c, queryString, &searchOptions)
for {
    id, err := t.Next(nil)
    stationKeyID, _ := strconv.ParseInt(id, 10, 64)
    stationKeys = append(stationKeys, model.NewStopKey(c, stationKeyID))
}
35

バス停検索 - GeoLocation

GeoLocationを用いて位置情報検索が可能

↓任意の緯度経度から任意の距離以内にあるバス停を検索する

queryString := fmt.Sprintf("distance(Location, geopoint(%f,%f)) < %d", lat, lon, rangeRadius)
36

ログの保存

バスが送信してくる現在値やバス停位置情報をログに保存する。

1. GAEフロントエンドインスタンスがリクエストを受け取る
2. プルタスクキュー経由でGAEバックエンドインスタンスにログを渡す
3. BigQueryにstreaming insert

詳細はまたの機会に(^_^;

37

Thank you

15:00 28 Feb 2015

Daigo Ikeda

Knightso, LLC

Use the left and right arrow keys or click the left and right edges of the page to navigate between slides.
(Press 'H' or navigate to hide this message.)