1 /*
2  * Collie - An asynchronous event-driven network framework using Dlang development
3  *
4  * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd 
5  *
6  * Developer: putao's Dlang team
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module collie.codec.mqtt.mqttencoder;
12 
13 import std.stdio;
14 import std.array;
15 import std.conv;
16 import std.experimental.allocator;
17 import std.experimental.allocator.gc_allocator;
18 import collie.codec.messagetobyteencoder;
19 import kiss.container.Vector;
20 import collie.codec.mqtt.bytebuf;
21 import collie.codec.mqtt.mqttcodecutil;
22 import collie.codec.mqtt.mqttconnackmessage;
23 import collie.codec.mqtt.mqttconnackvariableheader;
24 import collie.codec.mqtt.mqttconnectmsg;
25 import collie.codec.mqtt.mqttconnectpayload;
26 import collie.codec.mqtt.mqttconnectreturncode;
27 import collie.codec.mqtt.mqttconnectvariableheader;
28 import collie.codec.mqtt.mqttfixedheader;
29 import collie.codec.mqtt.mqttmsg;
30 import collie.codec.mqtt.mqttmsgidvariableheader;
31 import collie.codec.mqtt.mqttmsgtype;
32 import collie.codec.mqtt.mqttpubackmsg;
33 import collie.codec.mqtt.mqttpublishmsg;
34 import collie.codec.mqtt.mqttpublishvariableheader;
35 import collie.codec.mqtt.mqttqos;
36 import collie.codec.mqtt.mqttsubackmsg;
37 import collie.codec.mqtt.mqttsubackpayload;
38 import collie.codec.mqtt.mqttsubscribemsg;
39 import collie.codec.mqtt.mqttsubscribepayload;
40 import collie.codec.mqtt.mqtttopicsubscription;
41 import collie.codec.mqtt.mqttunsubscribemsg;
42 import collie.codec.mqtt.mqttunsubscribepayload;
43 import collie.codec.mqtt.mqttversion;
44 
45 import collie.codec.messagetobyteencoder;
46 
47  class MqttEncoder :MessageToByteEncoder!MqttMsg{
48 
49 	 override ubyte[] encode(ref MqttMsg msg){
50 		return doEncode(msg).data();
51 	}
52 
53 	override void callBack(ubyte[] data, size_t len)
54 	{
55 	}
56 	/**
57      * This is the main encoding method.
58      * It's only visible for testing.
59      *
60      * @param byteBufAllocator Allocates ByteBuf
61      * @param message MQTT message to encode
62      * @return ByteBuf with encoded bytes
63      */
64 public:
65 	static ByteBuf doEncode(MqttMsg message) {
66 		
67 		switch (message.fixedHeader().messageType()) {
68 			case MqttMsgType.CONNECT:
69 				return encodeConnectMessage( cast(MqttConnectMsg) message);
70 				
71 			case MqttMsgType.CONNACK:
72 				return encodeConnAckMessage(cast(MqttConnAckMessage) message);
73 				
74 			case MqttMsgType.PUBLISH:
75 				return encodePublishMessage( cast(MqttPublishMsg) message);
76 
77 			case MqttMsgType.SUBSCRIBE:
78 				return encodeSubscribeMessage( cast(MqttSubscribeMsg) message);
79 
80 			case MqttMsgType.UNSUBSCRIBE:
81 				return encodeUnsubscribeMessage(cast(MqttUnsubscribeMsg) message);
82 				
83 			case MqttMsgType.SUBACK:
84 				return encodeSubAckMessage( cast(MqttSubAckMsg) message);
85 				
86 			case MqttMsgType.UNSUBACK:
87 			case MqttMsgType.PUBACK:
88 			case MqttMsgType.PUBREC:
89 			case MqttMsgType.PUBREL:
90 			case MqttMsgType.PUBCOMP:
91 				return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(message);
92 				
93 			case MqttMsgType.PINGREQ:
94 			case MqttMsgType.PINGRESP:
95 			case MqttMsgType.DISCONNECT:
96 				return encodeMessageWithOnlySingleByteFixedHeader(message);
97 				
98 			default:
99 				throw new Exception(
100 					"Unknown message type: " ~ to!string(message.fixedHeader().messageType()));
101 		}
102 	}
103 private:
104 	 static ByteBuf encodeConnectMessage(
105 		MqttConnectMsg message) {
106 		int payloadBufferSize = 0;
107 		
108 		MqttFixedHeader mqttFixedHeader = message.fixedHeader();
109 		MqttConnectVariableHeader variableHeader = message.variableHeader();
110 		MqttConnectPayload payload = message.payload();
111 		MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),cast(byte) variableHeader.mqtt_version());
112 		
113 		// Client id
114 		string clientIdentifier = payload.clientIdentifier();
115 		if (!MqttCodecUtil.isValidClientId(mqttVersion, clientIdentifier)) {
116 			throw new Exception("invalid clientIdentifier: " ~ clientIdentifier);
117 		}
118 		//("clientIdentifier :"~clientIdentifier);
119 		ubyte[] clientIdentifierBytes = cast(ubyte[])(clientIdentifier);
120 
121 		//writeln("clientIdentifierBytes :-->");
122 		//writeln(clientIdentifierBytes);
123 		payloadBufferSize += 2 + clientIdentifierBytes.length;
124 		
125 		// Will topic and message
126 		string willTopic = payload.willTopic();
127 		ubyte[] willTopicBytes = cast(ubyte[])(willTopic);
128 		string willMessage = payload.willMessage();
129 		ubyte[] willMessageBytes = cast(ubyte[])(willMessage) ;
130 		if (variableHeader.isWillFlag()) {
131 			payloadBufferSize += 2 + willTopicBytes.length;
132 			payloadBufferSize += 2 + willMessageBytes.length;
133 		}
134 		
135 		string userName = payload.userName();
136 		ubyte[] userNameBytes =  cast(ubyte[])(userName) ;
137 		if (variableHeader.hasUserName()) {
138 			payloadBufferSize += 2 + userNameBytes.length;
139 		}
140 
141 		string password = payload.password();
142 		ubyte[] passwordBytes = cast(ubyte[])(password) ;
143 		if (variableHeader.hasPassword()) {
144 			payloadBufferSize += 2 + passwordBytes.length;
145 		}
146 		
147 		// Fixed header
148 		//writeln("protocolName -->",mqttVersion.protocolName());
149 		ubyte[] protocolNameBytes = cast(ubyte[])(mqttVersion.protocolName());
150 		int variableHeaderBufferSize = 2 +cast(int)protocolNameBytes.length + 4;
151 		int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
152 		int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
153 
154 	
155 
156 		ByteBuf buf = new ByteBuf;
157 		buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
158 		writeVariableLengthInt(buf, variablePartSize);
159 		
160 		buf.writeShort(cast(int)protocolNameBytes.length);
161 		buf.writeBytes(protocolNameBytes);
162 		
163 		buf.writeByte(cast(ubyte)variableHeader.mqtt_version());
164 		buf.writeByte(getConnVariableHeaderFlag(variableHeader));
165 		buf.writeShort(variableHeader.keepAliveTimeSeconds());
166 		
167 		// Payload
168 		buf.writeShort(cast(int)clientIdentifierBytes.length);
169 		//writeln("clientIdentifierBytes len ---->: ",clientIdentifierBytes.length);
170 		buf.writeBytes(clientIdentifierBytes, 0, cast(int)clientIdentifierBytes.length);
171 		if (variableHeader.isWillFlag()) {
172 			buf.writeShort(cast(int)willTopicBytes.length);
173 			buf.writeBytes(willTopicBytes, 0, cast(int)willTopicBytes.length);
174 			buf.writeShort(cast(int)willMessageBytes.length);
175 			buf.writeBytes(willMessageBytes, 0, cast(int)willMessageBytes.length);
176 		}
177 		if (variableHeader.hasUserName()) {
178 			buf.writeShort(cast(int)userNameBytes.length);
179 			buf.writeBytes(userNameBytes, 0, cast(int)userNameBytes.length);
180 		}
181 		if (variableHeader.hasPassword()) {
182 			buf.writeShort(cast(int)passwordBytes.length);
183 			buf.writeBytes(passwordBytes, 0,cast(int)passwordBytes.length);
184 		}
185 
186 		return buf;
187 	}
188 	
189 	 static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
190 		int flagByte = 0;
191 		if (variableHeader.hasUserName()) {
192 			flagByte |= 0x80;
193 		}
194 		if (variableHeader.hasPassword()) {
195 			flagByte |= 0x40;
196 		}
197 		if (variableHeader.isWillRetain()) {
198 			flagByte |= 0x20;
199 		}
200 		flagByte |= (variableHeader.willQos() & 0x03) << 3;
201 		if (variableHeader.isWillFlag()) {
202 			flagByte |= 0x04;
203 		}
204 		if (variableHeader.isCleanSession()) {
205 			flagByte |= 0x02;
206 		}
207 		return flagByte;
208 	}
209 	
210 	 static ByteBuf encodeConnAckMessage(
211 		MqttConnAckMessage message) {
212 
213 		ByteBuf buf = new ByteBuf;
214 		buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
215 		buf.writeByte(cast(ubyte)(2 & 0xff));
216 		buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);
217 
218 		buf.writeByte(cast(ubyte)(message.variableHeader().connectReturnCode()));
219 
220 		return buf;
221 	}
222 	
223 	 static ByteBuf encodeSubscribeMessage(
224 		MqttSubscribeMsg message) {
225 		int variableHeaderBufferSize = 2;
226 		int payloadBufferSize = 0;
227 		
228 		MqttFixedHeader mqttFixedHeader = message.fixedHeader();
229 		MqttMsgIdVariableHeader variableHeader = message.variableHeader();
230 		MqttSubscribePayload payload = message.payload();
231 		
232 		foreach (MqttTopicSubscription topic ; payload.topicSubscriptions()) {
233 			string topicName = topic.topicName();
234 			ubyte[] topicNameBytes = cast(ubyte[])(topicName);
235 			payloadBufferSize += 2 + topicNameBytes.length;
236 			payloadBufferSize += 1;
237 		}
238 		
239 		int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
240 		int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
241 		
242 		ByteBuf buf = new ByteBuf;
243 		buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
244 		writeVariableLengthInt(buf, variablePartSize);
245 		
246 		// Variable Header
247 		int messageId = variableHeader.messageId();
248 		buf.writeShort(messageId);
249 		
250 		// Payload
251 		foreach (MqttTopicSubscription topic ; payload.topicSubscriptions()) {
252 			string topicName = topic.topicName();
253 			ubyte[] topicNameBytes = cast(ubyte[])(topicName);
254 			buf.writeShort(cast(int)topicNameBytes.length);
255 			buf.writeBytes(topicNameBytes, 0, cast(int)topicNameBytes.length);
256 			buf.writeByte(to!(int)(topic.qualityOfService()));
257 		}
258 		
259 		return buf;
260 	}
261 	
262 	 static ByteBuf encodeUnsubscribeMessage(
263 		MqttUnsubscribeMsg message) {
264 		int variableHeaderBufferSize = 2;
265 		int payloadBufferSize = 0;
266 		
267 		MqttFixedHeader mqttFixedHeader = message.fixedHeader();
268 		MqttMsgIdVariableHeader variableHeader = message.variableHeader();
269 		MqttUnsubscribePayload payload = message.payload();
270 		
271 		foreach (string topicName ; payload.topics()) {
272 			ubyte[] topicNameBytes = cast(ubyte[])(topicName);
273 			payloadBufferSize += 2 + topicNameBytes.length;
274 		}
275 		
276 		int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
277 		int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
278 		
279 		ByteBuf buf = new ByteBuf;
280 		buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
281 		writeVariableLengthInt(buf, variablePartSize);
282 		
283 		// Variable Header
284 		int messageId = variableHeader.messageId();
285 		buf.writeShort(messageId);
286 		
287 		// Payload
288 		foreach (string topicName ; payload.topics()) {
289 			ubyte[] topicNameBytes = cast(ubyte[])(topicName);
290 			buf.writeShort(cast(int)topicNameBytes.length);
291 			buf.writeBytes(topicNameBytes, 0, cast(int)topicNameBytes.length);
292 		}
293 		
294 		return buf;
295 	}
296 	
297 	 static ByteBuf encodeSubAckMessage(
298 		MqttSubAckMsg message) {
299 		int variableHeaderBufferSize = 2;
300 		int payloadBufferSize = cast(int)message.payload().grantedQoSLevels().length;
301 		int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
302 		int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
303 		ByteBuf buf = new ByteBuf;
304 		buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
305 		writeVariableLengthInt(buf, variablePartSize);
306 		buf.writeShort(message.variableHeader().messageId());
307 		foreach (int qos ; message.payload().grantedQoSLevels()) {
308 			buf.writeByte(qos);
309 		}
310 		
311 		return buf;
312 	}
313 	
314 	 static ByteBuf encodePublishMessage(
315 		MqttPublishMsg message) {
316 		MqttFixedHeader mqttFixedHeader = message.fixedHeader();
317 		MqttPublishVariableHeader variableHeader = message.variableHeader();
318 		//ByteBuf payload = message.payload().duplicate();
319 		
320 		string topicName = variableHeader.topicName();
321 		ubyte[] topicNameBytes = cast(ubyte[])(topicName);
322 
323 		int variableHeaderBufferSize = 2 + cast(int)topicNameBytes.length +
324 			(to!(int)(mqttFixedHeader.qosLevel())> 0 ? 2 : 0);
325 		int payloadBufferSize = cast(int)message.content().length;
326 		int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
327 		int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
328 		
329 		ByteBuf buf = new ByteBuf;
330 		buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
331 		writeVariableLengthInt(buf, variablePartSize);
332 		buf.writeShort(cast(int)topicNameBytes.length);
333 		buf.writeBytes(topicNameBytes);
334 		if (to!(int)(mqttFixedHeader.qosLevel()) > 0) {
335 			buf.writeShort(variableHeader.messageId());
336 		}
337 		buf.writeBytes(message.content());
338 		
339 		return buf;
340 	}
341 	
342 	 static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
343 		MqttMsg message) {
344 		MqttFixedHeader mqttFixedHeader = message.fixedHeader();
345 		MqttMsgIdVariableHeader variableHeader = cast(MqttMsgIdVariableHeader) message.variableHeader();
346 		int msgId = variableHeader.messageId();
347 
348 		int variableHeaderBufferSize = 2; // variable part only has a message id
349 		int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
350 		ByteBuf buf =new ByteBuf;
351 		buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
352 		writeVariableLengthInt(buf, variableHeaderBufferSize);
353 		buf.writeShort(msgId);
354 		
355 		return buf;
356 	}
357 	
358 	 static ByteBuf encodeMessageWithOnlySingleByteFixedHeader(
359 		MqttMsg message) {
360 		MqttFixedHeader mqttFixedHeader = message.fixedHeader();
361 		ByteBuf buf = new ByteBuf;
362 		buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
363 		buf.writeByte(0);
364 		
365 		return buf;
366 	}
367 	
368 	 static int getFixedHeaderByte1(MqttFixedHeader header) {
369 		int ret = 0;
370 		int va = to!(int)(header.messageType());
371 		ret |= va<< 4;
372 		if (header.isDup()) {
373 			ret |= 0x08;
374 		}
375 		int hv = to!(int)(header.qosLevel());
376 		ret |=  hv<< 1;
377 		if (header.isRetain()) {
378 			ret |= 0x01;
379 		}
380 		return ret;
381 	}
382 
383 	 static void writeVariableLengthInt(ref ByteBuf buf, int num) {
384 		do {
385 			int digit = num % 128;
386 			num /= 128;
387 			if (num > 0) {
388 				digit |= 0x80;
389 			}
390 			buf.writeByte(digit);
391 		} while (num > 0);
392 	}
393 	
394 	 static int getVariableLengthInt(int num) {
395 		int count = 0;
396 		do {
397 			num /= 128;
398 			count++;
399 		} while (num > 0);
400 		return count;
401 	}
402 	
403 //	 static byte[] encodeStringUtf8(String s) {
404 //		return s.getBytes(CharsetUtil.UTF_8);
405 //	}
406 }
407