001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import java.io.ByteArrayInputStream;
020import java.io.DataOutputStream;
021import java.io.EOFException;
022import java.io.IOException;
023import java.net.Socket;
024import java.net.URI;
025import java.net.UnknownHostException;
026import java.nio.ByteBuffer;
027import java.nio.channels.SelectionKey;
028import java.nio.channels.SocketChannel;
029
030import javax.net.SocketFactory;
031
032import org.apache.activemq.transport.Transport;
033import org.apache.activemq.transport.nio.NIOOutputStream;
034import org.apache.activemq.transport.nio.SelectorManager;
035import org.apache.activemq.transport.nio.SelectorSelection;
036import org.apache.activemq.transport.tcp.TcpTransport;
037import org.apache.activemq.util.IOExceptionSupport;
038import org.apache.activemq.util.ServiceStopper;
039import org.apache.activemq.wireformat.WireFormat;
040
041/**
042 * An implementation of the {@link Transport} interface for using Stomp over NIO
043 *
044 *
045 */
046public class StompNIOTransport extends TcpTransport {
047
048    private SocketChannel channel;
049    private SelectorSelection selection;
050
051    private ByteBuffer inputBuffer;
052    StompCodec codec;
053
054    public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
055        super(wireFormat, socketFactory, remoteLocation, localLocation);
056    }
057
058    public StompNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
059        super(wireFormat, socket);
060    }
061
062    protected void initializeStreams() throws IOException {
063        channel = socket.getChannel();
064        channel.configureBlocking(false);
065
066        // listen for events telling us when the socket is readable.
067        selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
068            public void onSelect(SelectorSelection selection) {
069                serviceRead();
070            }
071
072            public void onError(SelectorSelection selection, Throwable error) {
073                if (error instanceof IOException) {
074                    onException((IOException)error);
075                } else {
076                    onException(IOExceptionSupport.create(error));
077                }
078            }
079        });
080
081        inputBuffer = ByteBuffer.allocate(8 * 1024);
082        NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
083        this.dataOut = new DataOutputStream(outPutStream);
084        this.buffOut = outPutStream;
085        codec = new StompCodec(this);
086    }
087
088    private void serviceRead() {
089        try {
090
091           while (true) {
092               // read channel
093               int readSize = channel.read(inputBuffer);
094               // channel is closed, cleanup
095               if (readSize == -1) {
096                   onException(new EOFException());
097                   selection.close();
098                   break;
099               }
100               // nothing more to read, break
101               if (readSize == 0) {
102                   break;
103               }
104
105               inputBuffer.flip();
106
107               ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
108               codec.parse(input, readSize);
109
110               // clear the buffer
111               inputBuffer.clear();
112
113           }
114        } catch (IOException e) {
115            onException(e);
116        } catch (Throwable e) {
117            onException(IOExceptionSupport.create(e));
118        }
119    }
120
121    protected void doStart() throws Exception {
122        connect();
123        selection.setInterestOps(SelectionKey.OP_READ);
124        selection.enable();
125    }
126
127    protected void doStop(ServiceStopper stopper) throws Exception {
128        try {
129            if (selection != null) {
130                selection.close();
131            }
132        } finally {
133            super.doStop(stopper);
134        }
135    }
136}