/*
 * Collie - An asynchronous event-driven network framework using Dlang development
 *
 * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd
 *
 * Developer: putao's Dlang team
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module collie.codec.http.httptansaction;

import kiss.log;
import collie.codec.http.codec.httpcodec;
import collie.codec.http.httpmessage;
import collie.codec.http.errocode;
import collie.codec.http.httpwritebuffer;
// import kiss.net.TcpStream;
import kiss.net.TcpStream;

import std.socket;
public import collie.codec.http.codec.wsframe;

/// Which side of the connection a transaction faces.
enum TransportDirection : ubyte {
    DOWNSTREAM, // toward the client
    UPSTREAM    // toward the origin application or data
}

/**
 * Callback interface through which an HTTPTransaction reports ingress
 * events, errors and egress flow-control changes to application code.
 */
interface HTTPTransactionHandler
{
    /**
     * Called once per transaction. This notifies the handler of which
     * transaction it should talk to and will receive callbacks from.
     */
    void setTransaction(HTTPTransaction txn);

    /**
     * Called once after a transaction successfully completes. It
     * will be called even if a read or write error happened earlier.
     * This is a terminal callback, which means that the HTTPTransaction
     * object that gives this call will be invalid after this function
     * completes.
     */
    void detachTransaction();

    /**
     * Called at most once per transaction. This is usually the first
     * ingress callback. It is possible to get a read error before this
     * however. If you had previously called pauseIngress(), this callback
     * will be delayed until you call resumeIngress().
     */
    void onHeadersComplete(HTTPMessage msg);

    /**
     * Can be called multiple times per transaction. If you had previously
     * called pauseIngress(), this callback will be delayed until you call
     * resumeIngress().
     */
    void onBody(const ubyte[] chain);

    /**
     * Can be called multiple times per transaction. If you had previously
     * called pauseIngress(), this callback will be delayed until you call
     * resumeIngress(). This signifies the beginning of a chunk of length
     * 'length'. You will receive onBody() after this. Also, the length will
     * be greater than zero.
     */
    void onChunkHeader(size_t length);

    /**
     * Can be called multiple times per transaction. If you had previously
     * called pauseIngress(), this callback will be delayed until you call
     * resumeIngress(). This signifies the end of a chunk.
     */
    void onChunkComplete();

    /*
     * (Not implemented; kept as a plain comment so it does not attach to
     * the next declaration's generated documentation.)
     *
     * Can be called any number of times per transaction. If you had
     * previously called pauseIngress(), this callback will be delayed until
     * you call resumeIngress(). Trailers can be received once right before
     * the EOM of a chunked HTTP/1.1 response or multiple times per
     * transaction from SPDY and HTTP/2.0 HEADERS frames.
     */
    //  void onTrailers(std::unique_ptr<HTTPHeaders> trailers) noexcept
    //  = 0;

    /**
     * Can be called once per transaction. If you had previously called
     * pauseIngress(), this callback will be delayed until you call
     * resumeIngress(). After this callback is received, there will be no
     * more normal ingress callbacks received (onEgress*() and onError()
     * may still be invoked). The Handler should consider
     * ingress complete after receiving this message. This Transaction is
     * still valid, and work may still occur on it until detachTransaction
     * is called.
     */
    void onEOM();

    /**
     * Can be called at any time before detachTransaction(). This callback
     * implies that an error has occurred. To determine if ingress or egress
     * is affected, check the direction on the HTTPException. If the
     * direction is INGRESS, it MAY still be possible to send egress.
     */
    void onError(HTTPErrorCode erromsg);

    /**
     * If the remote side's receive buffer fills up, this callback will be
     * invoked so you can attempt to stop sending to the remote side.
     */
    void onEgressPaused();

    /**
     * This callback lets you know that the remote side has resumed reading
     * and you can now continue to send data.
     */
    void onEgressResumed();

    /// Delivers a WebSocket frame to the handler.
    void onWsFrame(ref WSFrame wsf);

    /**
     * Delivers an upgrade notification for `protocol` together with the
     * message `msg`. HTTPTransaction.onUpgtade returns this result
     * unchanged, and returns false when no handler is set.
     * NOTE(review): the exact meaning of the returned flag (presumably
     * "upgrade accepted") is defined by the caller, not visible here.
     */
    bool onUpgtade(CodecProtocol protocol, HTTPMessage msg);
}

/**
 * A single HTTP transaction (one stream). It forwards ingress events to
 * its HTTPTransactionHandler and egress requests to its Transport; every
 * forwarding method is a no-op when the corresponding peer is null.
 */
class HTTPTransaction
{
    /**
     * The session-side interface a transaction uses to emit egress data
     * and to query connection information.
     */
    interface Transport
    {
        alias SocketWriteCallBack = TCPWriteCallBack;

        void pauseIngress(HTTPTransaction txn);

        void resumeIngress(HTTPTransaction txn);

        void transactionTimeout(HTTPTransaction txn);

        void sendHeaders(HTTPTransaction txn,
            HTTPMessage headers,
            bool eom);

        size_t sendBody(HTTPTransaction txn,
            in ubyte[],
            bool eom);

        size_t sendChunkHeader(HTTPTransaction txn,
            size_t length);

        size_t sendChunkTerminator(HTTPTransaction txn);

        size_t sendEOM(HTTPTransaction txn);

        void socketWrite(HTTPTransaction txn, StreamWriteBuffer buffer);

        //      size_t sendAbort(HTTPTransaction txn,
        //          HTTPErrorCode statusCode);

        size_t sendWsData(HTTPTransaction txn, OpCode code, ubyte[] data);
        //      size_t sendPriority(HTTPTransaction txn,
        //          const http2::PriorityUpdate& pri);
        //
        //      size_t sendWindowUpdate(HTTPTransaction txn,
        //          uint32_t bytes);

        void notifyPendingEgress();

        void detach(HTTPTransaction txn);

        //      void notifyIngressBodyProcessed(uint32_t bytes);
        //
        //      void notifyEgressBodyBuffered(int64_t bytes);

        Address getLocalAddress();

        Address getPeerAddress();

        HTTPCodec getCodec();

        bool isDraining();
    }

    /**
     * Creates a transaction.
     *
     * Params:
     *   direction = whether this transaction faces the client (DOWNSTREAM)
     *               or the origin (UPSTREAM)
     *   id        = codec stream identifier
     *   seqNo     = sequence number reported by getSequenceNumber()
     */
    this(TransportDirection direction, HTTPCodec.StreamID id, uint seqNo)
    {
        // Store the direction; without this the field keeps the enum's
        // default (DOWNSTREAM) and isUpstream() can never be true.
        _direction = direction;
        _id = id;
        _seqNo = seqNo;
    }

    /// The handler receiving this transaction's callbacks (may be null).
    @property HTTPTransactionHandler handler() { return _handler; }
    /// ditto
    @property void handler(HTTPTransactionHandler han) { _handler = han; }

    /// The codec stream identifier given at construction.
    @property streamID() { return _id; }

    /// The transport used for egress (may be null).
    @property transport(Transport port) { _transport = port; }
    /// ditto
    @property transport() { return _transport; }

    /// True when the transaction was created with TransportDirection.UPSTREAM.
    bool isUpstream() const {
        return _direction == TransportDirection.UPSTREAM;
    }

    /// True when the transaction was created with TransportDirection.DOWNSTREAM.
    bool isDownstream() const {
        return _direction == TransportDirection.DOWNSTREAM;
    }

    /// The sequence number given at construction.
    uint getSequenceNumber() const { return _seqNo; }

    /// The codec stream identifier given at construction.
    HTTPCodec.StreamID getID() const { return _id; }

    /// Local address as reported by the transport. The transport must be set.
    Address getLocalAddress() { return _transport.getLocalAddress(); }

    /// Peer address as reported by the transport. The transport must be set.
    Address getPeerAddress() { return _transport.getPeerAddress(); }

    /**
     * Invoked by the session when the ingress headers are complete.
     * For an upstream transaction receiving a response, the status code
     * is remembered before the handler is notified.
     */
    void onIngressHeadersComplete(HTTPMessage msg)
    {
        //  logDebug("onIngressHeadersComplete handle is ", (handler is null));
        if (isUpstream() && msg.isResponse()) {
            _lastResponseStatus = msg.statusCode;
        }
        if (_handler)
            _handler.onHeadersComplete(msg);
    }

    /**
     * Invoked by the session when some or all of the ingress entity-body has
     * been parsed. `padding` is accepted but not used here.
     */
    void onIngressBody(const ubyte[] chain, ushort padding)
    {
        if (_handler)
            _handler.onBody(chain);
    }

    /**
     * Invoked by the session when a chunk header has been parsed.
     */
    void onIngressChunkHeader(size_t length)
    {
        if (_handler)
            _handler.onChunkHeader(length);
    }

    /**
     * Invoked by the session when the CRLF terminating a chunk has been parsed.
     */
    void onIngressChunkComplete()
    {
        if (_handler)
            _handler.onChunkComplete();
    }

    /**
     * Invoked by the session when the ingress message is complete.
     */
    void onIngressEOM()
    {
        if (_handler)
            _handler.onEOM();
    }

    /// Forwards an error code to the handler's onError(), if a handler is set.
    void onErro(HTTPErrorCode erro)
    {
        if (_handler)
            _handler.onError(erro);
    }

    /**
     * Schedule or refresh the timeout for this transaction.
     * Currently a no-op.
     */
    void refreshTimeout() {}

    /**
     * Timeout callback for this transaction. The timer is meant to be active
     * until the ingress message is complete or terminated by error.
     * Currently a no-op.
     */
    void timeoutExpired() {}

    /**
     * Send the egress message headers to the Transport. This method does
     * not actually write the message out on the wire immediately. All
     * writes happen at the end of the event loop at the earliest.
     * Note: This method should be called once per message unless the first
     * headers sent indicate a 1xx status.
     *
     * sendHeaders will not set EOM flag in header frame, whereas
     * sendHeadersWithEOM will. sendHeadersWithOptionalEOM backs both of them.
     *
     * Params:
     *   headers = Message headers
     */
    void sendHeaders(HTTPMessage headers)
    {
        sendHeadersWithOptionalEOM(headers, false);
    }

    /// ditto
    void sendHeadersWithEOM(HTTPMessage headers)
    {
        sendHeadersWithOptionalEOM(headers, true);
    }

    /// ditto
    void sendHeadersWithOptionalEOM(HTTPMessage headers, bool eom)
    {
        if (transport)
            transport.sendHeaders(this, headers, eom);
    }

    /**
     * Send part or all of the egress message body to the Transport. If flow
     * control is enabled, the chunk boundaries may not be respected.
     * This method does not actually write the message out on the wire
     * immediately. All writes happen at the end of the event loop at the
     * earliest.
     * Note: This method may be called zero or more times per message.
     *
     * Params:
     *   body_ = Message body data; the Transport will take care of
     *           applying any necessary protocol framing, such as
     *           chunk headers.
     *   iseom = passed to the transport as its `eom` flag
     */
    void sendBody(in ubyte[] body_, bool iseom = false)
    {
        if (transport)
            transport.sendBody(this, body_, iseom);
    }

    /**
     * Write any protocol framing required for the subsequent call(s)
     * to sendBody(). This method does not actually write the message out on
     * the wire immediately. All writes happen at the end of the event loop
     * at the earliest.
     *
     * Params:
     *   length = Length in bytes of the body data to follow.
     */
    void sendChunkHeader(size_t length)
    {
        if (transport)
            transport.sendChunkHeader(this, length);
    }

    /// Hands a write buffer to the transport's socketWrite(), if a transport is set.
    void socketWrite(StreamWriteBuffer buffer)
    {
        if (transport)
            transport.socketWrite(this, buffer);
    }

    /**
     * Write any protocol syntax needed to terminate the data. This method
     * does not actually write the message out on the wire immediately. All
     * writes happen at the end of the event loop at the earliest.
     * Frame begun by the last call to sendChunkHeader().
     */
    void sendChunkTerminator()
    {
        if (transport)
            transport.sendChunkTerminator(this);
    }

    /**
     * Finalize the egress message; depending on the protocol used
     * by the Transport, this may involve sending an explicit "end
     * of message" indicator. This method does not actually write the
     * message out on the wire immediately. All writes happen at the end
     * of the event loop at the earliest.
     *
     * If the ingress message also is complete, the transaction may
     * detach itself from the Handler and Transport and delete itself
     * as part of this method.
     *
     * Note: Either this method or sendAbort() should be called once
     * per message. (sendAbort() is not currently available on Transport.)
     */
    void sendEOM()
    {
        if (transport)
            transport.sendEOM(this);
    }

    /**
     * Sends a "408 Request Timeout" response carrying a
     * `Connection: close` header, with EOM set. Does nothing when no
     * transport is attached.
     */
    void sendTimeOut()
    {
        if (!transport) return;
        import collie.codec.http.headers;
        // NOTE(review): `scope` ends the message's lifetime when this
        // function returns; confirm the transport does not retain `msg`
        // beyond the sendHeaders() call.
        scope HTTPMessage msg = new HTTPMessage();
        msg.statusCode(408);
        msg.statusMessage("Request Timeout");
        msg.getHeaders.add(HTTPHeaderCode.CONNECTION, "close");
        sendHeadersWithEOM(msg);
    }

    /// Sends WebSocket data with the given opcode through the transport, if set.
    void sendWsData(OpCode code, ubyte[] data)
    {
        if (transport)
            transport.sendWsData(this, code, data);
    }

    /// Logs a debug marker, then forwards the frame to the handler, if set.
    void onWsFrame(ref WSFrame wsf)
    {
        logDebug(".....");
        if (_handler)
            _handler.onWsFrame(wsf);
    }

    /**
     * Forwards an upgrade notification to the handler.
     * Returns: the handler's result, or false when no handler is set.
     */
    bool onUpgtade(CodecProtocol protocol, HTTPMessage msg)
    {
        if (_handler)
            return _handler.onUpgtade(protocol, msg);

        return false;
    }

package:
    /**
     * Tears the transaction down: detaches the handler, then the transport,
     * clearing both references. Only the first call has any effect; later
     * calls return immediately because `deleting` is already set.
     */
    void onDelayedDestroy()
    {
        //  logDebug("deleting is ", deleting);
        if (deleting) return;
        deleting = true;
        if (_handler) {
            _handler.detachTransaction();
            _handler = null;
        }
        if (_transport) {
            _transport.detach(this);
            _transport = null;
        }
    }

private:
    HTTPCodec.StreamID _id;
    Transport _transport;
    HTTPTransactionHandler _handler;
    TransportDirection _direction;
    uint _seqNo;

    // Set by onDelayedDestroy() so teardown runs only once.
    bool deleting = false;

    // Status code of the last response seen by an upstream transaction.
    ushort _lastResponseStatus;
}