怎么做网站推广方案/百度一下你就知道啦
文章目录
- 场景需求
- 同步方式
- 代码
- 结果输出
- 异步方式
- 代码
- 结果输出
- 延伸
转载请标明出处:
https://bigmaning.blog.csdn.net/article/details/125455916
本文出自:【BigManing的博客】
场景需求
一共有20条数据,每条数据处理需要2秒,分别使用同步、异步方式处理,一共需要多久才能处理完呢?
接下来进行简单的验证,如有疑惑,欢迎指正。
同步方式
使用map算子模拟同步
代码
/*** 同步方式 */private static void syncHandleData() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.generateSequence(1, 20).map(aLong -> {return handleData(aLong);});env.execute("test");}/*** 模拟业务 耗时2s操作**/private static Long handleData(Long aLong) throws InterruptedException {String dateTime = DateFormatUtils.ISO_DATETIME_FORMAT.format(new Date());System.out.println(dateTime + "====> 处理第" + aLong + "个任务... 开始 " + Thread.currentThread().getName());// 模拟 耗时2s处理Thread.sleep(2000);dateTime = DateFormatUtils.ISO_DATETIME_FORMAT.format(new Date());System.out.println(dateTime + "====> 处理第" + aLong + "个任务... 结束");return aLong;}
结果输出
从以下结果可以得出结论:
- 每个任务都是按照顺序执行的
- 每个任务都是耗时2s()
- 总耗时为 20*2 = 40s
2022-06-25T09:53:39====> 处理第1个任务... 开始 Thread-7
2022-06-25T09:53:41====> 处理第1个任务... 结束
2022-06-25T09:53:41====> 处理第2个任务... 开始 Thread-7
2022-06-25T09:53:43====> 处理第2个任务... 结束
2022-06-25T09:53:43====> 处理第3个任务... 开始 Thread-7
2022-06-25T09:53:45====> 处理第3个任务... 结束
2022-06-25T09:53:45====> 处理第4个任务... 开始 Thread-7
2022-06-25T09:53:47====> 处理第4个任务... 结束
2022-06-25T09:53:47====> 处理第5个任务... 开始 Thread-7
2022-06-25T09:53:49====> 处理第5个任务... 结束
2022-06-25T09:53:49====> 处理第6个任务... 开始 Thread-7
2022-06-25T09:53:51====> 处理第6个任务... 结束
2022-06-25T09:53:51====> 处理第7个任务... 开始 Thread-7
2022-06-25T09:53:53====> 处理第7个任务... 结束
2022-06-25T09:53:53====> 处理第8个任务... 开始 Thread-7
2022-06-25T09:53:55====> 处理第8个任务... 结束
2022-06-25T09:53:55====> 处理第9个任务... 开始 Thread-7
2022-06-25T09:53:57====> 处理第9个任务... 结束
2022-06-25T09:53:57====> 处理第10个任务... 开始 Thread-7
2022-06-25T09:53:59====> 处理第10个任务... 结束
2022-06-25T09:53:59====> 处理第11个任务... 开始 Thread-7
2022-06-25T09:54:01====> 处理第11个任务... 结束
2022-06-25T09:54:01====> 处理第12个任务... 开始 Thread-7
2022-06-25T09:54:03====> 处理第12个任务... 结束
2022-06-25T09:54:03====> 处理第13个任务... 开始 Thread-7
2022-06-25T09:54:05====> 处理第13个任务... 结束
2022-06-25T09:54:05====> 处理第14个任务... 开始 Thread-7
2022-06-25T09:54:07====> 处理第14个任务... 结束
2022-06-25T09:54:07====> 处理第15个任务... 开始 Thread-7
2022-06-25T09:54:09====> 处理第15个任务... 结束
2022-06-25T09:54:09====> 处理第16个任务... 开始 Thread-7
2022-06-25T09:54:11====> 处理第16个任务... 结束
2022-06-25T09:54:11====> 处理第17个任务... 开始 Thread-7
2022-06-25T09:54:13====> 处理第17个任务... 结束
2022-06-25T09:54:13====> 处理第18个任务... 开始 Thread-7
2022-06-25T09:54:15====> 处理第18个任务... 结束
2022-06-25T09:54:15====> 处理第19个任务... 开始 Thread-7
2022-06-25T09:54:17====> 处理第19个任务... 结束
2022-06-25T09:54:17====> 处理第20个任务... 开始 Thread-7
2022-06-25T09:54:19====> 处理第20个任务... 结束
异步方式
使用AsyncDataStream
数据流,因为我们不需要关注顺序,所以使用unorderedWait
方法。如果你需要关注顺序,可以使用orderedWait方法。
代码
/*** 异步方式*/private static void asyncHandleData() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Long> longDataStreamSource = env.generateSequence(1, 20);// 不用关心顺序,异步处理完直接发送到下游AsyncDataStream.unorderedWait(longDataStreamSource, new MyRichAsyncFunction(), 1, TimeUnit.MINUTES, 5);env.execute("test");}/*** RichAsyncFunction 异步处理Function 实现类*/static class MyRichAsyncFunction extends RichAsyncFunction<Long, Long> {private ThreadPoolExecutor threadPoolExecutor;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 初始化操作threadPoolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());}@Overridepublic void close() throws Exception {super.close();// 善后操作threadPoolExecutor.shutdown();}@Overridepublic void timeout(Long input, ResultFuture<Long> resultFuture) throws Exception {// 超时操作resultFuture.completeExceptionally(new TimeoutException());}@Overridepublic void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) throws Exception {// 异步客户端处理CompletableFuture.runAsync(() -> {try {handleData(aLong);} catch (InterruptedException e) {resultFuture.completeExceptionally(e);return;}resultFuture.complete(Collections.singleton(aLong));}, threadPoolExecutor);}}/*** 模拟 耗时2s操作*/private static Long handleData(Long aLong) throws InterruptedException {String dateTime = DateFormatUtils.ISO_DATETIME_FORMAT.format(new Date());System.out.println(dateTime + "====> 处理第" + aLong + "个任务... 开始 " + Thread.currentThread().getName());// 模拟 耗时2s处理Thread.sleep(2000);dateTime = DateFormatUtils.ISO_DATETIME_FORMAT.format(new Date());System.out.println(dateTime + "====> 处理第" + aLong + "个任务... 结束");return aLong;}
结果输出
从以下结果可以得出结论:
- 任务是异步并发处理的
- 由于是AsyncDataStream.unorderedWait,所以顺序是无序的
- 总耗时为 8s (2022-06-25T10:23:09 - 2022-06-25T10:23:01)
2022-06-25T10:23:01====> 处理第4个任务... 开始 pool-3-thread-4
2022-06-25T10:23:01====> 处理第2个任务... 开始 pool-3-thread-2
2022-06-25T10:23:01====> 处理第5个任务... 开始 pool-3-thread-5
2022-06-25T10:23:01====> 处理第3个任务... 开始 pool-3-thread-3
2022-06-25T10:23:01====> 处理第1个任务... 开始 pool-3-thread-1
2022-06-25T10:23:03====> 处理第4个任务... 结束
2022-06-25T10:23:03====> 处理第2个任务... 结束
2022-06-25T10:23:03====> 处理第1个任务... 结束
2022-06-25T10:23:03====> 处理第3个任务... 结束
2022-06-25T10:23:03====> 处理第5个任务... 结束
2022-06-25T10:23:03====> 处理第6个任务... 开始 pool-3-thread-4
2022-06-25T10:23:03====> 处理第7个任务... 开始 pool-3-thread-2
2022-06-25T10:23:03====> 处理第8个任务... 开始 pool-3-thread-1
2022-06-25T10:23:03====> 处理第9个任务... 开始 pool-3-thread-3
2022-06-25T10:23:03====> 处理第10个任务... 开始 pool-3-thread-5
2022-06-25T10:23:05====> 处理第6个任务... 结束
2022-06-25T10:23:05====> 处理第7个任务... 结束
2022-06-25T10:23:05====> 处理第8个任务... 结束
2022-06-25T10:23:05====> 处理第9个任务... 结束
2022-06-25T10:23:05====> 处理第11个任务... 开始 pool-3-thread-4
2022-06-25T10:23:05====> 处理第10个任务... 结束
2022-06-25T10:23:05====> 处理第12个任务... 开始 pool-3-thread-1
2022-06-25T10:23:05====> 处理第13个任务... 开始 pool-3-thread-2
2022-06-25T10:23:05====> 处理第14个任务... 开始 pool-3-thread-3
2022-06-25T10:23:05====> 处理第15个任务... 开始 pool-3-thread-5
2022-06-25T10:23:07====> 处理第13个任务... 结束
2022-06-25T10:23:07====> 处理第15个任务... 结束
2022-06-25T10:23:07====> 处理第11个任务... 结束
2022-06-25T10:23:07====> 处理第12个任务... 结束
2022-06-25T10:23:07====> 处理第14个任务... 结束
2022-06-25T10:23:07====> 处理第16个任务... 开始 pool-3-thread-3
2022-06-25T10:23:07====> 处理第17个任务... 开始 pool-3-thread-5
2022-06-25T10:23:07====> 处理第18个任务... 开始 pool-3-thread-4
2022-06-25T10:23:07====> 处理第19个任务... 开始 pool-3-thread-1
2022-06-25T10:23:07====> 处理第20个任务... 开始 pool-3-thread-2
2022-06-25T10:23:09====> 处理第18个任务... 结束
2022-06-25T10:23:09====> 处理第17个任务... 结束
2022-06-25T10:23:09====> 处理第16个任务... 结束
2022-06-25T10:23:09====> 处理第19个任务... 结束
2022-06-25T10:23:09====> 处理第20个任务... 结束
延伸
使用orderedWait会有什么样的表现呢?
代码修改如下:
/*** 异步方式*/private static void asyncHandleData() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Long> longDataStreamSource = env.generateSequence(1, 20);// 关心顺序,使用orderedWait, 异步处理完 并且是位于队列第一个 才发送到下游AsyncDataStream.orderedWait(longDataStreamSource, new MyRichAsyncFunction(), 1, TimeUnit.MINUTES, 5)// // 验证 orderedWait .print("打印结果:");env.execute("test");}
运行结果:
2022-06-25T10:39:57====> 处理第2个任务... 开始 pool-3-thread-2
2022-06-25T10:39:57====> 处理第4个任务... 开始 pool-3-thread-4
2022-06-25T10:39:57====> 处理第5个任务... 开始 pool-3-thread-5
2022-06-25T10:39:57====> 处理第3个任务... 开始 pool-3-thread-3
2022-06-25T10:39:57====> 处理第1个任务... 开始 pool-3-thread-1
2022-06-25T10:39:59====> 处理第4个任务... 结束
2022-06-25T10:39:59====> 处理第1个任务... 结束
2022-06-25T10:39:59====> 处理第2个任务... 结束
2022-06-25T10:39:59====> 处理第3个任务... 结束
2022-06-25T10:39:59====> 处理第5个任务... 结束
打印结果:> 1
打印结果:> 2
打印结果:> 3
打印结果:> 4
打印结果:> 5
2022-06-25T10:39:59====> 处理第6个任务... 开始 pool-3-thread-1
2022-06-25T10:39:59====> 处理第7个任务... 开始 pool-3-thread-2
2022-06-25T10:39:59====> 处理第8个任务... 开始 pool-3-thread-4
2022-06-25T10:39:59====> 处理第9个任务... 开始 pool-3-thread-3
2022-06-25T10:39:59====> 处理第10个任务... 开始 pool-3-thread-5
2022-06-25T10:40:01====> 处理第9个任务... 结束
2022-06-25T10:40:01====> 处理第6个任务... 结束
2022-06-25T10:40:01====> 处理第8个任务... 结束
2022-06-25T10:40:01====> 处理第7个任务... 结束
2022-06-25T10:40:01====> 处理第10个任务... 结束
打印结果:> 6
打印结果:> 7
打印结果:> 8
打印结果:> 9
打印结果:> 10
2022-06-25T10:40:01====> 处理第11个任务... 开始 pool-3-thread-3
2022-06-25T10:40:01====> 处理第12个任务... 开始 pool-3-thread-1
2022-06-25T10:40:01====> 处理第13个任务... 开始 pool-3-thread-4
2022-06-25T10:40:01====> 处理第14个任务... 开始 pool-3-thread-2
2022-06-25T10:40:01====> 处理第15个任务... 开始 pool-3-thread-5
2022-06-25T10:40:03====> 处理第11个任务... 结束
2022-06-25T10:40:03====> 处理第12个任务... 结束
2022-06-25T10:40:03====> 处理第13个任务... 结束
打印结果:> 11
打印结果:> 12
打印结果:> 13
2022-06-25T10:40:03====> 处理第14个任务... 结束
2022-06-25T10:40:03====> 处理第15个任务... 结束
2022-06-25T10:40:03====> 处理第16个任务... 开始 pool-3-thread-2
打印结果:> 14
2022-06-25T10:40:03====> 处理第17个任务... 开始 pool-3-thread-1
打印结果:> 15
2022-06-25T10:40:03====> 处理第18个任务... 开始 pool-3-thread-4
2022-06-25T10:40:03====> 处理第19个任务... 开始 pool-3-thread-3
2022-06-25T10:40:03====> 处理第20个任务... 开始 pool-3-thread-5
2022-06-25T10:40:05====> 处理第16个任务... 结束
打印结果:> 16
2022-06-25T10:40:05====> 处理第17个任务... 结束
2022-06-25T10:40:05====> 处理第19个任务... 结束
2022-06-25T10:40:05====> 处理第18个任务... 结束
打印结果:> 17
2022-06-25T10:40:05====> 处理第20个任务... 结束
打印结果:> 18
打印结果:> 19
打印结果:> 20