001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.openwire;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.lang.reflect.Method;
023import java.util.HashMap;
024import java.util.Map;
025
026import org.apache.activemq.command.CommandTypes;
027import org.apache.activemq.command.DataStructure;
028import org.apache.activemq.command.WireFormatInfo;
029import org.apache.activemq.util.ByteSequence;
030import org.apache.activemq.util.ByteSequenceData;
031import org.apache.activemq.util.DataByteArrayInputStream;
032import org.apache.activemq.util.DataByteArrayOutputStream;
033import org.apache.activemq.wireformat.WireFormat;
034
035/**
036 * 
037 * 
038 */
039public final class OpenWireFormat implements WireFormat {
040
041    public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION;
042    public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION;
043    public static final int DEFAULT_MAX_FRAME_SIZE = 100 * 1024 * 1024; //100 MB
044
045    static final byte NULL_TYPE = CommandTypes.NULL;
046    private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
047    private static final int MARSHAL_CACHE_FREE_SPACE = 100;
048
049    private DataStreamMarshaller dataMarshallers[];
050    private int version;
051    private boolean stackTraceEnabled;
052    private boolean tcpNoDelayEnabled;
053    private boolean cacheEnabled;
054    private boolean tightEncodingEnabled;
055    private boolean sizePrefixDisabled;
056    private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
057
058    // The following fields are used for value caching
059    private short nextMarshallCacheIndex;
060    private short nextMarshallCacheEvictionIndex;
061    private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>();
062    private DataStructure marshallCache[] = null;
063    private DataStructure unmarshallCache[] = null;
064    private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
065    private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
066    private WireFormatInfo preferedWireFormatInfo;
067    
068    public OpenWireFormat() {
069        this(DEFAULT_VERSION);
070    }
071
072    public OpenWireFormat(int i) {
073        setVersion(i);
074    }
075
076    public int hashCode() {
077        return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
078               ^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
079               ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
080               ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
081    }
082
083    public OpenWireFormat copy() {
084        OpenWireFormat answer = new OpenWireFormat(version);
085        answer.stackTraceEnabled = stackTraceEnabled;
086        answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
087        answer.cacheEnabled = cacheEnabled;
088        answer.tightEncodingEnabled = tightEncodingEnabled;
089        answer.sizePrefixDisabled = sizePrefixDisabled;
090        answer.preferedWireFormatInfo = preferedWireFormatInfo;
091        return answer;
092    }
093
094    public boolean equals(Object object) {
095        if (object == null) {
096            return false;
097        }
098        OpenWireFormat o = (OpenWireFormat)object;
099        return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled
100               && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
101               && o.sizePrefixDisabled == sizePrefixDisabled;
102    }
103
104
105    public String toString() {
106        return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
107               + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled +  ", maxFrameSize=" + maxFrameSize + "}";
108        // return "OpenWireFormat{id="+id+",
109        // tightEncodingEnabled="+tightEncodingEnabled+"}";
110    }
111
112    public int getVersion() {
113        return version;
114    }
115
116    public synchronized ByteSequence marshal(Object command) throws IOException {
117
118        if (cacheEnabled) {
119            runMarshallCacheEvictionSweep();
120        }
121
122//        MarshallAware ma = null;
123//        // If not using value caching, then the marshaled form is always the
124//        // same
125//        if (!cacheEnabled && ((DataStructure)command).isMarshallAware()) {
126//            ma = (MarshallAware)command;
127//        }
128
129        ByteSequence sequence = null;
130        // if( ma!=null ) {
131        // sequence = ma.getCachedMarshalledForm(this);
132        // }
133
134        if (sequence == null) {
135
136            int size = 1;
137            if (command != null) {
138
139                DataStructure c = (DataStructure)command;
140                byte type = c.getDataStructureType();
141                DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
142                if (dsm == null) {
143                    throw new IOException("Unknown data type: " + type);
144                }
145                if (tightEncodingEnabled) {
146
147                    BooleanStream bs = new BooleanStream();
148                    size += dsm.tightMarshal1(this, c, bs);
149                    size += bs.marshalledSize();
150
151                    bytesOut.restart(size);
152                    if (!sizePrefixDisabled) {
153                        bytesOut.writeInt(size);
154                    }
155                    bytesOut.writeByte(type);
156                    bs.marshal(bytesOut);
157                    dsm.tightMarshal2(this, c, bytesOut, bs);
158                    sequence = bytesOut.toByteSequence();
159
160                } else {
161                    bytesOut.restart();
162                    if (!sizePrefixDisabled) {
163                        bytesOut.writeInt(0); // we don't know the final size
164                                                // yet but write this here for
165                                                // now.
166                    }
167                    bytesOut.writeByte(type);
168                    dsm.looseMarshal(this, c, bytesOut);
169                    sequence = bytesOut.toByteSequence();
170
171                    if (!sizePrefixDisabled) {
172                        size = sequence.getLength() - 4;
173                        int pos = sequence.offset;
174                        ByteSequenceData.writeIntBig(sequence, size);
175                        sequence.offset = pos;
176                    }
177                }
178
179            } else {
180                bytesOut.restart(5);
181                bytesOut.writeInt(size);
182                bytesOut.writeByte(NULL_TYPE);
183                sequence = bytesOut.toByteSequence();
184            }
185
186            // if( ma!=null ) {
187            // ma.setCachedMarshalledForm(this, sequence);
188            // }
189        }
190        return sequence;
191    }
192
193    public synchronized Object unmarshal(ByteSequence sequence) throws IOException {
194        bytesIn.restart(sequence);
195        // DataInputStream dis = new DataInputStream(new
196        // ByteArrayInputStream(sequence));
197
198        if (!sizePrefixDisabled) {
199            int size = bytesIn.readInt();
200            if (sequence.getLength() - 4 != size) {
201                // throw new IOException("Packet size does not match marshaled
202                // size");
203            }
204
205            if (size > maxFrameSize) {
206                throw new IOException("Frame size of " + (size / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
207            }
208        }
209
210        Object command = doUnmarshal(bytesIn);
211        // if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) {
212        // ((MarshallAware) command).setCachedMarshalledForm(this, sequence);
213        // }
214        return command;
215    }
216
217    public synchronized void marshal(Object o, DataOutput dataOut) throws IOException {
218
219        if (cacheEnabled) {
220            runMarshallCacheEvictionSweep();
221        }
222
223        int size = 1;
224        if (o != null) {
225
226            DataStructure c = (DataStructure)o;
227            byte type = c.getDataStructureType();
228            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
229            if (dsm == null) {
230                throw new IOException("Unknown data type: " + type);
231            }
232            if (tightEncodingEnabled) {
233                BooleanStream bs = new BooleanStream();
234                size += dsm.tightMarshal1(this, c, bs);
235                size += bs.marshalledSize();
236
237                if (!sizePrefixDisabled) {
238                    dataOut.writeInt(size);
239                }
240
241                dataOut.writeByte(type);
242                bs.marshal(dataOut);
243                dsm.tightMarshal2(this, c, dataOut, bs);
244
245            } else {
246                DataOutput looseOut = dataOut;
247
248                if (!sizePrefixDisabled) {
249                    bytesOut.restart();
250                    looseOut = bytesOut;
251                }
252
253                looseOut.writeByte(type);
254                dsm.looseMarshal(this, c, looseOut);
255
256                if (!sizePrefixDisabled) {
257                    ByteSequence sequence = bytesOut.toByteSequence();
258                    dataOut.writeInt(sequence.getLength());
259                    dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
260                }
261
262            }
263
264        } else {
265            if (!sizePrefixDisabled) {
266                dataOut.writeInt(size);
267            }
268            dataOut.writeByte(NULL_TYPE);
269        }
270    }
271
272    public Object unmarshal(DataInput dis) throws IOException {
273        DataInput dataIn = dis;
274        if (!sizePrefixDisabled) {
275            int size = dis.readInt();
276            if (size > maxFrameSize) {
277                throw new IOException("Frame size of " + (size / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
278            }
279            // int size = dis.readInt();
280            // byte[] data = new byte[size];
281            // dis.readFully(data);
282            // bytesIn.restart(data);
283            // dataIn = bytesIn;
284        }
285        return doUnmarshal(dataIn);
286    }
287
288    /**
289     * Used by NIO or AIO transports
290     */
291    public int tightMarshal1(Object o, BooleanStream bs) throws IOException {
292        int size = 1;
293        if (o != null) {
294            DataStructure c = (DataStructure)o;
295            byte type = c.getDataStructureType();
296            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
297            if (dsm == null) {
298                throw new IOException("Unknown data type: " + type);
299            }
300
301            size += dsm.tightMarshal1(this, c, bs);
302            size += bs.marshalledSize();
303        }
304        return size;
305    }
306
307    /**
308     * Used by NIO or AIO transports; note that the size is not written as part
309     * of this method.
310     */
311    public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException {
312        if (cacheEnabled) {
313            runMarshallCacheEvictionSweep();
314        }
315
316        if (o != null) {
317            DataStructure c = (DataStructure)o;
318            byte type = c.getDataStructureType();
319            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
320            if (dsm == null) {
321                throw new IOException("Unknown data type: " + type);
322            }
323            ds.writeByte(type);
324            bs.marshal(ds);
325            dsm.tightMarshal2(this, c, ds, bs);
326        }
327    }
328
329    /**
330     * Allows you to dynamically switch the version of the openwire protocol
331     * being used.
332     * 
333     * @param version
334     */
335    public void setVersion(int version) {
336        String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory";
337        Class mfClass;
338        try {
339            mfClass = Class.forName(mfName, false, getClass().getClassLoader());
340        } catch (ClassNotFoundException e) {
341            throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: " + version
342                                                                         + ", could not load " + mfName)
343                .initCause(e);
344        }
345        try {
346            Method method = mfClass.getMethod("createMarshallerMap", new Class[] {OpenWireFormat.class});
347            dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this});
348        } catch (Throwable e) {
349            throw (IllegalArgumentException)new IllegalArgumentException(
350                                                                         "Invalid version: "
351                                                                             + version
352                                                                             + ", "
353                                                                             + mfName
354                                                                             + " does not properly implement the createMarshallerMap method.")
355                .initCause(e);
356        }
357        this.version = version;
358    }
359
360    public Object doUnmarshal(DataInput dis) throws IOException {
361        byte dataType = dis.readByte();
362        if (dataType != NULL_TYPE) {
363            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
364            if (dsm == null) {
365                throw new IOException("Unknown data type: " + dataType);
366            }
367            Object data = dsm.createObject();
368            if (this.tightEncodingEnabled) {
369                BooleanStream bs = new BooleanStream();
370                bs.unmarshal(dis);
371                dsm.tightUnmarshal(this, data, dis, bs);
372            } else {
373                dsm.looseUnmarshal(this, data, dis);
374            }
375            return data;
376        } else {
377            return null;
378        }
379    }
380
381    // public void debug(String msg) {
382    // String t = (Thread.currentThread().getName()+" ").substring(0, 40);
383    // System.out.println(t+": "+msg);
384    // }
385    public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException {
386        bs.writeBoolean(o != null);
387        if (o == null) {
388            return 0;
389        }
390
391        if (o.isMarshallAware()) {
392            // MarshallAware ma = (MarshallAware)o;
393            ByteSequence sequence = null;
394            // sequence=ma.getCachedMarshalledForm(this);
395            bs.writeBoolean(sequence != null);
396            if (sequence != null) {
397                return 1 + sequence.getLength();
398            }
399        }
400
401        byte type = o.getDataStructureType();
402        DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
403        if (dsm == null) {
404            throw new IOException("Unknown data type: " + type);
405        }
406        return 1 + dsm.tightMarshal1(this, o, bs);
407    }
408
409    public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs)
410        throws IOException {
411        if (!bs.readBoolean()) {
412            return;
413        }
414
415        byte type = o.getDataStructureType();
416        ds.writeByte(type);
417
418        if (o.isMarshallAware() && bs.readBoolean()) {
419
420            // We should not be doing any caching
421            throw new IOException("Corrupted stream");
422            // MarshallAware ma = (MarshallAware) o;
423            // ByteSequence sequence=ma.getCachedMarshalledForm(this);
424            // ds.write(sequence.getData(), sequence.getOffset(),
425            // sequence.getLength());
426
427        } else {
428
429            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
430            if (dsm == null) {
431                throw new IOException("Unknown data type: " + type);
432            }
433            dsm.tightMarshal2(this, o, ds, bs);
434
435        }
436    }
437
438    public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException {
439        if (bs.readBoolean()) {
440
441            byte dataType = dis.readByte();
442            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
443            if (dsm == null) {
444                throw new IOException("Unknown data type: " + dataType);
445            }
446            DataStructure data = dsm.createObject();
447
448            if (data.isMarshallAware() && bs.readBoolean()) {
449
450                dis.readInt();
451                dis.readByte();
452
453                BooleanStream bs2 = new BooleanStream();
454                bs2.unmarshal(dis);
455                dsm.tightUnmarshal(this, data, dis, bs2);
456
457                // TODO: extract the sequence from the dis and associate it.
458                // MarshallAware ma = (MarshallAware)data
459                // ma.setCachedMarshalledForm(this, sequence);
460
461            } else {
462                dsm.tightUnmarshal(this, data, dis, bs);
463            }
464
465            return data;
466        } else {
467            return null;
468        }
469    }
470
471    public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException {
472        if (dis.readBoolean()) {
473
474            byte dataType = dis.readByte();
475            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
476            if (dsm == null) {
477                throw new IOException("Unknown data type: " + dataType);
478            }
479            DataStructure data = dsm.createObject();
480            dsm.looseUnmarshal(this, data, dis);
481            return data;
482
483        } else {
484            return null;
485        }
486    }
487
488    public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException {
489        dataOut.writeBoolean(o != null);
490        if (o != null) {
491            byte type = o.getDataStructureType();
492            dataOut.writeByte(type);
493            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
494            if (dsm == null) {
495                throw new IOException("Unknown data type: " + type);
496            }
497            dsm.looseMarshal(this, o, dataOut);
498        }
499    }
500
501    public void runMarshallCacheEvictionSweep() {
502        // Do we need to start evicting??
503        while (marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE) {
504
505            marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]);
506            marshallCache[nextMarshallCacheEvictionIndex] = null;
507
508            nextMarshallCacheEvictionIndex++;
509            if (nextMarshallCacheEvictionIndex >= marshallCache.length) {
510                nextMarshallCacheEvictionIndex = 0;
511            }
512
513        }
514    }
515
516    public Short getMarshallCacheIndex(DataStructure o) {
517        return marshallCacheMap.get(o);
518    }
519
520    public Short addToMarshallCache(DataStructure o) {
521        short i = nextMarshallCacheIndex++;
522        if (nextMarshallCacheIndex >= marshallCache.length) {
523            nextMarshallCacheIndex = 0;
524        }
525
526        // We can only cache that item if there is space left.
527        if (marshallCacheMap.size() < marshallCache.length) {
528            marshallCache[i] = o;
529            Short index = new Short(i);
530            marshallCacheMap.put(o, index);
531            return index;
532        } else {
533            // Use -1 to indicate that the value was not cached due to cache
534            // being full.
535            return new Short((short)-1);
536        }
537    }
538
539    public void setInUnmarshallCache(short index, DataStructure o) {
540
541        // There was no space left in the cache, so we can't
542        // put this in the cache.
543        if (index == -1) {
544            return;
545        }
546
547        unmarshallCache[index] = o;
548    }
549
550    public DataStructure getFromUnmarshallCache(short index) {
551        return unmarshallCache[index];
552    }
553
554    public void setStackTraceEnabled(boolean b) {
555        stackTraceEnabled = b;
556    }
557
558    public boolean isStackTraceEnabled() {
559        return stackTraceEnabled;
560    }
561
562    public boolean isTcpNoDelayEnabled() {
563        return tcpNoDelayEnabled;
564    }
565
566    public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
567        this.tcpNoDelayEnabled = tcpNoDelayEnabled;
568    }
569
570    public boolean isCacheEnabled() {
571        return cacheEnabled;
572    }
573
574    public void setCacheEnabled(boolean cacheEnabled) {
575        if(cacheEnabled){
576            marshallCache = new DataStructure[MARSHAL_CACHE_SIZE];
577            unmarshallCache = new DataStructure[MARSHAL_CACHE_SIZE];
578        }
579        this.cacheEnabled = cacheEnabled;
580    }
581
582    public boolean isTightEncodingEnabled() {
583        return tightEncodingEnabled;
584    }
585
586    public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
587        this.tightEncodingEnabled = tightEncodingEnabled;
588    }
589
590    public boolean isSizePrefixDisabled() {
591        return sizePrefixDisabled;
592    }
593
594    public void setSizePrefixDisabled(boolean prefixPacketSize) {
595        this.sizePrefixDisabled = prefixPacketSize;
596    }
597
598    public void setPreferedWireFormatInfo(WireFormatInfo info) {
599        this.preferedWireFormatInfo = info;
600    }
601
602    public WireFormatInfo getPreferedWireFormatInfo() {
603        return preferedWireFormatInfo;
604    }
605
606    public long getMaxFrameSize() {
607        return maxFrameSize;
608    }
609
610    public void setMaxFrameSize(long maxFrameSize) {
611        this.maxFrameSize = maxFrameSize;
612    }
613
614    public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
615
616        if (preferedWireFormatInfo == null) {
617            throw new IllegalStateException("Wireformat cannot not be renegotiated.");
618        }
619
620        this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
621        info.setVersion(this.getVersion());
622
623        this.setMaxFrameSize(min(preferedWireFormatInfo.getMaxFrameSize(), info.getMaxFrameSize()));
624        info.setMaxFrameSize(this.getMaxFrameSize());
625
626        this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
627        info.setStackTraceEnabled(this.stackTraceEnabled);
628
629        this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
630        info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled);
631
632        this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
633        info.setCacheEnabled(this.cacheEnabled);
634
635        this.tightEncodingEnabled = info.isTightEncodingEnabled()
636                                    && preferedWireFormatInfo.isTightEncodingEnabled();
637        info.setTightEncodingEnabled(this.tightEncodingEnabled);
638
639        this.sizePrefixDisabled = info.isSizePrefixDisabled()
640                                  && preferedWireFormatInfo.isSizePrefixDisabled();
641        info.setSizePrefixDisabled(this.sizePrefixDisabled);
642
643        if (cacheEnabled) {
644
645            int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize());
646            info.setCacheSize(size);
647
648            if (size == 0) {
649                size = MARSHAL_CACHE_SIZE;
650            }
651
652            marshallCache = new DataStructure[size];
653            unmarshallCache = new DataStructure[size];
654            nextMarshallCacheIndex = 0;
655            nextMarshallCacheEvictionIndex = 0;
656            marshallCacheMap = new HashMap<DataStructure, Short>();
657        } else {
658            marshallCache = null;
659            unmarshallCache = null;
660            nextMarshallCacheIndex = 0;
661            nextMarshallCacheEvictionIndex = 0;
662            marshallCacheMap = null;
663        }
664
665    }
666
667    protected int min(int version1, int version2) {
668        if (version1 < version2 && version1 > 0 || version2 <= 0) {
669            return version1;
670        }
671        return version2;
672    }
673
674    protected long min(long version1, long version2) {
675        if (version1 < version2 && version1 > 0 || version2 <= 0) {
676            return version1;
677        }
678        return version2;
679    }
680}