001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.memory;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Iterator;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.Future;
024
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.command.Message;
027import org.apache.activemq.command.MessageAck;
028import org.apache.activemq.command.MessageId;
029import org.apache.activemq.command.TransactionId;
030import org.apache.activemq.command.XATransactionId;
031import org.apache.activemq.store.AbstractMessageStore;
032import org.apache.activemq.store.MessageStore;
033import org.apache.activemq.store.PersistenceAdapter;
034import org.apache.activemq.store.ProxyMessageStore;
035import org.apache.activemq.store.ProxyTopicMessageStore;
036import org.apache.activemq.store.TopicMessageStore;
037import org.apache.activemq.store.TransactionRecoveryListener;
038import org.apache.activemq.store.TransactionStore;
039
040/**
041 * Provides a TransactionStore implementation that can create transaction aware
042 * MessageStore objects from non transaction aware MessageStore objects.
043 *
044 *
045 */
046public class MemoryTransactionStore implements TransactionStore {
047
048    ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
049    ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
050    final PersistenceAdapter persistenceAdapter;
051
052    private boolean doingRecover;
053
054    public class Tx {
055        private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
056
057        private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
058
059        public void add(AddMessageCommand msg) {
060            messages.add(msg);
061        }
062
063        public void add(RemoveMessageCommand ack) {
064            acks.add(ack);
065        }
066
067        public Message[] getMessages() {
068            Message rc[] = new Message[messages.size()];
069            int count = 0;
070            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
071                AddMessageCommand cmd = iter.next();
072                rc[count++] = cmd.getMessage();
073            }
074            return rc;
075        }
076
077        public MessageAck[] getAcks() {
078            MessageAck rc[] = new MessageAck[acks.size()];
079            int count = 0;
080            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
081                RemoveMessageCommand cmd = iter.next();
082                rc[count++] = cmd.getMessageAck();
083            }
084            return rc;
085        }
086
087        /**
088         * @throws IOException
089         */
090        public void commit() throws IOException {
091            ConnectionContext ctx = new ConnectionContext();
092            persistenceAdapter.beginTransaction(ctx);
093            try {
094
095                // Do all the message adds.
096                for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
097                    AddMessageCommand cmd = iter.next();
098                    cmd.run(ctx);
099                }
100                // And removes..
101                for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
102                    RemoveMessageCommand cmd = iter.next();
103                    cmd.run(ctx);
104                }
105
106            } catch ( IOException e ) {
107                persistenceAdapter.rollbackTransaction(ctx);
108                throw e;
109            }
110            persistenceAdapter.commitTransaction(ctx);
111        }
112    }
113
114    public interface AddMessageCommand {
115        Message getMessage();
116
117        void run(ConnectionContext context) throws IOException;
118    }
119
120    public interface RemoveMessageCommand {
121        MessageAck getMessageAck();
122
123        void run(ConnectionContext context) throws IOException;
124    }
125
126    public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
127        this.persistenceAdapter=persistenceAdapter;
128    }
129
130    public MessageStore proxy(MessageStore messageStore) {
131        return new ProxyMessageStore(messageStore) {
132            @Override
133            public void addMessage(ConnectionContext context, final Message send) throws IOException {
134                MemoryTransactionStore.this.addMessage(getDelegate(), send);
135            }
136
137            @Override
138            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
139                MemoryTransactionStore.this.addMessage(getDelegate(), send);
140            }
141
142            @Override
143            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
144                MemoryTransactionStore.this.addMessage(getDelegate(), message);
145                return AbstractMessageStore.FUTURE;
146             }
147
148            @Override
149            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
150                MemoryTransactionStore.this.addMessage(getDelegate(), message);
151                return AbstractMessageStore.FUTURE;
152             }
153
154            @Override
155            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
156                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
157            }
158
159            @Override
160            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
161                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
162            }
163        };
164    }
165
166    public TopicMessageStore proxy(TopicMessageStore messageStore) {
167        return new ProxyTopicMessageStore(messageStore) {
168            @Override
169            public void addMessage(ConnectionContext context, final Message send) throws IOException {
170                MemoryTransactionStore.this.addMessage(getDelegate(), send);
171            }
172
173            @Override
174            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
175                MemoryTransactionStore.this.addMessage(getDelegate(), send);
176            }
177
178            @Override
179            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
180                MemoryTransactionStore.this.addMessage(getDelegate(), message);
181                return AbstractMessageStore.FUTURE;
182             }
183
184            @Override
185            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
186                MemoryTransactionStore.this.addMessage(getDelegate(), message);
187                return AbstractMessageStore.FUTURE;
188             }
189
190            @Override
191            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
192                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
193            }
194
195            @Override
196            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
197                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
198            }
199
200            @Override
201            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
202                            MessageId messageId, MessageAck ack) throws IOException {
203                MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId,
204                        subscriptionName, messageId, ack);
205            }
206        };
207    }
208
209    /**
210     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
211     */
212    public void prepare(TransactionId txid) {
213        Tx tx = inflightTransactions.remove(txid);
214        if (tx == null) {
215            return;
216        }
217        preparedTransactions.put(txid, tx);
218    }
219
220    public Tx getTx(Object txid) {
221        Tx tx = inflightTransactions.get(txid);
222        if (tx == null) {
223            tx = new Tx();
224            inflightTransactions.put(txid, tx);
225        }
226        return tx;
227    }
228
229    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
230        if (preCommit != null) {
231            preCommit.run();
232        }
233        Tx tx;
234        if (wasPrepared) {
235            tx = preparedTransactions.remove(txid);
236        } else {
237            tx = inflightTransactions.remove(txid);
238        }
239
240        if (tx == null) {
241            if (postCommit != null) {
242                postCommit.run();
243            }
244            return;
245        }
246        // ensure message order w.r.t to cursor and store for setBatch()
247        synchronized (this) {
248            tx.commit();
249            if (postCommit != null) {
250                postCommit.run();
251            }
252        }
253    }
254
255    /**
256     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
257     */
258    public void rollback(TransactionId txid) {
259        preparedTransactions.remove(txid);
260        inflightTransactions.remove(txid);
261    }
262
263    public void start() throws Exception {
264    }
265
266    public void stop() throws Exception {
267    }
268
269    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
270        // All the inflight transactions get rolled back..
271        inflightTransactions.clear();
272        this.doingRecover = true;
273        try {
274            for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
275                Object txid = iter.next();
276                Tx tx = preparedTransactions.get(txid);
277                listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
278            }
279        } finally {
280            this.doingRecover = false;
281        }
282    }
283
284    /**
285     * @param message
286     * @throws IOException
287     */
288    void addMessage(final MessageStore destination, final Message message) throws IOException {
289
290        if (doingRecover) {
291            return;
292        }
293
294        if (message.getTransactionId() != null) {
295            Tx tx = getTx(message.getTransactionId());
296            tx.add(new AddMessageCommand() {
297                public Message getMessage() {
298                    return message;
299                }
300
301                public void run(ConnectionContext ctx) throws IOException {
302                    destination.addMessage(ctx, message);
303                }
304
305            });
306        } else {
307            destination.addMessage(null, message);
308        }
309    }
310
311    /**
312     * @param ack
313     * @throws IOException
314     */
315    final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
316        if (doingRecover) {
317            return;
318        }
319
320        if (ack.isInTransaction()) {
321            Tx tx = getTx(ack.getTransactionId());
322            tx.add(new RemoveMessageCommand() {
323                public MessageAck getMessageAck() {
324                    return ack;
325                }
326
327                public void run(ConnectionContext ctx) throws IOException {
328                    destination.removeMessage(ctx, ack);
329                }
330            });
331        } else {
332            destination.removeMessage(null, ack);
333        }
334    }
335
336    final void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
337                           final MessageId messageId, final MessageAck ack) throws IOException {
338        if (doingRecover) {
339            return;
340        }
341
342        if (ack.isInTransaction()) {
343            Tx tx = getTx(ack.getTransactionId());
344            tx.add(new RemoveMessageCommand() {
345                public MessageAck getMessageAck() {
346                    return ack;
347                }
348
349                public void run(ConnectionContext ctx) throws IOException {
350                    destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
351                }
352            });
353        } else {
354            destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
355        }
356    }
357
358
359    public void delete() {
360        inflightTransactions.clear();
361        preparedTransactions.clear();
362        doingRecover = false;
363    }
364
365}