gRPC - 一元 gRPC
现在我们将了解 gRPC 框架支持的各种类型的通信。我们将使用 Bookstore 的示例,客户端可以在其中搜索并下订单以进行图书配送。
让我们看看 一元 gRPC 通信,我们让客户端搜索书名并随机返回一本与查询的书名匹配的书。
.proto 文件
首先让我们在 common_proto_files 中定义 bookstore.proto 文件 −
syntax = "proto3"; option java_package = "com.tp.bookstore"; service BookStore { rpc first (BookSearch) returns (Book) {} } message BookSearch { string name = 1; string author = 2; string genre = 3; } message Book { string name = 1; string author = 2; int32 price = 3; }
现在让我们仔细看看上面代码块中的每一行。
syntax = "proto3";
语法
这里的"语法"代表我们使用的 Protobuf 版本。我们使用的是最新版本 3,因此模式可以使用对版本 3 有效的所有语法。
package tutorial;
这里的 package 用于解决冲突,例如,我们有多个同名的类/成员。
option java_package = "com.tp.bookstore";
此参数特定于 Java,即从 .proto 文件自动生成代码的包。
service BookStore { rpc first (BookSearch) returns (Book) {} }
这表示服务的名称"BookStore"和可以调用的函数名称"first"。"first"函数接受"BookSearch"类型的输入并返回"Book"类型的输出。因此,实际上,我们让客户端搜索标题并返回与查询的标题匹配的一本书。
现在让我们看看这些类型。
message BookSearch { string name = 1; string author = 2; string genre = 3; }
在上述代码块中,我们定义了 BookSearch,它包含 name、author 和 genre 等属性。客户端应该将"BookSearch"类型的对象发送到服务器。
message Book { string name = 1; string author = 2; int32 price = 3; }
这里,我们还定义了,给定一个"BookSearch",服务器将返回"Book",其中包含book 属性以及书籍的价格。服务器应该将"Book"类型的对象发送给客户端。
请注意,我们已经完成了 Maven 设置,可以自动生成我们的类文件以及我们的 RPC 代码。所以,现在我们可以简单地编译我们的项目 −
mvn clean install
这应该会自动生成我们使用 gRPC 所需的源代码。源代码将放在 −
Protobuf 类代码:target/generated-sources/protobuf/java/com.tp.bookstore Protobuf gRPC 代码:target/generated-sources/protobuf/grpc-java/com.tp.bookstore
设置 gRPC 服务器
现在我们已经定义了包含函数定义的 proto 文件,让我们设置一个可以调用这些函数的服务器。
让我们编写服务器代码来提供上述函数并将其保存在 com.tp.bookstore.BookeStoreServerUnary.java −
示例
package com.tp.bookstore; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; public class BookeStoreServerUnary { private static final Logger logger = Logger.getLogger(BookeStoreServerUnary.class.getName()); static Map<String, Book> bookMap = new HashMap<>(); static { bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby") .setAuthor("Scott Fitzgerald") .setPrice(300).build()); bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird") .setAuthor("Harper Lee") .setPrice(400).build()); bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India") .setAuthor("E.M.Forster") .setPrice(500).build()); bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise") .setAuthor("Scott Fitzgerald") .setPrice(600).build()); bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman") .setAuthor("Harper Lee") .setPrice(700).build()); } private Server server; private void start() throws IOException { int port = 50051; server = ServerBuilder.forPort(port) .addService(new BookStoreImpl()).build().start(); logger.info("Server started, listening on " + port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { System.err.println("Shutting down gRPC server"); try { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(System.err); } } }); } public static void main(String[] args) throws IOException, InterruptedException { final BookeStoreServerUnary greetServer = new BookeStoreServerUnary(); greetServer.start(); greetServer.server.awaitTermination(); } static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase { @Override public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) { logger.info("Searching for book with title: " + searchQuery.getName()); List<String> matchingBookTitles = bookMap.keySet().stream().filter(title -> title.startsWith(searchQuery.getName().trim())).collect(Collectors.toList()); Book foundBook = null; if(matchingBookTitles.size() > 0) { foundBook = bookMap.get(matchingBookTitles.get(0)); } responseObserver.onNext(foundBook); responseObserver.onCompleted(); } } }
上述代码在指定端口启动一个 gRPC 服务器,并提供我们在 proto 文件中编写的功能和服务。让我们看一下上面的代码 −
从 main 方法开始,我们在指定端口创建一个 gRPC 服务器。
但在启动服务器之前,我们为服务器分配了我们想要运行的服务,即在我们的例子中为 BookStore 服务。
为此,我们需要将服务实例传递给服务器,因此我们继续创建一个服务实例,即在我们的例子中为 BookStoreImpl
服务实例需要提供 .proto 文件中存在的方法/函数的实现,即在我们的例子中为 first 方法。
该方法需要一个在 .proto 文件中定义的类型的对象,即,对于我们来说,BookSearch
该方法在可用的 bookMap 中搜索书籍,然后通过调用 onNext() 方法返回 Book。完成后,服务器通过调用 onCompleted()
宣布输出已完成
最后,我们还有一个关闭挂钩,以确保在执行完代码后干净地关闭服务器。
设置 gRPC 客户端
现在我们已经为服务器编写了代码,让我们设置一个可以调用这些函数的客户端。
让我们编写客户端代码来调用上述函数并将其保存在 com.tp.bookstore.BookStoreClientUnaryBlocking.java −
示例
package com.tp.bookstore; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; import com.tp.greeting.GreeterGrpc; import com.tp.greeting.Greeting.ServerOutput; import com.tp.greeting.Greeting.ClientInput; public class BookStoreClientUnaryBlocking { private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlocking.class.getName()); private final BookStoreGrpc.BookStoreBlockingStub blockingStub; public BookStoreClientUnaryBlocking(Channel channel) { blockingStub = BookStoreGrpc.newBlockingStub(channel); } public void getBook(String bookName) { logger.info("Querying for book with title: " + bookName); BookSearch request = BookSearch.newBuilder().setName(bookName).build(); Book response; try { response = blockingStub.first(request); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); return; } logger.info("Got following book from server: " + response); } public static void main(String[] args) throws Exception { String bookName = args[0]; String serverAddress = "localhost:50051"; ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress) .usePlaintext() .build(); try { BookStoreClientUnaryBlocking client = new BookStoreClientUnaryBlocking(channel); client.getBook(bookName); } finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } }
上述代码在指定端口启动 gRPC 服务器,并提供我们在 proto 文件中编写的功能和服务。让我们来看看上面的代码 −
从 main 方法开始,我们接受一个参数,即我们要搜索的书的标题。
我们设置了一个用于与服务器进行 gRPC 通信的通道。
然后,我们使用该通道创建一个阻塞存根。在这里,我们选择我们计划调用其功能的服务"BookStore"。 "存根"只不过是一个包装器,它向调用者隐藏了远程调用的复杂性。
然后,我们只需创建 .proto 文件中定义的预期输入,即在我们的例子中为 BookSearch,并添加我们希望服务器搜索的标题名称。
我们最终进行调用并等待服务器的结果。
最后,我们关闭通道以避免任何资源泄漏。
所以,这就是我们的客户端代码。
客户端服务器调用
总而言之,我们想要做的是以下 −
启动 gRPC 服务器。
客户端向服务器查询具有给定名称/标题的书籍。
服务器在商店中搜索该书。
然后服务器响应该书及其其他属性。
现在,我们已经定义了 proto 文件,编写了服务器和客户端代码,让我们继续执行此代码并查看实际操作。
要运行代码,请启动两个 shell。通过执行以下命令在第一个 shell 上启动服务器 −
java -cp 。 arget\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnary
我们将看到以下输出 −
输出
Jul 03, 2021 7:21:58 PM com.tp.bookstore.BookeStoreServerUnary start INFO: 服务器已启动,正在监听 50051
以上输出表示服务器已启动。
现在,让我们启动客户端。
java -cp . arget\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlocking "To Kill"
我们将看到以下输出 −
输出
Jul 03, 2021 7:22:03 PM com.tp.bookstore.BookStoreClientUnaryBlocking getBook INFO: Querying for book with title: To Kill Jul 03, 2021 7:22:04 PM com.tp.bookstore.BookStoreClientUnaryBlocking getBook INFO: Got following book from server: name: "To Kill MockingBird" author: "Harper Lee" price: 400
因此,正如我们所见,客户端能够通过向服务器查询书名来获取书籍详细信息。