/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.kahadb.index;
018
019import java.io.IOException;
020import java.lang.ref.WeakReference;
021import java.util.Iterator;
022import java.util.Map;
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.concurrent.atomic.AtomicLong;
025
026import org.apache.kahadb.index.ListNode.ListIterator;
027import org.apache.kahadb.page.Page;
028import org.apache.kahadb.page.PageFile;
029import org.apache.kahadb.page.Transaction;
030import org.apache.kahadb.util.Marshaller;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034public class ListIndex<Key,Value> implements Index<Key,Value> {
035
036    private static final Logger LOG = LoggerFactory.getLogger(ListIndex.class);
037    public  final static long NOT_SET = -1;
038    protected PageFile pageFile;
039    protected long headPageId;
040    protected long tailPageId;
041    private AtomicLong size = new AtomicLong(0);
042
043    protected AtomicBoolean loaded = new AtomicBoolean();
044
045    private ListNode.NodeMarshaller<Key, Value> marshaller;
046    private Marshaller<Key> keyMarshaller;
047    private Marshaller<Value> valueMarshaller;
048
049    public ListIndex() {
050    }
051
052    public ListIndex(PageFile pageFile, long headPageId) {
053        this.pageFile = pageFile;
054        setHeadPageId(headPageId);
055    }
056
057    @SuppressWarnings("rawtypes")
058    public ListIndex(PageFile pageFile, Page page) {
059        this(pageFile, page.getPageId());
060    }
061
062    synchronized public void load(Transaction tx) throws IOException {
063        if (loaded.compareAndSet(false, true)) {
064            LOG.debug("loading");
065            if( keyMarshaller == null ) {
066                throw new IllegalArgumentException("The key marshaller must be set before loading the ListIndex");
067            }
068            if( valueMarshaller == null ) {
069                throw new IllegalArgumentException("The value marshaller must be set before loading the ListIndex");
070            }
071
072            marshaller = new ListNode.NodeMarshaller<Key, Value>(keyMarshaller, valueMarshaller);
073            final Page<ListNode<Key,Value>> p = tx.load(getHeadPageId(), null);
074            if( p.getType() == Page.PAGE_FREE_TYPE ) {
075                 // Need to initialize it..
076                ListNode<Key, Value> root = createNode(p);
077                storeNode(tx, root, true);
078                setHeadPageId(p.getPageId());
079                setTailPageId(getHeadPageId());
080            } else {
081                ListNode<Key, Value> node = loadNode(tx, getHeadPageId());
082                setTailPageId(getHeadPageId());
083                size.addAndGet(node.size(tx));
084                while (node.getNext() != NOT_SET ) {
085                    node = loadNode(tx, node.getNext());
086                    size.addAndGet(node.size(tx));
087                    setTailPageId(node.getPageId());
088                }
089            }
090        }
091    }
092
093    synchronized public void unload(Transaction tx) {
094        if (loaded.compareAndSet(true, false)) {
095        }
096    }
097
098    protected ListNode<Key,Value> getHead(Transaction tx) throws IOException {
099        return loadNode(tx, getHeadPageId());
100    }
101
102    protected ListNode<Key,Value> getTail(Transaction tx) throws IOException {
103        return loadNode(tx, getTailPageId());
104    }
105
106    synchronized public boolean containsKey(Transaction tx, Key key) throws IOException {
107        assertLoaded();
108
109        if (size.get() == 0) {
110            return false;
111        }
112
113        for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext(); ) {
114            Map.Entry<Key,Value> candidate = iterator.next();
115            if (key.equals(candidate.getKey())) {
116                return true;
117            }
118        }
119        return false;
120    }
121
122    private ListNode<Key, Value> lastGetNodeCache = null;
123    private Map.Entry<Key, Value> lastGetEntryCache = null;
124    private WeakReference<Transaction> lastCacheTxSrc = new WeakReference<Transaction>(null);
125
126    @SuppressWarnings({ "rawtypes", "unchecked" })
127    synchronized public Value get(Transaction tx, Key key) throws IOException {
128        assertLoaded();
129        for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext(); ) {
130            Map.Entry<Key,Value> candidate = iterator.next();
131            if (key.equals(candidate.getKey())) {
132                this.lastGetNodeCache = ((ListIterator) iterator).getCurrent();
133                this.lastGetEntryCache = candidate;
134                this.lastCacheTxSrc = new WeakReference<Transaction>(tx);
135                return candidate.getValue();
136            }
137        }
138        return null;
139    }
140
141    /**
142     * Update the value of the item with the given key in the list if ot exists, otherwise
143     * it appends the value to the end of the list.
144     *
145     * @return the old value contained in the list if one exists or null.
146     */
147    @SuppressWarnings({ "rawtypes" })
148    synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
149
150        Value oldValue = null;
151
152        if (lastGetNodeCache != null && tx.equals(lastCacheTxSrc.get())) {
153
154            if(lastGetEntryCache.getKey().equals(key)) {
155                oldValue = lastGetEntryCache.setValue(value);
156                lastGetEntryCache.setValue(value);
157                lastGetNodeCache.storeUpdate(tx);
158                flushCache();
159                return oldValue;
160            }
161
162            // This searches from the last location of a call to get for the element to replace
163            // all the way to the end of the ListIndex.
164            Iterator<Map.Entry<Key, Value>> iterator = lastGetNodeCache.iterator(tx);
165            while (iterator.hasNext()) {
166                Map.Entry<Key, Value> entry = iterator.next();
167                if (entry.getKey().equals(key)) {
168                    oldValue = entry.setValue(value);
169                    ((ListIterator) iterator).getCurrent().storeUpdate(tx);
170                    flushCache();
171                    return oldValue;
172                }
173            }
174        } else {
175            flushCache();
176        }
177
178        // Not found because the cache wasn't set or its not at the end of the list so we
179        // start from the beginning and go to the cached location or the end, then we do
180        // an add if its not found.
181        Iterator<Map.Entry<Key, Value>> iterator = iterator(tx);
182        while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache) {
183            Map.Entry<Key, Value> entry = iterator.next();
184            if (entry.getKey().equals(key)) {
185                oldValue = entry.setValue(value);
186                ((ListIterator) iterator).getCurrent().storeUpdate(tx);
187                flushCache();
188                return oldValue;
189            }
190        }
191
192        // Not found so add it last.
193        flushCache();
194
195        return add(tx, key, value);
196    }
197
198    synchronized public Value add(Transaction tx, Key key, Value value) throws IOException {
199        assertLoaded();
200        getTail(tx).put(tx, key, value);
201        size.incrementAndGet();
202        flushCache();
203        return null;
204    }
205
206    synchronized public Value addFirst(Transaction tx, Key key, Value value) throws IOException {
207        assertLoaded();
208        getHead(tx).addFirst(tx, key, value);
209        size.incrementAndGet();
210        flushCache();
211        return null;
212    }
213
214    @SuppressWarnings("rawtypes")
215    synchronized public Value remove(Transaction tx, Key key) throws IOException {
216        assertLoaded();
217
218        if (size.get() == 0) {
219            return null;
220        }
221
222        if (lastGetNodeCache != null && tx.equals(lastCacheTxSrc.get())) {
223
224            // This searches from the last location of a call to get for the element to remove
225            // all the way to the end of the ListIndex.
226            Iterator<Map.Entry<Key, Value>> iterator = lastGetNodeCache.iterator(tx);
227            while (iterator.hasNext()) {
228                Map.Entry<Key, Value> entry = iterator.next();
229                if (entry.getKey().equals(key)) {
230                    iterator.remove();
231                    flushCache();
232                    return entry.getValue();
233                }
234            }
235        } else {
236            flushCache();
237        }
238
239        // Not found because the cache wasn't set or its not at the end of the list so we
240        // start from the beginning and go to the cached location or the end to find the
241        // element to remove.
242        Iterator<Map.Entry<Key, Value>> iterator = iterator(tx);
243        while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache) {
244            Map.Entry<Key, Value> entry = iterator.next();
245            if (entry.getKey().equals(key)) {
246                iterator.remove();
247                flushCache();
248                return entry.getValue();
249            }
250        }
251
252        return null;
253    }
254
255    public void onRemove() {
256        size.decrementAndGet();
257        flushCache();
258    }
259
260    public boolean isTransient() {
261        return false;
262    }
263
264    synchronized public void clear(Transaction tx) throws IOException {
265        for (Iterator<ListNode<Key,Value>> iterator = listNodeIterator(tx); iterator.hasNext(); ) {
266            ListNode<Key,Value>candidate = iterator.next();
267            candidate.clear(tx);
268            // break up the transaction
269            tx.commit();
270        }
271        flushCache();
272        size.set(0);
273    }
274
275    synchronized public Iterator<ListNode<Key, Value>> listNodeIterator(Transaction tx) throws IOException {
276        return getHead(tx).listNodeIterator(tx);
277    }
278
279    synchronized public boolean isEmpty(final Transaction tx) throws IOException {
280        return getHead(tx).isEmpty(tx);
281    }
282
283    synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx) throws IOException {
284        return getHead(tx).iterator(tx);
285    }
286
287    synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, long initialPosition) throws IOException {
288        return getHead(tx).iterator(tx, initialPosition);
289    }
290
291    synchronized public Map.Entry<Key,Value> getFirst(Transaction tx) throws IOException {
292        return getHead(tx).getFirst(tx);
293    }
294
295    synchronized public Map.Entry<Key,Value> getLast(Transaction tx) throws IOException {
296        return getTail(tx).getLast(tx);
297    }
298
299    private void assertLoaded() throws IllegalStateException {
300        if( !loaded.get() ) {
301            throw new IllegalStateException("TheListIndex is not loaded");
302        }
303    }
304
305    ListNode<Key,Value> loadNode(Transaction tx, long pageId) throws IOException {
306        Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller);
307        ListNode<Key, Value> node = page.get();
308        node.setPage(page);
309        node.setContainingList(this);
310        return node;
311    }
312
313    ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws IOException {
314        ListNode<Key,Value> node = new ListNode<Key,Value>();
315        node.setPage(page);
316        page.set(node);
317        node.setContainingList(this);
318        return node;
319    }
320
321    public ListNode<Key,Value> createNode(Transaction tx) throws IOException {
322        return createNode(tx.<ListNode<Key,Value>>load(tx.<ListNode<Key,Value>>allocate().getPageId(), null));
323    }
324
325    public void storeNode(Transaction tx, ListNode<Key,Value> node, boolean overflow) throws IOException {
326        tx.store(node.getPage(), marshaller, overflow);
327        flushCache();
328    }
329
330    public PageFile getPageFile() {
331        return pageFile;
332    }
333
334    public void setPageFile(PageFile pageFile) {
335        this.pageFile = pageFile;
336    }
337
338    public long getHeadPageId() {
339        return headPageId;
340    }
341
342    public void setHeadPageId(long headPageId) {
343        this.headPageId = headPageId;
344    }
345
346    public Marshaller<Key> getKeyMarshaller() {
347        return keyMarshaller;
348    }
349    public void setKeyMarshaller(Marshaller<Key> keyMarshaller) {
350        this.keyMarshaller = keyMarshaller;
351    }
352
353    public Marshaller<Value> getValueMarshaller() {
354        return valueMarshaller;
355    }
356    public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
357        this.valueMarshaller = valueMarshaller;
358    }
359
360    public void setTailPageId(long tailPageId) {
361        this.tailPageId = tailPageId;
362    }
363
364    public long getTailPageId() {
365       return tailPageId;
366    }
367
368    public long size() {
369        return size.get();
370    }
371
372    private void flushCache() {
373        this.lastGetEntryCache = null;
374        this.lastGetNodeCache = null;
375        this.lastCacheTxSrc.clear();
376    }
377}