gRPC - 双向 RPC
现在让我们看看使用 gRPC 通信时客户端-服务器流是如何工作的。在这种情况下,客户端将搜索书籍并将其添加到购物车。每次添加书籍时,服务器都会以实时购物车值进行响应。
.proto 文件
首先让我们在 common_proto_files 中定义 bookstore.proto 文件 −
syntax = "proto3"; option java_package = "com.tp.bookstore"; service BookStore { rpc liveCartValue (stream Book) returns (stream Cart) {} } message Book { string name = 1; string author = 2; int32 price = 3; } message Cart { int32 books = 1; int32 price = 2; }
以下块表示服务的名称"BookStore"和可以调用的函数名称"liveCartValue"。"liveCartValue"函数接受类型为"Book"的输入,该输入是流。并且该函数返回类型为"Cart"的对象流。因此,实际上,我们让客户端以流式方式添加书籍,并且每当添加新书时,服务器都会向客户端响应当前购物车值。
service BookStore { rpc liveCartValue (stream Book) returns (stream Cart) {} }
Now let us look at these types.
message Book { string name = 1; string author = 2; int32 price = 3; }
客户将发送其想要购买的"Book"。它不必是完整的书籍信息;它可以只是书名。
message Cart { int32 books = 1; int32 price = 2; }
服务器在获取图书列表后,将返回 "Cart" 对象,该对象只是客户端购买的图书总数和总价。
请注意,我们已经完成了 Maven 设置,可以自动生成我们的类文件以及我们的 RPC 代码。所以,现在我们可以简单地编译我们的项目:
mvn clean install
这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放在
Protobuf 类代码:target/generatedsources/protobuf/java/com.tp.bookstore Protobuf gRPC 代码:target/generated-sources/protobuf/grpcjava/com.tp.bookstore
设置 gRPC 服务器
现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以调用这些函数的服务器。
让我们编写服务器代码来提供上述功能并将其保存在 com.tp.bookstore.BookeStoreServerBothStreaming.java −
示例
package com.tp.bookstore; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; import com.tp.bookstore.BookStoreOuterClass.Cart; public class BookeStoreServerBothStreaming { private static final Logger logger =Logger.getLogger(BookeStoreServerBothStreaming.class.getName()); static Map<String, Book> bookMap = new HashMap<>(); static { bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby") .setAuthor("Scott Fitzgerald") .setPrice(300).build()); bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird") .setAuthor("Harper Lee") .setPrice(400).build()); bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India") .setAuthor("E.M.Forster") .setPrice(500).build()); bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise") .setAuthor("Scott Fitzgerald") .setPrice(600).build()); bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman") .setAuthor("Harper Lee") .setPrice(700).build()); } private Server server; private void start() throws IOException { int port = 50051; server = ServerBuilder.forPort(port) .addService(new BookStoreImpl()).build().start(); logger.info("Server started, listening on " + port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { System.err.println("Shutting down gRPC server"); try { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(System.err); } } }); } public static void main(String[] args) throws IOException,InterruptedException { final BookeStoreServerBothStreaming greetServer = newBookeStoreServerBothStreaming(); greetServer.start(); greetServer.server.awaitTermination(); } static class BookStoreImpl extendsBookStoreGrpc.BookStoreImplBase { @Override public StreamObserver<Book>liveCartValue(StreamObserver<Cart> responseObserver) { return new StreamObserver<Book>() { ArrayList<Book> bookCart = new ArrayList<Book>(); int cartValue = 0; @Override public void onNext(Book book) { logger.info("Searching for book with titlestarting with: " + book.getName()); for (Entry<String, Book> bookEntry :bookMap.entrySet()) { if(bookEntry.getValue().getName().startsWith(book.getName())){ logger.info("Found book, adding tocart:...."); bookCart.add(bookEntry.getValue()); cartValue +=bookEntry.getValue().getPrice(); } } logger.info("Updating cart value..."); responseObserver.onNext(Cart.newBuilder() .setPrice(cartValue) .setBooks(bookCart.size()).build()); } @Override public void onError(Throwable t) { logger.info("Error while reading book stream: " + t); } @Override public void onCompleted() { logger.info("Order completed"); responseObserver.onCompleted(); } }; } } }
上述代码在指定端口启动一个 gRPC 服务器,并提供我们在 proto 文件中编写的功能和服务。让我们看一下上面的代码 −
从 main 方法开始,我们在指定端口创建一个 gRPC 服务器。
但在启动服务器之前,我们为服务器分配了我们想要运行的服务,即在我们的例子中为 BookStore 服务。
为此,我们需要将服务实例传递给服务器,因此我们继续创建服务实例,即在我们的例子中为 BookStoreImpl
服务实例需要提供 proto 文件中存在的方法/函数的实现,即在我们的例子中为 totalCartValue 方法。
现在,鉴于这是服务器和客户端流式传输的情况,服务器将获得Books(在 proto 文件中定义)在客户端添加它们时返回。因此,服务器返回自定义流观察器。此流观察器实现发现新书时发生的情况以及关闭流时发生的情况。
当客户端添加书籍时,gRPC 框架将调用 onNext() 方法。此时,服务器将其添加到购物车并使用响应观察器返回购物车值。在流式传输的情况下,服务器不会等待所有有效书籍都可用。
当客户端完成添加书籍时,将调用流观察器的 onCompleted() 方法。此方法实现了当客户端添加完 Books 后服务器想要执行的操作,即声明已完成接受客户端订单。
最后,我们还有一个关闭钩子,以确保在执行完代码后干净地关闭服务器。
设置 gRPC 客户端
现在我们已经编写了服务器代码,让我们设置一个可以调用这些函数的客户端。
让我们编写客户端代码来调用上述函数并将其保存在 com.tp.bookstore.BookStoreClientBothStreaming.java −
示例
package com.tp.bookstore; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.util.Iterator; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import com.tp.bookstore.BookStoreGrpc.BookStoreFutureStub; import com.tp.bookstore.BookStoreGrpc.BookStoreStub; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; import com.tp.bookstore.BookStoreOuterClass.Cart; import com.tp.greeting.GreeterGrpc; import com.tp.greeting.Greeting.ServerOutput; import com.tp.greeting.Greeting.ClientInput; public class BookStoreClientBothStreaming { private static final Logger logger = Logger.getLogger(BookStoreClientBothStreaming.class.getName()); private final BookStoreStub stub; private boolean serverIntermediateResponseCompleted = true; private boolean serverResponseCompleted = false; StreamObserver<Book> streamClientSender; public BookStoreClientBothStreaming(Channel channel) { stub = BookStoreGrpc.newStub(channel); } public StreamObserver>Cart< getServerResponseObserver(){ StreamObserver>Cart< observer = new StreamObserver<Cart>(){ @Override public void onNext(Cart cart) { logger.info("Order summary:" + " Total number of Books:" + cart.getBooks()+ " Total Order Value:" cart.getPrice()); serverIntermediateResponseCompleted = true; } @Override public void onError(Throwable t) { logger.info("Error while reading response fromServer: " + t); } @Override public void onCompleted() { //logger.info("Server: Done reading orderreading cart"); serverResponseCompleted = true; } }; return observer; } public void addBook(String book) { logger.info("Adding book with title starting with: " + book); Book request = Book.newBuilder().setName(book).build(); if(streamClientSender == null) { streamClientSender =stub.liveCartValue(getServerResponseObserver()); } try { streamClientSender.onNext(request); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); } } public void completeOrder() { logger.info("Done, waiting for server to create ordersummary..."); if(streamClientSender != null); { streamClientSender.onCompleted(); } } public static void main(String[] args) throws Exception { String serverAddress = "localhost:50051"; ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress) .usePlaintext() .build(); try { BookStoreClientBothStreaming client = new BookStoreClientBothStreaming(channel); String bookName = ""; while(true) { if(client.serverIntermediateResponseCompleted ==true) { System.out.println("Type book name to beadded to the cart...."); bookName = System.console().readLine(); if(bookName.equals("EXIT")) { client.completeOrder(); break; } client.serverIntermediateResponseCompleted = false; client.addBook(bookName); Thread.sleep(500); } } while(client.serverResponseCompleted == false) { Thread.sleep(2000); } } finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } }
上述代码启动 gRPC 客户端并连接到指定端口的服务器,然后调用我们在 proto 文件中编写的函数和服务。让我们来看看上面的代码 −
从 main 方法开始,我们接受要添加到购物车的书籍名称。添加完所有书籍后,用户需要打印"EXIT"。
我们设置了一个通道,用于与服务器进行 gRPC 通信。
接下来,我们使用该通道创建一个非阻塞存根。这是我们选择服务 "BookStore" 的地方,我们计划调用其功能。
然后,我们只需创建 proto 文件中定义的预期输入,即我们的例子中的 Book,并添加我们希望服务器添加的标题。
但考虑到这是服务器和客户端流式传输的情况,我们首先为服务器创建一个 流观察器。此服务器流观察器列出了服务器响应时需要执行的行为,即 onNext() 和 onCompleted()。
使用存根,我们还获得了客户端流观察器。我们使用该流观察器发送数据,即要添加到购物车的Book。
一旦我们的订单完成,我们确保客户端流观察器已关闭。这会告诉服务器关闭流并执行清理。
最后,我们关闭通道以避免任何资源泄漏。
所以,这就是我们的客户端代码。
客户端服务器调用
总而言之,我们想要做的是以下 −
启动 gRPC 服务器。
客户端通过通知服务器来添加书籍流。
服务器在其商店中搜索书籍并将其添加到购物车中。
每添加一本书,服务器都会告诉客户端购物车的价值。
当客户端完成订购时,服务器和客户端都会关闭流。
现在我们已经定义了我们的proto 文件,编写了我们的服务器和客户端代码,现在让我们执行此代码并查看实际操作。
要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 上启动服务器 −
java -cp . arget\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming
我们将看到以下输出 −
输出
Jul 03, 2021 10:37:21 PM com.tp.bookstore.BookeStoreServerStreaming start INFO: Server started, listening on 50051
此输出表明服务器已启动。
现在,让我们启动客户端。
java -cp . arget\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientBothStreaming
让我们向客户端添加一本书。
Jul 24, 2021 7:21:45 PM com.tp.bookstore.BookStoreClientBothStreaming main Type book name to be added to the cart.... Great Jul 24, 2021 7:21:48 PM com.tp.bookstore.BookStoreClientBothStreaming addBook INFO: Adding book with title starting with: Gr Jul 24, 2021 7:21:48 PM com.tp.bookstore.BookStoreClientBothStreaming$1 onNext INFO: Order summary: Total number of Books: 1 Total Order Value: 300
因此,如我们所见,我们获得了订单的当前购物车价值。现在让我们再向客户端添加一本书。
Type book name to be added to the cart.... Passage Jul 24, 2021 7:21:51 PM com.tp.bookstore.BookStoreClientBothStreaming addBook INFO: Adding book with title starting with: Pa Jul 24, 2021 7:21:51 PM com.tp.bookstore.BookStoreClientBothStreaming$1 onNext INFO: Order summary: Total number of Books: 2 Total Order Value: 800
一旦我们添加了书籍并输入"EXIT",客户端就会关闭。
Type book name to be added to the cart.... EXIT Jul 24, 2021 7:21:59 PM com.tp.bookstore.BookStoreClientBothStreaming completeOrder INFO: Done, waiting for server to create order summary...
因此,我们可以看到客户端能够添加书籍。在添加书籍时,服务器会响应当前购物车价值。