001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.ft;
018
019import java.util.Map;
020import java.util.concurrent.ConcurrentHashMap;
021import java.util.concurrent.atomic.AtomicBoolean;
022
023import org.apache.activemq.broker.Connection;
024import org.apache.activemq.broker.ConnectionContext;
025import org.apache.activemq.broker.ConsumerBrokerExchange;
026import org.apache.activemq.broker.InsertableMutableBrokerFilter;
027import org.apache.activemq.broker.MutableBrokerFilter;
028import org.apache.activemq.broker.ProducerBrokerExchange;
029import org.apache.activemq.broker.region.Subscription;
030import org.apache.activemq.command.Command;
031import org.apache.activemq.command.ConnectionControl;
032import org.apache.activemq.command.ConnectionInfo;
033import org.apache.activemq.command.ConsumerId;
034import org.apache.activemq.command.ConsumerInfo;
035import org.apache.activemq.command.DestinationInfo;
036import org.apache.activemq.command.ExceptionResponse;
037import org.apache.activemq.command.Message;
038import org.apache.activemq.command.MessageAck;
039import org.apache.activemq.command.MessageDispatch;
040import org.apache.activemq.command.MessageDispatchNotification;
041import org.apache.activemq.command.ProducerInfo;
042import org.apache.activemq.command.RemoveInfo;
043import org.apache.activemq.command.RemoveSubscriptionInfo;
044import org.apache.activemq.command.Response;
045import org.apache.activemq.command.SessionInfo;
046import org.apache.activemq.command.TransactionId;
047import org.apache.activemq.command.TransactionInfo;
048import org.apache.activemq.transport.MutexTransport;
049import org.apache.activemq.transport.ResponseCorrelator;
050import org.apache.activemq.transport.Transport;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054/**
055 * The Message Broker which passes messages to a slave
056 * 
057 * 
058 */
059public class MasterBroker extends InsertableMutableBrokerFilter {
060
061    private static final Logger LOG = LoggerFactory.getLogger(MasterBroker.class);
062    private Transport slave;
063    private AtomicBoolean started = new AtomicBoolean(false);
064
065    private Map<ConsumerId, ConsumerId> consumers = new ConcurrentHashMap<ConsumerId, ConsumerId>();
066    
067    /**
068     * Constructor
069     * 
070     * @param parent
071     * @param transport
072     */
073    public MasterBroker(MutableBrokerFilter parent, Transport transport) {
074        super(parent);
075        this.slave = transport;
076        this.slave = new MutexTransport(slave);
077        this.slave = new ResponseCorrelator(slave);
078        this.slave.setTransportListener(transport.getTransportListener());
079    }
080
081    /**
082     * start processing this broker
083     */
084    public void startProcessing() {
085        started.set(true);
086        try {
087            Connection[] connections = getClients();
088            ConnectionControl command = new ConnectionControl();
089            command.setFaultTolerant(true);
090            if (connections != null) {
091                for (int i = 0; i < connections.length; i++) {
092                    if (connections[i].isActive() && connections[i].isManageable()) {
093                        connections[i].dispatchAsync(command);
094                    }
095                }
096            }
097        } catch (Exception e) {
098            LOG.error("Failed to get Connections", e);
099        }
100    }
101
102    /**
103     * stop the broker
104     * 
105     * @throws Exception
106     */
107    public void stop() throws Exception {
108        stopProcessing();
109    }
110
111    /**
112     * stop processing this broker
113     */
114    public void stopProcessing() {
115        if (started.compareAndSet(true, false)) {
116            remove();
117        }
118    }
119
120    /**
121     * A client is establishing a connection with the broker.
122     * 
123     * @param context
124     * @param info
125     * @throws Exception
126     */
127    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
128        super.addConnection(context, info);
129        sendAsyncToSlave(info);
130    }
131
132    /**
133     * A client is disconnecting from the broker.
134     * 
135     * @param context the environment the operation is being executed under.
136     * @param info
137     * @param error null if the client requested the disconnect or the error
138     *                that caused the client to disconnect.
139     * @throws Exception
140     */
141    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
142        super.removeConnection(context, info, error);
143        sendAsyncToSlave(new RemoveInfo(info.getConnectionId()));
144    }
145
146    /**
147     * Adds a session.
148     * 
149     * @param context
150     * @param info
151     * @throws Exception
152     */
153    public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
154        super.addSession(context, info);
155        sendAsyncToSlave(info);
156    }
157
158    /**
159     * Removes a session.
160     * 
161     * @param context
162     * @param info
163     * @throws Exception
164     */
165    public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
166        super.removeSession(context, info);
167        sendAsyncToSlave(new RemoveInfo(info.getSessionId()));
168    }
169
170    /**
171     * Adds a producer.
172     * 
173     * @param context the enviorment the operation is being executed under.
174     * @param info
175     * @throws Exception
176     */
177    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
178        super.addProducer(context, info);
179        sendAsyncToSlave(info);
180    }
181
182    /**
183     * Removes a producer.
184     * 
185     * @param context the environment the operation is being executed under.
186     * @param info
187     * @throws Exception
188     */
189    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
190        super.removeProducer(context, info);
191        sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
192    }
193
194    /**
195     * add a consumer
196     * 
197     * @param context
198     * @param info
199     * @return the associated subscription
200     * @throws Exception
201     */
202    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
203        sendSyncToSlave(info);
204        consumers.put(info.getConsumerId(), info.getConsumerId());
205        return super.addConsumer(context, info);
206    }
207
208    @Override
209    public void removeConsumer(ConnectionContext context, ConsumerInfo info)
210            throws Exception {
211        super.removeConsumer(context, info);
212        consumers.remove(info.getConsumerId());
213        sendSyncToSlave(new RemoveInfo(info.getConsumerId()));
214   }
215
216    /**
217     * remove a subscription
218     * 
219     * @param context
220     * @param info
221     * @throws Exception
222     */
223    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
224        super.removeSubscription(context, info);
225        sendAsyncToSlave(info);
226    }
227    
228    @Override
229    public void addDestinationInfo(ConnectionContext context,
230            DestinationInfo info) throws Exception {
231        super.addDestinationInfo(context, info);
232        if (info.getDestination().isTemporary()) {
233            sendAsyncToSlave(info);
234        }
235    }
236
237    @Override
238    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
239        super.removeDestinationInfo(context, info);
240        if (info.getDestination().isTemporary()) {
241            sendAsyncToSlave(info);
242        }
243    }
244    
245    /**
246     * begin a transaction
247     * 
248     * @param context
249     * @param xid
250     * @throws Exception
251     */
252    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
253        TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.BEGIN);
254        sendAsyncToSlave(info);
255        super.beginTransaction(context, xid);
256    }
257
258    /**
259     * Prepares a transaction. Only valid for xa transactions.
260     * 
261     * @param context
262     * @param xid
263     * @return the state
264     * @throws Exception
265     */
266    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
267        TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.PREPARE);
268        sendSyncToSlave(info);
269        int result = super.prepareTransaction(context, xid);
270        return result;
271    }
272
273    /**
274     * Rollsback a transaction.
275     * 
276     * @param context
277     * @param xid
278     * @throws Exception
279     */
280    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
281        TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.ROLLBACK);
282        sendAsyncToSlave(info);
283        super.rollbackTransaction(context, xid);
284    }
285
286    /**
287     * Commits a transaction.
288     * 
289     * @param context
290     * @param xid
291     * @param onePhase
292     * @throws Exception
293     */
294    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
295        TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.COMMIT_ONE_PHASE);
296        sendSyncToSlave(info);
297        super.commitTransaction(context, xid, onePhase);
298    }
299
300    /**
301     * Forgets a transaction.
302     * 
303     * @param context
304     * @param xid
305     * @throws Exception
306     */
307    public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
308        TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.FORGET);
309        sendAsyncToSlave(info);
310        super.forgetTransaction(context, xid);
311    }
312
313    /**
314     * Notifiy the Broker that a dispatch will happen
315     * Do in 'pre' so that slave will avoid getting ack before dispatch
316     * similar logic to send() below.
317     * @param messageDispatch
318     */
319    public void preProcessDispatch(MessageDispatch messageDispatch) {
320        super.preProcessDispatch(messageDispatch);
321        MessageDispatchNotification mdn = new MessageDispatchNotification();
322        mdn.setConsumerId(messageDispatch.getConsumerId());
323        mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
324        mdn.setDestination(messageDispatch.getDestination());
325        if (messageDispatch.getMessage() != null) {
326            Message msg = messageDispatch.getMessage();
327            mdn.setMessageId(msg.getMessageId());
328            if (consumers.containsKey(messageDispatch.getConsumerId())) {
329                sendSyncToSlave(mdn);
330            }
331        }
332    }
333
334    /**
335     * @param context
336     * @param message
337     * @throws Exception
338     */
339    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
340        /**
341         * A message can be dispatched before the super.send() method returns so -
342         * here the order is switched to avoid problems on the slave with
343         * receiving acks for messages not received yet
344         * copy ensures we don't mess with the correlator and command ids
345         */
346        sendSyncToSlave(message.copy());
347        super.send(producerExchange, message);
348    }
349
350    /**
351     * @param context
352     * @param ack
353     * @throws Exception
354     */
355    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
356        sendToSlave(ack);
357        super.acknowledge(consumerExchange, ack);
358    }
359
360    public boolean isFaultTolerantConfiguration() {
361        return true;
362    }
363
364    protected void sendToSlave(Message message) {
365        if (message.isResponseRequired()) {
366            sendSyncToSlave(message);
367        } else {
368            sendAsyncToSlave(message);
369        }
370    }
371
372    protected void sendToSlave(MessageAck ack) {
373        if (ack.isResponseRequired()) {
374            sendAsyncToSlave(ack);
375        } else {
376            sendSyncToSlave(ack);
377        }
378    }
379
380    protected void sendAsyncToSlave(Command command) {
381        try {
382            slave.oneway(command);
383        } catch (Throwable e) {
384            LOG.error("Slave Failed", e);
385            stopProcessing();
386        }
387    }
388
389    protected void sendSyncToSlave(Command command) {
390        try {
391            Response response = (Response)slave.request(command);
392            if (response.isException()) {
393                ExceptionResponse er = (ExceptionResponse)response;
394                LOG.error("Slave Failed", er.getException());
395            }
396        } catch (Throwable e) {
397            LOG.error("Slave Failed", e);
398        }
399    }
400}