電動車前景看俏,全球電動車累計銷售量已突破 400 萬

彭博能源財經(BNEF)最新報告指出,全球電動自小客車累計銷售已突破 400 萬大關,雖然乍看之下只佔總汽車銷售量的一小部分,但與 2015 年的 100 萬輛相比,其成長速度可說是一日千里。

BNEF 指出,若把電動巴士計算在內,電動車銷售量早在 7 月初就抵達 400 萬,其中電動自小客車全球銷售量在 2018 年 6 月底來到 350 萬輛,電動巴士則為是 421,000 輛,總銷售量為 397 萬。

報告顯示,電動車銷售量從 100 萬到 200 萬輛僅花費 17 個月,更在短短 6 個月就從 300 萬增加到 400 萬輛,而隨著電動車技術進步與價格下滑,未來電動車發展將踩油門加速,全球電動車銷售量僅需 6 個月、在 2019 年 3 月就能突破 500 萬。

電動車銷售量與日俱增,電動車銷售佔比在中國、歐洲和北美等主要市場也不斷提升,以 2018 年第二季來說,電動車就分別占當地總銷售的 4%、2.3% 與 1.6%。中國市場則是全球電動車發展迅速的一大功臣,中國市場早在 2011 年就佔全球電動自小客車總銷售的 37%,更占電動巴士的 99%。

BNEF 指出,未來中國將佔全球電動車總銷售的 42%,歐洲與北美分別占 26% 與 25%,若特斯拉平價電動車款 Model 3 在北美的銷售行情一路上漲,北美電動車銷售量則會迅速超越歐洲,而這兩個市場的銷售量也將同時達到 130 萬輛。

報告也表示,2018 年底之前還會有幾款電動車上市,這將能進一步提升全球電動車銷售市場。BNEF 指出,Model 3 將於 2019 年中旬進入歐洲市場、中國的「雙積分制」也將在 2019 年生效,新型車款與政策都能推動歐洲與中國電動車買氣。

中國雙積分制規定各大車廠必須出售一定比例的環保車,其中分為「油耗積分」與「新能源積分」,若車廠生產越多汽油車,油耗積分就會隨之減少;生產越多高性能電動車,新能源積分就越多。車廠每年正負積分必須抵銷歸零,如果積分沒辦法歸零就不能販售新車。

在政策挹注之下,中國電動車發展將逐步加速。中國媒體也指出,中國政府預估新能源車產量可在 2020 年達到 200 萬輛,銷售佔比更會在 2025 年達到總汽車市場的 20%。

BNEF 先前預估,2025 年全球電動車累計銷售量將增加 10 倍、達到 1,100 萬輛,2020-2030 年電動車價格則可與傳統汽車相當,2030 年全球電動車銷售量有望突破 3,000 萬輛。

隨著氣候變遷與空氣污染加劇,各國開始意識到電動車開發的重要性、紛紛開始制定禁售汽柴油車時間表與路上零排放目標,未來電動車的發展還會再加快,說不定可提早突破 1,000 萬大關。

(首圖來源:。文/DaisyChuang)

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

Java 從入門到進階之路(二十三)

在之前的文章我們介紹了一下 Java 中的  集合框架中的Collection 的迭代器 Iterator,本章我們來看一下 Java 集合框架中的Collection 的泛型。

在講泛型之前我們先來看下面一段代碼:

 1 public class Main {
 2     public static void main(String[] args) {
 3         Point point = new Point(1, 2);
 4 
 5         point.setX(2);
 6         int ix = point.getX();
 7         System.out.println(ix); // (2, 2)
 8 
 9         /**
10          * 如果想要 x 值變為 double 類型則可以強轉為 double 類型
11          * */
12         point.setX(2);
13         double dx = (double) point.getX();
14         System.out.println(dx); // 2.0
15     }
16 }
17 
18 class Point {
19     private int x;
20     private int y;
21 
22     public Point(int x, int y) {
23         this.x = x;
24         this.y = y;
25     }
26 
27     public int getX() {
28         return x;
29     }
30 
31     public void setX(int x) {
32         this.x = x;
33     }
34 
35     public int getY() {
36         return y;
37     }
38 
39     public void setY(int y) {
40         this.y = y;
41     }
42 
43     @Override
44     public String toString() {
45         return "(" + x + ", " + y + ")";
46     }
47 }

上面的代碼我們之前的文章講過,我們可以通過傳入 x 和 y 值來定義 Point 點,如果我們想要 double 類型的點時需要造型為 double 類型,那我要定義漢字類型的呢?那就造型成 String 類型,這就很麻煩,每次都需要自己來造型,有種鞋不合腳的感覺,那能不能定義我想要什麼類型就是什麼類型呢,如下:

 1 public class Main {
 2     public static void main(String[] args) {
 3         Point<Integer> point1 = new Point<Integer>(1, 2); // 必須是包裝類
 4         point1.setX(1);
 5         System.out.println(point1.getX()); // 1
 6 
 7         Point<Double> point2 = new Point<Double>(1.1, 2.1); // 必須是包裝類
 8         point2.setX(1.2);
 9         System.out.println(point2.getX()); // 1.2
10 
11         Point<String> point3 = new Point<String>("一", "二"); // 必須是包裝類
12         point3.setX("三");
13         System.out.println(point3.getX()); //
14     }
15 }
16 
17 /**
18  * 泛型
19  * 又稱參數化類型,是將當前類的屬性的類型,方法參數的類型及方法
20  * 返回值的類型的定義權移交給使用者,
21  * 使用者在創建當前類的同時將泛型的試劑類型傳入
22  * 数字和字母組合,数字不能開頭
23  */
24 class Point<T> { // 定義為泛型 T 類型
25     private T x;
26     private T y;
27 
28     public Point(T x, T y) {
29         this.x = x;
30         this.y = y;
31     }
32 
33     public T getX() {
34         return x;
35     }
36 
37     public void setX(T x) {
38         this.x = x;
39     }
40 
41     public T getY() {
42         return y;
43     }
44 
45     public void setY(T y) {
46         this.y = y;
47     }
48 
49     @Override
50     public String toString() {
51         return "(" + x + ", " + y + ")";
52     }
53 }

從上面的代碼中,我們定義了一個 T 的類型 Point,當我們要實例化該類時,根據自己的需求傳入想要的包裝類類型即可,這樣就滿足了不同的需求,各取所需。 

泛型從底層來說其實就是 Object,定義了泛型只是編譯器在做一些驗證工作,當我們對泛型類型設置值時,會檢查是否滿足類型要求,當我們獲取一個泛型類型的值時,會自動進行類型轉換。

在平時我們是很少自己來定義泛型的,泛型是用來約束集合中元素的類型,如下:

 1 import java.util.ArrayList;
 2 import java.util.Collection;
 3 import java.util.Iterator;
 4 
 5 public class Main {
 6     public static void main(String[] args) {
 7         Collection<String> collection = new ArrayList<String>(); // 只能添加 String 類型的元素
 8         collection.add("one");
 9         collection.add("two");
10         collection.add("thee");
11         collection.add("four");
12         // collection.add(1); // 編譯錯誤
13         for (String string : collection) {
14             System.out.println(string); // one two three four
15         }
16         Iterator<String> iterator = collection.iterator();
17         while (iterator.hasNext()) {
18             // String string = (String) iterator.next(); 不需要再造型
19             String string = iterator.next();
20             System.out.println(string); // one two three four
21         }
22     }
23 }

  

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

Spring Boot入門系列(十八)整合mybatis,使用註解的方式實現增刪改查

之前介紹了Spring Boot 整合mybatis 使用xml配置的方式實現增刪改查,還介紹了自定義mapper 實現複雜多表關聯查詢。雖然目前 mybatis 使用xml 配置的方式 已經極大減輕了配置的複雜度,支持 generator 插件 根據表結構自動生成實體類、配置文件和dao層代碼,減輕很大一部分開發量;但是 java 註解的運用發展到今天。約定取代配置的規範已經深入人心。開發者還是傾向於使用註解解決一切問題,註解版最大的特點是具體的 SQL 文件需要寫在 Mapper 類中,取消了 Mapper 的 XML 配置 。這樣不用任何配置文件,就可以簡單配置輕鬆上手。所以今天就介紹Spring Boot 整合mybatis 使用註解的方式實現數據庫操作 。

Spring Boot 整合mybatis 使用xml配置版之前已經介紹過了,不清楚的朋友可以看看之前的文章:https://www.cnblogs.com/zhangweizhong/category/1657780.html。

 

一、整合Mybatis

Spring Boot  整合Mybatis 的步驟都是一樣的,已經熟悉的同學可以略過。

1、pom.xml增加mybatis相關依賴

我們只需要加上pom.xml文件這些依賴即可。

       <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.41</version>
        </dependency>
        <!--mybatis-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.3.1</version>
        </dependency>
        <!--mapper-->
        <dependency>
            <groupId>tk.mybatis</groupId>
            <artifactId>mapper-spring-boot-starter</artifactId>
            <version>1.2.4</version>
        </dependency>
        <!-- pagehelper -->
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper-spring-boot-starter</artifactId>
            <version>1.2.3</version>
        </dependency>
        <!-- druid 數據庫連接框架-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.9</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.generator</groupId>
            <artifactId>mybatis-generator-core</artifactId>
            <version>1.3.2</version>
            <scope>compile</scope>
            <optional>true</optional>
        </dependency>

 

2、application.properties配置數據連接

application.properties中需要增加mybatis相關的數據庫配置。

############################################################
# 數據源相關配置,這裏用的是阿里的druid 數據源
############################################################
spring.datasource.url=jdbc:mysql://localhost:3306/zwz_test
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.druid.initial-size=1
spring.datasource.druid.min-idle=1
spring.datasource.druid.max-active=20
spring.datasource.druid.test-on-borrow=true
spring.datasource.druid.stat-view-servlet.allow=true

############################################################
# mybatis 相關配置
############################################################
mybatis.type-aliases-package=com.weiz.pojo
mybatis.mapper-locations=classpath:mapper/*.xml
mapper.mappers=com.weiz.utils.MyMapper    #這個MyMapper 就是我之前創建的mapper統一接口。後面所有的mapper類都會繼承這個接口
mapper.not-empty=false
mapper.identity=MYSQL
# 分頁框架
pagehelper.helperDialect=mysql
pagehelper.reasonable=true
pagehelper.supportMethodsArguments=true
pagehelper.params=count=countSql

這裏的配置有點多,不過最基本的配置都在這。

 

3、在啟動主類添加掃描器

在SpringBootStarterApplication 啟動類中增加包掃描器。

@SpringBootApplication
//掃描 mybatis mapper 包路徑
@MapperScan(basePackages = "com.weiz.mapper") // 這一步別忘了。
//掃描 所有需要的包, 包含一些自用的工具類包 所在的路徑
@ComponentScan(basePackages = {"com.weiz","org.n3r.idworker"})
public class SpringBootStarterApplication {

public static void main(String[] args) {
SpringApplication.run(SpringBootStarterApplication.class, args);
}

}

注意:這一步別忘了,需要在SpringBootStarterApplication 啟動類中增加包掃描器,自動掃描加載com.weiz.mapper 裏面的mapper 類。

 

以上,就把Mybatis 整合到項目中了。 接下來就是創建表和pojo類,mybatis提供了強大的自動生成功能。只需簡單幾步就能生成pojo 類和mapper。

 

二、代碼自動生成工具

Mybatis 整合完之後,接下來就是創建表和pojo類,mybatis提供了強大的自動生成功能的插件。mybatis generator插件只需簡單幾步就能生成pojo 類和mapper。操作步驟和xml 配置版也是類似的,唯一要注意的是 generatorConfig.xml 的部分配置,要配置按註解的方式生成mapper 。

1、增加generatorConfig.xml配置文件

在resources 文件下創建 generatorConfig.xml 文件。此配置文件獨立於項目,只是給自動生成工具類的配置文件,具體配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE generatorConfiguration
        PUBLIC "-//mybatis.org//DTD MyBatis Generator Configuration 1.0//EN"
        "http://mybatis.org/dtd/mybatis-generator-config_1_0.dtd">
<generatorConfiguration>
    <context id="DB2Tables"  targetRuntime="MyBatis3">
        <commentGenerator>
            <property name="suppressDate" value="true"/>
            <!-- 是否去除自動生成的註釋 true:是 : false:否 -->
            <property name="suppressAllComments" value="true"/>
        </commentGenerator>
        <!--數據庫鏈接URL,用戶名、密碼 -->
        <jdbcConnection driverClass="com.mysql.jdbc.Driver" connectionURL="jdbc:mysql://127.0.0.1:3306/zwz_test" userId="root" password="root">
        </jdbcConnection>
        <javaTypeResolver>
            <property name="forceBigDecimals" value="false"/>
        </javaTypeResolver>
        <!-- 生成模型的包名和位置-->
        <javaModelGenerator targetPackage="com.weiz.pojo" targetProject="src/main/java">
            <property name="enableSubPackages" value="true"/>
            <property name="trimStrings" value="true"/>
        </javaModelGenerator>
        <!-- 生成映射文件的包名和位置-->
        <sqlMapGenerator targetPackage="mapping" targetProject="src/main/resources">
            <property name="enableSubPackages" value="true"/>
        </sqlMapGenerator>
        <!-- 生成DAO的包名和位置-->
        <!-- XMLMAPPER生成xml映射文件, ANNOTATEDMAPPER生成的dao採用註解來寫sql -->
        <javaClientGenerator type="ANNOTATEDMAPPER" targetPackage="com.weiz.mapper" targetProject="src/main/java">
            <property name="enableSubPackages" value="true"/>
        </javaClientGenerator>
        <!-- 要生成的表 tableName是數據庫中的表名或視圖名 domainObjectName是實體類名-->
        <table tableName="sys_user" domainObjectName="User" enableCountByExample="false" enableUpdateByExample="false" enableDeleteByExample="false" enableSelectByExample="false" selectByExampleQueryId="false"></table>
    </context>
</generatorConfiguration>

注意:

這裏的配置 <javaClientGenerator type=”ANNOTATEDMAPPER” targetPackage=”com.weiz.mapper” targetProject=”src/main/java”>

type 的值很重要:
  XMLMAPPER : 表示生成xml映射文件。

  ANNOTATEDMAPPER: 表示生成的mapper 採用註解來寫sql。

 

2、數據庫User表

需要在數據庫中創建相應的表。這個表結構很簡單,就是普通的用戶表sys_user

CREATE TABLE `sys_user` (
  `id` varchar(32) NOT NULL DEFAULT '',
  `username` varchar(32) DEFAULT NULL,
  `password` varchar(64) DEFAULT NULL,
  `nickname` varchar(64) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `sex` int(11) DEFAULT NULL,
  `job` int(11) DEFAULT NULL,
  `face_image` varchar(6000) DEFAULT NULL,
  `province` varchar(64) DEFAULT NULL,
  `city` varchar(64) DEFAULT NULL,
  `district` varchar(64) DEFAULT NULL,
  `address` varchar(255) DEFAULT NULL,
  `auth_salt` varchar(64) DEFAULT NULL,
  `last_login_ip` varchar(64) DEFAULT NULL,
  `last_login_time` datetime DEFAULT NULL,
  `is_delete` int(11) DEFAULT NULL,
  `regist_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

 

3、創建GeneratorDisplay類

package com.weiz.utils;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.mybatis.generator.api.MyBatisGenerator;
import org.mybatis.generator.config.Configuration;
import org.mybatis.generator.config.xml.ConfigurationParser;
import org.mybatis.generator.internal.DefaultShellCallback;

public class GeneratorDisplay {

    public void generator() throws Exception{

        List<String> warnings = new ArrayList<String>();
        boolean overwrite = true;
        //指定 逆向工程配置文件
        File configFile = new File("generatorConfig.xml"); 
        ConfigurationParser cp = new ConfigurationParser(warnings);
        Configuration config = cp.parseConfiguration(configFile);
        DefaultShellCallback callback = new DefaultShellCallback(overwrite);
        MyBatisGenerator myBatisGenerator = new MyBatisGenerator(config,
                callback, warnings);
        myBatisGenerator.generate(null);

    } 
    
    public static void main(String[] args) throws Exception {
        try {
            GeneratorDisplay generatorSqlmap = new GeneratorDisplay();
            generatorSqlmap.generator();
        } catch (Exception e) {
            e.printStackTrace();
        }
        
    }
}

這個其實也是調用mybatis generator實現的。跟mybatis generator安裝插件是一樣的。

注意:利用Generator自動生成代碼,對於已經存在的文件會存在覆蓋和在原有文件上追加的可能性,不宜多次生成。如需重新生成,需要刪除已生成的源文件。

 

4、Mybatis Generator自動生成pojo和mapper

運行GeneratorDisplay 如下圖所示,即可自動生成相關的代碼。

 

 

上圖可以看到,pojo 包裏面自動生成了User 實體對象 ,mapper包裏面生成了 UserMapper 和UserSqlProvider 類。UserMapper 是所有方法的實現。UserSqlProvider則是為UserMapper 實現動態SQL。

注意:

  UserMapper 中的所有的動態SQL腳本,都定義在類UserSqlProvider中。若要增加新的動態SQL,只需在UserSqlProvider中增加相應的方法,然後在UserMapper中增加相應的引用即可,

  如:@UpdateProvider(type=UserSqlProvider.class, method=”updateByPrimaryKeySelective”)。

 

 

三、實現增刪改查

在項目中整合了Mybatis並通過自動生成工具生成了相關的mapper和配置文件之後,下面就開始項目中的調用。這個和之前的xml 配置版也是一樣的。

1、在service包下創建UserService及UserServiceImpl接口實現類

創建com.weiz.service包,添加UserService接口類

package com.weiz.service;

import com.weiz.pojo.User;

public interface UserService {
    public int saveUser(User user);
    public int updateUser(User user);
    public int deleteUser(String userId);
    public User queryUserById(String userId);
}

創建com.weiz.service.impl包,並增加UserServiceImpl實現類,並實現增刪改查的功能,由於這個代碼比較簡單,這裏直接給出完整的代碼。

package com.weiz.service.impl;

import com.weiz.mapper.UserMapper;
import com.weiz.pojo.User;
import com.weiz.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private UserMapper userMapper;

    @Override
    public int saveUser(User user) {
        return userMapper.insertSelective(user);
    }

    @Override
    public int updateUser(User user) {
        return userMapper.updateByPrimaryKeySelective(user);
    }

    @Override
    public int deleteUser(String userId) {
        return userMapper.deleteByPrimaryKey(userId);
    }

    @Override
    public User queryUserById(String userId) {
        return userMapper.selectByPrimaryKey(userId);
    }
}

 

2、編寫controller層,增加MybatisController控制器

package com.weiz.controller;

import com.weiz.pojo.User;
import com.weiz.utils.JSONResult;
import com.weiz.service.UserService;
import org.n3r.idworker.Sid;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@RestController
@RequestMapping("mybatis")
public class MyBatisCRUDController {

    @Autowired
    private UserService userService;

    @Autowired
    private Sid sid;

    @RequestMapping("/saveUser")
    public JSONResult saveUser() {

        String userId = sid.nextShort();
        User user = new User();
        user.setId(userId);
        user.setUsername("spring boot" + new Date());
        user.setNickname("spring boot" + new Date());
        user.setPassword("abc123");
        user.setIsDelete(0);
        user.setRegistTime(new Date());

        userService.saveUser(user);
        return JSONResult.ok("保存成功");
    }

    @RequestMapping("/updateUser")
    public JSONResult updateUser() {
        User user = new User();
        user.setId("10011001");
        user.setUsername("10011001-updated" + new Date());
        user.setNickname("10011001-updated" + new Date());
        user.setPassword("10011001-updated");
        user.setIsDelete(0);
        user.setRegistTime(new Date());

        userService.updateUser(user);
        return JSONResult.ok("保存成功");
    }


    @RequestMapping("/deleteUser")
    public JSONResult deleteUser(String userId) {
        userService.deleteUser(userId);
        return JSONResult.ok("刪除成功");
    }

    @RequestMapping("/queryUserById")
    public JSONResult queryUserById(String userId) {
        return JSONResult.ok(userService.queryUserById(userId));
    }
}

 

3、測試

在瀏覽器輸入controller裏面定義的路徑即可。只要你按照上面的步驟一步一步來,基本上就沒問題,是不是特別簡單。

 

 

最後

以上,就把Spring Boot整合Mybatis註釋版 實現增刪改查介紹完了,Spring Boot 整合Mybatis 是整個Spring Boot 非常重要的功能,也是非常核心的基礎功能,希望大家能夠熟練掌握。後面會深入介紹Spring Boot的各個功能和用法。

這個系列課程的完整源碼,也會提供給大家。大家關注我的微信公眾號(架構師精進),回復:springboot源碼。獲取這個系列課程的完整源碼。

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

※推薦評價好的iphone維修中心

【大廠面試08期】談一談你對HashMap的理解?

摘要

HashMap的原理也是大廠面試中經常會涉及的問題,同時也是工作中常用到的Java容器,本文主要通過對以下問題進行分析講解,來幫助大家理解HashMap的原理。

1.HashMap添加一個鍵值對的過程是怎麼樣的?

2.為什麼說HashMap不是線程安全的?

3.為什麼要一起重寫hashCode()和equal()方法?

HashMap添加一個鍵值對的過程是怎麼樣的?

這是網上找的一張流程圖,可以結合著步驟來看這個流程圖,了解添加鍵值對的過程。

1.初始化table

判斷table是否為空或為null,否則執行resize()方法(resize方法一般是擴容時調用,也可以調用來初始化table)。

2.計算hash值

根據鍵值key計算hash值。(因為hashCode是一個int類型的變量,是4字節,32位,所以這裡會將hashCode的低16位與高16位進行一個異或運算,來保留高位的特徵,以便於得到的hash值更加均勻分佈)

static final int hash(Object key) {
    int h;
    return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

3.插入或更新節點

根據(n – 1) & hash計算得到插入的數組下標i,然後進行判斷

table[i]==null

那麼說明當前數組下標下,沒有hash衝突的元素,直接新建節點添加。

table[i].hash == hash &&(table[i]== key || (key != null && key.equals(table[i].key)))

判斷table[i]的首個元素是否和key一樣,如果相同直接更新value。

table[i] instanceof TreeNode

判斷table[i] 是否為treeNode,即table[i] 是否是紅黑樹,如果是紅黑樹,則直接在樹中插入鍵值對。

其他情況

上面的判斷條件都不滿足,說明table[i]存儲的是一個鏈表,那麼遍歷鏈表,判斷是否存在已有元素的key與插入鍵值對的key相等,如果是,那麼更新value,如果沒有,那麼在鏈表末尾插入一個新節點。插入之後判斷鏈表長度是否大於8,大於8的話把鏈錶轉換為紅黑樹。

4.擴容

插入成功后,判斷實際存在的鍵值對數量size是否超多了最大容量threshold(一般是數組長度*負載因子0.75),如果超過,進行擴容。

源代碼如下:

2.為什麼說HashMap不是線程安全的?

其實通過學習HashMap添加鍵值對的方法,我們可以看到整個方法內都沒有使用到鎖,所以一旦多線併發訪問,就有可能造成數據不一致的問題,

例如:

如果有兩個添加鍵值對的線程都執行到if ((tab = table) == null || (n = tab.length) == 0)這行語句,都對table變量進行數組初始化,就會造成已經初始化好的數組table被覆蓋,然後前面初始化的線程會將鍵值對添加到之前初始化的數組中去,造成鍵值對丟失。

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                   boolean evict) {
    Node<K,V>[] tab; Node<K,V> p; int n, i;
    // tab為空則創建 
    if ((tab = table) == null || (n = tab.length) == 0)
        n = (tab = resize()).length;
    ...後面的代碼省略
}

3.為什麼要一起重寫hashCode()和equal()方法?

當我們的對象一旦作為HashMap中的key,或者是HashSet中的元素使用時,就必須同時重寫hashCode()和equal()方法

首先看看hashCode()和equal()方法的默認實現

可以看到Obejct類中的源碼如下,可以看到equals()方法的默認實現是判斷兩個對象的內存地址是否相同來決定返回結果。

    public native int hashCode();
	public boolean equals(Object obj) {
        return (this == obj);
    }

網上很多博客說hashCode的默認實現是返回內存地址,其實不對,以OpenJDK為例,hashCode的默認計算方法有5種,有返回隨機數的,有返回內存地址,具體採用哪一種計算方法取決於運行時庫和JVM的具體實現。

感興趣的朋友可以看看這篇博客
https://blog.csdn.net/xusiwei1236/article/details/45152201

然後看看hashCode()方法,equal()方法在HashMap中的應用

static final int hash(Object key) {
    int h;
    //因為hashCode是一個int類型的變量,是4字節,32位,所以這裡會將hashCode的低16位與高16位進行一個異或運算,來保留高位的特徵,以便於得到的hash值更加均勻分佈
    return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
public V put(K key, V value) {
    return putVal(hash(key), key, value, false, true);
}

為了將一組鍵值對均勻得存儲在一個數組中,HashMap對key的hashCode進行計算得到一個hash值,用hash對數組長度取模,得到數組下標,將鍵值對存儲在數組下標對應的鏈表下(假設鏈表長度小於8,沒有達到轉換為紅黑樹的閥值)。

下面是添加鍵值對的putVal()方法,當數組下標對應的是一個鏈表時執行的代碼

//遍歷鏈表
for (int binCount = 0; ; ++binCount) {
    if ((e = p.next) == null) {//已經遍歷到鏈表末尾,說明鏈表不存在這個key
        p.next = newNode(hash, key, value, null);//在末尾添加這個鍵值對
        if (binCount >= TREEIFY_THRESHOLD - 1) //超過鏈錶轉化為紅黑樹的閥值(也急速鏈表長度》=8)
            treeifyBin(tab, hash);
        break;
    }
    if (e.hash == hash &&
        ((k = e.key) == key || (key != null && key.equals(k))))
        break;
    p = e;
}

可以清楚地看到判斷添加的key與鏈表中已存在的key是否相等的方法主要是e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k))),
也就是:
1.先判斷hash值是否相等,不相等直接結束判斷,因為hash值不相等,key肯定不相等。
2.判斷兩個key對象的內存地址是否相等(相等指向內存中同一個對象)。
3.key不為null,調用key的equal()方法判斷是否相等,因為有可能兩個key在內存中存儲的地址不一樣,但是是相等的。
就像是

String a = new String("test");
String b = new String("test");

System.out.println("a==b is "+a==b);//打印為false
System.out.println("a.equals(b) is "+a.equals(b));//打印為true

背景

假設我們有一個KeyObject類,假設我們認為兩個KeyObject的屬性a相等,那麼KeyObject就是相等的相等的,我們將KeyObject作為HashMap的key,以KeyObject是否相等作為去重標準,不能重複添加KeyObject相等,value不等的值到HashMap中去

public static class KeyObject {
    Integer a;
    public KeyObject(Integer a) {
        this.a = a;
    }
}

假設都hashCode()方法和equals()方法都不重寫(結果:HashMap無法保證去重)

執行以下代碼:

public static void main(String[] args) {
    KeyObject key1 = new KeyObject(1);
    KeyObject key2 = new KeyObject(1);
    System.out.println("key1的hashCode為"+ key1.hashCode());
    System.out.println("key2的hashCode為" + key2.hashCode());
    System.out.println("key1.equals(key2)的結果為"+(key1.equals(key2)));
    HashMap<KeyObject,String> hashMap = new HashMap<KeyObject,String>();
    hashMap.put(key1,"value1");
    hashMap.put(key2,"value2");
    //打印hashMap
    for(KeyObject key :hashMap.keySet()){
        System.out.println("KeyObject.a="+key.a+" : "+hashMap.get(key));
    }
}

如果KeyObject的hashCode()方法和equals()方法都不重寫,那麼即便KeyObject的屬性a都是1,key1和key2的hashCode都是不相同的,key1和key2調用equals()方法也不相等,這樣hashMap中就可以同時存在key1和key2了。

打印結果:

key1的hashCode為728890494
key2的hashCode為1558600329
key1.equals(key2)的結果為false
KeyObject.a=1 : value1
KeyObject.a=1 : value2

假如只重寫hashCode()方法(結果:無法正確地與鏈表元素進行相等判斷,從而無法保證去重)

執行以下代碼:

 public static class KeyObject {
    Integer a;
    public KeyObject(Integer a) {
        this.a = a;
    }

    @Override
    public int hashCode() {
        return a;
    }

    public static void main(String[] args) {
        KeyObject key1 = new KeyObject(1);
        KeyObject key2 = new KeyObject(1);
        System.out.println("key1的hashCode為"+ key1.hashCode());
        System.out.println("key2的hashCode為" + key2.hashCode());
        System.out.println("key1.equals(key2)的結果為"+(key1.equals(key2)));
        HashMap<KeyObject,String> hashMap = new HashMap<KeyObject,String>();
        hashMap.put(key1,"value1");
        hashMap.put(key2,"value2");
        for(KeyObject key :hashMap.keySet()){
            System.out.println("TestObject.a="+key.a+" : "+hashMap.get(key));
        }
    }
}

此時equal()方法的實現是默認實現,也就是當兩個對象的內存地址相等時,equal()方法才返回true,雖然key1和key2的a屬性是相同的,但是他們在內存中是不同的對象,所以key1==key2結果會是false,KeyObject的equals()方法默認實現是判斷兩個對象的內存地址,所以 key1.equals(key2)也會是false,所以這兩個鍵值對可以重複地添加到hashMap中去。

輸出結果:

key1的hashCode為1
key2的hashCode為1
key1.equals(key2)的結果為false
TestObject.a=1 : value1
TestObject.a=1 : value2

假如只重寫equals()方法(結果:映射到HashMap中不同數組下標,無法保證去重)

public static class KeyObject {
    Integer a;
    public KeyObject(Integer a) {
        this.a = a;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        KeyObject keyObject = (KeyObject) o;
        return Objects.equals(a, keyObject.a);
    }

    public static void main(String[] args) {
        KeyObject key1 = new KeyObject(1);
        KeyObject key2 = new KeyObject(1);
        System.out.println("key1的hashCode為"+ key1.hashCode());
        System.out.println("key2的hashCode為" + key2.hashCode());
        System.out.println("key1.equals(key2)的結果為"+(key1.equals(key2)));
        HashMap<KeyObject,String> hashMap = new HashMap<KeyObject,String>();
        hashMap.put(key1,"value1");
        hashMap.put(key2,"value2");
        for(KeyObject key :hashMap.keySet()){
            System.out.println("TestObject.a="+key.a+" : "+hashMap.get(key));
        }
    }
}

假設只equals()方法,hashCode方法會是默認實現,具體的計算方法取決於JVM,(測試時發現是內存地址不同但是相等的對象,它們的hashCode不相同),所以計算得到的數組下標不相同,會存儲到hashMap中不同數組下標下的鏈表中,也會導致HashMap中存在重複元素。

輸出結果如下:

key1的hashCode為1289479439
key2的hashCode為6738746
key1.equals(key2)的結果為true
TestObject.a=1 : value1
TestObject.a=1 : value2

總結

所以當我們的對象一旦作為HashMap中的key,或者是HashSet中的元素使用時,就必須同時重寫hashCode()和equal()方法,因為hashCode會影響key存儲的數組下標及與鏈表元素的初步判斷,equal()是作為判斷key與鏈表中的key是否相等的最後標準。

  • 所以只重寫hashCode()方法,會導致無法正確地與鏈表元素進行相等判斷,從而無法保證去重)
  • 只重寫equals()方法導致鍵值對映射到HashMap中不同數組下標,無法保證去重

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※回頭車貨運收費標準

Spring Boot 把 Maven 幹掉了,擁抱 Gradle!

在國外某社交網站上有一個關於遷移 Spring Boot 遷移 Maven 至 Gradle 的帖子:

該貼子上也有很多人質疑:Maven 用的好好的,為什麼要遷移至 Gradle?

雖然該貼子只是說 Gradle 牛逼,但並沒有說遷移至 Gradle 所帶來的影響和價值。

所以,Spring Boot 官方對此也發了博文作了解釋:

https://spring.io/blog/2020/06/08/migrating-spring-boot-s-build-to-gradle

棧長簡單概括一下。

沒錯,Spring Boot 做了一個重大調整:

在 Spring Boot 2.3.0.M1 中,將首次使用 Gradle 代替 Maven 來構建 Spring Boot 項目。

為什麼要遷移?

Spring Boot 團隊給出的主要原因是,遷移至 Gradle 可以減少構建項目所花費的時間

因為使用 Maven 構建,回歸測試時間太長了,等待項目構建大大增加了修復 bug 和實現新特性的時間。

而 Gradle 的宗旨是減少構建工作量,它可以根據需要構建任何有變化的地方或者并行構建。

當然,Spring Boot 團隊也花了很多時間來嘗試用 Maven 進行 并行構建,但因為構建 Spring Boot 項目的複雜性,最終失敗了。

另外,Spring Boot 團隊也看到了在其他 Spring 項目中使用 Gradle 以及并行構建所帶來的提升,並且還可以使用 Gradle 在一些第三方項目上的構建緩存,這些優勢都促使 Gradle 帶到構建 Spring Boot 項目中來。

遷移有什麼好處?

棧長使用 Maven,哪怕只改一個代碼也是構建全部,構建項目確實要花不少時間。

Spring Boot 官方也給出了數據,一次完整的 Maven 項目構建一般需要一個小時或者以上,而在過去的 4 周時間內,使用 Gradle 構建的平均時間只用了 9 分 22 秒!!!

如下面截圖所示:

光從構建時間來看,效率真是倍數級的。

https://github.com/spring-projects/spring-boot/tree/v2.3.0.RELEASE

棧長特意去看了下,在 Spring Boot 2.2.8 中使用的是 Maven:

而最新發布的 Spring Boot 2.3.1 已經是切換到 Gradle 了:

會帶來什麼影響?

也許會有小夥伴質疑,Spring Boot 遷移到了 Gradle,會不會對公司現有的 Maven 項目或者後續的版本升級造成影響?

如果你只是使用 Spring Boot 框架來搭建系統,那還是可以繼續使用 Maven 來管理依賴的,Spring Boot 會繼續在 Maven 中央倉庫提交。

如下面所示:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot</artifactId>
    <version>2.3.1.RELEASE</version>
</dependency>

因為當版本確定之後,這個 Maven 構建只是一次性的,不會影響 Spring Boot 團隊的日常迭代效率。

但是,如果我們需要在本地構建 Spring Boot 源碼,或者你正在學習最新 Spring Boot 源碼,就需要掌握 Gradle 構建了。

題外話,Gradle 肯定是未來的趨勢,但也不一定非得遷移至 Gradle,只有適合自己的才是最好的,畢竟現在 Maven 和 Gradle 都是主流,但是 Maven 更佔有市場,很多主流開源項目都是以 Maven 依賴來作為示例演示的。

棧長也會陸續關注 Spring Boot 動態,後續也會給大家帶來各方面的教程,獲取歷史教程可以在Java技術棧公眾號後台回復:boot,掌握 Spring Boot 問題不大。

學習、從不止步。

推薦去我的博客閱讀更多:

1.Java JVM、集合、多線程、新特性系列教程

2.Spring MVC、Spring Boot、Spring Cloud 系列教程

3.Maven、Git、Eclipse、Intellij IDEA 系列工具教程

4.Java、後端、架構、阿里巴巴等大廠最新面試題

覺得不錯,別忘了點贊+轉發哦!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案

RocketMQ系列(七)事務消息(數據庫|最終一致性)

終於到了今天了,終於要講RocketMQ最牛X的功能了,那就是事務消息。為什麼事務消息被吹的比較熱呢?近幾年微服務大行其道,整個系統被切成了多個服務,每個服務掌管着一個數據庫。那麼多個數據庫之間的數據一致性就成了問題,雖然有像XA這種強一致性事務的支持,但是這種強一致性在互聯網的應用中並不適合,人們還是更傾向於使用最終一致性的解決方案,在最終一致性的解決方案中,使用MQ保證各個系統之間的數據一致性又是首選。

RocketMQ為我們提供了事務消息的功能,它使得我們投放消息和其他的一些操作保持一個整體的原子性。比如:向數據庫中插入數據,再向MQ中投放消息,把這兩個動作作為一個原子性的操作。貌似其他的MQ是沒有這種功能的。

但是,縱觀全網,講RocketMQ事務消息的博文中,幾乎沒有結合數據庫的,都是直接投放消息,然後講解事務消息的幾個狀態,雖然講的也沒毛病,但是和項目中事務最終一致性的落地方案還相距甚遠。包括我自己在內,在項目中,服務化以後,用MQ保證事務的最終一致性,在網上一搜,根本沒有落地的方案,都是侃侃而談。於是,我寫下這篇博文,結合數據庫,來談一談RocketMQ的事務消息到底怎麼用。

基礎概念

要使用RocketMQ的事務消息,要實現一個TransactionListener的接口,這個接口中有兩個方法,如下:

/**
     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

/**
     * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
     * method will be invoked to get local transaction status.
     *
     * @param msg Check message
     * @return Transaction state
     */
LocalTransactionState checkLocalTransaction(final MessageExt msg);

RocketMQ的事務消息是基於兩階段提交實現的,也就是說消息有兩個狀態,prepared和commited。當消息執行完send方法后,進入的prepared狀態,進入prepared狀態以後,就要執行executeLocalTransaction方法,這個方法的返回值有3個,也決定着這個消息的命運,

  • COMMIT_MESSAGE:提交消息,這個消息由prepared狀態進入到commited狀態,消費者可以消費這個消息;
  • ROLLBACK_MESSAGE:回滾,這個消息將被刪除,消費者不能消費這個消息;
  • UNKNOW:未知,這個狀態有點意思,如果返回這個狀態,這個消息既不提交,也不回滾,還是保持prepared狀態,而最終決定這個消息命運的,是checkLocalTransaction這個方法。

當executeLocalTransaction方法返回UNKNOW以後,RocketMQ會每隔一段時間調用一次checkLocalTransaction,這個方法的返回值決定着這個消息的最終歸宿。那麼checkLocalTransaction這個方法多長時間調用一次呢?我們在BrokerConfig類中可以找到,

 /**
  * Transaction message check interval.
  */
@ImportantField
private long transactionCheckInterval = 60 * 1000;

這個值是在brokder.conf中配置的,默認值是60*1000,也就是1分鐘。那麼會檢查多少次呢?如果每次都返回UNKNOW,也不能無休止的檢查吧,

/**
 * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
 */
@ImportantField
private int transactionCheckMax = 5;

這個是檢查的最大次數,超過這個次數,如果還返回UNKNOW,這個消息將被刪除。

事務消息中,TransactionListener這個最核心的概念介紹完后,我們看看代碼如何寫吧。

落地案例

我們在數據庫中有一張表,具體如下:

CREATE TABLE `s_term` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `term_year` year(4) NOT NULL ,
  `type` int(1) NOT NULL DEFAULT '1' ,
  PRIMARY KEY (`id`)
) 

字段的具體含義大家不用管,一會我們將向這張表中插入一條數據,並且向MQ中投放消息,這兩個動作是一個原子性的操作,要麼全成功,要麼全失敗。

我們先來看看事務消息的客戶端的配置,如下:

@Bean(name = "transactionProducer",initMethod = "start",destroyMethod = "shutdown")
public TransactionMQProducer transactionProducer() {
    TransactionMQProducer producer = new
        TransactionMQProducer("TransactionMQProducer");
    producer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
    producer.setTransactionListener(transactionListener());
    return producer;
}

@Bean
public TransactionListener transactionListener() {
    return new TransactionListenerImpl();
}

我們使用TransactionMQProducer生命生產者的客戶端,並且生產者組的名字叫做TransactionMQProducer,後面NameServer的地址沒有變化。最後就是設置了一個TransactionListener監聽器,這個監聽器的實現我們也定義了一個Bean,返回的是我們自定義的TransactionListenerImpl,我們看看裡邊怎麼寫的吧。

public class TransactionListenerImpl implements TransactionListener {
    @Autowired
    private TermMapper termMapper;

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

        Integer termId = (Integer)arg;
        Term term = termMapper.selectById(termId);
        System.out.println("executeLocalTransaction termId="+termId+" term:"+term);
        if (term != null) return COMMIT_MESSAGE;

        return LocalTransactionState.UNKNOW;
    }

	@Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String termId = msg.getKeys();
        Term term = termMapper.selectById(Integer.parseInt(termId));
        System.out.println("checkLocalTransaction termId="+termId+" term:"+term);
        if (term != null) {
            System.out.println("checkLocalTransaction:COMMIT_MESSAGE");
            return COMMIT_MESSAGE;
        }
        System.out.println("checkLocalTransaction:ROLLBACK_MESSAGE");
        return ROLLBACK_MESSAGE;
    }
}

在這個類中,我們要實現executeLocalTransaction和checkLocalTransaction兩個方法,其中executeLocalTransaction是在執行完send方法后立刻執行的,裡邊我們根據term表的id去查詢,如果能夠查詢出結果,就commit,消費端可以消費這個消息,如果查詢不到,就返回一個UNKNOW,說明過一會會調用checkLocalTransaction再次檢查。在checkLocalTransaction方法中,我們同樣用termId去查詢,這次如果再查詢不到就直接回滾了。

好了,事務消息中最重要的兩個方法都已經實現了,我們再來看看service怎麼寫吧,

@Autowired
private TermMapper termMapper;
@Autowired
@Qualifier("transactionProducer")
private TransactionMQProducer producer;

@Transactional(rollbackFor = Exception.class)
public void sendTransactionMQ() throws Exception {
    Term term = new Term();
    term.setTermYear(2020);
    term.setType(1);
    int insert = termMapper.insert(term);

    Message message = new Message();
    message.setTopic("cluster-topic");
    message.setKeys(term.getId()+"");
    message.setBody(new String("this is transaction mq "+new Date()).getBytes());

    TransactionSendResult sendResult = producer
        .sendMessageInTransaction(message, term.getId());
    System.out.println("sendResult:"+sendResult.getLocalTransactionState() 
                       +" 時間:"+new Date());
}
  • 在sendTransactionMQ方法上,我們使用了@Transactional註解,那麼在這個方法中,發生任何的異常,數據庫事務都會回滾;
  • 然後,我們創建Term對象,向數據庫中插入Term;
  • 構建Mesaage的信息,將termId作為message的key;
  • 使用sendMessageInTransaction發送消息,傳入message和termId,這兩個參數和executeLocalTransaction方法的入參是對應的。

最後,我們在test方法中,調用sendTransactionMQ方法,如下:

@Test
public void sendTransactionMQ() throws InterruptedException {
    try {
        transactionService.sendTransactionMQ();
    } catch (Exception e) {
        e.printStackTrace();
    }

    Thread.sleep(600000);
}

整個生產端的代碼就是這些了,消費端的代碼沒有什麼變化,就不給大家貼出來了。接下來,我們把消費端的應用啟動起來,消費端的應用最好不要包含生產端的代碼,因為TransactionListener實例化以後,就會進行監聽,而我們在消費者端是不希望看到TransactionListener中的日誌的。

我們運行一下生產端的代碼,看看是什麼情況,日誌如下:

executeLocalTransaction termId=15 term:com.example.rocketmqdemo.entity.Term@4a3509b0
sendResult:COMMIT_MESSAGE 時間:Wed Jun 17 08:56:49 CST 2020
  • 我們看到,先執行的是executeLocalTransaction這個方法,termId打印出來了,發送的結果也出來了,是COMMIT_MESSAGE,那麼消費端是可以消費這個消息的;
  • 注意一下兩個日誌的順序,先執行的executeLocalTransaction,說明在執行sendMessageInTransaction時,就會調用監聽器中的executeLocalTransaction,它的返回值決定着這個消息是否真正的投放到隊列中;

再看看消費端的日誌,

msgs.size():1
this is transaction mq Wed Jun 17 08:56:49 CST 2020

消息被正常消費,沒有問題。那麼數據庫中有沒有termId=15的數據呢?我們看看吧,

數據是有的,插入數據也是成功的。

這樣使用就真的正確的嗎?我們改一下代碼看看,在service方法中拋個異常,讓數據庫的事務回滾,看看是什麼效果。改動代碼如下:

@Transactional(rollbackFor = Exception.class)
public void sendTransactionMQ() throws Exception {
    ……
    throw new Exception("數據庫事務異常");
}

拋出異常后,數據庫的事務會回滾,那麼MQ呢?我們再發送一個消息看看,

生產端的日誌如下:

executeLocalTransaction termId=16 term:com.example.rocketmqdemo.entity.Term@5d6b5d3d
sendResult:COMMIT_MESSAGE 時間:Wed Jun 17 09:07:15 CST 2020

java.lang.Exception: 數據庫事務異常
  • 從日誌中,我們可以看到,消息是投放成功的,termId=16,事務的返回狀態是COMMIT_MESSAGE;
  • 最後拋出了我們定義的異常,那麼數據庫中應該是不存在這條消息的啊;

我們先看看數據庫吧,

數據庫中並沒有termId=16的數據,那麼數據庫的事務是回滾了,而消息是投放成功的,並沒有保持原子性啊。那麼為什麼在執行executeLocalTransaction方法時,能夠查詢到termId=16的數據呢?還記得MySQL的事務隔離級別嗎?忘了的趕快複習一下吧。在事務提交前,我們是可以查詢到termId=16的數據的,所以消息提交了,看看消費端的情況,

msgs.size():1
this is transaction mq Wed Jun 17 09:07:15 CST 2020

消息也正常消費了,這明顯不符合我們的要求,我們如果在微服務之間使用這種方式保證數據的最終一致性,肯定會有大麻煩的。那我們該怎麼使用s呢?我們可以在executeLocalTransaction方法中,固定返回UNKNOW,數據插入數據庫成功也好,失敗也罷,我們都返回UNKNOW。那麼這個消息是否投放到隊列中,就由checkLocalTransaction決定了。checkLocalTransaction肯定在sendTransactionMQ后執行,而且和sendTransactionMQ不在同一事務中。我們改一下程序吧,

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    return LocalTransactionState.UNKNOW;
}

其他的地方不用改,我們再發送一下消息,

sendResult:UNKNOW 時間:Wed Jun 17 09:56:59 CST 2020
java.lang.Exception: 數據庫事務異常

checkLocalTransaction termId=18 term:null
checkLocalTransaction:ROLLBACK_MESSAGE
  • 事務消息發送的結果是UNKNOW,然後拋出異常,事務回滾;
  • checkLocalTransaction方法,查詢termId=18的數據,為null,消息再回滾;

又看了一下消費端,沒有日誌。數據庫中也沒有termId=18的數據,這才符合我們的預期,數據庫插入不成功,消息投放不成功。我們再把拋出異常的代碼註釋掉,看看能不能都成功。

@Transactional(rollbackFor = Exception.class)
public void sendTransactionMQ() throws Exception {
    ……
    //throw new Exception("數據庫事務異常");
}

再執行一下發送端程序,日誌如下:

sendResult:UNKNOW 時間:Wed Jun 17 10:02:57 CST 2020
checkLocalTransaction termId=19 term:com.example.rocketmqdemo.entity.Term@3b643475
checkLocalTransaction:COMMIT_MESSAGE
  • 發送結果返回UNKNOW;
  • checkLocalTransaction方法查詢termId=19的數據,能夠查到;
  • 返回COMMIT_MESSAGE,消息提交到隊列中;

先看看數據庫中的數據吧,

termId=19的數據入庫成功了,再看看消費端的日誌,

msgs.size():1
this is transaction mq Wed Jun 17 10:02:56 CST 2020

消費成功,這才符合我們的預期。數據插入數據庫成功,消息投放隊列成功,消費消息成功。

總結

事務消息最重要的就是TransactionListener接口的實現,我們要理解executeLocalTransaction和checkLocalTransaction這兩個方法是干什麼用的,以及它們的執行時間。再一個就是和數據庫事務的結合,數據庫事務的隔離級別大家要知道。把上面這幾點掌握了,就可以靈活的使用RocketMQ的事務消息了。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

從0到1學會logstash的玩法(ELK)

本篇文章採用的採用的是logstash7.7.0版本,主要從如下幾個方面介紹

1、logstash是什麼,可以用來幹啥

2、logstash的基本原理是什麼

3、怎麼去玩這個elk的組件logstash

一、logstash是什麼,有哪些作用

1.1、概念

官方概念:Logstash是免費且開放的服務器端數據處理管道,能夠從多個來源採集數據,轉換數據,然後將數據發送到您最喜歡的“存儲庫”中。

1.2、功能

Logstash能夠動態地採集、轉換和傳輸數據,不受格式或複雜度的影響。利用Grok從非結構化數據中派生出結構,從IP地址解碼出地理坐標,匿名化或排除敏感字段,並簡化整體處理過程。

採集數據:

  能夠採集各種樣式、大小和來源的數據往往以各種各樣的形式,比如log日誌,收集redis、kafka等熱門分佈式技術的數據,並且還可以收集實現了java的JMS規範的消息中心的數據,或分散或集中地存在於很多系統中。Logstash支持各種輸入選擇,可以同時從眾多常用來源捕捉事件。能夠以連續的流式傳輸方式,輕鬆地從日誌、指標、Web應用、數據存儲以及各種AWS服務採集數據

 

 

 

解析數據和轉換:

  數據從源傳輸到存儲庫的過程中,Logstash過濾器能夠解析各個事件,識別已命名的字段以構建結構,並將它們轉換成通用格式,以便進行更強大的分析和實現商業價值。
Logstash能夠動態地轉換和解析數據,不受格式或複雜度的影響:

  • 利用Grok從非結構化數據中解析出結構數據
  • 從IP地址破譯出地理坐標
  • 將PII數據匿名化,完全排除敏感字段
  • 簡化整體處理,不受數據源、格式或架構的影響

  數據輸入端從各種數據源收集到的數據可能會有很多不是我們想要的,這時我們可以給Logstash定義過濾器,過濾器可以定義多個,它們依次執行,最終把我們想要的數據過濾出來,然後把這些數據解析成目標數據庫,如elasticsearch等能支持的數據格式存儲數據。

 

數據轉存:

  選擇好存儲庫,導出數據到存儲庫進行存儲,儘管Elasticsearch是我們的首選輸出方向,能夠為我們的搜索和分析帶來無限可能,但它並非唯一選擇。Logstash提供眾多輸出選擇,可以將數據發送到您要指定的地方,比如redis、kafka等

 

 

 

 

二、logstash的基本原理

  logstash分為三個步驟:inputs(必須的)→ filters(可選的)→ outputs(必須的),inputs生成時間,filters對其事件進行過濾和處理,outputs輸出到輸出端或者決定其存儲在哪些組件里。inputs和outputs支持編碼和解碼。

 

執行模型:

Logstash是協調inputs、filters和outputs執行事件處理的管道。

  Logstash管道中的每個input階段都在自己的線程中運行。將寫事件輸入到內存(默認)或磁盤上的中心隊列。每個管道工作線程從該隊列中取出一批事件,通過配置的filter處理該批事件,然後通過output輸出到指定的組件存儲。管道處理數據量的大小和管道工作線程的數量是可配置的

  默認情況下,Logstash使用管道階段(input→filter和filter→output)之間的內存限制隊列來緩衝事件。如果Logstash不安全地終止,存儲在內存中的所有事件都將丟失。為了幫助防止數據丟失,可以啟用Logstash將飛行中的事件持久化到磁盤。有關詳細信息,請參閱持久隊列https://www.elastic.co/guide/en/logstash/current/persistent-queues.html

如下是目前logstash7.7.0支持的inputs、outputs、filters

inputs:

azure_event_hubs,beats,cloudwatch,couchdb_changes,dead_letter_queue,elasticsearch,exec,file,ganglia,gelf,generator,github,google_cloud_storage,google_pubsub,graphite,heartbeat,http,http_poller,imap,irc,java_generator,java_stdin,jdbc,jms,jmx,kafka,kinesis,log4j,lumberjack,meetup,pipe,puppet_facter,rabbitmq,redis,relp,rss,s3,s3-sns-sqs,salesforce,snmp,snmptrap,sqlite,sqs,stdin,stomp,syslog,tcp,twitter,udp,unix,varnishlog,websocket,wmi,xmpp

outputs:

boundary, circonus, cloudwatch, csv, datadog, datadog_metrics, elastic_app_search, elasticsearch, email, exec, file, ganglia, gelf, google_bigquery, google_cloud_storage, google_pubsub, graphite, graphtastic, http, influxdb, irc, sink, java_stdout, juggernaut, kafka, librato, loggly, lumberjack, metriccatcher, mongodb, nagios, nagios_nsca, opentsdb, pagerduty, pipe, rabbitmq, redis, redmine, riak, riemann, s3, sns, solr_http, sqs, statsd, stdout, stomp, syslog, tcp, timber, udp, webhdfs, websocket, xmpp, zabbix

filters:

aggregate, alter, bytes, cidr, cipher, clone, csv, date, de_dot, dissect, dns, drop, elapsed, elasticsearch, environment, extractnumbers, fingerprint, geoip, grok, http, i18n, java_uuid, jdbc_static, jdbc_streaming, json, json_encode, kv, memcached, metricize, metrics, mutate, prune, range, ruby, sleep, split, syslog_pri, threats_classifier, throttle, tld, translate, truncate, urldecode, useragent, uuid, xml

三、玩一玩logstash

3.1、壓縮包方式安裝

下載地址1:https://www.elastic.co/cn/downloads/logstash   

下載地址2:https://elasticsearch.cn/download/

這裏需要安裝jdk,我使用的是elasticsearch7.7.0自帶的jdk:

解壓即安裝:

tar -zxvf logstash-7.7.0.tar.gz

來個logstash版本的HelloWorld:

./bin/logstash -e 'input { stdin { } } output { stdout {} }'

 

 

 

3.2、logstash配置文件

 logstash.yml:包含Logstash配置標誌。您可以在此文件中設置標誌,而不是在命令行中傳遞標誌。在命令行上設置的任何標誌都會覆蓋logstash中的相應設置

 pipelines.yml:包含在單個Logstash實例中運行多個管道的框架和指令。

 jvm.options:包含JVM配置標誌。使用此文件設置總堆空間的初始值和最大值。您還可以使用此文件為Logsta設置語言環境

 log4j2.properties:包含log4j 2庫的默認設置

 start.options (Linux):用於配置啟動服務腳本

logstash.yml文件詳解:

node.name  #默認主機名,該節點的描述名字
path.data  #LOGSTASH_HOME/data ,Logstash及其插件用於任何持久需求的目錄
pipeline.id #默認main,pipeline的id
pipeline.java_execution #默認true,使用java執行引擎
pipeline.workers #默認為主機cpu的個數,表示并行執行管道的過濾和輸出階段的worker的數量
pipeline.batch.size #默認125 表示單個工作線程在嘗試執行過濾器和輸出之前從輸入中收集的最大事件數
pipeline.batch.delay #默認50 在創建管道事件時,在將一個小批分派給管道工作者之前,每個事件需要等待多長時間(毫秒)
pipeline.unsafe_shutdown  #默認false,當設置為true時,即使內存中仍有運行的事件,強制Logstash在關閉期間將會退出。默認情況下,Logstash將拒絕退出,直到所有接收到的事件都被推入輸出。啟用此選項可能導致關機期間數據丟失
pipeline.ordered #默認auto,設置管道事件順序。true將強制對管道進行排序,如果有多個worker,則阻止logstash啟動。如果為false,將禁用維持秩序所需的處理。訂單順序不會得到保證,但可以節省維護訂單的處理成本
path.config #默認LOGSTASH_HOME/config  管道的Logstash配置的路徑
config.test_and_exit #默認false,設置為true時,檢查配置是否有效,然後退出。請注意,使用此設置不會檢查grok模式的正確性
config.reload.automatic #默認false,當設置為true時,定期檢查配置是否已更改,並在更改時重新加載配置。這也可以通過SIGHUP信號手動觸發
config.reload.interval  #默認3s ,檢查配置文件頻率
config.debug #默認false 當設置為true時,將完全編譯的配置显示為調試日誌消息
queue.type #默認memory ,用於事件緩衝的內部排隊模型。為基於內存中的遺留隊列指定內存,或為基於磁盤的脫機隊列(持久隊列)指定持久內存
path.queue #默認path.data/queue  ,在啟用持久隊列時存儲數據文件的目錄路徑
queue.page_capacity #默認64mb ,啟用持久隊列時(隊列),使用的頁面數據文件的大小。隊列數據由分隔為頁面的僅追加數據文件組成
queue.max_events #默認0,表示無限。啟用持久隊列時,隊列中未讀事件的最大數量
queue.max_bytes  #默認1024mb,隊列的總容量,以字節為單位。確保磁盤驅動器的容量大於這裏指定的值
queue.checkpoint.acks #默認1024,當啟用持久隊列(隊列)時,在強制執行檢查點之前被隔離的事件的最大數量
queue.checkpoint.writes #默認1024,當啟用持久隊列(隊列)時,強制執行檢查點之前的最大寫入事件數
queue.checkpoint.retry #默認false,啟用后,對於任何失敗的檢查點寫,Logstash將對每個嘗試的檢查點寫重試一次。任何後續錯誤都不會重試。並且不推薦使用,除非是在那些特定的環境中
queue.drain #默認false,啟用后,Logstash將等待,直到持久隊列耗盡,然後關閉
path.dead_letter_queue#默認path.data/dead_letter_queue,存儲dead-letter隊列的目錄
http.host #默認"127.0.0.1" 表示endpoint REST端點的綁定地址。
http.port #默認9600 表示endpoint REST端點的綁定端口。
log.level #默認info,日誌級別fatal,error,warn,info,debug,trace,
log.format #默認plain 日誌格式
path.logs  #默認LOGSTASH_HOME/logs 日誌目錄

3.3、keystore

keystore可以保護一些敏感的信息,使用變量的方式替代,比如使用ES_PWD代替elasticsearch的密碼,可以通過${ES_PWD}來獲取elasticsearch的密碼,這樣就是的密碼不再是明文密碼。

./bin/logstash-keystore create   #創建一個keyword
 ./bin/logstash-keystore add ES_PWD #創建一個elastic的passwd,然後通過${ES_PWD}使用該密碼
 ./bin/logstash-keystore list  #查看已經設置好的鍵值對
 ./bin/logstash-keystore remove ES_PWD #刪除在keyword中的key

例如:

 

 

 3.4、logstash命令

注意:參數和logstash.yml配置文件對應(這裏不詳解,請查看3.2節)

-n, --node.name NAME          
-f, --path.config CONFIG_PATH 
-e, --config.string CONFIG_STRING
--field-reference-parser MODE 
--modules MODULES             
-M, --modules.variable MODULES_VARIABLE
--setup                      
--cloud.id CLOUD_ID                                            
--cloud.auth CLOUD_AUTH       
--pipeline.id ID              
-w, --pipeline.workers COUNT 
--pipeline.ordered ORDERED   
--java-execution                                               
--plugin-classloaders         
-b, --pipeline.batch.size SIZE 
-u, --pipeline.batch.delay DELAY_IN_MS 
--pipeline.unsafe_shutdown    
--path.data PATH              
-p, --path.plugins PATH       
-l, --path.logs PATH         
--log.level LEVEL             
--config.debug                
-i, --interactive SHELL      
-V, --version                 
-t, --config.test_and_exit    
-r, --config.reload.automatic 
--config.reload.interval 
--http.host HTTP_HOST         
--http.port HTTP_PORT         
--log.format FORMAT           
--path.settings SETTINGS_DIR

3.5、logstash配置文件的格式和value type

1、logstash配置文件的格式如下:

輸入,解析過濾,輸出,其中filter不是必須的,其他兩個是必須的。

input {
  ...
}

filter {
  ...
}

output {
  ...
}

2、value types(logstash支持的數據類型)

array:數組可以是單個或者多個字符串值。

users => [ {id => 1, name => bob}, {id => 2, name => jane} ]

Lists:集合

path => [ "/var/log/messages", "/var/log/*.log" ]
uris => [ "http://elastic.co", "http://example.net" ]

Boolean:true 或者false

ssl_enable => true

Bytes:字節類型

my_bytes => "1113"   # 1113 bytes
my_bytes => "10MiB"  # 10485760 bytes
my_bytes => "100kib" # 102400 bytes
my_bytes => "180 mb" # 180000000 bytes

Codec:編碼類型

codec => "json"

Hash:哈希(散列)

match => {
  "field1" => "value1"
  "field2" => "value2"
  ...
}
# or as a single line. No commas between entries:
match => { "field1" => "value1" "field2" => "value2" }

Number:数字類型

port => 33

Password:密碼類型

my_password => "password"

URI:uri類型

my_uri => "http://foo:bar@example.net"

Path: 路徑類型

my_path => "/tmp/logstash"

String:字符串類型,字符串必須是單個字符序列。注意,字符串值被括在雙引號或單引號中

3.6、logstash的權限配置

配置賬號,一個種是role,一種是user,配置方式有兩種,一種是通過elasticsearch的API配置,一種是通過kibana配置:

第一種方式:通過elasticsearch的API配置:

#添加一個logstash_writer的角色
POST _xpack/security/role/logstash_writer
{
  "cluster": ["manage_index_templates", "monitor", "manage_ilm"], 
  "indices": [
    {
      "names": [ "logstash-*" ],  #索引的模式匹配
      "privileges": ["write","create","delete","create_index","manage","manage_ilm"]   #權限內容
    }
  ]
}

#添加一個有logstash_writer角色權限的用戶:logstash_internal
POST _xpack/security/user/logstash_internal
{
  "password" : "x-pack-test-password",
  "roles" : [ "logstash_writer"],  #分配角色
  "full_name" : "Internal Logstash User"
}

#添加一個logstash_reader角色,只有read權限
POST _xpack/security/role/logstash_reader
{
  "indices": [
    {
      "names": [ "logstash-*" ], 
      "privileges": ["read","view_index_metadata"]
    }
  ]
}

#添加一個有logstash_reader角色權限的用戶:logstash_user
POST _xpack/security/user/logstash_user
{
  "password" : "x-pack-test-password",
  "roles" : [ "logstash_reader", "logstash_admin"], 
  "full_name" : "Kibana User for Logstash"
}

第二種:通過kibana的界面配置

Management > Roles
Management > Users

 

 

 權限選擇見elasticsearch官網:

https://www.elastic.co/guide/en/elasticsearch/reference/current/authorization.html

https://www.elastic.co/guide/en/elasticsearch/reference/current/security-privileges.html

 3.7、多管道配置

  如果需要在同一個進程中運行多個管道,Logstash提供了一種通過名為pipelines.yml的配置文件來實現此目的的方法。

例如:

- pipeline.id: my-pipeline_1
  path.config: "/etc/path/to/p1.config"
  pipeline.workers: 3
- pipeline.id: my-other-pipeline
  path.config: "/etc/different/path/p2.cfg"
  queue.type: persisted

  該文件在YAML文件格式中,並包含一個字典列表,其中每個字典描述一個管道,每個鍵/值對指定該管道的設置。該示例展示了通過id和配置路徑描述的兩個不同管道。對於第一個管道,為pipeline.workers的值設置為3,而在另一个中,持久隊列特性被啟用。未在pipelines.yml顯式設置的值。yml文件將使用到logstash中指定的默認值。
  當啟動Logstash不帶參數時,它將讀取管道pipelines.yml。yml文件並實例化文件中指定的所有管道。另一方面,當使用-e或-f時,Logstash會忽略管道。

注意:

  • 如果當前配置的事件流不共享相同的輸入/過濾器和輸出,並且使用標記和條件將它們彼此分離,這顯得多個管道尤其重要
  • 在單個實例中擁有多個管道還允許這些事件流具有不同的性能和持久性參數(例如,pipeline.workers和persistent queues的不同設置)。這種分離意味着一條管道中的阻塞輸出不會對另一條管道產生反壓力。
  • 考慮管道之間的資源競爭是很重要的,因為默認值是針對單個管道調優的。因此,例如,考慮減少每個管道使用pipeline.worker的數量,因為每個管道在默認情況下每個CPU核使用一個worker。
  • 每個管道都隔離持久隊列和死信隊列,它們的位置命名空間為pipeline.id的值

各管道之間的通信原理:https://www.elastic.co/guide/en/logstash/current/pipeline-to-pipeline.html,有興趣的可以了解下。

3.8、配置的重新加載

在我們運行logstash的過程,不想停掉logstash進程,但是又想修改配置,就可以使用到配置的重新加載了,有兩種方式。

第一種是在啟動的時候指定參數:

bin/logstash -f apache.config --config.reload.automatic

Logstash每3秒檢查一次配置更改。要更改此間隔,請使用–config.reload.interval <interval>選項,其中interval指定Logstash檢查配置文件更改的頻率(以秒為單位),請注意,必須使用單位限定符(s)

第二種就是強制加載配置文件

kill -SIGHUP pid  #pid為logstash的pid

自動配置重新加載配置注意點:

  • 當Logstash檢測到配置文件中的更改時,它將通過停止所有輸入來停止當前管道,並嘗試創建使用更新后的配置的新管道。驗證新配置的語法后,Logstash驗證所有輸入和輸出都可以初始化(例如,所有必需的端口都已打開)。如果檢查成功,則Logstash會將現有管道與新管道交換。如果檢查失敗,舊管道將繼續運行,並且錯誤將傳播到控制台。
  • 在自動重新加載配置期間,不會重新啟動JVM。管道的創建和交換都在同一過程中進行。
  • 對grok模式文件的更改也將重新加載,但僅在配置文件中的更改觸發重新加載(或重新啟動管道)時。

3.9、常用filter介紹

1、grok

注:grok插件是一個十分耗費資源的插件

官網:https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html

Grok是將非結構化日誌數據解析為結構化和可查詢內容的好方法,非常適合syslog日誌,apache和其他Web服務器日誌,mysql日誌等等

首先官方提供了120中的匹配模式(但是我一直都沒打開這個網址):https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

還有一個用來驗證自定義的解析是否正確的一個網址:http://grokdebug.herokuapp.com/

既然我沒能打開官方提供的120個的模式匹配,從一片博客中找到了一部分的匹配模式(如下):https://blog.csdn.net/cui929434/article/details/94390617

USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b

POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}

# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])
IP (?:%{IPV6}|%{IPV4})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
HOST %{HOSTNAME}
IPORHOST (?:%{HOSTNAME}|%{IP})
HOSTPORT %{IPORHOST}:%{POSINT}

# paths
PATH (?:%{UNIXPATH}|%{WINPATH})
UNIXPATH (?>/(?>[\w_%!$@:.,-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]+(\+[A-Za-z+]+)?
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?

# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])

# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)

# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[PMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}

# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG (?:[\w._/%-]+)
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}

# Shortcuts
QS %{QUOTEDSTRING}

# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}

# Log Levels
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)

grok內置的一些匹配模式

基礎語法,一種是自帶的模式,一種是自定義的模式:

自帶的模式語法:  %{SYNTAX:SEMANTIC}

SYNTAX是將匹配文本模式的名稱,grok自帶的那些匹配模式名
SEMANTIC是你給一段文字的標識相匹配該匹配模式匹配到的內容,相當於一個字段名

例如:

%{NUMBER:duration} %{IP:client}

比如要解析如下的日誌:

55.3.244.1 GET /index.html 15824 0.043

就可以使用該匹配模式去匹配:

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

得出的結果如下:

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

 

自定義的模式語法:(?<field_name>the pattern here)

例如:

(?<queue_id>[0-9A-F]{10,11})  #表示10-11個字符的16進制

我們可以創建一個目錄pattters,把我自定義的模式添加進去,在使用的時候就可以使用grok自帶的匹配模式的語法,例如:

我們在./patterns/postfix文件中添加如下內容
POSTFIX_QUEUEID [0-9A-F]{10,11}

如上我們就定義了一個匹配模式了,可以使用如下的方式使用。假如我們有如下的日誌格式:

Jan  1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>

然後我們對其就行解析:

filter {
      grok {
        patterns_dir => ["./patterns"]  #指定自定義的匹配模式路徑
        match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
      }
    }

解析出的結果如下:

timestamp: Jan  1 06:25:43
logsource: mailserver14
program: postfix/cleanup
pid: 21403
queue_id: BEF25A72965
syslog_message: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>

 grok的配置選項

break_on_match
值類型是布爾值
默認是true
描述:match可以一次設定多組,預設會依照順序設定處理,如果日誌滿足設定條件,則會終止向下處理。但有的時候我們會希望讓Logstash跑完所有的設定,這時可以將break_on_match設為false。

keep_empty_captures
值類型是布爾值
默認值是 false
描述:如果為true,捕獲失敗的字段將設置為空值

match
值類型是數組
默認值是 {}
描述:字段值的模式匹配
例如:
filter {
  grok { match => { "message" => "Duration: %{NUMBER:duration}" } }
}
#如果你需要針對單個字段匹配多個模式,則該值可以是一組,例如:
filter {
  grok { match => { "message" => [ "Duration: %{NUMBER:duration}", "Speed: %{NUMBER:speed}" ] } }
}

named_captures_only
值類型是布爾值
默認值是 true
描述:如果設置為true,則僅存儲來自grok的命名捕獲

overwrite
值類型是 array
默認是[]
描述:覆蓋已經存在的字段內容
例如:

filter {
  grok {
    match => { "message" => "%{SYSLOGBASE} %{DATA:message}" }
    overwrite => [ "message" ]
  }
}
如果日誌是May 29 16:37:11 sadness logger: hello world經過match屬性match => { “message” => “%{SYSLOGBASE} %{DATA:message}” }處理后,message的值變成了hello world。這時如果使用了overwrite => [ “message” ]屬性,那麼原來的message的值將被覆蓋成新值。

pattern_definitions
值類型是 數組
默認值是 {}
描述:模式名稱和模式正則表達式,也是用於定義當前過濾器要使用的自定義模式。匹配現有名稱的模式將覆蓋預先存在的定義。可以將此視為僅適用於grok定義的內聯模式,patterns_dir是將模式寫在外部。
例如:
filter {
    grok {
        patterns_dir => "/usr/local/elk/logstash/patterns"
        pattern_definitions => {"MYSELFTIMESTAMP" => "20%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:?%{MINUTE}(?::?%{SECOND})"}
        match => {"message" => ["%{MYSELFTIMESTAMP:timestamp} %{JAVACLASS:message}","%{MYSELF:content}"]}
    }
}

patterns_dir
值類型是數組
默認值是 []
描述:一些複雜的正則表達式,不適合直接寫到filter中,可以指定一個文件夾,用來專門保存正則表達式的文件,需要注意的是該文件夾中的所有文件中的正則表達式都會被依次加載,包括備份文件。

patterns_dir => ["/opt/logstash/patterns", "/opt/logstash/extra_patterns"]
正則文件以文本格式描述:

patterns_file_glob
屬性值的類型:string
默認值:“*”
描述:針對patterns_dir屬性中指定的文件夾里哪些正則文件,可以在這個filter中生效,需要本屬性來指定。默認值“*”是指所有正則文件都生效。

tag_on_failure
值類型是數組
默認值是 [“_grokparsefailure”]
描述:沒有成功匹配時,將值附加到字段到tags

tag_on_timeout
值類型是字符串
默認值是 “_groktimeout”
描述:如果Grok正則表達式超時,則應用標記。

timeout_millis
值類型是数字
默認值是 30000
描述: 嘗試在這段時間后終止正則表達式。如果應用了多個模式,則這適用於每個模式。這將永遠不會提前超時,但超時可能需要一些時間。實際的超時時間是基於250ms量化的近似值。設置為0以禁用超時。

各組件的公共配置選項

break_on_match
值類型是布爾值
默認是true
描述:match可以一次設定多組,預設會依照順序設定處理,如果日誌滿足設定條件,則會終止向下處理。但有的時候我們會希望讓Logstash跑完所有的設定,這時可以將break_on_match設為false。

keep_empty_captures
值類型是布爾值
默認值是 false
描述:如果為true,捕獲失敗的字段將設置為空值

match
值類型是數組
默認值是 {}
描述:字段值的模式匹配
例如:

filter {
  grok { match => { "message" => "Duration: %{NUMBER:duration}" } }
}

#如果你需要針對單個字段匹配多個模式,則該值可以是一組,例如:
filter {
  grok { match => { "message" => [ "Duration: %{NUMBER:duration}", "Speed: %{NUMBER:speed}" ] } }
}

named_captures_only
值類型是布爾值
默認值是 true
描述:如果設置為true,則僅存儲來自grok的命名捕獲

overwrite
值類型是 array
默認是[]
描述:覆蓋已經存在的字段內容
例如:

filter {
  grok {
    match => { "message" => "%{SYSLOGBASE} %{DATA:message}" }
    overwrite => [ "message" ]
  }
}

如果日誌是May 29 16:37:11 sadness logger: hello world經過match屬性match => { “message” => “%{SYSLOGBASE} %{DATA:message}” }處理后,message的值變成了hello world。這時如果使用了overwrite => [ “message” ]屬性,那麼原來的message的值將被覆蓋成新值。

pattern_definitions
值類型是 數組
默認值是 {}
描述:模式名稱和模式正則表達式,也是用於定義當前過濾器要使用的自定義模式。匹配現有名稱的模式將覆蓋預先存在的定義。可以將此視為僅適用於grok定義的內聯模式,patterns_dir是將模式寫在外部。
例如:

filter {
    grok {
        patterns_dir => "/usr/local/elk/logstash/patterns"
        pattern_definitions => {"MYSELFTIMESTAMP" => "20%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:?%{MINUTE}(?::?%{SECOND})"}
        match => {"message" => ["%{MYSELFTIMESTAMP:timestamp} %{JAVACLASS:message}","%{MYSELF:content}"]}
    }
}

patterns_dir
值類型是數組
默認值是 []
描述:一些複雜的正則表達式,不適合直接寫到filter中,可以指定一個文件夾,用來專門保存正則表達式的文件,需要注意的是該文件夾中的所有文件中的正則表達式都會被依次加載,包括備份文件。

patterns_dir => ["/opt/logstash/patterns", "/opt/logstash/extra_patterns"]
正則文件以文本格式描述:


patterns_file_glob
屬性值的類型:string
默認值:“*”
描述:針對patterns_dir屬性中指定的文件夾里哪些正則文件,可以在這個filter中生效,需要本屬性來指定。默認值“*”是指所有正則文件都生效。

tag_on_failure
值類型是數組
默認值是 [“_grokparsefailure”]
描述:沒有成功匹配時,將值附加到字段到tags

tag_on_timeout
值類型是字符串
默認值是 “_groktimeout”
描述:如果Grok正則表達式超時,則應用標記。

timeout_millis
值類型是数字
默認值是 30000
描述: 嘗試在這段時間后終止正則表達式。如果應用了多個模式,則這適用於每個模式。這將永遠不會提前超時,但超時可能需要一些時間。實際的超時時間是基於250ms量化的近似值。設置為0以禁用超時。

常用選項
所有過濾器插件都支持以下配置選項:


dd_field
值類型是散列
默認值是 {}
描述:如果匹配成功,向此事件添加任意字段。字段名可以是動態的,並使用%{Field}包含事件的一部分
filter {
      grok {
        add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
      }
    }

# 你也可以一次添加多個字段
filter {
  grok {
    add_field => {
      "foo_%{somefield}" => "Hello world, from %{host}"
      "new_field" => "new_static_value"
    }
  }
}

add_tag
值類型是數組
默認值是 []
描述:如果此過濾器成功,請向該事件添加任意標籤。標籤可以是動態的,並使用%{field} 語法包含事件的一部分。
例如:

filter {
  grok {
    add_tag => [ "foo_%{somefield}" ]
  }
}

# 你也可以一次添加多個標籤
filter {
  grok {
    add_tag => [ "foo_%{somefield}", "taggedy_tag"]
  }
}

enable_metric
值類型是布爾值
默認值是 true
描述:禁用或啟用度量標準

id
值類型是字符串
此值沒有默認值。
描述:向插件實例添加唯一ID,此ID用於跟蹤插件特定配置的信息。
例如:

filter {
  grok {
    id => "ABC"
  }
}

periodic_flush
值類型是布爾值
默認值是 false
描述:如果設置為ture,會定時的調用filter的更新函數(flush method)

remove_field
值的類型:array
默認值:[]
描述:刪除當前文檔中的指定filted

filter {
  grok {
    remove_field => [ "foo_%{somefield}" ]
  }
}
# 你也可以一次移除多個字段:
filter {
  grok {
    remove_field => [ "foo_%{somefield}", "my_extraneous_field" ]
  }
}

remove_tag
值類型是數組
默認值是 []
描述:如果此過濾器成功,請從該事件中移除任意標籤。標籤可以是動態的,並使用%{field} 語法包括事件的一部分。
例如:

filter {
  grok {
    remove_tag => [ "foo_%{somefield}" ]
  }
}
# 你也可以一次刪除多個標籤
filter {
  grok {
    remove_tag => [ "foo_%{somefield}", "sad_unwanted_tag"]
  }
}

2、mutate

官網網址:https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html

mutate允許對字段執行常規改變。可以重命名、刪除、替換和修改事件中的字段。

二話不說,來個例子先:

filter {
    mutate {
        split => ["hostname", "."]  #切分
        add_field => { "shortHostname" => "%{hostname[0]}" } #獲取切分后的第一個字段作為添加字段
    }
    mutate {
        rename => ["shortHostname", "hostname" ] #重命名
    }
}

mutate的配置選項:

常用的一些操作:
convert:轉換數據類型,數據類型hash
可以裝換的數據類型有:integer,string,integer_eu(和integer相同,显示格式為1.000),float,float_eu,boolean
實例:
filter {
      mutate {
        convert => {
          "fieldname" => "integer"
          "booleanfield" => "boolean"
        }
      }
    }

copy:類型hash,將現有字段複製到另一個字段。現有的目標域將被覆蓋
實例:
 filter {
      mutate {
         copy => { "source_field" => "dest_field" }
      }
    }
    
gsub:類型array,根據字段值匹配正則表達式,並用替換字符串替換所有匹配項。只支持字符串或字符串數組的字段。對於其他類型的字段,將不採取任何操作。
實例:
filter {
      mutate {
        gsub => [
          # replace all forward slashes with underscore
          "fieldname", "/", "_",
          # replace backslashes, question marks, hashes, and minuses
          # with a dot "."
          "fieldname2", "[\\?#-]", "."
        ]
      }
    }
    
join:類型hash,用分隔符連接數組。對非數組字段不執行任何操作
實例:
filter {
     mutate {
       join => { "fieldname" => "," }
     }
   }
  
lowercase:類型array,轉為小寫
實例:
filter {
      mutate {
        lowercase => [ "fieldname" ]
      }
    }
    
merge:類型hash,合併數組或散列的兩個字段。字符串字段將自動轉換為數組
實例:
filter {
     mutate {
        merge => { "dest_field" => "added_field" }
     }
   }
   
coerce:類型hash,設置存在但為空的字段的默認值
實例:
filter {
      mutate {
        # Sets the default value of the 'field1' field to 'default_value'
        coerce => { "field1" => "default_value" }
      }
    }
    
rename:類型hash,重命名
實例:
filter {
      mutate {
        # Renames the 'HOSTORIP' field to 'client_ip'
        rename => { "HOSTORIP" => "client_ip" }
      }
    }
    
replace:類型hash,用新值替換字段的值
實例:
filter {
      mutate {
        replace => { "message" => "%{source_host}: My new message" }
      }
    }
    
split:類型hash,使用分隔符將字段分割為數組。只對字符串字段有效
實例:
filter {
      mutate {
         split => { "fieldname" => "," }
      }
    }
    
strip:類型array,字段中刪除空白。注意:這隻對前導和后導空格有效。
實例:
filter {
      mutate {
         strip => ["field1", "field2"]
      }
    }
    
update:類型hash,使用新值更新現有字段。如果該字段不存在,則不採取任何操作。
實例:
filter {
      mutate {
        update => { "sample" => "My new message" }
      }
    }
    
uppercase:類型array,轉為大寫
實例:
filter {
      mutate {
        uppercase => [ "fieldname" ]
      }
    }
    
capitalize:類型array,將字符串轉換為等效的大寫字母。
實例:
filter {
      mutate {
        capitalize => [ "fieldname" ]
      }
    }

tag_on_failure:類型string,如果在應用此變異篩選器期間發生故障,則終止其餘操作,默認值:_mutate_error

公共配置見:grok公共配置

3、date

date filter用於從字段解析日期,然後使用該日期或時間戳作為事件的logstash時間戳

date常用的配置選項:

match:類型array,字段名在前,格式模式在後的數組[ field,formats... ],表示該字段能夠匹配到的時間模式,時間模式可以有多種
實例:
filter {
      date {
        match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]
      }
    }
    
tag_on_failure:類型array,默認值["_dateparsefailure"],當沒有成功匹配時,將值追加到tags字段

target:類型String,默認值"@timestamp",將匹配的時間戳存儲到給定的目標字段中。如果沒有提供,默認更新事件到@timestamp字段。

timezone:類型String,表示時區,可以在該網址查看:http://joda-time.sourceforge.net/timezones.html

公共配置見:grok公共配置

這裏只對如上三種filter說明,具體其他的filter請見官網:https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

3.10、案例一、apache日誌解析

這個例子屬官網的一個例子:https://www.elastic.co/guide/en/logstash/current/advanced-pipeline.html

但是我這裏不弄這麼負責,我們不使用filebeat,直接使用logstash,apache日誌的數據集下載:https://download.elastic.co/demos/logstash/gettingstarted/logstash-tutorial.log.gz

我這裏不打算安裝apach,所以直接使用官方提供的數據集。

下載數據集,然後解壓文件,就可以得到我們的一個日誌文件:logstash-tutorial.log

首先我們看一下apache日誌的格式:

[elk@lgh ~]$ tail -3 logstash-tutorial.log 
86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] "GET /projects/xdotool/ HTTP/1.1" 200 12292 "http://www.haskell.org/haskellwiki/Xmonad/Frequently_asked_questions" "Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0"
86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] "GET /reset.css HTTP/1.1" 200 1015 "http://www.semicomplete.com/projects/xdotool/" "Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0"
86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] "GET /style2.css HTTP/1.1" 200 4877 "http://www.semicomplete.com/projects/xdotool/" "Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0"

然後開始配置:

cd logstash-7.7.0/ && mkdir conf.d
cd conf.d/ 
vim  apache.conf
#############apache.conf的內容如下###################
input {
        file {
            path => "/home/elk/logstash-tutorial.log"
            type => "log"
            start_position => "beginning"
            }
    }
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
}

output {
    stdout { codec => rubydebug }
}

然後啟動命令(可以選擇用nohup後台啟動):

 cd logstash-7.7.0/ && ./bin/logstash -f conf.d/apache.conf

執行結果如下(部分結果):

{
           "verb" => "GET",
          "bytes" => "8948",
           "type" => "log",
           "host" => "gxt_126_233",
    "httpversion" => "1.0",
        "message" => "67.214.178.190 - - [04/Jan/2015:05:20:59 +0000] \"GET /blog/geekery/installing-windows-8-consumer-preview.html HTTP/1.0\" 200 8948 \"http://www.semicomplete.com/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.7; rv:21.0) Gecko/20100101 Firefox/21.0\"",
      "timestamp" => "04/Jan/2015:05:20:59 +0000",
       "referrer" => "\"http://www.semicomplete.com/\"",
     "@timestamp" => 2020-06-17T01:23:47.817Z,
           "path" => "/data/hd05/elk/logstash-tutorial.log",
          "ident" => "-",
       "response" => "200",
       "@version" => "1",
        "request" => "/blog/geekery/installing-windows-8-consumer-preview.html",
       "clientip" => "67.214.178.190",
          "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.7; rv:21.0) Gecko/20100101 Firefox/21.0\"",
           "auth" => "-"
}
{
           "verb" => "GET",
          "bytes" => "1015",
           "type" => "log",
           "host" => "gxt_126_233",
    "httpversion" => "1.1",
        "message" => "66.249.73.185 - - [04/Jan/2015:05:18:48 +0000] \"GET /reset.css HTTP/1.1\" 200 1015 \"-\" \"Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)\"",
      "timestamp" => "04/Jan/2015:05:18:48 +0000",
       "referrer" => "\"-\"",
     "@timestamp" => 2020-06-17T01:23:47.815Z,
           "path" => "/data/hd05/elk/logstash-tutorial.log",
          "ident" => "-",
       "response" => "200",
       "@version" => "1",
        "request" => "/reset.css",
       "clientip" => "66.249.73.185",
          "agent" => "\"Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)\"",
           "auth" => "-"
}
{
           "verb" => "GET",
          "bytes" => "28370",
           "type" => "log",
           "host" => "gxt_126_233",
    "httpversion" => "1.0",
        "message" => "207.241.237.220 - - [04/Jan/2015:05:21:16 +0000] \"GET /blog/tags/projects HTTP/1.0\" 200 28370 \"http://www.semicomplete.com/blog/tags/C\" \"Mozilla/5.0 (compatible; archive.org_bot +http://www.archive.org/details/archive.org_bot)\"",
      "timestamp" => "04/Jan/2015:05:21:16 +0000",
       "referrer" => "\"http://www.semicomplete.com/blog/tags/C\"",
     "@timestamp" => 2020-06-17T01:23:47.817Z,
           "path" => "/data/hd05/elk/logstash-tutorial.log",
          "ident" => "-",
       "response" => "200",
       "@version" => "1",
        "request" => "/blog/tags/projects",
       "clientip" => "207.241.237.220",
          "agent" => "\"Mozilla/5.0 (compatible; archive.org_bot +http://www.archive.org/details/archive.org_bot)\"",
           "auth" => "-"
}

執行結果

2.11、案例二、nginx日誌解析

首先安裝nginx:nginx功能介紹和基本安裝

這裏我們也不使用filebeat,因為這篇文章只是介紹logstash

配置:

cd conf.d/ 
vim  nginx.conf
############nginx.conf配置如下#####################
input {
        file {
            path => "/usr/local/nginx/logs/access.log"
            type => "log"
            start_position => "beginning"
            }
    }
   
filter {
grok {
        match => { "message" => ["(?<RemoteIP>(\d*.\d*.\d*.\d*)) - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] }
        add_field => {
                "Device" => "Charles Desktop"
        }
        remove_field => "message"
        remove_field => "beat.version"
        remove_field => "beat.name"
    }
}
    output {    
             elasticsearch {
                    hosts => ["192.168.110.130:9200"]
                    index => "nginx-log-%{+YYYY.MM.dd}"
                }
    }

如上的配置中輸出到elasticsearch中,這裏沒有設置密碼,所以不需要用戶和密碼,還有就是這裏使用的默認模板,如果想要修改的話可以使用,可以添加如下配置:

user => "elastic"  #用戶
password => "${ES_PWD}" #通過keystore存儲的密碼
manage_template => false #關閉默認的模板
template_name => "elastic-slowquery" #指定自定義的模板

執行命令啟動logstash

cd logstash-7.7.0/ && ./bin/logstash -f conf.d/nginx.conf

執行的結果(查看elasticsearch集群):

 

 

 從上面的兩個圖看,一個創建了我們在nginx.conf中指定的一個索引,然後索引的內容都是解析出來的一些字段內容。

3.12、案例三、elasticsearch慢日誌解析

這是實例我們採用filebeat+logstash+elasticsearch,還有權限驗證進行試驗:

這裏主要是對elasticsearch的慢日誌查詢做解析,雖然我在一篇文章搞懂filebeat(ELK)中篇文章中直接通過filebeat的elasticsearch(beat版本)的模塊對其做過解析,但是解析的還是不夠特別完善,這裏引入logstash對其解析,過濾。

首先配置filebeat文件(這裏只配置了一個輸入和一個輸出,沒有做多餘的處理,只是用來收集日誌):

#=========================== Filebeat inputs =============================
filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/logs/es_aaa_index_search_slowlog.log
    - /var/logs/es_bbb_index_search_slowlog.log
    #- c:\programdata\elasticsearch\logs\*

#================================ Outputs =====================================

# Configure what output to use when sending the data collected by the beat.

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["192.168.110.130:5044","192.168.110.131:5044","192.168.110.132:5044"]
  loadbalance: true   #這裏採用負載均衡機制,

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

然後啟動filebeat:

cd filebeat-7.7.0-linux-x86_64 && ./filebeat -e

然後配置logstash的配置文件:

cd conf.d/ 
vim  es.conf
############es.conf配置如下##############
input {
        beats{
                port => 5044
        }
    }
   
filter {
grok {
        match => {"message" => "\[%{TIMESTAMP_ISO8601:query_time},%{NUMBER:number1}\]\s*\[%{DATA:log_type}\]\s*\[%{DATA:index_query_type}\]\s*\[%{DATA:es_node}\]\s*\[%{DATA:index_name}\]\s*\[%{NUMBER:share_id}\]\s*took\[%{DATA:times_s}\],\s*took_millis\[%{NUMBER:query_times_ms}\],\s*types\[%{DATA:types}\],\s*stats\[%{DATA:status}\],\s*search_type\[%{DATA:search_type}\],\s*total_shards\[%{NUMBER:total_shards}\],\s*source\[%{DATA:json_query}\],\s*extra_source"}
        remove_field => ["message","@version","status","times_s","@timestamp","number1"]
    }
  mutate{
        convert => {
        "query_times_ms" => "integer"
}
}
}
    output {    
             elasticsearch {
                    hosts => ["192.168.110.130:9200","192.168.110.131:9200","192.168.110.132:9200"]
                    index => "elastic-slowquery-222"
                    user => "elastic"
                    password => "${ES_PWD}"
                    manage_template => false
                    template_name => "elastic-slowquery"
                }
    }

如上配置使用了用戶的權限驗證,以及elasticsearch的自定義模板

啟動logstash

cd logstash-7.7.0/ && ./bin/logstash -f conf.d/es.conf

登錄es查看結果:

 

logstash就介紹到這裏了,如果有疑問多看官網比較好

 

參考:

logstash官網:https://www.elastic.co/guide/en/logstash/current/index.html

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

※教你寫出一流的銷售文案?

LeetCode 78,面試常用小技巧,通過二進制獲得所有子集

本文始發於個人公眾號:TechFlow,原創不易,求個關注

今天是LeetCode專題第47篇文章,我們一起來看下LeetCode的第78題Subsets(子集)。

這題的官方難度是Medium,點贊3489,反對79,通過率59.9%。從這個數據我們也可以看得出來,這是一道難度不是很大,但是質量很高的題。的確,在這道題的解法當中,你會學到一種新的技巧。

廢話不多說,我們先來看題意。

題意

這題的題意非常簡單,和上一題有的一拼,基本上從標題就能猜到題目的意思。給定一個沒有重複元素的int型數組,要求返回所有的子集,要求子集當中沒有重複項,每一項當中也沒有重複的元素。

樣例

Input: nums = [1,2,3]
Output:
[
  [3],
  [1],
  [2],
  [1,2,3],
  [1,3],
  [2,3],
  [1,2],
  []
]

照搬上題

剛拿到手可能有點蒙,但是稍微想一下就會發現,這一題和上題非常接近,兩者唯一的不同就是,子集沒有數量的限制,從空集開始,一直到它本身結束,不論多少個元素都可以。而上一題要求的是有數量限制的,也就是說上一題我們求的其實是限定了k個元素的子集。

想明白這點就簡單了,顯然我們可以復用上一題的算法,我們來遍歷這個k,從0到n,就可以獲得所有的子集了。只要你上一題做出來了,那麼這題幾乎沒有任何難度。如果你沒有看過上一題的文章的話,可以通過傳送門回顧一下:

LeetCode 77,組合挑戰,你能想出不用遞歸的解法嗎?

我們直接來看代碼:

class Solution:
    def subsets(self, nums: List[int]) -> List[List[int]]:
        # 上一題求解k個組合的解法
        def combine(n, k, ret):
            window = list(range(1, k+1)) + [n+1]
            j = 0
            
            while j < k:
                cur = []
                for i in range(k):
                    cur.append(nums[window[i] - 1])
                ret.append(cur[:])
                
                j = 0
                while j < k and window[j+1] == window[j] + 1:
                    window[j] = j + 1
                    j += 1
                window[j] += 1
                
        # 手動添加空集
        ret = [[]]
        n = len(nums)
        # 遍歷k從1到n
        for i in range(1, n+1):
            combine(n, i, ret)
        return ret

二進制組合

照搬上一題的解法固然是可行的,但是這麼做完全沒有必要,也得不到任何收穫。所以我們應該想一下新的解法。

既然這道題讓我們求的是所有的子集,那麼我們可以從子集的特點入手。我們之前學過,一個含有n個元素的子集的數量是。這個很容易想明白,因為n個元素,每個元素都有兩個狀態,選或者不選。並且這n個元素互相獨立,也就是說某個元素選或者不選並不會影響其他的元素,所以我們可以知道一共會有種可能。

我們也可以從組合數入手,我們令所有子集的數量為S,那麼根據上面我們用組合求解的解法,可以得到:

兩者的結果是一樣的,說明這個結論一定是正確的。

不知道大家看到n個元素,每個元素有兩個取值有什麼想法,如果做過的題目數量夠多的話,應該能很快聯想到二進制。因為在二進制當中,每一個二進制位就只有0和1兩種取值。那麼我們就可以用n位的二進制數來表示n個元素集合取捨的狀態。n位二進制數的取值範圍是,所以我們用一重循環去遍歷它,就相當於一重循環遍歷了整個集合所有的狀態。

這種技巧我們也曾經在動態規劃狀態壓縮的文章當中提到過,並且在很多題目當中都會用到。所以建議大家可以了解一下,說不定什麼時候面試就用上了。

根據這個技巧, 我們來實現代碼就非常簡單了。

class Solution:
    def subsets(self, nums: List[int]) -> List[List[int]]:
        ret = []
        n = len(nums)
        # 遍歷所有的狀態
        # 1左移n位相當於2的n次方
        for s in range(1 << n):
            cur = []
            # 通過位運算找到每一位是0還是1
            for i in range(n):
                # 判斷s狀態在2的i次方上,也就是第i位上是0還是1
                if s & (1 << i):
                    cur.append(nums[i])
            ret.append(cur[:])
            
        return ret

從代碼來看明顯比上面的解法短得多,實際上運行的速度也更快,因為我們去掉了所有多餘的操作,我們遍歷的每一個狀態都是正確的,也不用考慮重複元素的問題。

總結

不知道大家看完文章都有一些什麼感悟,可能第一種感悟就是LeetCode應該按照順序刷吧XD。

的確如此,LeetCode出題人出題都是有套路的,往往出了一道題之後,為了提升題目數量(湊提數),都會在之前題目的基礎上做變形,變成一道新題。所以如果你按照順序刷題的話,會很明顯地發現這一點。如果你從這個角度出發去思考的話,不但能理解題目之間的聯繫,還能揣摩出出題人的用意,這也是一件很有趣的事情。

如果喜歡本文,可以的話,請點個關注,給我一點鼓勵,也方便獲取更多文章。

本文使用 mdnice 排版

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

JDK動態代理

在《springAOP之代理模式》中說了代理模式,包含靜態代理和動態代理,在動態代理模式中又分為JDK動態代理和CGlib動態代理,今天重點來看JDK動態代理。

一、概述

說到JDK動態代理就必須想到JDK動態代理要求有一個統一的接口,那為什麼要有接口,下面會說到,下面看我的接口類,

package cn.com.jdk.proxy;

public interface Subject {

    void sayHello(String a);
}

接口類很簡單就是一個簡單的方法定義。下面看實際的接口的實現類SubjectImpl,

package cn.com.jdk.proxy;

public class SubjectImpl implements Subject {

    @Override
    public void sayHello(String a) {
        // TODO Auto-generated method stub

        System.out.println("hello:"+a);
    }

}

實現類簡單的事項了Subject接口,進行了打印操作。下面看代理類

package cn.com.jdk.proxy;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class JDKProxy implements InvocationHandler {
    private SubjectImpl si;
    //此屬性不用管
    private String a;
/**
 * proxy  JDK動態生成的代理類的實例
 * method 目標方法的Method對象     Class.forName("cn.com.jdk.proxy.Subject").getMethod("sayHello", new Class[] { Class.forName("java.lang.String") });
 * args   目標方法的參數                       new Object[] { paramString }
 */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // TODO Auto-generated method stub
        System.out.println("before");
        //使用反射的放式調用si(被代理類)目標方法
        Object o=method.invoke(si, args);
        System.out.println("after");
        return o;
    }
    public JDKProxy(SubjectImpl si,String a) {
        this.si=si;
        this.a=a;
    }

}

上面是代理類的實現,在代理類中含義被代理類的一個引用,且提供了響應的構造方法。下面具體的使用,

package cn.com.jdk.proxy;

import java.lang.reflect.Proxy;

public class ProxyTest {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        //進行此項設置,可以在項目的com/sun/proxy目錄下找到JDK動態生成的代理類的字節碼文件
        System.getProperties().put("sun.misc.ProxyGenerator.saveGeneratedFiles","true");
        SubjectImpl si=new SubjectImpl();
        
        Subject subject=(Subject)Proxy.newProxyInstance(si.getClass().getClassLoader(), si.getClass().getInterfaces(), new JDKProxy(si,"111"));
        subject.sayHello("tom");

    }

}

上面是使用的代碼,通過Proxy類的newProxyInstance方法獲得一個Subject的實例,調用sayHello方法,下面看執行結果

before
hello:tom
after

可以看到執行了sayHello方法,且打印了before和after,這不正是代理類中invoke方法的執行嗎,看下面

很神奇的一件事,我們不光調用了sayHello方法,實現了打印,而且在加入了自己的打印方法,這不正是AOP的增強功能嗎。這一切是怎麼發生的那,下面細細說來。

二、詳述

上面,我們又複習了JDK動態代理的內容,以及演示了如何使用JDK動態代理,下面我們要看這是怎麼實現的,先從測試的下面這段代碼說起,也是最重要的代碼,JDK動態代理的精華都在這句代碼里,

Subject subject=(Subject)Proxy.newProxyInstance(si.getClass().getClassLoader(), si.getClass().getInterfaces(), new JDKProxy(si,"111"));

這句代碼是調用了Proxy類的newProxyInstance方法,此方法的入參如下,

public static Object newProxyInstance(ClassLoader loader,
                                          Class<?>[] interfaces,
                                          InvocationHandler h)

 一共三個參數,一個是ClassLoader,這裏傳入的是被代理對象的類加載器;一個是Class,這裏傳入的是被代理對象所實現的接口;一個是InvocationHandler,這裏傳入的是代理類,代理類實現了InvocationHandler接口。

 1、newProxyInstance方法

下面看newProxyInstance方法的定義,

@CallerSensitive
    public static Object newProxyInstance(ClassLoader loader,
                                          Class<?>[] interfaces,
                                          InvocationHandler h)
        throws IllegalArgumentException
    {
        Objects.requireNonNull(h);

        final Class<?>[] intfs = interfaces.clone();
        final SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            checkProxyAccess(Reflection.getCallerClass(), loader, intfs);
        }

        /*
         * Look up or generate the designated proxy class.
         */
          //1、使用代理類的類加載器和其所實現的接口,動態生成代理類
        Class<?> cl = getProxyClass0(loader, intfs);

        /*
         * Invoke its constructor with the designated invocation handler.
         */
        try {
            if (sm != null) {
                checkNewProxyPermission(Reflection.getCallerClass(), cl);
            }
            //2、返回JDK生成的代理類的構造方法,該構造方法的參數為
            //  InvocationHandler
            final Constructor<?> cons = cl.getConstructor(constructorParams);
            final InvocationHandler ih = h;
            if (!Modifier.isPublic(cl.getModifiers())) {
                AccessController.doPrivileged(new PrivilegedAction<Void>() {
                    public Void run() {
                        cons.setAccessible(true);
                        return null;
                    }
                });
            }
//3、返回該構造方法的一個實例,也就是使用InvocationHandler為參數的構造方法利用反射的機制返回一個實例。
return cons.newInstance(new Object[]{h}); } catch (IllegalAccessException|InstantiationException e) { throw new InternalError(e.toString(), e); } catch (InvocationTargetException e) { Throwable t = e.getCause(); if (t instanceof RuntimeException) { throw (RuntimeException) t; } else { throw new InternalError(t.toString(), t); } } catch (NoSuchMethodException e) { throw new InternalError(e.toString(), e); } }

該方法中有三步比較重要,上面的註釋已經標出。

1.1、getProxyClass0(loader, intfs)方法

該方法便是上面的第一步,這一步的作用是JDK返回一個代理類的實例,方法上的註釋如下,

/*
         * Look up or generate the designated proxy class.
         */
        Class<?> cl = getProxyClass0(loader, intfs);

註釋直譯過來是查找或者生成指定的代理類,這裡有兩層意思,一個是查找,第二個是生成,由此可以想到這個方法中應該有緩存,下面看方法的具體定義,

/**
     * Generate a proxy class.  Must call the checkProxyAccess method
     * to perform permission checks before calling this.
     */
    private static Class<?> getProxyClass0(ClassLoader loader,
                                           Class<?>... interfaces) {
        if (interfaces.length > 65535) {
            throw new IllegalArgumentException("interface limit exceeded");
        }

        // If the proxy class defined by the given loader implementing
        // the given interfaces exists, this will simply return the cached copy;
        // otherwise, it will create the proxy class via the ProxyClassFactory
        return proxyClassCache.get(loader, interfaces);
    }

這個方法很簡單,判斷了接口的數量,大於65535便拋異常,接口的數量大於65535的可能性不大。最後調用了proxyClassCache的get方法,首先看proxyClassCache,從字面上理解是代理類的緩存,看其定義,

/**
     * a cache of proxy classes
     */
    private static final WeakCache<ClassLoader, Class<?>[], Class<?>>
        proxyClassCache = new WeakCache<>(new KeyFactory(), new ProxyClassFactory());

是一個WeakCache對象實例,看下該構造方法,

/**
     * Construct an instance of {@code WeakCache}
     *
     * @param subKeyFactory a function mapping a pair of
     *                      {@code (key, parameter) -> sub-key}
     * @param valueFactory  a function mapping a pair of
     *                      {@code (key, parameter) -> value}
     * @throws NullPointerException if {@code subKeyFactory} or
     *                              {@code valueFactory} is null.
     */
    public WeakCache(BiFunction<K, P, ?> subKeyFactory,
                     BiFunction<K, P, V> valueFactory) {
        this.subKeyFactory = Objects.requireNonNull(subKeyFactory);
        this.valueFactory = Objects.requireNonNull(valueFactory);
    }

看了該類的構造方法后,回到proxyClassCache.get(loader, interfaces)方法的調用,我們已經知道proxyClassCache是WeakCache的一個實例,那麼get方法如下,

 /**
     * Look-up the value through the cache. This always evaluates the
     * {@code subKeyFactory} function and optionally evaluates
     * {@code valueFactory} function if there is no entry in the cache for given
     * pair of (key, subKey) or the entry has already been cleared.
     *
     * @param key       possibly null key
     * @param parameter parameter used together with key to create sub-key and
     *                  value (should not be null)
     * @return the cached value (never null)
     * @throws NullPointerException if {@code parameter} passed in or
     *                              {@code sub-key} calculated by
     *                              {@code subKeyFactory} or {@code value}
     *                              calculated by {@code valueFactory} is null.
     */
    public V get(K key, P parameter) {
        Objects.requireNonNull(parameter);

        expungeStaleEntries();

        Object cacheKey = CacheKey.valueOf(key, refQueue);

        // lazily install the 2nd level valuesMap for the particular cacheKey
        ConcurrentMap<Object, Supplier<V>> valuesMap = map.get(cacheKey);
        if (valuesMap == null) {
            ConcurrentMap<Object, Supplier<V>> oldValuesMap
                = map.putIfAbsent(cacheKey,
                                  valuesMap = new ConcurrentHashMap<>());
            if (oldValuesMap != null) {
                valuesMap = oldValuesMap;
            }
        }

        // create subKey and retrieve the possible Supplier<V> stored by that
        // subKey from valuesMap
        Object subKey = Objects.requireNonNull(subKeyFactory.apply(key, parameter));
        Supplier<V> supplier = valuesMap.get(subKey);
        Factory factory = null;

        while (true) {
            if (supplier != null) {
                // supplier might be a Factory or a CacheValue<V> instance
                V value = supplier.get();
                if (value != null) {
                    return value;
                }
            }
            // else no supplier in cache
            // or a supplier that returned null (could be a cleared CacheValue
            // or a Factory that wasn't successful in installing the CacheValue)

            // lazily construct a Factory
            if (factory == null) {
                factory = new Factory(key, parameter, subKey, valuesMap);
            }

            if (supplier == null) {
                supplier = valuesMap.putIfAbsent(subKey, factory);
                if (supplier == null) {
                    // successfully installed Factory
                    supplier = factory;
                }
                // else retry with winning supplier
            } else {
                if (valuesMap.replace(subKey, supplier, factory)) {
                    // successfully replaced
                    // cleared CacheEntry / unsuccessful Factory
                    // with our Factory
                    supplier = factory;
                } else {
                    // retry with current supplier
                    supplier = valuesMap.get(subKey);
                }
            }
        }
    }

 上面是WeakCache的get方法,這個方法暫時不作說明,後面會詳細介紹WeakCache類,請參見《JDK動態代理之WeakCache 》。這裏只需記住該get方法會返回一個代理類的實例即可。那麼此代理類是如何定義的那?

1.1.1、$Proxy0.class代理類

這個代理類是JDK動態生成的,其命名規則為以“$”開頭+Proxy+“從0開始的序列”。上面在測試的時候,我們加入了下面這行代碼,

//進行此項設置,可以在項目的com/sun/proxy目錄下找到JDK動態生成的代理類的字節碼文件
        System.getProperties().put("sun.misc.ProxyGenerator.saveGeneratedFiles","true");

註釋中寫到可以生成代理類的字節碼文件,下面是使用反編譯工具過來的java代碼,

package com.sun.proxy;

import cn.com.jdk.proxy.Subject;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;

public final class $Proxy0 extends Proxy
  implements Subject
{
  private static Method m1;
  private static Method m3;
  private static Method m2;
  private static Method m0;
 //參數為InvocationHandler的構造方法
  public $Proxy0(InvocationHandler paramInvocationHandler)
    throws 
  {
   //調用父類Proxy的構造方法,在父類的構造方法中會初始化h屬性
    super(paramInvocationHandler);
  }

  public final boolean equals(Object paramObject)
    throws 
  {
    try
    {
      return ((Boolean)this.h.invoke(this, m1, new Object[] { paramObject })).booleanValue();
    }
    catch (RuntimeException localRuntimeException)
    {
      throw localRuntimeException;
    }
    catch (Throwable localThrowable)
    {
    }
    throw new UndeclaredThrowableException(localThrowable);
  }
//實現的Subject的sayHello方法
  public final void sayHello(String paramString)
    throws 
  {
    try
    {
      //調用h的invoke方法,這裏的h指的是實現了InvocationHandler的類
      //調用其中的invoke方法,在本例中是調用JDKProxy類中的invoke方
      //
      this.h.invoke(this, m3, new Object[] { paramString });
      return;
    }
    catch (RuntimeException localRuntimeException)
    {
      throw localRuntimeException;
    }
    catch (Throwable localThrowable)
    {
    }
    throw new UndeclaredThrowableException(localThrowable);
  }

  public final String toString()
    throws 
  {
    try
    {
      return (String)this.h.invoke(this, m2, null);
    }
    catch (RuntimeException localRuntimeException)
    {
      throw localRuntimeException;
    }
    catch (Throwable localThrowable)
    {
    }
    throw new UndeclaredThrowableException(localThrowable);
  }

  public final int hashCode()
    throws 
  {
    try
    {
      return ((Integer)this.h.invoke(this, m0, null)).intValue();
    }
    catch (RuntimeException localRuntimeException)
    {
      throw localRuntimeException;
    }
    catch (Throwable localThrowable)
    {
    }
    throw new UndeclaredThrowableException(localThrowable);
  }

  static
  {
    try
    {
      m1 = Class.forName("java.lang.Object").getMethod("equals", new Class[] { Class.forName("java.lang.Object") });
      m3 = Class.forName("cn.com.jdk.proxy.Subject").getMethod("sayHello", new Class[] { Class.forName("java.lang.String") });
      m2 = Class.forName("java.lang.Object").getMethod("toString", new Class[0]);
      m0 = Class.forName("java.lang.Object").getMethod("hashCode", new Class[0]);
      return;
    }
    catch (NoSuchMethodException localNoSuchMethodException)
    {
      throw new NoSuchMethodError(localNoSuchMethodException.getMessage());
    }
    catch (ClassNotFoundException localClassNotFoundException)
    {
    }
    throw new NoClassDefFoundError(localClassNotFoundException.getMessage());
  }
}

上面是反編譯過來的JDK生成的代理類的代碼,包含了一個使用InvocationHandler作為參數的構造方法,以及實現了Subject接口的sayHello方法。上面註釋中寫到該構造方法調用了其父類Proxy的構造方法,下面看其父類Proxy的構造方法,

protected Proxy(InvocationHandler h) {
        Objects.requireNonNull(h);
        this.h = h;
    }

把InvocationHandler的值賦給了h,h的定義如下,

protected InvocationHandler h;

那麼在生成的代理類中自然會繼承該屬性,所以在代理類中的sayHello中使用下面的方法調用,

public final void sayHello(String paramString)
    throws 
  {
    try
    {
      this.h.invoke(this, m3, new Object[] { paramString });
      return;
    }
    catch (RuntimeException localRuntimeException)
    {
      throw localRuntimeException;
    }
    catch (Throwable localThrowable)
    {
    }
    throw new UndeclaredThrowableException(localThrowable);
  }

上面的this.h便是其父類的h屬性。在上面的this.h.invoke中的m3是怎麼來的那,看下面,

 static
  {
    try
    {
      m1 = Class.forName("java.lang.Object").getMethod("equals", new Class[] { Class.forName("java.lang.Object") });
      m3 = Class.forName("cn.com.jdk.proxy.Subject").getMethod("sayHello", new Class[] { Class.forName("java.lang.String") });
      m2 = Class.forName("java.lang.Object").getMethod("toString", new Class[0]);
      m0 = Class.forName("java.lang.Object").getMethod("hashCode", new Class[0]);
      return;
    }
    catch (NoSuchMethodException localNoSuchMethodException)
    {
      throw new NoSuchMethodError(localNoSuchMethodException.getMessage());
    }
    catch (ClassNotFoundException localClassNotFoundException)
    {
    }
    throw new NoClassDefFoundError(localClassNotFoundException.getMessage());
  }

在該類的靜態代碼塊中給出了4個屬性。

1.2、getConstructor(constructorParams)方法

在上面的getProxyClass0方法中我們知道該方法會返回一個JDK生成代理類的Class對象,此類的定義便是上面的$Proxy0.class類。其定義在上面已經分析過。getConstructor方法要返回一個以constructorParams為參數的構造方法,

@CallerSensitive
    public Constructor<T> getConstructor(Class<?>... parameterTypes)
        throws NoSuchMethodException, SecurityException {
        checkMemberAccess(Member.PUBLIC, Reflection.getCallerClass(), true);
        return getConstructor0(parameterTypes, Member.PUBLIC);
    }

調用了getConstuctor0方法返回一個public的構造方法,

private Constructor<T> getConstructor0(Class<?>[] parameterTypes,
                                        int which) throws NoSuchMethodException
    {
        Constructor<T>[] constructors = privateGetDeclaredConstructors((which == Member.PUBLIC));
        for (Constructor<T> constructor : constructors) {
            if (arrayContentsEq(parameterTypes,
                                constructor.getParameterTypes())) {
                return getReflectionFactory().copyConstructor(constructor);
            }
        }
        throw new NoSuchMethodException(getName() + ".<init>" + argumentTypesToString(parameterTypes));
    }

上面的方法會返回一個public的構造方法。

回到最初的調用,我們看getConstructor方法的參數是constructorParams,此屬性定義如下,

/** parameter types of a proxy class constructor */
    private static final Class<?>[] constructorParams =
        { InvocationHandler.class };

是一個Class數組,其類型為InvocationHandler。這樣便可以知道是通過代理類的Class對象返回其構造方法cons。有了構造方法下面便是通過構造方法生成實例。

1.3、cons.newInstance(new Object[]{h})方法

此方法便是通過構造方法返回一個代理類的實例。

 

上面分析了Proxy的newProxyInstance方法,此方法最終會返回一個代理類的實例,會經過下面幾個步驟,

從上面的步驟,我們知道在獲得代理類的構造方法時,是獲得其參數為InvocationHandler的構造方法,所以肯定要實現InvocationHandler接口,在本例中便是JDKProxy類,這個類實現了這個接口。值開篇我們講到JDK動態代理必須要有統一的接口,從上面的步驟中我們知道在生成代理類的Class對象時使用了兩個參數,一個ClassLoader,另一個是接口,這裏就是為什麼要有統一的接口,因為在生成代理類的Class對象中需要接口,所以被代理類必須要有一個接口。

2、方法調用

這裏的方法調用,便是對應使用方法中的下面這行代碼,

subject.sayHello("tom");

在上面的分析中獲得了一個代理類的實例,即下面這行代碼,

Subject subject=(Subject)Proxy.newProxyInstance(si.getClass().getClassLoader(), si.getClass().getInterfaces(), new JDKProxy(si,"111"));

通過使用被代理類的類加載器、被代理類所實現的接口、實現了InvocationHandler接口的類的實例三個參數,返回了一個代理類的實例。上面已經詳細分析過。此代理類的實例繼承了Proxy,實現了Subject接口。其sayHello方法如下,

public final void sayHello(String paramString)
    throws 
  {
    try
    {
      this.h.invoke(this, m3, new Object[] { paramString });
      return;
    }
    catch (RuntimeException localRuntimeException)
    {
      throw localRuntimeException;
    }
    catch (Throwable localThrowable)
    {
    }
    throw new UndeclaredThrowableException(localThrowable);
  }

上面已經分析過,this.h是InvocationHandler的實例,這裏便是new JDKProxy(si,”111″),m3是m3 = Class.forName(“cn.com.jdk.proxy.Subject”).getMethod(“sayHello”, new Class[] { Class.forName(“java.lang.String”) });下面看JDKProxy中的invoke方法,

@Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // TODO Auto-generated method stub
        System.out.println("before");
        //使用反射的放式調用目標方法
        Object o=method.invoke(si, args);
        System.out.println("after");
        return o;
    }

此方法的三個參數分別為代理類的實例、Method對象(sayHello),調用sayHello時的參數,所以要調用被代理類的sayHello方法,需要這樣寫:method.invoke(si,args),即調用被代理類(SubjectImpl)的sayHello方法,參數為args(tom)。下面是一個簡單的方法調用過程,

三、總結

本文分析了JDK動態代理的簡單使用方法及背後的原理,有不當之處歡迎指正,感謝!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

FB行銷專家,教你從零開始的技巧

Probius:一個功能強大的自定義任務系統

斷更的這些日子,我又折騰了一個輪子,文末參考源碼

大約在一年半以前寫過一篇文章『探秘varian:優雅的發布部署程序』,裡邊有講到我們採用類似lego的模塊化方式來構建CICD的流程,雖能滿足我們的需求,但終究需要編寫代碼,使用成本有點高,不夠友好。近段時間終於下定決心將其重構,只為帶來更好的使用體驗,於是便有了這個項目Probius

Probius為遊戲星際爭霸里的角色,是一隻充滿好奇心的星靈探測機,取此名字的意思也是希望用戶能夠在這個系統中充分發揮想象,藉助此系統實現各種自定義的功能,覆蓋更多的運維場景

設計思路

Probius由三個關鍵詞構成:命令、模板、任務

命令:為系統中的最小粒度,可以是一個具體的linux命令,或者是一個腳本都可以

模板:模板為一組命令的集合

任務:模板為靜態的定義,而任務就是模板的執行,執行一個任務實際上就是去執行了一個模板內的所有命令

整體思想跟varian一樣,但不同的是可以僅僅通過web端的配置,就能實現各種各樣的功能,下邊具體介紹下如何配置的

頁面配置

新建命令,在這個頁面可以創建命令或者腳本

如果是單純的命令,直接在命令輸入框填寫即可,如果是需要執行腳本,則點擊腳本之後,會額外多出一個腳本輸入框,填寫要執行的腳本

理論上不限制腳本的類型,可以是shell、python或者go之類的,前提是系統上有腳本的運行環境,當命令或者腳本有參數的時候可以在參數列寫上參數名稱,然後在最終執行任務的時候需要傳遞具體參數的值過來

在命令執行完成后,會根據命令的返回狀態也就是$?的值來判斷命令是否執行成功,當$?為0是表示執行成功,否則表示執行失敗,如果是執行的腳本時,需要在腳本最後明確腳本返回狀態,shell腳本可以在腳本執行成功時通過exit指定退出狀態,例如

ls /ops-coffee.cn &&\
exit 0 ||\
exit 2

而對於python腳本則可以藉助sys.exit這樣寫

import sys

if 'www' in 'ops-coffee.cn':
  sys.exit(0)
else:
  sys.exit(3)

其他語言類似

模板的創建分為幾步,先創建一個模板

然後給模板添加任務

主要為選擇任務、確定執行順序、選擇執行主機以及執行用戶,添加完成后可以在模板詳情頁面看到關聯的命令

模板定義了一個完整的任務流程,定義完成后就可以執行任務了,執行任務界面寫的比較簡單

這界面主要給運維人員使用,定義任務名稱、所要執行的模板ID、以及參數,支持定時執行或者周期執行,只需要加上crontab參數即可,除了可以立即執行任務外,還可以將次任務保存為常用任務,後續在常用任務頁面可以直接執行

這個功能主要方便其他非運維人員使用本系統,同時也支持針對任務設置權限,可以將權限設置給某個用戶組,那麼則只有這個組內的成員可以看到並執行任務了

任務執行后可以通過任務歷史查看任務執行詳情,在這個頁面可以清晰的看到任務執行到了哪一步,是成功還是失敗

可以點擊每一步任務後邊的日誌查看實時日誌輸出

寫在最後

如果你用過我們開源的一站式DevOps平台CODO的話,會發現這個系統跟CODO的TASK模塊非常像,是的沒錯,這個設計與CODO的TASK如出一轍,但開源的CODO任務模塊要更加強大,例如支持分組執行、支持任務重做、支持人工干預等等

TASK的源碼在這裏:https://github.com/opendevops-cn/codo-task,感興趣可以自行閱讀部署,需要注意的是CODO為微服務架構,單獨安裝TASK是無法正常運行的,具體部署方法參考官方文檔

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化