1 /* 2 * Collie - An asynchronous event-driven network framework using Dlang development 3 * 4 * Copyright (C) 2015-2017 Shanghai Putao Technology Co., Ltd 5 * 6 * Developer: putao's Dlang team 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 module collie.codec.http.session.httpsession; 12 13 import collie.codec.http.headers; 14 import collie.codec.http.httpmessage; 15 import collie.codec.http.httptansaction; 16 import collie.codec.http.codec.httpcodec; 17 import collie.codec.http.codec.wsframe; 18 import collie.codec.http.errocode; 19 import kiss.log; 20 import kiss.net.TcpStream; 21 import kiss.functional; 22 public import kiss.net.struct_; 23 import kiss.event.task; 24 import std.socket; 25 import std.experimental.allocator.mallocator; 26 import collie.codec.http.httpwritebuffer; 27 28 alias HTTPBuffer = HTTPByteBuffer!(Mallocator); 29 abstract class HTTPSessionController 30 { 31 HTTPTransactionHandler getRequestHandler(HTTPTransaction txn, HTTPMessage msg); 32 33 void attachSession(HTTPSession session){} 34 35 /** 36 * logInformed at the end when the given HTTPSession is going away. 37 */ 38 void detachSession(HTTPSession session){} 39 40 /** 41 * logInform the controller that the session's codec changed 42 */ 43 void onSessionCodecChange(HTTPSession session) {} 44 } 45 46 interface SessionDown 47 { 48 void httpWrite(StreamWriteBuffer buffer); 49 void httpClose(); 50 void post(void delegate()); 51 Address localAddress(); 52 Address remoteAddress(); 53 } 54 55 /// HTTPSession will not send any read event 56 abstract class HTTPSession : HTTPTransaction.Transport, 57 HTTPCodec.CallBack 58 { 59 alias StreamID = HTTPCodec.StreamID; 60 interface logInfoCallback { 61 // Note: you must not start any asynchronous work from onCreate() 62 void onCreate(HTTPSession); 63 //void onIngressError(const HTTPSession, ProxygenError); 64 void onIngressEOF(); 65 void onRequestBegin(HTTPSession); 66 void onRequestEnd(HTTPSession, 67 uint maxIngressQueueSize); 68 void onActivateConnection(HTTPSession); 69 void onDeactivateConnection(HTTPSession); 70 // Note: you must not start any asynchronous work from onDestroy() 71 void onDestroy(HTTPSession); 72 void onIngressMessage(HTTPSession, 73 HTTPMessage); 74 void onIngressLimitExceeded(HTTPSession); 75 void onIngressPaused(HTTPSession); 76 void onTransactionDetached(HTTPSession); 77 void onPingReplySent(ulong latency); 78 void onPingReplyReceived(); 79 void onSettingsOutgoingStreamsFull(HTTPSession); 80 void onSettingsOutgoingStreamsNotFull(HTTPSession); 81 void onFlowControlWindowClosed(HTTPSession); 82 void onEgressBuffered(HTTPSession); 83 void onEgressBufferCleared(HTTPSession); 84 } 85 86 this(HTTPSessionController controller,HTTPCodec codec,SessionDown down) 87 { 88 _controller = controller; 89 _down = down; 90 _codec = codec; 91 _codec.setCallback(this); 92 } 93 94 //HandlerAdapter { 95 void onRead(ubyte[] msg) { 96 //logDebug("on read: ", cast(string)msg); 97 _codec.onIngress(msg); 98 } 99 100 void onActive() { 101 _localAddr = _down.localAddress; 102 _peerAddr = _down.remoteAddress; 103 } 104 105 void inActive() { 106 getCodec.onConnectClose(); 107 logDebug("connect closed!"); 108 } 109 110 void onTimeout() @trusted { 111 if(_codec) 112 _codec.onTimeOut(); 113 } 114 115 //HandlerAdapter} 116 //HTTPTransaction.Transport, { 117 override void pauseIngress(HTTPTransaction txn){} 118 119 override void resumeIngress(HTTPTransaction txn){} 120 121 override void transactionTimeout(HTTPTransaction txn){} 122 123 override void sendHeaders(HTTPTransaction txn, 124 HTTPMessage headers, 125 bool eom) 126 { 127 auto tdata = new HTTPBuffer(); 128 _codec.generateHeader(txn,headers,tdata,eom); 129 130 //auto cback = eom ? bind(&closeWriteCallBack,txn) : &writeCallBack; 131 //_down.httpWrite(tdata.data(true),cback); 132 if(eom){ 133 tdata.setFinalTask(newTask((){closeWriteCallBack();txn.onDelayedDestroy();})); 134 } 135 _down.httpWrite(tdata); 136 137 138 } 139 140 override size_t sendBody(HTTPTransaction txn, 141 in ubyte[] data, 142 bool eom) 143 { 144 145 auto tdata = new HTTPBuffer(); 146 size_t rlen = getCodec.generateBody(txn,tdata,data,eom); 147 148 if(eom){ 149 tdata.setFinalTask(newTask((){closeWriteCallBack();txn.onDelayedDestroy();})); 150 } 151 _down.httpWrite(tdata); 152 153 // logDebug("send length : ", rlen); 154 return rlen; 155 } 156 157 override size_t sendChunkHeader(HTTPTransaction txn,size_t length) 158 { 159 auto tdata = new HTTPByteBuffer!Mallocator(); 160 size_t rlen = getCodec.generateChunkHeader(txn,tdata,length); 161 _down.httpWrite(tdata); 162 return rlen; 163 } 164 165 166 override size_t sendChunkTerminator(HTTPTransaction txn) 167 { 168 auto tdata = new HTTPByteBuffer!Mallocator(); 169 size_t rlen = getCodec.generateChunkTerminator(txn,tdata); 170 171 tdata.setFinalTask(newTask((){closeWriteCallBack();txn.onDelayedDestroy();})); 172 _down.httpWrite(tdata); 173 174 return rlen; 175 } 176 177 178 override size_t sendEOM(HTTPTransaction txn) 179 { 180 HttpWriteBuffer tdata = new HTTPByteBuffer!Mallocator(); 181 size_t rlen = getCodec.generateEOM(txn,tdata); 182 logDebug("send eom!! ",rlen); 183 //if(rlen) 184 //_down.httpWrite(tdata.data(true),bind(&closeWriteCallBack,txn)); 185 if(rlen){ 186 tdata.setFinalTask(newTask((){closeWriteCallBack();txn.onDelayedDestroy();})); 187 _down.httpWrite(tdata); 188 } else { 189 _down.post((){ 190 closeWriteCallBack(); 191 txn.onDelayedDestroy(); 192 }); 193 } 194 return rlen; 195 } 196 197 // size_t sendAbort(HTTPTransaction txn, 198 // HTTPErrorCode statusCode); 199 200 override void socketWrite(HTTPTransaction txn,StreamWriteBuffer buffer) { 201 _down.httpWrite(buffer); 202 } 203 204 205 override size_t sendWsData(HTTPTransaction txn,OpCode code,ubyte[] data) 206 { 207 auto tdata = new HTTPByteBuffer!Mallocator(); 208 size_t rlen = getCodec.generateWsFrame(txn,tdata,code,data); 209 if(rlen) { 210 bool eom = getCodec.shouldClose(); 211 // auto cback = eom ? bind(&closeWriteCallBack,txn) : &writeCallBack; 212 // _down.httpWrite(tdata.data(true),cback); 213 if(eom){ 214 tdata.setFinalTask(newTask((){closeWriteCallBack();txn.onDelayedDestroy();})); 215 } 216 _down.httpWrite(tdata); 217 } 218 return rlen; 219 } 220 221 override void notifyPendingEgress() 222 {} 223 224 override void detach(HTTPTransaction txn) 225 { 226 if(_codec) 227 _codec.detach(txn); 228 } 229 230 // void notifyIngressBodyProcessed(uint32_t bytes); 231 // 232 // void notifyEgressBodyBuffered(int64_t bytes); 233 234 override Address getLocalAddress(){ 235 return _localAddr; 236 } 237 238 override Address getPeerAddress(){ 239 return _peerAddr; 240 } 241 242 243 override HTTPCodec getCodec(){ 244 return _codec; 245 } 246 247 void restCodeC(HTTPCodec codec){ 248 if(_codec) 249 _codec.setCallback(null); 250 codec.setCallback(this); 251 _codec = codec; 252 } 253 254 override bool isDraining(){return false;} 255 //HTTPTransaction.Transport, } 256 257 258 // HTTPCodec.CallBack { 259 override void onMessageBegin(HTTPTransaction txn, HTTPMessage msg) 260 { 261 if(txn){ 262 txn.transport = this; 263 } 264 logDebug("begin a http requst or reaponse!"); 265 } 266 267 override void onHeadersComplete(HTTPTransaction txn, 268 HTTPMessage msg){ 269 logDebug("onHeadersComplete ------url: ", msg.url); 270 msg.clientAddress = getPeerAddress(); 271 setupOnHeadersComplete(txn,msg); 272 } 273 274 override void onNativeProtocolUpgrade(HTTPTransaction txn,CodecProtocol protocol,string protocolString,HTTPMessage msg) 275 { 276 msg.clientAddress = getPeerAddress(); 277 setupProtocolUpgrade(txn,protocol,protocolString,msg); 278 } 279 280 override void onBody(HTTPTransaction txn,const ubyte[] data){ 281 if(txn) 282 txn.onIngressBody(data,cast(ushort)0); 283 } 284 285 override void onChunkHeader(HTTPTransaction txn, size_t length){ 286 if(txn) 287 txn.onIngressChunkHeader(length); 288 } 289 290 override void onChunkComplete(HTTPTransaction txn){ 291 if(txn) 292 txn.onIngressChunkComplete(); 293 } 294 295 override void onMessageComplete(HTTPTransaction txn, bool upgrade){ 296 if(txn) 297 txn.onIngressEOM(); 298 } 299 300 override void onError(HTTPTransaction txn,HTTPErrorCode code){ 301 logDebug("ERRO : ", code); 302 _down.httpClose(); 303 } 304 305 override void onAbort(HTTPTransaction txn, 306 HTTPErrorCode code){ 307 _down.httpClose(); 308 } 309 310 override void onWsFrame(HTTPTransaction txn,ref WSFrame wsf){ 311 logDebug("....."); 312 if(txn) 313 txn.onWsFrame(wsf); 314 } 315 316 // HTTPCodec.CallBack } 317 protected: 318 /** 319 * Called by onHeadersComplete(). This function allows downstream and 320 * upstream to do any setup (like preparing a handler) when headers are 321 * first received from the remote side on a given transaction. 322 */ 323 void setupOnHeadersComplete(ref HTTPTransaction txn, 324 HTTPMessage msg); 325 326 void setupProtocolUpgrade(ref HTTPTransaction txn,CodecProtocol protocol,string protocolString,HTTPMessage msg); 327 protected: 328 final void closeWriteCallBack(){ 329 // logDebug("shodle close!????????????"); 330 //txn.onDelayedDestroy(); 331 if(_codec is null || _codec.shouldClose()) { 332 logDebug("\t\t --------do close!!!"); 333 _down.httpClose(); 334 } 335 } 336 protected: 337 Address _localAddr; 338 Address _peerAddr; 339 HTTPCodec _codec; 340 341 HTTPSessionController _controller; 342 SessionDown _down; 343 } 344