Hazelcast - IQueue

java.util.concurrent.BlockingQueue 提供了一个接口,支持 JVM 中的线程以不同的速率生成和使用消息。 生产者根据可用容量进行阻塞,而消费者则根据队列中可用的元素进行阻塞。

类似地,IQueue 扩展了 BlockingQueue 并使其成为分布式版本。 它提供类似的功能:put、take 等。

关于 IQueue 需要注意的一个要点是,与其他集合不同,数据没有分区。 所有数据都存储/存在于单个 JVM 上。 所有 JVM 仍然可以访问数据,但队列无法扩展到单个机器/JVM 之外。 如果元素数量增加超出可用内存,则会引发 OutOfMemoryException。

队列支持同步备份和异步备份。 同步备份可确保即使保存队列的 JVM 出现故障,所有元素都将被保留并可从备份中获取。

让我们看一个有用函数的示例。

添加元素和读取元素

让我们在 3 个 JVM 上执行以下代码。 一个生产者代码和另外两个消费者代码。

示例

第一部分是生产者代码,它创建一个队列并向其中添加项目。

public static void main(String... args) throws IOException, InterruptedException {
   //initialize hazelcast instance
   HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
   // create a queue
   IQueue<String> hzFruits = hazelcast.getQueue("fruits");
   String[] fruits = {"Mango", "Apple", "Banana", "Watermelon"};
   for (String fruit : fruits) {
      System.out.println("Producing: " + fruit);
      Thread.sleep(1000);
   }
   System.exit(0);
}

第二部分是读取元素的消费者代码。

public static void main(String... args) throws IOException, InterruptedException {
   //initialize hazelcast instance
   HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
   IQueue<String> hzFruits = hazelcast.getQueue("fruits");
   while(!hzFruits.isEmpty()) {
   System.out.println("Consuming: " + hzFruits.take());
      Thread.sleep(2000);
   }
   System.exit(0);
}

输出

生产者代码的输出显示它无法添加现有元素。

Producing Mango
Producing Apple
Producing Banana
Producing Watermelon

第一个使用者的代码输出显示它消耗了部分数据。

Consuming Mango
Consuming Banana

第二个使用者的代码输出显示它消耗了数据的其他部分 −

Consuming Apple
Consuming Watermelon

有用的方法

Sr.No 函数名称 & 描述
1

add(Type element)

向列表添加元素

2

remove(Type element)

从列表中删除一个元素

3

poll()

返回队列头,如果队列为空则返回NULL

4

take()

返回队列头或等待元素可用

5

size()

返回列表中元素的数量

6

contains(Type element)

如果元素存在则返回

7

getPartitionKey()

返回保存列表的分区键

6

addItemListener(ItemListener<Type>listener, value)

通知订阅者列表中的元素被删除/添加/修改。

hazelcast_data_structures.html