JDK源码详解——DelayQueue,ScheduledThreadPoolExecutor

DelayQueue

DelayQueue 也是一种队列,它内部的元素有“延迟”,也就是当从队列中获取元素时,如果它的延迟时间未到,则无法取出。


DelayQueue 的类签名和继承结构如下:

public class DelayQueue extends AbstractQueue
    implements BlockingQueue {}


JDK源码详解——DelayQueue,ScheduledThreadPoolExecutor

下面分析其代码实现。


代码分析

相关接口

DelayQueue 中的元素要实现 Delayed 接口,该接口定义如下:

public interface Delayed extends Comparable {
    /**
     * 以给定的时间单位,返回该对象的剩余延迟
     * 若为零或者负数表示延时已经过去
     */
    long getDelay(TimeUnit unit);
}

Delayed 接口继承自 Comparable 接口,而它本身只定义了一个 getDelay 方法,该方法的作用是获取对象的剩余延迟时间。


Comparable 接口也只有一个 compareTo 方法:

public interface Comparable {
    public int compareTo(T o);
}

这里不再详述。


构造器

DelayQueue 有两个构造器,如下:

// 无参构造器
public DelayQueue() {}


// 指定集合的构造器
public DelayQueue(Collection<? extends E> c) {
    // 该方法最后是通过 add 方法实现的,后文进行分析
    this.addAll(c);
}


成员变量

// 锁,用于保证线程安全
private final transient ReentrantLock lock = new ReentrantLock();


// 优先队列,实际存储元素的地方
private final PriorityQueue q = new PriorityQueue();


// 线程等待的标识
private Thread leader = null;


// 触发条件,表示是否可以从队列中读取元素
private final Condition available = lock.newCondition();

关于优先队列可参考前文「JDK源码分析-PriorityQueue」的分析。


入队方法

DelayQueue 也是一个队列,它的入队方法有:add(E), offer(E), put(E) 等,它们的定义如下:

public boolean add(E e) {
    return offer(e);
}


public void put(E e) {
    offer(e);
}


public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}

这几个方法都是通过 offer(E) 方法实现的,它的代码如下:

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 入队
        q.offer(e);
        // 若该元素为队列头部元素,唤醒等待的线程
        // (表示可以从队列中读取数据了)
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}


出队方法

有入队自然也有出队,主要方法有:poll(), take(), poll(timeout, unit), 如下:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取队列头部元素
        E first = q.peek();
        // 头部元素为空,或者延时未到,则返回空
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        // 否则返回头部元素
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

poll 方法是非阻塞的,即调用之后无论元素是否存在都会立即返回。下面看下阻塞的 take 方法:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 以可中断方式获取锁
    lock.lockInterruptibly();
    try {
        // 无限循环
        for (;;) {
            // 获取队列头部元素
            E first = q.peek();
            // 若为空,则等待
            if (first == null)
                available.await();
            // 若不为空
            else {
                // 获取延迟的纳秒数,若小于等于零(即过期),则获取并删除头部元素
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                // 执行到这里,表示 delay>0,也就是延时未过期
                first = null; // don't retain ref while waiting
                // leader 不为空表示有其他线程在读取数据,当前线程等待
                if (leader != null)
                    available.await();
                else {
                    // 将当前线程设置为 leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 等待延迟时间过期
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 唤醒该条件下的其他线程
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

该方法看起来稍复杂,主要逻辑如下:

1. 获取队列头部元素;

1.1 若该元素为空(队列为空),则当前线程等待;

1.2 若该元素不为空,且已经过期,则取出该元素(并移除);

1.2.1 若未过期,且有其他线程在操作(leader 不为空),当前线程等待;

1.2.2 若未过期,且没有其他线程操作,则占有“操作权”(将 leader 设置为当前线程),并等待延迟过期。

以上操作循环执行。


take 方法是阻塞操作,当条件不满足时会一直等待。另一个 poll(timeout, unit) 方法和它有些类似,只不过带有延时,如下:

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // 以可中断方式获取锁
        lock.lockInterruptibly();
        try {
            // 无限循环
            for (;;) {
                // 获取队列的头部元素
                E first = q.peek();
                // 若头部元素为空(即队列为空),当超时时间大于零则等待相应的时间;
                //   否则(即超时时间小于等于零)返回空
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    // 执行到这里表示队列头部元素不为空
                    // 获取剩余延时
                    long delay = first.getDelay(NANOSECONDS);
                    // 延时已过期,返回队列头部元素
                    if (delay <= 0)
                        return q.poll();
                    // 延时未过期且等待超时,返回空
                    if (nanos <= 0)
                        return null;
                    first = null; // don't retain ref while waiting
                    // 延时未过期且等待未超时,且等待超时<延迟时间
                    // 表示有其他线程在取数据,则当前线程进入等待
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
                        // 没有其他线程等待,将当前线程设置为 leader,类似于“独占”操作
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            // 计算剩余延迟时间
                            nanos -= delay - timeLeft;
                        } finally {
                            // 该线程操作完毕,把 leader 置空
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 唤醒 available 条件下的一个其他线程
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

take 和 poll 方法还有一个区别:当延迟未过期时,take 方法会一直等待,而 poll 方法则会返回空。


此外还有一个 peek 方法,该方法虽然也能获取队列头部的元素,但与以上出队方法不同的是,peek 方法只是读取队列头部元素,并不会将其删除:

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 返回队列的头部元素(不删除)
        return q.peek();
    } finally {
        lock.unlock();
    }
}

以上就是 DelayQueue 的主要方法的代码分析,为便于理解,下面简要举例分析。


用法举例

示例代码:

自定义一个实现了 Delayed 接口的 Task 类,并将它的几个对象添加到一个延迟队列中,代码如下:

public class TestDelayedQueue {
    public static void main(String[] args) throws Exception {
        BlockingQueue delayQueue = new DelayQueue<>();
        long now = System.currentTimeMillis();
        delayQueue.put(new Task("c", now + 6000));        
        delayQueue.put(new Task("d", now + 10000));
        delayQueue.put(new Task("a", now + 3000));
        delayQueue.put(new Task("b", now + 4000));
        
        while (true) {
            System.out.println(delayQueue.take());
            TimeUnit.SECONDS.sleep(1);
        }
    }


    private static class Task implements Delayed {
        private String taskName;
        private long endTime;


        public Task(String taskName, long endTime) {
            this.taskName = taskName;
            this.endTime = endTime;
        }


        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }


        @Override
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }


        @Override
        public String toString() {
            return "taskName-->" + taskName;
        }
    }
}

结果会以延迟时间的顺序取出各个元素。


小结

1. DelayQueue 是一种队列,同时实现了 BlockingQueue 接口;

2. 它内部的元素有延迟时间的概念,出队时,若延时未到,则无法读取到队列头部的元素;

3. 它是线程安全的。


ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 也是一个线程池类,是线程池类 ThreadPoolExecutor 的子类。除了 ThreadPoolExecutor 相关的方法之外,它还增加了执行定时任务和周期性任务的方法。它的类签名和继承结构如下:

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {}
JDK源码详解——DelayQueue,ScheduledThreadPoolExecutor

可以看到,它继承了 ThreadPoolExecutor 类(参考 「JDK源码分析-ThreadPoolExecutor」),并且实现了 ScheduledExecutorService 接口(参考 「JDK源码分析-ScheduledExecutorService」),因此具有二者的特性。下面分析其代码实现。


代码分析

内部嵌套类 DelayedWorkQueue

先看它的一个内部嵌套类 DelayedWorkQueue,该类是一个延迟队列,它的类签名和继承结构如下:

static class DelayedWorkQueue extends AbstractQueue
    implements BlockingQueue {}
JDK源码详解——DelayQueue,ScheduledThreadPoolExecutor

DelayedWorkQueue 类与前文分析的 DelayQueue 「JDK源码分析-DelayQueue」实现原理类似,这里就不再赘述。


构造器

ScheduledThreadPoolExecutor 有如下四个构造器:

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}


public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}


public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}


public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

这几个构造器都是直接调用父类 ThreadPoolExecutor 的构造器,只是传入了不同的参数。而其中的参数 workQueue 都传入了上面的延迟队列 DelayedWorkQueue。


内部类 ScheduledFutureTask

ScheduledThreadPoolExecutor 还有一个内部类 ScheduledFutureTask,它的继承结构如下:

JDK源码详解——DelayQueue,ScheduledThreadPoolExecutor

它继承了 FutureTask 类(可参考前文「JDK源码分析-FutureTask」的分析),且实现了 RunnableScheduledFuture 接口,该接口定义如下:

public interface RunnableScheduledFuture extends RunnableFuture, ScheduledFuture {
    // 一个任务是否周期性执行的,若是则可以重复执行;否则只能运行一次
    boolean isPeriodic();
}

RunnableScheduledFuture 只定义了一个方法 isPeriodic,该方法用于判断一个任务是否是周期性执行的。它继承的 RunnableFuture 接口在前文 FutureTask 类中已进行分析,而 ScheduledFuture 接口如下:

public interface ScheduledFuture extends Delayed, Future {
}

它的内部并未定义方法,只是整合了 Delayed 接口和 Future 接口,Delayed 接口前文也已分析,下面分析该类的主要代码。


先看它的主要成员变量:

// 定时任务执行的时间(单位:纳秒)
private long time;


/**
 * 重复执行的任务的时间间隔(单位:纳秒) 
 * 正数表示固定频率(fixed-rate)执行
 * 负数表示固定延迟(fixed-delay)执行
 * 零表示非重复执行的任务
 */
private final long period;


// reExecutePeriodic 方法中重新排队的任务
RunnableScheduledFuture outerTask = this;


// 延迟队列中的索引位置,便于快速取消
int heapIndex;

构造器:

/**
 * 构造器一:用给定的触发时间(纳秒),创建一个一次性任务
 */
ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}


/**
 * 构造器二:用给定的触发时间和间隔(纳秒),创建一个周期性任务
 */
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}


/**
 * 构造器三:用给定的触发时间(纳秒),创建一个一次性任务
 */
ScheduledFutureTask(Callable callable, long ns) {
    super(callable);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask 有三个构造器,可分为两类:分别是创建一次性任务(一和三)和周期性任务(二)。其中一和三还是 Runnable 和 Callable 的区别。


该类是一个任务类,即 Runnable 接口的实现类,因此它最核心的就是 run 方法,如下:

public void run() {
    // 是否为周期性任务
    boolean periodic = isPeriodic();
    // 若任务不能执行,则取消
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 若为非周期性任务
    else if (!periodic)
        // 若为周期性任务,调用 ScheduledFutureTask 的父类(即 FutureTask)的 run 方法执行
        ScheduledFutureTask.super.run();
    // 若为周期性任务,调用 ScheduledFutureTask 的父类(即 FutureTask)的 runAndReset 方法执行
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime(); // 设置下一次执行时间
        reExecutePeriodic(outerTask); // 周期性执行
    }
}

reExecutePeriodic 方法如下:

/**
 * 该方法主要是将周期性任务重新排队
 * 它的实现与 delayedExecute 方法(后面分析)逻辑有些类似
 */
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}


schedule & scheduleAtFixedRate & scheduleWithFixedDelay

这几个就是执行定时任务和周期性任务的方法,它们是对前文 「JDK源码分析-ScheduledExecutorService」接口所定义的方法实现,可参考前文的分析。


schedule 方法 1:其作用是延迟指定的时间后执行任务(即执行定时任务),只会执行一次。

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    // 把用户提交的 Runnable 对象包装为 RunnableScheduledFuture 对象
    // decorateTask 方法默认返回第二个参数
    // decorateTask 方法的修饰符是 protected,可根据需求自行扩展
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask(command, null,
                                      triggerTime(delay, unit)));
    // 执行给定的任务
    delayedExecute(t);
    return t;
}

delayExecute 方法:

/*
 * 延迟或周期性任务的主要执行方法。
 * 若线程池已关闭,则拒绝该任务(执行拒绝策略);
 * 否则将任务添加到工作队列,若有需要启动一个线程去执行。
 * 若在添加任务时关闭了线程池,则将其从队列移除并取消该任务
 */
private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 若线程池已关闭,则执行拒绝策略
    if (isShutdown())
        reject(task);
    else {
        // 将该任务添加到任务队列(即前面的延迟队列)
        super.getQueue().add(task);
        // 若当前任务无法执行,则将其从队列移除并且取消执行(类似事务的回滚操作)
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        // 任务可以执行,若有需要新增线程以执行该任务
        else
            ensurePrestart();
    }
}

schedule 方法 2:

public  ScheduledFuture schedule(Callable callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture t = decorateTask(callable,
        new ScheduledFutureTask(callable,
                                   triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

该方法与前者类似,差别在于这里的参数类型是 Callable,前者是 Runnable 类型,其他操作一样。


scheduleAtFixedRate 方法:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // 将 Runnable 对象包装为 ScheduledFutureTask 对象
    ScheduledFutureTask sft =
        new ScheduledFutureTask(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

该方法与前面的 schedule 方法类似,区别仅在于使用了不同的 ScheduledFutureTask 对象,其他的执行流程几乎一样。


scheduleWithFixedDelay 方法:

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask sft =
        new ScheduledFutureTask(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

该方法与 scheduleAtFixedRate 方法基本一样,区别仅在于构建 ScheduledFutureTask 对象时参数 period 不同(一正一负,用以区分类型)。


execute & submit 方法

这两个方法是 Executor 接口和 ExecutorService 接口所定义的方法,代码实现如下:

public void execute(Runnable command) {
    schedule(command, 0, NANOSECONDS);
}


public Future<?> submit(Runnable task) {
    return schedule(task, 0, NANOSECONDS);
}

它们内部直接调用了 schedule(Runnable) 方法。另外两个 submit 方法:

public  Future submit(Runnable task, T result) {
    return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}


public  Future submit(Callable task) {
    return schedule(task, 0, NANOSECONDS);
}

它们内部直接调用了 schedule(Callable) 方法。


小结

1. ScheduledThreadPoolExecutor 是线程池的实现类之一;

2. 它继承自 ThreadPoolExecutor,并实现了 ScheduledExecutorService 接口;

3. 提供了异步提交任务的 execute 方法和 submit 方法;

4. 提供了执行定时任务的 schedule 方法和周期性任务的 scheduleAtFixedRate/scheduleWithFixedDelay 方法(使用延迟队列实现)。

展开阅读全文

页面更新:2024-02-24

标签:前文   源码   周期性   队列   线程   头部   详解   元素   接口   类似   定义   对象   操作   代码   时间   方法   科技

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top