ZeroMQ - 同步消息处理

ZeroMQ 是一个高性能异步通用消息库,用于构建分布式或并发应用程序。它也被称为ØMQ。众所周知,ZeroMQ 主要用于异步消息传递,您也可以使用各种套接字类型和消息传递模式实现同步消息处理模式。

关键概念

以下是在 ZeroMQ 中执行同步消息处理时应该了解的重要主题 −

  • 套接字:在 ZeroMQ 中,套接字是客户端和服务器之间发送或接收数据的端点(API)。它表示两个组件(例如进程或设备)之间的通信通道,允许它们交换数据。 ZeroMQ 为各种消息传递模式(如 PUB/SUB、REQ/REP、PUSH/PULL 等)提供不同类型的套接字。

  • 消息队列:ZeroMQ 套接字用于发送和接收消息。它们不需要专用的消息代理,因为消息传递模式是在库本身中实现的,这使得它"无需代理"。

  • 异步特性:ZeroMQ 主要设计为异步,这意味着它被设计为一次处理多个操作,这就是为什么它经常用于高性能、非阻塞通信的原因。

这是同步消息处理图 −

同步消息处理

实现同步处理

同步过程是一种操作,其中任务或操作按顺序执行,每个步骤或操作都等待前一个步骤或操作完成后才能继续。

要使用默认遵循异步机制的 ZeroMQ 实现同步处理,您可以使用特定模式和套接字类型。同步消息处理的最常见方法是使用遵循请求-回复模式的 REQ(请求)和 REP(回复)套接字。在 REQ/REP 请求模式中 −

  • REQ Socket:客户端使用它向服务器发送请求或数据。
  • REP Socket:服务器使用它来响应收到的请求。

Java 中的同步消息处理

以下是在 Java 中使用 ZeroMQ 进行同步消息处理的基本示例 −

服务器代码(REP Socket)

import org.zeromq.ZContext;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;

public class Response {
   public static void main(String[] args) {
      try (ZContext context = new ZContext()) {
         ZMQ.Socket socket = context.createSocket(ZMQ.REP);
         socket.bind("tcp://127.0.0.1:5555");

         while (true) {
            // 等待客户端的下一个请求
            String message = socket.recvStr();
            System.out.println("Received request: " + message);

            // 向客户端发送回复
            socket.send("Welcome, " + message);
         }
      }
   }
}

Client code (REQ Socket)

import org.zeromq.ZContext;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;

public class Request {
   public static void main(String[] args) {
      ZMQ.Context context = ZMQ.context(1);
      ZMQ.Socket socket = context.socket(ZMQ.REQ);
      socket.connect("tcp://127.0.0.1:5555");

      // 发送请求
      socket.send("to World".getBytes(ZMQ.CHARSET), 0);

      // 接收回复
      byte[] reply = socket.recv(0);
      System.out.println("Received reply: "+new String(reply,ZMQ.CHARSET));

      socket.close();
      context.term();
   }
}

输出

执行上述程序后,将显示以下输出 −

收到回复:欢迎来到世界

Python 中的同步消息处理

以下是在 Python 中使用 ZeroMQ 进行同步消息处理的基本示例 −

服务器代码(REP 套接字)

import zmq
def main():
   context = zmq.Context()
   socket = context.socket(zmq.REP)
   socket.bind("tcp://127.0.0.1:5555")

   while True:
      # 等待客户端的下一个请求
      message = socket.recv_string()
      print(f"Received request: {message}")

      # 向客户端发送回复
      socket.send_string(f"Welcome, {message}")

if __name__ == "__main__":
   main()

客户端代码(REQ Socket)

import zmq
def main():
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://127.0.0.1:5555")
    
    # 发送请求
    socket.send_string("to World")
    
    # 接收回复
    message = socket.recv_string()
    print(f"Received reply:{message}")

if __name__ == "__main__":
   	main()

输出

上述程序产生以下输出−

Received reply: Welcome, to World