/*
 * Collie - An asynchronous event-driven network framework using Dlang development
 *
 * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd
 *
 * Developer: putao's Dlang team
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module collie.channel.pipeline;

import std.typecons;
import std.variant;
import std.functional;
import std.range.primitives;

import collie.channel.handler;
import collie.channel.handlercontext;
import collie.channel.exception;
import collie.net;
import kiss.event;

/// Callbacks a pipeline can invoke on the object that manages it.
interface PipelineManager
{
    /// Called by `PipelineBase.deletePipeline` with the pipeline itself.
    void deletePipeline(PipelineBase pipeline);
    void refreshTimeout();
}

/**
 * Type-independent part of a pipeline: stores the handler contexts in
 * insertion order, plus separate lists of the contexts that take part in the
 * inbound and outbound directions.
 *
 * Any add/remove operation clears `_isFinalize`; `finalize()` has to be
 * called again afterwards.
 */
abstract class PipelineBase
{
    this()
    {
    }

    ~this()
    {
    }

    /// Sets the manager notified by `deletePipeline()`.
    pragma(inline)
    @property final void pipelineManager(PipelineManager manager)
    {
        _manager = manager;
    }

    /// Returns the manager previously set, or null.
    pragma(inline, true)
    @property final PipelineManager pipelineManager()
    {
        return _manager;
    }

    /// Asks the manager (if one is set) to delete this pipeline.
    pragma(inline)
    final void deletePipeline()
    {
        if (_manager)
        {
            _manager.deletePipeline(this);
        }
    }

    /// Sets the transport channel associated with this pipeline.
    pragma(inline)
    @property final void transport(Channel transport)
    {
        _transport = transport;
    }

    /// Returns the transport channel associated with this pipeline.
    pragma(inline, true)
    @property final Channel transport()
    {
        return _transport;
    }

    /// Wraps `handler` in a new context and appends it to the pipeline.
    pragma(inline)
    final PipelineBase addBack(H)(H handler)
    {
        return addHelper(new ContextType!(H)(this, handler), false);
    }

    /// Wraps `handler` in a new context and puts it at the head of the pipeline.
    pragma(inline)
    final PipelineBase addFront(H)(H handler)
    {
        return addHelper(new ContextType!(H)(this, handler), true);
    }

    /**
     * Removes the first context of type `ContextType!H` whose handler equals
     * `handler`.
     *
     * Throws: HandlerNotInPipelineException if there is no such context.
     */
    pragma(inline)
    final PipelineBase remove(H)(H handler)
    {
        return removeHelper!H(handler, true);
    }

    /**
     * Removes the first context of type `ContextType!H`, whatever its handler.
     *
     * Throws: HandlerNotInPipelineException if there is no such context.
     */
    pragma(inline)
    final PipelineBase remove(H)()
    {
        return removeHelper!H(null, false);
    }

    /**
     * Removes the first context of the pipeline.
     *
     * Throws: PipelineEmptyException if the pipeline has no contexts.
     */
    pragma(inline)
    final PipelineBase removeFront()
    {
        if (_ctxs.empty)
        {
            throw new PipelineEmptyException("No handlers in pipeline");
        }
        removeAt(0);
        return this;
    }

    /**
     * Removes the last context of the pipeline.
     *
     * Throws: PipelineEmptyException if the pipeline has no contexts.
     */
    pragma(inline)
    final PipelineBase removeBack()
    {
        if (_ctxs.empty)
        {
            throw new PipelineEmptyException("No handlers in pipeline");
        }
        removeAt(_ctxs.length - 1);
        return this;
    }

    /// Returns the handler held by the context at index `i` (see `getContext`).
    pragma(inline)
    final auto getHandler(H)(int i)
    {
        // The original statement had no `return`, so callers received nothing.
        return getContext!H(i).handler;
    }

    /// Returns the handler of the first `ContextType!H` context, or null if none exists.
    final auto getHandler(H)()
    {
        auto ctx = getContext!H();
        if (ctx)
            return ctx.handler;
        return null;
    }

    /// Returns the context at index `i` cast to `ContextType!H`; asserts the cast succeeded.
    pragma(inline)
    auto getContext(H)(int i)
    {
        auto ctx = cast(ContextType!H)(_ctxs[i]);
        assert(ctx);
        return ctx;
    }

    /// Returns the first context that is a `ContextType!H`, or null if none exists.
    auto getContext(H)()
    {
        // `_ctxs` is a built-in array, so iterate it directly (it has no `at`).
        foreach (candidate; _ctxs)
        {
            auto ctx = cast(ContextType!H)(candidate);
            if (ctx)
                return ctx;
        }
        return null;
    }

    /// Declared without a body here; `Pipeline` overrides it to link the contexts together.
    void finalize();

    /// Calls `detachPipeline()` on every context currently in the pipeline.
    final void detachHandlers()
    {
        foreach (ctx; _ctxs)
        {
            ctx.detachPipeline();
        }
    }

protected:
    PipelineContext[] _ctxs;    // every context, in pipeline order
    PipelineContext[] _inCtxs;  // contexts whose direction is IN or BOTH
    PipelineContext[] _outCtxs; // contexts whose direction is OUT or BOTH

    bool _isFinalize = true;    // cleared by add/remove, set again by finalize()
private:
    PipelineManager _manager = null;
    Channel _transport;

    /**
     * Inserts `ctx` at the front or the back of `_ctxs` and of the
     * per-direction lists selected by `Context.dir`.
     */
    final PipelineBase addHelper(Context)(Context ctx, bool front)
    {
        // Returns a new array holding `head` followed by every element of `ctxs`.
        static PipelineContext[] addBefore(PipelineContext[] ctxs, PipelineContext head)
        {
            auto tctxs = new PipelineContext[ctxs.length + 1];
            tctxs[0] = head;
            tctxs[1 .. $] = ctxs[];
            return tctxs;
        }

        // Written as if/else on purpose: `front ? a = x : a ~= ctx` parses as
        // `(front ? a = x : a) ~= ctx`, which inserted the context at the
        // front and then appended it a second time.
        void insert(ref PipelineContext[] list)
        {
            if (front)
                list = addBefore(list, ctx);
            else
                list ~= ctx;
        }

        _isFinalize = false;
        insert(_ctxs);
        if (Context.dir == HandlerDir.BOTH || Context.dir == HandlerDir.IN)
        {
            insert(_inCtxs);
        }
        if (Context.dir == HandlerDir.BOTH || Context.dir == HandlerDir.OUT)
        {
            insert(_outCtxs);
        }
        return this;
    }

    /**
     * Removes the first context of type `ContextType!H`; when `checkEqual` is
     * set, its handler must also compare equal to `handler`.
     *
     * Throws: HandlerNotInPipelineException if nothing matched.
     */
    final PipelineBase removeHelper(H)(H handler, bool checkEqual)
    {
        foreach (i; 0 .. _ctxs.length)
        {
            auto ctx = cast(ContextType!H) _ctxs[i];
            if (ctx && (!checkEqual || ctx.getHandler() == handler))
            {
                removeAt(i);
                return this;
            }
        }
        throw new HandlerNotInPipelineException("No such handler in pipeline");
    }

    /// Detaches the context at index `site` and drops it from every list it belongs to.
    final void removeAt(size_t site)
    {
        import kiss.container.array;

        _isFinalize = false;
        PipelineContext rctx = _ctxs[site];
        rctx.detachPipeline();
        removeSite(_ctxs, site);

        const auto dir = rctx.getDirection();
        if (dir == HandlerDir.BOTH || dir == HandlerDir.IN)
        {
            arrayRemove(_inCtxs, rctx, true);
        }

        if (dir == HandlerDir.BOTH || dir == HandlerDir.OUT)
        {
            // Outbound contexts live in _outCtxs; the original removed from
            // _inCtxs here, leaving a stale entry in the outbound list.
            arrayRemove(_outCtxs, rctx, true);
        }
    }
}

/*
 * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
 * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
 *
 * Use void for one of the types if your pipeline is unidirectional.
 * If R is void, read() will be disabled.
 * If W is void, write() is not declared and close() does nothing.
*/

final class Pipeline(R, W = void) : PipelineBase
{
    alias Ptr = Pipeline!(R, W);

    /// Creates a new, empty pipeline (the constructors are protected).
    static Ptr create()
    {
        return new Ptr();
    }

    ~this()
    {
        // if (!_isStatic) // USE GC, maybe the context will free before pipeline
        // {
        //     detachHandlers();
        // }
    }

    // Only declared for a non-void inbound type: a `void` parameter is not
    // valid D, so an unconditional declaration made Pipeline!(void, W)
    // impossible to instantiate.
    static if (!is(R == void))
    {
        /**
         * Passes `msg` to the first inbound context.
         *
         * Throws: NotHasInBoundException if no inbound context is linked.
         */
        pragma(inline)
        void read(R msg)
        {
            if (_front)
                _front.read(msg);
            else
                throw new NotHasInBoundException("read(): not have inbound handler in Pipeline");
        }
    }

    /**
     * Forwards a timeout to the first inbound context. Does nothing when R is void.
     *
     * Throws: NotHasInBoundException if no inbound context is linked.
     */
    pragma(inline, true)
    void timeOut()
    {
        static if (!is(R == void))
        {
            if (_front)
                _front.timeOut();
            else
                throw new NotHasInBoundException("timeOut(): not have inbound handler in Pipeline");
        }
    }

    /// Forwards `transportActive` to the first inbound context, if there is one.
    pragma(inline)
    void transportActive()
    {
        static if (!is(R == void))
        {
            if (_front)
            {
                _front.transportActive();
            }
        }
    }

    /// Forwards `transportInactive` to the first inbound context, if there is one.
    pragma(inline)
    void transportInactive()
    {
        static if (!is(R == void))
        {
            if (_front)
            {
                _front.transportInactive();
            }
        }
    }

    static if (!is(W == void))
    {
        alias TheCallBack = void delegate(W, size_t);

        /**
         * Passes `msg` and the optional callback `cback` to the last outbound context.
         *
         * Throws: NotHasOutBoundException if no outbound context is linked.
         */
        pragma(inline)
        void write(W msg, TheCallBack cback = null)
        {
            if (_back)
                _back.write(msg, cback);
            else
                throw new NotHasOutBoundException("write(): no outbound handler in Pipeline");
        }
    }

    /**
     * Forwards `close` to the last outbound context. Does nothing when W is void.
     *
     * Throws: NotHasOutBoundException if no outbound context is linked.
     */
    pragma(inline)
    void close()
    {
        static if (!is(W == void))
        {
            if (_back)
                _back.close();
            else
                throw new NotHasOutBoundException("close(): no outbound handler in Pipeline");
        }
    }

    /**
     * Links the contexts together and selects the entry points: inbound
     * contexts are chained first-to-last, outbound contexts last-to-first.
     * Returns immediately if nothing changed since the previous call.
     *
     * Throws: PipelineEmptyException if neither an inbound nor an outbound
     *         entry point could be established.
     */
    override void finalize()
    {
        if (_isFinalize)
            return;

        _front = null;
        static if (!is(R == void))
        {
            if (!_inCtxs.empty)
            {
                _front = cast(InboundLink!R)(_inCtxs[0]);
                for (size_t i = 0; i < _inCtxs.length - 1; i++)
                {
                    _inCtxs[i].setNextIn(_inCtxs[i + 1]);
                }
                // The last inbound context has no successor.
                _inCtxs[_inCtxs.length - 1].setNextIn(null);
            }
        }

        _back = null;
        static if (!is(W == void))
        {
            if (!_outCtxs.empty)
            {
                _back = cast(OutboundLink!W)(_outCtxs[_outCtxs.length - 1]);
                for (size_t i = _outCtxs.length - 1; i > 0; --i)
                {
                    _outCtxs[i].setNextOut(_outCtxs[i - 1]);
                }
                // The first context is the end of the outbound chain.
                _outCtxs[0].setNextOut(null);
            }
        }

        foreach (ctx; _ctxs)
        {
            ctx.attachPipeline();
        }

        if (_front is null && _back is null)
            throw new PipelineEmptyException("No Handler in the Pipeline");

        _isFinalize = true;
    }

protected:
    this()
    {
        super();
    }

    this(bool isStatic)
    {
        _isStatic = isStatic;
        super();
    }

private:
    bool _isStatic = false;

    // When a direction is disabled (void), the entry point is kept as a plain
    // Object that is always null so the shared code above still compiles.
    static if (!is(R == void))
    {
        InboundLink!R _front = null;
    }
    else
    {
        Object _front = null;
    }

    static if (!is(W == void))
    {
        OutboundLink!W _back = null;
    }
    else
    {
        Object _back = null;
    }
}

/// Factory interface producing a pipeline of type `PipeLine` for a TCP stream.
abstract shared class PipelineFactory(PipeLine)
{
    PipeLine newPipeline(TcpStream transport);
}

/// Pipeline type whose inbound messages are `TcpStream` values and whose outbound type is `uint`.
alias AcceptPipeline = Pipeline!(TcpStream, uint);

/// Factory interface producing an `AcceptPipeline` for a TCP listener.
abstract shared class AcceptPipelineFactory
{
    AcceptPipeline newPipeline(TcpListener acceptor);
}