001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.File;
020import java.io.FileFilter;
021import java.io.IOException;
022import java.nio.charset.Charset;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.CopyOnWriteArrayList;
030
031import javax.transaction.xa.Xid;
032
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.BrokerServiceAware;
035import org.apache.activemq.broker.ConnectionContext;
036import org.apache.activemq.broker.Lockable;
037import org.apache.activemq.broker.LockableServiceSupport;
038import org.apache.activemq.broker.Locker;
039import org.apache.activemq.broker.scheduler.JobSchedulerStore;
040import org.apache.activemq.command.ActiveMQDestination;
041import org.apache.activemq.command.ActiveMQQueue;
042import org.apache.activemq.command.ActiveMQTopic;
043import org.apache.activemq.command.LocalTransactionId;
044import org.apache.activemq.command.ProducerId;
045import org.apache.activemq.command.TransactionId;
046import org.apache.activemq.command.XATransactionId;
047import org.apache.activemq.filter.AnyDestination;
048import org.apache.activemq.filter.DestinationMap;
049import org.apache.activemq.filter.DestinationMapEntry;
050import org.apache.activemq.store.MessageStore;
051import org.apache.activemq.store.NoLocalSubscriptionAware;
052import org.apache.activemq.store.PersistenceAdapter;
053import org.apache.activemq.store.SharedFileLocker;
054import org.apache.activemq.store.TopicMessageStore;
055import org.apache.activemq.store.TransactionIdTransformer;
056import org.apache.activemq.store.TransactionIdTransformerAware;
057import org.apache.activemq.store.TransactionStore;
058import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
059import org.apache.activemq.usage.SystemUsage;
060import org.apache.activemq.util.IOExceptionSupport;
061import org.apache.activemq.util.IOHelper;
062import org.apache.activemq.util.IntrospectionSupport;
063import org.apache.activemq.util.ServiceStopper;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067/**
068 * An implementation of {@link org.apache.activemq.store.PersistenceAdapter}  that supports
069 * distribution of destinations across multiple kahaDB persistence adapters
070 *
071 * @org.apache.xbean.XBean element="mKahaDB"
072 */
073public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter,
074    BrokerServiceAware, NoLocalSubscriptionAware {
075
076    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class);
077
078    final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
079    final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
080
081    final class DelegateDestinationMap extends DestinationMap {
082        @Override
083        public void setEntries(List<DestinationMapEntry>  entries) {
084            super.setEntries(entries);
085        }
086    };
087    final DelegateDestinationMap destinationMap = new DelegateDestinationMap();
088
089    List<PersistenceAdapter> adapters = new CopyOnWriteArrayList<PersistenceAdapter>();
090    private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
091
092    MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
093
094    // all local store transactions are XA, 2pc if more than one adapter involved
095    TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() {
096        @Override
097        public TransactionId transform(TransactionId txid) {
098            if (txid == null) {
099                return null;
100            }
101            if (txid.isLocalTransaction()) {
102                final LocalTransactionId t = (LocalTransactionId) txid;
103                return new XATransactionId(new Xid() {
104                    @Override
105                    public int getFormatId() {
106                        return LOCAL_FORMAT_ID_MAGIC;
107                    }
108
109                    @Override
110                    public byte[] getGlobalTransactionId() {
111                        return t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"));
112                    }
113
114                    @Override
115                    public byte[] getBranchQualifier() {
116                        return Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"));
117                    }
118                });
119            } else {
120                return txid;
121            }
122        }
123    };
124
125    /**
126     * Sets the  FilteredKahaDBPersistenceAdapter entries
127     *
128     * @org.apache.xbean.ElementType class="org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter"
129     */
130    @SuppressWarnings({ "rawtypes", "unchecked" })
131    public void setFilteredPersistenceAdapters(List entries) {
132        for (Object entry : entries) {
133            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) entry;
134            PersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
135            if (filteredAdapter.getDestination() == null) {
136                filteredAdapter.setDestination(matchAll);
137            }
138
139            if (filteredAdapter.isPerDestination()) {
140                configureDirectory(adapter, null);
141                // per destination adapters will be created on demand or during recovery
142                continue;
143            } else {
144                configureDirectory(adapter, nameFromDestinationFilter(filteredAdapter.getDestination()));
145            }
146
147            configureAdapter(adapter);
148            adapters.add(adapter);
149        }
150        destinationMap.setEntries(entries);
151    }
152
153    private String nameFromDestinationFilter(ActiveMQDestination destination) {
154        if (destination.getQualifiedName().length() > IOHelper.getMaxFileNameLength()) {
155            LOG.warn("Destination name is longer than 'MaximumFileNameLength' system property, " +
156                     "potential problem with recovery can result from name truncation.");
157        }
158
159        return IOHelper.toFileSystemSafeName(destination.getQualifiedName());
160    }
161
162    public boolean isLocalXid(TransactionId xid) {
163        return xid instanceof XATransactionId &&
164                ((XATransactionId)xid).getFormatId() == LOCAL_FORMAT_ID_MAGIC;
165    }
166
167    @Override
168    public void beginTransaction(ConnectionContext context) throws IOException {
169        throw new IllegalStateException();
170    }
171
172    @Override
173    public void checkpoint(final boolean sync) throws IOException {
174        for (PersistenceAdapter persistenceAdapter : adapters) {
175            persistenceAdapter.checkpoint(sync);
176        }
177    }
178
179    @Override
180    public void commitTransaction(ConnectionContext context) throws IOException {
181        throw new IllegalStateException();
182    }
183
184    @Override
185    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
186        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
187        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination));
188    }
189
190    private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) throws IOException {
191        Object result = destinationMap.chooseValue(destination);
192        if (result == null) {
193            throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
194        }
195        FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
196        if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
197            filteredAdapter = addAdapter(filteredAdapter, destination);
198            if (LOG.isTraceEnabled()) {
199                LOG.info("created per destination adapter for: " + destination  + ", " + result);
200            }
201        }
202        startAdapter(filteredAdapter.getPersistenceAdapter(), destination.getQualifiedName());
203        LOG.debug("destination {} matched persistence adapter {}", new Object[]{destination.getQualifiedName(), filteredAdapter.getPersistenceAdapter()});
204        return filteredAdapter.getPersistenceAdapter();
205    }
206
207    private void startAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination) {
208        try {
209            kahaDBPersistenceAdapter.start();
210        } catch (Exception e) {
211            RuntimeException detail = new RuntimeException("Failed to start per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
212            LOG.error(detail.toString(), e);
213            throw detail;
214        }
215    }
216
217    private void stopAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination) {
218        try {
219            kahaDBPersistenceAdapter.stop();
220        } catch (Exception e) {
221            RuntimeException detail = new RuntimeException("Failed to stop per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
222            LOG.error(detail.toString(), e);
223            throw detail;
224        }
225    }
226
227    @Override
228    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
229        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
230        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
231    }
232
233    @Override
234    public TransactionStore createTransactionStore() throws IOException {
235        return transactionStore;
236    }
237
238    @Override
239    public void deleteAllMessages() throws IOException {
240        for (PersistenceAdapter persistenceAdapter : adapters) {
241            persistenceAdapter.deleteAllMessages();
242        }
243        transactionStore.deleteAllMessages();
244        IOHelper.deleteChildren(getDirectory());
245    }
246
247    @Override
248    public Set<ActiveMQDestination> getDestinations() {
249        Set<ActiveMQDestination> results = new HashSet<ActiveMQDestination>();
250        for (PersistenceAdapter persistenceAdapter : adapters) {
251            results.addAll(persistenceAdapter.getDestinations());
252        }
253        return results;
254    }
255
256    @Override
257    public long getLastMessageBrokerSequenceId() throws IOException {
258        long maxId = -1;
259        for (PersistenceAdapter persistenceAdapter : adapters) {
260            maxId = Math.max(maxId, persistenceAdapter.getLastMessageBrokerSequenceId());
261        }
262        return maxId;
263    }
264
265    @Override
266    public long getLastProducerSequenceId(ProducerId id) throws IOException {
267        long maxId = -1;
268        for (PersistenceAdapter persistenceAdapter : adapters) {
269            maxId = Math.max(maxId, persistenceAdapter.getLastProducerSequenceId(id));
270        }
271        return maxId;
272    }
273
274    @Override
275    public void removeQueueMessageStore(ActiveMQQueue destination) {
276        PersistenceAdapter adapter = null;
277        try {
278            adapter = getMatchingPersistenceAdapter(destination);
279        } catch (IOException e) {
280            throw new RuntimeException(e);
281        }
282        if (adapter instanceof PersistenceAdapter && adapter.getDestinations().isEmpty()) {
283            adapter.removeQueueMessageStore(destination);
284            removeMessageStore(adapter, destination);
285            destinationMap.removeAll(destination);
286        }
287    }
288
289    @Override
290    public void removeTopicMessageStore(ActiveMQTopic destination) {
291        PersistenceAdapter adapter = null;
292        try {
293            adapter = getMatchingPersistenceAdapter(destination);
294        } catch (IOException e) {
295            throw new RuntimeException(e);
296        }
297        if (adapter instanceof PersistenceAdapter && adapter.getDestinations().isEmpty()) {
298            adapter.removeTopicMessageStore(destination);
299            removeMessageStore(adapter, destination);
300            destinationMap.removeAll(destination);
301        }
302    }
303
304    private void removeMessageStore(PersistenceAdapter adapter, ActiveMQDestination destination) {
305        stopAdapter(adapter, destination.toString());
306        File adapterDir = adapter.getDirectory();
307        if (adapterDir != null) {
308            if (IOHelper.deleteFile(adapterDir)) {
309                if (LOG.isTraceEnabled()) {
310                    LOG.info("deleted per destination adapter directory for: " + destination);
311                }
312            } else {
313                if (LOG.isTraceEnabled()) {
314                    LOG.info("failed to deleted per destination adapter directory for: " + destination);
315                }
316            }
317        }
318    }
319
320    @Override
321    public void rollbackTransaction(ConnectionContext context) throws IOException {
322        throw new IllegalStateException();
323    }
324
325    @Override
326    public void setBrokerName(String brokerName) {
327        for (PersistenceAdapter persistenceAdapter : adapters) {
328            persistenceAdapter.setBrokerName(brokerName);
329        }
330    }
331
332    @Override
333    public void setUsageManager(SystemUsage usageManager) {
334        for (PersistenceAdapter persistenceAdapter : adapters) {
335            persistenceAdapter.setUsageManager(usageManager);
336        }
337    }
338
339    @Override
340    public long size() {
341        long size = 0;
342        for (PersistenceAdapter persistenceAdapter : adapters) {
343            size += persistenceAdapter.size();
344        }
345        return size;
346    }
347
348    @Override
349    public void doStart() throws Exception {
350        Object result = destinationMap.chooseValue(matchAll);
351        if (result != null) {
352            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
353            if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
354                findAndRegisterExistingAdapters(filteredAdapter);
355            }
356        }
357        for (PersistenceAdapter persistenceAdapter : adapters) {
358            persistenceAdapter.start();
359        }
360    }
361
362    private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) throws IOException {
363        FileFilter destinationNames = new FileFilter() {
364            @Override
365            public boolean accept(File file) {
366                return file.getName().startsWith("queue#") || file.getName().startsWith("topic#");
367            }
368        };
369        File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
370        if (candidates != null) {
371            for (File candidate : candidates) {
372                registerExistingAdapter(template, candidate);
373            }
374        }
375    }
376
377    private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) throws IOException {
378        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), candidate.getName());
379        startAdapter(adapter, candidate.getName());
380        Set<ActiveMQDestination> destinations = adapter.getDestinations();
381        if (destinations.size() != 0) {
382            registerAdapter(adapter, destinations.toArray(new ActiveMQDestination[]{})[0]);
383        } else {
384            stopAdapter(adapter, candidate.getName());
385        }
386    }
387
388    private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) throws IOException {
389        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), nameFromDestinationFilter(destination));
390        return registerAdapter(adapter, destination);
391    }
392
393    private PersistenceAdapter adapterFromTemplate(PersistenceAdapter template, String destinationName) throws IOException {
394        PersistenceAdapter adapter = kahaDBFromTemplate(template);
395        configureAdapter(adapter);
396        configureDirectory(adapter, destinationName);
397        return adapter;
398    }
399
400    private void configureDirectory(PersistenceAdapter adapter, String fileName) {
401        File directory = null;
402        File defaultDir = MessageDatabase.DEFAULT_DIRECTORY;
403        try {
404            defaultDir = adapter.getClass().newInstance().getDirectory();
405        } catch (Exception e) {
406        }
407        if (defaultDir.equals(adapter.getDirectory())) {
408            // not set so inherit from mkahadb
409            directory = getDirectory();
410        } else {
411            directory = adapter.getDirectory();
412        }
413
414        if (fileName != null) {
415            directory = new File(directory, fileName);
416        }
417        adapter.setDirectory(directory);
418    }
419
420    private FilteredKahaDBPersistenceAdapter registerAdapter(PersistenceAdapter adapter, ActiveMQDestination destination) {
421        adapters.add(adapter);
422        FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(destination, adapter);
423        destinationMap.put(destination, result);
424        return result;
425    }
426
427    private void configureAdapter(PersistenceAdapter adapter) {
428        // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
429        ((TransactionIdTransformerAware)adapter).setTransactionIdTransformer(transactionIdTransformer);
430        if (isUseLock()) {
431            if( adapter instanceof Lockable ) {
432                ((Lockable)adapter).setUseLock(false);
433            }
434        }
435        if( adapter instanceof BrokerServiceAware ) {
436            ((BrokerServiceAware)adapter).setBrokerService(getBrokerService());
437        }
438    }
439
440    private PersistenceAdapter kahaDBFromTemplate(PersistenceAdapter template) throws IOException {
441        try {
442            Map<String, Object> configuration = new HashMap<String, Object>();
443            IntrospectionSupport.getProperties(template, configuration, null);
444            PersistenceAdapter adapter = template.getClass().newInstance();
445            IntrospectionSupport.setProperties(adapter, configuration);
446            return adapter;
447        } catch (Exception e) {
448            throw IOExceptionSupport.create(e);
449        }
450    }
451
452    @Override
453    protected void doStop(ServiceStopper stopper) throws Exception {
454        for (PersistenceAdapter persistenceAdapter : adapters) {
455            stopper.stop(persistenceAdapter);
456        }
457    }
458
459    @Override
460    public File getDirectory() {
461        return this.directory;
462    }
463
464    @Override
465    public void setDirectory(File directory) {
466        this.directory = directory;
467    }
468
469    @Override
470    public void init() throws Exception {
471    }
472
473    @Override
474    public void setBrokerService(BrokerService brokerService) {
475        super.setBrokerService(brokerService);
476        for (PersistenceAdapter persistenceAdapter : adapters) {
477            if( persistenceAdapter instanceof BrokerServiceAware ) {
478                ((BrokerServiceAware)persistenceAdapter).setBrokerService(getBrokerService());
479            }
480        }
481    }
482
483    public void setTransactionStore(MultiKahaDBTransactionStore transactionStore) {
484        this.transactionStore = transactionStore;
485    }
486
487    /**
488     * Set the max file length of the transaction journal
489     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
490     * be used
491     *
492     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
493     */
494    public void setJournalMaxFileLength(int maxFileLength) {
495        transactionStore.setJournalMaxFileLength(maxFileLength);
496    }
497
498    public int getJournalMaxFileLength() {
499        return transactionStore.getJournalMaxFileLength();
500    }
501
502    /**
503     * Set the max write batch size of  the transaction journal
504     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
505     * be used
506     *
507     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
508     */
509    public void setJournalWriteBatchSize(int journalWriteBatchSize) {
510        transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
511    }
512
513    public int getJournalWriteBatchSize() {
514        return transactionStore.getJournalMaxWriteBatchSize();
515    }
516
517    public List<PersistenceAdapter> getAdapters() {
518        return Collections.unmodifiableList(adapters);
519    }
520
521    @Override
522    public String toString() {
523        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
524        return "MultiKahaDBPersistenceAdapter[" + path + "]" + adapters;
525    }
526
527    @Override
528    public Locker createDefaultLocker() throws IOException {
529        SharedFileLocker locker = new SharedFileLocker();
530        locker.configure(this);
531        return locker;
532    }
533
534    @Override
535    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
536        return new JobSchedulerStoreImpl();
537    }
538
539    /* (non-Javadoc)
540     * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal()
541     */
542    @Override
543    public boolean isPersistNoLocal() {
544        // Prior to v11 the broker did not store the noLocal value for durable subs.
545        return brokerService.getStoreOpenWireVersion() >= 11;
546    }
547}