/*
 * Collie - An asynchronous event-driven network framework using Dlang development
 *
 * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd
 *
 * Developer: putao's Dlang team
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module collie.codec.http.session.httpsession;

import collie.codec.http.headers;
import collie.codec.http.httpmessage;
import collie.codec.http.httptansaction;
import collie.codec.http.codec.httpcodec;
import collie.codec.http.codec.wsframe;
import collie.codec.http.errocode;
import kiss.logger;

import kiss.net.TcpStream;
import kiss.util.functional;
import kiss.net;
import kiss.event.core;
import kiss.event.task;
import std.socket;
import std.experimental.allocator.mallocator;
import collie.codec.http.httpwritebuffer;

/// Write buffer type used for all egress data generated by this module.
alias HTTPBuffer = HTTPByteBuffer!(Mallocator);

/**
 * Hooks through which a session asks for request handlers and reports
 * life-cycle changes. All hooks except getRequestHandler() default to no-ops.
 */
abstract class HTTPSessionController
{
    /**
     * Returns the handler that should process the given transaction/message.
     */
    HTTPTransactionHandler getRequestHandler(HTTPTransaction txn, HTTPMessage msg);

    /**
     * Hook for a session being attached to this controller. No-op by default.
     */
    void attachSession(HTTPSession session){}

    /**
     * Informed at the end when the given HTTPSession is going away.
     */
    void detachSession(HTTPSession session){}

    /**
     * Inform the controller that the session's codec changed.
     */
    void onSessionCodecChange(HTTPSession session) {}
}

/**
 * The downstream side of a session: the object that actually writes bytes,
 * closes the connection, schedules callbacks and knows the socket addresses.
 */
interface SessionDown
{
    /// Queue `buffer` for writing.
    void httpWrite(StreamWriteBuffer buffer);
    /// Close the underlying connection.
    void httpClose();
    /// Schedule a callback for later execution.
    void post(void delegate());
    /// Address of the local end of the connection.
    Address localAddress();
    /// Address of the remote end of the connection.
    Address remoteAddress();
}

/// HTTPSession will not send any read event
abstract class HTTPSession : HTTPTransaction.Transport,
    HTTPCodec.CallBack
{
    alias StreamID = HTTPCodec.StreamID;

    /// Observer interface for session level events.
    interface logInfoCallback {
        // Note: you must not start any asynchronous work from onCreate()
        void onCreate(HTTPSession);
        //void onIngressError(const HTTPSession, ProxygenError);
        void onIngressEOF();
        void onRequestBegin(HTTPSession);
        void onRequestEnd(HTTPSession,
            uint maxIngressQueueSize);
        void onActivateConnection(HTTPSession);
        void onDeactivateConnection(HTTPSession);
        // Note: you must not start any asynchronous work from onDestroy()
        void onDestroy(HTTPSession);
        void onIngressMessage(HTTPSession,
            HTTPMessage);
        void onIngressLimitExceeded(HTTPSession);
        void onIngressPaused(HTTPSession);
        void onTransactionDetached(HTTPSession);
        void onPingReplySent(ulong latency);
        void onPingReplyReceived();
        void onSettingsOutgoingStreamsFull(HTTPSession);
        void onSettingsOutgoingStreamsNotFull(HTTPSession);
        void onFlowControlWindowClosed(HTTPSession);
        void onEgressBuffered(HTTPSession);
        void onEgressBufferCleared(HTTPSession);
    }

    /**
     * Stores the collaborators and registers this session as the codec's
     * callback.
     *
     * Params:
     *   controller = controller kept in `_controller`
     *   codec      = codec used for parsing ingress and generating egress
     *   down       = downstream writer/closer for the connection
     */
    this(HTTPSessionController controller,HTTPCodec codec,SessionDown down)
    {
        _controller = controller;
        _down = down;
        _codec = codec;
        _codec.setCallback(this);
    }

    //HandlerAdapter {
    /// Feeds raw bytes read from the connection into the codec.
    void onRead(ubyte[] msg) {
        //logDebug("on read: ", cast(string)msg);
        _codec.onIngress(msg);
    }

    /// Caches the local and peer addresses reported by the downstream.
    void onActive() {
        _localAddr = _down.localAddress;
        _peerAddr = _down.remoteAddress;
    }

    /// Tells the current codec (if any) that the connection was closed.
    void inActive() {
        // Guarded like onTimeout()/detach(): the rest of the class treats a
        // missing codec as a possible state.
        auto codec = getCodec();
        if(codec)
            codec.onConnectClose();
        version(CollieDebugMode) logDebug("connection closed!");
    }

    /// Forwards a timeout notification to the codec, if one is set.
    void onTimeout() @trusted {
        if(_codec)
            _codec.onTimeOut();
    }

    //HandlerAdapter}
    //HTTPTransaction.Transport, {
    /// Not implemented by this session: no-op.
    override void pauseIngress(HTTPTransaction txn){}

    /// Not implemented by this session: no-op.
    override void resumeIngress(HTTPTransaction txn){}

    /// Not implemented by this session: no-op.
    override void transactionTimeout(HTTPTransaction txn){}

    /**
     * Serializes `headers` through the codec and writes the result downstream.
     *
     * Params:
     *   txn     = transaction the headers belong to
     *   headers = message whose headers are generated
     *   eom     = when true, a final task is attached to the buffer that runs
     *             closeWriteCallBack() and txn.onDelayedDestroy()
     *             (presumably once the buffer has been written - confirm
     *             against the write buffer implementation)
     */
    override void sendHeaders(HTTPTransaction txn,
        HTTPMessage headers,
        bool eom)
    {
        HTTPBuffer tdata = new HTTPBuffer();
        _codec.generateHeader(txn,headers,tdata,eom);

        version(CollieDebugMode)
            logDebug("sending headers: length=", tdata.length);
        if(eom){
            tdata.setFinalTask(newTask(finishCallback(txn)));
        }
        _down.httpWrite(tdata);
    }

    /**
     * Serializes a body fragment through the codec and writes it downstream.
     *
     * Params:
     *   txn  = transaction the body belongs to
     *   data = body bytes
     *   eom  = when true, attaches the same final task as sendHeaders()
     *
     * Returns: the value returned by the codec's generateBody().
     */
    override size_t sendBody(HTTPTransaction txn,
        in ubyte[] data,
        bool eom)
    {
        HTTPBuffer tdata = new HTTPBuffer();
        size_t rlen = getCodec.generateBody(txn,tdata,data,eom);

        version(CollieDebugMode)
            logDebug("sending body: length=", rlen);

        if(eom){
            tdata.setFinalTask(newTask(finishCallback(txn)));
        }
        _down.httpWrite(tdata);

        return rlen;
    }

    /**
     * Generates a chunk header for a chunk of `length` bytes and writes it.
     *
     * Returns: the value returned by the codec's generateChunkHeader().
     */
    override size_t sendChunkHeader(HTTPTransaction txn,size_t length)
    {
        auto tdata = new HTTPBuffer();
        size_t rlen = getCodec.generateChunkHeader(txn,tdata,length);
        _down.httpWrite(tdata);
        return rlen;
    }

    /**
     * Generates the chunk terminator and writes it. A final task running
     * closeWriteCallBack() and txn.onDelayedDestroy() is always attached.
     *
     * Returns: the value returned by the codec's generateChunkTerminator().
     */
    override size_t sendChunkTerminator(HTTPTransaction txn)
    {
        auto tdata = new HTTPBuffer();
        size_t rlen = getCodec.generateChunkTerminator(txn,tdata);

        tdata.setFinalTask(newTask(finishCallback(txn)));
        _down.httpWrite(tdata);

        return rlen;
    }

    /**
     * Generates the end-of-message marker for `txn`.
     *
     * If the codec produced data (non-zero result) it is written with the
     * finishing task attached; otherwise the finishing callback is posted
     * through the downstream so it still runs, without writing anything.
     *
     * Returns: the value returned by the codec's generateEOM().
     */
    override size_t sendEOM(HTTPTransaction txn)
    {
        HttpWriteBuffer tdata = new HTTPBuffer();
        size_t rlen = getCodec.generateEOM(txn,tdata);
        version(CollieDebugMode) logDebug("send eom!! ",rlen);
        if(rlen){
            tdata.setFinalTask(newTask(finishCallback(txn)));
            _down.httpWrite(tdata);
        } else {
            _down.post(finishCallback(txn));
        }
        return rlen;
    }

    //    size_t sendAbort(HTTPTransaction txn,
    //        HTTPErrorCode statusCode);

    /// Writes an already prepared buffer straight to the downstream.
    override void socketWrite(HTTPTransaction txn,StreamWriteBuffer buffer) {
        _down.httpWrite(buffer);
    }

    /**
     * Encodes `data` as a WebSocket frame with opcode `code` and writes it.
     * Nothing is written when the codec generates zero bytes. When the codec
     * reports shouldClose(), the finishing task is attached to the buffer.
     *
     * Returns: the value returned by the codec's generateWsFrame().
     */
    override size_t sendWsData(HTTPTransaction txn,OpCode code,ubyte[] data)
    {
        auto tdata = new HTTPBuffer();
        size_t rlen = getCodec.generateWsFrame(txn,tdata,code,data);
        if(rlen) {
            bool eom = getCodec.shouldClose();
            if(eom){
                tdata.setFinalTask(newTask(finishCallback(txn)));
            }
            _down.httpWrite(tdata);
        }
        return rlen;
    }

    /// Not implemented by this session: no-op.
    override void notifyPendingEgress()
    {}

    /// Detaches `txn` from the codec, if a codec is set.
    override void detach(HTTPTransaction txn)
    {
        if(_codec)
            _codec.detach(txn);
    }

    //    void notifyIngressBodyProcessed(uint32_t bytes);
    //
    //    void notifyEgressBodyBuffered(int64_t bytes);

    /// Local address cached by onActive().
    override Address getLocalAddress(){
        return _localAddr;
    }

    /// Peer address cached by onActive().
    override Address getPeerAddress(){
        return _peerAddr;
    }

    /// The codec currently used by this session.
    override HTTPCodec getCodec(){
        return _codec;
    }

    /**
     * Replaces the session's codec: the previous codec (if any) stops calling
     * back into this session and `codec` starts doing so.
     */
    void restCodeC(HTTPCodec codec){
        if(_codec)
            _codec.setCallback(null);
        codec.setCallback(this);
        _codec = codec;
    }

    /// This session never reports itself as draining.
    override bool isDraining(){return false;}
    //HTTPTransaction.Transport, }


    // HTTPCodec.CallBack {
    /// Binds this session as the transport of a newly started transaction.
    override void onMessageBegin(HTTPTransaction txn, HTTPMessage msg)
    {
        if(txn){
            txn.transport = this;
        }
        version(CollieDebugMode) logDebug("begin a http requst or reaponse!");
    }

    /// Stamps the peer address on the message and runs the subclass setup hook.
    override void onHeadersComplete(HTTPTransaction txn,
        HTTPMessage msg){
        version(CollieDebugMode) logDebug("url: ", msg.url);
        msg.clientAddress = getPeerAddress();
        setupOnHeadersComplete(txn,msg);
    }

    /// Stamps the peer address on the message and runs the subclass upgrade hook.
    override void onNativeProtocolUpgrade(HTTPTransaction txn,CodecProtocol protocol,string protocolString,HTTPMessage msg)
    {
        msg.clientAddress = getPeerAddress();
        setupProtocolUpgrade(txn,protocol,protocolString,msg);
    }

    /// Forwards ingress body data to the transaction, if there is one.
    override void onBody(HTTPTransaction txn,const ubyte[] data){
        if(txn)
            txn.onIngressBody(data,cast(ushort)0);
    }

    /// Forwards an ingress chunk header to the transaction, if there is one.
    override void onChunkHeader(HTTPTransaction txn, size_t length){
        if(txn)
            txn.onIngressChunkHeader(length);
    }

    /// Forwards ingress chunk completion to the transaction, if there is one.
    override void onChunkComplete(HTTPTransaction txn){
        if(txn)
            txn.onIngressChunkComplete();
    }

    /// Forwards ingress end-of-message to the transaction; `upgrade` is unused here.
    override void onMessageComplete(HTTPTransaction txn, bool upgrade){
        if(txn)
            txn.onIngressEOM();
    }

    /// Logs the codec error and closes the connection.
    override void onError(HTTPTransaction txn,HTTPErrorCode code){
        logDebug("ERRO : ", code);
        _down.httpClose();
    }

    /// Closes the connection when the codec reports an abort.
    override void onAbort(HTTPTransaction txn,
        HTTPErrorCode code){
        _down.httpClose();
    }

    /// Forwards a received WebSocket frame to the transaction, if there is one.
    override void onWsFrame(HTTPTransaction txn,ref WSFrame wsf){
        version(CollieDebugMode) logDebug(".....");
        if(txn)
            txn.onWsFrame(wsf);
    }

    // HTTPCodec.CallBack }
protected:
    /**
     * Called by onHeadersComplete(). This function allows downstream and
     * upstream to do any setup (like preparing a handler) when headers are
     * first received from the remote side on a given transaction.
     */
    void setupOnHeadersComplete(ref HTTPTransaction txn,
        HTTPMessage msg);

    /**
     * Called by onNativeProtocolUpgrade() after the peer address has been
     * stored on `msg`; subclasses perform the protocol specific setup.
     */
    void setupProtocolUpgrade(ref HTTPTransaction txn,CodecProtocol protocol,string protocolString,HTTPMessage msg);

    /**
     * Closes the connection when there is no codec or the codec reports that
     * the connection should be closed; otherwise does nothing.
     */
    final void closeWriteCallBack(){
        if(_codec is null || _codec.shouldClose()) {
            version(CollieDebugMode) logDebug("\t\t --------do close!!!");
            _down.httpClose();
        }
    }

    /**
     * Builds the callback shared by every "last write of a message" path:
     * possibly close the connection, then let the transaction destroy itself.
     */
    private final void delegate() finishCallback(HTTPTransaction txn){
        return (){
            closeWriteCallBack();
            txn.onDelayedDestroy();
        };
    }

    Address _localAddr;
    Address _peerAddr;
    HTTPCodec _codec;

    HTTPSessionController _controller;
    SessionDown _down;
}