1 /*
2  * Collie - An asynchronous event-driven network framework using Dlang development
3  *
4  * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd 
5  *
6  * Developer: putao's Dlang team
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module collie.codec.http.session.httpsession;
12 
13 import collie.codec.http.headers;
14 import collie.codec.http.httpmessage;
15 import collie.codec.http.httptansaction;
16 import collie.codec.http.codec.httpcodec;
17 import collie.codec.http.codec.wsframe;
18 import collie.codec.http.errocode;
19 import kiss.logger;
20 
21 import kiss.net.TcpStream;
22 import kiss.util.functional;
23 import kiss.net;
24 import kiss.event.core;
25 import kiss.event.task;
26 import std.socket;
27 import std.experimental.allocator.mallocator;
28 import collie.codec.http.httpwritebuffer;
29 
30 alias HTTPBuffer =  HTTPByteBuffer!(Mallocator);
31 abstract class HTTPSessionController
32 {
33 	HTTPTransactionHandler getRequestHandler(HTTPTransaction txn, HTTPMessage msg);
34 
35 	void attachSession(HTTPSession session){}
36 	
37 	/**
38    * logInformed at the end when the given HTTPSession is going away.
39    */
40 	void detachSession(HTTPSession session){}
41 	
42 	/**
43    * logInform the controller that the session's codec changed
44    */
45 	void onSessionCodecChange(HTTPSession session) {}
46 }
47 
48 interface SessionDown
49 {
50 	void httpWrite(StreamWriteBuffer buffer);
51 	void httpClose();
52 	void post(void delegate());
53 	Address localAddress();
54 	Address remoteAddress();
55 }
56 
57 /// HTTPSession will not send any read event
58 abstract class HTTPSession : HTTPTransaction.Transport,
59 	HTTPCodec.CallBack
60 {
61 	alias StreamID = HTTPCodec.StreamID;
62 	interface logInfoCallback {
63 		// Note: you must not start any asynchronous work from onCreate()
64 		void onCreate(HTTPSession);
65 		//void onIngressError(const HTTPSession, ProxygenError);
66 		void onIngressEOF();
67 		void onRequestBegin(HTTPSession);
68 		void onRequestEnd(HTTPSession,
69 			uint maxIngressQueueSize);
70 		void onActivateConnection(HTTPSession);
71 		void onDeactivateConnection(HTTPSession);
72 		// Note: you must not start any asynchronous work from onDestroy()
73 		void onDestroy(HTTPSession);
74 		void onIngressMessage(HTTPSession,
75 			HTTPMessage);
76 		void onIngressLimitExceeded(HTTPSession);
77 		void onIngressPaused(HTTPSession);
78 		void onTransactionDetached(HTTPSession);
79 		void onPingReplySent(ulong latency);
80 		void onPingReplyReceived();
81 		void onSettingsOutgoingStreamsFull(HTTPSession);
82 		void onSettingsOutgoingStreamsNotFull(HTTPSession);
83 		void onFlowControlWindowClosed(HTTPSession);
84 		void onEgressBuffered(HTTPSession);
85 		void onEgressBufferCleared(HTTPSession);
86 	}
87 
88 	this(HTTPSessionController controller,HTTPCodec codec,SessionDown down)
89 	{
90 		_controller = controller;
91 		_down = down;
92 		_codec = codec;
93 		_codec.setCallback(this);
94 	}
95 
96 	//HandlerAdapter {
97 	void onRead(ubyte[] msg) {
98 		//logDebug("on read: ", cast(string)msg);
99 		_codec.onIngress(msg);
100 	}
101 
102 	void onActive() {
103 		_localAddr = _down.localAddress;
104 		_peerAddr = _down.remoteAddress;
105 	}
106 
107 	void inActive() {
108 		getCodec.onConnectClose();
109 		version(CollieDebugMode) logDebug("connection closed!");
110 	}
111 
112 	void onTimeout() @trusted {
113 		if(_codec)
114 			_codec.onTimeOut();
115 	}
116 
117 	//HandlerAdapter}
118 	//HTTPTransaction.Transport, {
119 	override  void pauseIngress(HTTPTransaction txn){}
120 	
121 	override void resumeIngress(HTTPTransaction txn){}
122 	
123 	override void transactionTimeout(HTTPTransaction txn){}
124 	
125 	override void sendHeaders(HTTPTransaction txn,
126 		HTTPMessage headers,
127 		bool eom)
128 	{
129 		HTTPBuffer tdata = new HTTPBuffer();
130 		_codec.generateHeader(txn,headers,tdata,eom);
131 
132 		version(CollieDebugMode) 
133 		logDebug("sending headers: length=", tdata.length);
134 		//auto cback = eom ? bind(&closeWriteCallBack,txn) : &writeCallBack;
135 		//_down.httpWrite(tdata.data(true),cback);
136 		if(eom){
137 			tdata.setFinalTask(newTask((){closeWriteCallBack();txn.onDelayedDestroy();}));
138 		} 
139 		_down.httpWrite(tdata);
140 		
141 	}
142 
143 	override size_t sendBody(HTTPTransaction txn,
144 		in ubyte[] data,
145 		bool eom)
146 	{
147 		HTTPBuffer tdata = new HTTPBuffer();
148 		size_t rlen = getCodec.generateBody(txn,tdata,data,eom);
149 
150 		version(CollieDebugMode) 
151 		logDebug("sending body: length=", rlen);
152 
153 		if(eom){
154 			tdata.setFinalTask(newTask((){closeWriteCallBack();txn.onDelayedDestroy();}));
155 		} 
156 		_down.httpWrite(tdata);
157 
158 		return rlen;
159 	}
160 	
161 	override size_t sendChunkHeader(HTTPTransaction txn,size_t length)
162 	{
163 		auto tdata = new HTTPByteBuffer!Mallocator();
164 		size_t rlen = getCodec.generateChunkHeader(txn,tdata,length);
165 		_down.httpWrite(tdata);
166 		return rlen;
167 	}
168 
169 	
170 	override size_t sendChunkTerminator(HTTPTransaction txn)
171 	{
172 		auto tdata = new HTTPByteBuffer!Mallocator();
173 		size_t rlen = getCodec.generateChunkTerminator(txn,tdata);
174 
175 		tdata.setFinalTask(newTask((){closeWriteCallBack();txn.onDelayedDestroy();}));
176 		_down.httpWrite(tdata);
177 
178 		return rlen;
179 	}
180 	
181 
182 	override size_t sendEOM(HTTPTransaction txn)
183 	{
184 		HttpWriteBuffer tdata = new HTTPByteBuffer!Mallocator();
185 		size_t rlen = getCodec.generateEOM(txn,tdata);
186 		logDebug("send eom!! ",rlen);
187 		//if(rlen) 
188 			//_down.httpWrite(tdata.data(true),bind(&closeWriteCallBack,txn));
189 		if(rlen){
190 			tdata.setFinalTask(newTask((){closeWriteCallBack();txn.onDelayedDestroy();}));
191 			_down.httpWrite(tdata);
192 		} else {
193 			_down.post((){
194 					closeWriteCallBack();
195 					txn.onDelayedDestroy();
196 				});
197 		}
198 		return rlen;
199 	}
200 
201 	//		size_t sendAbort(HTTPTransaction txn,
202 	//			HTTPErrorCode statusCode);
203 
204 	override void socketWrite(HTTPTransaction txn,StreamWriteBuffer buffer) {
205 		_down.httpWrite(buffer);
206 	}
207 
208 
209 	override size_t sendWsData(HTTPTransaction txn,OpCode code,ubyte[] data)
210 	{
211 		auto tdata = new HTTPByteBuffer!Mallocator();
212 		size_t rlen = getCodec.generateWsFrame(txn,tdata,code,data);
213 		if(rlen) {
214 			bool eom = getCodec.shouldClose();
215 //			auto cback = eom ? bind(&closeWriteCallBack,txn) : &writeCallBack;
216 //			_down.httpWrite(tdata.data(true),cback);
217 			if(eom){
218 				tdata.setFinalTask(newTask((){closeWriteCallBack();txn.onDelayedDestroy();}));
219 			} 
220 			_down.httpWrite(tdata);
221 		}
222 		return rlen;
223 	}
224 
225 	override void notifyPendingEgress()
226 	{}
227 	
228 	override void detach(HTTPTransaction txn)
229 	{
230 		if(_codec)
231 			_codec.detach(txn);
232 	}
233 	
234 	//		void notifyIngressBodyProcessed(uint32_t bytes);
235 	//		
236 	//		void notifyEgressBodyBuffered(int64_t bytes);
237 	
238 	override Address getLocalAddress(){
239 		return _localAddr;
240 	}
241 	
242 	override Address getPeerAddress(){
243 		return _peerAddr;
244 	}
245 	
246 	
247 	override HTTPCodec getCodec(){
248 		return _codec;
249 	}
250 
251 	void restCodeC(HTTPCodec codec){
252 		if(_codec)
253 			_codec.setCallback(null);
254 		codec.setCallback(this);
255 		_codec = codec;
256 	}
257 	
258 	override bool isDraining(){return false;}
259 	//HTTPTransaction.Transport, }
260 
261 
262 	// HTTPCodec.CallBack {
263 	override void onMessageBegin(HTTPTransaction txn, HTTPMessage msg)
264 	{
265 		if(txn){
266 			txn.transport = this;
267 		}
268 		version(CollieDebugMode) logDebug("begin a http requst or reaponse!");
269 	}
270 
271 	override void onHeadersComplete(HTTPTransaction txn,
272 		HTTPMessage msg){
273 		version(CollieDebugMode) logDebug("url: ", msg.url);
274 		msg.clientAddress = getPeerAddress();
275 		setupOnHeadersComplete(txn,msg);
276 	}
277 
278 	override void onNativeProtocolUpgrade(HTTPTransaction txn,CodecProtocol protocol,string protocolString,HTTPMessage msg)
279 	{
280         msg.clientAddress = getPeerAddress();
281 		setupProtocolUpgrade(txn,protocol,protocolString,msg);
282 	}
283 
284 	override void onBody(HTTPTransaction txn,const ubyte[] data){
285 		if(txn)
286 			txn.onIngressBody(data,cast(ushort)0);
287 	}
288 
289 	override void onChunkHeader(HTTPTransaction txn, size_t length){
290 		if(txn)
291 			txn.onIngressChunkHeader(length);
292 	}
293 
294 	override void onChunkComplete(HTTPTransaction txn){
295 		if(txn)
296 			txn.onIngressChunkComplete();
297 	}
298 
299 	override void onMessageComplete(HTTPTransaction txn, bool upgrade){
300 		if(txn)
301 			txn.onIngressEOM();
302 	}
303 
304 	override void onError(HTTPTransaction txn,HTTPErrorCode code){
305 		logDebug("ERRO : ", code);
306 		_down.httpClose();
307 	}
308 
309 	override void onAbort(HTTPTransaction txn,
310 		HTTPErrorCode code){
311 		_down.httpClose();
312 	}
313 	
314 	override void onWsFrame(HTTPTransaction txn,ref WSFrame wsf){
315 		logDebug(".....");
316 		if(txn)
317 			txn.onWsFrame(wsf);
318 	}
319 
320 	// HTTPCodec.CallBack }
321 protected:
322 	/**
323    * Called by onHeadersComplete(). This function allows downstream and
324    * upstream to do any setup (like preparing a handler) when headers are
325    * first received from the remote side on a given transaction.
326    */
327 	void setupOnHeadersComplete(ref HTTPTransaction txn,
328 		HTTPMessage msg);
329 
330 	void setupProtocolUpgrade(ref HTTPTransaction txn,CodecProtocol protocol,string protocolString,HTTPMessage msg);
331 
332 	final void closeWriteCallBack(){
333 		if(_codec is null || _codec.shouldClose()) {
334 			logDebug("\t\t --------do close!!!");
335 			_down.httpClose();
336 		}
337 	}
338 	
339 	Address _localAddr;
340 	Address _peerAddr;
341 	HTTPCodec _codec;
342 
343 	HTTPSessionController _controller;
344 	SessionDown _down;
345 }
346