socketserver模塊使用與源碼分析
前言
在前面的學習中我們其實已經可以通過socket
模塊來建立我們的服務端,並且還介紹了關於TCP協議的粘包問題。但是還有一個非常大的問題就是我們所編寫的Server端是不支持併發性服務的,在我們之前的代碼中只能加入一個通信循環來進行排隊式的單窗口一對一服務。那麼這一篇文章將主要介紹如何使用socketserver
模塊來建立具有併發性的Server端。
基於TCP協議的socketserver服務端
我們先看它的一段代碼,對照代碼來看功能。
#!/usr/bin/env python3
# _*_ coding:utf-8 _*_
# ==== 使用socketserver創建支持多併發性的服務器 TCP協議 ====
import socketserver
class MyServer(socketserver.BaseRequestHandler):
"""自定義類"""
def handle(self):
"""handle處理請求"""
print("雙向鏈接通道建立完成:", self.request) # 對於TCP協議來說,self.request相當於雙向鏈接通道conn,即accept()的第一部分
print("客戶端的信息是:", self.client_address) # 對於TCP協議來說,相當於accept()的第二部分,即客戶端的ip+port
while 1: # 開始內層通信循環
try: # # bug修復:針對windows環境
data = self.request.recv(1024)
if not data:
break # bug修復:針對類UNIX環境
print("收到客戶機[{0}]的消息:[{1}]".format(self.client_address, data))
self.request.sendall(data.upper()) # #sendall是重複調用send.
except Exception as e:
break
self.request.close() # 當出現異常情況下一定要關閉鏈接
if __name__ == '__main__':
s1 = socketserver.ThreadingTCPServer(("0.0.0.0", 6666), MyServer) # 公網服務器綁定 0.0.0.0 私網測試為 127.0.0.1
s1.serve_forever() # 啟動服務
1.導入socketserver
模塊
2.創建一個新的類,並繼承socketserver.BaseRequestHandler
類
3.覆寫handle
方法,對於TCP協議來說,self.request
相當於雙向鏈接通道conn
,self.client_address
相當於被服務方的ip和port信息,也就是addr
,而整個handle
方法相當於鏈接循環。
4.寫入收發邏輯規則
5.防止客戶端發送空的消息已致雙方卡死
6.防止客戶端突然斷開已致服務端崩潰
7.粘包優化(可選)
8.實例化 socketserver.ThreadingTCPServer
類,並傳入IP+port,以及剛寫好的類名
9.使用socketserver.ThreadingTCPServer
實例化對象中的server_forever( )
方法啟動服務
它其實是這樣的:
我們不用管鏈接循環,因為在執行handle
方法之前內部已經幫我們做好了。當我們使用serve_forever()
方法的時候便開始監聽鏈接描述符對象,一旦有鏈接請求就創建一個子線程來處理該鏈接。
基於UDP協議的socketserver服務端
基於UDP協議的socketserver
服務端與基於TCP協議的socketserver
服務端大相徑庭,但是還是有幾點不太一樣的地方。
對TCP來說:
self.request = 雙向鏈接通道(conn)
對UDP來說:
self.request = (client_data_byte,udp的套接字對象)
#!/usr/bin/env python3
# _*_ coding:utf-8 _*_
# ==== 使用socketserver創建支持多併發性的服務器 UDP協議 ====
import socketserver
class MyServer(socketserver.BaseRequestHandler):
"""自定義類"""
def handle(self):
"""handle處理請求"""
# 由於UDP是基於消息的協議,故根本不用通信循環
data = self.request[0] # 對於UDP協議來說,self.request其實是個元組。第一個元素是消息內容主題(Bytes類型),相當於recvfrom()的第一部分
server = self.request[1] # 第二個元素是服務端本身,即自己
print("客戶端的信息是:", self.client_address) # 對於UDP協議來說,相當於recvfrom()的第二部分,即客戶端的ip+port
print("收到客戶機[{0}]的消息:[{1}]".format(self.client_address, data))
server.sendto(data.upper(),self.client_address)
if __name__ == '__main__':
s1 = socketserver.ThreadingUDPServer(("0.0.0.0", 6666), MyServer) # 公網服務器綁定 0.0.0.0 私網測試為 127.0.0.1
s1.serve_forever() # 啟動服務
擴展:socketserver源碼分析
探索socketserver中的繼承關係
好了,接下來我們開始剖析socketserver
模塊中的源碼部分。在Pycharm下使用CTRL+鼠標左鍵
,可以進入源碼進行查看。
我們在查看源碼前一定要首先要明白兩點:
socketserver
類分為兩部分,其一是server
類主要是負責處理鏈接方面,另一類是request
類主要負責處理通信方面。
好了,請在腦子里記住這個概念。我們來看一些socketserver
模塊的實現用了哪些其他的基礎模塊。
注意,接下來的源碼註釋部分我並沒有在源代碼中修改,也請讀者不要修改源代碼的任何內容。
import socket # 這模塊挺熟悉吧
import selectors # 這個是一個多線程模塊,主要支持I/O多路復用。
import os # 老朋友了
import sys # 老朋友
import threading # 多線程模塊
from io import BufferedIOBase # 讀寫相關的模塊
from time import monotonic as time # 老朋友time模塊
socketserver中用到的基礎模塊
好了,讓我們接着往下走。可以看到一個變量__all__
,是不是覺得很熟悉?就是我們使用 from xxx import xxx
能導入進的東西全是被__all__
控制的,我們看一下它包含了哪些內容。
__all__ = ["BaseServer", "TCPServer", "UDPServer",
"ThreadingUDPServer", "ThreadingTCPServer",
"BaseRequestHandler", "StreamRequestHandler",
"DatagramRequestHandler", "ThreadingMixIn"]
# 這個是我們原本的 __all__ 中的值。
if hasattr(os, "fork"):
__all__.extend(["ForkingUDPServer","ForkingTCPServer", "ForkingMixIn"])
if hasattr(socket, "AF_UNIX"):
__all__.extend(["UnixStreamServer","UnixDatagramServer",
"ThreadingUnixStreamServer",
"ThreadingUnixDatagramServer"])
# 上面兩個if判斷是給__all__添加內容的,os.fork()這個方法是創建一個新的進程,並且只在類UNIX平台下才有效,Windows平台下是無效的,所以這裏對於Windows平台來說就from socketserver import xxx 肯定少了三個類,這三個類的作用我們接下來會聊到。而關於socket中的AF_UNIX來說我們其實已經學習過了,是基於文件的socket家族。這在Windows上也是不支持的,只有在類UNIX平台下才有效。所以Windows平台下的導入又少了4個類。
# poll/select have the advantage of not requiring any extra file descriptor,
# contrarily to epoll/kqueue (also, they require a single syscall).
if hasattr(selectors, 'PollSelector'):
_ServerSelector = selectors.PollSelector
else:
_ServerSelector = selectors.SelectSelector
# 這兩個if還是做I/O多路復用使用的,Windows平台下的結果是False,而類Unix平台下的該if結果為True,這關乎I/O多路復用的性能選擇。到底是select還是poll或者epoll。
socketserver模塊對於from xxx import * 導入的處理
我們接着向下看源碼,會看到許許多多的類。先關掉它來假設自己是解釋器一行一行往下走會去執行那個部分。首先是一條if
判斷
if hasattr(os, "fork"):
class ForkingMixIn:
pass # 這裏我自己省略了
# 我們可以看見這條代碼是接下來執行的,它意思還是如果在類Unix環境下,則會去創建該類。如果在Windows平台下則不會創建該類
處理點一
繼續走,其實這種if
判斷再創建類的地方還有兩處。我這裏全部列出來:
if hasattr(os, "fork"):
class ForkingUDPServer(ForkingMixIn, UDPServer): pass
class ForkingTCPServer(ForkingMixIn, TCPServer): pass
if hasattr(socket, 'AF_UNIX'):
class UnixStreamServer(TCPServer):
address_family = socket.AF_UNIX
class UnixDatagramServer(UDPServer):
address_family = socket.AF_UNIX
class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass
class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass
處理點二 and 三
好了,說完了大體粗略的一個流程,我們該來研究這裏面的類都有什麼作用,這裏可以查看每個類的文檔信息。大致如下:
前面已經說過,socketserver
模塊中主要分為兩大類,我們就依照這個來進行劃分。
socketserver模塊源碼內部class功能一覽 |
處理鏈接相關 |
|
BaseServer |
基礎鏈接類 |
TCPServer |
TCP協議類 |
UDPServer |
UDP協議類 |
UnixStreamServer |
文件形式字節流類 |
UnixDatagramServer |
文件形式數據報類 |
處理通信相關 |
|
BaseRequestHandler |
基礎請求處理類 |
StreamRequestHandler |
字節流請求處理類 |
DatagramRequestHandler |
數據報請求處理類 |
多線程相關 |
|
ThreadingMixIn |
線程方式 |
ThreadingUDPServer |
多線程UDP協議服務類 |
ThreadingTCPServer |
多線程TCP協議服務類 |
多進程相關 |
|
ForkingMixIn |
進程方式 |
ForkingUDPServer |
多進程UDP協議服務類 |
ForkingTCPServer |
多進程TCP協議服務類 |
他們的繼承關係如下:
ForkingUDPServer(ForkingMixIn, UDPServer)
ForkingTCPServer(ForkingMixIn, TCPServer)
ThreadingUDPServer(ThreadingMixIn, UDPServer)
ThreadingTCPServer(ThreadingMixIn, TCPServer)
StreamRequestHandler(BaseRequestHandler)
DatagramRequestHandler(BaseRequestHandler)
處理鏈接相關
處理通信相關
多線程相關
總繼承關係(處理通信相關的不在其中,並且不包含多進程)
最後補上一個多進程的繼承關係,就不放在總繼承關係中了,容易圖形造成混亂。
多進程相關
實例化過程分析
有了繼承關係我們可以來模擬實例化的過程,我們以TCP協議為準:
socketserver.ThreadingTCPServer(("0.0.0.0", 6666), MyServer)
我們點進(選中上面代碼的ThradingTCPServe
r部分,CTRL+鼠標左鍵
)源碼部分,查找其 __init__
方法:
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
看來沒有,那麼就找第一父類有沒有,我們點進去可以看到第一父類ThreadingMixIn
也沒有__init__
方法,看上面的繼承關係圖可以看出是普通多繼承,那麼就是廣度優先的查找順序。我們來看第二父類TCPServer
中有沒有,看來第二父類中是有__init__
方法的,我們詳細來看。
class TCPServer(BaseServer):
"""註釋全被我刪了,影響視線"""
address_family = socket.AF_INET # 基於網絡的套接字家族
socket_type = socket.SOCK_STREAM # TCP(字節流)協議
request_queue_size = 5 # 消息隊列最大為5,可以理解為backlog,即半鏈接池的大小
allow_reuse_address = False # 端口重用默認關閉
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
"""Constructor. May be extended, do not override."""
BaseServer.__init__(self, server_address, RequestHandlerClass)
self.socket = socket.socket(self.address_family,
self.socket_type)
# 可以看見,上面先是調用了父類的__init__方法,然後又實例化出了一個socket對象!所以我們先不着急往下看,先看其父類中的__init__方法。
if bind_and_activate:
try:
self.server_bind()
self.server_activate()
except:
self.server_close()
raise
TCPServer中的__init__()
來看一下,BaseServer
類中的__init__
方法。
class BaseServer:
"""註釋依舊全被我刪了"""
timeout = None # 這個變量可以理解為超時時間,先不着急說他。先看 __init__ 方法
def __init__(self, server_address, RequestHandlerClass):
"""Constructor. May be extended, do not override."""
self.server_address = server_address # 即我們傳入的 ip+port ("0.0.0.0", 6666)
self.RequestHandlerClass = RequestHandlerClass # 即我們傳入的自定義類 MyServer
self.__is_shut_down = threading.Event() # 這裏可以看到執行了該方法,這裏先不詳解,因為它是一個事件鎖,所以不用管
self.__shutdown_request = False
BaseServer中的__init__()
在BaseServer
中執行了thrading
模塊下的Event()
方法。我這裏還是提一嘴這個方法是幹嘛用的,它會去控制線程的啟動順序,這裏實例化出的self.__is_shut_down
其實就是一把鎖,沒什麼深究的,接下來的文章中我也會寫到。我們繼續往下看,現在是該回到TCPServer
的__init__
方法中來了。
class TCPServer(BaseServer):
"""註釋全被我刪了,影響視線"""
address_family = socket.AF_INET # 基於網絡的套接字家族
socket_type = socket.SOCK_STREAM # TCP(字節流)協議
request_queue_size = 5 # 消息隊列最大為5,可以理解為backlog,即半鏈接池的大小
allow_reuse_address = False # 端口重用默認關閉
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): # 看這裏!!!!
"""Constructor. May be extended, do not override."""
BaseServer.__init__(self, server_address, RequestHandlerClass)
self.socket = socket.socket(self.address_family,
self.socket_type)
if bind_and_activate: # 在創建完socket對象后就會進行該判斷。默認參數bind_and_activate就是為True
try:
self.server_bind() # 現在進入該方法查看細節
self.server_activate()
except:
self.server_close()
raise
TCPServer中的__init__()
好了,需要找這個self.bind()
方法,還是從頭開始找。實例本身沒有,第一父類ThreadingMixIn
也沒有,所以現在我們看的是TCPServer
的server_bind()
方法:
def server_bind(self):
"""Called by constructor to bind the socket.
May be overridden.
"""
if self.allow_reuse_address: # 這裏的變量對應 TCPServer.__init__ 上面定義的類方法,端口重用這個。由於是False,所以我們直接往下執行。
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(self.server_address) # 綁定 ip+port 即 ("0.0.0.0", 6666)
self.server_address = self.socket.getsockname() # 獲取socket的名字 其實還是 ("0.0.0.0", 6666)
TCPServer中的server_bind()
現在我們該看TCPServer
下的server_activate()
方法了。
def server_activate(self):
"""Called by constructor to activate the server.
May be overridden.
"""
self.socket.listen(self.request_queue_size) # 其實就是監聽半鏈接池,backlog為5
TCPServer中的server_activate()
這個時候沒有任何異常會拋出的,所以我們已經跑完了整個實例化的流程。並將其賦值給s1
現在我們看一下s1
的__dict__
字典,再接着進行源碼分析。
{'server_address': ('0.0.0.0', 6666), 'RequestHandlerClass': <class '__main__.MyServer'>, '_BaseServer__is_shut_down': <threading.Event object at 0x000002A96A0208E0>, '_BaseServer__shutdown_request': False, 'socket': <socket.socket fd=716, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 6666)>}
s1的__dict__
server_forever()啟動服務分析
我們接着來看下一條代碼。
s1.serve_forever()
還是老規矩,由於s1
是ThreadingTCPServer
類的實例對象,所以我們去一層層的找serve_forever()
,最後在BaseServer
類中找到了。
def serve_forever(self, poll_interval=0.5):
"""註釋被我刪了"""
self.__is_shut_down.clear() # 上面說過了那個Event鎖,控制子線程的啟動順序。這裏的clear()代表清除,這個不是重點,往下看。
try:
# XXX: Consider using another file descriptor or connecting to the
# socket to wake this up instead of polling. Polling reduces our
# responsiveness to a shutdown request and wastes cpu at all other
# times.
with _ServerSelector() as selector:
selector.register(self, selectors.EVENT_READ)# 這裡是設置了一個監聽類型為讀取事件。也就是說當有請求來的時候當前socket對象就會發生反應。
while not self.__shutdown_request: # 為False,會執行,注意!下面都是死循環了!!!
ready = selector.select(poll_interval) # 設置最大監聽時間為0.5s
# bpo-35017: shutdown() called during select(), exit immediately.
if self.__shutdown_request: # BaseServer類中的類方法,為False,所以不執行這個。
break
if ready: # 代表有鏈接請求會執行下面的方法
self._handle_request_noblock() # 這兒是比較重要的一個點。我們先來看。
self.service_actions()
finally:
self.__shutdown_request = False
self.__is_shut_down.set() # 這裡是一個釋放鎖的行為
BaseServer中的serve_forever()
如果有鏈接請求,則會執行self._handle_request_noblock()
方法,它在哪裡呢?剛好這個方法就在BaseServer
中serve_forever()
方法的正下方第4個方法的位置。
def _handle_request_noblock(self):
"""註釋被我刪了"""
try:
request, client_address = self.get_request() # 這裏的這個方法在TCPServer中,它的return值是 self.socket.accept(),就是就是返回了元組然後被解壓賦值了。其實到這一步三次握手監聽已經開啟了。
except OSError:
return
if self.verify_request(request, client_address): # 這個是驗證ip和port,返回的始終是True
try:
self.process_request(request, client_address) # request 雙向鏈接通道,client_address客戶端ip+port。現在我們來找這個方法。
except Exception:
self.handle_error(request, client_address)
self.shutdown_request(request)
except:
self.shutdown_request(request)
raise
else:
self.shutdown_request(request)
BaseServer中的_handle_request_noblock()
現在開始查找self.process_request(request, client_address)
該方法,還是先從實例對象本身找,找不到去第一父類找。他位於第一父類ThreadingMixIn
中。
def process_request(self, request, client_address):
"""Start a new thread to process the request."""
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address)) # 創建子線程!!看這裏!
t.daemon = self.daemon_threads # ThreadingMixIn的類屬性,為False
if not t.daemon and self.block_on_close: # 第一個值為False,第二個值為True。他們都是ThreadingMixIn的類屬性
if self._threads is None: # 會執行
self._threads = [] # 創建了空列表
self._threads.append(t) # 將當前的子線程添加至空列表中
t.start() # 開始當前子線程的運行,即運行self.process_request_thread方法
ThreadingMixIn中的process_request()
我們可以看到,這裏的target
參數中指定了一個方法self.process_request_thread
,其實意思就是說當這個線程t
在start
的時候會去執行該方法。我們看一下它都做了什麼,這個方法還是在ThreadingMixIn
類中。
def process_request_thread(self, request, client_address):
"""Same as in BaseServer but as a thread.
In addition, exception handling is done here.
"""
try:
self.finish_request(request, client_address) # 可以看到又執行該方法了,這裏我再標註一下,別弄頭暈了。request 雙向鏈接通道,client_address客戶端ip+port。
except Exception:
self.handle_error(request, client_address)
finally:
self.shutdown_request(request) # 它不會關閉這個線程,而是將其設置為wait()狀態。
ThreadingMixIn中的 process_request_thread()
看self.finish_request()
方法,它在BaseServer
類中
def finish_request(self, request, client_address):
"""Finish one request by instantiating RequestHandlerClass."""
self.RequestHandlerClass(request, client_address, self) # 這裡是幹嘛?其實就是在進行實例化!
BaseServer中的finish_request
self.RequestHandlerClass(request, client_address, self)
,我們找到self
的__dict__
字典,看看這個到底是什麼東西
{'server_address': ('0.0.0.0', 6666), 'RequestHandlerClass': <class '__main__.MyServer'>, '_BaseServer__is_shut_down': <threading.Event object at 0x000002A96A0208E0>, '_BaseServer__shutdown_request': False, 'socket': <socket.socket fd=716, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 6666)>}
s1的__dict__
可以看到,它就是我們傳入的那個類,即自定義的MyServer
類。我們把request,client_address,以及整個是實例self傳給了MyServer的__init__
方法。但是我們的MyServer類沒有__init__
,怎麼辦呢?去它父類BaseRequestHandler
裏面找唄。
class BaseRequestHandler:
"""註釋被我刪了"""
def __init__(self, request, client_address, server):
self.request = request # request 雙向鏈接通道
self.client_address = client_address # 客戶端ip+port
self.server = server # 即 實例對象本身。上面的__dict__就是它的__dict__
self.setup() # 鈎子函數,我們可以自己寫一個類然後繼承`BaseRequestHandler`並覆寫其setup方法即可。
try:
self.handle() # 看,自動執行handle
finally:
self.finish() # 鈎子函數
def setup(self):
pass
def handle(self):
pass
def finish(self):
pass
BaseRequestHandler中的__init__
現在我們知道了,為什麼一定要覆寫handle
方法了吧。
socketserver內部調用順序流程圖(基於TCP協議)
實例化過程圖解
server_forever()啟動服務圖解
擴展:驗證鏈接合法性
在很多時候,我們的TCP服務端為了防止網絡泛洪可以設置一個三次握手驗證機制。那麼這個驗證機制的實現其實也是非常簡單的,我們的思路在於進入通信循環之前,客戶端和服務端先走一次鏈接認證,只有通過認證的客戶端才能夠繼續和服務端進行鏈接。
下面就來看一下具體的實現步驟。
#_*_coding:utf-8_*_
__author__ = 'Linhaifeng'
from socket import *
import hmac,os
secret_key=b'linhaifeng bang bang bang'
def conn_auth(conn):
'''
認證客戶端鏈接
:param conn:
:return:
'''
print('開始驗證新鏈接的合法性')
msg=os.urandom(32) # 新方法,生成32位隨機Bytes類型的值
conn.sendall(msg)
h=hmac.new(secret_key,msg)
digest=h.digest()
respone=conn.recv(len(digest))
return hmac.compare_digest(respone,digest) # 對比結果為True或者為False
def data_handler(conn,bufsize=1024):
if not conn_auth(conn):
print('該鏈接不合法,關閉')
conn.close()
return
print('鏈接合法,開始通信')
while True:
data=conn.recv(bufsize)
if not data:break
conn.sendall(data.upper())
def server_handler(ip_port,bufsize,backlog=5):
'''
只處理鏈接
:param ip_port:
:return:
'''
tcp_socket_server=socket(AF_INET,SOCK_STREAM)
tcp_socket_server.bind(ip_port)
tcp_socket_server.listen(backlog)
while True:
conn,addr=tcp_socket_server.accept()
print('新連接[%s:%s]' %(addr[0],addr[1]))
data_handler(conn,bufsize)
if __name__ == '__main__':
ip_port=('127.0.0.1',9999)
bufsize=1024
server_handler(ip_port,bufsize)
Server端
#_*_coding:utf-8_*_
__author__ = 'Linhaifeng'
from socket import *
import hmac,os
secret_key=b'linhaifeng bang bang bang'
def conn_auth(conn):
'''
驗證客戶端到服務器的鏈接
:param conn:
:return:
'''
msg=conn.recv(32) # 拿到隨機位數
h=hmac.new(secret_key,msg) # 摻鹽
digest=h.digest()
conn.sendall(digest)
def client_handler(ip_port,bufsize=1024):
tcp_socket_client=socket(AF_INET,SOCK_STREAM)
tcp_socket_client.connect(ip_port)
conn_auth(tcp_socket_client)
while True:
data=input('>>: ').strip()
if not data:continue
if data == 'quit':break
tcp_socket_client.sendall(data.encode('utf-8'))
respone=tcp_socket_client.recv(bufsize)
print(respone.decode('utf-8'))
tcp_socket_client.close()
if __name__ == '__main__':
ip_port=('127.0.0.1',9999)
bufsize=1024
client_handler(ip_port,bufsize)
Client端
到這裏已經很簡單了,服務器將隨機數給客戶機發過去,客戶機收到后也用自家的鹽與隨機數加料,再使用digest()
將它轉化為字節,直接發送了回來然後客戶端通過hmac.compare_digest()
方法驗證兩個的值是否相等,如果不等就說明鹽不對。客戶機不合法服務端將會關閉與該客戶機的雙向鏈接通道。
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"
※網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線
※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整
※南投搬家費用,距離,噸數怎麼算?達人教你簡易估價知識!
※教你寫出一流的銷售文案?