001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.Map.Entry;
031import java.util.concurrent.*;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.atomic.AtomicInteger;
034import org.apache.activemq.broker.ConnectionContext;
035import org.apache.activemq.broker.region.Destination;
036import org.apache.activemq.broker.region.RegionBroker;
037import org.apache.activemq.command.ActiveMQDestination;
038import org.apache.activemq.command.ActiveMQQueue;
039import org.apache.activemq.command.ActiveMQTempQueue;
040import org.apache.activemq.command.ActiveMQTempTopic;
041import org.apache.activemq.command.ActiveMQTopic;
042import org.apache.activemq.command.Message;
043import org.apache.activemq.command.MessageAck;
044import org.apache.activemq.command.MessageId;
045import org.apache.activemq.command.ProducerId;
046import org.apache.activemq.command.SubscriptionInfo;
047import org.apache.activemq.command.TransactionId;
048import org.apache.activemq.openwire.OpenWireFormat;
049import org.apache.activemq.protobuf.Buffer;
050import org.apache.activemq.store.AbstractMessageStore;
051import org.apache.activemq.store.MessageRecoveryListener;
052import org.apache.activemq.store.MessageStore;
053import org.apache.activemq.store.PersistenceAdapter;
054import org.apache.activemq.store.TopicMessageStore;
055import org.apache.activemq.store.TransactionStore;
056import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
057import org.apache.activemq.store.kahadb.data.KahaDestination;
058import org.apache.activemq.store.kahadb.data.KahaLocation;
059import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
060import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
061import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
062import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
063import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
064import org.apache.activemq.usage.MemoryUsage;
065import org.apache.activemq.usage.SystemUsage;
066import org.apache.activemq.util.ServiceStopper;
067import org.apache.activemq.wireformat.WireFormat;
068import org.apache.kahadb.util.ByteSequence;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071import org.apache.kahadb.journal.Location;
072import org.apache.kahadb.page.Transaction;
073
074public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
075    static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
076    private static final int MAX_ASYNC_JOBS = 10000;
077
078    public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
079    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
080            PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
081    public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
082    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
083            PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
084
085    protected ExecutorService queueExecutor;
086    protected ExecutorService topicExecutor;
087    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
088    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
089    final WireFormat wireFormat = new OpenWireFormat();
090    private SystemUsage usageManager;
091    private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
092    private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
093    Semaphore globalQueueSemaphore;
094    Semaphore globalTopicSemaphore;
095    private boolean concurrentStoreAndDispatchQueues = true;
096    // when true, message order may be compromised when cache is exhausted if store is out
097    // or order w.r.t cache
098    private boolean concurrentStoreAndDispatchTopics = false;
099    private boolean concurrentStoreAndDispatchTransactions = false;
100    private int maxAsyncJobs = MAX_ASYNC_JOBS;
101    private final KahaDBTransactionStore transactionStore;
102    private TransactionIdTransformer transactionIdTransformer;
103
104    public KahaDBStore() {
105        this.transactionStore = new KahaDBTransactionStore(this);
106        this.transactionIdTransformer = new TransactionIdTransformer() {
107            @Override
108            public KahaTransactionInfo transform(TransactionId txid) {
109                return TransactionIdConversion.convert(txid);
110            }
111        };
112    }
113
114    @Override
115    public String toString() {
116        return "KahaDB:[" + directory.getAbsolutePath() + "]";
117    }
118
    /**
     * Intentionally a no-op: this store does not keep the broker name.
     *
     * @param brokerName ignored
     */
    public void setBrokerName(String brokerName) {
    }
121
    /**
     * Stores the usage manager; it is only handed back by {@link #getUsageManager()}
     * in the code visible here.
     *
     * @param usageManager the usage manager to remember
     */
    public void setUsageManager(SystemUsage usageManager) {
        this.usageManager = usageManager;
    }
125
    /**
     * @return the usage manager last passed to {@link #setUsageManager(SystemUsage)},
     *         or {@code null} if none was set
     */
    public SystemUsage getUsageManager() {
        return this.usageManager;
    }
129
    /**
     * Tells whether queue adds and removes go through the async store-task
     * path; defaults to {@code true}.
     *
     * @return the concurrentStoreAndDispatchQueues flag
     */
    public boolean isConcurrentStoreAndDispatchQueues() {
        return this.concurrentStoreAndDispatchQueues;
    }
136
    /**
     * Enables or disables the async store-task path for queues.
     *
     * @param concurrentStoreAndDispatch
     *            the new value of the concurrentStoreAndDispatchQueues flag
     */
    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
        this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
    }
144
    /**
     * Tells whether topic adds and acknowledgements go through the async
     * store-task path; defaults to {@code false}.
     *
     * @return the concurrentStoreAndDispatchTopics flag
     */
    public boolean isConcurrentStoreAndDispatchTopics() {
        return this.concurrentStoreAndDispatchTopics;
    }
151
    /**
     * Enables or disables the async store-task path for topics.
     *
     * @param concurrentStoreAndDispatch
     *            the new value of the concurrentStoreAndDispatchTopics flag
     */
    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
        this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
    }
159
    /**
     * @return the concurrentStoreAndDispatchTransactions flag; it is initialised
     *         to {@code false} and no setter is visible in this part of the file
     */
    public boolean isConcurrentStoreAndDispatchTransactions() {
        return this.concurrentStoreAndDispatchTransactions;
    }
163
    /**
     * Returns the limit used to size the async semaphores and job queues
     * created in doStart() and the per-destination semaphore of each message
     * store.
     *
     * @return the maxAsyncJobs
     */
    public int getMaxAsyncJobs() {
        return this.maxAsyncJobs;
    }
    /**
     * Sets the async job limit. Semaphores and queues are sized from this
     * value when they are created, so a change made afterwards does not resize
     * the ones that already exist.
     *
     * @param maxAsyncJobs
     *            the maxAsyncJobs to set
     */
    public void setMaxAsyncJobs(int maxAsyncJobs) {
        this.maxAsyncJobs = maxAsyncJobs;
    }
177
178    @Override
179    public void doStart() throws Exception {
180        super.doStart();
181        this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
182        this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
183        this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
184        this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
185        this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
186                asyncQueueJobQueue, new ThreadFactory() {
187                    public Thread newThread(Runnable runnable) {
188                        Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
189                        thread.setDaemon(true);
190                        return thread;
191                    }
192                });
193        this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
194                asyncTopicJobQueue, new ThreadFactory() {
195                    public Thread newThread(Runnable runnable) {
196                        Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
197                        thread.setDaemon(true);
198                        return thread;
199                    }
200                });
201    }
202
203    @Override
204    public void doStop(ServiceStopper stopper) throws Exception {
205        // drain down async jobs
206        LOG.info("Stopping async queue tasks");
207        if (this.globalQueueSemaphore != null) {
208            this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
209        }
210        synchronized (this.asyncQueueMaps) {
211            for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
212                synchronized (m) {
213                    for (StoreTask task : m.values()) {
214                        task.cancel();
215                    }
216                }
217            }
218            this.asyncQueueMaps.clear();
219        }
220        LOG.info("Stopping async topic tasks");
221        if (this.globalTopicSemaphore != null) {
222            this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
223        }
224        synchronized (this.asyncTopicMaps) {
225            for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
226                synchronized (m) {
227                    for (StoreTask task : m.values()) {
228                        task.cancel();
229                    }
230                }
231            }
232            this.asyncTopicMaps.clear();
233        }
234        if (this.globalQueueSemaphore != null) {
235            this.globalQueueSemaphore.drainPermits();
236        }
237        if (this.globalTopicSemaphore != null) {
238            this.globalTopicSemaphore.drainPermits();
239        }
240        if (this.queueExecutor != null) {
241            this.queueExecutor.shutdownNow();
242        }
243        if (this.topicExecutor != null) {
244            this.topicExecutor.shutdownNow();
245        }
246        LOG.info("Stopped KahaDB");
247        super.doStop(stopper);
248    }
249
250    void incrementRedeliveryAndReWrite(final String key, final KahaDestination destination) throws IOException {
251        Location location;
252        this.indexLock.writeLock().lock();
253        try {
254              location = findMessageLocation(key, destination);
255        } finally {
256            this.indexLock.writeLock().unlock();
257        }
258
259        if (location != null) {
260            KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
261            Message message = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
262
263            message.incrementRedeliveryCounter();
264            if (LOG.isTraceEnabled()) {
265                LOG.trace("rewriting: " + key + " with deliveryCount: " + message.getRedeliveryCounter());
266            }
267            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
268            addMessage.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
269
270            final Location rewriteLocation = journal.write(toByteSequence(addMessage), true);
271
272            this.indexLock.writeLock().lock();
273            try {
274                pageFile.tx().execute(new Transaction.Closure<IOException>() {
275                    public void execute(Transaction tx) throws IOException {
276                        StoredDestination sd = getStoredDestination(destination, tx);
277                        Long sequence = sd.messageIdIndex.get(tx, key);
278                        MessageKeys keys = sd.orderIndex.get(tx, sequence);
279                        sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), sequence, new MessageKeys(keys.messageId, rewriteLocation));
280                    }
281                });
282            } finally {
283                this.indexLock.writeLock().unlock();
284            }
285        }
286    }
287
288    @Override
289    void rollbackStatsOnDuplicate(KahaDestination commandDestination) {
290        if (brokerService != null) {
291            RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
292            if (regionBroker != null) {
293                Set<Destination> destinationSet = regionBroker.getDestinations(convert(commandDestination));
294                for (Destination destination : destinationSet) {
295                    destination.getDestinationStatistics().getMessages().decrement();
296                    destination.getDestinationStatistics().getEnqueues().decrement();
297                }
298            }
299        }
300    }
301
302    private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
303        return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
304            public Location execute(Transaction tx) throws IOException {
305                StoredDestination sd = getStoredDestination(destination, tx);
306                Long sequence = sd.messageIdIndex.get(tx, key);
307                if (sequence == null) {
308                    return null;
309                }
310                return sd.orderIndex.get(tx, sequence).location;
311            }
312        });
313    }
314
315    protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
316        StoreQueueTask task = null;
317        synchronized (store.asyncTaskMap) {
318            task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
319        }
320        return task;
321    }
322
323    protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
324        synchronized (store.asyncTaskMap) {
325            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
326        }
327        this.queueExecutor.execute(task);
328    }
329
330    protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
331        StoreTopicTask task = null;
332        synchronized (store.asyncTaskMap) {
333            task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
334        }
335        return task;
336    }
337
338    protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
339        synchronized (store.asyncTaskMap) {
340            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
341        }
342        this.topicExecutor.execute(task);
343    }
344
    /**
     * Despite the name, nothing is created here: every call returns the single
     * transaction store built in the constructor.
     *
     * @return this adapter's transaction store
     */
    public TransactionStore createTransactionStore() throws IOException {
        return this.transactionStore;
    }
348
    /**
     * @return the forceRecoverIndex flag (the field is declared outside this
     *         part of the file, presumably in the superclass)
     */
    public boolean getForceRecoverIndex() {
        return this.forceRecoverIndex;
    }
352
    /**
     * @param forceRecoverIndex new value of the forceRecoverIndex flag
     */
    public void setForceRecoverIndex(boolean forceRecoverIndex) {
        this.forceRecoverIndex = forceRecoverIndex;
    }
356
357    public class KahaDBMessageStore extends AbstractMessageStore {
        // Pending async store tasks for this destination; every access in this file
        // is made while synchronized on the map itself.
        protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
        // Store-side form of the destination, converted once in the constructor.
        protected KahaDestination dest;
        // Snapshot of getMaxAsyncJobs() taken at construction; also the permit count
        // of localDestinationSemaphore.
        private final int maxAsyncJobs;
        private final Semaphore localDestinationSemaphore;

        // NOTE(review): not read or written in this part of the file; presumably
        // counters maintained by the store tasks. Only canceledTasks has an explicit
        // initialiser; doneTasks gets the field default of 0.
        double doneTasks, canceledTasks = 0;
364
365        public KahaDBMessageStore(ActiveMQDestination destination) {
366            super(destination);
367            this.dest = convert(destination);
368            this.maxAsyncJobs = getMaxAsyncJobs();
369            this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
370        }
371
        /**
         * @return the destination this store was created for
         */
        @Override
        public ActiveMQDestination getDestination() {
            return destination;
        }
376
377        @Override
378        public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
379                throws IOException {
380            if (isConcurrentStoreAndDispatchQueues()) {
381                StoreQueueTask result = new StoreQueueTask(this, context, message);
382                result.aquireLocks();
383                addQueueTask(this, result);
384                return result.getFuture();
385            } else {
386                return super.asyncAddQueueMessage(context, message);
387            }
388        }
389
390        @Override
391        public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
392            if (isConcurrentStoreAndDispatchQueues()) {
393                AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
394                StoreQueueTask task = null;
395                synchronized (asyncTaskMap) {
396                    task = (StoreQueueTask) asyncTaskMap.get(key);
397                }
398                if (task != null) {
399                    if (!task.cancel()) {
400                        try {
401
402                            task.future.get();
403                        } catch (InterruptedException e) {
404                            throw new InterruptedIOException(e.toString());
405                        } catch (Exception ignored) {
406                            LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
407                        }
408                        removeMessage(context, ack);
409                    } else {
410                        synchronized (asyncTaskMap) {
411                            asyncTaskMap.remove(key);
412                        }
413                    }
414                } else {
415                    removeMessage(context, ack);
416                }
417            } else {
418                removeMessage(context, ack);
419            }
420        }
421
422        public void addMessage(ConnectionContext context, Message message) throws IOException {
423            KahaAddMessageCommand command = new KahaAddMessageCommand();
424            command.setDestination(dest);
425            command.setMessageId(message.getMessageId().toString());
426            command.setTransactionInfo(transactionIdTransformer.transform(message.getTransactionId()));
427            command.setPriority(message.getPriority());
428            command.setPrioritySupported(isPrioritizedMessages());
429            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
430            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
431            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
432
433        }
434
435        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
436            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
437            command.setDestination(dest);
438            command.setMessageId(ack.getLastMessageId().toString());
439            command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
440
441            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
442            command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
443            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
444        }
445
446        public void removeAllMessages(ConnectionContext context) throws IOException {
447            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
448            command.setDestination(dest);
449            store(command, true, null, null);
450        }
451
452        public Message getMessage(MessageId identity) throws IOException {
453            final String key = identity.toString();
454
455            // Hopefully one day the page file supports concurrent read
456            // operations... but for now we must
457            // externally synchronize...
458            Location location;
459            indexLock.writeLock().lock();
460            try {
461                location = findMessageLocation(key, dest);
462            }finally {
463                indexLock.writeLock().unlock();
464            }
465            if (location == null) {
466                return null;
467            }
468
469            return loadMessage(location);
470        }
471
472        public int getMessageCount() throws IOException {
473            try {
474                lockAsyncJobQueue();
475                indexLock.writeLock().lock();
476                try {
477                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
478                        public Integer execute(Transaction tx) throws IOException {
479                            // Iterate through all index entries to get a count
480                            // of
481                            // messages in the destination.
482                            StoredDestination sd = getStoredDestination(dest, tx);
483                            int rc = 0;
484                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
485                                    .hasNext();) {
486                                iterator.next();
487                                rc++;
488                            }
489                            return rc;
490                        }
491                    });
492                }finally {
493                    indexLock.writeLock().unlock();
494                }
495            } finally {
496                unlockAsyncJobQueue();
497            }
498        }
499
500        @Override
501        public boolean isEmpty() throws IOException {
502            indexLock.writeLock().lock();
503            try {
504                return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
505                    public Boolean execute(Transaction tx) throws IOException {
506                        // Iterate through all index entries to get a count of
507                        // messages in the destination.
508                        StoredDestination sd = getStoredDestination(dest, tx);
509                        return sd.locationIndex.isEmpty(tx);
510                    }
511                });
512            }finally {
513                indexLock.writeLock().unlock();
514            }
515        }
516
517        public void recover(final MessageRecoveryListener listener) throws Exception {
518            // recovery may involve expiry which will modify
519            indexLock.writeLock().lock();
520            try {
521                pageFile.tx().execute(new Transaction.Closure<Exception>() {
522                    public void execute(Transaction tx) throws Exception {
523                        StoredDestination sd = getStoredDestination(dest, tx);
524                        sd.orderIndex.resetCursorPosition();
525                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
526                                .hasNext(); ) {
527                            Entry<Long, MessageKeys> entry = iterator.next();
528                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
529                                continue;
530                            }
531                            Message msg = loadMessage(entry.getValue().location);
532                            listener.recoverMessage(msg);
533                        }
534                    }
535                });
536            }finally {
537                indexLock.writeLock().unlock();
538            }
539        }
540
541
542        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
543            indexLock.writeLock().lock();
544            try {
545                pageFile.tx().execute(new Transaction.Closure<Exception>() {
546                    public void execute(Transaction tx) throws Exception {
547                        StoredDestination sd = getStoredDestination(dest, tx);
548                        Entry<Long, MessageKeys> entry = null;
549                        int counter = 0;
550                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
551                             listener.hasSpace() && iterator.hasNext(); ) {
552                            entry = iterator.next();
553                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
554                                continue;
555                            }
556                            Message msg = loadMessage(entry.getValue().location);
557                            listener.recoverMessage(msg);
558                            counter++;
559                            if (counter >= maxReturned) {
560                                break;
561                            }
562                        }
563                        sd.orderIndex.stoppedIterating();
564                    }
565                });
566            }finally {
567                indexLock.writeLock().unlock();
568            }
569        }
570
571        public void resetBatching() {
572            if (pageFile.isLoaded()) {
573                indexLock.writeLock().lock();
574                try {
575                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
576                        public void execute(Transaction tx) throws Exception {
577                            StoredDestination sd = getExistingStoredDestination(dest, tx);
578                            if (sd != null) {
579                                sd.orderIndex.resetCursorPosition();}
580                            }
581                        });
582                } catch (Exception e) {
583                    LOG.error("Failed to reset batching",e);
584                }finally {
585                    indexLock.writeLock().unlock();
586                }
587            }
588        }
589
590        @Override
591        public void setBatch(MessageId identity) throws IOException {
592            try {
593                final String key = identity.toString();
594                lockAsyncJobQueue();
595
596                // Hopefully one day the page file supports concurrent read
597                // operations... but for now we must
598                // externally synchronize...
599
600                indexLock.writeLock().lock();
601                try {
602                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
603                        public void execute(Transaction tx) throws IOException {
604                            StoredDestination sd = getStoredDestination(dest, tx);
605                            Long location = sd.messageIdIndex.get(tx, key);
606                            if (location != null) {
607                                sd.orderIndex.setBatch(tx, location);
608                            }
609                        }
610                    });
611                } finally {
612                    indexLock.writeLock().unlock();
613                }
614            } finally {
615                unlockAsyncJobQueue();
616            }
617        }
618
        /**
         * Intentionally a no-op: this store ignores the memory usage it is given.
         */
        @Override
        public void setMemoryUsage(MemoryUsage memoeyUSage) {
        }
        /** Delegates to the superclass; no store-specific start work is done here. */
        @Override
        public void start() throws Exception {
            super.start();
        }
        /** Delegates to the superclass; no store-specific stop work is done here. */
        @Override
        public void stop() throws Exception {
            super.stop();
        }
630
631        protected void lockAsyncJobQueue() {
632            try {
633                this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
634            } catch (Exception e) {
635                LOG.error("Failed to lock async jobs for " + this.destination, e);
636            }
637        }
638
        /**
         * Releases {@code maxAsyncJobs} permits on the local destination
         * semaphore, unconditionally.
         */
        protected void unlockAsyncJobQueue() {
            this.localDestinationSemaphore.release(this.maxAsyncJobs);
        }
642
        /**
         * Takes one permit of the local destination semaphore, blocking until
         * one is available. An interrupt while waiting is logged and the method
         * returns without a permit.
         */
        protected void acquireLocalAsyncLock() {
            try {
                this.localDestinationSemaphore.acquire();
            } catch (InterruptedException e) {
                // NOTE(review): the caller cannot tell that no permit was taken — confirm
                // that a later releaseLocalAsyncLock() is acceptable in that case.
                LOG.error("Failed to aquire async lock for " + this.destination, e);
            }
        }
650
        /** Returns one permit to the local destination semaphore. */
        protected void releaseLocalAsyncLock() {
            this.localDestinationSemaphore.release();
        }
654
655    }
656
657    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
658        private final AtomicInteger subscriptionCount = new AtomicInteger();
659        public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
660            super(destination);
661            this.subscriptionCount.set(getAllSubscriptions().length);
662            asyncTopicMaps.add(asyncTaskMap);
663        }
664
665        @Override
666        public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
667                throws IOException {
668            if (isConcurrentStoreAndDispatchTopics()) {
669                StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
670                result.aquireLocks();
671                addTopicTask(this, result);
672                return result.getFuture();
673            } else {
674                return super.asyncAddTopicMessage(context, message);
675            }
676        }
677
678        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
679                                MessageId messageId, MessageAck ack)
680                throws IOException {
681            String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
682            if (isConcurrentStoreAndDispatchTopics()) {
683                AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
684                StoreTopicTask task = null;
685                synchronized (asyncTaskMap) {
686                    task = (StoreTopicTask) asyncTaskMap.get(key);
687                }
688                if (task != null) {
689                    if (task.addSubscriptionKey(subscriptionKey)) {
690                        removeTopicTask(this, messageId);
691                        if (task.cancel()) {
692                            synchronized (asyncTaskMap) {
693                                asyncTaskMap.remove(key);
694                            }
695                        }
696                    }
697                } else {
698                    doAcknowledge(context, subscriptionKey, messageId, ack);
699                }
700            } else {
701                doAcknowledge(context, subscriptionKey, messageId, ack);
702            }
703        }
704
705        protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
706                throws IOException {
707            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
708            command.setDestination(dest);
709            command.setSubscriptionKey(subscriptionKey);
710            command.setMessageId(messageId.toString());
711            command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
712            if (ack != null && ack.isUnmatchedAck()) {
713                command.setAck(UNMATCHED);
714            }
715            store(command, false, null, null);
716        }
717
718        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
719            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
720                    .getSubscriptionName());
721            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
722            command.setDestination(dest);
723            command.setSubscriptionKey(subscriptionKey.toString());
724            command.setRetroactive(retroactive);
725            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
726            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
727            store(command, isEnableJournalDiskSyncs() && true, null, null);
728            this.subscriptionCount.incrementAndGet();
729        }
730
731        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
732            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
733            command.setDestination(dest);
734            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
735            store(command, isEnableJournalDiskSyncs() && true, null, null);
736            this.subscriptionCount.decrementAndGet();
737        }
738
739        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
740
741            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
742            indexLock.writeLock().lock();
743            try {
744                pageFile.tx().execute(new Transaction.Closure<IOException>() {
745                    public void execute(Transaction tx) throws IOException {
746                        StoredDestination sd = getStoredDestination(dest, tx);
747                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
748                                .hasNext();) {
749                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
750                            SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
751                                    .getValue().getSubscriptionInfo().newInput()));
752                            subscriptions.add(info);
753
754                        }
755                    }
756                });
757            }finally {
758                indexLock.writeLock().unlock();
759            }
760
761            SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
762            subscriptions.toArray(rc);
763            return rc;
764        }
765
766        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
767            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
768            indexLock.writeLock().lock();
769            try {
770                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
771                    public SubscriptionInfo execute(Transaction tx) throws IOException {
772                        StoredDestination sd = getStoredDestination(dest, tx);
773                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
774                        if (command == null) {
775                            return null;
776                        }
777                        return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
778                                .getSubscriptionInfo().newInput()));
779                    }
780                });
781            }finally {
782                indexLock.writeLock().unlock();
783            }
784        }
785
786        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
787            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
788            indexLock.writeLock().lock();
789            try {
790                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
791                    public Integer execute(Transaction tx) throws IOException {
792                        StoredDestination sd = getStoredDestination(dest, tx);
793                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
794                        if (cursorPos == null) {
795                            // The subscription might not exist.
796                            return 0;
797                        }
798
799                        return (int) getStoredMessageCount(tx, sd, subscriptionKey);
800                    }
801                });
802            }finally {
803                indexLock.writeLock().unlock();
804            }
805        }
806
807        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
808                throws Exception {
809            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
810            @SuppressWarnings("unused")
811            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
812            indexLock.writeLock().lock();
813            try {
814                pageFile.tx().execute(new Transaction.Closure<Exception>() {
815                    public void execute(Transaction tx) throws Exception {
816                        StoredDestination sd = getStoredDestination(dest, tx);
817                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
818                        sd.orderIndex.setBatch(tx, cursorPos);
819                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
820                                .hasNext();) {
821                            Entry<Long, MessageKeys> entry = iterator.next();
822                            listener.recoverMessage(loadMessage(entry.getValue().location));
823                        }
824                        sd.orderIndex.resetCursorPosition();
825                    }
826                });
827            }finally {
828                indexLock.writeLock().unlock();
829            }
830        }
831
832        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
833                final MessageRecoveryListener listener) throws Exception {
834            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
835            @SuppressWarnings("unused")
836            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
837            indexLock.writeLock().lock();
838            try {
839                pageFile.tx().execute(new Transaction.Closure<Exception>() {
840                    public void execute(Transaction tx) throws Exception {
841                        StoredDestination sd = getStoredDestination(dest, tx);
842                        sd.orderIndex.resetCursorPosition();
843                        MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
844                        if (moc == null) {
845                            LastAck pos = getLastAck(tx, sd, subscriptionKey);
846                            if (pos == null) {
847                                // sub deleted
848                                return;
849                            }
850                            sd.orderIndex.setBatch(tx, pos);
851                            moc = sd.orderIndex.cursor;
852                        } else {
853                            sd.orderIndex.cursor.sync(moc);
854                        }
855
856                        Entry<Long, MessageKeys> entry = null;
857                        int counter = 0;
858                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
859                                .hasNext();) {
860                            entry = iterator.next();
861                            if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
862                                counter++;
863                            }
864                            if (counter >= maxReturned || listener.hasSpace() == false) {
865                                break;
866                            }
867                        }
868                        sd.orderIndex.stoppedIterating();
869                        if (entry != null) {
870                            MessageOrderCursor copy = sd.orderIndex.cursor.copy();
871                            sd.subscriptionCursors.put(subscriptionKey, copy);
872                        }
873                    }
874                });
875            }finally {
876                indexLock.writeLock().unlock();
877            }
878        }
879
880        public void resetBatching(String clientId, String subscriptionName) {
881            try {
882                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
883                indexLock.writeLock().lock();
884                try {
885                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
886                        public void execute(Transaction tx) throws IOException {
887                            StoredDestination sd = getStoredDestination(dest, tx);
888                            sd.subscriptionCursors.remove(subscriptionKey);
889                        }
890                    });
891                }finally {
892                    indexLock.writeLock().unlock();
893                }
894            } catch (IOException e) {
895                throw new RuntimeException(e);
896            }
897        }
898    }
899
900    String subscriptionKey(String clientId, String subscriptionName) {
901        return clientId + ":" + subscriptionName;
902    }
903
904    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
905        return this.transactionStore.proxy(new KahaDBMessageStore(destination));
906    }
907
908    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
909        return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
910    }
911
    /**
     * Cleanup method to remove any state associated with the given destination.
     * This method does not stop the message store (it might not be cached).
     * <p>
     * This implementation is empty: it performs no cleanup.
     *
     * @param destination
     *            Destination to forget
     */
    public void removeQueueMessageStore(ActiveMQQueue destination) {
    }
921
    /**
     * Cleanup method to remove any state associated with the given destination.
     * This method does not stop the message store (it might not be cached).
     * <p>
     * This implementation is empty: it performs no cleanup.
     *
     * @param destination
     *            Destination to forget
     */
    public void removeTopicMessageStore(ActiveMQTopic destination) {
    }
931
932    public void deleteAllMessages() throws IOException {
933        deleteAllMessages = true;
934    }
935
936    public Set<ActiveMQDestination> getDestinations() {
937        try {
938            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
939            indexLock.writeLock().lock();
940            try {
941                pageFile.tx().execute(new Transaction.Closure<IOException>() {
942                    public void execute(Transaction tx) throws IOException {
943                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
944                                .hasNext();) {
945                            Entry<String, StoredDestination> entry = iterator.next();
946                            if (!isEmptyTopic(entry, tx)) {
947                                rc.add(convert(entry.getKey()));
948                            }
949                        }
950                    }
951
952                    private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx)
953                            throws IOException {
954                        boolean isEmptyTopic = false;
955                        ActiveMQDestination dest = convert(entry.getKey());
956                        if (dest.isTopic()) {
957                            StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
958                            if (loadedStore.subscriptionAcks.isEmpty(tx)) {
959                                isEmptyTopic = true;
960                            }
961                        }
962                        return isEmptyTopic;
963                    }
964                });
965            }finally {
966                indexLock.writeLock().unlock();
967            }
968            return rc;
969        } catch (IOException e) {
970            throw new RuntimeException(e);
971        }
972    }
973
974    public long getLastMessageBrokerSequenceId() throws IOException {
975        return 0;
976    }
977
978    public long getLastProducerSequenceId(ProducerId id) {
979        indexLock.readLock().lock();
980        try {
981            return metadata.producerSequenceIdTracker.getLastSeqId(id);
982        } finally {
983            indexLock.readLock().unlock();
984        }
985    }
986
987    public long size() {
988        return storeSize.get();
989    }
990
    /**
     * Not supported by this store: always throws.
     *
     * @param context ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    public void beginTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not supported by this store: always throws.
     *
     * @param context ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    public void commitTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not supported by this store: always throws.
     *
     * @param context ignored
     * @throws IOException always, with the message "Not yet implemented."
     */
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
1000
    /**
     * Delegates to the superclass {@code checkpointCleanup} with the given flag.
     *
     * @param sync passed through unchanged to {@code checkpointCleanup}
     * @throws IOException propagated from {@code checkpointCleanup}
     */
    public void checkpoint(boolean sync) throws IOException {
        super.checkpointCleanup(sync);
    }
1004
1005    // /////////////////////////////////////////////////////////////////
1006    // Internal helper methods.
1007    // /////////////////////////////////////////////////////////////////
1008
1009    /**
1010     * @param location
1011     * @return
1012     * @throws IOException
1013     */
1014    Message loadMessage(Location location) throws IOException {
1015        KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
1016        Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
1017        return msg;
1018    }
1019
1020    // /////////////////////////////////////////////////////////////////
1021    // Internal conversion methods.
1022    // /////////////////////////////////////////////////////////////////
1023
1024    KahaLocation convert(Location location) {
1025        KahaLocation rc = new KahaLocation();
1026        rc.setLogId(location.getDataFileId());
1027        rc.setOffset(location.getOffset());
1028        return rc;
1029    }
1030
1031    KahaDestination convert(ActiveMQDestination dest) {
1032        KahaDestination rc = new KahaDestination();
1033        rc.setName(dest.getPhysicalName());
1034        switch (dest.getDestinationType()) {
1035        case ActiveMQDestination.QUEUE_TYPE:
1036            rc.setType(DestinationType.QUEUE);
1037            return rc;
1038        case ActiveMQDestination.TOPIC_TYPE:
1039            rc.setType(DestinationType.TOPIC);
1040            return rc;
1041        case ActiveMQDestination.TEMP_QUEUE_TYPE:
1042            rc.setType(DestinationType.TEMP_QUEUE);
1043            return rc;
1044        case ActiveMQDestination.TEMP_TOPIC_TYPE:
1045            rc.setType(DestinationType.TEMP_TOPIC);
1046            return rc;
1047        default:
1048            return null;
1049        }
1050    }
1051
1052    ActiveMQDestination convert(String dest) {
1053        int p = dest.indexOf(":");
1054        if (p < 0) {
1055            throw new IllegalArgumentException("Not in the valid destination format");
1056        }
1057        int type = Integer.parseInt(dest.substring(0, p));
1058        String name = dest.substring(p + 1);
1059        return convert(type, name);
1060    }
1061
1062    private ActiveMQDestination convert(KahaDestination commandDestination) {
1063        return convert(commandDestination.getType().getNumber(), commandDestination.getName());
1064    }
1065
1066    private ActiveMQDestination convert(int type, String name) {
1067        switch (KahaDestination.DestinationType.valueOf(type)) {
1068        case QUEUE:
1069            return new ActiveMQQueue(name);
1070        case TOPIC:
1071            return new ActiveMQTopic(name);
1072        case TEMP_QUEUE:
1073            return new ActiveMQTempQueue(name);
1074        case TEMP_TOPIC:
1075            return new ActiveMQTempTopic(name);
1076        default:
1077            throw new IllegalArgumentException("Not in the valid destination format");
1078        }
1079    }
1080
1081    public TransactionIdTransformer getTransactionIdTransformer() {
1082        return transactionIdTransformer;
1083    }
1084
    /**
     * Replaces the transformer used to translate transaction ids.
     *
     * @param transactionIdTransformer the transformer to use from now on
     */
    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
        this.transactionIdTransformer = transactionIdTransformer;
    }
1088
1089    static class AsyncJobKey {
1090        MessageId id;
1091        ActiveMQDestination destination;
1092
1093        AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1094            this.id = id;
1095            this.destination = destination;
1096        }
1097
1098        @Override
1099        public boolean equals(Object obj) {
1100            if (obj == this) {
1101                return true;
1102            }
1103            return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1104                    && destination.equals(((AsyncJobKey) obj).destination);
1105        }
1106
1107        @Override
1108        public int hashCode() {
1109            return id.hashCode() + destination.hashCode();
1110        }
1111
1112        @Override
1113        public String toString() {
1114            return destination.getPhysicalName() + "-" + id;
1115        }
1116    }
1117
1118    public interface StoreTask {
1119        public boolean cancel();
1120
1121        public void aquireLocks();
1122
1123        public void releaseLocks();
1124    }
1125
    /**
     * Asynchronous task that adds one message to a {@code KahaDBMessageStore}.
     * <p>
     * The task can be run or cancelled; the {@link #done} flag ensures that only one of the two
     * takes effect. Completion, cancellation and failure are reported through the task's future.
     */
    class StoreQueueTask implements Runnable, StoreTask {
        protected final Message message;
        protected final ConnectionContext context;
        protected final KahaDBMessageStore store;
        protected final InnerFutureTask future;
        // Flipped to true by whichever of cancel()/run() wins the compareAndSet.
        protected final AtomicBoolean done = new AtomicBoolean();
        // True between a successful aquireLocks() and the matching releaseLocks().
        protected final AtomicBoolean locked = new AtomicBoolean();

        /**
         * @param store   the store the message is added to
         * @param context connection context passed to {@code addMessage}
         * @param message the message to store
         */
        public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
            this.store = store;
            this.context = context;
            this.message = message;
            this.future = new InnerFutureTask(this);
        }

        /**
         * @return the future reporting this task's completion, cancellation or failure
         */
        public Future<Object> getFuture() {
            return this.future;
        }

        /**
         * Cancels the task unless it has already been run or cancelled.
         *
         * @return the result of cancelling the future if this call claimed the {@code done} flag,
         *         {@code false} otherwise
         */
        public boolean cancel() {
            if (this.done.compareAndSet(false, true)) {
                return this.future.cancel(false);
            }
            return false;
        }

        /**
         * Acquires the global queue semaphore and the store's local async lock and increments the
         * message reference count, at most once until {@link #releaseLocks()} is called.
         * <p>
         * NOTE(review): on {@link InterruptedException} only a warning is logged and {@code locked}
         * stays {@code true}, so a later {@code releaseLocks()} would release permits that may not
         * have been acquired - confirm whether this is intended.
         */
        public void aquireLocks() {
            if (this.locked.compareAndSet(false, true)) {
                try {
                    globalQueueSemaphore.acquire();
                    store.acquireLocalAsyncLock();
                    message.incrementReferenceCount();
                } catch (InterruptedException e) {
                    LOG.warn("Failed to aquire lock", e);
                }
            }

        }

        /**
         * Releases the store's local async lock and the global queue semaphore and decrements the
         * message reference count, but only if {@link #aquireLocks()} set the {@code locked} flag.
         */
        public void releaseLocks() {
            if (this.locked.compareAndSet(true, false)) {
                store.releaseLocalAsyncLock();
                globalQueueSemaphore.release();
                message.decrementReferenceCount();
            }
        }

        /**
         * Stores the message unless the task was cancelled first.
         * <p>
         * Always increments the store's {@code doneTasks} counter. If this call claims the
         * {@code done} flag, the message is added to the store, the task is removed via
         * {@code removeQueueTask} and the future is completed. Otherwise (the task was already
         * cancelled), when {@code cancelledTaskModMetric} is positive and the post-incremented
         * {@code canceledTasks} counter is a multiple of it, a cancellation ratio is printed to
         * {@code System.err} and both counters are reset. Any exception is set on the future.
         */
        public void run() {
            this.store.doneTasks++;
            try {
                if (this.done.compareAndSet(false, true)) {
                    this.store.addMessage(context, message);
                    removeQueueTask(this.store, this.message.getMessageId());
                    this.future.complete();
                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
                    System.err.println(this.store.dest.getName() + " cancelled: "
                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
                    this.store.canceledTasks = this.store.doneTasks = 0;
                }
            } catch (Exception e) {
                this.future.setException(e);
            }
        }

        /**
         * @return the message this task stores
         */
        protected Message getMessage() {
            return this.message;
        }

        /**
         * {@link FutureTask} wrapper that lets the enclosing task set the outcome itself:
         * {@link #complete()} finishes it with a {@code null} result and
         * {@link #setException(Exception)} fails it.
         */
        private class InnerFutureTask extends FutureTask<Object> {

            public InnerFutureTask(Runnable runnable) {
                super(runnable, null);

            }

            // Exposes FutureTask's protected setException(Throwable) to the enclosing task.
            public void setException(final Exception e) {
                super.setException(e);
            }

            // Marks the future as successfully completed with a null result.
            public void complete() {
                super.set(null);
            }
        }
    }
1210
1211    class StoreTopicTask extends StoreQueueTask {
1212        private final int subscriptionCount;
1213        private final List<String> subscriptionKeys = new ArrayList<String>(1);
1214        private final KahaDBTopicMessageStore topicStore;
1215        public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1216                int subscriptionCount) {
1217            super(store, context, message);
1218            this.topicStore = store;
1219            this.subscriptionCount = subscriptionCount;
1220
1221        }
1222
1223        @Override
1224        public void aquireLocks() {
1225            if (this.locked.compareAndSet(false, true)) {
1226                try {
1227                    globalTopicSemaphore.acquire();
1228                    store.acquireLocalAsyncLock();
1229                    message.incrementReferenceCount();
1230                } catch (InterruptedException e) {
1231                    LOG.warn("Failed to aquire lock", e);
1232                }
1233            }
1234
1235        }
1236
1237        @Override
1238        public void releaseLocks() {
1239            if (this.locked.compareAndSet(true, false)) {
1240                message.decrementReferenceCount();
1241                store.releaseLocalAsyncLock();
1242                globalTopicSemaphore.release();
1243            }
1244        }
1245
1246        /**
1247         * add a key
1248         *
1249         * @param key
1250         * @return true if all acknowledgements received
1251         */
1252        public boolean addSubscriptionKey(String key) {
1253            synchronized (this.subscriptionKeys) {
1254                this.subscriptionKeys.add(key);
1255            }
1256            return this.subscriptionKeys.size() >= this.subscriptionCount;
1257        }
1258
1259        @Override
1260        public void run() {
1261            this.store.doneTasks++;
1262            try {
1263                if (this.done.compareAndSet(false, true)) {
1264                    this.topicStore.addMessage(context, message);
1265                    // apply any acks we have
1266                    synchronized (this.subscriptionKeys) {
1267                        for (String key : this.subscriptionKeys) {
1268                            this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1269
1270                        }
1271                    }
1272                    removeTopicTask(this.topicStore, this.message.getMessageId());
1273                    this.future.complete();
1274                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1275                    System.err.println(this.store.dest.getName() + " cancelled: "
1276                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1277                    this.store.canceledTasks = this.store.doneTasks = 0;
1278                }
1279            } catch (Exception e) {
1280                this.future.setException(e);
1281            }
1282        }
1283    }
1284
1285    public class StoreTaskExecutor extends ThreadPoolExecutor {
1286
1287        public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
1288            super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
1289        }
1290
1291        protected void afterExecute(Runnable runnable, Throwable throwable) {
1292            super.afterExecute(runnable, throwable);
1293
1294            if (runnable instanceof StoreTask) {
1295               ((StoreTask)runnable).releaseLocks();
1296            }
1297
1298        }
1299    }
1300}