gRPC - 客户端流式 RPC

现在让我们看看使用 gRPC 通信时客户端流式传输的工作原理。在本例中,客户端将搜索图书并将其添加到购物车。一旦客户端添加完所有图书,服务器就会向客户端提供结帐购物车值。

.proto 文件

首先让我们在 common_proto_files 中定义 bookstore.proto 文件 −

syntax = "proto3";
option java_package = "com.tp.bookstore";
service BookStore {
   rpc totalCartValue (stream Book) returns (Cart) {}
}
message BookSearch {
   string name = 1;
   string author = 2;
   int32 price = 3;
}
message Cart {
   int32 books = 1;
   int32 price = 2;
}

此处,以下块表示服务的名称"BookStore"和可调用的函数名称"totalCartValue"。"totalCartValue"函数接受类型为"Book"的输入,该输入是流。并且该函数返回类型为"Cart"的对象。因此,实际上,我们让客户端以流式方式添加书籍,一旦客户端完成,服务器就会向客户端提供购物车总价值。

service BookStore {
   rpc totalCartValue (stream Book) returns (Cart) {}
}

Now let us look at these types.

message Book {
   string name = 1;
   string author = 2;
   int32 price = 3;
}

客户端将发送其想要购买的"Book"。它可能不是完整的书籍信息;它可能只是书名。

message Cart {
   int32 books = 1;
   int32 price = 2;
}

服务器在获取图书列表后,将返回"Cart"对象,该对象只是客户端购买的图书总数和总价。

请注意,我们已经完成了 Maven 设置,可以自动生成我们的类文件以及我们的 RPC 代码。所以,现在我们可以简单地编译我们的项目 −

mvn clean install

这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放在 −

Protobuf 类代码:target/generated-sources/protobuf/java/com.tp.bookstore
Protobuf gRPC 代码:target/generated-sources/protobuf/grpc-java/com.tp.bookstore

设置 gRPC 服务器

现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以调用这些函数的服务器。

让我们编写服务器代码来提供上述功能并将其保存在 com.tp.bookstore.BookeStoreServerClientStreaming.java

示例

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class  BookeStoreServerClientStreaming {
   private static final Logger logger = Logger.getLoggerr(BookeStoreServerClientStreaming.class.getName());
 
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby")
         .setAuthor("Scott Fitzgerald")
         .setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird")
         .setAuthor("Harper Lee")
         .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India")
         .setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise")
         .setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman")
         .setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();
 
      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException, InterruptedException {
      final BookeStoreServerClientStreaming greetServer = new  BookeStoreServerClientStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book> totalCartValue(StreamObserver<Cart> responseObserver) {
         return new StreamObserver<Book>() {
            ArrayList<Book> bookCart = new ArrayList<Book>();
            @Override
            public void onNext(Book book) 
            logger.info("Searching for book with title starting with: " + book.getName());
            for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
               if(bookEntry.getValue().getName().startsWith(book.getName())){
                  logger.info("Found book, adding to cart:....");
                  bookCart.add(bookEntry.getValue());
               }
            }
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading book stream: " + t);
         }
         @Override
         public void onCompleted() {
            int cartValue = 0;
            for (Book book : bookCart) {
               cartValue += book.getPrice();
            }
            responseObserver.onNext(Cart.newBuilder()
               .setPrice(cartValue)
               .setBooks(bookCart.size()).build());
            responseObserver.onCompleted();
         }
      };
 
   }
}          

上述代码在指定端口启动一个 gRPC 服务器,并提供我们在 proto 文件中编写的功能和服务。让我们看一下上面的代码 −

  • main 方法开始,我们在指定端口创建一个 gRPC 服务器。

  • 但在启动服务器之前,我们为服务器分配了我们想要运行的服务,即在我们的例子中为 BookStore 服务。

  • 为此,我们需要将服务实例传递给服务器,因此我们继续创建一个服务实例,即在我们的例子中为 BookStoreImpl

  • 服务实例需要提供 .proto 文件 中存在的方法/函数的实现,即在我们的例子中为 totalCartValue 方法。

  • 现在,鉴于这是客户端流式传输的情况,服务器将获得当客户端添加书籍时,服务器会返回 proto 文件中定义的书籍。因此,服务器返回一个 自定义流观察器。此流观察器实现了发现新书籍时以及关闭流时发生的情况。

  • 当客户端添加书籍时,gRPC 框架将调用 onNext() 方法。此时,服务器将其添加到购物车中。在流式传输的情况下,服务器不会等待所有可用的书籍。

  • 当客户端完成添加书籍时,将调用流观察器的 onCompleted() 方法。此方法实现了客户端添加完 Book 后服务器想要发送的内容,即,它将 Cart 对象返回给客户端。

  • 最后,我们还有一个关闭钩子,以确保在执行完代码后干净地关闭服务器。

设置 gRPC 客户端

现在我们已经编写了服务器的代码,让我们设置一个可以调用这些函数的客户端。

让我们编写客户端代码来调用上述函数并将其保存在 com.tp.bookstore.BookStoreClientServerStreamingBlocking.java

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub;
import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientStreamingClient {
   private static final Logger logger = Logger.getLogger(BookStoreClientStreaming.class.getName());
   private final BookStoreStub stub;
	private boolean serverResponseCompleted = false; 
   StreamObserver<Book> streamClientSender;
   
   public BookStoreClientStreamingClient(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver<Cart> getServerResponseObserver(){
      StreamObserver<Cart> observer = new StreamObserver<Cart>(){
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:" + "
Total number of Books:" + cart.getBooks() + 
               "
Total Order Value:" + cart.getPrice());
         }
         @Override
         public void onCompleted() {
            //logger.info("Server: Done reading orderreading cart");
            serverResponseCompleted = true;
         }
      };
      return observer;
   }
   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();
 
      if(streamClientSender == null) {
         streamClientSender = stub.totalCartValue(getServerResponseObserver());
      }
      try {
         streamClientSender.onNext(request);
      }
      catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }
   public void completeOrder() {
      logger.info("Done, waiting for server to create order summary...");
      if(streamClientSender != null);
      streamClientSender.onCompleted();
   }
   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
	   ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientStreamingClient client = new BookStoreClientStreamingClient(channel);
         String bookName = ""; 
 
         while(true) {
            System.out.println("Type book name to be added to the cart....");
            bookName = System.console().readLine();
            if(bookName.equals("EXIT")) {
               client.completeOrder();
               break; 
            }
            client.addBook(bookName);
         }
 
         while(client.serverResponseCompleted == false) {
            Thread.sleep(2000);
         }
 
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

上述代码在指定端口启动 gRPC 服务器,并提供我们在 proto 文件中编写的功能和服务。让我们来看看上面的代码 −

  • main 方法开始,我们接受一个参数,即我们要搜索的书的标题。

  • 我们设置了一个与服务器进行 gRPC 通信的通道。

  • 接下来,我们使用创建的通道创建一个非阻塞存根。这是我们选择服务 "BookStore" 的地方,我们计划调用其函数。

  • 然后,我们只需创建在 .proto 文件中定义的预期输入,即我们的例子中的 Book,并添加我们希望服务器添加的标题。

  • 但考虑到这是客户端流的情况,我们首先为服务器创建一个流观察器。此服务器流观察器列出了服务器响应时需要执行的行为,即 onNext()onCompleted()

  • 使用存根,我们还获得了客户端流观察器。我们使用此流观察器发送要添加到购物车的数据,即 Book。我们最终进行调用并获取有效图书的迭代器。当我们迭代时,我们会获得服务器提供的相应书籍。

  • 一旦我们的订单完成,我们确保客户端流观察器已关闭。它告诉服务器计算购物车价值并将其作为输出提供。

  • 最后,我们关闭通道以避免任何资源泄漏。

所以,这就是我们的客户端代码。

客户端服务器调用

总而言之,我们想要做的是以下 −

  • 启动 gRPC 服务器。

  • 客户端通过通知服务器来添加书籍流。

  • 服务器在其商店中搜索书籍并将其添加到购物车中。

  • 当客户端完成订购时,服务器会响应客户端的购物车总价值。

现在,我们已经定义了我们的 proto 文件,编写了我们的服务器和客户端代码,让我们继续执行此代码并查看实际操作。

要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 上启动服务器 −

java -cp .	arget\grpc-point-1.0.jar 
com.tp.bookstore.BookeStoreServerClientStreaming

我们将看到以下输出 −

输出

Jul 03, 2021 10:37:21 PM 
com.tp.bookstore.BookeStoreServerStreaming start
INFO: Server started, listening on 50051

以上输出表示服务器已经启动。

现在,让我们启动客户端。

java -cp .	arget\grpc-point-1.0.jar 
com.tp.bookstore.BookStoreClientServerStreamingClient

让我们向客户端添加几本书。

Type book name to be added to the cart....
Gr
Jul 24, 2021 5:53:07 PM 
com.tp.bookstore.BookStoreClientStreamingClient addBook
INFO: Adding book with title starting with: Great

Type book name to be added to the cart....
Pa
Jul 24, 2021 5:53:20 PM 
com.tp.bookstore.BookStoreClientStreamingClient addBook
INFO: Adding book with title starting with: Passage

Type book name to be added to the cart....

一旦我们添加了书籍并输入"EXIT",服务器就会计算购物车的价值,下面是我们得到的输出 −

输出

EXIT
Jul 24, 2021 5:53:33 PM 
com.tp.bookstore.BookStoreClientStreamingClient completeOrder
INFO: Done, waiting for server to create order summary...
Jul 24, 2021 5:53:33 PM 
com.tp.bookstore.BookStoreClientStreamingClient$1 onNext
INFO: Order summary:
Total number of Books: 2
Total Order Value: 800

因此,如我们所见,客户端能够添加书籍。一旦添加完所有书籍,服务器就会响应书籍总数和总价格。