ETJava Beta | Java    注册   登录
  • 搜索:
  • 生产者消费者模式,以及基于BlockingQueue的快速实现

    发表于      阅读(1)     博客类别:Crawler     转自:https://www.cnblogs.com/jilodream/p/18383435
    如有侵权 请联系我们删除  (页面底部联系我们)  

    生产者消费者模式,以及基于BlockingQueue的快速实现
    什么是生产者消费者模式,
    简单来说就是有两个角色,一个角色主要负责生产数据,一个角色主要负责消费(使用)数据。
    那么生产者直接依赖消费者,然后直接调用是否可以?
    答案是可以的,但是有些场景无法及时解决,典型的就是生产者消费者的速度无法同步,导致整体的速度上不去的情况。执行速度永远取决于二者的最小速度(假设生产者和消费者的速度时快时慢)。
    一般的解决方案是什么呢?
    我们通常会加一个中间件,作为缓冲队列。

    生产者生产好的数据丢到缓冲队列中,消费者根据自身情况及时获取数据,处理数据,如下图:

    典型的实现就是Mq,比如Rocket mq、rabbit mq等,这是微服务、分布式场景下一个常见的中间件。

    在服务内部,我们通常也会出现生产者消费者的模式,这时候引入mq显然有一些过重,我们需要更高效,更简洁的办法。
    最简单的办法就是用一个queue,或者用一个List,生产者从队尾增加数据,消费者从队列头部取数据。
    但是这样就会存在多线程共享数据的场景,需要做一些锁来解决线程同步的问题。
    其次我们消费者在抢占数据时,我们又希望有公平抢占和非公平抢占锁的问题,
    同时我们还要考虑缓冲队列是否要有界,如果无界的话,资源是否会存在耗尽问题,
    而且我们还要考虑是否可以更优雅的平衡生产者和消费者的处理速度,比如:
    如使用线程状态来控制,
    如果缓冲队列满了,那么生产者自己会挂起停止生产,当队列有空余空间时(注意不是全部清空数据),生产者会被唤醒继续生产;
    如果缓冲队列空了,那么消费者自己会挂起停止消费,当队列有数据时(注意不是全部堆满数据),消费者会被唤醒继续消费。
    诸如此类等等问题,如果我们想开发这样一个缓冲队列,要考虑和解决还是比较有难度的。
    而java8 提供的工具包早已看穿了这一切,(防盗连接:本文首发自http://www.cnblogs.com/jilodream/ )并提供了一套优雅的解决方案BlockingQueue
    BlockingQueue 是一个接口定义,我们实际使用的是下边的各个实现类:

    BlockingQueue 中常用的方法如下:

    除此之外还会有获取指定元素(不移除)E element()/E peek(),删除指定元素boolean remove(Object o)等方法这里就先不赘述了。

    下面我们来看一组简单的示例
    生产者生产三种礼物gift: 手机/电脑/LV包
    消费者通通消费
    生产者消费者都使用线程池来维护,具体情况如下代码:

    主类

     1 package com.example.demo.learnblockingqueue;
     2 
     3 import java.util.Comparator;
     4 import java.util.concurrent.ArrayBlockingQueue;
     5 import java.util.concurrent.BlockingQueue;
     6 import java.util.concurrent.DelayQueue;
     7 import java.util.concurrent.ExecutorService;
     8 import java.util.concurrent.Executors;
     9 import java.util.concurrent.LinkedBlockingQueue;
    10 import java.util.concurrent.PriorityBlockingQueue;
    11 import java.util.concurrent.SynchronousQueue;
    12 
    13 /**
    14  * @discription
    15  */
    16 public class BlockQueueMain {
    17     public static void main(String[] args) {
    18 
    19         BlockingQueue<Gift> queue = new ArrayBlockingQueue<>(3);
    20         Gift phone = new Gift("iphone", 8888);
    21         Producer phoneProducer = new Producer(queue, "AAAA  苹果代工厂", phone);
    22 
    23         Gift computer = new Gift("lenovo", 12888);
    24         Producer computerProducer = new Producer(queue, "AAAA  联想代工厂", computer);
    25 
    26         Gift bag = new Gift("LV", 28888);
    27         Producer bagProducer = new Producer(queue, "AAAA  LV代工厂", bag);
    28 
    29         Consumer zConsumer = new Consumer(queue, "BBBB  小Z");
    30         Consumer lConsumer = new Consumer(queue, "BBBB  小L");
    31 
    32         ExecutorService executorService = Executors.newCachedThreadPool();
    33         executorService.submit(phoneProducer);
    34         executorService.submit(computerProducer);
    35         executorService.submit(bagProducer);
    36         executorService.submit(zConsumer);
    37         executorService.submit(lConsumer);
    38 
    39     }
    40 }

    产品类

     1 package com.example.demo.learnblockingqueue;
     2 
     3 import lombok.AllArgsConstructor;
     4 import lombok.Data;
     5 import lombok.NoArgsConstructor;
     6 
     7 import java.util.Date;
     8 import java.util.concurrent.Delayed;
     9 import java.util.concurrent.TimeUnit;
    10 
    11 /**
    12  * @discription
    13  */
    14 @Data
    15 @NoArgsConstructor
    16 public class Gift implements Delayed {
    17 
    18     private String name;
    19 
    20     private int price;
    21 
    22     private final long completeTime = new Date().getTime() + 3000;
    23 
    24     public Gift(Gift giftSample) {
    25         name = giftSample.getName();
    26         price = giftSample.getPrice();
    27     }
    28 
    29     public Gift(String name, int price) {
    30         this.name = name;
    31         this.price = price;
    32     }
    33 
    34     @Override
    35     public long getDelay(TimeUnit unit) {
    36 
    37         return unit.convert(completeTime - new Date().getTime(), TimeUnit.MILLISECONDS);
    38     }
    39 
    40     @Override
    41     public int compareTo(Delayed o) {
    42         return 0;
    43     }
    44 }

    生产者:

     1 package com.example.demo.learnblockingqueue;
     2 
     3 import lombok.extern.slf4j.Slf4j;
     4 
     5 import java.util.concurrent.BlockingQueue;
     6 import java.util.concurrent.TimeUnit;
     7 
     8 /**
     9  * @discription
    10  */
    11 @Slf4j
    12 public class Producer implements Runnable {
    13 
    14     private final BlockingQueue<Gift> queue;
    15 
    16     private final String name;
    17 
    18     private final Gift giftSample;
    19 
    20     public Producer(BlockingQueue<Gift> queue, String name, Gift giftSample) {
    21         this.queue = queue;
    22         this.name = name;
    23         this.giftSample = giftSample;
    24     }
    25 
    26     @Override
    27     public void run() {
    28 
    29         while (true) {
    30             Gift newGift = new Gift(giftSample);
    31             try {
    32                 log.warn(name + "开始生产:{}", newGift);
    33                 queue.put(newGift);
    34                 log.warn(name + "生产了:{}, 剩余空间{}", newGift,  queue.remainingCapacity());
    35 
    36                 TimeUnit.MILLISECONDS.sleep(10000);
    37             } catch (InterruptedException e) {
    38                 log.error("producer  {} catch a ex", name, e);
    39             }
    40         }
    41     }
    42 }

    消费者:

     1 package com.example.demo.learnblockingqueue;
     2 
     3 import lombok.extern.slf4j.Slf4j;
     4 
     5 import java.util.concurrent.BlockingQueue;
     6 import java.util.concurrent.TimeUnit;
     7 
     8 /**
     9  * @discription
    10  */
    11 @Slf4j
    12 public class Consumer implements Runnable {
    13 
    14     private final String name;
    15     private final BlockingQueue<Gift> queue;
    16 
    17 
    18     public Consumer(BlockingQueue<Gift> queue, String name) {
    19         this.queue = queue;
    20         this.name = name;
    21     }
    22 
    23     @Override
    24     public void run() {
    25         while (true) {
    26 
    27             try {
    28                 Gift gift = queue.take();
    29                 log.warn(name + "购买了: {} ,剩余空间: {}", gift, queue.remainingCapacity());
    30                 TimeUnit.MILLISECONDS.sleep(3000);
    31             } catch (InterruptedException e) {
    32                 log.error("Consumer  {} catch a ex", name, e);
    33             }
    34         }
    35     }
    36 }

    执行后效果如下:

    注意由于是多线程在操作,所以某一秒处的日志可能是乱序的,

    感兴趣的同学可以自行分析下

    17:13:37.477 [pool-1-thread-2] WARN com.example.demo.learnblockingqueue.Producer - AAAA  联想代工厂开始生产:Gift(name=lenovo, price=12888, completeTime=1724750022471)
    17:13:37.478 [pool-1-thread-1] WARN com.example.demo.learnblockingqueue.Producer - AAAA  苹果代工厂开始生产:Gift(name=iphone, price=8888, completeTime=1724750022471)
    17:13:37.478 [pool-1-thread-3] WARN com.example.demo.learnblockingqueue.Producer - AAAA  LV代工厂开始生产:Gift(name=LV, price=28888, completeTime=1724750022473)
    17:13:37.486 [pool-1-thread-1] WARN com.example.demo.learnblockingqueue.Producer - AAAA  苹果代工厂生产了:Gift(name=iphone, price=8888, completeTime=1724750022471), 剩余空间1
    17:13:37.486 [pool-1-thread-4] WARN com.example.demo.learnblockingqueue.Consumer - BBBB  小Z购买了: Gift(name=iphone, price=8888, completeTime=1724750022471) ,剩余空间: 2
    17:13:37.486 [pool-1-thread-5] WARN com.example.demo.learnblockingqueue.Consumer - BBBB  小L购买了: Gift(name=lenovo, price=12888, completeTime=1724750022471) ,剩余空间: 2
    17:13:37.486 [pool-1-thread-2] WARN com.example.demo.learnblockingqueue.Producer - AAAA  联想代工厂生产了:Gift(name=lenovo, price=12888, completeTime=1724750022471), 剩余空间2
    17:13:37.486 [pool-1-thread-3] WARN com.example.demo.learnblockingqueue.Producer - AAAA  LV代工厂生产了:Gift(name=LV, price=28888, completeTime=1724750022473), 剩余空间2
    17:13:40.491 [pool-1-thread-4] WARN com.example.demo.learnblockingqueue.Consumer - BBBB  小Z购买了: Gift(name=LV, price=28888, completeTime=1724750022473) ,剩余空间: 3
    17:13:47.491 [pool-1-thread-3] WARN com.example.demo.learnblockingqueue.Producer - AAAA  LV代工厂开始生产:Gift(name=LV, price=28888, completeTime=1724750032491)
    17:13:47.491 [pool-1-thread-1] WARN com.example.demo.learnblockingqueue.Producer - AAAA  苹果代工厂开始生产:Gift(name=iphone, price=8888, completeTime=1724750032491)
    17:13:47.491 [pool-1-thread-2] WARN com.example.demo.learnblockingqueue.Producer - AAAA  联想代工厂开始生产:Gift(name=lenovo, price=12888, completeTime=1724750032491)
    17:13:47.491 [pool-1-thread-2] WARN com.example.demo.learnblockingqueue.Producer - AAAA  联想代工厂生产了:Gift(name=lenovo, price=12888, completeTime=1724750032491), 剩余空间1
    17:13:47.491 [pool-1-thread-3] WARN com.example.demo.learnblockingqueue.Producer - AAAA  LV代工厂生产了:Gift(name=LV, price=28888, completeTime=1724750032491), 剩余空间2
    17:13:47.491 [pool-1-thread-4] WARN com.example.demo.learnblockingqueue.Consumer - BBBB  小Z购买了: Gift(name=iphone, price=8888, completeTime=1724750032491) ,剩余空间: 2
    17:13:47.491 [pool-1-thread-1] WARN com.example.demo.learnblockingqueue.Producer - AAAA  苹果代工厂生产了:Gift(name=iphone, price=8888, completeTime=1724750032491), 剩余空间2
    17:13:47.491 [pool-1-thread-5] WARN com.example.demo.learnblockingqueue.Consumer - BBBB  小L购买了: Gift(name=LV, price=28888, completeTime=1724750032491) ,剩余空间: 2

    接下来简单说下BlockingQueue的几种实现方式的内部逻辑和使用场景
    1、ArrayBlockingQueue
    内部使用数组实现缓冲区,(防盗连接:本文首发自http://www.cnblogs.com/jilodream/ )有界队列;
    使用ReentrantLock锁来保证线程安全
    可以满足大部分的业务场景。

    2、LinkedBlockingQueue
    内部使用了单链表实现了缓冲区,可以是无界队列,也可以是有界队列;
    使用读写锁来保证线程安全。
    适用于队列弹性比较大,并发性要求高的场景。

    3、SynchronousQueue
    同步队列
    很多人称如果想要任务快速执行,可以使用该队列。容易让别人有误解,不如我们来举个实际的例子
    将主类中的blockingqueue 切换为SynchronousQueue,

    BlockingQueue<Gift> queue = new SynchronousQueue<>();

    输入为下,下面是前13秒是的输出,红色字体为我的解释:

    15:35:30.906 [pool-1-thread-3] WARN com.example.demo.learnblockingqueue.Producer - AAAA  LV代工厂开始生产:Gift(name=LV, price=28888, completeTime=1724744135902)
    15:35:30.915 [pool-1-thread-3] WARN com.example.demo.learnblockingqueue.Producer - AAAA  LV代工厂生产了:Gift(name=LV, price=28888, completeTime=1724744135902), 剩余空间0
    15:35:30.907 [pool-1-thread-2] WARN com.example.demo.learnblockingqueue.Producer - AAAA  联想代工厂开始生产:Gift(name=lenovo, price=12888, completeTime=1724744135901)
    15:35:30.915 [pool-1-thread-5] WARN com.example.demo.learnblockingqueue.Consumer - BBBB  小L购买了: Gift(name=LV, price=28888, completeTime=1724744135902) ,剩余空间: 0
    15:35:30.906 [pool-1-thread-1] WARN com.example.demo.learnblockingqueue.Producer - AAAA  苹果代工厂开始生产:Gift(name=iphone, price=8888, completeTime=1724744135901)
    15:35:30.915 [pool-1-thread-2] WARN com.example.demo.learnblockingqueue.Producer - AAAA  联想代工厂生产了:Gift(name=lenovo, price=12888, completeTime=1724744135901), 剩余空间0
    15:35:30.915 [pool-1-thread-4] WARN com.example.demo.learnblockingqueue.Consumer - BBBB  小Z购买了: Gift(name=lenovo, price=12888, completeTime=1724744135901) ,剩余空间: 0
    请求之后,工厂分别生产了3个产品:手机、电脑、LV
    之后LV 和电脑 都被消费了,此时iPhone还属于待消费状态
    15:35:33.916 [pool-1-thread-5] WARN com.example.demo.learnblockingqueue.Consumer - BBBB 小L购买了: Gift(name=iphone, price=8888, completeTime=1724744135901) ,剩余空间: 0 15:35:33.916 [pool-1-thread-1] WARN com.example.demo.learnblockingqueue.Producer - AAAA 苹果代工厂生产了:Gift(name=iphone, price=8888, completeTime=1724744135901), 剩余空间0 消费者睡眠3秒钟以后,消费了手机
    此时手机的生产者才开始打印日志,标明此时put方法的阻塞状态才结束

    注意最后的时间点 15:35:33.916

    消费者第二次消费完,iPhone的厂商立刻打印了日志,所以之前iPhone的厂商在放置数据的过程中,一直处于一个阻塞的状态。
    也就是如果消费者不取,那么生产者就一直处于呈现状态。这恰恰是如果是快速消费的场景,可以使用此方案,而不是此方案可以让你的任务快速执行。当然我们通过名字其实也可以判断。

    这种场景现实中有么?
    也是有的,比如服务员找零,一般会将钱放在手里,等你拿了钱才会做接下来的动作。像这种需要同步交接,快速响应的场景,那么就可以考虑SynchronousQueue。
    当然实际场景中,服务员也不可能一直等着,消费者可能一直不拿钱(超时设定),消费者不收钱(通过InterruptedException 打断阻塞)。

    4、PriorityBlockingQueue
    优先级的阻塞队列,想法其实很简单,就是消费者不再是先进先出的获取数据,而是允许根据优先级排序后,优先拿优先级高的产品。队列内部是通过最小堆来维护队列的,我们在创建队列时,
    要指定比较算法。如下,我们使用的是按照价格由高到低的方式获取产品,同时调整生产者速度>消费者速度(调整生产者sleep 为1000ms即可):

            BlockingQueue<Gift> queue = new PriorityBlockingQueue<>(3, (o1, o2) -> o2.getPrice() - o1.getPrice());

    输出结果如下,我们可以看到消费者只获取当前队列中价值最高的产品:

    18:39:01.040 [pool-1-thread-1] WARN com.example.demo.learnblockingqueue.Producer - AAAA  苹果代工厂开始生产:Gift(name=iphone, price=8888, completeTime=1724755146036)
    18:39:01.040 [pool-1-thread-2] WARN com.example.demo.learnblockingqueue.Producer - AAAA  联想代工厂开始生产:Gift(name=lenovo, price=12888, completeTime=1724755146036)
    18:39:01.040 [pool-1-thread-3] WARN com.example.demo.learnblockingqueue.Producer - AAAA  LV代工厂开始生产:Gift(name=LV, price=28888, completeTime=1724755146036)
    18:39:01.045 [pool-1-thread-3] WARN com.example.demo.learnblockingqueue.Producer - AAAA  LV代工厂生产了:Gift(name=LV, price=28888, completeTime=1724755146036),
    18:39:01.045 [pool-1-thread-4] WARN com.example.demo.learnblockingqueue.Consumer - BBBB  小Z购买了: Gift(name=LV, price=28888, completeTime=1724755146036) ,
    18:39:01.045 [pool-1-thread-2] WARN com.example.demo.learnblockingqueue.Producer - AAAA  联想代工厂生产了:Gift(name=lenovo, price=12888, completeTime=1724755146036), 
    18:39:01.045 [pool-1-thread-5] WARN com.example.demo.learnblockingqueue.Consumer - BBBB  小L购买了: Gift(name=lenovo, price=12888, completeTime=1724755146036) ,
    18:39:01.045 [pool-1-thread-1] WARN com.example.demo.learnblockingqueue.Producer - AAAA  苹果代工厂生产了:Gift(name=iphone, price=8888, completeTime=1724755146036), 
    18:39:02.059 [pool-1-thread-3] WARN com.example.demo.learnblockingqueue.Producer - AAAA  LV代工厂开始生产:Gift(name=LV, price=28888, completeTime=1724755147059)
    ...省略一部分日志....
    18:39:03.069 [pool-1-thread-3] WARN com.example.demo.learnblockingqueue.Producer - AAAA  LV代工厂生产了:Gift(name=LV, price=28888, completeTime=1724755148069), 
    18:39:03.069 [pool-1-thread-2] WARN com.example.demo.learnblockingqueue.Producer - AAAA  联想代工厂生产了:Gift(name=lenovo, price=12888, completeTime=1724755148069), 
    18:39:04.052 [pool-1-thread-4] WARN com.example.demo.learnblockingqueue.Consumer - BBBB  小Z购买了: Gift(name=LV, price=28888, completeTime=1724755148069) ,
    18:39:04.052 [pool-1-thread-5] WARN com.example.demo.learnblockingqueue.Consumer - BBBB  小L购买了: Gift(name=LV, price=28888, completeTime=1724755147059) ,

    5、DelayQueue

    这个队列也很有意思,它需要有产品实现一个delay接口,接口实时的返回还要多久这个产品可用。
    如下我们切换为 DelayQueue。
    效果如下:
    消费者即使已经触发获取,(防盗连接:本文首发自http://www.cnblogs.com/jilodream/ )但是还是无法立刻拿到,需要在delay()返回非正数以后才能获取。这样生产者可以生产一些内部有时间概念的产品,起到一个定时器的作用。
    举个例子:
    我们装修了一间房子,由于装修有害物质,我们需要6个月后才能入住,

    常用的办法如下:
    1、添加定时器,由定时器来触发,但是显然要有外部依赖其他对象。
    2、反复获取,拿到之后check时间,如果未到时间,重新丢回队列,但是这样显然过于复杂,系统的调用频次也会上去。
    3、消费者来获取产品时,判断时间是否可取,如果不可取就一直等待到可取到产品为止。注意此时消费线程会阻塞到该队列头部的产品,并不会越过它尝试拿后续的产品。(DelayQueue选用此种策略)

    我们先修改缓冲队列为DelayQueue

    BlockingQueue<Gift> queue = new DelayQueue<>();

    生产者的时间周期改为2s,消费者的时间周期改为4s ,假设产品的过期时间是5s注意这几个时间节点,

    Gift 类的代码我们需要改造一下,方便定位分析:

     1 package com.example.demo.learnblockingqueue;
     2 
     3 import lombok.Data;
     4 import lombok.NoArgsConstructor;
     5 import lombok.extern.slf4j.Slf4j;
     6 
     7 import java.util.Date;
     8 import java.util.concurrent.Delayed;
     9 import java.util.concurrent.TimeUnit;
    10 
    11 /**
    12  * @discription
    13  */
    14 @Data
    15 @NoArgsConstructor
    16 @Slf4j
    17 public class Gift implements Delayed {
    18 
    19     private String name;
    20 
    21     private int price;
    22 
    23     private final long completeTime = new Date().getTime() + 5000;
    24 
    25     public Gift(Gift giftSample) {
    26         name = giftSample.getName() + "_" + completeTime;
    27         price = giftSample.getPrice();
    28     }
    29 
    30     public Gift(String name, int price) {
    31         this.name = name;
    32         this.price = price;
    33     }
    34 
    35     private void printLog() {
    36         log.warn(Thread.currentThread() + "     !!!!"+ "_" + name);
    37     }
    38 
    39     @Override
    40     public long getDelay(TimeUnit unit) {
    41         printLog();
    42         return unit.convert(completeTime - new Date().getTime(), TimeUnit.MILLISECONDS);
    43     }
    44 
    45     @Override
    46     public int compareTo(Delayed o) {
    47         return 0;
    48     }
    49 }

    效果大概是这样的:

    是不是和我们设想的不太一样,直接说发生了什么,注意看不同颜色的分析对应的不同的颜色的日志:

    首先是线程5 消费者抢到了锁,则直接等待到产品到期,注意线程5不是按照我们约定的周期检查的,然后二次确认时间,接着就消费了(24秒--->29秒)

    此时线程4抢到了下一把锁注意下一个礼物是最新生产的产品,而不是最早生产的产品然后等待产品到期时间,二次确认并消费(28秒--->33秒)注意在二次确认的时间的时候,两个线程都有确认,但是只有一个线程真正会消费成功,这不影响使用效果(猜测这里是为了提高性能,并没有完全,没有采用很重的锁)。

    接下来的逻辑一样

    Connected to the target VM, address: '127.0.0.1:54328', transport: 'socket'
    19:52:24.292 [pool-1-thread-3] WARN com.example.demo.learnblockingqueue.Producer - AAAA  LV代工厂开始生产:Gift(name=LV_1724759549288, price=28888, completeTime=1724759549288)
    19:52:24.292 [pool-1-thread-2] WARN com.example.demo.learnblockingqueue.Producer - AAAA  联想代工厂开始生产:Gift(name=lenovo_1724759549289, price=12888, completeTime=1724759549289)
    19:52:24.292 [pool-1-thread-1] WARN com.example.demo.learnblockingqueue.Producer - AAAA  苹果代工厂开始生产:Gift(name=iphone_1724759549288, price=8888, completeTime=1724759549288)
    19:52:24.300 [pool-1-thread-5] WARN com.example.demo.learnblockingqueue.Gift - Thread[pool-1-thread-5,5,main]     !!!!_LV_1724759549288
    19:52:24.300 [pool-1-thread-3] WARN com.example.demo.learnblockingqueue.Producer - AAAA  LV代工厂生产了:Gift(name=LV_1724759549288, price=28888, completeTime=1724759549288), 
    19:52:24.300 [pool-1-thread-2] WARN com.example.demo.learnblockingqueue.Producer - AAAA  联想代工厂生产了:Gift(name=lenovo_1724759549289, price=12888, completeTime=1724759549289),
    19:52:24.302 [pool-1-thread-1] WARN com.example.demo.learnblockingqueue.Producer - AAAA  苹果代工厂生产了:Gift(name=iphone_1724759549288, price=8888, completeTime=1724759549288), 
    ..省略..
    19:52:28.321 [pool-1-thread-3] WARN com.example.demo.learnblockingqueue.Producer - AAAA  LV代工厂生产了:Gift(name=LV_1724759553321, price=28888, completeTime=1724759553321), 
    19:52:28.321 [pool-1-thread-2] WARN com.example.demo.learnblockingqueue.Producer - AAAA  联想代工厂生产了:Gift(name=lenovo_1724759553321, price=12888, completeTime=1724759553321), 
    19:52:28.321 [pool-1-thread-1] WARN com.example.demo.learnblockingqueue.Producer - AAAA  苹果代工厂生产了:Gift(name=iphone_1724759553321, price=8888, completeTime=1724759553321), 
    19:52:29.300 [pool-1-thread-5] WARN com.example.demo.learnblockingqueue.Gift - Thread[pool-1-thread-5,5,main]     !!!!_LV_1724759549288
    19:52:29.300 [pool-1-thread-4] WARN com.example.demo.learnblockingqueue.Gift - Thread[pool-1-thread-4,5,main]     !!!!_iphone_1724759553321
    19:52:29.300 [pool-1-thread-5] WARN com.example.demo.learnblockingqueue.Consumer - BBBB  小L购买了: Gift(name=LV_1724759549288, price=28888, completeTime=1724759549288) ,
    19:52:30.326 [pool-1-thread-3] WARN com.example.demo.learnblockingqueue.Producer - AAAA  LV代工厂开始生产:Gift(name=LV_1724759555326, price=28888, completeTime=1724759555326)
    19:52:30.326 [pool-1-thread-1] WARN com.example.demo.learnblockingqueue.Producer - AAAA  苹果代工厂开始生产:Gift(name=iphone_1724759555326, price=8888, completeTime=1724759555326)
    ...省略..
    19:52:32.334 [pool-1-thread-1] WARN com.example.demo.learnblockingqueue.Producer - AAAA  苹果代工厂开始生产:Gift(name=iphone_1724759557334, price=8888, completeTime=1724759557334)
    19:52:32.334 [pool-1-thread-3] WARN com.example.demo.learnblockingqueue.Producer - AAAA  LV代工厂生产了:Gift(name=LV_1724759557334, price=28888, completeTime=1724759557334), 
    19:52:32.334 [pool-1-thread-1] WARN com.example.demo.learnblockingqueue.Producer - AAAA  苹果代工厂生产了:Gift(name=iphone_1724759557334, price=8888, completeTime=1724759557334),
    19:52:32.334 [pool-1-thread-2] WARN com.example.demo.learnblockingqueue.Producer - AAAA  联想代工厂生产了:Gift(name=lenovo_1724759557334, price=12888, completeTime=1724759557334), 
    19:52:33.301 [pool-1-thread-5] WARN com.example.demo.learnblockingqueue.Gift - Thread[pool-1-thread-5,5,main]     !!!!_iphone_1724759553321
    19:52:33.331 [pool-1-thread-4] WARN com.example.demo.learnblockingqueue.Gift - Thread[pool-1-thread-4,5,main]     !!!!_iphone_1724759553321
    19:52:33.331 [pool-1-thread-5] WARN com.example.demo.learnblockingqueue.Gift - Thread[pool-1-thread-5,5,main]     !!!!_iphone_1724759557334
    19:52:33.331 [pool-1-thread-4] WARN com.example.demo.learnblockingqueue.Consumer - BBBB  小Z购买了: Gift(name=iphone_1724759553321, price=8888, completeTime=1724759553321) ,
    19:52:34.346 [pool-1-thread-2] WARN com.example.demo.learnblockingqueue.Producer - AAAA  联想代工厂开始生产:Gift(name=lenovo_1724759559346, price=12888, completeTime=1724759559346)
    19:52:34.346 [pool-1-thread-3] WARN com.example.demo.learnblockingqueue.Producer - AAAA  LV代工厂开始生产:Gift(name=LV_1724759559346, price=28888, completeTime=1724759559346)
    19:52:34.346 [pool-1-thread-1] WARN com.example.demo.learnblockingqueue.Producer - AAAA  苹果代工厂开始生产:Gift(name=iphone_1724759559346, price=8888, completeTime=1724759559346)
    ...省略..
    19:52:36.361 [pool-1-thread-3] WARN com.example.demo.learnblockingqueue.Producer - AAAA  LV代工厂生产了:Gift(name=LV_1724759561361, price=28888, completeTime=1724759561361),
    19:52:36.361 [pool-1-thread-2] WARN com.example.demo.learnblockingqueue.Producer - AAAA  联想代工厂生产了:Gift(name=lenovo_1724759561361, price=12888, completeTime=1724759561361), 
    19:52:36.361 [pool-1-thread-1] WARN com.example.demo.learnblockingqueue.Producer - AAAA  苹果代工厂生产了:Gift(name=iphone_1724759561361, price=8888, completeTime=1724759561361), 
    Disconnected from the target VM, address: '127.0.0.1:54328', transport: 'socket'(防盗连接:本文首发自http://www.cnblogs.com/jilodream/ )
    19:52:37.339 [pool-1-thread-5] WARN com.example.demo.learnblockingqueue.Gift - Thread[pool-1-thread-5,5,main]     !!!!_iphone_1724759557334
    19:52:37.340 [pool-1-thread-4] WARN com.example.demo.learnblockingqueue.Gift - Thread[pool-1-thread-4,5,main]     !!!!_iphone_1724759561361
    19:52:37.339 [pool-1-thread-5] WARN com.example.demo.learnblockingqueue.Consumer - BBBB  小L购买了: Gift(name=iphone_1724759557334, price=8888, c

    以上我们可以得到两个重要结论:

    DelayQueue并不是按照顺序进行消费的,而是获取最新的的数据进行尝试获取,(尽管此时队列中已经存在可用的产品)

    DelayQueue唤醒消费者的时间,是按照产品的到达可用状态的时间后,才会唤醒

    因此使用DelayQueue时,我们要注意,生产商品不能太激烈,生产速度不要大于消费速度,否则产品很可能消费不到了(当然也可以利用这个规则,将旧的产品进行淘汰处理)

    产品的delay时间不能太长,否则可能会导致消费者一直处于挂起状态。

上一篇: k8s网络原理之Calico


下一篇:扫描线