1 /*
2  * Collie - An asynchronous event-driven network framework using Dlang development
3  *
4  * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd 
5  *
6  * Developer: putao's Dlang team
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module collie.codec.http.session.httpsession;
12 
13 import collie.codec.http.headers;
14 import collie.codec.http.httpmessage;
15 import collie.codec.http.httptansaction;
16 import collie.codec.http.codec.httpcodec;
17 import collie.codec.http.codec.wsframe;
18 import collie.codec.http.errocode;
19 import kiss.log;
20 import kiss.net.TcpStream;
21 import kiss.functional;
22 public import kiss.net.struct_;
23 import kiss.event.task;
24 import std.socket;
25 import std.experimental.allocator.mallocator;
26 import collie.codec.http.httpwritebuffer;
27 
28 alias HTTPBuffer =  HTTPByteBuffer!(Mallocator);
29 abstract class HTTPSessionController
30 {
31 	HTTPTransactionHandler getRequestHandler(HTTPTransaction txn, HTTPMessage msg);
32 
33 	void attachSession(HTTPSession session){}
34 	
35 	/**
36    * logInformed at the end when the given HTTPSession is going away.
37    */
38 	void detachSession(HTTPSession session){}
39 	
40 	/**
41    * logInform the controller that the session's codec changed
42    */
43 	void onSessionCodecChange(HTTPSession session) {}
44 }
45 
46 interface SessionDown
47 {
48 	void httpWrite(StreamWriteBuffer buffer);
49 	void httpClose();
50 	void post(void delegate());
51 	Address localAddress();
52 	Address remoteAddress();
53 }
54 
55 /// HTTPSession will not send any read event
56 abstract class HTTPSession : HTTPTransaction.Transport,
57 	HTTPCodec.CallBack
58 {
59 	alias StreamID = HTTPCodec.StreamID;
60 	interface logInfoCallback {
61 		// Note: you must not start any asynchronous work from onCreate()
62 		void onCreate(HTTPSession);
63 		//void onIngressError(const HTTPSession, ProxygenError);
64 		void onIngressEOF();
65 		void onRequestBegin(HTTPSession);
66 		void onRequestEnd(HTTPSession,
67 			uint maxIngressQueueSize);
68 		void onActivateConnection(HTTPSession);
69 		void onDeactivateConnection(HTTPSession);
70 		// Note: you must not start any asynchronous work from onDestroy()
71 		void onDestroy(HTTPSession);
72 		void onIngressMessage(HTTPSession,
73 			HTTPMessage);
74 		void onIngressLimitExceeded(HTTPSession);
75 		void onIngressPaused(HTTPSession);
76 		void onTransactionDetached(HTTPSession);
77 		void onPingReplySent(ulong latency);
78 		void onPingReplyReceived();
79 		void onSettingsOutgoingStreamsFull(HTTPSession);
80 		void onSettingsOutgoingStreamsNotFull(HTTPSession);
81 		void onFlowControlWindowClosed(HTTPSession);
82 		void onEgressBuffered(HTTPSession);
83 		void onEgressBufferCleared(HTTPSession);
84 	}
85 
86 	this(HTTPSessionController controller,HTTPCodec codec,SessionDown down)
87 	{
88 		_controller = controller;
89 		_down = down;
90 		_codec = codec;
91 		_codec.setCallback(this);
92 	}
93 
94 	//HandlerAdapter {
95 	void onRead(ubyte[] msg) {
96 		//logDebug("on read: ", cast(string)msg);
97 		_codec.onIngress(msg);
98 	}
99 
100 	void onActive() {
101 		_localAddr = _down.localAddress;
102 		_peerAddr = _down.remoteAddress;
103 	}
104 
105 	void inActive() {
106 		getCodec.onConnectClose();
107 		logDebug("connect closed!");
108 	}
109 
110 	void onTimeout() @trusted {
111 		if(_codec)
112 			_codec.onTimeOut();
113 	}
114 
115 	//HandlerAdapter}
116 	//HTTPTransaction.Transport, {
117 	override  void pauseIngress(HTTPTransaction txn){}
118 	
119 	override void resumeIngress(HTTPTransaction txn){}
120 	
121 	override void transactionTimeout(HTTPTransaction txn){}
122 	
123 	override void sendHeaders(HTTPTransaction txn,
124 		HTTPMessage headers,
125 		bool eom)
126 	{
127 		auto tdata = new HTTPBuffer();
128 		_codec.generateHeader(txn,headers,tdata,eom);
129 
130 		//auto cback = eom ? bind(&closeWriteCallBack,txn) : &writeCallBack;
131 		//_down.httpWrite(tdata.data(true),cback);
132 		if(eom){
133 			tdata.setFinalTask(newTask((){closeWriteCallBack();txn.onDelayedDestroy();}));
134 		} 
135 		_down.httpWrite(tdata);
136 		
137 
138 	}
139 
140 	override size_t sendBody(HTTPTransaction txn,
141 		in ubyte[] data,
142 		bool eom)
143 	{
144 
145 		auto tdata = new HTTPBuffer();
146 		size_t rlen = getCodec.generateBody(txn,tdata,data,eom);
147 
148 		if(eom){
149 			tdata.setFinalTask(newTask((){closeWriteCallBack();txn.onDelayedDestroy();}));
150 		} 
151 		_down.httpWrite(tdata);
152 		
153 		// logDebug("send length : ", rlen);
154 		return rlen;
155 	}
156 	
157 	override size_t sendChunkHeader(HTTPTransaction txn,size_t length)
158 	{
159 		auto tdata = new HTTPByteBuffer!Mallocator();
160 		size_t rlen = getCodec.generateChunkHeader(txn,tdata,length);
161 		_down.httpWrite(tdata);
162 		return rlen;
163 	}
164 
165 	
166 	override size_t sendChunkTerminator(HTTPTransaction txn)
167 	{
168 		auto tdata = new HTTPByteBuffer!Mallocator();
169 		size_t rlen = getCodec.generateChunkTerminator(txn,tdata);
170 
171 		tdata.setFinalTask(newTask((){closeWriteCallBack();txn.onDelayedDestroy();}));
172 		_down.httpWrite(tdata);
173 
174 		return rlen;
175 	}
176 	
177 
178 	override size_t sendEOM(HTTPTransaction txn)
179 	{
180 		HttpWriteBuffer tdata = new HTTPByteBuffer!Mallocator();
181 		size_t rlen = getCodec.generateEOM(txn,tdata);
182 		logDebug("send eom!! ",rlen);
183 		//if(rlen) 
184 			//_down.httpWrite(tdata.data(true),bind(&closeWriteCallBack,txn));
185 		if(rlen){
186 			tdata.setFinalTask(newTask((){closeWriteCallBack();txn.onDelayedDestroy();}));
187 			_down.httpWrite(tdata);
188 		} else {
189 			_down.post((){
190 					closeWriteCallBack();
191 					txn.onDelayedDestroy();
192 				});
193 		}
194 		return rlen;
195 	}
196 
197 	//		size_t sendAbort(HTTPTransaction txn,
198 	//			HTTPErrorCode statusCode);
199 
200 	override void socketWrite(HTTPTransaction txn,StreamWriteBuffer buffer) {
201 		_down.httpWrite(buffer);
202 	}
203 
204 
205 	override size_t sendWsData(HTTPTransaction txn,OpCode code,ubyte[] data)
206 	{
207 		auto tdata = new HTTPByteBuffer!Mallocator();
208 		size_t rlen = getCodec.generateWsFrame(txn,tdata,code,data);
209 		if(rlen) {
210 			bool eom = getCodec.shouldClose();
211 //			auto cback = eom ? bind(&closeWriteCallBack,txn) : &writeCallBack;
212 //			_down.httpWrite(tdata.data(true),cback);
213 			if(eom){
214 				tdata.setFinalTask(newTask((){closeWriteCallBack();txn.onDelayedDestroy();}));
215 			} 
216 			_down.httpWrite(tdata);
217 		}
218 		return rlen;
219 	}
220 
221 	override void notifyPendingEgress()
222 	{}
223 	
224 	override void detach(HTTPTransaction txn)
225 	{
226 		if(_codec)
227 			_codec.detach(txn);
228 	}
229 	
230 	//		void notifyIngressBodyProcessed(uint32_t bytes);
231 	//		
232 	//		void notifyEgressBodyBuffered(int64_t bytes);
233 	
234 	override Address getLocalAddress(){
235 		return _localAddr;
236 	}
237 	
238 	override Address getPeerAddress(){
239 		return _peerAddr;
240 	}
241 	
242 	
243 	override HTTPCodec getCodec(){
244 		return _codec;
245 	}
246 
247 	void restCodeC(HTTPCodec codec){
248 		if(_codec)
249 			_codec.setCallback(null);
250 		codec.setCallback(this);
251 		_codec = codec;
252 	}
253 	
254 	override bool isDraining(){return false;}
255 	//HTTPTransaction.Transport, }
256 
257 
258 	// HTTPCodec.CallBack {
259 	override void onMessageBegin(HTTPTransaction txn, HTTPMessage msg)
260 	{
261 		if(txn){
262 			txn.transport = this;
263 		}
264 		logDebug("begin a http requst or reaponse!");
265 	}
266 
267 	override void onHeadersComplete(HTTPTransaction txn,
268 		HTTPMessage msg){
269 		logDebug("onHeadersComplete ------url: ", msg.url);
270 		msg.clientAddress = getPeerAddress();
271 		setupOnHeadersComplete(txn,msg);
272 	}
273 
274 	override void onNativeProtocolUpgrade(HTTPTransaction txn,CodecProtocol protocol,string protocolString,HTTPMessage msg)
275 	{
276         msg.clientAddress = getPeerAddress();
277 		setupProtocolUpgrade(txn,protocol,protocolString,msg);
278 	}
279 
280 	override void onBody(HTTPTransaction txn,const ubyte[] data){
281 		if(txn)
282 			txn.onIngressBody(data,cast(ushort)0);
283 	}
284 
285 	override void onChunkHeader(HTTPTransaction txn, size_t length){
286 		if(txn)
287 			txn.onIngressChunkHeader(length);
288 	}
289 
290 	override void onChunkComplete(HTTPTransaction txn){
291 		if(txn)
292 			txn.onIngressChunkComplete();
293 	}
294 
295 	override void onMessageComplete(HTTPTransaction txn, bool upgrade){
296 		if(txn)
297 			txn.onIngressEOM();
298 	}
299 
300 	override void onError(HTTPTransaction txn,HTTPErrorCode code){
301 		logDebug("ERRO : ", code);
302 		_down.httpClose();
303 	}
304 
305 	override void onAbort(HTTPTransaction txn,
306 		HTTPErrorCode code){
307 		_down.httpClose();
308 	}
309 	
310 	override void onWsFrame(HTTPTransaction txn,ref WSFrame wsf){
311 		logDebug(".....");
312 		if(txn)
313 			txn.onWsFrame(wsf);
314 	}
315 
316 	// HTTPCodec.CallBack }
317 protected:
318 	/**
319    * Called by onHeadersComplete(). This function allows downstream and
320    * upstream to do any setup (like preparing a handler) when headers are
321    * first received from the remote side on a given transaction.
322    */
323 	void setupOnHeadersComplete(ref HTTPTransaction txn,
324 		HTTPMessage msg);
325 
326 	void setupProtocolUpgrade(ref HTTPTransaction txn,CodecProtocol protocol,string protocolString,HTTPMessage msg);
327 protected:
328 	final void closeWriteCallBack(){
329 		// logDebug("shodle close!????????????");
330 		//txn.onDelayedDestroy();
331 		if(_codec is null || _codec.shouldClose()) {
332 			logDebug("\t\t --------do close!!!");
333 			_down.httpClose();
334 		}
335 	}
336 protected:
337 	Address _localAddr;
338 	Address _peerAddr;
339 	HTTPCodec _codec;
340 
341 	HTTPSessionController _controller;
342 	SessionDown _down;
343 }
344