001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.LinkedList;
023import java.util.StringTokenizer;
024import java.util.concurrent.CopyOnWriteArrayList;
025import java.util.regex.Pattern;
026
027import javax.management.ObjectName;
028
029import org.apache.activemq.broker.jmx.ManagedTransportConnector;
030import org.apache.activemq.broker.jmx.ManagementContext;
031import org.apache.activemq.broker.region.ConnectorStatistics;
032import org.apache.activemq.command.BrokerInfo;
033import org.apache.activemq.command.ConnectionControl;
034import org.apache.activemq.security.MessageAuthorizationPolicy;
035import org.apache.activemq.thread.TaskRunnerFactory;
036import org.apache.activemq.transport.Transport;
037import org.apache.activemq.transport.TransportAcceptListener;
038import org.apache.activemq.transport.TransportFactorySupport;
039import org.apache.activemq.transport.TransportServer;
040import org.apache.activemq.transport.discovery.DiscoveryAgent;
041import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
042import org.apache.activemq.util.ServiceStopper;
043import org.apache.activemq.util.ServiceSupport;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * @org.apache.xbean.XBean
049 */
050public class TransportConnector implements Connector, BrokerServiceAware {
051
052    final Logger LOG = LoggerFactory.getLogger(TransportConnector.class);
053
054    protected final CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
055    protected TransportStatusDetector statusDector;
056    private BrokerService brokerService;
057    private TransportServer server;
058    private URI uri;
059    private BrokerInfo brokerInfo = new BrokerInfo();
060    private TaskRunnerFactory taskRunnerFactory;
061    private MessageAuthorizationPolicy messageAuthorizationPolicy;
062    private DiscoveryAgent discoveryAgent;
063    private final ConnectorStatistics statistics = new ConnectorStatistics();
064    private URI discoveryUri;
065    private String name;
066    private boolean disableAsyncDispatch;
067    private boolean enableStatusMonitor = false;
068    private Broker broker;
069    private boolean updateClusterClients = false;
070    private boolean rebalanceClusterClients;
071    private boolean updateClusterClientsOnRemove = false;
072    private String updateClusterFilter;
073    private boolean auditNetworkProducers = false;
074    private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
075    private int maximumConsumersAllowedPerConnection  = Integer.MAX_VALUE;
076    private PublishedAddressPolicy publishedAddressPolicy = new PublishedAddressPolicy();
077    private boolean allowLinkStealing;
078
079    LinkedList<String> peerBrokers = new LinkedList<String>();
080
081    public TransportConnector() {
082    }
083
084    public TransportConnector(TransportServer server) {
085        this();
086        setServer(server);
087        if (server != null && server.getConnectURI() != null) {
088            URI uri = server.getConnectURI();
089            if (uri != null && uri.getScheme().equals("vm")) {
090                setEnableStatusMonitor(false);
091            }
092        }
093    }
094
095    /**
096     * @return Returns the connections.
097     */
098    public CopyOnWriteArrayList<TransportConnection> getConnections() {
099        return connections;
100    }
101
102    /**
103     * Factory method to create a JMX managed version of this transport
104     * connector
105     */
106    public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException {
107        ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
108        rc.setBrokerInfo(getBrokerInfo());
109        rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
110        rc.setDiscoveryAgent(getDiscoveryAgent());
111        rc.setDiscoveryUri(getDiscoveryUri());
112        rc.setEnableStatusMonitor(isEnableStatusMonitor());
113        rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
114        rc.setName(getName());
115        rc.setTaskRunnerFactory(getTaskRunnerFactory());
116        rc.setUri(getUri());
117        rc.setBrokerService(brokerService);
118        rc.setUpdateClusterClients(isUpdateClusterClients());
119        rc.setRebalanceClusterClients(isRebalanceClusterClients());
120        rc.setUpdateClusterFilter(getUpdateClusterFilter());
121        rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove());
122        rc.setAuditNetworkProducers(isAuditNetworkProducers());
123        rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
124        rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
125        rc.setPublishedAddressPolicy(getPublishedAddressPolicy());
126        rc.setAllowLinkStealing(isAllowLinkStealing());
127        return rc;
128    }
129
130    @Override
131    public BrokerInfo getBrokerInfo() {
132        return brokerInfo;
133    }
134
135    public void setBrokerInfo(BrokerInfo brokerInfo) {
136        this.brokerInfo = brokerInfo;
137    }
138
139    public TransportServer getServer() throws IOException, URISyntaxException {
140        if (server == null) {
141            setServer(createTransportServer());
142        }
143        return server;
144    }
145
146    public void setServer(TransportServer server) {
147        this.server = server;
148    }
149
150    public URI getUri() {
151        if (uri == null) {
152            try {
153                uri = getConnectUri();
154            } catch (Throwable e) {
155            }
156        }
157        return uri;
158    }
159
160    /**
161     * Sets the server transport URI to use if there is not a
162     * {@link TransportServer} configured via the
163     * {@link #setServer(TransportServer)} method. This value is used to lazy
164     * create a {@link TransportServer} instance
165     *
166     * @param uri
167     */
168    public void setUri(URI uri) {
169        this.uri = uri;
170    }
171
172    public TaskRunnerFactory getTaskRunnerFactory() {
173        return taskRunnerFactory;
174    }
175
176    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
177        this.taskRunnerFactory = taskRunnerFactory;
178    }
179
180    /**
181     * @return the statistics for this connector
182     */
183    @Override
184    public ConnectorStatistics getStatistics() {
185        return statistics;
186    }
187
188    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
189        return messageAuthorizationPolicy;
190    }
191
192    /**
193     * Sets the policy used to decide if the current connection is authorized to
194     * consume a given message
195     */
196    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
197        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
198    }
199
200    @Override
201    public void start() throws Exception {
202        broker = brokerService.getBroker();
203        brokerInfo.setBrokerName(broker.getBrokerName());
204        brokerInfo.setBrokerId(broker.getBrokerId());
205        brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
206        brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
207        brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
208        getServer().setAcceptListener(new TransportAcceptListener() {
209            @Override
210            public void onAccept(final Transport transport) {
211                try {
212                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
213                        @Override
214                        public void run() {
215                            try {
216                                if (!brokerService.isStopping()) {
217                                    Connection connection = createConnection(transport);
218                                    connection.start();
219                                } else {
220                                    throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
221                                }
222                            } catch (Exception e) {
223                                String remoteHost = transport.getRemoteAddress();
224                                ServiceSupport.dispose(transport);
225                                onAcceptError(e, remoteHost);
226                            }
227                        }
228                    });
229                } catch (Exception e) {
230                    String remoteHost = transport.getRemoteAddress();
231                    ServiceSupport.dispose(transport);
232                    onAcceptError(e, remoteHost);
233                }
234            }
235
236            @Override
237            public void onAcceptError(Exception error) {
238                onAcceptError(error, null);
239            }
240
241            private void onAcceptError(Exception error, String remoteHost) {
242                if (brokerService != null && brokerService.isStopping()) {
243                    LOG.info("Could not accept connection during shutdown {} : {}", (remoteHost == null ? "" : "from " + remoteHost), error);
244                } else {
245                    LOG.error("Could not accept connection {} : {}", (remoteHost == null ? "" : "from " + remoteHost), error);
246                    LOG.debug("Reason: " + error, error);
247                }
248            }
249        });
250        getServer().setBrokerInfo(brokerInfo);
251        getServer().start();
252
253        DiscoveryAgent da = getDiscoveryAgent();
254        if (da != null) {
255            da.registerService(getPublishableConnectString());
256            da.start();
257        }
258        if (enableStatusMonitor) {
259            this.statusDector = new TransportStatusDetector(this);
260            this.statusDector.start();
261        }
262
263        LOG.info("Connector {} started", getName());
264    }
265
266    public String getPublishableConnectString() throws Exception {
267        String publishableConnectString = publishedAddressPolicy.getPublishableConnectString(this);
268        LOG.debug("Publishing: {} for broker transport URI: {}", publishableConnectString, getConnectUri());
269        return publishableConnectString;
270    }
271
272    public URI getPublishableConnectURI() throws Exception {
273        return publishedAddressPolicy.getPublishableConnectURI(this);
274    }
275
276    @Override
277    public void stop() throws Exception {
278        ServiceStopper ss = new ServiceStopper();
279        if (discoveryAgent != null) {
280            ss.stop(discoveryAgent);
281        }
282        if (server != null) {
283            ss.stop(server);
284        }
285        if (this.statusDector != null) {
286            this.statusDector.stop();
287        }
288
289        for (TransportConnection connection : connections) {
290            ss.stop(connection);
291        }
292        server = null;
293        ss.throwFirstException();
294        LOG.info("Connector {} stopped", getName());
295    }
296
297    // Implementation methods
298    // -------------------------------------------------------------------------
299    protected Connection createConnection(Transport transport) throws IOException {
300        // prefer to use task runner from broker service as stop task runner, as we can then
301        // tie it to the lifecycle of the broker service
302        TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
303                : taskRunnerFactory, brokerService.getTaskRunnerFactory());
304        boolean statEnabled = this.getStatistics().isEnabled();
305        answer.getStatistics().setEnabled(statEnabled);
306        answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
307        return answer;
308    }
309
310    protected TransportServer createTransportServer() throws IOException, URISyntaxException {
311        if (uri == null) {
312            throw new IllegalArgumentException("You must specify either a server or uri property");
313        }
314        if (brokerService == null) {
315            throw new IllegalArgumentException(
316                    "You must specify the brokerService property. Maybe this connector should be added to a broker?");
317        }
318        return TransportFactorySupport.bind(brokerService, uri);
319    }
320
321    public DiscoveryAgent getDiscoveryAgent() throws IOException {
322        if (discoveryAgent == null) {
323            discoveryAgent = createDiscoveryAgent();
324        }
325        return discoveryAgent;
326    }
327
328    protected DiscoveryAgent createDiscoveryAgent() throws IOException {
329        if (discoveryUri != null) {
330            DiscoveryAgent agent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
331
332            if (agent != null && agent instanceof BrokerServiceAware) {
333                ((BrokerServiceAware) agent).setBrokerService(brokerService);
334            }
335
336            return agent;
337        }
338        return null;
339    }
340
341    public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
342        this.discoveryAgent = discoveryAgent;
343    }
344
345    public URI getDiscoveryUri() {
346        return discoveryUri;
347    }
348
349    public void setDiscoveryUri(URI discoveryUri) {
350        this.discoveryUri = discoveryUri;
351    }
352
353    public URI getConnectUri() throws IOException, URISyntaxException {
354        if (server != null) {
355            return server.getConnectURI();
356        } else {
357            return uri;
358        }
359    }
360
361    public void onStarted(TransportConnection connection) {
362        connections.add(connection);
363    }
364
365    public void onStopped(TransportConnection connection) {
366        connections.remove(connection);
367    }
368
369    public String getName() {
370        if (name == null) {
371            uri = getUri();
372            if (uri != null) {
373                name = uri.toString();
374            }
375        }
376        return name;
377    }
378
379    public void setName(String name) {
380        this.name = name;
381    }
382
383    @Override
384    public String toString() {
385        String rc = getName();
386        if (rc == null) {
387            rc = super.toString();
388        }
389        return rc;
390    }
391
392    protected ConnectionControl getConnectionControl() {
393        boolean rebalance = isRebalanceClusterClients();
394        String connectedBrokers = "";
395        String separator = "";
396
397        if (isUpdateClusterClients()) {
398            synchronized (peerBrokers) {
399                for (String uri : getPeerBrokers()) {
400                    connectedBrokers += separator + uri;
401                    separator = ",";
402                }
403
404                if (rebalance) {
405                    String shuffle = peerBrokers.removeFirst();
406                    peerBrokers.addLast(shuffle);
407                }
408            }
409        }
410        ConnectionControl control = new ConnectionControl();
411        control.setConnectedBrokers(connectedBrokers);
412        control.setRebalanceConnection(rebalance);
413        return control;
414    }
415
416    public void addPeerBroker(BrokerInfo info) {
417        if (isMatchesClusterFilter(info.getBrokerName())) {
418            synchronized (peerBrokers) {
419                getPeerBrokers().addLast(info.getBrokerURL());
420            }
421        }
422    }
423
424    public void removePeerBroker(BrokerInfo info) {
425        synchronized (peerBrokers) {
426            getPeerBrokers().remove(info.getBrokerURL());
427        }
428    }
429
430    public LinkedList<String> getPeerBrokers() {
431        synchronized (peerBrokers) {
432            if (peerBrokers.isEmpty()) {
433                peerBrokers.add(brokerService.getDefaultSocketURIString());
434            }
435            return peerBrokers;
436        }
437    }
438
439    @Override
440    public void updateClientClusterInfo() {
441        if (isRebalanceClusterClients() || isUpdateClusterClients()) {
442            ConnectionControl control = getConnectionControl();
443            for (Connection c : this.connections) {
444                c.updateClient(control);
445                if (isRebalanceClusterClients()) {
446                    control = getConnectionControl();
447                }
448            }
449        }
450    }
451
452    private boolean isMatchesClusterFilter(String brokerName) {
453        boolean result = true;
454        String filter = getUpdateClusterFilter();
455        if (filter != null) {
456            filter = filter.trim();
457            if (filter.length() > 0) {
458                StringTokenizer tokenizer = new StringTokenizer(filter, ",");
459                while (result && tokenizer.hasMoreTokens()) {
460                    String token = tokenizer.nextToken();
461                    result = isMatchesClusterFilter(brokerName, token);
462                }
463            }
464        }
465
466        return result;
467    }
468
469    private boolean isMatchesClusterFilter(String brokerName, String match) {
470        boolean result = true;
471        if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) {
472            result = Pattern.matches(match, brokerName);
473        }
474        return result;
475    }
476
477    public boolean isDisableAsyncDispatch() {
478        return disableAsyncDispatch;
479    }
480
481    public void setDisableAsyncDispatch(boolean disableAsyncDispatch) {
482        this.disableAsyncDispatch = disableAsyncDispatch;
483    }
484
485    /**
486     * @return the enableStatusMonitor
487     */
488    public boolean isEnableStatusMonitor() {
489        return enableStatusMonitor;
490    }
491
492    /**
493     * @param enableStatusMonitor
494     *            the enableStatusMonitor to set
495     */
496    public void setEnableStatusMonitor(boolean enableStatusMonitor) {
497        this.enableStatusMonitor = enableStatusMonitor;
498    }
499
500    /**
501     * This is called by the BrokerService right before it starts the transport.
502     */
503    @Override
504    public void setBrokerService(BrokerService brokerService) {
505        this.brokerService = brokerService;
506    }
507
508    public Broker getBroker() {
509        return broker;
510    }
511
512    public BrokerService getBrokerService() {
513        return brokerService;
514    }
515
516    /**
517     * @return the updateClusterClients
518     */
519    @Override
520    public boolean isUpdateClusterClients() {
521        return this.updateClusterClients;
522    }
523
524    /**
525     * @param updateClusterClients
526     *            the updateClusterClients to set
527     */
528    public void setUpdateClusterClients(boolean updateClusterClients) {
529        this.updateClusterClients = updateClusterClients;
530    }
531
532    /**
533     * @return the rebalanceClusterClients
534     */
535    @Override
536    public boolean isRebalanceClusterClients() {
537        return this.rebalanceClusterClients;
538    }
539
540    /**
541     * @param rebalanceClusterClients
542     *            the rebalanceClusterClients to set
543     */
544    public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
545        this.rebalanceClusterClients = rebalanceClusterClients;
546    }
547
548    /**
549     * @return the updateClusterClientsOnRemove
550     */
551    @Override
552    public boolean isUpdateClusterClientsOnRemove() {
553        return this.updateClusterClientsOnRemove;
554    }
555
556    /**
557     * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set
558     */
559    public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
560        this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
561    }
562
563    /**
564     * @return the updateClusterFilter
565     */
566    @Override
567    public String getUpdateClusterFilter() {
568        return this.updateClusterFilter;
569    }
570
571    /**
572     * @param updateClusterFilter
573     *            the updateClusterFilter to set
574     */
575    public void setUpdateClusterFilter(String updateClusterFilter) {
576        this.updateClusterFilter = updateClusterFilter;
577    }
578
579    @Override
580    public int connectionCount() {
581        return connections.size();
582    }
583
584    @Override
585    public boolean isAllowLinkStealing() {
586        return server.isAllowLinkStealing();
587    }
588
589    public void setAllowLinkStealing (boolean allowLinkStealing) {
590        this.allowLinkStealing=allowLinkStealing;
591    }
592
593    public boolean isAuditNetworkProducers() {
594        return auditNetworkProducers;
595    }
596
597    /**
598     * Enable a producer audit on network connections, Traps the case of a missing send reply and resend.
599     * Note: does not work with conduit=false, networked composite destinations or networked virtual topics
600     * @param auditNetworkProducers
601     */
602    public void setAuditNetworkProducers(boolean auditNetworkProducers) {
603        this.auditNetworkProducers = auditNetworkProducers;
604    }
605
606    public int getMaximumProducersAllowedPerConnection() {
607        return maximumProducersAllowedPerConnection;
608    }
609
610    public void setMaximumProducersAllowedPerConnection(int maximumProducersAllowedPerConnection) {
611        this.maximumProducersAllowedPerConnection = maximumProducersAllowedPerConnection;
612    }
613
614    public int getMaximumConsumersAllowedPerConnection() {
615        return maximumConsumersAllowedPerConnection;
616    }
617
618    public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) {
619        this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection;
620    }
621
622    /**
623     * Gets the currently configured policy for creating the published connection address of this
624     * TransportConnector.
625     *
626     * @return the publishedAddressPolicy
627     */
628    public PublishedAddressPolicy getPublishedAddressPolicy() {
629        return publishedAddressPolicy;
630    }
631
632    /**
633     * Sets the configured policy for creating the published connection address of this
634     * TransportConnector.
635     *
636     * @return the publishedAddressPolicy
637     */
638    public void setPublishedAddressPolicy(PublishedAddressPolicy publishedAddressPolicy) {
639        this.publishedAddressPolicy = publishedAddressPolicy;
640    }
641}