从零开始的Java gRPC

让我们探索如何在Java中实现gRPC。

gRPC(Google远程过程调用):gRPC是由Google开发的开源RPC架构,用于实现微服务之间的高速通信。gRPC允许开发人员集成使用不同语言编写的服务。gRPC使用Protobuf消息格式(协议缓冲区),这是一种高效、高度压缩的用于序列化结构化数据的消息格式。

对于某些用例,gRPC API可能比REST API更高效。

让我们尝试编写一个gRPC服务器。首先,我们需要编写几个描述服务和模型(DTO)的.proto文件。对于一个简单的服务器,我们将使用ProfileService和ProfileDescriptor。

ProfileService看起来像这样:

syntax = "proto3";
package com.deft.grpc;
import "google/protobuf/empty.proto";
import "profile_descriptor.proto";
service ProfileService {
  rpc GetCurrentProfile (google.protobuf.Empty) returns (ProfileDescriptor) {}
  rpc clientStream (stream ProfileDescriptor) returns (google.protobuf.Empty) {}
  rpc serverStream (google.protobuf.Empty) returns (stream ProfileDescriptor) {}
  rpc biDirectionalStream (stream ProfileDescriptor) returns (stream 	ProfileDescriptor) {}
}

gRPC支持多种客户端-服务器通信选项。我们将逐个介绍它们:

  • 普通的服务器调用 – 请求/响应。
  • 从客户端到服务器的流式传输。
  • 从服务器到客户端的流式传输。
  • 当然,还有双向流式传输。

ProfileService服务使用在导入部分指定的ProfileDescriptor:

syntax = "proto3";
package com.deft.grpc;
message ProfileDescriptor {
  int64 profile_id = 1;
  string name = 2;
}
  • int64对应Java中的Long类型。让配置文件的配置文件id。
  • String – 就像在Java中一样,这是一个字符串变量。

您可以使用Gradle或Maven构建项目。对我来说,使用Maven更方便。而且未来将使用Maven的代码。这是非常重要的,因为对于Gradle来说,未来的.proto文件生成会稍有不同,并且构建文件的配置也将不同。要编写一个简单的gRPC服务器,我们只需要一个依赖项:


    io.github.lognet
    grpc-spring-boot-starter
    4.5.4

这真是太不可思议了。这个启动器为我们做了大量的工作。

我们将创建的项目将看起来像这样:

我们需要GrpcServerApplication来启动Spring Boot应用程序。以及GrpcProfileService,它将实现来自.proto服务的方法。要使用protoc并从编写的.proto文件生成类,请将protobuf-maven-plugin添加到pom.xml。构建部分将如下所示:

  • protoSourceRoot – 指定.proto文件所在的目录。
  • outputDirectory – 选择要生成文件的目录。
  • clearOutputDirectory – 一个标志,表示不清除生成的文件。

在这个阶段,您可以构建一个项目。接下来,您需要进入我们在输出目录中指定的文件夹。生成的文件将在那里。现在您可以逐步实现GrpcProfileService

类声明将如下所示:

@GRpcService
public class GrpcProfileService extends ProfileServiceGrpc.ProfileServiceImplBase

GRpcService注解 – 将类标记为grpc-service bean。

由于我们从ProfileServiceGrpcProfileServiceImplBase继承了我们的服务,我们可以重写父类的方法。我们将首先重写的方法是getCurrentProfile

    @Override
    public void getCurrentProfile(Empty request, StreamObserver responseObserver) {
        System.out.println("getCurrentProfile");
        responseObserver.onNext(ProfileDescriptorOuterClass.ProfileDescriptor
                .newBuilder()
                .setProfileId(1)
                .setName("test")
                .build());
        responseObserver.onCompleted();
    }

要响应客户端,您需要在传递的StreamObserver上调用onNext方法。在发送响应之后,向客户端发送一个信号,告诉客户端服务器已经完成了工作onCompleted。当向getCurrentProfile服务器发送请求时,响应将是:

{
  "profile_id": "1",
  "name": "test"
}

接下来,让我们看一下服务器流。采用这种消息传递方法,客户端向服务器发送一个请求,服务器用一系列的消息响应客户端。例如,它在一个循环中发送五个请求。发送完成后,服务器向客户端发送一条消息,告诉客户端流的成功完成。

重写的服务器流方法将如下所示:

@Override
    public void serverStream(Empty request, StreamObserver responseObserver) {
        for (int i = 0; i < 5; i++) {
            responseObserver.onNext(ProfileDescriptorOuterClass.ProfileDescriptor
                .newBuilder()
                .setProfileId(i)
                .build());
        }
        responseObserver.onCompleted();
    }

因此,客户端将收到五个具有ProfileId的消息,与响应编号相等。

 {
  "profile_id":“ 0”,
  "name":“”
}
{
  "profile_id":"1",
  "name":“”
}
…
{
  "profile_id":"4",
  "name":“”
}
 

客户端流非常类似于服务器流。现在,客户端传输一系列消息,服务器处理这些消息。服务器可以立即处理消息,也可以等待来自客户端的所有请求,然后再处理它们。

  @Override
    public StreamObserver  clientStream(StreamObserver  responseObserver) {
        return new StreamObserver (){

            @Override
            public void onNext(ProfileDescriptorOuterClass.ProfileDescriptor profileDescriptor) {
                log.info(“来自客户端的ProfileDescriptor。个人资料ID:{}”,profileDescriptor.getProfileId());
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }
 

在客户端流中,您需要将 StreamObserver 返回给客户端,服务器将通过该观察者接收消息。如果流中发生错误(例如,不正确终止),将调用onError方法。

要实现双向流,需要组合从服务器和客户端创建流的操作。

  @Override
    public StreamObserver  biDirectionalStream(
            StreamObserver  responseObserver) {

        return new StreamObserver (){
            int pointCount = 0;
            @Override
            public void onNext(ProfileDescriptorOuterClass.ProfileDescriptor profileDescriptor) {
                log.info(“biDirectionalStream,pointCount {}”,pointCount);
                responseObserver.onNext(ProfileDescriptorOuterClass.ProfileDescriptor
                        .newBuilder()
                        .setProfileId(pointCount ++)
                        .build());
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }
 

在此示例中,响应客户端消息时,服务器将返回具有增加的 pointCount 的个人资料。

结论

我们介绍了使用 gRPC 在客户端和服务器之间进行消息传递的基本选项:实现了服务器流、客户端流和双向流。

文章由Sergey Golitsyn撰写

类似文章