gRPC - 客户端调用

gRPC 客户端支持两种类型的客户端调用,即客户端如何调用服务器。以下是两种方式 −

  • 阻止客户端调用

  • 异步客户端调用

在本章中,我们将逐一介绍这两种方式。

阻止客户端调用

gRPC 支持阻止客户端调用。这意味着一旦客户端调用服务,客户端就不会继续执行其余的代码,直到它从服务器获得响应。请注意,一元调用和服务器流式调用都可能出现阻塞客户端调用。

请注意,一元调用和服务器流式调用都可能出现阻塞客户端调用。

以下是一元阻塞客户端调用的示例。

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientUnaryBlocking {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlocking.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStubblockingStub;
   public BookStoreClientUnaryBlocking(Channel channel) {
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName);
      BookSearch request = BookSearch.newBuilder().setName(bookName).build();

      Book response;
      try {
         response = blockingStub.first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   }
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientUnaryBlocking client = new
         BookStoreClientUnaryBlocking(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

在上面的例子中,我们有,

public BookStoreClientUnaryBlocking(Channel channel) {
   blockingStub = BookStoreGrpc.newBlockingStub(channel);
}

这意味着我们将使用阻塞 RPC 调用。

然后,我们有,

BookSearch request = BookSearch.newBuilder().setName(bookName).build();

Book response;
response = breakingStub.first(request);

在这里我们使用 blockingStub 调用 RPC 方法 first() 来获取书籍详细信息。

同样,对于服务器流式传输,我们可以使用阻塞存根−

logger.info("Querying for book with author: " + author);
BookSearch request =
BookSearch.newBuilder().setAuthor(author).build();

Iterator<Book> response;
try {
   response = blockingStub.searchByAuthor(request);
   while(response.hasNext()) {
   logger.info("Found book: " + response.next());
}

我们调用 RPC 方法 searchByAuthor 并迭代响应,直到服务器流尚未结束。

非阻塞客户端调用

gRPC 支持非阻塞客户端调用。这意味着当客户端调用服务时,它不需要等待服务器响应。要处理服务器响应,客户端只需传入观察者,该观察者指示收到响应后要做什么。请注意,非阻塞客户端调用既可以用于一元调用,也可以用于流式调用。但是,我们将专门研究服务器流式调用的情况,以将其与阻塞调用进行比较。

请注意,非阻塞客户端调用既可以用于一元调用,也可以用于流式调用。但是,我们将特别关注服务器流式调用的情况,将其与阻塞调用进行比较。

以下是服务器流式非阻塞客户端调用的示例

示例

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.greeting.GreeterGrpc;
import com.tp.greeting.Greeting.ServerOutput;
import com.tp.greeting.Greeting.ClientInput;

public class BookStoreClientServerStreamingNonBlocking {
   private static final Logger logger = Logger.getLogger(BookStoreClientServerStreamingNonBlocking.class.getName());
   private final BookStoreGrpc.BookStoreStub nonBlockingStub;
   public BookStoreClientServerStreamingNonBlocking(Channelchannel) {
      nonBlockingStub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver<Book> getServerResponseObserver(){
      StreamObserver<Book> observer = new
      StreamObserver<Book>(){
         @Override
         public void onNext(Book book) {
            logger.info("Server returned following book: " +book);
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response fromServer: " + t);
         }
         @Override
         public void onCompleted() {
            logger.info("Server returned following book: " + book);
         }
      };
      return observer;
   }
   public void getBook(String author) {
      logger.info("Querying for book with author: " + author);
      BookSearch request = BookSearch.newBuilder().setAuthor(author).build();
      try {
         nonBlockingStub.searchByAuthor(request,getServerResponseObserver());
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus());
         return;
      }
   }
   public static void main(String[] args) throws Exception {
      String authorName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel =ManagedChannelBuilder.forTarget(serverAddress)
         .usePlaintext()
         .build();
      try {
         BookStoreClientServerStreamingNonBlocking client = new
         BookStoreClientServerStreamingNonBlocking(channel);
         client.getBook(authorName);
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

正如我们在上面的例子中看到的,

public BookStoreClientUnaryNonBlocking(Channel channel) {
   nonBlockingStub = BookStoreGrpc.newStub(channel);
}

它定义了存根是非阻塞的。同样,以下代码用于处理我们从服务器获得的响应。一旦服务器发送响应,我们就会记录输出。

public StreamObserver<Book> getServerResponseObserver(){
   StreamObserver<Book> observer = new
   StreamObserver<Book>(){
   ....
   ....
   return observer;
}

以下 gRPC 调用是非阻塞调用。

logger.info("Querying for book with author: " + author);
BookSearch request = BookSearch.newBuilder().setAuthor(author).build();
try {
   nonBlockingStub.searchByAuthor(request, getServerResponseObserver());
}

这就是我们确保客户端无需等待服务器完成 searchByAuthor 执行的方法。当服务器返回 Book 对象时,流观察器对象将直接处理该操作。