1 module collie.codec.lengthbaseframe;
2 
3 import std.bitmanip;
4 import std.conv;
5 import kiss.log;
6 
7 import collie.channel;
8 import collie.codec.exception;
9 import kiss.bytes;
10 
11 /// The Pack format
12 /// header: ubytes 4 "00 00 00 00" -> uint 
13 /// Compress Type: ubyte one "00"
14 /// the data is a data.
15 
16 class MsgLengthTooBig : CollieCodecException
17 {
18 	pure nothrow @nogc @safe this(string msg, string file = __FILE__, size_t line = __LINE__)
19 	{
20 		super(msg, file, line);
21 	}
22 }
23 
24 class LengthBasedFrame(bool littleEndian = false) : Handler!(const(ubyte[]),ubyte[],ubyte[],StreamWriteBuffer)
25 {
26 	this(uint max, ubyte compressType = 0x00)
27 	{
28 		_max = max;
29 		_compressType = compressType;
30 		//    clear();
31 	}
32 
33 	final override void read(Context ctx, const(ubyte[]) msg)
34 	{
35 
36 		void doFireRead()
37 		{
38 			if(_data.length > 0)
39 				_data = unCompress(_readComType,_data);
40 			ctx.fireRead(_data);
41 			_data = null;
42 			_pos = ReadPOS.Length_Begin;
43 		}
44 
45 		size_t len = msg.length;
46 		for(size_t i = 0; i < len; ++i)
47 		{
48 			const ubyte ch = msg[i];
49 			final switch(_pos)
50 			{
51 				case ReadPOS.Length_Begin:
52 					_lenByte[0] = ch;
53 					_pos = ReadPOS.Length_1;
54 					break;
55 				case ReadPOS.Length_1:
56 					_lenByte[1] = ch;
57 					_pos = ReadPOS.Length_2;
58 					break;
59 				case ReadPOS.Length_2:
60 					_lenByte[2] = ch;
61 					_pos = ReadPOS.Length_End;
62 					break;
63 				case ReadPOS.Length_End:
64 					_lenByte[3] = ch;
65 					_pos = ReadPOS.Compress_Type;
66 					break;
67 				case ReadPOS.Compress_Type:
68 					_readComType = ch;
69 					_pos = ReadPOS.Body;
70 					_readLen = 0;
71 					_msgLen = endianToNative!(littleEndian,uint)(_lenByte);
72 					if(_msgLen == 0) {
73 						doFireRead();
74 						continue;
75 					} else if(_msgLen > _max){
76 						throw new MsgLengthTooBig("the max is : " ~ to!string(_max) ~ " the length is :" ~ to!string(_msgLen));
77 					}
78 					_data = new ubyte[_msgLen];
79 					break;
80 				case ReadPOS.Body:
81 				{
82 					const size_t needLen = _msgLen - _readLen;
83 					const size_t canRead = len - i;
84 					logDebug();
85 					if(canRead >= needLen){
86 						auto tlen = i + needLen;
87 						_data[_readLen.._msgLen] = msg[i..tlen];
88 						i = tlen - 1;
89 						doFireRead();
90 					} else {
91 						auto tlen = _readLen + canRead;
92 						_data[_readLen..tlen] = msg[i..len];
93 						_readLen = cast(uint)tlen;
94 						return;
95 					}
96 				}
97 					break;
98 			}
99 		}
100 	}
101 
102 	final override void write(Context ctx, ubyte[] msg, TheCallBack cback = null)
103 	{
104 		logDebug("writeln data!");
105 		try 
106 		{
107 			ubyte ctype = _compressType;
108 			auto tmsg = doCompress(ctype, msg);
109 			uint size = cast(uint) tmsg.length;
110 			if(size > _max){
111 				throw new MsgLengthTooBig("the max is : " ~ to!string(_max) ~ " the length is :" ~ to!string(_msgLen));
112 			}
113 			ubyte[] data = new ubyte[size + 5];
114 			ubyte[4] length = nativeToEndian!(littleEndian,uint)(size); 
115 			data[0 .. 4] = length[];
116 			data[4] = ctype;
117 			data[5 .. $] = tmsg[];
118 			ctx.fireWrite(new WarpStreamBuffer(data,null),null);
119 			if (cback)
120 				cback(msg, size);
121 		}
122 		catch (Exception e)
123 		{
124 			import collie.utils.exception;
125 			showException(e);
126 			if (cback)
127 				cback(msg, 0);
128 		}
129 	}
130 
131 protected:
132 	ubyte[] doCompress(ref ubyte type, ubyte[] data)
133 	{
134 		return data;
135 	}
136 	
137 	ubyte[] unCompress(in ubyte type, ubyte[] data)
138 	{
139 		return data;
140 	}
141 
142 private:
143 	enum ReadPOS : ubyte
144 	{
145 		Length_Begin,
146 		Length_1,
147 		Length_2,
148 		Length_End,
149 		Compress_Type,
150 		Body
151 	}
152 
153 private:
154 	ubyte[] _data;
155 	ubyte[4] _lenByte;
156 	ubyte _readComType;
157 	uint _msgLen;
158 	uint _readLen;
159 	ReadPOS _pos = ReadPOS.Length_Begin;
160 
161 	uint _max;
162 	ubyte _compressType;
163 }
164 
165 
166 unittest
167 {
168 	import collie.net;
169 	import kiss.net.TcpStream;
170 	import collie.channel.handlercontext;
171 	import std.stdio;
172 	
173 	ubyte[] gloaData;
174 	
175 	class Contex : HandlerContext!(ubyte[],StreamWriteBuffer)
176 	{
177 		override void fireRead(ubyte[] msg)
178 		{
179 			writeln("the msg is : ", cast(string) msg);
180 		}
181 		
182 		override void fireTimeOut()
183 		{
184 		}
185 		
186 		override void fireTransportActive()
187 		{
188 		}
189 		
190 		override void fireTransportInactive()
191 		{
192 		}
193 		
194 		override void fireWrite(StreamWriteBuffer msg, void delegate(StreamWriteBuffer, size_t) cback = null)
195 		{
196 			auto data = msg.sendData;
197 			gloaData ~= data;
198 			writeln("length is : ", data[0 .. 4], " \n the data is : ", cast(string)(data[4 .. $]));
199 		}
200 		
201 		override void fireClose()
202 		{
203 		}
204 		
205 		override @property PipelineBase pipeline()
206 		{
207 			return null;
208 		}
209 		
210 		override @property Transport transport()
211 		{
212 			return null;
213 		}
214 	}
215 	
216 	Contex ctx = new Contex();
217 	
218 	auto hander = new LengthBasedFrame!false(2048);
219 	string data = "i am a test string";
220 	ubyte[] tdata = cast(ubyte[]) data;
221 	hander.write(ctx, tdata);
222 	
223 	hander.write(ctx, gloaData);
224 	
225 	hander.read(ctx, gloaData);
226 	
227 	hander.read(ctx, gloaData[0 .. 3]);
228 	hander.read(ctx, gloaData[3 .. 20]);
229 	hander.read(ctx, gloaData[20 .. $]);
230 	
231 }