001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.policy;
018
019import java.util.ArrayList;
020import java.util.Iterator;
021import java.util.List;
022
023import org.apache.activemq.broker.region.MessageReference;
024import org.apache.activemq.broker.region.Subscription;
025import org.apache.activemq.command.ConsumerId;
026import org.apache.activemq.command.ConsumerInfo;
027import org.apache.activemq.filter.MessageEvaluationContext;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * dispatch policy that ignores lower priority duplicate network consumers,
033 * used in conjunction with network bridge suppresDuplicateTopicSubscriptions
034 * 
035 * @org.apache.xbean.XBean
036 */
037public class PriorityNetworkDispatchPolicy extends SimpleDispatchPolicy {
038
039    private static final Logger LOG = LoggerFactory.getLogger(PriorityNetworkDispatchPolicy.class);
040    @Override
041    public boolean dispatch(MessageReference node,
042            MessageEvaluationContext msgContext,
043            List<Subscription> consumers) throws Exception {
044        
045        List<Subscription> duplicateFreeSubs = new ArrayList<Subscription>();
046        synchronized (consumers) {
047            for (Subscription sub: consumers) {
048                ConsumerInfo info = sub.getConsumerInfo();
049                if (info.isNetworkSubscription()) {    
050                    boolean highestPrioritySub = true;
051                    for (Iterator<Subscription> it =  duplicateFreeSubs.iterator(); it.hasNext(); ) {
052                        Subscription candidate = it.next();
053                        if (matches(candidate, info)) {
054                            if (hasLowerPriority(candidate, info)) {
055                                it.remove();
056                            } else {
057                                // higher priority matching sub exists
058                                highestPrioritySub = false;
059                                if (LOG.isDebugEnabled()) {
060                                LOG.debug("ignoring lower priority: " + candidate 
061                                        + "[" +candidate.getConsumerInfo().getNetworkConsumerIds() +", "
062                                        + candidate.getConsumerInfo().getNetworkConsumerIds() +"] in favour of: " 
063                                        + sub
064                                        + "[" +sub.getConsumerInfo().getNetworkConsumerIds() +", "
065                                        + sub.getConsumerInfo().getNetworkConsumerIds() +"]");
066                                }
067                            }
068                        }
069                    }
070                    if (highestPrioritySub) {
071                        duplicateFreeSubs.add(sub);
072                    } 
073                } else {
074                    duplicateFreeSubs.add(sub);
075                }
076            }
077        }
078        
079        return super.dispatch(node, msgContext, duplicateFreeSubs);
080    }
081
082    private boolean hasLowerPriority(Subscription candidate,
083            ConsumerInfo info) {
084       return candidate.getConsumerInfo().getPriority() < info.getPriority();
085    }
086
087    private boolean matches(Subscription candidate, ConsumerInfo info) {
088        boolean matched = false;
089        for (ConsumerId candidateId: candidate.getConsumerInfo().getNetworkConsumerIds()) {
090            for (ConsumerId subId: info.getNetworkConsumerIds()) {
091                if (candidateId.equals(subId)) {
092                    matched = true;
093                    break;
094                }
095            }
096        }
097        return matched;
098    }
099
100}