多線程高併發編程(10) — ConcurrentHashMap源碼分析

  一.背景

  前文講了HashMap的源碼分析,從中可以看到下面的問題:

  • HashMap的put/remove方法不是線程安全的,如果在多線程併發環境下,使用synchronized進行加鎖,會導致效率低下;
  • 在遍歷迭代獲取時進行修改(put/remove)操作,會導致發生併發修改異常(ConcurrentModificationException);
  • 在JDK1.7之前,對HashMap進行put添加操作,會導致鏈表反轉,造成鏈表迴路,從而發生get死循環,(當然這個問題在JDK1.8被改進了按照原鏈表順序進行重排移動);
  • 如果多個線程同時檢測到元素個數超過 數組大小 * loadFactor,這樣就會發生多個線程同時對數組進行擴容,都在重新計算元素位置以及複製數據,但是最終只有一個線程擴容后的數組會賦給 table,也就是說其他線程的都會丟失,並且各自線程 put 的數據也丟失;

  基於上述問題,都可以使用ConcurrentHashMap進行解決,ConcurrentHashMap使用分段鎖技術解決了併發訪問效率,在遍歷迭代獲取時進行修改操作也不會發生併發修改異常等等問題。

  二.源碼解析

  1. 構造方法:

    //最大容量大小
        private static final int MAXIMUM_CAPACITY = 1 << 30;
        //默認容量大小
        private static final int DEFAULT_CAPACITY = 16;
        /**
         *控制標識符,用來控制table的初始化和擴容的操作,不同的值有不同的含義
         *  多線程之間,以volatile方式讀取sizeCtl屬性,來判斷ConcurrentHashMap當前所處的狀態。
         *  通過cas設置sizeCtl屬性,告知其他線程ConcurrentHashMap的狀態變更
         *未初始化:
         *  sizeCtl=0:表示沒有指定初始容量。
         *  sizeCtl>0:表示初始容量。
         *初始化中:
         *  sizeCtl=-1,標記作用,告知其他線程,正在初始化
         *正常狀態:
         *  sizeCtl=0.75n ,擴容閾值
         *擴容中:
         *  sizeCtl < 0 : 表示有其他線程正在執行擴容
         *  sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 :表示此時只有一個線程在執行擴容
         */
        private transient volatile int sizeCtl;
        //併發級別
        private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
        //創建一個新的空map,默認大小是16
        public ConcurrentHashMap() {
        }
        public ConcurrentHashMap(int initialCapacity) {
            if (initialCapacity < 0)
                throw new IllegalArgumentException();
            //調整table的大小,tableSizeFor的實現查看前文HashMap源碼分析的構造方法模塊
            int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                       MAXIMUM_CAPACITY :
                       tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
            this.sizeCtl = cap;
        }
        public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
            this.sizeCtl = DEFAULT_CAPACITY;
            putAll(m);
        }
        public ConcurrentHashMap(int initialCapacity, float loadFactor) {
            this(initialCapacity, loadFactor, 1);
        }
        /**
         * concurrencyLevel:併發度,預估同時操作數據的線程數量
         * 表示能夠同時更新ConccurentHashMap且不產生鎖競爭的最大線程數。
         * 默認值為16,(即允許16個線程併發可能不會產生競爭)。
         */
        public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel) {
            if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
                throw new IllegalArgumentException();
            //至少使用同樣多的桶容納同樣多的更新線程來操作元素
            if (initialCapacity < concurrencyLevel)   // Use at least as many bins
                initialCapacity = concurrencyLevel;   // as estimated threads
            long size = (long)(1.0 + (long)initialCapacity / loadFactor);
            int cap = (size >= (long)MAXIMUM_CAPACITY) ?
                MAXIMUM_CAPACITY : tableSizeFor((int)size);
            this.sizeCtl = cap;
        }
  2. put:

    public V put(K key, V value) {
            return putVal(key, value, false);
        }
        static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash普通節點哈希的可用位
        //把位數控制在int最大整數之內,h ^ (h >>> 16)的含義查看前文的put源碼解析
        static final int spread(int h) {
            return (h ^ (h >>> 16)) & HASH_BITS;
        }
        final V putVal(K key, V value, boolean onlyIfAbsent) {
            //key和value為空拋出異常
            if (key == null || value == null) throw new NullPointerException();
            //得到hash值
            int hash = spread(key.hashCode());
            int binCount = 0;
            //自旋對table進行遍歷
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                //初始化table
                if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
                //如果hash計算出的槽位元素為null,CAS將元素填充進當前槽位並結束遍歷
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    if (casTabAt(tab, i, null,
                                 new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
                //hash為-1,說明正在擴容,那麼就幫助其擴容。以加快速度
                else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f);
                else {
                    V oldVal = null;
                    synchronized (f) {// 同步 f 節點,防止增加鏈表的時候導致鏈表成環
                        if (tabAt(tab, i) == f) {// 如果對應的下標位置的節點沒有改變
                            if (fh >= 0) {//f節點的hash值大於0
                                binCount = 1;//鏈表初始長度
                                // 死循環,直到將值添加到鏈表尾部,並計算鏈表的長度
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    //hash和key相同,值進行覆蓋
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    //hash和key不同,添加到鏈表後面
                                    if ((e = e.next) == null) {
                                        pred.next = new Node<K,V>(hash, key,
                                                                  value, null);
                                        break;
                                    }
                                }
                            }
                            //是樹節點,添加到樹中
                            else if (f instanceof TreeBin) {
                                Node<K,V> p;
                                binCount = 2;
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                     //如果節點添加到鏈表和樹中
                    if (binCount != 0) {
                        //鏈表長度大於等於8時,將鏈錶轉換成紅黑樹樹
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            // 判斷是否需要擴容
            addCount(1L, binCount);
            return null;
        }
    1. initTable:初始化

      private final Node<K,V>[] initTable() {
              Node<K,V>[] tab; int sc;
              while ((tab = table) == null || tab.length == 0) {
                  //如果一個線程發現sizeCtl<0,意味着另外的線程執行CAS操作成功,當前線程只需要讓出cpu時間片,即保證只有一個線程初始化
                  //由於sizeCtl是volatile的,保證了順序性和可見性
                  if ((sc = sizeCtl) < 0)
                      Thread.yield(); // lost initialization race; just spin
                  else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//cas操作判斷並置為-1
                      try {
                          if ((tab = table) == null || tab.length == 0) {
                              int n = (sc > 0) ? sc : DEFAULT_CAPACITY;//若沒有參數則默認容量為16
                              @SuppressWarnings("unchecked")
                              Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//創建數組
                              table = tab = nt;//數組賦值給當前ConcurrentHashMap
                              //計算下一次元素到達擴容的閥值,如果n為16的話,那麼這裏 sc = 12,其實就是 0.75 * n
                              sc = n - (n >>> 2);
                          }
                      } finally {
                          sizeCtl = sc;
                      }
                      break;
                  }
              }
              return tab;
          }
    2. tabAt:尋找指定數組在內存中i位置的數據

      static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
              /**getObjectVolatile:獲取obj對象中offset偏移地址對應的object型field的值,支持volatile load語義。
               * 數組的尋址計算方式:a[i]_address = base_address + i * data_type_size
               * base_address:起始地址;i:索引;data_type_size:數據類型長度大小
               */
              return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
          }
    3. helpTransfer:幫助擴容

      private static int RESIZE_STAMP_BITS = 16;
          /**
           * numberOfLeadingZeros()的具體算法邏輯請參考:https://www.jianshu.com/p/2c1be41f6e59
           * numberOfLeadingZeros(n)返回的是n的二進制標識的從高位開始到第一個非0的数字的之間0的個數,比如numberOfLeadingZeros(8)返回的就是28 ,因為0000 0000 0000 0000 0000 0000 0000 1000在1前面有28個0
           * RESIZE_STAMP_BITS 的值是16,1 << (RESIZE_STAMP_BITS - 1)就是將1左移位15位,0000 0000 0000 0000 1000 0000 0000 0000
           * 然後將兩個数字再按位或,將相當於 將移位后的 兩個數相加。
           * 比如:
           * 8的二進製表示是: 0000 0000 0000 0000 0000 0000 0000 1000 = 8
           * 7的二進製表示是: 0000 0000 0000 0000 0000 0000 0000 0111 = 7
           * 按位或的結果是:  0000 0000 0000 0000 0000 0000 0000 1111 = 15
           * 相當於 8 + 7 =15
           * 為什麼會出現這種效果呢?因為8是2的整數次冪,也就是說8的二進製表示只會在某個高位上是1,其餘地位都是0,所以在按位或的時候,低位表示的全是7的位值,所以出現了這種效果。
           */
          static final int resizeStamp(int n) {
              return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
          }
          final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
              Node<K,V>[] nextTab; int sc;
               //如果table不是空,且node節點是轉移類型,且node節點的nextTable(新 table)不是空,嘗試幫助擴容
              if (tab != null && (f instanceof ForwardingNode) &&
                  (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
                  //根據length得到一個標識符號
                  int rs = resizeStamp(tab.length);
                   //如果nextTab沒有被併發修改,且tab也沒有被併發修改,且sizeCtl<0(說明還在擴容)
                  while (nextTab == nextTable && table == tab &&
                         (sc = sizeCtl) < 0) {
                      /**
                       * 如果 sizeCtl 無符號右移16不等於rs( sc前16位如果不等於標識符,則標識符變化了)
                       * 或者 sizeCtl == rs + 1(擴容結束了,不再有線程進行擴容)(默認第一個線程設置 sc ==rs 左移 16 位 + 2,當第一個線程結束擴容了,就會將 sc 減1。這個時候,sc 就等於 rs + 1)
                       * 或者 sizeCtl == rs + 65535  (如果達到最大幫助線程的數量,即 65535)
                       * 或者轉移下標正在調整 (擴容結束)
                       * 結束循環,返回 table
                       * 【即如果還在擴容,判斷標識符是否變化,判斷擴容是否結束,判斷是否達到最大線程數,判斷擴容轉移下標是否在調整(擴容結束),如果滿足任意條件,結束循環。】
                       */
                      if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                          sc == rs + MAX_RESIZERS || transferIndex <= 0)
                          break;
                      // 如果以上都不是, 將 sizeCtl + 1, (表示增加了一個線程幫助其擴容)
                      if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                          transfer(tab, nextTab);//進行擴容和數據遷移
                          break;
                      }
                  }
                  return nextTab;//返回擴容后的數組
              }
              return table;//沒有擴容,返回原數組
          }
    4.  transfer:擴容和數據遷移,採用多線程擴容,整個擴容過程,通過cas設置sizeCtl、transferIndex等變量協調多個線程進行併發擴容;

      1.  transferIndex屬性:
        //擴容索引,表示已經分配給擴容線程的table數組索引位置。主要用來協調多個線程,併發安全地獲取遷移任務(hash桶)。
        private transient volatile int transferIndex;
        1. 在擴容之前,transferIndex 在數組的最右邊 。此時有一個線程發現已經到達擴容閾值,準備開始擴容。

        2. 擴容線程,在遷移數據之前,首先要將transferIndex左移(以cas的方式修改 transferIndex=transferIndex-stride(要遷移hash桶的個數)),獲取遷移任務。每個擴容線程都會通過for循環+CAS的方式設置transferIndex,因此可以確保多線程擴容的併發安全。( 換個角度,我們可以將待遷移的table數組,看成一個任務隊列,transferIndex看成任務隊列的頭指針。而擴容線程,就是這個隊列的消費者。擴容線程通過CAS設置transferIndex索引的過程,就是消費者從任務隊列中獲取任務的過程。 )
      2.  擴容過程:
        1.  容量已經達到擴容閾值,需要進行擴容操作,此時transferindex=tab.length=32
        2. 擴容線程A 以cas的方式修改transferindex=31-16=16 ,然後按照降序遷移table[31]–table[16]這個區間的hash桶
        3. 遷移hash桶時,會將桶內的鏈表或者紅黑樹,按照一定算法,拆分成2份,將其插入nextTable[i]和nextTable[i+n](n是table數組的長度)。 遷移完畢的hash桶,會被設置成ForwardingNode節點,以此告知訪問此桶的其他線程,此節點已經遷移完畢
        4. 此時線程2訪問到了ForwardingNode節點,如果線程2執行的put或remove等寫操作,那麼就會先幫其擴容。如果線程2執行的是get等讀方法,則會調用ForwardingNode的find方法,去nextTable裏面查找相關元素
        5. 線程2加入擴容操作
        6. 如果準備加入擴容的線程,發現以下情況,放棄擴容,直接返回。
          1. 發現transferIndex=0,即所有node均已分配

          2. 發現擴容線程已經達到最大擴容線程數

                                                    

      1.  源碼解析
        private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
            int n = tab.length, stride;
            //先判斷CPU核數,如果是多核,將數組長度/8,再/核數,得到stride,否則stride=數組長度,如果stride<16,則stride=16
            //這裏的目的是讓每個CPU處理的桶一樣多,避免出現轉移任務不均勻的現象,如果桶較少的話,默認一個CPU(一個線程)處理16個桶,即確保每次至少獲取16個桶(遷移任務)
            if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
                stride = MIN_TRANSFER_STRIDE; // subdivide range
            //未初始化進行初始化
            if (nextTab == null) {            // initiating
                try {
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//擴容2倍
                    nextTab = nt;//更新
                } catch (Throwable ex) {      // try to cope with OOME
                    sizeCtl = Integer.MAX_VALUE;//擴容失敗,sizeCtl使用int最大值。
                    return;
                }
                nextTable = nextTab;//更新成員變量
                //transferIndex默認=table.length
                transferIndex = n;
            }
            int nextn = nextTab.length;//新tab的長度
            //創建一個fwd節點,用於佔位。當別的線程發現這個槽位中是fwd類型的節點,表示其他線程正在擴容,並且此節點已經擴容完畢,跳過這個節點。關聯了nextTab,可以通過ForwardingNode.find()訪問已經遷移到nextTab的數據。
            ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
            //首次推進為 true,如果等於true,說明需要再次推進一個下標(i--),反之,如果是false,那麼就不能推進下標,需要將當前的下標處理完畢才能繼續推進
            boolean advance = true;
            //完成狀態,如果是true,就結束此方法。
            boolean finishing = false; // to ensure sweep before committing nextTab
            //自旋,i表示當前線程可以處理的當前桶區間最大下標,bound表示當前線程可以處理的當前桶區間最小下標
            for (int i = 0, bound = 0;;) {
                Node<K,V> f; int fh;
                 //while:如果當前線程可以向後推進;這個循環就是控制i遞減。同時,每個線程都會進入這裏取得自己需要轉移的桶的區間
                //分析場景:table.length=32,此時執行到這個地方nextTab.length=64 A,B線程同時進行擴容。
                //A,B線程同時執行到while循環中cas這段代碼
                //A線程獲第一時間搶到資源,設置bound=nextBound=16,i = nextIndex - 1=31 A線程搬運table[31]~table[16]中間16個元素
                //B線程再次回到while起點,然後在次獲取到 bound = nextBound-0,i=nextIndex - 1=15,B線程搬運table[15]~table[0]中間16個元素
                //當transferIndex=0的時候,說明table裏面所有搬運任務都已經完成,無法在分配任務。
                while (advance) {
                    int nextIndex, nextBound;
                    // 對i減1,判斷是否大於等於bound(正常情況下,如果大於bound不成立,說明該線程上次領取的任務已經完成了。那麼,需要在下面繼續領取任務)
                    // 如果對i減1大於等於 bound,或者完成了,修改推進狀態為 false,不能推進了。任務成功后修改推進狀態為 true。
                    // 通常,第一次進入循環,i-- 這個判斷會無法通過,從而走下面的nextIndex = transferIndex(獲取最新的轉移下標)。其餘情況都是:如果可以推進,將i減1,然後修改成不可推進。如果i對應的桶處理成功了,改成可以推進。
                    if (--i >= bound || finishing)
                        advance = false;//這裏設置false,是為了防止在沒有成功處理一個桶的情況下卻進行了推進
                   // 這裏的目的是:1. 當一個線程進入時,會選取最新的轉移下標。
                   //             2. 當一個線程處理完自己的區間時,如果還有剩餘區間的沒有別的線程處理,再次CAS獲取區間。
                    else if ((nextIndex = transferIndex) <= 0) {
                        // 如果小於等於0,說明沒有區間可以獲取了,i改成-1,推進狀態變成false,不再推進
                        // 這個-1會在下面的if塊里判斷,從而進入完成狀態判斷
                        i = -1;
                        advance = false;//這裏設置false,是為了防止在沒有成功處理一個桶的情況下卻進行了推進
                    }
                    // CAS修改transferIndex,即 length - 區間值,留下剩餘的區間值供後面的線程使用
                    else if (U.compareAndSwapInt
                             (this, TRANSFERINDEX, nextIndex,
                              nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) {
                        bound = nextBound;//這個值就是當前線程可以處理的最小當前區間最小下標
                        i = nextIndex - 1;//初次對i賦值,這個就是當前線程可以處理的當前區間的最大下標
                        advance = false;// 這裏設置false,是為了防止在沒有成功處理一個桶的情況下卻進行了推進,這樣導致漏掉某個桶。下面的 if(tabAt(tab, i) == f) 判斷會出現這樣的情況。
                    }
                }
                //i<0(不在 tab 下標內,按照上面的判斷,領取最後一段區間的線程結束)
                if (i < 0 || i >= n || i + n >= nextn) {
                    int sc;
                    if (finishing) {// 如果完成了擴容和數據遷移
                        nextTable = null;//刪除成員遍歷
                        table = nextTab;//更新table
                        sizeCtl = (n << 1) - (n >>> 1);//更新閥值
                        return;//結束transfer
                    }
                    //如果沒完成,嘗試將sc -1. 表示這個線程結束幫助擴容了,將 sc 的低 16 位減一。
                    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                        //如果 sc - 2 不等於標識符左移 16 位。如果他們相等了,說明沒有線程在幫助他們擴容了。也就是說,擴容結束了。
                        /**
                         *第一個擴容的線程,執行transfer方法之前(helpTransfer方法中),會設置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
                         *後續幫其擴容的線程,執行transfer方法之前,會設置 sizeCtl = sizeCtl+1
                         *每一個退出transfer的方法的線程,退出之前,會設置 sizeCtl = sizeCtl-1
                         *那麼最後一個線程退出時:
                         *必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT
                        */
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            return;// 不相等,說明不到最後一個線程,直接退出transfer方法
                        finishing = advance = true;// 如果相等,擴容結束了,更新 finising 變量
                        i = n; // recheck before commit,最後退出的線程要重新check下是否全部遷移完畢
                    }
                }
                else if ((f = tabAt(tab, i)) == null) // 獲取老tab的i下標位置的變量,如果是 null,就使用 fwd 佔位。
                    advance = casTabAt(tab, i, null, fwd);// 如果成功寫入 fwd 佔位,再次推進一個下標
                else if ((fh = f.hash) == MOVED)// 如果不是 null 且 hash 值是 MOVED。
                    advance = true; // already processed,說明別的線程已經處理過了,再次推進一個下標
                else {// 到這裏,說明這個位置有實際值了,且不是佔位符。對這個節點上鎖。為什麼上鎖,防止 putVal 的時候向鏈表插入數據
                    synchronized (f) {
                        // 判斷 i 下標處的桶節點是否和 f 相同
                        if (tabAt(tab, i) == f) {
                            Node<K,V> ln, hn;// low, height 高位桶,低位桶
                            // 如果 f 的 hash 值大於 0 。TreeBin 的 hash 是 -2
                            if (fh >= 0) {
                                // 對老長度進行與運算(第一個操作數的的第n位於第二個操作數的第n位如果都是1,那麼結果的第n為也為1,否則為0)
                                // 由於 Map 的長度都是 2 的次方(000001000 這類的数字),那麼取於 length 只有 2 種結果,一種是 0,一種是1
                                //  如果是結果是0 ,Doug Lea 將其放在低位,反之放在高位,目的是將鏈表重新 hash,放到對應的位置上,讓新的取於算法能夠擊中他。
                                int runBit = fh & n;
                                Node<K,V> lastRun = f; // 尾節點,且和頭節點的 hash 值取於不相等
                                // 遍歷這個桶
                                for (Node<K,V> p = f.next; p != null; p = p.next) {
                                    // 取於桶中每個節點的 hash 值
                                    int b = p.hash & n;
                                    // 如果節點的 hash 值和首節點的 hash 值取於結果不同
                                    if (b != runBit) {
                                        runBit = b; // 更新 runBit,用於下面判斷 lastRun 該賦值給 ln 還是 hn。
                                        lastRun = p; // 這個 lastRun 保證後面的節點與自己的取於值相同,避免後面沒有必要的循環
                                    }
                                }
                                if (runBit == 0) {// 如果最後更新的 runBit 是 0 ,設置低位節點
                                    ln = lastRun;
                                    hn = null;
                                }
                                else {
                                    hn = lastRun; // 如果最後更新的 runBit 是 1, 設置高位節點
                                    ln = null;
                                }// 再次循環,生成兩個鏈表,lastRun 作為停止條件,這樣就是避免無謂的循環(lastRun 後面都是相同的取於結果)
                                for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                    int ph = p.hash; K pk = p.key; V pv = p.val;
                                    // 如果與運算結果是 0,那麼就還在低位
                                    if ((ph & n) == 0) // 如果是0 ,那麼創建低位節點
                                        ln = new Node<K,V>(ph, pk, pv, ln);
                                    else // 1 則創建高位
                                        hn = new Node<K,V>(ph, pk, pv, hn);
                                }
                                // 其實這裏類似 hashMap
                                // 設置低位鏈表放在新數組的 i
                                setTabAt(nextTab, i, ln);
                                // 設置高位鏈表,在原有長度上加 n
                                setTabAt(nextTab, i + n, hn);
                                // 將舊的鏈表設置成佔位符,表示處理過了
                                setTabAt(tab, i, fwd);
                                // 繼續向後推進
                                advance = true;
                            }// 如果是紅黑樹
                            else if (f instanceof TreeBin) {
                                TreeBin<K,V> t = (TreeBin<K,V>)f;
                                TreeNode<K,V> lo = null, loTail = null;
                                TreeNode<K,V> hi = null, hiTail = null;
                                int lc = 0, hc = 0;
                                // 遍歷
                                for (Node<K,V> e = t.first; e != null; e = e.next) {
                                    int h = e.hash;
                                    TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                    // 和鏈表相同的判斷,與運算 == 0 的放在低位
                                    if ((h & n) == 0) {
                                        if ((p.prev = loTail) == null)
                                            lo = p;
                                        else
                                            loTail.next = p;
                                        loTail = p;
                                        ++lc;
                                    } // 不是 0 的放在高位
                                    else {
                                        if ((p.prev = hiTail) == null)
                                            hi = p;
                                        else
                                            hiTail.next = p;
                                        hiTail = p;
                                        ++hc;
                                    }
                                }
                                // 如果樹的節點數小於等於 6,那麼轉成鏈表,反之,創建一個新的樹
                                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                                // 低位樹
                                setTabAt(nextTab, i, ln);
                                // 高位數
                                setTabAt(nextTab, i + n, hn);
                                // 舊的設置成佔位符
                                setTabAt(tab, i, fwd);
                                // 繼續向後推進
                                advance = true;
                            }
                        }
                    }
                }
            }
        }
    1. addCount:計數

      // 從 putVal 傳入的參數是x=1,check=binCount默認是0,只有hash衝突了才會大於1,且他的大小是鏈表的長度(如果不是紅黑樹結構的話,紅黑樹=2)。
          private final void addCount(long x, int check) {
              CounterCell[] as; long b, s;
               //如果計數盒子不是空或者修改 baseCount 失敗
              if ((as = counterCells) != null ||
                  !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
                  CounterCell a; long v; int m;
                  boolean uncontended = true;
                   // 如果計數盒子是空(尚未出現併發)
                   // 如果隨機取餘一個數組位置為空 或者
                   // 修改這個槽位的變量失敗(出現併發了)
                   // 執行 fullAddCount 方法,在fullAddCount自旋直到CAS操作成功才結束退出
                  if (as == null || (m = as.length - 1) < 0 ||
                      (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                      !(uncontended =
                        U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                      fullAddCount(x, uncontended);
                      return;
                  }
                  if (check <= 1)
                      return;
                  s = sumCount();
              }
              // 檢查是否需要擴容,在 putVal 方法調用時,默認就是要檢查的(check默認是0,鏈表是鏈表長度,紅黑樹是2),如果是值覆蓋了,就忽略
              if (check >= 0) {
                  Node<K,V>[] tab, nt; int n, sc;
                  // 如果map.size() 大於 sizeCtl(達到擴容閾值需要擴容) 且
                  // table 不是空;且 table 的長度小於 1 << 30。(可以擴容)
                  while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                         (n = tab.length) < MAXIMUM_CAPACITY) {
                      // 根據 length 得到一個標識
                      int rs = resizeStamp(n);
                      if (sc < 0) {//表明此時有別的線程正在進行擴容
                          // 如果 sc 的低 16 位不等於 標識符(校驗異常 sizeCtl 變化了)
                          // 如果 sc == 標識符 + 1 (擴容結束了,不再有線程進行擴容)(默認第一個線程設置 sc ==rs 左移 16 位 + 2,當第一個線程結束擴容了,就會將 sc 減一。這個時候,sc 就等於 rs + 1)
                          // 如果 sc == 標識符 + 65535(幫助線程數已經達到最大)
                          // 如果 nextTable == null(結束擴容了)
                          // 如果 transferIndex <= 0 (轉移狀態變化了)
                          // 結束循環
                          if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                              sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                              transferIndex <= 0)
                              break;
                          // 不滿足前面5個條件時,嘗試參与此次擴容,把正在執行transfer任務的線程數加1,+2代表有1個,+1代表有0個,表示多了一個線程在幫助擴容,執行transfer
                          if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                              transfer(tab, nt);
                      }
                      //如果不在擴容,將 sc 更新:標識符左移 16 位 然後 + 2. 也就是變成一個負數。高 16 位是標識符,低 16 位初始是 2.
                      //試着讓自己成為第一個執行transfer任務的線程
                      else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                                   (rs << RESIZE_STAMP_SHIFT) + 2))
                          transfer(tab, null);
                      s = sumCount();// 重新計數,判斷是否需要開啟下一輪擴容
                  }
              }
          }
  1. get:

    public V get(Object key) {
            Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
            //得到hash
            int h = spread(key.hashCode());
            //table有值,且查找到的槽位有值(tabAt方法通過valatile讀)
            if ((tab = table) != null && (n = tab.length) > 0 &&
                (e = tabAt(tab, (n - 1) & h)) != null) {
                //hash、key、value都相同返回當前查找到節點的值
                if ((eh = e.hash) == h) {
                    if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                        return e.val;
                }
                //遍歷特殊節點:紅黑樹、已經遷移的節點(ForwardingNode)等
                else if (eh < 0)
                    return (p = e.find(h, key)) != null ? p.val : null;
                //遍歷node鏈表(e.next也是valitle變量)
                while ((e = e.next) != null) {
                    if (e.hash == h &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek))))
                        return e.val;
                }
            }
            return null;
        }
        Node<K,V> find(int h, Object k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
  2. remove:

    public V remove(Object key) {
            return replaceNode(key, null, null);
        }
        //通過volatile設置第i個節點的值
        static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
            U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
        }
        final V replaceNode(Object key, V value, Object cv) {
            int hash = spread(key.hashCode());
            //自旋
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                //數組或查找的槽位為空,結束自旋返回null
                if (tab == null || (n = tab.length) == 0 ||
                    (f = tabAt(tab, i = (n - 1) & hash)) == null)
                    break;
                //正在擴容,幫助擴容
                else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f);
                else {
                    V oldVal = null;//返回的舊值
                    boolean validated = false;//是否進行刪除鏈表或紅黑樹節點
                    synchronized (f) {//槽位加鎖
                        //getObjectVolatile獲取tab[i],如果此時tab[i]!=f,說明其他線程修改了tab[i]。回到for循環開始處,重新執行
                        if (tabAt(tab, i) == f) {//槽位節點沒有變化
                            if (fh >= 0) {//槽位節點是鏈表
                                validated = true;
                                //遍歷鏈表
                                for (Node<K,V> e = f, pred = null;;) {
                                    K ek;
                                    //hash、key、value相同
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        V ev = e.val;//臨時節點緩存當前節點值
                                        //值相同
                                        if (cv == null || cv == ev ||
                                            (ev != null && cv.equals(ev))) {
                                            oldVal = ev;//給舊值賦值
                                            if (value != null)//值覆蓋,replace()調用
                                                e.val = value;
                                            else if (pred != null)//有前節點,表示當前節點不是頭節點
                                                pred.next = e.next;//刪除當前節點
                                            else
                                                setTabAt(tab, i, e.next);//刪除頭節點,即更新當前槽位(數組槽位)節點為頭節點的下一節點
                                        }
                                        break;
                                    }
                                    //當前節點不是目標節點,繼續遍歷下一個節點
                                    pred = e;
                                    //到達鏈表尾部,依舊沒有找到,跳出循環
                                    if ((e = e.next) == null)
                                        break;
                                }
                            }
                            else if (f instanceof TreeBin) {//紅黑樹
                                validated = true;
                                TreeBin<K,V> t = (TreeBin<K,V>)f;
                                TreeNode<K,V> r, p;
                                //樹有節點且查找的節點不為null
                                if ((r = t.root) != null &&
                                    (p = r.findTreeNode(hash, key, null)) != null) {
                                    V pv = p.val;
                                    //值相同
                                    if (cv == null || cv == pv ||
                                        (pv != null && cv.equals(pv))) {
                                        oldVal = pv;//給舊值賦值
                                        if (value != null)//值覆蓋,replace()調用
                                            p.val = value;
                                        else if (t.removeTreeNode(p))//刪除節點成功
                                            setTabAt(tab, i, untreeify(t.first));//更新當前槽位(數組槽位)節點為樹的第一個節點
                                    }
                                }
                            }
                        }
                    }
                    if (validated) {
                        //如果刪除了節點,更新size
                        if (oldVal != null) {
                            if (value == null)
                                addCount(-1L, -1);//數量-1
                            return oldVal;
                        }
                        break;
                    }
                }
            }
            return null;
        }

  三.總結

  1.  put:使用cas插入,如果是鏈表或樹節點才會加鎖同步操作,提高了性能

    1. 不允許有key或value為null,否則拋出異常;
    2. 在第一次put時初始化table(initTable()),初始化有併發控制,通過sizeCtl變量判斷,sizeCtl<0表示已經有線程在初始化,當前線程就不在進行,否則sizeCtl置為-1(CAS)並創建數組;
    3. 當hash計算出的槽位節點為null時,使用CAS插入元素;
    4. 當hash為MOVED(-1)時,幫助擴容,但可能幫助不了,因為每個線程默認16個桶,如果只有16個桶,第二個線程無法幫助擴容;
    5. 如果hash衝突了,同步槽位節點,如果槽位是鏈表結構,進行鏈表操作,覆蓋舊值或插入到鏈表尾部;如果是樹結構,添加到樹中;
    6. 元素添加到鏈表或樹中,如果鏈表長度大於8,將鏈錶轉換為紅黑樹;
    7. 調用addCount(),對size+1,並判斷是否需要擴容addCount(),如果是值覆蓋操作就不需要調用該方法;
  2. initTable:初始化

    1. 數組table為null才進行初始化,否則直接返回舊數組;
    2. 如果當前sizeCtl小於0,表示有線程正在初始化,則當前線程禮讓CPU,保證只有一個線程正在初始化數組;
    3. 如果沒有線程在初始化,則當前線程CAS將sizeCtl置為-1並創建數組,然後重新計算閥值;
  3. helpTransfer:幫助擴容

    1. 當嘗試插入操作時,發現節點是forward類型,則會幫助擴容;
    2. 每次加入一個線程都會將sizeCtl的低16位+1,同時校驗高16位的標識符;
    3. 擴容最大的幫助線程是65535,這是低16位的最大值限制;
    4. 每個線程默認分配16個桶,如果桶的數量是16,那麼第二個線程無法幫助擴容,即桶被分配完其他線程無法進場擴容;
  4. transfer:擴容和數據遷移

    1. 根據CPU核數平均分配給每個CPU相同數量的桶,如果不夠16個,默認就是16個;
    2. 按照2倍容量進行擴容;
    3. 每個線程在處理完自己領取的區間后,還可以繼續領取,如果還有的話,通過transferIndex變量遞減16實現桶數量控制;
    4. 每次處理空桶的時候,會把當前桶標識為forward節點,告訴put的其他線程說“我正在擴容,快來幫忙”,但如果只有16個桶,只能有一個線程進行擴容;
    5. 如果有了佔位符MOVED,表示已經被處理過,跳過這個桶,繼續推進處理其他桶;
    6. 如果有真正的實際值,那麼就同步加鎖頭節點,防止putVal的併發;
    7. 同步塊里將鏈表拆分成兩份,根據 hash & length 得到是否是0,如果是0,放在新數組低位,反之放在length+i的高位。這是防止下次取值hash找不到正確的位置;
    8. 如果該桶類型是紅黑樹,也會拆分成2個,然後判斷拆分過的桶的大小是否小於等於6,如果是轉換成鏈表;
    9. 線程處理完如果沒有可選區間,且任務沒有完成,則會將整個表檢查一遍,防止遺漏;
  5. addCount:擴容判斷

    1. 當插入結束時,會對size+1,並判斷是否需要擴容的判斷;
    2. 優先使用計數盒子(如果不是空,說明併發了),如果計數盒子是空,使用baseCount變量+1;
    3. 如果修改baseCount失敗,使用計數盒子,如果還是修改失敗,在fullAddCount()中自旋直到CAS操作成功;
    4. 檢查是否需要擴容;
    5. 如果size大於等於sizeCtl且長度小於1<<30,可以擴容;
    6. 如果已經在擴容,幫助其擴容;
    7. 如果沒有在擴容,自行開啟擴容,更新sizeCtl變量為負數,賦值為標識符高16位+2;
  6. remove:刪除元素

    1. 自旋遍曆數量,如果數組或根據hash計算的槽位節點值為null,直接結束自旋返回null;
    2. 如果槽位節點正在擴容,幫助擴容;
    3. 如果槽位節點有值,同步加鎖;
    4. 如果該槽位節點還是沒有任何變化,判斷是鏈表結構類型節點還是樹結構類型節點,通過遍歷查找元素,找到刪除該節點或重新設置頭節點;
    5. 如果刪除了節點,更新size-1,如果有舊值則返回舊值,否則返回null;

  四.參考

  1. https://www.jianshu.com/p/2829fe36a8dd
  2. https://www.jianshu.com/p/487d00afe6ca
  3. https://juejin.im/post/5b001639f265da0b8f62d0f8#comment

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

※推薦評價好的iphone維修中心

讀懂操作系統之虛擬內存TLB與緩存(cache)關係篇(四)

前言

前面我們講到通過TLB緩存頁表加快地址翻譯,通過上一節緩存原理的講解為本節做鋪墊引入TLB和緩存的關係,同時我們來完整梳理下從CPU產生虛擬地址最終映射為物理地址獲取數據的整個過程是怎樣的,若有錯誤之處,還請批評指正。

TLB和緩存串行訪問(Serial TLB & Cache Access)

這裡會跳過前面對虛擬頁號、虛擬頁偏移量、TLB索引和標記等的詳細分析和計算,不清楚的童鞋請先查看前面文章再來看本文。假設我們有14位的虛擬地址、12位的物理地址,每頁大小為64字節,如下:

 

 

同時假設已完全清楚虛擬地址和物理地址劃分,接下來則是針對虛擬地址和物理地址進行位劃分,如下:

同時我們假設TLB是通過組相聯來進行映射,TLB中有16個條目,4路相聯,所以TLB索引(TI)和TLB標記(TT)在虛擬地址中虛擬頁號進行位劃分如下: 

 

我們假設緩存採取直接映射的機制,緩存大小為64字節,每塊大小為4個字節,說明緩存有16塊即4位,位偏移為2位,所以緩存索引(CI)和緩存標記(CT)在物理地址中進行位劃分如下:

現假設讀取虛擬地址(0x0255),那麼將其劃分為VPN(0x09),VPO(0x15),然後將VPN劃分為TT(0x02)和TI(1)如下:

接下來通過TT(0x02)和TI(1)去查找TLB,如下:

此時我們會發現TLB缺失,緊接着通過VPN(0x0916 = 2110)去頁表中查找得到PPN(0x1716 = 2310),如下: 

 

因其PPO = VPO(0x15),所以計算出物理地址為(23 * 64+21 = 149310 = 0x5D516

然後根據上述物理地址劃分為CT(0x17)、CI(5)、CO(1),如下:

最後通過上述CT(0x17)和CI(5)去查找緩存,此時緩存命中,然後將數據發送到CPU,如下:

從CPU到獲取數據整個的過程是這樣的:【1】CPU產生虛擬地址【2】TLB翻譯成物理地址【3】TLB命中,將物理地址發送到緩存【4】緩存命中返回數據。其中每一個過程涉及到的細節,比如TLB缺失、頁缺失等等前面已有詳細講解,殊途同歸,大致過程則是如下圖解

通過如上可看出此時TLB與Cache是串行訪問的關係,這是最簡單同時也是比較慢的方式,因為不得不等待TLB翻譯完成后才去檢查緩存中是否有數據,如此一來將對CPU處理速度產生重大影響,涉及到大量內存訪問時間。

TLB和緩存并行訪問(Parallel TLB & Cache Access)

當前處理器最普遍的設計是採取TLB和Cache并行的方式,有些也稱之為重疊訪問(Overlapping TLB & Cache Access),從而提高訪問速度,那麼并行訪問到底是如何做的呢?有沒有什麼使用限制呢?這裏我們以Intel Skylake(英特爾第六代微處理器架構)為例來說明,其虛擬地址和物理地址結構大致如下:

看到上述結構我們可以發現物理地址中的PPN和緩存標記(CT)位數相等以及其他,英特爾這樣設計就是為了讓TLB和Cache可以并行訪問。TLB和Cache并行訪問原理:虛擬地址(VA)中的高階位即(VPN)用來查找TLB,而低階位(VPO)用來查找緩存。通過TLB將VPN映射到PPN,此時PPN作為緩存標記(CT),而將VPO中的低階位作為緩存偏移量(CO),高階位作為緩存索引(CI)。有了緩存標記和緩存索引我們就可以查詢到數據,比如CPU產生虛擬地址(0x7916 = 00011110012),此時通過并行訪問則為如下圖解

我們結合上述圖解繼續進行分析將并行訪問分為三種情況,比如上緩存中的tag = 11,同時我們產生的PPN = tag = 11,說明緩存標記等於物理頁號,同時緩存命中,最終返回數據B5給CPU(其一)。假設產生的緩存標記不是11,那麼說明緩存標記不等於頁號或者緩存缺失,但此時TLB命中,那麼將通過TLB中的物理頁號直接訪問主存(其二)。否則做標準的虛擬地址翻譯(其三)。為便於大家理解,我們通過偽代碼形式來說明:

if (cache hit && cache tag = PPN)
  //返回數據到CPU
else if (cache miss || cache tag != PPN && TLB hit) 
  //通過TLB中的PPN訪問主存
else
  //標準地址翻譯

兩種緩存架構(Cache & TLB Access)

緩存索引(Cache index)用於查找數據在緩存中的索引位置,而緩存標記(Cache tag)則是驗證緩存中有哪些數據。從上述對并行訪問原理講解我們知道將虛擬地址中的虛擬偏移量可作為物理緩存索引,這裏我們稱之為虛擬索引,同時我們將VPN轉換為PPN,這種模式稱之為虛擬索引、物理標記緩存架構(Virtual-indexed Physically-tagged Caches),其實我們也可以將虛擬地址中的偏移量作為緩存標記,也就是說虛擬地址中的偏移量(VPO)既作為緩存索引也作為緩存標記,這種緩存架構成為虛擬索引、虛擬標記緩存架構(Virtual-indexed Virtually-tagged Caches),也叫虛擬地址緩存(Virtual Address Caches),接下來我們來分析這兩種緩存架構。

虛擬索引、虛擬標記緩存(Virtual-indexed Virtually-tagged Caches)

 

 

此種緩存架構讓緩存保存虛擬地址,但是現代處理器極少使用這種緩存設計,雖然很塊,但是處理起來很複雜, 比如進行上下文切換時需要刷新緩存(當然可以在地址空間添加ASID),但是即使這樣,由於頁面可以共享而造成處理頁面別名問題,用於直接映射緩存的解決方案,共享頁面的VA必須在緩存索引位中一致,確保訪問同一PA的所有VA將在直接映射的緩存(早期SPARC)中發生衝突,所以大多處理器採用第二種(VA-PA)緩存架構。

虛擬索引、物理標記緩存(Virtual-indexed Physically-tagged Caches)

 

并行TLB & Cache訪問採取的就是此種架構,此種架構要求緩存索引完全包含在虛擬地址中的虛擬偏移量中。緩存標記和PPN相等(當然第一種)當查詢緩存時也執行TLB訪問,它是當前處理器最常見的設計,我們知道緩存使用的是物理地址,而CPU產生的是虛擬地址,這也就意味着沒有TLB就無法完成緩存查找。前面我們了解到緩存數據存儲結構存在直接映射、組相聯、全相聯三種結構,在此種緩存架構中有使用限制,我們首先來看看直接映射。

 

并行訪問的本質在於緩存查詢數據無需等待TLB完成,二者可同時開始,所以當兩者訪問完成后需要進行比較,如果(cache size <=  page size)即(C = L + b) <= P才有效,因為對於緩存的所有輸入都無需進行任何翻譯。

通過組相聯增加了緩存的關聯性從而減少索引到緩存所需地址的位數,在訪問完成後進行比較,如果(cache size) / (associativity) ≤ page size即(C <= P + A)才有效。對於緩存和TLB都採用的組相聯從而減少缺失率,所以對於并行訪問中的緩存組相聯映射必須滿足(cache size) / (associativity) ≤ page size。那麼問題來了,如果一個緩存大小為64KB,採用2路相聯,頁大小>=4k,那麼可以進行并行訪問TLB & Cache嗎?很顯然不能,如下

緩存大小:64KB = 216     ————-》 C = 16

組相聯:2                        ————-》 A = 1

頁大小: 4KB = 212        ————–》P >= 12

那麼問題又來了,對於一個16位的虛擬地址,頁大小為64字節,緩存大小為256b,採用8路相聯的1級緩存且有16塊,那麼可以并行訪問TLB &Cache嗎?請輸出原因。

總結

本節我們詳細介紹了TLB &Cache二者的關係,採用并行訪問通過VPN查找TLB,VPO查詢緩存同時進行來提高訪問速度。下一節我們進入頁表數據結構的詳細講解,謝謝。 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※回頭車貨運收費標準

進制數轉換方法(八/十六/十)

進制轉換算法

 

二/八/十六進制 → 十進制

 

  • 二進制 → 十進制

  方法:二進制數從低位到高位(即從右往左)計算,第0位的權值是2的0次方,第1位的權值是2的1次方,第2位的權值是2的2次方,依次遞增下去,把最後的結果相加的值就是十進制的值了。

  例:將二進制的(101011)B轉換為十進制的步驟如下:

1. 第0位 1 x 2^0 = 1;

2. 第1位 1 x 2^1 = 2;

3. 第2位 0 x 2^2 = 0;

4. 第3位 1 x 2^3 = 8;

5. 第4位 0 x 2^4 = 0;

6. 第5位 1 x 2^5 = 32;

7. 讀數,把結果值相加,1+2+0+8+0+32=43,即(101011)B=(43)D。

  • 八進制 → 十進制

  方法:八進制數從低位到高位(即從右往左)計算,第0位的權值是8的0次方,第1位的權值是8的1次方,第2位的權值是8的2次方,依次遞增下去,把最後的結果相加的值就是十進制的值了。

  八進制就是逢8進1,八進制數採用 0~7這八數來表達一個數。

  例:將八進制的(53)O轉換為十進制的步驟如下:

1. 第0位 3 x 8^0 = 3;

2. 第1位 5 x 8^1 = 40;

3. 讀數,把結果值相加,3+40=43,即(53)O=(43)D。

  • 十六進制 → 十進制

  方法:十六進制數從低位到高位(即從右往左)計算,第0位的權值是16的0次方,第1位的權值是16的1次方,第2位的權值是16的2次方,依次遞增下去,把最後的結果相加的值就是十進制的值了。

  十六進制就是逢16進1,十六進制的16個數為0123456789ABCDEF。

  例:將十六進制的(2B)H轉換為十進制的步驟如下:

1. 第0位 B x 16^0 = 11;

2. 第1位 2 x 16^1 = 32;

3. 讀數,把結果值相加,11+32=43,即(2B)H=(43)D。

十進制 → 二、八、十六進制

  • 十進制 → 二進制

  方法:除2取余法,即每次將整數部分除以2,餘數為該位權上的數,而商繼續除以2,餘數又為上一個位權上的數,這個步驟一直持續下去,直到商為0為止,最後讀數時候,從最後一個餘數讀起,一直到最前面的一個餘數。 

  例:將十進制的(43)D轉換為二進制的步驟如下:

1. 將商43除以2,商21餘數為1;

2. 將商21除以2,商10餘數為1;

3. 將商10除以2,商5餘數為0;

4. 將商5除以2,商2餘數為1;

5. 將商2除以2,商1餘數為0; 

6. 將商1除以2,商0餘數為1; 

7. 讀數,因為最後一位是經過多次除以2才得到的,因此它是最高位,讀数字從最後的餘數向前讀,101011,即(43)D=(101011)B。

  • 十進制 → 八進制

  方法1:除8取余法,即每次將整數部分除以8,餘數為該位權上的數,而商繼續除以8,餘數又為上一個位權上的數,這個步驟一直持續下去,直到商為0為止,最後讀數時候,從最後一個餘數起,一直到最前面的一個餘數。

  例:將十進制的(796)D轉換為八進制的步驟如下:

1. 將商796除以8,商99餘數為4;

2. 將商99除以8,商12餘數為3;

3. 將商12除以8,商1餘數為4;

4. 將商1除以8,商0餘數為1;

5. 讀數,因為最後一位是經過多次除以8才得到的,因此它是最高位,讀数字從最後的餘數向前讀,1434,即(796)D=(1434)O。

  方法2:使用間接法,先將十進制轉換成二進制,然後將二進制又轉換成八進制;

  • 十進制 → 十六進制

  方法1:除16取余法,即每次將整數部分除以16,餘數為該位權上的數,而商繼續除以16,餘數又為上一個位權上的數,這個步驟一直持續下去,直到商為0為止,最後讀數時候,從最後一個餘數起,一直到最前面的一個餘數。

  例:將十進制的(796)D轉換為十六進制的步驟如下:

1. 將商796除以16,商49餘數為12,對應十六進制的C;

2. 將商49除以16,商3餘數為1;

3. 將商3除以16,商0餘數為3;

4. 讀數,因為最後一位是經過多次除以16才得到的,因此它是最高位,讀数字從最後的餘數向前讀,31C,即(796)D=(31C)H。

  方法2:使用間接法,先將十進制轉換成二進制,然後將二進制又轉換成十六進制;

二進制 ↔ 八、十六進制

  • 二進制 → 八進制

  方法:取三合一法,即從二進制的小數點為分界點,向左(向右)每三位取成一位,接着將這三位二進制按權相加,然後,按順序進行排列,小數點的位置不變,得到的数字就是我們所求的八進制數。如果向左(向右)取三位后,取到最高(最低)位時候,如果無法湊足三位,可以在小數點最左邊(最右邊),即整數的最高位(最低位)添0,湊足三位。

  例:將二進制的(11010111.0100111)B轉換為八進制的步驟如下:

1. 小數點前111 = 7;

2. 010 = 2;

3. 11補全為011,011 = 3;

4. 小數點后010 = 2;

5. 011 = 3;

6. 1補全為100,100 = 4;

7. 讀數,讀數從高位到低位,即(11010111.0100111)B=(327.234)O。

  • 八進制 → 二進制

  方法:取一分三法,即將一位八進制數分解成三位二進制數,用三位二進制按權相加去湊這位八進制數,小數點位置照舊。

  例:將八進制的(327)O轉換為二進制的步驟如下:

1. 3 = 011;

2. 2 = 010;

3. 7 = 111;

4. 讀數,讀數從高位到低位,011010111,即(327)O=(11010111)B

  • 二進制 → 十六進制

  方法:取四合一法,即從二進制的小數點為分界點,向左(向右)每四位取成一位,接着將這四位二進制按權相加,然後,按順序進行排列,小數點的位置不變,得到的数字就是我們所求的十六進制數。如果向左(向右)取四位后,取到最高(最低)位時候,如果無法湊足四位,可以在小數點最左邊(最右邊),即整數的最高位(最低位)添0,湊足四位。

  例:將二進制的(11010111)B轉換為十六進制的步驟如下:

1. 0111 = 7;

2. 1101 = D;

3. 讀數,讀數從高位到低位,即(11010111)B=(D7)H。

  • 十六進制 → 二進制

  方法:取一分四法,即將一位十六進制數分解成四位二進制數,用四位二進制按權相加去湊這位十六進制數,小數點位置照舊。

  例:將十六進制的(D7)H轉換為二進制的步驟如下:

1. D = 1101;

2. 7 = 0111;

3. 讀數,讀數從高位到低位,即(D7)H=(11010111)B。

八進制 ↔ 十六進制

  • 八進制 → 十六進制

  方法:將八進制轉換為二進制,然後再將二進制轉換為十六進制,小數點位置不變。

  例:將八進制的(327)O轉換為十六進制的步驟如下:

1. 3 = 011;

2. 2 = 010;

3. 7 = 111;

4. 0111 = 7;

5. 1101 = D;

6. 讀數,讀數從高位到低位,D7,即(327)O=(D7)H。

  • 十六進制 → 八進制

  方法:將十六進制轉換為二進制,然後再將二進制轉換為八進制,小數點位置不變。

  例:將十六進制的(D7)H轉換為八進制的步驟如下:

1. 7 = 0111;

2. D = 1101;

3. 0111 = 7;

4. 010 = 2;

5. 011 = 3;

6. 讀數,讀數從高位到低位,327,即(D7)H=(327)O。

 

十進制的小數到二進制的轉換

  • 將小數部分0.125乘以2,得0.25,然後取整數部分0

      

 

  • 再將小數部分0.25乘以2,得0.5,然後取整數部分0
  • 再將小數部分0.5乘以2,得1,然後取整數部分1
  • 得到的二進制的結果就是0.001

二進制到十進制的轉換

  • 0.001第一位為0,則0*1/2,即0乘以2負 一次方

  • 0.001第二位為0,則0*1/4,即0乘以2的負二次方。
  • 0.001第三位為1,則1*1/8,即1乘以2的負三次方。

  • 各個位上乘完之後,相加,0*1/2+0*1/4+1*1/8得十進制的0.125

注:一些內容通過上網查找並進行了整理

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案

C Primer Plus(三)

重讀C Primer Plus ,查漏補缺

  重讀C Primer Plus,記錄遺漏的、未掌握的、不清楚的知識點

 

文件輸入/輸出

  1、fgets函數在讀取文件內容時會將換行符讀入,但gets不會,fputs函數在寫入文件時不會追加一個換行符,但puts會,應該對應配合使用。

  2、不同操作系統下,以文本方式打開文件,幾乎沒有區別,但由於不同操作系統文件結尾的的標識符不同,以二進制方式打開時,可能會將結尾標識符錯誤輸出。

  3、對於大型文件,有兩個特殊的函數提供支持:

1 int fgetpos(FILE * restrict stream, fpos_t * restrict pos);
2 int fsetpos(FILE * stream, const fpos_t *pos);

  其中,fpos_t是通過其他類型定義的文件定位類型,在使用上述函數時,fsetpos中的pos必須是通過fgetpos函數獲得的。當兩個函數執行成功時,會返回0。

  4、其他標準IO函數

 1 size_t fwrite(const void * restrict ptr, size_t size, size_t nmemb, FILE* restrict fp);
 2 size_t fread(void * restrict ptr, size_t size, size_t nmemb, FILE* restrict fp);
 3 // 是否到達文件結尾
 4 int feof(FILE* fp);
 5 // 是否發生讀寫錯誤
 6 int ferror(FILE* fp);
 7 // 將字符迴流進緩衝區
 8 int ungetc(int c, FILE* fp)
 9 // 立刻將緩衝區內容寫入文件
10 int fflush(FILE* fp)
11 // 替換緩衝區
12 int setvbuf(FILE* restrict fp, char * restrict buf, int mode, size_t size)

  當然,上述的一些函數在目前的VS Studio中會被認為是不安全的函數,已經過時。

  

結構和其他數據格式

   5、C99標準下支持對結構體初始化時的任意字段賦值:

1 struct book gift = {.value=25.99, .author="Harry Potter", .title="Yoo"};
2 // 此時 0.25 會被賦給定義結構體時author后的那個成員,即便那個成員已經被初始化過。
3 struct book gift = {.value=25.99, .author="Harry Potter", 0.25};

  6、對於結構體數組,數組名不是其首個元素的地址,需要引用首個元素再取地址。

  7、在結構中一般使用字符數組,而不使用字符指針,結構中的字符指針無法很好的初始化地址,這樣會有使用上的風險,所以結構中的字符指針最好只指向那些字符串常量或者是指向由malloc分配的內存。

  8、C99標準對結構也支持複合文字,同時複合文字的結構也可以作為函數參數,也可以取地址,也和普通變量有相同的生存周期,聲明方式如下:

1 (struct book) {"The Idiot", "Fyodor Dostoyevsky", 6.99}

  9、C99支持一種伸縮型數組成員,這個成員必須是結構中最後一個成員,而且不是唯一一個成員,就像聲明普通數組一樣,但括號內為空,這個成員不會在聲明后立即存在,實際上,C99希望使用malloc為這樣含有伸縮型成員的數組分配空間。

1 struct flex{
2     int count;
3     double avreage;
4     double scores[]; // 伸縮型成員
5 }
6 struct flex * pf;
7 pf = malloc(sizeof(struct flex) + 5*sizeof(double))
8 pf->count = 5;
9 pf->scores[2] = 2.99;

  10、對於C中的枚舉類型,某些屬性不能順延至C++,例如C允許對枚舉做++運算,但C++不允許。

1 enum spectrum {red, yellow, green, blue};
2 spectrum color;
3 for(color = red; color != blue; color++);

  11、在C中,對於同一作用域下的標記和變量名可以使用同一個名字,因為對於標記(枚舉、結構,聯合),他們使用的名字空間與普通變量不同,但C++中不可以,例:

1 struct complex{double x, double y};
2 int complex; // 在C中不會引起衝突,但C++中則不允許

  12、對於函數指針執行函數時,會出現兩種語法,ANSI C把他們視為等價的。

1 void ToUpper(char *);
2 void (*pf) (char*);
3 char str[] = "hello";
4 pf = ToUpper;
5 (*pf)(str); // 語法1
6 pf(str);    // 語法2

  

位操作

  13、為什麼一個字節可以表示的有符號整數的範圍是-128~+127?

  看這裏:https://www.cnblogs.com/Dylan7/p/12649972.html

  14、計算機中小數是如何表示的?(一部分表示指數,一部分表示小數,有精確度問題)

  15、對位進行操作的第二種方法就是位字段(從沒用過,細節可以用到時再研究),位字段好比一個結構體,但其中的成員,代表的是某幾位上的值,好處是避免了通過複雜的位運算去控制某些位上的值,聲明例如:

1 struct box
2 {
3     unsigned int opaque       :1 // 整體結構的對齊補齊依據無符號整型
4     unsigned int fill_color   :3 // 数字代表需要幾位來表示這個字段
5     unsigned int              :4 // 可以跳過一些位
6     unsigned int show_border  :1 // 但一個字段不能橫跨兩個無符號整型的邊界
7 }
8 struct box b;
9 b.fill_color = 7; // 不可以超過字段所佔用的位可表示的上限

  

C預處理器和C庫

  16、程序翻譯的第一步,在預處理前,編譯器會對代碼做一些翻譯,將代碼中出現的字符映射到源字符集(用來處理多字節字符和使C外觀更加國際化的三元字符擴展),接着查找反斜杠后緊跟換行符的實例,將其轉換為一行,然後將文本劃分為預處理的語言符號序列以及空白字符及註釋序列(將用一個空格代替一個註釋),最後進入預處理階段,尋找每一個預處理指令。

  17、 幾個宏定義

1 #define F(x) #x      // #將語言符號字符串化
2 #define F(x) F##x    // ##將兩個語言符號組成一個語言符號
3 #define F(x,...)  printf("x", __VA_ARGS__)  // ...和__VA_ARGS__,可變參數(必須為最後一個參數)

  18、#if 指令後面跟常量整數表達式,可以與 #elif 配合使用,例如:

1 #if 1 == SYS
2     ...
3 #elif 2 == SYS
4     ...
5 #endif    

  同時,還有以下新的實現方式,defined 是一個預處理運算符,如果參數使用#define定義過,defined返回1,否則返回0。

1 #if defined(INMPC)
2     ...
3 #elif defined(VAX)
4     ...
5 #endif   

  19、#line 用於重置__LINE__,__FILE__宏所報告的行數

    #error 指令使預處理器可以發出一條錯誤信息

1 #line 10000
2 #line 10 cool.c"
3 #if __STD_VERSION__ != 199901L
4     #error Not C99
5 #endif

  20、C99 提供了_Pragma預處理器運算符,可以將字符串轉換成常規的編譯指示

1 _Pragma("c99 on") 等價於
2 #pragma c99 on

  21、內聯函數不會在調試器中显示,例如使用gdb調試時,有些內聯函數無法被手動執行,同時內聯函數具有內部鏈接屬性,所以在多文件程序中,使用其他文件的內聯函數時,要單獨聲明一次,並且在嘗試獲 取內聯函數的地址時,編譯器都會產生非內聯函數,也就是說可能產生外部定義的函數。

  23、在main()函數結束時,會隱式地調用exit()函數,同時,可以通過atexit()函數,向exit()註冊在程序允許結束時執行的函數,ANSI C保證可以設置至少32個函數,按照先設置后執行的順序執行,atexit()函數接受一個返回值為void,參數也為void的函數指針作為唯一參數。

  24、memcpy()與memmove()兩個函數的區別在於聲明上,以及memcpy()會假定兩個內存區沒有重疊。

1 void *memcpy(void * restrict s1, const void * restrict s2, size_t n);
2 void *memmove(void *s1, void *s2, size_t n);

  25、可變參數的相關內容包含在stdarg.h頭文件中,使用起來比較複雜,包括初始化可變參數列表,遍歷列表,清理列表,拷貝列表等一系列操作,需要時再研究。

 

高級數據表示

  26、 這章沒什麼新奇內容,但它告訴我們,用C可以實現很多複雜的數據結構。

 

 

  2020年4月16日,星期五,晚23點09分,首次完整讀完這本書,共勉。

  學如逆水行舟,不進則退;心似平原放馬,易縱難收。

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

從0到70%:Chrome上位揭秘!

最近的數據显示,Chrome在2020年4月的市場份額達到了70%左右,把微軟的Edge和Firefox遠遠甩在身後,毫無疑問,Chrome贏得了第二次遊覽器之戰,成為新一代王者。 

 

Chrome的第一版於2008年推出,當時微軟的IE瀏覽器幾乎佔到了60%的份額,Firefox緊隨其後,佔據了大概30%,Chrome在2008年僅占0.3%。 

十年以後Chrome就主導了瀏覽器的市場, 這一切是怎麼發生的? 

我們先來回顧一下第一次瀏覽器之戰,交戰的雙方是Netscape的Navigator瀏覽器和微軟的IE, Netscape佔據着先發優勢,IE則背靠Windows這棵大樹,雙方你來我往,戰事極為精彩,競爭帶來了技術進步,像JavaScript、DHTML、CSS、XMLHttpRequest等各種技術層出不窮。 

IE4成為這場戰爭的轉折點,因為它被集成到了Windows當中, 開箱即用,免費,誰還會去額外下載安裝收費的Netscape呢?  

Netscape從此潰不成軍,IE贏得了最後的勝利,從這張圖中可以清楚地看出IE和Netscape之間此消彼長的勢頭。 

 

IE的勝利也結束了Web瀏覽器的快速創新,早期的IE是积極進取的,比如AJAX的基石XMLHttpRequest就是IE率先提出來,並且發揚光大的。 

可是一旦垄斷形成,微軟就不思進取,不想更新了,2001年微軟發布IE6以後,在長達5年的時間里,IE居然沒有新版本發布! 

很多年輕的程序員沒有經歷過被IE支配的“恐懼”,那個時候開發網頁,必須要保證在IE6上能夠運行,否則你絕對活不了。巔峰時期IE6曾經達到過90%多的恐怖佔有率, 很多國企,內部系統都是基於IE6。 

雖然Netscape的瀏覽器戰敗,卻沒有因為死亡,Netscape把Navigator的代碼開源,捐給了非盈利的Mozilla基金會。在這裏Netscape幾經輾轉,終於在2004年涅槃重生,變身為著名的Firefox。 

 

我現在還記得第一次看到FireFox時的感受:非常輕薄!速度飛快!作為開發人員,我迅速就拋棄了老舊不堪的IE, 把Firefox作為主力的瀏覽器。 

Firefox也不負眾望,在此後的幾年中穩步上升,到2009年達到了30%多的佔有率,隱隱有成為下一個霸主的潛質。 

可是另外一個可怕的對手出現了,Google在2008年推出了一個叫做Chrome的瀏覽器,這個時候iPhone上市不到一年,Windows7剛剛面世,IE依然是最流行的瀏覽器。 

但是Google卻看到了不一樣的東西,它們認為現在的互聯網和10幾年前大不相同了,原來只是web pages,現在到處是applications,而瀏覽器的本質卻沒有變化。Google覺得自己有責任改變, 這幾幅漫畫講述了Google要推出Chrome的根本原因: 

 

 此時的Web已經進入Web2.0時代,像Google Map和GMail這樣的應用迫切需要瀏覽器能夠快速地裝載頁面,快速地執行JavaScript。 

Google下定決心,從零開始設計一個滿足現代Web應用的瀏覽器, 瀏覽器不僅僅是一個瀏覽網頁的工具,而是一個新的平台,在此之上可以在線完成各種事情,這種深刻的洞察力將給Google Chrome帶來極大的成功。 

Google給新瀏覽器定下的目標是:穩定,快速,安全,好用,開源。財大氣粗的它組建了一支豪華團隊來開發Chrome , 並且從Firefox挖走了好幾員幹將,如Ben Goodger,這可是Firefox的主力開發。 

與IE和Firefox相比,Chrome的一大優勢就是拋棄了遺留代碼的包袱,從頭開始設計,開發人員可以盡情地施展才華,他們帶來了兩個重要的創新: 

1. 在很早的時候就確定下了“沙箱”的機制:每個Tab頁都運行在自己的進程中,互不影響,充分利用多核。 

2. 開發了強勁的JavaScript執行引擎 V8,讓Web應用迅速地執行JavaScript代碼。 

2008年9月,Chrome推出Beta版,9個月後,即獲得3000萬用戶 

2009年12月,推出擴展程序庫,讓用戶安裝第三方插件,生態迅速繁榮。 

2010年,推出Web 應用商店。 

2012年2月,Chrome發布了Android版本, 6月推出iOS版本,此時市場份額達到30%以上 

2013年,為了對第三方的惡意擴展程序進行控制,Google要求所有的擴展必須託管在應用商店中 

……

 

一系列措施讓Chrome迅速蠶食了Firefox和IE的市場,從這幅圖可以清晰地看出IE(藍色線條)的沒落和Chrome(綠色線條)的崛起。

 

 

微軟豈會就此認輸?在這段時間內相繼推出IE7, IE8, IE9,IE10 , IE 11, 但是遺留的包袱讓它步履維艱,它那緩慢的速度經常成為大家調侃的對象: 

 

到了Windows 10 ,微軟另起爐灶,推出新的瀏覽器Edge,但也難挽敗局。 

微軟新CEO納德拉上台以後,一反原來封閉的形象,擁抱開源。2018年底,微軟宣布將會採用Google開源的Chromium為核心來構建Edge瀏覽器,從此Microsoft Edge和Google Chrome算是同源了,以後發展如何,我們拭目以待。 

Chome登上王位以後,對Google帶來了巨大的好處,因為Google本身提供了很多極為Web的服務:GMail, Google Map , Youtube, Google Gocs, Google Earth….  現在Google把瀏覽器端和服務器端都掌握了,那修改一下中間的協議也不算什麼了,對用戶來說,反正背後的協議也看不到,只要能變快就行。 

Google可以用Chrome試驗各種新協議,於是我們看到它對HTTP1.1動手,做了一個叫做SPDY協議的實驗,非常成功,成為了HTTP 2的基礎。然後又對傳輸層協議開刀,搞出了一個新的傳輸層協議QUIC,解決了TCP了諸多問題,有望把TCP給替換掉。基於QUIC,新的HTTP協議,即HTTP/3正在制定當中。 

尾聲

 Chrome的成功主要是因為Google深刻的洞察力,他們看到了Web未來的趨勢,迅速推出產品擁抱了這種趨勢。 

Chrome如今佔據了和當年的IE6一樣的主導地位, 一些批評聲音出現了,The verge有一篇報道說Google的很多Web應用都提倡“使用Chrome瀏覽效果最佳”, “Google Meet、Allo、YouTube TV、Google Earth 和 YouTube Studio Beta 都會阻止 Windows 10 系統的默認瀏覽器 Microsoft Edge 訪問它們,並指引用戶下載 Chrome 瀏覽器” ,“使用非Chrome瀏覽器訪問google.com會被提醒三次下載Chrome。”

 

 

Chrome最終會走向何方?你覺得Chrome會像IE那樣停滯不前嗎? 

參考資料:

https://en.wikipedia.org/wiki/Browser_wars

https://usefyi.com/chrome-history/

https://www.theverge.com/2018/1/4/16805216/google-chrome-only-sites-internet-explorer-6-web-standards

https://www.google.com/googlebooks/chrome/big_00.html

 

更多精彩文章,盡在碼農翻身

微服務把我坑了

如何降低程序員的工資?

程序員,你得選准跑路的時間!

兩年,我學會了所有的編程語言!

一直CRUD,一直996,我煩透了,我要轉型

字節碼萬歲!

上帝託夢給我說:一切皆文件

Javascript: 一個屌絲的逆襲

Node.js :我只需要一個店小二

我是一個線程

TCP/IP之大明郵差

一個故事講完Https

CPU 阿甘

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

※教你寫出一流的銷售文案?

設計模式系列之工廠模式三兄弟(Factory Pattern)

說明:設計模式系列文章是讀劉偉所著《設計模式的藝術之道(軟件開發人員內功修鍊之道)》一書的閱讀筆記。個人感覺這本書講的不錯,有興趣推薦讀一讀。詳細內容也可以看看此書作者的博客https://blog.csdn.net/LoveLion/article/details/17517213

工廠模式是最常用的一類創建型設計模式,通常我們所說的工廠模式是指工廠方法模式,它也是使用頻率最高的工廠模式。簡單工廠模式是工廠方法模式的“小弟”,它不屬於GoF23種設計模式,但在軟件開發中應用也較為頻繁,通常將它作為學習其他工廠模式的入門。此外,工廠方法模式還有一位“大哥”——抽象工廠模式。這三種工廠模式各具特色,難度也逐個加大,在軟件開發中它們都得到了廣泛的應用,成為面向對象軟件中常用的創建對象的工具。

簡單工廠模式

簡單工廠模式並不屬於GoF 23個經典設計模式,但通常將它作為學習其他工廠模式的基礎,它的設計思想很簡單。

模式定義

簡單工廠模式(Simple Factory Pattern):定義一個工廠類,它可以根據參數的不同返回不同類的實例,被創建的實例通常都具有共同的父類。因為在簡單工廠模式中用於創建實例的方法是靜態(static)方法,因此簡單工廠模式又被稱為靜態工廠方法(Static Factory Method)模式,它屬於類創建型模式。

簡單工廠模式的要點在於:當你需要什麼,只需要傳入一個正確的參數,就可以獲取你所需要的對象,而無須知道其創建細節。

模式結構圖

簡單工廠模式結構圖如下所示:

模式偽代碼

在使用簡單工廠模式時,首先需要對產品類進行重構,不能設計一個包羅萬象的產品類,而需根據實際情況設計一個產品層次結構,將所有產品類公共的代碼移至抽象產品類,並在抽象產品類中聲明一些抽象方法,以供不同的具體產品類來實現,典型的抽象產品類代碼如下所示:

public abstract class Product {
    // 所有產品的公共屬性

    // 所有產品類的公共業務方法
    public void methodSame() {
        //公共方法的實現
    }

    // 聲明抽象業務方法
    public abstract void methodDiff();
}

具體產品類中實現了抽象產品類中聲明的抽象業務方法。

public class ConcreteProduct extends Product {
    @Override
    public void methodDiff() {
        // 具體產品業務方法的實現
    }
}

簡單工廠模式的核心是工廠類,在沒有工廠類之前,客戶端一般會使用new關鍵字來直接創建產品對象,而在引入工廠類之後,客戶端可以通過工廠類來創建產品,在簡單工廠模式中,工廠類提供了一個靜態工廠方法供客戶端使用,根據所傳入的參數不同可以創建不同的產品對象,典型的工廠類代碼如下所示:

public class Factory {
    //靜態工廠方法
    public static Product getProduct(String arg) {
        Product product = null;
        if (arg.equalsIgnoreCase("A")) {
            product = new ConcreteProductA();
            //初始化設置product
        } else if (arg.equalsIgnoreCase("B")) {
            product = new ConcreteProductB();
            //初始化設置product
        }
        return product;
    }
}

客戶端代碼中,我們通過調用工廠類的工廠方法即可得到產品對象,典型代碼如下所示:

public class Client {
    public static void main(String[] args) {
        Product product;
        product = Factory.getProduct("A"); //通過工廠類創建產品對象
        product.methodSame();
        product.methodDiff();
    }
}

模式簡化

有時候,為了簡化簡單工廠模式,我們可以將抽象產品類工廠類合併,將靜態工廠方法移至抽象產品類中,如下圖所示。

客戶端可以通過產品父類的靜態工廠方法,根據參數的不同創建不同類型的產品子類對象,這種做法在JDK等類庫和框架中也廣泛存在。
比如:java.nio.charset.Charset

public abstract class Charset {

    /**
     * Returns a charset object for the named charset.
     */
    public static Charset forName(String charsetName) {
        java.nio.charset.Charset cs = lookup(charsetName);
        if (cs != null)
            return cs;
        throw new UnsupportedCharsetException(charsetName);
    }
}

模式小結

簡單工廠模式提供了專門的工廠類用於創建對象,將對象的創建和對象的使用分離開,它作為一種最簡單的工廠模式在軟件開發中得到了較為廣泛的應用。

使用場景:

  1. 工廠類負責創建的對象比較少,由於創建的對象較少,不會造成工廠方法中的業務邏輯太過複雜。
  2. 客戶端只知道傳入工廠類的參數,對於如何創建對象並不關心。

工廠方法模式

簡單工廠模式雖然簡單,但存在一個很嚴重的問題。當系統中需要引入新產品時,由於靜態工廠方法通過所傳入參數的不同來創建不同的產品,這必定要修改工廠類的源代碼,將違背“開閉原則”,如何實現增加新產品而不影響已有代碼?工廠方法模式應運而生。

模式定義

工廠方法模式中,我們不再提供一個統一的工廠類來創建所有的產品對象,而是針對不同的產品提供不同的工廠,系統提供一個與產品等級結構對應的工廠等級結構。工廠方法模式定義如下:

工廠方法模式(Factory Method Pattern):定義一個用於創建對象的接口,讓子類決定將哪一個類實例化。工廠方法模式讓一個類的實例化延遲到其子類。工廠方法模式又簡稱為工廠模式(Factory Pattern),又可稱作虛擬構造器模式(Virtual Constructor Pattern)或多態工廠模式(Polymorphic Factory Pattern)。工廠方法模式是一種類創建型模式。

模式結構圖

工廠方法模式提供一個抽象工廠接口來聲明抽象工廠方法,而由其子類來具體實現工廠方法,創建具體的產品對象。工廠方法模式結構如圖所示:

在工廠方法模式結構圖中包含如下幾個角色:

  • Product(抽象產品):它是定義產品的接口,是工廠方法模式所創建對象的超類型,也就是產品對象的公共父類。
  • ConcreteProduct(具體產品):它實現了抽象產品接口,某種類型的具體產品由專門的具體工廠創建,具體工廠和具體產品之間一一對應。
  • Factory(抽象工廠):在抽象工廠類中,聲明了工廠方法(Factory Method),用於返回一個產品。抽象工廠是工廠方法模式的核心,所有創建對象的工廠類都必須實現該接口。
  • ConcreteFactory(具體工廠):它是抽象工廠類的子類,實現了抽象工廠中定義的工廠方法,並可由客戶端調用,返回一個具體產品類的實例。

模式偽代碼

與簡單工廠模式相比,工廠方法模式最重要的區別是引入了抽象工廠角色,抽象工廠可以是接口,也可以是抽象類或者具體類,其典型代碼如下所示:

public interface Factory {
    Product factoryMethod();
}

在抽象工廠中聲明了工廠方法但並未實現工廠方法,具體產品對象的創建由其子類負責,客戶端針對抽象工廠編程,可在運行時再指定具體工廠類,具體工廠類實現了工廠方法,不同的具體工廠可以創建不同的具體產品,其典型代碼如下所示:

public class ConcreteFactory implements Factory {
    @Override
    public Product factoryMethod() {
        return new ConcreteProduct();
    }
}

在客戶端代碼中,只需關心工廠類即可,不同的具體工廠可以創建不同的產品,典型的客戶端類代碼片段如下所示:

public class Client {
    public static void main(String[] args) {
        // 確定是哪個工廠可得到產品
        Factory factory = new ConcreteFactory();
        // 獲取產品
        Product product = factory.factoryMethod();
    }
}

模式簡化

有時候,為了進一步簡化客戶端的使用,還可以對客戶端隱藏工廠方法,此時,在工廠類中將直接調用產品類的業務方法,客戶端無須調用工廠方法創建產品,直接通過工廠即可使用所創建的對象中的業務方法。

// 改為抽象類
public class AbstractFactory {
    // 在工廠類中直接調用產品類的業務方法
    public void productMethod() {
        Product product = this.createProduct();
        product.method();
    }

    public abstract Product createProduct();
}

通過將業務方法的調用移入工廠類,可以直接使用工廠對象來調用產品對象的業務方法,客戶端無須直接調用工廠方法,在客戶端並不關心Product細節的情況下使用這種設計方案會更加方便。

模式小結

工廠方法模式能夠讓工廠可以自主確定創建何種產品對象,而如何創建這個對象的細節則完全封裝在具體工廠內部,用戶只需要關心所需產品對應的工廠,無須關心創建細節,甚至無須知道具體產品類的類名。基於工廠角色產品角色的多態性設計是工廠方法模式的關鍵。

抽象工廠模式

工廠方法模式通過引入工廠等級結構,解決了簡單工廠模式中工廠類職責太重的問題,但由於工廠方法模式中的每個工廠只生產一類產品,可能會導致系統中存在大量的工廠類,勢必會增加系統的開銷。此時,我們可以考慮將一些相關的產品組成一個產品族,由同一個工廠來統一生產,這就是我們本文將要學習的抽象工廠模式的基本思想。

這裏我斗膽舉個例子來說明一下吧,如果不恰當歡迎指出。

眾所周知,國內知名的電器廠有海爾、海信(姑且就認為是2個),電器廠會生產電視機、電冰箱、空調(姑且就認為是3種產品)。

  • 使用工廠方法模式:工廠方法模式中每個工廠只生產一類產品,那麼就必須要有海爾電視機廠海爾電冰箱廠海爾空調廠海信電視機廠海信電冰箱廠海信空調廠
  • 使用抽象工廠模式:抽象工廠中每個工廠生產由多種產品組成的”產品族”,那麼就只需要有海爾工廠海信工廠就夠了,每個工廠可生產自家的電視機、電冰箱、空調。

由此看出使用抽象工廠模式極大地減少了系統中類的個數。

模式定義

抽象工廠模式為創建一組對象提供了一種解決方案。與工廠方法模式相比,抽象工廠模式中的具體工廠不只是創建一種產品,它負責創建一族產品。抽象工廠模式定義如下:

抽象工廠模式(Abstract Factory Pattern):提供一個創建一系列相關或相互依賴對象的接口,而無須指定它們具體的類。抽象工廠模式又稱為Kit模式,它是一種對象創建型模式。

模式結構圖

在抽象工廠模式中,每一個具體工廠都提供了多個工廠方法用於產生多種不同類型的產品,這些產品構成了一個產品族,抽象工廠模式結構如圖所示:

在抽象工廠模式結構圖中包含如下幾個角色:

  • AbstractFactory(抽象工廠):它聲明了一組用於創建一族產品的方法,每一個方法對應一種產品。
  • ConcreteFactory(具體工廠):它實現了在抽象工廠中聲明的創建產品的方法,生成一組具體產品,這些產品構成了一個產品族,每一個產品都位於某個產品等級結構中。
  • AbstractProduct(抽象產品):它為每種產品聲明接口,在抽象產品中聲明了產品所具有的業務方法。
  • ConcreteProduct(具體產品):它定義具體工廠生產的具體產品對象,實現抽象產品接口中聲明的業務方法。

模式偽代碼

在抽象工廠中聲明了多個工廠方法,用於創建不同類型的產品,抽象工廠可以是接口,也可以是抽象類或者具體類,其典型代碼如下所示:

public abstract class AbstractFactory {

    public abstract AbstractProductA createProductA();

    public abstract AbstractProductB createProductB();

    public abstract AbstractProductC createProductC();
}

具體工廠實現了抽象工廠,每一個具體的工廠方法可以返回一個特定的產品對象,而同一個具體工廠所創建的產品對象構成了一個產品族。對於每一個具體工廠類,其典型代碼如下所示:

public class ConcreteFactory1 extends AbstractFactory {
    @Override
    public AbstractProductA createProductA() {
        return new ConcreteProductA1();
    }

    @Override
    public AbstractProductB createProductB() {
        return new ConcreteProductB1();
    }

    @Override
    public AbstractProductC createProductC() {
        return new ConcreteProductC1();
    }
}

模式小結

如果一開始就學習抽象工廠模式估計很難理解為什麼這樣設計,按次序學習分析簡單工廠模式工廠方法模式抽象工廠模式基本就順理成章了。實際開發中,可能並不是照搬照套工廠模式三兄弟的偽代碼,大多會簡化其中的部分實現。本來學習設計模式就是重思想,學習如何用抽象類、接口、拆分、組合等將軟件解耦合,並增強系統可擴展性,這才是最關鍵的。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

vue-toy: 200行代碼模擬Vue實現

vue-toy

200行左右代碼模擬vue實現,視圖渲染部分使用React來代替Snabbdom,歡迎Star。
項目地址:https://github.com/bplok20010/vue-toy

codesandbox示例

已實現的參數:

interface Options {
    el: HTMLElement | string;
	propsData?: Record<string, any>;
	props?: string[];
	name?: string;
	data?: () => Record<string, any>;
	methods?: Record<string, (e: Event) => void>;
	computed?: Record<string, () => any>;
	watch?: Record<string, (newValue: any, oldValue: any) => any>;
	render: (h: typeof React.createElement) => React.ReactNode;
	renderError?: (h: typeof React.createElement, error: Error) => React.ReactNode;
	mounted?: () => void;
	updated?: () => void;
	destroyed?: () => void;
	errorCaptured?: (e: Error, vm: React.ReactInstance) => void;
}

示例:

import Vue from "vue-toy";

const Hello = Vue.component({
	render(h){
		return h('span', null, 'vue-toy') ;
	}
})

new Vue({
  el: document.getElementById("root"),
  data() {
    return {
      msg: "hello vue toy"
    };
  },
  render(h) {
    return h("h1", null, this.msg, h(Hello));
  }
});

基本原理

官方原理圖:

實現基本步驟:

  1. 使用Observable創建觀察對象
  2. 定義好視圖既render函數
  3. 收集視圖依賴,並監聽依賴屬性
  4. 渲染視圖
  5. 重複3-4
// 創建觀察對象
// 觀察對象主要使用的是Object.defineProperty或Proxy來實現,
const data = observable({
    name: 'vue-toy',
});

// 渲染模版
const render = function(){
    return <h1>{data.name}</h1>
}

// 計算render的依賴屬性,
// 依賴屬性改變時,會重新計算computedFn,並執行監控函數watchFn,
// 屬性依賴計算使用棧及可以了。
// watch(computedFn, watchFn);
watch(render, function(newVNode, oldVNode){
    update(newVNode, mountNode);
});

//初始渲染
mount(render(), mountNode);

// 改變觀察對象屬性,如果render依賴了該屬性,則會重新渲染
data.name = 'hello vue toy';

視圖渲染部分(既render)使用的是vdom技術,vue使用Snabbdom庫,vue-toy使用的是react來進行渲染,所以在render函數里你可以直接使用React的JSX語法,不過別忘記import React from 'react',當然也可以使用preact inferno 等 vdom庫。

由於vue的template的最終也是解析並生成render函數,模版的解析可用htmleParser庫來生成AST,剩下就是解析指令並生產代碼,由於工作量大,這裏就不具體實現,直接使用jsx。

響應式實現

一個響應式示例代碼:

const data = Observable({
	name: "none",
});

const watcher =new Watch(
	data,
	function computed() {
		return "hello " + this.name;
	},
	function listener(newValue, oldValue) {
		console.log("changed:", newValue, oldValue);
	}
);
// changed vue-toy none
data.name = "vue-toy";

Observable實現

源碼
觀察對象創建這裏使用Proxy實現,示例:

function Observable(data) {
	return new Proxy(data, {
		get(target, key) {
			return target[key];
		},
		set(target, key, value) {
			target[key] = value;
			return true;
		},
	});
}

這就完成了一個對象的觀察,但以上示例代碼雖然能觀察對象,但無法實現對象屬性改動后通知觀察者,這時還缺少Watch對象來計算觀察函數的屬性依賴及Notify來實現屬性變更時的通知。

Watch實現

源碼

定義如下:

Watch(data, computedFn, watchFn);
  • data 為 computedFn 的 上下文 既 this 非必須
  • computedFn 為觀察函數並返回觀察的數據,Watch會計算出裏面的依賴屬性。
  • watchFn 當computedFn 返回內容發生改變時,watchFn會被調用,同時接收到新、舊值

大概實現如下:

// Watch.js
// 當前正在收集依賴的Watch
const CurrentWatchDep = {
    current: null,
};
class Watch {
    constructor(data, exp, fn) {
        this.deps = []; 
        this.watchFn = fn;
        this.exp =  () => {
                    return exp.call(data);
                };
        // 保存上一個依賴收集對象
        const lastWatchDep = CurrentWatchDep.current;
        // 設置當前依賴收集對象
        CurrentWatchDep.current = this;
        // 開始收集依賴,並獲取觀察函數返回的值
        this.last = this.exp();
        // 還原
        CurrentWatchDep.current = lastWatchDep;
    }
    clearDeps() {
        this.deps.forEach((cb) => cb());
        this.deps = [];
    }
    // 監聽依賴屬性的改動,並保存取消回調
    addDep(notify) {
        // 當依賴屬性改變時,重新觸發依賴計算
        this.deps.push(notify.sub(() => {
            this.check();
        }));
    }
    // 重新執行依賴計算
    check() {
        // 清空所有依賴,重新計算
        this.clearDeps();
        // 作用同構造函數
        const lastWatchDep = CurrentWatchDep.current;
        CurrentWatchDep.current = this;
        const newValue = this.exp();
        CurrentWatchDep.current = lastWatchDep;
        const oldValue = this.last;
        // 對比新舊值是否改變
        if (!shallowequal(oldValue, newValue)) {
            this.last = newValue;
            // 調用監聽函數
            this.watchFn(newValue, oldValue);
        }
    }
}

Notify實現

觀察對象發生改變后需要通知監聽者,所以還需要實現通知者Notify:

class Notify {
    constructor() {
        this.listeners = [];
    }
    sub(fn) {
        this.listeners.push(fn);
        return () => {
            const idx = this.listeners.indexOf(fn);
            if (idx === -1)
                return;
            this.listeners.splice(idx, 1);
        };
    }
    pub() {
        this.listeners.forEach((fn) => fn());
    }
}

調整Observable

前面的Observable太簡單了,無法完成屬性計算的需求,結合上面Watch Notify的來調整下Observable。

function Observable(data) {
	const protoListeners = Object.create(null);
	// 給觀察數據的所有屬性創建一個Notify
	each(data, (_, key) => {
		protoListeners[key] = new Notify();
	});
	return new Proxy(data, {
		get(target, key) {
			// 屬性依賴計算
			if (CurrentWatchDep.current) {
				const watcher = CurrentWatchDep.current;
				watcher.addDep(protoListener[key]);
			}
			return target[key];
		},
		set(target, key, value) {
			target[key] = value;
			if (protoListeners[key]) {
				// 通知所有監聽者
				protoListeners[key].pub();
			}
			return true;
		},
	});
}

好了,觀察者的創建和訂閱都完成了,開始模擬Vue。

模擬Vue

vue-toy 使用React來實現視圖的渲染,所以render函數里如果使用JSX則需要引入React

準備

既然已經實現了Observable和Watch,那我們就來實現基本原理的示例:

codesandbox示例

import Observable from "vue-toy/cjs/Observable";
import Watch from "vue-toy/cjs/Watch";

function mount(vnode) {
  console.log(vnode);
}

function update(vnode) {
  console.log(vnode);
}

const data = Observable({
  msg: "hello vue toy!",
  counter: 1
});

function render() {
  return `render: ${this.counter} | ${this.msg}`;
}

new Watch(data, render, update);

mount(render.call(data));

setInterval(() => data.counter++, 1000);
// 在控制台可看到每秒的輸出信息

這時將mount update的實現換成vdom就可以完成一個基本的渲染。

但這還不夠,我們需要抽象並封裝成組件來用。

Component

源碼

這裏的Component像是React的高階函數HOC,使用示例:

const Hello = Component({
	props: ["msg"],
	data() {
		return {
			counter: 1,
		};
	},
	render(h) {
		return h("h1", null, this.msg, this.counter);
	},
});

大概實現如下,options 參考文章開頭

function Component(options) {
	return class extends React.Component {
	    // 省略若干...
		constructor(props) {
			super(props);
			// 省略若干...
			// 創建觀察對象
			this.$data = Observable({ ...propsData, ...methods, ...data }, computed);
			// 省略若干...
			// 計算render依賴並監聽
			this.$watcher = new Watch(
				this.$data,
				() => {
					return options.render.call(this, React.createElement);
				},
				debounce((children) => { 
					this.$children = children;
					this.forceUpdate();
				})
			);
			this.$children = options.render.call(this, React.createElement);
		}
		shouldComponentUpdate(nextProps) {
			if (
				!shallowequal(
					pick(this.props, options.props || []),
					pick(nextProps, options.props || [])
				)
			) {
				this.updateProps(nextProps);
				this.$children = options.render.call(this, React.createElement);
				return true;
			}
			return false;
		}
        // 生命周期關聯
		componentDidMount() {
			options.mounted?.call(this);
		}

		componentWillUnmount() {
			this.$watcher.clearDeps();
			options.destroyed?.call(this);
		}

		componentDidUpdate() {
			options.updated?.call(this);
		}

		render() {
			return this.$children;
		}
	};
}

創建主函數 Vue

最後創建入口函數Vue,實現代碼如下:

export default function Vue(options) {
	const RootComponent = Component(options);
	let el;
	if (typeof el === "string") {
		el = document.querySelector(el);
	}

	const props = {
		...options.propsData,
		$el: el,
	};

	return ReactDOM.render(React.createElement(RootComponent, props), el);
}
Vue.component = Component;

好了,Vue的基本實現完成了。

感謝閱讀。

最後,歡迎Star:https://github.com/bplok20010/vue-toy

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※產品缺大量曝光嗎?你需要的是一流包裝設計!

震驚!ConcurrentHashMap裏面也有死循環,作者留下的“彩蛋”了解一下?

JDK BUG

這篇文章,聊一下我最近才知道的一個關於 JDK 8 的 BUG 吧。

首先說一下我是怎麼發現這個 BUG 的呢?

大家都知道我對 Dubbo 有一定的關注,前段時間 Dubbo 2.7.7 發布后我看了它的更新點,就是下面這個網址: https://github.com/apache/dubbo/releases/tag/dubbo-2.7.7

其中有 Bugfixes 這一部分:

每一個我都去簡單的看了一下,其他的 Bugfixes 或多或少都和 Dubbo 框架有一定的關聯性。但是上面紅框框起來的部分完全就是 JDK 的 Bug 了。

所以可以單獨拎出來說。

這個 Bug 我也是看到了這個地方才知道的,但是研究的過程中我發現,這個怎麼說呢:我懷疑這根本就不是 Bug ,這就是 Doug Lea 老爺子在釣魚執法。

為什麼這樣的說呢,大家看完本文就知道了。

Bug 穩定復現

點擊 Dubbo 裏面的鏈接,我們可以看到具體的描述就是一個鏈接:

打開這個鏈接:

https://bugs.openjdk.java.net/browse/JDK-8062841

我們可以看到:這個 Bug 是位於大名鼎鼎的 concurrent 包裏面的 computeIfAbsent 方法。

這個 Bug 在 JDK 9 裏面被修復了,修復人是 Doug Lea。

而我們知道 ConcurrentHashMap 就是 Doug Lea 的大作,可以說是“誰污染誰治理”。

要了解這個 Bug 是怎麼回事,就必須先了解下面這個方法是幹啥的:

java.util.concurrent.ConcurrentHashMap#computeIfAbsent

從這個方法的第二個入參 mappingFunction 我們可以知道這是 JDK 8 之後提供的方法了。

該方法的含義是:當前 Map 中 key 對應的值不存在時,會調用 mappingFunction 函數,並且將該函數的執行結果(不為 null)作為該 key 的 value 返回。

比如下面這樣的:

初始化一個 ConcurrentHashMap ,然後第一次去獲取 key 為 why 的 value,沒有獲取到,直接返回 null。

接着調用 computeIfAbsent 方法,獲取到 null 后調用 getValue 方法,將該方法的返回值和當前的 key 關聯起來。

所以,第二次獲取的時候拿到了 “why技術”。

其實上面的代碼的 17 行的返回值就是 “why技術”,只是我為了代碼演示,再去調用了一次 map.get() 方法。

知道這個方法干什麼的,接下來就帶大家看看 Bug 是什麼。

我們直接用這個問題裏面給的測試用例,地址:

https://bugs.openjdk.java.net/secure/attachment/23985/Main.java

我只是在第 11 行和第 21 行加入了輸出語句:

正常的情況下,我們希望方法正常結束,然後 map 裏面是這樣的:{AaAa=42,BBBB=42}

但是你把這個代碼拿到本地去跑(需要 JDK 8 環境),你會發現,這個方法永遠不會結束。因為它在進行死循環。

這就是 Bug。

提問的藝術

知道 Bug 了,按理來說就應該開始分析源碼,了解為啥出現了會出現這個 Bug。

但是我想先插播一小節提問的藝術。因為這個 Bug 就是一個活生生的示例呀。

這個鏈接,我建議你打開看看,這裏面還有 Doug Lea 老爺子的親自解答:

https://bugs.openjdk.java.net/browse/JDK-8062841

首先我們看提出問題的這個人對於問題的描述(可以先不用細看,反正看着也是懵逼的):

通常情況下,被提問的人分為兩類人:

1.遇到過並知道這個問題的人,可以看的明白你在說什麼。

2.雖然沒有碰見過這個問題,但感覺是自己熟悉的領域,可能知道答案,但是看了你的問題描述,也不知道你在說什麼。

這個描述很長,我第一次看的時候很懵逼,很難理解他在說什麼。我就是屬於第二類人。

而且在大多數的問題中,第二類人比第一類人多很多。

但是當我了解到這個 Bug 的來龍去脈的時候,再看這個描述,其實寫的很清楚了,也很好理解。我就變成第一類人了。

但是變成第一類人是有前提的,前提就是我已經了解到了這個地方 Bug 了。可惜,現在是提問,而被提問的人,還對這個 Bug 不是特別了解。

即使,這個被提問的人是 Doug Lea。

可以看到,2014 年 11 月 04 日 Martin 提出這個問題后, Doug Lea 在不到一個小時內就進行了回復,我給大家翻譯一下,老爺子回復的啥:

首先,你說你發現了 ConcurrentHashMap 的問題,但是我沒有看到的測試用例。那麼我就猜測一下是不是有其他線程在計算值的時候被卡住了,但是從你的描述中我也看不到相應的點。

簡單來說就是:Talk is cheap. Show me the code.(屁話少說,放碼過來。)

於是另一個哥們 Pardeep 在一個月後提交了一個測試案例,就是我們前面看到的測試案例:

Pardeep 給 Martin 回復到下面這段話:

他開門見山的說:我注意這個 bug 很長時間了,然後我還有一個測試用例。

可以說這個測試案例的出現,才是真正的轉折點。

然後他提出了自己的看法,這段描述簡短有力的說出了問題的所在(後面我們會講到),然後他還提出了自己的意見。

不到一個小時,這個回到得到了 Doug Lea 的回復:

他說:小伙子的建議還是不錯的,但是現在還不是我們解決這個問題的時候。我們也許會通過代碼改進死鎖檢查機制,以幫助用戶 debug 他們的程序。但是目前而言,這種機制就算做出來,工作效率也是非常低下的,比如在當前的這個案例下。但是現在我們至少清楚的知道,是否要實現這種機制是不能確定的。

總之一句話:問題我知道了,但是目前我還沒想到好的解決方法。

但是,在 19 天以後,老爺子又回來處理這個問題了:

這次的回答可謂是峰迴路轉,他說:請忽略我之前的話。我們發現了一些可行的改進方法,這些改進可以處理更多的用戶錯誤,包括本報告中所提供的測試用例,即解決在 computeIfAbsent 中提供的函數中進行遞歸映射更新導致死鎖這樣的問題。我們會在 JDK 9 裏面解決這個問題。

所以,回顧這個 Bug 被提出的過程。

首先是 Martin 提出了這個問題,並進行了詳細的描述。可惜的是他的描述很專業,是站在你已經了解了這個 Bug 的立場上去描述的,讓人看的很懵逼。

所以 Doug Lea 看到后也表示這啥呀,沒搞懂。

然後是 Pardeep 跟進這個問題,轉折點在於他拋出的這個測試案例。而我相信,既然 Martin 能把這個問題描述的很清楚,他一定是有一個自己的測試案例的,但是他沒有展現出來。

所以,朋友們,測試案例的重要性不言而喻了。問問題的時候不要只是拋出異常,你至少給段對應的代碼,或者日誌,或者一次性描述清楚,寫在文檔裏面發出來也行呀。

Bug 的原因

導致這個 Bug 的原因也是一句話就能說清楚,前面的 Pardeep 老哥也說了:

問題在於我們在進行 computeIfAbsent 的時候,裏面還有一個 computeIfAbsent。而這兩個 computeIfAbsent 它們的 key 對應的 hashCode 是一樣的。

你說巧不巧。

當它們的 hashCode 是一樣的時候,說明它們要往同一個槽放東西。

而當第二個元素進來的時候,發現坑位已經被前一個元素佔領了,可能就是這樣的畫風:

接下來我們就解析一下 computeIfAbsent 方法的工作流程:

第一步是計算 key 對應的 hashCode 應該放到哪個槽裏面。

然後是進入1649 行的這個 for 循環,而這個 for 循環是一個死循環,它在循環體內部判斷各種情況,如果滿足條件則 break 循環。

首先,我們看一下 “AaAa” 和 “BBBB” 經過 spread 計算(右移 16 位高效計算)后的 h 值是什麼:

哇塞,好巧啊,從框起來的這兩部分可以看到,都是 2031775 呢。

說明他們要在同一個槽裏面搞事情。

先是 “AaAa” 進入 computeIfAbsent 方法:

在第一次循環的時候 initTable,沒啥說的。

第二次循環先是在 1653 行計算出數組的下標,並取出該下標的 node。發現這個 node 是空的。於是進入分支判斷:

在標號為 ① 的地方進行 cas 操作,先用 r(即 ReservationNode)進行一個佔位的操作。

在標號為 ② 的地方進行 mappingFunction.apply 的操作,計算 value 值。如果計算出來不為 null,則把 value 組裝成最終的 node。

在標號為 ③ 的東西把之前佔位的 ReservationNode 替換成標號為 ② 的地方組裝成的node 。

問題就出現標號為 ② 的地方。可以看到這裏去進行了 mappingFunction.apply 的操作,而這個操作在我們的案例下,會觸發另一次 computeIfAbsent 操作。

現在 “AaAa” 就等着這個 computeIfAbsent 操作的返回值,然後進行下一步操作,也就是進行標號為 ③ 的操作了。

接着 “BBBB” 就來了。

通過前面我們知道了 “BBBB” 的 hashCode 經過計算后也是和 “AaAa” 一樣。所以它也要想要去那個槽裏面搞事情。

可惜它來晚了一步。

帶大家看一下對應的代碼:

當 key 為 “BBBB” 的時候,算出來的 h 值也是 2031775。

它也會進入 1649 行的這個死循環。然後進行各種判斷。

接下來我要論證的是:

在本文的示例代碼中,當運行到 key 為 “BBBB” 的時候,進入 1649 行這個死循環后,就退不出來了。程序一直在裏面循環運行。

在標號為 ① 的地方,由於這個時候 tab 已經不為 null 了,所以不會進入這個分支。

在標號為 ② 的地方,由於之前 “AaAa” 已經扔了一個 ReservationNode 進去佔位置了,所以不等於 null。所以,也就不會進入這個分支。

怕你懵逼,給你配個圖,真是暖男作者石錘了:

接下來到標號為 ③ 的地方,裏面有一個 MOVED,這個 MOVED 是幹啥的呢?

表示當前的 ConcurrentHashMap 是否是在進行擴容。

很明顯,現在還沒有到該擴容的時候:

第 1678 行的 f 就是之前 “AaAa” 扔進去的 ReservationNode ,這個 Node 的 hash 是 -3,不等於MOVED(-1)。

所以,不會進入這個分支判斷。

接下來,能進的只有標號為 ④ 的地方了,所以我們只需要把這個地方攻破,就徹底了解這個 Bug 了。

走起:

通過前面的分析我們知道了,當前案例情況下,只會進入 1672 行這個分支。

而這個分支裏面,還有四個判斷。我們一個個的攻破:

標號為 ⑤ 的地方,tabAt 方法取出來的對象,就是之前 “AaAa” 放進去的佔位的 ReservationNode ,也就是這個 f 。所以可以進入這個分支判斷。

標號為 ⑥ 的地方,fh >=0 。而 fh 是當前 node 的 hash 值,大於 0 說明當前是按照鏈表存儲的數據。之前我們分析過了,當前的 hash 值是 -3。所以,不會進入這個分支。

標號為 ⑦ 的地方,判斷 f 節點是否是紅黑樹存儲。當然不是的。所以,不會進入這個分支。

標號為 ⑧ 的地方,binCount 代表的是該下標裏面,有幾個 node 節點。很明顯,現在一個都沒有。所以當前的 binCount 還是 0 。所以,不會進入這個分支。

完了。分析完了。

Bug 也就出來了,一次 for 循環結束后,沒有 break。苦就苦在這個 for 循環還是個死循環。

再來一個上帝視角,看看當 key 為 “BBBB” 的時候發生了什麼事情:

進入無限循環內:

①.經過 “AaAa” 之後,tab 就不為 null 了。

②.當前的槽中已經被 “AaAa” 先放了一個 ReservationNode 進行佔位了,所以不為 null。

③.當前的 map 並沒有進行擴容操作。

④.包含⑤、⑥、⑦、⑧。

⑤.tabAt 方法取出來的對象,就是之前 “AaAa” 放進去的佔位的 ReservationNode,所以滿足條件進入分支。

⑥.判斷當前是否是鏈表存儲,不滿足條件,跳過。

⑦.判斷當前是否是紅黑樹存儲,不滿足條件,跳過。

⑧.判斷當前下標裏面是否放了 node,不滿足條件(“AaAa” 只有個佔位的Node ,並沒有初始完成,所以還沒有放到該下標裏面),進入下一次循環。

然後它就在死循環裏面出不來了!

我相信現在大家對於這個 Bug 的來路了解清楚了。

如果你是在 idea 裏面跑這個測試用例,也可以這樣直觀的看一眼:

點擊這個照相機圖標:

從線程快照裏面其實也是可以看到端倪的,大家可以去分析分析。

有的觀點說的是由於線程安全的導致的死循環,經過分析我覺得這個觀點是不對的。

它存在死循環,不是由於線程安全導致的,純粹是自己進入了死循環。

或者說,這是一個“彩蛋”?

或者……自信點,就說這事 Bug ,能穩定復現的那種。

那麼我們如果是使用 JDK 8 怎麼避免踩到這個“彩蛋”呢?

看看 Dubbo 裏面是怎麼解決的:

先調用了 get 方法,如果返回為 null,則調用 putIfAbsent 方法,這樣就能實現和之前一樣的效果了。

如果你在項目中也有使用 computeIfAbsent 的地方,建議也這樣去修改。

說到 ConcurrentHashMap get 方法返回 null,我就想起了之前討論的一個面試題了:

答案都寫在這個文章裏面了,有興趣的可以了解一下《這道面試題我真不知道面試官想要的回答是什麼》

Bug 的解決 其實徹底理解了這個 Bug 之後,我們再來看一下 JDK 9 裏面的解決方案,看一下官方源碼對比:

http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/ConcurrentHashMap.java?r1=1.258&r2=1.259&sortby=date&diff_format=f

就加了兩行代碼,判斷完是否是紅黑樹節點后,再判斷一下是否是 ReservationNode 節點,因為這個節點就是個佔位節點。如果是,則拋出異常。

就這麼簡單。沒有什麼神秘的。

所以,如果你在 JDK 9 裏面執行文本的測試用例,就會拋出 IllegalStateException。

這就是 Doug Lea 之前提到的解決方案:

了解了這個 Bug 的來龍去脈后,特別是看到解決方案后,我們就能輕描淡寫的說一句:

害,就這?沒聽說過!

另外,我看 JDK 9 修復的時候還不止修復了一個問題:

http://hg.openjdk.java.net/jdk9/jdk9/jdk/file/6dd59c01f011/src/java.base/share/classes/java/util/concurrent/ConcurrentHashMap.java

你去翻一翻。發現,啊,全是知識點啊,學不動了。

釣魚執法

為什麼我在文章的一開始就說了這是 Doug Lea 在釣魚執法呢?

因為在最開始提問的藝術那一部分,我相信,Doug Lea 跑完那個測試案例之後,心裏也有點數了。

大概知道問題在哪了,而且從他的回答和他寫的文檔中我也有理由相信,他寫的這個方法的時候就知道可能會出問題。

而且,Pardeep 的回復中提到了文檔,那我們就去看看官方文檔對於該方法的描述是怎樣的:

https://docs.oracle.com/javase/8/docs/api/

文檔中說函數方法應該簡短,簡單。而且不能在更新的映射的時候更新映射。就是說不能套娃。

套娃,用程序說就是recursive(遞歸),按照文檔說如果存在遞歸,則會拋出 IllegalStateException 。

而提到遞歸,你想到了什麼?

我首先就想到了斐波拉契函數。我們用 computeIfAbsent 實現一個斐波拉契函數如下:

public class Test {

static Map<Integer, Integer> cache = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        System.out.println("f(" + 14 + ") =" + fibonacci(14));
    }

    static int fibonacci(int i) {
        if (i == 0)
            return i;
        if (i == 1)
            return 1;
        return cache.computeIfAbsent(i, (key) -> {
            System.out.println("Slow calculation of " + key);
            return fibonacci(i - 2) + fibonacci(i - 1);
        });
    }
}

這就是遞歸調用,我用 JDK 1.8 跑的時候並沒有拋出 IllegalStateException,只是程序假死了,原因和我們前面分析的是一樣一樣的。我理解這個地方是和文檔不符的。

所以,我懷疑是 Doug Lea 在這個地方釣魚執法。

CHM一定線程安全嗎?

既然都說到 currentHashMap(CHM)了,那我說一個相關的注意點吧。

首先 CHM 一定能保證線程安全嗎?

是的,CHM 本身一定是線程安全的。但是,如果你使用不當還是有可能會出現線程不安全的情況。

給大家看一點 Spring 中的源碼吧:

org.springframework.core.SimpleAliasRegistry

在這個類中,aliasMap 是 ConcurrentHashMap 類型的:

在 registerAlias 和 getAliases 方法中,都有對 aliasMap 進行操作的代碼,但是在操作之前都是用 synchronized 把 aliasMap 鎖住了。

為什麼?為什麼我們操作 ConcurrentHashMap 的時候還要加鎖呢?

這個是根據場景而定的,這個別名管理器,在這裏加鎖應該是為了避免多個線程操作 ConcurrentHashMap 。

雖然 ConcurrentHashMap 是線程安全的,但是假設如果一個線程 put,一個線程 get,在這個代碼的場景裏面是不允許的。

如果覺得不太好理解的話我舉一個 redis 的例子。

redis 的 get、set 方法都是線程安全的吧。但是你如果先 get 再 set,那麼在多線程的情況下還是會有問題的。

因為這兩個操作不是原子性的。所以 incr 就應運而生了。

我舉這個例子的是想說線程安全與否不是絕對的,要看場景。給你一個線程安全的容器,你使用不當還是會有線程安全的問題。

再比如,HashMap 一定是線程不安全的嗎?

說不能說的這麼死吧。它是一個線程不安全的容器。但是如果我的使用場景是只讀呢?

在這個只讀的場景下,它就是線程安全的。

總之,看場景。道理,就是這麼一個道理。

最後說兩句(求關注)

所以點個“贊”吧,周更很累的,不要白嫖我,需要一點正反饋。

才疏學淺,難免會有紕漏,如果你發現了錯誤的地方,還請你留言指出來,我對其加以修改。

感謝您的閱讀,我堅持原創,十分歡迎並感謝您的關注。

我是 why,一個被代碼耽誤的文學創作者,不是大佬,但是喜歡分享,是一個又暖又有料的四川好男人。

歡迎關注我的微信公眾號:why技術。在這裏我會分享一些java技術相關的知識,用匠心敲代碼,對每一行代碼負責。偶爾也會荒腔走板的聊一聊生活,寫一寫書評、影評。感謝你的關注,願你我共同進步。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化

※產品缺大量曝光嗎?你需要的是一流包裝設計!

結合中斷上下文切換和進程上下文切換分析Linux內核的一般執行過程

結合中斷上下文切換和進程上下文切換分析Linux內核的一般執行過程

目錄

  • 結合中斷上下文切換和進程上下文切換分析Linux內核的一般執行過程
    • 一. 實驗準備
    • 二. 實驗過程
      • I 分析中斷上下文的切換
      • II 分析fork子進程啟動執行時進程上下文及其特殊之處
      • III 分析execve系統調用中斷上下文及其特殊之處
      • IV 以系統調用作為特殊的中斷,結合中斷上下文切換和進程上下文切換分析Linux系統的一般執行過程
    • 三. 總結

一. 實驗準備

  1. 詳細要求

結合中斷上下文切換和進程上下文切換分析Linux內核一般執行過程

  • 以fork和execve系統調用為例分析中斷上下文的切換
  • 分析execve系統調用中斷上下文的特殊之處
  • 分析fork子進程啟動執行時進程上下文的特殊之處
  • 以系統調用作為特殊的中斷,結合中斷上下文切換和進程上下文切換分析Linux系統的一般執行過程

完成一篇博客總結分析Linux系統的一般執行過程,以期對Linux系統的整體運作形成一套邏輯自洽的模型,並能將所學的各種OS和Linux內核知識/原理融通進模型中

  1. 實驗環境

發行版本:Ubuntu 18.04.4 LTS

處理器:Intel® Core™ i7-8850H CPU @ 2.60GHz × 3

圖形卡:Parallels using AMD® Radeon pro 560x opengl engine

GNOME:3.28.2

二. 實驗過程

I 分析中斷上下文的切換

中斷髮生以後,CPU跳到內核設置好的中斷處理代碼中去,由這部分內核代碼來處理中斷。這個處理過程中的上下文就是中斷上下文

幾乎所有的體繫結構,都提供了中斷機制。當硬件設備想和系統通信的時候,它首先發出一個異步的中斷信號去打斷處理器的執行,繼而打斷內核的執行。中斷通常對應着一个中斷號,內核通過這个中斷號找到中斷服務程序,調用這個程序響應和處理中斷。當你敲擊鍵盤時,鍵盤控制器發送一个中斷信號告知系統,鍵盤緩衝區有數據到來,內核收到這个中斷號,調用相應的中斷服務程序,該服務程序處理鍵盤數據然後通知鍵盤控制器可以繼續輸入數據了。為了保證同步,內核可以使用中止—既可以停止所有的中斷也可以有選擇地停止某个中斷號對應的中斷,許多操作系統的中斷服務程序都不在進程上下文中執行,它們在一個與所有進程無關的、專門的中斷上下文中執行。之所以存在這樣一個專門的執行環境,為了保證中斷服務程序能夠在第一時間響應和處理中斷請求,然後快速退出。

對同一個CPU來說,中斷處理比進程擁有更高的優先級,所以中斷上下文切換並不會與進程上下文切換同時發生。由於中斷程序會打斷正常進程的調度和運行,大部分中斷處理程序都短小精悍,以便盡可能快的執行結束。

一個進程的上下文可以分為三個部分:用戶級上下文、寄存器上下文以及系統級上下文。

用戶級上下文: 正文、數據、用戶堆棧以及共享存儲區;
寄存器上下文: 通用寄存器、程序寄存器(IP)、處理器狀態寄存器(EFLAGS)、棧指針(ESP);
系統級上下文: 進程控制塊task_struct、內存管理信息(mm_struct、vm_area_struct、pgd、pte)、內核棧。

當發生進程調度時,進行進程切換就是上下文切換(context switch)。操作系統必須對上面提到的全部信息進行切換,新調度的進程才能運行。而系統調用進行的是模式切換(mode switch)。模式切換與進程切換比較起來,容易很多,而且節省時間,因為模式切換最主要的任務只是切換進程寄存器上下文的切換。

II 分析fork子進程啟動執行時進程上下文及其特殊之處

fork()系統調用會通過複製一個現有進程來創建一個全新的進程. 進程被存放在一個叫做任務隊列的雙向循環鏈表當中。鏈表當中的每一項都是類型為task_struct成為進程描述符的結構。

首先我們來看一段代碼

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
int main(){
  pid_t pid;
  char *message;
  int n;
  pid = fork();
  if(pid<0){
    perror("fork failed");
    exit(1);
  }
  if (pid == 0){
    message = "this is the child \n";
    n=6;
  }else {
    message = "this is the parent \n";
    n=3;
  }
  for(;n>0;n--){
    printf("%s",message);
    sleep(1);
  }
  return 0;
}

在Linux環境中編寫和執行

# 創建一個C文件,名為t.c,將上面的代碼拷貝進去
touch t.c
# 進行編譯
gcc t.c
# 執行
./a.out

之所以輸出是這樣的結果,是因為程序的執行流程如下圖所示:

以上的fork()例子的執行流程大致如下:

  1. 父進程初始化。
  2. 父進程調用fork,這是一個系統調用,因此進入內核。
  3. 內核根據父進程複製出一個子進程,父進程和子進程的PCB信息相同,用戶態代碼和數據也相同。因此,子進程現在的狀態看起來和父進程一樣,做完了初始化,剛調用了fork進入內核,還沒有從內核返回。
  4. 現在有兩個一模一樣的進程看起來都調用了fork進入內核等待從內核返回(實際上fork只調用了一次),此外系統中還有很多別的進程也等待從內核返回。是父進程先返回還是子進程先返回,還是這兩個進程都等待,先去調度執行別的進程,這都不一定,取決於內核的調度算法。
  5. 如果某個時刻父進程被調度執行了,從內核返回后就從fork函數返回,保存在變量pid中的返回值是子進程的id,是一個大於0的整數,因此執下面的else分支,然後執行for循環,打印"This is the parent\n"三次之後終止。
  6. 如果某個時刻子進程被調度執行了,從內核返回后就從fork函數返回,保存在變量pid中的返回值是0,因此執行下面的if (pid == 0)分支,然後執行for循環,打印"This is the child\n"六次之後終止。fork調用把父進程的數據複製一份給子進程,但此後二者互不影響,在這個例子中,fork調用之後父進程和子進程的變量messagen被賦予不同的值,互不影響。
  7. 父進程每打印一條消息就睡眠1秒,這時內核調度別的進程執行,在1秒這麼長的間隙里(對於計算機來說1秒很長了)子進程很有可能被調度到。同樣地,子進程每打印一條消息就睡眠1秒,在這1秒期間父進程也很有可能被調度到。所以程序運行的結果基本上是父子進程交替打印,但這也不是一定的,取決於系統中其它進程的運行情況和內核的調度算法,如果系統中其它進程非常繁忙則有可能觀察到不同的結果。另外,讀者也可以把sleep(1);去掉看程序的運行結果如何。
  8. 這個程序是在Shell下運行的,因此Shell進程是父進程的父進程。父進程運行時Shell進程處於等待狀態,當父進程終止時Shell進程認為命令執行結束了,於是打印Shell提示符,而事實上子進程這時還沒結束,所以子進程的消息打印到了Shell提示符後面。最後光標停在This is the child的下一行,這時用戶仍然可以敲命令,即使命令不是緊跟在提示符後面,Shell也能正確讀取。

fork()最特殊之處在於:成功調用后返回兩個值,是由於在複製時複製了父進程的堆棧段,所以兩個進程都停留在fork函數中,等待返回。所以fork函數會返回兩次,一次是在父進程中返回,另一次是在子進程中返回,這兩次的返回值不同

其中父進程返回子進程pid,這是由於一個進程可以有多個子進程,但是卻沒有一個函數可以讓一個進程來獲得這些子進程id,那談何給別人你創建出來的進程。而子進程返回0,這是由於子進程可以調用getppid獲得其父進程進程ID,但這個父進程ID卻不可能為0,因為進程ID0總是有內核交換進程所用,故返回0就可代表正常返回了。

從fork函數開始以後的代碼父子共享,既父進程要執行這段代碼,子進程也要執行這段代碼.(子進程獲得父進程數據空間,堆和棧的副本. 但是父子進程並不共享這些存儲空間部分. (即父,子進程共享代碼段.)。現在很多實現並不執行一個父進程數據段,堆和棧的完全複製. 而是採用寫時拷貝技術。這些區域有父子進程共享,而且內核地他們的訪問權限改為只讀的.如果父子進程中任一個試圖修改這些區域,則內核值為修改區域的那塊內存製作一個副本, 也就是如果你不修改我們一起用,你修改了之後對於修改的那部分內容我們分開各用個的。

再一個就是,在重定向父進程的標準輸出時,子進程標準輸出也被重定向。這就源於父子進程會共享所有的打開文件。 因為fork的特性就是將父進程所有打開文件描述符複製到子進程中。當父進程的標準輸出被重定向,子進程本是寫到標準輸出的時候,此時自然也改寫到那個對應的地方;與此同時,在父進程等待子進程執行時,子進程被改寫到文件show.out中,然後又更新了與父進程共享的該文件的偏移量;那麼在子進程終止后,父進程也寫到show.out中,同時其輸出還會追加在子進程所寫數據之後。

在fork之後處理文件描述符一般有以下兩種情況:

  • 父進程等待子進程完成。此種情況,父進程無需對其描述符作任何處理。當子進程終止后,它曾進行過讀,寫操作的任一共享描述符的文件偏移已發生改變。
  • 父子進程各自執行不同的程序段。這樣fork之後,父進程和子進程各自關閉它們不再使用的文件描述符,這樣就避免干擾對方使用的文件描述符了。這類似於網絡服務進程。

同時父子進程也是有區別的:它們不僅僅是兩個返回值不同;它們各自的父進程也不同,父進程的父進程是ID不變的;還有子進程不繼承父進程設置的文件鎖,子進程未處理的信號集會設置為空集等不同

事實上linux平台通過clone()系統調用實現fork()。fork(),vfork()和clone()庫函數都根據各自需要的參數標誌去調用clone(),然後由clone()去調用do_fork(). 再然後do_fork()完成了創建中的大部分工作,他定義在kernel/fork.c當中.該函數調用copy_process()。

具體的流程可以參考下圖:

III 分析execve系統調用中斷上下文及其特殊之處

execve() 系統調用的作用是運行另外一個指定的程序。它會把新程序加載到當前進程的內存空間內,當前的進程會被丟棄,它的堆、棧和所有的段數據都會被新進程相應的部分代替,然後會從新程序的初始化代碼和 main 函數開始運行。同時,進程的 ID 將保持不變。

execve() 系統調用通常與 fork() 系統調用配合使用。從一個進程中啟動另一個程序時,通常是先 fork() 一個子進程,然後在子進程中使用 execve() 變身為運行指定程序的進程。 例如,當用戶在 Shell 下輸入一條命令啟動指定程序時,Shell 就是先 fork() 了自身進程,然後在子進程中使用 execve() 來運行指定的程序。

Linux提供了execl、execlp、execle、execv、execvp和execve等六個用以執行一個可執行文件的函數(統稱為exec函數,其間的差異在於對命令行參數和環境變量參數的傳遞方式不同)。這些函數的第一個參數都是要被執行的程序的路徑,第二個參數則向程序傳遞了命令行參數,第三個參數則向程序傳遞環境變量。以上函數的本質都是調用在arch/i386/kernel/process.c文件中實現的系統調用sys_execve來執行一個可執行文件。

asmlinkage int sys_execve(struct pt_regs regs)
{
    int  error;
    char * filename;
    //將可執行文件的名稱裝入到一個新分配的頁面中
    filename = getname((char __user *) regs.ebx);
    error = PTR_ERR(filename);
    if (IS_ERR(filename))
       goto out;
    //執行可執行文件
    error = do_execve(filename,
          (char __user * __user *) regs.ecx,
          (char __user * __user *) regs.edx,
         &regs);
    if (error == 0) {
       task_lock(current);
       current->ptrace &= ~PT_DTRACE;
       task_unlock(current);
       
       set_thread_flag(TIF_IRET);
    }
    putname(filename);
out:
    return error;
}

該系統調用所需要的參數pt_regs在include/asm-i386/ptrace.h文件中定義。該參數描述了在執行該系統調用時,用戶態下的CPU寄存器在核心態的棧中的保存情況。通過這個參數,sys_execve可以獲得保存在用戶空間的以下信息:可執行文件路徑的指針(regs.ebx中)、命令行參數的指針(regs.ecx中)和環境變量的指針(regs.edx中)。

struct pt_regs {
    long ebx;
    long ecx;
    long edx;
    long esi;
    long edi;
    long ebp;
    long eax;
    int xds;
    int xes;
    long orig_eax;
    long eip;
    int xcs;
    long eflags;
    long esp;
    int xss;
};

regs.ebx保存着系統調用execve的第一個參數,即可執行文件的路徑名。因為路徑名存儲在用戶空間中,這裏要通過getname拷貝到內核空間中。getname在拷貝文件名時,先申請了一個page作為緩衝,然後再從用戶空間拷貝字符串。為什麼要申請一個頁面而不使用進程的系統空間堆棧?首先這是一個絕對路徑名,可能比較長,其次進程的系統空間堆棧大約為7K,比較緊缺,不宜濫用。用完文件名后,在函數的末尾調用putname釋放掉申請的那個頁面。

sys_execve的核心是調用do_execve函數,傳給do_execve的第一個參數是已經拷貝到內核空間的路徑名filename,第二個和第三個參數仍然是系統調用execve的第二個參數argv和第三個參數envp,它們代表的傳給可執行文件的參數和環境變量仍然保留在用戶空間中。簡單分析一下這個函數的思路:先通過open_err()函數找到並打開可執行文件,然後要從打開的文件中將可執行文件的信息裝入一個數據結構linux_binprm,do_execve先對參數和環境變量的技術,並通過prepare_binprm讀入開頭的128個字節到linux_binprm結構的bprm緩衝區,最後將執行的參數從用戶空間拷貝到數據結構bprm中。內核中有一個formats隊列,該隊列的每個成員認識並只處理一種格式的可執行文件,bprm緩衝區中的128個字節中有格式信息,便要通過這個隊列去辨認。do_execve()中的關鍵是最後執行一個search_binary_handler()函數,找到對應的執行文件格式,並返回一個值,這樣程序就可以執行了。

do_execve 定義在 <fs/exec.c> 中,關鍵代碼解析如下。

int do_execve(char * filename, char __user *__user *argv,
       char __user *__user *envp,    struct pt_regs * regs)
{
    struct linux_binprm *bprm; //保存要執行的文件相關的數據
    struct file *file;
    int retval;
    int i;
    retval = -ENOMEM;
    bprm = kzalloc(sizeof(*bprm), GFP_KERNEL);
    if (!bprm)
       goto out_ret;
    //打開要執行的文件,並檢查其有效性(這裏的檢查並不完備)
    file = open_exec(filename);
    retval = PTR_ERR(file);
    if (IS_ERR(file))
       goto out_kfree;
    //在多處理器系統中才執行,用以分配負載最低的CPU來執行新程序
    //該函數在include/linux/sched.h文件中被定義如下:
    // #ifdef CONFIG_SMP
    // extern void sched_exec(void);
    // #else
    // #define sched_exec() {}
    // #endif
    sched_exec();
    //填充linux_binprm結構
    bprm->p = PAGE_SIZE*MAX_ARG_PAGES-sizeof(void *);
    bprm->file = file;
    bprm->filename = filename;
    bprm->interp = filename;
    bprm->mm = mm_alloc();
    retval = -ENOMEM;
    if (!bprm->mm)
       goto out_file;
    //檢查當前進程是否在使用LDT,如果是則給新進程分配一個LDT
    retval = init_new_context(current, bprm->mm);
    if (retval  0)
       goto out_mm;
    //繼續填充linux_binprm結構
    bprm->argc = count(argv, bprm->p / sizeof(void *));
    if ((retval = bprm->argc)  0)
       goto out_mm;
    bprm->envc = count(envp, bprm->p / sizeof(void *));
    if ((retval = bprm->envc)  0)
       goto out_mm;
    retval = security_bprm_alloc(bprm);
    if (retval)
       goto out;
    //檢查文件是否可以被執行,填充linux_binprm結構中的e_uid和e_gid項
    //使用可執行文件的前128個字節來填充linux_binprm結構中的buf項
    retval = prepare_binprm(bprm);
    if (retval  0)
       goto out;
    //將文件名、環境變量和命令行參數拷貝到新分配的頁面中
    retval = copy_strings_kernel(1, &bprm->filename, bprm);
    if (retval  0)
       goto out;
    bprm->exec = bprm->p;
    retval = copy_strings(bprm->envc, envp, bprm);
    if (retval  0)
       goto out;
    retval = copy_strings(bprm->argc, argv, bprm);
    if (retval  0)
       goto out;
    //查詢能夠處理該可執行文件格式的處理函數,並調用相應的load_library方法進行處理
    retval = search_binary_handler(bprm,regs);
    if (retval >= 0) {
       free_arg_pages(bprm);
       //執行成功
       security_bprm_free(bprm);
       acct_update_integrals(current);
       kfree(bprm);
       return retval;
    }
out:
    //發生錯誤,返回inode,並釋放資源
    for (i = 0 ; i  MAX_ARG_PAGES ; i++) {
       struct page * page = bprm->page;
       if (page)
         __free_page(page);
    }
    if (bprm->security)
       security_bprm_free(bprm);
out_mm:
    if (bprm->mm)
       mmdrop(bprm->mm);
out_file:
    if (bprm->file) {
       allow_write_access(bprm->file);
       fput(bprm->file);
    }
out_kfree:
    kfree(bprm);
out_ret:
    return retval;
}

該函數用到了一個類型為linux_binprm的結構體來保存要執行的文件相關的信息,該結構體在include/linux/binfmts.h文件中定義:

struct linux_binprm{
    char buf[BINPRM_BUF_SIZE]; //保存可執行文件的頭128字節
    struct page *page[MAX_ARG_PAGES];
    struct mm_struct *mm;
    unsigned long p;    //當前內存頁最高地址
    int sh_bang;
    struct file * file;     //要執行的文件
    int e_uid, e_gid;    //要執行的進程的有效用戶ID和有效組ID
    kernel_cap_t cap_inheritable, cap_permitted, cap_effective;
    void *security;
    int argc, envc;     //命令行參數和環境變量數目
    char * filename;   //要執行的文件的名稱
    char * interp;       //要執行的文件的真實名稱,通常和filename相同
   unsigned interp_flags;
    unsigned interp_data;
    unsigned long loader, exec;
};

在該函數的最後,又調用了fs/exec.c文件中定義的search_binary_handler函數來查詢能夠處理相應可執行文件格式的處理器,並調用相應的load_library方法以啟動進程。這裏,用到了一個在include/linux/binfmts.h文件中定義的linux_binfmt結構體來保存處理相應格式的可執行文件的函數指針如下:

struct linux_binfmt {
    struct linux_binfmt * next;
    struct module *module;
    // 加載一個新的進程
    int (*load_binary)(struct linux_binprm *, struct pt_regs * regs);
    // 動態加載共享庫
    int (*load_shlib)(struct file *);
    // 將當前進程的上下文保存在一個名為core的文件中
   int (*core_dump)(long signr, struct pt_regs * regs, struct file * file);
    unsigned long min_coredump;
};

Linux內核允許用戶通過調用在include/linux/binfmt.h文件中定義的register_binfmt和unregister_binfmt函數來添加和刪除linux_binfmt結構體鏈表中的元素,以支持用戶特定的可執行文件類型。
在調用特定的load_binary函數加載一定格式的可執行文件后,程序將返回到sys_execve函數中繼續執行。該函數在完成最後幾步的清理工作后,將會結束處理並返回到用戶態中,最後,系統將會將CPU分配給新加載的程序。

execve系統調用的過程總結如下:

  • execve系統調用陷入內核,並傳入命令行參數和shell上下文環境
  • execve陷入內核的第一個函數:do_execve,該函數封裝命令行參數和shell上下文
  • do_execve調用do_execveat_common,後者進一步調用__do_execve_file,打開ELF文件並把所有的信息一股腦的裝入linux_binprm結構體
  • do_execve_file中調用search_binary_handler,尋找解析ELF文件的函數
  • search_binary_handler找到ELF文件解析函數load_elf_binary
  • load_elf_binary解析ELF文件,把ELF文件裝入內存,修改進程的用戶態堆棧(主要是把命令行參數和shell上下文加入到用戶態堆棧),修改進程的數據段代碼段
  • load_elf_binary調用start_thread修改進程內核堆棧(特別是內核堆棧的ip指針)
  • 進程從execve返回到用戶態后ip指向ELF文件的main函數地址,用戶態堆棧中包含了命令行參數和shell上下文環境

IV 以系統調用作為特殊的中斷,結合中斷上下文切換和進程上下文切換分析Linux系統的一般執行過程

Linux系統的一般執行過程

正在運行的用戶態進程X切換到運行用戶態進程Y的過程

  1. 發生中斷 ,完成以下步驟:

    save cs:eip/esp/eflags(current) to kernel stack
    load cs:eip(entry of a specific ISR) and ss:esp(point to kernel stack)

  2. SAVE_ALL //保存現場,這裡是已經進入內核中斷處里過程

  3. 中斷處理過程中或中斷返回前調用了schedule(),其中的switch_to做了關鍵的進程上下文切換

  4. 標號1之後開始運行用戶態進程Y(這裏Y曾經通過以上步驟被切換出去過因此可以從標號1繼續執行)

  5. restore_all //恢復現場

  6. 繼續運行用戶態進程Y

進程間的特殊情況

  • 通過中斷處理過程中的調度時機,用戶態進程與內核線程之間互相切換和內核線程之間互相切換
  • 與最一般的情況非常類似,只是內核線程運行過程中發生中斷沒有進程用戶態和內核態的轉換;
  • 內核線程主動調用schedule(),只有進程上下文的切換,沒有發生中斷上下文的切換,與最一般“的情況略簡略;
  • 創建子進程的系統調用在子進程中的執行起點及返回用戶態,如fork;
  • 加載一個新的可執行程序后返回到用戶態的情況,如execve;0-3G內核態和用戶態都可以訪問,3G以上只能內核態訪問。內核是所有進程共享的。內核是各種中斷處理過程和內核線程的集合。

三. 總結

這次實驗主要做了如下的事情:

  • 學習並完成實驗環境的配置的搭建
  • 學習並了解Linux內核中系統調用相關知識
  • 學習了中斷相關的知識
  • 學習並實踐了fork()與execve()系統調用的知識
  • 思考代碼執行的流程與原理

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

敏捷開發:最近的收穫和站會上的小黃鴨

最近

在博客園經常分享一些心得,有幸在另一個平台做了一場直播。

自己精心準備了很長時間,從素材和文章以及一字一句。

過程中感覺自信滿滿,後期再去回顧發現其實硬貨還需要再硬一點。

dxxxxxx 平台的分享

自己在這個社區進行分享的時候,開始自己照着自己的演講稿念,沒多久自己發現根據演講稿年思路不夠連貫。

自己扔掉演講稿根據之前自己準備的主體脈絡進行分享,有準備但是還是覺得臨場發揮思路更加連貫和清晰。

一個半小時過去了,我的分享完畢,這種直播的壓力確實是比較大的,因為你要直接面對聽眾,會遇到一些突發事情。

期間就發生了2次離線,還有你要關注評論的同時也不能讓評論影響你的節奏,

對於一些重要的評論在分享過程中隨時回復還是記錄下來?

直播結束后,第二天我又重新聽了一下自己的分享,總結一下直播的經驗。

過程比結果收穫更大。期間還有一件事讓我很是糾結。

我發朋友圈做宣傳,把自己的同事給屏蔽了,因為當時的我沒有自信,怕他們笑話。

心中的那種不自信的感覺完全戰勝了一切。自己不是技術最牛的,自己也不是這個管理經驗最豐富的。

我對自己這種糾結懊悔了很久,正常地做自己就好了,又不是做給別人看的。

下次再有機會我會更加自信,即便不是最專業,那也需要一份對自己努力的自信。

另一個話題

沒有一絲防備,我們直接切換另一個話題。

上面算是給自己這3個月沒寫博客劃一個句號。我們來看看最近我們在敏捷一些什麼?

我們項目組組織了讀書分享,當時組織這個讀書分享我也是很糾結。

作為項目經理不是職能經理,自己到底組織一場讀書分享合不合適?

我把我的疑惑和需求說給了我們項目群經理,希望從他那裡能夠得到一些經驗。

疑惑:我們從項目角度出發來組織學習,會不會觸碰了職能經理的職能邊界?

(比如讓大家閱讀《Scrum精髓:敏捷轉型指南》,會不會觸碰產品職能的邊界?)

需求:我希望大家能夠系統地認識敏捷和框架。
從中能夠學習到一些東西來應用到項目中,其次大家對於這種項目內的分享形式是擁抱的態度

經理的建議是可以嘗試一下。我們項目組開了一個會,約定了一下。

在不佔用大家太多的時間,我們挑選值得閱讀的章節么一個項目中帶着一次分享,每次分享大約1-2個章節。

是的,我們邁出了第一步

我們其實是在項目結束后一周內,確定好閱讀範圍,確定好分享人,確定好分享時長兩小時。

流程分為依次提問和自由提問,分享人可以拿任何的分享資料分享(doc,表格)都行。

目的就是讓大家心裏有一定的概念並對當前最容易實行的一個敏捷方法進行深入討論。

這是當時分享人的一個分享大綱

第一次分享,大家也在適應遮掩的一種分享狀態。

整體過程很順利也夠味,如果能夠再放鬆一些那就更好了,大家說著說著就說成了項目總結會~

從這次會議中的一些啟發

從有了開這種分享會的想法到實行,總結下來就是如果大家都抱有期待並且準備充足那就先試試。有想法就嘗試。

一些之前的想去執行的想法,當下就會執行嘗試。

站會上的小鴨子

終於到了標題中的這一部分。之前讀敏捷相關的書籍時,很多會提到站會上需要一個像權力交接棒的實際物品來標識你的權力~

以前我們開站會,時間和習慣都不錯,到時間我提醒大家開會,大家湊一堆開始開會。

但是,總覺得缺少一些什麼讓整個會議顯得有點太過於形式。

我想起了開始的那句話:有一個實際的權力物品來做交接。

看吧,就是這隻小鴨子

改變

每天的站會,不同的主持人,主持人的象徵就是擁有這隻小鴨子~

就是誰拿到這隻小鴨子,誰就是站會的負責人和支持人。

8:40 他會在群里喊,然後會議上主持,結束后流動到另一位組織者手裡~

大家對站會接受度更高了,對這個項目也有了一些感情,團隊的人的默契和氛圍會默默提高很多。

這種感覺是潛移默化的,也是需要我們隨時提醒和建立的。

再回過頭來說那次敏捷分享的效果,我們之前是開發,然後測試,最後上線。

我們雖然是分階段提測但是沒有執行分階段測試。

所以這次新版本中我們“打成”一致嘗試分階段提測後分階段測試。就是 開發-測試交替進行。

有沒有難度呢?有,就是大家的時間和專註度會受到衝擊。

對於實踐這個開發、測試交替進行我們承認一些東西也相信一些東西。

我們組內也承認起初實踐起來肯定會比之前的模式有一定的不適應,但是我們依然信心。

希望可以踴躍暴漏問題不管是個人還是團隊不管是心情還是問題,都暴露出來。

我們組內達成了一致默默嘗試了一下,我們重視了組內的衝刺總結會,其實從總結會上我們收穫還是很多的。

一些開發和溝通問題,組內他們能夠自組織去主動解決,根據目前組內的主動性,我覺得我不需要關心。

我重點解決了一下時間和專註度衝突的問題。

開發正在開發下一個階段,測試正在測試你上一個階段,突然一個bug給你你煩不煩?開不開心?

–你是馬上斷開思路去改正?還是抱着忐忑的心情繼續開發?

開發修復完bug,馬上回給測試,測試此時正在測試下一個階段,

–你是斷開思路馬上去回歸?還是思索着會不會影響後面的功能的心情測試下去?

達成一致

我們就問題討論,最終達成一致

當然我們還依然堅信,剛開始實踐期間肯定會有不適應和一些問題。

但是我們還堅持要解決問題並把問題的根因找出來,想辦法解決。這是毋庸置疑的。

分階段提測,大家達成一致是因為每一次提交的功能不是很多,測試測試之前可以先在群里發出通知。

開發就一些工作進行微調,便於後面的bug修改。

測試發通知–3:00統一提bug–開發4:00開始修復–測試最晚命題談進行回歸

還有一點就是,對於流程和嚴重bug,隨時支持,沒有理由。這個大家也是認可的。

總結

其實這一段時間對於我到其他社區進行了一次敏捷分享;

對於項目組,我們使用權力交接棒–小黃鴨,進行大家主人翁意識的交接培養。

對於項目,我們實踐衝刺,我們開始重視每一次的衝刺總結會,

我們希望團隊中任何一個人都可以發現問題並自己發起會議(當然也是集中遇到問題或者集中解決問題)

我們實踐開發-測試交替進行的模式,為什麼呢?

這個原因我們的測試總結得很好,縮短時間其次,重要的是測試可以提前參与,提前發現問題,而不是最後採取發現問題。

我理解的是通過測試的力量在過程中來保駕護航,而不是在項目最後去修修補補。

最近感觸很大,當一些行動獲得大家認可的時候,大家互相能理解,大家也都互相提建議。

為了讓項目更好,讓我們更加自主,你好,我好,項目好。你提高,我提高,項目也提高。

當前的應屆生的學習能力和融合能力確實很強。

作為項目經理除了要保證項目交付這個基本目標以外,也是希望大家從項目中能夠獲得一些東西。我更希望是自驅的動力~

如果可以,可以把項目中的好的實踐帶到其他項目中去,去默默影響他人。

堅持自己,堅持自己的那些原則。獲得好的優秀的經驗,慢慢影響他人。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益