001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.util; 018 019import org.apache.activemq.broker.BrokerPluginSupport; 020import org.apache.activemq.broker.ProducerBrokerExchange; 021import org.apache.activemq.broker.region.Destination; 022import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 023import org.apache.activemq.command.ActiveMQDestination; 024import org.apache.activemq.command.ActiveMQMessage; 025import org.apache.activemq.command.Message; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029/** 030 * A Broker interceptor which updates a JMS Client's timestamp on the message 031 * with a broker timestamp. Useful when the clocks on client machines are known 032 * to not be correct and you can only trust the time set on the broker machines. 033 * 034 * Enabling this plugin will break JMS compliance since the timestamp that the 035 * producer sees on the messages after as send() will be different from the 036 * timestamp the consumer will observe when he receives the message. This plugin 037 * is not enabled in the default ActiveMQ configuration. 038 * 039 * 2 new attributes have been added which will allow the administrator some override control 040 * over the expiration time for incoming messages: 041 * 042 * Attribute 'zeroExpirationOverride' can be used to apply an expiration 043 * time to incoming messages with no expiration defined (messages that would never expire) 044 * 045 * Attribute 'ttlCeiling' can be used to apply a limit to the expiration time 046 * 047 * @org.apache.xbean.XBean element="timeStampingBrokerPlugin" 048 * 049 * 050 */ 051public class TimeStampingBrokerPlugin extends BrokerPluginSupport { 052 private static final Logger LOG = LoggerFactory.getLogger(TimeStampingBrokerPlugin.class); 053 /** 054 * variable which (when non-zero) is used to override 055 * the expiration date for messages that arrive with 056 * no expiration date set (in Milliseconds). 057 */ 058 long zeroExpirationOverride = 0; 059 060 /** 061 * variable which (when non-zero) is used to limit 062 * the expiration date (in Milliseconds). 063 */ 064 long ttlCeiling = 0; 065 066 /** 067 * If true, the plugin will not update timestamp to past values 068 * False by default 069 */ 070 boolean futureOnly = false; 071 072 073 /** 074 * if true, update timestamp even if message has passed through a network 075 * default false 076 */ 077 boolean processNetworkMessages = false; 078 079 /** 080 * setter method for zeroExpirationOverride 081 */ 082 public void setZeroExpirationOverride(long ttl) 083 { 084 this.zeroExpirationOverride = ttl; 085 } 086 087 /** 088 * setter method for ttlCeiling 089 */ 090 public void setTtlCeiling(long ttlCeiling) 091 { 092 this.ttlCeiling = ttlCeiling; 093 } 094 095 public void setFutureOnly(boolean futureOnly) { 096 this.futureOnly = futureOnly; 097 } 098 099 public void setProcessNetworkMessages(Boolean processNetworkMessages) { 100 this.processNetworkMessages = processNetworkMessages; 101 } 102 103 @Override 104 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { 105 106 if (message.getTimestamp() > 0 && !isDestinationDLQ(message) && 107 (processNetworkMessages || (message.getBrokerPath() == null || message.getBrokerPath().length == 0))) { 108 // timestamp not been disabled and has not passed through a network or processNetworkMessages=true 109 110 long oldExpiration = message.getExpiration(); 111 long newTimeStamp = System.currentTimeMillis(); 112 long timeToLive = zeroExpirationOverride; 113 long oldTimestamp = message.getTimestamp(); 114 if (oldExpiration > 0) { 115 timeToLive = oldExpiration - oldTimestamp; 116 } 117 if (timeToLive > 0 && ttlCeiling > 0 && timeToLive > ttlCeiling) { 118 timeToLive = ttlCeiling; 119 } 120 long expiration = timeToLive + newTimeStamp; 121 // In the scenario that the Broker is behind the clients we never want to set the 122 // Timestamp and Expiration in the past 123 if(!futureOnly || (expiration > oldExpiration)) { 124 if (timeToLive > 0 && expiration > 0) { 125 message.setExpiration(expiration); 126 } 127 message.setTimestamp(newTimeStamp); 128 LOG.debug("Set message {} timestamp from {} to {}", new Object[]{ message.getMessageId(), oldTimestamp, newTimeStamp }); 129 } 130 } 131 super.send(producerExchange, message); 132 } 133 134 private boolean isDestinationDLQ(Message message) { 135 DeadLetterStrategy deadLetterStrategy; 136 Message tmp; 137 138 Destination regionDestination = (Destination) message.getRegionDestination(); 139 if (message != null && regionDestination != null) { 140 deadLetterStrategy = regionDestination.getDeadLetterStrategy(); 141 if (deadLetterStrategy != null && message.getOriginalDestination() != null) { 142 // Cheap copy, since we only need two fields 143 tmp = new ActiveMQMessage(); 144 tmp.setDestination(message.getOriginalDestination()); 145 tmp.setRegionDestination(regionDestination); 146 147 // Determine if we are headed for a DLQ 148 ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(tmp, null); 149 if (deadLetterDestination.equals(message.getDestination())) { 150 return true; 151 } 152 } 153 } 154 return false; 155 } 156}