001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.Date;
022import java.util.HashSet;
023import java.util.Set;
024import java.util.TreeSet;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.atomic.AtomicBoolean;
028
029import org.apache.activemq.broker.Broker;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.command.Message;
032import org.apache.activemq.command.MessageAck;
033import org.apache.activemq.command.MessageId;
034import org.apache.activemq.command.TransactionId;
035import org.apache.activemq.command.XATransactionId;
036import org.apache.activemq.store.AbstractMessageStore;
037import org.apache.activemq.store.ListenableFuture;
038import org.apache.activemq.store.MessageStore;
039import org.apache.activemq.store.PersistenceAdapter;
040import org.apache.activemq.store.ProxyMessageStore;
041import org.apache.activemq.store.ProxyTopicMessageStore;
042import org.apache.activemq.store.TopicMessageStore;
043import org.apache.activemq.store.TransactionRecoveryListener;
044import org.apache.activemq.store.TransactionStore;
045import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
046import org.apache.activemq.store.kahadb.data.KahaEntryType;
047import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
048import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
049import org.apache.activemq.store.kahadb.disk.journal.Journal;
050import org.apache.activemq.store.kahadb.disk.journal.Location;
051import org.apache.activemq.util.DataByteArrayInputStream;
052import org.apache.activemq.util.DataByteArrayOutputStream;
053import org.apache.activemq.util.IOHelper;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057public class MultiKahaDBTransactionStore implements TransactionStore {
    /** Logger shared by this store and its nested classes. */
    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
    /** Owning adapter; this class reads its directory, transaction id transformer, child adapters and broker service. */
    final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
    /** Transactions currently known to this store, keyed by id; entries are created by getTx and dropped by removeTx. */
    final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
    /** Ids still in-flight after the journal replay in recoverPendingLocalTransactions; consulted by recover. */
    final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
    /** Journal holding prepare/commit/trace records; created in start() and closed and nulled in stop(). */
    private Journal journal;
    /** Maximum journal file length handed to the journal in start(). */
    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
    /** Write batch size handed to the journal in start(). */
    private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
    /** Ensures the bodies of start() and stop() each run at most once per start/stop cycle. */
    private final AtomicBoolean started = new AtomicBoolean(false);
066
    /**
     * Creates a transaction store bound to the given adapter. The journal is not
     * opened here; that happens in {@link #start()}.
     *
     * @param multiKahaDBPersistenceAdapter the adapter this store belongs to
     */
    public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
        this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
    }
070
071    public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) {
072        return new ProxyMessageStore(messageStore) {
073            @Override
074            public void addMessage(ConnectionContext context, final Message send) throws IOException {
075                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
076            }
077
078            @Override
079            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
080                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
081            }
082
083            @Override
084            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
085                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
086            }
087
088            @Override
089            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
090                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
091            }
092
093            @Override
094            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
095                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
096            }
097
098            @Override
099            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
100                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
101            }
102        };
103    }
104
105    public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) {
106        return new ProxyTopicMessageStore(messageStore) {
107            @Override
108            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
109                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
110            }
111
112            @Override
113            public void addMessage(ConnectionContext context, final Message send) throws IOException {
114                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
115            }
116
117            @Override
118            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
119                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
120            }
121
122            @Override
123            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
124                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
125            }
126
127            @Override
128            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
129                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
130            }
131
132            @Override
133            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
134                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
135            }
136
137            @Override
138            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
139                                    MessageId messageId, MessageAck ack) throws IOException {
140                MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId,
141                        subscriptionName, messageId, ack);
142            }
143        };
144    }
145
    /**
     * Clears this store's on-disk state by calling {@code IOHelper.deleteChildren}
     * on the directory returned by {@code getDirectory()}.
     */
    public void deleteAllMessages() {
        IOHelper.deleteChildren(getDirectory());
    }
149
    /**
     * @return the maximum journal file length that {@link #start()} passes to the journal
     */
    public int getJournalMaxFileLength() {
        return journalMaxFileLength;
    }
153
    /**
     * Sets the maximum journal file length. The value is read when
     * {@link #start()} configures the journal.
     *
     * @param journalMaxFileLength the maximum file length to configure
     */
    public void setJournalMaxFileLength(int journalMaxFileLength) {
        this.journalMaxFileLength = journalMaxFileLength;
    }
157
    /**
     * @return the write batch size that {@link #start()} passes to the journal
     */
    public int getJournalMaxWriteBatchSize() {
        return journalWriteBatchSize;
    }
161
    /**
     * Sets the journal write batch size. The value is read when
     * {@link #start()} configures the journal.
     *
     * @param journalWriteBatchSize the write batch size to configure
     */
    public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) {
        this.journalWriteBatchSize = journalWriteBatchSize;
    }
165
166    public class Tx {
167        private final Set<TransactionStore> stores = new HashSet<TransactionStore>();
168        private int prepareLocationId = 0;
169
170        public void trackStore(TransactionStore store) {
171            stores.add(store);
172        }
173
174        public Set<TransactionStore> getStores() {
175            return stores;
176        }
177
178        public void trackPrepareLocation(Location location) {
179            this.prepareLocationId = location.getDataFileId();
180        }
181
182        public int getPreparedLocationId() {
183            return prepareLocationId;
184        }
185    }
186
187    public Tx getTx(TransactionId txid) {
188        Tx tx = inflightTransactions.get(txid);
189        if (tx == null) {
190            tx = new Tx();
191            inflightTransactions.put(txid, tx);
192        }
193        return tx;
194    }
195
196    public Tx removeTx(TransactionId txid) {
197        return inflightTransactions.remove(txid);
198    }
199
200    @Override
201    public void prepare(TransactionId txid) throws IOException {
202        Tx tx = getTx(txid);
203        for (TransactionStore store : tx.getStores()) {
204            store.prepare(txid);
205        }
206    }
207
208    @Override
209    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
210            throws IOException {
211
212        if (preCommit != null) {
213            preCommit.run();
214        }
215
216        Tx tx = getTx(txid);
217        if (wasPrepared) {
218            for (TransactionStore store : tx.getStores()) {
219                store.commit(txid, true, null, null);
220            }
221        } else {
222            // can only do 1pc on a single store
223            if (tx.getStores().size() == 1) {
224                for (TransactionStore store : tx.getStores()) {
225                    store.commit(txid, false, null, null);
226                }
227            } else {
228                // need to do local 2pc
229                for (TransactionStore store : tx.getStores()) {
230                    store.prepare(txid);
231                }
232                persistOutcome(tx, txid);
233                for (TransactionStore store : tx.getStores()) {
234                    store.commit(txid, true, null, null);
235                }
236                persistCompletion(txid);
237            }
238        }
239        removeTx(txid);
240        if (postCommit != null) {
241            postCommit.run();
242        }
243    }
244
245    public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
246        tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))));
247    }
248
249    public void persistCompletion(TransactionId txid) throws IOException {
250        store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
251    }
252
253    private Location store(JournalCommand<?> data) throws IOException {
254        int size = data.serializedSizeFramed();
255        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
256        os.writeByte(data.type().getNumber());
257        data.writeFramed(os);
258        Location location = journal.write(os.toByteSequence(), true);
259        journal.setLastAppendLocation(location);
260        return location;
261    }
262
263    @Override
264    public void rollback(TransactionId txid) throws IOException {
265        Tx tx = removeTx(txid);
266        if (tx != null) {
267            for (TransactionStore store : tx.getStores()) {
268                store.rollback(txid);
269            }
270        }
271    }
272
273    @Override
274    public void start() throws Exception {
275        if (started.compareAndSet(false, true)) {
276            journal = new Journal() {
277                @Override
278                public void cleanup() {
279                    super.cleanup();
280                    txStoreCleanup();
281                }
282            };
283            journal.setDirectory(getDirectory());
284            journal.setMaxFileLength(journalMaxFileLength);
285            journal.setWriteBatchSize(journalWriteBatchSize);
286            IOHelper.mkdirs(journal.getDirectory());
287            journal.start();
288            recoverPendingLocalTransactions();
289            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
290        }
291    }
292
293    private void txStoreCleanup() {
294        Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
295        for (Tx tx : inflightTransactions.values()) {
296            knownDataFileIds.remove(tx.getPreparedLocationId());
297        }
298        try {
299            journal.removeDataFiles(knownDataFileIds);
300        } catch (Exception e) {
301            LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds);
302        }
303    }
304
305    private File getDirectory() {
306        return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
307    }
308
309    @Override
310    public void stop() throws Exception {
311        if (started.compareAndSet(true, false) && journal != null) {
312            journal.close();
313            journal = null;
314        }
315    }
316
317    private void recoverPendingLocalTransactions() throws IOException {
318        Location location = journal.getNextLocation(null);
319        while (location != null) {
320            process(load(location));
321            location = journal.getNextLocation(location);
322        }
323        recoveredPendingCommit.addAll(inflightTransactions.keySet());
324        LOG.info("pending local transactions: " + recoveredPendingCommit);
325    }
326
327    public JournalCommand<?> load(Location location) throws IOException {
328        DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location));
329        byte readByte = is.readByte();
330        KahaEntryType type = KahaEntryType.valueOf(readByte);
331        if (type == null) {
332            throw new IOException("Could not load journal record. Invalid location: " + location);
333        }
334        JournalCommand<?> message = (JournalCommand<?>) type.createMessage();
335        message.mergeFramed(is);
336        return message;
337    }
338
339    public void process(JournalCommand<?> command) throws IOException {
340        switch (command.type()) {
341            case KAHA_PREPARE_COMMAND:
342                KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
343                getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo()));
344                break;
345            case KAHA_COMMIT_COMMAND:
346                KahaCommitCommand commitCommand = (KahaCommitCommand) command;
347                removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo()));
348                break;
349            case KAHA_TRACE_COMMAND:
350                break;
351            default:
352                throw new IOException("Unexpected command in transaction journal: " + command);
353        }
354    }
355
356
    /**
     * Recovers prepared transactions from every child adapter, then resolves
     * the ones this adapter identifies as local.
     * <p>
     * Phase 1: each adapter's transaction store is asked to recover. For every
     * transaction it reports, that adapter's transaction store is recorded
     * against the transaction here, and the callback is forwarded to
     * {@code listener}.
     * <p>
     * Phase 2: for every prepared transaction the broker reports for which
     * {@code isLocalXid} is true, the transaction is committed through the
     * broker if its id is in {@code recoveredPendingCommit}; otherwise
     * {@code broker.forgetTransaction} is called. A completion record is then
     * journaled. Failures in this phase are logged and not rethrown.
     *
     * @param listener receives every recovered transaction
     * @throws IOException if an adapter's transaction store cannot be created or fails to recover
     */
    @Override
    public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {

        for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
            adapter.createTransactionStore().recover(new TransactionRecoveryListener() {
                @Override
                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) {
                    try {
                        // remember which store holds this prepared transaction
                        getTx(xid).trackStore(adapter.createTransactionStore());
                    } catch (IOException e) {
                        LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e);
                    }
                    // the caller's listener is notified even if tracking failed
                    listener.recover(xid, addedMessages, acks);
                }
            });
        }

        try {
            Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
            // force completion of local xa
            for (TransactionId txid : broker.getPreparedTransactions(null)) {
                if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
                    try {
                        if (recoveredPendingCommit.contains(txid)) {
                            // the id was still in-flight after the journal replay at start-up
                            LOG.info("delivering pending commit outcome for tid: " + txid);
                            broker.commitTransaction(null, txid, false);

                        } else {
                            LOG.info("delivering rollback outcome to store for tid: " + txid);
                            broker.forgetTransaction(null, txid);
                        }
                        // journal a commit record for this id whichever branch ran
                        persistCompletion(txid);
                    } catch (Exception ex) {
                        // one failing transaction must not stop the others being resolved
                        LOG.error("failed to deliver pending outcome for tid: " + txid, ex);
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("failed to resolve pending local transactions", e);
        }
    }
398
399    void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
400            throws IOException {
401        if (message.getTransactionId() != null) {
402            getTx(message.getTransactionId()).trackStore(transactionStore);
403        }
404        destination.addMessage(context, message);
405    }
406
407    ListenableFuture<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
408            throws IOException {
409        if (message.getTransactionId() != null) {
410            getTx(message.getTransactionId()).trackStore(transactionStore);
411            destination.addMessage(context, message);
412            return AbstractMessageStore.FUTURE;
413        } else {
414            return destination.asyncAddQueueMessage(context, message);
415        }
416    }
417
418    ListenableFuture<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
419            throws IOException {
420
421        if (message.getTransactionId() != null) {
422            getTx(message.getTransactionId()).trackStore(transactionStore);
423            destination.addMessage(context, message);
424            return AbstractMessageStore.FUTURE;
425        } else {
426            return destination.asyncAddTopicMessage(context, message);
427        }
428    }
429
430    final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
431            throws IOException {
432        if (ack.getTransactionId() != null) {
433            getTx(ack.getTransactionId()).trackStore(transactionStore);
434        }
435        destination.removeMessage(context, ack);
436    }
437
438    final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
439            throws IOException {
440        if (ack.getTransactionId() != null) {
441            getTx(ack.getTransactionId()).trackStore(transactionStore);
442        }
443        destination.removeAsyncMessage(context, ack);
444    }
445
446    final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination,
447                           final String clientId, final String subscriptionName,
448                           final MessageId messageId, final MessageAck ack) throws IOException {
449        if (ack.getTransactionId() != null) {
450            getTx(ack.getTransactionId()).trackStore(transactionStore);
451        }
452        destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
453    }
454
455}