001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.mqtt; 018 019import java.io.IOException; 020import java.util.Map; 021import java.util.concurrent.ConcurrentHashMap; 022import java.util.concurrent.ConcurrentMap; 023import java.util.concurrent.atomic.AtomicBoolean; 024import java.util.zip.DataFormatException; 025import java.util.zip.Inflater; 026 027import javax.jms.Destination; 028import javax.jms.InvalidClientIDException; 029import javax.jms.JMSException; 030import javax.jms.Message; 031import javax.security.auth.login.CredentialException; 032 033import org.apache.activemq.broker.BrokerService; 034import org.apache.activemq.broker.BrokerServiceAware; 035import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; 036import org.apache.activemq.command.ActiveMQBytesMessage; 037import org.apache.activemq.command.ActiveMQDestination; 038import org.apache.activemq.command.ActiveMQMapMessage; 039import org.apache.activemq.command.ActiveMQMessage; 040import org.apache.activemq.command.ActiveMQTextMessage; 041import org.apache.activemq.command.Command; 042import org.apache.activemq.command.ConnectionError; 043import org.apache.activemq.command.ConnectionId; 044import org.apache.activemq.command.ConnectionInfo; 045import org.apache.activemq.command.ExceptionResponse; 046import org.apache.activemq.command.MessageAck; 047import org.apache.activemq.command.MessageDispatch; 048import org.apache.activemq.command.MessageId; 049import org.apache.activemq.command.ProducerId; 050import org.apache.activemq.command.ProducerInfo; 051import org.apache.activemq.command.Response; 052import org.apache.activemq.command.SessionId; 053import org.apache.activemq.command.SessionInfo; 054import org.apache.activemq.command.ShutdownInfo; 055import org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy; 056import org.apache.activemq.util.ByteArrayOutputStream; 057import org.apache.activemq.util.ByteSequence; 058import org.apache.activemq.util.FactoryFinder; 059import org.apache.activemq.util.IOExceptionSupport; 060import org.apache.activemq.util.IdGenerator; 061import org.apache.activemq.util.JMSExceptionSupport; 062import org.apache.activemq.util.LRUCache; 063import org.apache.activemq.util.LongSequenceGenerator; 064import org.fusesource.hawtbuf.Buffer; 065import org.fusesource.hawtbuf.UTF8Buffer; 066import org.fusesource.mqtt.client.QoS; 067import org.fusesource.mqtt.client.Topic; 068import org.fusesource.mqtt.codec.CONNACK; 069import org.fusesource.mqtt.codec.CONNECT; 070import org.fusesource.mqtt.codec.DISCONNECT; 071import org.fusesource.mqtt.codec.MQTTFrame; 072import org.fusesource.mqtt.codec.PINGREQ; 073import org.fusesource.mqtt.codec.PINGRESP; 074import org.fusesource.mqtt.codec.PUBACK; 075import org.fusesource.mqtt.codec.PUBCOMP; 076import org.fusesource.mqtt.codec.PUBLISH; 077import org.fusesource.mqtt.codec.PUBREC; 078import org.fusesource.mqtt.codec.PUBREL; 079import org.fusesource.mqtt.codec.SUBACK; 080import org.fusesource.mqtt.codec.SUBSCRIBE; 081import org.fusesource.mqtt.codec.UNSUBACK; 082import org.fusesource.mqtt.codec.UNSUBSCRIBE; 083import org.slf4j.Logger; 084import org.slf4j.LoggerFactory; 085 086public class MQTTProtocolConverter { 087 088 private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class); 089 090 public static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS"; 091 public static final int V3_1 = 3; 092 public static final int V3_1_1 = 4; 093 094 public static final String SINGLE_LEVEL_WILDCARD = "+"; 095 public static final String MULTI_LEVEL_WILDCARD = "#"; 096 097 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 098 private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); 099 private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5; 100 static final int DEFAULT_CACHE_SIZE = 5000; 101 102 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); 103 private final SessionId sessionId = new SessionId(connectionId, -1); 104 private final ProducerId producerId = new ProducerId(sessionId, 1); 105 private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator(); 106 107 private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); 108 private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE); 109 private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination, String>(DEFAULT_CACHE_SIZE); 110 111 private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE); 112 private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE); 113 114 private final MQTTTransport mqttTransport; 115 private final BrokerService brokerService; 116 117 private final Object commnadIdMutex = new Object(); 118 private int lastCommandId; 119 private final AtomicBoolean connected = new AtomicBoolean(false); 120 private final ConnectionInfo connectionInfo = new ConnectionInfo(); 121 private CONNECT connect; 122 private String clientId; 123 private long defaultKeepAlive; 124 private int activeMQSubscriptionPrefetch = -1; 125 private final MQTTPacketIdGenerator packetIdGenerator; 126 private boolean publishDollarTopics; 127 128 public int version; 129 130 private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/"); 131 132 /* 133 * Subscription strategy configuration element. 134 * > mqtt-default-subscriptions 135 * > mqtt-virtual-topic-subscriptions 136 */ 137 private String subscriptionStrategyName = "mqtt-default-subscriptions"; 138 private MQTTSubscriptionStrategy subsciptionStrategy; 139 140 public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) { 141 this.mqttTransport = mqttTransport; 142 this.brokerService = brokerService; 143 this.packetIdGenerator = MQTTPacketIdGenerator.getMQTTPacketIdGenerator(brokerService); 144 this.defaultKeepAlive = 0; 145 } 146 147 int generateCommandId() { 148 synchronized (commnadIdMutex) { 149 return lastCommandId++; 150 } 151 } 152 153 public void sendToActiveMQ(Command command, ResponseHandler handler) { 154 155 // Lets intercept message send requests.. 156 if (command instanceof ActiveMQMessage) { 157 ActiveMQMessage msg = (ActiveMQMessage) command; 158 try { 159 if (!getPublishDollarTopics() && findSubscriptionStrategy().isControlTopic(msg.getDestination())) { 160 // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1 161 // specification requirements for system assigned destinations. 162 if (handler != null) { 163 try { 164 handler.onResponse(this, new Response()); 165 } catch (IOException e) { 166 LOG.warn("Failed to send command " + command, e); 167 } 168 } 169 return; 170 } 171 } catch (IOException e) { 172 LOG.warn("Failed to send command " + command, e); 173 } 174 } 175 176 command.setCommandId(generateCommandId()); 177 if (handler != null) { 178 command.setResponseRequired(true); 179 resposeHandlers.put(command.getCommandId(), handler); 180 } 181 getMQTTTransport().sendToActiveMQ(command); 182 } 183 184 void sendToMQTT(MQTTFrame frame) { 185 try { 186 mqttTransport.sendToMQTT(frame); 187 } catch (IOException e) { 188 LOG.warn("Failed to send frame " + frame, e); 189 } 190 } 191 192 /** 193 * Convert a MQTT command 194 */ 195 public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException { 196 switch (frame.messageType()) { 197 case PINGREQ.TYPE: 198 LOG.debug("Received a ping from client: " + getClientId()); 199 checkConnected(); 200 sendToMQTT(PING_RESP_FRAME); 201 LOG.debug("Sent Ping Response to " + getClientId()); 202 break; 203 case CONNECT.TYPE: 204 CONNECT connect = new CONNECT().decode(frame); 205 onMQTTConnect(connect); 206 LOG.debug("MQTT Client {} connected. (version: {})", getClientId(), connect.version()); 207 break; 208 case DISCONNECT.TYPE: 209 LOG.debug("MQTT Client {} disconnecting", getClientId()); 210 onMQTTDisconnect(); 211 break; 212 case SUBSCRIBE.TYPE: 213 onSubscribe(new SUBSCRIBE().decode(frame)); 214 break; 215 case UNSUBSCRIBE.TYPE: 216 onUnSubscribe(new UNSUBSCRIBE().decode(frame)); 217 break; 218 case PUBLISH.TYPE: 219 onMQTTPublish(new PUBLISH().decode(frame)); 220 break; 221 case PUBACK.TYPE: 222 onMQTTPubAck(new PUBACK().decode(frame)); 223 break; 224 case PUBREC.TYPE: 225 onMQTTPubRec(new PUBREC().decode(frame)); 226 break; 227 case PUBREL.TYPE: 228 onMQTTPubRel(new PUBREL().decode(frame)); 229 break; 230 case PUBCOMP.TYPE: 231 onMQTTPubComp(new PUBCOMP().decode(frame)); 232 break; 233 default: 234 handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame); 235 } 236 } 237 238 void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException { 239 if (connected.get()) { 240 throw new MQTTProtocolException("Already connected."); 241 } 242 this.connect = connect; 243 244 // The Server MUST respond to the CONNECT Packet with a CONNACK return code 0x01 245 // (unacceptable protocol level) and then disconnect the Client if the Protocol Level 246 // is not supported by the Server [MQTT-3.1.2-2]. 247 if (connect.version() < 3 || connect.version() > 4) { 248 CONNACK ack = new CONNACK(); 249 ack.code(CONNACK.Code.CONNECTION_REFUSED_UNACCEPTED_PROTOCOL_VERSION); 250 try { 251 getMQTTTransport().sendToMQTT(ack.encode()); 252 getMQTTTransport().onException(IOExceptionSupport.create("Unsupported or invalid protocol version", null)); 253 } catch (IOException e) { 254 getMQTTTransport().onException(IOExceptionSupport.create(e)); 255 } 256 return; 257 } 258 259 String clientId = ""; 260 if (connect.clientId() != null) { 261 clientId = connect.clientId().toString(); 262 } 263 264 String userName = null; 265 if (connect.userName() != null) { 266 userName = connect.userName().toString(); 267 } 268 String passswd = null; 269 if (connect.password() != null) { 270 271 if (userName == null && connect.version() != V3_1) { 272 // [MQTT-3.1.2-22]: If the user name is not present then the 273 // password must also be absent. 274 // [MQTT-3.1.4-1]: would seem to imply we don't send a CONNACK here. 275 getMQTTTransport().onException(IOExceptionSupport.create("Password given without a user name", null)); 276 return; 277 } 278 279 passswd = connect.password().toString(); 280 } 281 282 version = connect.version(); 283 284 configureInactivityMonitor(connect.keepAlive()); 285 286 connectionInfo.setConnectionId(connectionId); 287 if (clientId != null && !clientId.isEmpty()) { 288 connectionInfo.setClientId(clientId); 289 } else { 290 // Clean Session MUST be set for 0 length Client Id 291 if (!connect.cleanSession()) { 292 CONNACK ack = new CONNACK(); 293 ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED); 294 try { 295 getMQTTTransport().sendToMQTT(ack.encode()); 296 getMQTTTransport().onException(IOExceptionSupport.create("Invalid Client ID", null)); 297 } catch (IOException e) { 298 getMQTTTransport().onException(IOExceptionSupport.create(e)); 299 } 300 return; 301 } 302 connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString()); 303 } 304 305 connectionInfo.setResponseRequired(true); 306 connectionInfo.setUserName(userName); 307 connectionInfo.setPassword(passswd); 308 connectionInfo.setTransportContext(mqttTransport.getPeerCertificates()); 309 310 sendToActiveMQ(connectionInfo, new ResponseHandler() { 311 @Override 312 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 313 314 if (response.isException()) { 315 // If the connection attempt fails we close the socket. 316 Throwable exception = ((ExceptionResponse) response).getException(); 317 //let the client know 318 CONNACK ack = new CONNACK(); 319 if (exception instanceof InvalidClientIDException) { 320 ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED); 321 } else if (exception instanceof SecurityException) { 322 ack.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED); 323 } else if (exception instanceof CredentialException) { 324 ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD); 325 } else { 326 ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE); 327 } 328 getMQTTTransport().sendToMQTT(ack.encode()); 329 getMQTTTransport().onException(IOExceptionSupport.create(exception)); 330 return; 331 } 332 333 final SessionInfo sessionInfo = new SessionInfo(sessionId); 334 sendToActiveMQ(sessionInfo, null); 335 336 final ProducerInfo producerInfo = new ProducerInfo(producerId); 337 sendToActiveMQ(producerInfo, new ResponseHandler() { 338 @Override 339 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 340 341 if (response.isException()) { 342 // If the connection attempt fails we close the socket. 343 Throwable exception = ((ExceptionResponse) response).getException(); 344 CONNACK ack = new CONNACK(); 345 ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD); 346 getMQTTTransport().sendToMQTT(ack.encode()); 347 getMQTTTransport().onException(IOExceptionSupport.create(exception)); 348 return; 349 } 350 351 CONNACK ack = new CONNACK(); 352 ack.code(CONNACK.Code.CONNECTION_ACCEPTED); 353 connected.set(true); 354 getMQTTTransport().sendToMQTT(ack.encode()); 355 356 if (connect.cleanSession()) { 357 packetIdGenerator.stopClientSession(getClientId()); 358 } else { 359 packetIdGenerator.startClientSession(getClientId()); 360 } 361 362 findSubscriptionStrategy().onConnect(connect); 363 } 364 }); 365 } 366 }); 367 } 368 369 void onMQTTDisconnect() throws MQTTProtocolException { 370 if (connected.compareAndSet(true, false)) { 371 sendToActiveMQ(connectionInfo.createRemoveCommand(), null); 372 sendToActiveMQ(new ShutdownInfo(), null); 373 } 374 stopTransport(); 375 } 376 377 void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException { 378 checkConnected(); 379 LOG.trace("MQTT SUBSCRIBE message:{} client:{} connection:{}", 380 command.messageId(), clientId, connectionInfo.getConnectionId()); 381 Topic[] topics = command.topics(); 382 if (topics != null) { 383 byte[] qos = new byte[topics.length]; 384 for (int i = 0; i < topics.length; i++) { 385 MQTTProtocolSupport.validate(topics[i].name().toString()); 386 try { 387 qos[i] = findSubscriptionStrategy().onSubscribe(topics[i]); 388 } catch (IOException e) { 389 throw new MQTTProtocolException("Failed to process subscription request", true, e); 390 } 391 } 392 SUBACK ack = new SUBACK(); 393 ack.messageId(command.messageId()); 394 ack.grantedQos(qos); 395 try { 396 getMQTTTransport().sendToMQTT(ack.encode()); 397 } catch (IOException e) { 398 LOG.warn("Couldn't send SUBACK for " + command, e); 399 } 400 } else { 401 LOG.warn("No topics defined for Subscription " + command); 402 throw new MQTTProtocolException("SUBSCRIBE command received with no topic filter"); 403 } 404 } 405 406 public void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException { 407 checkConnected(); 408 if (command.qos() != QoS.AT_LEAST_ONCE && (version != V3_1 || publishDollarTopics != true)) { 409 throw new MQTTProtocolException("Failed to process unsubscribe request", true, new Exception("UNSUBSCRIBE frame not properly formatted, QoS")); 410 } 411 UTF8Buffer[] topics = command.topics(); 412 if (topics != null) { 413 for (UTF8Buffer topic : topics) { 414 MQTTProtocolSupport.validate(topic.toString()); 415 try { 416 findSubscriptionStrategy().onUnSubscribe(topic.toString()); 417 } catch (IOException e) { 418 throw new MQTTProtocolException("Failed to process unsubscribe request", true, e); 419 } 420 } 421 UNSUBACK ack = new UNSUBACK(); 422 ack.messageId(command.messageId()); 423 sendToMQTT(ack.encode()); 424 } else { 425 LOG.warn("No topics defined for Subscription " + command); 426 throw new MQTTProtocolException("UNSUBSCRIBE command received with no topic filter"); 427 } 428 } 429 430 /** 431 * Dispatch an ActiveMQ command 432 */ 433 public void onActiveMQCommand(Command command) throws Exception { 434 if (command.isResponse()) { 435 Response response = (Response) command; 436 ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); 437 if (rh != null) { 438 rh.onResponse(this, response); 439 } else { 440 // Pass down any unexpected errors. Should this close the connection? 441 if (response.isException()) { 442 Throwable exception = ((ExceptionResponse) response).getException(); 443 handleException(exception, null); 444 } 445 } 446 } else if (command.isMessageDispatch()) { 447 MessageDispatch md = (MessageDispatch) command; 448 MQTTSubscription sub = findSubscriptionStrategy().getSubscription(md.getConsumerId()); 449 if (sub != null) { 450 MessageAck ack = sub.createMessageAck(md); 451 PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage()); 452 switch (publish.qos()) { 453 case AT_LEAST_ONCE: 454 case EXACTLY_ONCE: 455 publish.dup(publish.dup() ? true : md.getMessage().isRedelivered()); 456 case AT_MOST_ONCE: 457 } 458 if (ack != null && sub.expectAck(publish)) { 459 synchronized (consumerAcks) { 460 consumerAcks.put(publish.messageId(), ack); 461 } 462 } 463 LOG.trace("MQTT Snd PUBLISH message:{} client:{} connection:{}", 464 publish.messageId(), clientId, connectionInfo.getConnectionId()); 465 getMQTTTransport().sendToMQTT(publish.encode()); 466 if (ack != null && !sub.expectAck(publish)) { 467 getMQTTTransport().sendToActiveMQ(ack); 468 } 469 } 470 } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { 471 // Pass down any unexpected async errors. Should this close the connection? 472 Throwable exception = ((ConnectionError) command).getException(); 473 handleException(exception, null); 474 } else if (command.isBrokerInfo()) { 475 //ignore 476 } else { 477 LOG.debug("Do not know how to process ActiveMQ Command {}", command); 478 } 479 } 480 481 void onMQTTPublish(PUBLISH command) throws IOException, JMSException { 482 checkConnected(); 483 LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}", 484 command.messageId(), clientId, connectionInfo.getConnectionId()); 485 //Both version 3.1 and 3.1.1 do not allow the topic name to contain a wildcard in the publish packet 486 if (containsMqttWildcard(command.topicName().toString())) { 487 // [MQTT-3.3.2-2]: The Topic Name in the PUBLISH Packet MUST NOT contain wildcard characters 488 getMQTTTransport().onException(IOExceptionSupport.create("The topic name must not contain wildcard characters.", null)); 489 return; 490 } 491 ActiveMQMessage message = convertMessage(command); 492 message.setProducerId(producerId); 493 message.onSend(); 494 sendToActiveMQ(message, createResponseHandler(command)); 495 } 496 497 void onMQTTPubAck(PUBACK command) { 498 short messageId = command.messageId(); 499 LOG.trace("MQTT Rcv PUBACK message:{} client:{} connection:{}", 500 messageId, clientId, connectionInfo.getConnectionId()); 501 packetIdGenerator.ackPacketId(getClientId(), messageId); 502 MessageAck ack; 503 synchronized (consumerAcks) { 504 ack = consumerAcks.remove(messageId); 505 } 506 if (ack != null) { 507 getMQTTTransport().sendToActiveMQ(ack); 508 } 509 } 510 511 void onMQTTPubRec(PUBREC commnand) { 512 //from a subscriber - send a PUBREL in response 513 PUBREL pubrel = new PUBREL(); 514 pubrel.messageId(commnand.messageId()); 515 sendToMQTT(pubrel.encode()); 516 } 517 518 void onMQTTPubRel(PUBREL command) { 519 PUBREC ack; 520 synchronized (publisherRecs) { 521 ack = publisherRecs.remove(command.messageId()); 522 } 523 if (ack == null) { 524 LOG.warn("Unknown PUBREL: {} received", command.messageId()); 525 } 526 PUBCOMP pubcomp = new PUBCOMP(); 527 pubcomp.messageId(command.messageId()); 528 sendToMQTT(pubcomp.encode()); 529 } 530 531 void onMQTTPubComp(PUBCOMP command) { 532 short messageId = command.messageId(); 533 packetIdGenerator.ackPacketId(getClientId(), messageId); 534 MessageAck ack; 535 synchronized (consumerAcks) { 536 ack = consumerAcks.remove(messageId); 537 } 538 if (ack != null) { 539 getMQTTTransport().sendToActiveMQ(ack); 540 } 541 } 542 543 ActiveMQMessage convertMessage(PUBLISH command) throws JMSException { 544 ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); 545 546 msg.setProducerId(producerId); 547 MessageId id = new MessageId(producerId, publisherIdGenerator.getNextSequenceId()); 548 msg.setMessageId(id); 549 LOG.trace("MQTT-->ActiveMQ: MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}", 550 command.messageId(), clientId, connectionInfo.getConnectionId(), msg.getMessageId()); 551 msg.setTimestamp(System.currentTimeMillis()); 552 msg.setPriority((byte) Message.DEFAULT_PRIORITY); 553 msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain()); 554 msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal()); 555 if (command.retain()) { 556 msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true); 557 } 558 559 ActiveMQDestination destination; 560 synchronized (activeMQDestinationMap) { 561 destination = activeMQDestinationMap.get(command.topicName()); 562 if (destination == null) { 563 String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString()); 564 try { 565 destination = findSubscriptionStrategy().onSend(topicName); 566 } catch (IOException e) { 567 throw JMSExceptionSupport.create(e); 568 } 569 570 activeMQDestinationMap.put(command.topicName().toString(), destination); 571 } 572 } 573 574 msg.setJMSDestination(destination); 575 msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length); 576 return msg; 577 } 578 579 public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException, DataFormatException { 580 PUBLISH result = new PUBLISH(); 581 // packet id is set in MQTTSubscription 582 QoS qoS; 583 if (message.propertyExists(QOS_PROPERTY_NAME)) { 584 int ordinal = message.getIntProperty(QOS_PROPERTY_NAME); 585 qoS = QoS.values()[ordinal]; 586 587 } else { 588 qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE; 589 } 590 result.qos(qoS); 591 if (message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)) { 592 result.retain(true); 593 } 594 595 String topicName; 596 synchronized (mqttTopicMap) { 597 topicName = mqttTopicMap.get(message.getJMSDestination()); 598 if (topicName == null) { 599 String amqTopicName = findSubscriptionStrategy().onSend(message.getDestination()); 600 topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName); 601 mqttTopicMap.put(message.getJMSDestination(), topicName); 602 } 603 } 604 result.topicName(new UTF8Buffer(topicName)); 605 606 if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { 607 ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); 608 msg.setReadOnlyBody(true); 609 String messageText = msg.getText(); 610 if (messageText != null) { 611 result.payload(new Buffer(messageText.getBytes("UTF-8"))); 612 } 613 } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) { 614 ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy(); 615 msg.setReadOnlyBody(true); 616 byte[] data = new byte[(int) msg.getBodyLength()]; 617 msg.readBytes(data); 618 result.payload(new Buffer(data)); 619 } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { 620 ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); 621 msg.setReadOnlyBody(true); 622 Map<String, Object> map = msg.getContentMap(); 623 if (map != null) { 624 result.payload(new Buffer(map.toString().getBytes("UTF-8"))); 625 } 626 } else { 627 ByteSequence byteSequence = message.getContent(); 628 if (byteSequence != null && byteSequence.getLength() > 0) { 629 if (message.isCompressed()) { 630 Inflater inflater = new Inflater(); 631 inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length); 632 byte[] data = new byte[4096]; 633 int read; 634 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 635 while ((read = inflater.inflate(data)) != 0) { 636 bytesOut.write(data, 0, read); 637 } 638 byteSequence = bytesOut.toByteSequence(); 639 bytesOut.close(); 640 } 641 result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length)); 642 } 643 } 644 LOG.trace("ActiveMQ-->MQTT:MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}", 645 result.messageId(), clientId, connectionInfo.getConnectionId(), message.getMessageId()); 646 return result; 647 } 648 649 public MQTTTransport getMQTTTransport() { 650 return mqttTransport; 651 } 652 653 boolean willSent = false; 654 public void onTransportError() { 655 if (connect != null) { 656 if (connected.get()) { 657 if (connect.willTopic() != null && connect.willMessage() != null && !willSent) { 658 willSent = true; 659 try { 660 PUBLISH publish = new PUBLISH(); 661 publish.topicName(connect.willTopic()); 662 publish.qos(connect.willQos()); 663 publish.messageId(packetIdGenerator.getNextSequenceId(getClientId())); 664 publish.payload(connect.willMessage()); 665 publish.retain(connect.willRetain()); 666 ActiveMQMessage message = convertMessage(publish); 667 message.setProducerId(producerId); 668 message.onSend(); 669 670 sendToActiveMQ(message, null); 671 } catch (Exception e) { 672 LOG.warn("Failed to publish Will Message " + connect.willMessage()); 673 } 674 } 675 // remove connection info 676 sendToActiveMQ(connectionInfo.createRemoveCommand(), null); 677 } 678 } 679 } 680 681 void configureInactivityMonitor(short keepAliveSeconds) { 682 MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor(); 683 684 // If the user specifically shuts off the InactivityMonitor with transport.useInactivityMonitor=false, 685 // then ignore configuring it because it won't exist 686 if (monitor == null) { 687 return; 688 } 689 690 // Client has sent a valid CONNECT frame, we can stop the connect checker. 691 monitor.stopConnectChecker(); 692 693 long keepAliveMS = keepAliveSeconds * 1000; 694 695 LOG.debug("MQTT Client {} requests heart beat of {} ms", getClientId(), keepAliveMS); 696 697 try { 698 // if we have a default keep-alive value, and the client is trying to turn off keep-alive, 699 700 // we'll observe the server-side configured default value (note, no grace period) 701 if (keepAliveMS == 0 && defaultKeepAlive > 0) { 702 keepAliveMS = defaultKeepAlive; 703 } 704 705 long readGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD); 706 707 monitor.setProtocolConverter(this); 708 monitor.setReadKeepAliveTime(keepAliveMS); 709 monitor.setReadGraceTime(readGracePeriod); 710 monitor.startReadChecker(); 711 712 LOG.debug("MQTT Client {} established heart beat of {} ms ({} ms + {} ms grace period)", 713 new Object[] { getClientId(), keepAliveMS, keepAliveMS, readGracePeriod }); 714 } catch (Exception ex) { 715 LOG.warn("Failed to start MQTT InactivityMonitor ", ex); 716 } 717 } 718 719 void handleException(Throwable exception, MQTTFrame command) { 720 LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString()); 721 LOG.debug("Exception detail", exception); 722 723 if (connected.get() && connectionInfo != null) { 724 connected.set(false); 725 sendToActiveMQ(connectionInfo.createRemoveCommand(), null); 726 } 727 stopTransport(); 728 } 729 730 void checkConnected() throws MQTTProtocolException { 731 if (!connected.get()) { 732 throw new MQTTProtocolException("Not connected."); 733 } 734 } 735 736 private void stopTransport() { 737 try { 738 getMQTTTransport().stop(); 739 } catch (Throwable e) { 740 LOG.debug("Failed to stop MQTT transport ", e); 741 } 742 } 743 744 ResponseHandler createResponseHandler(final PUBLISH command) { 745 if (command != null) { 746 return new ResponseHandler() { 747 @Override 748 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 749 if (response.isException()) { 750 Throwable error = ((ExceptionResponse) response).getException(); 751 LOG.warn("Failed to send MQTT Publish: ", command, error.getMessage()); 752 LOG.trace("Error trace: {}", error); 753 } 754 755 switch (command.qos()) { 756 case AT_LEAST_ONCE: 757 PUBACK ack = new PUBACK(); 758 ack.messageId(command.messageId()); 759 LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", 760 command.messageId(), clientId, connectionInfo.getConnectionId()); 761 converter.getMQTTTransport().sendToMQTT(ack.encode()); 762 break; 763 case EXACTLY_ONCE: 764 PUBREC req = new PUBREC(); 765 req.messageId(command.messageId()); 766 synchronized (publisherRecs) { 767 publisherRecs.put(command.messageId(), req); 768 } 769 LOG.trace("MQTT Snd PUBREC message:{} client:{} connection:{}", 770 command.messageId(), clientId, connectionInfo.getConnectionId()); 771 converter.getMQTTTransport().sendToMQTT(req.encode()); 772 break; 773 default: 774 break; 775 } 776 } 777 }; 778 } 779 return null; 780 } 781 782 public long getDefaultKeepAlive() { 783 return defaultKeepAlive; 784 } 785 786 /** 787 * Set the default keep alive time (in milliseconds) that would be used if configured on server side 788 * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame 789 * @param keepAlive the keepAlive in milliseconds 790 */ 791 public void setDefaultKeepAlive(long keepAlive) { 792 this.defaultKeepAlive = keepAlive; 793 } 794 795 public int getActiveMQSubscriptionPrefetch() { 796 return activeMQSubscriptionPrefetch; 797 } 798 799 /** 800 * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one 801 * The default = 1 802 * 803 * @param activeMQSubscriptionPrefetch 804 * set the prefetch for the corresponding ActiveMQ subscription 805 */ 806 public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) { 807 this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch; 808 } 809 810 public MQTTPacketIdGenerator getPacketIdGenerator() { 811 return packetIdGenerator; 812 } 813 814 public void setPublishDollarTopics(boolean publishDollarTopics) { 815 this.publishDollarTopics = publishDollarTopics; 816 } 817 818 public boolean getPublishDollarTopics() { 819 return publishDollarTopics; 820 } 821 822 public ConnectionId getConnectionId() { 823 return connectionId; 824 } 825 826 public SessionId getSessionId() { 827 return sessionId; 828 } 829 830 public boolean isCleanSession() { 831 return this.connect.cleanSession(); 832 } 833 834 public String getSubscriptionStrategy() { 835 return subscriptionStrategyName; 836 } 837 838 public void setSubscriptionStrategy(String name) { 839 this.subscriptionStrategyName = name; 840 } 841 842 public String getClientId() { 843 if (clientId == null) { 844 if (connect != null && connect.clientId() != null) { 845 clientId = connect.clientId().toString(); 846 } else { 847 clientId = ""; 848 } 849 } 850 return clientId; 851 } 852 853 protected boolean containsMqttWildcard(String value) { 854 return value != null && (value.contains(SINGLE_LEVEL_WILDCARD) || 855 value.contains(MULTI_LEVEL_WILDCARD)); 856 } 857 858 protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException { 859 if (subsciptionStrategy == null) { 860 synchronized (STRATAGY_FINDER) { 861 if (subsciptionStrategy != null) { 862 return subsciptionStrategy; 863 } 864 865 MQTTSubscriptionStrategy strategy = null; 866 if (subscriptionStrategyName != null && !subscriptionStrategyName.isEmpty()) { 867 try { 868 strategy = (MQTTSubscriptionStrategy) STRATAGY_FINDER.newInstance(subscriptionStrategyName); 869 LOG.debug("MQTT Using subscription strategy: {}", subscriptionStrategyName); 870 if (strategy instanceof BrokerServiceAware) { 871 ((BrokerServiceAware)strategy).setBrokerService(brokerService); 872 } 873 strategy.initialize(this); 874 } catch (Exception e) { 875 throw IOExceptionSupport.create(e); 876 } 877 } else { 878 throw new IOException("Invalid subscription strategy name given: " + subscriptionStrategyName); 879 } 880 881 this.subsciptionStrategy = strategy; 882 } 883 } 884 return subsciptionStrategy; 885 } 886}