GAE/Go 事例紹介

公共オープンデータバス配信基盤 Opentrans.it

Daigo Ikeda

Knightso, LLC

アジェンダ

Opentrans.it紹介

Opentrans.itとは

Opentrans.itとは

実装概要

モデル設計

GTFSデータはDatastore(KVS)に保存する。
GTFSの各fileをモデル(Kind)に落とす。

モデル設計 - ER図

エンティティ設計のコツ

エンティティグループ図

エンティティグループの作り方

datastore.Keyに親エンティティのKeyを設定するだけ

func NewRouteIncompleteKey(c appengine.Context, agencyKey *datastore.Key) *datastore.Key {
    return datastore.NewIncompleteKey(c, KIND_ROUTE, agencyKey)
}

トランザクション

普通のトランザクション(Tx)

err = datastore.RunInTransaction(c, func(c appengine.Context) error {
    ...snip...
}, nil)

クロスグループトランザクション(XGTx)

err = datastore.RunInTransaction(c, func(c appengine.Context) error {
    ...snip...
}, &datastore.TransactionOptions{XG: true})

注意!
RunInTransactionはcommitで失敗すると3回までリトライする為、処理を冪等に記述する必要がある

トランザクション&タスクキュー

タスクキューへの追加をトランザクションに含める事で結果整合性を保証する。

var PutDocStopFunc = delay.Func("PutDocStopFunc", func(c appengine.Context, stopKey *datastore.Key, version int) error {
    ... save stop index for Search API ...
})
err := datastore.RunInTransaction(c, func(c appengine.Context) error {
    if err := ds.Put(c, p.Stop); err != nil {
        return errors.WrapOr(err)
    }
    ...snip...
    t, err := PutDocStopFunc.Task(c, p.Stop.Key, p.Stop.Version)
    if err != nil {
        return err
    }
    if _, err := taskqueue.Add(c, t, "queue1"); err != nil {
        return err
    }
    return nil
}, &datastore.TransactionOptions{XG: true})

タスクのAddはトランザクションで保証され、処理はリトライで保証される。

アーキテクチャ

今回はGAE/Goのハナシなのでバックエンドのみ紹介。。

Martini

Classy web development in Go

リフレクションバリバリで重いので最近嫌われている?

他には・・・

GAE & Martini

GAEもMartiniもnet/http準拠なので親和性が高い

シンプルな例

func init() {
    m := martini.Classic()
    m.Get("/hello/:name", func(params martini.Params, w http.ResponseWriter) string {
        w.Header().Set("Content-Type", "application/json")
        return "Hello! " + params["name"]
    })
    http.Handle("/", m)
}

Martini - URLルーティング

RESTなAPIを書くのに便利!

m.Group("/api/agencies", func(r martini.Router) {
    r.Get("", queryAgencies)
    r.Get("/:agencyId", getAgency)
    r.Post("", registerAgency)
    r.Put("/:agencyId", updateAgency)
    r.Delete("/:agencyId", deleteAgency)
})

パラメータの取得

func updateAgency(params martini.Params, rnd render.Render, c appengine.Context, w http.ResponseWriter, r *http.Request) {
    keyID, err := strconv.ParseInt(params["agencyId"], 10, 64)
    ...snip...
}

Martini - ミドルウェア

リクエストの前後に共通処理を埋め込む

r := martini.NewRouter()
m := martini.New()

m.Use(func(c martini.Context, r *http.Request) {
    ... pre-process ...
    c.Next()
    ... post-process ...
})

Martini - Dependency Injection

Handlerの引数はMartiniが型で判断してInjectしてくれる!

カスタムコンポーネントを使用したい場合はc.Mapで登録

m.Use(func(c martini.Context, r *http.Request) {
    ac := appengine.NewContext(r)
    c.Map(ac)
})
func someHandler(c appengine.Context, w http.ResponseWriter, r *http.Request) {
    ...snip...
}

GTFSフィード

パスの路線や便、時刻表データを配信

フォーマットは複数のcsvファイルをzipでまとめたもの

GTFSフィード - CSV

goの標準パッケージencoding/csvを使用

writer := csv.NewWriter(w)
 
titles := []string{
    "agency_id",
    "agency_name",
    "agency_url",
    "agency_timezone",
}
 
if err := writer.Write(titles); err != nil {
    return errors.WrapOr(err)
}

GTFSフィード - CSV

if err := mmaps.Agencies.ForEach(func(k *datastore.Key, v interface{}) error {
    agency := v.(*model.Agency)
    columns := []string{
        agency.ID,
        agency.Name,
        agency.URL,
    }

    err := writer.Write(columns)
    if err != nil {
        return errors.WrapOr(err)
    }

    return nil
}); err != nil {
    return errors.WrapOr(err)
}

writer.Flush()
if err := writer.Error(); err != nil {
    return errors.WrapOr(err)
}

GTFSフィード - ZIP

goの標準パッケージarchive/zipを使用

zipw := zip.NewWriter(w)

if feed, err := zipw.Create(GTFS_FEED_FILENAME_AGENCY); err != nil {
    return errors.WrapOr(err)
} else if err := generateCsvForAgency(mmaps, feed); err != nil {
    return errors.WrapOr(err)
}

if err := zipw.Close(); err != nil {
    return errors.WrapOr(err)
}

GTFS - データロード

GTFS - データロード

まず結果を格納する入れ物を用意

type Maps struct {
    Agencies      *ds.SyncMap
    Routes        *ds.SyncMap
    TripGroups    *ds.SyncMap
    Trips         *ds.SyncMap
    Stops         *ds.SyncMap
    Services      *ds.SyncMap
    Calendars     *ds.SyncMap
    CalendarDates *ds.SyncMap
    Shapes        *ds.SyncMap
}

GTFS - データロード

func (mmaps Maps) LoadAgencies(c appengine.Context, agencyKeys []*datastore.Key) chan error {
    ch := make(chan error)
    go func() {
        agencies := make([]*Agency, len(agencyKeys))
        for i := 0; i < len(agencyKeys); i++ {
            agencies[i] = new(Agency)
        }
        if err := ds.GetMulti(c, agencyKeys, agencies); err != nil {
            ch <- errors.WrapOr(err)
            return
        }
    
        var rerrChs [](chan error)
        for _, agency := range agencies {
            mmaps.Agencies.Put(agency.Key, agency)
            rerrChs = append(rerrChs, mmaps.LoadRoutes(c, agency.Routes))
            rerrChs = append(rerrChs, mmaps.LoadServices(c, agency.Services))
        }
    
        ch <- toErrors(rerrChs)
        return
    }()
    return ch
}

GTFS - データロード

func (mmaps Maps) LoadRoutes(c appengine.Context, routeKeys []*datastore.Key) chan error {
    ch := make(chan error)
    go func() {
        routes := make([]*Route, len(routeKeys))
        for i := 0; i < len(routeKeys); i++ {
            routes[i] = new(Route)
        }
        if err := ds.GetMulti(c, routeKeys, routes); err != nil {
            ch <- errors.WrapOr(err)
            return
        }
    
        rerrChs := make([](chan error), 0, len(routes))
        for _, route := range routes {
            mmaps.Routes.Put(route.Key, route)
            rerrChs = append(rerrChs, mmaps.LoadTripGroups(c, route.TripGroups))
        }
    
        ch <- toErrors(rerrChs)
        return
    }()
    return ch
}

GTFSリアルタイムフィード

バスの現在地情報を配信

フォーマットはProgocol Buffers

GTFSリアルタイム - ProtocolBuffers

下記ライブラリを使用

https://github.com/golang/protobuf

gtfs-realtime.protoをコンパイル

GTFSリアルタイムフィード

一部抜粋

message = &tsrt.FeedMessage{
    Header: &tsrt.FeedHeader{
        GtfsRealtimeVersion: proto.String("1.0"),
        Incrementality:      &incrementality,
        Timestamp:           proto.Uint64(uint64(time.Now().Unix())),
    },
}
entity := tsrt.FeedEntity{
    Id:        proto.String(fmt.Sprintf("%d", actualTrip.Device.IntID())),
    IsDeleted: proto.Bool(false),
}
data, err = proto.Marshal(message)
if err != nil {
    return nil, false, errors.WrapOr(err)
}

GTFSリアルタイムフィード

一般ユーザー画面(google maps)からポーリングしているので、edge cacheを効かせている

w.Header().Set("Cache-Control", fmt.Sprintf("Cache-Control: public, max-age=%d", cacheSec))
w.WriteHeader(200)

if _, err := w.Write(data); err != nil {
    return errors.WrapOr(err)
}

バス停検索

type docStop struct {
    ID                 search.Atom
    IDBigram           string
    Name               search.Atom
    NameBigram         string
    Yomi               search.Atom
    YomiBigram         string
    Desc               string
    Location           appengine.GeoPoint
    LocationType       search.Atom
    WheelchairBoarding search.Atom
    UpdatedAt          time.Time
    Deleted            search.Atom
}

バス停検索 - Index登録

index, err := search.Open(model.KIND_STOP)
if err != nil {
    return errors.WrapOr(err)
}
docID := strconv.FormatInt(stopKey.IntID(), 10)
_, err = index.Put(c, docID, newDocStop(&stop))
if err != nil {
    return errors.WrapOr(err)
}

バス停検索- Bigram

文字列を2文字ずつに分割してIndexに登録

作成したngramを空白区切りでSearch APIのInxexに突っ込めば勝手にインデックス作成してくれる

バス停検索

searchOptions := search.SearchOptions{
    Limit:   page * pageLen,
    IDsOnly: true,
    Sort: &search.SortOptions{
        Expressions: []search.SortExpression{
            search.SortExpression{
                Expr:    "Yomi",
                Reverse: true,
            },
        },
    },
}

var stationKeys []*datastore.Key
t := index.Search(c, queryString, &searchOptions)
for {
    id, err := t.Next(nil)
    stationKeyID, _ := strconv.ParseInt(id, 10, 64)
    stationKeys = append(stationKeys, model.NewStopKey(c, stationKeyID))
}

バス停検索 - GeoLocation

GeoLocationを用いて位置情報検索が可能

↓任意の緯度経度から任意の距離以内にあるバス停を検索する

queryString := fmt.Sprintf("distance(Location, geopoint(%f,%f)) < %d", lat, lon, rangeRadius)

ログの保存

バスが送信してくる現在値やバス停位置情報をログに保存する。

1. GAEフロントエンドインスタンスがリクエストを受け取る
2. プルタスクキュー経由でGAEバックエンドインスタンスにログを渡す
3. BigQueryにstreaming insert

詳細はまたの機会に(^_^;

Thank you

Daigo Ikeda

Knightso, LLC