001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.advisory;
018
019import java.util.ArrayList;
020
021import javax.jms.Destination;
022import javax.jms.JMSException;
023import org.apache.activemq.ActiveMQMessageTransformation;
024import org.apache.activemq.command.ActiveMQDestination;
025import org.apache.activemq.command.ActiveMQTopic;
026
027public final class AdvisorySupport {
028    public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
029    public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX
030            + "Connection");
031    public static final ActiveMQTopic QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Queue");
032    public static final ActiveMQTopic TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Topic");
033    public static final ActiveMQTopic TEMP_QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempQueue");
034    public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempTopic");
035    public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer.";
036    public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
037    public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
038    public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
039    public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
040    public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
041    public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic.";
042    public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
043    public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
044    public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue.";
045    public static final String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer.";
046    public static final String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastProducer.";
047    public static final String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded.";
048    public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
049    public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
050    public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
051    public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd.";
052    public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
053    public static final String NETWORK_BRIDGE_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NetworkBridge";
054    public static final String AGENT_TOPIC = "ActiveMQ.Agent";
055    public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
056    public static final String MSG_PROPERTY_ORIGIN_BROKER_ID = "originBrokerId";
057    public static final String MSG_PROPERTY_ORIGIN_BROKER_NAME = "originBrokerName";
058    public static final String MSG_PROPERTY_ORIGIN_BROKER_URL = "originBrokerURL";
059    public static final String MSG_PROPERTY_USAGE_NAME = "usageName";
060    public static final String MSG_PROPERTY_CONSUMER_ID = "consumerId";
061    public static final String MSG_PROPERTY_PRODUCER_ID = "producerId";
062    public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId";
063    public static final String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount";
064    public static final String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount";
065
066    public static final ActiveMQTopic ALL_DESTINATIONS_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
067            TOPIC_ADVISORY_TOPIC.getPhysicalName() + "," + QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," +
068                    TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
069    public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
070            TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
071    private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
072
073    private AdvisorySupport() {
074    }
075
076    public static ActiveMQTopic getConnectionAdvisoryTopic() {
077        return CONNECTION_ADVISORY_TOPIC;
078    }
079
080    public static ActiveMQTopic[] getAllDestinationAdvisoryTopics(Destination destination) throws JMSException {
081        return getAllDestinationAdvisoryTopics(ActiveMQMessageTransformation.transformDestination(destination));
082    }
083
084    public static ActiveMQTopic[] getAllDestinationAdvisoryTopics(ActiveMQDestination destination) throws JMSException {
085        ArrayList<ActiveMQTopic> result = new ArrayList<ActiveMQTopic>();
086
087        result.add(getConsumerAdvisoryTopic(destination));
088        result.add(getProducerAdvisoryTopic(destination));
089        result.add(getExpiredMessageTopic(destination));
090        result.add(getNoConsumersAdvisoryTopic(destination));
091        result.add(getSlowConsumerAdvisoryTopic(destination));
092        result.add(getFastProducerAdvisoryTopic(destination));
093        result.add(getMessageDiscardedAdvisoryTopic(destination));
094        result.add(getMessageDeliveredAdvisoryTopic(destination));
095        result.add(getMessageConsumedAdvisoryTopic(destination));
096        result.add(getMessageDLQdAdvisoryTopic(destination));
097        result.add(getFullAdvisoryTopic(destination));
098
099        return result.toArray(new ActiveMQTopic[0]);
100    }
101
102    public static ActiveMQTopic getConsumerAdvisoryTopic(Destination destination) throws JMSException {
103        return getConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
104    }
105
106    public static ActiveMQTopic getConsumerAdvisoryTopic(ActiveMQDestination destination) {
107        if (destination.isQueue()) {
108            return new ActiveMQTopic(QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
109        } else {
110            return new ActiveMQTopic(TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
111        }
112    }
113
114    public static ActiveMQTopic getProducerAdvisoryTopic(Destination destination) throws JMSException {
115        return getProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
116    }
117
118    public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) {
119        if (destination.isQueue()) {
120            return new ActiveMQTopic(QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
121        } else {
122            return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
123        }
124    }
125
126    public static ActiveMQTopic getExpiredMessageTopic(Destination destination) throws JMSException {
127        return getExpiredMessageTopic(ActiveMQMessageTransformation.transformDestination(destination));
128    }
129
130    public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) {
131        if (destination.isQueue()) {
132            return getExpiredQueueMessageAdvisoryTopic(destination);
133        }
134        return getExpiredTopicMessageAdvisoryTopic(destination);
135    }
136
137    public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
138        String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
139        return new ActiveMQTopic(name);
140    }
141
142    public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(Destination destination) throws JMSException {
143        return getExpiredQueueMessageAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
144    }
145
146    public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) {
147        String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
148        return new ActiveMQTopic(name);
149    }
150
151    public static ActiveMQTopic getNoConsumersAdvisoryTopic(Destination destination) throws JMSException {
152        return getExpiredMessageTopic(ActiveMQMessageTransformation.transformDestination(destination));
153    }
154
155    public static ActiveMQTopic getNoConsumersAdvisoryTopic(ActiveMQDestination destination) {
156        if (destination.isQueue()) {
157            return getNoQueueConsumersAdvisoryTopic(destination);
158        }
159        return getNoTopicConsumersAdvisoryTopic(destination);
160    }
161
162    public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(Destination destination) throws JMSException {
163        return getNoTopicConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
164    }
165
166    public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) {
167        String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
168        return new ActiveMQTopic(name);
169    }
170
171    public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(Destination destination) throws JMSException {
172        return getNoQueueConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
173    }
174
175    public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) {
176        String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
177        return new ActiveMQTopic(name);
178    }
179
180    public static ActiveMQTopic getSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
181        return getSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
182    }
183
184    public static ActiveMQTopic getSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
185        String name = SLOW_CONSUMER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
186                + destination.getPhysicalName();
187        return new ActiveMQTopic(name);
188    }
189
190    public static ActiveMQTopic getFastProducerAdvisoryTopic(Destination destination) throws JMSException {
191        return getFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
192    }
193
194    public static ActiveMQTopic getFastProducerAdvisoryTopic(ActiveMQDestination destination) {
195        String name = FAST_PRODUCER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
196                + destination.getPhysicalName();
197        return new ActiveMQTopic(name);
198    }
199
200    public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException {
201        return getMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
202    }
203
204    public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
205        String name = MESSAGE_DISCAREDED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
206                + destination.getPhysicalName();
207        return new ActiveMQTopic(name);
208    }
209
210    public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException {
211        return getMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
212    }
213
214    public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
215        String name = MESSAGE_DELIVERED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
216                + destination.getPhysicalName();
217        return new ActiveMQTopic(name);
218    }
219
220    public static ActiveMQTopic getMessageConsumedAdvisoryTopic(Destination destination) throws JMSException {
221        return getMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
222    }
223
224    public static ActiveMQTopic getMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
225        String name = MESSAGE_CONSUMED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
226                + destination.getPhysicalName();
227        return new ActiveMQTopic(name);
228    }
229
230    public static ActiveMQTopic getMessageDLQdAdvisoryTopic(ActiveMQDestination destination) {
231        String name = MESSAGE_DLQ_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
232                + destination.getPhysicalName();
233        return new ActiveMQTopic(name);
234    }
235
236    public static ActiveMQTopic getMasterBrokerAdvisoryTopic() {
237        return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
238    }
239
240    public static ActiveMQTopic getNetworkBridgeAdvisoryTopic() {
241        return new ActiveMQTopic(NETWORK_BRIDGE_TOPIC_PREFIX);
242    }
243
244    public static ActiveMQTopic getFullAdvisoryTopic(Destination destination) throws JMSException {
245        return getFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
246    }
247
248    public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) {
249        String name = FULL_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
250                + destination.getPhysicalName();
251        return new ActiveMQTopic(name);
252    }
253
254    public static ActiveMQTopic getDestinationAdvisoryTopic(Destination destination) throws JMSException {
255        return getDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
256    }
257
258    public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) {
259        switch (destination.getDestinationType()) {
260            case ActiveMQDestination.QUEUE_TYPE:
261                return QUEUE_ADVISORY_TOPIC;
262            case ActiveMQDestination.TOPIC_TYPE:
263                return TOPIC_ADVISORY_TOPIC;
264            case ActiveMQDestination.TEMP_QUEUE_TYPE:
265                return TEMP_QUEUE_ADVISORY_TOPIC;
266            case ActiveMQDestination.TEMP_TOPIC_TYPE:
267                return TEMP_TOPIC_ADVISORY_TOPIC;
268            default:
269                throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
270        }
271    }
272
273    public static boolean isDestinationAdvisoryTopic(Destination destination) throws JMSException {
274        return isDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
275    }
276
277    public static boolean isTempDestinationAdvisoryTopic(ActiveMQDestination destination) {
278        if (destination.isComposite()) {
279            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
280            for (int i = 0; i < compositeDestinations.length; i++) {
281                if (!isTempDestinationAdvisoryTopic(compositeDestinations[i])) {
282                    return false;
283                }
284            }
285            return true;
286        } else {
287            return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC);
288        }
289    }
290
291    public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) {
292        if (destination.isComposite()) {
293            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
294            for (int i = 0; i < compositeDestinations.length; i++) {
295                if (isDestinationAdvisoryTopic(compositeDestinations[i])) {
296                    return true;
297                }
298            }
299            return false;
300        } else {
301            return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC)
302                    || destination.equals(QUEUE_ADVISORY_TOPIC) || destination.equals(TOPIC_ADVISORY_TOPIC);
303        }
304    }
305
306    public static boolean isAdvisoryTopic(Destination destination) throws JMSException {
307        return isAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
308    }
309
310    public static boolean isAdvisoryTopic(ActiveMQDestination destination) {
311        if (destination != null) {
312            if (destination.isComposite()) {
313                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
314                for (int i = 0; i < compositeDestinations.length; i++) {
315                    if (isAdvisoryTopic(compositeDestinations[i])) {
316                        return true;
317                    }
318                }
319                return false;
320            } else {
321                return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX);
322            }
323        }
324        return false;
325    }
326
327    public static boolean isConnectionAdvisoryTopic(Destination destination) throws JMSException {
328        return isConnectionAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
329    }
330
331    public static boolean isConnectionAdvisoryTopic(ActiveMQDestination destination) {
332        if (destination.isComposite()) {
333            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
334            for (int i = 0; i < compositeDestinations.length; i++) {
335                if (isConnectionAdvisoryTopic(compositeDestinations[i])) {
336                    return true;
337                }
338            }
339            return false;
340        } else {
341            return destination.equals(CONNECTION_ADVISORY_TOPIC);
342        }
343    }
344
345    public static boolean isProducerAdvisoryTopic(Destination destination) throws JMSException {
346        return isProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
347    }
348
349    public static boolean isProducerAdvisoryTopic(ActiveMQDestination destination) {
350        if (destination.isComposite()) {
351            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
352            for (int i = 0; i < compositeDestinations.length; i++) {
353                if (isProducerAdvisoryTopic(compositeDestinations[i])) {
354                    return true;
355                }
356            }
357            return false;
358        } else {
359            return destination.isTopic() && destination.getPhysicalName().startsWith(PRODUCER_ADVISORY_TOPIC_PREFIX);
360        }
361    }
362
363    public static boolean isConsumerAdvisoryTopic(Destination destination) throws JMSException {
364        return isConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
365    }
366
367    public static boolean isConsumerAdvisoryTopic(ActiveMQDestination destination) {
368        if (destination.isComposite()) {
369            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
370            for (int i = 0; i < compositeDestinations.length; i++) {
371                if (isConsumerAdvisoryTopic(compositeDestinations[i])) {
372                    return true;
373                }
374            }
375            return false;
376        } else {
377            return destination.isTopic() && destination.getPhysicalName().startsWith(CONSUMER_ADVISORY_TOPIC_PREFIX);
378        }
379    }
380
381    public static boolean isSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
382        return isSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
383    }
384
385    public static boolean isSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
386        if (destination.isComposite()) {
387            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
388            for (int i = 0; i < compositeDestinations.length; i++) {
389                if (isSlowConsumerAdvisoryTopic(compositeDestinations[i])) {
390                    return true;
391                }
392            }
393            return false;
394        } else {
395            return destination.isTopic() && destination.getPhysicalName().startsWith(SLOW_CONSUMER_TOPIC_PREFIX);
396        }
397    }
398
399    public static boolean isFastProducerAdvisoryTopic(Destination destination) throws JMSException {
400        return isFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
401    }
402
403    public static boolean isFastProducerAdvisoryTopic(ActiveMQDestination destination) {
404        if (destination.isComposite()) {
405            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
406            for (int i = 0; i < compositeDestinations.length; i++) {
407                if (isFastProducerAdvisoryTopic(compositeDestinations[i])) {
408                    return true;
409                }
410            }
411            return false;
412        } else {
413            return destination.isTopic() && destination.getPhysicalName().startsWith(FAST_PRODUCER_TOPIC_PREFIX);
414        }
415    }
416
417    public static boolean isMessageConsumedAdvisoryTopic(Destination destination) throws JMSException {
418        return isMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
419    }
420
421    public static boolean isMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
422        if (destination.isComposite()) {
423            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
424            for (int i = 0; i < compositeDestinations.length; i++) {
425                if (isMessageConsumedAdvisoryTopic(compositeDestinations[i])) {
426                    return true;
427                }
428            }
429            return false;
430        } else {
431            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_CONSUMED_TOPIC_PREFIX);
432        }
433    }
434
435    public static boolean isMasterBrokerAdvisoryTopic(Destination destination) throws JMSException {
436        return isMasterBrokerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
437    }
438
439    public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) {
440        if (destination.isComposite()) {
441            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
442            for (int i = 0; i < compositeDestinations.length; i++) {
443                if (isMasterBrokerAdvisoryTopic(compositeDestinations[i])) {
444                    return true;
445                }
446            }
447            return false;
448        } else {
449            return destination.isTopic() && destination.getPhysicalName().startsWith(MASTER_BROKER_TOPIC_PREFIX);
450        }
451    }
452
453    public static boolean isMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException {
454        return isMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
455    }
456
457    public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
458        if (destination.isComposite()) {
459            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
460            for (int i = 0; i < compositeDestinations.length; i++) {
461                if (isMessageDeliveredAdvisoryTopic(compositeDestinations[i])) {
462                    return true;
463                }
464            }
465            return false;
466        } else {
467            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DELIVERED_TOPIC_PREFIX);
468        }
469    }
470
471    public static boolean isMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException {
472        return isMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
473    }
474
475    public static boolean isMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
476        if (destination.isComposite()) {
477            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
478            for (int i = 0; i < compositeDestinations.length; i++) {
479                if (isMessageDiscardedAdvisoryTopic(compositeDestinations[i])) {
480                    return true;
481                }
482            }
483            return false;
484        } else {
485            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DISCAREDED_TOPIC_PREFIX);
486        }
487    }
488
489    public static boolean isFullAdvisoryTopic(Destination destination) throws JMSException {
490        return isFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
491    }
492
493    public static boolean isFullAdvisoryTopic(ActiveMQDestination destination) {
494        if (destination.isComposite()) {
495            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
496            for (int i = 0; i < compositeDestinations.length; i++) {
497                if (isFullAdvisoryTopic(compositeDestinations[i])) {
498                    return true;
499                }
500            }
501            return false;
502        } else {
503            return destination.isTopic() && destination.getPhysicalName().startsWith(FULL_TOPIC_PREFIX);
504        }
505    }
506
507    public static boolean isNetworkBridgeAdvisoryTopic(Destination destination) throws JMSException {
508        return isNetworkBridgeAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
509    }
510
511    public static boolean isNetworkBridgeAdvisoryTopic(ActiveMQDestination destination) {
512        if (destination.isComposite()) {
513            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
514            for (int i = 0; i < compositeDestinations.length; i++) {
515                if (isNetworkBridgeAdvisoryTopic(compositeDestinations[i])) {
516                    return true;
517                }
518            }
519            return false;
520        } else {
521            return destination.isTopic() && destination.getPhysicalName().startsWith(NETWORK_BRIDGE_TOPIC_PREFIX);
522        }
523    }
524
525    /**
526     * Returns the agent topic which is used to send commands to the broker
527     */
528    public static Destination getAgentDestination() {
529        return AGENT_TOPIC_DESTINATION;
530    }
531}