001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadaptor;
018
019import java.io.IOException;
020import java.util.HashSet;
021import java.util.Iterator;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import java.util.Map.Entry;
026import java.util.concurrent.ConcurrentHashMap;
027
028import org.apache.activemq.broker.ConnectionContext;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.Message;
031import org.apache.activemq.command.MessageAck;
032import org.apache.activemq.command.MessageId;
033import org.apache.activemq.command.SubscriptionInfo;
034import org.apache.activemq.kaha.ListContainer;
035import org.apache.activemq.kaha.MapContainer;
036import org.apache.activemq.kaha.Marshaller;
037import org.apache.activemq.kaha.Store;
038import org.apache.activemq.kaha.StoreEntry;
039import org.apache.activemq.store.MessageRecoveryListener;
040import org.apache.activemq.store.TopicReferenceStore;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore {
045    private static final Logger LOG = LoggerFactory.getLogger(KahaTopicReferenceStore.class);
046    protected ListContainer<TopicSubAck> ackContainer;
047    protected Map<String, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<String, TopicSubContainer>();
048    private MapContainer<String, SubscriptionInfo> subscriberContainer;
049    private Store store;
050    private static final String TOPIC_SUB_NAME = "tsn";
051
052    public KahaTopicReferenceStore(Store store, KahaReferenceStoreAdapter adapter,
053                                   MapContainer<MessageId, ReferenceRecord> messageContainer, ListContainer<TopicSubAck> ackContainer,
054                                   MapContainer<String, SubscriptionInfo> subsContainer, ActiveMQDestination destination)
055        throws IOException {
056        super(adapter, messageContainer, destination);
057        this.store = store;
058        this.ackContainer = ackContainer;
059        subscriberContainer = subsContainer;
060        // load all the Ack containers
061        for (Iterator<SubscriptionInfo> i = subscriberContainer.values().iterator(); i.hasNext();) {
062            SubscriptionInfo info = i.next();
063            addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
064        }
065    }
066
067    public void dispose(ConnectionContext context) {
068        super.dispose(context);
069        subscriberContainer.delete();
070    }
071
072    protected MessageId getMessageId(Object object) {
073        return new MessageId(((ReferenceRecord)object).getMessageId());
074    }
075
076    public void addMessage(ConnectionContext context, Message message) throws IOException {
077        throw new RuntimeException("Use addMessageReference instead");
078    }
079
080    public Message getMessage(MessageId identity) throws IOException {
081        throw new RuntimeException("Use addMessageReference instead");
082    }
083
084    public boolean addMessageReference(final ConnectionContext context, final MessageId messageId,
085                                    final ReferenceData data) {
086        boolean uniqueReferenceAdded = false;
087        lock.lock();
088        try {
089            final ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
090            final int subscriberCount = subscriberMessages.size();
091            if (subscriberCount > 0 && !isDuplicate(messageId)) {
092                final StoreEntry messageEntry = messageContainer.place(messageId, record);
093                addInterest(record);
094                uniqueReferenceAdded = true;
095                final TopicSubAck tsa = new TopicSubAck();
096                tsa.setCount(subscriberCount);
097                tsa.setMessageEntry(messageEntry);
098                final StoreEntry ackEntry = ackContainer.placeLast(tsa);
099                for (final Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
100                    final TopicSubContainer container = i.next();
101                    final ConsumerMessageRef ref = new ConsumerMessageRef();
102                    ref.setAckEntry(ackEntry);
103                    ref.setMessageEntry(messageEntry);
104                    ref.setMessageId(messageId);
105                    container.add(ref);
106                }
107                if (LOG.isTraceEnabled()) {
108                    LOG.trace(destination.getPhysicalName() + " add reference: " + messageId);
109                }
110            } else {
111                if (LOG.isTraceEnabled()) {
112                    LOG.trace("no subscribers or duplicate add for: "  + messageId);
113                }
114            }
115        } finally {
116            lock.unlock();
117        }
118        return uniqueReferenceAdded;
119    }
120
121    public ReferenceData getMessageReference(final MessageId identity) throws IOException {
122        final ReferenceRecord result = messageContainer.get(identity);
123        if (result == null) {
124            return null;
125        }
126        return result.getData();
127    }
128
129    public void addReferenceFileIdsInUse() {
130        for (StoreEntry entry = ackContainer.getFirst(); entry != null; entry = ackContainer.getNext(entry)) {
131            TopicSubAck subAck = ackContainer.get(entry);
132            if (subAck.getCount() > 0) {
133                ReferenceRecord rr = messageContainer.getValue(subAck.getMessageEntry());
134                addInterest(rr);
135            }
136        }
137    }
138
139    
140    protected MapContainer addSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
141        String containerName = getSubscriptionContainerName(getSubscriptionKey(clientId, subscriptionName));
142        MapContainer container = store.getMapContainer(containerName,containerName);
143        container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
144        Marshaller marshaller = new ConsumerMessageRefMarshaller();
145        container.setValueMarshaller(marshaller);
146        TopicSubContainer tsc = new TopicSubContainer(container);
147        subscriberMessages.put(getSubscriptionKey(clientId, subscriptionName), tsc);
148        return container;
149    }
150
151    public boolean acknowledgeReference(ConnectionContext context,
152            String clientId, String subscriptionName, MessageId messageId)
153            throws IOException {
154        boolean removeMessage = false;
155        lock.lock();
156            try {
157            String key = getSubscriptionKey(clientId, subscriptionName);
158    
159            TopicSubContainer container = subscriberMessages.get(key);
160            if (container != null) {
161                ConsumerMessageRef ref = null;
162                if((ref = container.remove(messageId)) != null) {
163                    StoreEntry entry = ref.getAckEntry();
164                    //ensure we get up to-date pointers
165                    entry = ackContainer.refresh(entry);
166                    TopicSubAck tsa = ackContainer.get(entry);
167                    if (tsa != null) {
168                        if (tsa.decrementCount() <= 0) {
169                            ackContainer.remove(entry);
170                            ReferenceRecord rr = messageContainer.get(messageId);
171                            if (rr != null) {
172                                entry = tsa.getMessageEntry();
173                                entry = messageContainer.refresh(entry);
174                                messageContainer.remove(entry);
175                                removeInterest(rr);
176                                removeMessage = true;
177                                dispatchAudit.isDuplicate(messageId);
178                            }
179                        }else {
180                            ackContainer.update(entry,tsa);
181                        }
182                    }
183                    if (LOG.isTraceEnabled()) {
184                        LOG.trace(destination.getPhysicalName() + " remove: " + messageId);
185                    }
186                }else{
187                    if (ackContainer.isEmpty() || subscriberMessages.size() == 1 || isUnreferencedBySubscribers(key, subscriberMessages, messageId)) {
188                        // no message reference held        
189                        removeMessage = true;
190                        // ensure we don't later add a reference
191                        dispatchAudit.isDuplicate(messageId);
192                        if (LOG.isDebugEnabled()) {
193                            LOG.debug(destination.getPhysicalName() + " remove with no outstanding reference (ack before add): " + messageId);
194                        }
195                    }
196                }
197            }
198        }finally {
199            lock.unlock();
200        }
201        return removeMessage;
202    }
203    
204    // verify that no subscriber has a reference to this message. In the case where the subscribers
205    // references are persisted but more than the persisted consumers get the message, the ack from the non
206    // persisted consumer would remove the message in error
207    //
208    // see: https://issues.apache.org/activemq/browse/AMQ-2123
209    private boolean isUnreferencedBySubscribers(
210            String key, Map<String, TopicSubContainer> subscriberContainers, MessageId messageId) {
211        boolean isUnreferenced = true;
212        for (Entry<String, TopicSubContainer> entry : subscriberContainers.entrySet()) {
213            if (!key.equals(entry.getKey()) && !entry.getValue().isEmpty()) {
214                TopicSubContainer container = entry.getValue();
215                for (Iterator i = container.iterator(); i.hasNext();) {
216                    ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
217                    if (messageId.equals(ref.getMessageId())) {
218                        isUnreferenced = false;
219                        break;
220                    }
221                }
222            }
223        }
224        return isUnreferenced; 
225    }
226
227    public void acknowledge(ConnectionContext context,
228                        String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
229            acknowledgeReference(context, clientId, subscriptionName, messageId);
230        }
231
232    public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
233        String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
234        lock.lock();
235        try {
236            // if already exists - won't add it again as it causes data files
237            // to hang around
238            if (!subscriberContainer.containsKey(key)) {
239                subscriberContainer.put(key, info);
240                adapter.addSubscriberState(info);
241            }
242            // add the subscriber
243            addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
244            if (retroactive) {
245                /*
246                 * for(StoreEntry
247                 * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
248                 * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
249                 * ConsumerMessageRef ref=new ConsumerMessageRef();
250                 * ref.setAckEntry(entry);
251                 * ref.setMessageEntry(tsa.getMessageEntry()); container.add(ref); }
252                 */
253            }
254        }finally {
255            lock.unlock();
256        }
257    }
258
259    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
260        lock.lock();
261        try {
262            SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
263            if (info != null) {
264                adapter.removeSubscriberState(info);
265            }
266        removeSubscriberMessageContainer(clientId,subscriptionName);
267        }finally {
268            lock.unlock();
269        }
270    }
271
272    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
273        SubscriptionInfo[] result = subscriberContainer.values()
274            .toArray(new SubscriptionInfo[subscriberContainer.size()]);
275        return result;
276    }
277
278    public int getMessageCount(String clientId, String subscriberName) throws IOException {
279        String key = getSubscriptionKey(clientId, subscriberName);
280        TopicSubContainer container = subscriberMessages.get(key);
281        return container != null ? container.size() : 0;
282    }
283
284    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
285        return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
286    }
287
288    public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
289                                                 MessageRecoveryListener listener) throws Exception {
290        String key = getSubscriptionKey(clientId, subscriptionName);
291        lock.lock();
292        try {
293            TopicSubContainer container = subscriberMessages.get(key);
294            if (container != null) {
295                int count = 0;
296                StoreEntry entry = container.getBatchEntry();
297                if (entry == null) {
298                    entry = container.getEntry();
299                } else {
300                    entry = container.refreshEntry(entry);
301                    if (entry != null) {
302                        entry = container.getNextEntry(entry);
303                    }
304                }
305               
306                if (entry != null) {
307                    do {
308                        ConsumerMessageRef consumerRef = container.get(entry);
309                        ReferenceRecord msg = messageContainer.getValue(consumerRef
310                                .getMessageEntry());
311                        if (msg != null) {
312                            if (recoverReference(listener, msg)) {
313                                count++;
314                                container.setBatchEntry(msg.getMessageId(), entry);
315                            }
316                        } else {
317                            container.reset();
318                        }
319    
320                        entry = container.getNextEntry(entry);
321                    } while (entry != null && count < maxReturned && listener.hasSpace());
322                }
323            }
324        }finally {
325            lock.unlock();
326        }
327    }
328
329    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
330        throws Exception {
331        String key = getSubscriptionKey(clientId, subscriptionName);
332        TopicSubContainer container = subscriberMessages.get(key);
333        if (container != null) {
334            for (Iterator i = container.iterator(); i.hasNext();) {
335                ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
336                ReferenceRecord msg = messageContainer.getValue(ref.getMessageEntry());
337                if (msg != null) {
338                    if (!recoverReference(listener, msg)) {
339                        break;
340                    }
341                }
342            }
343        }
344    }
345
346    public void resetBatching(String clientId, String subscriptionName) {
347        lock.lock();
348        try {
349            String key = getSubscriptionKey(clientId, subscriptionName);
350            TopicSubContainer topicSubContainer = subscriberMessages.get(key);
351            if (topicSubContainer != null) {
352                topicSubContainer.reset();
353            }
354        }finally {
355            lock.unlock();
356        }
357    }
358    
359    public void removeAllMessages(ConnectionContext context) throws IOException {
360        lock.lock();
361        try {
362            Set<String> tmpSet = new HashSet<String>(subscriberContainer.keySet());
363            for (String key:tmpSet) {
364                TopicSubContainer container = subscriberMessages.get(key);
365                if (container != null) {
366                    container.clear();
367                }
368            }
369            ackContainer.clear();
370        }finally {
371            lock.unlock();
372        }
373        super.removeAllMessages(context);
374    }
375
376    protected void removeSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
377        String subscriberKey = getSubscriptionKey(clientId, subscriptionName);
378        String containerName = getSubscriptionContainerName(subscriberKey);
379        subscriberContainer.remove(subscriberKey);
380        TopicSubContainer container = subscriberMessages.remove(subscriberKey);
381        if (container != null) {
382            for (Iterator i = container.iterator(); i.hasNext();) {
383                ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
384                if (ref != null) {
385                    TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
386                    if (tsa != null) {
387                        if (tsa.decrementCount() <= 0) {
388                            ackContainer.remove(ref.getAckEntry());
389                            messageContainer.remove(tsa.getMessageEntry());
390                        } else {
391                            ackContainer.update(ref.getAckEntry(), tsa);
392                        }
393                    }
394                }
395            }
396        }
397        store.deleteMapContainer(containerName,containerName);
398    }
399
400    protected String getSubscriptionKey(String clientId, String subscriberName) {
401        StringBuffer buffer = new StringBuffer();
402        buffer.append(clientId).append(":");  
403        String name = subscriberName != null ? subscriberName : "NOT_SET";
404        return buffer.append(name).toString();
405    }
406    
407    private String getSubscriptionContainerName(String subscriptionKey) {
408        StringBuffer result = new StringBuffer(TOPIC_SUB_NAME);
409        result.append(destination.getQualifiedName());
410        result.append(subscriptionKey);
411        return result.toString();
412    }
413}