gRPC - 客户端调用
gRPC 客户端支持两种类型的客户端调用,即客户端如何调用服务器。以下是两种方式 −
阻止客户端调用
异步客户端调用
在本章中,我们将逐一介绍这两种方式。
阻止客户端调用
gRPC 支持阻止客户端调用。这意味着一旦客户端调用服务,客户端就不会继续执行其余的代码,直到它从服务器获得响应。请注意,一元调用和服务器流式调用都可能出现阻塞客户端调用。
请注意,一元调用和服务器流式调用都可能出现阻塞客户端调用。
以下是一元阻塞客户端调用的示例。
示例
package com.tp.bookstore; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; import com.tp.greeting.GreeterGrpc; import com.tp.greeting.Greeting.ServerOutput; import com.tp.greeting.Greeting.ClientInput; public class BookStoreClientUnaryBlocking { private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlocking.class.getName()); private final BookStoreGrpc.BookStoreBlockingStubblockingStub; public BookStoreClientUnaryBlocking(Channel channel) { blockingStub = BookStoreGrpc.newBlockingStub(channel); } public void getBook(String bookName) { logger.info("Querying for book with title: " + bookName); BookSearch request = BookSearch.newBuilder().setName(bookName).build(); Book response; try { response = blockingStub.first(request); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); return; } logger.info("Got following book from server: " + response); } public static void main(String[] args) throws Exception { String bookName = args[0]; String serverAddress = "localhost:50051"; ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress) .usePlaintext() .build(); try { BookStoreClientUnaryBlocking client = new BookStoreClientUnaryBlocking(channel); client.getBook(bookName); } finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } }
在上面的例子中,我们有,
public BookStoreClientUnaryBlocking(Channel channel) { blockingStub = BookStoreGrpc.newBlockingStub(channel); }
这意味着我们将使用阻塞 RPC 调用。
然后,我们有,
BookSearch request = BookSearch.newBuilder().setName(bookName).build(); Book response; response = breakingStub.first(request);
在这里我们使用 blockingStub 调用 RPC 方法 first() 来获取书籍详细信息。
同样,对于服务器流式传输,我们可以使用阻塞存根−
logger.info("Querying for book with author: " + author); BookSearch request = BookSearch.newBuilder().setAuthor(author).build(); Iterator<Book> response; try { response = blockingStub.searchByAuthor(request); while(response.hasNext()) { logger.info("Found book: " + response.next()); }
我们调用 RPC 方法 searchByAuthor 并迭代响应,直到服务器流尚未结束。
非阻塞客户端调用
gRPC 支持非阻塞客户端调用。这意味着当客户端调用服务时,它不需要等待服务器响应。要处理服务器响应,客户端只需传入观察者,该观察者指示收到响应后要做什么。请注意,非阻塞客户端调用既可以用于一元调用,也可以用于流式调用。但是,我们将专门研究服务器流式调用的情况,以将其与阻塞调用进行比较。
请注意,非阻塞客户端调用既可以用于一元调用,也可以用于流式调用。但是,我们将特别关注服务器流式调用的情况,将其与阻塞调用进行比较。
以下是服务器流式非阻塞客户端调用的示例
示例
package com.tp.bookstore; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.util.Iterator; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import com.tp.bookstore.BookStoreOuterClass.Book; import com.tp.bookstore.BookStoreOuterClass.BookSearch; import com.tp.greeting.GreeterGrpc; import com.tp.greeting.Greeting.ServerOutput; import com.tp.greeting.Greeting.ClientInput; public class BookStoreClientServerStreamingNonBlocking { private static final Logger logger = Logger.getLogger(BookStoreClientServerStreamingNonBlocking.class.getName()); private final BookStoreGrpc.BookStoreStub nonBlockingStub; public BookStoreClientServerStreamingNonBlocking(Channelchannel) { nonBlockingStub = BookStoreGrpc.newStub(channel); } public StreamObserver<Book> getServerResponseObserver(){ StreamObserver<Book> observer = new StreamObserver<Book>(){ @Override public void onNext(Book book) { logger.info("Server returned following book: " +book); } @Override public void onError(Throwable t) { logger.info("Error while reading response fromServer: " + t); } @Override public void onCompleted() { logger.info("Server returned following book: " + book); } }; return observer; } public void getBook(String author) { logger.info("Querying for book with author: " + author); BookSearch request = BookSearch.newBuilder().setAuthor(author).build(); try { nonBlockingStub.searchByAuthor(request,getServerResponseObserver()); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus()); return; } } public static void main(String[] args) throws Exception { String authorName = args[0]; String serverAddress = "localhost:50051"; ManagedChannel channel =ManagedChannelBuilder.forTarget(serverAddress) .usePlaintext() .build(); try { BookStoreClientServerStreamingNonBlocking client = new BookStoreClientServerStreamingNonBlocking(channel); client.getBook(authorName); } finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } }
正如我们在上面的例子中看到的,
public BookStoreClientUnaryNonBlocking(Channel channel) { nonBlockingStub = BookStoreGrpc.newStub(channel); }
它定义了存根是非阻塞的。同样,以下代码用于处理我们从服务器获得的响应。一旦服务器发送响应,我们就会记录输出。
public StreamObserver<Book> getServerResponseObserver(){ StreamObserver<Book> observer = new StreamObserver<Book>(){ .... .... return observer; }
以下 gRPC 调用是非阻塞调用。
logger.info("Querying for book with author: " + author); BookSearch request = BookSearch.newBuilder().setAuthor(author).build(); try { nonBlockingStub.searchByAuthor(request, getServerResponseObserver()); }
这就是我们确保客户端无需等待服务器完成 searchByAuthor 执行的方法。当服务器返回 Book 对象时,流观察器对象将直接处理该操作。