※想知道最厲害的網頁設計公司嚨底家"!
RWD(響應式網頁設計)是透過瀏覽器的解析度來判斷要給使用者看到的樣貌
作為公司的公共產品,經常有這樣的需求:就是新建一個本地服務,產品線作為客戶端通過 tcp 接入本地服務,來獲取想要的業務能力。
與印象中動輒處理成千上萬連接的 tcp 網絡服務不同,這個本地服務是跑在客戶機器上的,Win32 上作為開機自啟動的 windows 服務運行;
Linux 上作為 daemon 在後台運行。總的說來就是用於接收幾個產品進程的連接,因此輕量化是其最重要的要求,在這個基礎上要能兼顧跨平台就可以了。
其實主要就是 windows,再兼顧一點兒 linux。
考察了幾個現有的開源網絡框架,從 ACE 、boost::asio 到 libevent,都有不盡於人意的地方:
a) ACE:太重,只是想要一個網絡框架,結果它扒拉扒拉一堆全提供了,不用還不行;
b) boost::asio:太複雜,牽扯到 boost 庫,並且引入了一堆 c++ 模板,需要高版本 c++ 編譯器支持;
c) libevent:這個看着不錯,當時確實用這個做底層封裝了一版,結果發版后發現一個比較致命的問題,導致在防火牆設置比較嚴格的機器上初始化失敗,這個後面我會詳細提到。
其它的就更不用說了,之前也粗略看過陳碩的 muddo,總的感覺吧,它是基於其它開源框架不足地方改進的一個庫,有相當可取的地方,但是這個改進的方向也主要是解決更大併發、更多連接,不是我的痛點,所以沒有繼續深入研究。
好了,與其在不同開源框架之間糾結,不如自己動手寫一個。
反正我的場景比較固定,不用像它們那樣面面俱,我給自己羅列了一些這個框架需要支持基本的功能:
1)同步寫、異步讀;
2)可同時監聽多路事件,基於 1)這裏只針對異步 READ 事件(包含連接進入、連接斷開),寫數據是同步的,因而不需要處理異步 WRITE 事件;
3)要有設置一次性和周期性定時器的能力 (業務決定的);
4)不需要處理信號 (windows 上也沒信號這一說,linux 自己搞搞 sigaction 就好啦);
……
雖然這個框架未來只會運行在用戶的單機上,但是我不希望它一出生就帶有性能缺陷,所以性能平平的 select 沒能進入我的法眼,我決定給它裝上最強大的心臟:
Windows 平台: iocp
Linux 平台:epoll
ok,從需求到底層技術路線,貌似都講清楚了,依照 libevent 我給它取名為 gevent,下面我們從代碼級別看下這個框架是怎麼簡化 tcp 服務搭建這類工作的。
首先看一下這個 tcp 服務框架的 sample:
svc_handler.h
1 #include "EventBase.h"
2 #include "EventHandler.h"
3
4 class GMyEventBase : public GEventBase
5 {
6 public:
7 GEventHandler* create_handler ();
8 };
9
10
11 class svc_handler : public GJsonEventHandler
12 {
13 public:
14 virtual ~svc_handler () {}
15 virtual void on_read_msg (Json::Value const& val);
16 };
epoll_svc.cpp
1 #include <stdio.h>
2 #include "svc_handler.h"
3 #include <signal.h>
4
5 GMyEventBase g_base;
6 GEventHandler* GMyEventBase::create_handler ()
7 {
8 return new svc_handler;
9 }
10
11 void sig_int (int signo)
12 {
13 printf ("%d caught\n", signo);
14 g_base.exit (1);
15 printf ("exit ok\n");
16 }
17
18 int main (int argc, char *argv[])
19 {
20 if (argc < 2)
21 {
22 printf ("usage: epoll_svc port\n");
23 return -1;
24 }
25
26 unsigned short port = atoi (argv[1]);
27
28 #ifndef WIN32
29 struct sigaction act;
30 act.sa_handler = sig_int;
31 sigemptyset(&act.sa_mask);
32 act.sa_flags = SA_RESTART;
33 if (sigaction (SIGINT, &act, NULL) < 0)
34 {
35 printf ("install SIGINT failed, errno %d\n", errno);
36 return -1;
37 }
38 else
39 printf ("install SIGINT ok\n");
40 #endif
41
42 // to test small message block
43 if (g_base.init (/*8, 10*/) < 0)
44 return -1;
45
46 printf ("init ok\n");
47 do
48 {
49 if (!g_base.listen (port))
50 {
51 g_base.exit (0);
52 printf ("exit ok\n");
53 break;
54 }
55
56 printf ("listen ok\n");
57 g_base.run ();
58 printf ("run over\n");
59 } while (0);
60
61 g_base.fini ();
62 printf ("fini ok\n");
63
64 g_base.cleanup ();
65 printf ("cleanup ok\n");
66 return 0;
67 }
這個服務的核心是 GMyEventBase 類,它使用了框架中的 GEventBase 類,從後者派生而來,
只改寫了一個 create_handler 接口來提供我們的事件處理對象 svc_handler,它是從框架中的 GEventHandler 派生而來,
svc_handler 只改寫了一個 on_read_msg 來處理 Json 格式的消息輸入。
程序的運行就是分別調用 GMyEventBase(實際上是GEventBase) 的 init / listen / run / fini / cleaup 方法。
而與業務相關的代碼,都在 svc_handler 中處理:
svc_handler.cpp
1 #include "svc_handler.h"
2
3 void svc_handler::on_read_msg (Json::Value const& val)
4 {
5 int key = val["key"].asInt ();
6 std::string data = val["data"].asString ();
7 printf ("got %d:%s\n", key, data.c_str ());
8
9 Json::Value root;
10 Json::FastWriter writer;
11 root["key"] = key + 1;
12 root["data"] = data;
13
14 int ret = 0;
15 std::string resp = writer.write(root);
16 resp = resp.substr (0, resp.length () - 1); // trim tailing \n
17 if ((ret = send (resp)) <= 0)
18 printf ("send response failed, errno %d\n", errno);
19 else
20 printf ("response %d\n", ret);
21 }
它期待 Json 格式的數據,並且有兩個字段 key(int) 與 data (string),接收數據后將 key 增 1 后返回給客戶端。
再來看下客戶端 sample:
clt_handler.h
1 #include "EventBaseAR.h"
2 #include "EventHandler.h"
3
4 class GMyEventBase : public GEventBaseWithAutoReconnect
5 {
6 public:
7 GEventHandler* create_handler ();
8 };
9
10
11 class clt_handler : public GJsonEventHandler
12 {
13 public:
14 virtual ~clt_handler () {}
15 #ifdef TEST_TIMER
16 virtual bool on_timeout (GEV_PER_TIMER_DATA *gptd);
17 #endif
18 virtual void on_read_msg (Json::Value const& val);
19 };
epoll_clt.cpp
1 #include <stdio.h>
2 #include "clt_handler.h"
3 #include <signal.h>
4
5 //#define TEST_READ
6 //#define TEST_CONN
7 //#define TEST_TIMER
8
9 GMyEventBase g_base;
10 GEventHandler* GMyEventBase::create_handler ()
11 {
12 return new clt_handler;
13 }
14
15
16 int sig_caught = 0;
17 void sig_int (int signo)
18 {
19 sig_caught = 1;
20 printf ("%d caught\n", signo);
21 g_base.exit (0);
22 printf ("exit ok\n");
23 }
24
25 void do_read (GEventHandler *eh, int total)
26 {
27 char buf[1024] = { 0 };
28 int ret = 0, n = 0, key = 0, err = 0;
29 char *ptr = nullptr;
30 while ((total == 0 || n++ < total) && fgets (buf, sizeof(buf), stdin) != NULL)
31 {
32 // skip \n
33 buf[strlen(buf) - 1] = 0;
34 //n = sscanf (buf, "%d", &key);
35 key = strtol (buf, &ptr, 10);
36 if (ptr == nullptr)
37 {
38 printf ("format: int string\n");
39 continue;
40 }
41
42 Json::Value root;
43 Json::FastWriter writer;
44 root["key"] = key;
45 // skip space internal
46 root["data"] = *ptr == ' ' ? ptr + 1 : ptr;
47
48 std::string req = writer.write (root);
49 req = req.substr (0, req.length () - 1); // trim tailing \n
50 if ((ret = eh->send (req)) <= 0)
51 {
52 err = 1;
53 printf ("send %d failed, errno %d\n", req.length (), errno);
54 break;
55 }
56 else
57 printf ("send %d\n", ret);
58 }
59
60 if (total == 0)
61 printf ("reach end\n");
62
63 if (!err)
64 {
65 eh->disconnect ();
66 printf ("call disconnect to notify server\n");
67 }
68
69 // wait receiving thread
70 //sleep (3);
71 // if use press Ctrl+D, need to notify peer our break
72 }
73
74 #ifdef TEST_TIMER
75 void test_timer (unsigned short port, int period_msec, int times)
76 {
77 int n = 0;
78 GEventHandler *eh = nullptr;
79
80 do
81 {
82 eh = g_base.connect (port);
83 if (eh == nullptr)
84 break;
85
86 printf ("connect ok\n");
87 void* t = g_base.timeout (1000, period_msec, eh, NULL);
88 if (t == NULL)
89 {
90 printf ("timeout failed\n");
91 break;
92 }
93 else
94 printf ("set timer %p ok\n", t);
95
96 // to wait timer
97 do
98 {
99 sleep (400);
100 printf ("wake up from sleep\n");
101 } while (!sig_caught && n++ < times);
102
103 g_base.cancel_timer (t);
104 } while (0);
105 }
106 #endif
107
108 #ifdef TEST_CONN
109 void test_conn (unsigned short port, int per_read, int times)
110 {
111 # ifdef WIN32
112 srand (GetCurrentProcessId());
113 # else
114 srand (getpid ());
115 # endif
116 int n = 0, elapse = 0;
117 clt_handler *eh = nullptr;
118
119 do
120 {
121 eh = (clt_handler *)g_base.connect (port);
122 if (eh == nullptr)
123 break;
124
125 printf ("connect ok\n");
126
127 do_read (eh, per_read);
128 # ifdef WIN32
129 elapse = rand() % 1000;
130 Sleep(elapse);
131 printf ("running %d ms\n", elapse);
132 # else
133 elapse = rand () % 1000000;
134 usleep (elapse);
135 printf ("running %.3f ms\n", elapse/1000.0);
136 # endif
137
138 } while (!sig_caught && n++ < times);
139 }
140 #endif
141
142 #ifdef TEST_READ
143 void test_read (unsigned short port, int total)
144 {
145 int n = 0;
146 GEventHandler *eh = nullptr;
147
148 do
149 {
150 eh = g_base.connect (port);
151 if (eh == nullptr)
152 break;
153
154 printf ("connect ok\n");
155 do_read (eh, total);
156 } while (0);
157 }
158 #endif
159
160 int main (int argc, char *argv[])
161 {
162 if (argc < 2)
163 {
164 printf ("usage: epoll_clt port\n");
165 return -1;
166 }
167
168 unsigned short port = atoi (argv[1]);
169
170 #ifndef WIN32
171 struct sigaction act;
172 act.sa_handler = sig_int;
173 sigemptyset(&act.sa_mask);
174 // to ensure read be breaked by SIGINT
175 act.sa_flags = 0; //SA_RESTART;
176 if (sigaction (SIGINT, &act, NULL) < 0)
177 {
178 printf ("install SIGINT failed, errno %d\n", errno);
179 return -1;
180 }
181 #endif
182
183 if (g_base.init (2) < 0)
184 return -1;
185
186 printf ("init ok\n");
187
188 #if defined(TEST_READ)
189 test_read (port, 0); // 0 means infinite loop until user break
190 #elif defined(TEST_CONN)
191 test_conn (port, 10, 100);
192 #elif defined (TEST_TIMER)
193 test_timer (port, 10, 1000);
194 #else
195 # error please define TEST_XXX macro to do something!
196 #endif
197
198 if (!sig_caught)
199 {
200 // Ctrl + D ?
201 g_base.exit (0);
202 printf ("exit ok\n");
203 }
204 else
205 printf ("has caught Ctrl+C\n");
206
207 g_base.fini ();
208 printf ("fini ok\n");
209
210 g_base.cleanup ();
211 printf ("cleanup ok\n");
212 return 0;
213 }
客戶端同樣使用了 GEventBase 的派生類 GMyEventBase 來作為事件循環的核心,所不同的是(注意並非之前例子里的那個類,雖然同名),它提供了 clt_handler 來處理自己的業務代碼。
另外為了提供連接中斷後自動向服務重連的功能,這裏 GMyEventBase 派生自 GEventBase 類的子類 GEventBaseWithAutoReconnect (位於 EventBaseAR.h/cpp 中)。
程序的運行是分別調用 GEventBase 的 init / connect / fini / cleaup 方法以及 GEventHandler 的 send / disconnect 來測試讀寫與連接。
定義宏 TEST_READ 用來測試讀寫;定義宏 TEST_CONN 可以測試連接的通斷及讀寫;定義宏 TEST_TIMER 來測試周期性定時器及讀寫。它們是互斥的。
clt_handler 主要用來異步接收服務端的回送數據並打印:
clt_handler.cpp
1 #include "clt_handler.h"
2
3 #ifdef TEST_TIMER
4 extern void do_read (clt_handler *, int);
5 bool clt_handler::on_timeout (GEV_PER_TIMER_DATA *gptd)
6 {
7 printf ("time out ! id %p, due %d, period %d\n", gptd, gptd->due_msec, gptd->period_msec);
8 do_read ((clt_handler *)gptd->user_arg, 1);
9 return true;
10 }
11 #endif
12
13 void clt_handler::on_read_msg (Json::Value const& val)
14 {
15 int key = val["key"].asInt ();
16 std::string data = val["data"].asString ();
17 printf ("got %d:%s\n", key, data.c_str ());
18 }
這個測試程序可以通過在控制台手工輸入數據來驅動,也可以通過測試數據文件來驅動,下面的 awk 腳本用來製造符合格式的測試數據:
epoll_gen.awk
1 #! /bin/awk -f
2 BEGIN {
3 WORDNUM = 1000
4 for (i = 1; i <= WORDNUM; i++) {
5 printf("%d %s\n", randint(WORDNUM), randword(20))
6 }
7 }
8
9 # randint(n): return a random integer number which is >= 1 and <= n
10 function randint(n) {
11 return int(n *rand()) + 1
12 }
13
14 # randlet(): return a random letter, which maybe upper, lower or number.
15 function randlet() {
16 return substr("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", randint(62), 1)
17 }
18
19 # randword(LEN): return a rand word with a length of LEN
20 function randword(LEN) {
21 randw=""
22 for( j = 1; j <= LEN; j++) {
23 randw=randw randlet()
24 }
25 return randw
26 }
生成的測試文件格式如下:
238 s0jKlYkEjwE4q3nNJugF
568 0cgNaSgDpP3VS45x3Wum
996 kRF6SgmIReFmrNBcCecj
398 QHQqCrB5fC61hao1BV2x
945 XZ6KLtA4jZTEnhcAugAM
619 WE95NU7FnsYar4wz279j
549 oVCTmD516yvmtuJB2NG3
840 NDAaL5vpzp8DQX0rLRiV
378 jONIm64AN6UVc7uTLIIR
251 EqSBOhc40pKXhCbCu8Ey
整個工程編譯的話就是一個 CMakeLists 文件,可以通過 cmake 生成對應的 Makefile 或 VS solution 來編譯代碼:
CMakeLists.txt
1 cmake_minimum_required(VERSION 3.0)
2 project(epoll_svc)
3 include_directories(../core ../include)
4 set(CMAKE_CXX_FLAGS "-std=c++11 -pthread -g -Wall ${CMAKE_CXX_FLAGS}")
5 link_directories(${PROJECT_SOURCE_DIR}/../lib)
6 set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/../bin)
7
8 add_executable (epoll_svc epoll_svc.cpp svc_handler.cpp ../core/EventBase.cpp ../core/EventHandler.cpp ../core/log.cpp)
9 IF (WIN32)
10 target_link_libraries(epoll_svc jsoncpp ws2_32)
11 ELSE ()
12 target_link_libraries(epoll_svc jsoncpp rt)
13 ENDIF ()
14
15 add_executable (epoll_clt epoll_clt.cpp clt_handler.cpp ../core/EventBase.cpp ../core/EventBaseAR.cpp ../core/EventHandler.cpp ../core/log.cpp)
16 target_compile_definitions(epoll_clt PUBLIC -D TEST_READ)
17 IF (WIN32)
18 target_link_libraries(epoll_clt jsoncpp ws2_32)
19 ELSE ()
20 target_link_libraries(epoll_clt jsoncpp rt)
21 ENDIF ()
22
23 add_executable (epoll_local epoll_local.cpp)
24 IF (WIN32)
25 target_link_libraries(epoll_local jsoncpp ws2_32)
26 ELSE ()
27 target_link_libraries(epoll_local jsoncpp rt)
28 ENDIF ()
這個項目包含三個編譯目標,分別是 epoll_svc 、epoll_clt 與 epoll_local,其中前兩個可以跨平台編譯,后一個只能在 Linux 平台編譯,用來驗證 epoll 的一些特性。
編譯完成后,首先運行服務端:
>./epoll_svc 1025
然後運行客戶端:
>./epoll_clt 1025 < demo
測試多個客戶端同時連接,可以使用下面的腳本:
epoll_start.sh
1 #! /bin/bash
2 # /bin/sh -> /bin/dash, do not recognize our for loop
3
4 for((i=0;i<10;i=i+1))
5 do
6 ./epoll_clt 1025 < demo &
7 echo "start $i"
8 done
可以同時啟動 10 個客戶端。
※網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!
透過資料庫的網站架設建置,建立公司的形象或購物系統,並提供最人性化的使用介面,讓使用者能即時接收到相關的資訊
通過 Ctrl+C 退出服務端;通過 Ctrl+C 或 Ctrl+D 退出單個客戶端;
通過下面的腳本來停止多個客戶端與服務端:
epoll_stop.sh
1 #! /bin/sh
2 pkill -INT epoll_clt
3 sleep 1
4 pkill -INT epoll_svc
框架的用法介紹完之後,再簡單遊覽一下這個庫的各層級對外接口。
EventBase.h
1 #pragma once
2
3
4 #include "EventHandler.h"
5 #include <string>
6 #include <map>
7 #include <mutex>
8 #include <condition_variable>
9 #include "thread_group.hpp"
10
11 #define GEV_MAX_BUF_SIZE 65536
12
13 class GEventBase : public IEventBase
14 {
15 public:
16 GEventBase();
17 ~GEventBase();
18
19 #ifdef WIN32
20 virtual HANDLE iocp () const;
21 #else
22 virtual int epfd () const;
23 #endif
24 virtual bool post_timer(GEV_PER_TIMER_DATA *gptd);
25 virtual GEventHandler* create_handler() = 0;
26
27 // thr_num :
28 // =0 - no default thread pool, user provide thread and call run
29 // <0 - use max(|thr_num|, processer_num)
30 // >0 - use thr_num
31 bool init(int thr_num = -8, int blksize = GEV_MAX_BUF_SIZE
32 #ifndef WIN32
33 , int timer_sig = SIGUSR1
34 #endif
35 );
36
37 bool listen(unsigned short port, unsigned short backup = 10);
38 GEventHandler* connect(unsigned short port, GEventHandler* exist_handler = NULL);
39 // PARAM
40 // due_msec: first timeout milliseconds
41 // period_msec: later periodically milliseconds
42 // arg: user provied argument
43 // exist_handler: reuse the timer handler
44 //
45 // RETURN
46 // NULL: failed
47 void* timeout(int due_msec, int period_msec, void *arg, GEventHandler *exist_handler);
48 bool cancel_timer(void* tid);
49 void fini();
50 void run();
51 void exit(int extra_notify = 0);
52 void cleanup();
53
54 protected:
55 #ifdef WIN32
56 bool do_accept(GEV_PER_IO_DATA *gpid);
57 bool do_recv(GEV_PER_HANDLE_DATA *gphd, GEV_PER_IO_DATA *gpid);
58 void do_error(GEV_PER_HANDLE_DATA *gphd);
59
60 int init_socket();
61 bool issue_accept();
62 bool issue_read(GEV_PER_HANDLE_DATA *gphd);
63 bool post_completion(DWORD bytes, ULONG_PTR key, LPOVERLAPPED ol);
64
65 #else
66 bool do_accept(int fd);
67 bool do_recv(conn_key_t key);
68 void do_error(conn_key_t key);
69
70 bool init_pipe();
71 void close_pipe();
72 bool post_notify (char ch, void* ptr = nullptr);
73 void promote_leader (std::unique_lock<std::mutex> &guard);
74
75 GEventHandler* find_by_key (conn_key_t key, bool erase);
76 GEventHandler* find_by_fd (int fd, conn_key_t &key, bool erase);
77
78 # ifdef HAS_SIGTHR
79 void sig_proc ();
80 # endif
81 #endif
82
83 bool do_timeout(GEV_PER_TIMER_DATA *gptd);
84
85 virtual bool on_accept(GEV_PER_HANDLE_DATA *gphd);
86 virtual bool on_read(GEventHandler *h, GEV_PER_IO_DATA *gpid);
87 virtual void on_error(GEventHandler *h);
88 virtual bool on_timeout (GEV_PER_TIMER_DATA *gptd);
89
90
91 protected:
92 volatile bool m_running = false;
93 int m_thrnum = 0;
94 int m_blksize = GEV_MAX_BUF_SIZE;
95 std::thread_group m_grp;
96 SOCKET m_listener = INVALID_SOCKET;
97
98 std::mutex m_mutex; // protect m_map
99 std::mutex m_tlock; // protect m_tmap
100 // timer_t may conflict when new timer created after old timer closed
101 //std::map <timer_t, GEventHandler *> m_tmap;
102 std::map <GEV_PER_TIMER_DATA*, GEventHandler *> m_tmap;
103
104 #ifdef WIN32
105 LPFN_ACCEPTEX m_acceptex = nullptr;
106 LPFN_GETACCEPTEXSOCKADDRS m_getacceptexsockaddrs = nullptr;
107 HANDLE m_iocp = NULL;
108 HANDLE m_timerque = NULL;
109
110 std::map<GEV_PER_HANDLE_DATA*, GEventHandler*> m_map;
111 #else
112 int m_ep = -1;
113 int m_pp[2];
114 int m_tsig = 0; // signal number for timer
115
116 std::mutex m_lock; // protect epoll
117 pthread_t m_leader = -1;
118 std::map<conn_key_t, GEventHandler*> m_map;
119 # ifdef HAS_SIGTHR
120 // special thread only cares about signal
121 std::thread *m_sigthr = nullptr;
122 # endif
123 #endif
124 };
- init,它在底層啟動 thr_num 個線程來跑 run 方法;每次 IO 的塊緩衝區大小由 blksize 指定;它內部還創建了對應的 iocp 或 epoll 對象,便於之后加入 socket 句柄進行處理。
- exit,它通知線程池中的所有線程退出等待,windows 上是通過 PostQueuedCompletionStatus,Linux 上是通過在自建的一個 pipe 上寫數據以觸發 epoll 退出(這個 pipe 在 init 中創建並加入 epoll);
- fini,它在所有工作線程退出后,關閉之前創建的對象,清理事件循環用到的資源;
- cleanup,它清理之前建立的 fd-handler 映射,清理遺留的處理器並釋放資源;
- run,它是線程池運行函數,windows 上是通過 GetQueuedCompletionStatus 在 iocp 上等待;在 linux 上是通過 epoll_wait 在 epoll 上等待事件。當有事件產生后,根據事件類型,分別調用 do_accept / on_accept、do_recv / on_read、do_error / on_error 回調來分派事件;
- listen,創建偵聽 socket 並加入到 iocp 或 epoll 中;
- connect,連接到遠程服務並將成功連接的 socket 加入到 iocp 或 epoll 中;
- timeout,設置定時器事件,windows 上是通過 CreateTimerQueueTimer 實現定時器超時;linux 則是通過 timer_create 實現的,都是系統現成的東西,只不過在系統定時器到期后,給對應的 iocp 或 epoll 對象發送了一個通知而已,在 linux 上這個通知機制是上面提到過的 pipe 來實現的,因而有一定延遲,不能指定精度太小的定時器;
- cancel_timer,取消之前設置的定時器。
然後看下 GEventHandler 提供的回調接口,應用可以從它派生並完成業務相關代碼:
EventHandler.h
1 #pragma once
2 #include "platform.h"
3
4 #ifdef WIN32
5 // must ensure <winsock2.h> precedes <widnows.h> included, to prevent winsock2.h conflict with winsock.h
6 # include <WinSock2.h>
7 # include <Windows.h>
8 # include <mswsock.h> // for LPFN_ACCEPTEX & LPFN_GETACCEPTEXSOCKADDRS later in EventBase.h
9 #else
10 # include <unistd.h> // for close
11 # include <sys/socket.h>
12 # include <sys/epoll.h>
13 # include <sys/time.h>
14 # include <netinet/in.h> // for struct sockaddr_in
15 # include <arpa/inet.h> // for inet_addr/inet_ntoa
16 # include <string.h> // for memset/memcpy
17 # include <signal.h>
18 #endif
19
20 #include <mutex>
21 #include "jsoncpp/json.h"
22
23
24 class GEventHandler;
25 struct GEV_PER_TIMER_DATA;
26 class IEventBase
27 {
28 public:
29 #ifdef WIN32
30 virtual HANDLE iocp () const = 0;
31 #else
32 virtual int epfd () const = 0;
33 #endif
34
35 virtual void* timeout(int due_msec, int period_msec, void *arg, GEventHandler *exist_handler) = 0;
36 virtual bool cancel_timer(void* tid) = 0;
37 virtual bool post_timer(GEV_PER_TIMER_DATA *gptd) = 0;
38 };
39
40
41 #ifdef WIN32
42 enum GEV_IOCP_OP
43 {
44 OP_TIMEOUT = 1,
45 OP_ACCEPT,
46 OP_RECV,
47 };
48 #else
49 // the purpose of this key is to distinguish different connections with same fd !
50 // (when connection break and re-established soon, fd may not change but port will change)
51 struct conn_key_t
52 {
53 int fd;
54 unsigned short lport;
55 unsigned short rport;
56
57 conn_key_t (int f, unsigned short l, unsigned short r);
58 bool operator< (struct conn_key_t const& rhs) const;
59 };
60 #endif
61
62
63 struct GEV_PER_HANDLE_DATA
64 {
65 SOCKET so;
66 SOCKADDR_IN laddr;
67 SOCKADDR_IN raddr;
68
69 #ifndef WIN32
70 conn_key_t key () const;
71 #endif
72
73 GEV_PER_HANDLE_DATA(SOCKET s, SOCKADDR_IN *l, SOCKADDR_IN *r);
74 virtual ~GEV_PER_HANDLE_DATA();
75 };
76
77 struct GEV_PER_IO_DATA
78 {
79 SOCKET so;
80 #ifdef WIN32
81 GEV_IOCP_OP op;
82 OVERLAPPED ol;
83 WSABUF wsa; // wsa.len is buffer length
84 DWORD bytes; // after compeleted, bytes trasnfered
85 #else
86 char *buf;
87 int len;
88 #endif
89
90 GEV_PER_IO_DATA(
91 #ifdef WIN32
92 GEV_IOCP_OP o,
93 #endif
94 SOCKET s, int l);
95 virtual ~GEV_PER_IO_DATA();
96 };
97
98 struct GEV_PER_TIMER_DATA
99 #ifdef WIN32
100 : public GEV_PER_IO_DATA
101 #endif
102 {
103 IEventBase *base;
104 int due_msec;
105 int period_msec;
106 void *user_arg;
107 bool cancelled;
108 #ifdef WIN32
109 HANDLE timerque;
110 HANDLE timer;
111 #else
112 timer_t timer;
113 #endif
114
115 GEV_PER_TIMER_DATA(IEventBase *base, int due, int period, void *arg
116 #ifdef WIN32
117 , HANDLE tq);
118 #else
119 , timer_t tid);
120 #endif
121
122 virtual ~GEV_PER_TIMER_DATA();
123 void cancel ();
124 };
125
126 class GEventHandler
127 {
128 public:
129 GEventHandler();
130 virtual ~GEventHandler();
131
132 GEV_PER_HANDLE_DATA* gphd();
133 GEV_PER_TIMER_DATA* gptd();
134 bool connected();
135 void disconnect();
136 void clear();
137 SOCKET fd();
138
139 int send(char const* buf, int len);
140 int send(std::string const& str);
141
142 virtual bool reuse();
143 virtual bool auto_reconnect();
144 virtual void arg(void *param) = 0;
145 virtual void reset(GEV_PER_HANDLE_DATA *gphd, GEV_PER_TIMER_DATA *gptd, IEventBase *base);
146 virtual bool on_read(GEV_PER_IO_DATA *gpid) = 0;
147 virtual void on_error(GEV_PER_HANDLE_DATA *gphd);
148 // note when on_timeout called, handler's base may cleared by cancel_timer, use gptd->base instead if it is not null.
149 virtual bool on_timeout(GEV_PER_TIMER_DATA *gptd) = 0;
150 virtual void cleanup(bool terminal);
151 void close(bool terminal);
152
153 protected:
154 GEV_PER_HANDLE_DATA *m_gphd = nullptr;
155 GEV_PER_TIMER_DATA *m_gptd = nullptr;
156 IEventBase *m_base = nullptr;
157 // us so instead of m_gphd,
158 // as the later one may destroyed during using..
159 SOCKET m_so;
160 };
161
162 // a common handler to process json protocol.
163 class GJsonEventHandler : public GEventHandler
164 {
165 public:
166 //virtual void on_read();
167 virtual void arg(void *param);
168 virtual void reset(GEV_PER_HANDLE_DATA *gphd, GEV_PER_TIMER_DATA *gptd, IEventBase *base);
169 virtual bool on_read(GEV_PER_IO_DATA *gpid);
170 virtual void on_read_msg(Json::Value const& root) = 0;
171 virtual bool on_timeout(GEV_PER_TIMER_DATA *gptd);
172 virtual void cleanup(bool terminal);
173
174 protected:
175 // protect m_stub to prevent multi-entry
176 #ifdef HAS_ET
177 std::mutex m_mutex;
178 #endif
179
180 std::string m_stub;
181 };
這裏主要有兩個類,GEventHandler 處理通用的基於流的數據;GJsonEventHandler 處理基於 json 格式的數據。
前者需要重寫 on_read 方法來處理塊數據;後者需要重寫 on_read_msg 方法來處理 json 數據。
目前 json 的解析是通過 jsoncpp 庫完成的,這個庫本身是跨平台的(本 git 庫僅提供 64 位 Linux 靜態鏈接庫及 VS2013 的 32 位 Release 版本 Windows 靜態庫)。
svc_handler 與 clt_handler 均從 GJsonEventHandler 派生。
如果有新的流格式需要處理 ,只需要從 GEventHandler 類派生新的處理類即可。
除了讀取連接上的數據,還有其它一些重要的回調接口,列明如下:
- on_read,連接上有數據到達;
- on_error,連接斷開;
- on_tmeout,定時器事件;
- ……
如果有新的事件需要處理 ,也可以在這裏擴展。
最後看下 GEventBaseWithAutoReconnect 提供的與自動重連相關的接口:
EventBaseAR.h
1 #pragma once
2
3
4 #include "EventBase.h"
5 #include <thread>
6
7 #define GEV_RECONNECT_TIMEOUT 2 // seconds
8 #define GEV_MAX_RECONNECT_TIMEOUT 256 // seconds
9
10 class GEventBaseWithAutoReconnect : public GEventBase
11 {
12 public:
13 GEventBaseWithAutoReconnect(int reconn_min = GEV_RECONNECT_TIMEOUT, int reconn_max = GEV_MAX_RECONNECT_TIMEOUT);
14 ~GEventBaseWithAutoReconnect();
15
16 bool do_connect(unsigned short port, void *arg);
17 GEventHandler* connector();
18
19 protected:
20 virtual void on_error(GEventHandler *h);
21 virtual bool on_timeout(GEV_PER_TIMER_DATA *gptd);
22
23 virtual void on_connect_break();
24 virtual bool on_connected(GEventHandler *app);
25
26 protected:
27 void do_reconnect(void *arg);
28
29 protected:
30 unsigned short m_port;
31 GEventHandler* m_app;
32 GEventHandler* m_htimer;
33 void* m_timer;
34 int m_reconn_min;
35 int m_reconn_max;
36 int m_reconn_curr;
37 };
其實比較簡單,只比 GEventBase 類多了一個 do_connect 方法,來擴展 connect 不能自動重連的問題。
底層的話,是通過定時器來實現指數後退重連算法的。
最後,如果你還是感到雲里霧裡的,可以參考一下下面的類結構圖:
黑色標註的是框架提供的類,紅色是服務端派生的類,藍色是客戶端派生的類,其實 GMyEventBase 的唯一作用就是將 svc_handler 與 clt_handler 分別引入各自的框架中,
所以用戶的關注點主要還是在派生自己的 GEventHandler 類,並在其中的回調接口中處理數據就可以了。
後記
這個框架已經應用到我司的公共產品中,併為數個 tcp 服務提供底層支撐,經過百萬級別用戶機器驗證,運行穩定性還是可以的,所以當得起“工業級”這三個字。
前面在說到開源庫的選型時還留了一個口子沒有交待,這裏一併說下。
其實最早的重構版本是使用 libevent 來實現的,但是發現它在 windows 上使用的是低效的 select,
而且為了增加、刪除句柄,它又使用了一種 self-pipe-trick 的技巧,簡單說來的就是下面的代碼序列:
listen (listen_fd, 1);
……
connect (connect_fd, &addr, size);
……
accept_fd = accept (listen_fd, &addr, &size);
在缺乏 pipe 調用的 win32 環境製造了一個 socket 自連接,從而進行一些通知。
這一步是必要的,如果不能成功連接就會導致整個 libevent 初始化失敗,從而運行不起來。
不巧的是,在一些 windows 機器上(約佔用戶總量 10%),由於防火牆設置嚴格,上述 listen 與 connect 調用可以成功,
但是 accept 會失敗返回,從而導致整個服務退出 (防火牆會嚴格禁止不在白名單上偵聽的端口的連接)。
對於已知端口,可以通過在防火牆上設置白名單來避免,但是對於這種隨機 listen 的端口,真的是太難了,基本無解。
回頭考察了一下 asio,windows 上使用的是 iocp,自然沒有這個自連接;
ACE 有多種實現可供選擇,如果使用 ACE_Select_Reactor / ACE_TP_Reactor 是會有這個自連接,
但是你可以選擇其它實現,如基於 WaitForMultipleEvents 的 ACE_WFMO_Reactor(最大隻支持 62 個句柄,放棄),
或基於 iocp 的 ACE_Proactor (前攝式,與反應式在編程上稍有不同,更接近於 asio)就沒有這個自連接。
再說的深一點,其實公司最早的網絡庫使用的就是基於 boost 的 asio,大量的使用了 c++ 模板,
有時候產生了一些崩潰,但是根據 dump 完全無法定位崩潰點(各種冗長的模板展開名稱),
導致了一些頑固的已知 bug 一直找不到崩潰點而無法解決(雖然量不大),所以才有了要去重新選型網絡庫以及後來這一系列的東西。
本來一開始我是想用 ACE 的,因為我讀過這個庫的源碼,對裏面所有的東西都非常熟悉,
但是看看 ACE 小 5 MB 的 dll 尺寸,還是放棄了(產品本身安裝包也就這麼大吧),
對於一個公司底層的公共組件,被各種產品攜帶,需要嚴格控制“體重”
(後來聽說 ACE 按功能拆分了代碼模塊,你只需要選自己依賴的部分即可,不過我還沒有試過)。
使用這個庫代替之前的 boost::asio 后,我還有一個意外收穫,就是編譯出來的 dll 尺寸明顯小了很多,700 K -> 500 K 的樣子,看來所謂模板膨脹是真有其事……
最後奉上 gevent 的 github 鏈接,歡迎有相同需求的小夥伴前來“復刻” :
https://github.com/goodpaperman/gevent
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
※網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!
當全世界的人們隨著網路時代而改變向上時您還停留在『網站美醜不重要』的舊有思維嗎?機會是留給努力改變現況的人們,別再浪費一分一秒可以接觸商機的寶貴時間!