001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.proxy; 018 019import java.io.IOException; 020import java.net.URI; 021import java.net.URISyntaxException; 022import java.util.Iterator; 023import java.util.concurrent.CopyOnWriteArrayList; 024import org.apache.activemq.Service; 025import org.apache.activemq.transport.CompositeTransport; 026import org.apache.activemq.transport.Transport; 027import org.apache.activemq.transport.TransportAcceptListener; 028import org.apache.activemq.transport.TransportFactory; 029import org.apache.activemq.transport.TransportFilter; 030import org.apache.activemq.transport.TransportServer; 031import org.apache.activemq.util.ServiceStopper; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * @org.apache.xbean.XBean 037 * 038 * 039 */ 040public class ProxyConnector implements Service { 041 042 private static final Logger LOG = LoggerFactory.getLogger(ProxyConnector.class); 043 private TransportServer server; 044 private URI bind; 045 private URI remote; 046 private URI localUri; 047 private String name; 048 /** 049 * Should we proxy commands to the local broker using VM transport as well? 050 */ 051 private boolean proxyToLocalBroker = true; 052 053 private final CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>(); 054 055 public void start() throws Exception { 056 057 this.getServer().setAcceptListener(new TransportAcceptListener() { 058 public void onAccept(Transport localTransport) { 059 try { 060 Transport remoteTransport = createRemoteTransport(); 061 ProxyConnection connection = new ProxyConnection(localTransport, remoteTransport); 062 connections.add(connection); 063 connection.start(); 064 } catch (Exception e) { 065 onAcceptError(e); 066 } 067 } 068 069 public void onAcceptError(Exception error) { 070 LOG.error("Could not accept connection: " + error, error); 071 } 072 }); 073 getServer().start(); 074 LOG.info("Proxy Connector " + getName() + " Started"); 075 076 } 077 078 public void stop() throws Exception { 079 ServiceStopper ss = new ServiceStopper(); 080 if (this.server != null) { 081 ss.stop(this.server); 082 } 083 for (Iterator<ProxyConnection> iter = connections.iterator(); iter.hasNext();) { 084 LOG.info("Connector stopped: Stopping proxy."); 085 ss.stop(iter.next()); 086 } 087 ss.throwFirstException(); 088 LOG.info("Proxy Connector " + getName() + " Stopped"); 089 } 090 091 // Properties 092 // ------------------------------------------------------------------------- 093 094 public URI getLocalUri() { 095 return localUri; 096 } 097 098 public void setLocalUri(URI localURI) { 099 this.localUri = localURI; 100 } 101 102 public URI getBind() { 103 return bind; 104 } 105 106 public void setBind(URI bind) { 107 this.bind = bind; 108 } 109 110 public URI getRemote() { 111 return remote; 112 } 113 114 public void setRemote(URI remote) { 115 this.remote = remote; 116 } 117 118 public TransportServer getServer() throws IOException, URISyntaxException { 119 if (server == null) { 120 server = createServer(); 121 } 122 return server; 123 } 124 125 public void setServer(TransportServer server) { 126 this.server = server; 127 } 128 129 protected TransportServer createServer() throws IOException, URISyntaxException { 130 if (bind == null) { 131 throw new IllegalArgumentException("You must specify either a server or the bind property"); 132 } 133 return TransportFactory.bind(bind); 134 } 135 136 private Transport createRemoteTransport() throws Exception { 137 Transport transport = TransportFactory.compositeConnect(remote); 138 CompositeTransport ct = transport.narrow(CompositeTransport.class); 139 if (ct != null && localUri != null && proxyToLocalBroker) { 140 ct.add(false,new URI[] {localUri}); 141 } 142 143 // Add a transport filter so that we can track the transport life cycle 144 transport = new TransportFilter(transport) { 145 @Override 146 public void stop() throws Exception { 147 LOG.info("Stopping proxy."); 148 super.stop(); 149 connections.remove(this); 150 } 151 }; 152 return transport; 153 } 154 155 public String getName() { 156 if (name == null) { 157 if (server != null) { 158 name = server.getConnectURI().toString(); 159 } else { 160 name = "proxy"; 161 } 162 } 163 return name; 164 } 165 166 public void setName(String name) { 167 this.name = name; 168 } 169 170 public boolean isProxyToLocalBroker() { 171 return proxyToLocalBroker; 172 } 173 174 public void setProxyToLocalBroker(boolean proxyToLocalBroker) { 175 this.proxyToLocalBroker = proxyToLocalBroker; 176 } 177 178}