/* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.ejb.plugins.jms; import java.lang.reflect.Method; import java.security.Principal; import java.util.Collection; import java.util.Hashtable; import javax.ejb.EJBHome; import javax.ejb.EJBMetaData; import javax.ejb.EJBObject; import javax.jms.*; import javax.management.MBeanServer; import javax.management.MBeanServerFactory; import javax.management.ObjectName; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.Name; import javax.naming.NamingException; import javax.transaction.Transaction; import javax.transaction.TransactionManager; import org.jboss.deployment.DeploymentException; import org.jboss.ejb.Container; import org.jboss.ejb.ContainerInvoker; import org.jboss.ejb.MethodInvocation; import org.jboss.jms.ConnectionFactoryHelper; import org.jboss.jms.asf.ServerSessionPoolFactory; import org.jboss.jms.asf.StdServerSessionPool; import org.jboss.jms.jndi.JMSProviderAdapter; import org.jboss.logging.Logger; import org.jboss.metadata.MessageDrivenMetaData; import org.jboss.metadata.MetaData; import org.jboss.metadata.XmlLoadable; import org.w3c.dom.Element; import org.w3c.dom.Node; /** * ContainerInvoker for JMS MessageDrivenBeans, based on JRMPContainerInvoker. * * @author Peter Antman . * @author Rickard Öberg * @author Sebastien Alborini * * @author Marc Fleury * @author Jason Dillon * @version $Revision: 1.12.4.6 $ */ public class JMSContainerInvoker implements ContainerInvoker, XmlLoadable { // Constants ----------------------------------------------------- /** * {@link MessageListener#onMessage} reference. */ protected static Method ON_MESSAGE; /** * Default destination type. Used when no message-driven-destination is given * in ejb-jar, and a lookup of destinationJNDI from jboss.xml is not * successfull. Default value: javax.jms.Topic. */ protected final static String DEFAULT_DESTINATION_TYPE = "javax.jms.Topic"; // Attributes ---------------------------------------------------- /** * Description of the Field */ protected boolean optimize; // = false; /** * Maximu number provider is allowed to stuff into a session. */ protected int maxMessagesNr = 1; /** * Maximun pool size of server sessions. */ protected int maxPoolSize = 15; /** * Time to wait before retrying to reconnect a lost connection. */ protected long reconnectInterval = 10000; /** * If Dead letter queue should be used or not. */ protected boolean useDLQ = false; /** * JNDI name of the provider adapter. * @see org.jboss.jms.jndi.JMSProviderAdapter */ protected String providerAdapterJNDI; /** * JNDI name of the server session factory. * @see org.jboss.jms.asf.ServerSessionPoolFactory */ protected String serverSessionPoolFactoryJNDI; /** * JMS acknowledge mode, used when session is not XA. */ protected int acknowledgeMode; /** * escription of the Field */ protected boolean isContainerManagedTx; /** * Description of the Field */ protected boolean isNotSupportedTx; /** * The container. */ protected Container container; /** * The JMS connection. */ protected Connection connection; /** * TH JMS connection consumer. */ protected ConnectionConsumer connectionConsumer; /** * Description of the Field */ protected TransactionManager tm; /** * Description of the Field */ protected ServerSessionPool pool; /** * Description of the Field */ protected ExceptionListenerImpl exListener; /** * Description of the Field */ protected String beanName; /** * Dead letter queue handler. */ protected DLQHandler dlqHandler; /** * DLQConfig element from MDBConfig element from jboss.xml. */ protected Element dlqConfig; /** * Instance logger. */ private final Logger log = Logger.getLogger(this.getClass()); // ContainerService implementation ------------------------------- /** * Set the container for which this is an invoker to. * * @param container The container for which this is an invoker to. */ public void setContainer(final Container container) { this.container = container; //jndiName = container.getBeanMetaData().getJndiName(); } // Static -------------------------------------------------------- // Constructors -------------------------------------------------- // Public -------------------------------------------------------- /** * Sets the Optimized attribute of the JMSContainerInvoker object * * @param optimize The new Optimized value */ public void setOptimized(final boolean optimize) { log.debug("Container Invoker optimize set to " + optimize); this.optimize = optimize; } // ContainerInvoker implementation /** * Gets the EJBHome attribute of the JMSContainerInvoker object * * @return The EJBHome value */ public EJBHome getEJBHome() { throw new Error("Not valid for MessageDriven beans"); } /** * Gets the EJBMetaData attribute of the JMSContainerInvoker object * * @return The EJBMetaData value */ public EJBMetaData getEJBMetaData() { throw new Error("Not valid for MessageDriven beans"); } /** * Gets the EntityCollection attribute of the JMSContainerInvoker object * * @param ids Description of Parameter * @return The EntityCollection value */ public Collection getEntityCollection(Collection ids) { throw new Error("Not valid for MessageDriven beans"); } /** * Gets the EntityEJBObject attribute of the JMSContainerInvoker object * * @param id Description of Parameter * @return The EntityEJBObject value */ public EJBObject getEntityEJBObject(Object id) { throw new Error("Not valid for MessageDriven beans"); } /** * Gets the StatefulSessionEJBObject attribute of the JMSContainerInvoker * object * * @param id Description of Parameter * @return The StatefulSessionEJBObject value */ public EJBObject getStatefulSessionEJBObject(Object id) { throw new Error("Not valid for MessageDriven beans"); } /** * Gets the StatelessSessionEJBObject attribute of the JMSContainerInvoker * object * * @return The StatelessSessionEJBObject value */ public EJBObject getStatelessSessionEJBObject() { throw new Error("Not valid for MessageDriven beans"); } /** * Gets the Optimized attribute of the JMSContainerInvoker object * * @return The Optimized value */ public boolean isOptimized() { log.debug("Optimize in action: " + optimize); return optimize; } /** * Take down all fixtures. */ public void destroy() { log.debug("Destroying JMSContainerInvoker for bean " + beanName); // Take down DLQ if ( dlqHandler != null) { dlqHandler.destroy(); } // close the connection consumer try { if (connectionConsumer != null) { connectionConsumer.close(); } } catch (Exception e) { log.error("Could not close consumer", e); } // clear the server session pool (if it is clearable) try { if (pool instanceof StdServerSessionPool) { StdServerSessionPool p = (StdServerSessionPool)pool; p.clear(); } } catch (Exception e) { log.error("Could not clear ServerSessionPool", e); } // close the connection if (connection != null) { try { connection.close(); } catch (Exception e) { log.error("Could not close connection", e); } } } /** * XmlLoadable implementation. * * FIXME - we ought to move all config into MDBConfig, but I do not * do that now due to backward compatibility. * * @param element Description of Parameter * @exception DeploymentException Description of Exception */ public void importXml(Element element) throws DeploymentException { try { String maxMessages = MetaData.getElementContent (MetaData.getUniqueChild(element, "MaxMessages")); maxMessagesNr = Integer.parseInt(maxMessages); String maxSize = MetaData.getElementContent (MetaData.getUniqueChild(element, "MaximumSize")); maxPoolSize = Integer.parseInt(maxSize); Element mdbConfig = MetaData.getUniqueChild(element, "MDBConfig"); String reconnect = MetaData.getElementContent (MetaData.getUniqueChild(mdbConfig, "ReconnectIntervalSec")); reconnectInterval = Long.parseLong(reconnect)*1000; // Get Dead letter queue config - and save it for later use Element dlqEl = MetaData.getOptionalChild(mdbConfig, "DLQConfig"); if (dlqEl != null) { dlqConfig = (Element)((Node)dlqEl).cloneNode(true); useDLQ = true; } else { useDLQ = false; } } catch (NumberFormatException e) { //Noop will take default value } catch (DeploymentException e) { //Noop will take default value } // If these are not found we will get a DeploymentException, I hope providerAdapterJNDI = MetaData.getElementContent (MetaData.getUniqueChild(element, "JMSProviderAdapterJNDI")); serverSessionPoolFactoryJNDI = MetaData.getElementContent (MetaData.getUniqueChild(element, "ServerSessionPoolFactoryJNDI")); // Check java:/ prefix if (!providerAdapterJNDI.startsWith("java:/")) { providerAdapterJNDI = "java:/" + providerAdapterJNDI; } if (!serverSessionPoolFactoryJNDI.startsWith("java:/")) { serverSessionPoolFactoryJNDI = "java:/" + serverSessionPoolFactoryJNDI; } } /** * Initialize the container invoker. Sets up a connection, a server session * pool and a connection consumer for the configured destination. * * @throws Exception Failed to initalize. */ public void init() throws Exception { log.debug("initializing"); // Set up Dead Letter Queue handler if (useDLQ) { dlqHandler = new DLQHandler(); dlqHandler.importXml(dlqConfig); dlqHandler.init(); } // Store TM reference locally - should we test for CMT Required tm = container.getTransactionManager(); // Get configuration information - from EJB-xml MessageDrivenMetaData config = ((MessageDrivenMetaData)container.getBeanMetaData()); // Selector String messageSelector = config.getMessageSelector(); // Queue or Topic - optional unfortunately String destinationType = config.getDestinationType(); // Bean Name beanName = config.getEjbName(); // Is container managed? isContainerManagedTx = config.isContainerManagedTx(); acknowledgeMode = config.getAcknowledgeMode(); isNotSupportedTx = config.getMethodTransactionType("onMessage", new Class[] {Message.class}, false) == MetaData.TX_NOT_SUPPORTED; // Get configuration data from jboss.xml String destinationJNDI = config.getDestinationJndiName(); String user = config.getUser(); String password = config.getPasswd(); // Get the JMS provider JMSProviderAdapter adapter = getJMSProviderAdapter(); log.debug("provider adapter: " + adapter); // Connect to the JNDI server and get a reference to root context Context context = adapter.getInitialContext(); log.debug("context: " + context); // if we can't get the root context then exit with an exception if (context == null) { throw new RuntimeException("Failed to get the root context"); } // Get the JNDI suffix of the destination String jndiSuffix = parseJndiSuffix(destinationJNDI, config.getEjbName()); log.debug("jndiSuffix: " + jndiSuffix); // Unfortunately the destination is optional, so if we do not have one // here we have to look it up if we have a destinationJNDI, else give it // a default. if (destinationType == null) { log.info("No message-driven-destination given, guessing type"); destinationType = getDestinationType(context, destinationJNDI); } if (destinationType.equals("javax.jms.Topic")) { log.debug("Got destination type Topic for " + config.getEjbName()); // create a topic connection Object factory = context.lookup(adapter.getTopicFactoryRef()); TopicConnection tConnection = (TopicConnection)ConnectionFactoryHelper.createTopicConnection (factory, user, password); connection = tConnection; // lookup or create the destination topic Topic topic = (Topic)createDestination(Topic.class, context, "topic/" + jndiSuffix, jndiSuffix); // set up the server session pool pool = createSessionPool(tConnection, maxPoolSize, true, // tx acknowledgeMode , new MessageListenerImpl(this)); // To be no-durable or durable if (config.getSubscriptionDurability() != MessageDrivenMetaData.DURABLE_SUBSCRIPTION) { // Create non durable connectionConsumer = tConnection.createConnectionConsumer(topic, messageSelector, pool, maxMessagesNr); } else { //Durable subscription String clientId = config.getClientId(); String durableName = clientId != null ? clientId : config.getEjbName(); connectionConsumer = tConnection.createDurableConnectionConsumer(topic, durableName, messageSelector, pool, maxMessagesNr); } log.debug("Topic connectionConsumer set up"); } else if (destinationType.equals("javax.jms.Queue")) { log.debug("Got destination type Queue for " + config.getEjbName()); // create a queue connection Object qFactory = context.lookup(adapter.getQueueFactoryRef()); QueueConnection qConnection = (QueueConnection)ConnectionFactoryHelper.createQueueConnection (qFactory, user, password); connection = qConnection; // lookup or create the destination queue Queue queue = (Queue)createDestination(Queue.class, context, "queue/" + jndiSuffix, jndiSuffix); // set up the server session pool pool = createSessionPool(qConnection, maxPoolSize, true, // tx acknowledgeMode, new MessageListenerImpl(this)); log.debug("server session pool: " + pool); // create the connection consumer connectionConsumer = qConnection.createConnectionConsumer(queue, messageSelector, pool, maxMessagesNr); log.debug("connection consumer: " + connectionConsumer); } log.debug("initialized with config " + toString()); } /** * #Description of the Method * * @param id Description of Parameter * @param m Description of Parameter * @param args Description of Parameter * @param tx Description of Parameter * @param identity Description of Parameter * @param credential Description of Parameter * @return Description of the Returned Value * @exception Exception Description of Exception */ public Object invoke(Object id, Method m, Object[] args, Transaction tx, Principal identity, Object credential) throws Exception { MethodInvocation mi = new MethodInvocation(id, m, args, tx, identity, credential); // Set the right context classloader ClassLoader oldCl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(container.getClassLoader()); try { return container.invoke(mi); } finally { Thread.currentThread().setContextClassLoader(oldCl); } } /** * Start the connection. * * @exception Exception Description of Exception */ public void start() throws Exception { log.debug("Starting JMSContainerInvoker for bean " + beanName); exListener = new ExceptionListenerImpl(this); connection.setExceptionListener(exListener); connection.start(); } /** * Stop the connection. */ public void stop() { log.debug("Stopping JMSContainerInvoker for bean " + beanName); // Silence the exception listener if (exListener != null) { exListener.stop(); } innerStop(); } /** * Try to get a destination type by looking up the destination JNDI, or * provide a default if there is not destinationJNDI or if it is not possible * to lookup. * * @param ctx The naming context to lookup destinations from. * @param destinationJNDI The name to use when looking up destinations. * @return The destination type, either derived from * destinationJDNI or DEFAULT_DESTINATION_TYPE */ protected String getDestinationType(Context ctx, String destinationJNDI) { String destType = null; if (destinationJNDI != null) { try { Destination dest = (Destination)ctx.lookup(destinationJNDI); if (dest instanceof javax.jms.Topic) { destType = "javax.jms.Topic"; } else if (dest instanceof javax.jms.Queue) { destType = "javax.jms.Queue"; } } catch (NamingException ex) { log.debug("Could not do heristic lookup of destination ", ex); } } if (destType == null) { log.info("WARNING Could not determine destination type, defaults to: " + DEFAULT_DESTINATION_TYPE); destType = DEFAULT_DESTINATION_TYPE; } return destType; } /** * Return the JMSProviderAdapter that should be used. * * @return The JMSProviderAdapter to use. * @exception NamingException Description of Exception */ protected JMSProviderAdapter getJMSProviderAdapter() throws NamingException { Context context = new InitialContext(); try { log.debug("looking up provider adapter: " + providerAdapterJNDI); return (JMSProviderAdapter)context.lookup(providerAdapterJNDI); } finally { context.close(); } } /** * Create and or lookup a JMS destination. * * @param type Either javax.jms.Queue or * javax.jms.Topic. * @param ctx The naming context to lookup * destinations from. * @param jndiName The name to use when looking up * destinations. * @param jndiSuffix The name to use when creating * destinations. * @return The destination. * @throws IllegalArgumentException Type is not Queue or Topic. * @exception Exception Description of Exception */ protected Destination createDestination(final Class type, final Context ctx, final String jndiName, final String jndiSuffix) throws Exception { try { // first try to look it up return (Destination)ctx.lookup(jndiName); } catch (NamingException e) { // if the lookup failes, the try to create it log.warn("destination not found: " + jndiName + " reason: " + e); log.warn("creating a new temporary destination: " + jndiName); // // attempt to create the destination (note, this is very // very, very unportable). // MBeanServer server = (MBeanServer) MBeanServerFactory.findMBeanServer(null).iterator().next(); String methodName; if (type == Topic.class) { methodName = "createTopic"; } else if (type == Queue.class) { methodName = "createQueue"; } else { // type was not a Topic or Queue, bad user throw new IllegalArgumentException ("expected javax.jms.Queue or javax.jms.Topic: " + type); } // invoke the server to create the destination server.invoke(new ObjectName("JBossMQ", "service", "Server"), methodName, new Object[] {jndiSuffix}, new String[] {"java.lang.String"}); // try to look it up again return (Destination)ctx.lookup(jndiName); } } /** * Create a server session pool for the given connection. * * @param connection The connection to use. * @param maxSession The maximum number of sessions. * @param isTransacted True if the sessions are transacted. * @param ack The session acknowledgement mode. * @param listener The message listener. * @return A server session pool. * @throws JMSException * @exception NamingException Description of Exception */ protected ServerSessionPool createSessionPool(final Connection connection, final int maxSession, final boolean isTransacted, final int ack, final MessageListener listener) throws NamingException, JMSException { ServerSessionPool pool; Context context = new InitialContext(); try { // first lookup the factory log.debug("looking up session pool factory: " + serverSessionPoolFactoryJNDI); ServerSessionPoolFactory factory = (ServerSessionPoolFactory) context.lookup(serverSessionPoolFactoryJNDI); // the create the pool pool = factory.getServerSessionPool (connection, maxSession, isTransacted, ack, !isContainerManagedTx || isNotSupportedTx, listener); } finally { context.close(); } return pool; } /** * Stop done from inside, we should not stop the exceptionListener in inner * stop. */ protected void innerStop() { try { if (connection != null) { connection.setExceptionListener(null); log.debug("unset exception listener"); } } catch (Exception e) { log.error("Could not set ExceptionListener to null", e); } // Stop the connection try { if (connection != null) { connection.stop(); log.debug("connection stopped"); } } catch (Exception e) { log.error("Could not stop JMS connection", e); } } /** * Parse the JNDI suffix from the given JNDI name. * * @param jndiname The JNDI name used to lookup the destination. * @param defautSuffix Description of Parameter * @return The parsed suffix or the defaultSuffix */ protected String parseJndiSuffix(final String jndiname, final String defautSuffix) { // jndiSuffix is merely the name that the user has given the MDB. // since the jndi name contains the message type I have to split // at the "/" if there is no slash then I use the entire jndi name... String jndiSuffix = ""; if (jndiname != null) { int indexOfSlash = jndiname.indexOf("/"); if (indexOfSlash != -1) { jndiSuffix = jndiname.substring(indexOfSlash + 1); } else { jndiSuffix = jndiname; } } else { // if the jndi name from jboss.xml is null then lets use the ejbName jndiSuffix = defautSuffix; } return jndiSuffix; } // Package protected --------------------------------------------- // Protected ----------------------------------------------------- // Private ------------------------------------------------------- // Inner classes ------------------------------------------------- /** * An implementation of MessageListener that passes messages on to the * container invoker. */ class MessageListenerImpl implements MessageListener { /** * The container invoker. */ JMSContainerInvoker invoker; // = null; /** * Construct a MessageListenerImpl . * * @param invoker The container invoker. Must not be null. */ MessageListenerImpl(final JMSContainerInvoker invoker) { // assert invoker != null; this.invoker = invoker; } /** * Process a message. * * @param message The message to process. */ public void onMessage(final Message message) { // assert message != null; if (log.isTraceEnabled()) { log.trace("processing message: " + message); } Object id; try { id = message.getJMSMessageID(); } catch (JMSException e) { // what ? id = "JMSContainerInvoker"; } // Invoke, shuld we catch any Exceptions?? try { // DLQHandling if (useDLQ && // Is Dead Letter Queue used at all message.getJMSRedelivered() && // Was message resent dlqHandler.handleRedeliveredMessage(message)) //Did the DLQ handler take care of the message { // Message will be placed on Dead Letter Queue, // if redelivered to many times return; } invoker.invoke(id, // Object id - where used? ON_MESSAGE, // Method to invoke new Object[] {message}, // argument tm.getTransaction(), // Transaction null, // Principal null); // Cred } catch (Exception e) { log.error("Exception in JMSCI message listener", e); } } } /** * ExceptionListener for failover handling. */ class ExceptionListenerImpl implements ExceptionListener { JMSContainerInvoker invoker; // = null; Thread currentThread; // = null; boolean notStoped = true; ExceptionListenerImpl(final JMSContainerInvoker invoker) { this.invoker = invoker; } /** * #Description of the Method * * @param ex Description of Parameter */ public void onException(JMSException ex) { currentThread = Thread.currentThread(); log.warn("MDB lost connection to provider", ex); boolean tryIt = true; while (tryIt && notStoped) { log.info("MDB Trying to reconnect..."); try { try { Thread.sleep(reconnectInterval); } catch (InterruptedException ie) { tryIt = false; return; } // Reboot container invoker.innerStop(); invoker.destroy(); invoker.init(); invoker.start(); tryIt = false; log.info("OK - reconnected"); } catch (Exception e) { log.error("MDB error reconnecting", e); } } currentThread = null; } void stop() { log.debug("stop requested"); notStoped = false; if (currentThread != null) { currentThread.interrupt(); log.debug("current thread interrupted"); } } } /** * Return a string representation of the current config state. */ public String toString() { StringBuffer buff = new StringBuffer(); buff.append("JMSContainerInvoker: {"); buff.append("beanName=").append(beanName); buff.append(";maxMessagesNr=").append(maxMessagesNr); buff.append(";maxPoolSize=").append(maxPoolSize); buff.append(";reconnectInterval=").append(reconnectInterval); buff.append(";providerAdapterJNDI=").append(providerAdapterJNDI); buff.append(";serverSessionPoolFactoryJNDI=").append(serverSessionPoolFactoryJNDI); buff.append(";acknowledgeMode=").append(acknowledgeMode); buff.append(";isContainerManagedTx=").append(isContainerManagedTx); buff.append(";isNotSupportedTx=").append(isNotSupportedTx); buff.append(";useDLQ=").append(useDLQ); if (dlqHandler != null) buff.append(";dlqHandler=").append(dlqHandler.toString()); buff.append("}"); return buff.toString(); } /** * Initialize the ON_MESSAGE reference. */ static { try { final Class type = MessageListener.class; final Class arg = Message.class; ON_MESSAGE = type.getMethod("onMessage", new Class[] {arg}); } catch (Exception e) { e.printStackTrace(); throw new ExceptionInInitializerError(e); } } }