001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.util.HashMap;
020import java.util.Map;
021import java.util.concurrent.atomic.AtomicLong;
022import javax.jms.Destination;
023import javax.jms.IllegalStateException;
024import javax.jms.InvalidDestinationException;
025import javax.jms.JMSException;
026import javax.jms.Message;
027import org.apache.activemq.command.ActiveMQDestination;
028import org.apache.activemq.command.ProducerAck;
029import org.apache.activemq.command.ProducerId;
030import org.apache.activemq.command.ProducerInfo;
031import org.apache.activemq.management.JMSProducerStatsImpl;
032import org.apache.activemq.management.StatsCapable;
033import org.apache.activemq.management.StatsImpl;
034import org.apache.activemq.usage.MemoryUsage;
035import org.apache.activemq.util.IntrospectionSupport;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
041 * destination. A <CODE>MessageProducer</CODE> object is created by passing a
042 * <CODE>Destination</CODE> object to a message-producer creation method
043 * supplied by a session.
044 * <P>
045 * <CODE>MessageProducer</CODE> is the parent interface for all message
046 * producers.
047 * <P>
048 * A client also has the option of creating a message producer without supplying
049 * a destination. In this case, a destination must be provided with every send
050 * operation. A typical use for this kind of message producer is to send replies
051 * to requests using the request's <CODE>JMSReplyTo</CODE> destination.
052 * <P>
053 * A client can specify a default delivery mode, priority, and time to live for
054 * messages sent by a message producer. It can also specify the delivery mode,
055 * priority, and time to live for an individual message.
056 * <P>
057 * A client can specify a time-to-live value in milliseconds for each message it
058 * sends. This value defines a message expiration time that is the sum of the
059 * message's time-to-live and the GMT when it is sent (for transacted sends,
060 * this is the time the client sends the message, not the time the transaction
061 * is committed).
062 * <P>
063 * A JMS provider should do its best to expire messages accurately; however, the
064 * JMS API does not define the accuracy provided.
065 *
066 *
067 * @see javax.jms.TopicPublisher
068 * @see javax.jms.QueueSender
069 * @see javax.jms.Session#createProducer
070 */
071public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
072
073    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageProducer.class);
074
075    protected ProducerInfo info;
076    protected boolean closed;
077
078    private final JMSProducerStatsImpl stats;
079    private AtomicLong messageSequence;
080    private final long startTime;
081    private MessageTransformer transformer;
082    private MemoryUsage producerWindow;
083
084    protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
085        super(session);
086        this.info = new ProducerInfo(producerId);
087        this.info.setWindowSize(session.connection.getProducerWindowSize());
088        // Allows the options on the destination to configure the producerInfo
089        if (destination != null && destination.getOptions() != null) {
090            Map<String, Object> options = IntrospectionSupport.extractProperties(
091                new HashMap<String, Object>(destination.getOptions()), "producer.");
092            IntrospectionSupport.setProperties(this.info, options);
093            if (options.size() > 0) {
094                String msg = "There are " + options.size()
095                    + " producer options that couldn't be set on the producer."
096                    + " Check the options are spelled correctly."
097                    + " Unknown parameters=[" + options + "]."
098                    + " This producer cannot be started.";
099                LOG.warn(msg);
100                throw new ConfigurationException(msg);
101            }
102        }
103
104        this.info.setDestination(destination);
105
106        // Enable producer window flow control if protocol > 3 and the window
107        // size > 0
108        if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
109            producerWindow = new MemoryUsage("Producer Window: " + producerId);
110            producerWindow.setExecutor(session.getConnectionExecutor());
111            producerWindow.setLimit(this.info.getWindowSize());
112            producerWindow.start();
113        }
114
115        this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
116        this.defaultPriority = Message.DEFAULT_PRIORITY;
117        this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
118        this.startTime = System.currentTimeMillis();
119        this.messageSequence = new AtomicLong(0);
120        this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
121        this.session.addProducer(this);
122        this.session.asyncSendPacket(info);
123        this.setSendTimeout(sendTimeout);
124        setTransformer(session.getTransformer());
125    }
126
127    public StatsImpl getStats() {
128        return stats;
129    }
130
131    public JMSProducerStatsImpl getProducerStats() {
132        return stats;
133    }
134
135    /**
136     * Gets the destination associated with this <CODE>MessageProducer</CODE>.
137     *
138     * @return this producer's <CODE>Destination/ <CODE>
139     * @throws JMSException if the JMS provider fails to close the producer due to
140     *                      some internal error.
141     * @since 1.1
142     */
143    public Destination getDestination() throws JMSException {
144        checkClosed();
145        return this.info.getDestination();
146    }
147
148    /**
149     * Closes the message producer.
150     * <P>
151     * Since a provider may allocate some resources on behalf of a <CODE>
152     * MessageProducer</CODE>
153     * outside the Java virtual machine, clients should close them when they are
154     * not needed. Relying on garbage collection to eventually reclaim these
155     * resources may not be timely enough.
156     *
157     * @throws JMSException if the JMS provider fails to close the producer due
158     *                 to some internal error.
159     */
160    public void close() throws JMSException {
161        if (!closed) {
162            dispose();
163            this.session.asyncSendPacket(info.createRemoveCommand());
164        }
165    }
166
167    public void dispose() {
168        if (!closed) {
169            this.session.removeProducer(this);
170            if (producerWindow != null) {
171                producerWindow.stop();
172            }
173            closed = true;
174        }
175    }
176
177    /**
178     * Check if the instance of this producer has been closed.
179     *
180     * @throws IllegalStateException
181     */
182    @Override
183    protected void checkClosed() throws IllegalStateException {
184        if (closed) {
185            throw new IllegalStateException("The producer is closed");
186        }
187    }
188
189    /**
190     * Sends a message to a destination for an unidentified message producer,
191     * specifying delivery mode, priority and time to live.
192     * <P>
193     * Typically, a message producer is assigned a destination at creation time;
194     * however, the JMS API also supports unidentified message producers, which
195     * require that the destination be supplied every time a message is sent.
196     *
197     * @param destination the destination to send this message to
198     * @param message the message to send
199     * @param deliveryMode the delivery mode to use
200     * @param priority the priority for this message
201     * @param timeToLive the message's lifetime (in milliseconds)
202     * @throws JMSException if the JMS provider fails to send the message due to
203     *                 some internal error.
204     * @throws UnsupportedOperationException if an invalid destination is
205     *                 specified.
206     * @throws InvalidDestinationException if a client uses this method with an
207     *                 invalid destination.
208     * @see javax.jms.Session#createProducer
209     * @since 1.1
210     */
211    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
212        this.send(destination, message, deliveryMode, priority, timeToLive, null);
213    }
214
215    public void send(Message message, AsyncCallback onComplete) throws JMSException {
216        this.send(this.getDestination(),
217                  message,
218                  this.defaultDeliveryMode,
219                  this.defaultPriority,
220                  this.defaultTimeToLive, onComplete);
221    }
222
223    public void send(Destination destination, Message message, AsyncCallback onComplete) throws JMSException {
224        this.send(destination,
225                  message,
226                  this.defaultDeliveryMode,
227                  this.defaultPriority,
228                  this.defaultTimeToLive,
229                  onComplete);
230    }
231
232    public void send(Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
233        this.send(this.getDestination(),
234                  message,
235                  deliveryMode,
236                  priority,
237                  timeToLive,
238                  onComplete);
239    }
240
241    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
242        checkClosed();
243        if (destination == null) {
244            if (info.getDestination() == null) {
245                throw new UnsupportedOperationException("A destination must be specified.");
246            }
247            throw new InvalidDestinationException("Don't understand null destinations");
248        }
249
250        ActiveMQDestination dest;
251        if (destination == info.getDestination()) {
252            dest = (ActiveMQDestination)destination;
253        } else if (info.getDestination() == null) {
254            dest = ActiveMQDestination.transform(destination);
255        } else {
256            throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
257        }
258        if (dest == null) {
259            throw new JMSException("No destination specified");
260        }
261
262        if (transformer != null) {
263            Message transformedMessage = transformer.producerTransform(session, this, message);
264            if (transformedMessage != null) {
265                message = transformedMessage;
266            }
267        }
268
269        if (producerWindow != null) {
270            try {
271                producerWindow.waitForSpace();
272            } catch (InterruptedException e) {
273                throw new JMSException("Send aborted due to thread interrupt.");
274            }
275        }
276
277        this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);
278
279        stats.onMessage();
280    }
281
282    public MessageTransformer getTransformer() {
283        return transformer;
284    }
285
286    /**
287     * Sets the transformer used to transform messages before they are sent on
288     * to the JMS bus
289     */
290    public void setTransformer(MessageTransformer transformer) {
291        this.transformer = transformer;
292    }
293
294    /**
295     * @return the time in milli second when this object was created.
296     */
297    protected long getStartTime() {
298        return this.startTime;
299    }
300
301    /**
302     * @return Returns the messageSequence.
303     */
304    protected long getMessageSequence() {
305        return messageSequence.incrementAndGet();
306    }
307
308    /**
309     * @param messageSequence The messageSequence to set.
310     */
311    protected void setMessageSequence(AtomicLong messageSequence) {
312        this.messageSequence = messageSequence;
313    }
314
315    /**
316     * @return Returns the info.
317     */
318    protected ProducerInfo getProducerInfo() {
319        return this.info != null ? this.info : null;
320    }
321
322    /**
323     * @param info The info to set
324     */
325    protected void setProducerInfo(ProducerInfo info) {
326        this.info = info;
327    }
328
329    @Override
330    public String toString() {
331        return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }";
332    }
333
334    public void onProducerAck(ProducerAck pa) {
335        if (this.producerWindow != null) {
336            this.producerWindow.decreaseUsage(pa.getSize());
337        }
338    }
339
340}