中央地方攜手合作,全面啟動臺灣智慧水聯網

環保署8月17日在新竹市舉辦「2020臺灣智慧水聯網」啟動發表會,正式發表5種水質感測元件及固定式、移動式、手持式等3款水質感測器,同時發給13個縣市政府400臺感測器,合辦建構我國智慧水聯網。智慧水聯網為打破傳統人工採樣限制,利用水質感測器每分鐘感測河川水體的酸鹼度、電導度、溫度及溶氧,結合地理資訊系統定位,以AI人工智慧即時監控水質在時間及空間上的變化,達到環境智慧執法、推動公民環境教育以及拓展相關產業跨域應用等目標。

智慧水聯網過去4個月試驗應用期間,於3縣市共查獲17件異常案例,有2件地檢署偵辦中,持續追繳罰鍰,在水質感測器輔助下,打擊不肖廠商效果顯著,讓污染無所遁形。環保署為擴大科技執法應用層面,已與全臺13個縣市合作,將布設在陳情或列管工廠集中地區,24小時水質監控不間斷,並將針對偵測的污染,即時智慧派遣查處污染。

環保署表示,除了智慧稽查外,5G時代來臨,環保署所研發之手持式水質感測器具備成本低、使用簡單、結合個人行動裝置等優點,學術單位可利用感測器推動環境教育、探索水體生態等課程,也成為河川巡守隊巡查新利器,鼓勵隊員巡檢並參與污染通報、髒亂清理等工作,讓政府民眾齊心共創護水新生活。

環保署也持續活化產業跨域應用,利用新型感測技術推動各類用水水質管理,促進產業交流與技術媒合,提升產業數位轉型量能。未來環保署將以「優化環境治理、驅動民間感測、開展產業應用」為目標,達到「感測,無水不在;應用,無限可能」數位時代願景!

新聞來源:https://enews.epa.gov.tw/Page/3B3C62C78849F32F/2042f3c7-e254-47af-aeca-3a6999f0178c

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司“嚨底家”!

※幫你省時又省力,新北清潔一流服務好口碑

※別再煩惱如何寫文案,掌握八大原則!

特斯拉代號 D 新車出爐 搭半自動駕駛系統

  美國電動車厰特斯拉(Tesla)執行長馬斯克(Elon Musk)發表代號「D」的新一代豪華電動車 Model S,搭載雙電動馬達,由前一代的後輪驅動,升級為全時四輪傳動(AWD),起步加速到 100 公里只需 4 秒,充飽電可行駛 442.57 公里,採用鋰電池提供馬力,讓充電更快速方便,預計 12 月開始出貨。   馬斯克在美國時間 10 月 9 日發表先前他在推特預告的代號 D 車款,其實就是現有車款 Model S 車款的升級版,D 指的就是雙馬達(Dual Motors)。馬斯克表示,雙馬達 AWD 的設計,可提升行路能力,應付歐美惡劣天候,並提升效率、增強動力及加速性。   此外,新車款同時搭載全新自動駕駛(autopilot)軟體,配備包括長距離雷達、影像辨識鏡頭及 360 度的超音波聲納,雖無法完全自動駕駛,但可以判讀交通號誌與辨別行人,具備路邊停車輔助功能。且若停在自宅門口,車主可以召喚愛車,車內的車用電腦還能連結至車主行事曆,需要出門時汽車就能準備就緒。   (Source:)

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

約定民生銀行 特斯拉將在中國20城市建400個充電樁

特斯拉與民生銀行達成合作,雙方約定將共同在中國20個城市的民生銀行自有營業廳及社區金融門店建至少400個充電樁。截至8月,特斯拉在中國建成的目的地充電裝置已超過370個。在此之前,特斯拉用了一年時間才在中國建了16個超級充電站。   此前,特斯拉已經與銀泰、SOHO和中國聯通簽署過類似的合作協定。其中,特斯拉與地產商銀泰集團宣佈合作啟動“目的地充電”項目,充電1個小時可行駛40公里;還計畫年內依託聯通營業廳在全國120個城市共同建設400個目的地充電站和20個城市超級充電站。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※幫你省時又省力,新北清潔一流服務好口碑

※別再煩惱如何寫文案,掌握八大原則!

電動機車 E-bike 上路 試營運前 30 分鐘免費

  除了有腳踏車 U-bike 可租借,現在連電動機車都能租借得到了。台灣城市動力公司開發自動化公共 E-bike 租借系統,試營運期間提供前 30 分鐘免費優惠,民眾前往租借還可參加多項大獎的抽獎活動。   台灣城市動力公司建構的E-bike租借系統,上周率先在新北市板橋區縣民大道與漢生東路交口的車站停車廣場供國人自由租借,民眾透過悠遊卡註冊後就可選車、取車、還車、付費,方便性與租借U-bike無異。開放試租首日在一個站點就吸引數百民眾註冊借車,該公司相信未來E-bike站點持續擴增後,勢必讓政府推動的機車電動化進度呈跳躍式進展。   台灣城市動力公司總經理洪國修表示,環保減碳是政府重要的施政項目,為提高民眾騎乘電動二輪車之意願,環保署除提供購買電動車 2 萬 4 千元的補助,台灣城市動力公司也配合這項政策,從便民方向建置全球首創的電池交換站系統(BES),民眾在 30 秒即可完成電池更換,免除電池充電過久或保固維護的不便。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

其他動力電池車望塵莫及 鋁氣電池續航達 1,600 公里

不讓美國特斯拉電動車專美於前,美鋁公司(Alcoa)宣稱已完成由以色列飛能(Phinergy)研發成功的鋁氣電池(Al–air batteries),並且達成航距超過 1,600 公里的測試。這項研發讓相繼投入電動車領域的德國寶馬、日本豐田等相形失色。   美鋁和以色列飛能最近在美國蒙特利爾維倫紐夫賽車場,測試了以鋁空氣電池為能源的車輛,驚人的是續航里程竟上升到 994 英里。在金屬空氣電池中,鋁空氣電池就是其中之一,如今這 1,600 千公里的續航力已遠超過包括特斯拉 Model S 在內等現有各類動力電池車。其他插電式混合動力車、增程式電動車(續航在 500 公里左右)或者豐天、現代所謂的氫燃料電池車(續航超過 500-600 公里)等,也同樣望塵莫及。   飛能公司說,這一組電池重僅約 100 公斤,裝有 50 塊電池板,平均 1 塊鋁空氣電池板就可驅動車輛行駛約 32 公里。但鋁空氣電池的放電過程會導致陽極腐蝕產生氫,導致陽極材料過度消耗,並增加電池內部損耗,使其商業化進程放緩。   然而,鋁空氣電池維護非常方便。按現有技術方案,鋁空氣電池是作為鋰電池的補充電源,鋰電池能量耗盡後,鋁空氣電池才接手,所以使用者無需進行充電,每 1-2 個月注入自來水維持其化學反應,每年也只需讓技術人員進行一次保養即可,電池壽命甚至可達 20 至 30 年。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

東元強攻菲電動車市場 擬明年第二季投產

東元海外新布局持續有所斬獲,公司年初獲菲律賓馬尼拉最大車隊千台電動吉普巴士訂單,首批 50 輛將於年底前陸續出貨。東元表示,菲律賓當地擁有銀行的前三大機車經銷商今年正式向公司提出合作,並期盼公司能在當地設廠就近供應,而內部評估對方所提之方案可行性很高,預期最快 2015 年第二季就會在當地投產,未來該廠將同時生產特種電動三輪車、電動吉普巴士等。   東元集團看好電動車市場大有可為,但無意與美國 Tesla、德國 BMW、日本豐田、本田及日產等國際大廠競逐純電動車市場商機,利用本身擅長的馬達及電控系統強項發展特種電動車,作為進軍全球電動車市場的試金石,目前拿下雲林西螺果菜市場 8 百輛電動搬運車,約 2 億元標案。   此外,東元在土耳其市場也有所進展;公司指出,預期土耳其市場 的 2014 年可帶進營收貢獻約 9 千萬元,2015 年布局效益將更進一步顯現,並以挑戰倍增為銷售目標;同時,土耳其亦可作為業務拓展的跳板,有助於進一步把旗下機電、馬達、電控與商用空調等產品,銷售至歐洲、中東與非洲等市場。  

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

日 TDK 開發出電動車無線充電技術

日本 TDK 開發出了用於電動車(EV)及插電式混合動力車(PHV)的非接觸供電系統。當電動車或插電式混合動力車停在使用了該系統的停車場時,無需使用有線電纜即可充電。若在公路上大規模鋪設該系統,甚至可能讓電動車等邊行駛邊充電,預計在 2018 年上路應用。   TDK 今年 4 月與美國麻州的創業企業 WiTricity 合作,獲得有關電動汽車無線供電的技術經驗。基於 WiTricity 的技術,並利用 TDK 擅長的磁線圈技術成功開發了無線供充電系統。系統由無線供電用送電線圈和受電線圈組成,線圈之間的距離即使超過 10 厘米也能送電。TDK 計劃從 2015 年上半年開始向汽車廠商等提供樣品。   此外,TDK 還針對電動車的行駛途中供電,開始試驗。在周長為 30 米的試驗場地路面下方,每隔 5 米鋪設 6 個送電線圈,讓試製車在上方行駛。據稱,目前已能以 5 公里時速行駛 120 公里。  

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

小師妹學JVM之:java的字節碼byte code簡介

目錄

  • 簡介
  • Byte Code的作用
  • 查看Byte Code字節碼
  • java Byte Code是怎麼工作的
  • 總結

簡介

Byte Code也叫做字節碼,是連接java源代碼和JVM的橋樑,源代碼編譯成為字節碼,而字節碼又被加載進JVM中運行。字節碼怎麼生成,怎麼查看字節碼,隱藏在Byte Code背後的秘密是什麼呢?快跟小師妹一起來看看吧。

Byte Code的作用

小師妹:F師兄,為什麼Java需要字節碼呢?直接編譯成為機器碼不是更快嗎?

小師妹,Java的設計初衷是一次編寫,到處運行。為了兼容各個平台的運行環境,java特別為各種平台設計了JVM。

我們可以把JVM看做是一種抽象,對外提供了統一的接口。這樣我們只需要編寫符合JVM規範的代碼,即可在JVM中運行。

回想下之前我們提到過的java的執行過程:

  1. 編寫java代碼文件比如Example.java
  2. 使用java編譯器javac將源文件編譯成為Example.class文件
  3. JVM加載生成的字節碼文件,將其轉換成為機器可以識別的native machine code執行

小師妹:F師兄,我有一個大膽的想法,JVM的作用是將字節碼解釋或者編譯成為機器碼。然後在相應的運行環境中執行。那麼有沒有可能,不需要JVM,不需要機器碼,而是直接在對應的平台上執行字節碼呢?

愛因斯坦說過沒有想像力的靈魂,就像沒有望遠鏡的天文台。小師妹你這個想法很好,這種實現有個專業的說法叫做:Java processor。

Java processor就是用硬件來實現的JVM。因此字節碼可以直接在Java processor中運行。

其中比較出名的是Jazelle DBX,這是一個主要支持J2ME環境的硬件架構。為了提升java在手機端的執行速度。

但是這樣做其實也是有缺點的,後面我們會講到,java字節碼中的指令非常非常多。所以如果用硬件來實現的話,就會非常非常複雜。

一般來說Java processor不會實現全部的字節碼中的功能,只會提供部分的實現。

查看Byte Code字節碼

小師妹:F師兄,那使用javac編譯過後的class文件跟字節碼有什麼關係呢?

class文件中大部分都是byte code,其他的部分是一些meta data元數據信息。這些組合在一起就是class文件了。

小師妹:F師兄,你說class文件是byte code,為什麼我在IDE中打開的時候,直接显示的是反編譯出來的源文件呢?

小師妹,這是IDE的一個便利功能。因為大多數情況下,沒有人想去看class文件的Byte code的,大家都是想去看看這個class文件的源文件是什麼樣的。

我們舉個最簡單的例子:

這個類中,我們定義了一個很簡單的testByteCode方法,裏面定義了兩個變量,然後返回他們兩個的和。

現在有兩種方法來查看這個類的Byte Code:

第一種方法是用javap命令:

javap -c ByteCodeUsage.class

生成的結果如上所示。

第二種方法就是在IDEA中,選中class文件,然後在view中選中show Bytecode:

我們看下輸出結果:

兩個的結果在显示上面可能有細微的差異,但是並不影響我們後面對其的解析。

java Byte Code是怎麼工作的

小師妹:F師兄,能講解一下這些byte code到底是怎麼工作的嗎?

首先我們要介紹一下JVM的實現是基於棧的結構的。為什麼要基於棧的結構呢?那是因為棧是最適合用來實現function互相調用的。

我們再回顧一下上面的testByteCode的字節碼。裏面有很多iconst,istore的東西,這些東西被稱作Opcode,也就是一些基於棧的操作指令。

上面講了java bytecode的操作指令其實有很多個。下面我們列出這些指令的部分介紹:

實在是太多了,這裏就不把所有的列出來了。

我們看到的指令名字其實是一個助記詞,真實的Opcode是一個佔用兩個字節的数字。

下面我們來詳細解釋一下testByteCode方法:

public int testByteCode();
    Code:
       0: iconst_1
       1: istore_1
       2: iconst_2
       3: istore_2
       4: iload_1
       5: iload_2
       6: iadd
       7: ireturn

第一步,iconst_1將int 1加載到stack中。

第二步,istore_1將入棧的int 1出棧,並存儲到變量1中。

第三步,iconst_2將int 2入棧。

第四步,istore_2將入棧的int 2出棧,並存儲到變量2中。

第五步,iload_1將變量1中的值入棧。

第六步,iload_2將變量2中的值入棧。

第七步,iadd將棧中的兩個變量出棧,並相加。然後將結果入棧。

第八步,ireturn將棧中的結果出棧。

這幾步實際上完美的還原了我們在testByteCode方法中定義的功能。

當然我們只介紹了最賤的byte code命令,通過這些簡單的命令可以組合成為更加複雜的java命令。

總結

本文介紹了java byte code的作用和具體的指令,並分析了一個簡單的例子來做說明。希望大家能夠掌握。

本文的例子https://github.com/ddean2009/learn-java-base-9-to-20

本文作者:flydean程序那些事

本文鏈接:http://www.flydean.com/jvm-byte-code/

本文來源:flydean的博客

歡迎關注我的公眾號:程序那些事,更多精彩等着您!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

Spring Boot2+Resilience4j實現容錯之Bulkhead

Resilience4j是一個輕量級、易於使用的容錯庫,其靈感來自Netflix Hystrix,但專為Java 8和函數式編程設計。輕量級,因為庫只使用Vavr,它沒有任何其他外部庫依賴項。相比之下,Netflix Hystrix對Archaius有一個編譯依賴關係,Archaius有更多的外部庫依賴關係,如Guava和Apache Commons。

Resilience4j提供高階函數(decorators)來增強任何功能接口、lambda表達式或方法引用,包括斷路器、速率限制器、重試或艙壁。可以在任何函數接口、lambda表達式或方法引用上使用多個裝飾器。優點是您可以選擇所需的裝飾器,而無需其他任何東西。

有了Resilience4j,你不必全力以赴,你可以選擇你需要的。

https://resilience4j.readme.io/docs/getting-started

概覽

Resilience4j提供了兩種艙壁模式(Bulkhead),可用於限制併發執行的次數:

  • SemaphoreBulkhead(信號量艙壁,默認),基於Java併發庫中的Semaphore實現。
  • FixedThreadPoolBulkhead(固定線程池艙壁),它使用一個有界隊列和一個固定線程池。

本文將演示在Spring Boot2中集成Resilience4j庫,以及在多併發情況下實現如上兩種艙壁模式。

引入依賴

在Spring Boot2項目中引入Resilience4j相關依賴

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-spring-boot2</artifactId>
    <version>1.4.0</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-bulkhead</artifactId>
    <version>1.4.0</version>
</dependency>

由於Resilience4j的Bulkhead依賴於Spring AOP,所以我們需要引入Spring Boot AOP相關依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

我們可能還希望了解Resilience4j在程序中的運行時狀態,所以需要通過Spring Boot Actuator將其暴露出來

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

實現SemaphoreBulkhead(信號量艙壁)

resilience4j-spring-boot2實現了對resilience4j的自動配置,因此我們僅需在項目中的yml/properties文件中編寫配置即可。

SemaphoreBulkhead的配置項如下:

屬性配置 默認值 含義
maxConcurrentCalls 25 艙壁允許的最大并行執行量
maxWaitDuration 0 嘗試進入飽和艙壁時,應阻塞線程的最長時間。

添加配置

示例(使用yml):

resilience4j.bulkhead:
  configs:
    default:
      maxConcurrentCalls: 5
      maxWaitDuration: 20ms
  instances:
    backendA:
      baseConfig: default
    backendB:
      maxWaitDuration: 10ms
      maxConcurrentCalls: 20

如上,我們配置了SemaphoreBulkhead的默認配置為maxConcurrentCalls: 5,maxWaitDuration: 20ms。並在backendA實例上應用了默認配置,而在backendB實例上使用自定義的配置。這裏的實例可以理解為一個方法/lambda表達式等等的可執行單元。

編寫Bulkhead邏輯

定義一個受SemaphoreBulkhead管理的Service類:

@Service
public class BulkheadService {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private BulkheadRegistry bulkheadRegistry;

    @Bulkhead(name = "backendA")
    public JsonNode getJsonObject() throws InterruptedException {
        io.github.resilience4j.bulkhead.Bulkhead.Metrics metrics = bulkheadRegistry.bulkhead("backendA").getMetrics();
        logger.info("now i enter the method!!!,{}<<<<<<{}", metrics.getAvailableConcurrentCalls(), metrics.getMaxAllowedConcurrentCalls());
        Thread.sleep(1000L);
        logger.info("now i exist the method!!!");
        return new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis());
    }
}

如上,我們將@Bulkhead註解放到需要管理的方法上面。並且通過name屬性指定該方法對應的Bulkhead實例名字(這裏我們指定的實例名字為backendA,所以該方法將會利用默認的配置)。

定義接口類:

@RestController
public class BulkheadResource {
    @Autowired
    private BulkheadService bulkheadService;

    @GetMapping("/json-object")
    public ResponseEntity<JsonNode> getJsonObject() throws InterruptedException {
        return ResponseEntity.ok(bulkheadService.getJsonObject());
    }
}

編寫測試:

首先添加測試相關依賴

<dependency>
    <groupId>io.rest-assured</groupId>
    <artifactId>rest-assured</artifactId>
    <version>3.0.5</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <version>4.0.2</version>
    <scope>test</scope>
</dependency>

這裏我們使用rest-assured和awaitility編寫多併發情況下的API測試

public class SemaphoreBulkheadTests extends Resilience4jDemoApplicationTests {
    @LocalServerPort
    private int port;
    @BeforeEach
    public void init() {
        RestAssured.baseURI = "http://localhost";
        RestAssured.port = port;
    }

    @Test
    public void 多併發訪問情況下的SemaphoreBulkhead測試() {
        CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
        IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
                statusList.add(given().get("/json-object").statusCode());
            }
        ));
        await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
        System.out.println(statusList);
        assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(5);
        assertThat(statusList.stream().filter(i -> i == 500).count()).isEqualTo(3);
    }
}

可以看到所有請求中只有前五個順利通過了,其餘三個都因為超時而導致接口報500異常。我們可能並不希望這種不友好的提示,因此Resilience4j提供了自定義的失敗回退方法。當請求併發量過大時,無法正常執行的請求將進入回退方法。

首先我們定義一個回退方法

private JsonNode fallback(BulkheadFullException exception) {
        return new ObjectMapper().createObjectNode().put("errorFile", System.currentTimeMillis());
    }

注意:回退方法應該和調用方法放置在同一類中,並且必須具有相同的方法簽名,並且僅帶有一個額外的目標異常參數。

然後在@Bulkhead註解中指定回退方法:@Bulkhead(name = "backendA", fallbackMethod = "fallback")

最後修改API測試代碼:

@Test
public void 多併發訪問情況下的SemaphoreBulkhead測試使用回退方法() {
    CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
    IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
            statusList.add(given().get("/json-object").statusCode());
        }
    ));
    await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
    System.out.println(statusList);
    assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(8);
}

運行單元測試,成功!可以看到,我們定義的回退方法,在請求過量時起作用了。

實現FixedThreadPoolBulkhead(固定線程池艙壁)

FixedThreadPoolBulkhead的配置項如下:

配置名稱 默認值 含義
maxThreadPoolSize Runtime.getRuntime().availableProcessors() 配置最大線程池大小
coreThreadPoolSize Runtime.getRuntime().availableProcessors() - 1 配置核心線程池大小
queueCapacity 100 配置隊列的容量
keepAliveDuration 20ms 當線程數大於核心時,這是多餘空閑線程在終止前等待新任務的最長時間

添加配置

示例(使用yml):

resilience4j.thread-pool-bulkhead:
  configs:
    default:
      maxThreadPoolSize: 4
      coreThreadPoolSize: 2
      queueCapacity: 2
  instances:
    backendA:
      baseConfig: default
    backendB:
      maxThreadPoolSize: 1
      coreThreadPoolSize: 1
      queueCapacity: 1

如上,我們定義了一段簡單的FixedThreadPoolBulkhead配置,我們指定的默認配置為:maxThreadPoolSize: 4,coreThreadPoolSize: 2,queueCapacity: 2,並且指定了兩個實例,其中backendA使用了默認配置而backendB使用了自定義的配置。

編寫Bulkhead邏輯

定義一個受FixedThreadPoolBulkhead管理的方法:

@Bulkhead(name = "backendA", type = Bulkhead.Type.THREADPOOL)
public CompletableFuture<JsonNode> getJsonObjectByThreadPool() throws InterruptedException {
    io.github.resilience4j.bulkhead.ThreadPoolBulkhead.Metrics metrics = threadPoolBulkheadRegistry.bulkhead("backendA").getMetrics();
    logger.info("now i enter the method!!!,{}", metrics);
    Thread.sleep(1000L);
    logger.info("now i exist the method!!!");
    return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis()));
}

如上定義和SemaphoreBulkhead的方法大同小異,其中@Bulkhead显示指定了type的屬性為Bulkhead.Type.THREADPOOL,表明其方法受FixedThreadPoolBulkhead管理。由於@Bulkhead默認的BulkheadSemaphoreBulkhead,所以在未指定type的情況下為SemaphoreBulkhead。另外,FixedThreadPoolBulkhead只對CompletableFuture方法有效,所以我們必創建返回CompletableFuture類型的方法。

定義接口類方法

@GetMapping("/json-object-with-threadpool")
public ResponseEntity<JsonNode> getJsonObjectWithThreadPool() throws InterruptedException, ExecutionException {
    return ResponseEntity.ok(bulkheadService.getJsonObjectByThreadPool().get());
}

編寫測試代碼

@Test
public void 多併發訪問情況下的ThreadPoolBulkhead測試() {
    CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
    IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
            statusList.add(given().get("/json-object-with-threadpool").statusCode());
        }
    ));
    await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
    System.out.println(statusList);
    assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(6);
    assertThat(statusList.stream().filter(i -> i == 500).count()).isEqualTo(2);
}

測試中我們并行請求了8次,其中6次請求成功,2次失敗。根據FixedThreadPoolBulkhead的默認配置,最多能容納maxThreadPoolSize+queueCapacity次請求(根據我們上面的配置為6次)。

同樣,我們可能並不希望這種不友好的提示,那麼我們可以指定回退方法,在請求無法正常執行時使用回退方法。

private CompletableFuture<JsonNode> fallbackByThreadPool(BulkheadFullException exception) {
    return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("errorFile", System.currentTimeMillis()));
}
@Bulkhead(name = "backendA", type = Bulkhead.Type.THREADPOOL, fallbackMethod = "fallbackByThreadPool")
public CompletableFuture<JsonNode> getJsonObjectByThreadPoolWithFallback() throws InterruptedException {
    io.github.resilience4j.bulkhead.ThreadPoolBulkhead.Metrics metrics = threadPoolBulkheadRegistry.bulkhead("backendA").getMetrics();
    logger.info("now i enter the method!!!,{}", metrics);
    Thread.sleep(1000L);
    logger.info("now i exist the method!!!");
    return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis()));
}

編寫測試代碼

@Test
public void 多併發訪問情況下的ThreadPoolBulkhead測試使用回退方法() {
    CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
    IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
            statusList.add(given().get("/json-object-by-threadpool-with-fallback").statusCode());
        }
    ));
    await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
    System.out.println(statusList);
    assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(8);
}

由於指定了回退方法,所有請求的響應狀態都為正常了。

總結

本文首先簡單介紹了Resilience4j的功能及使用場景,然後具體介紹了Resilience4j中的Bulkhead。演示了如何在Spring Boot2項目中引入Resilience4j庫,使用代碼示例演示了如何在Spring Boot2項目中實現Resilience4j中的兩種Bulkhead(SemaphoreBulkhead和FixedThreadPoolBulkhead),並編寫API測試驗證我們的示例。

本文示例代碼地址:https://github.com/cg837718548/resilience4j-demo

歡迎訪問筆者博客:blog.dongxishaonian.tech

關注筆者公眾號,推送各類原創/優質技術文章 ⬇️

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※幫你省時又省力,新北清潔一流服務好口碑

※別再煩惱如何寫文案,掌握八大原則!

多線程高併發編程(12) — 阻塞算法實現ArrayBlockingQueue源碼分析

一.前言

  前文探究了非阻塞算法的實現ConcurrentLinkedQueue安全隊列,也說明了阻塞算法實現的兩種方式,使用一把鎖(出隊和入隊同一把鎖ArrayBlockingQueue)和兩把鎖(出隊和入隊各一把鎖LinkedBlockingQueue)來實現,今天來探究下ArrayBlockingQueue。

  ArrayBlockingQueue是一個阻塞隊列,底層使用數組結構實現,按照先進先出(FIFO)的原則對元素進行排序。

  ArrayBlockingQueue是一個線程安全的集合,通過ReentrantLock鎖來實現,在併發情況下可以保證數據的一致性。

  此外,ArrayBlockingQueue的容量是有限的,數組的大小在初始化時就固定了,不會隨着隊列元素的增加而出現擴容的情況,也就是說ArrayBlockingQueue是一個“有界緩存區”。

  從下圖可以看出,ArrayBlockingQueue是使用一個數組存儲元素的,當向隊列插入元素時,首先會插入到數組下標索引為6的位置,再有新元素進來時插入到索引為7的位置,依次類推,如果滿了就不會再插入。

  當元素出隊時,先移除索引為2的元素3,與入隊一樣,依次類推,移除索引3、4、5…上的元素。這也形成了“先進先出”。

 

二.源碼解析

  1. 構造方法

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
        //隊列實現:數組
        final Object[] items;
    
        //當讀取元素時數組的下標(下一個被取出元素的索引)
        int takeIndex;
    
        //添加元素時數組的下標 (下一個被添加元素的索引)
        int putIndex;
    
        //隊列中元素個數:
        int count;
    
        //可重入鎖:
        final ReentrantLock lock;
    
        //入隊操作時是否讓線程等待
        private final Condition notEmpty;
    
        //出隊操作時是否讓線程等待
        private final Condition notFull;
    
        /**
         * 初始化隊列容量構造:由於公平鎖會降低隊列的性能,因而使用非公平鎖(默認)。
         */
        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    
        //帶初始容量大小和公平鎖隊列(公平鎖通過ReentrantLock實現):
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    }
    •  在多線程中,默認不保證線程公平的訪問隊列;

    •  在ArrayBlockingQueue中為了保證數據的安全,使用了ReentrantLock鎖。由於鎖的引入,導致了線程之間的競爭。當有一個線程獲取到鎖時,其餘線程處於等待狀態。當鎖被釋放時,所有等待線程為奪鎖而競爭;

    • 鎖有公平鎖和非公平鎖:

      •  公平鎖:等待的線程在獲取鎖而競爭時,按照等待的先後順序FIFO進行獲取操作;公平鎖可以應用在比如併發下的日誌輸出隊列中,保證了日誌輸出的順序完整性;
        •  優點:等待鎖的線程不會餓死,和非公平鎖相比,在獲得鎖和保證鎖分配的均衡性差異較小;
        • 缺點:使用公平鎖的程序在多線程訪問時表現為很低的吞吐量(即速度很慢),等待隊列中除第一個線程以外的所有線程都會阻塞,CPU喚醒阻塞線程的開銷比非公平鎖的大;公平鎖不能保證線程調度的公平性,因此,使用公平鎖的眾多線程中的一員可能獲得多倍的成功機會,這種情況發生在其他活動線程沒有被處理並且目前並未持有鎖時【ReentrantLock源碼對公平鎖的定義】;
           Note however, that fairness of locks does not guarantee
           fairness of thread scheduling. Thus, one of many threads using a
           fair lock may obtain it multiple times in succession while other
           active threads are not progressing and not currently holding the
           lock.
          •  上面這句話有重入鎖的概念,一個線程可以在已經獲取鎖的情況下再次進入獲取到鎖,不需要競爭;同時,如果一個線程獲取到了鎖,然後釋放,在其他線程來獲取之前再次是可以獲取到鎖的。
            A: Request Lock -> Release Lock -> Request Lock Again (Succeeds) 
                                                   B: Request Lock (Denied)... 
            -----------------------   Time   --------------------------------->
      •  非公平鎖:在獲取鎖時,無論是先等待還是后等待的線程,均有可能獲取到鎖。即根據搶佔機制,是隨機獲取鎖的,和公平鎖不一樣的是先來的不一定能獲取到鎖,有可能一直拿不到鎖,這樣會造成“飢餓”現象;
        • 優點:非公平鎖性能高於公平鎖性能。首先,在恢復一個被掛起的線程與該線程真正運行之間存在着嚴重的延遲,而且,非公平鎖更能充分的利用CPU的時間片,盡量減少CPU空閑的狀態時間;即可以減少喚起線程的開銷,整體的吞吐效率高,因為線程有幾率不阻塞直接獲取到鎖,CPU不必喚醒其他所有線程;
        • 缺點:處於等待隊列中的線程可能會餓死或者等很久才會獲得鎖;
      • 產生“飢餓”的原因:
        • 高優先級吞噬所有低優先級的CPU時間片,優先級越高,就會獲得越高的CPU執行機會; —> 使用默認的優先級;
        • 線程被永久阻塞在一個等待進入同步塊synchronized的狀態(長時間執行) ,同時synchronized並不保障等待線程的順序(鎖釋放后,隨機競爭,由OS調度),這會存在一個可能是某個線程總是搶鎖搶不到導致一直等待狀態 —> 避免持有鎖的線程長時間執行、使用显示lock來代替synchronized;
          synchronized(obj) {
                  while (true) {
               // .... infinite loop
               }
        •  等待的線程永遠不被喚醒:如果多個線程處在wait方法執行上,而對其調用notify方法不會保證哪一個線程會獲得喚醒,喚醒是無序的,跟VM/OS調度有關,甚至底層是隨機選取一個或是隊列中的第一個,任何線程都有可能處於繼續等待的狀態,因此存在這樣一個風險,即一個等待線程從來得不到喚醒,因為其他等待線程總是能被獲得喚醒 —> 使用显示lock來代替synchronized;
      •  比如ReentrantLock:
        •  在公平鎖中,如果有另一個線程持有鎖或者有其他線程在等待隊列中等待這個鎖,那麼新發出的請求的線程將被放入到隊列中;
        • 非公平鎖中, 根據搶佔機制,擁有鎖的線程在釋放鎖資源的時候, 新發出請求的線程可以和等待隊列中的第一個線程競爭鎖資源, 新線程競爭失敗才放入隊列中,但是已經進入等待隊列的線程, 依然是按照先進先出的順序獲取鎖資源;
  2. 入隊:有阻塞式和非阻塞式

    1. 阻塞式:當隊列中的元素已滿時,則會將此線程停止,讓其處於等待狀態,直到隊列中有空餘位置產生

      public void put(E e) throws InterruptedException {
              checkNotNull(e);
              final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();//獲取鎖
              try {
                  //隊列中元素 == 數組長度(隊列滿了),則線程等待
                  while (count == items.length)
                      notFull.await();
                  enqueue(e);//元素加入隊列
              } finally {
                  lock.unlock();//釋放鎖
              }
          }
      • lockInterruptibly:
        • 如果當前線程未被中斷,則獲取鎖。
        • 如果該鎖沒有被另一個線程保持,則獲取該鎖並立即返回,將鎖的保持計數設置為 1。
        • 如果當前線程已經保持此鎖,則將保持計數加 1,並且該方法立即返回。
        • 如果鎖被另一個線程保持,則出於線程調度目的,禁用當前線程,並且在發生以下兩種情況之一以前,該線程將一直處於休眠狀態:1)鎖由當前線程獲得;2)其他某個線程中斷當前線程
    2. 非阻塞式:當隊列中的元素已滿時,並不會阻塞此線程的操作,而是讓其返回又或者是拋出異常

      public boolean add(E e) {
              return super.add(e);// AbstractQueue.add
          }
          public boolean add(E e) {
              if (offer(e))//調用實現接口
                  return true;
              else
                  throw new IllegalStateException("Queue full");
          }
          public boolean offer(E e) {
              checkNotNull(e);//檢測是否有空指針異常
              final ReentrantLock lock = this.lock;//獲得鎖對象
              lock.lock();//加鎖
              try {
                  //如果隊列滿了,返回false
                  if (count == items.length)
                      return false;
                  else {
                      //元素加入隊列
                      enqueue(e);
                      return true;
                  }
              } finally {
                  lock.unlock();//釋放鎖
              }
          }
          private void enqueue(E x) {
              // assert lock.getHoldCount() == 1;
              // assert items[putIndex] == null;
              //獲得數組
              final Object[] items = this.items;
              //槽位填充元素
              items[putIndex] = x;
              //獲得下一個被添加元素的索引,如果值等於數組長度,表示到達尾部了,需要從頭開始填充
              if (++putIndex == items.length)
                  putIndex = 0;
              count++;//數量+1
              notEmpty.signal();//喚醒出隊上的等待線程,表示有元素可以消費了
          }
      • enqueue中++putIndex == items.length,putIndex=0:這是因為當前隊列執行元素出隊時總是從隊列頭部獲取,而添加元素的索引從隊列尾部獲取所以當隊列索引(從0開始)與數組長度相等時,下次我們就需要從數組頭部開始添加了
    3. 阻塞式和非阻塞式的結合:offer(E e, long timeout, TimeUnit unit),向隊列尾部添加元素,可以設置線程等待時間,如果超過指定時間隊列還是滿的,則返回false;

      public boolean offer(E e, long timeout, TimeUnit unit)
              throws InterruptedException {
      
              checkNotNull(e);//檢測是否為空
              long nanos = unit.toNanos(timeout);//轉換成超時時間閥值
              final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();//加鎖
              try {
                  //隊列是否滿了的判斷
                  while (count == items.length) {
                      if (nanos <= 0)//等待超時結束返回false
                          return false;
                      nanos = notFull.awaitNanos(nanos);//隊列滿了,等待出隊有空位填充
                  }
                  enqueue(e);//加入隊列中
                  return true;
              } finally {
                  lock.unlock();//釋放鎖
              }
          }
  3. 出隊:同樣有阻塞式和非阻塞式

    1. 阻塞式:當隊列中的元素已空時,則會將此線程停止,讓其處於等待狀態,直到隊列中有元素插入

      public E take() throws InterruptedException {
              final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();
              try {
                  //隊列為空,進行等待
                  while (count == 0)
                      notEmpty.await();
                  return dequeue();//返回出隊元素
              } finally {
                  lock.unlock();
              }
          }
    2. 非阻塞式:當隊列中的元素已滿時,並不會阻塞此線程的操作,而是讓其返回null或元素【裏面的迭代器比較複雜,留待下文探究】

      public E poll() {
              final ReentrantLock lock = this.lock;
              lock.lock();
              try {
                  //隊列為空,返回null,否則返回元素
                  return (count == 0) ? null : dequeue();
              } finally {
                  lock.unlock();
              }
          }
          private E dequeue() {
              // assert lock.getHoldCount() == 1;
              // assert items[takeIndex] != null;
              final Object[] items = this.items;//獲得隊列
              @SuppressWarnings("unchecked")
              E x = (E) items[takeIndex];//獲得出隊元素
              items[takeIndex] = null;//出隊槽位元素置為null
              //下一個被取出元素的索引+1,如果值等於長度,表示後面沒有元素了,需要從頭開始取出
              if (++takeIndex == items.length)
                  takeIndex = 0;
              count--;//數量-1
              if (itrs != null)//迭代器不為空
                  itrs.elementDequeued();//同時更新迭代器中的元素數據
              notFull.signal();//喚醒入隊線程
              return x;//返回出隊元素
          }
    3. 阻塞式和非阻塞式的結合:poll(long timeout, TimeUnit unit),出隊獲取元素,可以設置線程等待時間,如果超過指定時間隊列還是空的,則返回null;

      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
              long nanos = unit.toNanos(timeout);//轉換成超時時間閥值
              final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();//加鎖
              try {
                  while (count == 0) {//隊列空了,等待
                      if (nanos <= 0)//超時了返回null
                          return null;
                      nanos = notEmpty.awaitNanos(nanos);//等待入隊填充元素
                  }
                  return dequeue();//返回出隊元素
              } finally {
                  lock.unlock();//釋放鎖
              }
          }
  4. 移除元素remove:

    public boolean remove(Object o) {
            //要移除的元素為空返回false
            if (o == null) return false;
            //獲得隊列數組
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();//加鎖
            try {
                //隊列有元素
                if (count > 0) {
                    final int putIndex = this.putIndex;//獲得下一個被添加元素的索引
                    int i = takeIndex;//下一個被取出元素的索引
                    do {
                        if (o.equals(items[i])) {//從takeIndex下標開始,找到要被刪除的元素
                            removeAt(i);//移除
                            return true;
                        }
                        if (++i == items.length)//下一個被取出元素的索引+1並判斷是否等於隊列長度,如果是,表示需要從頭開始遍歷
                            i = 0;
                    } while (i != putIndex);//繼續查找,直到找到最後一個元素
                }
                return false;
            } finally {
                lock.unlock();//解鎖
            }
        }
    
      /**
       * 根據下標移除元素,那麼會分成兩種情況一個是移除的是隊首元素,一個是移除的是非隊首元素,移除隊首元素,就相當於出隊操作,
       * 移除非隊首元素那麼中間就有空位了,後面元素需要依次補上,然後如果是隊尾元素,那麼putIndex也就是插入操作的下標也就需要跟着移動。
       */
        void removeAt(final int removeIndex) {
            // assert lock.getHoldCount() == 1;
            // assert items[removeIndex] != null;
            // assert removeIndex >= 0 && removeIndex < items.length;
            final Object[] items = this.items;//獲得隊列
            if (removeIndex == takeIndex) {//移除的是隊首元素
                // removing front item; just advance
                items[takeIndex] = null;//隊首置為null
                if (++takeIndex == items.length)//下一個被取出元素的索引+1並判斷是否等於隊列長度
                    takeIndex = 0;
                count--;//數量-1
                if (itrs != null)//迭代器不為空
                    itrs.elementDequeued();//更新迭代器元素
            } else {//移除的不是隊首元素,而是中間元素
                // an "interior" remove
    
                // slide over all others up through putIndex.
                final int putIndex = this.putIndex;//下一個被添加元素的索引
                for (int i = removeIndex;;) {//對隊列進行遍歷,因為是隊列中間的值被移除了,所有後面的元素都要挨個遷移
                    int next = i + 1;//獲取移除元素的下一個坐標
                    if (next == items.length)//判斷是否等於隊列長度
                        next = 0;
                    if (next != putIndex) {//獲取移除元素的下一個坐標!=下一個被添加元素的索引,表示移除元素的索引後面有值
                        items[i] = items[next];//當前要移除的元素置為後面的元素,即對後面的元素往前遷移,覆蓋要移除的元素
                        i = next;//下一個遷移的索引
                    } else {//移除的元素是最後一個,後面沒有值了
                        items[i] = null;//移除元素,直接置為null
                        this.putIndex = i;//更新下一個被添加元素的索引
                        break;//結束
                    }
                }
                count--;//數量-1
                if (itrs != null)//迭代器不為空
                    itrs.removedAt(removeIndex);//更新迭代器元素
            }
            notFull.signal();//喚醒入隊線程,可以添加元素了
        }
  5. 清空元素clear:用於清空ArrayBlockingQueue,並且會釋放所有等待notFull條件的線程(存放元素的線程)

    public void clear() {
            final Object[] items = this.items;//獲得隊列
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int k = count;//獲取元素數量
                if (k > 0) {//有元素,表示隊列不為空
                    final int putIndex = this.putIndex;//下一個被添加元素的索引
                    int i = takeIndex;//下一個被取出元素的索引
                    do {
                        items[i] = null;//對每個有元素的槽位置為null
                        if (++i == items.length)
                            i = 0;
                    } while (i != putIndex);//從有元素的第一個槽位開始遍歷,直到槽位元素為null
                    takeIndex = putIndex;//更新取出和添加的索引
                    count = 0;//數量更新為0
                    if (itrs != null)//迭代器不為空
                        itrs.queueIsEmpty();//更新迭代器為空
                    //若有等待notFull條件的線程,則逐一喚醒
                    for (; k > 0 && lock.hasWaiters(notFull); k--)
                        notFull.signal();//喚醒入隊線程,可以添加元素了
                }
            } finally {
                lock.unlock();
            }
        }
  6. offer(E e, long timeout, TimeUnit unit)和poll(long timeout, TimeUnit unit)裏面有awaitNanos,下面探討該功能實現:對當前線程或等待的入/出隊線程進行掛起,如果有入/出隊操作進行了喚醒出/入隊操作,則acquireQueued自旋獲取到鎖,然後出/入隊中的ReentrantLock是重入鎖,可以重入獲取到鎖進行出/入隊操作

        AbstractQueuedSynchronizer:
        //進行超時控制
        public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
            //如果當前線程中斷了拋出中斷異常
            if (Thread.interrupted())
                throw new InterruptedException();
            //當前線程加入到Condition隊列中
            Node node = addConditionWaiter();
            //鎖釋放是否成功:釋放當前線程的lock,從AQS的隊列中移出
            int savedState = fullyRelease(node);
            //到達等待時間點
            final long deadline = System.nanoTime() + nanosTimeout;
            //中斷標識
            int interruptMode = 0;
            //當前節點是否在同步隊列中,否表示不在,進入掛起判斷操作,如果已經在Sync隊列中,則退出循環
            //那什麼時候會把當前線程又加入到Sync隊列中呢?當然是調用signal方法的時候,因為這裏需要喚醒之前調用await方法的線程,喚醒之後進行下面的獲取鎖等操作
            while (!isOnSyncQueue(node)) {
                //如果超時了,將線程掛起,然後停止遍歷
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                //如果等待時間間隔超過了1000,繼續掛起
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                //線程中斷了停止遍歷
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                //獲得剩餘的等待時間間隔
                nanosTimeout = deadline - System.nanoTime();
            }
            //結束掛起,acquireQueued自旋對當前線程的隊列出隊進行獲取鎖並返回線程是否中斷
            //如果線程被中斷,並且中斷的方式不是拋出異常,則設置中斷後續的處理方式設置為REINTERRUPT
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;//中斷標識更新為退出等待時重新中斷
            if (node.nextWaiter != null)//當前節點後面還有節點,多併發操作了
                unlinkCancelledWaiters();//從頭到尾遍歷Condition隊列,移除被cancel的節點
            //如果線程已經被中斷,則根據之前獲取的interruptMode的值來判斷是繼續中斷還是拋出異常
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return deadline - System.nanoTime();//返回剩餘等待時間
        }
  7. drainTo可以一次性獲取隊列中所有的元素,它減少了鎖定隊列的次數,使用得當在某些場景下對性能有不錯的提升

    //最多從此隊列中移除給定數量的可用元素,並將這些元素添加到給定collection中
        public int drainTo(Collection<? super E> c) {
            return drainTo(c, Integer.MAX_VALUE);
        }
        public int drainTo(Collection<? super E> c, int maxElements) {
            checkNotNull(c);//檢查是否為空
            if (c == this)//如果集合類型相同拋出參數異常
                throw new IllegalArgumentException();
            if (maxElements <= 0)//如果給定移除數量小於0,返回0,表示不做移除操作
                return 0;
            final Object[] items = this.items;//獲得隊列
            final ReentrantLock lock = this.lock;
            lock.lock();//加鎖
            try {
                int n = Math.min(maxElements, count);//獲得元素的最小數量
                int take = takeIndex;//下一個被取出元素的索引
                int i = 0;
                try {
                    while (i < n) {//遍歷移除和添加
                        @SuppressWarnings("unchecked")
                        E x = (E) items[take];//獲得移除元素
                        c.add(x);//元素添加到直到集合中
                        items[take] = null;//元素原先隊列位置置為null
                        if (++take == items.length)//如果取出索引到達尾部,從頭開始遍歷取出
                            take = 0;
                        i++;//移除的數量+1,如果達到了移除的最小數量,結束遍歷
                    }
                    return n;//返回一共移除並添加了多少個元素
                } finally {
                    // Restore invariants even if c.add() threw
                    if (i > 0) {//如果有移除操作
                        count -= i;//隊列元素數量-i
                        takeIndex = take;//重置下一個被取出元素的索引
                        if (itrs != null) {//迭代器不為空
                            if (count == 0)//隊列空了
                                itrs.queueIsEmpty();//迭代器清空
                            else if (i > take)//說明take中間變成0了,通知itr
                                itrs.takeIndexWrapped();
                        }
                        //喚醒在因為隊列滿而等待的入隊線程,最多喚醒i個,避免線程被喚醒了因為隊列又滿了而阻塞
                        for (; i > 0 && lock.hasWaiters(notFull); i--)
                            notFull.signal();
                    }
                }
            } finally {
                lock.unlock();
            }
        }

 

三.Logback 框架中異步日誌打印中ArrayBlockingQueue的使用

  1. 在高併發並且響應時間要求比較小的系統中同步打日誌已經滿足不了需求了,這是因為打日誌本身是需要同步寫磁盤的,會造成 響應時間 增加,如下圖同步日誌打印模型為:

  2. 異步模型是業務線程把要打印的日誌任務寫入一個隊列后直接返回,然後使用一個線程專門負責從隊列中獲取日誌任務寫入磁盤,其模型具體如下圖:

    • 如圖可知其實 logback 的異步日誌模型是一個多生產者單消費者模型,通過使用隊列把同步日誌打印轉換為了異步,業務線程調用異步 appender 只需要把日誌任務放入日誌隊列,日誌線程則負責使用同步的 appender 進行具體的日誌打印到磁盤;
  3. 接下來看看異步日誌打印具體實現,要把同步日誌打印改為異步需要修改 logback 的 xml 配置文件:

    <appender name="PROJECT" class="ch.qos.logback.core.FileAppender">
            <file>project.log</file>
            <encoding>UTF-8</encoding>
            <append>true</append>
    
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <!-- daily rollover -->
                <fileNamePattern>project.log.%d{yyyy-MM-dd}</fileNamePattern>
                <!-- keep 7 days' worth of history -->
                <maxHistory>7</maxHistory>
            </rollingPolicy>
            <layout class="ch.qos.logback.classic.PatternLayout">
                <pattern>
                    <![CDATA[%n%-4r [%d{yyyy-MM-dd HH:mm:ss}] %X{productionMode} - %X{method} %X{requestURIWithQueryString} [ip=%X{remoteAddr}, ref=%X{referrer},
                    ua=%X{userAgent}, sid=%X{cookie.JSESSIONID}]%n  %-5level %logger{35} - %m%n]]>
                </pattern>
            </layout>
        </appender>
    
        <appender name="asyncProject" class="ch.qos.logback.classic.AsyncAppender">
            <discardingThreshold>0</discardingThreshold>
            <queueSize>1024</queueSize>
            <neverBlock>true</neverBlock>
            <appender-ref ref="PROJECT" />
        </appender>
         <logger name="PROJECT_LOGGER" additivity="false">
            <level value="WARN" />
            <appender-ref ref="asyncProject" />
        </logger>
  4. 從上面可知 AsyncAppender 是實現異步日誌的關鍵,下面探究它的原理:

    1. 如上圖可知 AsyncAppender 繼承自 AsyncAppenderBase,其中後者具體實現了異步日誌模型的主要功能,前者只是重寫了其中的一些方法。另外從類圖可知 logback 中的異步日誌隊列是一個阻塞隊列, 後面會知道其實是一個有界阻塞隊列 ArrayBlockingQueue, 其中 queueSize 是有界隊列的元素個數默認為 256;
    2. worker則是工作線程,也就是異步打印日誌的消費者線程,aai則是一個appender的裝飾器,裡邊存放的同步日誌的appender,其中appenderCount記錄aai裡邊附加的同步appender的個數(這個和配置文件相對應,一個異步的appender對應一個同步的appender),neverBlock用來指示當同步隊列已滿時是否阻塞打印日誌線程(如果配置neverBlock=true,當隊列滿了之後,後面阻塞的線程想要輸出的消息就直接被丟棄,從而線程不會阻塞),discardingThreshold是一個閾值,當日誌隊列裡邊的空閑元素個數小於該值時,新來的某些級別的日誌就會直接被丟棄。
  5.  接下來看下何時創建的日誌隊列以及何時啟動的消費線程,這需要看下 AsyncAppenderBase 的 start 方法,該方法是在解析完畢配置 AsyncAppenderBase 的 xml 的節點元素后被調用 :

    public void start() {
            if (isStarted())
                return;
            if (appenderCount == 0) {
                addError("No attached appenders found.");
                return;
            }
            if (queueSize < 1) {
                addError("Invalid queue size [" + queueSize + "]");
                return;
            }
            // 創建一個ArrayBlockingQueue阻塞隊列,queueSize默認為256,創建阻塞隊列的原因是:防止生產者過多,造成隊列中元素過多,產生OOM異常
            blockingQueue = new ArrayBlockingQueue<E>(queueSize);
            // 如果discardingThreshold未定義的話,默認為queueSize的1/5
            if (discardingThreshold == UNDEFINED)
                discardingThreshold = queueSize / 5;
            addInfo("Setting discardingThreshold to " + discardingThreshold);
            // 將工作線程設置為守護線程,即當jvm停止時,即使隊列中有未處理的元素,也不會在進行處理
            worker.setDaemon(true);
            // 為線程設置name便於調試
            worker.setName("AsyncAppender-Worker-" + getName());
            // make sure this instance is marked as "started" before staring the worker Thread
            // 啟動線程
            super.start();
            worker.start();
        }
    1. logback 使用的隊列是有界隊列 ArrayBlockingQueue,之所以使用有界隊列是考慮到內存溢出問題,在高併發下寫日誌的 qps 會很高如果設置為無界隊列隊列本身會佔用很大內存,很可能會造成 內存溢出。
    2. 這裏消費日誌隊列的 worker 線程被設置為了守護線程,意味着當主線程運行結束並且當前沒有用戶線程時候該 worker 線程會隨着 JVM 的退出而終止,而不管日誌隊列裏面是否還有日誌任務未被處理。另外這裏設置了線程的名稱是個很好的習慣,因為這在查找問題的時候很有幫助,根據線程名字就可以定位到是哪個線程。
  6. 既然是有界隊列那麼肯定需要考慮如果隊列滿了,該如何處置,是丟棄老的日誌任務,還是阻塞日誌打印線程直到隊列有空餘元素那?下面看append 方法:

    protected void append(E eventObject) {
            // 判斷隊列中的元素數量是否小於discardingThreshold,如果小於的話,並且日誌等級小於info的話,則直接丟棄這些日誌任務
            if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) {
                return;
            }
            preprocess(eventObject);
            // 日誌入隊
            put(eventObject);
        }
        private boolean isQueueBelowDiscardingThreshold() {
            return (blockingQueue.remainingCapacity() < discardingThreshold);
        }
    
       // 子類重寫的方法   判斷日誌等級
        protected boolean isDiscardable(ILoggingEvent event) {
            Level level = event.getLevel();
            return level.toInt() <= Level.INFO_INT;
        }    
    • 日誌入隊put:從下面可知如果 neverBlock 設置為 false(默認為 false)則會調用阻塞隊列的 put 方法,而 put 是阻塞的,也就是說如果當前隊列滿了,如果再企圖調用 put 方法向隊列放入一個元素則調用線程會被阻塞直到隊列有空餘空間。這裡有必要提下其中blockingQueue.put(eventObject)當日誌隊列滿了的時候 put 方法會調用 await() 方法阻塞當前線程,如果其它線程中斷了該線程,那麼該線程會拋出 InterruptedException 異常,那麼當前的日誌任務就會被丟棄了。如果 neverBlock 設置為了 true 則會調用阻塞隊列的 offer 方法,而該方法是非阻塞的,如果當前隊列滿了,則會直接返回,也就是丟棄當前日誌任務。
      private void put(E eventObject) {
              // 判斷是否阻塞(默認為false),則會調用阻塞隊列的put方法
              if (neverBlock) {
                  blockingQueue.offer(eventObject);
              } else {
                  putUninterruptibly(eventObject);
              }
      }
          // 可中斷的阻塞put方法
          private void putUninterruptibly(E eventObject) {
              boolean interrupted = false;
              try {
                  while (true) {
                      try {
                          blockingQueue.put(eventObject);
                          break;
                      } catch (InterruptedException e) {
                          interrupted = true;
                      }
                  }
              } finally {
                  if (interrupted) {
                      Thread.currentThread().interrupt();
                  }
              }
          }
  7. 最後看下 addAppender 方法,可以看出,一個異步的appender只能綁定一個同步appender,這個appender會被放入AppenderAttachableImpl的appenderList列表裡邊

    public void addAppender(Appender<E> newAppender) {
            if (appenderCount == 0) {
                appenderCount++;
                addInfo("Attaching appender named [" + newAppender.getName() + "] to AsyncAppender.");
                aai.addAppender(newAppender);
            } else {
                addWarn("One and only one appender may be attached to AsyncAppender.");
                addWarn("Ignoring additional appender named [" + newAppender.getName() + "]");
            }
    }
  8. 通過上面我們已經分析完了日誌生產線程放入日誌任務到日誌隊列的實現,下面一起來看下消費線程是如何從隊列裏面消費日誌任務並寫入磁盤的,由於消費線程是一個線程,那就從 worker 的 run 方法看起(消費者,將日誌寫入磁盤的線程方法):

    class Worker extends Thread {
    
            public void run() {
                AsyncAppenderBase<E> parent = AsyncAppenderBase.this;
                AppenderAttachableImpl<E> aai = parent.aai;
    
                // loop while the parent is started 一直循環知道線程被中斷
                while (parent.isStarted()) {
                    try {// 從阻塞隊列中獲取元素,交由給同步的appender將日誌打印到磁盤
                        E e = parent.blockingQueue.take();
                        aai.appendLoopOnAppenders(e);
                    } catch (InterruptedException ie) {
                        break;
                    }
                }
    
                addInfo("Worker thread will flush remaining events before exiting. ");
                //執行到這裏說明該線程被中斷,則把隊列裡邊的剩餘日誌任務刷新到磁盤
                for (E e : parent.blockingQueue) {
                    aai.appendLoopOnAppenders(e);
                    parent.blockingQueue.remove(e);
                }
    
                aai.detachAndStopAllAppenders();
            }
        }
    • try邏輯中從日誌隊列使用 take 方法獲取一個日誌任務,如果當前隊列為空則當前線程會阻塞到 take 方法直到隊列不為空才返回,獲取到日誌任務後會調用 AppenderAttachableImpl 的 aai.appendLoopOnAppenders 方法,該方法會循環調用通過 addAppender 注入的同步日誌 appener 具體實現日誌打印到磁盤的任務。

四.參考:

  1. 公平鎖的使用場景:https://stackoverflow.com/questions/26455578/when-to-use-fairness-mode-in-java-concurrency
  2. 公平鎖和非公平鎖的區別的提問:https://segmentfault.com/q/1010000006439146
  3. 公平鎖不能保證線程調度的公平性:https://stackoverflow.com/questions/60903107/understanding-fair-reentrantlock-in-java
  4. logback異步日誌打印中的ArrayBlockingQueue的使用:https://my.oschina.net/u/4410397/blog/3428573

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案