/*
 * Collie - An asynchronous event-driven network framework using Dlang development
 *
 * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd
 *
 * Developer: putao's Dlang team
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module collie.codec.mqtt.mqttencoder;

import std.stdio;
import std.array;
import std.conv;
import std.experimental.allocator;
import std.experimental.allocator.gc_allocator;
import collie.codec.messagetobyteencoder;
import kiss.container.Vector;
import collie.codec.mqtt.bytebuf;
import collie.codec.mqtt.mqttcodecutil;
import collie.codec.mqtt.mqttconnackmessage;
import collie.codec.mqtt.mqttconnackvariableheader;
import collie.codec.mqtt.mqttconnectmsg;
import collie.codec.mqtt.mqttconnectpayload;
import collie.codec.mqtt.mqttconnectreturncode;
import collie.codec.mqtt.mqttconnectvariableheader;
import collie.codec.mqtt.mqttfixedheader;
import collie.codec.mqtt.mqttmsg;
import collie.codec.mqtt.mqttmsgidvariableheader;
import collie.codec.mqtt.mqttmsgtype;
import collie.codec.mqtt.mqttpubackmsg;
import collie.codec.mqtt.mqttpublishmsg;
import collie.codec.mqtt.mqttpublishvariableheader;
import collie.codec.mqtt.mqttqos;
import collie.codec.mqtt.mqttsubackmsg;
import collie.codec.mqtt.mqttsubackpayload;
import collie.codec.mqtt.mqttsubscribemsg;
import collie.codec.mqtt.mqttsubscribepayload;
import collie.codec.mqtt.mqtttopicsubscription;
import collie.codec.mqtt.mqttunsubscribemsg;
import collie.codec.mqtt.mqttunsubscribepayload;
import collie.codec.mqtt.mqttversion;

import collie.codec.messagetobyteencoder;

/**
 * Serializes `MqttMsg` objects into a `ByteBuf`, one encoder routine per
 * MQTT control packet type.
 */
class MqttEncoder : MessageToByteEncoder!MqttMsg {

    /**
     * Encodes `msg` and returns the result of `data()` on the produced buffer.
     *
     * Throws: Exception under the same conditions as `doEncode`.
     */
    override ubyte[] encode(ref MqttMsg msg) {
        return doEncode(msg).data();
    }

    /// Intentionally empty: this encoder takes no action in the callback.
    override void callBack(ubyte[] data, size_t len) {
    }

public:
    /**
     * Main encoding entry point: dispatches on the message type stored in the
     * fixed header and delegates to the matching encoder routine.
     *
     * Params:
     *   message = MQTT message to encode
     * Returns: a new ByteBuf holding the encoded bytes
     * Throws: Exception when the message type is unknown, when the message
     *         object is not an instance of the class its message type
     *         requires, or when an encoder routine rejects the content.
     */
    static ByteBuf doEncode(MqttMsg message) {
        switch (message.fixedHeader().messageType()) {
        case MqttMsgType.CONNECT:
            return encodeConnectMessage(downcast!MqttConnectMsg(message));

        case MqttMsgType.CONNACK:
            return encodeConnAckMessage(downcast!MqttConnAckMessage(message));

        case MqttMsgType.PUBLISH:
            return encodePublishMessage(downcast!MqttPublishMsg(message));

        case MqttMsgType.SUBSCRIBE:
            return encodeSubscribeMessage(downcast!MqttSubscribeMsg(message));

        case MqttMsgType.UNSUBSCRIBE:
            return encodeUnsubscribeMessage(downcast!MqttUnsubscribeMsg(message));

        case MqttMsgType.SUBACK:
            return encodeSubAckMessage(downcast!MqttSubAckMsg(message));

        case MqttMsgType.UNSUBACK:
        case MqttMsgType.PUBACK:
        case MqttMsgType.PUBREC:
        case MqttMsgType.PUBREL:
        case MqttMsgType.PUBCOMP:
            return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(message);

        case MqttMsgType.PINGREQ:
        case MqttMsgType.PINGRESP:
        case MqttMsgType.DISCONNECT:
            return encodeMessageWithOnlySingleByteFixedHeader(message);

        default:
            throw new Exception(
                "Unknown message type: " ~ to!string(message.fixedHeader().messageType()));
        }
    }

private:
    /// Largest byte count that fits in the two-byte length prefix written by writeShort.
    enum int MAX_FIELD_LENGTH = 0xFFFF;

    /// Largest value expressible in the four-byte MQTT "Remaining Length" field.
    enum int MAX_REMAINING_LENGTH = 268_435_455;

    /**
     * Casts `message` to the concrete message class `T`.
     *
     * A class cast evaluates to null when the object is not a `T`; that case
     * is reported here instead of being dereferenced later by an encoder.
     *
     * Throws: Exception when `message` is not an instance of `T`.
     */
    static T downcast(T)(MqttMsg message) {
        T typed = cast(T) message;
        if (typed is null) {
            throw new Exception("Message of type "
                ~ to!string(message.fixedHeader().messageType())
                ~ " must be an instance of " ~ T.stringof);
        }
        return typed;
    }

    /**
     * Returns `bytes.length` as an int after checking that it fits in a
     * two-byte length prefix.
     *
     * Throws: Exception when the field is longer than MAX_FIELD_LENGTH bytes;
     *         writing it would otherwise emit a truncated length followed by
     *         the full content.
     */
    static int checkedFieldLength(ubyte[] bytes, string fieldName) {
        if (bytes.length > MAX_FIELD_LENGTH) {
            throw new Exception(fieldName ~ " is " ~ to!string(bytes.length)
                ~ " bytes long, but a length-prefixed MQTT field holds at most "
                ~ to!string(MAX_FIELD_LENGTH) ~ " bytes");
        }
        return cast(int) bytes.length;
    }

    /// Writes `bytes` preceded by its length as a short, validating the length first.
    static void writeLengthPrefixed(ByteBuf buf, ubyte[] bytes, string fieldName) {
        int length = checkedFieldLength(bytes, fieldName);
        buf.writeShort(length);
        buf.writeBytes(bytes, 0, length);
    }

    /**
     * Encodes a CONNECT message: fixed header, protocol name and level,
     * connect flags, keep-alive, then the client identifier followed by the
     * will, user name and password fields enabled in the variable header.
     *
     * Throws: Exception when the client identifier is rejected by
     *         MqttCodecUtil.isValidClientId or a field exceeds its size limit.
     */
    static ByteBuf encodeConnectMessage(MqttConnectMsg message) {
        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        MqttConnectVariableHeader variableHeader = message.variableHeader();
        MqttConnectPayload payload = message.payload();
        MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(
            variableHeader.name(), cast(byte) variableHeader.mqtt_version());

        // Client id
        string clientIdentifier = payload.clientIdentifier();
        if (!MqttCodecUtil.isValidClientId(mqttVersion, clientIdentifier)) {
            throw new Exception("invalid clientIdentifier: " ~ clientIdentifier);
        }
        ubyte[] clientIdentifierBytes = cast(ubyte[])(clientIdentifier);
        int payloadBufferSize = 2 + checkedFieldLength(clientIdentifierBytes, "client identifier");

        // Will topic and message; only counted and written when the will flag is set
        string willTopic = payload.willTopic();
        ubyte[] willTopicBytes = cast(ubyte[])(willTopic);
        string willMessage = payload.willMessage();
        ubyte[] willMessageBytes = cast(ubyte[])(willMessage);
        if (variableHeader.isWillFlag()) {
            payloadBufferSize += 2 + checkedFieldLength(willTopicBytes, "will topic");
            payloadBufferSize += 2 + checkedFieldLength(willMessageBytes, "will message");
        }

        string userName = payload.userName();
        ubyte[] userNameBytes = cast(ubyte[])(userName);
        if (variableHeader.hasUserName()) {
            payloadBufferSize += 2 + checkedFieldLength(userNameBytes, "user name");
        }

        string password = payload.password();
        ubyte[] passwordBytes = cast(ubyte[])(password);
        if (variableHeader.hasPassword()) {
            payloadBufferSize += 2 + checkedFieldLength(passwordBytes, "password");
        }

        // Variable header: length-prefixed protocol name, then level (1),
        // connect flags (1) and keep-alive (2)
        ubyte[] protocolNameBytes = cast(ubyte[])(mqttVersion.protocolName());
        int variableHeaderBufferSize =
            2 + checkedFieldLength(protocolNameBytes, "protocol name") + 4;
        int variablePartSize = variableHeaderBufferSize + payloadBufferSize;

        ByteBuf buf = new ByteBuf;
        buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
        writeVariableLengthInt(buf, variablePartSize);

        buf.writeShort(cast(int) protocolNameBytes.length);
        buf.writeBytes(protocolNameBytes);

        buf.writeByte(cast(ubyte) variableHeader.mqtt_version());
        buf.writeByte(getConnVariableHeaderFlag(variableHeader));
        buf.writeShort(variableHeader.keepAliveTimeSeconds());

        // Payload
        writeLengthPrefixed(buf, clientIdentifierBytes, "client identifier");
        if (variableHeader.isWillFlag()) {
            writeLengthPrefixed(buf, willTopicBytes, "will topic");
            writeLengthPrefixed(buf, willMessageBytes, "will message");
        }
        if (variableHeader.hasUserName()) {
            writeLengthPrefixed(buf, userNameBytes, "user name");
        }
        if (variableHeader.hasPassword()) {
            writeLengthPrefixed(buf, passwordBytes, "password");
        }

        return buf;
    }

    /**
     * Packs the CONNECT flags into one value: 0x80 user name, 0x40 password,
     * 0x20 will retain, bits 3-4 will QoS, 0x04 will flag, 0x02 clean session.
     */
    static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
        int flagByte = 0;
        if (variableHeader.hasUserName()) {
            flagByte |= 0x80;
        }
        if (variableHeader.hasPassword()) {
            flagByte |= 0x40;
        }
        if (variableHeader.isWillRetain()) {
            flagByte |= 0x20;
        }
        flagByte |= (variableHeader.willQos() & 0x03) << 3;
        if (variableHeader.isWillFlag()) {
            flagByte |= 0x04;
        }
        if (variableHeader.isCleanSession()) {
            flagByte |= 0x02;
        }
        return flagByte;
    }

    /**
     * Encodes a CONNACK message: fixed header byte, a remaining length of 2,
     * the session-present flag and the connect return code.
     */
    static ByteBuf encodeConnAckMessage(MqttConnAckMessage message) {
        ByteBuf buf = new ByteBuf;
        buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
        buf.writeByte(cast(ubyte)(2 & 0xff));
        buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);

        buf.writeByte(cast(ubyte)(message.variableHeader().connectReturnCode()));

        return buf;
    }

    /**
     * Encodes a SUBSCRIBE message: fixed header, message id, then for every
     * subscription a length-prefixed topic name followed by one QoS byte.
     *
     * Throws: Exception when a topic name exceeds the field size limit.
     */
    static ByteBuf encodeSubscribeMessage(MqttSubscribeMsg message) {
        int variableHeaderBufferSize = 2;
        int payloadBufferSize = 0;

        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        MqttMsgIdVariableHeader variableHeader = message.variableHeader();
        MqttSubscribePayload payload = message.payload();

        // Size (and validate) every entry before anything is written
        foreach (MqttTopicSubscription topic; payload.topicSubscriptions()) {
            string topicName = topic.topicName();
            ubyte[] topicNameBytes = cast(ubyte[])(topicName);
            payloadBufferSize += 2 + checkedFieldLength(topicNameBytes, "topic name");
            payloadBufferSize += 1;
        }

        int variablePartSize = variableHeaderBufferSize + payloadBufferSize;

        ByteBuf buf = new ByteBuf;
        buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
        writeVariableLengthInt(buf, variablePartSize);

        // Variable Header
        int messageId = variableHeader.messageId();
        buf.writeShort(messageId);

        // Payload
        foreach (MqttTopicSubscription topic; payload.topicSubscriptions()) {
            string topicName = topic.topicName();
            ubyte[] topicNameBytes = cast(ubyte[])(topicName);
            writeLengthPrefixed(buf, topicNameBytes, "topic name");
            buf.writeByte(to!(int)(topic.qualityOfService()));
        }

        return buf;
    }

    /**
     * Encodes an UNSUBSCRIBE message: fixed header, message id, then every
     * topic as a length-prefixed field.
     *
     * Throws: Exception when a topic exceeds the field size limit.
     */
    static ByteBuf encodeUnsubscribeMessage(MqttUnsubscribeMsg message) {
        int variableHeaderBufferSize = 2;
        int payloadBufferSize = 0;

        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        MqttMsgIdVariableHeader variableHeader = message.variableHeader();
        MqttUnsubscribePayload payload = message.payload();

        // Size (and validate) every topic before anything is written
        foreach (string topicName; payload.topics()) {
            ubyte[] topicNameBytes = cast(ubyte[])(topicName);
            payloadBufferSize += 2 + checkedFieldLength(topicNameBytes, "topic name");
        }

        int variablePartSize = variableHeaderBufferSize + payloadBufferSize;

        ByteBuf buf = new ByteBuf;
        buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
        writeVariableLengthInt(buf, variablePartSize);

        // Variable Header
        int messageId = variableHeader.messageId();
        buf.writeShort(messageId);

        // Payload
        foreach (string topicName; payload.topics()) {
            ubyte[] topicNameBytes = cast(ubyte[])(topicName);
            writeLengthPrefixed(buf, topicNameBytes, "topic name");
        }

        return buf;
    }

    /**
     * Encodes a SUBACK message: fixed header, message id, then one byte per
     * granted QoS level.
     */
    static ByteBuf encodeSubAckMessage(MqttSubAckMsg message) {
        int variableHeaderBufferSize = 2;
        int payloadBufferSize = cast(int) message.payload().grantedQoSLevels().length;
        int variablePartSize = variableHeaderBufferSize + payloadBufferSize;

        ByteBuf buf = new ByteBuf;
        buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
        writeVariableLengthInt(buf, variablePartSize);
        buf.writeShort(message.variableHeader().messageId());
        foreach (int qos; message.payload().grantedQoSLevels()) {
            buf.writeByte(qos);
        }

        return buf;
    }

    /**
     * Encodes a PUBLISH message: fixed header, length-prefixed topic name,
     * the message id when the QoS level is above 0, then the raw content.
     *
     * Throws: Exception when the topic name exceeds the field size limit.
     */
    static ByteBuf encodePublishMessage(MqttPublishMsg message) {
        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        MqttPublishVariableHeader variableHeader = message.variableHeader();

        string topicName = variableHeader.topicName();
        ubyte[] topicNameBytes = cast(ubyte[])(topicName);
        int topicNameLength = checkedFieldLength(topicNameBytes, "topic name");

        // The message id is only part of the variable header for QoS > 0
        int variableHeaderBufferSize = 2 + topicNameLength +
            (to!(int)(mqttFixedHeader.qosLevel()) > 0 ? 2 : 0);
        int payloadBufferSize = cast(int) message.content().length;
        int variablePartSize = variableHeaderBufferSize + payloadBufferSize;

        ByteBuf buf = new ByteBuf;
        buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
        writeVariableLengthInt(buf, variablePartSize);
        buf.writeShort(topicNameLength);
        buf.writeBytes(topicNameBytes);
        if (to!(int)(mqttFixedHeader.qosLevel()) > 0) {
            buf.writeShort(variableHeader.messageId());
        }
        buf.writeBytes(message.content());

        return buf;
    }

    /**
     * Encodes a message whose variable part is only a message id
     * (used by doEncode for UNSUBACK, PUBACK, PUBREC, PUBREL and PUBCOMP).
     *
     * Throws: Exception when the message's variable header is not a
     *         MqttMsgIdVariableHeader.
     */
    static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(MqttMsg message) {
        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        MqttMsgIdVariableHeader variableHeader =
            cast(MqttMsgIdVariableHeader) message.variableHeader();
        // The class cast yields null for any other header type
        if (variableHeader is null) {
            throw new Exception("Message of type "
                ~ to!string(mqttFixedHeader.messageType())
                ~ " must carry a MqttMsgIdVariableHeader");
        }
        int msgId = variableHeader.messageId();

        int variableHeaderBufferSize = 2; // variable part only has a message id
        ByteBuf buf = new ByteBuf;
        buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
        writeVariableLengthInt(buf, variableHeaderBufferSize);
        buf.writeShort(msgId);

        return buf;
    }

    /**
     * Encodes a message that consists of the fixed header byte and a zero
     * remaining length (used by doEncode for PINGREQ, PINGRESP and DISCONNECT).
     */
    static ByteBuf encodeMessageWithOnlySingleByteFixedHeader(MqttMsg message) {
        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        ByteBuf buf = new ByteBuf;
        buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
        buf.writeByte(0);

        return buf;
    }

    /**
     * Builds the first fixed-header byte: message type shifted into the high
     * nibble, 0x08 for DUP, QoS level shifted left by one, 0x01 for RETAIN.
     */
    static int getFixedHeaderByte1(MqttFixedHeader header) {
        int ret = 0;
        int va = to!(int)(header.messageType());
        ret |= va << 4;
        if (header.isDup()) {
            ret |= 0x08;
        }
        int hv = to!(int)(header.qosLevel());
        ret |= hv << 1;
        if (header.isRetain()) {
            ret |= 0x01;
        }
        return ret;
    }

    /**
     * Writes `num` in base-128 groups, least significant first, setting the
     * 0x80 continuation bit on every byte except the last.
     *
     * Throws: Exception when `num` is negative or larger than
     *         MAX_REMAINING_LENGTH; such values cannot be represented in the
     *         MQTT "Remaining Length" field. Nothing is written in that case.
     */
    static void writeVariableLengthInt(ref ByteBuf buf, int num) {
        if (num < 0 || num > MAX_REMAINING_LENGTH) {
            throw new Exception("remaining length " ~ to!string(num)
                ~ " is outside the encodable range 0.."
                ~ to!string(MAX_REMAINING_LENGTH));
        }
        do {
            int digit = num % 128;
            num /= 128;
            if (num > 0) {
                digit |= 0x80;
            }
            buf.writeByte(digit);
        } while (num > 0);
    }

    /// Returns how many bytes writeVariableLengthInt emits for `num`.
    static int getVariableLengthInt(int num) {
        int count = 0;
        do {
            num /= 128;
            count++;
        } while (num > 0);
        return count;
    }
}