/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region.cursors;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;

/**
 * An abstraction that keeps the correct order of messages that need to be dispatched
 * to consumers, but also hides the fact that there might be redelivered messages that
 * should be dispatched ahead of any other paged in messages.
 *
 * Direct usage of this class is recommended as you can control when redeliveries need
 * to be added vs regular pending messages (the next set of messages that can be dispatched)
 *
 * Internally two lists are kept: one for paged-in messages and one for redelivered
 * messages. Size, emptiness, lookup and removal operate on both lists; the generic
 * {@code add*} methods only touch the paged-in list.
 *
 * Created by ceposta
 * <a href="http://christianposta.com/blog">http://christianposta.com/blog</a>.
 */
public class QueueDispatchPendingList implements PendingList {

    private PendingList pagedInPendingDispatch = new OrderedPendingList();
    private PendingList redeliveredWaitingDispatch = new OrderedPendingList();
    private boolean prioritized = false;

    /**
     * @return true only when neither the paged-in list nor the redelivered list holds a message.
     */
    @Override
    public boolean isEmpty() {
        return pagedInPendingDispatch.isEmpty() && redeliveredWaitingDispatch.isEmpty();
    }

    /**
     * Clears both the paged-in list and the redelivered list.
     */
    @Override
    public void clear() {
        pagedInPendingDispatch.clear();
        redeliveredWaitingDispatch.clear();
    }

    /**
     * Messages added are added directly to the pagedInPendingDispatch set of messages. If
     * you're trying to add a message that is marked redelivered add it using the
     * {@link #addForRedelivery(List, boolean)} method.
     *
     * @param message
     *      The MessageReference that is to be added to this list.
     *
     * @return the pending node.
     */
    @Override
    public PendingNode addMessageFirst(MessageReference message) {
        return pagedInPendingDispatch.addMessageFirst(message);
    }

    /**
     * Messages added are added directly to the pagedInPendingDispatch set of messages. If
     * you're trying to add a message that is marked redelivered add it using the
     * {@link #addForRedelivery(List, boolean)} method.
     *
     * @param message
     *      The MessageReference that is to be added to this list.
     *
     * @return the pending node.
     */
    @Override
    public PendingNode addMessageLast(MessageReference message) {
        return pagedInPendingDispatch.addMessageLast(message);
    }

    /**
     * Removes the message from whichever internal list contains it. The paged-in list
     * is consulted first, then the redelivered list.
     *
     * @param message
     *      The MessageReference to remove.
     *
     * @return the value returned by the owning list's {@code remove}, or null when
     *         neither list contains the message.
     */
    @Override
    public PendingNode remove(MessageReference message) {
        if (pagedInPendingDispatch.contains(message)) {
            return pagedInPendingDispatch.remove(message);
        } else if (redeliveredWaitingDispatch.contains(message)) {
            return redeliveredWaitingDispatch.remove(message);
        }
        return null;
    }

    /**
     * @return the combined number of entries in the paged-in and redelivered lists.
     */
    @Override
    public int size() {
        return pagedInPendingDispatch.size() + redeliveredWaitingDispatch.size();
    }

    /**
     * @return the sum of {@code messageSize()} of the paged-in and redelivered lists.
     */
    @Override
    public long messageSize() {
        return pagedInPendingDispatch.messageSize() + redeliveredWaitingDispatch.messageSize();
    }

    /**
     * Iterates over all pending messages.
     *
     * When prioritization is enabled and there are redeliveries, both lists are first
     * copied into a temporary {@link PrioritizedPendingList} and that copy is iterated;
     * removal through the iterator is forwarded to {@link #remove(MessageReference)}.
     * Otherwise the redelivered messages are iterated first, followed by the paged-in
     * messages, and removal is forwarded to the underlying iterator that produced the
     * element.
     *
     * @return an iterator over the redelivered and paged-in messages.
     */
    @Override
    public Iterator<MessageReference> iterator() {
        if (prioritized && hasRedeliveries()) {
            final QueueDispatchPendingList delegate = this;
            final PrioritizedPendingList priorityOrderedRedeliveredAndPending = new PrioritizedPendingList();
            priorityOrderedRedeliveredAndPending.addAll(redeliveredWaitingDispatch);
            priorityOrderedRedeliveredAndPending.addAll(pagedInPendingDispatch);

            return new Iterator<MessageReference>() {

                private final Iterator<MessageReference> combinedIterator =
                        priorityOrderedRedeliveredAndPending.iterator();
                private MessageReference current = null;

                @Override
                public boolean hasNext() {
                    return combinedIterator.hasNext();
                }

                @Override
                public MessageReference next() {
                    current = combinedIterator.next();
                    return current;
                }

                @Override
                public void remove() {
                    // The combined list is only a snapshot, so remove from the real lists.
                    if (current != null) {
                        delegate.remove(current);
                    }
                }
            };

        } else {

            return new Iterator<MessageReference>() {

                private final Iterator<MessageReference> redeliveries = redeliveredWaitingDispatch.iterator();
                private final Iterator<MessageReference> pendingDispatch = pagedInPendingDispatch.iterator();
                private Iterator<MessageReference> current = redeliveries;
                // Iterator that produced the element most recently returned by next();
                // remove() must target this one even if hasNext() has since moved on.
                private Iterator<MessageReference> lastReturnedFrom = null;

                /** Switches to the paged-in iterator once the redeliveries are exhausted. */
                private void advanceIfExhausted() {
                    if (current == redeliveries && !redeliveries.hasNext()) {
                        current = pendingDispatch;
                    }
                }

                @Override
                public boolean hasNext() {
                    advanceIfExhausted();
                    return current.hasNext();
                }

                @Override
                public MessageReference next() {
                    // Advance here too so next() works without a preceding hasNext().
                    advanceIfExhausted();
                    MessageReference ref = current.next();
                    lastReturnedFrom = current;
                    return ref;
                }

                @Override
                public void remove() {
                    if (lastReturnedFrom == null) {
                        throw new IllegalStateException("next() has not been called");
                    }
                    lastReturnedFrom.remove();
                }
            };
        }
    }

    /**
     * @param message
     *      The MessageReference to look for.
     *
     * @return true when either the paged-in list or the redelivered list contains the message.
     */
    @Override
    public boolean contains(MessageReference message) {
        return pagedInPendingDispatch.contains(message) || redeliveredWaitingDispatch.contains(message);
    }

    /**
     * @return a new list holding every message in the order produced by {@link #iterator()}.
     */
    @Override
    public Collection<MessageReference> values() {
        List<MessageReference> messageReferences = new ArrayList<MessageReference>();
        Iterator<MessageReference> iterator = iterator();
        while (iterator.hasNext()) {
            messageReferences.add(iterator.next());
        }
        return messageReferences;
    }

    /**
     * Adds all entries of the given list to the paged-in list.
     *
     * @param pendingList
     *      The PendingList whose entries are to be added.
     */
    @Override
    public void addAll(PendingList pendingList) {
        pagedInPendingDispatch.addAll(pendingList);
    }

    /**
     * Looks a message up by id, checking the paged-in list first and falling back to
     * the redelivered list.
     *
     * @param messageId
     *      The id of the message to find.
     *
     * @return the matching reference, or whatever the redelivered list returns when the
     *         paged-in list has no match.
     */
    @Override
    public MessageReference get(MessageId messageId) {
        MessageReference rc = pagedInPendingDispatch.get(messageId);
        if (rc == null) {
            return redeliveredWaitingDispatch.get(messageId);
        }
        return rc;
    }

    /**
     * Enables or disables priority ordering.
     *
     * When the mode actually changes, both internal lists are replaced with new, empty
     * lists of the matching type; existing entries are not carried over. Calling this
     * method with the mode that is already in effect leaves the lists untouched.
     *
     * @param prioritizedMessages
     *      true to use {@link PrioritizedPendingList}s, false to use {@link OrderedPendingList}s.
     */
    public void setPrioritizedMessages(boolean prioritizedMessages) {
        prioritized = prioritizedMessages;
        if (prioritizedMessages) {
            if (pagedInPendingDispatch instanceof OrderedPendingList) {
                pagedInPendingDispatch = new PrioritizedPendingList();
                redeliveredWaitingDispatch = new PrioritizedPendingList();
            }
        } else if (pagedInPendingDispatch instanceof PrioritizedPendingList) {
            // Only fall back to ordered lists when prioritization is being turned off;
            // otherwise a repeated setPrioritizedMessages(true) would undo itself.
            pagedInPendingDispatch = new OrderedPendingList();
            redeliveredWaitingDispatch = new OrderedPendingList();
        }
    }

    /**
     * @return true when the redelivered list is not empty.
     */
    public boolean hasRedeliveries() {
        return !redeliveredWaitingDispatch.isEmpty();
    }

    /**
     * Adds messages to the redelivered list.
     *
     * When {@code noConsumers} is set, the redelivered list is an
     * {@link OrderedPendingList} and {@link #willBeInOrder(List)} holds, the messages
     * are inserted at the head of the redelivered list; otherwise each message is
     * appended in list order. A null or empty list is a no-op.
     *
     * @param list
     *      The messages to queue for redelivery; may be null or empty.
     * @param noConsumers
     *      Whether head insertion may be attempted.
     */
    public void addForRedelivery(List<MessageReference> list, boolean noConsumers) {
        if (list == null || list.isEmpty()) {
            return;
        }
        if (noConsumers && redeliveredWaitingDispatch instanceof OrderedPendingList && willBeInOrder(list)) {
            // a single consumer can expect repeatable redelivery order irrespective
            // of transaction or prefetch boundaries
            ((OrderedPendingList) redeliveredWaitingDispatch).insertAtHead(list);
        } else {
            for (MessageReference ref : list) {
                redeliveredWaitingDispatch.addMessageLast(ref);
            }
        }
    }

    /**
     * Checks whether inserting the list ahead of the current redeliveries keeps broker
     * sequence order: the redelivered list must be non-empty, the given list must be
     * non-empty, and the first redelivered message must have a larger broker sequence
     * id than the last message of the given list.
     */
    private boolean willBeInOrder(List<MessageReference> list) {
        // for a single consumer inserting at head will be in order w.r.t brokerSequence but
        // will not be if there were multiple consumers in the mix even if this is the last
        // consumer to close (noConsumers==true)
        return !redeliveredWaitingDispatch.isEmpty() && list != null && !list.isEmpty() &&
            redeliveredWaitingDispatch.iterator().next().getMessageId().getBrokerSequenceId() > list.get(list.size() - 1).getMessageId().getBrokerSequenceId();
    }
}