專用通道>

您當前所在位置: 首頁 > 行業新聞 > IT技術討論 >

IT技術討論

JAVA 線程池

發布者:亚游成都錦江點擊: 分享到
一、ThreadPoolExecutor的使用 1.ThreadPoolExecutor的完整構造方法是: ThreadPoolExecutor ( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue Runnable workQueue, RejectedExecutionHandler hand

一、ThreadPoolExecutor的使用

1.ThreadPoolExecutor的完整構造方法是:

ThreadPoolExecutor ( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue< Runnable >  workQueue, RejectedExecutionHandler hander)
申請免費試學】【在線報名在線谘詢

2.Executors中的幾個工廠方法

ThreadPoolExecutor是Executors類的底層實現

①newFixedThreadPool ( int nThreads)——固定大小線程池

[java]

1
2
3
4
5
public static ExecutorService newFixedThreadPool(intnThreads) { 
        return new ThreadPoolExecutor(nThreads, nThreads, 
                                      0L, TimeUnit.MILLISECONDS, 
                                      newLinkedBlockingQueue<Runnable>()); 
    }

可以看到corePoolSize和maximunPoolSize的大小是一樣的(其實使用無界Queue的話maximumPoolSize參數是沒有意義的),keepAliveTime和unit的設置表明該實現不想keepAlive,最後的BlockingQueue選擇了LinkedBlockingQueue,該Queue有一個特點就是它是無界的;
申請免費試學】【在線報名在線谘詢

②newSingleThreadExecutor()——單線程

[java]

1
2
3
4
5
public static ExecutorService newSingleThreadExecutor() { 
        return new ThreadPoolExecutor(11
                                      0L, TimeUnit.MILLISECONDS, 
                                      newLinkedBlockingQueue<Runnable>()); 
    }

申請免費試學】【在線報名在線谘詢

③newCachedThreadPool()——無界線程池,可以進行自動線程回收

[java]

 

1
2
3
4
5
public static ExecutorService newCachedThreadPool() { 
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 
                                      60L, TimeUnit.SECONDS, 
                                      newSynchronousQueue<Runnable>()); 
    }

在BlockingQueue的選擇上使用SynchronousQueue,在該Queue中,每個插入操作必須等待另一個線程的對應移除操作。比如,我先添加一個元素,接下來如果想繼續添加則會阻塞,直到另一個線程取走一個元素,反之亦然(也就是緩衝區為1的生產者消費者模式)。
申請免費試學】【在線報名在線谘詢

3.corePoolSize、workQueue、maximumPoolSize的優先級

當一個任務通過execute(Runnable)方法欲添加到線程池時: 
如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。

如果此時線程池中的數量等於 corePoolSize,但是緩衝隊列 workQueue未滿,那麽任務被放入緩衝隊列。

如果此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。

如果此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那麽通過 handler所指定的策略來處理此任務。

也就是:處理任務的優先級為:

核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。

當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。
申請免費試學】【在線報名在線谘詢

二、BlockingQueue<Runnable> workQueue

1.BlockingQueue的作用

所有的BlockingQueue都可以用於傳輸和保持提交的任務,可以使用此隊列對池大小進行交互。

如果運行的線程小於corePoolSize,則Executor始終首選添加新的線程,而不進行排隊,也就是當當前運行的線程小於corePoolSize時,則任務根本不會存放,添加到Queue中,而是直接創建一個新的thread;

如果運行的線程等於或多餘corePoolSize,則Executor首選將請求加入Queue,而不是添加新的thread;

如果無法將請求加入Queue,則創建新線程,除非創建此線程超出maximumPoolSize,在這種情況下,任務將被拒絕。
申請免費試學】【在線報名在線谘詢

2.排隊的三種策略

①直接提交,工作隊列的默認選項是SynchronousQueue,它將任務交給線程而不保持它們,如果不存在可用於立即執行的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界maximumPoolSize以避免拒絕新提交的任務,當命令以超過隊列所能處理的平均連續到達時,此策略允許無界線程具有增長的可能性。

②無界隊列,使用無界隊列如LinkedBlockingQueue將導致在所有corePoolSize線程都忙時新任務在隊列中等待,這樣創建的線程數就不會超過corePoolSize,因此maximumPoolSize的值也就無效了。當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列。

③有界隊列,當使用有限的maximumPoolSize時,有界隊列有助於防止資源耗盡,但是可能較難調整和控製。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作係統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則係統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。  
申請免費試學】【在線報名在線谘詢

3.幾個例子

①直接提交策略SynchronousQueue,由於該Queue本身的特性,在某次添加元素後必須等待其他線程取走後才能繼續添加。

[java]

1
2
new ThreadPoolExecutor(2330, TimeUnit.SECONDS,  
            new SynchronousQueue<Runnable>());

   當核心線程已經有2個正在運行.

--此時繼續來了一個任務(A),根據前麵介紹的“如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。”,所以A被添加到queue中。

--又來了一個任務(B),且核心2個線程還沒有忙完,OK,接下來首先嚐試1中描述,但是由於使用的SynchronousQueue,所以一定無法加入進去。

--此時便滿足了上麵提到的“如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出maximumPoolSize,在這種情況下,任務將被拒絕。”,所以必然會新建一個線程來運行這個任務。

--暫時還可以,但是如果這三個任務都還沒完成,連續來了兩個任務,第一個添加入queue中,後一個呢?queue中無法插入,而線程數達到了maximumPoolSize,所以隻好執行異常策略了。

所以在使用SynchronousQueue通常要求maximumPoolSize是無界的,這樣就可以避免上述情況發生(如果希望限製就直接使用有界隊列)。對於使用SynchronousQueue的作用jdk中寫的很清楚:此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。

什麽意思?如果你的任務A1,A2有內部關聯,A1需要先運行,那麽先提交A1,再提交A2,當使用SynchronousQueue亚游可以保證,A1必定先被執行,在A1麽有被執行前,A2不可能添加入queue中。

②使用無界隊列策略,即LinkedBlockingQueue

拿newFixedThreadPool來說,如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。

--如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。

--如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。

--OK,此時任務變加入隊列之中了,那什麽時候才會添加新線程呢?如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。

這裏就很有意思了,可能會出現無法加入隊列嗎?不像SynchronousQueue那樣有其自身的特點,對於無界隊列來說,總是可以加入的(資源耗盡,當然另當別論)。換句說,永遠也不會觸發產生新的線程!corePoolSize大小的線程數會一直運行,忙完當前的,就從隊列中拿任務開始運行。所以要防止任務瘋長,比如任務運行的實行比較長,而添加任務的速度遠遠超過處理任務的時間,而且還不斷增加,如果任務內存大一些,不一會兒就爆了,嗬嗬。

③有界隊列,即ArrayBlockingQueue

[java]

1
2
new ThreadPoolExecutor( 
             2430, TimeUnit.SECONDS, newArrayBlockingQueue<Runnable>(2));

假設,所有的任務都永遠無法執行完。

對於首先來的A,B來說直接運行,接下來,如果來了C,D,他們會被放到queu中,如果接下來再來E,F,則增加線程運行E,F。但是如果再來任務,隊列無法再接受了,線程數也到達最大的限製了,所以就會使用拒絕策略來處理。
申請免費試學】【在線報名在線谘詢

三、用途和用法

為什麽要用線程池:

減少了創建和銷毀線程的次數,每個工作線程都可以被重複利用,可執行多個任務;
可以根據係統的承受能力,調整線程池中工作線線程的數目,防止因為因為消耗過多的內存,而把服務器累趴下(每個線程需要大約1MB內存,線程開的越多,消耗的內存也就越大,最後死機)

網絡請求通常有兩種形式:第一種,請求不是很頻繁,而且每次連接後會保持相當一段時間來讀數據或者寫數據,最後斷開,如文件下載,網絡流媒體等。另一種形式是請求頻繁,但是連接上以後讀/寫很少量的數據就斷開連接。考慮到服務的並發問題,如果每個請求來到以後服務都為它啟動一個線程,那麽這對服務的資源可能會造成很大的浪費,特別是第二種情況。因為通常情況下,創建線程是需要一定的耗時的,設這個時間為T1,而連接後讀/寫服務的時間為T2,當T1>>T2時,亚游就應當考慮一種策略或者機製來控製,使得服務對於第二種請求方式也能在較低的功耗下完成。

通常,亚游可以用線程池來解決這個問題,首先,在服務啟動的時候,亚游可以啟動好幾個線程,並用一個容器(如線程池)來管理這些線程。當請求到來時,可以從池中去一個線程出來,執行任務(通常是對請求的響應),當任務結束後,再將這個線程放入池中備用;如果請求到來而池中沒有空閑的線程,該請求需要排隊等候。最後,當服務關閉時銷毀該池即可。
申請免費試學】【在線報名在線谘詢

 



亚游谘詢老師

成都優越教育谘詢有限公司(亚游成都錦江校區介紹)

成都優越教育谘詢有限公司(亚游成都錦江校區)是亚游總部在成都設立的一家示範校區。涵蓋ACCP、Java、.Net、網絡營銷、市場營銷,遊戲開發等多專業校區,承擔教學、就業示範、以及教師培養輸送等職責。
亚游作為北京大學下屬的專業計算機學校,以北京大學強大師資作為依托,連續13年被評為“中國IT教育第一品牌”,累計培養60+萬優秀軟件工程師,是名符其實的軟件工程師的搖籃……請認準品牌名校——亚游成都錦江校區,地址:成都市春熙路北口東行500米(大慈寺22號)。