001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.policy; 018 019import java.util.concurrent.atomic.AtomicLong; 020import javax.jms.JMSException; 021import javax.jms.Message; 022import javax.jms.MessageListener; 023import org.apache.activemq.ActiveMQMessageTransformation; 024import org.apache.activemq.broker.Broker; 025import org.apache.activemq.broker.ConnectionContext; 026import org.apache.activemq.broker.region.Destination; 027import org.apache.activemq.broker.region.MessageReference; 028import org.apache.activemq.broker.region.SubscriptionRecovery; 029import org.apache.activemq.broker.region.Topic; 030import org.apache.activemq.command.ActiveMQDestination; 031import org.apache.activemq.command.ActiveMQMessage; 032import org.apache.activemq.command.ConnectionId; 033import org.apache.activemq.command.MessageId; 034import org.apache.activemq.command.ProducerId; 035import org.apache.activemq.command.SessionId; 036import org.apache.activemq.util.IdGenerator; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * This implementation of {@link SubscriptionRecoveryPolicy} will perform a user 042 * specific query mechanism to load any messages they may have missed. 043 * 044 * @org.apache.xbean.XBean 045 * 046 */ 047public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy { 048 049 private static final Logger LOG = LoggerFactory.getLogger(QueryBasedSubscriptionRecoveryPolicy.class); 050 051 private MessageQuery query; 052 private final AtomicLong messageSequence = new AtomicLong(0); 053 private final IdGenerator idGenerator = new IdGenerator(); 054 private final ProducerId producerId = createProducerId(); 055 056 public SubscriptionRecoveryPolicy copy() { 057 QueryBasedSubscriptionRecoveryPolicy rc = new QueryBasedSubscriptionRecoveryPolicy(); 058 rc.setQuery(query); 059 return rc; 060 } 061 062 public boolean add(ConnectionContext context, MessageReference message) throws Exception { 063 return query.validateUpdate(message.getMessage()); 064 } 065 066 public void recover(final ConnectionContext context, final Topic topic, final SubscriptionRecovery sub) throws Exception { 067 if (query != null) { 068 ActiveMQDestination destination = sub.getActiveMQDestination(); 069 query.execute(destination, new MessageListener() { 070 071 public void onMessage(Message message) { 072 dispatchInitialMessage(message, topic, context, sub); 073 } 074 }); 075 } 076 } 077 078 public void start() throws Exception { 079 if (query == null) { 080 throw new IllegalArgumentException("No query property configured"); 081 } 082 } 083 084 public void stop() throws Exception { 085 } 086 087 public MessageQuery getQuery() { 088 return query; 089 } 090 091 /** 092 * Sets the query strategy to load initial messages 093 */ 094 public void setQuery(MessageQuery query) { 095 this.query = query; 096 } 097 098 public org.apache.activemq.command.Message[] browse(ActiveMQDestination dest) throws Exception { 099 return new org.apache.activemq.command.Message[0]; 100 } 101 102 public void setBroker(Broker broker) { 103 } 104 105 protected void dispatchInitialMessage(Message message, Destination regionDestination, ConnectionContext context, SubscriptionRecovery sub) { 106 try { 107 ActiveMQMessage activeMessage = ActiveMQMessageTransformation.transformMessage(message, null); 108 ActiveMQDestination destination = activeMessage.getDestination(); 109 if (destination == null) { 110 destination = sub.getActiveMQDestination(); 111 activeMessage.setDestination(destination); 112 } 113 activeMessage.setRegionDestination(regionDestination); 114 configure(activeMessage); 115 sub.addRecoveredMessage(context, activeMessage); 116 } catch (Throwable e) { 117 LOG.warn("Failed to dispatch initial message: " + message + " into subscription. Reason: " + e, e); 118 } 119 } 120 121 protected void configure(ActiveMQMessage msg) throws JMSException { 122 long sequenceNumber = messageSequence.incrementAndGet(); 123 msg.setMessageId(new MessageId(producerId, sequenceNumber)); 124 msg.onSend(); 125 msg.setProducerId(producerId); 126 } 127 128 protected ProducerId createProducerId() { 129 String id = idGenerator.generateId(); 130 ConnectionId connectionId = new ConnectionId(id); 131 SessionId sessionId = new SessionId(connectionId, 1); 132 return new ProducerId(sessionId, 1); 133 } 134}