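gRPC - Timeouts and Cancellation

gRPC supports assigning timeouts to requests. A timeout is one way of cancelling a request: it avoids spending client and server resources on a request whose result would no longer be useful to the client.

Request Timeout

gRPC supports timeouts on both the client and the server side.

  • The client can specify, at runtime, how long to wait before cancelling a request.

  • The server can check, on its end, whether a request still needs to be served or whether the client has already given up on it.

Let us take an example where the client expects a response within 2 seconds, but the server takes longer. Here is our server code.

Example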

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookeStoreServerUnaryTimeout {
   private static final Logger logger = Logger.getLogger(BookeStoreServerUnaryTimeout.class.getName());

   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby",Book.newBuilder().setName("Great Gatsby")
        .setAuthor("Scott Fitzgerald")
        .setPrice(300).build());
      bookMap.put("To Kill MockingBird",Book.newBuilder().setName("To Kill MockingBird")
        .setAuthor("Harper Lee")
        .setPrice(400).build());
      bookMap.put("Passage to India",Book.newBuilder().setName("Passage to India")
        .setAuthor("E.M.Forster")
        .setPrice(500).build());
      bookMap.put("The Side of Paradise",Book.newBuilder().setName("The Side of Paradise")
        .setAuthor("Scott Fitzgerald")
        .setPrice(600).build());
      bookMap.put("Go Set a Watchman",Book.newBuilder().setName("Go Set a Watchman")
        .setAuthor("Harper Lee")
        .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port)
         .addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30,TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException,InterruptedException {
      final BookeStoreServerUnaryTimeout greetServer = new
      BookeStoreServerUnaryTimeout();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends
   BookStoreGrpc.BookStoreImplBase {
      @Override
      public void first(BookSearch searchQuery, StreamObserver<Book> responseObserver) {
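         logger.info("Searching for book with title: " + searchQuery.getName());
         logger.info("This may take more time...");
         try {
            // Simulate slow work so that the client's 2-second deadline expires first.
            Thread.sleep(5000);
         } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can see the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
         }
         List<String> matchingBookTitles = bookMap.keySet().stream()
            .filter(title -> title.startsWith(searchQuery.getName().trim()))
            .collect(Collectors.toList());
         if (matchingBookTitles.isEmpty()) {
            // Passing null to onNext would fail, so report a missing book as NOT_FOUND.
            responseObserver.onError(io.grpc.Status.NOT_FOUND
               .withDescription("No book title starts with: " + searchQuery.getName())
               .asRuntimeException());
            return;
         }
         responseObserver.onNext(bookMap.get(matchingBookTitles.get(0)));
         responseObserver.onCompleted();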
      }
   }
}

In the above code, the server searches for the book whose title the client supplied. We added a dummy sleep so that we can see the request time out.
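Note that the handler above keeps sleeping and searching even after the client's deadline has passed. In grpc-java, a handler can poll Context.current().isCancelled() to notice that the call has been cancelled or that its deadline has expired. The sketch below is a plain-Java model of that polling pattern, not gRPC code: the class CooperativeSearch, the method search, and the isCancelled supplier are hypothetical names introduced only for illustration.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a slow server handler that checks for cancellation.
final class CooperativeSearch {
    // Performs up to `steps` units of work, polling `isCancelled` before each unit.
    // Returns the number of units completed before cancellation was observed.
    static int search(int steps, BooleanSupplier isCancelled) {
        int done = 0;
        for (int i = 0; i < steps; i++) {
            if (isCancelled.getAsBoolean()) {
                break; // the caller has given up, so stop spending resources
            }
            done++; // one unit of (simulated) work
        }
        return done;
    }

    public static void main(String[] args) {
        // Never cancelled: all five units run.
        System.out.println(search(5, () -> false)); // prints 5

        // Cancelled from the third poll onwards: only two units run.
        int[] polls = {0};
        System.out.println(search(5, () -> ++polls[0] > 2)); // prints 2
    }
}
```

In a real handler the supplier would be replaced by the cancellation check of the current call, and the work would be split into chunks small enough that the check runs reasonably often.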

Here is our client code.

Example

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;

public class BookStoreClientUnaryBlockingTimeout {
   private static final Logger logger = Logger.getLogger(BookStoreClientUnaryBlockingTimeout.class.getName());
   private final BookStoreGrpc.BookStoreBlockingStub blockingStub;
   public BookStoreClientUnaryBlockingTimeout(Channel channel){
      blockingStub = BookStoreGrpc.newBlockingStub(channel);
   }
   public void getBook(String bookName) {
      logger.info("Querying for book with title: " + bookName);
      BookSearch request = BookSearch.newBuilder().setName(bookName).build();
      Book response;
      try {
         response = blockingStub.withDeadlineAfter(2,TimeUnit.SECONDS).first(request);
      } catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}",e.getStatus());
         return;
      }
      logger.info("Got following book from server: " + response);
   }
   public static void main(String[] args) throws Exception {
      String bookName = args[0];
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
      try {
         BookStoreClientUnaryBlockingTimeout client = new BookStoreClientUnaryBlockingTimeout(channel);
         client.getBook(bookName);
      } finally {
         channel.shutdownNow().awaitTermination(5,
         TimeUnit.SECONDS);
      }
   }
}

The above code calls the server with the title to search for. More importantly, it sets a deadline of 2 seconds on the gRPC call.
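The effect of withDeadlineAfter(2, TimeUnit.SECONDS) is that the blocking call fails with DEADLINE_EXCEEDED when no reply arrives in time. As a rough plain-Java analogy (not gRPC code), the sketch below waits on a slow task with a bounded timeout and cancels the task when the wait expires. DeadlineSketch and callWithDeadline are hypothetical names, and the string "DEADLINE_EXCEEDED" merely stands in for the gRPC status.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of a client call that is bounded by a deadline.
final class DeadlineSketch {
    // Runs a task that needs `workMillis` and waits at most `deadlineMillis` for it.
    static String callWithDeadline(long workMillis, long deadlineMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> reply = pool.submit(() -> {
            Thread.sleep(workMillis); // simulated slow server
            return "Great Gatsby";
        });
        try {
            return reply.get(deadlineMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            reply.cancel(true); // stop the work whose result is no longer wanted
            return "DEADLINE_EXCEEDED";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // The work outlasts the deadline, so the wait is abandoned.
        System.out.println(callWithDeadline(500, 200));
        // The work finishes well within the deadline.
        System.out.println(callWithDeadline(50, 2000));
    }
}
```

The analogy is loose: a gRPC deadline travels with the call, so the server side can observe it as well, whereas here only the waiting thread knows about the timeout.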

Now let us see this in action. To run the code, open two shells. Start the server in the first shell by executing the following command:

java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerUnaryTimeout

We will see the following output:

Output

Jul 31, 2021 12:29:31 PM com.tp.bookstore.BookeStoreServerUnaryTimeout start
INFO: Server started, listening on 50051

The above output shows that the server has started. Once the client, started in the next step, sends its request, the server also logs the following:

Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first
INFO: Searching for book with title: Great

Jul 31, 2021 12:29:35 PM com.tp.bookstore.BookeStoreServerUnaryTimeout$BookStoreImpl first
INFO: This may take more time...

Now, let us start the client.

java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientUnaryBlockingTimeout Great

We will get the following output:

Output

Jul 31, 2021 12:29:34 PM com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
INFO: Querying for book with title: Great
Jul 31, 2021 12:29:36 PM com.tp.bookstore.BookStoreClientUnaryBlockingTimeout getBook
WARNING: RPC failed: Status{code=DEADLINE_EXCEEDED, description=deadline exceeded after 1.970455800s. [buffered_nanos=816522700, remote_addr=localhost/127.0.0.1:50051], cause=null}

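So, as we can see, the client did not receive a response within 2 seconds, so the call was cancelled and failed with a timeout status, DEADLINE_EXCEEDED.

Request Cancellation

gRPC supports cancelling requests from both the client and the server side. The client can decide at runtime how long to wait before cancelling a request. The server can check, on its end, whether a request still needs to be served or whether the client has already given up on it.

Let us look at a client-streaming example in which the client invokes the cancellation. Here is our server code.

Example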

package com.tp.bookstore;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.BookSearch;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookeStoreServerClientStreaming {
   private static final Logger logger = Logger.getLogger(BookeStoreServerClientStreaming.class.getName());
   static Map<String, Book> bookMap = new HashMap<>();
   static {
      bookMap.put("Great Gatsby", Book.newBuilder().setName("Great Gatsby").setAuthor("Scott Fitzgerald").setPrice(300).build());
      bookMap.put("To Kill MockingBird", Book.newBuilder().setName("To Kill MockingBird").setAuthor("Harper Lee")
        .setPrice(400).build());
      bookMap.put("Passage to India", Book.newBuilder().setName("Passage to India").setAuthor("E.M.Forster")
         .setPrice(500).build());
      bookMap.put("The Side of Paradise", Book.newBuilder().setName("The Side of Paradise").setAuthor("Scott Fitzgerald")
         .setPrice(600).build());
      bookMap.put("Go Set a Watchman", Book.newBuilder().setName("Go Set a Watchman").setAuthor("Harper Lee")
         .setPrice(700).build());
   }
   private Server server;
   private void start() throws IOException {
      int port = 50051;
      server = ServerBuilder.forPort(port).addService(new BookStoreImpl()).build().start();

      logger.info("Server started, listening on " + port);
      Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
            System.err.println("Shutting down gRPC server");
            try {
               server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               e.printStackTrace(System.err);
            }
         }
      });
   }
   public static void main(String[] args) throws IOException,InterruptedException {
      final BookeStoreServerClientStreaming greetServer = new BookeStoreServerClientStreaming();
      greetServer.start();
      greetServer.server.awaitTermination();
   }
   static class BookStoreImpl extends BookStoreGrpc.BookStoreImplBase {
      @Override
      public StreamObserver<Book>
      totalCartValue(StreamObserver<Cart> responseObserver) {
         return new StreamObserver<Book>() {
            ArrayList<Book> bookCart = new ArrayList<Book>();
            @Override
            public void onNext(Book book) {
               logger.info("Searching for book with title starting with: " + book.getName());
               for (Entry<String, Book> bookEntry : bookMap.entrySet()) {
                  if(bookEntry.getValue().getName().startsWith(book.getName())){
                     logger.info("Found book, adding to cart:....");
                     bookCart.add(bookEntry.getValue());
                  }
               }
            }
            @Override
            public void onError(Throwable t) {
               logger.info("Error while reading book stream:" + t);
            }
            @Override
            public void onCompleted() {
               int cartValue = 0;
               for (Book book : bookCart) {
                  cartValue += book.getPrice();
               }
               responseObserver.onNext(Cart.newBuilder().setPrice(cartValue).setBooks(bookCart.size()).build());
               responseObserver.onCompleted();
            }
         };   
      }
   }
}

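This server code is a simple example of client streaming. The server just keeps track of the books the client wants and, at the end, returns the total cart value of the order.

There is nothing cancellation-specific here, however, because the cancellation is invoked by the client. So let us look at the client code.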

package com.tp.bookstore;

import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.tp.bookstore.BookStoreGrpc.BookStoreStub;
import com.tp.bookstore.BookStoreOuterClass.Book;
import com.tp.bookstore.BookStoreOuterClass.Cart;

public class BookStoreClientStreamingClientCancelation {
   private static final Logger logger = Logger.getLogger(BookStoreClientStreamingClientCancelation.class.getName());
   private final BookStoreStub stub;
   StreamObserver<Book> streamClientSender;
   private CancellableContext withCancellation;
   public BookStoreClientStreamingClientCancelation(Channel channel) {
      stub = BookStoreGrpc.newStub(channel);
   }
   public StreamObserver<Cart> getServerResponseObserver(){
      StreamObserver<Cart> observer = new StreamObserver<Cart>() {
         @Override
         public void onNext(Cart cart) {
            logger.info("Order summary:"
               + "\nTotal number of Books: "
               + cart.getBooks()
               + "\nTotal Order Value: "
               + cart.getPrice());
         }
         @Override
         public void onError(Throwable t) {
            logger.info("Error while reading response from Server: " + t);
         }
         @Override
         public void onCompleted() {
         }
      };
      };
      return observer;
   }
   public void addBook(String book) {
      logger.info("Adding book with title starting with: " + book);
      Book request = Book.newBuilder().setName(book).build();

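      if(streamClientSender == null) {
         withCancellation = Context.current().withCancellation();
         // Start the call while the cancellable context is current, so that
         // withCancellation.cancel(...) actually cancels this RPC.
         withCancellation.run(() ->
            streamClientSender = stub.totalCartValue(getServerResponseObserver()));
      }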
      try {
         streamClientSender.onNext(request);
      }
      catch (StatusRuntimeException e) {
         logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      }
   }
   public void completeOrder() {
      logger.info("Done, waiting for server to create order summary...");
      if(streamClientSender != null) {
         streamClientSender.onCompleted();
      }
   }
   public void cancelOrder() {
      withCancellation.cancel(null);
   }
   public static void main(String[] args) throws Exception {
      String serverAddress = "localhost:50051";
      ManagedChannel channel = ManagedChannelBuilder.forTarget(serverAddress).usePlaintext().build();
      try {
         BookStoreClientStreamingClientCancelation client = new BookStoreClientStreamingClientCancelation(channel);
         String bookName = "";
         while(true) {
            System.out.println("Type book name to be added to the cart....");
            bookName = System.console().readLine();
            if(bookName.equals("EXIT")) {
               client.completeOrder();
               break;
            }
            if(bookName.equals("CANCEL")) {
               client.cancelOrder();
               break;
            }
            client.addBook(bookName);
         }
      } finally {
         channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
      }
   }
}

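Looking at the code above, the following line creates a context with cancellation enabled. Such a context only governs RPCs that are started while it is the current context, for example inside withCancellation.run(...).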

withCancellation = Context.current().withCancellation();

This is the method that is called when the user types CANCEL. It cancels the order and lets the server know.

public void cancelOrder() {
   withCancellation.cancel(null);
}
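To illustrate the idea that one side cancels and the other side is told about it, here is a rough plain-Java analogy that does not use gRPC. A worker thread plays the part of the in-flight call, Future.cancel(true) plays the part of withCancellation.cancel(null), and the interrupt that the worker receives stands in for the server's onError notification. The names CancelSketch and cancelAndObserve are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical model: cancel an in-flight task and check that the worker noticed.
final class CancelSketch {
    // Starts a long-running "call", cancels it, and reports whether the worker
    // side observed the cancellation.
    static boolean cancelAndObserve() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch notified = new CountDownLatch(1);
        Future<?> call = pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000); // pretend to wait for more books
            } catch (InterruptedException e) {
                notified.countDown(); // the worker learns the call was cancelled
            }
        });
        try {
            started.await();   // make sure the call is in flight
            call.cancel(true); // interrupts the worker thread
            return notified.await(2, TimeUnit.SECONDS) && call.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker saw cancellation: " + cancelAndObserve());
    }
}
```

In gRPC the notification crosses the network rather than a thread boundary, but the shape is the same: the cancelling side gives up on the result, and the working side gets a signal that it can stop.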

Now let us see this in action. To run the code, open two shells. Start the server in the first shell by executing the following command:

java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookeStoreServerClientStreaming

We will see the following output:

Output

Jul 31, 2021 3:29:58 PM com.tp.bookstore.BookeStoreServerClientStreaming start
INFO: Server started, listening on 50051

The above output shows that the server has started.

Now, let us start the client.

java -cp .\target\grpc-point-1.0.jar com.tp.bookstore.BookStoreClientStreamingClientCancelation

We will get output similar to the following:

Output

Type book name to be added to the cart....
Great
Jul 31, 2021 3:30:55 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation addBook
INFO: Adding book with title starting with: Great

Type book name to be added to the cart....
CANCEL
Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookStoreClientStreamingClientCancelation$1 onError
INFO: Error while reading response from Server: io.grpc.StatusRuntimeException: UNAVAILABLE: Channel shutdownNow invoked

We will get the following in the server log:

INFO: Searching for book with title starting with: Great
Jul 31, 2021 3:30:56 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImpl$1 onNext
INFO: Found book, adding to cart:....
Jul 31, 2021 3:30:58 PM com.tp.bookstore.BookeStoreServerClientStreaming$BookStoreImpl$1 onError
INFO: Error while reading book stream: io.grpc.StatusRuntimeException: CANCELLED: client cancelled

So, as we can see, the client initiated the cancellation of the request it had made to the server, and the server was notified of the cancellation as well.