/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region.cursors;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.SystemUsage;

030/**
031 * Interface to pending message (messages awaiting disptach to a consumer)
032 * cursor
033 * 
034 * 
035 */
036public interface PendingMessageCursor extends Service {
037
038    /**
039     * Add a destination
040     * 
041     * @param context
042     * @param destination
043     * @throws Exception
044     */
045    void add(ConnectionContext context, Destination destination) throws Exception;
046
047    /**
048     * remove a destination
049     * 
050     * @param context
051     * @param destination
052     * @throws Exception
053     */
054    List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception;
055
056    /**
057     * @return true if there are no pending messages
058     */
059    boolean isEmpty();
060
061    /**
062     * check if a Destination is Empty for this cursor
063     * 
064     * @param destination
065     * @return true id the Destination is empty
066     */
067    boolean isEmpty(Destination destination);
068
069    /**
070     * reset the cursor
071     */
072    void reset();
073
074    /**
075     * hint to the cursor to release any locks it might have grabbed after a
076     * reset
077     */
078    void release();
079
080    /**
081     * add message to await dispatch
082     * 
083     * @param node
084     * @throws IOException
085     * @throws Exception
086     */
087    void addMessageLast(MessageReference node) throws Exception;
088    /**
089     * add message to await dispatch - if it can
090     * 
091     * @param node
092     * @param maxWaitTime 
093     * @return true if successful
094     * @throws IOException
095     * @throws Exception
096     */
097    boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception;
098
099    /**
100     * add message to await dispatch
101     * 
102     * @param node
103     * @throws Exception
104     */
105    void addMessageFirst(MessageReference node) throws Exception;
106
107    /**
108     * Add a message recovered from a retroactive policy
109     * 
110     * @param node
111     * @throws Exception
112     */
113    void addRecoveredMessage(MessageReference node) throws Exception;
114
115    /**
116     * @return true if there pending messages to dispatch
117     */
118    boolean hasNext();
119
120    /**
121     * @return the next pending message with its reference count increment
122     */
123    MessageReference next();
124
125    /**
126     * remove the message at the cursor position
127     */
128    void remove();
129
130    /**
131     * @return the number of pending messages
132     */
133    int size();
134
135    /**
136     * clear all pending messages
137     */
138    void clear();
139
140    /**
141     * Informs the Broker if the subscription needs to intervention to recover
142     * it's state e.g. DurableTopicSubscriber may do
143     * 
144     * @return true if recovery required
145     */
146    boolean isRecoveryRequired();
147
148    /**
149     * @return the maximum batch size
150     */
151    int getMaxBatchSize();
152
153    /**
154     * Set the max batch size
155     * 
156     * @param maxBatchSize
157     */
158    void setMaxBatchSize(int maxBatchSize);
159
160    /**
161     * Give the cursor a hint that we are about to remove messages from memory
162     * only
163     */
164    void resetForGC();
165
166    /**
167     * remove a node
168     * 
169     * @param node
170     */
171    void remove(MessageReference node);
172
173    /**
174     * free up any internal buffers
175     */
176    void gc();
177
178    /**
179     * Set the UsageManager
180     * 
181     * @param systemUsage
182     * @see org.apache.activemq.usage.SystemUsage
183     */
184    void setSystemUsage(SystemUsage systemUsage);
185
186    /**
187     * @return the usageManager
188     */
189    SystemUsage getSystemUsage();
190
191    /**
192     * @return the memoryUsageHighWaterMark
193     */
194    int getMemoryUsageHighWaterMark();
195
196    /**
197     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
198     */
199    void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark);
200
201    /**
202     * @return true if the cursor is full
203     */
204    boolean isFull();
205    
206    /**
207     * @return true if the cursor has space to page messages into
208     */
209    public boolean hasSpace();
210
211    /**
212     * @return true if the cursor has buffered messages ready to deliver
213     */
214    boolean hasMessagesBufferedToDeliver();
215
216    /**
217     * destroy the cursor
218     * 
219     * @throws Exception
220     */
221    void destroy() throws Exception;
222
223    /**
224     * Page in a restricted number of messages and increment the reference count
225     * 
226     * @param maxItems
227     * @return a list of paged in messages
228     */
229    LinkedList<MessageReference> pageInList(int maxItems);
230    
231    /**
232     * set the maximum number of producers to track at one time
233     * @param value
234     */
235    void setMaxProducersToAudit(int value);
236    
237    /**
238     * @return the maximum number of producers to audit
239     */
240    int getMaxProducersToAudit();
241    
242    /**
243     * Set the maximum depth of message ids to track
244     * @param depth 
245     */
246    void setMaxAuditDepth(int depth);
247    
248    /**
249     * @return the audit depth
250     */
251    int getMaxAuditDepth();
252    
253    /**
254     * @return the enableAudit
255     */
256    public boolean isEnableAudit();
257    /**
258     * @param enableAudit the enableAudit to set
259     */
260    public void setEnableAudit(boolean enableAudit);
261    
262    /**
263     * @return true if the underlying state of this cursor 
264     * disappears when the broker shuts down
265     */
266    public boolean isTransient();
267    
268    
269    /**
270     * set the audit
271     * @param audit
272     */
273    public void setMessageAudit(ActiveMQMessageAudit audit);
274    
275    
276    /**
277     * @return the audit - could be null
278     */
279    public ActiveMQMessageAudit getMessageAudit();
280    
281    /**
282     * use a cache to improve performance
283     * @param useCache
284     */
285    public void setUseCache(boolean useCache);
286    
287    /**
288     * @return true if a cache may be used
289     */
290    public boolean isUseCache();
291    
292    /**
293     * remove from auditing the message id
294     * @param id
295     */
296    public void rollback(MessageId id);
297
298    /**
299     * @return true if cache is being used
300     */
301    public boolean isCacheEnabled();
302   
303}