概述:
在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。当然生产的数据需要放在一个指定的地方以供消费者使用,以下例子以一个实体类作为存储数据的缓存区。这样生产生产数据时,先判断缓存区是否有数据,如果没数据生成数据并放到缓存区,如果有数据则等待消费者把数据处理了,再生成数据。同理消费者要处理数据先判断缓存区是否有数据,如果有数据就将处理了,如果没数据,则通知消费者生产数据。
一生产与一消费:
先定义一个实体类作为缓存区:
public class Entity {//默认该实体并没数据(没有美食)public static String food=""; }
定义一个生产者:
public class Producer {private Object lock = new Object(); //对象锁public Producer(Object lock){this.lock=lock;}public void productFood(){try { synchronized(lock){while(!Entity.food.equals("")){ //有美食,休息,等待消费处理完在生产System.out.println(Thread.currentThread().getName()+"生产者休息中...");lock.wait();} System.out.println(Thread.currentThread().getName()+"生产者做出美食");Entity.food="apple";lock.notify();} } catch (InterruptedException e) {e.printStackTrace();}} }
定义一个消费者:
public class Customer {private Object lock = new Object(); //对象锁public Customer(Object lock){this.lock=lock;}public void killFood(){try { synchronized(lock){while(Entity.food.equals("")){ //没美食,需要等待生产者生产System.out.println(Thread.currentThread().getName()+"消费者排队等候美食...");lock.wait();} System.out.println(Thread.currentThread().getName()+"消费者干掉美食...");Entity.food="";lock.notify(); } } catch (InterruptedException e) {e.printStackTrace();}} }
定义两个线程,生产者线程和消费者线程
1、生产者线程:
public class ProducerRunnable implements Runnable {private Producer producer;public ProducerRunnable(Producer producer){this.producer= producer;}@Overridepublic void run() {while(true){producer.productFood();}} }
2、消费者线程
public class CustomerRunnable implements Runnable {private Customer customer;public CustomerRunnable(Customer customer){this.customer=customer;}@Overridepublic void run() {while(true){customer.killFood();}} }
现在消费者和生产者的已经定义完了,下面我们做一下测试
public class Test {public static void main(String[] args) {Object lock = new Object(); //作为对象锁Producer p = new Producer(lock); //创建一生产者Customer c = new Customer(lock); //创建一消费者new Thread(new ProducerRunnable(p),"A").start(); //调用生产者线程new Thread(new CustomerRunnable(c),"B").start(); //调用消费者线程 } }
此时打印的结果是:
从输出的结果看,程序是正确的,每次都是需要生产者做出美食后,消费者才有美食吃。其实不然,但是多消费者/多生产者时,该程序会出现假死(即全部线程都处于wait状态),我们在Test中创建多个生产者和消费者如下:
public class Test {public static void main(String[] args) {Object lock = new Object(); //作为对象锁Producer p = new Producer(lock); //创建一生产者Customer c = new Customer(lock); //创建一消费者for(int i=0; i<2;i++){new Thread(new ProducerRunnable(p),"A"+i).start(); //调用生产者线程new Thread(new CustomerRunnable(c),"B"+i).start(); //调用消费者线程 }} }
此时打印的结果是:
从打印结果看,所以数据,所有线程都处于等待状态,这主要的原因是notify唤醒线程具有随意性造成的,我们做一下分析:
1、A0生产者生产美食,生产后发出通知(这里属于过早通知,没唤醒任何线程),释放锁,并进入下一次while循环
2、A1生产者准备生产美食,发现已经有美食了,休息等待别人的唤醒
3、A0生产者在下一轮循环,发现有美食,所以休息等待别人的唤醒
4、B1消费者进行消费,发现了美食,干掉美食,唤醒生产者(从输出结果可以看出唤醒了A1),并进入下一次while循环
5、B1消费进入下一轮循环,发现没有美食,所以排队等待美食 (wait状态)
6、B0消费者进行消费,发现没美食,排队等待美食 (wait状态)
7、第五步,B1唤醒了A1,A1生产美食,并唤醒A2(notify是随机唤醒一个线程的),进入下一个while循环
8、A1生产者在下一轮循环,发现有美食,所以休息等待别人的唤醒(wait状态)
9、A0被唤醒,发现有美食,所以休息等待别人的唤醒(wait状态)
从分析结果可以看出所以所以进出处于等待状态,程序处于假死。对于处理这种结果很容易,即把消费者和生产者类的lock.notify()改成lock.notifyAll()即可。
PS:可以注意到我上面判断Entity.food是否为空,用的是while,这里是防止线程不安全用的(在被唤醒的时候,判断一下,我是否真的可以执行)。如果用if的话,可能会出现线程出现异常
先定义一个实体类拥有消费者、生产者和缓存区(这里模拟栈,并且只能放一个数据):
public class MyStack {private List<String> list = new ArrayList<String>(); //存放数据//生产者synchronized public void push(){try {if(list.size()==1){ //有数据,等待 this.wait();} list.add("apple");System.out.println(Thread.currentThread().getName()+" : "+list.size());Thread.sleep(200);this.notify();} catch (InterruptedException e) {e.printStackTrace();} }//消费者synchronized public void pop(){try {if(list.size()==0){ //有数据,等待 this.wait();} list.remove(0);System.out.println(Thread.currentThread().getName()+" : "+list.size());Thread.sleep(200);this.notify();} catch (InterruptedException e) {e.printStackTrace();} } }
定义生产者线程:
public class SProducerRunnable implements Runnable {private MyStack myStack;public SProducerRunnable(MyStack myStack){this.myStack=myStack;}@Overridepublic void run() {while(true){myStack.push();} } }
定义消费者线程:
public class SCustomerRunnable implements Runnable {private MyStack myStack;public SCustomerRunnable(MyStack myStack){this.myStack=myStack;}@Overridepublic void run() {while(true){myStack.pop();}} }
多生产者多消费者测试
public class Test {public static void main(String[] args) {MyStack myStack = new MyStack();for(int i=0; i<2 ;i++){new Thread(new SProducerRunnable(myStack),"A"+i).start(); //调用生产者线程new Thread(new SCustomerRunnable(myStack),"B"+i).start(); //调用消费者线程 } } }
此时输出结果:
可以看出A1会出现添加两个值,而且报IndexOutOfBoundsException错误,这主要是因为用if,但唤醒全部进程的时候,没能像while一样进一步校验是否可以执行。所以多线程判断时,需要用while进行判断(当然根据情况而定)