基础版生产者消费者模型

这里我们构建一个最基本的生产者消费者模型,多个Producer线程往队列中写入数据,一个Consumer线程从队列中按顺序取出数据。基本代码如下

public class Consumer extends Thread {

    private final AtomicInteger producerCnt;

    private ArrayBlockingQueue<String> queue;

    @Override
    public void run() {
        try {
            int cntVal = 0;
            while (producerCnt.intValue() != 0 || !queue.isEmpty()) {
                String pollVal = queue.poll(1, TimeUnit.SECONDS);
                System.out.println("poll a value: "+pollVal);
                cntVal++;
            }
            System.out.println("finished, total count: "+cntVal);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public Consumer(ArrayBlockingQueue<String> queue, AtomicInteger cnt) {
        this.queue = queue;
        this.producerCnt = cnt;
    }
}
public class Producer extends Thread {

    private ArrayBlockingQueue<String> queue;

    private AtomicInteger producerCnt;

    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();
        try {
            for (int i = 0; i < 100; i++) {
                queue.put(threadName + "     " + i);
            }
            System.out.println("Producer finished:"+threadName);
            producerCnt.decrementAndGet();
        } catch (InterruptedException e) {
            e.printStackTrace();
            producerCnt.decrementAndGet();
        }
    }

    public Producer(ArrayBlockingQueue<String> queue, AtomicInteger cnt) {
        this.queue = queue;
        this.producerCnt = cnt;
    }
}

接下来我们再分别启动Producer和Consumer线程

public static void main(String[] args){
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(5,true);
    AtomicInteger cnt = new AtomicInteger(3);

    Producer producer1 = new Producer(queue,cnt);
    Producer producer2 = new Producer(queue,cnt);
    Producer producer3 = new Producer(queue,cnt);
    Consumer consumer = new Consumer(queue,cnt);

    producer1.start();
    producer2.start();
    producer3.start();
    consumer.start();
}

执行下,可以看到这样的输出内容


poll a value: Thread-0     0
poll a value: Thread-0     1
poll a value: Thread-0     2
poll a value: Thread-0     3
poll a value: Thread-0     4
poll a value: Thread-0     5
poll a value: Thread-1     0
poll a value: Thread-1     1

......

poll a value: Thread-2     96
poll a value: Thread-0     92
Producer finished:Thread-2
poll a value: Thread-2     97
poll a value: Thread-0     93
poll a value: Thread-2     98
poll a value: Thread-0     94
poll a value: Thread-2     99
Producer finished:Thread-0
poll a value: Thread-0     95
poll a value: Thread-0     96
poll a value: Thread-0     97
poll a value: Thread-0     98
poll a value: Thread-0     99
finished, total count: 300

Process finished with exit code 0

其中有几处细节,这里的队列使用了ArrayBlockingQueue,ArrayBlockingQueue是最典型的有界阻塞队列,其内部是用数组存储元素的,初始化时需要指定容量大小,利用 ReentrantLock 实现线程安全。

在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用ArrayBlockingQueue是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞。

使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。

该队列在初始化的时候声明了长度,并且第二个参数设置为true,new ArrayBlockingQueue<>(5,true)。设置了队列的固定长度为5,当生产者的效率大于消费者的效率的时候,一般队列的长度会越来越长导致占用内存越来大,而此处设置了固定长度,则可以避免此类问题。另外第二个参数

public ArrayBlockingQueue(int capacity, boolean fair)

表示的是是否使用公平锁,设置为true使用公平锁,可以避免某个写入线程长时间无法得到写入锁导致长时间等待的问题。

另外Producer线程中往队列写入使用的是queue.put()方法,当队列已满的时候,写入线程会一直等待

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

而消费者模型中从队列获取数据使用的是queue.poll(1, TimeUnit.SECONDS),该方法使用了两个参数,一个超时时间的数值、一个是单位,当尝试从队列中获取对象的时候如果读取不到,则会等待,超过了设定的时间之后则会超时,方法的源码如下

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

所以在consumer中读取队列的逻辑之外我们需要额外加一个while循环

 while (producerCnt.intValue() != 0 || !queue.isEmpty())

一个是判断当前是否仍有producer线程仍在运行当中,另一个是判断ArrayBlockingQueue队列中是否仍有数据。producerCnt使用了AtomicInteger类型的对象,以保证线程安全。不过这里仍然存在一点细节问题,如果producer线程的数据生产效率比较低,某一时刻队列中的数据被消费完了,而consumer线程从队列中读取对象却超时了,此时会得到一个null,即queue.poll(1, TimeUnit.SECONDS)超时了没有取到队列中的对象会返回null,所以我们需要对取出来的对象进行非null判断。

至此,这样一个基础的生产者消费者模型算是基本搭建完成。接下来,我们需要在这份代码的基础上实现一个大量数据异步导出的服务模块。

修改为应用中可执行的服务

接下来的一个步骤,我们需要将原来单独的生产者消费者模型代码修改为一般应用中的Service代码

更新待续……