Flink中異步AsyncIO的實現 (源碼分析)

先上張圖整體了解Flink中的異步io

 

阿里貢獻給flink的,優點就不說了嘛,官網上都有,就是寫庫不會柱塞性能更好

然後來看一下, Flink 中異步io主要分為兩種

  一種是有序Ordered

  一種是無序UNordered

主要區別是往下游output的順序(注意這裏順序不是寫庫的順序既然都異步了寫庫的順序自然是無法保證的),有序的會按接收的順序繼續往下游output發送,無序就是誰先處理完誰就先往下游發送

兩張圖了解這兩種模式的實現

 

有序:record數據會通過異步線程寫庫,Emitter是一個守護進程,會不停的拉取queue頭部的數據,如果頭部的數據異步寫庫完成,Emitter將頭數據往下游發送,如果頭元素還沒有異步寫庫完成,柱塞      

無序:record數據會通過異步線程寫庫,這裡有兩個queue,一開始放在uncompleteedQueue,當哪個record異步寫庫成功后就直接放到completedQueue中,Emitter是一個守護進程,completedQueue只要有數據,會不停的拉取queue數據往下游發送 

    

可以看到原理還是很簡單的,兩句話就總結完了,就是利用queue和java的異步線程,現在來看下源碼

這裏AsyncIO在Flink中被設計成operator中的一種,自然去OneInputStreamOperator的實現類中去找

於是來看一下AsyncWaitOperator.java

  

看到它的open方法(open方法會在taskmanager啟動job的時候全部統一調用,可以翻一下以前的文章)

這裏啟動了一個守護線程Emitter,來看下線程具體做了什麼

 

 1處拉取數據,2處就是常規的將拉取到的數據往下游emit,Emitter拉取數據,這裏先不講因為分為有序的和無序的

 這裏已經知道了這個Emitter的作用是循環的拉取數據往下游發送

 回到AsyncWaitOperator.java在它的open方法初始化了Emitter,那它是如何處理接收到的數據的呢,看它的ProcessElement()方法

 

    

 

 其實主要就是三個個方法

先是!!!將record封裝成了一個包裝類StreamRecordQueueEntry,主要是這個包裝類的構造方法中,創建了一個CompleteableFuture(這個的complete方法其實會等到用戶代碼執行的時候用戶自己決定什麼時候完成)

1處主要就是講元素加入到了對應的queue,這裏也分為兩種有序和無序的

 

這裏也先不講這兩種模式加入數據的區別

接着2處就是調用用戶的代碼了,來看看官網的異步io的例子

 

 給了一個Future作為參數,用戶自己起了一個線程(這裏思考一下就知道了為什麼要新起一個異步線程去執行,因為如果不起線程的話,那processElement方法就柱塞了,無法異步了)去寫庫讀庫等,然後調用了這個參數的complete方法(也就是前面那個包裝類中的CompleteableFuture)並且傳入了一個結果

看下complete方法源碼

 

 這個resultFuture是每個record的包裝類StreamRecordQueueEntry的其中一個屬性是一個CompletableFuture

 那現在就清楚了,用戶代碼在自己新起的線程中當自己的邏輯執行完以後會使這個異步線程結束,並輸入一個結果

 那這個幹嘛用的呢

 

最開始的圖中看到有序和無序實現原理,有序用一個queue,無序用兩個queue分別就對應了

OrderedStreamElementQueue類中

 

 UnorderedStreamElementQueue類中

 

回到前面有兩個地方沒有細講,一是兩種模式的Emitter是如何拉取數據的,二是兩種模式下數據是如何加入OrderedStreamElementQueue的

有序模式:

1.先來看一下有序模式的,Emitter的數據拉取,和數據的加入

    其tryPut()方法

      

      

     onComplete方法

       

       onCompleteHandler方法

        

  這裏比較繞,先將接收的數據加入queue中,然後onComplete()中當上一個異步線程getFuture() 其實就是每個元素包裝類裏面的那個CompletableFuture,當他結束時(會在用戶方法用戶調用complete時結束)異步調用傳入的對象的 accept方法,accept方法中調用了onCompleteHandler()方法,onCompleteHandler方法中會判斷queue是否為空,以及queue的頭元素是否完成了用戶的異步方法,當完成的時候,就會將headIsCompleted這個對象signalAll()喚醒

 

2.接着看有序模式Emitter的拉取數據

       

   這裡有序方式拉取數據的邏輯很清晰,如果為空或者頭元素沒有完成用戶的異步方法,headIsCompleted這個對象會wait住(上面可以知道,當加入元素的到queue且頭元素完成異步方法的時候會signalAll())然後將頭數據返回,往下游發送

 

這樣就實現了有序發送,因為Emitter只拉取頭元素且已經完成用戶異步方法的頭元素

 

無序模式: 

  這裏和有序模式就大同小異了,只是變成了,接收數據后直接加入uncompletedQueue,當數據完成異步方法的時候就,放到completedQueue裏面去並signalAll(),只要completedqueue裏面有數據,Emitter就拉取往下發

 

這樣就實現了無序模式,也就是異步寫入誰先處理完就直接放到完成隊列裏面去,然後往下發,不用管接收數據的順序

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益