1 /*
2  * Collie - An asynchronous event-driven network framework using Dlang development
3  *
4  * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd 
5  *
6  * Developer: putao's Dlang team
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module collie.bootstrap.server;
12 import kiss.log;
13 import collie.net;
14 import collie.channel;
15 import collie.bootstrap.serversslconfig;
16 public import collie.bootstrap.exception;
17 
18 
19 import std.exception;
20 
21 final class ServerBootstrap(PipeLine)
22 {
23     this()
24     {
25         _loop = new EventLoop();
26     }
27 
28     this(EventLoop loop)
29     {
30         _loop = loop;
31     }
32 
33     auto pipeline(shared AcceptPipelineFactory factory)
34     {
35         _acceptorPipelineFactory = factory;
36         return this;
37     }
38 
39     auto setSSLConfig(ServerSSLConfig config)
40     {
41 	       _sslConfig = config;
42 	       return this;
43     }
44 
45     auto childPipeline(shared PipelineFactory!PipeLine factory)
46     {
47         _childPipelineFactory = factory;
48         return this;
49     }
50 
51     auto group(EventLoopGroup group)
52     {
53         _group = group;
54         return this;
55     }
56 
57     auto setReusePort(bool ruse)
58     {
59         _rusePort = ruse;
60         return this;
61     }
62 
63     /**
64             The Value will be 0 or 5s ~ 1800s.
65             0 is disable, 
66             if(value < 5) value = 5;
67             if(value > 3000) value = 1800;
68         */
69     auto heartbeatTimeOut(uint second)
70     {
71         _timeOut = second;
72         _timeOut = _timeOut < 5 ? 5 : _timeOut;
73         _timeOut = _timeOut > 1800 ? 1800 : _timeOut;
74 
75         return this;
76     }
77 
78     void bind(Address addr)
79     {
80         _address = addr;
81     }
82 
83     void bind(ushort port)
84     {
85         _address = new InternetAddress(port);
86     }
87 
88     void bind(string ip, ushort port)
89     {
90         _address = new InternetAddress(ip, port);
91     }
92 
93     void stopListening()
94     {
95 		if (!_listening)
96             return;
97 		scope(exit)_listening = false;
98         foreach (ref accept; _serverlist)
99         {
100             accept.stop();
101         }
102         _mainAccept.stop();
103 
104     }
105 
106 	void stop()
107 	{
108 		if(!_isLoopWait) return;
109 		scope(exit) _isLoopWait = false;
110 		_group.stop();
111 		_loop.stop();
112 	}
113 
114     void join()
115     {
116 		if (!_isLoopWait)
117             return;
118         if (_group)
119             _group.wait();
120     }
121 
122     void waitForStop()
123     {
124 		if(_isLoopWait)
125 			throw new ServerIsRuningException("server is runing!");
126 		if(!_listening)
127 			startListening();
128 		_isLoopWait = true;
129 		if(_group)
130 			_group.start();
131 		_loop.join();
132     }
133 
134 	void startListening()
135 	{
136 		if (_listening)
137 			throw new ServerIsListeningException("server is listening!");
138 		if (_address is null || _childPipelineFactory is null)
139 			throw new ServerStartException("the address or childPipelineFactory is null!");
140 
141 		_listening = true;
142 		uint wheel, time;
143 		bool beat = getTimeWheelConfig(wheel, time);
144 		_mainAccept = creatorAcceptor(_loop);
145 		_mainAccept.initialize();
146 		if (beat)
147 			_mainAccept.startTimingWhile(wheel, time);
148 		if (_group)
149 		{
150 			foreach (loop; _group)
151 			{
152 				auto acceptor = creatorAcceptor(loop);
153 				acceptor.initialize();
154 				_serverlist ~= acceptor;
155 				if (beat)
156 					acceptor.startTimingWhile(wheel, time);
157 			}
158 		}
159 		logDebug("server _listening!");
160 	}
161 
162 	EventLoopGroup group(){return _group;}
163 
164 	@property EventLoop eventLoop(){return _loop;}
165 
166 	@property Address address(){return _address;}
167 
168 protected:
169     auto creatorAcceptor(EventLoop loop)
170     {
171         auto acceptor = new TcpListener(loop, _address.addressFamily);
172 		if(_rusePort)
173         	acceptor.reusePort = _rusePort;
174         acceptor.bind(_address);
175         acceptor.listen(1024);
176 		{
177 			Linger optLinger;
178 			optLinger.on = 1;
179 			optLinger.time = 0;
180 			acceptor.setOption(SocketOptionLevel.SOCKET, SocketOption.LINGER, optLinger);
181 		}
182         AcceptPipeline pipe;
183         if (_acceptorPipelineFactory)
184             pipe = _acceptorPipelineFactory.newPipeline(acceptor);
185         else
186             pipe = AcceptPipeline.create();
187 
188         SSL_CTX* ctx = null;
189 		version(USE_SSL)
190         {
191             if (_sslConfig)
192             {
193                 ctx = _sslConfig.generateSSLCtx();
194                 if (!ctx)
195 					throw new SSLException("Can not gengrate SSL_CTX");
196             }
197         }
198 
199         return new ServerAcceptor!(PipeLine)(acceptor, pipe, _childPipelineFactory,
200             ctx);
201     }
202 
203     bool getTimeWheelConfig(out uint whileSize, out uint time)
204     {
205         if (_timeOut == 0)
206             return false;
207         if (_timeOut <= 40)
208         {
209             whileSize = 50;
210             time = _timeOut * 1000 / 50;
211         }
212         else if (_timeOut <= 120)
213         {
214             whileSize = 60;
215             time = _timeOut * 1000 / 60;
216         }
217         else if (_timeOut <= 600)
218         {
219             whileSize = 100;
220             time = _timeOut * 1000 / 100;
221         }
222         else if (_timeOut < 1000)
223         {
224             whileSize = 150;
225             time = _timeOut * 1000 / 150;
226         }
227         else
228         {
229             whileSize = 180;
230             time = _timeOut * 1000 / 180;
231         }
232         return true;
233     }
234 
235 private:
236     shared AcceptPipelineFactory _acceptorPipelineFactory;
237     shared PipelineFactory!PipeLine _childPipelineFactory;
238 
239     ServerAcceptor!(PipeLine) _mainAccept;
240     EventLoop _loop;
241 
242     ServerAcceptor!(PipeLine)[] _serverlist;
243     EventLoopGroup _group;
244 
245 	bool _listening = false;
246     bool _rusePort = true;
247 	bool _isLoopWait = false;
248     uint _timeOut = 0;
249     Address _address;
250 
251     ServerSSLConfig _sslConfig = null;
252 }
253 
254 private:
255 
256 import std.functional;
257 import kiss.timingwheel;
258 import collie.utils.memory;
259 import collie.net;
260 
261 final @trusted class ServerAcceptor(PipeLine) : InboundHandler!(Socket)
262 {
263     this(TcpListener acceptor, AcceptPipeline pipe,
264         shared PipelineFactory!PipeLine clientPipeFactory, SSL_CTX* ctx = null)
265     {
266         _acceptor = acceptor;
267         _pipeFactory = clientPipeFactory;
268         pipe.addBack(this);
269         pipe.finalize();
270         _pipe = pipe;
271         _pipe.transport(_acceptor);
272         _acceptor.setReadHandle(&acceptCallBack);
273         _sslctx = ctx;
274 		_list = new ServerConnection!PipeLine();
275 		version(USE_SSL)
276 			_sharkList = new SSLHandShark();
277     }
278 
279     pragma(inline, true) void initialize()
280     {
281         _pipe.transportActive();
282     }
283 
284     pragma(inline, true) void stop()
285     {
286         _pipe.transportInactive();
287     }
288 
289     override void read(Context ctx, Socket msg)
290     {
291 		version(USE_SSL)
292         {
293             if (_sslctx)
294             {
295                     auto ssl = SSL_new(_sslctx);
296 				static if (IOMode == IO_MODE.iocp){
297 					BIO * readBIO = BIO_new(BIO_s_mem());
298 					BIO * writeBIO = BIO_new(BIO_s_mem());
299 					SSL_set_bio(ssl, readBIO, writeBIO);
300 					SSL_set_accept_state(ssl);
301 					auto asynssl = new SSLSocket(_acceptor.eventLoop, msg, ssl,readBIO,writeBIO);
302 				} else {
303                     if (SSL_set_fd(ssl, msg.handle()) < 0)
304                     {
305                         error("SSL_set_fd error: fd = ", msg.handle());
306                         SSL_shutdown(ssl);
307                         SSL_free(ssl);
308                         return;
309                     }
310                     SSL_set_accept_state(ssl);
311                     auto asynssl = new SSLSocket(_acceptor.eventLoop, msg, ssl);
312 				}
313                     auto shark = new SSLHandShark(asynssl, &doHandShark);
314 
315 					shark.next = _sharkList.next;
316 					if(shark.next) shark.next.prev = shark;
317 					shark.prev = _sharkList;
318 					_sharkList.next = shark;
319 
320                     asynssl.start();
321             }
322             else
323             {
324                 auto asyntcp = new TcpStream(_acceptor.eventLoop, msg);
325                 startSocket(asyntcp);
326             }
327         } else
328         {
329             auto asyntcp = new TcpStream(_acceptor.eventLoop, msg);
330             startSocket(asyntcp);
331         }
332     }
333 
334     override void transportActive(Context ctx)
335     {
336         logDebug("acept transportActive");
337         if (!_acceptor.watch())
338         {
339             logError("acceptor start error!");
340         }
341     }
342 
343     override void transportInactive(Context ctx)
344     {
345         _acceptor.close();
346 		auto con = _list.next;
347 		_list.next = null;
348 		while(con) {
349 			auto tcon = con;
350 			con = con.next;
351 			tcon.close();
352 		}
353         _acceptor.eventLoop.stop();
354     }
355 
356 protected:
357     pragma(inline) void remove(ServerConnection!PipeLine conn)
358     {
359 		conn.prev.next = conn.next;
360 		if(conn.next)
361 			conn.next.prev = conn.prev;
362         gcFree(conn);
363     }
364 
365     void acceptCallBack(EventLoop loop, Socket socket)  nothrow 
366     {
367         catchAndLogException(_pipe.read(socket));
368     }
369 
370     pragma(inline, true) @property acceptor()
371     {
372         return _acceptor;
373     }
374 
375     void startTimingWhile(uint whileSize, uint time)
376     {
377         if (_timer)
378             return;
379         _timer = new Timer(_acceptor.eventLoop);
380         _timer.setTimerHandle(&doWheel);
381         _wheel = new TimingWheel(whileSize);
382         _timer.start(time);
383     }
384 
385     void doWheel() nothrow
386     {
387         if (_wheel)
388             _wheel.prevWheel();
389     }
390 
391 	version(USE_SSL)
392     {
393         void doHandShark(SSLHandShark shark, SSLSocket sock)
394         {
395 			shark.prev.next = shark.next;
396 			if(shark.next) shark.next.prev = shark.prev;
397             scope (exit)
398                 delete shark;
399             if (sock)
400             {
401                 sock.setHandshakeCallBack(null);
402                 startSocket(sock);
403             }
404         }
405     }
406 
407     void startSocket(TcpStream sock)
408     {
409         auto pipe = _pipeFactory.newPipeline(sock);
410         if (!pipe)
411         {
412             gcFree(sock);
413             return;
414         }
415         pipe.finalize();
416         auto con = new ServerConnection!PipeLine(pipe);
417         con.serverAceptor = this;
418 
419 		con.next = _list.next;
420 		if(con.next)
421 			con.next.prev = con;
422 		con.prev = _list;
423 		_list.next = con;
424 
425         con.initialize();
426         if (_wheel)
427             _wheel.addNewTimer(con);
428     }
429 
430 private:
431    // int[ServerConnection!PipeLine] _list;
432 	ServerConnection!PipeLine _list;
433 
434 	version(USE_SSL)
435     {
436 		SSLHandShark _sharkList;
437     }
438 
439     TcpListener _acceptor;
440     Timer _timer;
441     TimingWheel _wheel;
442     AcceptPipeline _pipe;
443     shared PipelineFactory!PipeLine _pipeFactory;
444 
445     SSL_CTX* _sslctx = null;
446 }
447 
448 @trusted final class ServerConnection(PipeLine) : WheelTimer, PipelineManager
449 {
450     this(PipeLine pipe)
451     {
452         _pipe = pipe;
453         _pipe.pipelineManager = this;
454     }
455 
456     ~this()
457     {
458     }
459 
460     pragma(inline, true) void initialize()
461     {
462         _pipe.transportActive();
463     }
464 
465     pragma(inline, true) void close()
466     {
467         _pipe.transportInactive();
468     }
469 
470     pragma(inline, true) @property serverAceptor()
471     {
472         return _manger;
473     }
474 
475     pragma(inline, true) @property serverAceptor(ServerAcceptor!PipeLine manger)
476     {
477         _manger = manger;
478     }
479 
480     override void deletePipeline(PipelineBase pipeline)
481     {
482         pipeline.pipelineManager = null;
483         _pipe = null;
484         stop();
485         _manger.remove(this);
486     }
487 
488     override void refreshTimeout()
489     {
490         rest();
491     }
492 
493     override void onTimeOut() nothrow
494     {
495 		collectException(_pipe.timeOut());
496     }
497 private:
498 	this(){}
499 	ServerConnection!PipeLine prev;
500 	ServerConnection!PipeLine next;
501 private:
502     ServerAcceptor!PipeLine _manger;
503     PipeLine _pipe;
504 }
505 
506 version(USE_SSL)
507 {
508     final class SSLHandShark
509     {
510         alias SSLHandSharkCallBack = void delegate(SSLHandShark shark, SSLSocket sock);
511         this(SSLSocket sock, SSLHandSharkCallBack cback)
512         {
513             _socket = sock;
514             _cback = cback;
515             _socket.setCloseCallBack(&onClose);
516             _socket.setReadCallBack(&readCallBack);
517             _socket.setHandshakeCallBack(&handSharkCallBack);
518         }
519 
520         ~this()
521         {
522         }
523 
524     protected:
525         void handSharkCallBack()
526         {
527             logDebug("the ssl handshark over");
528             _cback(this, _socket);
529             _socket = null;
530         }
531 
532         void readCallBack(ubyte[] buffer)
533         {
534         }
535 
536         void onClose()
537         {
538             logDebug("the ssl handshark fail");
539             _socket.setCloseCallBack(null);
540             _socket.setReadCallBack(null);
541             _socket.setHandshakeCallBack(null);
542             _socket = null;
543             _cback(this, _socket);
544         }
545 
546 	private:
547 		this(){}
548 		SSLHandShark prev;
549 		SSLHandShark next;
550     private:
551         SSLSocket _socket;
552         SSLHandSharkCallBack _cback;
553     }
554 }