ZeroMQ - 通信模式
ZeroMQ 是一个高性能、异步消息传递库,它提供了一组用于生成分布式应用程序的通信模式。它提供了一种在各种应用程序中实现消息传递模式的简单而结构化的方法。以下是 ZeroMQ 的一些基本核心概念 −
- 套接字作为通信端点:ZeroMQ 套接字支持高级消息传递模式,例如 PUB-SUB 和 PUSH-PULL。
- 异步消息传递:ZeroMQ 套接字异步运行,允许应用程序无阻塞地发送和接收消息。
- 可扩展性和性能:ZeroMQ 被设计为水平扩展,允许多对多配置,同时保持高吞吐量和低延迟。
通信模式
以下是通信模式列表:
- 请求-回复 (REQ-REP)
- 发布-订阅 (PUB-SUB)
- 推拉 (PUSH-PULL)
- 配对 (PAIR)
- DEALER-ROUTER (DEALER-ROUTER)
- ROUTER-DEALER (ROUTER-DEALER)
- XPUB-XSUB
这些通信模式可以以各种方式组合以创建复杂的分布式系统。 ZeroMQ 提供了一个灵活且可扩展的消息传递框架,允许开发人员实现从简单的客户端-服务器配置到复杂的分布式架构的任何内容。
请求-回复
这是一种远程服务调用和任务分配通信模式,可将一组客户端连接到一组服务。这意味着它是一种简单的客户端-服务器通信,客户端向服务器发送请求,服务器以回复进行响应。
例如,考虑一个响应来自客户端的 HTTP 请求的 Web 服务器。
示例
以下是 REQ/REP 的示例。首先,我们连接到服务器并从客户端获取响应。
package com.zeromq.mavenProject; import org.zeromq.SocketType; import org.zeromq.ZMQ; import org.zeromq.ZContext; public class Practice { public static void main(String[] args) { try (ZContext context = new ZContext()) { System.out.println("Connecting to TP server"); // 与服务器通信的套接字 ZMQ.Socket socket = context.createSocket(SocketType.REQ); socket.connect("tcp://localhost:5555"); for (int requestNbr = 0; requestNbr != 5; requestNbr++) { String request = "Tutorialspoint"; System.out.println("Sending TP " + requestNbr); socket.send(request.getBytes(ZMQ.CHARSET), 0); byte[] reply = socket.recv(0); System.out.println( "Received " + new String(reply, ZMQ.CHARSET) + " " + requestNbr ); } } } }
Output
Connecting to TP server Sending TP 0 Received world 0 Sending TP 1 Received world 1 Sending TP 2 Received world 2 Sending TP 3 Received world 3 Sending TP 4 Received world 4
发布-订阅
这是一种数据分发通信模式,将一组发布者链接到一组订阅者。它用于向多个接收者广播消息。
例如,考虑一个向多个订阅者广播新闻文章的新闻源系统。
示例
在此示例中,发布者创建一个 PUB 套接字,绑定到端口,并发送消息。订阅者创建一个 SUB 套接字,连接到发布者,接收消息,打印出收到的消息。
// 发布者 (PUB) 类 import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; // 运行 PUB 注释后,它会运行 SUB。 public class PubSubPublisher { public static void main(String[] args) { Context context = ZMQ.context(1); Socket socket = context.socket(ZMQ.PUB); socket.bind("tcp://*:5555"); while (true) { socket.send("Hello, subscribers!".getBytes(), 0); } } } // 订阅者(SUB)类 public class PubSubSubscriber { public static void main(String[] args) { Context context = ZMQ.context(1); Socket socket = context.socket(ZMQ.SUB); socket.connect("tcp://localhost:5555"); socket.subscribe("".getBytes()); // Subscribe to all messages while (true) { byte[] message = socket.recv(0); System.out.println("Received message: " + new String(message)); } } }
Output
Received message: Hello, subscribers! Received message: Hello, subscribers! Received message: Hello, subscribers! Received message: Hello, subscribers! Received message: Hello, subscribers! Received message: Hello, subscribers!
推拉
此模式对于负载平衡和任务分配非常有用。推送套接字将消息发送到拉取套接字,拉取套接字接收并处理消息。
例如,考虑一个将任务分发到多个工作节点的负载平衡器。
示例
在此示例中,推送器创建一个 PUSH 套接字并连接到拉取器。拉取器创建一个拉取套接字,将其绑定到端口,从推送器接收工作项并打印它们。
package com.zeromq.mavenProject; import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; // 运行 PUSH 注释后,它会运行 PULL。 public class PushPullPusher { public static void main(String[] args) { // 创建 PUSH 套接字并连接到拉取器 Context context = ZMQ.context(1); Socket socket = context.socket(ZMQ.PUSH); socket.connect("tcp://localhost:5500"); // 将工作项发送给拉取器 while (true) { socket.send("Work item".getBytes(), 0); } } } public class PushPullPuller { public static void main(String[] args) { // 创建一个 PULL 套接字并将其绑定到一个端口 Context context = ZMQ.context(1); Socket socket = context.socket(ZMQ.PULL); socket.bind("tcp://*:5500"); // 从推送者接收工作项 while (true) { byte[] message = socket.recv(0); System.out.println("Received work item: " + new String(message)); } } }
输出
配对
此模式可用于简单的点对点交互。一对套接字以点对点方式相互连接,以便两个节点可以双向通信。
例如,考虑一个允许两个用户相互通信的聊天应用程序。
示例
在此示例中,我们通过创建两个可以相互发送和接收消息的节点来演示 PAIR 模式。我们正在检查连接是双向的还是异步的。如果是,则每个节点都等待另一个节点响应,然后再发送下一条消息。
package com.zeromq.mavenProject; import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; // 运行 PairNode1 注释后,它会运行 PairNode2。 public class PairNode1 { public static void main(String[] args) { // 创建一个 PAIR 套接字并将其绑定到一个端口 Context context = ZMQ.context(1); Socket socket = context.socket(ZMQ.PAIR); socket.bind("tcp://*:5000"); // 从节点 2 接收消息并发送响应 while (true) { byte[] message = socket.recv(0); System.out.println("Received message: " + new String(message)); socket.send("Hello, Node 2!".getBytes(), 0); } } } public class PairNode2 { public static void main(String[] args) { // 创建一个 PAIR 套接字并连接到节点 1 Context context = ZMQ.context(1); Socket socket = context.socket(ZMQ.PAIR); socket.connect("tcp://localhost:5000"); // 发送消息到节点 1 并接收响应 while (true) { socket.send("Hello, Node 1!".getBytes(), 0); byte[] message = socket.recv(0); System.out.println("Received response: " + new String(message)); } } }
输出
Received message: Hello, Node 1! Received message: Hello, Node 1! Received message: Hello, Node 1! Received message: Hello, Node 1! Received message: Hello, Node 1!
Dealer-Router
此模式用于实现复杂的分布式系统。Dealer套接字将消息发送到路由器套接字,路由器套接字将消息分派给一个或多个连接的对等点。
例如,考虑将查询路由到多个节点的分布式数据库系统。
Router-Dealer
此模式有助于实现复杂的分布式系统。路由器套接字从一个或多个连接的Dealer套接字路由消息,每个套接字将其发送到各自的对等点。
例如,考虑路由到多个边缘服务器的内容交付网络。
XPUB-XSUB
此通信模式在创建动态发布-订阅系统时很有用。XPUB 是一个特殊的发布者,允许订阅者动态连接和断开连接。 XSUB 是一个特殊的订阅者,它可以连接到多个发布者。
例如,考虑一个实时分析系统,其中多个发布者将数据发送给多个订阅者。