摘要:本文学习了如何在高并发场景下使用线程池创建线程。
环境
Windows 10 企业版 LTSC 21H2 Java 1.8
1 类和接口 1.1 Executor Executor是一个顶层接口,在它里面只声明了一个execute()
方法,用来在接下来的某个时刻执行提交的任务。
常用方法:
java 1 void execute (Runnable command) ;
1.2 ExecutorService ExecutorService接口继承了Executor接口,并声明了一些方法。
常用方法:
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void shutdown () ;List<Runnable> shutdownNow () ; boolean isShutdown () ;boolean isTerminated () ;<T> Future<T> submit (Callable<T> task) ; <T> Future<T> submit (Runnable task, T result) ; Future<?> submit(Runnable task);
1.3 ThreadPoolExecutor ThreadPoolExecutor是线程池中最核心的一个类,通过间接的方式实现了ExecutorService接口。
构造方法:
java 1 2 3 4 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) ;public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) ;public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) ;public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) ;
参数:
corePoolSize:核心池的大小。当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。
maximumPoolSize:线程池最大线程数,它表示在线程池中最多能创建多少个线程。
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize。
unit:参数keepAliveTime的时间单位,有7种取值。在TimeUnit类中有7种静态属性:
TimeUnit.DAYS;// 天
TimeUnit.HOURS;// 小时
TimeUnit.MINUTES;// 分钟
TimeUnit.SECONDS;// 秒
TimeUnit.MILLISECONDS;// 毫秒
TimeUnit.MICROSECONDS;// 微妙
TimeUnit.NANOSECONDS;// 纳秒
workQueue:一个阻塞队列,用来存储等待执行的任务。一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue:有界阻塞队列。
LinkedBlockingQueue:无界阻塞队列。
SynchronousQueue:不存储元素阻塞队列,即单个元素阻塞队列。
DelayQueue:延时阻塞队列。
threadFactory:线程工厂,主要用来创建线程。
handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:抛异常,默认策略。
ThreadPoolExecutor.CallerRunsPolicy:调用线程处理新任务。
ThreadPoolExecutor.DiscardPolicy:丢弃新任务。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃等待最久任务,将新任务加入队列。
2 使用 2.1 线程池状态 常用属性:
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 ));private static final int COUNT_BITS = Integer.SIZE - 3 ;private static final int CAPACITY = (1 << COUNT_BITS) - 1 ;private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;private static int runStateOf (int c) { return c & ~CAPACITY; }private static int workerCountOf (int c) { return c & CAPACITY; }private static int ctlOf (int rs, int wc) { return rs | wc; }
状态的转换:
RUNNING -> SHUTDOWN:调用了shutdown()方法。
RUNNING / SHUTDOWN -> STOP:调用了shutdownNow()方法。
SHUTDOWN -> TIDYING:当队列中任务都被取出执行完成,并且所有工作线程都结束了任务,再没有未被执行的任务。
STOP -> TIDYING:线程池中没有正在运行的线程,任务队列中任务都被取消。
TIDYING -> TERMINATED:钩子方法terminated()执行完毕后。
状态说明:
RUNNING:运行态,可处理新任务并执行队列中的任务。
SHUTDOW:关闭态,不接受新任务,但处理队列中的任务。
STOP:停止态,不接受新任务,不处理队列中任务,且打断运行中任务。
TIDYING:整理态,所有任务已经结束,将执行terminated()方法。
TERMINATED:结束态,terminated()方法已完成。
创建线程池之后不会马上创建线程,在提交任务后才会创建线程,可以手动设置创建线程池后马上创建线程:
prestartCoreThread():初始化一个核心线程。
prestartAllCoreThreads():初始化所有核心线程。
关闭线程池:
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务。
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。
动态调整线程池容量:
setCorePoolSize():设置核心池大小。
setMaximumPoolSize():设置线程池最大能创建的线程数目大小。
2.2 按需创建线程池 Executors类中提供了几个静态方法创建线程池:
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); } public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); } public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); } public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize) { return new ScheduledThreadPoolExecutor (corePoolSize); } public ScheduledThreadPoolExecutor (int corePoolSize) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue ()); }:
它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了:
newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue。
newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue。
newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
newScheduledThreadPool将corePoolSize设置为入参,将maximumPoolSize设置为Integer.MAX_VALUE,使用的DelayQueue。
不建议直接使用这几个静态方法创建线程池:
SingleThreadPool和FixedThreadPool允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
CachedThreadPool和ScheduledThreadPool允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
建议通过ThreadPoolExecutor的方式创建线程池,这样的处理方式能更加明确线程池的运行规则,规避资源耗尽的风险。
另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。
2.3 线程创建策略 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务。
如果当前线程池中的线程数目大于等于corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中:
若添加成功,则该任务会等待空闲线程将其取出去执行。
若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务。
如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理。
如果线程池中的线程数量大于corePoolSize时,当某线程空闲时间超过keepAliveTime时,线程将被终止,直至线程池中的线程数目不大于corePoolSize。
如果允许为核心池中的线程设置存活时间,当核心池中的线程空闲时间超过keepAliveTime时,核心线程也会被终止。
2.4 缓存队列 超出一定数量的任务会转移队列中,队列与池里的线程大小的关联表现在:
如果运行的线程数小于corePoolSize,会创建线程执行任务。
如果运行的线程已大于corePoolSize,会把新的任务放于队列中。如果队列已到最大时,会继续创建线程,直到超过maximumPoolSize。如果线程超过maximumPoolSize,将拒绝接收新的任务。
而添加任务到队列时,有几种常规的策略:
有界队列。如ArrayBlockingQueue,当定义了maximumPoolSizes时,使用有界队列可以预防资源的耗尽,但是增加了调整和控制队列的难度,队列的大小和线程池的大小是相互影响的,使用很大的队列和较小的线程池会减少CPU消耗、操作系统资源以及线程上下文开销,但却人为的降低了吞吐量。如果任务是频繁阻塞型的(I/O),系统是可以把时间片分给多个线程的。而采用较小的队列和较大的线程池,虽会造成CPU繁忙,但却会遇到调度开销,这也会降低吞吐量。
无界队列。如LinkedBlockingQueue,当核心线程正在工作时,使用不用预先定义大小的无界队列,使新任务等待,所以如果线程数是小于corePoolSize时,将不会有入队操作。这种策略将很适合那些相互独立的任务,如Web服务器。无界队列可能会堆积大量的请求,从而导致OOM。
直接传递。如SynchronousQueue,不存储元素的阻塞队列,将任务直接交给线程。每一个入队操作必须等待另一个线程移除操作,否则入队将一直阻塞。当处理一些可能有内部依赖的任务时,这种策略避免了加锁操作。直接传递一般不能限制maximumPoolSizes以避免拒绝接收新的任务,可能会造成增加无限多的线程导致OOM。
延时队列。如DelayQueue,队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。延时队列也是一个无界队列,因此往队列中插入数据的操作永远不会被阻塞,而只有获取数据的操作才会被阻塞。延时队列的maximumPoolSizes没有限制,可能会造成增加无限多的线程导致OOM。
2.5 拒绝策略 当线程数量达到缓存队列的最大容量时,线程池则已经饱和了,此时则不会接收新的任务。会调用RejectedExecutionHandler的rejectedExecution()
方法执行饱和策略。
在线程池内部预定义了几种处理策略:
终止执行(AbortPolicy)。默认策略,会抛出一个RejectedExecutionException运行异常到调用者线程来阻止系统运行。
调用者线程来运行任务(CallerRunsPolicy)。这种策略会由调用execute()方法的线程来执行任务,它提供了一个简单的反馈机制并能降低新任务的提交频率。
丢弃策略(DiscardPolicy)。丢弃提交的任务。
丢弃队列里最老的一个任务(DiscardOldestPolicy)。丢弃工作队列中等待最久一个任务,并将提交的任务加入队列。
2.6 合理配置 一般需要根据任务的类型来配置线程池大小。
如果是CPU密集型任务,即需要执行大量运算且没有阻塞的任务,就需要设置尽量少的线程数,减少线程上下文切换,参考值可以设为NCPU+1
。
如果是IO密集型任务,即线程存在阻塞或等待,并不是一直再执行,就需要设置尽量多的线程数,参考值可以设置为2*NCPU
。
另外,如果是IO密集型任务,也可以根据NCPU/(1-阻塞系数)
这个公式计算,阻塞系数是在0.8到0.9之间的一个值。
当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。
3 实现原理 3.1 属性 常用属性:
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock = new ReentrantLock ();private final HashSet<Worker> workers = new HashSet <Worker>();private final Condition termination = mainLock.newCondition();private int largestPoolSize;private long completedTaskCount;private volatile ThreadFactory threadFactory;private volatile RejectedExecutionHandler handler;private volatile long keepAliveTime;private volatile boolean allowCoreThreadTimeOut;private volatile int corePoolSize;private volatile int maximumPoolSize;
3.2 源码 3.2.1 内部类 内部类:
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } }
3.2.2 提交任务 最核心的任务提交方法是execute()
方法,虽然通过submit()
方法也可以提交任务,但是实际上submit()
方法里面最终调用的还是execute()
方法。
提交任务:
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
3.2.3 添加线程 添加线程:
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException (); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
失败回滚:
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private void addWorkerFailed (Worker w) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (w != null ) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
3.2.4 启动线程 启动线程:
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error (x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
3.2.5 获取任务 获取任务:
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
3.3.6 程序退出 程序退出:
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private void processWorkerExit (Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; } addWorker(null , false ); } }
4 使用 创建线程池,核心线程数为2,最大线程数为4,任务队列大小为3,启动7个线程。
示例:
java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class Demo { public static void main (String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor (2 , 4 , 100 , TimeUnit.MILLISECONDS, new ArrayBlockingQueue <>(3 )); for (int i = 1 ; i <= 7 ; i++) { Runnable runnable = new DemoThread (i); executor.execute(runnable); System.out.println("线程编号:" + i + ",线程池:" + executor.getPoolSize() + ",队列:" + executor.getQueue().size()); } executor.shutdown(); } } class DemoThread implements Runnable { int taskNo = 0 ; public DemoThread (int taskNo) { this .taskNo = taskNo; } @SuppressWarnings("static-access") public void run () { try { System.out.println("task " + taskNo); Thread.currentThread().sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } }
结果:
log 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 线程编号:1,线程池:1,队列:0 task 1 线程编号:2,线程池:2,队列:0 线程编号:3,线程池:2,队列:1 线程编号:4,线程池:2,队列:2 task 2 线程编号:5,线程池:2,队列:3 线程编号:6,线程池:3,队列:3 线程编号:7,线程池:4,队列:3 task 6 task 7 // 等待1s task 3 task 4 task 5
说明:
在前2个任务放到线程池里时,没有超过核心线程数,所以创建新的线程,执行任务。
在第3个任务放到线程池里时,超过了核心线程数,所以放到了任务缓存队列里,等待执行任务。
在第4个任务放到线程池里时,超过了核心线程数,所以放到了任务缓存队列里,等待执行任务。
在第5个任务放到线程池里时,超过了核心线程数,所以放到了任务缓存队列里,等待执行任务。
在第6个任务放到线程池里时,超过了核心线程数,超过了缓存队列长度,线程池的线程数量小于线程池的最大容量,所以创建新的线程,执行任务。
在第7个任务放到线程池里时,超过了核心线程数,超过了缓存队列长度,线程池的线程数量小于线程池的最大容量,所以创建新的线程,执行任务。
等待任务执行完毕,释放线程,获取任务缓存队列里的任务,执行任务。
如果有第8个任务放到线程池里,超过了核心线程数,超过了缓存队列长度,线程池的线程数量大于线程池的最大容量,所以产生RejectedExecutionException拒绝任务异常。
条