Protobuf - 与 Kafka 集成

我们已经介绍了相当多的 Protobuf 及其数据类型的示例。在本章中,让我们再举一个例子,看看 Protobuf 如何与 Kafka 使用的 Schema Registry 集成。让我们首先了解什么是"Schema Registry"。

Schema Registry

Kafka 是广泛使用的消息队列之一。它用于大规模应用发布者-订阅者模型。有关 Kafka 的更多信息,请点击此处 − https://www.tutorialspoint.com/apache_kafka/index.htm

然而,在基本层面上,Kafka 生产者应该发送一条消息,即 Kafka 消费者 可以读取的一条信息。而发送和使用消息正是我们需要模式的地方。在大型组织中尤其需要,因为有多个团队读取/写入 Kafka 主题。Kafka 提供了一种将此模式存储在模式注册表中的方法,然后在生产者/消费者创建/使用消息时创建/使用模式注册表。

维护模式有两个主要好处 −

  • 兼容性 −在较大的组织中,生成消息的团队必须不破坏使用这些消息的下游工具。模式注册表可确保更改向后兼容。

  • 高效编码 − 发送字段名称,其类型与每条消息一起占用空间且计算效率低下。有了模式,我们就不需要在每条消息中发送此信息了。

模式注册表支持 Avro、Google ProtobufJSON Schema 作为模式语言。这些语言中的模式可以存储在模式注册表中。对于本教程,我们需要 Kafka 设置和 Schema 注册表设置。

要安装 Kafka,您可以查看以下链接 −

安装 Kafka 后,您可以通过更新 /etc/schema-registry/schema-registry.properties 文件来设置 Schema Registry。

# Schema Registry 应监听的位置
listeners=http://0.0.0.0:8081

# Schema Registry 在其下方使用 Kafka,因此我们需要告知 Kafka 代理可用的位置
kafkastore.bootstrap.servers=PLAINTEXT://hostname:9092,SSL://hostname2:9092
完成后,您可以运行:
sudo systemctl start confluent-schema-registry

设置完成后,让我们开始使用 Google Protobuf 和 Schema Registry。

带有 Protobuf Schema 的 Kafka Producer

让我们继续剧院示例。我们将使用以下 Protobuf 模式 −

syntax = "proto3";
package theater;
option java_package = "com.tutorialspoint.theater";

message Theater {
   string name = 1;
   string address = 2;
  
   int32 total_capcity = 3;
   int64 mobile = 4;
   float base_ticket_price = 5;
  
   bool drive_in = 6;
  
   enum PAYMENT_SYSTEM{
      CASH = 0;
      CREDIT_CARD = 1;
      DEBIT_CARD = 2;
      APP = 3;
   }
   PAYMENT_SYSTEM payment = 7;
   repeated string snacks = 8;
   
   map<string, int32> movieTicketPrice = 9;
}

现在,让我们创建一个简单的 Kafka writer,它将以这种格式编码的消息写入 Kafka 主题。但要做到这一点,首先,我们需要向我们的 Maven POM 添加一些依赖项 −

  • Kafka 客户端使用 Kafka 生产者和消费者

  • Kafka Protobuf 序列化器对消息进行序列化和反序列化

  • Slf4j 简单确保我们从 Kafka 获取日志

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.5.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.confluent/kafka-protobuf-serializer -->
<dependency>
   <groupId>io.confluent</groupId>
   <artifactId>kafka-protobuf-serializer</artifactId>
   <version>5.5.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-simple</artifactId>
   <version>1.7.30</version>
</dependency>

完成后,让我们创建一个 Kafka 生产者。此生产者将创建并发送一条包含 theater 对象的消息。

package com.tutorialspoint.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.tutorialspoint.theater.TheaterOuterClass.Theater;
import com.tutorialspoint.theater.TheaterOuterClass.Theater.PAYMENT_SYSTEM;

public class KafkaProtbufProducer {
   public static void main(String[] args) throws Exception{
      String topicName = "testy1";
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("clientid", "foo");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
      props.put("schema.registry.url", "http://localhost:8081");
      props.put("auto.register.schemas", "true");
      
      Producer<String, Theater> producer = new KafkaProducer<>(props);
      producer.send(new ProducerRecord<String, Theater>(topicName, "SilverScreen", getTheater())).get();
      System.out.println("Sent to Kafka: 
" + getTheater());
      producer.flush();
      producer.close();
   }
   public static Theater getTheater() {
      List<String> snacks = new ArrayList<>();
      snacks.add("Popcorn");
      snacks.add("Coke");
      snacks.add("Chips");
      snacks.add("Soda");
           
      Map<String, Integer> ticketPrice = new HashMap<>();
      ticketPrice.put("Avengers Endgame", 700);
      ticketPrice.put("Captain America", 200);
      ticketPrice.put("Wonder Woman 1984", 400);
                  
      Theater theater = Theater.newBuilder()
         .setName("Silver Screener")
         .setAddress("212, Maple Street, LA, California")
         .setDriveIn(true)
         .setTotalCapacity(320)
         .setMobile(98234567189L)
         .setBaseTicketPrice(22.45f)
         .setPayment(PAYMENT_SYSTEM.CREDIT_CARD)
         .putAllMovieTicketPrice(ticketPrice)
         .addAllSnacks(snacks)
         .build();
      return theater;
   }
}

以下是我们需要注意的几点 −

  • 我们需要将 Schema Registry URL 传递给生产者。

  • 我们还需要传递特定于 Schema Registry 的正确 Protobuf 序列化器。

  • 发送完成后,Schema Registry 会自动存储 theater 对象的架构。

  • 最后,我们从自动生成的 Java 代码中创建了一个 theater 对象,这就是我们将要发送的内容。

现在让我们编译并执行代码 −

mvn clean install ; java -cp .	arget\protobuf-tutorial-1.0.jar com.tutorialspoint.kafka.KafkaProtbufProducer

我们将看到以下输出 −

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.5.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 66563e712b0b9f84
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1621692205607
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - 
[Producer clientId=producer-1] Cluster ID: 7kwQVXjYSz--bE47MiXmjw

发送至 Kafka

name: "Silver Screener"
address: "212, Maple Street, LA, California"
total_capacity: 320
mobile: 98234567189
base_ticket_price: 22.45
drive_in: true
payment: CREDIT_CARD
snacks: "Popcorn"
snacks: "Coke"
snacks: "Chips"
snacks: "Soda"
movieTicketPrice {
   key: "Avengers Endgame"
   value: 700
}
movieTicketPrice {
   key: "Captain America"
   value: 200
}
movieTicketPrice {
   key: "Wonder Woman 1984"
   value: 400
}
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - 
[Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

这意味着我们的消息已发送。

现在,让我们确认架构已存储在架构注册表中。

curl -X GET http://localhost:8081/subjects | jq

显示的输出为 "topicName"+"key/value"

[
   "testy1-value"
]

我们还可以看到注册表存储的架构 −

curl  -X GET http://localhost:8081/schemas/ids/1 | jq {
   "schemaType": "PROTOBUF",
   "schema": "syntax = \"proto3\";
package theater;

option java_package = \"com.tutorialspoint.theater\";

message Theater {
      
  string name = 1;
  string address = 2;
  int64 total_capacity = 3;
  
      int64 mobile = 4;
  float base_ticket_price = 5;
  bool drive_in = 6;
  
      .theater.Theater.PAYMENT_SYSTEM payment = 7;
  repeated string snacks = 8;
  
      repeated .theater.Theater.MovieTicketPriceEntry movieTicketPrice = 9;

  
      message MovieTicketPriceEntry {
    option map_entry = true;
  
    
      string key = 1;
    int32 value = 2;
  }
  enum PAYMENT_SYSTEM {
         
 CASH = 0;
    CREDIT_CARD = 1;
    DEBIT_CARD = 2;
    APP = 3;
  
      }

   }
"
}

带有 Protobuf 模式的 Kafka 消费者

现在让我们创建一个 Kafka 消费者。此消费者将使用包含 theater 对象的消息。

package com.tutorialspoint.kafka;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.tutorialspoint.theater.TheaterOuterClass.Theater;
import com.tutorialspoint.theater.TheaterOuterClass.Theater.PAYMENT_SYSTEM;

public class KafkaProtbufProducer {
   public static void main(String[] args) throws Exception{
      String topicName = "testy1";
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("clientid", "foo");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         
      props.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
      props.put("schema.registry.url", "http://localhost:8081");
      props.put("auto.register.schemas", "true");
      Producer<String, Theater> producer = new KafkaProducer<>(props);
      producer.send(new ProducerRecord<String, Theater>(topicName, "SilverScreen", getTheater())).get();
      
      System.out.println("Sent to Kafka: 
" + getTheater());
      producer.flush();
      producer.close();
   }
   public static Theater getTheater() {
      List<String> snacks = new ArrayList<>();
      snacks.add("Popcorn");
      snacks.add("Coke");
      snacks.add("Chips");
      snacks.add("Soda");
           
      Map<String, Integer> ticketPrice = new HashMap<>();
      ticketPrice.put("Avengers Endgame", 700);
      ticketPrice.put("Captain America", 200);
      ticketPrice.put("Wonder Woman 1984", 400);
      
      Theater theater = Theater.newBuilder()
         .setName("Silver Screener")
         .setAddress("212, Maple Street, LA, California")
         .setDriveIn(true)
         .setTotalCapacity(320)
         .setMobile(98234567189L)
         .setBaseTicketPrice(22.45f)
         .setPayment(PAYMENT_SYSTEM.CREDIT_CARD)
         .putAllMovieTicketPrice(ticketPrice)
         .addAllSnacks(snacks)
         .build();
      return theater;
   }
}

以下是我们需要注意的要点 −

  • 我们需要将 Schema Registry URL 传递给消费者。

  • 我们还需要传递特定于 Schema Registry 的正确 Protobuf Deserializer。

  • 当我们完成消费后,Schema Registry 会自动读取 theater 对象的存储模式。

  • 最后,我们从自动生成的 Java 代码中创建了一个 theater 对象,这就是我们将要发送的内容。

现在让我们编译并执行代码 −

mvn clean install ; java -cp .	arget\protobuf-tutorial-1.0.jar com.tutorialspoint.kafka.KafkaProtbufConsumer

offset = 0, key = SilverScreen, value = May 22, 2021 7:50:15 PM com.google.protobuf.TextFormat$Printer$MapEntryAdapter compareTo
May 22, 2021 7:50:15 PM com.google.protobuf.TextFormat$Printer$MapEntryAdapter compareTo

name: "Silver Screener"
address: "212, Maple Street, LA, California"
total_capacity: 320
mobile: 98234567189
base_ticket_price: 22.45
drive_in: true
payment: CREDIT_CARD
snacks: "Popcorn"
snacks: "Coke"
snacks: "Chips"
snacks: "Soda"
movieTicketPrice {
   key: "Captain America"
   value: 200
}
movieTicketPrice {
   key: "Wonder Woman 1984"
   value: 400
}
movieTicketPrice {
   key: "Avengers Endgame"
   value: 700
}

因此,我们可以看到,写入 Kafka 的消息已被消费者正确使用。此外,注册中心存储了架构,也可以通过 REST API 访问该架构。