環境資訊中心綜合外電;姜唯 編譯;林大利 審校
本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※教你寫出一流的銷售文案?
※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益
※回頭車貨運收費標準
※別再煩惱如何寫文案,掌握八大原則!
※超省錢租車方案
※產品缺大量曝光嗎?你需要的是一流包裝設計!
摘錄自2020年8月4日中央社報導
美國農業部(USDA)動植物健康檢查局副局長艾爾西(Osama El-Lissy)表示,截至7月29日止,已鑑定出芥菜、高麗菜、牽牛花、薄荷、鼠尾草、迷迭香、薰衣草等植物種子,另有木棉花、玫瑰種子。目前鑑定出的品種雖屬無害,但植物專家警告,外來種子有可能損害農作物。
美國哥倫比亞廣播公司新聞網(CBS News)報導,全美50州都有民眾通報收到可疑種子包裹。美國農業部已提醒民眾收到後千萬不要栽種,應立刻向當地的農政單位通報。德州農業廳長米勒(Sid Miller)呼籲民眾保持警覺,「那可能是細菌,也可能是病毒或某個外來種植物」。
維吉尼亞州的農業官員警告:「外來物種會造成環境浩劫,取代或破壞原生植物及昆蟲,嚴重損害農作物。降低外來物種入侵並大量繁殖的風險並減少相關管控成本,最有效的方法就是採取措施、預防入侵物種的引入。」
愛荷華州農業暨土地管理廳的種子檢驗官員普魯伊斯納(Robin Pruisner)表示,有通報指出種子上可能含有殺蟲或除黴劑,對農作物危害甚大。
生物多樣性
國際新聞
美國
種子
外來物種
入侵植物
外來種植物
本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※超省錢租車方案
※別再煩惱如何寫文案,掌握八大原則!
※回頭車貨運收費標準
※教你寫出一流的銷售文案?
※產品缺大量曝光嗎?你需要的是一流包裝設計!
※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益
通過收集自動售貨機系統的銷售數據,EI數據分析售貨銷量狀況。
該場景主要描述的是設備可以通過MQTT協議與物聯網平台進行交互,應用側可以到物聯網平台訂閱設備側變化的通知,用戶可以在控制台或通過應用側接口創建數據轉發規則,把設備上報的屬性轉發給其他華為雲服務。
核心知識點:產品模型、編輯碼插件、訂閱推送、屬性上報、MQTT協議、數據轉發規則。
流程解釋:
1、創建自動售貨機產品:物聯網平台以產品為粒度管理批量設備。用戶可以通過平台提供的API接口或控制台創建產品。
2、上傳產品模型:產品模型是定義一種設備的基本屬性和命令。產品模型可以通過控制台,也可以導入公共產品庫的模型。該場景沒有編解碼插件,是因為設備是基於安卓操作系統開發的,能夠通過MQTT協議與平台進行交互。
3、批量註冊自動售貨機設備:平台提供了應用側API接口可以註冊設備,也可以通過控制台批量註冊。註冊設備時獲取的設備ID,是設備側與平台交互的唯一標識。
4、創建自動售貨機設備狀態變化的訂閱:售貨管理系統可以在平台創建設備變化的通知訂閱,需要把callback url即應用回調地址傳給平台,平台後續會推送通知到該url。
5、設備建鏈:MQTT設備是指通過MQTT協議,不論是集成了華為IoT Device SDK,還是原生MQTT協議接入,只要是json數據格式傳輸給平台,平台就無需使用編解碼插件。如果是二進制上傳,則需要先做編解碼插件的開發。MQTT是長連接,需要先建鏈才能進行數據傳輸,可以通過安全加密方式8883端口接入(推薦),也可以通過非安全加密方式1883端口接入。
6、推送自動售貨機設備激活通知:平台會根據之前應用訂閱的回調地址,把自動售貨機設備上線的通知類型通過HTTP/HTTPS推送回去。
7、創建數據轉發規則:售貨管理系統可以通過API接口創建規則,也可以通過控制台創建,指定過濾指定的屬性,給指定的通道轉發數據。
8、開通DIS通道/MRS服務:華為公有雲上有豐富的SaaS服務和PaaS服務,供您結合自己的業務需要進行組合使用。DIS服務提供高效採集、傳輸、分發能力,支持多種IoT協議,可以開通該服務,通過IoTDA規則引擎,把自動售貨機設備的數據轉發給DIS,然後再利用諸如MRS服務,實現自動售貨銷量狀況數據分析。
9、自動售貨機屬性上報:設備側可以通過SDK或MQTT原生協議接入平台,屬性上報銷售信息。這裏值得注意的是,設備側上報的數據,是通過屬性上報,與消息上報最大的區別在於是否經過產品模型。屬性上報的內容與格式都要跟產品模型定義保持一致。具體概念介紹可以參閱“物模型”。
10、按規則數據轉發:平台收到設備上報的屬性后,規則引擎會進行過濾(不論屬性還是消息,平台都會做規則過濾),把設定好的屬性值轉發到指定的DIS通道,然後再通過DIS的接口,由MRS去消費DIS的數據,實現對銷量的分析。
物聯網解決方案中,作為數據主體的“物”可能數量會非常大,產生的數據已經無法通過傳統的數據處理服務進行處理。如何分析與利用這龐大的物聯網設備數據對物聯網企業來說又是一個新的挑戰。
華為雲物聯網平台提供規則引擎能力,支持將數據上報的數據轉發至華為雲其他雲服務,可實現將海量數據通過數據接入服務(DIS)轉發至MapReduce服務(MRS),對數據進行處理后再由數據可視化服務(DLV)讀取數據呈現為可視化報表,實現數據的一站式採集、處理和分析。
在本示例中,我們實現下述場景:
自動售貨機每次銷售商品後上報銷售商品種類、數量、時間和所屬區域到物聯網平台,物聯網平台將數據通過數據接入服務轉發至MapReduce服務,MapReduce服務處理數據並寫為統計文件,數據可視化服務從統計文件讀取數據展現為四個維度的銷售報表。
創建集群,用於存儲和處理DIS轉儲的數據。
登錄華為雲官方網站,訪問MapReduce服務。
單擊“立即購買”,創建集群,以下配置僅為樣例。
注:下圖以新版自定義購買界面為例,需要在“購買集群”界面點擊右上角的“點擊體驗新版”,然後選擇“自定義購買”。
|
參數名稱 |
說明 |
|---|---|
|
軟件配置 |
|
|
當前區域 |
保持默認。 |
|
集群名稱 |
自定義或保持默認。 |
|
集群版本 |
保持默認。 |
|
集群類型 |
分析集群。組件勾選Spark,系統會自動勾選Hive和Tez。“Hive使用外部數據源存儲元數據”保持關閉。 |
|
Kerberos認證 |
關閉。 |
|
用戶名 |
固定為“admin”不可修改。 |
|
密碼 |
自定義。 |
|
確認密碼 |
|
|
硬件配置 |
|
|
計費模式 |
按實際使用需求選擇,本示例中選擇“按需計費”。 |
|
網絡配置 |
全部保持默認。 |
|
實例 |
為節省實驗費用,可修改分析Core的實例數量為1,其餘保持默認值。密碼自定義。 |
|
高級配置均保持默認。 |
|
3.集群創建成功后,等待15到30分鐘,集群狀態變更為“運行中”則表示創建成功。
登錄華為雲官方網站,訪問對象存儲服務。
單擊“管理控制台”進入對象存儲服務管理控制台。
單擊頁面右上角的“創建桶”,根據需求選擇桶規格后,單擊“立即創建”。
創建通道並配置轉儲任務,實現將設備管理服務傳入DIS的數據轉發至MRS。
登錄華為雲官方網站,訪問數據接入服務。
單擊“立即購買”,購買接入通道,以下配置僅為樣例。
|
參數名稱 |
說明 |
|---|---|
|
區域 |
保持默認。 |
|
通道名稱 |
自定義或保持默認。 |
|
通道類型 |
保持默認值“普通”。 |
|
分區數量 |
按需填寫。 |
|
生命周期 |
|
|
源數據類型 |
選擇“JSON”。 |
|
自動擴縮容 |
保持關閉。 |
|
Schema開關 |
|
|
高級配置 |
保持默認。 |
3.通道購買成功后,進入DIS控制台“接入管理 > 通道管理”頁面。
4.單擊需要查看的通道名稱,進入所選通道的管理頁面,選擇“轉儲管理”頁簽。
5.單擊“添加轉儲任務”按鈕。
6.在彈出的“添加轉儲任務”頁面配置轉儲相關配置項。
|
參數名稱 |
說明 |
|---|---|
|
源數據類型 |
默認為通道源數據類型 |
|
轉儲服務類型 |
選擇“MRS”。 |
|
任務名稱 |
自定義,如“iot_to_mrs”。 |
|
轉儲文件格式 |
選擇“Text”。 |
|
MRS集群 |
選擇已創建成功的MRS集群。 |
|
HDFS路徑 |
選擇轉儲文件要存儲的路徑,建議選擇“/user”。 |
|
轉儲文件目錄 |
自定義轉儲文件存放的文件夾名稱,本示例中為“temp”。 |
|
偏移量 |
選擇“最新”。 |
|
數據轉儲周期 |
本示例中修改為“60”。 |
|
數據臨時桶 |
選擇已創建的OBS桶。 |
|
數據臨時目錄 |
自定義,本示例中為“temp”。 |
7.單擊“立即創建”。
在設備接入服務中創建產品模型、註冊設備並設置數據轉發規則,實現當設備上報數據時將數據轉發至DIS。
登錄華為雲官方網站,訪問設備接入服務。
單擊“立即使用”進入設備接入控制台。
單擊“規則 > 創建規則 > 數據轉發”,首次創建對接到DIS服務的規則時,平台會根據對接的雲服務和區域彈出對應的雲服務訪問授權窗口。
4.單擊左側導航欄的“產品”,單擊右上角下拉框,選擇新建產品所屬的資源空間。
注:本文中使用的產品模型和設備僅為示例,您可以使用自己的產品模型和設備進行操作。
5.單擊右上角的“創建產品”,創建一個基於MQTT協議的產品,填寫參數后,單擊“確認”。
|
基本信息 |
|
|
產品名稱 |
自定義,如MQTT_Device |
|
協議類型 |
選擇“MQTT” |
|
數據格式 |
選擇“JSON” |
|
廠商名稱 |
自定義 |
|
功能定義 |
|
|
選擇模型 |
請參考步驟6導入模型即可。 |
|
所屬行業 |
根據實際情況進行填寫。 |
|
設備類型 |
|
6.在功能定義頁面,單擊“上傳模型文件”,單擊Profile.zip,獲取產品模型文件樣例。
7.進入“設備 > 設備註冊”頁面,單擊“註冊設備”,參考下錶填寫參數。
|
參數名稱 |
說明 |
|---|---|
|
所屬產品 |
選擇在步驟5中創建的產品。 |
|
設備標識碼 |
設備唯一物理標識,如IMEI、MAC地址等,用於設備在接入物聯網平台時攜帶該標識信息完成接入鑒權。
|
|
設備名稱 |
自定義。 |
|
設備認證類型 |
選擇“密鑰”。 |
|
密鑰 |
設備密鑰,可自定義,不填寫物聯網平台會自動生成。 |
填寫完成后單擊“確定”,請注意保存註冊成功返回的“設備ID”和“設備密鑰”。
8.單擊左側導航欄的“規則”,單擊右上角的“創建規則”,選擇“數據轉發”。
9.填寫規則內容,規則名稱自定義,“數據類型”選擇“JSON”,轉發至“數據接入服務(DIS)”,“區域”選擇您開通OBS的區域,“通道”選擇您創建的桶,填寫完成后單擊“創建規則”。
配置數據可視化服務,新建數據報表視圖。
登錄華為雲官方網站,訪問數據可視化服務。
單擊“進入控制台”。
注:若您未開通DLV服務,可單擊“體驗試用”獲取30天的基礎版免費試用。
訪問DLV控制台“我的大屏”頁面,新建一個大屏。
4.選擇空白模板,輸入大屏名稱后,單擊“創建大屏”。
5.單擊“文本 > 標題”新增一個標題。
6.在右側“數據”面板修改靜態數據中“value”的值為“每日銷量”。
7.在大屏內拖動標題到左上角,並拉伸成合適的形狀。
8.單擊“常用圖表 > 線狀圖”新增一個線狀圖報表。
9.拖動圖表到標題下面並拉伸成合適的形狀。
10.重複以上步驟再添加一個標題為“時間段銷量”柱狀圖,一個標題為“種類銷量”的餅狀圖,一個標題為“地區銷量”的區域排行圖,並根據自己的需要設置圖表的樣式。最終效果類似下圖。
11.單擊頁面右上角的返回按鈕退出編輯頁面。
1.首先控制設備上報10條數據。
您可以使用配置設備接入服務時註冊的真實設備接入平台,上報數據。
您也可以使用模擬器模擬設備上報數據,操作方法請參考通過MQTT.fx體驗設備接入。
上報數據的樣例如下,請自行修改參數的取值模擬真實設備數據:
樣例1
{
"msgType": "deviceReq",
"data": [{
"serviceId": "sales",
"serviceData": {
"category": "soda",
"number": "1",
"area": "SZLH",
"timeStamp": "20190425T091157Z"
}
}]
}
上述樣例表示UTC時間2019年4月25日9點11分57秒深圳羅湖的自動販賣機銷售了一支蘇打飲料。
樣例2
{
"msgType": "deviceReq",
"data": [{
"serviceId": "sales",
"serviceData": {
"category": "juice",
"number": "2",
"area": "SZFT"
"timeStamp": "20190426T170005Z"
}
}]
}
上述樣例表示UTC時間2019年4月26日17點05秒深圳福田的自動販賣機銷售了兩支果汁飲料。
本文以上報下錶的數據為例。
|
category |
number |
area |
timeStamp |
|---|---|---|---|
|
soda |
1 |
SZLH |
20190425T091157Z |
|
juice |
1 |
SZFT |
20190425T121511Z |
|
sport |
1 |
SZLH |
20190425T172433Z |
|
juice |
2 |
SZFT |
20190426T170005Z |
|
soda |
1 |
SZNS |
20190426T190905Z |
|
juice |
1 |
SZNS |
20190427T085959Z |
|
juice |
2 |
SZLH |
20190427T111111Z |
|
soda |
3 |
SZFT |
20190428T182215Z |
|
sport |
1 |
SZLH |
20190429T205901Z |
|
soda |
1 |
SZLG |
20190430T225045Z |
2.登錄MRS管理控制台,選擇“集群列表 > 現有集群”,單擊集群名進入集群管理頁面。
3.單擊頁面上方的“文件管理”,再單擊“HDFS文件列表”,進入轉儲文件目錄(例如“temp”)查看是否存在轉儲的數據文件。
注:DIS會將數據合併轉發,所以此處的文件數量和上報的數據條數可能會不一致。
4.單擊頁面上方的“作業管理”,在“作業”頁簽中單擊“添加”,配置作業信息。本示例中創建一個spark類型的作業,實現分析設備上報數據,分別按日期、時間段、種類、區域統計銷量,將分析結果輸出為CSV文件並保存至OBS。
|
參數名稱 |
說明 |
|---|---|
|
作業類型 |
選擇“SparkSubmit”。 |
|
作業名稱 |
自定義,如“test”。 |
|
執行程序路徑 |
|
|
運行程序參數 |
左側選擇“–class”,右側輸入“com.huawei.bigdata.spark.examples.SalesStatistics”。 |
|
執行程序參數 |
輸入“AK SK inputpath outputpath”。
|
|
服務配置參數 |
無需填寫。 |
配置完成后單擊“確定”啟動作業。
5.作業完成后,可在OBS桶內看到output文件夾,裏面有四個文件夾,每個文件夾內有一個“_SUCCESS”文件和一個“part”開頭的csv文件。
注:本實驗的樣例程序分析數據時會將UTC時間轉換為本地時間,因此數據分析結果中的日期與時間段數值會和上報時的數值不一致。
6.登錄華為雲官方網站,訪問數據可視化服務。
7.單擊“進入控制台”。
8.單擊“我的數據 > 新建數據連接” ,在“新建數據連接”頁面左側的數據庫類型中,選擇“CSV文件”,按照下錶的數據規劃填寫配置后單擊“確定”。重複本步驟建立4個數據連接。
|
參數名 |
說明 |
|---|---|
|
名稱 |
建立4個數據連接,分別命名為:
|
|
Access Key |
填寫華為雲賬號的AK、SK,獲取方法可參考AK和SK的獲取方法。 |
|
Secret Access Key |
|
|
文件來源 |
選擇“OBS文件”。 |
|
文件路徑 |
4個連接分別選擇步驟5的output文件夾內和連接同名的文件夾內的csv文件。 |
9.返回“我的大屏”頁簽,單擊配置數據可視化服務時創建的大屏右下的編輯按鈕進入編輯頁面。
10.選中“每日銷量”表,在右側數據面板選擇數據類型為“CSV文件”,數據連接選擇步驟8添加的數據連接“salesByDate”。
11.根據響應數據的屬性名稱配置字段映射。
配置 “x”為 “saleDate”, “y”為 “saleNumber”。
12.選中“時間段銷量”表,在右側數據面板選擇數據類型為“CSV文件”,數據連接選擇步驟8添加的數據連接“salesByTime”。
13.根據響應數據的屬性名稱配置字段映射。
配置 “x”為 “saleTime”, “y”為 “saleNumber”。
14.選中“種類銷量”表,在右側數據面板選擇數據類型為“CSV文件”,數據連接選擇步驟8添加的數據連接“salesByCategory”。
15.根據響應數據的屬性名稱配置字段映射。
配置 “s”為 “category”, “y”為 “saleNumber”,並設置各個分類的名稱(本示例中為“soda”,“juice”,“sport”)和圖例的顏色。
16.選中“地區銷量”表,在右側數據面板選擇數據類型為“CSV文件”,數據連接選擇步驟8添加的數據連接“salesByArea”。
17.根據響應數據的屬性名稱配置字段映射。
配置 “num”為 “saleNumber”。
18.全部圖表配置完成后,單擊頁面右上角的可預覽報表,示例如下圖。
至此,通過該文檔的學習,您應該對自動售貨機銷售分析場景有了一定的了解。接下來,可以在系列後續文章中,可以學習到更多的物聯網業務場景。
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※帶您來了解什麼是 USB CONNECTOR ?
※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面
※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!
※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化
※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益
※教你寫出一流的銷售文案?
曹工說JDK源碼(1)–ConcurrentHashMap,擴容前大家同在一個哈希桶,為啥擴容后,你去新數組的高位,我只能去低位?
曹工說JDK源碼(2)–ConcurrentHashMap的多線程擴容,說白了,就是分段取任務
曹工說JDK源碼(3)–ConcurrentHashMap,Hash算法優化、位運算揭秘
這個基本也是redis 面試的經典題目了,然而,網上不少博客對這個詞的定義都含糊不清,各執一詞。
主要有兩類說法:
大量緩存key,由於設置了相同的過期時間,在某個時刻同時失效,導致此刻的查詢請求,全部湧向db,本來db的tps大概是幾千左右,結果湧入了幾十萬的請求,那db肯定直接就扛不住了
這種說法下面,解決方案一般是,把過期時間增加一個隨機值,這樣,也就不會大批量的key同時失效了
另外一種說法是,本來redis扛下了大部分的請求,但是,由於緩存所在的機器,發生了宕機。此時,緩存這台機器之間就連不上了,redis服務也掛了,此時,你的服務里,發現redis取不到,然後全都跑去查數據庫,那,就發生和前面一樣的情況了,請求全部湧向db,db無響應。
兩類說法,也不用覺得,這個對,那個不對,不過是一個技術名詞,當初發明這個詞的人,估計也沒想那麼多,結果傳播開來之後,就變成了現在這個樣子。
我們這裏主要採用下面那一種說法,因為下面這種說法,其實是已經包含了上面的情景。但是,下面這種場景,要複雜的多,因為redis此時就是一個完全不可信的東西了,你得想好,怎麼不讓它掛掉,那是不是應該部署sentinel、cluster集群?同時,持久化必須要開啟。
這樣呢,掛掉后,短暫的不可用之後,大概幾十s吧,緩存集群就恢復了,就又可用了。
同時,我們還得考慮,假設,現在redis掛了,我們代碼的降級策略是什麼?
大家發現redis掛了,首先,估計是會拋異常了,連接超時;拋了異常后,要直接拋到前端嗎?作為一個穩健的後端程序,那肯定是不行的,你redis掛了,數據庫又沒掛;好吧,那我們就大家一起去查數據庫。
結果,大量的查詢請求,就烏泱泱地跑去查庫了,然後,db卒。這個肯定不行。
所以,我們必須要控制的一點是,當發現某個key失效了,不是大家都去查庫,而是要進行 併發控制。
什麼是併發控制?就是不能全部放過去查庫,只能放少部分,免得把脆弱的db打死。
併發控制,基本就是要爭奪去查庫的權利了,這一步,基本就是一個選舉的過程,可以通過搶鎖的方式,比如Reentrentlock,synchronized,cas也可以。
搶到鎖的線程,有資格去查庫,其他線程要麼被阻塞,要麼自旋
搶到鎖的線程,去查庫,查到數據后,將數據存放在某個地方,通知其他線程去取(如果其他線程被阻塞的話);或者,如果其他線程沒被阻塞,比如sleep 50ms,再去指定的地方拿數據那種,這種就不需要通知
總之,如果其他線程要我們通知,我們就通知;不要我們通知,我們就不通知。
在while(true)里,sleep 50ms,然後再去取數據
這種類似於忙等待,但是每次sleep一會,所以還不錯
將自己阻塞,等待搶到鎖的線程,構建完緩存后,來喚醒
在while(true)里,一直忙循環,期間一直檢查數據是否已經ok了,這種方案呢,要看裏面:檢查數據的操作,是否耗時;如果只是檢查jvm內存里的數據,那還好;否則的話,假設要去檢查redis的話,這種io比較耗時的操作的話,就不合適了,cpu會一直空轉。
主線程構建緩存時,其他線程,在while(true)里,sleep 一定時間,然後再檢查數據是否ready。
說了這麼多,好像和題目里的concurrenthashmap沒啥關係,不,是有關係的,因為,這個思路,其實就是來自於concurrentHashMap。
在我們用無參構造函數,去new一個ConcurrentHashMap時,此時還不會去創建底層數組,這個是一個小優化。什麼時候創建數組呢,是在我們第一次去put的時候。
put的時候,會調用putVal。
其中,putVal代碼如下:
transient volatile Node<K,V>[] table;
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
// 1
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 2
if (tab == null || (n = tab.length) == 0)
tab = initTable();
1處,把field table,賦值給局部變量tab
2處,如果tab為null,則進行initTable初始化
這個2處,在多線程put的時候,是可能多個線程同時進來的。有併發問題。
我們接下來,看看initTable是怎麼解決這個問題的,畢竟,我們new數組,只new一次即可,new那麼多次,沒用,對性能有損耗。所以,這裏面肯定會多線程爭奪初始化權利的代碼。
private transient volatile int sizeCtl;
transient volatile Node<K,V>[] table;
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab;
int sc;
// 0
while ((tab = table) == null || tab.length == 0) {
// 1
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 2
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 3
if ((tab = table) == null || tab.length == 0) {
// 4
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
// 5
sizeCtl = sc;
}
break;
}// end if
}// end while
return tab;
}
1處,這裏把sizeCtl,賦值給局部變量sc。這裏的sizeCtl是一個很重要的field,當我們new完之後,默認這個字段,要麼為0,要麼為準備創建的底層數組的長度。
這裏去判斷是否小於0,那肯定不滿足,小於0,會是什麼意思?當某個線程,搶到了這個initTable中的底層數組的創建權利時,就會把sizeCtl改為 -1。
所以,這裏的意思是,看看是否已經有其他線程在初始化了,如果已經有了,則直接調用:
Thread.yield();
這個方法的意思是,暗示操作系統,自己準備放棄cpu;但操作系統,自有它自己的線程調度規則,所以,這個方法可能沒什麼效果;我們業務代碼,這裏一般可以修改為Thread.sleep。
這個方法調用完成后,後續也沒有其他代碼,所以會直接跳轉到循環開始處(0處代碼),判斷table是否初始化ok了,如果沒有ok,則會繼續進來。
2處,使用cas,如果此時,sizeCtl的值等於sc的值,就修改sizeCtl為 -1;如果成功,則返回true,進入3處
否則,會跳轉到0處,繼續循環。
3處,雖然搶到了控制權,但是這裏還是要再判斷一下,不然可能出現重複初始化,即,不加這一行,4處的代碼,會被重複執行
4處開始,這裏去執行真正的初始化邏輯。
//
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 1
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 2
table = tab = nt;
sc = n - (n >>> 2);
這裏的1處,new數組;2處,賦值給field:table;此時,因為table 這個field是volatile修飾的,所以其他線程會馬上感知到。0處代碼就不會為true了,就不會繼續循環了。
5處,修改sizeCtl為正數。
這裏說下,為啥要加3處的那個判斷。
現在,假設線程A,在初始化完成后,走到了5處,修改了sizeCtl為正數;而線程B,剛好執行1處代碼:
// 1
if ((sc = sizeCtl) < 0)
那肯定,1處就不滿足了;然後就會進到2處,cas修改成功,進行初始化。沒有3處判斷的話,就會重複初始化。
我這裏的方案,還是比較簡單那種,就是,n個線程同時爭奪構建緩存的權利;winner線程,構建緩存后,會把緩存設置到redis;其他線程則是一直在while(true)里sleep一段時間,然後檢查redis里的數據是否不為空。
這個方案中,redis掛了這種情況,是沒在考慮中的,但是一個方案,沒辦法立馬各方面全部到位,後續我再完善一下。
@Override
public Users getUser(long userId) {
ValueOperations<String, Users> ops = redisTemplate.opsForValue();
// 1
Users s = ops.get(String.valueOf(userId));
if (s == null) {
/**
* 2 這裏要去查庫獲取值
*/
Users users = getUsersFromDB(userId);
// 3
ops.set(String.valueOf(users.getUserId()),users);
return users;
}
return s;
}
private Users getUsersFromDB(long userId) {
Users users = new Users();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("spent 1s to get user from db");
users.setUserId(userId);
users.setUserName("zhangsan");
return users;
}
直接看上面的1,2,3處。就是檢查、構建緩存,設置到緩存的過程。
// 1
private volatile int initControl;
@Override
public Users getUser(long userId) {
ValueOperations<String, Users> ops = redisTemplate.opsForValue();
Users users;
while (true) {
// 2
users = ops.get(String.valueOf(userId));
if (users != null) {
// 3
break;
}
// 4
int initControlLocal = initControl;
/**
* 5 如果已經有線程在進行獲取了,則直接放棄cpu
*/
if (initControlLocal < 0) {
// log.info("initControlLocal < 0,just yield and wait");
// Thread.yield();
try {
Thread.sleep(50);
} catch (InterruptedException e) {
log.warn("e:{}", e);
}
continue;
}
/**
* 6 爭奪控制權
*/
boolean bGotChanceToInit = U.compareAndSwapInt(this,
INIT_CONTROL, initControlLocal, -1);
// 7
if (bGotChanceToInit) {
try {
// 8
users = ops.get(String.valueOf(userId));
if (users == null) {
log.info("got change to init");
/**
* 9 這裏要去查庫獲取值
*/
users = getUsersFromDB(userId);
ops.set(String.valueOf(users.getUserId()), users);
log.info("init over");
}
} finally {
// 10
initControl = 0;
}
break;
}// end if (bGotChanceToInit)
}// end while
return users;
}
1處,定義了一個field,initControl;默認為0.線程們會去使用cas,修改為-1,成功的線程,即獲得初始化緩存的權利。
注意,要定義為volatile,保證線程間的可見性
2處,去redis獲取緩存,如果不為null,直接返回
4處,如果沒取到緩存,則進入此處;此處,將field:initControl賦值給局部變量
5處,判斷局部變量initControlLocal,是否小於0;小於0,說明已經有線程在進行初始化了,直接contine,繼續下一次循環
6處,如果當前還沒有線程在初始化,則開始競爭初始化的權利,誰成功地用cas,修改field:initControl為-1,誰就獲得這個權利
7處,如果當前線程獲得了權利,則進入8處,否則,會繼續下一次循環
8處,再次去redis,獲取緩存,如果不為空,則進入9處
9處,查庫,設置緩存
10處,修改field:initControl為0,表示退出初始化
這裏的代碼,整體和hashmap中的initTable是一模一樣的。
上面的方案,怎麼測試沒問題呢?我寫了一段測試代碼。
ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.info("discard:{}",r);
}
});
@RequestMapping("/test.do")
public void test() {
// 0
iUsersService.deleteUser(111L);
CyclicBarrier barrier = new CyclicBarrier(100);
for (int i = 0; i < 100; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
long start = System.currentTimeMillis();
// 1
Users users = iUsersService.getUser(111L);
log.info("result:{},spent {} ms", users, System.currentTimeMillis() - start);
}
});
}
}
上面模擬100併發下,獲取緩存。
0處,把緩存刪了,模擬緩存失效
1處,調用方法,獲取緩存。
效果如下:
可以看到,只有一個線程拿到了初始化權利。
https://gitee.com/ckl111/all-simple-demo-in-work-1/tree/master/redis-cache-avalanche
jdk的併發包,寫得真是有水平,大家仔細研究的話,必有收穫。
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!
※網頁設計公司推薦不同的風格,搶佔消費者視覺第一線
※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整
※南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!
※教你寫出一流的銷售文案?
※超省錢租車方案
目前我們做的上位機項目還是以Winform為主,在實際應用過程中,可能還會出現一些細節的修改。對於這種情況,如果上位機帶有自動更新功能,我們只需要將更新后的應用程序打包放在指定的路徑下,可以讓用戶自己來進行更新使用,會大大增加項目的便捷性。
01.自動更新整體思路
今天給大家介紹一下如何基於C#實現WinForm自動更新的一種方式,這種方式長期應用在項目中,提供了很多幫助,也節約了大量的時間成本,並且也使用在CMSPro軟件中,整體流程如下圖所示:
圖表 1自動更新流程
02.實現說明
通過上圖,可以發現這種方式是基於打包文件的方式實現的,好處在於整體打包下載,即使中途出現網絡中斷也不會有任何影響,當然相比於那種單個文件更新的方式,可能每次耗時會多一些,但是由於更新並不是一個頻繁操作的過程,這個時間是可以接受的。
(1)首先對於項目是否啟用自動更新,是通過配置的方式實現的,在實際開發中,可以使用手動更新和自動更新兩種方式,當啟用自動更新時,每次啟動應用程序都會與服務器版本號做下比較,判斷是否執行自動更新的流程。
圖表 2自動更新界面
(2)對於手動更新,可以通過點擊,彈出一個手動更新窗體,如下圖所示:
圖表 3手動更新界面
(3)對於服務器路徑、本地版本號等信息都是通過本地配置文件存儲的,因此本地需要有一個LocalVersion的配置文件,具體用什麼形式,可以自由選擇,Ini、Txt、Xml、Json都可以,如下圖所示:
圖表 4本地配置文件參考
(4)服務器側也會有一個配置文件,形式自由選擇,應該包含以下信息:當前服務器版本號、最新版本的程序包、該版本是否更新、該版本更新內容等信息,同時如果有新版本,應該將新版本的文件放到指定路徑下,保證最新版本包的這個路徑是有效路徑。
圖表 5服務器配置文件參考
(5)上位機通過將服務器的最新版本號與本地的版本號做對比,如果服務器的版本號較大,說明服務器有更新版本,因此,會根據最新版本包的地址進行下載,這裏採用的是zip文件,下載過程根據網絡及實際情況可能會耗時,因此上位機側應該做個進度條,讓用戶知道下載的進度情況,同時對於每一步的狀態也應該通過圖標的方式來進行显示,讓用戶明確更新的進度情況。
圖表 6自動更新流程
(6)更新完成后,系統會自動重啟新的應用程序,可以看到軟件從之前的5.3.5版本升級到最新的6.0.0版本。
圖表 7更新結果
03.整體總結
本文主要工控上位機進行自動更新的流程做了一個整體介紹,主要是介紹流程為主,給大家分享一下實現的整體思路,畢竟每個人的實現方式都可能有所不同,大家也可以在此基礎上增加一個新的功能,給自己的上位機軟件增加一點特色的同時,也給自己提供了便捷一下升級的過程的話,可以通過關注本公眾號:dotNet工控上位機,併發送關鍵詞:CMSPro,下載之後安裝運行,便會直接進入版本升級的過程。
公眾號:thinger_swj
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※網頁設計公司推薦不同的風格,搶佔消費者視覺第一線
※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益
※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面
※南投搬家公司費用需注意的眉眉角角,別等搬了再說!
※教你寫出一流的銷售文案?
本次做後台管理系統,採用的是 AntD 框架。涉及到圖片的上傳,用的是AntD的 upload 組件。
我在上一篇文章《前端AntD框架的upload組件上傳圖片時遇到的一些坑》中講到:AntD 的 upload 組件有很多坑,引起了很多人的關注。折騰過的人,自然明白其中的苦楚。
今天這篇文章,我們繼續來研究 AntD 的 upload 組件的另一個坑。
備註:本文寫於2020-06-11,使用的 antd 版本是 3.13.6。
因為需要上傳多張圖片,所以採用的是照片牆的形式。上傳成功后的界面如下:
(1)上傳中:
(2)上傳成功:
(3)圖片預覽:
首先,你需要讓後台同學提供好圖片上傳的接口。上一篇文章中,我們是把接口調用直接寫在了 <Upload> 標籤的 action 屬性當中。但如果你在調接口的時候,動作很複雜(比如根據業務要求,需要連續調兩個接口才能上傳圖片,或者在調接口時還要做其他的事情),這個 action 方法就無法滿足需求了。那該怎麼做呢?
好在 AntD 的 upload 組件給我們提供了 customRequest這個方法:
關於customRequest 這個方法, AntD 官方並沒有給出示例,他們只是在 GitHub 上給出了這樣一個簡短的介紹:
但這個方法怎麼用呢?用的時候,會遇到什麼問題呢?AntD 官方沒有說。我在網上搜了半天,也沒看到比較完整的、切實可行的 Demo。我天朝地大物博,網絡資料浩如煙海,AntD 可是口口聲聲被人們號稱是天朝最好用的管理後台的樣式框架。可如今,卻面臨這樣的局面。我看着你們,滿懷羡慕。
既然如此,那我就自己研究吧。折騰了一天,總算是把 customRequest 的坑踩得差不多了。
啥也不說了,直接上代碼。
採用 AntD框架的 upload 組件的 customRequest 方法,自定義上傳行為。核心代碼如下:
import React, { PureComponent } from 'react';
import { Button, Card, Form, message, Upload, Icon, Modal, Row, Col } from 'antd';
import { connect } from 'dva';
import { queryMyData, submitData } from '../api';
import { uploadImage } from '../../utils/wq.img.upload';
import styles from '../../utils/form.less';
const FormItem = Form.Item;
@Form.create()
export default class PicturesWall extends PureComponent {
constructor(props) {
super(props);
const { id } = this.props.match.params;
this.state = {
id,
img: undefined, // 從接口拿到的圖片字段
imgList: [], // 展示在 antd圖片組件上的數據
previewVisible: false,
previewImage: '',
};
}
componentDidMount() {
const { id } = this.state;
id && this.queryData();
}
// 調接口,查詢已有的數據
queryData() {
const { id } = this.state;
queryMyData({
id,
})
.then(({ ret, data }) => {
if (ret == 0 && data && data.list && data.list.length) {
const item = data.list[0];
const img = data.img;
const imgList = item.img
? [
{
uid: '1', // 注意,這個uid一定不能少,否則展示失敗
name: 'hehe.png',
status: 'done',
url: img,
},
]
: [];
this.setState({
img,
imgList,
});
} else {
return Promise.reject();
}
})
.catch(() => {
message.error('查詢出錯,請重試');
});
}
handleCancel = () => this.setState({ previewVisible: false });
// 方法:圖片預覽
handlePreview = (file) => {
console.log('smyhvae handlePreview:' + JSON.stringify(file));
this.setState({
previewImage: file.url || file.thumbUrl,
previewVisible: true,
});
};
// 參考鏈接:https://www.jianshu.com/p/f356f050b3c9
handleBeforeUpload = (file) => {
console.log('smyhvae handleBeforeUpload file:' + JSON.stringify(file));
console.log('smyhvae handleBeforeUpload file.file:' + JSON.stringify(file.file));
console.log('smyhvae handleBeforeUpload file type:' + JSON.stringify(file.type));
//限製圖片 格式、size、分辨率
const isJPG = file.type === 'image/jpeg';
const isJPEG = file.type === 'image/jpeg';
const isGIF = file.type === 'image/gif';
const isPNG = file.type === 'image/png';
const isLt2M = file.size / 1024 / 1024 < 1;
if (!(isJPG || isJPEG || isPNG)) {
Modal.error({
title: '只能上傳JPG、JPEG、PNG格式的圖片~',
});
} else if (!isLt2M) {
Modal.error({
title: '圖片超過1M限制,不允許上傳~',
});
}
return (isJPG || isJPEG || isPNG) && isLt2M;
};
// checkImageWH 返回一個promise 檢測通過返回resolve 失敗返回reject阻止圖片上傳
checkImageWH(file) {
return new Promise(function (resolve, reject) {
let filereader = new FileReader();
filereader.onload = (e) => {
let src = e.target.result;
const image = new Image();
image.onload = function () {
// 獲取圖片的寬高
file.width = this.width;
file.height = this.height;
resolve();
};
image.onerror = reject;
image.src = src;
};
filereader.readAsDataURL(file);
});
}
// 圖片上傳
doImgUpload = (options) => {
const { onSuccess, onError, file, onProgress } = options;
// start:進度條相關
// 偽裝成 handleChange裏面的圖片上傳狀態
const imgItem = {
uid: '1', // 注意,這個uid一定不能少,否則上傳失敗
name: 'hehe.png',
status: 'uploading',
url: '',
percent: 99, // 注意不要寫100。100表示上傳完成
};
this.setState({
imgList: [imgItem],
}); // 更新 imgList
// end:進度條相關
const reader = new FileReader();
reader.readAsDataURL(file); // 讀取圖片文件
reader.onload = (file) => {
const params = {
myBase64: file.target.result, // 把 本地圖片的base64編碼傳給後台,調接口,生成圖片的url
};
// 上傳圖片的base64編碼,調接口后,返回 imageId
uploadImage(params)
.then((res) => {
console.log('smyhvae doImgUpload:' + JSON.stringify(res));
console.log('smyhvae 圖片上傳成功:' + res.imageUrl);
const imgItem = {
uid: '1', // 注意,這個uid一定不能少,否則上傳失敗
name: 'hehe.png',
status: 'done',
url: res.imageUrl, // url 是展示在頁面上的絕對鏈接
imgUrl: res.imageUrl, // imgUrl 是存到 db 里的相對鏈接
// response: '{"status": "success"}',
};
this.setState({
imgList: [imgItem],
}); // 更新 imgList
})
.catch((e) => {
console.log('smyhvae 圖片上傳失敗:' + JSON.stringify(e || ''));
message.error('圖片上傳失敗,請重試');
});
};
};
handleChange = ({ file, fileList }) => {
console.log('smyhvae handleChange file:' + JSON.stringify(file));
console.log('smyhvae handleChange fileList:' + JSON.stringify(fileList));
if (file.status == 'removed') {
this.setState({
imgList: [],
});
}
};
submit = (e) => {
e.preventDefault();
this.props.form.validateFields((err, fieldsValue) => {
if (err) {
return;
}
const { id, imgList } = this.state;
const tempImgList = imgList.filter((item) => item.status == 'done'); // 篩選出 status = done 的圖片
const imgArr = [];
tempImgList.forEach((item) => {
imgArr.push(item.imgUrl);
// imgArr.push(item.url);
});
submitData({
id,
img: imgArr[0] || '', // 1、暫時只傳一張圖片給後台。如果傳多張圖片,那麼,upload組件需要進一步完善,比較麻煩,以後有需求再優化。2、如果圖片字段是選填,那就用空字符串兜底
})
.then((res) => {
if (res.ret == 0) {
message.success(`${id ? '修改' : '新增'}成功,自動跳轉中...`);
} else if (res.ret == 201 || res.ret == 202 || res.ret == 203 || res.ret == 6) {
return Promise.reject(res.msg);
} else {
return Promise.reject();
}
})
.catch((e) => {
message.error(e || '提交失敗,請重試');
});
});
};
render() {
const { id, imgList } = this.state;
console.log('smyhvae render imgList:' + JSON.stringify(imgList));
const { getFieldDecorator } = this.props.form;
const formItemLayout = {
labelCol: { span: 3 },
wrapperCol: { span: 10 },
};
const buttonItemLayout = {
wrapperCol: { span: 10, offset: 3 },
};
const uploadButton = (
<div>
<Icon type="plus" />
<div className="ant-upload-text">Upload</div>
</div>
);
return (
<Card title={id ? '修改信息' : '新增信息'}>
<Form onSubmit={this.submit} layout="horizontal">
{/* 新建圖片、編輯圖片 */}
<FormItem label="圖片" {...formItemLayout}>
{getFieldDecorator('img', {
rules: [{ required: false, message: '請上傳圖片' }],
})(
<Upload
action="2"
customRequest={this.doImgUpload}
listType="picture-card"
fileList={imgList}
onPreview={this.handlePreview}
beforeUpload={this.handleBeforeUpload}
onChange={this.handleChange}
>
{imgList.length >= 1 ? null : uploadButton}
</Upload>
)}
</FormItem>
<Row>
<Col span={3} />
<Col span={18} className={styles.graytext}>
注:圖片支持JPG、JPEG、PNG格式,小於1M,最多上傳1張
</Col>
</Row>
<FormItem {...buttonItemLayout}>
<Button type="primary" htmlType="submit">
提交
</Button>
</FormItem>
</Form>
{/* 圖片點開預覽 */}
<Modal visible={this.state.previewVisible} footer={null} onCancel={this.handleCancel}>
<img alt="example" style={{ width: '100%' }} src={this.state.previewImage} />
</Modal>
</Card>
);
}
}
注意file的格式:https://www.lmonkey.com/t/oREQA5XE1
Demo在線演示:
https://stackoverflow.com/questions/58128062/using-customrequest-in-ant-design-file-upload
fileList 格式在線演示:
https://stackoverflow.com/questions/51514757/action-function-is-required-with-antd-upload-control-but-i-dont-need-it
ant design Upload組件的使用總結:https://www.jianshu.com/p/0aa4612af987
antd上傳功能的CustomRequest:https://mlog.club/article/3832743
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益
※別再煩惱如何寫文案,掌握八大原則!
※教你寫出一流的銷售文案?
※超省錢租車方案
※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益
※產品缺大量曝光嗎?你需要的是一流包裝設計!
如果你對性能測試感興趣,但是又不熟悉理論知識,可以看下面的系列文章
https://www.cnblogs.com/poloyy/category/1620792.html
我們在學習性能測試之前,需要有個新的認識:性能測試,不再是像功能測試一樣單純的找 Bug,而是去找性能指標
簡單理解:一個接口返回的數據比較多(假設:不使用分頁,把所有數據同時返回)
大數據測試的功能屬於功能測試哦
在性能測試過程中發現一些問題,假設定位到某一段代碼有問題,可以截圖提交 Bug 給開發,但這並不是我們性能測試的最終目的,最終目的是找出性能指標
跑步100米,用時多少?運動員的心跳、步伐頻率是多少?
性能指標值和響應時間是否滿足當前業務場景的最低要求(合格線)
電商系統,下單業務,目前還不知道系統支持多少人同時下單,那麼我們需要找到服務器能正常支持多少人同時下單
以下含義來源高老的解釋,比較“官方”的術語
其實也算是一個簡潔描述的性測試流程了
目前博主是正在學習性能測試的小白一枚,希望通過通俗簡單的術語來學懂性能測試,打造屬於自己的知識體系,歡迎大家進群與我溝通(870155189)
通過增加“用戶數”,就是常說的併發數
天平秤,稱東西的時候,需要逐步加砝碼,最終達到砝碼和物品重量的平衡點,因為它不可能一下子就達到平衡點【好比不可能一下子找到系統能承受的最大負載量】
問:大家什麼時候會覺得工作壓力大?
答:996、007;因為你不會覺得955壓力山大吧
結論:所以在我們日常工作中,長時間工作強度高,才會覺得壓力大;如果你一周就加班一天也說壓力大…(那就別干這一行了)
測試系統的穩定性
工作壓力大,你還能堅持下去(那穩定性肯定好吧),那如果你很快就離職了(那穩定性肯定差,都宕機罷工了)
隔三差五的出現下面的情況
負載測試一般時間比較短,壓力測試時間比較長,持續運行時間短就能正常使用,但持續運行時間長就可能崩掉了,這是什麼原因呢?
壓力測試長時間運行,可能會逐漸增加系統的內存佔用空間,若得不到有效的內存回收,當達到內存最大值時,系統就會崩掉
電商秒殺場景,幾十個商品幾十萬個人同時秒殺搶購
從一袋米中找一個綠豆,和一碗米中找一個綠豆,找的時間肯定是千差萬別的
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※別再煩惱如何寫文案,掌握八大原則!
※網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!
※超省錢租車方案
※教你寫出一流的銷售文案?
※網頁設計最專業,超強功能平台可客製化
※產品缺大量曝光嗎?你需要的是一流包裝設計!
獲取流數據的時候,通常需要根據所需把流拆分出其他多個流,根據不同的流再去作相應的處理。
舉個例子:創建一個商品實時流,商品有季節標籤,需要對不同標籤的商品做統計處理,這個時候就需要把商品數據流根據季節標籤分流。
先模擬一個實時的數據流
import lombok.Data;
@Data
public class Product {
public Integer id;
public String seasonType;
}
自定義Source
import common.Product;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.ArrayList;
import java.util.Random;
public class ProductStremingSource implements SourceFunction<Product> {
private boolean isRunning = true;
@Override
public void run(SourceContext<Product> ctx) throws Exception {
while (isRunning){
// 每一秒鐘產生一條數據
Product product = generateProduct();
ctx.collect(product);
Thread.sleep(1000);
}
}
private Product generateProduct(){
int i = new Random().nextInt(100);
ArrayList<String> list = new ArrayList();
list.add("spring");
list.add("summer");
list.add("autumn");
list.add("winter");
Product product = new Product();
product.setSeasonType(list.get(new Random().nextInt(4)));
product.setId(i);
return product;
}
@Override
public void cancel() {
}
}
輸出:
使用 filter 算子根據數據的字段進行過濾。
import common.Product;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import source.ProductStremingSource;
public class OutputStremingDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Product> source = env.addSource(new ProductStremingSource());
// 使用Filter分流
SingleOutputStreamOperator<Product> spring = source.filter(product -> "spring".equals(product.getSeasonType()));
SingleOutputStreamOperator<Product> summer = source.filter(product -> "summer".equals(product.getSeasonType()));
SingleOutputStreamOperator<Product> autumn = source.filter(product -> "autumn".equals(product.getSeasonType()));
SingleOutputStreamOperator<Product> winter = source.filter(product -> "winter".equals(product.getSeasonType()));
source.print();
winter.printToErr();
env.execute("output");
}
}
結果輸出(紅色為季節標籤是winter的分流輸出):
重寫OutputSelector內部類的select()方法,根據數據所需要分流的類型反正不同的標籤下,返回SplitStream,通過SplitStream的select()方法去選擇相應的數據流。
只分流一次是沒有問題的,但是不能使用它來做連續的分流。
SplitStream已經標記過時了
public class OutputStremingDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Product> source = env.addSource(new ProductStremingSource());
// 使用Split分流
SplitStream<Product> dataSelect = source.split(new OutputSelector<Product>() {
@Override
public Iterable<String> select(Product product) {
List<String> seasonTypes = new ArrayList<>();
String seasonType = product.getSeasonType();
switch (seasonType){
case "spring":
seasonTypes.add(seasonType);
break;
case "summer":
seasonTypes.add(seasonType);
break;
case "autumn":
seasonTypes.add(seasonType);
break;
case "winter":
seasonTypes.add(seasonType);
break;
default:
break;
}
return seasonTypes;
}
});
DataStream<Product> spring = dataSelect.select("machine");
DataStream<Product> summer = dataSelect.select("docker");
DataStream<Product> autumn = dataSelect.select("application");
DataStream<Product> winter = dataSelect.select("middleware");
source.print();
winter.printToErr();
env.execute("output");
}
}
推薦使用這種方式
首先需要定義一個OutputTag用於標識不同流
可以使用下面的幾種函數處理流發送到分流中:
之後再用getSideOutput(OutputTag)選擇流。
public class OutputStremingDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Product> source = env.addSource(new ProductStremingSource());
// 使用Side Output分流
final OutputTag<Product> spring = new OutputTag<Product>("spring");
final OutputTag<Product> summer = new OutputTag<Product>("summer");
final OutputTag<Product> autumn = new OutputTag<Product>("autumn");
final OutputTag<Product> winter = new OutputTag<Product>("winter");
SingleOutputStreamOperator<Product> sideOutputData = source.process(new ProcessFunction<Product, Product>() {
@Override
public void processElement(Product product, Context ctx, Collector<Product> out) throws Exception {
String seasonType = product.getSeasonType();
switch (seasonType){
case "spring":
ctx.output(spring,product);
break;
case "summer":
ctx.output(summer,product);
break;
case "autumn":
ctx.output(autumn,product);
break;
case "winter":
ctx.output(winter,product);
break;
default:
out.collect(product);
}
}
});
DataStream<Product> springStream = sideOutputData.getSideOutput(spring);
DataStream<Product> summerStream = sideOutputData.getSideOutput(summer);
DataStream<Product> autumnStream = sideOutputData.getSideOutput(autumn);
DataStream<Product> winterStream = sideOutputData.getSideOutput(winter);
// 輸出標籤為:winter 的數據流
winterStream.print();
env.execute("output");
}
}
結果輸出:
更多文章:www.ipooli.com
掃碼關注公眾號《ipoo》
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※教你寫出一流的銷售文案?
※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益
※回頭車貨運收費標準
※別再煩惱如何寫文案,掌握八大原則!
※超省錢租車方案
※產品缺大量曝光嗎?你需要的是一流包裝設計!
我們福祿網絡致力於為廣大用戶提供智能化充值服務,包括各類通信充值卡(比如移動、聯通、電信的話費及流量充值)、遊戲類充值卡(比如王者榮耀、吃雞類點券、AppleStore充值、Q幣、鬥魚幣等)、生活服務類(比如肯德基、小鹿茶等),網娛類(比如QQ各類鑽等),作為一個服務提供商,商品質量的穩定、持續及充值過程的便捷一直是我們在業內的口碑。
在整個商品流通過程中,如何做好庫存的管理,以充分提高庫存運轉周期和資金使用效率,一直是個難題。基於此,我們提出了智能化的庫存管理服務,根據訂單數據及商品數據,來預測不同商品隨着時間推移的日常消耗情況。
目前成熟的時間序列預測算法很多,但商業領域性能優越的卻不多,經過多種嘗試,給大家推薦2種時間序列算法:facebook開源的Prophet算法和LSTM深度學習算法。
現將個人理解的2種算法特性予以簡要說明:
time,product,cnt
2019-10-01 00,**充值,6
2019-10-01 00,***遊戲,368
2019-10-01 00,***,1
2019-10-01 00,***,11
2019-10-01 00,***遊戲,17
2019-10-01 00
,三網***,39
2019-10-01 00,**網,6
2019-10-01 00,***,2
字段說明:
時間序列一般不進行特徵處理,當然可以根據具體情況進行歸一化處理或是取對數處理等。
目前待選的算法主要有2種:
以上是理論上的調參步驟,但我們在實際情況下在建議使用grid_search(網格尋參)方式,直接簡單效果好。當機器性能不佳時網格調參配合理論調參方法可以加快調參速度。建議初學者使用手動調參方式以理解每個參數對模型效果的影響。
holiday.csv
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from fbprophet import Prophet
data = pd.read_csv('../data/data2.csv', parse_dates=['time'], index_col='time')
def get_product_data(name, rule=None):
product = data[data['product'] == name][['cnt']]
product.plot()
if rule is not None:
product = product.resample(rule).sum()
product.reset_index(inplace=True)
product.columns = ['ds', 'y']
return product
holidays = pd.read_csv('holiday.csv', parse_dates=['ds'])
holidays['lower_window'] = -1
holidays = holidays.append(pd.DataFrame({
'holiday': '雙11',
'ds': pd.to_datetime(['2019-11-11', '2020-11-11']),
'lower_window': -1,
'upper_window': 1,
})).append(pd.DataFrame({
'holiday': '雙12',
'ds': pd.to_datetime(['2019-12-12', '2020-12-12']),
'lower_window': -1,
'upper_window': 1,
})
)
def predict(name, rule='1d', freq='d', periods=1, show=False):
ds = get_product_data(name, rule=rule)
if ds.shape[0] < 7:
return None
m = Prophet(holidays=holidays)
m.fit(ds)
future = m.make_future_dataframe(freq=freq, periods=periods) # 建立數據預測框架,數據粒度為天,預測步長為一年
forecast = m.predict(future)
if show:
m.plot(forecast).show() # 繪製預測效果圖
m.plot_components(forecast).show() # 繪製成分趨勢圖
mse = forecast['yhat'].iloc[ds.shape[0]] - ds['y'].values
mse = np.abs(mse) / (ds['y'].values + 1)
return [name, mse.mean(), mse.max(), mse.min(), np.quantile(mse, 0.9), np.quantile(mse, 0.8), mse[-7:].mean(),
ds['y'].iloc[-7:].mean()]
if __name__ == '__main__':
products = set(data['product'])
p = []
for i in products:
y = predict(i)
if y is not None:
p.append(y)
df = pd.DataFrame(p, columns=['product', 'total_mean', 'total_max', 'total_min', '0.9', '0.8', '7_mean',
'7_real_value_mean'])
df.set_index('product', inplace=True)
product_sum: pd.DataFrame = data.groupby('product').sum()
df = df.join(product_sum)
df.sort_values('cnt', ascending=False, inplace=True)
df.to_csv('result.csv', index=False)
結果如下:由於行數較多這裏只展示前1行
根據結果,對比原生數據,可以得出如下結論:
就算法與產品的匹配性可分為3個類型:
A. 因素分解圖
上圖中主要分為3個部分,分別對應prophet 3大要素,趨勢、節假日或特殊日期、周期性(包括年周期、月周期、week周期、天周期以及用戶自定義的周期)
下面依照上面因素分解圖的順序依次對圖進行說明:
LSTM(長短記憶網絡)主要用於有先後順序的序列類型的數據的深度學習網絡。是RNN的優化版本。一般用於自然語言處理,也可用於時間序列的預測。
簡單來說就是,LSTM一共有三個門,輸入門,遺忘門,輸出門, i 、o、 f 分別為三個門的程度參數, g 與RNN中的概念一致。公式里可以看到LSTM的輸出有兩個,細胞狀態c 和隱狀態 h,c是經輸入、遺忘門的產物,也就是當前cell本身的內容,經過輸出門得到h,就是想輸出什麼內容給下一單元。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
from torch import nn
from sklearn.preprocessing import MinMaxScaler
ts_data = pd.read_csv('../data/data2.csv', parse_dates=['time'], index_col='time')
def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
n_vars = 1 if type(data) is list else data.shape[1]
df = pd.DataFrame(data)
cols, names = list(), list()
# input sequence (t-n, ... t-1)
for i in range(n_in, 0, -1):
cols.append(df.shift(i))
names += [('var%d(t-%d)' % (j + 1, i)) for j in range(n_vars)]
# forecast sequence (t, t+1, ... t+n)
for i in range(0, n_out):
cols.append(df.shift(-i))
if i == 0:
names += [('var%d(t)' % (j + 1)) for j in range(n_vars)]
else:
names += [('var%d(t+%d)' % (j + 1, i)) for j in range(n_vars)]
# put it all together
agg = pd.concat(cols, axis=1)
agg.columns = names
# drop rows with NaN values
if dropnan:
agg.dropna(inplace=True)
return agg
def transform_data(feature_cnt=2):
yd = ts_data[ts_data['product'] == '移動話費'][['cnt']]
scaler = MinMaxScaler(feature_range=(0, 1))
yd_scaled = scaler.fit_transform(yd.values)
yd_renamed = series_to_supervised(yd_scaled
, n_in=feature_cnt).values.astype('float32')
n_row = yd_renamed.shape[0]
n_train = int(n_row * 0.7)
train_X, train_y = yd_renamed[:n_train, :-1], yd_renamed[:n_train, -1]
test_X, test_y = yd_renamed[n_train:, :-1], yd_renamed[n_train:, -1]
# 最後,我們需要將數據改變一下形狀,因為 RNN 讀入的數據維度是 (seq, batch, feature),所以要重新改變一下數據的維度,這裏只有一個序列,所以 batch 是 1,而輸入的 feature 就是我們希望依據的幾天,這裏我們定的是兩個天,所以 feature 就是 2.
train_X = train_X.reshape((-1, 1, feature_cnt))
test_X = test_X.reshape((-1, 1, feature_cnt))
print(train_X.shape, train_y.shape, test_X.shape, test_y.shape)
# 轉化成torch 的張量
train_x = torch.from_numpy(train_X)
train_y = torch.from_numpy(train_y)
test_x = torch.from_numpy(test_X)
test_y = torch.from_numpy(test_y)
return scaler, train_x, train_y, test_x, test_y
scaler, train_x, train_y, test_x, test_y = transform_data(24)
# lstm 網絡
class lstm_reg(nn.Module): # 括號中的是python的類繼承語法,父類是nn.Module類 不是參數的意思
def __init__(self, input_size, hidden_size, output_size=1, num_layers=2): # 構造函數
# inpu_size 是輸入的樣本的特徵維度, hidden_size 是LSTM層的神經元個數,
# output_size是輸出的特徵維度
super(lstm_reg, self).__init__() # super用於多層繼承使用,必須要有的操作
self.rnn = nn.LSTM(input_size, hidden_size, num_layers) # 兩層LSTM網絡,
self.reg = nn.Linear(hidden_size, output_size) # 把上一層總共hidden_size個的神經元的輸出向量作為輸入向量,然後回歸到output_size維度的輸出向量中
def forward(self, x): # x是輸入的數據
x, _ = self.rnn(x) # 單個下劃線表示不在意的變量,這裡是LSTM網絡輸出的兩個隱藏層狀態
s, b, h = x.shape
x = x.view(s * b, h)
x = self.reg(x)
x = x.view(s, b, -1) # 使用-1表示第三個維度自動根據原來的shape 和已經定了的s,b來確定
return x
def train(feature_cnt, hidden_size, round, save_path='model.pkl'):
# 我使用了GPU加速,如果不用的話需要把.cuda()給註釋掉
net = lstm_reg(feature_cnt, hidden_size)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(net.parameters(), lr=1e-2)
for e in range(round):
# 新版本中可以不使用Variable了
# var_x = Variable(train_x).cuda()
# var_y = Variable(train_y).cuda()
# 將tensor放在GPU上面進行運算
var_x = train_x
var_y = train_y
out = net(var_x)
loss = criterion(out, var_y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (e + 1) % 100 == 0:
print('Epoch: {}, Loss:{:.5f}'.format(e + 1, loss.item()))
# 存儲訓練好的模型參數
torch.save(net.state_dict(), save_path)
return net
if __name__ == '__main__':
net = train(24, 8, 5000)
# criterion = nn.MSELoss()
# optimizer = torch.optim.Adam(net.parameters(), lr=1e-2)
pred_test = net(test_x) # 測試集的預測結果
pred_test = pred_test.view(-1).data.numpy() # 先轉移到cpu上才能轉換為numpy
# 乘以原來歸一化的刻度放縮回到原來的值域
origin_test_Y = scaler.inverse_transform(test_y.reshape((-1,1)))
origin_pred_test = scaler.inverse_transform(pred_test.reshape((-1,1)))
# 畫圖
plt.plot(origin_pred_test, 'r', label='prediction')
plt.plot(origin_test_Y, 'b', label='real')
plt.legend(loc='best')
plt.show()
# 計算MSE
# loss = criterion(out, var_y)?
true_data = origin_test_Y
true_data = np.array(true_data)
true_data = np.squeeze(true_data) # 從二維變成一維
MSE = true_data - origin_pred_test
MSE = MSE * MSE
MSE_loss = sum(MSE) / len(MSE)
print(MSE_loss)
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※超省錢租車方案
※別再煩惱如何寫文案,掌握八大原則!
※回頭車貨運收費標準
※教你寫出一流的銷售文案?
※產品缺大量曝光嗎?你需要的是一流包裝設計!
※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益
摘錄自2020年8月11日中央社報導
新加坡今天(11日)銷毀重達9公噸的非法走私象牙,並透過網路直播,預計數天才能完成銷毀。當局表示,這是全球近年最大的銷毀非法象牙行動,展現星國打擊非法野生動物交易的決心。
根據動保人士估計,每天約有100隻非洲象被意圖盜取象牙等大象身體部位的盜獵者所殺,目前僅存約40萬隻非洲象。
新加坡是非洲與亞洲之間運送非法動物商品的海上航路點。除了對非法運輸的商品採取強硬立場,新加坡去年也宣示,自2021年9月起,將全面禁止國內象牙及其製品銷售。
生物多樣性
國際新聞
新加坡
象牙
野生動物
非洲象
非法盜獵
象牙走私
本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※帶您來了解什麼是 USB CONNECTOR ?
※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面
※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!
※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化
※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益
※教你寫出一流的銷售文案?