基础版生产者消费者模型

这里我们构建一个最基本的生产者消费者模型：多个 Producer 线程往队列中写入数据，一个 Consumer 线程从队列中按顺序取出数据。基本代码如下：

/**
 * Consumer thread that drains a shared queue until every producer has
 * finished and the queue is empty.
 *
 * <p>The shared counter holds the number of producers that are still running;
 * each producer decrements it when it is done. The consumer keeps polling
 * while that counter is non-zero or the queue still holds elements.
 */
public class Consumer extends Thread {

    /** Poll timeout used by the two-argument constructor (one second). */
    private static final long DEFAULT_POLL_TIMEOUT_MILLIS = 1000L;

    /** Number of producers still running; read on every loop iteration. */
    private final AtomicInteger producerCnt;

    /** Queue shared with the producers. */
    private final ArrayBlockingQueue<String> queue;

    /** Maximum time, in milliseconds, a single poll waits for an element. */
    private final long pollTimeoutMillis;

    /**
     * Creates a consumer that waits up to one second per poll.
     *
     * @param queue queue to take elements from
     * @param cnt   counter of producers that have not finished yet
     */
    public Consumer(ArrayBlockingQueue<String> queue, AtomicInteger cnt) {
        this(queue, cnt, DEFAULT_POLL_TIMEOUT_MILLIS);
    }

    /**
     * Creates a consumer with an explicit poll timeout.
     *
     * @param queue             queue to take elements from
     * @param cnt               counter of producers that have not finished yet
     * @param pollTimeoutMillis maximum wait per poll, in milliseconds
     * @throws IllegalArgumentException if {@code pollTimeoutMillis} is negative
     */
    public Consumer(ArrayBlockingQueue<String> queue, AtomicInteger cnt, long pollTimeoutMillis) {
        if (pollTimeoutMillis < 0) {
            throw new IllegalArgumentException("pollTimeoutMillis must be >= 0: " + pollTimeoutMillis);
        }
        this.queue = queue;
        this.producerCnt = cnt;
        this.pollTimeoutMillis = pollTimeoutMillis;
    }

    /**
     * Polls the queue until all producers are done and the queue is empty,
     * printing each element and finally the number of elements consumed.
     * If the thread is interrupted while waiting, the loop stops and the
     * interrupt flag is restored.
     */
    @Override
    public void run() {
        int cntVal = 0;
        try {
            while (producerCnt.intValue() != 0 || !queue.isEmpty()) {
                String pollVal = queue.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
                if (pollVal == null) {
                    // Timed out with nothing available: not a real element,
                    // so it must be neither printed nor counted.
                    continue;
                }
                System.out.println("poll a value: "+pollVal);
                cntVal++;
            }
            System.out.println("finished, total count: "+cntVal);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the flag so code further up the stack can see the interrupt.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Producer thread that puts 100 messages into a shared queue and then
 * reports itself as finished by decrementing a shared counter.
 *
 * <p>Each message is the name of the running thread, five spaces, and the
 * message index (0 to 99).
 */
public class Producer extends Thread {

    /** Queue shared with the consumer. */
    private final ArrayBlockingQueue<String> queue;

    /** Number of producers still running; decremented exactly once per run. */
    private final AtomicInteger producerCnt;

    /**
     * Creates a producer.
     *
     * @param queue queue to put messages into
     * @param cnt   counter of producers that have not finished yet
     */
    public Producer(ArrayBlockingQueue<String> queue, AtomicInteger cnt) {
        this.queue = queue;
        this.producerCnt = cnt;
    }

    /**
     * Puts the messages into the queue, blocking while the queue is full.
     * The shared counter is decremented however the method exits, so a
     * failing or interrupted producer cannot leave the counter stuck above
     * zero. On interruption the interrupt flag is restored.
     */
    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();
        try {
            for (int i = 0; i < 100; i++) {
                queue.put(threadName + "     " + i);
            }
            System.out.println("Producer finished:"+threadName);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the flag so code further up the stack can see the interrupt.
            Thread.currentThread().interrupt();
        } finally {
            // Single exit point for the bookkeeping: runs on normal completion,
            // on interruption, and when an unchecked exception escapes.
            producerCnt.decrementAndGet();
        }
    }
}
Continue reading