Hazelcast - IQueue
java.util.concurrent.BlockingQueue 提供了一个接口,支持 JVM 中的线程以不同的速率生成和使用消息。 生产者根据可用容量进行阻塞,而消费者则根据队列中可用的元素进行阻塞。
类似地,IQueue 扩展了 BlockingQueue 并使其成为分布式版本。 它提供类似的功能:put、take 等。
关于 IQueue 需要注意的一个要点是,与其他集合不同,数据没有分区。 所有数据都存储/存在于单个 JVM 上。 所有 JVM 仍然可以访问数据,但队列无法扩展到单个机器/JVM 之外。 如果元素数量增加超出可用内存,则会引发 OutOfMemoryException。
队列支持同步备份和异步备份。 同步备份可确保即使保存队列的 JVM 出现故障,所有元素都将被保留并可从备份中获取。
让我们看一个有用函数的示例。
添加元素和读取元素
让我们在 3 个 JVM 上执行以下代码。 一个生产者代码和另外两个消费者代码。
示例
第一部分是生产者代码,它创建一个队列并向其中添加项目。
public static void main(String... args) throws IOException, InterruptedException { //initialize hazelcast instance HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(); // create a queue IQueue<String> hzFruits = hazelcast.getQueue("fruits"); String[] fruits = {"Mango", "Apple", "Banana", "Watermelon"}; for (String fruit : fruits) { System.out.println("Producing: " + fruit); Thread.sleep(1000); } System.exit(0); }
第二部分是读取元素的消费者代码。
public static void main(String... args) throws IOException, InterruptedException { //initialize hazelcast instance HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(); IQueue<String> hzFruits = hazelcast.getQueue("fruits"); while(!hzFruits.isEmpty()) { System.out.println("Consuming: " + hzFruits.take()); Thread.sleep(2000); } System.exit(0); }
输出
生产者代码的输出显示它无法添加现有元素。
Producing Mango Producing Apple Producing Banana Producing Watermelon
第一个使用者的代码输出显示它消耗了部分数据。
Consuming Mango Consuming Banana
第二个使用者的代码输出显示它消耗了数据的其他部分 −
Consuming Apple Consuming Watermelon
有用的方法
Sr.No | 函数名称 & 描述 |
---|---|
1 | add(Type element) 向列表添加元素 |
2 | remove(Type element) 从列表中删除一个元素 |
3 | poll() 返回队列头,如果队列为空则返回NULL |
4 | take() 返回队列头或等待元素可用 |
5 | size() 返回列表中元素的数量 |
6 | contains(Type element) 如果元素存在则返回 |
7 | getPartitionKey() 返回保存列表的分区键 |
6 | addItemListener(ItemListener<Type>listener, value) 通知订阅者列表中的元素被删除/添加/修改。 |
hazelcast_data_structures.html