Java如何使用BlockingQueue对象?

Ajava.util.concurrent.BlockingQueue是扩展的接口,java.util.Queue为阻塞操作添加了特殊支持。当检索元素操作发生时,它将等待或阻止队列变为可用,并等待或阻止存储元素操作,直到队列具有可用空间为止。

该java.util.concurrent.BlockingQueue接口有一些可用的实现。这些实现包括以下类:

  • java.util.concurrent.ArrayBlockingQueue

  • java.util.concurrent.DelayQueue

  • java.util.concurrent.LinkedBlockingDeque

  • java.util.concurrent.LinkedBlockingQueue

  • java.util.concurrent.PriorityBlockingQueue

  • java.util.concurrent.SynchronousQueue

下面是如何使用BlockingQueue的示例。在本例中,我们使用接口的ArrayBlockingQueue实现。这个例子然后为每个生产者和消费者对象创建不同的线程。这两个线程都使用共享阻塞队列,其中生产者对象存储一些元素,消费者对象尝试检索这些元素。

package org.nhooo.example.util.concurrent;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(32);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        new Thread(producer, "Producer").start();
        new Thread(consumer, "Consumer").start();
    }
}

这是 Producer 类。我们定义一个字符串数组,并使用 for-loop 迭代该数组以将元素存储到队列中。我们通过调用 BlockingQueue 的 put ()方法来实现这一点。如果队列中没有可用空间,则 put ()方法阻塞进程。在这里调用 Thread.sleep ()方法会导致 Consumer 在等待队列中可用的对象时阻塞。

package org.nhooo.example.util.concurrent;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
    BlockingQueue<String> queue;

    Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("Producer.run");
        String[] data = {"D001", "D002", "D003", "D004", "D005"};

        try {
            for (String element : data) {
                queue.put(element);
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Consumer对象循环从队列中检索元素。在此示例中,我们使用BlockingQueue的take()方法检索元素并将其打印到控制台中。

package org.nhooo.example.util.concurrent;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    BlockingQueue<String> queue;

    Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("Consumer.run");

        while (true) {
            System.out.println("Reading queue...");

            try {
                System.out.println(queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

在BlockingQueue具有用于在队列中的存储和检索的元素设置四种不同的方法。每种方法都有不同的行为。


抛出异常特殊值阻塞
超时
Insertadd(e)offer(e)put(e)offer(e, time, unit)
Removeremove()poll()take()poll(time, unit)
Examineelement()peek()