1 /*
2  * Collie - An asynchronous event-driven network framework using Dlang development
3  *
4  * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd 
5  *
6  * Developer: putao's Dlang team
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module collie.channel.pipeline;
12 
13 import std.typecons;
14 import std.variant;
15 import std.functional;
16 import std.range.primitives;
17 
18 import collie.channel.handler;
19 import collie.channel.handlercontext;
20 import collie.channel.exception;
21 import collie.net;
22 import kiss.event;
23 
/**
 * Callback interface through which a pipeline talks to its owner.
 *
 * PipelineBase stores one instance and calls deletePipeline() on it from
 * PipelineBase.deletePipeline().
 */
interface PipelineManager
{
    /// Called with the pipeline that asks to be deleted.
    void deletePipeline(PipelineBase pipeline);

    /// Presumably restarts the owner's timeout tracking; not called in this module.
    void refreshTimeout();
}
29 
/**
 * Type-erased base of every Pipeline.
 *
 * Keeps the ordered list of handler contexts (`_ctxs`) together with the
 * sub-lists of contexts that take part in inbound (`_inCtxs`) and outbound
 * (`_outCtxs`) processing, plus the owning manager and the transport.
 * Any change to the handler list clears `_isFinalize`; the subclass'
 * finalize() is expected to re-link the contexts afterwards.
 */
abstract class PipelineBase
{
    this()
    {
    }

    ~this()
    {
    }

    /// Sets the manager that deletePipeline() forwards to.
    pragma(inline)
    @property final void pipelineManager(PipelineManager manager)
    {
        _manager = manager;
    }

    /// Returns the manager set earlier, or null when none was set.
    pragma(inline,true)
    @property final PipelineManager pipelineManager()
    {
        return _manager;
    }

    /// Asks the manager, if there is one, to delete this pipeline.
    pragma(inline)
    final void deletePipeline()
    {
        if (_manager)
        {
            _manager.deletePipeline(this);
        }
    }

    /// Sets the channel this pipeline is bound to.
    pragma(inline)
    @property final void transport(Channel transport)
    {
        _transport = transport;
    }

    /// Returns the channel this pipeline is bound to.
    pragma(inline,true)
    @property final Channel transport()
    {
        return _transport;
    }

    /// Wraps `handler` in a new context and appends it to the pipeline.
    pragma(inline)
    final PipelineBase addBack(H)(H handler)
    {
        return addHelper(new ContextType!(H)(this, handler), false);
    }

    /// Wraps `handler` in a new context and inserts it at the front of the pipeline.
    pragma(inline)
    final PipelineBase addFront(H)(H handler)
    {
        return addHelper(new ContextType!(H)(this, handler), true);
    }

    /**
     * Removes the first context of type ContextType!H whose handler equals `handler`.
     * Throws: HandlerNotInPipelineException when no such context exists.
     */
    pragma(inline)
    final PipelineBase remove(H)(H handler)
    {
        return removeHelper!H(handler, true);
    }

    /**
     * Removes the first context of type ContextType!H, whatever its handler instance.
     * Throws: HandlerNotInPipelineException when no such context exists.
     */
    pragma(inline)
    final PipelineBase remove(H)()
    {
        return removeHelper!H(null, false);
    }

    /**
     * Removes the first context of the pipeline.
     * Throws: PipelineEmptyException when the pipeline has no handlers.
     */
    pragma(inline)
    final PipelineBase removeFront()
    {
        if (_ctxs.empty())
        {
            throw new PipelineEmptyException("No handlers in pipeline");
        }
        removeAt(0);
        return this;
    }

    /**
     * Removes the last context of the pipeline.
     * Throws: PipelineEmptyException when the pipeline has no handlers.
     */
    pragma(inline)
    final PipelineBase removeBack()
    {
        if (_ctxs.empty())
        {
            throw new PipelineEmptyException("No handlers in pipeline");
        }
        removeAt(_ctxs.length - 1);
        return this;
    }

    /// Returns the handler held by the context at index `i` (see getContext).
    pragma(inline)
    final auto getHandler(H)(int i)
    {
        // The original statement lacked `return`, so the handler was never handed back.
        return getContext!H(i).handler;
    }

    /// Returns the handler of the first context of type ContextType!H, or null.
    final auto getHandler(H)()
    {
        auto ctx = getContext!H();
        if (ctx)
            return ctx.handler;
        return null;
    }

    /// Returns the context at index `i` cast to ContextType!H; asserts that the cast succeeds.
    pragma(inline)
    auto getContext(H)(int i)
    {
        auto ctx = cast(ContextType!H)(_ctxs[i]);
        assert(ctx);
        return ctx;
    }

    /// Returns the first context that casts to ContextType!H, or null when there is none.
    auto getContext(H)()
    {
        // _ctxs is a built-in array, so it is iterated directly instead of via `.at(i)`.
        foreach (candidate; _ctxs)
        {
            auto ctx = cast(ContextType!H)(candidate);
            if (ctx)
                return ctx;
        }
        return null;
    }

    /// Implemented by Pipeline: links the contexts after the handler list changed.
    void finalize();

    /// Calls detachPipeline() on every context currently in the pipeline.
    final void detachHandlers()
    {
        foreach (ctx; _ctxs)
        {
            ctx.detachPipeline();
        }
    }

protected:
    PipelineContext[] _ctxs;    // every context, front to back
    PipelineContext[] _inCtxs;  // contexts whose direction is IN or BOTH
    PipelineContext[] _outCtxs; // contexts whose direction is OUT or BOTH

    // Cleared by addHelper()/removeAt(); true means no re-linking is pending.
    bool _isFinalize = true;
private:
    PipelineManager _manager = null;
    Channel _transport;
    //	AsynTransportlogInfo _transportlogInfo;

    /// Inserts `ctx` into `_ctxs` and into the direction lists selected by Context.dir.
    final PipelineBase addHelper(Context)(Context ctx, bool front)
    {
        // Places `item` at the front or the back of `list`.
        // Written as a plain if/else: `front ? a = x : b ~= y` is grouped by the
        // D grammar as `(front ? (a = x) : b) ~= y`, which is not the intent.
        void insert(ref PipelineContext[] list, PipelineContext item)
        {
            if (front)
            {
                auto grown = new PipelineContext[list.length + 1];
                grown[0] = item;
                grown[1 .. $] = list[0 .. $];
                list = grown;
            }
            else
            {
                list ~= item;
            }
        }

        _isFinalize = false;
        insert(_ctxs, ctx);
        if (Context.dir == HandlerDir.BOTH || Context.dir == HandlerDir.IN)
        {
            insert(_inCtxs, ctx);
        }
        if (Context.dir == HandlerDir.BOTH || Context.dir == HandlerDir.OUT)
        {
            insert(_outCtxs, ctx);
        }
        return this;
    }

    /**
     * Removes the first context of type ContextType!H; when `checkEqual` is set
     * the context's handler must also compare equal to `handler`.
     * Throws: HandlerNotInPipelineException when nothing matched.
     */
    final PipelineBase removeHelper(H)(H handler, bool checkEqual)
    {
        foreach (i; 0 .. _ctxs.length)
        {
            auto ctx = cast(ContextType!H) _ctxs[i];
            if (ctx && (!checkEqual || ctx.getHandler() == handler))
            {
                removeAt(i);
                // `this` is already a class reference; it must not be dereferenced.
                return this;
            }
        }

        throw new HandlerNotInPipelineException("No such handler in pipeline");
    }

    /// Detaches the context at index `site` and drops it from every list that holds it.
    final void removeAt(size_t site)
    {
        import kiss.container.array;
        _isFinalize = false;
        PipelineContext rctx = _ctxs[site];
        rctx.detachPipeline();
        removeSite(_ctxs,site);
        //_ctxs.removeSite(site);

        import std.algorithm.searching;

        const auto dir = rctx.getDirection();
        if (dir == HandlerDir.BOTH || dir == HandlerDir.IN)
        {
            arrayRemove(_inCtxs,rctx,true);
           // _inCtxs.removeOne(rctx);
        }

        if (dir == HandlerDir.BOTH || dir == HandlerDir.OUT)
        {
            // Outbound contexts live in _outCtxs; the original removed from
            // _inCtxs a second time and left a stale entry behind.
            arrayRemove(_outCtxs,rctx,true);
            //_outCtxs.removeOne(rctx);
        }
    }
}
247 
/*
 * R is the inbound type, i.e. inbound calls start with pipeline.read(R).
 * W is the outbound type, i.e. outbound calls start with pipeline.write(W).
 *
 * Use void for one of the types if your pipeline is unidirectional.
 * If R is void, read() is disabled.
 * If W is void, write() is not declared and close() does nothing.
 */
256 
/**
 * Typed pipeline: inbound events enter at the first inbound context
 * (`_front`), outbound events enter at the last outbound context (`_back`).
 *
 * Params:
 *   R = inbound message type; with `void`, read() is not declared and the
 *       other inbound entry points do nothing.
 *   W = outbound message type; with `void`, write() is not declared and
 *       close() does nothing.
 *
 * finalize() must run after the handler list changed so that `_front`,
 * `_back` and the links between contexts are rebuilt.
 */
final class Pipeline(R, W = void) : PipelineBase
{
    alias Ptr = Pipeline!(R, W);

    /// Creates a new, empty pipeline.
    static Ptr create()
    {
        return new Ptr();
    }

    ~this()
    {
//        if (!_isStatic)  // USE GC, maybe the contex will free before pipeline
//        {
//            detachHandlers();
//        }
    }

    // A parameter of type void is illegal, so read() is only declared for a
    // real inbound type (same approach as write() below).
    static if (!is(R == void))
    {
        /**
         * Passes `msg` to the first inbound context.
         * Throws: NotHasInBoundException when no inbound context is linked.
         */
        pragma(inline)
        void read(R msg)
        {
            if (_front)
                _front.read(msg);
            else
                throw new NotHasInBoundException("read(): not have inbound handler in Pipeline");
        }
    }

    /**
     * Forwards a timeout event to the first inbound context.
     * Throws: NotHasInBoundException when no inbound context is linked.
     */
    pragma(inline,true)
    void timeOut()
    {
        static if (!is(R == void))
        {
            if (_front)
                _front.timeOut();
            else
                throw new NotHasInBoundException("timeOut(): not have inbound handler in Pipeline");
        }
    }

    /// Forwards transportActive() to the first inbound context, if there is one.
    pragma(inline)
    void transportActive()
    {
        static if (!is(R == void))
        {
            if (_front)
            {
                _front.transportActive();
            }
        }
    }

    /// Forwards transportInactive() to the first inbound context, if there is one.
    pragma(inline)
    void transportInactive()
    {
        static if (!is(R == void))
        {
            if (_front)
            {
                _front.transportInactive();
            }
        }
    }

    static if (!is(W == void))
    {
        alias TheCallBack = void delegate(W, size_t);

        /**
         * Passes `msg` and the optional callback to the last outbound context.
         * Throws: NotHasOutBoundException when no outbound context is linked.
         */
        pragma(inline)
        void write(W msg, TheCallBack cback = null)
        {
            if (_back)
                _back.write(msg, cback);
            else
                // The message used to name close(), which pointed at the wrong call site.
                throw new NotHasOutBoundException("write(): no outbound handler in Pipeline");
        }
    }

    /**
     * Forwards close() to the last outbound context.
     * Throws: NotHasOutBoundException when no outbound context is linked.
     */
    pragma(inline)
    void close()
    {
        static if (!is(W == void))
        {
            if (_back)
                _back.close();
            else
                throw new NotHasOutBoundException("close(): no outbound handler in Pipeline");
        }
    }

    /**
     * Rebuilds `_front`, `_back` and the next-in / next-out links, then calls
     * attachPipeline() on every context. Returns immediately when nothing
     * changed since the last successful call.
     * Throws: PipelineEmptyException when neither an inbound nor an outbound
     *         entry point could be established.
     */
    override void finalize()
    {
        if (_isFinalize)
            return;

        _front = null;
        static if (!is(R == void))
        {
            if (!_inCtxs.empty())
            {
                _front = cast(InboundLink!R)(_inCtxs[0]);
                // Chain inbound contexts front to back; the last one has no successor.
                foreach (i; 0 .. _inCtxs.length - 1)
                {
                    _inCtxs[i].setNextIn(_inCtxs[i + 1]);
                }
                _inCtxs[$ - 1].setNextIn(null);
            }
        }

        _back = null;
        static if (!is(W == void))
        {
            if (!_outCtxs.empty())
            {
                _back = cast(OutboundLink!W)(_outCtxs[$ - 1]);
                // Chain outbound contexts back to front; the first one has no successor.
                for (size_t i = _outCtxs.length - 1; i > 0; --i)
                {
                    _outCtxs[i].setNextOut(_outCtxs[i - 1]);
                }
                _outCtxs[0].setNextOut(null);
            }
        }

        foreach (ctx; _ctxs)
        {
            ctx.attachPipeline();
        }

        if (_front is null && _back is null)
            throw new PipelineEmptyException("No Handler in the Pipeline");

        _isFinalize = true;
    }

protected:
    this()
    {
        super();
    }

    this(bool isStatic)
    {
        _isStatic = isStatic;
        super();
    }

private:
    bool _isStatic = false;

    // With a void message type the link type does not exist, so a plain
    // Object placeholder keeps the `_front` / `_back` members declared.
    static if (!is(R == void))
    {
        InboundLink!R _front = null;
    }
    else
    {
        Object _front = null;
    }

    static if (!is(W == void))
    {
        OutboundLink!W _back = null;
    }
    else
    {
        Object _back = null;
    }
}
425 
/**
 * Abstract, shared factory parameterised on the pipeline type it produces.
 */
abstract shared class PipelineFactory(PipeLine)
{
    /// Returns a pipeline of type PipeLine for `transport`; presumably one per accepted connection.
    PipeLine newPipeline(TcpStream transport);
}
430 
/// Pipeline whose inbound message type is TcpStream and whose outbound type is uint.
alias AcceptPipeline = Pipeline!(TcpStream, uint);

/**
 * Abstract, shared factory that produces an AcceptPipeline for a listener.
 */
abstract shared class AcceptPipelineFactory
{
    /// Returns the AcceptPipeline to use with `acceptor`.
    AcceptPipeline newPipeline(TcpListener acceptor);
}