001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import org.apache.activemq.command.ConsumerId; 020import org.apache.activemq.command.Message; 021import org.apache.activemq.command.MessageId; 022 023/** 024 * Keeps track of a message that is flowing through the Broker. This object may 025 * hold a hard reference to the message or only hold the id of the message if 026 * the message has been persisted on in a MessageStore. 027 * 028 * 029 */ 030public class IndirectMessageReference implements QueueMessageReference { 031 032 /** The subscription that has locked the message */ 033 private LockOwner lockOwner; 034 /** Has the message been dropped? */ 035 private boolean dropped; 036 /** Has the message been acked? */ 037 private boolean acked; 038 /** Direct reference to the message */ 039 private final Message message; 040 private final MessageId messageId; 041 042 /** 043 * @param message 044 */ 045 public IndirectMessageReference(final Message message) { 046 this.message = message; 047 this.messageId = message.getMessageId().copy(); 048 message.getMessageId(); 049 message.getGroupID(); 050 message.getGroupSequence(); 051 } 052 053 @Override 054 public Message getMessageHardRef() { 055 return message; 056 } 057 058 @Override 059 public int getReferenceCount() { 060 return message.getReferenceCount(); 061 } 062 063 @Override 064 public int incrementReferenceCount() { 065 return message.incrementReferenceCount(); 066 } 067 068 @Override 069 public int decrementReferenceCount() { 070 return message.decrementReferenceCount(); 071 } 072 073 @Override 074 public Message getMessage() { 075 return message; 076 } 077 078 @Override 079 public String toString() { 080 return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" + acked + " locked=" + (lockOwner != null); 081 } 082 083 @Override 084 public void incrementRedeliveryCounter() { 085 message.incrementRedeliveryCounter(); 086 } 087 088 @Override 089 public synchronized boolean isDropped() { 090 return dropped; 091 } 092 093 @Override 094 public synchronized void drop() { 095 dropped = true; 096 lockOwner = null; 097 message.decrementReferenceCount(); 098 } 099 100 /** 101 * Check if the message has already been dropped before 102 * dropping. Return true if dropped, else false. 103 * This method exists so that this can be done atomically 104 * under the intrinisic lock 105 */ 106 @Override 107 public synchronized boolean dropIfLive() { 108 if (isDropped()) { 109 return false; 110 } else { 111 drop(); 112 return true; 113 } 114 } 115 116 @Override 117 public boolean lock(LockOwner subscription) { 118 synchronized (this) { 119 if (dropped || lockOwner != null) { 120 return false; 121 } 122 lockOwner = subscription; 123 return true; 124 } 125 } 126 127 @Override 128 public synchronized boolean unlock() { 129 boolean result = lockOwner != null; 130 lockOwner = null; 131 return result; 132 } 133 134 @Override 135 public synchronized LockOwner getLockOwner() { 136 return lockOwner; 137 } 138 139 @Override 140 public int getRedeliveryCounter() { 141 return message.getRedeliveryCounter(); 142 } 143 144 @Override 145 public MessageId getMessageId() { 146 return messageId; 147 } 148 149 @Override 150 public Message.MessageDestination getRegionDestination() { 151 return message.getRegionDestination(); 152 } 153 154 @Override 155 public boolean isPersistent() { 156 return message.isPersistent(); 157 } 158 159 public synchronized boolean isLocked() { 160 return lockOwner != null; 161 } 162 163 @Override 164 public synchronized boolean isAcked() { 165 return acked; 166 } 167 168 @Override 169 public synchronized void setAcked(boolean b) { 170 acked = b; 171 } 172 173 @Override 174 public String getGroupID() { 175 return message.getGroupID(); 176 } 177 178 @Override 179 public int getGroupSequence() { 180 return message.getGroupSequence(); 181 } 182 183 @Override 184 public ConsumerId getTargetConsumerId() { 185 return message.getTargetConsumerId(); 186 } 187 188 @Override 189 public long getExpiration() { 190 return message.getExpiration(); 191 } 192 193 @Override 194 public boolean isExpired() { 195 return message.isExpired(); 196 } 197 198 @Override 199 public synchronized int getSize() { 200 return message.getSize(); 201 } 202 203 @Override 204 public boolean isAdvisory() { 205 return message.isAdvisory(); 206 } 207 208 @Override 209 public boolean canProcessAsExpired() { 210 return message.canProcessAsExpired(); 211 } 212}