在Java并發(fā)編程中?
提示
請帶著這些問題繼續(xù)后文,會很大程度上幫助你更好的理解相關(guān)知識點(diǎn)。@pdai為什么要有線程池?Java是實(shí)現(xiàn)和管理線程池有哪些方式? 請簡單舉例如何使用。為什么很多公司不允許使用Executors去創(chuàng)建線程池? 那么推薦怎么使用呢?ThreadPoolExecutor有哪些核心的配置參數(shù)? 請簡要說明ThreadPoolExecutor可以創(chuàng)建哪是哪三種線程池呢?當(dāng)隊(duì)列滿了并且worker的數(shù)量達(dá)到maxSize的時(shí)候,會怎么樣?說說ThreadPoolExecutor有哪些RejectedExecutionHandler策略? 默認(rèn)是什么策略?簡要說下線程池的任務(wù)執(zhí)行機(jī)制? execute –> addWorker –>runworker (getTask)線程池中任務(wù)是如何提交的?線程池中任務(wù)是如何關(guān)閉的?在配置線程池的時(shí)候需要考慮哪些配置因素?如何監(jiān)控線程池的狀態(tài)?為什么要有線程池線程池能夠?qū)€程進(jìn)行統(tǒng)一分配,調(diào)優(yōu)和監(jiān)控:降低資源消耗(線程無限制地創(chuàng)建,然后使用完畢后銷毀)提高響應(yīng)速度(無須創(chuàng)建線程)提高線程的可管理性ThreadPoolExecutor例子Java是如何實(shí)現(xiàn)和管理線程池的?從JDK 5開始,把工作單元與執(zhí)行機(jī)制分離開來,工作單元包括Runnable和Callable,而執(zhí)行機(jī)制由Executor框架提供。WorkerThreadSimpleThreadPool
程序中我們創(chuàng)建了固定大小為五個(gè)工作線程的線程池。然后分配給線程池十個(gè)工作,因?yàn)榫€程池大小為五,它將啟動五個(gè)工作線程先處理五個(gè)工作,其他的工作則處于等待狀態(tài),一旦有工作完成,空閑下來工作線程就會撿取等待隊(duì)列里的其他工作進(jìn)行執(zhí)行。這里是以上程序的輸出。輸出表明線程池中至始至終只有五個(gè)名為 "pool-1-thread-1" 到 "pool-1-thread-5" 的五個(gè)線程,這五個(gè)線程不隨著工作的完成而消亡,會一直存在,并負(fù)責(zé)執(zhí)行分配給線程池的任務(wù),直到線程池消亡。Executors 類提供了使用了 ThreadPoolExecutor 的簡單的 ExecutorService 實(shí)現(xiàn),但是 ThreadPoolExecutor 提供的功能遠(yuǎn)不止于此。我們可以在創(chuàng)建 ThreadPoolExecutor 實(shí)例時(shí)指定活動線程的數(shù)量,我們也可以限制線程池的大小并且創(chuàng)建我們自己的 RejectedExecutionHandler 實(shí)現(xiàn)來處理不能適應(yīng)工作隊(duì)列的工作。這里是我們自定義的 RejectedExecutionHandler 接口的實(shí)現(xiàn)。RejectedExecutionHandlerImpl.javaThreadPoolExecutor 提供了一些方法,我們可以使用這些方法來查詢 executor 的當(dāng)前狀態(tài),線程池大小,活動線程數(shù)量以及任務(wù)數(shù)量。因此我是用來一個(gè)監(jiān)控線程在特定的時(shí)間間隔內(nèi)打印 executor 信息。MyMonitorThread.java這里是使用 ThreadPoolExecutor 的線程池實(shí)現(xiàn)例子。WorkerPool.java注意在初始化 ThreadPoolExecutor 時(shí),我們保持初始池大小為 2,最大池大小為 4 而工作隊(duì)列大小為 2。因此如果已經(jīng)有四個(gè)正在執(zhí)行的任務(wù)而此時(shí)分配來更多任務(wù)的話,工作隊(duì)列將僅僅保留他們(新任務(wù))中的兩個(gè),其他的將會被RejectedExecutionHandlerImpl 處理。上面程序的輸出可以證實(shí)以上觀點(diǎn)。注意 executor 的活動任務(wù)、完成任務(wù)以及所有完成任務(wù),這些數(shù)量上的變化。我們可以調(diào)用 shutdown() 方法來結(jié)束所有提交的任務(wù)并終止線程池。ThreadPoolExecutor使用詳解其實(shí)java線程池的實(shí)現(xiàn)原理很簡單,說白了就是一個(gè)線程集合workerSet和一個(gè)阻塞隊(duì)列workQueue。當(dāng)用戶向線程池提交一個(gè)任務(wù)(也就是線程)時(shí),線程池會先將任務(wù)放入workQueue中。workerSet中的線程會不斷的從workQueue中獲取線程然后執(zhí)行。當(dāng)workQueue中沒有任務(wù)的時(shí)候,worker就會阻塞,直到隊(duì)列中有任務(wù)了就取出來繼續(xù)執(zhí)行。Execute原理當(dāng)一個(gè)任務(wù)提交至線程池之后:線程池首先當(dāng)前運(yùn)行的線程數(shù)量是否少于corePoolSize。如果是,則創(chuàng)建一個(gè)新的工作線程來執(zhí)行任務(wù)。如果都在執(zhí)行任務(wù),則進(jìn)入2.判斷BlockingQueue是否已經(jīng)滿了,倘若還沒有滿,則將線程放入BlockingQueue。否則進(jìn)入3.如果創(chuàng)建一個(gè)新的工作線程將使當(dāng)前運(yùn)行的線程數(shù)量超過maximumPoolSize,則交給RejectedExecutionHandler來處理任務(wù)。當(dāng)ThreadPoolExecutor創(chuàng)建新線程時(shí),通過CAS來更新線程池的狀態(tài)ctl.參數(shù)corePoolSize 線程池中的核心線程數(shù),當(dāng)提交一個(gè)任務(wù)時(shí),線程池創(chuàng)建一個(gè)新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize, 即使有其他空閑線程能夠執(zhí)行新來的任務(wù), 也會繼續(xù)創(chuàng)建線程;如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊(duì)列中,等待被執(zhí)行;如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有核心線程。workQueue 用來保存等待被執(zhí)行的任務(wù)的阻塞隊(duì)列. 在JDK中提供了如下阻塞隊(duì)列: 具體可以參考JUC 集合: BlockQueue詳解ArrayBlockingQueue: 基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,按FIFO排序任務(wù);LinkedBlockingQueue: 基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQueue;SynchronousQueue: 一個(gè)不存儲元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue;PriorityBlockingQueue: 具有優(yōu)先級的無界阻塞隊(duì)列;LinkedBlockingQueue比ArrayBlockingQueue在插入刪除節(jié)點(diǎn)性能方面更優(yōu),但是二者在put(), take()任務(wù)的時(shí)均需要加鎖,SynchronousQueue使用無鎖算法,根據(jù)節(jié)點(diǎn)的狀態(tài)判斷執(zhí)行,而不需要用到鎖,其核心是Transfer.transfer().maximumPoolSize 線程池中允許的最大線程數(shù)。如果當(dāng)前阻塞隊(duì)列滿了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù),前提是當(dāng)前線程數(shù)小于maximumPoolSize;當(dāng)阻塞隊(duì)列是無界隊(duì)列, 則maximumPoolSize則不起作用, 因?yàn)闊o法提交至核心線程池的線程會一直持續(xù)地放入workQueue.keepAliveTime 線程空閑時(shí)的存活時(shí)間,即當(dāng)線程沒有任務(wù)執(zhí)行時(shí),該線程繼續(xù)存活的時(shí)間;默認(rèn)情況下,該參數(shù)只在線程數(shù)大于corePoolSize時(shí)才有用, 超過這個(gè)時(shí)間的空閑線程將被終止;unit keepAliveTime的單位threadFactory 創(chuàng)建線程的工廠,通過自定義的線程工廠可以給每個(gè)新建的線程設(shè)置一個(gè)具有識別度的線程名。默認(rèn)為DefaultThreadFactoryhandler 線程池的飽和策略,當(dāng)阻塞隊(duì)列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務(wù),必須采取一種策略處理該任務(wù),線程池提供了4種策略:AbortPolicy: 直接拋出異常,默認(rèn)策略;CallerRunsPolicy: 用調(diào)用者所在的線程來執(zhí)行任務(wù);DiscardOldestPolicy: 丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);DiscardPolicy: 直接丟棄任務(wù);當(dāng)然也可以根據(jù)應(yīng)用場景實(shí)現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務(wù)。三種類型newFixedThreadPool線程池的線程數(shù)量達(dá)corePoolSize后,即使線程池沒有可執(zhí)行任務(wù)時(shí),也不會釋放線程。FixedThreadPool的工作隊(duì)列為無界隊(duì)列LinkedBlockingQueue(隊(duì)列容量為Integer.MAX_VALUE), 這會導(dǎo)致以下問題:線程池里的線程數(shù)量不超過corePoolSize,這導(dǎo)致了maximumPoolSize和keepAliveTime將會是個(gè)無用參數(shù)由于使用了無界隊(duì)列, 所以FixedThreadPool永遠(yuǎn)不會拒絕, 即飽和策略失效newSingleThreadExecutor初始化的線程池中只有一個(gè)線程,如果該線程異常結(jié)束,會重新創(chuàng)建一個(gè)新的線程繼續(xù)執(zhí)行任務(wù),唯一的線程可以保證所提交任務(wù)的順序執(zhí)行.由于使用了無界隊(duì)列, 所以SingleThreadPool永遠(yuǎn)不會拒絕, 即飽和策略失效newCachedThreadPool線程池的線程數(shù)可達(dá)到Integer.MAX_VALUE,即2147483647,內(nèi)部使用SynchronousQueue作為阻塞隊(duì)列; 和newFixedThreadPool創(chuàng)建的線程池不同,newCachedThreadPool在沒有任務(wù)執(zhí)行時(shí),當(dāng)線程的空閑時(shí)間超過keepAliveTime,會自動釋放線程資源,當(dāng)提交新任務(wù)時(shí),如果沒有空閑線程,則創(chuàng)建新線程執(zhí)行任務(wù),會導(dǎo)致一定的系統(tǒng)開銷; 執(zhí)行過程與前兩種稍微不同:主線程調(diào)用SynchronousQueue的offer()方法放入task, 倘若此時(shí)線程池中有空閑的線程嘗試讀取 SynchronousQueue的task, 即調(diào)用了SynchronousQueue的poll(), 那么主線程將該task交給空閑線程. 否則執(zhí)行(2)當(dāng)線程池為空或者沒有空閑的線程, 則創(chuàng)建新的線程執(zhí)行任務(wù).執(zhí)行完任務(wù)的線程倘若在60s內(nèi)仍空閑, 則會被終止. 因此長時(shí)間空閑的CachedThreadPool不會持有任何線程資源.關(guān)閉線程池遍歷線程池中的所有線程,然后逐個(gè)調(diào)用線程的interrupt方法來中斷線程.關(guān)閉方式 - shutdown將線程池里的線程狀態(tài)設(shè)置成SHUTDOWN狀態(tài), 然后中斷所有沒有正在執(zhí)行任務(wù)的線程.關(guān)閉方式 - shutdownNow將線程池里的線程狀態(tài)設(shè)置成STOP狀態(tài), 然后停止所有正在執(zhí)行或暫停任務(wù)的線程. 只要調(diào)用這兩個(gè)關(guān)閉方法中的任意一個(gè), isShutDown() 返回true. 當(dāng)所有任務(wù)都成功關(guān)閉了, isTerminated()返回true.ThreadPoolExecutor源碼詳解幾個(gè)關(guān)鍵屬性內(nèi)部狀態(tài)其中AtomicInteger變量ctl的功能非常強(qiáng)大: 利用低29位表示線程池中線程數(shù),通過高3位表示線程池的運(yùn)行狀態(tài):RUNNING: -1 << COUNT_BITS,即高3位為111,該狀態(tài)的線程池會接收新任務(wù),并處理阻塞隊(duì)列中的任務(wù);SHUTDOWN: 0 << COUNT_BITS,即高3位為000,該狀態(tài)的線程池不會接收新任務(wù),但會處理阻塞隊(duì)列中的任務(wù);STOP : 1 << COUNT_BITS,即高3位為001,該狀態(tài)的線程不會接收新任務(wù),也不會處理阻塞隊(duì)列中的任務(wù),而且會中斷正在運(yùn)行的任務(wù);TIDYING : 2 << COUNT_BITS,即高3位為010, 所有的任務(wù)都已經(jīng)終止;TERMINATED: 3 << COUNT_BITS,即高3位為011, terminated()方法已經(jīng)執(zhí)行完成任務(wù)的執(zhí)行execute –> addWorker –>runworker (getTask)線程池的工作線程通過Woker類實(shí)現(xiàn),在ReentrantLock鎖的保證下,把Woker實(shí)例插入到HashSet后,并啟動Woker中的線程。 從Woker類的構(gòu)造方法實(shí)現(xiàn)可以發(fā)現(xiàn): 線程工廠在創(chuàng)建線程thread時(shí),將Woker實(shí)例本身this作為參數(shù)傳入,當(dāng)執(zhí)行start方法啟動線程thread時(shí),本質(zhì)是執(zhí)行了Worker的runWorker方法。 firstTask執(zhí)行完成之后,通過getTask方法從阻塞隊(duì)列中獲取等待的任務(wù),如果隊(duì)列中沒有任務(wù),getTask方法會被阻塞并掛起,不會占用cpu資源;execute()方法ThreadPoolExecutor.execute(task)實(shí)現(xiàn)了Executor.execute(task)為什么需要double check線程池的狀態(tài)?在多線程環(huán)境下,線程池的狀態(tài)時(shí)刻在變化,而ctl.get()是非原子操作,很有可能剛獲取了線程池狀態(tài)后線程池狀態(tài)就改變了。判斷是否將command加入workque是線程池之前的狀態(tài)。倘若沒有double check,萬一線程池處于非running狀態(tài)(在多線程環(huán)境下很有可能發(fā)生),那么command永遠(yuǎn)不會執(zhí)行。addWorker方法從方法execute的實(shí)現(xiàn)可以看出: addWorker主要負(fù)責(zé)創(chuàng)建新的線程并執(zhí)行任務(wù) 線程池創(chuàng)建新線程執(zhí)行任務(wù)時(shí),需要 獲取全局鎖:Worker類的runworker方法繼承了AQS類,可以方便的實(shí)現(xiàn)工作線程的中止操作;實(shí)現(xiàn)了Runnable接口,可以將自身作為一個(gè)任務(wù)在工作線程中執(zhí)行;當(dāng)前提交的任務(wù)firstTask作為參數(shù)傳入Worker的構(gòu)造方法;一些屬性還有構(gòu)造方法:runWorker方法是線程池的核心:線程啟動之后,通過unlock方法釋放鎖,設(shè)置AQS的state為0,表示運(yùn)行可中斷;Worker執(zhí)行firstTask或從workQueue中獲取任務(wù):進(jìn)行加鎖操作,保證thread不被其他線程中斷(除非線程池被中斷)檢查線程池狀態(tài),倘若線程池處于中斷狀態(tài),當(dāng)前線程將中斷。執(zhí)行beforeExecute執(zhí)行任務(wù)的run方法執(zhí)行afterExecute方法解鎖操作通過getTask方法從阻塞隊(duì)列中獲取等待的任務(wù),如果隊(duì)列中沒有任務(wù),getTask方法會被阻塞并掛起,不會占用cpu資源;getTask方法下面來看一下getTask()方法,這里面涉及到keepAliveTime的使用,從這個(gè)方法我們可以看出線程池是怎么讓超過corePoolSize的那部分worker銷毀的。注意這里一段代碼是keepAliveTime起作用的關(guān)鍵:
allowCoreThreadTimeOut為false,線程即使空閑也不會被銷毀;倘若為ture,在keepAliveTime內(nèi)仍空閑則會被銷毀。如果線程允許空閑等待而不被銷毀timed == false,workQueue.take任務(wù): 如果阻塞隊(duì)列為空,當(dāng)前線程會被掛起等待;當(dāng)隊(duì)列中有任務(wù)加入時(shí),線程被喚醒,take方法返回任務(wù),并執(zhí)行;如果線程不允許無休止空閑timed == true, workQueue.poll任務(wù): 如果在keepAliveTime時(shí)間內(nèi),阻塞隊(duì)列還是沒有任務(wù),則返回null;任務(wù)的提交submit任務(wù),等待線程池execute執(zhí)行FutureTask類的get方法時(shí),會把主線程封裝成WaitNode節(jié)點(diǎn)并保存在waiters鏈表中, 并阻塞等待運(yùn)行結(jié)果;FutureTask任務(wù)執(zhí)行完成后,通過UNSAFE設(shè)置waiters相應(yīng)的waitNode為null,并通過LockSupport類unpark方法喚醒主線程;在實(shí)際業(yè)務(wù)場景中,F(xiàn)uture和Callable基本是成對出現(xiàn)的,Callable負(fù)責(zé)產(chǎn)生結(jié)果,F(xiàn)uture負(fù)責(zé)獲取結(jié)果。Callable接口類似于Runnable,只是Runnable沒有返回值。Callable任務(wù)除了返回正常結(jié)果之外,如果發(fā)生異常,該異常也會被返回,即Future可以拿到異步執(zhí)行任務(wù)各種結(jié)果;Future.get方法會導(dǎo)致主線程阻塞,直到Callable任務(wù)執(zhí)行完成;submit方法AbstractExecutorService.submit()實(shí)現(xiàn)了ExecutorService.submit() 可以獲取執(zhí)行完的返回值, 而ThreadPoolExecutor 是AbstractExecutorService.submit()的子類,所以submit方法也是ThreadPoolExecutor`的方法。通過submit方法提交的Callable任務(wù)會被封裝成了一個(gè)FutureTask對象。通過Executor.execute方法提交FutureTask到線程池中等待被執(zhí)行,最終執(zhí)行的是FutureTask的run方法;FutureTask對象public class FutureTask<V> implements RunnableFuture<V> 可以將FutureTask提交至線程池中等待被執(zhí)行(通過FutureTask的run方法來執(zhí)行)內(nèi)部狀態(tài)內(nèi)部狀態(tài)的修改通過sun.misc.Unsafe修改get方法內(nèi)部通過awaitDone方法對主線程進(jìn)行阻塞,具體實(shí)現(xiàn)如下:
如果主線程被中斷,則拋出中斷異常;判斷FutureTask當(dāng)前的state,如果大于COMPLETING,說明任務(wù)已經(jīng)執(zhí)行完成,則直接返回;如果當(dāng)前state等于COMPLETING,說明任務(wù)已經(jīng)執(zhí)行完,這時(shí)主線程只需通過yield方法讓出cpu資源,等待state變成NORMAL;通過WaitNode類封裝當(dāng)前線程,并通過UNSAFE添加到waiters鏈表;最終通過LockSupport的park或parkNanos掛起線程;run方法FutureTask.run方法是在線程池中被執(zhí)行的,而非主線程通過執(zhí)行Callable任務(wù)的call方法;如果call執(zhí)行成功,則通過set方法保存結(jié)果;如果call執(zhí)行有異常,則通過setException保存異常;任務(wù)的關(guān)閉shutdown方法會將線程池的狀態(tài)設(shè)置為SHUTDOWN,線程池進(jìn)入這個(gè)狀態(tài)后,就拒絕再接受任務(wù),然后會將剩余的任務(wù)全部執(zhí)行完shutdownNow做的比較絕,它先將線程池狀態(tài)設(shè)置為STOP,然后拒絕所有提交的任務(wù)。最后中斷左右正在運(yùn)行中的worker,然后清空任務(wù)隊(duì)列。
更深入理解為什么線程池不允許使用Executors去創(chuàng)建? 推薦方式是什么?線程池不允許使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)。 說明:Executors各個(gè)方法的弊端:newFixedThreadPool和newSingleThreadExecutor: ??主要問題是堆積的請求處理隊(duì)列可能會耗費(fèi)非常大的內(nèi)存,甚至OOM。newCachedThreadPool和newScheduledThreadPool: ??主要問題是線程數(shù)最大數(shù)是Integer.MAX_VALUE,可能會創(chuàng)建數(shù)量非常多的線程,甚至OOM。推薦方式 1首先引入:commons-lang3包推薦方式 2首先引入:com.google.guava包推薦方式 3spring配置線程池方式:自定義線程工廠bean需要實(shí)現(xiàn)ThreadFactory,可參考該接口的其它默認(rèn)實(shí)現(xiàn)類,使用方式直接注入bean調(diào)用execute(Runnable task)方法即可配置線程池需要考慮因素從任務(wù)的優(yōu)先級,任務(wù)的執(zhí)行時(shí)間長短,任務(wù)的性質(zhì)(CPU密集/ IO密集),任務(wù)的依賴關(guān)系這四個(gè)角度來分析。并且近可能地使用有界的工作隊(duì)列。性質(zhì)不同的任務(wù)可用使用不同規(guī)模的線程池分開處理:CPU密集型: 盡可能少的線程,Ncpu+1IO密集型: 盡可能多的線程, Ncpu*2,比如數(shù)據(jù)庫連接池混合型: CPU密集型的任務(wù)與IO密集型任務(wù)的執(zhí)行時(shí)間差別較小,拆分為兩個(gè)線程池;否則沒有必要拆分。監(jiān)控線程池的狀態(tài)可以使用ThreadPoolExecutor以下方法:getTaskCount() Returns the approximate total number of tasks that have ever been scheduled for execution.getCompletedTaskCount() Returns the approximate total number of tasks that have completed execution. 返回結(jié)果少于getTaskCount()。getLargestPoolSize() Returns the largest number of threads that have ever simultaneously been in the pool. 返回結(jié)果小于等于maximumPoolSizegetPoolSize() Returns the current number of threads in the pool.getActiveCount() Returns the approximate number of threads that are actively executing tasks.參考文章《Java并發(fā)編程藝術(shù)》https://www.jianshu.com/p/87bff5cc8d8chttps://blog.csdn.net/programmer_at/article/details/79799267https://blog.csdn.net/u013332124/article/details/79587436https://www.journaldev.com/1069/threadpoolexecutor-java-thread-pool-example-executorservice由于問答代碼塊插入受限,部分代碼未完全展示,若有需要可閱讀原文:戳我閱讀原文