1 /*
2  * Collie - An asynchronous event-driven network framework using Dlang development
3  *
4  * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd 
5  *
6  * Developer: putao's Dlang team
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module collie.codec.http.httptansaction;
12 import kiss.log;
13 import collie.codec.http.codec.httpcodec;
14 import collie.codec.http.httpmessage;
15 import collie.codec.http.errocode;
16 import collie.codec.http.httpwritebuffer;
17 // import kiss.net.TcpStream;
18 import kiss.net.TcpStream;
19 
20 import std.socket;
21 public 
22 import collie.codec.http.codec.wsframe;
23 
/**
 * Side of the connection a transaction is facing.
 * Stored as a single byte (`ubyte` base type).
 */
enum TransportDirection : ubyte
{
	/// Toward the client.
	DOWNSTREAM = 0,
	/// Toward the origin application or data.
	UPSTREAM = 1
}
28 
/**
 * Callback interface implemented by whoever consumes an HTTPTransaction.
 *
 * HTTPTransaction forwards each ingress event to the handler installed on
 * it (see the onIngress*, onErro, onWsFrame and onUpgtade methods of
 * HTTPTransaction); when no handler is installed those events are dropped.
 */
interface HTTPTransactionHandler
{
	/// Invoked once per transaction to tell the handler which transaction
	/// it should talk to and will receive callbacks from.
	void setTransaction(HTTPTransaction txn);

	/// Terminal callback, invoked once when the transaction is torn down,
	/// even if a read or write error happened earlier. The HTTPTransaction
	/// that issued this call must be treated as invalid once the callback
	/// returns.
	void detachTransaction();

	/// Invoked at most once per transaction and usually the first ingress
	/// callback, although a read error may be reported before it. When
	/// ingress has been paused with pauseIngress(), delivery is deferred
	/// until resumeIngress() is called.
	void onHeadersComplete(HTTPMessage msg);

	/// May be invoked several times per transaction, once for every piece
	/// of body data. Deferred while ingress is paused.
	void onBody(const ubyte[] chain);

	/// May be invoked several times per transaction. Marks the start of a
	/// chunk of `length` bytes (documented to be greater than zero);
	/// onBody() calls carrying the chunk data follow. Deferred while
	/// ingress is paused.
	void onChunkHeader(size_t length);

	/// May be invoked several times per transaction. Marks the end of the
	/// current chunk. Deferred while ingress is paused.
	void onChunkComplete();

	// NOTE: the trailers callback of the original C++ design has not been
	// ported. It would be delivered right before the EOM of a chunked
	// HTTP/1.1 response, or several times per transaction for SPDY and
	// HTTP/2 HEADERS frames:
	//	void onTrailers(std::unique_ptr<HTTPHeaders> trailers) noexcept = 0;

	/// Invoked at most once per transaction when the ingress message is
	/// complete. No further regular ingress callbacks follow, but
	/// onEgress*() and onError() may still be invoked. The transaction
	/// stays valid, and may still be used, until detachTransaction().
	/// Deferred while ingress is paused.
	void onEOM();

	/// May be invoked at any time before detachTransaction() to report
	/// that an error occurred, described by `erromsg`.
	/// NOTE(review): the design this was ported from carried an
	/// ingress/egress direction on the error object; whether HTTPErrorCode
	/// conveys that here is not visible from this file - confirm.
	void onError(HTTPErrorCode erromsg);

	/// Invoked when the remote side's receive buffer fills up, so the
	/// handler can attempt to stop sending.
	void onEgressPaused();

	/// Invoked when the remote side has resumed reading and sending may
	/// continue.
	void onEgressResumed();

	/// Receives a WebSocket frame, passed by reference.
	void onWsFrame(ref WSFrame wsf);

	/// Invoked for a protocol upgrade to `protocol` described by `msg`.
	/// The name is misspelled ("Upgtade") but is kept because implementers
	/// and callers depend on it.
	/// NOTE(review): presumably returning true accepts the upgrade -
	/// confirm against the session code. HTTPTransaction reports false
	/// when no handler is installed.
	bool onUpgtade(CodecProtocol protocol, HTTPMessage msg);
}
123 
/**
 * One HTTP message exchange (a stream) multiplexed over a Transport.
 *
 * The transaction is a thin mediator: ingress events reported by the
 * session are forwarded to the installed HTTPTransactionHandler, and egress
 * requests made by the handler are forwarded to the attached Transport.
 * Every forwarding method is a silent no-op when its target (handler or
 * transport) is not set.
 */
class HTTPTransaction
{
	/**
	 * Operations the transaction delegates to the session/connection that
	 * owns it. Every call that concerns a particular stream receives the
	 * transaction as its first argument.
	 */
	interface Transport
	{
		alias SocketWriteCallBack = TCPWriteCallBack;

		void pauseIngress(HTTPTransaction txn);

		void resumeIngress(HTTPTransaction txn);

		void transactionTimeout(HTTPTransaction txn);

		void sendHeaders(HTTPTransaction txn,
			HTTPMessage headers,
			bool eom);

		size_t sendBody(HTTPTransaction txn,
			in ubyte[],
			bool eom);

		size_t sendChunkHeader(HTTPTransaction txn,
			size_t length);

		size_t sendChunkTerminator(HTTPTransaction txn);

		size_t sendEOM(HTTPTransaction txn);

		void socketWrite(HTTPTransaction txn,StreamWriteBuffer buffer);

//		size_t sendAbort(HTTPTransaction txn,
//			HTTPErrorCode statusCode);

		size_t sendWsData(HTTPTransaction txn,OpCode code,ubyte[] data);
//		size_t sendPriority(HTTPTransaction txn,
//			const http2::PriorityUpdate& pri);
//
//		size_t sendWindowUpdate(HTTPTransaction txn,
//			uint32_t bytes);

		void notifyPendingEgress();

		void detach(HTTPTransaction txn);

//		void notifyIngressBodyProcessed(uint32_t bytes);
//
//		void notifyEgressBodyBuffered(int64_t bytes);

		Address getLocalAddress();

		Address getPeerAddress();

		HTTPCodec getCodec();

		bool isDraining();
	}

	/**
	 * Params:
	 *   direction = whether this transaction faces the client (DOWNSTREAM)
	 *               or the origin (UPSTREAM)
	 *   id        = codec stream identifier of this transaction
	 *   seqNo     = sequence number reported by getSequenceNumber()
	 */
	this(TransportDirection direction, HTTPCodec.StreamID id,uint seqNo)
	{
		// The direction used to be dropped here, which left _direction at
		// its default (DOWNSTREAM) and made isUpstream() always false.
		_direction = direction;
		_id = id;
		_seqNo = seqNo;
	}

	/// Handler that receives this transaction's callbacks (may be null).
	@property HTTPTransactionHandler handler(){return _handler;}
	/// Installs (or, with null, removes) the callback handler.
	@property void handler(HTTPTransactionHandler han){_handler = han;}

	/// Codec stream identifier; same value as getID().
	@property streamID(){return _id;}
	/// Attaches the transport that egress calls are forwarded to.
	@property transport(Transport port){_transport = port;}
	/// Currently attached transport (null before attachment and after
	/// onDelayedDestroy()).
	@property transport(){return _transport;}

	/// True when the transaction was created with TransportDirection.UPSTREAM.
	bool isUpstream() const {
		return _direction == TransportDirection.UPSTREAM;
	}

	/// True when the transaction was created with TransportDirection.DOWNSTREAM.
	bool isDownstream() const {
		return _direction == TransportDirection.DOWNSTREAM;
	}

	/// Sequence number given at construction.
	uint getSequenceNumber() const { return _seqNo; }

	/// Codec stream identifier given at construction.
	HTTPCodec.StreamID getID() const { return _id; }

	/**
	 * Local address as reported by the transport.
	 * Returns: the address, or null when no transport is attached.
	 */
	Address getLocalAddress()
	{
		// Guarded like every other forwarding method: _transport is null
		// before attachment and after onDelayedDestroy().
		return _transport is null ? null : _transport.getLocalAddress();
	}

	/**
	 * Peer address as reported by the transport.
	 * Returns: the address, or null when no transport is attached.
	 */
	Address getPeerAddress()
	{
		return _transport is null ? null : _transport.getPeerAddress();
	}

	/**
	 * Invoked by the session when the ingress headers are complete.
	 * For an upstream transaction receiving a response, the status code is
	 * remembered in _lastResponseStatus before the handler is notified.
	 */
	void onIngressHeadersComplete(HTTPMessage msg)
	{
		if(isUpstream() && msg.isResponse()) {
			_lastResponseStatus = msg.statusCode;
		}
		if(_handler)
			_handler.onHeadersComplete(msg);
	}

	/**
	 * Invoked by the session when some or all of the ingress entity-body
	 * has been parsed.
	 *
	 * Params:
	 *   chain   = the parsed body bytes, handed to the handler unchanged
	 *   padding = accepted for interface compatibility; not used here
	 */
	void onIngressBody(const ubyte[] chain, ushort padding)
	{
		if(_handler)
			_handler.onBody(chain);
	}

	/**
	 * Invoked by the session when a chunk header has been parsed.
	 */
	void onIngressChunkHeader(size_t length)
	{
		if(_handler)
			_handler.onChunkHeader(length);
	}

	/**
	 * Invoked by the session when the CRLF terminating a chunk has been
	 * parsed.
	 */
	void onIngressChunkComplete()
	{
		if(_handler)
			_handler.onChunkComplete();
	}

	/**
	 * Invoked by the session when the ingress message is complete.
	 */
	void onIngressEOM()
	{
		if(_handler)
			_handler.onEOM();
	}

	/**
	 * Reports an error to the handler via HTTPTransactionHandler.onError.
	 */
	void onErro(HTTPErrorCode erro)
	{
		if(_handler)
			_handler.onError(erro);
	}

	/**
	 * Schedule or refresh the timeout for this transaction.
	 * Currently a no-op placeholder.
	 */
	void refreshTimeout() {}

	/**
	 * Timeout callback for this transaction.
	 * Currently a no-op placeholder.
	 */
	void timeoutExpired() {}

	/**
	 * Hands the egress message headers to the transport without the
	 * end-of-message flag. Shorthand for
	 * sendHeadersWithOptionalEOM(headers, false).
	 *
	 * According to the original design notes this should be called once
	 * per message unless the first headers sent carry a 1xx status.
	 *
	 * Params:
	 *   headers = message headers
	 */
	void sendHeaders(HTTPMessage headers)
	{
		sendHeadersWithOptionalEOM(headers,false);
	}

	/**
	 * Hands the egress message headers to the transport with the
	 * end-of-message flag set. Shorthand for
	 * sendHeadersWithOptionalEOM(headers, true).
	 */
	void sendHeadersWithEOM(HTTPMessage headers)
	{
		sendHeadersWithOptionalEOM(headers,true);
	}

	/**
	 * Common implementation behind sendHeaders and sendHeadersWithEOM.
	 * Does nothing when no transport is attached.
	 *
	 * Params:
	 *   headers = message headers
	 *   eom     = whether the headers also end the message
	 */
	void sendHeadersWithOptionalEOM(HTTPMessage headers, bool eom)
	{
		if(transport)
			transport.sendHeaders(this,headers,eom);
	}

	/**
	 * Hands part or all of the egress message body to the transport, which
	 * is responsible for any protocol framing. May be called zero or more
	 * times per message. Does nothing when no transport is attached.
	 *
	 * Params:
	 *   body_ = message body data
	 *   iseom = whether this data also ends the message
	 */
	void sendBody(in ubyte[] body_, bool iseom = false){
		if(transport)
			transport.sendBody(this,body_, iseom);
	}

	/**
	 * Asks the transport to write the framing that precedes the subsequent
	 * sendBody() call(s). Does nothing when no transport is attached.
	 *
	 * Params:
	 *   length = length in bytes of the body data to follow
	 */
	void sendChunkHeader(size_t length) {
		if(transport)
			transport.sendChunkHeader(this,length);
	}

	/**
	 * Passes a prepared write buffer to the transport's socketWrite.
	 * Does nothing when no transport is attached.
	 */
	void socketWrite(StreamWriteBuffer buffer){
		if(transport)
			transport.socketWrite(this,buffer);
	}

	/**
	 * Asks the transport to write whatever terminates the chunk begun by
	 * the last call to sendChunkHeader(). Does nothing when no transport
	 * is attached.
	 */
	void sendChunkTerminator() {
		if(transport)
			transport.sendChunkTerminator(this);
	}

	/**
	 * Finalizes the egress message; depending on the protocol used by the
	 * transport this may involve sending an explicit end-of-message
	 * indicator. Does nothing when no transport is attached.
	 *
	 * According to the original design notes this should be called once
	 * per message (unless the message is aborted instead).
	 */
	void sendEOM(){
		if(transport)
			transport.sendEOM(this);
	}

	/**
	 * Sends a "408 Request Timeout" response carrying "Connection: close",
	 * with the end-of-message flag set. Does nothing when no transport is
	 * attached.
	 */
	void sendTimeOut()
	{
		if(!transport) return;
		import collie.codec.http.headers;
		// NOTE(review): `scope` destroys msg when this function returns;
		// this is only safe if Transport.sendHeaders does not keep a
		// reference to the message - confirm against the implementations.
		scope HTTPMessage msg = new HTTPMessage();
		msg.statusCode(408);
		msg.statusMessage("Request Timeout");
		msg.getHeaders.add(HTTPHeaderCode.CONNECTION,"close");
		sendHeadersWithEOM(msg);
	}

	/**
	 * Sends WebSocket payload `data` with opcode `code` through the
	 * transport. Does nothing when no transport is attached.
	 */
	void sendWsData(OpCode code,ubyte[] data)
	{
		if(transport)
			transport.sendWsData(this,code,data);
	}

	/**
	 * Forwards a received WebSocket frame to the handler.
	 */
	void onWsFrame(ref WSFrame wsf){
		logDebug(".....");
		if(_handler)
			_handler.onWsFrame(wsf);
	}

	/**
	 * Forwards a protocol upgrade notification to the handler.
	 * Returns: the handler's answer, or false when no handler is installed.
	 */
	bool onUpgtade(CodecProtocol protocol, HTTPMessage msg){
		if(_handler)
			return _handler.onUpgtade(protocol, msg);

		return false;
	}

package:
	/**
	 * Tears the transaction down: notifies the handler through
	 * detachTransaction(), then detaches from the transport, clearing both
	 * references. Only the first call has any effect; later calls return
	 * immediately because the `deleting` flag is already set.
	 */
	void onDelayedDestroy()
	{
		if(deleting) return;
		deleting = true;
		if(_handler) {
			_handler.detachTransaction();
			_handler = null;
		}
		if(_transport) {
			_transport.detach(this);
			_transport = null;
		}
	}
private:
	HTTPCodec.StreamID _id;          // codec stream identifier
	Transport _transport;            // egress target; null when detached
	HTTPTransactionHandler _handler; // ingress callback target; may be null
	TransportDirection _direction;   // set once by the constructor
	uint _seqNo;

	bool deleting = false;           // set by the first onDelayedDestroy()

	// Status code of the most recent response seen by an upstream
	// transaction; written in onIngressHeadersComplete, not read in this class.
	ushort _lastResponseStatus;
}
435