RabbitMQ - 快速指南

RabbitMQ - 概述

什么是 RabbitMQ?

RabbitMQ 是一个用 Java 编写的开源消息代理。它完全符合 JMS 1.1 标准。它由 Apache 软件基金会开发和维护,并根据 Apache 许可证获得许可。它为企业级消息传递应用程序提供高可用性、可扩展性、可靠性、性能和安全性。

JMS 是一种允许开发基于消息的系统的规范。 RabbitMQ 充当位于应用程序之间的消息代理,允许它们以异步和可靠的方式进行通信。

AMQ

消息类型

为了更好地理解,下面解释了两种类型的消息选项。

点对点

在这种类型的通信中,代理只向一个消费者发送消息,而其他消费者将等待,直到他们从代理那里获得消息。没有消费者会收到相同的消息。

如果没有消费者,代理将保留消息,直到它找到一个消费者。这种类型的通信也称为基于队列的通信,其中生产者将消息发送到队列,并且只有一个消费者从队列中获取一条消息。如果有多个消费者,他们可能会收到下一条消息,但不会收到与其他消费者相同的消息。

点对点消息传递

发布/订阅

在这种类型的通信中,代理会向所有活跃消费者发送相同的消息副本。这种类型的通信也称为基于主题的通信,其中代理会向已订阅特定主题的所有活跃消费者发送相同的消息。此模型支持单向通信,无需验证传输的消息。

发布/订阅消息

RabbitMQ - 环境设置

本章将指导您如何准备开发环境以开始使用 RabbitMQ。它还将教您如何在设置 RabbitMQ 之前在您的机器上设置 JDK、Maven 和 Eclipse −

设置 Java 开发工具包 (JDK)

您可以从 Oracle 的 Java 站点下载最新版本的 SDK − Java SE 下载。您将在下载的文件中找到安装 JDK 的说明,请按照给出的说明进行安装和配置设置。最后设置 PATH 和 JAVA_HOME 环境变量以引用包含 java 和 javac 的目录,通常分别为 java_install_dir/bin 和 java_install_dir。

如果您运行的是 Windows 并且已在 C:\jdk-11.0.11 中安装了 JDK,则必须在 C:\autoexec.bat 文件中输入以下行。

set PATH=C:\jdk-11.0.11;%PATH%
set JAVA_HOME=C:\jdk-11.0.11

或者,在 Windows NT/2000/XP 上,您必须右键单击"我的电脑",选择"属性"→"高级"→"环境变量"。然后,您必须更新 PATH 值并单击"确定"按钮。

在 Unix(Solaris、Linux 等)上,如果 SDK 安装在 /usr/local/jdk-11.0.11 中,并且您使用 C shell,则必须将以下内容放入 .cshrc 文件中。

setenv PATH /usr/local/jdk-11.0.11/bin:$PATH
setenv JAVA_HOME /usr/local/jdk-11.0.11

或者,如果您使用集成开发环境 (IDE),如 Borland JBuilder、Eclipse、IntelliJ IDEA 或 Sun ONE Studio,则必须编译并运行一个简单的程序来确认 IDE 知道您在哪里安装了 Java。否则,您必须按照 IDE 文档中给出的说明进行适当的设置。

设置 Eclipse IDE

本教程中的所有示例都是使用 Eclipse IDE 编写的。因此,我们建议您在计算机上安装最新版本的 Eclipse。

要安装 Eclipse IDE,请从 www.eclipse.org/downloads 下载最新的 Eclipse 二进制文件。下载安装程序后,将二进制分发包解压到方便的位置。例如,在 Windows 上是 C:\eclipse,在 Linux/Unix 上是 /usr/local/eclipse,最后适当地设置 PATH 变量。

可以在 Windows 计算机上执行以下命令来启动 Eclipse,或者只需双击 eclipse.exe 即可。

%C:\eclipse\eclipse.exe 

在 Unix(Solaris、Linux 等)机器上执行以下命令即可启动 Eclipse −

$/usr/local/eclipse/eclipse

启动成功后,如果一切正常,则应显示以下结果 −

Eclipse 主页

设置 Maven

在本教程中,我们使用 maven 运行和构建基于 spring 的示例,以运行基于 RabbitMQ 的应用程序。按照 Maven - 环境设置 安装 maven。

RabbitMQ - 功能

RabbitMQ 是最流行的开源消息代理之一。它旨在为企业级消息传递应用程序提供高可用性、可扩展性、可靠性、性能和安全性。以下是 RabbitMQ 的一些显著特点。

  • 轻量级 − RabbitMQ 轻量级,并且非常容易在本地和云端安装。

  • 连接选项 − RabbitMQ 支持多种消息传递协议,并且可以部署在分布式/联合配置中,以满足高可用性和可扩展性的要求。

  • 可插入式架构 − RabbitMQ 允许选择持久性机制,并且还提供了根据应用程序需求自定义身份验证和授权安全性的选项。

  • 多平台 − RabbitMQ 为许多流行语言(如 Java、Python、JavaScript、Ruby 等)提供客户端 API。

  • 代理群集 − RabbitMQ 可以部署为群集以实现高可用性和吞吐量。它可以跨多个可用区域和地区联合。

  • 功能丰富 − RabbitMQ 为代理和客户端提供了许多高级功能。

  • 简单的管理界面 − RabbitMQ 管理控制台易于使用,但仍提供许多强大的管理功能。

  • 企业和云就绪 − RabbitMQ 支持可插入式身份验证和授权。它支持 LDAP 和 TLS。它可以轻松部署在公共云和私有云中。

  • 功能丰富 − RabbitMQ 为代理和客户端提供了许多高级功能。它提供插件来支持持续集成、运营指标以及与其他企业系统的集成等。

  • 管理 − RabbitMQ 提供 HTTP API、命令行工具和 UI 来管理和监控 RabbitMQ。

RabbitMQ - 安装

RabbitMQ 建立在 Erlang 运行时上,因此在安装 RabbitMQ 之前,我们需要下载 Erlang 并安装它。确保您使用管理员权限安装 Erlang 和 RabbitMQ。

Erlang

Erlang 是一种通用编程语言和运行时环境。您可以从其主页下载最新版本的 Erlang − 下载 Erlang/OTP。。我们正在 Windows 上安装 Erlang,并下载了适用于 Windows 的 Erlang/OTP 24.2.2 安装程序 - otp_win64_24.2.2.exe。

下载 Erlang

现在使用安装程序安装 Erlang,双击它并按照默认选择完成设置。

Erlang 安装

RabbitMQ 安装

从其官方下载页面下载 RabbitMQ 最新二进制文件,我们已下载 3.9.13 作为 rabbitmq-server-3.9.13.exe windows。

RabbitMQ 下载

现在使用安装程序双击安装 RabbitMQ,然后按照默认选择完成设置。

RabbitMQ 安装

默认情况下,RabbitMQ 作为 windows 服务运行。要启用基于 Web 的管理 UI,需要执行以下步骤。

转到 RabbitMQ 安装目录并键入如下所示的命令 −

C:\Program Files\RabbitMQ Server
abbitmq_server-3.9.13\sbin>rabbitmq-plugins.bat enable rabbitmq_management
Enabling plugins on node rabbit@DESKTOP-86KD9FC:
rabbitmq_management
The following plugins have been configured:
   rabbitmq_management
   rabbitmq_management_agent
   rabbitmq_web_dispatch
Applying plugin configuration to rabbit@DESKTOP-86KD9FC...
The following plugins have been enabled:
   rabbitmq_management
   rabbitmq_management_agent
   rabbitmq_web_dispatch

started 3 plugins.

C:\Program Files\RabbitMQ Server
abbitmq_server-3.9.13\sbin>rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
Enabling plugins on node rabbit@DESKTOP-86KD9FC:
rabbitmq_shovel
rabbitmq_shovel_management
The following plugins have been configured:
   rabbitmq_management
   rabbitmq_management_agent
   rabbitmq_shovel
   rabbitmq_shovel_management
   rabbitmq_web_dispatch
Applying plugin configuration to rabbit@DESKTOP-86KD9FC...
The following plugins have been enabled:
   rabbitmq_shovel
   rabbitmq_shovel_management

started 2 plugins.

C:\Program Files\RabbitMQ Server
abbitmq_server-3.9.13\sbin>

使用管理权限编辑 C:\Windows\System32\drivers\etc\hosts 文件,并在其中添加以下行 −

127.0.0.1 rabbitmq

验证安装

现在打开 http://rabbitmq:15672/ 以打开管理控制台。使用 guest/guest 登录。

RabbitMQ 管理控制台

RabbitMQ - 生产者应用程序

现在让我们创建一个将消息发送到 RabbitMQ 队列的生产者应用程序。

创建项目

使用 eclipse,选择 文件新建Maven 项目。勾选创建一个简单的项目(跳过原型选择)并单击下一步。

输入详细信息,如下所示 −

  • groupId − com.tutorialspoint

  • artifactId − 生产者

  • version − 0.0.1-SNAPSHOT

  • name − RabbitMQ 生产器

单击"完成"按钮,将创建一个新项目。

pom.xml

现在更新 pom.xml 的内容以包含 RabbitMQ 的依赖项。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.tutorialspoint.activemq</groupId>
   <artifactId>producer</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>RabbitMQ Producer</name>
   <properties>
      <java.version>11</java.version>
   </properties>
   <dependencies>
      <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>5.14.2</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.26</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>1.7.26</version>
      </dependency>
   </dependencies>
</project>

现在创建一个生产者类,它将向 RabbitMQ 队列发送消息。

package com.tutorialspoint.rabbitmq;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
   private static String QUEUE = "MyFirstQueue";

   public static void main(String[] args) throws IOException, TimeoutException {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
         channel.queueDeclare(QUEUE, false, false, false, null);

         Scanner input = new Scanner(System.in);
         String message;
         do {
            System.out.println("Enter message: ");
            message = input.nextLine();
            channel.basicPublish("", QUEUE, null, message.getBytes());
         } while (!message.equalsIgnoreCase("Quit"));
      }
   }
}

生产者类创建连接,创建通道,连接到队列。如果用户输入退出,则应用程序终止,否则它将使用 basicPublish 方法将消息发送到队列。

我们将在 RabbitMQ - 测试应用程序 一章中运行此应用程序。

RabbitMQ - 消费者应用程序

现在让我们创建一个消费者应用程序,它将从 RabbitMQ 队列接收消息。

创建项目

使用 eclipse,选择 文件新建Maven 项目。勾选创建一个简单的项目(跳过原型选择)并单击下一步。

输入详细信息,如下所示:

  • groupId − com.tutorialspoint

  • artifactId − consumer

  • version − 0.0.1-SNAPSHOT

  • name − RabbitMQ Consumer

单击完成按钮,将创建一个新项目。

pom.xml

现在更新 pom.xml 的内容以包含 ActiveMQ 的依赖项。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.tutorialspoint.activemq</groupId>
   <artifactId>consumer</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>RabbitMQ Consumer</name>
      <properties>
      <java.version>11</java.version>
   </properties>
   <dependencies>
      <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>5.14.2</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.26</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>1.7.26</version>
      </dependency>
   </dependencies>
</project>

现在创建一个从 RabbitMQ 队列接收消息的消费者类。

package com.tutorialspoint.rabbitmq;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Consumer {
   private static String QUEUE = "MyFirstQueue";

   public static void main(String[] args) throws IOException, TimeoutException {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();

      channel.queueDeclare(QUEUE, false, false, false, null);
      System.out.println("Waiting for messages. To exit press CTRL+C");

      DeliverCallback deliverCallback = (consumerTag, delivery) -> {
         String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
         System.out.println("Received '" + message + "'");
      };
      channel.basicConsume(QUEUE, true, deliverCallback, consumerTag -> { });
   }
}

消费者类创建连接,创建通道,如果不存在则创建队列,然后从队列接收消息(如果有),并继续轮询队列中的消息。消息传递后,将由 basicConsume() 方法使用 deliverCallback 进行处理。

我们将在 RabbitMQ - 测试应用程序 一章中运行此应用程序。

RabbitMQ - 测试应用程序

启动生产者应用程序

在 eclipse 中,右键单击 Producer.java 源,然后选择 Run As → Java 应用程序。生产者应用程序将开始运行,您将看到以下输出 −

Enter message:

启动消费者应用程序

在 eclipse 中,右键单击 Consumer.java 源,然后选择 Run As → Java Application。消费者应用程序将开始运行,您将看到以下输出 −

Waiting for messages. To exit press CTRL+C

发送消息

在生产者控制台窗口中,输入 Hi 并按 Enter 按钮发送消息。

Enter message:
Hi

接收消息

在消费者控制台窗口中验证,消息已收到。

Waiting for messages. To exit press CTRL+C
Received = Hi

发送 Quit 作为消息以终止生产者窗口会话并终止客户端窗口会话。

验证

现在在您的浏览器中打开 http://rabbitmq:15672/。它将要求输入凭据。使用 guest/guest 作为用户名/密码,它将加载 RabbitMQ 管理控制台,您可以在其中检查队列以检查状态。它将显示已排队和已交付的消息。

queue

RabbitMQ - 发布者应用程序

现在让我们创建一个将消息发送到 RabbitMQ Exchange 的发布者应用程序。此交换将把消息传递到与交换绑定的队列。

创建项目

使用 eclipse,选择 FileNew Maven Project。勾选 Create a simple project(skip archetype choice) 并单击 Next。

输入详细信息,如下所示 −

  • groupId − com.tutorialspoint

  • artifactId − publisher

  • version − 0.0.1-SNAPSHOT

  • name − RabbitMQ Publisher

单击"完成"按钮,将创建一个新项目。

pom.xml

现在更新 pom.xml 的内容以包含 RabbitMQ 的依赖项。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.tutorialspoint.activemq</groupId>
   <artifactId>publisher</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>RabbitMQ Publisher</name>
   <properties>
      <java.version>11</java.version>
   </properties>
   <dependencies>
      <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>5.14.2</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.26</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>1.7.26</version>
      </dependency>
   </dependencies>
</project>

现在创建一个 Publisher 类,它将向 RabbitMQ 主题发送消息并将其广播给所有订阅者。

package com.tutorialspoint.rabbitmq;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Publisher {
   private static final String EXCHANGE = "MyExchange";
   public static void main(String[] args) throws IOException, TimeoutException {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      try (Connection connection = factory.newConnection();
      Channel channel = connection.createChannel()) {
         channel.exchangeDeclare(EXCHANGE, "fanout");
         Scanner input = new Scanner(System.in);
         String message;
         do {
            System.out.println("Enter message: ");
            message = input.nextLine();
            channel.basicPublish(EXCHANGE, "", null, message.getBytes());
         } while (!message.equalsIgnoreCase("Quit"));
      }
   }
}

生产者类创建连接、创建通道、声明交换器,然后要求用户输入消息。消息被发送到交换器,作为队列名称,我们没有传递队列名称,因此所有绑定到此交换器的队列都将收到消息。如果用户输入退出,则应用程序终止,否则它将向主题发送消息。

我们将在RabbitMQ - 测试应用程序一章中运行此应用程序。

RabbitMQ - 订阅者应用程序

现在让我们创建一个订阅者应用程序,它将从 RabbitMQ 主题接收消息。

创建项目

使用 eclipse,选择文件新建Maven 项目。勾选创建一个简单的项目(跳过原型选择)并单击下一步。

输入详细信息,如下所示 −

  • groupId − com.tutorialspoint

  • artifactId − 订阅者

  • version − 0.0.1-SNAPSHOT

  • name − RabbitMQ 订阅者

单击完成按钮,将创建一个新项目。

pom.xml

现在更新 pom.xml 的内容以包含 RabbitMQ 的依赖项。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.tutorialspoint.activemq</groupId>
   <artifactId>subscriber</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>RabbitMQ Subscriber</name>
   <dependencies>
      <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>5.14.2</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.26</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>1.7.26</version>
      </dependency>
   </dependencies>  
</project>

现在创建一个从 RabbitMQ 队列接收消息的订阅者类。

package com.tutorialspoint.rabbitmq;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Subscriber {
   private static String EXCHANGE = "MyExchange";
   public static void main(String[] args) throws IOException, TimeoutException {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      channel.exchangeDeclare(EXCHANGE, "fanout");

      String queueName = channel.queueDeclare().getQueue();
      channel.queueBind(queueName, EXCHANGE, "");
      System.out.println("Waiting for messages. To exit press CTRL+C");

      DeliverCallback deliverCallback = (consumerTag, delivery) -> {
         String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
         System.out.println("Received '" + message + "'");
      };
      channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
   }
}

订阅者类创建连接、创建通道、声明交换器、创建随机队列并将其与交换器绑定,然后接收来自主题的消息(如果有)。按 Ctrl + C 终止,否则它将继续轮询队列以查找消息。

我们将在 RabbitMQ - 测试应用程序 一章中多次运行此应用程序以创建多个订阅者。

RabbitMQ - 测试应用程序主题

启动发布者应用程序

在 eclipse 中,右键单击 Publisher.java 源,然后选择 Run As → Java 应用程序。发布者应用程序将开始运行,您将看到以下输出 −

Enter message:

启动订阅者应用程序

在 eclipse 中,右键单击订阅者.java 源,然后选择运行为 → Java 应用程序。订阅者应用程序将开始运行,您将看到如下输出 −

Waiting for messages. To exit press CTRL+C

启动另一个订阅者应用程序

在 eclipse 中,再次右键单击订阅者.java 源,然后选择运行为 → Java 应用程序。另一个订阅者应用程序将开始运行,您将看到如下输出 −

Waiting for messages. To exit press CTRL+C

发送消息

在发布者控制台窗口中,输入 Hi 并按 Enter 按钮发送消息。

Enter message:
Hi

接收消息

在订阅者控制台窗口中验证,每个窗口都收到了消息。

Received = Hi

发送 Quit 作为消息以终止所有发布者和订阅者控制台窗口会话。

验证

现在在浏览器中打开 http://rabbitmq:15672/。它将要求输入凭据。使用 guest/guest 作为用户名/密码,它将加载 RabbitMQ 管理控制台,您可以在其中检查队列和交换以检查已传递消息和绑定的状态。