/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.tcp;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLServerSocket;

import org.apache.activemq.Service;
import org.apache.activemq.ThreadPriorities;
import org.apache.activemq.TransportLoggerSupport;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerThreadSupport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceListener;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A TCP based implementation of {@link TransportServer}.
 * <p>
 * The server accepts sockets either through a plain {@link ServerSocket} or, when the server socket exposes a
 * {@link ServerSocketChannel}, through a {@link Selector}. Accepted sockets are turned into {@link Transport}
 * instances and handed to the accept listener. The number of live transports is tracked in
 * {@link #getCurrentTransportCount()} and capped by {@link #getMaximumConnections()}.
 */
public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {

    private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);

    protected volatile ServerSocket serverSocket;
    protected volatile Selector selector;
    protected int backlog = 5000;
    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
    protected final TcpTransportFactory transportFactory;
    protected long maxInactivityDuration = 30000;
    protected long maxInactivityDurationInitalDelay = 10000;
    protected int minmumWireFormatVersion;
    protected boolean useQueueForAccept = true;
    protected boolean allowLinkStealing;

    /**
     * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
     * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer,
     * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or
     * TransportConnector URIs.
     */
    protected boolean trace = false;

    protected int soTimeout = 0;
    protected int socketBufferSize = 64 * 1024;
    protected int connectionTimeout = 30000;

    /**
     * Name of the LogWriter implementation to use. Names are mapped to classes in the
     * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably
     * set in Connection or TransportConnector URIs.
     */
    protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;

    /**
     * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1
     * TransportLogger which is manageable, a TransportLoggerControl MBean will be created.
     */
    protected boolean dynamicManagement = false;

    /**
     * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log.
     * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the
     * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or
     * TransportConnector URIs.
     */
    protected boolean startLogging = true;
    protected final ServerSocketFactory serverSocketFactory;
    protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
    protected Thread socketHandlerThread;

    /**
     * The maximum number of sockets allowed for this server
     */
    protected int maximumConnections = Integer.MAX_VALUE;
    protected final AtomicInteger currentTransportCount = new AtomicInteger();

    /**
     * Creates an unbound server; call {@link #bind()} to open the listening socket.
     *
     * @param transportFactory
     *            factory stored for later use when configuring accepted transports
     * @param location
     *            the bind location passed to the superclass
     * @param serverSocketFactory
     *            factory used by {@link #bind()} to create the server socket
     * @throws IOException
     *             declared for subclasses and the superclass constructor
     * @throws URISyntaxException
     *             declared for subclasses and the superclass constructor
     */
    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
        URISyntaxException {
        super(location);
        this.transportFactory = transportFactory;
        this.serverSocketFactory = serverSocketFactory;
    }

    /**
     * Creates and configures the server socket for the bind location and publishes the resulting connect URI.
     * <p>
     * A missing or empty host in the bind location is treated as {@code localhost}. If the socket cannot be created
     * or configured it is closed again before the failure is reported, so a failed bind does not leave a listening
     * socket behind.
     *
     * @throws IOException
     *             if the socket cannot be created or configured, or no valid connect URI can be built
     */
    public void bind() throws IOException {
        URI bind = getBindLocation();

        String host = bind.getHost();
        host = (host == null || host.length() == 0) ? "localhost" : host;
        InetAddress addr = InetAddress.getByName(host);

        ServerSocket socket = null;
        boolean configured = false;
        try {
            socket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
            configureServerSocket(socket);
            // Publish the socket only once it is fully configured.
            serverSocket = socket;
            configured = true;
        } catch (IOException e) {
            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
        } finally {
            if (!configured && socket != null) {
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // Best effort: the original bind failure is what the caller needs to see.
                }
            }
        }

        try {
            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(socket, addr), socket.getLocalPort(), bind.getPath(),
                bind.getQuery(), bind.getFragment()));
        } catch (URISyntaxException e) {
            // it could be that the host name contains invalid characters such
            // as _ on unix platforms so lets try use the IP address instead
            try {
                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), socket.getLocalPort(), bind.getPath(),
                    bind.getQuery(), bind.getFragment()));
            } catch (URISyntaxException e2) {
                throw IOExceptionSupport.create(e2);
            }
        }
    }

    /**
     * Applies the accept timeout and any configured transport options to a freshly created server socket.
     *
     * @param socket
     *            the server socket to configure
     * @throws SocketException
     *             if the timeout cannot be set or the {@code enabledCipherSuites} option is rejected
     */
    private void configureServerSocket(ServerSocket socket) throws SocketException {
        // accept() must time out periodically so the accept loop can re-check the stop flag.
        socket.setSoTimeout(2000);
        if (transportOptions != null) {

            // If the enabledCipherSuites option is invalid we don't want to ignore it as the call
            // to SSLServerSocket to configure it has a side effect on the socket rendering it
            // useless as all suites are enabled many of which are considered as insecure. We
            // instead trap that option here and throw an exception. We should really consider
            // all invalid options as breaking and not start the transport but the current design
            // doesn't really allow for this.
            //
            // see: https://issues.apache.org/jira/browse/AMQ-4582
            //
            if (socket instanceof SSLServerSocket) {
                if (transportOptions.containsKey("enabledCipherSuites")) {
                    Object cipherSuites = transportOptions.remove("enabledCipherSuites");

                    if (!IntrospectionSupport.setProperty(socket, "enabledCipherSuites", cipherSuites)) {
                        throw new SocketException(String.format(
                            "Invalid transport options {enabledCipherSuites=%s}", cipherSuites));
                    }
                }
            }

            IntrospectionSupport.setProperties(socket, transportOptions);
        }
    }

    /**
     * @return Returns the wireFormatFactory.
     */
    public WireFormatFactory getWireFormatFactory() {
        return wireFormatFactory;
    }

    /**
     * @param wireFormatFactory
     *            The wireFormatFactory to set.
     */
    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
        this.wireFormatFactory = wireFormatFactory;
    }

    /**
     * Accepts a broker info for the transport server. This implementation does nothing with the supplied value.
     *
     * @param brokerInfo
     *            ignored by this implementation
     */
    @Override
    public void setBrokerInfo(BrokerInfo brokerInfo) {
    }

    /**
     * @return the value passed to accepted transports as the {@code maxInactivityDuration} option
     */
    public long getMaxInactivityDuration() {
        return maxInactivityDuration;
    }

    /**
     * @param maxInactivityDuration
     *            the value to pass to accepted transports as the {@code maxInactivityDuration} option
     */
    public void setMaxInactivityDuration(long maxInactivityDuration) {
        this.maxInactivityDuration = maxInactivityDuration;
    }

    /**
     * @return the value passed to accepted transports as the {@code maxInactivityDurationInitalDelay} option
     */
    public long getMaxInactivityDurationInitalDelay() {
        return this.maxInactivityDurationInitalDelay;
    }

    /**
     * @param maxInactivityDurationInitalDelay
     *            the value to pass to accepted transports as the {@code maxInactivityDurationInitalDelay} option
     */
    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
    }

    /**
     * @return the value passed to accepted transports as the {@code minmumWireFormatVersion} option
     */
    public int getMinmumWireFormatVersion() {
        return minmumWireFormatVersion;
    }

    /**
     * @param minmumWireFormatVersion
     *            the value to pass to accepted transports as the {@code minmumWireFormatVersion} option
     */
    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
        this.minmumWireFormatVersion = minmumWireFormatVersion;
    }

    /**
     * @return the {@link #trace} flag
     */
    public boolean isTrace() {
        return trace;
    }

    /**
     * @param trace
     *            the {@link #trace} flag to set
     */
    public void setTrace(boolean trace) {
        this.trace = trace;
    }

    /**
     * @return the {@link #logWriterName}
     */
    public String getLogWriterName() {
        return logWriterName;
    }

    /**
     * @param logFormat
     *            the {@link #logWriterName} to set
     */
    public void setLogWriterName(String logFormat) {
        this.logWriterName = logFormat;
    }

    /**
     * @return the {@link #dynamicManagement} flag
     */
    public boolean isDynamicManagement() {
        return dynamicManagement;
    }

    /**
     * @param useJmx
     *            the {@link #dynamicManagement} flag to set
     */
    public void setDynamicManagement(boolean useJmx) {
        this.dynamicManagement = useJmx;
    }

    /**
     * @return the {@link #startLogging} flag
     */
    public boolean isStartLogging() {
        return startLogging;
    }

    /**
     * @param startLogging
     *            the {@link #startLogging} flag to set
     */
    public void setStartLogging(boolean startLogging) {
        this.startLogging = startLogging;
    }

    /**
     * @return the backlog
     */
    public int getBacklog() {
        return backlog;
    }

    /**
     * @param backlog
     *            the backlog to set; used by {@link #bind()} when creating the server socket
     */
    public void setBacklog(int backlog) {
        this.backlog = backlog;
    }

    /**
     * @return the useQueueForAccept
     */
    public boolean isUseQueueForAccept() {
        return useQueueForAccept;
    }

    /**
     * @param useQueueForAccept
     *            the useQueueForAccept to set; when true accepted sockets are queued for the handler thread
     *            started in {@link #doStart()} instead of being handled on the accepting thread
     */
    public void setUseQueueForAccept(boolean useQueueForAccept) {
        this.useQueueForAccept = useQueueForAccept;
    }

    /**
     * pull Sockets from the ServerSocket
     * <p>
     * Does nothing when the server is stopped or stopping. When no server socket has been bound the problem is
     * reported through {@code onAcceptError} and the method returns.
     */
    @Override
    public void run() {
        if (!isStopped() && !isStopping()) {
            // Snapshot the volatile field so doStop() cannot null it out underneath us.
            final ServerSocket serverSocket = this.serverSocket;
            if (serverSocket == null) {
                onAcceptError(new IOException("Server started without a valid ServerSocket"));
                // Nothing to accept from; continuing would only dereference null.
                return;
            }

            final ServerSocketChannel channel = serverSocket.getChannel();
            if (channel != null) {
                doRunWithServerSocketChannel(channel);
            } else {
                doRunWithServerSocket(serverSocket);
            }
        }
    }

    /**
     * Accept loop used when the server socket is backed by a channel: registers the channel with a new selector
     * and accepts connections until the server is stopped.
     *
     * @param channel
     *            the channel of the bound server socket
     */
    private void doRunWithServerSocketChannel(final ServerSocketChannel channel) {
        try {
            channel.configureBlocking(false);
            final Selector selector = Selector.open();

            try {
                channel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (ClosedChannelException ex) {
                try {
                    selector.close();
                } catch (IOException ignore) {}

                throw ex;
            }

            // Update object instance for later cleanup.
            this.selector = selector;

            while (!isStopped()) {
                // Short timeout so the stop flag is re-checked frequently.
                int count = selector.select(10);

                if (count == 0) {
                    continue;
                }

                Set<SelectionKey> keys = selector.selectedKeys();

                for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
                    final SelectionKey key = i.next();
                    if (key.isAcceptable()) {
                        try {
                            SocketChannel sc = channel.accept();
                            if (sc != null) {
                                if (isStopped() || getAcceptListener() == null) {
                                    sc.close();
                                } else {
                                    if (useQueueForAccept) {
                                        socketQueue.put(sc.socket());
                                    } else {
                                        handleSocket(sc.socket());
                                    }
                                }
                            }

                        } catch (SocketTimeoutException ste) {
                            // expect this to happen
                        } catch (Exception e) {
                            notifyAcceptFailure(e);
                        }
                    }
                    // Selected keys must be removed explicitly or they are reported again.
                    i.remove();
                }
            }
        } catch (IOException ex) {
            notifyAcceptFailure(ex);
        }
    }

    /**
     * Accept loop used when the server socket has no channel: blocks in {@code accept()} until the server is
     * stopped.
     *
     * @param serverSocket
     *            the bound server socket to accept from
     */
    private void doRunWithServerSocket(final ServerSocket serverSocket) {
        while (!isStopped()) {
            Socket socket = null;
            try {
                socket = serverSocket.accept();
                if (socket != null) {
                    if (isStopped() || getAcceptListener() == null) {
                        socket.close();
                    } else {
                        if (useQueueForAccept) {
                            socketQueue.put(socket);
                        } else {
                            handleSocket(socket);
                        }
                    }
                }
            } catch (SocketTimeoutException ste) {
                // expect this to happen
            } catch (Exception e) {
                notifyAcceptFailure(e);
            }
        }
    }

    /**
     * Reports a failure from the accept path: passed straight to {@code onAcceptError} while the server is not
     * stopping, logged first when it is stopping but not yet stopped, and dropped otherwise.
     *
     * @param error
     *            the failure to report
     */
    private void notifyAcceptFailure(Exception error) {
        if (!isStopping()) {
            onAcceptError(error);
        } else if (!isStopped()) {
            LOG.warn("run()", error);
            onAcceptError(error);
        }
    }

    /**
     * Allow derived classes to override the Transport implementation that this transport server creates.
     *
     * @param socket
     *            the accepted socket
     * @param format
     *            the wire format for the new transport
     *
     * @return a new Transport instance.
     *
     * @throws IOException
     *             if the transport cannot be created
     */
    protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
        return new TcpTransport(format, socket);
    }

    /**
     * @return pretty print of this
     */
    @Override
    public String toString() {
        return "" + getBindLocation();
    }

    /**
     * Chooses the host name to advertise in the connect URI.
     *
     * @param socket
     *            the server socket
     * @param bindAddress
     *            the address used when the socket is not bound
     * @return real hostName
     * @throws UnknownHostException
     *             if the local host name cannot be determined
     */
    protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
        if (!socket.isBound()) {
            return bindAddress.getCanonicalHostName();
        }
        if (socket.getInetAddress().isAnyLocalAddress()) {
            // make it more human readable and useful, an alternative to 0.0.0.0
            return InetAddressUtil.getLocalHostName();
        }
        return socket.getInetAddress().getCanonicalHostName();
    }

    /**
     * Starts the socket handler thread when accepts are queued, then delegates to the superclass.
     *
     * @throws Exception
     *             if the superclass fails to start
     */
    @Override
    protected void doStart() throws Exception {
        if (useQueueForAccept) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        while (!isStopped() && !isStopping()) {
                            // Bounded wait so the loop condition is re-evaluated regularly.
                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
                            if (sock != null) {
                                try {
                                    handleSocket(sock);
                                } catch (Throwable thrown) {
                                    if (!isStopping()) {
                                        onAcceptError(new Exception(thrown));
                                    } else if (!isStopped()) {
                                        LOG.warn("Unexpected error thrown during accept handling: ", thrown);
                                        onAcceptError(new Exception(thrown));
                                    }
                                }
                            }
                        }

                    } catch (InterruptedException e) {
                        // Same condition as the loop guard: only an interrupt outside of
                        // shutdown is worth reporting as an accept error.
                        if (!isStopped() && !isStopping()) {
                            LOG.info("socketQueue interrupted - stopping");
                            onAcceptError(e);
                        }
                        Thread.currentThread().interrupt();
                    }
                }
            };
            socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
            socketHandlerThread.setDaemon(true);
            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
            socketHandlerThread.start();
        }
        super.doStart();
    }

    /**
     * Closes the selector and server socket, interrupts the handler thread and stops the superclass. Every step
     * is attempted even if an earlier one fails.
     *
     * @param stopper
     *            passed through to the superclass
     * @throws Exception
     *             the first failure from closing the server socket or stopping the superclass
     */
    @Override
    protected void doStop(ServiceStopper stopper) throws Exception {
        Exception firstFailure = null;

        final Selector selector = this.selector;
        if (selector != null) {
            this.selector = null;
            try {
                selector.close();
            } catch (Exception error) {
                // Best effort: a selector that fails to close must not prevent the rest of the shutdown.
                LOG.debug("Failed to close accept selector", error);
            }
        }

        try {
            final ServerSocket serverSocket = this.serverSocket;
            if (serverSocket != null) {
                this.serverSocket = null;
                serverSocket.close();
            }
        } catch (Exception error) {
            firstFailure = error;
        }

        if (socketHandlerThread != null) {
            socketHandlerThread.interrupt();
            socketHandlerThread = null;
        }

        try {
            super.doStop(stopper);
        } catch (Exception error) {
            // Keep the earliest failure; record this one only if nothing failed before it.
            if (firstFailure == null) {
                firstFailure = error;
            }
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * @return the local address of the server socket, or {@code null} when no server socket is currently held
     *         (before {@link #bind()} or after the server has been stopped)
     */
    @Override
    public InetSocketAddress getSocketAddress() {
        final ServerSocket socket = this.serverSocket;
        if (socket == null) {
            return null;
        }
        return (InetSocketAddress) socket.getLocalSocketAddress();
    }

    /**
     * Handles one accepted socket. Subclasses may override; this implementation delegates to
     * {@link #doHandleSocket(Socket)}.
     *
     * @param socket
     *            the accepted socket
     */
    protected void handleSocket(Socket socket) {
        doHandleSocket(socket);
    }

    /**
     * Turns an accepted socket into a configured transport and hands it to the accept listener.
     * <p>
     * The connection count is reserved first and the attempt is rejected when {@link #maximumConnections} has
     * been reached. If anything fails before the socket has been wrapped in a transport, the socket is closed
     * and the reserved count is released.
     *
     * @param socket
     *            the accepted socket
     */
    protected final void doHandleSocket(Socket socket) {
        boolean closeSocket = true;
        boolean countIncremented = false;
        try {
            int currentCount;
            do {
                currentCount = currentTransportCount.get();
                if (currentCount >= this.maximumConnections) {
                    throw new ExceededMaximumConnectionsException(
                        "Exceeded the maximum number of allowed client connections. See the '" +
                        "maximumConnections' property on the TCP transport configuration URI " +
                        "in the ActiveMQ configuration file (e.g., activemq.xml)");
                }

            //Increment this value before configuring the transport
            //This is necessary because some of the transport servers must read from the
            //socket during configureTransport() so we want to make sure this value is
            //accurate as the transport server could pause here waiting for data to be sent from a client
            } while (!currentTransportCount.compareAndSet(currentCount, currentCount + 1));
            countIncremented = true;

            HashMap<String, Object> options = new HashMap<String, Object>();
            options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
            options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
            options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
            options.put("trace", Boolean.valueOf(trace));
            options.put("soTimeout", Integer.valueOf(soTimeout));
            options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
            options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
            options.put("logWriterName", logWriterName);
            options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
            options.put("startLogging", Boolean.valueOf(startLogging));
            // transportOptions may be absent (configureServerSocket treats null as legal too).
            if (transportOptions != null) {
                options.putAll(transportOptions);
            }

            TransportInfo transportInfo = configureTransport(this, socket);
            // From here on the transport wraps the socket, so this method no longer closes it.
            closeSocket = false;

            if (transportInfo.transport instanceof ServiceSupport) {
                // stopped(Service) releases the connection count when the transport stops.
                ((ServiceSupport) transportInfo.transport).addServiceListener(this);
            }

            Transport configuredTransport = transportInfo.transportFactory.serverConfigure(
                transportInfo.transport, transportInfo.format, options);

            getAcceptListener().onAccept(configuredTransport);

        } catch (SocketTimeoutException ste) {
            // expect this to happen; still release the socket and count if no transport took them over
            if (closeSocket) {
                releaseRejectedSocket(socket, countIncremented);
            }
        } catch (Exception e) {
            if (closeSocket) {
                releaseRejectedSocket(socket, countIncremented);
            }

            notifyAcceptFailure(e);
        }
    }

    /**
     * Releases a socket that never made it into a transport: gives back the reserved connection count, if one
     * was reserved, and closes the socket, ignoring close failures.
     *
     * @param socket
     *            the socket to close
     * @param countIncremented
     *            whether the connection count was incremented for this socket
     */
    private void releaseRejectedSocket(Socket socket, boolean countIncremented) {
        // Only give back a slot that was actually taken for this socket.
        if (countIncremented) {
            currentTransportCount.decrementAndGet();
        }
        try {
            socket.close();
        } catch (Exception ignore) {
            // Best effort: the socket is being discarded anyway.
        }
    }

    /**
     * Creates the wire format and transport for an accepted socket.
     *
     * @param server
     *            the server that accepted the socket; not used by this implementation
     * @param socket
     *            the accepted socket
     * @return the wire format, transport and transport factory to finish configuration with
     * @throws Exception
     *             if the transport cannot be created
     */
    protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
        WireFormat format = wireFormatFactory.createWireFormat();
        Transport transport = createTransport(socket, format);
        return new TransportInfo(format, transport, transportFactory);
    }

    /**
     * Holder for the objects produced by {@link #configureTransport(TcpTransportServer, Socket)}.
     */
    protected class TransportInfo {
        final WireFormat format;
        final Transport transport;
        final TransportFactory transportFactory;

        public TransportInfo(WireFormat format, Transport transport, TransportFactory transportFactory) {
            this.format = format;
            this.transport = transport;
            this.transportFactory = transportFactory;
        }
    }

    /**
     * @return the value passed to accepted transports as the {@code soTimeout} option
     */
    public int getSoTimeout() {
        return soTimeout;
    }

    /**
     * @param soTimeout
     *            the value to pass to accepted transports as the {@code soTimeout} option
     */
    public void setSoTimeout(int soTimeout) {
        this.soTimeout = soTimeout;
    }

    /**
     * @return the value passed to accepted transports as the {@code socketBufferSize} option
     */
    public int getSocketBufferSize() {
        return socketBufferSize;
    }

    /**
     * @param socketBufferSize
     *            the value to pass to accepted transports as the {@code socketBufferSize} option
     */
    public void setSocketBufferSize(int socketBufferSize) {
        this.socketBufferSize = socketBufferSize;
    }

    /**
     * @return the value passed to accepted transports as the {@code connectionTimeout} option
     */
    public int getConnectionTimeout() {
        return connectionTimeout;
    }

    /**
     * @param connectionTimeout
     *            the value to pass to accepted transports as the {@code connectionTimeout} option
     */
    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    /**
     * @return the maximumConnections
     */
    public int getMaximumConnections() {
        return maximumConnections;
    }

    /**
     * @param maximumConnections
     *            the maximumConnections to set
     */
    public void setMaximumConnections(int maximumConnections) {
        this.maximumConnections = maximumConnections;
    }

    /**
     * @return the live counter of transports accepted by this server; the returned object is the internal
     *         counter, not a copy
     */
    public AtomicInteger getCurrentTransportCount() {
        return currentTransportCount;
    }

    /**
     * Service listener callback; nothing is done when a transport starts.
     *
     * @param service
     *            the service that started
     */
    @Override
    public void started(Service service) {
    }

    /**
     * Service listener callback; releases one connection slot when a transport this server is listening to
     * stops.
     *
     * @param service
     *            the service that stopped
     */
    @Override
    public void stopped(Service service) {
        this.currentTransportCount.decrementAndGet();
    }

    /**
     * @return always {@code false} for this implementation
     */
    @Override
    public boolean isSslServer() {
        return false;
    }

    /**
     * @return the allowLinkStealing flag
     */
    @Override
    public boolean isAllowLinkStealing() {
        return allowLinkStealing;
    }

    /**
     * @param allowLinkStealing
     *            the allowLinkStealing flag to set
     */
    @Override
    public void setAllowLinkStealing(boolean allowLinkStealing) {
        this.allowLinkStealing = allowLinkStealing;
    }
}