module collie.codec.lengthbaseframe;

import std.bitmanip;
import std.conv;
import kiss.log;

import collie.channel;
import collie.codec.exception;
import kiss.bytes;

/// Thrown when a frame's payload length exceeds the limit configured on the
/// LengthBasedFrame handler.
class MsgLengthTooBig : CollieCodecException
{
    pure nothrow @nogc @safe this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, file, line);
    }
}

/**
 * Length-prefixed frame codec.
 *
 * Wire format of one frame:
 *   - 4 bytes: payload length as a `uint`; the byte order is selected by the
 *              `littleEndian` template parameter.
 *   - 1 byte : compress type.
 *   - N bytes: the payload, where N is the length from the header. The length
 *              does not include the 5 header bytes.
 *
 * `read` reassembles frames from arbitrarily split input chunks and forwards
 * each complete payload with `ctx.fireRead`. `write` prepends the header to a
 * payload and forwards the result with `ctx.fireWrite`.
 *
 * Subclasses can override `doCompress` / `unCompress` to transform payloads;
 * the defaults return the data unchanged.
 */
class LengthBasedFrame(bool littleEndian = false) : Handler!(const(ubyte[]),ubyte[],ubyte[],StreamWriteBuffer)
{
    /**
     * Params:
     *   max          = largest payload length accepted by `read` and `write`.
     *   compressType = compress-type byte passed to `doCompress` when writing.
     */
    this(uint max, ubyte compressType = 0x00)
    {
        _max = max;
        _compressType = compressType;
    }

    /**
     * Feeds a chunk of received bytes into the frame parser.
     *
     * The chunk may contain a partial frame, exactly one frame, or several
     * frames; parser state is kept between calls. Every completed payload is
     * passed through `unCompress` (when non-empty) and handed to
     * `ctx.fireRead`.
     *
     * Throws: MsgLengthTooBig when a header announces a payload longer than
     *         the configured maximum. The parser is reset to expect a new
     *         header before the exception is thrown; the rest of `msg` is not
     *         processed.
     */
    final override void read(Context ctx, const(ubyte[]) msg)
    {
        // Delivers the assembled payload and rearms the parser for the next header.
        void doFireRead()
        {
            if (_data.length > 0)
                _data = unCompress(_readComType, _data);
            ctx.fireRead(_data);
            _data = null;
            _pos = ReadPOS.Length_Begin;
        }

        const size_t len = msg.length;
        for (size_t i = 0; i < len; ++i)
        {
            const ubyte ch = msg[i];
            final switch (_pos)
            {
            case ReadPOS.Length_Begin:
                _lenByte[0] = ch;
                _pos = ReadPOS.Length_1;
                break;
            case ReadPOS.Length_1:
                _lenByte[1] = ch;
                _pos = ReadPOS.Length_2;
                break;
            case ReadPOS.Length_2:
                _lenByte[2] = ch;
                _pos = ReadPOS.Length_End;
                break;
            case ReadPOS.Length_End:
                _lenByte[3] = ch;
                _pos = ReadPOS.Compress_Type;
                break;
            case ReadPOS.Compress_Type:
                _readComType = ch;
                _readLen = 0;
                _msgLen = endianToNative!(littleEndian,uint)(_lenByte);
                if (_msgLen == 0)
                {
                    // Empty frame: nothing to collect, deliver immediately.
                    doFireRead();
                    continue;
                }
                if (_msgLen > _max)
                {
                    // Rearm the parser before throwing. Otherwise a caller that
                    // catches the exception and keeps reading would enter the
                    // Body state with no buffer allocated.
                    _pos = ReadPOS.Length_Begin;
                    throw new MsgLengthTooBig("the max is : " ~ to!string(_max) ~ " the length is :" ~ to!string(_msgLen));
                }
                _data = new ubyte[_msgLen];
                _pos = ReadPOS.Body;
                break;
            case ReadPOS.Body:
                {
                    const size_t needLen = _msgLen - _readLen;
                    const size_t canRead = len - i;
                    logDebug();
                    if (canRead >= needLen)
                    {
                        // The rest of this frame is inside the current chunk.
                        const size_t tlen = i + needLen;
                        _data[_readLen .. _msgLen] = msg[i .. tlen];
                        // The loop's ++i moves on to the first byte after the frame.
                        i = tlen - 1;
                        doFireRead();
                    }
                    else
                    {
                        // Chunk exhausted mid-frame: keep what we have and wait for more.
                        const size_t tlen = _readLen + canRead;
                        _data[_readLen .. tlen] = msg[i .. len];
                        _readLen = cast(uint) tlen;
                        return;
                    }
                }
                break;
            }
        }
    }

    /**
     * Wraps `msg` into one frame and forwards it with `ctx.fireWrite`.
     *
     * The payload is first passed through `doCompress`, which may also change
     * the compress-type byte written into the header. On success `cback`
     * (if given) is called with the original message and the payload size
     * written; if any Exception occurs (including an oversized payload) it is
     * reported through `showException` and `cback` is called with size 0.
     */
    final override void write(Context ctx, ubyte[] msg, TheCallBack cback = null)
    {
        logDebug("writeln data!");
        try
        {
            ubyte ctype = _compressType;
            auto tmsg = doCompress(ctype, msg);
            // Compare before narrowing to uint so a huge payload cannot wrap
            // around and slip past the limit.
            if (tmsg.length > _max)
            {
                throw new MsgLengthTooBig("the max is : " ~ to!string(_max) ~ " the length is :" ~ to!string(tmsg.length));
            }
            const uint size = cast(uint) tmsg.length;
            ubyte[] data = new ubyte[tmsg.length + headerSize];
            ubyte[4] length = nativeToEndian!(littleEndian,uint)(size);
            data[0 .. 4] = length[];
            data[4] = ctype;
            data[headerSize .. $] = tmsg[];
            ctx.fireWrite(new WarpStreamBuffer(data,null),null);
            if (cback)
                cback(msg, size);
        }
        catch (Exception e)
        {
            import collie.utils.exception;
            showException(e);
            if (cback)
                cback(msg, 0);
        }
    }

protected:
    /// Hook for compressing an outgoing payload. `type` is passed by reference
    /// so an override can change the compress-type byte that is written into
    /// the header. The default returns `data` unchanged.
    ubyte[] doCompress(ref ubyte type, ubyte[] data)
    {
        return data;
    }

    /// Hook for decompressing a received payload; `type` is the compress-type
    /// byte read from the frame header. The default returns `data` unchanged.
    ubyte[] unCompress(in ubyte type, ubyte[] data)
    {
        return data;
    }

private:
    /// Parser state: which part of the frame the next input byte belongs to.
    enum ReadPOS : ubyte
    {
        Length_Begin,
        Length_1,
        Length_2,
        Length_End,
        Compress_Type,
        Body
    }

    /// Bytes in front of the payload: 4 length bytes + 1 compress-type byte.
    enum size_t headerSize = 5;

private:
    ubyte[] _data;          // payload buffer of the frame being assembled
    ubyte[4] _lenByte;      // raw length bytes of the current header
    ubyte _readComType;     // compress-type byte of the current frame
    uint _msgLen;           // payload length announced by the current header
    uint _readLen;          // payload bytes collected so far
    ReadPOS _pos = ReadPOS.Length_Begin;

    uint _max;              // largest accepted payload length
    ubyte _compressType;    // compress type used for outgoing frames
}


unittest
{
    import collie.net;
    import kiss.net.TcpStream;
    import collie.channel.handlercontext;
    import std.exception : assertThrown;
    import std.stdio;

    ubyte[] gloaData;   // everything the handler has written so far
    ubyte[][] frames;   // every payload the handler has delivered so far

    class Contex : HandlerContext!(ubyte[],StreamWriteBuffer)
    {
        override void fireRead(ubyte[] msg)
        {
            frames ~= msg;
            writeln("the msg is : ", cast(string) msg);
        }

        override void fireTimeOut()
        {
        }

        override void fireTransportActive()
        {
        }

        override void fireTransportInactive()
        {
        }

        override void fireWrite(StreamWriteBuffer msg, void delegate(StreamWriteBuffer, size_t) cback = null)
        {
            auto data = msg.sendData;
            gloaData ~= data;
            writeln("length is : ", data[0 .. 4], " \n the data is : ", cast(string)(data[4 .. $]));
        }

        override void fireClose()
        {
        }

        override @property PipelineBase pipeline()
        {
            return null;
        }

        override @property Transport transport()
        {
            return null;
        }
    }

    Contex ctx = new Contex();

    auto hander = new LengthBasedFrame!false(2048);
    string data = "i am a test string";
    ubyte[] tdata = cast(ubyte[]) data;

    // First frame: 5 header bytes followed by the payload.
    hander.write(ctx, tdata);
    assert(gloaData.length == tdata.length + 5);
    assert(gloaData[4] == 0x00);
    assert(gloaData[5 .. $] == tdata);
    ubyte[] firstFrame = gloaData.dup;

    // Second frame: its payload is the whole first frame.
    hander.write(ctx, gloaData);
    assert(gloaData.length == firstFrame.length * 2 + 5);

    // Both frames arrive in one chunk.
    hander.read(ctx, gloaData);
    assert(frames.length == 2);
    assert(frames[0] == tdata);
    assert(frames[1] == firstFrame);

    // The same bytes split inside the header and inside the payload.
    hander.read(ctx, gloaData[0 .. 3]);
    hander.read(ctx, gloaData[3 .. 20]);
    hander.read(ctx, gloaData[20 .. $]);
    assert(frames.length == 4);
    assert(frames[2] == tdata);
    assert(frames[3] == firstFrame);

    // An oversized frame is rejected, and the parser is usable afterwards.
    auto small = new LengthBasedFrame!false(4);
    assertThrown!MsgLengthTooBig(small.read(ctx, firstFrame));
    gloaData = null;
    ubyte[] tiny = cast(ubyte[]) "ok".dup;
    small.write(ctx, tiny);
    small.read(ctx, gloaData);
    assert(frames.length == 5);
    assert(frames[4] == tiny);
}