凌云的博客

行胜于言

gRPC go 基础教程[译]

分类:grpc| 发布时间:2025-01-06 15:07:00

概述

原文: Basics tutorial

这个教程为 Go 程序员提供了一个关于如何使用 gRPC 的基础入门。

通过本教程的示例,您将学习如何:

  • 在 .proto 文件中定义一个服务。
  • 使用协议缓冲编译器生成服务器和客户端代码。
  • 使用 Go 的 gRPC API 编写一个简单的客户端和服务器来为您的服务提供支持。

假设您已经阅读了 gRPC 入门 并对 protocol buffers 有所了解。

请注意,本教程中的示例使用的是 proto3 版本的 protocol buffers:您可以在 proto3 语言指南Go 生成代码指南 中了解更多详细信息。

为什么使用 gRPC?

我们的示例是一个简单的路线映射应用,允许客户端获取路线上的特征信息、创建路线总结,并与服务器和其他客户端交换路线信息,如交通更新。

使用 gRPC,我们可以在 .proto 文件中一次性定义我们的服务,并生成任何支持 gRPC 的语言的客户端和服务器,这些客户端和服务器可以在从大型数据中心内部的服务器到您自己的平板电脑等各种环境中运行——gRPC 会为您处理不同语言和环境之间通信的所有复杂性。

我们还可以享受使用 protocol buffers 的所有优点,包括高效的序列化、简洁的 IDL 以及易于更新的接口。

安装

您应该已经安装了生成客户端和服务器接口代码所需的工具——如果还没有,请参阅 快速入门 中的 先决条件部分,按照说明进行操作。

获取示例代码

示例代码是 grpc-go 仓库的一部分。

  1. 您可以将仓库 下载为 zip 文件并解压,或者克隆仓库:
git clone -b v1.69.2 --depth 1 https://github.com/grpc/grpc-go
  1. 切换到示例目录:
cd grpc-go/examples/route_guide

定义服务

首先(正如您在 gRPC 入门中所了解的)我们是使用 protocol buffers 定义 gRPC 服务以及方法的请求和响应类型。 完整的 .proto 文件,请参见 routeguide/route_guide.proto

要定义一个服务,您需要在 .proto 文件中指定一个命名的 服务

service RouteGuide {
   ...
}

然后,您在服务定义内部定义 rpc 方法,并指定它们的请求和响应类型。 gRPC 允许定义四种类型的服务方法,所有这些方法都在 RouteGuide 服务中使用:

  • 简单 RPC:客户端使用存根向服务器发送请求,并等待响应返回,类似于正常的函数调用。
// 获取给定位置的特征。
rpc GetFeature(Point) returns (Feature) {}
  • 服务器端流式 rpc:客户端向服务器发送请求,并获取一个流以读取一系列的消息。 客户端从返回的流中读取,直到没有更多的消息。 正如我们示例中所见,您通过在响应类型前面放置 stream 关键字来指定服务器端流式方法。
// 获取给定矩形范围内的特征。结果是流式返回的,而不是一次性返回(例如,作为具有重复字段的响应消息),
// 因为矩形可能覆盖较大区域,并包含大量特征。
rpc ListFeatures(Rectangle) returns (stream Feature) {}
  • 客户端流式 rpc:客户端写入一系列消息并将其发送到服务器,同样使用提供的流。 一旦客户端完成消息的写入,它会等待服务器读取所有消息并返回响应。 您通过在请求类型前面放置 stream 关键字来指定客户端流式方法。
// 接受正在遍历的路线上的一系列点流,在遍历完成后返回路线总结。
rpc RecordRoute(stream Point) returns (RouteSummary) {}
  • 双向流式 rpc:双方都使用读写流发送一系列消息。 两个流独立操作,因此客户端和服务器可以按任意顺序进行读写。 例如,服务器可以等待接收所有客户端的消息后再写入响应,也可以交替读取消息后写入消息,或者进行其他组合的读写操作。 每个流中的消息顺序得以保留。 您通过在请求类型和响应类型前面都放置 stream 关键字来指定这种方法。
// 接受在遍历路线时发送的 RouteNotes 流,同时接收其他 RouteNotes(例如来自其他用户)。
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

我们的 .proto 文件还包含所有请求和响应类型的 protocol buffer 消息类型定义——例如,Point 消息类型的定义:

// 点表示为纬度和经度对,采用 E7 表示法(度乘以 10**7,并四舍五入到最接近的整数)。
// 纬度应在 +/- 90 度之间,经度应在 +/- 180 度之间(包括边界值)。
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客户端和服务器代码

接下来,我们需要从 .proto 服务定义中生成 gRPC 客户端和服务器接口。 我们使用 protocol buffer 编译器 protoc 配合专门的 gRPC Go 插件来完成这一过程。 这与我们在 快速入门 中所做的类似。

examples/route_guide 目录下,运行以下命令:

$ protoc --go_out=. --go_opt=paths=source_relative \
    --go-grpc_out=. --go-grpc_opt=paths=source_relative \
    routeguide/route_guide.proto

执行此命令后,会在 routeguide 目录下生成以下文件:

  • route_guide.pb.go:包含所有 protocol buffer 代码,用于填充、序列化和检索请求和响应消息类型。
  • route_guide_grpc.pb.go:包含以下内容:
    • 一个接口类型(或存根),客户端可以调用此接口,使用在 RouteGuide 服务中定义的方法。
    • 一个接口类型,服务器可以实现,包含在 RouteGuide 服务中定义的方法。

创建服务器

首先,让我们看看如何创建一个 RouteGuide 服务器。 如果您只对创建 gRPC 客户端感兴趣,可以跳过本节,直接前往 创建客户端(尽管您可能还是会觉得这部分内容很有趣!)。

要使我们的 RouteGuide 服务正常工作,需要完成两个部分:

  • 实现从服务定义生成的服务接口:执行服务的实际“工作”。
  • 运行一个 gRPC 服务器:监听来自客户端的请求,并将它们分发到正确的服务实现中。

您可以在 server/server.go 中找到我们的 RouteGuide 服务器示例。 接下来,让我们更仔细地了解它是如何工作的。

实现 RouteGuide

如您所见,我们的服务器有一个 routeGuideServer 结构体类型,它实现了生成的 RouteGuideServer 接口:

type routeGuideServer struct {
    ...
}
...

func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
    ...
}
...

func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
    ...
}
...

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
    ...
}
...

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
    ...
}
...
简单 RPC

routeGuideServer 实现了我们所有的服务方法。 首先让我们看看最简单的类型——GetFeature,它只是从客户端获取一个 Point,并返回数据库中对应的特征信息作为 Feature

func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
  for _, feature := range s.savedFeatures {
    if proto.Equal(feature.Location, point) {
      return feature, nil
    }
  }
  // No feature was found, return an unnamed feature
  return &pb.Feature{Location: point}, nil
}

这个方法接收一个用于 RPC 的 context 对象和客户端的 Point protocol buffer 请求。 它返回一个 Feature 协议缓冲对象,其中包含响应信息,以及一个 error

在方法中,我们使用适当的信息填充 Feature,然后返回它并附带 nil 错误,告诉 gRPC 我们已经完成了 RPC 处理,可以将 Feature 返回给客户端。

服务器端流式 RPC

现在让我们来看一个流式 RPC 示例。 ListFeatures 是一个服务器端流式 RPC,我们需要向客户端发送多个 Feature

func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
  for _, feature := range s.savedFeatures {
    if inRange(feature.Location, rect) {
      if err := stream.Send(feature); err != nil {
        return err
      }
    }
  }
  return nil
}

如您所见,这次在方法参数中,我们没有简单的请求和响应对象,而是接收一个请求对象(客户端希望查找 FeatureRectangle)以及一个特殊的 RouteGuide_ListFeaturesServer 对象,用于写入我们的响应。

在该方法中,我们填充所需数量的 Feature 对象,并使用 RouteGuide_ListFeaturesServerSend() 方法将它们发送给客户端。 最后,与简单 RPC 一样,我们返回一个 nil 错误,通知 gRPC 我们已完成响应的发送。

如果在调用过程中发生任何错误,我们会返回一个非空错误;gRPC 层会将其转换为适当的 RPC 状态,并将其发送到网络中。

客户端流式 RPC

现在让我们来看一个稍微复杂一点的例子:客户端流式 RPC 方法 RecordRoute。 在这个方法中,我们从客户端接收一个 Point 流,并返回一个包含旅程信息的 RouteSummary

与之前的示例不同,这次该方法没有请求参数。 取而代之的是,它接收一个 RouteGuide_RecordRouteServer 流对象,服务器可以使用该对象读取和写入消息:它可以使用 Recv() 方法接收客户端消息,使用 SendAndClose() 方法返回单一响应。

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
  var pointCount, featureCount, distance int32
  var lastPoint *pb.Point
  startTime := time.Now()
  for {
    point, err := stream.Recv()
    if err == io.EOF {
      endTime := time.Now()
      return stream.SendAndClose(&pb.RouteSummary{
        PointCount:   pointCount,
        FeatureCount: featureCount,
        Distance:     distance,
        ElapsedTime:  int32(endTime.Sub(startTime).Seconds()),
      })
    }
    if err != nil {
      return err
    }
    pointCount++
    for _, feature := range s.savedFeatures {
      if proto.Equal(feature.Location, point) {
        featureCount++
      }
    }
    if lastPoint != nil {
      distance += calcDistance(lastPoint, point)
    }
    lastPoint = point
  }
}

在方法体内,我们使用 RouteGuide_RecordRouteServerRecv() 方法,反复读取客户端的请求到一个请求对象(在这个例子中是 Point),直到没有更多的消息为止:服务器需要在每次调用后检查 Recv() 返回的错误。 如果错误是 nil,表示流仍然有效,服务器可以继续读取;如果是 io.EOF,则表示消息流已结束,服务器可以返回其 RouteSummary。 如果返回的是其他错误,我们将原样返回该错误,以便 gRPC 层将其转换为适当的 RPC 状态并发送给客户端。

双向流式 RPC

最后,让我们来看一个双向流式 RPC 的示例 —— RouteChat()

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
  for {
    in, err := stream.Recv()
    if err == io.EOF {
      return nil
    }
    if err != nil {
      return err
    }
    key := serialize(in.Location)
                ... // look for notes to be sent to client
    for _, note := range s.routeNotes[key] {
      if err := stream.Send(note); err != nil {
        return err
      }
    }
  }
}

这次我们获得了一个 RouteGuide_RouteChatServer 流对象,和客户端流式 RPC 示例一样,它可以用来读取和写入消息。 然而,这次服务器在客户端仍在向其消息流写入消息的同时,就可以通过方法的流返回数据。

这里的读写语法与客户端流式方法非常相似,但服务器使用的是流的 Send() 方法,而不是 SendAndClose(),因为服务器可能需要发送多个响应。

尽管客户端和服务器接收到对方消息的顺序总是与发送时的顺序一致,但客户端和服务器可以按任意顺序进行读写 —— 这两个流是完全独立运行的。

启动服务器

一旦我们实现了所有方法,还需要启动一个 gRPC 服务器,以便客户端可以实际使用我们的服务。 以下代码片段展示了如何为我们的 RouteGuide 服务启动服务器:

lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
if err != nil {
  log.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
...
grpcServer := grpc.NewServer(opts...)
pb.RegisterRouteGuideServer(grpcServer, newServer())
grpcServer.Serve(lis)

启动服务器的步骤:

  1. 指定监听端口:使用 net.Listen(...) 指定我们希望用于监听客户端请求的端口。
  2. 创建 gRPC 服务器实例:调用 grpc.NewServer(...) 创建一个 gRPC 服务器实例。
  3. 注册服务实现:通过 pb.RegisterRouteGuideServer(grpcServer, newServer()) 将我们的服务实现注册到 gRPC 服务器。
  4. 调用 Serve() 方法:使用我们的端口信息调用 Serve(),该方法会阻塞直到进程被终止或调用 Stop() 方法。

创建客户端

在这一部分,我们将介绍如何为我们的 RouteGuide 服务创建一个 Go 客户端。 您可以在 grpc-go/examples/route_guide/client/client.go 中查看我们完整的示例客户端代码。

创建存根

为了调用服务方法,我们首先需要创建一个 gRPC 通道来与服务器进行通信。 我们通过将服务器地址和端口号传递给 grpc.NewClient() 来创建这个通道,如下所示:

var opts []grpc.DialOption
...
conn, err := grpc.NewClient(*serverAddr, opts...)
if err != nil {
  ...
}
defer conn.Close()

您可以使用 DialOptions 来设置认证凭证(例如,TLS、GCE 凭证或 JWT 凭证),当服务需要这些凭证时,可以在 grpc.NewClient 中进行设置。 RouteGuide 服务不需要任何凭证。

一旦 gRPC 通道设置完成,我们就需要一个客户端存根来执行 RPC。 我们可以使用从示例 .proto 文件生成的 pb 包提供的 NewRouteGuideClient 方法来获取客户端存根:

client := pb.NewRouteGuideClient(conn)

调用服务方法

现在让我们看看如何调用服务方法。 需要注意的是,在 gRPC-Go 中,RPC 是以阻塞/同步模式进行的,这意味着 RPC 调用会等待服务器响应,并返回响应或错误。

简单 RPC

调用简单的 RPC GetFeature 几乎和调用本地方法一样简单。

feature, err := client.GetFeature(context.Background(), &pb.Point{409146138, -746188906})
if err != nil {
  ...
}

如您所见,我们在之前获取的存根上调用该方法。 在方法的参数中,我们创建并填充了一个请求的 protocol buffer 对象(在本例中是 Point)。 我们还传递了一个 context.Context 对象,它可以让我们在必要时改变 RPC 的行为,比如设置超时或取消正在进行的 RPC。 如果调用没有返回错误,那么我们可以从第一个返回值中读取服务器返回的响应信息。

log.Println(feature)
服务器端流式 RPC

这里我们调用服务器端流式方法 ListFeatures,该方法返回一个地理特征的流。 如果你已经阅读了 创建服务器 部分,你可能会发现这些内容很熟悉——流式 RPC 在客户端和服务器端的实现方式是相似的。

rect := &pb.Rectangle{ ... }  // initialize a pb.Rectangle
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
  ...
}
for {
    feature, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
    }
    log.Println(feature)
}

与简单的 RPC 相似,我们向方法传递一个 context 和一个请求对象。 但与简单 RPC 不同的是,这里我们并没有直接得到一个响应对象,而是得到了一个 RouteGuide_ListFeaturesClient 实例。 客户端可以使用这个 RouteGuide_ListFeaturesClient 流来读取服务器的响应。

我们使用 RouteGuide_ListFeaturesClientRecv() 方法反复读取服务器的响应,并将它们填充到 protocol buffer 响应对象(在本例中是 Feature)中,直到没有更多消息为止。 每次调用 Recv() 后,客户端需要检查返回的错误 err。 如果是 nil,表示流仍然有效,客户端可以继续读取;如果是 io.EOF,则表示消息流结束;如果是其他错误,说明发生了 RPC 错误,错误会通过 err 返回。

客户端流式 RPC

客户端流式方法 RecordRoute 与服务器端流式方法类似,不同之处在于我们只需传递一个 context,并返回一个 RouteGuide_RecordRouteClient 流。 这个流可以用来向服务器发送和接收消息。

// Create a random number of random points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
  points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
stream, err := client.RecordRoute(context.Background())
if err != nil {
  log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
for _, point := range points {
  if err := stream.Send(point); err != nil {
    log.Fatalf("%v.Send(%v) = %v", stream, point, err)
  }
}
reply, err := stream.CloseAndRecv()
if err != nil {
  log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Route summary: %v", reply)

RouteGuide_RecordRouteClient 具有一个 Send() 方法,我们可以使用它向服务器发送请求。 一旦我们通过 Send() 方法将所有客户端请求写入流中,我们需要调用 CloseAndRecv() 来告诉 gRPC 我们已经完成写入,并且期待接收服务器的响应。 我们通过 CloseAndRecv() 返回的 err 获取 RPC 状态。 如果状态是 nil,那么 CloseAndRecv() 的第一个返回值将是一个有效的服务器响应。

双向流式 RPC

最后,让我们看看我们的双向流式 RPC RouteChat()。 与 RecordRoute 方法类似,我们只需要传递一个 context 对象,并返回一个可以用来同时写入和读取消息的流。 然而,这次我们通过方法的流返回值,同时客户端和服务器都可以继续在各自的消息流中写入消息。

stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
  for {
    in, err := stream.Recv()
    if err == io.EOF {
      // read done.
      close(waitc)
      return
    }
    if err != nil {
      log.Fatalf("Failed to receive a note : %v", err)
    }
    log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
  }
}()
for _, note := range notes {
  if err := stream.Send(note); err != nil {
    log.Fatalf("Failed to send a note: %v", err)
  }
}
stream.CloseSend()
<-waitc

这里的读取和写入语法与我们的客户端流式方法非常相似,不同之处在于,一旦我们完成调用,就使用流的 CloseSend() 方法。 虽然每一方总是会按照写入的顺序接收到对方的消息,但客户端和服务器可以随意顺序地读取和写入消息——这些流是完全独立操作的。

试试看!

examples/route_guide 目录下执行以下命令:

  1. 运行服务器:
go run server/server.go
  1. 在另一个终端中,运行客户端:
go run client/client.go

你将看到类似如下的输出:

Getting feature for point (409146138, -746188906)
name:"Berkshire Valley Management Area Trail, Jefferson, NJ, USA" location:<latitude:409146138 longitude:-746188906 >
Getting feature for point (0, 0)
location:<>
Looking for features within lo:<latitude:400000000 longitude:-750000000 > hi:<latitude:420000000 longitude:-730000000 >
name:"Patriots Path, Mendham, NJ 07945, USA" location:<latitude:407838351 longitude:-746143763 >
...
name:"3 Hasta Way, Newton, NJ 07860, USA" location:<latitude:410248224 longitude:-747127767 >
Traversing 56 points.
Route summary: point_count:56 distance:497013163
Got message First message at point(0, 1)
Got message Second message at point(0, 2)
Got message Third message at point(0, 3)
Got message First message at point(0, 1)
Got message Fourth message at point(0, 1)
Got message Second message at point(0, 2)
Got message Fifth message at point(0, 2)
Got message Third message at point(0, 3)
Got message Sixth message at point(0, 3)

注意
我们在本页中展示的客户端和服务器跟踪输出中省略了时间戳。