/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.kaha.impl.data;
018
019import java.io.File;
020import java.io.FilenameFilter;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.concurrent.atomic.AtomicLong;
028
029import org.apache.activemq.kaha.Marshaller;
030import org.apache.activemq.kaha.StoreLocation;
031import org.apache.activemq.kaha.impl.DataManager;
032import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
033import org.apache.activemq.util.IOExceptionSupport;
034import org.apache.activemq.util.IOHelper;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * Manages DataFiles
040 * 
041 * 
042 */
043public final class DataManagerImpl implements DataManager {
044
045    public static final int ITEM_HEAD_SIZE = 5; // type + length
046    public static final byte DATA_ITEM_TYPE = 1;
047    public static final byte REDO_ITEM_TYPE = 2;
048    public static final long MAX_FILE_LENGTH = 1024 * 1024 * 32;
049    
050    private static final Logger LOG = LoggerFactory.getLogger(DataManagerImpl.class);
051    private static final String NAME_PREFIX = "data-";
052    
053    private final File directory;
054    private final String name;
055    private SyncDataFileReader reader;
056    private SyncDataFileWriter writer;
057    private DataFile currentWriteFile;
058    private long maxFileLength = MAX_FILE_LENGTH;
059    private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
060    private Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
061    private String dataFilePrefix;
062    private final AtomicLong storeSize;
063
064    public DataManagerImpl(File dir, final String name,AtomicLong storeSize) {
065        this.directory = dir;
066        this.name = name;
067        this.storeSize=storeSize;
068
069        dataFilePrefix = IOHelper.toFileSystemSafeName(NAME_PREFIX + name + "-");
070        // build up list of current dataFiles
071        File[] files = dir.listFiles(new FilenameFilter() {
072            public boolean accept(File dir, String n) {
073                return dir.equals(directory) && n.startsWith(dataFilePrefix);
074            }
075        });
076        if (files != null) {
077            for (int i = 0; i < files.length; i++) {
078                File file = files[i];
079                String n = file.getName();
080                String numStr = n.substring(dataFilePrefix.length(), n.length());
081                int num = Integer.parseInt(numStr);
082                DataFile dataFile = new DataFile(file, num);
083                storeSize.addAndGet(dataFile.getLength());
084                fileMap.put(dataFile.getNumber(), dataFile);
085                if (currentWriteFile == null || currentWriteFile.getNumber().intValue() < num) {
086                    currentWriteFile = dataFile;
087                }
088            }
089        }
090    }
091
092    private DataFile createAndAddDataFile(int num) {
093        String fileName = dataFilePrefix + num;
094        File file = new File(directory, fileName);
095        DataFile result = new DataFile(file, num);
096        fileMap.put(result.getNumber(), result);
097        return result;
098    }
099
100    /*
101     * (non-Javadoc)
102     * 
103     * @see org.apache.activemq.kaha.impl.data.IDataManager#getName()
104     */
105    public String getName() {
106        return name;
107    }
108
109    synchronized DataFile findSpaceForData(DataItem item) throws IOException {
110        if (currentWriteFile == null || ((currentWriteFile.getLength() + item.getSize()) > maxFileLength)) {
111            int nextNum = currentWriteFile != null ? currentWriteFile.getNumber().intValue() + 1 : 1;
112            if (currentWriteFile != null && currentWriteFile.isUnused()) {
113                removeDataFile(currentWriteFile);
114            }
115            currentWriteFile = createAndAddDataFile(nextNum);
116        }
117        item.setOffset(currentWriteFile.getLength());
118        item.setFile(currentWriteFile.getNumber().intValue());
119        int len = item.getSize() + ITEM_HEAD_SIZE;
120        currentWriteFile.incrementLength(len);
121        storeSize.addAndGet(len);
122        return currentWriteFile;
123    }
124
125    DataFile getDataFile(StoreLocation item) throws IOException {
126        Integer key = Integer.valueOf(item.getFile());
127        DataFile dataFile = fileMap.get(key);
128        if (dataFile == null) {
129            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
130            throw new IOException("Could not locate data file " + NAME_PREFIX + name + "-" + item.getFile());
131        }
132        return dataFile;
133    }
134
135    /*
136     * (non-Javadoc)
137     * 
138     * @see org.apache.activemq.kaha.impl.data.IDataManager#readItem(org.apache.activemq.kaha.Marshaller,
139     *      org.apache.activemq.kaha.StoreLocation)
140     */
141    public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException {
142        return getReader().readItem(marshaller, item);
143    }
144
145    /*
146     * (non-Javadoc)
147     * 
148     * @see org.apache.activemq.kaha.impl.data.IDataManager#storeDataItem(org.apache.activemq.kaha.Marshaller,
149     *      java.lang.Object)
150     */
151    public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
152        return getWriter().storeItem(marshaller, payload, DATA_ITEM_TYPE);
153    }
154
155    /*
156     * (non-Javadoc)
157     * 
158     * @see org.apache.activemq.kaha.impl.data.IDataManager#storeRedoItem(java.lang.Object)
159     */
160    public synchronized StoreLocation storeRedoItem(Object payload) throws IOException {
161        return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
162    }
163
164    /*
165     * (non-Javadoc)
166     * 
167     * @see org.apache.activemq.kaha.impl.data.IDataManager#updateItem(org.apache.activemq.kaha.StoreLocation,
168     *      org.apache.activemq.kaha.Marshaller, java.lang.Object)
169     */
170    public synchronized void updateItem(StoreLocation location, Marshaller marshaller, Object payload)
171        throws IOException {
172        getWriter().updateItem((DataItem)location, marshaller, payload, DATA_ITEM_TYPE);
173    }
174
175    /*
176     * (non-Javadoc)
177     * 
178     * @see org.apache.activemq.kaha.impl.data.IDataManager#recoverRedoItems(org.apache.activemq.kaha.impl.data.RedoListener)
179     */
180    public synchronized void recoverRedoItems(RedoListener listener) throws IOException {
181
182        // Nothing to recover if there is no current file.
183        if (currentWriteFile == null) {
184            return;
185        }
186
187        DataItem item = new DataItem();
188        item.setFile(currentWriteFile.getNumber().intValue());
189        item.setOffset(0);
190        while (true) {
191            byte type;
192            try {
193                type = getReader().readDataItemSize(item);
194            } catch (IOException ignore) {
195                LOG.trace("End of data file reached at (header was invalid): " + item);
196                return;
197            }
198            if (type == REDO_ITEM_TYPE) {
199                // Un-marshal the redo item
200                Object object;
201                try {
202                    object = readItem(redoMarshaller, item);
203                } catch (IOException e1) {
204                    LOG.trace("End of data file reached at (payload was invalid): " + item);
205                    return;
206                }
207                try {
208
209                    listener.onRedoItem(item, object);
210                    // in case the listener is holding on to item references,
211                    // copy it
212                    // so we don't change it behind the listener's back.
213                    item = item.copy();
214
215                } catch (Exception e) {
216                    throw IOExceptionSupport.create("Recovery handler failed: " + e, e);
217                }
218            }
219            // Move to the next item.
220            item.setOffset(item.getOffset() + ITEM_HEAD_SIZE + item.getSize());
221        }
222    }
223
224    /*
225     * (non-Javadoc)
226     * 
227     * @see org.apache.activemq.kaha.impl.data.IDataManager#close()
228     */
229    public synchronized void close() throws IOException {
230        getWriter().close();
231        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
232            DataFile dataFile = i.next();
233            getWriter().force(dataFile);
234            dataFile.close();
235        }
236        fileMap.clear();
237    }
238
239    /*
240     * (non-Javadoc)
241     * 
242     * @see org.apache.activemq.kaha.impl.data.IDataManager#force()
243     */
244    public synchronized void force() throws IOException {
245        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
246            DataFile dataFile = i.next();
247            getWriter().force(dataFile);
248        }
249    }
250
251    /*
252     * (non-Javadoc)
253     * 
254     * @see org.apache.activemq.kaha.impl.data.IDataManager#delete()
255     */
256    public synchronized boolean delete() throws IOException {
257        boolean result = true;
258        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
259            DataFile dataFile = i.next();
260            storeSize.addAndGet(-dataFile.getLength());
261            result &= dataFile.delete();
262        }
263        fileMap.clear();
264        return result;
265    }
266
267    /*
268     * (non-Javadoc)
269     * 
270     * @see org.apache.activemq.kaha.impl.data.IDataManager#addInterestInFile(int)
271     */
272    public synchronized void addInterestInFile(int file) throws IOException {
273        if (file >= 0) {
274            Integer key = Integer.valueOf(file);
275            DataFile dataFile = fileMap.get(key);
276            if (dataFile == null) {
277                dataFile = createAndAddDataFile(file);
278            }
279            addInterestInFile(dataFile);
280        }
281    }
282
283    synchronized void addInterestInFile(DataFile dataFile) {
284        if (dataFile != null) {
285            dataFile.increment();
286        }
287    }
288
289    /*
290     * (non-Javadoc)
291     * 
292     * @see org.apache.activemq.kaha.impl.data.IDataManager#removeInterestInFile(int)
293     */
294    public synchronized void removeInterestInFile(int file) throws IOException {
295        if (file >= 0) {
296            Integer key = Integer.valueOf(file);
297            DataFile dataFile = fileMap.get(key);
298            removeInterestInFile(dataFile);
299        }
300    }
301
302    synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
303        if (dataFile != null) {
304           
305            if (dataFile.decrement() <= 0) {
306                if (dataFile != currentWriteFile) {
307                    removeDataFile(dataFile);
308                }
309            }
310        }
311    }
312
313    /*
314     * (non-Javadoc)
315     * 
316     * @see org.apache.activemq.kaha.impl.data.IDataManager#consolidateDataFiles()
317     */
318    public synchronized void consolidateDataFiles() throws IOException {
319        List<DataFile> purgeList = new ArrayList<DataFile>();
320        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
321            DataFile dataFile = i.next();
322            if (dataFile.isUnused() && dataFile != currentWriteFile) {
323                purgeList.add(dataFile);
324            }
325        }
326        for (int i = 0; i < purgeList.size(); i++) {
327            DataFile dataFile = purgeList.get(i);
328            removeDataFile(dataFile);
329        }
330    }
331
332    private void removeDataFile(DataFile dataFile) throws IOException {
333        fileMap.remove(dataFile.getNumber());
334        if (writer != null) {
335            writer.force(dataFile);
336        }
337        storeSize.addAndGet(-dataFile.getLength());
338        boolean result = dataFile.delete();
339        LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed"));
340    }
341
342    /*
343     * (non-Javadoc)
344     * 
345     * @see org.apache.activemq.kaha.impl.data.IDataManager#getRedoMarshaller()
346     */
347    public Marshaller getRedoMarshaller() {
348        return redoMarshaller;
349    }
350
351    /*
352     * (non-Javadoc)
353     * 
354     * @see org.apache.activemq.kaha.impl.data.IDataManager#setRedoMarshaller(org.apache.activemq.kaha.Marshaller)
355     */
356    public void setRedoMarshaller(Marshaller redoMarshaller) {
357        this.redoMarshaller = redoMarshaller;
358    }
359
360    /**
361     * @return the maxFileLength
362     */
363    public long getMaxFileLength() {
364        return maxFileLength;
365    }
366
367    /**
368     * @param maxFileLength the maxFileLength to set
369     */
370    public void setMaxFileLength(long maxFileLength) {
371        this.maxFileLength = maxFileLength;
372    }
373
374    public String toString() {
375        return "DataManager:(" + NAME_PREFIX + name + ")";
376    }
377
378    public synchronized SyncDataFileReader getReader() {
379        if (reader == null) {
380            reader = createReader();
381        }
382        return reader;
383    }
384
385    protected synchronized SyncDataFileReader createReader() {
386        return new SyncDataFileReader(this);
387    }
388
389    public synchronized void setReader(SyncDataFileReader reader) {
390        this.reader = reader;
391    }
392
393    public synchronized SyncDataFileWriter getWriter() {
394        if (writer == null) {
395            writer = createWriter();
396        }
397        return writer;
398    }
399
400    private SyncDataFileWriter createWriter() {
401        return new SyncDataFileWriter(this);
402    }
403
404    public synchronized void setWriter(SyncDataFileWriter writer) {
405        this.writer = writer;
406    }
407
408}