安阳seo网站优化如何自己免费制作网站
ScheduledThreadPoolExecutor 是一个 可以在指定延迟时间后 或者 定时进行任务调度执行 的线程池。
文章目录
- 一、类图
- 二、原理剖析
- (1) schedule(Runnable command, long delay, TimeUnit unit) 方法
- (2) run() 方法
- (3)scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) 方法
- (4)scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) 方法
- 三、 总结
一、类图
可以看到,ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor ,并实现了 ScheduledExecutorService 接口。线程池队列是 DelayedWorkQueue,它和 DelayedQueue 类似,是一个延迟队列。
ScheduledFutureTask 是具有返回值的任务,继承自 FutureTask。FutureTask 的内部有一个变量 state 用来表示任务的状态:
private volatile int state;
一开始状态为 NEW:
private static final int NEW = 0;
执行中状态:
private static final int COMPLETING = 1;
正常运行结束状态:
private static final int NORMAL = 2;
运行中异常:
private static final int EXCEPTIONAL = 3;
任务被取消:
private static final int CANCELLED = 4;
任务正在被打断:
private static final int INTERRUPTING = 5;
任务已经被打断:
private static final int INTERRUPTED = 6;
可能的任务状态转换路径:
ScheduledFutureTask 内部还有一个变量 period 用来表示任务的类型,任务类型有:
- period = 0,说明当前任务是一次性的,执行完毕后就退出了。
- period 为负数,说明当前任务为 fixed-delay 任务,是 固定延迟的定时可重复执行任务。
- period 为正数,说明当前任务为 fixed-rate 任务,是 固定频率的定时可重复执行任务。
ScheduledThreadPoolExecutor 的构造方法:
public ScheduledThreadPoolExecutor(int corePoolSize) {// (一)super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}
(一):
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
二、原理剖析
(1) schedule(Runnable command, long delay, TimeUnit unit) 方法
✨ 该方法的作用是 提交一个延迟执行的任务,任务 从提交时间 算起 延迟单位为 unit 的 delay 时间后开始执行。提交的任务不是周期性任务,任务只会执行一次。✨
/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {// 参数校验if (command == null || unit == null)throw new NullPointerException();// 任务转换,把提交的 command 任务转换成 ScheduledFutureTask// ScheduledFutureTask 是具体放入延迟队列里的东西RunnableScheduledFuture<?> t = decorateTask(command,//(一)new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));// (三)添加任务到延迟队列delayedExecute(t);return t;}
由于是延迟任务,所以 ScheduledFutureTask 实现了 long getDelay(TimeUnit unit)
和 int compareTo(Delayed other)
方法。 triggerTime 方法将延迟时间转换成绝对时间,也就是 把当前时间的纳秒数 加上 延迟的纳秒数 后的 long 型值。
(一)ScheduledFutureTask 构造方法:
ScheduledFutureTask(Runnable r, V result, long ns) {// (二)调用父类 FutureTask 的构造方法super(r, result);this.time = ns;// period 为 0 ,说明是一次性任务this.period = 0;this.sequenceNumber = sequencer.getAndIncrement();}
(二):
public FutureTask(Runnable runnable, V result) {// 通过适配器把 runnable 转换成 callablethis.callable = Executors.callable(runnable, result);// 设置当前任务状态为 NEWthis.state = NEW; // ensure visibility of callable}
ScheduledFutureTask 的 getDelay 方法:
public long getDelay(TimeUnit unit) {// 装饰后时间 - 当前时间return unit.convert(time - now(), NANOSECONDS);}
可以看到,该方法用来计算当前任务还有多少时间过期。
ScheduledFutureTask 的 compareTo 方法:
public int compareTo(Delayed other) {if (other == this) // compare zero if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}
compareTo 的作用是 加入元素到延迟队列后,在内部 建立 或者 调整 堆时,会使用元素的 compareTo 方法 与 队列里其他因素做比较,让快过期的元素放到队首,所以无论什么时候,向队列里添加元素,队首的元素都是最快要过期的元素。
(三)delayedExecute:
private void delayedExecute(RunnableScheduledFuture<?> task) {// 如果线程池关闭了if (isShutdown())// 则执行线程池拒绝策略reject(task);else {// 添加任务到延迟队列super.getQueue().add(task);// 添加完毕后,重新检查线程池是否被关闭了if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))// 如果线程池已经被关闭了,而且任务是一次性的,// 而且线程已经被从任务队列中移除,也就是说,该任务已经在执行了// 就调用任务的 cancel 方法取消任务task.cancel(false);else// (四)确保至少一个线程在处理任务ensurePrestart();}}
(四)ensurePrestart():
void ensurePrestart() {// 获取线程池中线程个数int wc = workerCountOf(ctl.get());// 如果线程个数小于核心线程数if (wc < corePoolSize)// 则 新增一个核心线程addWorker(null, true);// 如果线程数为 0else if (wc == 0)// 新增一个线程addWorker(null, false);}
(2) run() 方法
以上是延迟队列里添加任务,接下来看看,线程池里的线程 如何 获取 并 执行 任务。上一篇 ThreadPoolExecutor ,具体执行任务的是 Worker 线程,Worker 线程调用具体任务的 run 方法来执行,而 ScheduledThreadPoolExecutor 中,任务是 ScheduledFutureTask,其 run() 方法源码:
public void run() {// (一)判断周期性boolean periodic = isPeriodic();// (二)判断当前任务是否该被取消if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic)ScheduledFutureTask.super.run();else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}
}
(一)isPeriodic:
public boolean isPeriodic() {return period != 0;}
由于在 ScheduledFutureTask 构造方法中,设置了 this.period = 0;
所以 该方法将返回 false,不是周期性任务,而是 一次性任务。
(二):
boolean canRunInCurrentRunState(boolean periodic) {// (三)return isRunningOrShutdown(periodic ?continueExistingPeriodicTasksAfterShutdown :executeExistingDelayedTasksAfterShutdown);}
传入的 periodic 参数是 false,所以 isRunningOrShutdown 传入的参数是 executeExistingDelayedTasksAfterShutdown,该值默认是 true: private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
表示当其他线程调用了 shutdown 命令 关闭线程池后,当前任务还是要执行;如果值为 false,表示当前任务应被取消。
(三)isRunningOrShutdown:
final boolean isRunningOrShutdown(boolean shutdownOK) {int rs = runStateOf(ctl.get());return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);}
传入的参数是 true,也就是说,如果当前线程池状态为 RUNNING 或者 SHUTDOWN,该方法返回 true,接下来 run 方法会执行 ScheduledFutureTask.super.run();
;如果当前线程池状态不为 RUNNING 也 不为 SHUTDOWN,那么在 run() 方法中, 就会执行cancel(false)
,取消任务(因为只有 RUNNING 和 SHUTDOWN 这两个状态是会去处理队列中的任务的,其他状态自然就被取消啦 ),传入的 false 参数表示不允许中断。
接下来看看 ScheduledFutureTask.super.run();
,也就是父类 FutureTask 的 run 方法:
public void run() {// 如果任务状态不是 NEW 就直接返回if (state != NEW ||// 或者 当前任务状态为 NEW // 但是 使用 CAS 设置当前任务的持有者为当前线程 失败 就直接返回!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;// (五)setException(ex);}// 如果任务执行成功,修改任务状态if (ran)// (四)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}
(四)set:
protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}}
可以看到,该方法首先使用 CAS 将当前任务状态从 NEW 转换到 COMPLETING,如果有多个线程调用方法,(比如,当同一个 command 被 多次提交到线程池时)只有一个线程会成功。成功的线程再通过 UNSAFE.putOrderedInt 设置任务的线程状态为 NORMAL,即 正常结束状态,这里不需要 CAS 了是因为对于同一个任务,只可能有一个线程运行到这里。
(五)setException:
protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}}
可以看到,该方法首先使用 CAS 将当前任务状态从 NEW 转换到 COMPLETING,然后再设置当前任务状态为 EXCEPTION,即 任务非正常结束。
(3)scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) 方法
该方法的作用是,当任务执行完毕后,让其延迟固定时间再次运行 (fixed-delay 任务)。其中 initialDay 表示 提交任务后 延迟多少时间 开始执行任务 command,delay 表示 当任务执行完毕后 延长多少时间后 再次运行 command 任务,unit 是 initialDelay 和 delay 的时间单位。任务会一直运行,直到抛出了异常,被取消了,或者 关闭了线程池。
构造方法源码:
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {// 参数校验 if (command == null || unit == null)throw new NullPointerException();if (delay <= 0)throw new IllegalArgumentException();// 任务转换,把 command 转换成 ScheduleFutureTask ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;// 添加任务到队列delayedExecute(t);return t;}
将任务添加到队列后,线程池线程会从队列中获取任务,然后调用 ScheduledFutureTask 的 run 方法,由于 period = - delay != 0
,所以 isPeriodic() 方法返回 true,接下来 run 方法中会执行 ScheduledFutureTask.super.runAndReset()
,该方法源码:
protected boolean runAndReset() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;boolean ran = false;int s = state;try {Callable<V> c = callable;if (c != null && s == NEW) {try {c.call(); // don't set resultran = true;} catch (Throwable ex) {setException(ex);}}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptss = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}return ran && s == NEW;}
可以看到, ScheduledFutureTask 的 run 方法里的 runAndResetFutureTask ,和 父类 FutureTask 的 run 方法类似,只是 任务正常执行完毕后,不会设置任务的状态,因为任务是可重复执行的,而且,返回值是 return ran && s == NEW;
, 也就是说,如果当前任务正常执行完毕 并且 任务状态是 NEW 则返回 true,否则返回 false。 如果返回了 true,那么在 run 方法里继续向下执行 setNextRunTime();
,设置该任务 下一次的执行时间。该方法源码如下:
private void setNextRunTime() {long p = period;if (p > 0)time += p;elsetime = triggerTime(-p);}
如果 p < 0,说明当前任务为 fixed-deley 任务,就会设置 time 为 当前时间加上 -p 的时间,也就是 延迟 -p 时间后再次执行。 执行完 setNextRunTime 方法后,会执行 reExecutePeriodic(outerTask);
,把任务重新入队。
🎭总结 :
✨ fixed-delay 类型的任务的执行原理:当添加一个任务到延迟队列后,等待 initialDelay 时间,任务就会过期,过期的任务就会被从队列移除,并执行。执行完毕后,会重新设置任务的延迟时间,然后再把任务放入延迟队列,循环往复。需要注意的是,如果一个任务在执行中抛出了异常,那么这个任务就结束了,但是不影响其他任务的执行。✨
(4)scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) 方法
该方法相对起始时间点 以固定频率 调用指定的任务 (fixed-rate 任务)。当把任务提交到线程池并延迟 initialDelay 时间(时间单位为 unit) 后 开始执行任务 command。然后 从 intialDelay + period 时间点再次执行,而后在 initialDelay + 2 * period 时间点 再执行,循环往复,直到抛出了异常 或者 调用了任务的 cancel 方法取消了任务,或者关闭了线程池。scheduleAtFixedRate 的原理 和 scheduleWithFixedDelay 类似。
源码:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;
}
因为对于 fixed-period 任务,period 是大于 0 的,所以转换 command 为 ScheduledFutureTask 时, 传入的是 period,在 当前任务执行为三比,调用 setNextRunTime();
时 执行的是 time += p;
。
🎭总结 :
✨ fixed-rate 类型的任务的执行原理:时间为 initialDelay + n * period 时 启动任务 ,但是如果 当前任务还没有执行完,下一次要执行任务的时间到了,则不会并发执行,会延迟执行,要等到当前任务执行完毕后再执行。✨
三、 总结
ScheduledThreadPoolExecutor 内部使用了 DelayQueue 来存放具体任务。任务分为三种,一次性执行完毕就结束的;fixed-delay 任务保证同一个任务在多次执行之间间隔固定时间的,fixed-rate 任务保证 按照固定的频率执行的。任务的类型使用 period 值来区分。