- 為什么要有線程池?
- Java是實現(xiàn)和管理線程池有哪些方式?請簡單舉例如何使用。
- 為什么很多公司不允許使用Executors去創(chuàng)建線程池?那么推薦怎么使用呢?
- ThreadPoolExecutor有哪些核心的配置參數(shù)?請簡要說明
- ThreadPoolExecutor可以創(chuàng)建哪是哪三種線程池呢?
- 當隊列滿了并且worker的數(shù)量達到maxSize的時候,會怎么樣?
- 說說ThreadPoolExecutor有哪些RejectedExecutionHandler策略?默認是什么策略?
- 簡要說下線程池的任務執(zhí)行機制?execute–>addWorker–>runworker(getTask)
- 線程池中任務是如何提交的?
- 線程池中任務是如何關(guān)閉的?
- 在配置線程池的時候需要考慮哪些配置因素?
- 如何監(jiān)控線程池的狀態(tài)?
為什么要有線程池
- 降低資源消耗(線程無限制地創(chuàng)建,然后使用完畢后銷毀)
- 提高響應速度(無須創(chuàng)建線程)
- 提高線程的可管理性
ThreadPoolExecutor例子
- WorkerThread
SimpleThreadPool
- RejectedExecutionHandlerImpl.java
- MyMonitorThread.java
- WorkerPool.java
ThreadPoolExecutor使用詳解
Execute原理
- 線程池首先當前運行的線程數(shù)量是否少于corePoolSize。如果是,則創(chuàng)建一個新的工作線程來執(zhí)行任務。如果都在執(zhí)行任務,則進入2.
- 判斷BlockingQueue是否已經(jīng)滿了,倘若還沒有滿,則將線程放入BlockingQueue。否則進入3.
- 如果創(chuàng)建一個新的工作線程將使當前運行的線程數(shù)量超過maximumPoolSize,則交給RejectedExecutionHandler來處理任務。
參數(shù)
- corePoolSize線程池中的核心線程數(shù),當提交一個任務時,線程池創(chuàng)建一個新線程執(zhí)行任務,直到當前線程數(shù)等于corePoolSize,即使有其他空閑線程能夠執(zhí)行新來的任務,也會繼續(xù)創(chuàng)建線程;如果當前線程數(shù)為corePoolSize,繼續(xù)提交的任務被保存到阻塞隊列中,等待被執(zhí)行;如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有核心線程。
- workQueue用來保存等待被執(zhí)行的任務的阻塞隊列.在JDK中提供了如下阻塞隊列:具體可以參考JUC集合:BlockQueue詳解
- ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,按FIFO排序任務;
- LinkedBlockingQueue:基于鏈表結(jié)構(gòu)的阻塞隊列,按FIFO排序任務,吞吐量通常要高于ArrayBlockingQueue;
- SynchronousQueue:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue;
- PriorityBlockingQueue:具有優(yōu)先級的無界阻塞隊列;
- maximumPoolSize線程池中允許的最大線程數(shù)。如果當前阻塞隊列滿了,且繼續(xù)提交任務,則創(chuàng)建新的線程執(zhí)行任務,前提是當前線程數(shù)小于maximumPoolSize;當阻塞隊列是無界隊列,則maximumPoolSize則不起作用,因為無法提交至核心線程池的線程會一直持續(xù)地放入workQueue.
- keepAliveTime線程空閑時的存活時間,即當線程沒有任務執(zhí)行時,該線程繼續(xù)存活的時間;默認情況下,該參數(shù)只在線程數(shù)大于corePoolSize時才有用,超過這個時間的空閑線程將被終止;
- unitkeepAliveTime的單位
- threadFactory創(chuàng)建線程的工廠,通過自定義的線程工廠可以給每個新建的線程設置一個具有識別度的線程名。默認為DefaultThreadFactory
- handler線程池的飽和策略,當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務,必須采取一種策略處理該任務,線程池提供了4種策略:
- AbortPolicy:直接拋出異常,默認策略;
- CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務;
- DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,并執(zhí)行當前任務;
- DiscardPolicy:直接丟棄任務;
三種類型
newFixedThreadPool
- 線程池里的線程數(shù)量不超過corePoolSize,這導致了maximumPoolSize和keepAliveTime將會是個無用參數(shù)
- 由于使用了無界隊列,所以FixedThreadPool永遠不會拒絕,即飽和策略失效
newSingleThreadExecutor
newCachedThreadPool
- 主線程調(diào)用SynchronousQueue的offer()方法放入task,倘若此時線程池中有空閑的線程嘗試讀取SynchronousQueue的task,即調(diào)用了SynchronousQueue的poll(),那么主線程將該task交給空閑線程.否則執(zhí)行(2)
- 當線程池為空或者沒有空閑的線程,則創(chuàng)建新的線程執(zhí)行任務.
- 執(zhí)行完任務的線程倘若在60s內(nèi)仍空閑,則會被終止.因此長時間空閑的CachedThreadPool不會持有任何線程資源.
關(guān)閉線程池
關(guān)閉方式-shutdown
關(guān)閉方式-shutdownNow
ThreadPoolExecutor源碼詳解
幾個關(guān)鍵屬性
內(nèi)部狀態(tài)
- RUNNING:-1<<COUNT_BITS,即高3位為111,該狀態(tài)的線程池會接收新任務,并處理阻塞隊列中的任務;
- SHUTDOWN:0<<COUNT_BITS,即高3位為000,該狀態(tài)的線程池不會接收新任務,但會處理阻塞隊列中的任務;
- STOP:1<<COUNT_BITS,即高3位為001,該狀態(tài)的線程不會接收新任務,也不會處理阻塞隊列中的任務,而且會中斷正在運行的任務;
- TIDYING:2<<COUNT_BITS,即高3位為010,所有的任務都已經(jīng)終止;
- TERMINATED:3<<COUNT_BITS,即高3位為011,terminated()方法已經(jīng)執(zhí)行完成
任務的執(zhí)行
execute–>addWorker–>runworker(getTask)
execute()方法
ThreadPoolExecutor.execute(task)實現(xiàn)了Executor.execute(task)
- 為什么需要doublecheck線程池的狀態(tài)?
addWorker方法
Worker類的runworker方法
- 繼承了AQS類,可以方便的實現(xiàn)工作線程的中止操作;
- 實現(xiàn)了Runnable接口,可以將自身作為一個任務在工作線程中執(zhí)行;
- 當前提交的任務firstTask作為參數(shù)傳入Worker的構(gòu)造方法;
- 線程啟動之后,通過unlock方法釋放鎖,設置AQS的state為0,表示運行可中斷;
- Worker執(zhí)行firstTask或從workQueue中獲取任務:進行加鎖操作,保證thread不被其他線程中斷(除非線程池被中斷)檢查線程池狀態(tài),倘若線程池處于中斷狀態(tài),當前線程將中斷。執(zhí)行beforeExecute執(zhí)行任務的run方法執(zhí)行afterExecute方法解鎖操作
通過getTask方法從阻塞隊列中獲取等待的任務,如果隊列中沒有任務,getTask方法會被阻塞并掛起,不會占用cpu資源;
注意這里一段代碼是keepAliveTime起作用的關(guān)鍵:
任務的提交
- submit任務,等待線程池execute
- 執(zhí)行FutureTask類的get方法時,會把主線程封裝成WaitNode節(jié)點并保存在waiters鏈表中,并阻塞等待運行結(jié)果;
- FutureTask任務執(zhí)行完成后,通過UNSAFE設置waiters相應的waitNode為null,并通過LockSupport類unpark方法喚醒主線程;
- Callable接口類似于Runnable,只是Runnable沒有返回值。
- Callable任務除了返回正常結(jié)果之外,如果發(fā)生異常,該異常也會被返回,即Future可以拿到異步執(zhí)行任務各種結(jié)果;
- Future.get方法會導致主線程阻塞,直到Callable任務執(zhí)行完成;
submit方法
AbstractExecutorService.submit()實現(xiàn)了ExecutorService.submit()可以獲取執(zhí)行完的返回值,而ThreadPoolExecutor是AbstractExecutorService.submit()的子類,所以submit方法也是ThreadPoolExecutor`的方法。
FutureTask對象
- 內(nèi)部狀態(tài)
- get方法
內(nèi)部通過awaitDone方法對主線程進行阻塞,具體實現(xiàn)如下:
- 判斷FutureTask當前的state,如果大于COMPLETING,說明任務已經(jīng)執(zhí)行完成,則直接返回;
- 如果當前state等于COMPLETING,說明任務已經(jīng)執(zhí)行完,這時主線程只需通過yield方法讓出cpu資源,等待state變成NORMAL;
- 通過WaitNode類封裝當前線程,并通過UNSAFE添加到waiters鏈表;
- 最終通過LockSupport的park或parkNanos掛起線程;
- 通過執(zhí)行Callable任務的call方法;
- 如果call執(zhí)行成功,則通過set方法保存結(jié)果;
- 如果call執(zhí)行有異常,則通過setException保存異常;
任務的關(guān)閉
shutdownNow做的比較絕,它先將線程池狀態(tài)設置為STOP,然后拒絕所有提交的任務。最后中斷左右正在運行中的worker,然后清空任務隊列。
為什么線程池不允許使用Executors去創(chuàng)建?推薦方式是什么?
- newFixedThreadPool和newSingleThreadExecutor:主要問題是堆積的請求處理隊列可能會耗費非常大的內(nèi)存,甚至OOM。
- newCachedThreadPool和newScheduledThreadPool:主要問題是線程數(shù)最大數(shù)是Integer.MAX_VALUE,可能會創(chuàng)建數(shù)量非常多的線程,甚至OOM。
推薦方式1
推薦方式2
推薦方式3
- CPU密集型:盡可能少的線程,Ncpu+1
- IO密集型:盡可能多的線程,Ncpu*2,比如數(shù)據(jù)庫連接池
- 混合型:CPU密集型的任務與IO密集型任務的執(zhí)行時間差別較小,拆分為兩個線程池;否則沒有必要拆分。
監(jiān)控線程池的狀態(tài)
- getTaskCount()Returnstheapproximatetotalnumberoftasksthathaveeverbeenscheduledforexecution.
- getCompletedTaskCount()Returnstheapproximatetotalnumberoftasksthathavecompletedexecution.返回結(jié)果少于getTaskCount()。
- getLargestPoolSize()Returnsthelargestnumberofthreadsthathaveeversimultaneouslybeeninthepool.返回結(jié)果小于等于maximumPoolSize
- getPoolSize()Returnsthecurrentnumberofthreadsinthepool.
- getActiveCount()Returnstheapproximatenumberofthreadsthatareactivelyexecutingtasks.
參考文章
- 《Java并發(fā)編程藝術(shù)》
- https://www.jianshu.com/p/87bff5cc8d8c
- https://blog.csdn.net/programmer_at/article/details/79799267
- https://blog.csdn.net/u013332124/article/details/79587436
- https://www.journaldev.com/1069/threadpoolexecutor-java-thread-pool-example-executorservice
由于問答代碼塊插入受限,部分代碼未完全展示,若有需要可閱讀原文:戳我閱讀原文