My External Storage

Jan 14, 2018 - 7 minute read - Comments - Go gRPC

[Go]gRPC Basics: GoからgRPCのストリーミングRPCを理解する

gRPC-goのクイックスタート、サーバーレリフレクションを試したので、次はgRPC Basics: Goをやってみた。クイックスタートを終えているので、そちらで学習できている部分(環境構築手順、基本的な概念)には触れない。 クイックスタートでは出てこなかったStreaming RPCについてまとめた。

gRPC Basics - Go
https://grpc.io/docs/tutorials/basic/go.html

なお、上の公式ページの 元MarkDownとgrpc-goリポジトリ内の 同等の内容のMarkDownを比較すると、grpc-goの中の文書の方が新しいので、そちらを読んだ。 Basicsで使うサンプルコードは以下のリポジトリURLにある。

https://github.com/grpc/grpc-go/tree/master/examples/route_guide

TL;DR

  • gRPCはクライアント/サーバーからの単方向ストリーム、双方向ストリームをサポートしている
  • protoファイルから自動生成された構造体のSend/Recvメソッドによってストリームを操作することができる
  • ストリームからio.EOFが取得されたら送信側からのストリームが終了したことを意味する
  • ストリーミングの方式によって、RPC終了時にストリーム操作用の構造体に定義されたメソッドを実行する場合もある

各ストリーミング形式ごとに定義されたRPC終了時に実行が必要なメソッド

ストリーミング方式 Server側 Client側
server-to-client - -
client-to-server SendAndClose CloseAndRecv
Bidirectional - CloseSend

Streaming RPC

gRPCではクライアント/サーバ(あるいは両方)からデータをストリーミング処理で渡すメソッドの定義も行う事ができる。 protoファイルにある該当部分の定義は以下。

https://github.com/grpc/grpc-go/blob/master/examples/route_guide/routeguide/route_guide.proto

  // A server-to-client streaming RPC.
  //
  // Obtains the Features available within the given Rectangle.  Results are
  // streamed rather than returned at once (e.g. in a response message with a
  // repeated field), as the rectangle may cover a large area and contain a
  // huge number of features.
  rpc ListFeatures(Rectangle) returns (stream Feature) {}

  // A client-to-server streaming RPC.
  //
  // Accepts a stream of Points on a route being traversed, returning a
  // RouteSummary when traversal is completed.
  rpc RecordRoute(stream Point) returns (RouteSummary) {}

  // A Bidirectional streaming RPC.
  //
  // Accepts a stream of RouteNotes sent while a route is being traversed,
  // while receiving other RouteNotes (e.g. from other users).
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

protoファイルの定義としてはストリームにしたいメソッドの引数型あるいは戻り値型の前にstreamと予約語を追加するだけだ。

gRPC Basics: Go

チュートリアルのサンプルコードのRouteGuideの定義からStreamng RPCの使い方を読み取る。

gRPC Basics - Go
https://grpc.io/docs/tutorials/basic/go.html

A server-side streaming RPC

サーバからクライアントに対してストリームでデータを渡すRPC。

サーバサイドの定義

https://github.com/grpc/grpc-go/blob/1cd234627e6f392ade0527d593eb3fe53e832d4a/examples/route_guide/routeguide/route_guide.pb.go#L375

Goの場合は以下のようなメソッドと構造体定義がprotoファイルから生成される。

// プロトコルバッファーの定義は以下。
// rpc ListFeatures(Rectangle) returns (stream Feature) {}
ListFeatures(*Rectangle, RouteGuide_ListFeaturesServer) error

// サーバからクライアントに対してFeature構造体をストリーミングする構造体のインターフェース定義
type RouteGuide_ListFeaturesServer interface {
    Send(*Feature) error
    grpc.ServerStream
}

サーバサイドでストリーミングを送信する実装

https://github.com/grpc/grpc-go/blob/master/examples/gotutorial.md#server-side-streaming-rpc

func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
    for _, feature := range s.savedFeatures {
        if inRange(feature.Location, rect) {
            if err := stream.Send(feature); err != nil {
                return err
            }
        }
    }
    return nil
}

ストリーミングは戻り値ではなく、メソッドの引数経由で行うようになる。RouteGuide_ListFeaturesServerSend(*Feature)メソッドを持っているので、このメソッドを用いてデータをストリーミングする。 ストリーミングを終了する場合はreturn nil(何らかの異常で終了する場合は当然return error)を戻して終了する。

クライアントサイドの定義

https://github.com/grpc/grpc-go/blob/1cd234627e6f392ade0527d593eb3fe53e832d4a/examples/route_guide/routeguide/route_guide.pb.go#L232

サーバーからストリームを受け取るクライアント側のメソッド定義は以下。

// プロトコルバッファーの定義は以下。
// rpc ListFeatures(Rectangle) returns (stream Feature) {}
ListFeatures(ctx context.Context, in *Rectangle, opts ...grpc.CallOption) (RouteGuide_ListFeaturesClient, error)

// サーバーからストリーミングされるFeature構造体を受け取る構造体のインターフェース定義
type RouteGuide_ListFeaturesClient interface {
    Recv() (*Feature, error)
    grpc.ClientStream
}

クライアント側のコードはプロトコルバッファーの定義と似たような形で生成される。戻り値のインターフェースにストリームを受け取るRecv() (*Feature, error)メソッドが定義されている。

クライアントサイドでストリーミングを受け取る実装

https://github.com/grpc/grpc-go/blob/master/examples/gotutorial.md#server-side-streaming-rpc-1

サーバ側のRouteGuide_ListFeaturesServer.Sendメソッドでストリーミングされたデータを、RouteGuide_ListFeaturesClient.Recvメソッドで受け取る。実際にメソッドを利用する際は次のような形になる。

rect := &pb.Rectangle{ ... }  // initialize a pb.Rectangle
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
    ...
}
for {
    feature, err := stream.Recv()
    if err == io.EOF { // サーバ側でストリーミングが正常に終了(return nil)された
        break
    }
    if err != nil {
        log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
    }
    log.Println(feature)
}

サーバ側でストリーミングが正常に終了(return nil)されたときは、Recvメソッドの戻り値としてio.EOFを受け取ることになる。

A client-side streaming RPC

クライアントからサーバに対してストリームでデータを渡すRPC。

サーバサイドの定義

https://github.com/grpc/grpc-go/blob/1cd234627e6f392ade0527d593eb3fe53e832d4a/examples/route_guide/routeguide/route_guide.pb.go#L380

// プロトコルバッファーの定義は以下。
// rpc RecordRoute(stream Point) returns (RouteSummary) {}
RecordRoute(RouteGuide_RecordRouteServer) error

// クライアントからPoint構造体をストリーミングで受け取る構造体のインターフェース定義
type RouteGuide_RecordRouteServer interface {
    SendAndClose(*RouteSummary) error
    Recv() (*Point, error)
    grpc.ServerStream
}

クライアントからのストリーミングが終了したときはRouteGuide_RecordRouteServer.SendAndCloseを呼ぶ必要がある。

サーバサイドでストリーミングを受け取る実装

https://github.com/grpc/grpc-go/blob/master/examples/gotutorial.md#client-side-streaming-rpc

サーバ側の実装の概要は以下。クライアントからのストリーミングが終了した時、RouteGuide_RecordRouteServer.Recvメソッドの戻り値がio.EOFとなるので、RouteGuide_RecordRouteServer.SendAndCloseメソッドを実行してメソッドを終了する。

for {
    point, err := stream.Recv() // streamはメソッド引数のRouteGuide_RecordRouteServer
    if err == io.EOF {
        endTime := time.Now()
        return stream.SendAndClose(&pb.RouteSummary{
          // Initialize
        })
    }
    if err != nil {
        return err
    }
    // Do something...
}

クライアントサイドの定義

https://github.com/grpc/grpc-go/blob/1cd234627e6f392ade0527d593eb3fe53e832d4a/examples/route_guide/routeguide/route_guide.pb.go#L237

// プロトコルバッファーの定義は以下。
// rpc RecordRoute(stream Point) returns (RouteSummary) {}
RecordRoute(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RecordRouteClient, error)

// クライアントからサーバに対してPoint構造体をストリーミングする構造体のインターフェース定義
type RouteGuide_RecordRouteClient interface {
    Send(*Point) error
    CloseAndRecv() (*RouteSummary, error)
    grpc.ClientStream
}

server-side streaming RPCのときはreturn nilでストリームを終了していたが、client-sideの場合はRouteGuide_RecordRouteClient.CloseAndRecvメソッドでストリームを終了する。

クライアントでストリーミングを送信する実装

https://github.com/grpc/grpc-go/blob/master/examples/gotutorial.md#client-side-streaming-rpc-1

エラー処理などをほぼ省略した実装は以下の通り。明示的にCloseAndRecvメソッドを呼ぶ。

stream, err := client.RecordRoute(context.Background())
for _, point := range points { // pointsはストリーミングするPoint群
    if err := stream.Send(point); err != nil {
        log.Fatalf("%v.Send(%v) = %v", stream, point, err)
    }
}
reply, err := stream.CloseAndRecv()

A bidirectional streaming RPC

サーバ/クライアント双方でストリーミングを行いながらやりとりするRPC

サーバサイドの定義

https://github.com/grpc/grpc-go/blob/1cd234627e6f392ade0527d593eb3fe53e832d4a/examples/route_guide/routeguide/route_guide.pb.go#L385

// プロトコルバッファーの定義は以下。
// rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
RouteChat(RouteGuide_RouteChatServer) error

// 双方向ストリーミングを行う構造体のインターフェース定義
type RouteGuide_RouteChatServer interface {
    Send(*RouteNote) error
    Recv() (*RouteNote, error)
    grpc.ServerStream
}

ひとつの構造体で送信用、受信用のストリームの送受信が行える。

双方向ストリーミングを行うサーバ側の実装

https://github.com/grpc/grpc-go/blob/master/examples/gotutorial.md#bidirectional-streaming-rpc

for {
    in, err := stream.Recv() // streamはメソッドの引数のRouteNote
      if err == io.EOF {
          return nil
      }
      if err != nil {
          return err
      }
      key := serialize(in.Location)
                  ... // look for notes to be sent to client
      for _, note := range s.routeNotes[key] {
          if err := stream.Send(note); err != nil {
              return err
          }
      }
}

RecvメソッドとSendメソッドでストリームの送受信を行う点は変わらない。また、io.EOFをクライアントから受け取ったあとにSendAndCloseメソッドの類を実行する必要もない。

クライアントサイドの定義

https://github.com/grpc/grpc-go/blob/1cd234627e6f392ade0527d593eb3fe53e832d4a/examples/route_guide/routeguide/route_guide.pb.go#L242

// プロトコルバッファーの定義は以下。
// rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
RouteChat(ctx context.Context, opts ...grpc.CallOption) (RouteGuide_RouteChatClient, error)

// 双方向ストリーミングを行う構造体のインターフェース定義
type RouteGuide_RouteChatClient interface {
    Send(*RouteNote) error
    Recv() (*RouteNote, error)
    grpc.ClientStream
}

サーバー側同様、送信用、受信用のストリームの操作がひとつのインターフェースにまとめられている。

双方向ストリーミングを行うクライアント側の実装

https://github.com/grpc/grpc-go/blob/master/examples/gotutorial.md#bidirectional-streaming-rpc-1

stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
    for {
        in, err := stream.Recv()
        if err == io.EOF {
          // read done.
          close(waitc)
          return
        }
        if err != nil {
            log.Fatalf("Failed to receive a note : %v", err)
        }
        log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
    }
}()
for _, note := range notes {
    if err := stream.Send(note); err != nil {
        log.Fatalf("Failed to send a note: %v", err)
    }
}
stream.CloseSend()
<-waitc

基本的な使い方は今まで同様。最後にCloseSendメソッドを実行して終わるのがお作法らしい。

チュートリアルを終えて

protoファイルのひとつの定義からサーバー用、クライアント用のインターフェースや実装が複数自動生成されるため、コードだけ見ても使い方がよくわからなかったのだが、今回のBasicsでだいぶわかった気がする。ただ、このチュートリアルもRPCの引数が単一のstreamだけのサンプルだったりするので、複数引数で定義すると操作方法が変わるかもしれない(あるいは定義できない?)

gRPC関連の記事

関連記事