/*
 * Collie - An asynchronous event-driven network framework using Dlang development
 *
 * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd
 *
 * Developer: putao's Dlang team
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module collie.bootstrap.server;

import kiss.log;
import collie.net;
import collie.channel;
import collie.bootstrap.serversslconfig;
public import collie.bootstrap.exception;

import std.exception;

/**
    Builder-style entry point for a TCP server.

    A bootstrap owns one main EventLoop and, optionally, an EventLoopGroup.
    startListening() creates one ServerAcceptor on the main loop and one more
    for every loop in the group; all of them bind to the same address.

    Params:
        PipeLine = pipeline type created for every accepted connection.
*/
final class ServerBootstrap(PipeLine)
{
    /// Creates a bootstrap that runs on its own, newly created EventLoop.
    this()
    {
        _loop = new EventLoop();
    }

    /// Creates a bootstrap that runs on the supplied EventLoop.
    this(EventLoop loop)
    {
        _loop = loop;
    }

    /// Sets the factory used to build the accept pipeline. Optional: without
    /// it creatorAcceptor() falls back to AcceptPipeline.create().
    auto pipeline(shared AcceptPipelineFactory factory)
    {
        _acceptorPipelineFactory = factory;
        return this;
    }

    /// Sets the SSL configuration. It is only consulted when the module is
    /// compiled with version USE_SSL.
    auto setSSLConfig(ServerSSLConfig config)
    {
        _sslConfig = config;
        return this;
    }

    /// Sets the factory that builds the pipeline of every accepted
    /// connection. Required: startListening() throws without it.
    auto childPipeline(shared PipelineFactory!PipeLine factory)
    {
        _childPipelineFactory = factory;
        return this;
    }

    /// Sets the optional group of additional event loops to accept on.
    auto group(EventLoopGroup group)
    {
        _group = group;
        return this;
    }

    /// Enables or disables the reusePort flag on the listeners created later
    /// (enabled by default).
    auto setReusePort(bool ruse)
    {
        _rusePort = ruse;
        return this;
    }

    /**
        Sets the heartbeat timeout in seconds.

        The stored value is either 0 or within 5s ~ 1800s:
            0 disables the heartbeat,
            if(value < 5) value = 5;
            if(value > 1800) value = 1800;
    */
    auto heartbeatTimeOut(uint second)
    {
        // 0 must stay 0: it is the documented "disabled" value and must not be
        // pulled up to the 5 second minimum.
        if (second == 0)
            _timeOut = 0;
        else if (second < 5)
            _timeOut = 5;
        else if (second > 1800)
            _timeOut = 1800;
        else
            _timeOut = second;

        return this;
    }

    /// Selects the address to listen on.
    void bind(Address addr)
    {
        _address = addr;
    }

    /// Selects the address to listen on from a port only.
    void bind(ushort port)
    {
        _address = new InternetAddress(port);
    }

    /// Selects the address to listen on from an IP string and a port.
    void bind(string ip, ushort port)
    {
        _address = new InternetAddress(ip, port);
    }

    /// Stops every acceptor created by startListening(). Does nothing when
    /// the server is not listening.
    void stopListening()
    {
        if (!_listening)
            return;
        scope (exit)
        {
            _listening = false;
            // Forget the stopped acceptors so that a later startListening()
            // does not append to, and later re-stop, stale entries.
            _serverlist = null;
            _mainAccept = null;
        }
        foreach (ref accept; _serverlist)
        {
            accept.stop();
        }
        if (_mainAccept)
            _mainAccept.stop();
    }

    /// Stops the loop group (if any) and the main loop. Does nothing unless
    /// waitForStop() has been entered.
    void stop()
    {
        if (!_isLoopWait)
            return;
        scope (exit)
            _isLoopWait = false;
        // The group is optional; every other user of _group checks it too.
        if (_group)
            _group.stop();
        _loop.stop();
    }

    /// Waits on the loop group, if there is one and waitForStop() has been
    /// entered.
    void join()
    {
        if (!_isLoopWait)
            return;
        if (_group)
            _group.wait();
    }

    /**
        Starts listening if necessary, starts the loop group and then joins
        the main loop.

        Throws: ServerIsRuningException when called while already waiting.
    */
    void waitForStop()
    {
        if (_isLoopWait)
            throw new ServerIsRuningException("server is runing!");
        if (!_listening)
            startListening();
        _isLoopWait = true;
        if (_group)
            _group.start();
        _loop.join();
    }

    /**
        Creates and activates the acceptors: one on the main loop and one per
        loop of the group.

        Throws:
            ServerIsListeningException when already listening;
            ServerStartException when the address or the child pipeline
            factory has not been set.
    */
    void startListening()
    {
        if (_listening)
            throw new ServerIsListeningException("server is listening!");
        if (_address is null || _childPipelineFactory is null)
            throw new ServerStartException("the address or childPipelineFactory is null!");

        _listening = true;
        // If creating an acceptor throws (for example when bind fails) the
        // server must not stay flagged as listening; partially created
        // acceptors are dropped so that a retry starts from a clean state.
        scope (failure)
        {
            _listening = false;
            _serverlist = null;
            _mainAccept = null;
        }

        uint wheel, time;
        bool beat = getTimeWheelConfig(wheel, time);
        _mainAccept = creatorAcceptor(_loop);
        _mainAccept.initialize();
        if (beat)
            _mainAccept.startTimingWhile(wheel, time);
        if (_group)
        {
            foreach (loop; _group)
            {
                auto acceptor = creatorAcceptor(loop);
                acceptor.initialize();
                _serverlist ~= acceptor;
                if (beat)
                    acceptor.startTimingWhile(wheel, time);
            }
        }
        logDebug("server _listening!");
    }

    /// Returns the loop group, or null when none was set.
    EventLoopGroup group()
    {
        return _group;
    }

    /// The main event loop.
    @property EventLoop eventLoop()
    {
        return _loop;
    }

    /// The address selected with bind(), or null.
    @property Address address()
    {
        return _address;
    }

protected:
    /**
        Creates a TcpListener on `loop`, binds it to the configured address,
        starts listening with a backlog of 1024 and wraps it in a
        ServerAcceptor together with its accept pipeline.

        Throws: SSLException (version USE_SSL only) when the SSL configuration
        cannot produce an SSL_CTX.
    */
    auto creatorAcceptor(EventLoop loop)
    {
        auto acceptor = new TcpListener(loop, _address.addressFamily);
        if (_rusePort)
            acceptor.reusePort = _rusePort;
        acceptor.bind(_address);
        acceptor.listen(1024);
        {
            // Linger switched on with a zero timeout.
            Linger optLinger;
            optLinger.on = 1;
            optLinger.time = 0;
            acceptor.setOption(SocketOptionLevel.SOCKET, SocketOption.LINGER, optLinger);
        }
        AcceptPipeline pipe;
        if (_acceptorPipelineFactory)
            pipe = _acceptorPipelineFactory.newPipeline(acceptor);
        else
            pipe = AcceptPipeline.create();

        SSL_CTX* ctx = null;
        version (USE_SSL)
        {
            if (_sslConfig)
            {
                ctx = _sslConfig.generateSSLCtx();
                if (!ctx)
                    throw new SSLException("Can not gengrate SSL_CTX");
            }
        }

        return new ServerAcceptor!(PipeLine)(acceptor, pipe, _childPipelineFactory,
                ctx);
    }

    /**
        Derives the timing wheel parameters from the heartbeat timeout.

        Params:
            whileSize = receives the number of wheel slots.
            time      = receives the tick interval: the timeout multiplied by
                        1000 and divided by the slot count (presumably
                        milliseconds; it is handed to Timer.start()).

        Returns: false when the heartbeat is disabled (timeout of 0), in which
        case both outputs stay 0; true otherwise.
    */
    bool getTimeWheelConfig(out uint whileSize, out uint time)
    {
        if (_timeOut == 0)
            return false;
        if (_timeOut <= 40)
        {
            whileSize = 50;
            time = _timeOut * 1000 / 50;
        }
        else if (_timeOut <= 120)
        {
            whileSize = 60;
            time = _timeOut * 1000 / 60;
        }
        else if (_timeOut <= 600)
        {
            whileSize = 100;
            time = _timeOut * 1000 / 100;
        }
        else if (_timeOut < 1000)
        {
            whileSize = 150;
            time = _timeOut * 1000 / 150;
        }
        else
        {
            whileSize = 180;
            time = _timeOut * 1000 / 180;
        }
        return true;
    }

private:
    shared AcceptPipelineFactory _acceptorPipelineFactory;
    shared PipelineFactory!PipeLine _childPipelineFactory;

    ServerAcceptor!(PipeLine) _mainAccept;
    EventLoop _loop;

    ServerAcceptor!(PipeLine)[] _serverlist;
    EventLoopGroup _group;

    bool _listening = false;
    bool _rusePort = true;
    bool _isLoopWait = false;
    uint _timeOut = 0;
    Address _address;

    ServerSSLConfig _sslConfig = null;
}

private:

import std.functional;
import kiss.timingwheel;
import collie.utils.memory;
import collie.net;

/**
    Last handler of the accept pipeline: turns every accepted Socket into a
    connection with its own child pipeline and keeps the live connections in
    an intrusive doubly linked list headed by a sentinel node.
*/
final @trusted class ServerAcceptor(PipeLine) : InboundHandler!(Socket)
{
    /// Appends this handler to `pipe`, finalizes the pipeline, attaches the
    /// listener as its transport and installs the accept callback.
    this(TcpListener acceptor, AcceptPipeline pipe,
            shared PipelineFactory!PipeLine clientPipeFactory, SSL_CTX* ctx = null)
    {
        _acceptor = acceptor;
        _pipeFactory = clientPipeFactory;
        pipe.addBack(this);
        pipe.finalize();
        _pipe = pipe;
        _pipe.transport(_acceptor);
        _acceptor.setReadHandle(&acceptCallBack);
        _sslctx = ctx;
        // Sentinel heads: real nodes are always linked after them.
        _list = new ServerConnection!PipeLine();
        version (USE_SSL)
            _sharkList = new SSLHandShark();
    }

    /// Fires transportActive on the accept pipeline.
    pragma(inline, true) void initialize()
    {
        _pipe.transportActive();
    }

    /// Fires transportInactive on the accept pipeline.
    pragma(inline, true) void stop()
    {
        _pipe.transportInactive();
    }

    /// Handles a newly accepted socket: starts an SSL handshake when an
    /// SSL_CTX is configured (version USE_SSL), otherwise starts a plain
    /// TcpStream connection right away.
    override void read(Context ctx, Socket msg)
    {
        version (USE_SSL)
        {
            if (_sslctx)
            {
                auto ssl = SSL_new(_sslctx);
                if (ssl is null)
                {
                    // SSL_new reports failure by returning null.
                    logError("SSL_new error!");
                    return;
                }
                static if (IOMode == IO_MODE.iocp)
                {
                    BIO* readBIO = BIO_new(BIO_s_mem());
                    BIO* writeBIO = BIO_new(BIO_s_mem());
                    SSL_set_bio(ssl, readBIO, writeBIO);
                    SSL_set_accept_state(ssl);
                    auto asynssl = new SSLSocket(_acceptor.eventLoop, msg, ssl, readBIO, writeBIO);
                }
                else
                {
                    // SSL_set_fd returns 1 on success and 0 on failure, so a
                    // "< 0" test would never detect the error.
                    if (SSL_set_fd(ssl, msg.handle()) != 1)
                    {
                        error("SSL_set_fd error: fd = ", msg.handle());
                        SSL_shutdown(ssl);
                        SSL_free(ssl);
                        return;
                    }
                    SSL_set_accept_state(ssl);
                    auto asynssl = new SSLSocket(_acceptor.eventLoop, msg, ssl);
                }
                auto shark = new SSLHandShark(asynssl, &doHandShark);

                // Link the pending handshake right after the sentinel.
                shark.next = _sharkList.next;
                if (shark.next)
                    shark.next.prev = shark;
                shark.prev = _sharkList;
                _sharkList.next = shark;

                asynssl.start();
            }
            else
            {
                auto asyntcp = new TcpStream(_acceptor.eventLoop, msg);
                startSocket(asyntcp);
            }
        }
        else
        {
            auto asyntcp = new TcpStream(_acceptor.eventLoop, msg);
            startSocket(asyntcp);
        }
    }

    /// Starts watching the listener; logs an error when that fails.
    override void transportActive(Context ctx)
    {
        logDebug("acept transportActive");
        if (!_acceptor.watch())
        {
            logError("acceptor start error!");
        }
    }

    /// Closes the listener, closes every tracked connection and stops the
    /// listener's event loop.
    override void transportInactive(Context ctx)
    {
        _acceptor.close();
        auto con = _list.next;
        _list.next = null;
        while (con)
        {
            // Read the successor before closing the current connection.
            auto tcon = con;
            con = con.next;
            tcon.close();
        }
        _acceptor.eventLoop.stop();
    }

protected:
    /// Unlinks `conn` from the connection list and frees it.
    pragma(inline) void remove(ServerConnection!PipeLine conn)
    {
        conn.prev.next = conn.next;
        if (conn.next)
            conn.next.prev = conn.prev;
        gcFree(conn);
    }

    /// Listener callback: forwards the accepted socket into the accept
    /// pipeline; exceptions are caught and logged.
    void acceptCallBack(EventLoop loop, Socket socket) nothrow
    {
        catchAndLogException(_pipe.read(socket));
    }

    /// The wrapped listener.
    pragma(inline, true) @property acceptor()
    {
        return _acceptor;
    }

    /// Creates the heartbeat timer and the timing wheel. Does nothing when
    /// the timer already exists.
    void startTimingWhile(uint whileSize, uint time)
    {
        if (_timer)
            return;
        _timer = new Timer(_acceptor.eventLoop);
        _timer.setTimerHandle(&doWheel);
        _wheel = new TimingWheel(whileSize);
        _timer.start(time);
    }

    /// Timer tick handler: advances the timing wheel when there is one.
    void doWheel() nothrow
    {
        if (_wheel)
            _wheel.prevWheel();
    }

    version (USE_SSL)
    {
        /// Handshake completion callback. `sock` is null when the handshake
        /// failed. Unlinks and frees `shark` in both cases.
        void doHandShark(SSLHandShark shark, SSLSocket sock)
        {
            shark.prev.next = shark.next;
            if (shark.next)
                shark.next.prev = shark.prev;
            // Freed with gcFree, like the other objects in this file, instead
            // of the deprecated delete statement.
            scope (exit)
                gcFree(shark);
            if (sock)
            {
                sock.setHandshakeCallBack(null);
                startSocket(sock);
            }
        }
    }

    /// Builds the child pipeline for `sock`, links the new connection after
    /// the sentinel, activates it and registers it with the timing wheel when
    /// the heartbeat is enabled. Frees `sock` when the factory returns no
    /// pipeline.
    void startSocket(TcpStream sock)
    {
        auto pipe = _pipeFactory.newPipeline(sock);
        if (!pipe)
        {
            gcFree(sock);
            return;
        }
        pipe.finalize();
        auto con = new ServerConnection!PipeLine(pipe);
        con.serverAceptor = this;

        con.next = _list.next;
        if (con.next)
            con.next.prev = con;
        con.prev = _list;
        _list.next = con;

        con.initialize();
        if (_wheel)
            _wheel.addNewTimer(con);
    }

private:
    // int[ServerConnection!PipeLine] _list;
    ServerConnection!PipeLine _list;

    version (USE_SSL)
    {
        SSLHandShark _sharkList;
    }

    TcpListener _acceptor;
    Timer _timer;
    TimingWheel _wheel;
    AcceptPipeline _pipe;
    shared PipelineFactory!PipeLine _pipeFactory;

    SSL_CTX* _sslctx = null;
}

/**
    One accepted connection: owns the child pipeline, is a node of the
    acceptor's connection list and a timer entry of the timing wheel.
*/
@trusted final class ServerConnection(PipeLine) : WheelTimer, PipelineManager
{
    /// Takes the pipeline and registers itself as its pipeline manager.
    this(PipeLine pipe)
    {
        _pipe = pipe;
        _pipe.pipelineManager = this;
    }

    ~this()
    {
    }

    /// Fires transportActive on the connection pipeline.
    pragma(inline, true) void initialize()
    {
        _pipe.transportActive();
    }

    /// Fires transportInactive on the connection pipeline.
    pragma(inline, true) void close()
    {
        _pipe.transportInactive();
    }

    /// The acceptor that owns this connection.
    pragma(inline, true) @property serverAceptor()
    {
        return _manger;
    }

    /// Sets the acceptor that owns this connection.
    pragma(inline, true) @property serverAceptor(ServerAcceptor!PipeLine manger)
    {
        _manger = manger;
    }

    /// Detaches the pipeline, stops the wheel timer entry and lets the
    /// acceptor unlink and free this connection. The object must not be used
    /// after this call returns.
    override void deletePipeline(PipelineBase pipeline)
    {
        pipeline.pipelineManager = null;
        _pipe = null;
        stop();
        _manger.remove(this);
    }

    /// Re-arms the wheel timer entry.
    override void refreshTimeout()
    {
        rest();
    }

    /// Wheel timer expiry: forwards to the pipeline; exceptions are
    /// swallowed because the handler is nothrow.
    override void onTimeOut() nothrow
    {
        collectException(_pipe.timeOut());
    }

private:
    /// Used only for the list sentinel.
    this()
    {
    }

    ServerConnection!PipeLine prev;
    ServerConnection!PipeLine next;
private:
    ServerAcceptor!PipeLine _manger;
    PipeLine _pipe;
}

version (USE_SSL)
{
    /**
        Tracks one pending SSL handshake. Reports the result exactly once
        through the callback: with the socket on success, with null when the
        socket closed first.
    */
    final class SSLHandShark
    {
        alias SSLHandSharkCallBack = void delegate(SSLHandShark shark, SSLSocket sock);

        /// Hooks the close, read and handshake callbacks of `sock`.
        this(SSLSocket sock, SSLHandSharkCallBack cback)
        {
            _socket = sock;
            _cback = cback;
            _socket.setCloseCallBack(&onClose);
            _socket.setReadCallBack(&readCallBack);
            _socket.setHandshakeCallBack(&handSharkCallBack);
        }

        ~this()
        {
        }

    protected:
        /// Handshake finished: hands the socket to the callback.
        void handSharkCallBack()
        {
            logDebug("the ssl handshark over");
            // The callback frees this object, so release the member first and
            // do not touch any field after the call.
            auto sock = _socket;
            _socket = null;
            _cback(this, sock);
        }

        /// Data arriving before the handshake completes is ignored.
        void readCallBack(ubyte[] buffer)
        {
        }

        /// Socket closed before the handshake finished: unhooks every
        /// callback and reports failure by passing a null socket.
        void onClose()
        {
            logDebug("the ssl handshark fail");
            _socket.setCloseCallBack(null);
            _socket.setReadCallBack(null);
            _socket.setHandshakeCallBack(null);
            _socket = null;
            _cback(this, _socket);
        }

    private:
        /// Used only for the list sentinel.
        this()
        {
        }

        SSLHandShark prev;
        SSLHandShark next;
    private:
        SSLSocket _socket;
        SSLHandSharkCallBack _cback;
    }
}