前端 Java线程池用法和原理分析

honey117 · January 06, 2020 · 24 hits

一. 基本用法

Java 支持四种类型的线程池:

1. newSingleThreadPool
ExecutorService service = Executors.newSingleThreadExecutor();

创建只有一个线程的线程池。在线程池关闭支持,当前正在执行的线程可能由于异常而终止,在这种情况下下一个排队的任务将取代这个异常线程、进入执行状态。在任何情况下,线程池中都不可能有多于一个的线程处于正在执行状态。跟 newFixedThreadPool(1) 相比,newSingleThreadPool 创建的的 Executor 不会被其他的线程重新配置。

2. newFixedSizeThreadPool
ExecutorService service = Executors.newFixedThreadPool(3);

创建有固定数量线程的线程池,所有排队的线程存放在一个无界队列LinkedBlockingQueue中,因此该线程池支持任意数量的线程排队等待执行。在任何情况下,最多有 nThreads 个线程处于运行状态(该参数是创建对象时用户传入的)。当线程池中的线程都处于运行状态时,后来提交的任务都会在 LinkedBlockingQueue 中排队等待执行,执行顺序为 FIFO。若正在运行的线程因为异常而终止,下一个排队线程会立即取代它的位置,进入运行状态。线程池中的线程会一直存在,除非显式的调用 shutdown。

注意:
当线程池中当前的线程数量小于核心线程数时(核心线程数就是创建对象时传入的唯一的一个参数),即使当前有线程处于空闲状态,当有Runnable被提交到线程池时,线程池依然会创建一个Thread来运行新提交的Runnable。

3. newCachedThreadPool
ExecutorService service = Executors.newCachedThreadPool();

创建一个可缓存的线程池,当有需要时就创建 Thread,但会复用之前创建的可用线程。当线程的空闲时间超过 60s 时,空闲线程将被回收。线程池的大小的上线为 Integer.MAX_VALUE,可视为无限大。

4. newScheduledThreadPool
ScheduledExecutorService service = Executors.newScheduledThreadPool(3);

创建一个可定时或周期执行任务的线程池,并可指定线程池的大小。

该线程池支持 3 种形式的延时和定时操作:

1. 延时执行:延时delay后执行command,只会执行一次

 public ScheduledFuture<?> schedule(Runnable command, long delay,TimeUnit unit) 

2. 定时执行(从开始执行时计时)

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit)

任务第一次在延迟 initialDelay 后开始执行,后续每隔 period,执行一次,即循环执行。
在执行 Runnable 的过程中,可能会遇到异常,这会导致后续的循环被终止。如果执行任务时,执行时间超过了 period,这会使得下一次的运行时间延后,但不会并发执行。
注意:计算period是从每次执行开始时开始计算的,所以可能出现线程还没执行完下一次的任务时间就到了。

3. 定时执行(从执行完毕开始计时)

public ScheduledFuture<?> scheduleWi**thFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit)

任务第一次在延迟 initialDelay 后开始执行,后续每隔 period,执行一次,即循环执行。
在执行 Runnable 的过程中,可能会遇到异常,这会导致后续的循环被终止。
注意:计算period是从每次执行完毕时开始计算的!

二. 原理分析

1. 线程池创建方法
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • 1. int corePoolSize:
    线程池中线程的个数。当提交新任务时,若当前线程小于 corePoolSize,即使当有有空闲的线程,也会创建新的线程来支持任务。可以通过 allowCoreThreadTimeOut 这个方法来改变线程个数。

  • 2. int maximumPoolSize:
    线程池中可以存在的最大的线程个数

  • 3. long keepAliveTime:
    线程存活时间,当线程池中线程总数大于 corePoolSize 时,若线程的空闲时间大于 keepAliveTime,线程将会终止。默认情况下,核心线程不受此超时时间影响。可通过 allowCoreThreadTimeOut(true) 使得核心线程也收到超时间控制。

  • 4. TimeUnit unit:
    keepAliveTime 的单位,从小到大分别为:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS、MINUTES、HOURS、DAYS

  • 5. BlockingQueue<Runnable> workQueue:
    存放等待执行任务的队列

  • 6. ThreadFactory threadFactory:
    创建线程的工厂类,在同一个 threadFactory 创建的线程有以下相同点:线程组相同、线程名格式相同、都是非守护线程、线程优先级都是 Thread.NORM_PRIORITY,创建线程的代码如下:

    public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    
  • 7. RejectedExecutionHandler handler:
    当线程池和队列都达到容量限制时的异常处理策略,RejectedExecutionHandler 有 4 个具体的实现类:

  • DiscardOldestPolicy:放弃当前提交最早的任务,然后尝试执行该任务

  • AbortPolicy:不执行任务,抛出 RejectedExecutionException 异常,这是默认的策略

  • CallerRunsPolicy:在调用线程执行任务,如果已经 shutdown,则放弃任务

  • DiscardPolicy:无法执行的任务将被静默移除,无任何提示

以上 7 个参数,其中以下两个在 Executors 中有默认实现:
1. ThreadFactory:Executors 中默认的线程工厂为:

public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
}

DefaultThreadFactory 定义如下:

 static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

2. RejectedExecutionHandler:默认为 AbortPolicy,即放弃任务并抛出异常

private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

下表为 4 种线程池的创建对象时的参数:

线程池 singleThreadPool fixedSizeThreadPool cachedThreadPool scheduledThreadPool
corePoolSize 1 nThreads 0 nThreads
maximumPoolSize 1 nThreads Integer.MAX_VALUE Integer.MAX_VALUE
keepAliveTime 0 0 60 0
unit MILLISECONDS MILLISECONDS SECONDS NANOSECONDS
workQueue LinkedBlockingQueue LinkedBlockingQueue SynchronousQueue DelayedWorkQueue
threadFactory default default default default
handler default default default default

表中threadFactory的默认实现为DefaultThreadFactory,handler的默认实现为RejectedExecutionHandler。

2. BlockingQueue 及其子类

BlockingQueue 有四个常见的实现:

  1. ArrayBlockingQueue:规定大小的 BlockingQueue,其构造函数必须带一个 int 参数来指明其大小.其所含的对象是以 FIFO(先入先出) 顺序排序的.

  2. LinkedBlockingQueue:大小不定的 BlockingQueue,若其构造函数带一个规定大小的参数,生成的 BlockingQueue 有大小限制,若不带大小参数,所生成的 BlockingQueue 的大小由 Integer.MAX_VALUE 来决定.其所含的对象是以 FIFO(先入先出) 顺序排序的

  3. PriorityBlockingQueue:类似于 LinkedBlockQueue,但其所含对象的排序不是 FIFO,而是依据对象的自然排序顺序或者是构造函数的 Comparator 决定的顺序.

  4. SynchronousQueue:特殊的 BlockingQueue,对其的操作必须是放和取交替完成的.

3. 线程池关闭

1. void shutdown()

当线程池调用该方法时,线程池的状态则立刻变成 SHUTDOWN 状态。此时,则不能再往线程池中添加任何任务,否则将会抛出 RejectedExecutionException 异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。

2. List< Runnable> shutdownNow()

执行该方法,线程池的状态立刻变成 STOP 状态,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务。 它试图终止线程的方法是通过调用 Thread.interrupt() 方法来实现的,但这种方法的作用有限,如果线程中没有 sleep 、wait、Condition、定时锁等应用, interrupt() 方法是无法中断当前的线程的。所以,ShutdownNow() 并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。

No Reply at the moment.
You need to Sign in before reply, if you don't have an account, please Sign up first.